Posted in

如何手搓一个自定义的 RPC(远程过程调用框架)_AI阅读总结 — 包阅AI

包阅导读总结

1. 关键词:RPC、远程过程调用、自定义框架、Netty、通信协议

2. 总结:

本文介绍了 RPC 的概念、特点、应用场景,重点阐述如何基于 Netty 实现自定义的 RPC 框架,包括通信协议定义、客户端调用及相关功能实现、服务端接收数据及处理等环节。

3. 主要内容:

– 远程过程调用概述

– RPC 概念及特点

– 透明性、客户端-服务器模型、序列化及反序列化、同步及异步调用、错误处理、协议及传输层

– RPC 应用场景

– 分布式系统、客户端-服务器架构、跨平台调用、API 服务、大数据处理、云计算、跨网络服务调用

– 常见的 RPC 框架

– 实现自定义的 RPC

– 主要问题

– 客户端调用、参数序列化、服务端数据接收、执行远程过程、返回结果、客户端接收调用结果

– 自定义通信协议

– 分为 Header 及 Body Content 两部分,规定各部分的作用

– 客户端调用

– 客户端的使用

– 客户端代理工厂的核心功能

– 请求参数序列化

– 请求参数通过网络发送

– Netty 消息编码器

– 服务端接收数据

– 消息解码器

– 请求参数反序列化

思维导图:

文章地址:https://mp.weixin.qq.com/s/dQUebz5N5WFSoe56VO898A

文章来源:mp.weixin.qq.com

作者:京东科技??陈国友

发布时间:2024/8/1 8:41

语言:中文

总字数:6873字

预计阅读时间:28分钟

评分:86分

标签:RPC,Netty,动态代理,序列化,分布式系统


以下为原文内容

本内容来源于用户推荐转载,旨在分享知识与观点,如有侵权请联系删除 联系邮箱 media@ilingban.com

RPC(远程过程调用概述)

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将
远程过程调用(RPC, Remote Procedure Call)是一种通过网络从远程计算机程序上请求服务,而无需了解网络细节的通信技术。在分布式系统中,RPC是一种常用的技术,能够简化客户端与服务器之间的交互。本文将介绍如何基于Netty(网络编程框架)实现一个自定义的简单的RPC框架。

1.1 RPC远程过程调用的主要特点

  • 透明性:调用方(客户端)调用远程服务就像调用本地API函数一样,而无需关心执行过程中的底层的网络通信细节。

  • 客户端-服务器模型:RPC通常基于客户端-服务器模型,客户端发送请求到服务器,服务器处理请求并返回结果。

  • 序列化及反序列化:RPC需要将请求参数序列化成字节流(即数据转换成网络可传输的格式)并通过网络传输到服务器端,服务器端接收到字节流后,需按照约定的协议将数据进行反序列化(即恢复成三原始格式)

  • 同步及异步调用:RPC支持同步、异步调用。同步调用会阻塞直到服务器返回结果,或超时、异常等。而异步调用则可以立即返回,通过注册一个回调函数,在有结果返回的时候再进行处理。从而让客户端可以继续执行其它操作。

  • 错误处理:PRC由于涉及网络通信,因此需要处理各种可能的网络异常,如网络故障,服务宕机,请求超时,服务重启、或上下线、扩缩容等,这些对调用方来说需要保持透明。

  • 协议及传输居:RPC可以基于多种协议和传输层实现,如HTTP、TCP等,本文采用的是基于TCP的自定义协议。

1.2 RPC的应用场景

  • 分布式系统:多个服务之间进行通信,如微服务框架。

  • 客户端-服务器架构:如移动应用与后台服务器的交互。

  • 跨平台调用:不同技术栈之间的服务调用。

  • API服务:通过公开API对外提供功能,使用客户端能方便使用服务提供的功能,如支付网关,身份验证服务等。

  • 大数据处理:在大数据处理框架中,不同节点之间需要频繁通信来协调任务和交接数据,RPC可以提供高效的节点通信机制,如Hadoop 和Spark等大数据框架中节点间的通信。

  • 云计算:在云计算环境中,服务通常分布在多个虚拟机或容器中,通过RPC实现实现服务间的通信和管理。

  • 跨网络服务调用:当应用需要调用部署在不同网络中的服务时,RPC提供了一种简单而建议目前的调用方式,如。跨数据中心或嘴唇地域的服务调用。

1.3 常见的RPC框架

  • JSF:京东开源的分布式服务框架,提供高性能、可扩展、稳定的服务治理能力,支持服务注册及发现,负载均衡、容错机制、服务监控、多种协议支持等。

  • gRPC:基于HTTP/2和Protocol Buffers的高性能RPC框架,由Google开发。

  • Dubbo:一个高性能、轻量级的Java RPC框架,用于提供基于接口的远程服务调用,支持负载均衡、服务自动注册及服务、容错等。

  • JSON-RPC:使用JSON格式编码调用和结果的RPC协议。

  • Apache Thrift:由Facebook开发,支持多种编程语言和协议

