整体框架图

|575

如上图,
Client 分为以下几层:

  1. Interface Stub: 处理接口,将相关属性封装为 metadata
  2. Stub Proxy: 通过调用信息,获取 metadata, 基于调用参数,生成代理类,然后传递 Invocation
  3. Filter ClientInvoker:调用过滤器, 基于业务逻辑依次过滤 Invocation
  4. ClientInvoker TrasnportClient: 最后经过 ClientInvoker, 然后开始处理 Invocation, 将 Invocation 转化成 Message, 作为传递的参数。过程中会经过 MessageIntercetor 从而定制不同的 Message
  5. TransportClient: 封装 客户端负载均衡 的逻辑,获取对应的客户端,然后将 Message 转化成 Request, 然后发送到服务器端
  6. TransportClient End: Client 收到响应,将之封装成 Result, 然后经过拦截器 ResultInterceptor 将之转化为需要的数据接口,比如 xxBean, 最后返回给调用方,结束本次调用

Server 分为以下几层:

  1. TransportServerHandler TransportServer: 首先要接受到请求 Request, 因为不清楚会从什么样的方式接受到,所以抽象除了 TransportServerHandlerRequest 转化成 Message 后传递给 Server 。核心的逻辑在 Server 中处理。
  2. TransportServer Filter: Server 转化 MessageInvocation, 然后传递给 Invoker
  3. Filter ServerInvoker: 和 Client 的第三部一致,也是过滤 Invocation
  4. ServerInvoker: 最后一个为 ServerInvoker, 会获取到 implement, 然后执行调用逻辑

后面行文,会针对具体各个层级的细节,按照 关键类类图描述 来具体详述。

这里需要注意的是,当前的 RPC 支持以下三种方式

  1. 兼容 v1 版本的消息注入逻辑,接收需要之前版本的服务器框架支持。
  2. RPC+JSON 的框架,接收需要该框架支持。
  3. HTTP+JSON 的框架,接收只需要实现任意服务器框架即可,比如 SpringMVC
    这里的 Server 只是代表 RPC 配套的 Server 的部分。如果是其他的方式, Server 部分的图是不符合的。

Client

Stub 层

Stub

public class Stub<T> {  
    private String serviceName;  
    private Class<T> serviceClass;  
    private final Map<AnyKey<?>, MetaData> metaDataMap = new HashMap<>();
    // lazy
    private T proxy;
}

Stub 会解析类,然后获取 name class metadata
然后等到第一次调用时,基于上文中的信息, 生成 proxy ,并缓存。

|500

MetaData

public class MetaData {  
    private String service;  
    private String endpoint;  
    private Class<?> proxyClass;  
    private Method method;  
    private Type[] parameterTypes;  
    private Type returnType;  
    private TransferMode sendMode = TransferMode.Common;  
    private TransferMode retMode = TransferMode.Common;    
    private Attributes attributes = new Attributes();

在处理成 MetaData 的时候,

  1. 考虑到泛型抽象出了 Type
  2. 考虑到发送,接受方案,抽象出 TransferMode,
  3. 考虑到可能存在的一些注解属性,还处于未知,所以,先处理成 Attributes

Proxy 层

|475

@Override  
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
    // gen or get invocation  
    MetaData behavior = servicePool.getMetaData(proxy, method);  
    Invocation invocation = behavior.derive(new Args(args));  
    RpcCall rpcCall = context.startInternalCall();  
    rpcCall.setInvocation(invocation);  
    Result result = clientInvoker.invoke(invocation);  
    // 简单设计,如果有必要后置,就给 result 加 hook  
    context.endCallIfComplete();  
 
	// 如果有异常,则抛出
    if (result.hasEx()) {  
        throw result.getEx();  
    }  
    return result.getValue();  
}

Filter 层

|500

