欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

一个简单的分布式系统数据同步解决方案

程序员文章站 2024-03-25 23:59:58
...

在分布式系统中,最常见的问题就是如何保证数据一致性,本文主要解决最终一致性。项目A中表A进行新增和修改操作时,项目B中的表B也需要进行新增和修改,生产者为A,消费者为B。
在项目A中新建实体MsgConsumerMessage,在项目B中新建表MsgProducerMessage:

public class MsgConsumerMessage {
    //主键ID,对应MsgProducerMessage的ID,不能设置为自增
    private Long id;
    //消息服务类型ID
    private Integer serviceId;
    //消息要同步的json数据
    private String params;
    //同步的时间
    private Long consumTime;
}
public class MsgProducerMessage {
    //主键ID
    private Long id;
    //消息服务类型ID
    private Integer serviceId;
    //消费者提供的API地址,用于push消息给消费者
    private String node;
    //要push的json数据
    private String params;
    //消息创建时间
    private Long createTime;
    //是否同步
    private Boolean sync;
}

在项目A需要同步数据的方法中,新增一条MsgProducerMessage记录,使用httpclient向项目B发送请求,接收到项目B接口的返回值后,r若成功则将sync改为true,失败则不改。另在项目A中添加定时任务,循环遍历未同步的MsgProducerMessage记录,依次访问消费者的API地址。
项目B提供的api接口如下:

//同步加锁用MAP
    public static final ConcurrentMap<String,byte[]> SYN_MSG_MAP = new ConcurrentHashMap<String, byte[]>();

    @RequestMapping(value = "pushMessage",method = RequestMethod.POST,produces = "application/json")
    @ResponseBody
    public String pushMessage(@RequestParam String message){
        try {
            if (StringUtils.isBlank(message)){
                throw new BusinessException("参数不正确");
            }
            log.info("pushMessage begin:" + message);
            //设置同步锁
            if(!SYN_MSG_MAP.containsKey(message)){
                SYN_MSG_MAP.putIfAbsent(message, new byte[0]);
            }
            //设置同步代码块
            synchronized (SYN_MSG_MAP.get(message)) {
                //消费信息对象并加锁
                MsgConsumerMessage msgConsumerMessage = msgConsumerMessageService.consumeMessage(message);
                //事务执行完成打印日志
                if(msgConsumerMessage != null){
                    log.info("pushMessage success:" + msgConsumerMessage.toString());
                }else{
                    log.info("pushMessage alreadySuccess:" + message);
                }
            }
           return JsonTemplate.createSimpleJson("result", "success");
        } catch (Exception e){
            log.error("pushMessage failure:" + message + ",error:" + e.getMessage());
            return JsonTemplate.createSimpleJson("result", "failure");
        } finally {
            if(SYN_MSG_MAP.containsKey(message))
                SYN_MSG_MAP.remove(message);
        }

    }
}

为防止多条重复消息发送并发访问此api接口,这里使用了ConcurrentHashMap做为同步锁进行同步,consumeMessage方法就是同步json数据的方法,这里就不再详细写明。只需注意在同步前查询一下是否已存在MsgConsumerMessage记录,不存在则开始同步,并给需要同步的数据实体加上更新锁。如果要保证数据更新的顺序性,可在所需更新的数据的表中添加updateTime字段,同步的时候进行时间判断。同步成功后添加一条MsgConsumerMessage记录,表明已同步,同时返回给项目A同步成功标志。

转载于:https://www.jianshu.com/p/5ad948f6dd13