缓存方式幂等性工具、消息队列方式幂等性工具、检测客户端服务端是否在线
程序员文章站
2022-06-24 15:09:50
摘要:本文给出三个工具示例:1、基于 Redis 缓存的幂等性工具;2、基于 RabbitMQ 消息队列的幂等性工具;3、基于 Netty 检测客户端、服务端是否在线。
1、缓存方式幂等性工具
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/**
 * Redis-backed marker store used to decide whether a piece of data has already been seen.
 *
 * <p>Instead of relying on a MySQL primary key as the unique identifier, a marker entry is
 * written to Redis; callers check for the marker before inserting and skip the insert when
 * it is already present.
 *
 * <p>NOTE(review): the marker key is derived from {@link System#identityHashCode(Object)},
 * which identifies an object instance rather than its content. Two payloads with equal
 * content held in different instances (or in different JVMs) will most likely get different
 * keys - confirm this is the intended notion of "same request".
 *
 * <p>NOTE(review): the class carries no Spring stereotype annotation, so the
 * {@code @Autowired} field is only populated if the class is registered as a bean
 * elsewhere - verify against the application configuration.
 */
public class IdentityHash {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Writes a marker entry for the given object.
     *
     * <p>The key is the object's identity hash code serialized through fastjson, the value
     * is a freshly generated random UUID. The original code had a separate
     * {@code instanceof String} branch that was byte-for-byte identical to the general
     * path; it has been folded into the single statement below.
     *
     * @param obj     object to mark as seen
     * @param timeout time-to-live of the marker, in milliseconds
     */
    public void set(Object obj, long timeout) {
        String key = JSON.toJSONString(System.identityHashCode(obj));
        stringRedisTemplate.opsForValue().set(key, IdUtil.randomUUID(), timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Looks up the cached value stored under the given key.
     *
     * @param key cache key
     * @return the cached value, or {@code null} when the key is empty or nothing is stored
     */
    public String get(final String key) {
        if (StringUtils.isEmpty(key)) {
            return null;
        }
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * Deletes the cache entry stored under the given key.
     *
     * @param key cache key
     */
    public void delete(String key) {
        stringRedisTemplate.delete(key);
    }

    /**
     * Checks whether a non-empty cache entry exists for the given key.
     *
     * @param checkCase cache key to check
     * @return {@code true} when a non-empty value is cached under the key, {@code false} otherwise
     */
    public boolean checkForm(String checkCase) {
        return StringUtils.isNotEmpty(get(checkCase));
    }
}
2、消息队列方式幂等性工具
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Slf4j
@Service
public class AmqpHash {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送消息
*
* @param queueName 队列名称
*/
public void send(String queueName, Object obj) {
Messages messages = new Messages();
messages.setObj(obj);
messages.setMessageId(String.valueOf(System.identityHashCode(obj)));
amqpTemplate.convertAndSend(queueName, JSON.toJSONString(messages));
}
@RabbitHandler
@RabbitListener(queues = "queue", concurrency = "1")
public void process(String msg, Message message, Channel channel) throws IOException {
log.debug("消费者接收到的消息 :{}, 来源:{} 时间:{}", msg, channel.getChannelNumber(), System.currentTimeMillis());
try {
if(msg.contains("5")){
throw new RuntimeException("抛出异常");
}
log.info("消息{}消费成功",msg);
//消息Id
message.getMessageProperties().getMessageId();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
log.error("接收消息过程中出现异常,执行nack");
//第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error("消息{}异常",message.getMessageProperties().getHeaders());
}
}
@Data
static class Messages{
private Object obj;
private String messageId;
private String uuid;
}
}
3、检测客户端服务端是否在线
一、服务端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
 * Netty server that relays each line received from a client to all connected clients and
 * closes connections that stay idle.
 *
 * <p>{@link #initServer()} blocks until the server channel closes, so it has to be run on
 * its own thread, for example:
 * <pre>
 * ThreadUtil.execute(Server::initServer);
 * </pre>
 */
@Slf4j
public class Server {

    /**
     * Group holding the channels of all connected clients.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Per-connection handler: relays messages, reacts to idle events and announces
     * clients joining and leaving.
     */
    static class ServerHeader extends SimpleChannelInboundHandler<String> {

        /**
         * Relays a received line to every channel in the group, with a different prefix
         * for the sender's own channel.
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            Channel channel = ctx.channel();
            // Walk the group and distinguish the sender from the other clients.
            channelGroup.forEach(ch -> {
                if (channel != ch) {
                    ch.writeAndFlush(channel.remoteAddress() + "---发送的消息为:" + msg + "\n");
                } else {
                    ch.writeAndFlush("【自己的消息】" + msg + "\n");
                }
            });
        }

        /**
         * Handles user events; on an idle-timeout event the idle type is logged and the
         * channel is closed. Other event types are ignored.
         *
         * @param ctx channel context
         * @param evt event object
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            // Only idle-timeout events are handled here.
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                String type;
                // A real project can react differently per idle type.
                switch (event.state()) {
                    case READER_IDLE:
                        type = "读空闲";
                        break;
                    case WRITER_IDLE:
                        type = "写空闲";
                        break;
                    default:
                        type = "读写空闲";
                }
                // SLF4J substitutes "{}" placeholders, not printf-style "%s"; with "%s" the
                // address and idle type were never written to the log.
                log.info("{}==超时事件=={}\n", ctx.channel().remoteAddress(), type);
                ctx.channel().close();
            }
        }

        /**
         * Announces the new client to the clients already in the group, then adds it.
         * Broadcasting before adding means the newcomer does not receive its own notice.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            // Notify the other clients.
            channelGroup.writeAndFlush("【服务端:】" + channel.remoteAddress() + "加入\n");
            channelGroup.add(channel);
        }

        /** Logs that a client came online. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.info("【客户端:】" + channel.remoteAddress() + "上线\n");
        }

        /** Logs that a client went offline. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.info("【客户端:】" + channel.remoteAddress() + "下线\n");
        }

        /** Tells the group that a client has left. */
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("【客户端:】" + channel.remoteAddress() + "离开\n");
            // No explicit removal from the group: Netty removes a closed channel automatically.
        }
    }

    /**
     * Builds the pipeline for each accepted connection: line-delimited framing, string
     * codec, idle detection, then the business handler.
     */
    static class ServerInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // Idle thresholds: 100s without reads, 200s without writes, 300s without either.
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    .addLast(new IdleStateHandler(100, 200, 300, TimeUnit.SECONDS))
                    .addLast(new ServerHeader());
        }
    }

    /**
     * Binds the server to port 8899 and blocks until the server channel is closed; both
     * event loop groups are shut down on exit.
     */
    public static void initServer() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInit());
            // The port can be customized.
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Throwable goes last with no placeholder so SLF4J prints the stack trace
            // without a dangling "{}" in the message.
            log.error("初始化失败", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
二、客户端:
import cn.hutool.cron.CronUtil;
import cn.hutool.cron.task.Task;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 启动客户端服务
* 需要开启一个新的线程来执行netty 服务端 或者客户端
* ThreadUtil.execute(() -> {
* Server.initServer();
* Client.initClient();
* });
*/
@Slf4j
public class Client {
static class ClientHeader extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
log.info("服务端消息:" + msg);
}
}
static class ClientInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加处理器,netty通过解决不同的情景来添加不同的处理器
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new ClientHeader());
}
}
/**
* 初始化客户端
*/
public static void initClient(){
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInit());
//获得channel对象
Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
// 支持秒级别定时任务
CronUtil.setMatchSecond(true);
CronUtil.start();
CronUtil.schedule("*/2 * * * * *", (Task) () -> {
String str = String.valueOf(System.currentTimeMillis());
BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(str.getBytes())));
try {
channel.writeAndFlush(br.readLine() + "\r\n");
} catch (IOException e) {
log.error("通道已关闭:{}",e);
}
});
} catch (Exception e) {
log.info("启动客户端失败:{}",e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
三、springboot方式启动:
import cn.hutool.core.thread.ThreadUtil;
import com.imooc.stream.utils.Client;
import com.imooc.stream.utils.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
/**
 * Spring Boot entry point that also starts the Netty server and client once the
 * application context is up.
 */
@Slf4j
@SpringBootApplication
public class StreamApplication extends SpringBootServletInitializer implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    /**
     * Allows the application to be started by an external Tomcat.
     *
     * @param application builder supplied by the servlet container bootstrap
     * @return the builder configured with this application class as its source
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        log.info("启动加载自定义的ServletInitializer");
        return application.sources(StreamApplication.class);
    }

    /**
     * Starts the Netty server and client on background threads after Spring Boot has started.
     *
     * <p>{@code Server.initServer()} blocks until the server channel closes, so running it
     * and {@code Client.initClient()} back to back in one task meant the client was never
     * started. Each now gets its own task.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) {
        log.info("启动成功, 同时启动netty服务");
        ThreadUtil.execute(Server::initServer);
        ThreadUtil.execute(() -> {
            // Give the server a moment to bind its port before the client connects.
            ThreadUtil.sleep(1000);
            Client.initClient();
        });
    }
}
本文地址:https://blog.csdn.net/qq_32447301/article/details/112854611