标签
发布订阅
字数
2485 字
阅读时间
13 分钟
一、方式一
单发布者关联单订阅者,发布者创建时需关联订阅者,启动30没有消息则关闭发布消息线程,发送消息时再开启,发布订阅对象创建时需添加订阅者的对象。
发布者,声明订阅者和发布者接口,由处理程序实现发布者的接口
订阅者接口
java
/**
* 订阅者模式 订阅者接口 <i>服务器规范接口</i>
* @author chenli
* @date 2016-2-3
* @version 1.0
* @see 组织订阅接口{@link AbstractOrgSubscriber}
* @see 用户订阅接口{@link AbstractUserSubscriber}
* @see 用户账户安全配置订阅接口{@link AbstractUserSafetySubscriber }
* @see 资源订阅接口 {@link AbstractAppSubscriber}
* @see 资源的用户角色订阅接口 {@link AbstractAppUserRoleSubscriber}
* @see 发布器{@link Publisher}
*/
@SuppressWarnings("rawtypes")
public interface ISubscriber<T extends IdEntity> extends IName,IPriority{
/**
* 新增事件通知.
* <br>通知者已经使用生产者和消费者模式
* @param obj 新增的对象
* @param extend 对象的自定义属性
* @param number 操作流水号
*/
public void insert(T obj,Map<String,String> extend, OptSerialNum number);
/**
* 变更事件通知
* <br>通知者已经使用生产者和消费者模式
* @param obj 最新的对象
* @param extend 自定义属性。最新新值
* @param change 发生变更的数据。 包括用户扩展数据等用户基础信息
* @param number 操作流水号
*/
public void update(T obj, Map<String,String> extend, Map<String,Object> change, OptSerialNum number);
/**
* 删除事件通知
* <br>通知者已经使用生产者和消费者模式
* @param obj 删除的对象
* @param number 操作流水号
*/
public void delete(T obj, OptSerialNum number);
/**
* 是否使用异步通知
* @return
*/
public boolean isAsync();
}发布者接口
java
/**
* 订阅者模式 消息发布者 <i>服务器规范接口</i>
* @author chenli
* @date 2016-2-3
* @version 1.0
* @see 多例模式使用spring管理实例对象{@link Publisher}
*/
@SuppressWarnings("rawtypes")
public interface IPublisher<T extends IdEntity> extends ISubscriber<T>{
/**
* 增加订阅者
* @param subscriber
*/
public void addSubscriber(ISubscriber<T> subscriber);
/**
* 删除订阅者
* @param subscriber
*/
public boolean delSubscriber(ISubscriber<T> subscriber);
/**
* 返回订阅者的数量
* @return
*/
public int size();
}发布者
发布者实现接口,内部包含订阅者集合,可以对订阅者进行添加、修改何删除。可定义多个发布者对应不同的订阅者。订阅者也可进行抽象成某一类型
java
/**
* 建议使用spring多例模式管理,针对每一个类型(泛型)生成单例。
* 使用生产者消费者模式。
* @author chenli
* @date 2016-2-3
* @version 1.0
*/
@SuppressWarnings("rawtypes")
public class Publisher<T extends IdEntity> implements IPublisher<T>, Runnable{
private final static Logger logger = LoggerFactory.getLogger(Publisher.class);
private String name;
private String desc;
private boolean ischangeSubs = true;
private ISubscriber<T>[] subs;
private Vector<ISubscriber<T>> subscribers = new Vector<ISubscriber<T>>();
private ReentrantLock Sublock = new ReentrantLock();
private ReentrantLock msglock = new ReentrantLock();
private ReentrantLock msglock2 = new ReentrantLock();
//创建本线程
private Thread publisherThread;
//创建本线程
private Thread publisherThread2;
//通知队列
private BlockingQueue<SubscriberMsg<T>> msgqueue = new LinkedBlockingQueue<SubscriberMsg<T>>(1024*5);
@Override
public void addSubscriber(ISubscriber<T> subscriber) {
try {
Sublock.lock();
if(subscribers.contains(subscriber) == false){
subscribers.add(subscriber);
ischangeSubs = true;
}
} catch (Exception e) {
// IGNORE
}finally{
Sublock.unlock();
}
}
@Override
public boolean delSubscriber(ISubscriber<T> subscriber) {
try {
Sublock.lock();
if(subscribers.contains(subscriber)){
boolean result = subscribers.remove(subscriber);
if(result)ischangeSubs = true;
return result;
}
} catch (Exception e) {
// IGNORE
}finally{
Sublock.unlock();
}
return false;
}
@Override
public int size() {
return subscribers.size();
}
/**
* 获取对象的时候 返回copy对象,防止出现{@link ConcurrentModificationException}异常
* @return
* @author chenli
* @data 2016-2-3
*/
@SuppressWarnings("unchecked")
protected ISubscriber<T>[] getSubscribers(){
if(!ischangeSubs){
return subs;
}
try {
Sublock.lock();
subs = new ISubscriber[subscribers.size()];
Collections.sort(subscribers,new PriorityComparator());
subs = subscribers.toArray(subs);
ischangeSubs = false;
} catch (Exception e) {
// IGNORE
}finally{
Sublock.unlock();
}
return subs;
}
@Override
public void run() {
while(true){
try {
//30分钟没有则退出线程
SubscriberMsg<T> msg = msgqueue.poll(30, TimeUnit.MINUTES);
if(msg == null){
break;
}
switch (msg.getType()) {
case SubscriberMsg.TYPE_INSERT:
noticeInsert(msg.getObj(), msg.getExtend(), msg.getNumber(), true);
break;
case SubscriberMsg.TYPE_UPDATE:
noticeUpdate(msg.getObj(), msg.getExtend(), msg.getChange(), msg.getNumber(), true);
break;
case SubscriberMsg.TYPE_DELETE:
noticeDelete(msg.getObj(), msg.getNumber(), true);
break;
default:
logger.warn("发布者【{}】发布错误的消息类型", this.getName());
break;
}
} catch (Exception e) {
logger.error("发布者{}订阅者{},事件处理异常",getName(),SerializerUtil.serialize(getSubscribers()),e);
}
}
}
/**
* 使用生产者消费者方式通知订阅者。
* <br>可能对通知对象存在消息滞后。 (如后期增加的订阅者,可能也会通知之前的消息。)
*/
public void insert(T obj, java.util.Map<String,String> extend, OptSerialNum number) {
runThread();
SubscriberMsg<T> msg = new SubscriberMsg<T>();
msg.setType(SubscriberMsg.TYPE_INSERT);
msg.setObj(obj);
msg.setNumber(number);
msg.setExtend(extend);
try {
//同步通知
noticeInsert(msg.getObj(), msg.getExtend(), msg.getNumber(), false);
//异步通知
msgqueue.put(msg);
} catch (Exception e) {
String str = "发布者(%s)通知insert事件异常,参数:obj=%s,number=%s";
try {
str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, this.getName(),obj,number);
}
logger.error(str,e);
}
}
public void delete(T obj, OptSerialNum number) {
runThread();
SubscriberMsg<T> msg = new SubscriberMsg<T>();
msg.setType(SubscriberMsg.TYPE_DELETE);
msg.setObj(obj);
msg.setNumber(number);
try {
//同步通知
noticeDelete(msg.getObj(), msg.getNumber(), false);
//异步通知
msgqueue.put(msg);
} catch (Exception e) {
String str = "发布者(%s)通知delete事件异常,参数:obj=%s,number=%s";
try {
str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, this.getName(),obj,number);
}
logger.error(str,e);
}
};
public void update(T obj, java.util.Map<String,String> extend, java.util.Map<String,Object> change, OptSerialNum number) {
runThread();
SubscriberMsg<T> msg = new SubscriberMsg<T>();
msg.setType(SubscriberMsg.TYPE_UPDATE);
msg.setObj(obj);
msg.setNumber(number);
msg.setExtend(extend);
msg.setChange(change);
try {
//同步通知
noticeUpdate(msg.getObj(), msg.getExtend(), msg.getChange(), msg.getNumber(), false);
//异步通知
msgqueue.put(msg);
} catch (Exception e) {
String str = "发布者(%s)通知update事件异常,参数:obj=%s,number=%s";
try {
str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, this.getName(),obj,number);
}
logger.error(str,e);
}
};
/**
* 检测线程是否启动,如果未启动则启动线程
* @author chenli
* @data 2016-10-9
*/
private void runThread(){
if(publisherThread == null || publisherThread.isAlive() == false){
try {
msglock.lock();
if(publisherThread == null || publisherThread.isAlive() == false){
publisherThread = new Thread(this,this.getName());
publisherThread.start();
}
} catch (Exception e) {
logger.error("发布者1【{}】线程启动失败", this.getName(),e);
}finally{
msglock.unlock();
}
}
if(publisherThread2 == null || publisherThread2.isAlive() == false){
try {
msglock2.lock();
if(publisherThread2 == null || publisherThread2.isAlive() == false){
publisherThread2 = new Thread(this,this.getName());
publisherThread2.start();
}
} catch (Exception e) {
logger.error("发布者2【{}】线程启动失败", this.getName(),e);
}finally{
msglock2.unlock();
}
}
}
/**
* 通知订阅者。新增事件
* @param obj 对象
* @param extend 对象扩展
* @param number 操作号
* @param isasync 是否异步
* @author chenli
* @data 2016-10-10
*/
protected void noticeInsert(T obj, Map<String,String> extend, OptSerialNum number, boolean isasync) {
ISubscriber<T>[] subs = getSubscribers();
for (ISubscriber<T> subscriber : subs) {
try {
if(subscriber.isAsync() == isasync){
//long t1 = System.currentTimeMillis();
subscriber.insert(obj, extend, number);
//long t2 = System.currentTimeMillis() - t1;
/*if(t2 > 500){
logger.error(subscriber.getName() + " run too long:" + t2);
}*/
}
} catch (Exception e) {
String str = "订阅者subscriber=%s处理insert事件异常,参数:obj=%s,number=%s";
try {
str = String.format(str, subscriber.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, subscriber.getName(),obj,number);
}
logger.error(str,e);
}
}
}
/**
* 通知订阅者删除事件
* @param obj 对象
* @param extend 对象扩展
* @param number 操作号
* @param isasync 是否异步
* @author chenli
* @data 2016-10-10
*/
protected void noticeDelete(T obj, OptSerialNum number, boolean isasync) {
ISubscriber<T>[] subs = getSubscribers();
for (ISubscriber<T> subscriber : subs) {
try {
if(subscriber.isAsync() == isasync){
//long t1 = System.currentTimeMillis();
subscriber.delete(obj,number);
//long t2 = System.currentTimeMillis() - t1;
/*if(t2 > 500){
logger.error(subscriber.getName() + " run too long:" + t2);
}*/
}
} catch (Exception e) {
String str = "订阅者subscriber=%s处理delete事件异常,参数:obj=%s,number=%s";
try {
str = String.format(str, subscriber.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, subscriber.getName(),obj,number);
}
logger.error(str,e);
}
}
}
/**
* 通知订阅者更新事件
* @param obj 对象
* @param extend 对象扩展
* @param number 操作号
* @param isasync 是否异步
* @author chenli
* @data 2016-10-10
*/
protected void noticeUpdate(T obj, Map<String,String> extend, Map<String,Object> change, OptSerialNum number, boolean isasync) {
ISubscriber<T>[] subs = getSubscribers();
for (ISubscriber<T> subscriber : subs) {
try {
if(subscriber.isAsync() == isasync){
//long t1 = System.currentTimeMillis();
subscriber.update(obj, extend, change, number);
//long t2 = System.currentTimeMillis() - t1;
/*if(t2 > 500){
logger.error(subscriber.getName() + " run too long:" + t2);
}*/
}
} catch (Exception e) {
String str = "订阅者subscriber=%s处理update事件异常,参数:id=%s,change=%s,number=%s";
Serializable id = null;
if(obj != null){
id = obj.getId();
}
try {
str = String.format(str, subscriber.getName(),id,SerializerUtil.serialize(change),SerializerUtil.serialize(number));
} catch (Exception e2) {
str = String.format(str, subscriber.getName(),id,change,number);
}
logger.error(str,e);
}
}
}
@Override
public boolean isAsync() {
return true;
}
@Override
public int priority() {
return PRIORITY_DEAULT;
}
@Override
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}订阅者
订阅者抽象为一类,可进行多扩展
java
/**
* 用户角色订阅
* @author wuzm
*
*/
public abstract class AbstractUserRoleSubscriber implements ISubscriber<UserRole>,IPriority{
/**
* 默认优先级5.
* 如果需要设置优先级,子类请重写该方法。
*/
public int priority() {
return PRIORITY_DEAULT;
}
/**
* 默认使用异步通知
*/
@Override
public boolean isAsync() {
return true;
}
}二、方式二
定义事件和事件监听,通过事件发布控制程序发布,通过线程池执行特定的事件监听。全局发布订阅使用同一个控制程序。
事件接口
java
/**
* 事件模型接口
* @author Brack.zhu
* @date 2020年12月2日
*/
public interface Event {
/**
* 事件发起时间
*/
long time=System.currentTimeMillis();
}事件监听接口
java
/**
* 事件监听接口
* @author Brack.zhu
* @date 2020年12月2日
*/
public interface EventListener<E extends Event> {
/**
* 事件触发
* @param event
*/
void onEvent(E event);
}监听执行方法抽象类
java
/**
* 监听抽象类,默认注册到事件工具类中
* @author Brack.zhu
* @date 2020年12月3日
* @param <E>
*/
public abstract class AbstEventListener<E extends Event> implements EventListener<E> {
public AbstEventListener() {
EventHelper.registerEventListener(this);
}
}事件工具类
java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edcall.module.device.util.ClassUtil;
/**
* 事件工具类
*
* @author Brack.zhu
* @date 2020年12月2日
*/
public class EventHelper {
private static HashMap<String, List<EventListener<? extends Event>>> eventListeners = new HashMap<>();
private static ExecutorService executors = Executors.newCachedThreadPool();
private static Logger log = LoggerFactory.getLogger(EventHelper.class);
/**
* 发布事件
*
* @param <E>
* @param event
*/
public static void publish(Event event) {
if (null == event) {
return;
}
List<EventListener<?>> listeners = eventListeners.get(event.getClass().getSimpleName());
if (null == listeners || listeners.size() == 0) {
return;
}
executors.execute(new Runnable() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void run() {
for (EventListener eventListener : listeners) {
try {
eventListener.onEvent(event);
} catch (Exception e) {
log.error("", e);
}
}
}
});
}
/**
* 注册事件监听
*
* @param <E>
* @param event
*/
public static <E extends Event> void registerEventListener(EventListener<E> event) {
Class<?> genericsClazz = ClassUtil.getClassGenericsType(event, 0);
String simpleName = genericsClazz.getSimpleName();
synchronized (simpleName) {
List<EventListener<? extends Event>> events = eventListeners.get(simpleName);
if (null == events) {
events = new LinkedList<>();
eventListeners.put(simpleName, events);
}
events.add(event);
}
}
}使用
java
实现Event接口标识事件类型
新建一个类继承监听实现抽象类AbstEventListener,反省未对应的Event事项类
使用
在方法中新建特定事件监听类,(构造方法中会将事件注册至EventHelper类中),使用EventHelper中的publish方法会通过线程池调用监听器中的实现逻辑。