Skip to content
标签
远程调用
字数
15373 字
阅读时间
82 分钟

介绍

基于websocket开发,添加自定义注解,解析发送的地址、类型(请求或响应)及数据,支持集群

自定义注解及枚举

java

import java.lang.annotation.*;

/**
 * WebSocket 处理方法注解
 *
 *  业务实现样例,参照登录处理
 *
 * @author Brack.zhu
 * @date 2019/12/3
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsAction {

    /**
     * 处理方法 为空取方法名(长度小于20)
     */
    String action();

    /**
     *  接口方法描述
     */
    String desc() default "";

    /**
     * WS操作类型
     * @return
     */
    WsType type();

    /**
     * 权限 默认登录
     */
    WsPermit permit() default WsPermit.LOGIN;

    /**
     * 消息失败处理类---熔断
     * @return
     */
    Class<?> fallback() default void.class;
}

标注在发送的实体类上,发送时自动对请求对象进行封装

java

import java.lang.annotation.*;

/**
 *  WS VO对象 注解
 * @author Brack.zhu
 * @date 2019/12/4
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsData {

    /**
     * 接口方法 为空取方法名(长度小于20)
     */
    String action();
}

权限类型枚举

java

/**
 * WS接口权限
 */
public enum WsPermit {
    //公开
    PUBLIC,
    //登录
    LOGIN,
}

通讯类型枚举

java

/**
 * 请求类型
 * @author Brack.zhu
 * @date 2019/12/10
 */
public enum WsType {
    //请求类型
    req,
    //响应类型
    resp,
}

基础服务

标注在ws请求映射的对应方法上

java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.IErrorCode;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.boot.ReqWsActionOp;
import com.commnetsoft.ws.boot.WsActionBeanPostProcessor;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.service.WsReceiveService;
import com.commnetsoft.ws.service.WsSendHelper;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.*;

/**
 * WebSocket抽象基础服务,对外提供WebSocket服务需要继承该抽象类
 *
 * @author Brack.zhu
 * @date 2019/12/4
 */
public abstract class AbstractWsService {

    /**
     * WS协议 type key
     */
    public final static String WS_TYPE_KEY = "type";

    /**
     * WS协议 ID key
     */
    public final static String WS_ID_KEY = "id";

    /**
     * WS协议 Action key
     */
    public final static String WS_ACTION_KEY = "action";

    /**
     * 登录操作名
     */
    public final static String WS_LOGIN_ACTION_NAME = "login";

    /**
     * 心跳操作名
     */
    public final static String WS_HEARTBEAT_ACTION_NAME = "heartbeat";

    /**
     * 未知操作名
     */
    public final static String WS_UNKNOWN_ACTION_NAME = "unknown";

    private Logger log = LoggerFactory.getLogger(AbstractWsService.class);

    public WsActionBeanPostProcessor wsActionBeanPostProcessor() {
        return SpringContextUtil.getBean(WsActionBeanPostProcessor.class);
    }

    public WsSessionService wsSessionService() {
        return SpringContextUtil.getBean(WsSessionService.class);
    }

    public WsSendHelper wsSendHelper() {
        return SpringContextUtil.getBean(WsSendHelper.class);
    }

    public WsReceiveService wsReceiveService() {
        return SpringContextUtil.getBean(WsReceiveService.class);
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("新连接创建,{}!", WsUtil.toStr(session));
        //设置最大超时时间--默认30秒
        long maxIdleTimeout = 30000;
        try {
            WsConfig wsConfig = SpringContextUtil.getBean(WsConfig.class);
            if (null != wsConfig) {
                maxIdleTimeout = wsConfig.getWsSessionTimeout();
            }
        } catch (Exception e) {
            log.warn("获取WS会话最大空闲时间参数异常,使用默认值:{}", maxIdleTimeout, e);
        }
        session.setMaxIdleTimeout(maxIdleTimeout);
        //统计
        wsSessionService().getLocal().openIncrement();
        log.warn("OPEN 后 WS会话总数:{}", session.getOpenSessions().size());

        //TODO 需完善管理OPEN后不处理会话;一段时间内未认证则关闭
    }

    /**
     * 收到客户端消息后调用的方法
     * 请求的数据进行响应,响应类型的数据进行解析
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            log.info("B-S:{},{}", message, WsUtil.toStr(session));
            JSONObject msgJson = JSON.parseObject(message);
            if (null != msgJson) {
                String type = msgJson.getString(WS_TYPE_KEY);
                if (WsType.req.toString().equals(type)) {
                    onReqMessage(msgJson, session);
                } else if (WsType.resp.toString().equals(type)) {
                    onRespMessage(msgJson, session);
                }
            }
        } catch (MicroRuntimeException mre) {
            log.error("WS onMessage MicroRuntimeException {},{}", message, WsUtil.toStr(session), mre);
            WsResp wsResp = buildUnknownWsResp(mre);
            wsSendHelper().sendWsResp(session, wsResp);
        } catch (JSONException jsone) {
            log.error("WS onMessage JSONException(协议错误) {},{}", message, WsUtil.toStr(session), jsone);
            MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_invalid_msg, jsone);
            WsResp wsResp = buildUnknownWsResp(microRuntimeException);
            wsSendHelper().sendWsResp(session, wsResp);
        } catch (Exception e) {
            log.error("WS onMessage Exception {},{}", message, WsUtil.toStr(session), e);
            MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_error, e);
            WsResp wsResp = buildUnknownWsResp(microRuntimeException);
            wsSendHelper().sendWsResp(session, wsResp);
        }
    }

    /**
     * 发生错误时调用
     **/
    @OnError
    public void onError(Session session, Throwable error) {
        log.info("连接异常!,{}", WsUtil.toStr(session), error);
    }

    /**
     * 连接关闭调用的方法,无需调用关闭,调用方后自动触发关闭
     */
    @OnClose
    public void onClose(Session session) {
        String uid = WsUtil.getLoginUid(session);
        if (StringUtils.isNotEmpty(uid)) {
            wsSessionService().removeAuthSession(uid);
            log.info("认证连接关闭,uid:{},{}!", uid, WsUtil.toStr(session));
        } else {
            log.info("未认证连接关闭,{}!", WsUtil.toStr(session));
        }
        //统计
        wsSessionService().getLocal().closeIncrement();
        log.warn("CLOSE 后 WS会话总数:{}", session.getOpenSessions().size());
    }

    /**
     * 接收请求消息
     *
     * @param msgJson
     * @param session
     */
    public void onReqMessage(JSONObject msgJson, Session session) {
        String action = msgJson.getString(WS_ACTION_KEY);
        //获取入参类型
        ReqWsActionOp reqWsActionOp = wsActionBeanPostProcessor().req();
        Class<?> dataVoClazz = reqWsActionOp.getDataVoClazz(action);
        //回调对应实现方法
        WsReq<?> wsReq = WsUtil.toWsReq(msgJson, dataVoClazz);
        WsResp<?> wsResp = null;
        if (!reqWsActionOp.isPub(action) && !WsUtil.isLogin(session)) {
            //非公开方法且未登录返回异常,3秒后关闭会话
            wsResp = WsResp.create(wsReq, WsError.ws_unauthorized);
        } else {
            wsResp = reqWsActionOp.invokeWsAction(wsReq, new LocalWsSession(session));
            if (wsResp.successful() && WS_LOGIN_ACTION_NAME.equals(wsResp.getAction())) {
                //登录处理成功
                wsSessionService().addAuthSession(session);
            }
        }
        wsSendHelper().sendWsResp(session, wsReq, wsResp);
    }

    /**
     * 接收响应数据
     *
     * @param msgJson
     * @param session
     */
    public void onRespMessage(JSONObject msgJson, Session session) throws Exception {
        wsReceiveService().onRespMessage(msgJson, new LocalWsSession(session));
    }


    /**
     * 请求消息转换成WsReq对象
     *
     * @param message
     * @param dataClazz 请求data转换成对象Class
     * @param <T>
     * @return
     */
    public static <T> WsReq<T> toWsReq(String message, Class<T> dataClazz) {
        try {
            JSONObject object = JSON.parseObject(message);
            return WsUtil.toWsReq(object, dataClazz);
        } catch (JSONException je) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
    }


    /**
     * 响应消息转换成WsResp对象
     *
     * @param msgJson
     * @param dataClazz 响应data转换成对象Class
     * @param <T>
     * @return
     */
    public static  <T> WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) {
        return WsUtil.toWsResp(msgJson, dataClazz);
    }

    /**
     * 构建一个未知响应消息,一般在错误协议是使用
     *
     * @param code
     * @param <T>
     * @return
     */
    public static <T> WsResp<T> buildUnknownWsResp(IErrorCode code) {
        WsReq<?> wsReq = new WsReq<>();
        wsReq.setId(UUIDUtils.generate());
        wsReq.setType(WsType.resp);
        wsReq.setAction(WS_UNKNOWN_ACTION_NAME);
        return WsResp.create(wsReq, code);
    }


}

配置

java

/**
 * ws自定义配置<br/>
 *  例如:@ServerEndpoint(value = "/ws/edcall.ws",configurator = CommnetServerEndpointConfig.class)<br/>
 *    指定自定义配置后的WsSession对象中可以获取到请求端IP
 *
 * @author Brack.zhu
 * @date 2020/9/15
 */
public class CommnetServerEndpointConfig extends DefaultServerEndpointConfigurator {

    /**
     * HTTP 头数据 获取请求方IP
     */
    String HEADER_REAL_IP = "X-Real-IP";

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        super.modifyHandshake(sec, request, response);
        String ip=request.getHeaders().get(HEADER_REAL_IP).toString();
        sec.getUserProperties().put(WsUtil.KEY_HTTP_IP, ip);
    }
}

封装获取到的请求消息的实体类

java

import com.commnetsoft.ws.annotation.WsAction;
import com.commnetsoft.ws.annotation.WsType;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;

/**
 * 监听WebSocket协议业务处理组件
 *
 * @author Brack.zhu
 * @date 2019/12/3
 */
@Component
public class WsActionBeanPostProcessor implements BeanPostProcessor {

    private static ReqWsActionOp reqWsActionOp=new ReqWsActionOp();

    private static RespWsActionOp respWsActionOp=new RespWsActionOp();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            WsAction wsAction = AnnotationUtils.findAnnotation(method, WsAction.class);
            if (wsAction == null) {
                continue;
            }
            String actionVal = wsAction.action();
            WsType wsType=wsAction.type();
            WsActionOpData wsActionOpData = new WsActionOpData();
            wsActionOpData.setBean(bean);
            wsActionOpData.setMethod(method);
            wsActionOpData.setWsAction(wsAction);
            if(WsType.req.equals(wsType)){
                reqWsActionOp.put(actionVal, wsActionOpData);
            }else if(WsType.resp.equals(wsType)){
                respWsActionOp.put(actionVal,wsActionOpData);
            }
        }
        return bean;
    }

    public ReqWsActionOp req(){
        return reqWsActionOp;
    }

    public RespWsActionOp resp(){
        return  respWsActionOp;
    }

}

数据实体类

java

import com.commnetsoft.ws.annotation.WsAction;

import java.lang.reflect.Method;

/**
 * WsActionOp相应操作对象
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class WsActionOpData {

    WsAction wsAction;

    Object bean;

    Method method;

    public WsAction getWsAction() {
        return wsAction;
    }

    public void setWsAction(WsAction wsAction) {
        this.wsAction = wsAction;
    }

    public Object getBean() {
        return bean;
    }

    public void setBean(Object bean) {
        this.bean = bean;
    }

    public Method getMethod() {
        return method;
    }

    public void setMethod(Method method) {
        this.method = method;
    }

}

封装处理数据的抽象类

由该类实现 获取数据、获取权限、调用对应方法、对数据封装、熔断。

java

import com.commnetsoft.commons.Result;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsPermit;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;

/**
 * @author Brack.zhu
 * @date 2019/12/10
 */
public abstract class AbstractWsActionOp {

    private transient Logger log = LoggerFactory.getLogger(AbstractWsActionOp.class);

    /**
     * 获取请求类型指定WsActionOp对象
     *
     * @param wsAction
     * @return
     */
    public abstract WsActionOpData getWsActionOpData(String wsAction);

    /**
     * 获取指定WsAction对应的接口权限
     *
     * @param wsAction
     * @return
     */
    public WsPermit getWsPermit(String wsAction) {
        WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
        if (null != wsActionOpData) {
            return wsActionOpData.getWsAction().permit();
        }
        return null;
    }


    /**
     * 指定WsAction对应的接口权限是否公开
     *
     * @param wsAction
     * @return true 公开
     */
    public boolean isPub(String wsAction) {
        WsPermit apiPermit = getWsPermit(wsAction);
        if (null != apiPermit) {
            return WsPermit.PUBLIC.equals(apiPermit) ? true : false;
        }
        return false;
    }


    /**
     * 获取反射方法形参类型数组
     *
     * @param wsAction
     * @return
     */
    public Class<?>[] getArgsClazz(String wsAction) {
        WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
        if (null != wsActionOpData) {
            return wsActionOpData.getMethod().getParameterTypes();
        }
        return new Class<?>[]{};
    }

    /**
     * 获取反射方法形参类型数组,获取失败使用默认Void
     *
     * @param wsAction
     * @return
     */
    public Class<?> getDataVoClazz(String wsAction) {
        Class<?> clazz=getDataVoClazzNull(wsAction);
        if(null==clazz){
            return Void.class;
        }
        return clazz;
    }

    /**
     * 获取反射方法形参类型数组,获取失败使用默认Void
     *
     * @param wsAction
     * @return
     */
    public Class<?> getDataVoClazzNull(String wsAction) {
        Class<?>[] argsClazz = getArgsClazz(wsAction);
        if (null != argsClazz && argsClazz.length >= 1) {
            Class<?> dataVoClazz = argsClazz[0];
            if (!WsSession.class.isAssignableFrom(dataVoClazz)) {
                return dataVoClazz;
            }
        }
        return null;
    }