public Invoker build() {  
    Invoker last = originInvoker;  
    Iterator<Filter> filterIterator = filters.descendingIterator();  
    while (filterIterator.hasNext()) {  
        Filter filter = filterIterator.next();  
        Invoker next = last;  
        last = new FilterNode(next, filter);  
    }  
    return last;  
}

创建过滤链,然后包装成 Invoker, 然后依次过滤 Invocation, 直到最后一个 ClientInvoker

ClientInvoker 层

public Result invoke(Invocation invocation) throws Exception {  
    try {  
        // get channel  
        TransportClient channel = this.clientProvider.getClient();  
        // genTemplate  
        Message message = messageParser.parse(invocation);  
        // interceptor select  
        InterceptorGroup group = interceptorSelector.select(message);  
        // intercept msg  
        MessageInterceptor msgInterceptor = group.getMsgInterceptor();  
        msgInterceptor.intercept(message);  
        // 通过数据管道 发送/获取 结果  
        Result result = channel.send(message);  
        // intercept result  
        ResultInterceptor resultInterceptor = group.getResultInterceptor();  
        resultInterceptor.intercept(result);  
        return result;  
    } catch (FuncException sre) {  
        return new CommonResult(sre.getCause());  
    } catch (Throwable e) {  
        return new CommonResult(e);  
    }  
}

|525

  1. 传入 Invocation 后,首先通过 TransportMessageParser 转换为 Message
  2. 然后经过一轮拦截器 MsgInterceptor
  3. 然后经过 Client 发送 Message
  4. 获取响应 Result
  5. 然后在经过一轮拦截器 ResultInterceptor
  6. 最后返回

MessageParser

public interface TransportMessageParser {  
    Message parse(Invocation invocation);  
}
/**  
 * 放到 headers 里面的属性  
 */  
private Map<String, String> headers = new HashMap<>();  
/**  
 * 放到 参数 的属性  
 */  
private Map<String, String> parameters = new HashMap<>();  
private HttpMethod httpMethod;  
private MediaType consumeType;  
/**  
 * 默认为空  
 */  
private String path = "";  
private byte[] body = new byte[0];  
private Invocation invocation;  
private final Attachments attachments = new Attachments();  
public HttpMessage() {  
}

取决于协议的不同,可以转化为不同的 Message, 供后面 TransportClient 使用。
这里是 HttpMessage, 包含 header, parameter, method(post/get), mediatype, path, body, attachments

MessageInterceptor

|500

MessageInterceptor 主要是往 Message 中注入信息,比如 Body , Parameter 等需要用到的部分。

其中转化为 Body 时,需要序列化框架,因此抽象出了 BodyCodec 来满足不同情况下,对 Body 的处理。BodyCodec 又依赖底层的 Codec 来进行相应的序列化。

RequestBodyInterceptor

@Override  
public void intercept(Message msg) {  
    byte[] body = msg.getBody();  
    if (ArrayUtils.isEmpty(body)) {  
        try {  
            byte[] encode = bodyEncoder.encode(msg);  
            msg.setBody(encode);  
        } catch (Exception e) {  
            FineLoggerFactory.getLogger().error(e.getMessage(), e);  
        }  
    }  
}

HttpRpcBodyCodec

@Override  
public byte[] encode(Message message) throws Exception {  
    byte[] body = message.getBody();  
    MetaData metaData = message.getInvocation().getMetaData();  
    if (metaData.getSendMode().isCommon()) {  
        Invocation invocation = message.getInvocation();  
        Args args = invocation.getArgs();  
        ByteArrayOutputStream baos = new ByteArrayOutputStream();  
        getEncoder().encode(baos, args);  
        return baos.toByteArray();  
    }  
    return body;  
}

JsonCodec

@Override  
public void encode(OutputStream out, Object obj) throws Exception {  
    objectMapper.writeValue(out, obj);  
}

