欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

缓存方式幂等性工具、消息队列方式幂等性工具、检测客户端服务端是否在线

程序员文章站 2022-06-24 15:09:50
1、缓存方式幂等性工具import cn.hutool.core.util.IdUtil;import com.alibaba.fastjson.JSON;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;im...

1、缓存方式幂等性工具

import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;


/**
 * 通过唯一ID判断数据是否存在
 * 主要逻辑  存入数据不通过mysql主键进行做唯一标识,
 * 采用redis天然的幂等性,插入数据进行存储唯一标识,如果插入数据进行判断存在,不进行插入数据
 */
public class IdentityHash {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 将对象存入缓存中
     * @param obj   对象数据
     * @param timeout   超时时间
     */
    public void set(Object obj, long timeout) {
        if (obj instanceof String) {
            stringRedisTemplate.opsForValue().set(JSON.toJSONString(System.identityHashCode(obj)), IdUtil.randomUUID(), timeout, TimeUnit.MILLISECONDS);
            return;
        }
        stringRedisTemplate.opsForValue().set(JSON.toJSONString(System.identityHashCode(obj)), IdUtil.randomUUID(), timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * 根据Key查询缓存中的数据
     * @param key
     * @return
     */
    public String get(final String key) {
        if (StringUtils.isEmpty(key)) {
            return null;
        }
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * 根据key删除缓存数据
     * @param key
     */
    public void delete(String key) {
        stringRedisTemplate.delete(key);
    }

    /**
     * 查询缓存是否存在
     * @param checkCase
     * @return
     */
    public boolean checkForm(String checkCase){
        String cacheValue = get(checkCase);
        /**如果查询缓存不为空,返回true*/
        if (StringUtils.isNotEmpty(cacheValue)){
            return true;
        }
        return false;
    }

}

2、消息队列方式幂等性工具

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class AmqpHash {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息
     *
     * @param queueName 队列名称
     */
    public void send(String queueName, Object obj) {
        Messages messages = new Messages();
        messages.setObj(obj);
        messages.setMessageId(String.valueOf(System.identityHashCode(obj)));
        amqpTemplate.convertAndSend(queueName, JSON.toJSONString(messages));
    }



    @RabbitHandler
    @RabbitListener(queues = "queue", concurrency = "1")
    public void process(String msg, Message message, Channel channel) throws IOException {
        log.debug("消费者接收到的消息 :{}, 来源:{} 时间:{}", msg, channel.getChannelNumber(), System.currentTimeMillis());
        try {
            if(msg.contains("5")){
                throw new RuntimeException("抛出异常");
            }
            log.info("消息{}消费成功",msg);
            //消息Id
            message.getMessageProperties().getMessageId();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            log.error("接收消息过程中出现异常,执行nack");
            //第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("消息{}异常",message.getMessageProperties().getHeaders());
        }
    }

    @Data
    static class Messages{

        private Object obj;

        private String messageId;

        private String uuid;

    }
}

3、检测客户端服务端是否在线

一、服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * 启动服务端服务
 * 需要开启一个新的线程来执行netty 服务端 或者客户端
 *         ThreadUtil.execute(() -> {
 *             Server.initServer();
 *             Client.initClient();
 *         });
 */
@Slf4j
public class Server {
    /**
     * 装有所有客户端channel的组
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    static class ServerHeader extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            Channel channel = ctx.channel();
            //循环channel组,判断是不是其它客户端发送的消息
            channelGroup.stream().forEach(ch -> {
                if (channel != ch) {
                    ch.writeAndFlush(channel.remoteAddress() + "---发送的消息为:" + msg + "\n");
                } else {
                    ch.writeAndFlush("【自己的消息】" + msg + "\n");
                }
            });
        }

        /**
         * 用户事件触发方法   判断事件
         *
         * @param ctx 上下文对象
         * @param evt 事件对象
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            //判断该事件是否为超时事件
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                String type;
                //判断具体事件  具体项目可以根据不同情况进行不同处理
                switch (event.state()) {
                    case READER_IDLE:
                        type = "读空闲";
                        break;
                    case WRITER_IDLE:
                        type = "写空闲";
                        break;
                    default:
                        type = "读写空闲";
                }
                log.info("%s==超时事件==%s\n", ctx.channel().remoteAddress().toString(), type);
                ctx.channel().close();
            }
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            //通知其它客户端
            channelGroup.writeAndFlush("【服务端:】" + channel.remoteAddress() + "加入\n");
            channelGroup.add(channel);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.info("【客户端:】" + channel.remoteAddress() + "上线\n");
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.info("【客户端:】" + channel.remoteAddress() + "下线\n");
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("【客户端:】" + channel.remoteAddress() + "离开\n");
            //此处不用从组中一处,掉线之后netty会自动的从组中移除
        }
    }



    static class ServerInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //添加处理器,netty通过解决不同的情景来添加不同的处理器
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    .addLast(new IdleStateHandler(100,200,300, TimeUnit.SECONDS))
                    .addLast(new ServerHeader());
        }
    }


    /**
     * 初始化
     */
    public static void initServer(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ServerInit());
            // 端口可以自定义
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            log.error("初始化失败:{}",e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

二、客户端:

import cn.hutool.cron.CronUtil;
import cn.hutool.cron.task.Task;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * 启动客户端服务
 * 需要开启一个新的线程来执行netty 服务端 或者客户端
 *         ThreadUtil.execute(() -> {
 *             Server.initServer();
 *             Client.initClient();
 *        });
 */
@Slf4j
public class Client {

    static class ClientHeader extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
            log.info("服务端消息:" + msg);
        }
    }

    static class ClientInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //添加处理器,netty通过解决不同的情景来添加不同的处理器
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    .addLast(new ClientHeader());
        }
    }

    /**
     * 初始化客户端
     */
    public static void initClient(){
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInit());
            //获得channel对象
            Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
            // 支持秒级别定时任务
            CronUtil.setMatchSecond(true);
            CronUtil.start();
            CronUtil.schedule("*/2 * * * * *", (Task) () -> {
                String str = String.valueOf(System.currentTimeMillis());
                BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(str.getBytes())));
                try {
                    channel.writeAndFlush(br.readLine() + "\r\n");
                } catch (IOException e) {
                    log.error("通道已关闭:{}",e);
                }
            });
        } catch (Exception e) {
            log.info("启动客户端失败:{}",e);
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

三、springboot方式启动:

import cn.hutool.core.thread.ThreadUtil;
import com.imooc.stream.utils.Client;
import com.imooc.stream.utils.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;

@Slf4j
@SpringBootApplication
public class StreamApplication extends SpringBootServletInitializer implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }


    /**
     * 支持使用外置的tomcat启动
     * @param application
     * @return
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        log.info("启动加载自定义的ServletInitializer");
        return application.sources(StreamApplication.class);
    }

    @Override
    public void run(String... args) {
        log.info("启动成功, 同时启动netty服务");
        //需要开启一个新的线程来执行netty 服务端 或者客户端
        ThreadUtil.execute(() -> {
            Server.initServer();
            Client.initClient();
        });
    }
}

本文地址:https://blog.csdn.net/qq_32447301/article/details/112854611