    /**
     * 反射对应方法
     * @param data
     * @param wsSession
     * @param action
     * @param <T>
     * @param <D>
     * @return
     */
    public <T, D> Result<T> invokeWsActionMethod(D data, WsSession wsSession, String action) {
        Result<T> rs = null;
        try {
            WsActionOpData wsActionOp = getWsActionOpData(action);
            if(null!=wsActionOp){
                int parsLength = wsActionOp.getMethod().getParameterTypes().length;
                if (1 == parsLength) {
                    rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), wsSession);
                } else if (2 == parsLength) {
                    rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), data, wsSession);
                } else {
                    log.error("WsAction 处理方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, wsSession);
                    return Result.create(WsError.ws_action_error);
                }
            }else{
                log.error("WsAction 处理方法未找到{},{}", data, wsSession);
                return Result.create(WsError.ws_action_notfound);
            }
        } catch (MicroRuntimeException mre) {
            log.error("WsAction 处理方法反射异常,{},{}", data, wsSession, mre);
            return Result.create(mre);
        } catch (Exception e) {
            log.error("WsAction 处理方法反射异常,{},{}", data, wsSession, e);
            return Result.create(WsError.ws_action_error);
        }
        return rs;
    }


    /**
     * 反射对应失败熔断方法
     * @param data
     * @param wsSession
     * @param action
     * @param <D>
     * @return
     */
    public <D> void invokeWsActionFallbackMethod(D data, WsSession wsSession, String action) {
        try {
            WsActionOpData wsActionOp = getWsActionOpData(action);
            if(null!=wsActionOp){
                Class<?> fallbackClazz=wsActionOp.getWsAction().fallback();
                if(null==fallbackClazz){
                    return ;
                }
                Object fallbackBean=SpringContextUtil.getBean(fallbackClazz);
                if(null==fallbackBean){
                    return;
                }
                Method method=wsActionOp.getMethod();
                Class<?>[] paraClass= method.getParameterTypes();
                Method fallbackMethod=fallbackClazz.getMethod(method.getName(),paraClass);
                if(null==fallbackMethod){
                    return;
                }
                Result<?> rs = null;
                int parsLength = paraClass.length;
                if (1 == parsLength) {
                    rs=(Result<?>)fallbackMethod.invoke(fallbackBean,wsSession);
                } else if (2 == parsLength) {
                    rs = (Result<?>) fallbackMethod.invoke(fallbackBean, data, wsSession);
                } else {
                    log.error("WsAction 处理失败熔断方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, wsSession);
                }
                if(!rs.successful()) {
                    log.error("WsAction 处理失败熔断方法反射结果:{},{},{}",rs,data,wsSession);
                }
            }else{
                log.error("WsAction 处理失败熔断方法未找到{},{}", data, wsSession);
            }
        } catch (MicroRuntimeException mre) {
            log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, wsSession, mre);
        } catch (Exception e) {
            log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, wsSession, e);
        }
    }

}

请求方式实现类

java

import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 请求类型操作
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class ReqWsActionOp extends AbstractWsActionOp {

    private transient Logger log = LoggerFactory.getLogger(ReqWsActionOp.class);

    /**
     * WsAction 请求类型相关数据
     */
    private final transient Map<String, WsActionOpData> wsReqActions = new ConcurrentHashMap<>(256);

    public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
        return wsReqActions.put(wsAction, wsActionOpData);
    }

    public WsActionOpData remove(String wsAction) {
        return wsReqActions.remove(wsAction);
    }

    @Override
    public WsActionOpData getWsActionOpData(String wsAction) {
        return wsReqActions.get(wsAction);
    }

    /**
     * 调用WsAction对应方法
     * @param req
     * @param wsSession
     * @param <T>
     * @param <V>
     * @return
     */
    public <T,V> WsResp<T> invokeWsAction(WsReq<V> req, WsSession wsSession){
        String action = req.getAction();
        V reqData = req.getData();
        Result<T> rs=invokeWsActionMethod(reqData,wsSession,action);
        if(!rs.successful()){
            log.error("WsAction 处理方法反射错误{},{},{}",req,wsSession,rs);
        }
        return WsResp.create(req,rs);
    }


    /**
     * 调用WsAction对应失败熔断方法
     * @param req
     * @param wsSession
     * @param <V>
     * @return
     */
    public <V> void invokeWsActionFallback(WsReq<V> req, WsSession wsSession){
        String action = req.getAction();
        V reqData = req.getData();
        invokeWsActionFallbackMethod(reqData,wsSession,action);
    }
}

响应方式实现

java

import com.commnetsoft.commons.Result;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsReceiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 响应类型操作
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class RespWsActionOp extends AbstractWsActionOp {

    private transient Logger log = LoggerFactory.getLogger(RespWsActionOp.class);

    /**
     * WsAction 响应类型相关数据
     */
    private final transient Map<String, WsActionOpData> wsRespActions = new ConcurrentHashMap<>(256);

    public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
        return wsRespActions.put(wsAction, wsActionOpData);
    }

    public WsActionOpData remove(String wsAction) {
        return wsRespActions.remove(wsAction);
    }

    @Override
    public WsActionOpData getWsActionOpData(String wsAction) {
        return wsRespActions.get(wsAction);
    }

    /**
     * 调用WsAction对应方法
     * @param resp
     * @param wsSession
     * @param <T>
     * @param <V>
     */
    public <T,V> void invokeWsAction(WsResp<V> resp, WsSession wsSession){
        String action = resp.getAction();
        Result<T> rs=invokeWsActionMethod(resp,wsSession,action);
        if(!rs.successful()){
            log.error("WsAction 处理方法反射错误{},{},{}",resp,wsSession,rs);
        }
    }

    /**
     * 通用实现响应
     * 响应对请求响应锁减一  唤醒响应线程
     * @param <V>
     * @param resp
     * @return
     */
    public <V> void generalRespAction(WsResp<V> resp) {
        WsReceiveService wsReceiveService= SpringContextUtil.getBean(WsReceiveService.class);
        wsReceiveService.respCountDownLatch(resp);
    }

    /**
     * 重写
     */
    @Override
    public <T, D> Result<T> invokeWsActionMethod(D data, WsSession wsSession,String action) {
        WsActionOpData wsActionOp = getWsActionOpData(action);
        Result<T> result=null;
        if(null!=wsActionOp) {
            //自定义
            WsResp<?> resp=( WsResp<?>)data;
            result=super.invokeWsActionMethod(resp.getReqdata(),wsSession, action);
        }
        //使用通用实现
        generalRespAction((WsResp<?>)data);
        if(null==result){
            result=Result.create();
        }
        return result;
    }


    /**
     * 调用WsAction对应失败熔断方法
     * @param resp
     * @param wsSession
     * @param <V>
     * @return
     */
    public <V> void invokeWsActionFallback(WsResp<V> resp, WsSession wsSession){
        String action = resp.getAction();
        V respData = resp.getResult();
        invokeWsActionFallbackMethod(respData,wsSession,action);
    }
}

WS消息发送响应结果处理器

java

import com.alibaba.fastjson.JSON;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;

/**
 * WS消息发送响应结果处理器
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class WsSendHandler<T,V> implements SendHandler {

    private WsReq<T>  req;

    private WsResp<V> resp;

    private Session session;

    private static Logger log = LoggerFactory.getLogger(WsSendHandler.class);

    private WsSendHandler() {
    }

    public WsSendHandler(WsReq<T> req,WsResp<V> resp, Session session) {
        this.req=req;
        this.resp=resp;
        this.session=session;
    }

    @Override
    public void onResult(SendResult result) {
        if(result.isOK()) {
            //成功
            log.debug("S-B:{},{}", JSON.toJSONString(resp), WsUtil.toStr(session));
        }else {
            //失败熔断
            log.debug("S-B:失败-req:{},resp:{},{}",JSON.toJSONString(req),JSON.toJSONString(resp), WsUtil.toStr(session));
            WsActionBeanPostProcessor wsActionBeanPostProcessor=SpringContextUtil.getBean(WsActionBeanPostProcessor.class);
            if(null==wsActionBeanPostProcessor){
                log.warn("WsActionBeanPostProcessor 对象获取失败!");
                return;
            }
            String uid=WsUtil.getLoginUid(session);
            WsSessionService wsSessionService=SpringContextUtil.getBean(WsSessionService.class);
            WsSession wsSession=wsSessionService.getLocal().getSession(uid);
            if(null==wsSession){
                wsSession=new LocalWsSession(session);
            }
            wsActionBeanPostProcessor.req().invokeWsActionFallback(req,wsSession);
        }
    }

}

会话管理

会话相关实体类

消息接口

java
/**
 * @author Brack.zhu
 * @date 2020/12/14
 */
public interface WsMsg {
}

请求消息模型

java

import com.commnetsoft.ws.annotation.WsType;

/**
 * 请求消息模型
 * @author Brack.zhu
 * @date 2019/12/4
 */
public class WsReq<T> implements WsMsg {

    private String id;

    private WsType type = WsType.req;

    private String action;

    private T data;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public WsType getType() {
        return type;
    }

    public void setType(WsType type) {
        this.type = type;
    }
}

响应消息模型

java

import com.commnetsoft.commons.IErrorCode;
import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.annotation.WsType;

import java.util.Objects;

/**
 * 响应消息模型
 * @author Brack.zhu
 * @date 2019/12/4
 */
public class WsResp<T> extends Result<T> implements WsMsg {

    private String id;

    private WsType type= WsType.resp;

    private String action;

    private Object reqdata;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public Object getReqdata() {
        return reqdata;
    }

    public void setReqdata(Object reqdata) {
        this.reqdata = reqdata;
    }

    public WsType getType() {
        return type;
    }

    public void setType(WsType type) {
        this.type = type;
    }

    /**
     * 创建一个正确的数据返回结果
     *
     * @param result 数据
     */
    public static <T,V> WsResp<T> create(WsReq<V> req,Result<T> result) {
        WsResp<T> rt = new WsResp<>();
        rt.setId(req.getId());
        rt.setAction(req.getAction());
        rt.setCode(result.getCode());
        rt.setMessage(result.getMessage());
        rt.setDesc(result.getDesc());
        rt.setResult(result.getResult());
        return rt;
    }

    /**
     * 创建一个错误的返回结果
     *
     * @param code 错误信息
     */
    public static <T, V> WsResp<T> create(WsReq<V> req, IErrorCode code) {
        Objects.requireNonNull(code, "错误的返回结果错误码不能为空");
        WsResp<T> rt = new WsResp<>();
        rt.setCode(code.getCode());
        rt.setMessage(code.getMessage());
        rt.setDesc(code.getDesc());
        rt.setId(req.getId());
        rt.setAction(req.getAction());
        rt.setReqdata(req.getData());
        return rt;
    }

}

请求响应闭锁

继承CountDownLatch,构造器传入一个值,当该值为0时其中方法才会执行。

即请求发出后,得到响应后值会减一,然后执行方法。若一直未响应则有超时时间

java

import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.WsConfig;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * 请求响应闭锁
 * @author Brack.zhu
 * @date 2020年11月19日
 */
public class ReqRespCountDownLatch extends CountDownLatch {

	private WsResp<?> resp;

	private Class<?> respDataType;

	public ReqRespCountDownLatch(Class<?> respDataType) {
		super(1);
		this.respDataType=respDataType;
	}
	
	/**
	 * 同步等待请求响应
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> WsResp<T> awaitResp() {
		try {
			WsConfig wsConfig = SpringContextUtil.getBean(WsConfig.class);
			long awaitTime= wsConfig.getWsSyncSendTimeout();
			super.await(awaitTime, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			//ignore
		}
		return (WsResp<T>)getResp();
	}

	public WsResp<?> getResp() {
		return resp;
	}

	public void setResp(WsResp<?> resp) {
		this.resp = resp;
	}

	public Class<?> getRespDataType() {
		return respDataType;
	}
}

session接口

java

import com.commnetsoft.ws.service.comm.CommunicateType;

/**
 * WS模块会话对象
 *
 * @author Brack.zhu
 * @date 2020/12/24
 */
public abstract class WsSession {

    private CommunicateType type;

    private String uid;

    public WsSession(CommunicateType type) {
        this.type = type;
    }

    /**
     * 是否认证成功的
     *
     * @return
     */
    public abstract boolean isAuthed();


    public CommunicateType getType() {
        return type;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }


}

本地session

java

import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.service.comm.CommunicateType;
import com.commnetsoft.ws.util.WsUtil;

import javax.websocket.Session;

/**
 * 本地会话对象
 * @author Brack.zhu
 * @date 2020/12/24
 */
public class LocalWsSession extends WsSession {

    private Session session;

    public LocalWsSession(Session session) {
        super(CommunicateType.Local);
        this.session = session;
        String uid = WsUtil.getLoginUid(session);
        super.setUid(uid);
    }

    public Session getSession() {
        return session;
    }

    @Override
    public boolean isAuthed() {
        return StringUtils.isNotBlank(getUid()) ? true : false;
    }
}

集群session

java

import com.commnetsoft.commons.utils.EncryptUtils;
import com.commnetsoft.ws.service.comm.CommunicateType;

import java.math.BigInteger;


/**
 * 集群会话对象<br/>
 *
 * @author Brack.zhu
 * @date 2020/12/24
 */
public class ClusterWsSession extends WsSession {

    /**
     * 用户连接所在服务器实例id
     */
    private String serverInstanceId;

    public ClusterWsSession(String uid,String serverInstanceId) {
        super(CommunicateType.Cluster);
        setUid(uid);
        setServerInstanceId(serverInstanceId);
    }


    @Override
    public boolean isAuthed() {
        return true;
    }

    public String getServerInstanceId() {
        return serverInstanceId;
    }

    public void setServerInstanceId(String serverInstanceId) {
        this.serverInstanceId = serverInstanceId;
    }

    @Override
    public int hashCode() {
        String str = this.getUid() + this.getServerInstanceId();
        String md5 = EncryptUtils.MD5.encrypt(str);
        BigInteger bigInteger = new BigInteger(md5.getBytes());
        return bigInteger.intValue();
    }

    @Override
    public boolean equals(Object obj) {
        if(obj instanceof ClusterWsSession){
            ClusterWsSession clusterWsSession=(ClusterWsSession)obj;
            String objStr = clusterWsSession.getUid() + clusterWsSession.getServerInstanceId();
            String str = this.getUid() + this.getServerInstanceId();
            if(objStr.equals(str)){
                return true;
            }
        }
        return false;
    }
}

Session管理

包含增加、获取、移除会话的方法。本地+集群

java

import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.ws.model.ClusterWsSession;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.session.ClusterWsSessionService;
import com.commnetsoft.ws.service.session.LocalWsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.websocket.Session;

/**
 * WS会话管理服务
 *
 * @author Brack.zhu
 * @date 2019/12/4
 */
@Service
public class WsSessionService {

    @Autowired
    private LocalWsSessionService localWsSessionService;

    @Autowired
    private ClusterWsSessionService clusterWsSessionService;


    /**
     * 增加验证成功会话
     *
     * @return
     */
    public boolean addAuthSession(Session session) {
        if (localWsSessionService.addSession(new LocalWsSession(session))) {
            clusterWsSessionService.addSession(new ClusterWsSession(WsUtil.getLoginUid(session), ApplicationUtil.getInstanceId()));
            return true;
        }
        return false;
    }

    /**
     * 移除指定用户验证会话对象
     *
     * @param uid
     * @return
     */
    public WsSession removeAuthSession(String uid) {
        WsSession localWsSession = localWsSessionService.removeSession(uid);
        WsSession clusterWsSession = clusterWsSessionService.removeSession(uid);
        return null != localWsSession ? localWsSession : clusterWsSession;
    }