实现自定义的RPC

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将

要实现一个自定义的RPC框架需解决以下几个主要问题:

1.客户端调用:客户端调用本地的代理函数(stub代码,这个函数负责将调用转换为RPC请求)。这其实就是一个接口描述文件,它可以有多种形式如JSON、XML、甚至是一份word文档或是口头约定均可,只要客户端及服务端都是遵守这份接口描述文件契约即可。在我们的实际开发中一种常见的方式是服务提供者发布一个包含服务接口类的jar包到maven 中央仓库,调用方通过pom文件将之依赖到本地。

2.参数序列化:代理函数将调用参数进行序列化,并将请求发送到服务器。

3.服务端数据接收:服务器端接收到请求,并将其反序列化,恢复成原始参数。

4.执行远程过程:服务端调用实际的服务过程(函数)并获取结果。

5.返回结果:服务端将调用结果进行序列化,并通过网络传给客户端。

6.客户端接收调用结果:客户到接收到服务端传输的字节流,进行反序列化,转换为实际的结果数据格式,并返回到原始调用方。
下面需我们通过代码一一展示上述各功能是如何实现的。

2.1 自定义通信协议

本文的目的是要实现一个自定义通信协议的远程调用框架,所以首先要定义一个通信协议数据格式。
整个自定义协议总体上分为Header 及 Body Content两部分;Header 占16个字节,又分为4个部分。
前2位为魔法值用于Netty编解码组件,解决网络通信中的粘包、半包等问题,此处不展开细讲。
msgtype用于表示消息的类型,如request(请求)、respone(响应)、heartbeat(心跳)等。
code 占1位,表示请求的响应状态,成功还是失败。
request id占8位,表示请求的序列号,用于后续调用结果的匹配,保证线程内唯一。
body size 占4位,表示实现请求内容的长度,在反序化时读取此长度的内容字节,解析出正确的数据。
客户端、服务端在通信过程中都要按照上述约定的通信协议进行数据的编、解码工作。

2.2 客户端调用

2.2.1 客户端的使用

客户端一般通过接口代理工厂通过动态代理技术来生成一个代理实例,所有的远程调用中的细节,如参数序列化,网络传输,异常处理等都隐藏在代理实例中实现,对调用方来说调用过程是透明的,就像调用本地方法一样。

首先看一下客户端的使用方式,本文假设一个IShoppingCartService (购物车)的接口类,基中有一个方法根据传入的用户pin,返回购物车详情。

ShoppingCart shopping(String pin);
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)                                .setSerializerType(SerializerType.JDK)                 .newProxyInstance(); 
ShoppingCart result = serviceProxy.shopping("userPin");log.info("result={}", JSONObject.toJSONString(result));

2.2.2 客户端代理工厂的核心功能

public class ProxyFactory<I> {            public I newProxyInstance() {                     ServiceData serviceData = new ServiceData(                group,                 providerName,                 StringUtils.isNotBlank(version) ? version : "1.0.0"         );
Calller caller = newCaller().timeoutMillis(timeoutMillis); Strategy strategy = StrategyConfigContext.of(strategy, retries); Object handler = null; switch (invokeType) { case "syncCall": handler = new SyncCaller(serviceData, caller); break; case "asyncCall": handler = new AsyncCaller(client.appName(), serviceData, caller, strategy); break; default: throw new RuntimeException("未知类型: " + invokeType); }
return ProxyEnum.getDefault().newProxy(interfaceClass, handler); } }

代码 ProxyEnum.getDefault().newProxy(interfaceClass, handler) 返回一个具体的代理实例,此方法要求传入两个参数,interfaceClass 被代理的接口类class,即服务方所发布的服务接口类。

handler 为动态代理所需要代码增强逻辑,即所有的调用细节都由此增强类完成。按照动态代理的实现方式的不同,本文支持两种动态代理方式:

1.JDK动态代码,如采用此方式,handler 需要实现接口 InvocationHandler

2.ByteBuddy,它是一个用于在运行时生成、修改和操作Java类的库,允许开发者通过简单的API生成新的类或修改已有的类,而无需手动编写字节码,它广泛应用于框架开发、动态代理、字节码操作和类加载等领域。

本文默认采用第二种方式,通过代码简单展示一下代理实例的的生成方式。

