11.源码分析—SOFARPC数据透传是实现的?

SOFARPC源码解析系列:

1. 源码分析—SOFARPC可扩展的机制SPI

2. 源码分析—SOFARPC客户端服务引用

3. 源码分析—SOFARPC客户端服务调用

4. 源码分析—SOFARPC服务端暴露

5.源码分析—SOFARPC调用服务

6.源码分析—和dubbo相比SOFARPC是如何实现负载均衡的?

7.源码分析—SOFARPC是如何实现连接管理与心跳?

8.源码分析—从设计模式中看SOFARPC中的EventBus?

9.源码分析—SOFARPC是如何实现故障剔除的?

10.源码分析—SOFARPC内置链路追踪SOFATRACER是怎么做的?


先把栗子放上,让大家方便测试用:
Service端

public static void main(String[] args) {
    ServerConfig serverConfig = new ServerConfig()
        .setProtocol("bolt") // 设置一个协议,默认bolt
        .setPort(12200) // 设置一个端口,默认12200
        .setDaemon(false); // 非守护线程

    ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName()) // 指定接口
        .setRef(new HelloServiceImpl()) // 指定实现
        .setServer(serverConfig); // 指定服务端

    providerConfig.export(); // 发布服务
}

public class HelloServiceImpl implements HelloService {

    private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);

    @Override
    public String sayHello(String string) {
        LOGGER.info("Server receive: " + string);

        // 获取请求透传数据并打印
        System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));
        // 设置响应透传数据到当前线程的上下文中
        RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");

        return "hello " + string + " !";
    }
}

client端

public static void main(String[] args) {
    ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName()) // 指定接口
        .setProtocol("bolt") // 指定协议
        .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
        .setConnectTimeout(10 * 1000);

    RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");

    HelloService helloService = consumerConfig.refer();

    while (true) {
        System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));
        try {
            LOGGER.info(helloService.sayHello("world"));
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

通过上面的栗子我们可以看出整个流程应该是:

  1. 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
  2. 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
  3. 客户端收到透传数据

所以下面我们从客户端开始源码讲解。

客户端数据透传给服务端

首先客户端在引用之前要设置putRequestBaggage,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。

如下:
ClientProxyInvoker#invoke

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    ....
        // 包装请求
        decorateRequest(request);
       ....
}

通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。

DefaultClientProxyInvoker#decorateRequest

protected void decorateRequest(SofaRequest request) {
    ....
    RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
    RpcInternalContext internalContext = RpcInternalContext.getContext();
    if (invokeCtx != null) {
       ....
        // 如果用户指定了透传数据
        if (RpcInvokeContext.isBaggageEnable()) {
            // 需要透传
            BaggageResolver.carryWithRequest(invokeCtx, request);
            internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
        }
    }
    ....
} 

在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面

BaggageResolver#carryWithRequest

public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
    if (context != null) {
          //获取所有的透传数据
        Map<String, String> requestBaggage = context.getAllRequestBaggage();
        if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传
            request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
        }
    }
}

这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。

服务端接受透传数据

服务端的调用流程如下:

BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 

所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:

ProviderBaggageFilter#invoke

public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
    SofaResponse response = null;
    try {
        //从request中获取透传数据存入到requestBaggage中
        BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
        response = invoker.invoke(request);
    } finally {
        if (response != null) {
            BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
        }
    }
    return response;
}

ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest从request中获取数据

BaggageResolver#pickupFromRequest

public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
    if (context == null && !init) {
        return;
    }
    // 解析请求 
    Map<String, String> requestBaggage = (Map<String, String>) request
        .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
    if (CommonUtils.isNotEmpty(requestBaggage)) {
        if (context == null) {
            context = RpcInvokeContext.getContext();
        }
        context.putAllRequestBaggage(requestBaggage);
    }
}

最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse把响应透传数据回写到response里面。

public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
    if (context != null) {
        Map<String, String> responseBaggage = context.getAllResponseBaggage();
        if (CommonUtils.isNotEmpty(responseBaggage)) {
            String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
            for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                response.addResponseProp(prefix + entry.getKey(), entry.getValue());
            }
        }
    }
}

客户端收到响应透传数据

最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        ....
     // 包装响应
     decorateResponse(response);
        ....
}

decorateResponse是在子类DefaultClientProxyInvoker实现的:

DefaultClientProxyInvoker#decorateResponse

protected void decorateResponse(SofaResponse response) {
   ....
    //如果开启了透传
    if (RpcInvokeContext.isBaggageEnable()) {
        BaggageResolver.pickupFromResponse(invokeCtx, response, true);
    }
   ....
}

这个方法里面会调用BaggageResolver#pickupFromResponse

public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
    if (context == null && !init) {
        return;
    }
    Map<String, String> responseBaggage = response.getResponseProps();
    if (CommonUtils.isNotEmpty(responseBaggage)) {
        String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
        for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                if (context == null) {
                    context = RpcInvokeContext.getContext();
                }
                //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉
                context.putResponseBaggage(entry.getKey().substring(prefix.length()),
                    entry.getValue());
            }
        }
    }
}

这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。

到这里SOFARPC数据透传就分析完毕了