    /**
     * 获取指定用户验证会话对象
     *
     * @param uid
     * @return
     */
    public WsSession getAuthSession(String uid) {
        WsSession wsSession = localWsSessionService.getSession(uid);
        if (null != wsSession) {
            return wsSession;
        }
        return clusterWsSessionService.getSession(uid);
    }


    public LocalWsSessionService getLocal() {
        return localWsSessionService;
    }

    public ClusterWsSessionService getCluster() {
        return clusterWsSessionService;
    }
}

Session管理抽象类

java

import com.commnetsoft.ws.model.WsSession;

/**
 * 会话管理抽象类
 * @author Brack.zhu
 * @date 2020/12/28
 */
public abstract class AbstractWsSessionService {

    /**
     * 增加会话
     * @param wsSession
     * @return
     */
    public abstract boolean addSession(WsSession wsSession);

    /**
     * 获取指定会话
     * @param uid
     * @return
     */
    public abstract WsSession getSession(String uid);

    /**
     * 删除指定会话
     * @param uid
     * @return
     */
    public abstract WsSession removeSession(String uid);


}

本地session管理

添加成功后通过mq发送到集群添加集群session

java

import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.mq.AmqpHelper;
import com.commnetsoft.ws.WsSessionEvents;
import com.commnetsoft.ws.WsSessionType;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.model.WsSessionChangeMsg;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 本地会话管理实现
 * @author Brack.zhu
 * @date 2020/12/28
 */
@Component
public class LocalWsSessionService extends AbstractWsSessionService {


    /**
     * 认证会话关系<br/>
     * 网络IO对象只能存储在本地
     * key:uid
     */
    private static ConcurrentHashMap<String, LocalWsSession> authUidSessionMap = new ConcurrentHashMap();

    /**
     * 认证会话关系<br/>
     * key:id
     * val:uid
     */
    private static ConcurrentHashMap<String, String> authIdUidMap = new ConcurrentHashMap();

    /**
     * 打开会话总数
     */
    private AtomicLong openCount = new AtomicLong(0);

    /**
     * 关闭会话总数
     */
    private AtomicLong closeCount = new AtomicLong(0);

    @Autowired
    AmqpHelper amqpHelper;

    private static Logger log = LoggerFactory.getLogger(LocalWsSessionService.class);

    /**
     * 会话打开统计数增加1
     *
     * @return
     */
    public long openIncrement() {
        long rs = openCount.incrementAndGet();
        //统计日志输出
        sessionInfo2Log();
        return rs;
    }

    /**
     * 获取会话打开统计数
     *
     * @return
     */
    public long getOpenCount() {
        return openCount.get();
    }

    /**
     * 会话关闭统计数增加1
     *
     * @return
     */
    public long closeIncrement() {
        long rs = closeCount.incrementAndGet();
        //统计日志输出
        sessionInfo2Log();
        return rs;
    }

    /**
     * 获取关闭会话统计数
     *
     * @return
     */
    public long getCloseCount() {
        return closeCount.get();
    }

    /**
     * 获取认证成功的会话数
     *
     * @return
     */
    public long getAuthCount() {
        return authUidSessionMap.mappingCount();
    }

    @Override
    public synchronized boolean addSession(WsSession wsSession) {
        LocalWsSession localWsSession=(LocalWsSession)wsSession;
        String uid = wsSession.getUid();
        if (StringUtils.isNotEmpty(uid)) {
            log.info("增加认证成功会话:{},{}", uid, wsSession);
            //是否存在老的会话
            LocalWsSession oldSession = (LocalWsSession)getSession(uid);
            if (null != oldSession && !localWsSession.getSession().equals(oldSession.getSession())) {
                removeSession(uid);
                log.warn("{}新认证Ws会话加入{},强制关闭老的会话:{}", uid, wsSession, oldSession);
                WsUtil.closeSession(oldSession.getSession());
            }
            //认证对象
            authUidSessionMap.put(uid, localWsSession);
            String sessionId=localWsSession.getSession().getId();
            authIdUidMap.put(sessionId, uid);
            //发送WS会话有效消息
            WsSessionChangeMsg wsSessionChangeMsg = new WsSessionChangeMsg();
            wsSessionChangeMsg.setType(WsSessionType.VALID);
            wsSessionChangeMsg.setId(sessionId);
            wsSessionChangeMsg.setIdmId(uid);
            amqpHelper.sendToExchange(WsSessionEvents.WS_SESSION_CHANGE, wsSessionChangeMsg);
            //统计日志输出
            sessionInfo2Log();
            return true;
        } else {
            log.error("WsSession无用户UID信息,强制关闭会话:{}", wsSession);
            WsUtil.closeSession(localWsSession.getSession());
        }
        return false;
    }

    public WsSession getSessionById(String sessionId) {
        String uid = authIdUidMap.get(sessionId);
        if (StringUtils.isBlank(uid)) {
            return null;
        }
        return getSession(uid);
    }

    @Override
    public WsSession getSession(String uid) {
        return authUidSessionMap.get(uid);
    }

    @Override
    public synchronized WsSession removeSession(String uid) {
        LocalWsSession session = authUidSessionMap.remove(uid);
        if (null == session) {
            return null;
        }
        authIdUidMap.remove(session.getSession().getId());
        //发送WS会话无效消息
        WsSessionChangeMsg wsSessionChangeMsg = new WsSessionChangeMsg();
        wsSessionChangeMsg.setType(WsSessionType.INVALID);
        wsSessionChangeMsg.setId(session.getSession().getId());
        wsSessionChangeMsg.setIdmId(uid);
        amqpHelper.sendToExchange(WsSessionEvents.WS_SESSION_CHANGE, wsSessionChangeMsg);
        return session;
    }

    /**
     * 会话统计输出到日志信息
     *
     * @return
     */
    public void sessionInfo2Log() {
        log.info("WS 会话当前统计 open:{},close:{},atuh:{}", getOpenCount(), getCloseCount(), getAuthCount());
    }
}

集群session服务 通过一定规则构建key,并将key和服务的instanceid存入redis

获取通过key获取instanceid

java

import com.commnetsoft.core.CoreConstant;
import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.model.ClusterWsSession;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author Brack.zhu
 * @date 2020/12/28
 */
@Component
public class ClusterWsSessionService extends AbstractWsSessionService {

    @Value("#{coreConfig.applicationName}")
    private String applicationName;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private WsConfig wsConfig;

    /**
     * 缓存有效期延长时间--毫秒
     */
    private long cacheExpireLengthen = 30000;

    /**
     * 本地保存到集群缓存的用户集合
     * key:uid
     */
    private static Set<String> clusterUidsSets = Collections.synchronizedSet(new HashSet<>());

    private static Logger log = LoggerFactory.getLogger(ClusterWsSessionService.class);

    @PostConstruct
    public void postConstruct() {
        //启动集群会话维持看门狗
        new Thread("ClusterWsSessionServiceLockWatchDogThread") {
            @Override
            public void run() {
                while (true) {
                    try {
                        long wsClusterSessionKeepInterval=wsConfig.getWsClusterSessionKeepInterval();
                        Thread.sleep(wsClusterSessionKeepInterval);
                    } catch (InterruptedException e) {
                        //ignore
                    }
                    long start = System.currentTimeMillis();
                    extensionAllClusterSessionCacheExpire();
                    long end = System.currentTimeMillis();
                    long opTime = end - start;
                    if (opTime > cacheExpireLengthen) {
                        log.warn("缓存延续操作时间大于默认缓存有效期延长时间,为防止缓存被回收将cacheExpireLengthen({})值更新为:{}", cacheExpireLengthen, opTime);
                        cacheExpireLengthen = opTime;
                    }
                }
            }
        }.start();
    }

    @Override
    public boolean addSession(WsSession wsSession) {
        String userId = wsSession.getUid();
        if (addClusterSession(userId)) {
            clusterUidsSets.add(userId);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public WsSession getSession(String uid) {
        String clusterCacheKey = buildClusterSessionKey(uid);
        try {
            Object valObj = redisTemplate.opsForValue().get(clusterCacheKey);
            if(null!=valObj){
                ClusterWsSession wsSession = new ClusterWsSession(uid,String.valueOf(valObj));
                return wsSession;
            }
        } catch (Exception e) {
            log.error("获取WS集群环境中的会话连接实例信息异常,{}", clusterCacheKey, e);
        }
        return null;
    }

    @Override
    public WsSession removeSession(String uid) {
        clusterUidsSets.remove(uid);
        WsSession wsSession = getSession(uid);
        if (null != wsSession) {
            String clusterCacheKey = buildClusterSessionKey(uid);
            try {
                redisTemplate.delete(clusterCacheKey);
            } catch (Exception e) {
                log.error("移除集群环境缓存中的WS会话连接实例信息异常,{}", clusterCacheKey, e);
            }
        }
        return wsSession;
    }

    /**
     * 构建WS会话连接信息集群缓存KEY
     *
     * @param userid
     * @return
     */
    public String buildClusterSessionKey(String userid) {
        //key:项目名:wsClusterSessionInstance:userid:
        String mark = CoreConstant.Cache.REDIS_KEY_MARK;
        return applicationName + mark + "wsClusterSessionInstance" + mark + userid;
    }

    /**
     * 获取集群回去缓存有效期--单位毫秒
     *
     * @return
     */
    public long getClusterSessionCacheExpire() {
        return wsConfig.getWsClusterSessionKeepInterval() + cacheExpireLengthen;
    }

    /**
     * 延续所有集群会话缓存有效期
     */
    private void extensionAllClusterSessionCacheExpire() {
        Iterator<String> uids = clusterUidsSets.iterator();
        while (uids.hasNext()) {
            String uid = uids.next();
            addClusterSession(uid);
        }
    }

    /**
     * 将WS会话连接实例信息增加到集群环境缓存中
     *
     * @param userid
     */
    private boolean addClusterSession(String userid) {
        String clusterCacheKey = buildClusterSessionKey(userid);
        try {
            redisTemplate.opsForValue().set(clusterCacheKey, ApplicationUtil.getInstanceId());
            //设置有效期---集群会话看门狗维持
            redisTemplate.expire(clusterCacheKey, getClusterSessionCacheExpire(), TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            log.error("将WS会话连接对应实例信息增加到集群环境缓存中异常,{},{}", clusterCacheKey, ApplicationUtil.getInstanceId(), e);
        }
        return false;
    }

}

消息发送服务类

消息分发器,获取要发送消息的类型,发送普通消息、同步消息、响应消息及控制发送同步消息时接收到响应时取消线程阻塞

java

import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.locks.ReentrantLock;

/**
 * WS消息分发器
 *
 * @author Brack.zhu
 * @date 2020/12/9
 */
@Component
public class WsDispatch {

    @Autowired
    private LocalWsCommunicate localWsCommunicate;

    @Autowired
    private ClusterWsCommunicate clusterWsCommunicate;

    @Autowired
    private WsSessionService wsSessionService;

    @Autowired
    private WsConfig wsConfig;

    private Logger log = LoggerFactory.getLogger(WsDispatch.class);

    /**
     * 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除-
     * 没有线程同步
     */

    private final ReentrantLock cdlLock = new ReentrantLock();

    /**
     * key:msgid
     */
    private PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCdls;


    @PostConstruct
    public void postConstruct() {
        long wsSyncSendTimeout = wsConfig.getWsSyncSendTimeout();
        cdlLock.lock();
        try {
            long timeToLiveMillis = wsSyncSendTimeout + 30000;
            log.info("请求响应消息CDL闭锁关系MAP有效期时间调整为:{}",timeToLiveMillis);
            PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCDLsTemp = new PassiveExpiringMap<>(timeToLiveMillis);
            if (null != reqRespCdls) {
                reqRespCDLsTemp.putAll(reqRespCdls);
                log.info("请求响应消息CDL闭锁关系MAP迁移数:{}",reqRespCdls.size());
            }
            reqRespCdls = reqRespCDLsTemp;
        } catch (Exception e) {
            log.error("", e);
        } finally {
            cdlLock.unlock();
        }
    }



    public ReqRespCountDownLatch getReqRespCdl(String key) {
        return reqRespCdls.get(key);
    }

    public void putReqRespCdl(String key, ReqRespCountDownLatch reqRespCdl) {
        cdlLock.lock();
        try {
            reqRespCdls.put(key, reqRespCdl);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            cdlLock.unlock();
        }
    }

    public ReqRespCountDownLatch removeReqRespCdl(String key) {
        cdlLock.lock();
        try {
            return reqRespCdls.remove(key);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            cdlLock.unlock();
        }
        return null;
    }

    /**
     * 参数变更事件监听
     *
     * @param envChangeEvent
     */
    @EventListener
    public void EnvironmentChangeEventListener(EnvironmentChangeEvent envChangeEvent) {
        if (envChangeEvent.getKeys().contains("ws.sync.send.timeout")) {
            postConstruct();
        }
    }

    /**
     * 响应请求
     * 将请求响应闭锁线程推出阻塞
     * 根据id唤醒同步请求并返回结果
     *
     * @param resp
     * @param <V>
     */
    public <V> void respCountDownLatch(WsResp<V> resp) {
        ReqRespCountDownLatch cdl = removeReqRespCdl(resp.getId());
        if (null != cdl) {
            cdl.setResp(resp);
            cdl.countDown();
        }
    }

    /**
     * 发送请求消息<br/>
     * 支持集群
     *
     * @param uid 用户uid
     * @param req 请求消息
     * @param <T> 请求消息Data 模型
     * @throws MicroRuntimeException
     */
    public <T> void sendWsReq(String uid, WsReq<T> req) throws MicroRuntimeException {
        CommunicateType commType = getSessionCommType(uid);
        if (CommunicateType.Local.equals(commType)) {
            localWsCommunicate.sendWsReq(uid, req);
        } else if (CommunicateType.Cluster.equals(commType)) {
            clusterWsCommunicate.sendWsReq(uid, req);
        } else {
            throw new MicroRuntimeException(WsError.ws_error, "无效通讯类型" + commType);
        }

    }

    /**
     * 发送同步请求消息<br/>
     *支持集群
     * @param uid
     * @param req
     * @param <T>
     * @throws MicroRuntimeException
     */
    public <T, R> WsResp<R> sendAndReceiveWsReq(String uid, WsReq<T> req, Class<R> respDataType) throws MicroRuntimeException {
        CommunicateType commType = getSessionCommType(uid);
        if (CommunicateType.Local.equals(commType)) {
            ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
            String id = req.getId();
            putReqRespCdl(id, cdl);
            localWsCommunicate.sendWsReq(uid, req);
            return cdl.awaitResp();
        } else if (CommunicateType.Cluster.equals(commType)) {
            ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
            String id = req.getId();
            putReqRespCdl(id, cdl);
            clusterWsCommunicate.sendWsReq(uid, req);
            return cdl.awaitResp();
        } else {
            throw new MicroRuntimeException(WsError.ws_session_notfound, "无效通讯类型" + commType);
        }
    }

    /**
     * 发送响应消息<br/>
     * 不支持集群发送响应消息
     *
     * @param uid
     * @param resp
     * @param <T>
     * @throws MicroRuntimeException
     */
    public <T> void sendWsResp(String uid, WsResp<T> resp) throws MicroRuntimeException {
        localWsCommunicate.sendWsResp(uid, resp);
    }

    /**
     * 获取会话通讯类型
     *
     * @param uid
     * @return
     */
    public CommunicateType getSessionCommType(String uid) {
        WsSession session = wsSessionService.getAuthSession(uid);
        if (session instanceof LocalWsSession) {
            return CommunicateType.Local;
        } else if (session instanceof ClusterWsSession) {
            return CommunicateType.Cluster;
        }
        throw new MicroRuntimeException(WsError.ws_session_notfound);
    }


    public LocalWsCommunicate getLocal() {
        return localWsCommunicate;
    }

    public ClusterWsCommunicate getCluster() {
        return clusterWsCommunicate;
    }

}

服务类型枚举 本地/集群

java

/**
 * @author Brack.zhu
 * @date 2020/12/24
 */
public enum CommunicateType {

    Local,
    Cluster,
    ;
}

本地通讯服务类

本地会话的获取和发送消息

java

import com.alibaba.fastjson.JSON;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.Session;

/**
 * WS本地通讯
 *
 * @author Brack.zhu
 * @date 2020/12/9
 */
@Component
public class LocalWsCommunicate implements IWsCommunicate {

    @Autowired
    private WsSessionService wsSessionService;

    private Logger log = LoggerFactory.getLogger(LocalWsCommunicate.class);

    @Override
    public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
        Session session = getSession(uid);
        sendWsReq(session, wsReq);
    }


    public <T> void sendWsReq(Session session, WsReq<T> wsReq) throws MicroRuntimeException {
        if (null == wsReq) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
        WsUtil.checkSession(session);
        try {
            String text = JSON.toJSONString(wsReq);
            WsUtil.sendText(session, text);
        } catch (Exception e) {
            log.error("S-B 请求消息异常:{},{},{}", WsUtil.getLoginUid(session), session, wsReq, e);
            throw new MicroRuntimeException(WsError.ws_session_send_req_error, e);
        }
    }


    public <T> void sendWsResp(String uid, WsResp<T> wsResp) throws MicroRuntimeException {
        Session session = getSession(uid);
        sendWsResp(session, wsResp);
    }

    public <T> void sendWsResp(Session session, WsResp<T> wsResp) throws MicroRuntimeException {
        if (null == wsResp) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
        WsUtil.checkSession(session);
        try {
            String text = JSON.toJSONString(wsResp);
            WsUtil.sendText(session, text);
        } catch (Exception e) {
            log.error("S-B 响应消息异常:{},{},{}", WsUtil.getLoginUid(session), session, wsResp, e);
            throw new MicroRuntimeException(WsError.ws_session_send_resp_error, e);
        }
    }

    /**
     * 根据用户UID获取本地WS会话对象
     *
     * @param uid
     * @return
     */
    private Session getSession(String uid) {
        WsSession wsSession = wsSessionService.getLocal().getSession(uid);
        if (wsSession instanceof LocalWsSession) {
            LocalWsSession localWsSession = (LocalWsSession) wsSession;
            return localWsSession.getSession();
        }
        return null;
    }

}

集群会话service 集群会话的获取和发送消息

集群发送消息通过mq发送到特定的服务里,由特定服务响应

java

import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.mq.AmqpHelper;
import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.WsSessionEvents;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * 集群WS通讯
 *
 * @author Brack.zhu
 * @date 2020/12/9
 */
@Component
public class ClusterWsCommunicate implements IWsCommunicate {

    @Autowired
    private WsSessionService wsSessionService;

    @Autowired
    private AmqpHelper amqpHelper;


    private Logger log = LoggerFactory.getLogger(ClusterWsCommunicate.class);


    @Override
    public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
        if (null == wsReq) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
        String serviceInstanceId = getServerInstanceId(uid);
        if (StringUtils.isBlank(serviceInstanceId)) {
            throw new MicroRuntimeException(WsError.ws_session_notfound);
        }
        try {
            WsClusterMsg wsClusterReqMsg = new WsClusterMsg();
            wsClusterReqMsg.setUid(uid);
            wsClusterReqMsg.setMsg(wsReq);
            wsClusterReqMsg.setFrom(ApplicationUtil.getInstanceId());
            if (null != wsReq.getData()) {
                wsClusterReqMsg.setDataClassType(wsReq.getData().getClass());
            }
            amqpHelper.sendToExchange(buildMqRoutingKey(serviceInstanceId), wsClusterReqMsg);
        } catch (Exception e) {
            log.error("S-S 请求消息异常:{},{},{}", uid, serviceInstanceId, wsReq, e);
            throw new MicroRuntimeException(WsError.ws_session_send_req_error, "集群发送请求消息失败", e);
        }
    }

    /**
     * 响应消息集群环境中同步 <br/>
     * AS--->BS--req-->C--resp->BS--sync-->AS
     * @param uid
     * @param respJson
     * @param serviceInstanceId
     * @param <T>
     * @throws MicroRuntimeException
     */
    public <T> void wsRespClusterSync(String uid,JSONObject respJson,String serviceInstanceId) throws MicroRuntimeException {
        if (null == respJson) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
        try {
            WsClusterMsg wsClusterRespMsg = new WsClusterMsg();
            wsClusterRespMsg.setUid(uid);
            wsClusterRespMsg.setMsg(respJson);
            wsClusterRespMsg.setFrom(ApplicationUtil.getInstanceId());
            amqpHelper.sendToExchange(buildMqRoutingKey(serviceInstanceId), wsClusterRespMsg);
        } catch (Exception e) {
            log.error("S-S 响应消息异常:{},{},{}", uid, serviceInstanceId, respJson, e);
            throw new MicroRuntimeException(WsError.ws_session_send_req_error, "集群发送响应消息失败", e);
        }
    }

    /**
     * 构建TOPIC的 RoutingKey
     *
     * @param serviceInstanceId
     * @return
     */
    public static String buildMqRoutingKey(String serviceInstanceId) {
        return WsSessionEvents.WS_SESSION_SEND_CLUSTER_ + serviceInstanceId;
    }


    /**
     * 根据用户UID获取会话所在的服务实例
     * @param uid
     * @return
     */
    private String getServerInstanceId(String uid){
        WsSession wsSession=wsSessionService.getCluster().getSession(uid);
        if(wsSession instanceof ClusterWsSession){
            ClusterWsSession clusterWsSession=(ClusterWsSession)wsSession;
            return clusterWsSession.getServerInstanceId();
        }
        return null;
    }


}

集群获取mq中需响应的信息

java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsReceiveService;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.util.WsUtil;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Brack.zhu
 * @date 2020/12/22
 */
@Component
public class ClusterWsMqListener {

