标签
服务发现
字数
1070 字
阅读时间
6 分钟
工具类
java
import com.commnetsoft.core.utils.SpringContextUtil;
import com.netflix.discovery.DiscoveryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.Set;
/**
* 服务发现相关工具服务
*
* @author Brack.zhu
* @date 2020/10/9
*/
@Component
public class DiscoveryHelper {
@Autowired
private DiscoverService discoverService;
private Logger log = LoggerFactory.getLogger(DiscoveryHelper.class);
/**
 * Returns the set of service IPs known to service discovery.
 *
 * <p>This is a pure delegation to the injected {@code DiscoverService}; any caching
 * is the responsibility of that implementation.
 *
 * @return the host/IP set reported by the underlying {@code DiscoverService}
 */
public Set<String> getDiscoveryHosts() {
    final Set<String> discoveredHosts = discoverService.getDiscoveryHosts();
    return discoveredHosts;
}
/**
 * Returns the discovered service IPs, attempting a registry refresh when an expected
 * IP is missing.
 *
 * @param preIp IP that is expected to be present; when the current set does not contain
 *              it, {@link #invokeDiscoveryClientRefresh()} is called and, if that
 *              succeeds, the set is fetched again
 * @return the host set; re-read only when {@code preIp} was absent and the refresh succeeded
 */
public Set<String> getDiscoveryHosts(String preIp) {
    final Set<String> current = getDiscoveryHosts();
    if (current.contains(preIp)) {
        return current;
    }
    // The refresh is only attempted on a miss; a failed refresh keeps the first result.
    if (!invokeDiscoveryClientRefresh()) {
        return current;
    }
    return getDiscoveryHosts();
}
/**
 * Triggers an immediate refresh of the DiscoveryClient registry cache.
 *
 * <p>Looks up the {@code DiscoveryClient} bean and reflectively invokes its
 * no-argument {@code refreshRegistry} method.
 *
 * @return {@code true} when the reflective call completed, {@code false} when any
 *         exception was raised (the exception is logged, not rethrown)
 */
public boolean invokeDiscoveryClientRefresh() {
    try {
        final DiscoveryClient client = SpringContextUtil.getBean(DiscoveryClient.class);
        // setAccessible is required below, so the method is presumably not public.
        final Method refresh = DiscoveryClient.class.getDeclaredMethod("refreshRegistry");
        refresh.setAccessible(true);
        refresh.invoke(client);
        return true;
    } catch (Exception e) {
        log.error("注册发现数据强制反射刷新(refreshRegistry)方法异常:", e);
        return false;
    }
}
}

服务端和客户端接口
java
import com.commnetsoft.commons.utils.StringUtils;
import java.util.HashSet;
import java.util.Set;
/**
* 服务发现
* @author Brack.zhu
* @date 2020/11/5
*/
public interface DiscoverService {
/**
 * Returns the set of service IPs known to service discovery.
 *
 * <p>Implementations are expected to cache the result so that lookups stay cheap
 * while remaining reasonably up to date.
 *
 * @return the discovered service IPs
 */
Set<String> getDiscoveryHosts();
/**
 * Collects the IPs associated with a single service instance.
 *
 * <p>The result always contains {@code host} when both arguments are non-empty. If
 * {@code instanceId} has the form {@code <prefix>:<rest>} and the prefix differs from
 * {@code host} (e.g. a Docker bridge-network address such as {@code 172.17.0.2:9001}),
 * the prefix is added as well. When that prefix is a dotted-quad IPv4 address, the
 * gateway address derived by {@link #getNetGatewayIp(String)} is added too.
 *
 * <p>Only IPv4 is supported. A prefix that is not a dotted quad (a host name, an IPv6
 * fragment) is never passed to {@code getNetGatewayIp}, which would otherwise fail with
 * an {@code ArrayIndexOutOfBoundsException}. An instance id that starts with
 * {@code ':'} contributes no extra entries.
 *
 * @param instanceId the instance id as registered with service discovery
 * @param host       the host/IP reported for the instance
 * @return a new mutable set of IPs for the instance; empty when either argument is empty
 */
default Set<String> getDiscoveryHosts(String instanceId,String host) {
    Set<String> hosts = new HashSet<>();
    if (!StringUtils.isNoneEmpty(instanceId, host)) {
        return hosts;
    }
    hosts.add(host);

    // indexOf/substring instead of split(":")[0]: split yields an empty array for ":".
    int separator = instanceId.indexOf(':');
    if (separator <= 0) {
        return hosts;
    }
    String instanceHost = instanceId.substring(0, separator);
    if (host.equals(instanceHost)) {
        return hosts;
    }
    hosts.add(instanceHost);

    // Only derive the bridge gateway from something shaped like an IPv4 address.
    // The check is structural (four numeric groups); it does not range-check octets.
    if (instanceHost.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
        hosts.add(getNetGatewayIp(instanceHost));
    }
    return hosts;
}
/**
 * Derives the gateway address for an IPv4 address by replacing the last octet with
 * {@code 1}.
 *
 * <p>Only the first three dot-separated segments of the input are used. IPv4 only.
 *
 * @param ipV4Ip an IPv4 address in dotted notation
 * @return the first three segments of {@code ipV4Ip} followed by {@code .1}
 */
default String getNetGatewayIp(String ipV4Ip) {
    final String[] octets = ipV4Ip.split("\\.");
    return String.join(".", octets[0], octets[1], octets[2], "1");
}
}

