上一篇文章主要介绍了服务提供端的大致实现,这一篇我们继续研究服务消费端如何实现。
服务提供端启动后,接口已经对外暴露出来了,且提供端已经注册到注册中心里面,那消费端的主要工作就是从配置中心获取提供端的注册信息,然后订阅其提供的接口。
消费端配置如下:
package com.zxm.rpc.config;
import com.zxm.rpc.proxy.ReferenceProxyInvoker;
public class ReferenceConfig<T> {
private ReferenceProxyInvoker referenceProxyInvoker;
//ms
private long timeout = 5000;
private String apiName;
private String providerName;
private String registryHost;
private Integer registryPort = 6379;
public long getTimeout() {
return timeout;
}
public ReferenceConfig timeout(long timeout) {
this.timeout = timeout;
return this;
}
public String getApiName() {
return apiName;
}
public ReferenceConfig apiName(String apiName) {
this.apiName = apiName;
return this;
}
public String getProviderName() {
return providerName;
}
public ReferenceConfig providerName(String providerName) {
this.providerName = providerName;
return this;
}
public String getRegistryHost() {
return registryHost;
}
public ReferenceConfig registryHost(String registryHost) {
this.registryHost = registryHost;
return this;
}
public Integer getRegistryPort() {
return registryPort;
}
public ReferenceConfig registryPort(Integer registryPort) {
this.registryPort = registryPort;
return this;
}
public static ReferenceConfig instance(){
return new ReferenceConfig();
}
public ReferenceConfig build(){
this.referenceProxyInvoker = new ReferenceProxyInvoker(this);
return this;
}
public T register() {
try {
return this.referenceProxyInvoker.register();
} catch (ClassNotFoundException e) {
throw new RuntimeException("register fail, msg is :" + e.getMessage());
}
}
}
主要包括注册中心的信息、超时时间的设置以及接口信息等等。register方法的作用是初始化消费端的代理层,内部完成注册信息的获取及底层通信组件的初始化工作等等。
public ReferenceProxyInvoker(ReferenceConfig referenceConfig) {
this.referenceConfig = referenceConfig;
template = JedisTemplateFactory.newInstance(referenceConfig.getRegistryHost(), referenceConfig.getRegistryPort());
initNettyBootstrap();
}
private void initNettyBootstrap() {
String url = template.getUrls(referenceConfig.getProviderName());
String[] urlArgs = url.split(":");
this.nettyClientBootstrap = NettySocketClientFactory.newInstance(Integer.valueOf(urlArgs[1]), urlArgs[0]);
}
请求调用器的初始化逻辑,先是初始化redis(注册中心)连接客户端(单例),然后初始化底层通信模块的netty客户端,从配置中心中获取接口的相关信息,最后完成初始化工作。
public <T> T register() throws ClassNotFoundException {
Class clazz = Class.forName(referenceConfig.getApiName());
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (Object proxy, Method method, Object[] args) -> invoke(clazz, method, args));
}
private Object invoke(Class clazz, Method method, Object[] args) {
RpcProtocol protocol = rpcProtocolWrapper(clazz, method, args);
this.nettyClientBootstrap.getSocketChannel().writeAndFlush(RpcSerializer.serialize(protocol));
return poll(protocol);
}
private Object poll(RpcProtocol protocol) {
long pollTime = System.currentTimeMillis();
while (true) {
if ((System.currentTimeMillis() - pollTime) > referenceConfig.getTimeout()) {
throw new RuntimeException("rpc request remote timeout");
}
RpcResult rpcResult = this.nettyClientBootstrap.getRpcResult(protocol.getId());
if (rpcResult != null) {
this.nettyClientBootstrap.removeRpcResult(protocol.getId());
return rpcResult.getValue();
}
}
}
上述register方法的实现就是基于java反射原理,完成订阅的远程目标类的初始化工作。在代理方法中通过netty通信完成远程调用。poll方法是同步在轮询远程调用的返回结果,一旦获取到响应结果,就立刻获取到,返回给上层。
netty通信的过程中,需要对传输的数据进行一个封装,并序列化,协议规则如下:
请求协议
package com.zxm.rpc.utils;
import java.io.Serializable;
public class RpcProtocol implements Serializable{
private static final long serialVersionUID = -8478844905397248186L;
private String id;
private String interfaceName;
private String methodName;
private Class[] parameterTypes;
private Object[] args;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
}
响应结果协议
package com.zxm.rpc.utils;
import java.io.Serializable;
public class RpcResult implements Serializable{
private static final long serialVersionUID = -8478844905397248185L;
private String id;
private Object value;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
}
请求远程调用时的初始化
private RpcProtocol rpcProtocolWrapper(Class clazz, Method method, Object[] args) {
RpcProtocol protocol = new RpcProtocol();
protocol.setId(UUID.randomUUID().toString().replace("-", ""));
protocol.setInterfaceName(clazz.getName());
protocol.setMethodName(method.getName());
protocol.setParameterTypes(method.getParameterTypes());
protocol.setArgs(args);
return protocol;
}
结果响应和请求的匹配主要是基于id(uuid)来关联的。
这样,服务消费端的实现基本是完成了,基本原理相对是比较简单的,dubbo等类似的开源框架的基础实现原理都是一样的,不过是在实现细节上处理的更加到位而已,支持的特性包括协议、loadbalance策略等等也比较丰富,后面有机会可以分享下其实现方式,有兴趣的朋友们可以阅读下源码,定会获益匪浅。
至此,我们徒手编写服务发现框架系列的文章基本靠一段落了,后续可能会继续分享一些相关的小专题。作者文笔不佳,水平有限,仅以知识分享和沉淀为目的。一步步也是从初学者走来,建议大家多动手,多实践,读万卷书不如行万里路!互联网的一些技术本身原理及使用并不复杂,站在巨人的肩膀上学习,成功就在眼前!
项目地址: https://github.com/zhangxiaomin1993/rpc-server-sdk
声明:文章和项目不以商业盈利为目的,仅作为本人技术积累的沉淀,分享给大家,有兴趣的朋友欢迎访问交流,共同学习和进步!大佬和专家路过,不喜勿喷!