从上面的关键逻辑中可以看出

  • BodyInterceptor : 负责赋值
  • BodyCodec 负责解析 Message , 然后将选中的部分转化成 bytes
    • 参见上文中的 HttpRpcBodyCodec, 这里是会将所有的 Args 转化成 Body, 对应的服务器部分也是将 Body 解析成 Args, 从而正常的使用。类似下图
      • |500
  • JsonCodec 负责处理任意 Object, 转化成 JSON 后写入 OutputStream

ResultInterceptor

ResultInterceptor 和上文中的 MessageInterceptor 功能类似,都是拦截 Result 后进行一些处理。

|500

目前是有三个实现,组合后处理不同的响应场景。

Transport 层

|500

  1. Provider 通过 ConnectorConnectionInfo 创建 Client
  2. Client 接收到 Message 后,
  3. 通过 ClientRequestConstructor 处理成对应的 Request
  4. Client 发送 Request, 接受响应 Response
  5. ClientResponse 转化为 Result
@NotNull  
private Result execute(HttpMessage message) throws Exception {  
    // 构建请求  
    HttpRequestBase request = requestConstructor.construct(message);  
    CloseableHttpResponse response = httpClient.execute(request);  
    // check status  
    StatusLine statusLine = response.getStatusLine();  
    int responseCode = statusLine.getStatusCode();  
    // check ex  
    if (responseCode >= HttpStatus.SC_BAD_REQUEST) {  
        CommonResult httpResult = new CommonResult();  
        String reason = statusLine.getReasonPhrase();  
        String msg = "status is " + responseCode + " reason is " + reason;  
        httpResult.setEx(new TransportException(msg));  
        return httpResult;  
    }  
    // get return bytes  
    Result result = toResult(message, response);  
    result.setInvocation(message.getInvocation());  
    return result;  
}

注:考虑到客户端负载均衡的场景, 如果需要,直接在 Provider 这部分去进行实现即可。

Server

todo

Codec

类图

|500

2023-12-07 修改
|350

接口设计

|450

流程图

sequenceDiagram
	participant codec as RpcJsonCodec
	participant context as JsonCodecContext
	participant registry as JsonModuleRegistry
	participant B as Biz
	B->>registry: 注册 module
    codec->>context: 获取 codec 单例
    context->>registry: 获取 module
    registry->>context: 返回 module
    context->>codec: 返回 codec 单例
    B->>registry: 注册 module
    registry->>context: 失效 codec 单例
    codec->>context: 获取 codec 单例
	context->>registry: 获取 module
    registry->>context: 返回 module
    context->>codec: 获取 codec 单例

关键类

ResponseDeserializer

在服务器返回响应的时候,基于实际业务,我们返回的数据类型如下。

public class Response {  
    //响应状态  
    private int status;  
    //错误码  
    private String errorCode;  
    //错误信息  
    private String errorMsg;  
    //响应实体内容  
    private Object data;
}

这样的数据类型,如果不记录 data 的 class 类型, 很难进行反序列化。比如

ObjectMapper mapper = new ObjectMapper();  
Response ok = Response.ok(new JSONObj("111"));  
String value = mapper.writeValueAsString(ok);  
Response response = mapper.readValue(value, Response.class);
 
private class JSONObj {  
    private String val1;
}

输出的结果如下

|425

可以发现是无法将 value 正常的处理会原来的 JSONObj 的类的。

然而,如果将类的 class 写进去,可以解决这个问题,但会导致Java 中的反序列化漏洞

所以设计了如下结构 BoxType

public class BoxType extends Type {  
    private final Type self;  
    private final Type[] fields;
}

当需要反序列化时,传入 Type(实际是 BoxType)。

@Override  
public Object decode(InputStream in, Type type) throws Exception {  
    try {  
        beforeDecode(type);  
        JsonParser parser = objectMapper.getFactory().createParser(in);  
        return JsonUtils.decode(parser, objectMapper.getTypeFactory(), type);  
    } finally {  
        afterDecode();  
    }  
}