服务端服务
java
import com.commnetsoft.core.discovery.DiscoverService;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.google.common.collect.Sets;
import com.netflix.discovery.shared.Application;
import com.netflix.eureka.registry.InstanceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 服务注册发现管理--面向服务端<br/>
* 如果同时存在EurekaServer/EurekaClient 时优先客户端
* @author Brack.zhu
* @date 2020/11/5
*/
@Component
@ConditionalOnExpression("#{'eureka'.equals(environment.getProperty('spring.application.name'))}")
public class ServerDiscoverService implements DiscoverService, ApplicationListener<EurekaInstanceRegisteredEvent> {
/**
* 注册服务中的Ip集合,去除重复。
*/
private volatile Set<String> cacheDiscoveryHosts;
private Logger log= LoggerFactory.getLogger(ServerDiscoverService.class);
/**
 * Logs that the server-side discovery service has been constructed.
 * Runs as a {@code @PostConstruct} callback.
 */
@PostConstruct
public void init(){
    final Class<?> selfType = ServerDiscoverService.class;
    log.info("服务注册发现管理--面向服务端构建完成!{}", selfType);
}
/**
 * Drops the cached host set whenever an instance-registered event arrives, so the
 * next {@code getDiscoveryHosts()} call rebuilds it from the registry.
 *
 * <p>NOTE(review): this listener is typed to {@code EurekaInstanceRegisteredEvent}
 * only; confirm whether cancellations/renewals should also reset the cache.
 *
 * @param event the registration event (only logged at debug level)
 */
@Override
public void onApplicationEvent(EurekaInstanceRegisteredEvent event) {
    log.debug("EurekaRegistered服务事件:{}", event);
    // Any registration event invalidates the cache.
    this.cacheDiscoveryHosts = null;
}
/**
 * Returns the de-duplicated IPs of all instances in the Eureka server registry.
 *
 * <p>A cached set is used when present; otherwise the set is rebuilt from
 * {@code InstanceRegistry.getSortedApplications()} and cached (only when the registry
 * returned a non-null application list). Callers always receive a copy, so mutating
 * the returned set never affects the cache.
 *
 * @return a copy of the current host set; empty when the registry has no applications
 */
@Override
public Set<String> getDiscoveryHosts() {
    // Read the volatile field exactly once: onApplicationEvent may null it from another
    // thread between a null check and a second read, which would NPE in the copy below.
    Set<String> cached = cacheDiscoveryHosts;
    if (null != cached) {
        return Sets.newCopyOnWriteArraySet(cached);
    }
    InstanceRegistry instanceRegistry = SpringContextUtil.getBean(InstanceRegistry.class);
    List<Application> apps = instanceRegistry.getSortedApplications();
    Set<String> hosts = new HashSet<>();
    if (null != apps) {
        apps.forEach(application ->
                application.getInstances().forEach(instanceInfo ->
                        hosts.addAll(getDiscoveryHosts(instanceInfo.getInstanceId(), instanceInfo.getIPAddr()))
                ));
        cacheDiscoveryHosts = hosts;
    }
    log.info("新的白名单服务IP集合:{}", hosts);
    // Hand out a copy, as on the cache-hit path, so the cached instance is never exposed.
    return Sets.newCopyOnWriteArraySet(hosts);
}
}

