Skip to content
字数
4553 字
阅读时间
19 分钟

一、概述

1.1 互联网的发展

fazhan

  • 单一应用架构 当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,用于简化增删改查工作量的数据访问框架(ORM)是关键。
  • 垂直应用架构 当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。此时,用于加速前端页面开发的Web框架(MVC)是关键。
  • 分布式服务架构 当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
  • 流动计算架构 当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。

在大规模服务化之前,应用可能只是通过 RMI 或 Hessian 等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用,通过 F5 等硬件进行负载均衡。 当服务越来越多时,服务 URL 配置管理变得非常困难,F5 硬件负载均衡器的单点压力也越来越大。 此时需要一个服务注册中心,动态的注册和发现服务,使服务的位置透明。并通过在消费方获取服务提供方地址列表,实现软负载均衡和 Failover,降低对 F5 硬件负载均衡器的依赖,也能减少部分成本。 当进一步发展,服务间依赖关系变得错踪复杂,甚至分不清哪个应用要在哪个应用之前启动,架构师都不能完整的描述应用的架构关系。 这时,需要自动画出应用间的依赖关系图,以帮助架构师理清理关系。 接着,服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器? 为了解决这些问题,第一步,要将服务现在每天的调用量,响应时间,都统计出来,作为容量规划的参考指标。其次,要可以动态调整权重,在线上,将某台机器的权重一直加大,并在加大的过程中记录响应时间的变化,直到响应时间到达阈值,记录此时的访问量,再以此访问量乘以机器数反推总容量。

1.2 dubbo://协议

Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。 反之,Dubbo 缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 tbremoting 交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO 异步传输
  • 序列化:Hessian 二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
  • 适用场景:常规远程服务方法调用

1.3 常见RPC框架

