REDIS优化、TTL、ETCD
程序员文章站
2022-07-13 22:32:19
...
一、redis优化
缩短键值对的存储长度;
使用 lazy free(延迟删除)特性;
设置键值的过期时间;
禁用长耗时的查询命令;
使用 slowlog 优化耗时命令;
使用 Pipeline 批量操作数据;
避免大量数据同时失效;
客户端使用优化;
限制 Redis 内存大小;
使用物理机而非虚拟机安装 Redis 服务;
检查数据持久化策略;
禁用 THP 特性;
使用分布式架构来增加读写速度
二、transmittable-thread-local实现方式TTL
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.4</version>
</dependency>
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServiceTransmittable {
/**
* 模拟tomcat线程池
*/
private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
/**
* 业务线程池,默认Control中异步任务执行线程池
*/
private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4)); // 使用ttl线程池,该框架的使用,请查阅官方文档。
/**
* 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值。
*/
private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
tomcatExecutors.submit(new ControlThread(i));
}
//简单粗暴的关闭线程池
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
businessExecutors.shutdown();
tomcatExecutors.shutdown();
}
/**
* 模拟Control任务
*/
static class ControlThread implements Runnable {
private int i;
public ControlThread(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + i);
requestIdThreadLocal.set(i);
//使用线程池异步处理任务
businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
}
}
/**
* 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
*/
static class BusinessTask implements Runnable {
private String parentThreadName;
public BusinessTask(String parentThreadName) {
this.parentThreadName = parentThreadName;
}
@Override
public void run() {
//如果与上面的能对应上来,则说明正确,否则失败
System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
}
}
}
三、etcd是一个分布式一致性键值存储系统
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.5.3</version>
</dependency>
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author nick
*/
@Slf4j
public class EtcdUtil {
/**
* etcd客户端链接
*/
private static Client client = null;
private static String IP_PORT;
static {
IP_PORT = System.getProperty("IP_PORT", "http://127.0.0.1:1000");
getEtcdClient();
}
/**
* 链接初始化
* @return
*/
public static Client getEtcdClient() {
if (client == null) {
synchronized (EtcdUtil.class) {
client = Client.builder().lazyInitialization(false).endpoints(IP_PORT).build();
}
}
return client;
}
/**
* 根据指定的配置名称获取对应的value
*
* @param key
* 配置项
* @return
* @throws Exception
*/
public static String getEtcdValueByKey(String key) throws Exception {
KeyValue kv = getEtcdKeyValueByKey(key);
if (kv != null) {
return kv.getValue().toString();
} else {
return null;
}
}
/**
* 根据指定的配置名称获取对应的key value
*
* @param key
* 配置项
* @return
* @throws Exception
*/
public static KeyValue getEtcdKeyValueByKey(String key) throws Exception {
List<KeyValue> kvs = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
if (kvs.size() > 0) {
return kvs.get(0);
} else {
return null;
}
}
/**
* 新增或者修改指定的配置
*
* @param key
* @param value
* @return
*/
public static void putEtcdValueByKey(String key, String value) {
client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8),
ByteSequence.from(value.getBytes(StandardCharsets.UTF_8)));
}
/**
* 删除指定的配置
*
* @param key
* @return
*/
public static void deleteEtcdValueByKey(String key) {
client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8));
}
/**
* V3 api配置初始化和监听
* @param key 案例:test
*/
@PostConstruct
public void init(String key) {
try {
// 加载配置
getConfig(client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs());
// 启动监听线程
new Thread(() -> {
// 对某一个配置进行监听
Watch.Watcher watcher = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),new WatchListener());
watcher.close();
}).start();
} catch (Exception e) {
log.error("初始化失败{}",e);
}
}
private String getConfig(List<KeyValue> kvs) {
if (kvs.size() > 0) {
String config = kvs.get(0).getKey().toString();
String value = kvs.get(0).getValue().toString();
return config;
} else {
return null;
}
}
/**
* 持续监控某个key变化的方法,
* 执行后如果key有变化会被监控到
* @param key
*/
public static void watchEtcdKey(String key) {
// 最大事件数量
Integer maxEvents = Integer.MAX_VALUE;
CountDownLatch latch = new CountDownLatch(maxEvents);
try {
ByteSequence watchKey = ByteSequence.from(key, StandardCharsets.UTF_8);
WatchOption watchOpts = WatchOption.newBuilder().build();
Watch.Watcher watcher = client.getWatchClient().watch(watchKey, watchOpts, response -> {
response.getEvents().stream().forEach(e->{
KeyValue keyValue = e.getKeyValue();
log.info("keyValue:{}",keyValue.toString());
WatchEvent.EventType eventType = e.getEventType();
log.info("eventType:{}",eventType.toString());
KeyValue prevKV = e.getPrevKV();
log.info("prevKV:{}",prevKV.toString());
});
latch.countDown();
});
latch.await();
watcher.close();
client.close();
} catch (Exception e) {
log.error("获取数据失败{}",e);
}
}
}
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.watch.WatchResponse;
public class WatchListener implements Watch.Listener {
@Override
public void onNext(WatchResponse watchResponse) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
}