客户端接口
java
import com.commnetsoft.core.discovery.DiscoverService;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.google.common.collect.Sets;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEvent;
import com.netflix.discovery.EurekaEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.Set;
/**
* 服务注册发现管理--面向客户端
* 如果同时存在EurekaServer/EurekaClient 时优先客户端
* 注意:由于EurekaClient与服务端之间存在数据同步时间差,最长可能在
* {eureka.client.registry-fetch-interval-seconds}x2 的时间内读到的是老数据
*
*
* DiscoveryManager.getInstance()
* @author Brack.zhu
* @date 2020/11/5
*/
@Primary
@Component
@ConditionalOnExpression("#{!'eureka'.equals(environment.getProperty('spring.application.name'))}")
public class ClientDiscoverService implements DiscoverService, EurekaEventListener, ApplicationListener<EnvironmentChangeEvent> {
/**
* 注册服务中的Ip集合,去除重复。
*/
private volatile Set<String> cacheDiscoveryHosts;
private Logger log= LoggerFactory.getLogger(ClientDiscoverService.class);
/**
 * Logs that the client-side discovery service has been constructed.
 * Runs as a {@code @PostConstruct} callback.
 */
@PostConstruct
public void init(){
    final Class<?> selfType = ClientDiscoverService.class;
    log.info("服务注册发现管理--面向客户端构建完成!{}", selfType);
}
/**
 * Creates the service and immediately registers it as a listener on the
 * {@code EurekaClient} bean.
 *
 * <p>NOTE(review): the bean lookup happens during construction, so this presumably
 * relies on {@code SpringContextUtil} already being initialised — confirm.
 */
public ClientDiscoverService() {
    this.registerEurekaClientEventListener();
}
/**
 * Registers this object as an event listener on the current {@code EurekaClient} bean.
 */
private void registerEurekaClientEventListener(){
    SpringContextUtil.getBean(EurekaClient.class).registerEventListener(this);
}
/**
 * Returns the de-duplicated IPs of all instances visible through the
 * {@code DiscoveryClient} bean.
 *
 * <p>A cached set is used when present; otherwise every service and every instance is
 * enumerated, the result is cached, and a copy is returned. Callers always receive a
 * copy, so mutating the returned set never affects the cache.
 *
 * @return a copy of the current host set
 */
@Override
public Set<String> getDiscoveryHosts() {
    // Read the volatile field exactly once: onEvent may null it from another thread
    // between a null check and a second read, which would NPE in the copy below.
    Set<String> cached = cacheDiscoveryHosts;
    if (null != cached) {
        return Sets.newCopyOnWriteArraySet(cached);
    }
    DiscoveryClient discoveryClient = SpringContextUtil.getBean(DiscoveryClient.class);
    Set<String> hosts = new HashSet<>();
    discoveryClient.getServices().forEach(service ->
            discoveryClient.getInstances(service).forEach(instance ->
                    hosts.addAll(getDiscoveryHosts(instance.getInstanceId(), instance.getHost()))
            ));
    cacheDiscoveryHosts = hosts;
    log.info("新的白名单服务IP集合:{}", hosts);
    // Hand out a copy, as on the cache-hit path, so the cached instance is never exposed.
    return Sets.newCopyOnWriteArraySet(hosts);
}
/**
 * Drops the cached host set on every Eureka client event, so the next
 * {@code getDiscoveryHosts()} call rebuilds it.
 *
 * @param event the Eureka client event (only logged at debug level)
 */
@Override
public void onEvent(EurekaEvent event) {
    log.debug("EurekaClient服务事件:{}", event);
    // Every client event invalidates the cache, regardless of its type.
    this.cacheDiscoveryHosts = null;
}
/**
 * Re-registers the Eureka event listener after an environment change.
 *
 * <p>Per the original author's note, a configuration refresh can cause the
 * DiscoveryClient object to be re-created, so the listener has to be attached to the
 * new instance. Registration is deferred by a fixed 15 seconds on a background thread
 * to give the client time to reload; the original note flags this delay as a stop-gap
 * that is not guaranteed to be sufficient.
 *
 * <p>The background thread is a named daemon so it cannot hold up JVM shutdown. An
 * interrupt during the wait restores the interrupt flag and registration is still
 * attempted (best effort). A registration failure is logged instead of being lost
 * to the default uncaught-exception handler.
 *
 * @param event the environment change event (its contents are not inspected)
 */
@Override
public void onApplicationEvent(EnvironmentChangeEvent event) {
    Thread reRegister = new Thread(() -> {
        try {
            // Delay so the reloaded DiscoveryClient has (hopefully) been created.
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to anything that checks it later.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting to re-register the EurekaClient event listener; registering immediately");
        }
        try {
            registerEurekaClientEventListener();
        } catch (RuntimeException e) {
            log.error("Failed to re-register the EurekaClient event listener after environment change", e);
        }
    }, "eureka-event-listener-reregister");
    reRegister.setDaemon(true);
    reRegister.start();
}
}