RPC(Remote Procedure Call)— 远程过程调用 ,它是一种通过 网络 从远程计算机程序上请求服务,而不需要了解底层网络技术的协议,在面向对象的编程语言中,远程过程调用即是 远程方法调用

  • RMI

    1)RMI(remote method invocation)是java原生支持的远程调用,RMI采用JRMP(JavaRemoteMessageing Protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案。

    RMI步骤

    1. 创建远程接口, 并且继承java.rmi.Remote接口
    2. 实现远程接口,并且继承:UnicastRemoteObject
    3. 创建服务器程序: createRegistry()方法注册远程对象
    4. 创建客户端程序 (获取注册信息,调用接口方法)
  • Hessian

    Hessian使用C/S方式,基于HTTP协议传输,使用Hessian二进制序列化。 server端:

    添加hessian的maven依赖

    创建接口service及实现类

    在web.xml中配置HessianServlet

    客户端:

    添加hessian的maven依赖

    创建跟server端相同的接口service

    通过HessianProxyFactory构建调用

  • Thrift:FaceBook开源RPC框架,典型的CS架构,支持跨语言,客户端和服务端可以使用不同的语言开发,thrift通过IDL(Interface Description Language)来关联客户端和服务端。

  • gRPC google

  • dubbo

1.4 手写Rpc框架

provider服务提供

consumer服务消费

registry注册

protocol协议

服务提供者:

1、定义服务接口

接口HelloService

java
public interface HelloService {
 String sayHello(String message);
}

2、实现类HelloServiceImpl

java
public class HelloServiceImpl implements HelloService {
 @Override
 public String sayHello(String name) {
        
        return name+ "调用了myRPC的服务";
    }
}

3、服务注册:注册中心

此处注册中心我们将服务注册在map集合中,结构:Map<String,Map<URL,Class>> 外边map的key存储 服务接口的全类名,URL封装了调用服务的ip和port,里边value指定指定具体实现类 注册中心类提供注册服务并暴露服务和发现服务功能:

java
public class URL {

    private String hostname;
    private Integer port;
    
    @Override
    public boolean equals(Object obj) {
        if(obj==null){
            return false;
        }
        if(!(obj instanceof  URL)){
            return false;
        }
        URL url = (URL) obj;
        if(hostname.equals(((URL) obj).getHostname())  && port.intValue() == url.port.intValue()){
            return true;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return hostname.hashCode();
    }
}
java
public class NativeRegistry {


    private static Map<String, Map<URL,Class>> registCenter = new HashMap<>();


    /**
     * 注册服务
     * @param url
     * @param interfaceName
     * @param implClass
     */
    public static void regist(URL url,String interfaceName,Class implClass){

        Map<URL,Class> map = new HashMap<>();
        map.put(url,implClass);
        registCenter.put(interfaceName,map);
    }

    /**
     * 从注册中心获取服务
     * @param url
     * @param interfaceName
     * @return
     */
    public static Class get(URL url,String interfaceName){
        return registCenter.get(interfaceName).get(url);
    }


}

注册服务

java
public class ServiceProvider {

    public static void main(String[] args) {

        //创建URL
        URL url = new URL("localhost", 8080);

        //注册中心中注册服务
        NativeRegistry.regist(url, HelloService.class.getName(), HelloServiceImpl.class);

        //启动并暴露服务
        HttpServer httpServer = new HttpServer();
        httpServer.start(url.getHostname(),url.getPort());

    }
}

4、暴露服务

服务之间调用的通信协议采用http协议,所以在服务provider中启动tomcat暴露服务

添加内嵌tomcat的依赖

xml
  <!--内嵌tomcat-->
    <dependencies>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
            <version>9.0.12</version>
        </dependency>
    </dependencies>

创建HttpServer

java
public class HttpServer {


    /**
     * tomcat服务启动
     * 参考tomcat配置
     * <Server port="8005" shutdown="SHUTDOWN">
     * 	<Service name="Catalina">
     * 		<Connector port="8080" protocol="HTTP/1.1"
     *                connectionTimeout="20000"
     *                redirectPort="8443"
     * 	       URIEncoding="UTF-8"/>
     * 		<Engine name="Catalina" defaultHost="localhost">
     * 			<Host name="localhost"  appBase="webapps"
     *             	unpackWARs="true" autoDeploy="true">
     *     		 	<Context path="" doBase="WORKDIR" reloadable="true"/>
     *      		</Host>
     *      </Engine>
     *   </Service>
     * </Server>
     */


    /**
     * 启动服务
     * @param hostname
     * @param port
     */
    public void start(String hostname,int port){
        // 实例一个tomcat
        Tomcat tomcat = new Tomcat();

        // 构建server
        Server server = tomcat.getServer();

        // 获取service
        Service service = server.findService("Tomcat");

        // 构建Connector
        Connector connector = new Connector();
        connector.setPort(port);
        connector.setURIEncoding("UTF-8");

        // 构建Engine
        Engine engine = new StandardEngine();
        engine.setDefaultHost(hostname);

        // 构建Host
        Host host = new StandardHost();
        host.setName(hostname);

        // 构建Context
        String contextPath = "";
        Context context = new StandardContext();
        context.setPath(contextPath);
        context.addLifecycleListener(new Tomcat.FixContextListener());// 生命周期监听器

        // 然后按照server.xml,一层层把子节点添加到父节点
        host.addChild(context);
        engine.addChild(host);
        service.setContainer(engine);
        service.addConnector(connector);
        // service在getServer时就被添加到server节点了

        // tomcat是一个servlet,设置路径与映射
        tomcat.addServlet(contextPath,"dispatcher",new DispatcherServlet());
        context.addServletMappingDecoded("/*","dispatcher");

        try {
            tomcat.start();// 启动tomcat
            tomcat.getServer().await();// 接受请求
        }catch (LifecycleException e){
            e.printStackTrace();
        }
    }


}

DispatcherServlet

public class DispatcherServlet extends HttpServlet {

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        new HttpServerHandler().handle(req,resp);
    }
}

HttpServerHandler处理远程调用请求

java
public class HttpServerHandler {


    /**
     *  服务的处理
     * @param req
     * @param resp
     * @throws ServletException
     * @throws IOException
     */
    public void handle(HttpServletRequest req, HttpServletResponse resp){
        try {
            //服务请求的处理逻辑

            //1 通过请求流获取请求服务调用的参数
            InputStream inputStream = req.getInputStream();
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);

            Invocation invocation = (Invocation) objectInputStream.readObject();

            //2 从注册中心获取服务的列表
            Class implCass = NativeRegistry.get(new URL("localhost", 8080), invocation.getInterfaceName());

            //3 调用服务 反射
            Method method = implCass.getMethod(invocation.getMethodName(),invocation.getParamTypes());

            String result = (String) method.invoke(implCass.newInstance(), invocation.getParams());

            //4 结果返回
            IOUtils.write(result,resp.getOutputStream());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }


    }

}

封装调用参数Invocation

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Invocation implements Serializable {

    private String interfaceName;
    private String methodName;
    private Object[] params;
    private Class[] paramTypes;

}

启动服务

java
public class ServiceProvider {

    public static void main(String[] args) {

        //创建URL
        URL url = new URL("localhost", 8080);

        //注册中心中注册服务
        NativeRegistry.regist(url, HelloService.class.getName(), HelloServiceImpl.class);

        //启动并暴露服务
        HttpServer httpServer = new HttpServer();
        httpServer.start(url.getHostname(),url.getPort());

    }
}

4、consumer服务消费端

封装HttpClient对象,发起远程调用j

java
public class HttpClient {