    @Autowired
    private WsDispatch wsDispatch;

    @Autowired
    private WsReceiveService wsReceiveService;

    @Autowired
    private WsSessionService wsSessionService;

    @Autowired
    private WsConfig wsConfig;

    /**
     * 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除-
     * 没有线程同步
     */

    private final ReentrantLock lock = new ReentrantLock();

    /**
     * key:msgid
     * val:ServiceInstanceId 服务实例id
     */
    private PassiveExpiringMap<String, String> reqRespClusterMaps;


    private Logger log = LoggerFactory.getLogger(ClusterWsMqListener.class);


    @PostConstruct
    public void postConstruct() {
        long wsSyncSendTimeout = wsConfig.getWsSyncSendTimeout();
        lock.lock();
        try {
            long timeToLiveMillis = wsSyncSendTimeout + 30000;
            log.info("WS集群MQ消息发送响应关联集合有效期时间调整为:{}",timeToLiveMillis);
            PassiveExpiringMap<String, String> reqRespClusterMapsTemp = new PassiveExpiringMap<>(timeToLiveMillis);
            if (null != reqRespClusterMaps) {
                reqRespClusterMapsTemp.putAll(reqRespClusterMaps);
                log.info("WS集群MQ消息发送响应关联集合迁移数:{}",reqRespClusterMaps.size());
            }
            reqRespClusterMaps = reqRespClusterMapsTemp;
        } catch (Exception e) {
            log.error("", e);
        } finally {
            lock.unlock();
        }
    }


    public String getReqRespClusterMap(String msgid) {
        return reqRespClusterMaps.get(msgid);
    }

    public void putReqRespClusterMap(String msgid, String uid) {
        lock.lock();
        try {
            reqRespClusterMaps.put(msgid, uid);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            lock.unlock();
        }
    }

    public String removeReqRespClusterMap(String msgid) {
        lock.lock();
        try {
            return reqRespClusterMaps.remove(msgid);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            lock.unlock();
        }
        return null;
    }


    /**
     * WS集群消息分发监听器--接收来自其他节点的消息
     *
     * @param msg MQ集群消息对象
     */
    @RabbitListener(queues = "#{wsConfig.wsClusterQueueName()}")
    public void wsClusterDispatchListener(WsClusterMsg msg) {
        try {
            log.info("接收到集群WS消息:{},{}", JSON.toJSONString(msg));
            String uid = msg.getUid();
            Object msgObj = msg.getMsg();
            if (msgObj instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) msgObj;
                String type = jsonObject.getString(AbstractWsService.WS_TYPE_KEY);
                if (WsType.req.name().equals(type)) {
                    Class<?> dataClassType = msg.getDataClassType();
                    if (null == dataClassType) {
                        dataClassType = Void.class;
                    }
                    WsReq wsReq = WsUtil.toWsReq(jsonObject, dataClassType);
                    putReqRespClusterMap(wsReq.getId(),msg.getFrom());
                    wsDispatch.getLocal().sendWsReq(uid, wsReq);
                } else if (WsType.resp.name().equals(type)) {
                    //消息
                    WsSession wsSession=wsSessionService.getCluster().getSession(uid);
                    wsReceiveService.onRespMessage(jsonObject,wsSession);
                } else {
                    log.error("集群WS消息暂不支持的type类型:{}", type);
                }
            } else {
                log.error("集群WS消息暂不支持的msg数据类型:{}", msgObj);
            }
        } catch (Exception e) {
            log.error("WS集群消息分发监听处理异常,{}", msg, e);
        }
    }

    /**
     * 参数变更事件监听
     *
     * @param envChangeEvent
     */
    @EventListener
    public void EnvironmentChangeEventListener(EnvironmentChangeEvent envChangeEvent) {
        if (envChangeEvent.getKeys().contains("ws.sync.send.timeout")) {
            postConstruct();
        }
    }

}

接收响应数据service

java

import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.boot.RespWsActionOp;
import com.commnetsoft.ws.boot.WsActionBeanPostProcessor;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.service.mq.ClusterWsMqListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * WS消息接收处理服务
 * @author Brack.zhu
 * @date 2020/12/23
 */
@Component
public class WsReceiveService {

    @Autowired
    private WsDispatch wsDispatch;

    @Autowired
    private WsActionBeanPostProcessor wsActionBeanPostProcessor;

    @Autowired
    private WsSendHelper wsSendHelper;

    @Autowired
    private ClusterWsMqListener clusterWsMqListener;

    private Logger log = LoggerFactory.getLogger(WsReceiveService.class);

    /**
     * 响应请求
     * 根据id唤醒同步请求并返回结果
     *
     * @param resp
     * @param <V>
     */
    public <V> void respCountDownLatch(WsResp<V> resp) {
        wsDispatch.respCountDownLatch(resp);
    }


    /**
     * 接收响应数据
     *
     * @param msgJson
     * @param wsSession
     */
    public void onRespMessage(JSONObject msgJson, WsSession wsSession) throws Exception {
        String id = msgJson.getString(AbstractWsService.WS_ID_KEY);
        String clusterServiceInstanceId  = clusterWsMqListener.getReqRespClusterMap(id);
        if (StringUtils.isNotBlank(clusterServiceInstanceId)) {
            //是否来自本服务会话响应集群消息 AS--req->BS--->C--resp->BS--sync->AS
            //转发--集群处理响应消息
            String uid = wsSession.getUid();
            wsDispatch.getCluster().wsRespClusterSync(uid, msgJson,clusterServiceInstanceId);
        } else {
            //本地处理响应消息--可能来自本地或者集群
            String action = msgJson.getString(AbstractWsService.WS_ACTION_KEY);
            //获取入参类型
            RespWsActionOp respWsActionOp = wsActionBeanPostProcessor.resp();
            Class<?> dataVoClazz = respWsActionOp.getDataVoClazzNull(action);
            if (null == dataVoClazz) {
                dataVoClazz = wsSendHelper.getReqCdlRespDataType(id);
                if (null == dataVoClazz) {
                    dataVoClazz = Void.class;
                }
            }
            //回调对应实现方法
            WsResp<?> wsResp = AbstractWsService.toWsResp(msgJson, dataVoClazz);
            onRespMessage(wsResp, wsSession);
        }
    }




    /**
     * 接收响应数据
     *
     * @param wsResp
     * @param wsSession
     */
    public void onRespMessage(WsResp<?> wsResp, WsSession wsSession) throws Exception {
        String action = wsResp.getAction();
        //获取入参类型
        RespWsActionOp respWsActionOp = wsActionBeanPostProcessor.resp();
        if (!respWsActionOp.isPub(action) && !wsSession.isAuthed()) {
            //非公开方法且未登录
            log.warn("响应消息{},于处理要求不一致(该响应需要登录),{}", wsResp, wsSession);
        } else {
            respWsActionOp.invokeWsAction(wsResp, wsSession);
        }
    }

}

WS工具类

java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsData;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import org.apache.tomcat.websocket.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;

import javax.websocket.SendHandler;
import javax.websocket.Session;

/**
 * WebSocket 工具类
 *
 * @author Brack.zhu
 * @date 2019/12/3
 */
public class WsUtil {

    private static Logger log = LoggerFactory.getLogger(WsUtil.class);

    /**
     * 会话属性-用户uid Key
     */
    public static final String KEY_UID = "wsUid";

    /**
     * 会话属性-请求IP
     */
    public static final String KEY_HTTP_IP = "httpIp";

    /**
     * 设置会话属性对象
     *
     * @param session
     * @param key
     * @param obj
     */
    public static void setUserProperties(Session session, String key, Object obj) {
        session.getUserProperties().put(key, obj);
    }

    /**
     * 设置会话登录用户id属性
     *
     * @param session
     * @return
     */
    public static void setLoginUid(Session session, String uid) {
        setUserProperties(session, KEY_UID, uid);
    }