public <T> T newProxy(Class<T> interfaceType, Object handler) {            Class<? extends T> cls = new ByteBuddy()                                         .subclass(interfaceType)                                          .method(ElementMatchers.isDeclaredBy(interfaceType))                                                             .intercept(MethodDelegation.to(handler, "handlerInstance"))                    .make()                                        .load(interfaceType.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)                    .getLoaded();
try { return cls.newInstance(); } catch (Throwable t) { …… } }
本文以同步调用为例,现在展示一下 SyncInvoker 的具体实现逻辑。
public class SyncCaller extends AbstractCaller {            @RuntimeType    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {                StarGateRequest request = createRequest(methodName, args);                        Invoker invoker = new FastFailInvoker();                Future<?> future = invoker.invoke(request, method.getReturnType());        if (sync) {                        Object result = future.getResult();            return result;        } else {            return future;        }    }}
通过以上核心代码,客户端就完成了服务调用环节,下一步RPC框架需要将客户端请求的接口方法及方法参数进行序列化并通过网络进行传输。下面通过代码片段展示一下序列化的实现方式。

2.2.3 请求参数序列化

我们将请求参数序列化的目的就是将具体的请求参数转换成字节组,填充进入上述自定义协议的 body content 部分。下面通过代码演示一下如何进行反序列化。

本文默认采用JDK原生的对象序列化及反序列化框架,也可通过SPI技术扩展支持Protocol Buffers等。

public <T> Future<T> invoke(StarGateRequest request, Class<T> returnType) throws Exception { final Serializer _serializer = serializer(); final Message message = request.message(); Channel channel = selectChannel(message.getMetadata()); byte code = _serializer.code(); byte[] bytes = _serializer.writeObject(message); request.bytes(code, bytes);
return write(channel, request, returnType); }
public <T> byte[] writeObject(T obj) {        ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();        try (ObjectOutputStream output = new ObjectOutputStream(buf)) {            output.writeObject(obj);            output.flush();            return buf.toByteArray();        } catch (IOException e) {            ThrowUtil.throwException(e);        } finally {            OutputStreams.resetBuf(buf);        }        return null;     }

2.2.4 请求参数通过网络发送

protected <T> DefaultFuture<T> write(final Channel channel,                                               final StarGateRequest request,                                               final Class<T> returnType) {        
final Future<T> future = DefaultFuture.newFuture(request.invokeId(), channel, timeoutMillis, returnType);
channel.writeAndFlush(request).addListener((ChannelFutureListener) listener -> { if (listener.isSuccess()) { …… } else { DefaultFuture.errorFuture(channel, response, dispatchType); } });
return future; }
2.2.4.1 Netty 消息编码器
消息写入Netty channel 后,会依次经过 channel pipline 上所安装的各种handler处理,然后再通过物理网络将数据发送出去,这里展示了客户端及服务端所使用的自定义编、解解器。
public class StarGateEncoder extends MessageToByteEncoder<Payload> {

private void doEncodeRequest(RequestPayload request, ByteBuf out) { byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST); long invokeId = request.invokeId(); byte[] bytes = request.bytes(); int length = bytes.length;
out.writeShort(StarGateProtocolHeader.Head) .writeByte(sign) .writeByte(0x00) .writeLong(invokeId) .writeInt(length) .writeBytes(bytes); }
}

至此,通过上述核心代码,客户的请求已经按照自定义的协议格式进行了序列化,并把数据写入到Netty channel中,最后通过物理网络传输到服务器端。

2.3 服务端接收数据

2.3.1 消息解码器

服务器端接收到客户端的发送的数据后,需要进行正确的消息解码,下面是解码器的实现。

public class StarGateDecoder extends ReplayingDecoder<StarGateDecoder.State> {

@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { switch (state()) { case HEAD: checkMagic(in.readShort()); checkpoint(State.HEAD); case SIGN: header.sign(in.readByte()); checkpoint(State.STATUS); case STATUS: header.status(in.readByte()); checkpoint(State.ID); case ID: header.id(in.readLong()); checkpoint(State.BODY_SIZE); case BODY_SIZE: header.bodySize(in.readInt()); checkpoint(State.BODY); case BODY: switch (header.messageCode()) { case StarGateProtocolHeader.REQUEST: { int length = checkBodySize(header.bodySize()); byte[] bytes = new byte[length]; in.readBytes(bytes); RequestPayload request = new RequestPayload(header.id()); request.bytes(header.serializerCode(), bytes); out.add(request); break; }
default: throw new Exception("错误标志位"); } checkpoint(State.HEAD); } }
}

2.3.2 请求参数反序列化

public class ServiceHandler extends ChannelInboundHandlerAdapter {

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (msg instanceof RequestPayload) { StarGateRequest request = new StarGateRequest((RequestPayload) msg); byte code = request.serializerCode(); Serializer serializer = SerializerFactory.getSerializer(code); byte[] bytes = payload.bytes(); Message message = serializer.readObject(bytes, Message.class); log.info("message={}", JSONObject.toJSONString(message));
request.message(message);
process(message); } else { ReferenceCountUtil.release(msg); } }
}