    /**
     * 远程方法调用
     * @param hostname :远程主机名
     * @param port :远程端口号
     * @param invocation :封装远程调用的信息
     */
    public String post(String hostname, int port, Invocation invocation) {


        try {
            URL url = new URL("http", hostname, port, "/client/");
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);// 必填项

            //发送调用的信息
            OutputStream os = connection.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            oos.writeObject(invocation);
            oos.flush();
            oos.close();

            // 将输入流转为字符串(此处可是java对象) 获取远程调用的结果
            InputStream is = connection.getInputStream();
            return IOUtils.toString(is);

        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;

    }

}

调用测试

java
public class Consumer {
    

    
    public static void main(String[] args) {

        //封装一个invocation
        Invocation invocation = new Invocation(HelloService.class.getName(), "sayHello2",
                new Object[]{"学IT,来黑马"}, new Class[]{String.class});

        //远程调用服务
        String result = new HttpClient().post("localhost", 8080, invocation);

        System.out.println("远程调用执行的结果result="+result);
    }
}

二、dubbo源码解析

2.1 dubbo SPI源码解析:

通过 ExtensionLoader 的 getExtensionLoader 方法获取一个 ExtensionLoader 实例,该方法方法先从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例

createExtension方法包含如下步骤

  • 通过 getExtensionClasses 获取所有的拓展类
  • 通过反射创建拓展对象
  • 向拓展对象中注入依赖
  • 将拓展对象包裹在相应的 Wrapper 对象中

injectExtension方法依赖注入实现原理:Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。

第一个步骤中获取所有的扩展类方法getExtensionClasses,该方法先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过loadExtensionClasses 加载拓展类

loadExtensionClasses 方法做了两件事,一是解析SPI注解,二是调用 loadDirectory 方法加载指定文件夹配置文件

loadDirectory 方法获取classLoader ,通过classLoader获取URL资源信息,遍历URL通过loadResource加载资源

loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法

loadClass方法主要用用用于操作缓存

2.2 dubbo服务暴露

image-20220827232400717

spring容器启动,会加载BeanDefinitionParser类来解析配置文件,dubbo配置文件的加载依赖实现类DubboBeanDefinitionParser,DubboBeanDefinitionParser解析器会将配置文件中不同的标签解析成不同的xxxConfig,<dubbo:service/><dubbo:reference/>分别解析成serviceBean和 referenceBean

serviceBean实现了InitializingBean和ApplicationListener接口,在afterPropertiesSet方法中主要将配置文件中的属性依次配置到对应的bean中,在 Spring 上下文刷新事件后会回调onApplicationEvent方 法

调用父类ServiceConfig对象的export方法,检查延迟和是否导出,执行doExportUrls方法

doExportUrls方法首先是通过 loadRegistries 加载注册中心链接,后再遍历 ProtocolConfig 集合导出每个服务。并在导出服务的过程中,将服务注册到注册中心。下面,我们先来看一下 loadRegistries 方法的逻辑

doExportUrlsFor1Protocol方法主要将版本、时间戳、方法名以及各种配置对象的字段信息放入到map 中,map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息,最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象

无论是导出服务到本地还是远程都需要创建Invoker对象,Invoker是ProxyFactory 代理工厂创建的对象,invoke封装了调用实体。然后根据 scope 参数,决定导出服务到本地还是导出到远程。在这里我们重点讨论到导出服务到远程,其中包含服务导出和服务注册两个过程

在doLocalExport方法中导出服务,其中包含创建DubboExporter 和openServer

openServer方法中通过createServer方法创建服务实例

dubbo服务实例默认使用NettyServer,参考源码分析

服务注册调用getRegistry方法,创建连接注册中心,调用register方法注册服务,如果是zookeeper做为注册中心,调用zookeeper客户端创建服务节点,服务注册成功,在ZooInspector可以查看注册服务的节点数据

2.3 服务引入

image-20220827232439688

服务引入原理

dubbo服务引入时机

Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启

服务引入

当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务,管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类(ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入。

服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法

Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。配置解析逻辑封装在 ReferenceConfig 的 init 方法中

现在我们重点来看创建Invoker实例的过程,Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用,Invoker 是由Protocol 实现类DubboProtocol调用refer方法

Dubbo 使用 NettyClient 进行通信,getClients逻辑

根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法。

getSharedClient方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的ExchangeClient 实例。initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 会在 request 方法被调用时通过 Exchangers 的 connect 方法创建ExchangeClient 客户端

connect连接方法中,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,Transporters 的connect方法中调用getTransporter().connect(url, handler),getTransporter 方法是自适应扩展类,默认加载NettyTransporter,调用该类的connect 方法,往下就是通过Netty API创建Netty客户端了。接下来就是为服务接口生成代理对象,代理对象生成的入口方法为 ProxyFactory 的 getProxy方法

最后getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,实现类 JavassistProxyFactory 对该方法的实现

通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现自 JDK 的InvocationHandler 接口。