    /**
     * 获取会话属性对象
     *
     * @param session
     * @param key
     * @return
     */
    public static Object getUserProperties(Session session, String key) {
        return session.getUserProperties().get(key);
    }

    /**
     * 获取会话登录用户id属性
     *
     * @param session
     * @return
     */
    public static String getLoginUid(Session session) {
        Object uidObj = getUserProperties(session, KEY_UID);
        if (null == uidObj) {
            return null;
        }
        return String.valueOf(uidObj);
    }

    /**
     * 判断会话是否登录
     *
     * @param session
     * @return
     */
    public static boolean isLogin(Session session) {
        String uid = getLoginUid(session);
        return StringUtils.isNotBlank(uid) ? true : false;
    }

    /**
     * 获取指定wsDataCalzz 类的WsData Action值,无返回null
     *
     * @param wsDataCalzz
     * @param <T>
     * @return
     */
    public static <T> String getWsDataAction(Class<T> wsDataCalzz) {
        WsData wsData = AnnotationUtils.findAnnotation(wsDataCalzz, WsData.class);
        if (null != wsData) {
            return wsData.action();
        }
        return null;
    }

    /**
     * 清除会话自定义属性
     *
     * @param session
     */
    public static void clearUserProperties(Session session) {
        if (null != session) {
            session.getUserProperties().clear();
        }
    }

    /**
     * 关闭Ws会话,忽略会话关闭异常
     *
     * @param session
     */
    public static void closeSession(Session session) {
        if (null != session) {
            try {
                //自定义关闭---测试不推荐
//                CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, null);
//                if (session instanceof WsSession) {
//                    //WsSession 默认关闭方法 中 是不强制关闭 closeSocket 默认值false,需传入强制关闭
//                    WsSession wsSession = (WsSession) session;
//                    wsSession.onClose(closeReason);
//                    log.warn("强制关闭WsSession完成:{}", WsUtil.toStr(session));
//                } else {
//                    log.warn("强制关闭session类型时,不是WsSession类型调用父类关闭方法:{}", WsUtil.toStr(session));
//                    session.close(closeReason);
//                    log.warn("强制关闭session完成:{}", WsUtil.toStr(session));
//                }
                session.close();
                log.warn("强制关闭session完成:{}", WsUtil.toStr(session));
            } catch (Exception e) {
                log.error("closeSession Exception,{}", WsUtil.toStr(session), e);
            }
        }
    }

    /**
     * 获取会话信息
     *
     * @param session
     * @return
     */
    public static String toStr(Session session) {
        if (null != session) {
            //base
            StringBuilder sb = new StringBuilder();
            sb.append("session[id:").append(session.getId())
                    .append(",hashCode:").append(Integer.toHexString(session.hashCode()))
            ;
            if (session instanceof WsSession) {
                WsSession wsSession = (WsSession) session;
                sb.append(",httpSessionId:").append(wsSession.getHttpSessionId())
//                        .append(",httpIp:").append(getUserProperties(session, KEY_HTTP_IP));
                ;
            }
            sb.append("]");
            return sb.toString();
        }
        return null;
    }




    /**
     * 会话是否开启的
     *
     * @param session
     * @return
     */
    public static boolean isOpened(Session session) {
        if (null != session && session.isOpen()) {
            return true;
        }
        return false;
    }

    /**
     * 检查WS会话对象
     *
     * @param session
     * @throws MicroRuntimeException
     */
    public static void checkSession(Session session) throws MicroRuntimeException {
        if (null == session) {
            throw new MicroRuntimeException(WsError.ws_session_notfound);
        }
        if (!WsUtil.isOpened(session)) {
            throw new MicroRuntimeException(WsError.ws_session_invalid, "session信息:" + WsUtil.toStr(session));
        }
    }

    /**
     * WS发送文本
     * @param session
     * @param text
     * @throws MicroRuntimeException
     */
    public static void sendText(Session session,String text)throws  MicroRuntimeException{
        checkSession(session);
        try {
            session.getBasicRemote().sendText(text);
            //成功
            log.info("S-B:[{}]-{},{}", session.getId(), text, session);
        } catch (Exception e) {
            log.error("S-B 发送消息异常:{},{}", session, text, e);
            throw new MicroRuntimeException(WsError.ws_session_send_error, e);
        }
    }

    /**
     * 异步发送WS消息
     *
     * @param message    消息内容
     * @param session    会话
     * @param completion 结果异步通知对象
     */
    public static void asyncSendWsText(String message, Session session, SendHandler completion) throws  MicroRuntimeException{
        checkSession(session);
        session.getAsyncRemote().sendText(message, completion);
    }

    /**
     * 响应消息转换成WsResp对象
     *
     * @param msgJson
     * @param dataClazz 响应data转换成对象Class
     * @param <T>
     * @return
     */
    public static <T>  WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) throws  MicroRuntimeException{
        try {
            String type = msgJson.getString(AbstractWsService.WS_TYPE_KEY);
            if (WsType.resp.toString().equals(type)) {
                return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsResp<T>>(dataClazz) {
                });
            } else {
                throw new MicroRuntimeException(WsError.ws_invalid_type);
            }
        } catch (JSONException je) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
    }


    /**
     * 请求消息转换成WsReq对象
     *
     * @param msgJson
     * @param dataClazz 请求data转换成对象Class
     * @param <T>
     * @return
     */
    public static <T> WsReq<T> toWsReq(JSONObject msgJson, Class<T> dataClazz) {
        try {
            String type = msgJson.getString(AbstractWsService.WS_TYPE_KEY);
            if (WsType.req.toString().equals(type)) {
                return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsReq<T>>(dataClazz) {
                });
            } else {
                throw new MicroRuntimeException(WsError.ws_invalid_type);
            }
        } catch (JSONException je) {
            throw new MicroRuntimeException(WsError.ws_invalid_msg);
        }
    }

}

WS发送消息工具类

java

import com.alibaba.fastjson.JSON;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.core.CommonError;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.boot.WsSendHandler;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.comm.IWsCommunicate;
import com.commnetsoft.ws.service.comm.LocalWsCommunicate;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.Session;

/**
 * WS消息发送服务
 *
 * @author Brack.zhu
 * @date 2020/3/24
 */
@Component
public class WsSendHelper {

    private static Logger log = LoggerFactory.getLogger(WsSendHelper.class);

    @Autowired
    private WsDispatch wsDispatch;

    /**
     * 发送WS请求消息-支持集群<br/>
     *
     * @param uid  目标用户UID
     * @param data 请求消息Data对象
     * @throws MicroRuntimeException 发送WS消息异常
     */
    public <T> void sendWsReq(String uid, T data) throws MicroRuntimeException {
        if (StringUtils.isNotEmpty(uid) && null != data) {
            String wsAction = WsUtil.getWsDataAction(data.getClass());
            if (StringUtils.isEmpty(wsAction)) {
                throw new MicroRuntimeException(WsError.ws_data_action_error);
            }
            WsReq<T> wsReq = new WsReq<>();
            wsReq.setId(UUIDUtils.generate());
            wsReq.setType(WsType.req);
            wsReq.setAction(wsAction);
            wsReq.setData(data);
            sendWsReq(uid, wsReq);
        }
    }


    /**
     * 发送WS请求消息-支持集群
     */
    public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
        if (StringUtils.isNotEmpty(uid) && null != wsReq) {
            wsDispatch.sendWsReq(uid, wsReq);
        }
    }


    /**
     * 同步发送请求消息;同步等待返回消息-支持集群
     * 如果发送失败重试,重试间隔时间:3秒
     * {@link #syncSendWsReq(String, WsReq, Class, int, long)}
     *
     * @param uid          消息接收者uid
     * @param wsReq        发送请求消息
     * @param respDataType 响应数据类型
     * @param retryCount   失败重试次数
     * @param <R>          请求消息数据类型
     * @param <P>          响应消息数据类型
     * @return
     * @throws MicroRuntimeException
     */
    public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType, int retryCount) throws MicroRuntimeException {
        return syncSendWsReq(uid, wsReq, respDataType, retryCount, 3000L);
    }

    /**
     * 同步发送请求消息;同步等待返回消息-支持集群
     * 如果发送失败重试将重试。
     * 发送失败定义:目标对象不存在(未登录)
     *
     * @param uid            消息接收者uid
     * @param wsReq          发送请求消息
     * @param respDataType   响应数据类型
     * @param retryCount     失败重试次数
     * @param retrySleepTime 失败重试间隔时间,单位毫秒
     * @param <R>            请求消息数据类型
     * @param <P>            响应消息数据类型
     * @return
     * @throws MicroRuntimeException
     */
    public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType, int retryCount, long retrySleepTime) throws MicroRuntimeException {
        if (StringUtils.isEmpty(uid) || null == wsReq) {
            throw new MicroRuntimeException(CommonError.illegal_args);
        }
        int i = 0;
        boolean retry = false;
        do {
            try {
                WsResp<P> resp = wsDispatch.sendAndReceiveWsReq(uid, wsReq, respDataType);
                return resp;
            } catch (MicroRuntimeException e) {
                if (WsError.ws_session_notfound.equals(e)) {
                    i++;
                    retry = true;
                    try {
                        Thread.sleep(retrySleepTime);
                    } catch (InterruptedException ex) {
                        //ignore
                    }
                } else {
                    //向上抛异常
                    throw e;
                }
            }
        } while (retry && i < retryCount);
        throw new MicroRuntimeException(WsError.ws_error, "发送(重试)失败");
    }

    /**
     * 同步发送请求消息;同步等待返回消息-支持集群
     *
     * @param uid          消息接收者uid
     * @param wsReq        发送请求消息
     * @param respDataType 响应数据类型
     * @param <R>          请求消息数据类型
     * @param <P>          响应消息数据类型
     * @return
     * @throws MicroRuntimeException
     */
    public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType) throws MicroRuntimeException {
        if (StringUtils.isEmpty(uid) || null == wsReq) {
            throw new MicroRuntimeException(CommonError.illegal_args);
        }
        return wsDispatch.sendAndReceiveWsReq(uid, wsReq, respDataType);
    }


    /**
     * 发送WS响应消息<br/>
     * 仅支持本地发送
     * 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
     */
    public <V> void sendWsResp(String uid, WsResp<V> wsResp) throws MicroRuntimeException {
        if (StringUtils.isNotEmpty(uid) && null != wsResp) {
            wsDispatch.sendWsResp(uid, wsResp);
        }
    }


    /**
     * 发送WS响应消息<br/>
     * 仅支持指定Session(本地发送)
     * 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
     */
    public <V> void sendWsResp(Session session, WsResp<V> wsResp) {
        IWsCommunicate localWsComm = wsDispatch.getLocal();
        if (localWsComm instanceof LocalWsCommunicate) {
            LocalWsCommunicate localWsCommunicate = (LocalWsCommunicate) localWsComm;
            localWsCommunicate.sendWsResp(session, wsResp);
        } else {
            log.warn("未知本地通讯实现,{}", localWsComm);
        }
    }

    /**
     * 发送WS响应消息<br/>
     * 仅支持指定Session(本地发送)
     * 会对发送结果进行逻辑处理,会产生熔断逻辑触发。
     */
    public <T, V> void sendWsResp(Session session, WsReq<T> wsReq, WsResp<V> wsResp) {
        if (null != wsResp) {
            WsUtil.asyncSendWsText(JSON.toJSONString(wsResp), session, new WsSendHandler(wsReq, wsResp, session));
        }
    }

    /**
     * 根据响应ID获取对应的同步响应数据类型
     *
     * @param id
     * @return
     */
    public Class<?> getReqCdlRespDataType(String id) {
        ReqRespCountDownLatch cdl = wsDispatch.getReqRespCdl(id);
        if (null != cdl) {
            return cdl.getRespDataType();
        }
        return null;
    }

}

登录,心跳检查

接口

java

import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.model.WsSession;


/**
 * 基础服务,登录,心跳检查 默认实现
 * @author Brack.zhu
 * @date 2019/12/10
 */
public interface IWsBaseService {

    /**
     * 登录请求处理
     * @param wsLoginVo
     * @param wsSession
     * @return
     */
    Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession);

    /**
     * 心跳检查请求处理
     * @param wsHeartbeatVo
     * @param wsSession
     * @return
     */
    Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession);

    /**
     * 心跳检查响应处理
     * @param wsHeartbeatVo
     * @param wsSession
     * @return
     */
    Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession);

}

实现service

java

import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.auth.api.v2.model.ValidDto;
import com.commnetsoft.auth.model.HttpRouteHead;
import com.commnetsoft.commons.Result;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsAction;
import com.commnetsoft.ws.annotation.WsPermit;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.feign.AuthApi;
import com.commnetsoft.ws.feign.ValidationApi;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.websocket.Session;
import java.util.Map;

/**
 * WS基础业务处理服务</br>
 * 登录</br>
 * 心跳检测</br>
 *
 * @author Brack.zhu
 * @date 2019/12/4
 */
@Service
public class WsBaseService implements IWsBaseService {

    @Autowired
    private AuthApi authApi;

    @Autowired
    private ValidationApi validationApi;


    private Logger log = LoggerFactory.getLogger(WsBaseService.class);

    @WsAction(action = AbstractWsService.WS_LOGIN_ACTION_NAME, desc = "WS登录验证请求处理", type = WsType.req, fallback = WsBaseServiceFallback.class, permit = WsPermit.PUBLIC)
    @Override
    public Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession) {
        if(wsSession instanceof  LocalWsSession){
            LocalWsSession localWsSession=(LocalWsSession)wsSession;
            String jwt = wsLoginVo.getJwt();
            if (StringUtils.isNotBlank(jwt)) {
                return wsLoginReqByJwt(jwt, localWsSession.getSession());
            }
            String username = wsLoginVo.getUsername();
            String pwd = wsLoginVo.getPwd();
            if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(pwd)) {
                return wsLoginReqByUsernamePwd(username, pwd, localWsSession.getSession());
            }
            return Result.create(WsError.ws_login_type_nonsupport);
        }
        return Result.create(WsError.ws_session_invalid,"非本地会话对象");
    }

    /**
     * 根据JWT方式登录
     *
     * @param jwt
     * @param session
     * @return
     */
    public Result<Void> wsLoginReqByJwt(String jwt, Session session) {
        Result<Map<String, String>> result = authApi.jwtParse(jwt);
        if (result.successful()) {
            Object uidObj = result.getResult().get(HttpRouteHead.idmuid.getValue());
            String uid = String.valueOf(uidObj);
            WsUtil.setLoginUid(session, uid);
            return Result.create();
        } else {
            return Result.create(result);
        }
    }

    /**
     * 根据用户名密码方式登录
     *
     * @param username
     * @param pwd
     * @param session
     * @return
     */
    public Result<Void> wsLoginReqByUsernamePwd(String username, String pwd, Session session) {
        JSONObject login=new JSONObject();
        login.put("username", username);
        login.put("pwd",pwd);
        Result<ValidDto> result = validationApi.valid(login);
        if (result.successful()) {
            Object uidObj = result.getResult().getUid();
            String uid = String.valueOf(uidObj);
            WsUtil.setLoginUid(session, uid);
            return Result.create();
        } else {
            return Result.create(result);
        }
    }


    @WsAction(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME, desc = "WS心跳检测请求", type = WsType.req, fallback = WsBaseServiceFallback.class)
    @Override
    public Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
        if (!wsSession.isAuthed()) {
            log.warn("收到心跳消息,但是该会话已经不可用{}", wsSession);
        }
        if(wsSession instanceof LocalWsSession){
            LocalWsSession localWsSession=(LocalWsSession)wsSession;
            if (!localWsSession.getSession().isOpen()) {
                log.warn("收到心跳消息,但是会话状态不是OPEN:{}", wsSession);
            }
        }
        return Result.create();
    }

    @WsAction(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME, desc = "WS心跳检测响应", type = WsType.resp, fallback = WsBaseServiceFallback.class)
    @Override
    public Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
        if (!wsSession.isAuthed()) {
            log.warn("响应心跳消息,但是该会话已经不可用{}", wsSession);
        }
        if(wsSession instanceof LocalWsSession){
            LocalWsSession localWsSession=(LocalWsSession)wsSession;
            if (!localWsSession.getSession().isOpen()) {
                log.warn("响应心跳消息,但是会话状态不是OPEN:{}", wsSession);
            }
        }
        return Result.create();
    }

}

