Skip to content
标签
服务发现
字数
1070 字
阅读时间
6 分钟

工具类

java

import com.commnetsoft.core.utils.SpringContextUtil;
import com.netflix.discovery.DiscoveryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * Helper bean that exposes service-discovery host lookups and can force the
 * Netflix {@code DiscoveryClient} to refresh its local registry copy.
 *
 * @author Brack.zhu
 * @date 2020/10/9
 */
@Component
public class DiscoveryHelper {

    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryHelper.class);

    @Autowired
    private DiscoverService discoverService;

    /**
     * Returns the host IPs of the services known to service discovery.
     * Simply delegates to the injected {@link DiscoverService}; caching, if any,
     * is the responsibility of that implementation.
     *
     * @return the set of discovered host IPs
     */
    public Set<String> getDiscoveryHosts() {
        return discoverService.getDiscoveryHosts();
    }

    /**
     * Returns the host IPs of the services known to service discovery, forcing a
     * registry refresh first when the expected IP is not among them.
     *
     * @param preIp IP that is expected to be present; when it is missing, a local
     *              registry refresh is triggered and the hosts are looked up again
     * @return the host set; the re-queried set when a refresh was triggered and
     *         succeeded, otherwise the set obtained by the first lookup
     */
    public Set<String> getDiscoveryHosts(String preIp) {
        Set<String> knownHosts = getDiscoveryHosts();
        if (knownHosts.contains(preIp)) {
            return knownHosts;
        }
        // Only query again when the forced refresh actually went through.
        return invokeDiscoveryClientRefresh() ? getDiscoveryHosts() : knownHosts;
    }

    /**
     * Triggers an immediate refresh of the {@code DiscoveryClient} registry cache by
     * reflectively invoking its non-public {@code refreshRegistry} method.
     *
     * @return {@code true} when the reflective call completed, {@code false} when
     *         any exception occurred (the exception is logged, not rethrown)
     */
    public boolean invokeDiscoveryClientRefresh() {
        try {
            DiscoveryClient client = SpringContextUtil.getBean(DiscoveryClient.class);
            Method refreshRegistry = DiscoveryClient.class.getDeclaredMethod("refreshRegistry");
            // The method is not publicly accessible, so access checks must be lifted.
            refreshRegistry.setAccessible(true);
            refreshRegistry.invoke(client);
            return true;
        } catch (Exception e) {
            LOGGER.error("注册发现数据强制反射刷新(refreshRegistry)方法异常:", e);
            return false;
        }
    }

}

服务端和客户端接口

java

import com.commnetsoft.commons.utils.StringUtils;

import java.util.HashSet;
import java.util.Set;

/**
 * Service-discovery abstraction: provides the set of host IPs that belong to
 * registered service instances.
 *
 * @author Brack.zhu
 * @date 2020/11/5
 */
public interface DiscoverService {

    /**
     * Returns the host IPs of all services known to service discovery.
     * Implementations are expected to cache the result so that lookups stay cheap
     * while remaining reasonably up to date.
     *
     * @return the set of discovered host IPs
     */
    Set<String> getDiscoveryHosts();


    /**
     * Collects the host IPs that belong to one service instance.
     * <p>
     * The result always contains {@code host}. When {@code instanceId} has the form
     * {@code <prefix>:...} and the prefix differs from {@code host} (e.g. a Docker
     * bridge-network internal address such as {@code 172.17.0.2:9001}), the prefix is
     * added as well. If that prefix is a dotted-decimal IPv4 address, the matching
     * bridge gateway address (last octet replaced by 1) is added too. Prefixes that
     * are not IPv4 addresses (for example host names) get no gateway entry.
     *
     * @param instanceId instance id as reported by service discovery
     * @param host       host IP of the instance
     * @return a new mutable set of host strings; empty when either argument is
     *         rejected by {@code StringUtils.isNoneEmpty}
     */
    default Set<String> getDiscoveryHosts(String instanceId, String host) {
        Set<String> hosts = new HashSet<>();
        if (!StringUtils.isNoneEmpty(instanceId, host)) {
            return hosts;
        }
        hosts.add(host);
        // Take the part before the first ':'; an id without ':' or starting with ':'
        // carries no usable host prefix.
        int separator = instanceId.indexOf(':');
        if (separator <= 0) {
            return hosts;
        }
        String instanceHost = instanceId.substring(0, separator);
        if (!host.equals(instanceHost)) {
            hosts.add(instanceHost);
            // Only IPv4 is supported for gateway derivation; skipping other prefixes
            // avoids failing the whole lookup on host-name based instance ids.
            if (isIpV4Address(instanceHost)) {
                hosts.add(getNetGatewayIp(instanceHost));
            }
        }
        return hosts;
    }