2.3.3 处理客户端请求

经过反序列化后,服务端可以知道用户所请求的是哪个接口、方法、以及实际的参数值,下一步就可进行真实的方法调用。

public void process(Message message) {            try {       ServiceMetadata metadata = msg.getMetadata();        String providerName = metadata.getProviderName(); 
providerName = findServiceImpl(providerName); String methodName = msg.getMethodName(); Object[] args = msg.getArgs();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?> clazz = classLoader.loadClass(providerName); Object instance = clazz.getDeclaredConstructor().newInstance();
Method method = null; Class<?>[] parameterTypes = new Class[args.length]; for (int i = 0; i < args.length; i++) { parameterTypes[i] = args[i].getClass(); } method = clazz.getMethod(methodName, parameterTypes);
Object invokeResult = method.invoke(instance, args); } catch (Exception e) { log.error("调用异常:", e); throw new RuntimeException(e); }
doProcess(invokeResult);
}

2.3.4 返回调用结果

通过反射调用接口实现类,获取调用结果,然后对结果进行序列化并包装成response响应消息,将消息写入到channel, 经过channel pipline 上所安装的编码器对消息对象进行编码,最后发送给调用客户端。
private void doProcess(Object realResult) {        ResultWrapper result = new ResultWrapper();        result.setResult(realResult);        byte code = request.serializerCode();        Serializer serializer = SerializerFactory.getSerializer(code);                Response response = new Response(request.invokeId());                byte[] bytes = serializer.writeObject(result);        response.bytes(code, bytes);        response.status(Status.OK.value());
channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { log.info("响应成功"); } else { log.error("响应失败, channel: {}, cause: {}.", channel, channelFuture.cause()); } } }); }

同样的,消息写入channel 后,先依次经过pipline 上所安装的 消息编码器,再发送给客户端。具体编码方式同客户端编码器类似,此处不再赘述。

2.4 客户端接收调用结果

客户端收到服务端写入响应消息后,同样经过Netty channel pipline 上所安装的解码器,进行正确的解码。然后再对解码后的对象进行正确的反序列化,最终获得调用结果 。具体的解码,反序列化过程不再赘述,流程基本同上面服务端的解码及反序列化类似。

public class consumerHandler extends ChannelInboundHandlerAdapter {

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (msg instanceof ResponseMessage) { try { ResponseMessage responseMessage= (ResponseMessage)msg StarGateResponse response = new StarGateResponse(ResponseMessage.getMsg()); byte code = response.serializerCode(); Serializer serializer = SerializerFactory.getSerializer(code); byte[] bytes = responseMessage.bytes(); Result result = serializer.readObject(bytes, Result.class); response.result(result);
long invokeId = response.id(); DefaultFuture<?> future = FUTURES_MAP.remove(invokeId);
byte status = response.status(); if (status == Status.OK.value()) { complete((V) response.getResult()); } else { }
} catch (Throwable t) { log.error("调用记录: {}, on {} #channelRead().", t, ch); } } else { log.warn("消息类型不匹配: {}, channel: {}.", msg.getClass(), ch); ReferenceCountUtil.release(msg); } }
}
下面再通过一个简单的调用时序图展示一下一次典型的Rpc调用所经历的步骤。

结尾

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将

本文首先简单介绍了一下RPC的概念、应用场景及常用的RPC框架,然后讲述了一下如何自己手动实现一个RPC框架的基本功能。目的是想让大家对RPC框架的实现有一个大概思路,并对Netty 这一高效网络编程框架有一个了解,通过对Netty 的编、解码器的学习,了解如何自定义一个私有的通信协议。限于篇幅本文只简单讲解了RPC的核心的调用逻辑的实现。真正生产可用的RPC框架还需要有更多复杂的功能,如限流、负载均衡、融断、降级、泛型调用、自动重连、自定义可扩展的拦截器等等。

另外RPC框架中一般有三种角色,服务提供者、服务消费者、注册中心,本文并没有介绍注册中心如何实现。并假定服务提供者已经将服务发布到了注册中心,服务消费者跟服务提供者之间建立起了TCP 长连接。

后续会通过其它篇章介绍注册中心,服务自动注册,服务发现等功能的实现原理。

*注:参考资料开源代码库 Jupiter ( https://github.com/fengjiachun/Jupiter.git ),对RPC框架实现原理感兴趣的同学,强烈建议阅读此代码,肯定获益匪浅。
底层能力:维护用户基础数据、行为数据建模、用户画像分析、精准营销策略的制定

功能支撑:会员成长体系、等级计算策略、权益体系、营销底层能力支持

▪用户活跃:会员关怀、用户触达、活跃活动、业务线交叉获客、拉新促活

‍‍