欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

异步事件框架

程序员文章站 2022-05-19 14:45:55
...

简介

  • 基于生产者与消费者模型实现
  • 生产者在为框架的外部接口,发送待处理事件至同步优先队列中。
  • 消费者从事件队列中读取事件
  • 根据读取到的事件,根据事件的类型分发至具体的处理器

流程图如下:
异步事件框架

  • 同步优先队列使用Redis存储,方便分布式环境使用
  • 在消费者中使用线程池并发从同步队列中获取待处理事件
  • 每个事件处理器均接口同一接口,在Spring中可以直接获取所有的处理器实现类,再根据期望处理的事件类型初始化 事件映射表
  • 事件处理期望处理的类型应为事件类型集合,因为一个事件处理器可能期望处理多种事件类型

代码实现

事件模型

public class EventModel {

    /** 事件类型 */
    private EventType eventType;

    /** 触发者ID */
    private int actorId;

    /** 数据实体ID */
    private int entityId;

    /** 数据实体类型 */
    private int entityType;

    /** 事件所属者 */
    private int entityOwnerId;

    /** 扩展数据字段 */
    private Map<String, String> exts = new HashMap<String, String>();

    public EventModel() {}

    public EventModel(EventType eventType) {
        this.eventType = eventType;
    }

    public EventModel setExts(String key, String value) {
        this.exts.put(key, value);

        return this;
    }

    public EventModel setExts(Map<String, String> map) {
        this.exts.putAll(map);

        return this;
    }

    public EventType getEventType() {
        return eventType;
    }

    public EventModel setEventType(EventType eventType) {
        this.eventType = eventType;

        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;

        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;

        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;

        return this;
    }

    public int getEntityOwnerId() {
        return entityOwnerId;
    }

    public EventModel setEntityOwnerId(int entityOwnerId) {
        this.entityOwnerId = entityOwnerId;

        return this;
    }

    public Map<String, String> getExts() {
        return exts;
    }

    public String getExts(String key) {
        return this.exts.get(key);
    }
}

事件类型(枚举类)

public enum EventType {

    LIKE(0),
    COMMENT(1),
    LOGIN(2),
    MAIL(3),
    FOLLOW(4),
    UNFOLLOW(5);

    private int value;

    EventType(int value) {
        this.value = value;
    }

    public int getValue() {
        return this.value;
    }
}

生产者

@Service
public class EventProducer {

    private static final Logger logger = LoggerFactory.getLogger(EventProducer.class);

    @Autowired
    private JedisAdapter jedisAdapter;

    /**
     * 向事件队列发送事件
     * @param eventModel
     * @return
     */
    public boolean emitEvent(EventModel eventModel) {
        try {
            String json = JSONObject.toJSONString(eventModel);
            String key = RedisUtil.getEventQueueKey();

            jedisAdapter.lpush(key, json);
            return true;
        } catch(Exception e) {
            logger.error("出现异常"+e.getMessage());
        }

        return false;
    }
}

消费者

@Service
public class EventConsumer implements InitializingBean, ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    /** 映射表 */
    private Map<EventType, List<EventHandler>> config = new HashMap<EventType, List<EventHandler>>();

    private ApplicationContext applicationContext;

    @Autowired
    private JedisAdapter jedisAdapter;

    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);

        if(beans != null) {
            for(Map.Entry<String, EventHandler> entry : beans.entrySet()) {
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();

                for(EventType type : eventTypes) {
                    if(!config.containsKey(type))
                        config.put(type, new ArrayList<EventHandler>());

                    config.get(type).add(entry.getValue());
                }
            }
        }

        Thread thread = new Thread() {
            @Override
            public void run() {
                // 使用线程池分发事件
                ExecutorService exec = Executors.newCachedThreadPool();
                while(true) {
                    String key = RedisUtil.getEventQueueKey();
                    List<String> events = jedisAdapter.brpop(0, key);

                    for(String event : events) {
                        if(event.equals(key))
                            continue;

                        EventModel model = JSONObject.parseObject(event, EventModel.class);

                        exec.execute(new Consumer(model));
                    }
                }
            }
        };
        thread.start();
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        this.applicationContext = applicationContext;
    }

    class Consumer implements Runnable {

        private EventModel eventModel;

        public Consumer(EventModel eventModel) {
            this.eventModel = eventModel;
        }

        @Override
        public void run() {
            if(!config.containsKey(eventModel.getEventType())) {
                logger.error("不能识别的事件类型"+eventModel.getEventType());
                return;
            }

            for(EventHandler handler : config.get(eventModel.getEventType())) {
                handler.doHandle(eventModel);
            }
        }
    }
}

事件处理接口

public interface EventHandler {

    // 具体的处理操作
    void doHandle(EventModel eventModel);

    // 支持处理的事件类型
    List<EventType> getSupportEventTypes();
}
相关标签: 异步框架