一、前言

前一段时间在公司写了一个链路追踪的服务,其中 SpringMVC 做为门面对外提供服务,微服务之间采用 Dubbo 接口调用。对于 Dubbo 接口之间传递链路信息,采用 RpcContext 将需要的参数透传过去。然而在使用 RpcContext 时遇到了几个问题导致 RpcContext 未按我设想的方式传递。

二、RpcContext 介绍

RpcContext 本质上是一个使用 ThreadLocal 实现的临时状态记录器,当接收到 RPC 请求,或发起 RPC 请求时,RpcContext 的状态都会变化。
比如:A 调 B,B 再调 C,则 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。

RpcContext 使用 ThreadLocal 的部分源码如下:

public class RpcContext {
 
    /**
     * 存放主体内容
     */
    private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
 
    /**
     * 获取RpcContext信息
     */
    public static RpcContext getContext() {
        return (RpcContext)LOCAL.get();
    }
 
    /**
     * 清空RpcContext信息
     */
    public static void removeContext() {
            LOCAL.remove();
        }
}

注意:不同 Dubbo 版本的 RpcContext 略有区别,本质上都是使用的 ThreadLocal。

二、RpcContext 的使用

//服务提供方使用,获取参数
RpcContext.getContext().getAttachments()
//服务器消费方使用,设置参数
RpcContext.getContext().setAttachment()

1> 消费端(DubboConsumer):

// 远程调用之前,通过attachment传KV给提供方
RpcContext.getContext().setAttachment("userKey", "userValue");
// 远程调用
xxxService.xxx();
// 此时 RpcContext 的状态已变化
RpcContext.getContext();

2> 服务端(DubboProvider):

public class XxxServiceImpl implements XxxService {
 
    public void xxx() {
        // 通过RpcContext获取用户传参,这里会返回userValue  
        String value = RpcContext.getContext().getAttachment("userKey");  
        // 本端是否为提供端,这里会返回true  
        boolean isProviderSide = RpcContext.getContext().isProviderSide();  
        // 获取调用方IP地址  
        String clientIP = RpcContext.getContext().getRemoteHost();  
        // 获取当前服务配置信息,所有配置信息都将转换为URL的参数  
        String application = RpcContext.getContext().getUrl().getParameter("application");   
    }
}

三、结合 Filter 使用 RpcContext

一般修改 RpcContext 信息都是在 Dubbo 的拦截器中,这样有两个好处:

  1. 统一入口设置参数,方便维护。
  2. 解决一次完整请求调用涉及多次嵌套 RPC 调用时获取不到上下文中设置的参数值问题。

例如:

1> 在 DubboConsumerFilter 中设置 RpcContext 信息:

@Slf4j
@Activate(group = {CommonConstants.CONSUMER})
public class DubboConsumerFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 
        try {
            // 服务端从dubbo上下文中取出rpcContext信息
            String jsonStr = null;
            if (!CollectionUtils.isEmpty(RpcContext.getContext().getObjectAttachments())) {
                jsonStr = RpcContext.getContext().getAttachment(HttpHeaderKeys.APP_NAME.getKey());
            }
            // 获取不到rpcContext信息,则手动塞入
            if (StringUtils.isEmpty(jsonStr)) {
                RpcContext.getContext().setAttachment("userKey", "saint");
            }
        } catch (Exception e){
            log.error("Exception in process DubboConsumerFilter" ,e);
            // do nothing
        }
 
        return invoker.invoke(invocation);
 
    }
}

2> 在 DubboProviderFilter 中获取 RpcContext 信息:

@Slf4j
@Activate(group = {CommonConstants.PROVIDER})
public class DubboProviderFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 
        // 服务端从dubbo上下文中取出traceContext信息
        String jsonStr = null;
        if (!CollectionUtils.isEmpty(RpcContext.getContext().getObjectAttachments())) {
            jsonStr = RpcContext.getContext().getAttachment(HttpHeaderKeys.APP_NAME.getKey());
            log.info("DubboProviderFilter get dubbo RpcContext is : {}", jsonStr);
        }
        return invoker.invoke(invocation);
 
    }
}

四、使用 RpcContext 的坑

1、一个 dubbo 接口调用多个 dubbo 接口,RpcContext 会改变

一个 dubbo 接口同步调用多个 dubbo 接口(比如 Dubbo 接口 B 和 Dubbo 接口 C)时,在调用 Dubbo 接口 C 时,RPCContext 已经发生改变了,需要重新获取调用链路信息。

原因分析见后面的 RpcContext原理

参考解决方案

可以采用 ThreadLocal 保存,然后在 ProviderFilter 中清除 ThreadLocal,防止数据错乱,但是会造成一定的内存泄漏(数据量较小是可以接收的)。