    /**
     * Returns the gateway IP for the given address, obtained by replacing the last
     * octet with 1.<br/>
     * Only IPv4 is supported.
     *
     * @param ipV4Ip dotted-decimal IPv4 address
     * @return the address with its last octet set to 1
     * @throws IllegalArgumentException when {@code ipV4Ip} is not a dotted-decimal
     *                                  IPv4 address
     */
    default String getNetGatewayIp(String ipV4Ip) {
        if (!isIpV4Address(ipV4Ip)) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipV4Ip);
        }
        return ipV4Ip.substring(0, ipV4Ip.lastIndexOf('.') + 1) + "1";
    }

    /**
     * Tells whether the given text is a dotted-decimal IPv4 address: exactly four
     * segments of one to three decimal digits, each with a value of at most 255.
     *
     * @param candidate text to check, may be {@code null}
     * @return {@code true} when {@code candidate} is a dotted-decimal IPv4 address
     */
    static boolean isIpV4Address(String candidate) {
        if (candidate == null) {
            return false;
        }
        // A negative limit keeps trailing empty segments, so "1.2.3." is rejected.
        String[] octets = candidate.split("\\.", -1);
        if (octets.length != 4) {
            return false;
        }
        for (String octet : octets) {
            if (octet.isEmpty() || octet.length() > 3) {
                return false;
            }
            for (int i = 0; i < octet.length(); i++) {
                char digit = octet.charAt(i);
                if (digit < '0' || digit > '9') {
                    return false;
                }
            }
            if (Integer.parseInt(octet) > 255) {
                return false;
            }
        }
        return true;
    }

}

服务端服务

java

import com.commnetsoft.core.discovery.DiscoverService;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.google.common.collect.Sets;
import com.netflix.discovery.shared.Application;
import com.netflix.eureka.registry.InstanceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Service registration/discovery management, server side.<br/>
 * Builds the host set from the Eureka server's own instance registry.
 * If both the Eureka server and the Eureka client variants are present, the
 * client-side variant is meant to take precedence.
 *
 * @author Brack.zhu
 * @date 2020/11/5
 */
@Component
@ConditionalOnExpression("#{'eureka'.equals(environment.getProperty('spring.application.name'))}")
public class ServerDiscoverService implements DiscoverService, ApplicationListener<EurekaInstanceRegisteredEvent> {

    private static final Logger log = LoggerFactory.getLogger(ServerDiscoverService.class);

    /**
     * De-duplicated IPs of the registered services; {@code null} means the cache
     * must be rebuilt. The referenced set is never modified after being published.
     */
    private volatile Set<String> cacheDiscoveryHosts;

    /**
     * Logs that the server-side discovery service has been constructed.
     */
    @PostConstruct
    public void init(){
        log.info("服务注册发现管理--面向服务端构建完成!{}",ServerDiscoverService.class);
    }

    /**
     * Invalidates the cached host set whenever an instance registers.
     * <p>
     * NOTE(review): only {@code EurekaInstanceRegisteredEvent} is observed here, so
     * cancellations/evictions do not reset the cache — confirm this is intended.
     *
     * @param event the registration event
     */
    @Override
    public void onApplicationEvent(EurekaInstanceRegisteredEvent event) {
        log.debug("EurekaRegistered服务事件:{}",event);
        cacheDiscoveryHosts = null;
    }

    /**
     * Returns the host IPs of all instances in the Eureka instance registry.
     * The result is served from the cache when available; otherwise it is rebuilt
     * from the registry and cached (unless the registry returned no application list).
     *
     * @return a copy of the host set; modifying it does not affect the cache
     */
    @Override
    public Set<String> getDiscoveryHosts() {
        // Read the volatile field once: a concurrent event may reset it to null
        // between a null check and a second read.
        Set<String> cached = cacheDiscoveryHosts;
        if (null != cached) {
            return Sets.newCopyOnWriteArraySet(cached);
        }
        InstanceRegistry instanceRegistry = SpringContextUtil.getBean(InstanceRegistry.class);
        List<Application> apps = instanceRegistry.getSortedApplications();
        Set<String> hosts = new HashSet<>();
        if (null != apps) {
            apps.forEach(application ->
                    application.getInstances().forEach(instanceInfo ->
                            hosts.addAll(getDiscoveryHosts(instanceInfo.getInstanceId(), instanceInfo.getIPAddr()))));
            cacheDiscoveryHosts = hosts;
        }
        log.info("新的白名单服务IP集合:{}",hosts);
        // Hand out a copy so callers can never mutate the set that was just cached.
        return Sets.newCopyOnWriteArraySet(hosts);
    }


}

客户端服务

java

import com.commnetsoft.core.discovery.DiscoverService;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.google.common.collect.Sets;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEvent;
import com.netflix.discovery.EurekaEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.Set;