service熔断类

java

import com.commnetsoft.commons.Result;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsSessionService;
import org.springframework.stereotype.Component;


/**
 *  WS基础业务处理服务--熔断方法
 * @author Brack.zhu
 * @date 2019/12/9
 */
@Component
public class WsBaseServiceFallback implements IWsBaseService {

    @Override
    public Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession) {
        //熔断错误业务逻辑---将认证成功的会话移除
        String uid=wsSession.getUid();
        if(StringUtils.isNotBlank(uid)){
            WsSessionService wsSessionService= SpringContextUtil.getBean(WsSessionService.class);
            if (null!=wsSessionService){
                wsSessionService.removeAuthSession(uid);
            }
        }
        //该结果不会返回给浏览器,一般定义标准成功即可
        return Result.create();
    }

    @Override
    public Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
        return Result.create();
    }

    @Override
    public Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
        return Result.create();
    }
}

实体类

java

import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.annotation.WsData;

/**
 * WS心跳包
 * @author Brack.zhu
 * @date 2019/12/9
 */
@WsData(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME)
public class WsHeartbeatVo {
}



import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.annotation.WsData;

/**
 * Ws登录数据模型
 *  登录支持3种方式
 *   1:JWT
 *   2: 用户名/密码
 *   3:票据 --暂时未实现
 * @author Brack.zhu
 * @date 2019/12/4
 */
@WsData(action = AbstractWsService.WS_LOGIN_ACTION_NAME)
public class WsLoginVo {

    /**
     * JWT
     */
    private String jwt;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String pwd;

    public String getJwt() {
        return jwt;
    }

    public void setJwt(String jwt) {
        this.jwt = jwt;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPwd() {
        return pwd;
    }

    public void setPwd(String pwd) {
        this.pwd = pwd;
    }
}

客户端使用

实体类/枚举


import java.lang.annotation.*;

import cn.edcall.module.device.ws.WsType;

/**
 * WebSocket 处理方法注解
 *
 *  业务实现样例,参照登录处理
 *
 * @author Brack.zhu
 * @date 2019/12/3
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsAction {

    /**
     * 处理方法 为空取方法名(长度小于20)
     */
    String action();

    /**
     *  接口方法描述
     */
    String desc();

    /**
     * WS操作类型
     * @return
     */
    WsType type();

//    /**
//     * 权限 默认登录
//     */
//    WsPermit permit() default WsPermit.LOGIN;

//    /**
//     * 消息失败处理类---熔断
//     * @return
//     */
//    Class<?> fallback() default void.class;
}

该注解标识有接收ws请求的类

java

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * WS消息处理器注解
 * @author Brack.zhu
 * @date 2020年11月19日
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsHandler {

}
java

/**
 * IOT服务能力触发WS消息模型
 * @author Brack.zhu
 * @date 2020年11月20日
 */
public class WsIotServiceCallReqVo {
	
	/**
	 * iot 注册时返回的 ID
	 */
	private String iotid;
	
	/**
	 * 约定服务能力
	 */
	private String service;

	public String getIotid() {
		return iotid;
	}

	public void setIotid(String iotid) {
		this.iotid = iotid;
	}

	public String getService() {
		return service;
	}

	public void setService(String service) {
		this.service = service;
	}
	
	
	
}
java

public class WsLoginReqVo {

//	private String jwt;
//
//	public String getJwt() {
//		return jwt;
//	}
//
//	public void setJwt(String jwt) {
//		this.jwt = jwt;
//	}
	
	private String username;
	
	private String pwd;
	

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getPwd() {
		return pwd;
	}

	public void setPwd(String pwd) {
		this.pwd = pwd;
	}
	
	
}
java

import cn.edcall.module.device.ws.WsType;

/**
 * 请求消息模型
 * @author Brack.zhu
 * @date 2019/12/4
 */
public class WsReq<T> {

    private String id;
    
    private WsType type = WsType.req;

    private String action;

    private T data;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public WsType getType() {
        return type;
    }

    public void setType(WsType type) {
        this.type = type;
    }
}
java

import java.util.Objects;

import cn.edcall.module.device.exception.IErrorCode;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsType;

/**
 * 响应消息模型
 * @author Brack.zhu
 * @date 2019/12/4
 */
public class WsResp<T> extends Result<T> {

    private String id;

    private WsType type= WsType.resp;

    private String action;

    private Object reqdata;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public Object getReqdata() {
        return reqdata;
    }

    public void setReqdata(Object reqdata) {
        this.reqdata = reqdata;
    }

    public WsType getType() {
        return type;
    }

    public void setType(WsType type) {
        this.type = type;
    }

    /**
     * 创建一个正确的数据返回结果
     *
     * @param result 数据
     */
    public static <T,V> WsResp<T> create(WsReq<V> req,Result<T> result) {
        WsResp<T> rt = new WsResp<>();
        rt.setId(req.getId());
        rt.setAction(req.getAction());
        rt.setCode(result.getCode());
        rt.setMessage(result.getMessage());
        rt.setDesc(result.getDesc());
        rt.setResult(result.getResult());
        return rt;
    }

    /**
     * 创建一个错误的返回结果
     *
     * @param code 错误信息
     */
    public static <T, V> WsResp<T> create(WsReq<V> req, IErrorCode code) {
        Objects.requireNonNull(code, "错误的返回结果错误码不能为空");
        WsResp<T> rt = new WsResp<>();
        rt.setCode(code.getCode());
        rt.setMessage(code.getMessage());
        rt.setDesc(code.getDesc());
        rt.setId(req.getId());
        rt.setAction(req.getAction());
        rt.setReqdata(req.getData());
        return rt;
    }

}
java

/**
 * 请求类型
 * @author Brack.zhu
 * @date 2019/12/10
 */
public enum WsType {
    //请求类型
    req,
    //响应类型
    resp,
}
java

import java.lang.reflect.Method;

import cn.edcall.module.device.ws.annotation.WsAction;

/**
 * WsActionOp相应操作对象
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class WsActionOpData {

    WsAction wsAction;

    Object bean;

    Method method;

    public WsAction getWsAction() {
        return wsAction;
    }

    public void setWsAction(WsAction wsAction) {
        this.wsAction = wsAction;
    }

    public Object getBean() {
        return bean;
    }

    public void setBean(Object bean) {
        this.bean = bean;
    }

    public Method getMethod() {
        return method;
    }

    public void setMethod(Method method) {
        this.method = method;
    }

}

请求响应闭锁

java

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import cn.edcall.module.device.ws.model.WsResp;

/**
 * 请求响应闭锁
 * @author Brack.zhu
 * @date 2020年11月19日
 */
public class ReqRespCountDownLatch extends CountDownLatch {
	
	/**
	 * 同步请求响应data数据类型
	 */
	private Class<?> respDataType;

	private WsResp<?> resp;
	
	public ReqRespCountDownLatch(Class<?> respDataType) {
		super(1);
		this.respDataType=respDataType;
	}
	
	/**
	 * 同步等待请求响应
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> WsResp<T> awaitResp() {
		try {
			super.await(WsClientModule.REQ_RESP_TIME_OUT, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			//ignore
		}
		return (WsResp<T>)getResp();
	}

	public WsResp<?> getResp() {
		return resp;
	}

	public void setResp(WsResp<?> resp) {
		this.resp = resp;
	}

	public Class<?> getDataType() {
		return respDataType;
	}

	
	
	
}

请求处理

java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsError;

/**
 * @author Brack.zhu
 * @date 2019/12/10
 */
public abstract class AbstractWsActionOp {

	private transient Logger log = LoggerFactory.getLogger(AbstractWsActionOp.class);

	/**
	 * 获取请求类型指定WsActionOp对象
	 *
	 * @param wsAction
	 * @return
	 */
	public abstract WsActionOpData getWsActionOpData(String wsAction);

	// /**
	// * 获取指定WsAction对应的接口权限
	// *
	// * @param wsAction
	// * @return
	// */
	// public WsPermit getWsPermit(String wsAction) {
	// WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
	// if (null != wsActionOpData) {
	// return wsActionOpData.getWsAction().permit();
	// }
	// return null;
	// }

	// /**
	// * 指定WsAction对应的接口权限是否公开
	// *
	// * @param wsAction
	// * @return true 公开
	// */
	// public boolean isPub(String wsAction) {
	// WsPermit apiPermit = getWsPermit(wsAction);
	// if (null != apiPermit) {
	// return WsPermit.PUBLIC.equals(apiPermit) ? true : false;
	// }
	// return false;
	// }

	/**
	 * 获取反射方法形参类型数组
	 *
	 * @param wsAction
	 * @return
	 */
	public Class<?>[] getArgsClazz(String wsAction) {
		WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
		if (null != wsActionOpData) {
			return wsActionOpData.getMethod().getParameterTypes();
		}
		return new Class<?>[] {};
	}

	/**
	 * 获取反射方法形参类型数组,获取失败使用默认的Void类型
	 *
	 * @param wsAction
	 * @return
	 */
	public Class<?> getDataVoClazz(String wsAction) {
		Class<?> clazz=getDataVoClazzNull(wsAction);
		if(null==clazz) {
			return Void.class;
		}
		return clazz;
	}

	/**
	 * 获取反射方法形参类型数组
	 *
	 * @param wsAction
	 * @return
	 */
	public Class<?> getDataVoClazzNull(String wsAction) {
		Class<?>[] argsClazz = getArgsClazz(wsAction);
		if (null != argsClazz && argsClazz.length >= 1) {
			Class<?> dataVoClazz = argsClazz[0];
			return dataVoClazz;
		}
		return null;
	}

	
	/**
	 * 反射对应方法
	 * 
	 * @param data
	 * @param session
	 * @param action
	 * @param <T>
	 * @param <D>
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T, D> Result<T> invokeWsActionMethod(D data, String action) {
		Result<T> rs = null;
		try {
			WsActionOpData wsActionOp = getWsActionOpData(action);
			if (null != wsActionOp) {
				int parsLength = wsActionOp.getMethod().getParameterTypes().length;
				if (1 == parsLength) {
					rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), data);
				} else {
					log.error("WsAction 处理方法反射异常,实现类形参定义错误({}),{}", parsLength, data);
					return Result.create(WsError.ws_action_error);
				}
			} else {
				log.error("WsAction 处理方法未找到{},{}", data);
				return Result.create(WsError.ws_action_notfound);
			}
		} catch (MicroRuntimeException mre) {
			log.error("WsAction 处理方法反射异常,{},{}", data, mre);
			return Result.create(mre);
		} catch (Exception e) {
			log.error("WsAction 处理方法反射异常,{},{}", data, e);
			return Result.create(WsError.ws_action_error);
		}
		return rs;
	}
//
//	/**
//	 * 反射对应失败熔断方法
//	 * 
//	 * @param data
//	 * @param session
//	 * @param action
//	 * @param <D>
//	 * @return
//	 */
//	public <D> void invokeWsActionFallbackMethod(D data, Session session, String action) {
//		try {
//			WsActionOpData wsActionOp = getWsActionOpData(action);
//			if (null != wsActionOp) {
//				Class<?> fallbackClazz = wsActionOp.getWsAction().fallback();
//				if (null == fallbackClazz) {
//					return;
//				}
//				Object fallbackBean = SpringContextUtil.getBean(fallbackClazz);
//				if (null == fallbackBean) {
//					return;
//				}
//				Method method = wsActionOp.getMethod();
//				Class<?>[] paraClass = method.getParameterTypes();
//				Method fallbackMethod = fallbackClazz.getMethod(method.getName(), paraClass);
//				if (null == fallbackMethod) {
//					return;
//				}
//				Result<?> rs = null;
//				int parsLength = paraClass.length;
//				if (1 == parsLength) {
//					rs = (Result<?>) fallbackMethod.invoke(fallbackBean, session);
//				} else if (2 == parsLength) {
//					rs = (Result<?>) fallbackMethod.invoke(fallbackBean, data, session);
//				} else {
//					log.error("WsAction 处理失败熔断方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, session);
//				}
//				if (!rs.successful()) {
//					log.error("WsAction 处理失败熔断方法反射结果:{},{},{}", rs, data, session);
//				}
//			} else {
//				log.error("WsAction 处理失败熔断方法未找到{},{}", data, session);
//			}
//		} catch (MicroRuntimeException mre) {
//			log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, session, mre);
//		} catch (Exception e) {
//			log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, session, e);
//		}
//	}

}
java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 请求类型操作
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class ReqWsActionOp extends AbstractWsActionOp {

    private transient Logger log = LoggerFactory.getLogger(ReqWsActionOp.class);

    /**
     * WsAction 请求类型相关数据
     */
    private final transient Map<String, WsActionOpData> wsReqActions = new ConcurrentHashMap<>(256);

    public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
        return wsReqActions.put(wsAction, wsActionOpData);
    }

    public WsActionOpData remove(String wsAction) {
        return wsReqActions.remove(wsAction);
    }

    @Override
    public WsActionOpData getWsActionOpData(String wsAction) {
        return wsReqActions.get(wsAction);
    }