然后此 Type 会通过 ThreadLocal 保存下来。

private final ThreadLocal<Type> types;
 
protected void beforeDecode(Type type) {  
    types.set(type);  
}

然后添加反序列化器 ResponseDeserializer, 当遇到 Response.class 的时候,使用此序列化器

module.addDeserializer(Response.class, new ResponseDeserializer(types));

此序列化器,当处理到内部的 data 的时候,将 BoxType 里面存储的子结构获取出来。供反序列化的时候确认反序列化值。

case "data":
default:
	// 处理序列化值
	Type type = typeHolder.get();
	if (type instanceof BoxType) {
		BoxType structure = (BoxType) type;
		Type[] fields = structure.getFields();
		Type dataType = fields[0];
		Object data = JsonUtils.decode(p, ctxt.getTypeFactory(), dataType);
		response.setData(data);
	}

ArgsRpcSerializer

public class Args {  
    private Object[] args;
}

Args 和上文的 Response 类似,这里就不赘述。

SafeAnnotationIntrospector

如上文所说,反序列化漏洞往往发生在

  • 调用了 ObjectMapper.enableDefaultTyping()函数;
  • 对要进行反序列化的类的属性使用了值为 JsonTypeInfo.Id.CLASS 的@JsonTypeInfo 注解;
  • 对要进行反序列化的类的属性使用了值为 JsonTypeInfo.Id.MINIMAL_CLASS 的@JsonTypeInfo 注解;

所以为了限制用户的使用操作,防止出现异常,这里限制使用注解 JsonTypeInfo
不允许在

  • Object[]
  • Container<x,x,x>
  • Object
    上使用该注解。

测试如下
首先构造一个 Bean, 里面包含 @JsonTypeInfo, 并且为 Object

private static class ContainerBean {  
    @JsonTypeInfo(use = Id.CLASS)  
    private List<Object> list;  
    @JsonTypeInfo(use = Id.CLASS)  
    private Map<String, Object> map;  
    @JsonTypeInfo(use = Id.CLASS)  
    private Object box;  
    @JsonTypeInfo(use = Id.CLASS)  
    private Object[] boxArray;  
    @JsonTypeInfo(use = Id.CLASS)  
    private Object[][] boxArray2;  
}

未处理前,结果为下

{
  "list": [
    {
      "@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$SimpleBean",
      "val": 1
    }
  ],
  "map": {
    "1": {
      "@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$SimpleBean",
      "val": 1
    }
  },
  "box": {
    "@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$BoxBean",
    "bean": {
      "val": 1
    }
  },
  "boxArray": [
    {
      "@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$BoxBean",
      "bean": {
        "val": 1
      }
    }
  ],
  "boxArray2": [
    [
      "[Ljava.lang.Object;",
      [
        {
          "bean": {
            "val": 1
          }
        }
      ]
    ]
  ]
}

可以看到 map, list, object, object[] 默认都会将 class 记录下来。因此会出现反序列化问题。

通过以下方式改造

private static class SafeAnnotationIntrospector extends JacksonAnnotationIntrospector {  
    private static final long serialVersionUID = -7768565555692182793L;  
	@Override
	protected TypeResolverBuilder<?> _findTypeResolver(MapperConfig<?> config, Annotated ann, JavaType baseType) {
		JsonTypeInfo info = _findAnnotation(ann, JsonTypeInfo.class);
		if (info != null) {
			Id use = info.use();
			if (use == Id.CLASS || use == Id.MINIMAL_CLASS) {
				if (baseType.getRawClass() == Object.class) {
					return null;
				}
				if ((baseType.isArrayType() && baseType.getContentType().getRawClass() == Object.class)) {
					return null;
				}
				if (baseType.isContainerType()) {
					int count = baseType.containedTypeCount();
					for (int i = 0; i < count; i++) {
						JavaType javaType = baseType.containedTypeOrUnknown(i);
						if (javaType.getRawClass() == Object.class) {
							return null;
						}
					}
				}
			}
		}
		return super._findTypeResolver(config, ann, baseType);
	}
 
}