0> 用于存储调用链路信息的 ThreadLocal:

import lombok.Data;
import lombok.experimental.Accessors;
 
import java.io.Serializable;
 
/**
 * 链路信息上下文
 *
 * @author Saint
 */
public class RpcTraceContext implements Serializable {
 
    private final static ThreadLocal<TraceContext> traceContextHolder = new ThreadLocal<>();
 
 
    public static ThreadLocal<TraceContext> get() {
        return traceContextHolder;
    }
 
    /**
     * 设置traceContext
     *
     * @param traceContext traceContext
     */
    public static void setTraceContext(TraceContext traceContext) {
        traceContextHolder.set(traceContext);
    }
 
    /**
     * 获取traceContext
     *
     * @return traceContext
     */
    public static TraceContext getTraceContext() {
        return traceContextHolder.get();
    }
 
    /**
     * 清空trace上下文
     */
    public static void clear() {
        traceContextHolder.remove();
    }
 
    @Data
    @Accessors(chain = true)
    public static class TraceContext implements Serializable {
        private Long userId;
        private String traceId;
        private String controllerAction;
        private String visitIp;
        private String appName;
    }
}

1> DubboProviderFilter:

@Slf4j
@Activate(group = {CommonConstants.CONSUMER})
public class DubboConsumerFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        
        try {
            //服务端从dubbo上下文中取出traceContext信息
            String jsonStr = null;
            if (!CollectionUtils.isEmpty(RpcContext.getContext().getAttachments())) {
                jsonStr = RpcContext.getContext().getAttachment("TRACE_CONTEXT");
            }
            if (StringUtils.isNotEmpty(jsonStr)) {
                // 这里是为了解决duboo接口调用多个dubbo接口,第一个dubbo接口之后的dubbo接口获取到的RpcContext为空的问题。
                RpcTraceContext.TraceContext traceContext = JSON.parseObject(jsonStr, RpcTraceContext.TraceContext.class);
                RpcTraceContext.setTraceContext(traceContext);
            }
        } catch (Exception e){
            log.error("Exception in process DubboConsumerFilter" ,e);
        }
 
        return invoker.invoke(invocation);
    }
}

2> DubboProviderFilter:

@Slf4j
@Activate(group = {CommonConstants.PROVIDER})
public class DubboProviderFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 
        RpcTraceContext.clear();
 
        // 服务端从dubbo上下文中取出traceContext信息
        String jsonStr = null;
        if (!CollectionUtils.isEmpty(RpcContext.getContext().getAttachments())) {
            jsonStr = RpcContext.getContext().getAttachment("TRACE_CONTEXT");
        }
        return invoker.invoke(invocation);
 
    }
}

2、异步调用的两个坑

1)异步调用依赖传递性

问题表现:

  • 如果 consumer-A 异步调用 provider-B,而 provider-B 本身又调用了 provider-C。当 provider-B 调用 provider-C 时,会变成异步。

问题原因:

  • 是否异步调用取决于 RpcContext 中 async 的值,其次才是服务本身的配置。
  • 当 A 调用 B 时,会把 async=true 传给 B 的 RpcContext;B 调用 C 时,虽然服务本身 async=false,但 RpcContext 中 async=true,自然也就成了异步调用。

2)异步回调返回 null

问题表现:

  • consumer-A 调用 provider-B,而 provider-B 本身又调用了 provider-C。consumer-A 调用 provider-B 返回 null。

问题原因:

  • 异步调用直接返回空的 RpcResult,需要后序通过 RpcContext.getContext().getFuture() .get()获取返回值。
  • async 透传到 provider-B 端之后,也是异步调用 provider-C,但是直接返回空的 RpcResult 给 consumer-A。

3)解决方案

不让 async 参数应用到 provider 端。需要修改 ContextFilter 源码,重写 RpcContext 时删除 async 参数;

五、RpcContext 原理

首先 RpcContext 内部有一个 ThreadLocal 变量(高版本用的 InternalThreadLocal 本质上也是 ThreadLocal),它是作为 ThreadLocalMap 的 key,表明每个线程有一个 RpcContext。

其次 Dubbo 内嵌了两 Filter,分别为:ContextFilter、ConsumerContextFilter,分别用来拦截 Dubbo 服务提供者和消费者。

1、ConsumerContextFilter

消费端在执行 Rpc 调用之前,经过 Filter 处理, 会将一些信息(比如:服务调用信息)写入 RpcContext。

2、ContextFilter

服务端在执行调用之前,也会经过 Filter 处理,将信息写入 RpcContext;最后清空 RpcContext。