    /**
     * 调用WsAction对应方法
     * @param req
     * @param <T>
     * @param <V>
     * @return
     */
    public <T,V> WsResp<T> invokeWsAction(WsReq<V> req){
        String action = req.getAction();
        V reqData = req.getData();
        Result<T> rs=invokeWsActionMethod(reqData,action);
        if(!rs.successful()){
            log.error("WsAction Req处理方法反射错误{},{}",req,rs);
        }
        return WsResp.create(req,rs);
    }


//    /**
//     * 调用WsAction对应失败熔断方法
//     * @param req
//     * @param session
//     * @param <V>
//     * @return
//     */
//    public <V> void invokeWsActionFallback(WsReq<V> req, Session session){
//        String action = req.getAction();
//        V reqData = req.getData();
//        invokeWsActionFallbackMethod(reqData,session,action);
//    }
}
java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.commnetsoft.pub.pro.base.module.ModuleManager;

import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsClientModule;
import cn.edcall.module.device.ws.model.WsResp;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 响应类型操作
 * @author Brack.zhu
 * @date 2019/12/10
 */
public class RespWsActionOp extends AbstractWsActionOp {

    private transient Logger log = LoggerFactory.getLogger(RespWsActionOp.class);

    /**
     * WsAction 响应类型相关数据
     */
    private final transient Map<String, WsActionOpData> wsRespActions = new ConcurrentHashMap<>(256);
    

	public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
        return wsRespActions.put(wsAction, wsActionOpData);
    }

    public WsActionOpData remove(String wsAction) {
        return wsRespActions.remove(wsAction);
    }

    @Override
    public WsActionOpData getWsActionOpData(String wsAction) {
    	WsActionOpData wsActionOpData=wsRespActions.get(wsAction);
    	return wsActionOpData;
    }
    
    

    /**
     * 调用WsAction对应方法
     * @param resp
     * @param session
     * @param <T>
     * @param <V>
     */
    public <T,V> void invokeWsAction(WsResp<V> resp){
        String action = resp.getAction();
//        V respResult = resp.getResult();
        Result<T> rs=invokeWsActionMethod(resp,action);
        if(!rs.successful()){
            log.error("WsAction Resp处理方法反射错误{},{}",resp,rs);
        }
    }
    
    
    /**
     * 通用实现响应
     * @param <T>
     * @param <V>
     * @param resp
     * @return
     */
    public <T,V> Result<T> generalRespAction(WsResp<V> resp) {
    	WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
    	wsClientModule.respCountDownLatch(resp);
    	return Result.create();
    }

    /**
     * 重写
     */
	@Override
	public <T, D> Result<T> invokeWsActionMethod(D data, String action) {
		WsActionOpData wsActionOp = getWsActionOpData(action);
		if(null==wsActionOp) {
			//使用通用实现
			return generalRespAction((WsResp<?>)data);
		}
		Result<T> result=super.invokeWsActionMethod(data, action);
		return result;
	}
    
    
    
    
//    /**
//     * 调用WsAction对应失败熔断方法
//     * @param resp
//     * @param session
//     * @param <V>
//     * @return
//     */
//    public <V> void invokeWsActionFallback(WsResp<V> resp, Session session){
//        String action = resp.getAction();
//        V respData = resp.getResult();
//        invokeWsActionFallbackMethod(respData,session,action);
//    }
}

WSClient

开启连接、结束、接收到消息对应的处理方法,会话开始、重连、销毁、判断准备是否完成、发送消息

java

import java.net.URI;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.commnetsoft.pub.pro.base.module.ModuleManager;

import cn.edcall.module.device.event.EventHelper;
import cn.edcall.module.device.exception.IErrorCode;
import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.event.WsConnectionEvent;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
import cn.edcall.module.device.ws.op.ReqWsActionOp;
import cn.edcall.module.device.ws.op.RespWsActionOp;

/**
 * WS客户端
 * 
 * @author Brack.zhu
 * @date 2020年11月16日
 */
public class WsClient extends WebSocketClient {

	private Logger log = LoggerFactory.getLogger(getClass());

	/**
	 * WS协议 id key
	 */
	public final static String WS_ID_KEY = "id";

	/**
	 * WS协议 type key
	 */
	public final static String WS_TYPE_KEY = "type";

	/**
	 * WS协议 Action key
	 */
	public final static String WS_ACTION_KEY = "action";

	/**
	 * WS协议 code key
	 */
	public final static String WS_CODE_KEY = "code";

	/**
	 * 登录操作名
	 */
	public final static String WS_LOGIN_ACTION_NAME = "login";

	/**
	 * 心跳包操作名
	 */
	public final static String WS_HEARTBEAT_ACTION_NAME = "heartbeat";

	/**
	 * IOT服务注册操作名
	 */
	public final static String WS_IOT_REGISTER_ACTION_NAME = "iot_register";

	/**
	 * IOT服务能力触发操作名
	 */
	public final static String WS_IOT_SERVICE_CALL_ACTION_NAME = "iot_service_call";

	/**
	 * 未知操作名
	 */
	public final static String WS_UNKNOWN_ACTION_NAME = "unknown";

	public WsClient(URI serverUri) {
		super(serverUri);
	}

	@Override
	public void onOpen(ServerHandshake handshakedata) {
		log.info("WS与服务端开始创建会话 {}!", clinetInfo());
	}

	@Override
	public void onMessage(String message) {
		log.info("WS接收消息 ,{}, {}!", message, clinetInfo());
		try {
			JSONObject msgJson = JSON.parseObject(message);
			if (null != msgJson) {
				String type = msgJson.getString(WS_TYPE_KEY);
				if (WsType.req.toString().equals(type)) {
					onReqMessage(msgJson);
				} else if (WsType.resp.toString().equals(type)) {
					onRespMessage(msgJson);
				}
			}
		} catch (MicroRuntimeException mre) {
			log.error("WS onMessage MicroRuntimeException {},{}", message, clinetInfo(), mre);
			WsResp<?> wsResp = buildUnknownWsResp(mre);
			sendWsResp(wsResp);
		} catch (JSONException jsone) {
			log.error("WS onMessage JSONException(协议错误) {},{}", message, clinetInfo(), jsone);
			MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_invalid_msg, jsone);
			WsResp<?> wsResp = buildUnknownWsResp(microRuntimeException);
			sendWsResp(wsResp);
		} catch (Exception e) {
			log.error("WS onMessage Exception {},{}", message, clinetInfo(), e);
			MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_handle_msg_unknown_error, e);
			WsResp<?> wsResp = buildUnknownWsResp(microRuntimeException);
			sendWsResp(wsResp);
		}

	}

	@Override
	public void onClose(int code, String reason, boolean remote) {
		log.warn("WS会话关闭code:{},reason:{},remote:{},{}!", code, reason, remote, clinetInfo());
		// 重连
		WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
		wsClientModule.wsReconnWake();
	}

	@Override
	public void onError(Exception ex) {
		log.error("WS会话异常{}", clinetInfo(), ex);
	}

	/**
	 * 同步等待连接 最长等待30秒
	 */
	private void awaitConn() {
		for (int i = 0; i < 10; i++) {// 等待30秒
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// ignore
			}
			if (clientIsOk()) {
				break;
			}
		}
	}

	/**
	 * 会话启动生效
	 * 
	 * @return
	 */
	public boolean clientStart() {
		try {
			this.connect();
			if (!clientIsOk()) {
				log.info("等待连接成功...");
				awaitConn();
				if (!clientIsOk()) {
					log.warn("连接失败...");
					return false;
				}
			}
			// 连接成功事件
			EventHelper.publish(new WsConnectionEvent());
			log.info("连接成功...");
			return true;
		} catch (Exception e) {
			log.error("会话启动连接异常:", e);
		}
		return false;
	}

	/**
	 * 会话重连
	 * 
	 * @return
	 */
	public boolean clientReConnect() {
		log.info("WS会话重连,重连前的会话信息 :{}", clinetInfo());
		boolean rs = false;
		try {
			boolean reconnRs = reconnectBlocking();
			if (reconnRs) {
				awaitConn();
				rs = clientIsOk();
				if (rs) {
					// 重新连接成功事件
					EventHelper.publish(new WsConnectionEvent());
				}
			}
		} catch (Exception e) {
			log.error("WS会话重连异常:", e);
		}
		log.info("WS会话重连结果:{}", rs);
		return rs;
	}

	/**
	 * 会话关闭销毁
	 */
	public void clientDestroy() throws MicroRuntimeException {
		log.info("WS会话销毁,销毁前的会话信息:{}", clinetInfo());
		this.close();
	}

	/**
	 * 通道是否准备完成
	 * 
	 * @return
	 */
	public boolean clientIsOk() {
		return getReadyState().equals(ReadyState.OPEN);
	}

	/**
	 * 会话信息
	 * 
	 * @return
	 */
	public String clinetInfo() {
		StringBuffer sb = new StringBuffer();
		sb.append("WsInfo{");
		sb.append("state:").append(super.getReadyState()).append(";");
		sb.append("SSL:").append(super.hasSSLSupport()).append(";");
		sb.append("}");
		return sb.toString();
	}

	/**
	 * 接收请求消息
	 *
	 * @param msgJson
	 * @param session
	 */
	public void onReqMessage(JSONObject msgJson) throws MicroRuntimeException {
		String action = msgJson.getString(WS_ACTION_KEY);
		WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
		// 获取入参类型
		ReqWsActionOp reqWsActionOp = wsClientModule.getReqWsActionOp();
		if (null == reqWsActionOp) {
			log.warn("未找到对应请求处理方法:{}", msgJson.toJSONString());
			return;
		}
		Class<?> dataVoClazz = reqWsActionOp.getDataVoClazz(action);
		// 回调对应实现方法
		WsReq<?> wsReq = toWsReq(msgJson, dataVoClazz);
		WsResp<?> wsResp = reqWsActionOp.invokeWsAction(wsReq);
		sendWsResp(wsResp);
	}

	/**
	 * 接收响应数据
	 *
	 * @param msgJson
	 * @param session
	 */
	public void onRespMessage(JSONObject msgJson) throws MicroRuntimeException {
		String action = msgJson.getString(WS_ACTION_KEY);
		// 获取入参类型
		WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
		RespWsActionOp respWsActionOp = wsClientModule.getRespWsActionOp();
		if (null == respWsActionOp) {
			log.warn("未找到对应响应处理方法:{}", msgJson.toJSONString());
			return;
		}
		Class<?> dataVoClazz = respWsActionOp.getDataVoClazzNull(action);
		if (null == dataVoClazz) {
			String id = msgJson.getString(WS_ID_KEY);
			dataVoClazz = wsClientModule.getReqRespCdlRespDataType(id);
			if (null == dataVoClazz) {
				dataVoClazz = Void.class;
			}
		}
		WsResp<?> wsResp = toWsResp(msgJson, dataVoClazz);
		// 回调对应实现方法
		respWsActionOp.invokeWsAction(wsResp);
	}

	/**
	 * 构建一个未知响应消息,一般在错误协议是使用
	 * 
	 * @param code
	 * @param <T>
	 * @return
	 */
	public <T> WsResp<T> buildUnknownWsResp(IErrorCode code) {
		WsReq<?> wsReq = new WsReq<>();
		wsReq.setId(UUIDUtils.generate());
		wsReq.setType(WsType.resp);
		wsReq.setAction(WS_UNKNOWN_ACTION_NAME);
		return WsResp.create(wsReq, code);
	}

	/**
	 * 请求消息转换成WsReq对象
	 *
	 * @param msgJson
	 * @param dataClazz
	 *            请求data转换成对象Class
	 * @param <T>
	 * @return
	 */
	public <T> WsReq<T> toWsReq(JSONObject msgJson, Class<T> dataClazz) {
		try {
			String type = msgJson.getString(WS_TYPE_KEY);
			if (WsType.req.toString().equals(type)) {
				// 请求格式验证
				if (!msgJson.containsKey(WS_ID_KEY)) {
					throw new MicroRuntimeException(WsError.ws_invalid_msg, "请求消息中未包含id字段:" + msgJson.toJSONString());
				}
				if (!msgJson.containsKey(WS_ACTION_KEY)) {
					throw new MicroRuntimeException(WsError.ws_invalid_msg, "请求消息中未包含action字段:" + msgJson.toJSONString());
				}
				return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsReq<T>>(dataClazz) {
				});
			} else {
				throw new MicroRuntimeException(WsError.ws_invalid_type);
			}
		} catch (JSONException je) {
			throw new MicroRuntimeException(WsError.ws_invalid_msg);
		}
	}

	/**
	 * 响应消息转换成WsResp对象
	 *
	 * @param msgJson
	 * @param dataClazz
	 *            响应data转换成对象Class
	 * @param <T>
	 * @return
	 */
	public <T> WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) {
		try {
			String type = msgJson.getString(WS_TYPE_KEY);
			if (WsType.resp.toString().equals(type)) {
				if (!msgJson.containsKey(WS_ID_KEY)) {
					throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含id字段:" + msgJson.toJSONString());
				}
				if (!msgJson.containsKey(WS_ACTION_KEY)) {
					throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含action字段:" + msgJson.toJSONString());
				}
				if (!msgJson.containsKey(WS_CODE_KEY)) {
					throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含code字段:" + msgJson.toJSONString());
				}
				return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsResp<T>>(dataClazz) {
				});
			} else {
				throw new MicroRuntimeException(WsError.ws_invalid_type);
			}
		} catch (JSONException je) {
			log.error("响应消息转换JSON异常,json:{},class:{}", msgJson.toJSONString(), dataClazz, je);
			throw new MicroRuntimeException(WsError.ws_invalid_msg, je);
		}
	}

	/**
	 * 发送WS请求消息
	 */
	public <T> void sendWsReq(WsReq<T> wsReq) throws MicroRuntimeException {
		if (null != wsReq) {
			if (!clientIsOk()) {
				throw new MicroRuntimeException(WsError.ws_connection_unavailable);
			}
			try {
				String text = JSON.toJSONString(wsReq);
				this.send(text);
			} catch (Exception e) {
				throw new MicroRuntimeException(WsError.ws_send_req_error, e);
			}
		}
	}

	/**
	 * 发送WS响应消息<br/>
	 * 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
	 */
	public <V> void sendWsResp(WsResp<V> wsResp) {
		if (null != wsResp) {
			if (!clientIsOk()) {
				throw new MicroRuntimeException(WsError.ws_connection_unavailable);
			}
			try {
				String text = JSON.toJSONString(wsResp);
				this.send(text);
			} catch (Exception e) {
				throw new MicroRuntimeException(WsError.ws_send_resp_error, e);
			}
		}
	}

}

WSClientModule

包含扫描wsHandle注解,获取ws接受请求ws的连接,将其存入map中,

ws会话连接、登录验证、启动重连和心跳线程、ws状态获取、重连线程唤醒、

发送同步请求,添加请求响应锁

java