输出结果

{
  "list": [
    {
      "val": 1
    }
  ],
  "map": {
    "1": {
      "val": 1
    }
  },
  "box": {
    "bean": {
      "val": 1
    }
  },
  "boxArray": [
    {
      "bean": {
        "val": 1
      }
    }
  ],
  "boxArray2": [
    [
      "[Ljava.lang.Object;",
      [
        {
          "bean": {
            "val": 1
          }
        }
      ]
    ]
  ]
}

可以看到不会有任何 class 属性, 存在序列化问题的部分,都过滤掉

Type

Note

如何正确读取参数的类型,以及当前的支持程度

上文有讲到,反序列化时,需要记录具体的 Type 类型
目前有两种类型

  • com.fr.workspace.rpc.invocation.type.ContainerType 任意 n 个泛型的类,包含 Map, List 等泛型形式
  • com.fr.workspace.rpc.invocation.type.BoxType 盒类型,用于内部有 Object, 并且不指定泛型的类型。

限制为

  • 不支持嵌套的泛型。比如 List<Map<a,b>>

JsonModuleProvider

Note

针对具体类的扩展逻辑,和正常的 @JsonDeserialize 的区别、优势

使用 Jackson 对具体的类进行序列化/反序列化时。有几种区别。

1、使用 SimpleModule

类似下图

ObjectMapper mapper = getMapper();  
SimpleModule module = new SimpleModule(RpcJsonCodec.class.getSimpleName());  
module.addSerializer(A.class, new ASerializer());  
module.addDeserializer(A.class, new ADeserializer());  
mapper.registerModule(module);

但是该方案有一个问题。
当继承 A 时,比如 AImpl。这个时候,序列化时,可以捕获到对应的 Serializer, 反序列化时,由于匹配规则 com.fr.third.fasterxml.jackson.databind.module.SimpleDeserializers#_find

private final JsonDeserializer<?> _find(JavaType type) {  
    if (_classMappings == null) {  
        return null;  
    }  
    return _classMappings.get(new ClassKey(type.getRawClass()));  
}

只会匹配当前类型的 class, 所以导致如果要反序列化 AImpl , 那么是匹配不到 A 的。
而由于我们有 XMLable 的场景,所以必然要判断当前是否是 XMLable 。 所以直接使用如上方案不可以。

但根据上文所述,只需要处理匹配规则即可。 所以通过继承后重写匹配规则,即可以解决上述问题。

SimpleDeserializers deserializers = new SimpleDeserializers() {  
    private static final long serialVersionUID = 7861280549299306670L;  
    @Override  
    public JsonDeserializer<?> findBeanDeserializer(JavaType type, DeserializationConfig config, BeanDescription beanDesc) throws JsonMappingException {  
        JavaType superType = type.findSuperType(Filter.class);  
        if (superType != null) {  
            return deserializer;  
        }  
        return super.findBeanDeserializer(type, config, beanDesc);  
    }  
};
2、使用 @JsonDeserialize @JsonSerialize
@JsonDeserialize(using="")
@JsonSerialize(using="")
private XMLable xmlObj;

该注解有以下几种方法

方法描述
contentConverter指定自定义的内容转换器类
converter指定自定义的转换器类
include指定序列化时要包含的属性
nullsUsing指定自定义的处理 null 值的序列化器类
keyUsing指定自定义的处理键的序列化器类,用在 Map
contentUsing指定自定义的处理内容信息的序列化器类,用在 List
using指定自定义的序列化器类

这个注解的问题有 1 个, 放在对象上时不能识别嵌套的内部 Class

Container {
	Xmlable xmlable;
}

ContainerImpl extends Container {}