/**
 * Service registration/discovery management, client side.
 * Builds the host set from the Spring Cloud {@code DiscoveryClient}.
 * If both the Eureka server and the Eureka client variants are present, this
 * client-side variant takes precedence.
 * <p>
 * Note: because the Eureka client synchronises with the server periodically, the
 * data may be stale for up to
 * {@code eureka.client.registry-fetch-interval-seconds} x 2.
 * <p>
 * See also {@code DiscoveryManager.getInstance()}.
 *
 * @author Brack.zhu
 * @date 2020/11/5
 */
@Primary
@Component
@ConditionalOnExpression("#{!'eureka'.equals(environment.getProperty('spring.application.name'))}")
public class ClientDiscoverService implements DiscoverService, EurekaEventListener, ApplicationListener<EnvironmentChangeEvent> {

    private static final Logger log = LoggerFactory.getLogger(ClientDiscoverService.class);

    /**
     * How long to wait after an environment change before re-registering the
     * Eureka event listener, giving the client time to be reloaded.
     */
    private static final long LISTENER_REREGISTER_DELAY_MILLIS = 15000L;

    /**
     * De-duplicated IPs of the registered services; {@code null} means the cache
     * must be rebuilt. The referenced set is never modified after being published.
     */
    private volatile Set<String> cacheDiscoveryHosts;

    /**
     * Logs that the client-side discovery service has been constructed.
     */
    @PostConstruct
    public void init(){
        log.info("服务注册发现管理--面向客户端构建完成!{}",ClientDiscoverService.class);
    }

    /**
     * Creates the service and registers it as a listener on the current
     * {@code EurekaClient}.
     * <p>
     * NOTE(review): this looks up a bean through {@code SpringContextUtil} from
     * inside a constructor — confirm the context is always available at that point.
     */
    public ClientDiscoverService() {
        registerEurekaClientEventListener();
    }

    /**
     * Registers this instance as an event listener on the {@code EurekaClient} bean.
     */
    private void registerEurekaClientEventListener(){
        EurekaClient eurekaClient = SpringContextUtil.getBean(EurekaClient.class);
        eurekaClient.registerEventListener(this);
    }

    /**
     * Returns the host IPs of all instances of all services reported by the
     * {@code DiscoveryClient}. The result is served from the cache when available;
     * otherwise it is rebuilt and cached.
     *
     * @return a copy of the host set; modifying it does not affect the cache
     */
    @Override
    public Set<String> getDiscoveryHosts() {
        // Read the volatile field once: a concurrent event may reset it to null
        // between a null check and a second read.
        Set<String> cached = cacheDiscoveryHosts;
        if (null != cached) {
            return Sets.newCopyOnWriteArraySet(cached);
        }
        DiscoveryClient discoveryClient = SpringContextUtil.getBean(DiscoveryClient.class);
        Set<String> hosts = new HashSet<>();
        discoveryClient.getServices().forEach(service ->
                discoveryClient.getInstances(service).forEach(instance ->
                        hosts.addAll(getDiscoveryHosts(instance.getInstanceId(), instance.getHost()))));
        cacheDiscoveryHosts = hosts;
        log.info("新的白名单服务IP集合:{}",hosts);
        // Hand out a copy so callers can never mutate the set that was just cached.
        return Sets.newCopyOnWriteArraySet(hosts);
    }

    /**
     * Invalidates the cached host set on every Eureka client event.
     *
     * @param event the Eureka client event
     */
    @Override
    public void onEvent(EurekaEvent event) {
        log.debug("EurekaClient服务事件:{}",event);
        cacheDiscoveryHosts = null;
    }

    /**
     * Re-registers the Eureka event listener after an environment change.
     * A configuration refresh can cause the discovery client to be recreated, in
     * which case the listener has to be attached to the new client object.
     * The registration happens on a background thread after a fixed delay, to wait
     * for the client to finish reloading; the fixed delay is a known stop-gap.
     *
     * @param event the environment change event
     */
    @Override
    public void onApplicationEvent(EnvironmentChangeEvent event) {
        Thread reRegisterThread = new Thread(() -> {
            try {
                Thread.sleep(LISTENER_REREGISTER_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Keep the best-effort registration below, but do not lose the interrupt.
                Thread.currentThread().interrupt();
            }
            try {
                registerEurekaClientEventListener();
                // Events emitted while no listener was attached were missed,
                // so force the next lookup to rebuild the host set.
                cacheDiscoveryHosts = null;
            } catch (RuntimeException e) {
                log.error("重新注册EurekaClient事件监听异常:", e);
            }
        }, "eureka-event-listener-reregister");
        // A pending delayed registration must not keep the JVM from shutting down.
        reRegisterThread.setDaemon(true);
        reRegisterThread.start();
    }
}