import cn.edcall.module.device.DeviceBootstrap;
import cn.edcall.module.device.constant.Constants;
import cn.edcall.module.device.event.EventHelper;
import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.util.ConfigUtil;
import cn.edcall.module.device.util.LogExtendUtil;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.annotation.WsAction;
import cn.edcall.module.device.ws.annotation.WsHandler;
import cn.edcall.module.device.ws.event.WsConnectionInvalidEvent;
import cn.edcall.module.device.ws.event.WsLoginEvent;
import cn.edcall.module.device.ws.model.WsLoginReqVo;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
import cn.edcall.module.device.ws.op.ReqWsActionOp;
import cn.edcall.module.device.ws.op.RespWsActionOp;
import cn.edcall.module.device.ws.op.WsActionOpData;
import com.commnetsoft.pub.pro.base.module.SimpleAbstractModule;
import com.commnetsoft.pub.pro.base.service.ServiceManager;
import com.commnetsoft.pub.util.common.StringUtil;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;

/**
 * WS客户端通讯模块
 * 
 * @author Brack.zhu
 * @date 2020-11-13
 */
public class WsClientModule extends SimpleAbstractModule {

	private String EDCALL_DOMAIN;

	private WsClient wsClient;

	private ReqWsActionOp reqWsActionOp = new ReqWsActionOp();

	private RespWsActionOp respWsActionOp = new RespWsActionOp();

	private WsHeartbeatThread wsHeartbeatThread = new WsHeartbeatThread();

	private ClientReconnectThread clientReconnectThread = new ClientReconnectThread();

	/**
	 * 请求响应超时时间---debug时可以设长点
	 */
	public static long REQ_RESP_TIME_OUT = 30 * 1000;

	/**
	 * 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除
	 * 存数key 和请求响应锁的对象
	 */
	private PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCDL = new PassiveExpiringMap<String, ReqRespCountDownLatch>(REQ_RESP_TIME_OUT);

	private Logger log = LoggerFactory.getLogger(getClass());

	@Override
	public boolean init0() {
		LogExtendUtil.consoleLogInfoPrint(log, "----------WS客户端通讯模块----------");
		EDCALL_DOMAIN = ConfigUtil.get(Constants.BASE_DOMAIN);
		if (StringUtil.isEmpty(EDCALL_DOMAIN)) {
			LogExtendUtil.consoleLogInfoPrint(log, "WS客户端通讯模块初始化失败,原因:获取edcall主机地址失败," + EDCALL_DOMAIN);
			return false;
		}
		// 初始化ws消息处理方法
		try {
			initWsAction();
		} catch (Exception e) {
			LogExtendUtil.consoleLogErrorPrint(log, "WS消息处理方法初始化失败,", e);
			return false;
		}
		try {
			URI iotURI = new URI(buildIotWebsocketUrl());
			LogExtendUtil.consoleLogInfoPrint(log, "WS连接地址:" + iotURI);
			wsClient = new WsClient(iotURI);
		} catch (Exception e) {
			LogExtendUtil.consoleLogErrorPrint(log, "WS客户端通讯模块初始化失败,原因:IOT WS通讯通道客户端创建失败," + buildIotWebsocketUrl(), e);
			return false;
		}
		return true;
	}

	@Override
	public boolean start0() {
		if (!wsClient.clientStart()) {
			LogExtendUtil.consoleLogInfoPrint(log, "WS客户端通讯模块启动失败,原因:IOT WS通讯通道客户端启动失败!");
			return false;
		}
		try {
			// ws登录
			clientLogin();
			// 启动连接重连线程
			clientReconnectThread.start();
			// 启动心跳检测
			wsHeartbeatThread.start();
		} catch (MicroRuntimeException mre) {
			LogExtendUtil.consoleLogErrorPrint(log, "会话登录异常:", mre);
			return false;
		}
		LogExtendUtil.consoleLogInfoPrint(log, "----------WS客户端通讯模块----------");
		return true;
	}

	@Override
	public boolean stop0() {
		try {
			wsClient.clientDestroy();
			wsHeartbeatThread.interrupt();
		} catch (MicroRuntimeException mre) {
			LogExtendUtil.consoleLogErrorPrint(log, "会话登出异常:", mre);
			// 销毁异常,也强制退出
		}
		return true;
	}

	/**
	 * WS通道是否准备完成
	 *
	 * @return
	 */
	public boolean isOk() {
		return wsClient.clientIsOk();
	}

	/**
	 * WS通道重连线程唤醒---异步
	 *
	 * @return
	 */
	public void wsReconnWake() {
		//连接不可用事件发布
		EventHelper.publish(new WsConnectionInvalidEvent());
		clientReconnectThread.interrupt();
	}

	/**
	 * 构建IOT WS地址
	 * 
	 * @return
	 */
	public String buildIotWebsocketUrl() {
		return buildIotWebsocketUrl(EDCALL_DOMAIN);
	}
	
	/**
	 * WS会话重连-包含业务
	 * 产生连接成功事件
	 * 重新认证
	 * 产生登录  成功事件
	 * @return
	 */
	public boolean wsClientReconn() {
		LogExtendUtil.consoleLogInfoPrint(log,"----WS会话重连开始----");
		boolean result=false;
		if(wsClient.clientReConnect()) {
			try {
				clientLogin();
				result=true;
			}catch(Exception e) {
				LogExtendUtil.consoleLogWarnPrint(log,"WS会话重连失败,原因:WS登录失败!");
			}
		}else {
			LogExtendUtil.consoleLogWarnPrint(log,"WS会话重连失败,原因:通道重连失败!");
		}
		LogExtendUtil.consoleLogInfoPrint(log,"-----WS会话重连结束----");
		return result;
	}

	/**
	 * 构建IOT WS地址
	 * 
	 * @param host
	 *            主机域名/主地址
	 * @return
	 */
	public String buildIotWebsocketUrl(String host) {
		String wsHost = host;
		if (host.startsWith("https")) {
			wsHost = host.replaceFirst("https", "wss");
		} else {
			wsHost = host.replaceFirst("http", "ws");
		}
		return wsHost + "/ws/edcall/iot.ws";
	}

	/**
	 * 初始化WS处理方法
	 */
	public void initWsAction() throws Exception {
		List<Class<?>> clazzs = DeviceBootstrap.service.getClassByBasePackage();
		if (null == clazzs) {
			return;
		}
		for (Class<?> clazz : clazzs) {
			WsHandler wsHandler = clazz.getAnnotation(WsHandler.class);
			if (null == wsHandler) {
				continue;
			}
			if(!ServiceManager.getInstance().checkConditionalOnService(clazz)) {
				LogExtendUtil.consoleLogInfoPrint(log,clazz+"WsHandler未初始化,因为关联(ConditionalOnService)服务不存在!");
				//关联对象未找到
				continue;
			}
			
			Method[] methods = clazz.getMethods();
			if (null == methods) {
				continue;
			}
			Object wsHandlerObj = clazz.newInstance();
			for (Method method : methods) {
				WsAction wsAction = method.getAnnotation(WsAction.class);
				if (null == wsAction) {
					continue;
				}
				int parSize = method.getParameterTypes().length;
				if (parSize > 1) {
					throw new MicroRuntimeException(WsError.ws_action_pars_error);
				}
				WsType wsType = wsAction.type();
				WsActionOpData wsActionOpData = new WsActionOpData();
				wsActionOpData.setBean(wsHandlerObj);
				wsActionOpData.setMethod(method);
				wsActionOpData.setWsAction(wsAction);
				if (WsType.req.equals(wsType)) {
					reqWsActionOp.put(wsAction.action(), wsActionOpData);
				} else if (WsType.resp.equals(wsType)) {
					respWsActionOp.put(wsAction.action(), wsActionOpData);
				}
			}
		}
	}

	/**
	 * 同步发送请求消息
	 *
	 * @param <R>
	 * @param <P>
	 * @param wsReq
	 * @return 返回结果中为Void模型
	 * @throws MicroRuntimeException
	 */
	public <R> WsResp<Void> syncSendWsReq(WsReq<R> wsReq) throws MicroRuntimeException {
		if (null == wsReq) {
			return null;
		}
		return syncSendWsReq(wsReq, Void.class);
	}

	/**
	 * 同步发送请求消息
	 * 
	 * @param <R>
	 * @param <P>
	 * @param wsReq
	 * @return
	 * @throws MicroRuntimeException
	 */
	public <R, P> WsResp<P> syncSendWsReq(WsReq<R> wsReq,Class<P> respDataType) throws MicroRuntimeException {
		if (null == wsReq) {
			return null;
		}
		String id = wsReq.getId();
		try {
			ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
			reqRespCDL.put(id, cdl);
			wsClient.sendWsReq(wsReq);
			WsResp<P> resp = cdl.awaitResp();
			return resp;
		} catch (MicroRuntimeException mre) {
			reqRespCDL.remove(id);
			throw mre;
		} catch (Exception e) {
			reqRespCDL.remove(id);
			throw new MicroRuntimeException(WsError.ws_error, e);
		}
	}

	/**
	 * 发起请求数据
	 * 
	 * @param <T>
	 * @param wsReq
	 */
	public <T> void send(WsReq<T> wsReq) {
		wsClient.sendWsReq(wsReq);
	}

	/**
	 * 响应请求 根据id唤醒同步请求并返回结果
	 *
	 * @param wsResp
	 */
	public void respCountDownLatch(WsResp<?> wsResp) {
		ReqRespCountDownLatch cdl = reqRespCDL.remove(wsResp.getId());
		if (null != cdl) {
			cdl.setResp(wsResp);
			cdl.countDown();
		}
	}
	
	/**
	 * 根据消息id获取响应闭锁中的响应类型
	 * @param id
	 * @return
	 */
	public Class<?> getReqRespCdlRespDataType(String id) {
		ReqRespCountDownLatch cdl = reqRespCDL.get(id);
		if (null != cdl) {
			return cdl.getDataType();
		}
		return null;
	}

	/**
	 * 会话登录
	 * 
	 * @param username
	 * @param pwd
	 * @throws MicroRuntimeException
	 */
	public void clientLogin() throws MicroRuntimeException {
		String username = ConfigUtil.get(Constants.WS_USERNAME);
		String pwd = ConfigUtil.get(Constants.WS_PWD);
		LogExtendUtil.consoleLogInfoPrint(log, "WS会话开始登录," + username + "," + pwd);
		try {
			WsReq<WsLoginReqVo> loginReq = new WsReq<>();
			loginReq.setId(UUIDUtils.generate());
			loginReq.setAction(WsClient.WS_LOGIN_ACTION_NAME);
			WsLoginReqVo loginVo = new WsLoginReqVo();
			loginVo.setUsername(username);
			loginVo.setPwd(pwd);
			loginReq.setData(loginVo);
			WsResp<Void> loginResp = syncSendWsReq(loginReq);
			loginResp.tryData();
			//登录成功事件
			EventHelper.publish(new WsLoginEvent());
			LogExtendUtil.consoleLogInfoPrint(log, "WS会话登录成功:" + clinetInfo());
		} catch (MicroRuntimeException e) {
			LogExtendUtil.consoleLogErrorPrint(log, "WS会话登录异常:", e);
			throw e;
		}
	}

	public String clinetInfo() {
		return wsClient.clinetInfo();
	}

	public ReqWsActionOp getReqWsActionOp() {
		return reqWsActionOp;
	}

	public RespWsActionOp getRespWsActionOp() {
		return respWsActionOp;
	}

	public WsClient getWsClient() {
		return wsClient;
	}

}

重连线程

java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.commnetsoft.pub.pro.base.module.ModuleManager;

/**
 * WS客户端会话重连线程
 * 
 * @author Brack.zhu
 * @date 2020年11月26日
 */
public class ClientReconnectThread extends Thread {

	private Logger log = LoggerFactory.getLogger(getClass());

	private boolean stopFlag = false;
	
	private long succeedSleepTime=1000*60*10;
	
	private long failSleepTime=1000*30;

	public ClientReconnectThread() {
		super("WsClient Reconnect Thread");
	}
	
	/**
	 * 设置停止标签
	 * 线程逻辑退出
	 */
	public void setStopFlag() {
		this.stopFlag=true;
	}

	@Override
	public void run() {
		/log.warn("WS客户端会话重连线程启动!");
		long sleepTime=succeedSleepTime;
		while (!stopFlag) {
			try {
				Thread.sleep(sleepTime);
			} catch (InterruptedException e) {
				log.warn("WS客户端会话重连线程被唤醒!");
			}
			if(stopFlag) {
				//线程停止
				break;
			}
			WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
			if(!wsClientModule.isOk()) {
				if(wsClientModule.wsClientReconn()) {
					log.info("WS客户端重连成功,"+wsClientModule.getWsClient().clinetInfo());
					sleepTime=succeedSleepTime;
				}else {
					log.warn("WS客户端重连失败!"+wsClientModule.getWsClient().clinetInfo());
					sleepTime=failSleepTime;
				}	
			}
		}
		log.warn("WS客户端会话重连线程退出!");
	}

}

发送心跳包线程

java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.commnetsoft.pub.pro.base.module.ModuleManager;

import cn.edcall.module.device.constant.Constants;
import cn.edcall.module.device.util.ConfigUtil;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;

/**
 * ws心跳定时发送线程
 * 
 * @author Brack.zhu
 * @date 2020年11月25日
 */
public class WsHeartbeatThread extends Thread {

	private Logger log = LoggerFactory.getLogger(getClass());
	
	/**
	 * 心跳间隔时间-默认15秒
	 */
	private long sleepInterval=1500*10;
	
	public WsHeartbeatThread() {
		super("Ws Heartbeat Thread");
		sleepInterval=ConfigUtil.getLong(Constants.WS_HEARTBEAT_INTERVAL_MS, sleepInterval);
		log.info("WS心跳间隔时间(毫秒):"+sleepInterval);
	}

	@Override
	public void run() {
		log.warn("ws心跳定时发送线程启动!");
		while (true) {
			heartbeat();
			try {
				Thread.sleep(sleepInterval);
			} catch (InterruptedException e) {
				break;
			}
		}
		log.warn("ws心跳定时发送线程退出!");

	}

	/**
	 * 会话心跳
	 * 
	 * @param username
	 * @param pwd
	 * @throws MicroRuntimeException
	 */
	public void heartbeat() {
		try {
			WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
			if(wsClientModule.isOk()) {
				WsReq<Void> heartbeatReq = new WsReq<>();
				heartbeatReq.setId(UUIDUtils.generate());
				heartbeatReq.setAction(WsClient.WS_HEARTBEAT_ACTION_NAME);
				WsResp<Void> heartbeatResp = wsClientModule.syncSendWsReq(heartbeatReq);
				heartbeatResp.tryData();	
			}else {
				log.warn("心跳包发送失败,连接不可用,{}",wsClientModule.clinetInfo());
			}
		} catch (Exception e) {
			log.error("WS会话心跳异常:", e);
		}
	}

}