Bean {

	@JsonSerialize(using=xmlSerializer)
	@JsonDeserialize(using=xmlDeserializer)
	ContainerImpl container;
}

这样的场景下,Container 内部的 xmlable 是不能被使用 xmlDeserializer 反序列化成 Xmlable 的。
只能改成

Container {
	@JsonSerialize(using=xmlSerializer)
	@JsonDeserialize(using=xmlDeserializer)
	Xmlable xmlable;
}

这种需要改动的地方可能会很多,需要调用者自己考虑的比较多。
这里有有两种影响,
1、pure-http, 是必须考虑这种影响的。
2、rpc-http, 这种是尽量减少同学的感知。

或者

@JsonSerialize(using=xmlSerializer)
@JsonDeserialize(using=xmlDeserializer)
public interface Xmlable {

}

这种改动就导致所有的地方, xmlable 的序列化方式都会改变。

总结

|450

第一种方式,影响的 args 会比较多,但是只针对当前的 mapper, 重新创建一个 mapper, 就不会有影响。 第二种方式,影响的 mapper 会比较多, 因为是通用的注解,所以所有的 mapper 都会被影响到,但是影响的 args 会比较少,只影响有注解的部分。

使用

|450

关键类

@Retention(RetentionPolicy.RUNTIME)  
@Target({ElementType.TYPE})  
@Inherited  
public @interface Service {  
    /**  
     * 服务名称  
     *  
     * @return 名称  
     */  
    String name() default "";  
}
@Retention(RetentionPolicy.RUNTIME)  
@Target({ElementType.TYPE, ElementType.METHOD})  
@Inherited  
public @interface Request {  
    /**  
     * 服务器端接受的类型  
     *   
	 * @return 类型  
     */  
    MediaType consume() default MediaType.APPLICATION_JSON;  
    /**  
     * 传递的方法,post/get/put/delete  
     ** @return 方法  
     */  
    HttpMethod method() default HttpMethod.POST;  
    /**  
     * 请求路径,可以叠加  
     * 使用在类上,则为 base  
     * 使用在方法上,则为 path  
     * 最后的路径 = base + path  
     ** @return 路径  
     */  
    String path();  
}
flowchart TB
	A[service] --> B{exist}
	B -- yes --> C[v2]
	B -- no --> D[v1]
	C --> E[Request]
	E --> F{exist}
	F -- yes --> G[http]
	F -- no --> H[rpc]

RPC 方案

Note

在对应的类上添加 @Service, 代表是新方案。

@Service  
public interface DataFetchService {  
    <T extends Base> DataFetchResult<T> getConfig(Class<T> clazz, String nameSpace);  
    class DataFetchResult<T extends Base> {  
        @JsonTypeInfo(use = Id.CLASS)  
        private List<T> entities;  
        public DataFetchResult() {  
        }  
        public DataFetchResult(List<T> entities) {  
            this.entities =  entities;  
        }  
        public List<T> getEntities() {  
            return entities;  
        }  
        public void setEntities(List<T> entities) {  
            this.entities = entities;  
        }  
    }  
}

该方案,只需要在客户端服务器端同时注册该接口即可。

Http 方案

@Service  
@Request(path="/datafetch")
public interface DataFetchService {  
	@Request(path="/getConfig")
    <T extends Base> DataFetchResult<T> getConfig(Class<T> clazz, String nameSpace);  
}

该方案需要服务器端根据 path 实现对应的 Controller

  
@Controller  
@RequestMapping(value = "/datafetch")  
public class DataFetchController {  
  
    @RequestMapping(value = "/getConfig", method = RequestMethod.POST)  
    @ResponseBody  
    public Response getConfig(HttpServletRequest req,  
                                  HttpServletResponse res, @RequestParam Class<T> clazz, @RequestParam String nameSpace) throws Exception {  
  
		// get config
		DataFetchResult<T> results = xxx;
        return Response.ok(results);  
    }