欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

REDIS优化、TTL、ETCD

程序员文章站 2022-07-13 22:32:19
...

一、redis优化

缩短键值对的存储长度;
使用 lazy free(延迟删除)特性;
设置键值的过期时间;
禁用长耗时的查询命令;
使用 slowlog 优化耗时命令;
使用 Pipeline 批量操作数据;
避免大量数据同时失效;
客户端使用优化;
限制 Redis 内存大小;
使用物理机而非虚拟机安装 Redis 服务;
检查数据持久化策略;
禁用 THP 特性;
使用分布式架构来增加读写速度

二、transmittable-thread-local实现方式TTL

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.11.4</version>
        </dependency>
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServiceTransmittable {

    /**
     * 模拟tomcat线程池
     */
    private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);

    /**
     * 业务线程池,默认Control中异步任务执行线程池
     */
    private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4)); // 使用ttl线程池,该框架的使用,请查阅官方文档。

    /**
     * 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值。
     */
    private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            tomcatExecutors.submit(new ControlThread(i));
        }
        //简单粗暴的关闭线程池
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        businessExecutors.shutdown();
        tomcatExecutors.shutdown();

    }


    /**
     * 模拟Control任务
     */
    static class ControlThread implements Runnable {
        private int i;
        public ControlThread(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            requestIdThreadLocal.set(i);
            //使用线程池异步处理任务
            businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
        }
    }

    /**
     * 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
     */
    static class BusinessTask implements Runnable {
        private String parentThreadName;
        public BusinessTask(String parentThreadName) {
            this.parentThreadName = parentThreadName;
        }
        @Override
        public void run() {
            //如果与上面的能对应上来,则说明正确,否则失败
            System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
        }
    }

}

三、etcd是一个分布式一致性键值存储系统

         <dependency>
            <groupId>io.etcd</groupId>
            <artifactId>jetcd-core</artifactId>
            <version>0.5.3</version>
        </dependency>
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author nick
 */
@Slf4j
public class EtcdUtil {

    /**
     * etcd客户端链接
     */
    private static  Client client = null;

    private static String IP_PORT;

    static {
        IP_PORT = System.getProperty("IP_PORT", "http://127.0.0.1:1000");
        getEtcdClient();
    }

    /**
     * 链接初始化
     * @return
     */
    public static Client getEtcdClient() {
        if (client == null) {
            synchronized (EtcdUtil.class) {
                client = Client.builder().lazyInitialization(false).endpoints(IP_PORT).build();
            }
        }
        return client;
    }

    /**
     * 根据指定的配置名称获取对应的value
     *
     * @param key
     *            配置项
     * @return
     * @throws Exception
     */
    public static String getEtcdValueByKey(String key) throws Exception {
        KeyValue kv = getEtcdKeyValueByKey(key);
        if (kv != null) {
            return kv.getValue().toString();
        } else {
            return null;
        }
    }

    /**
     * 根据指定的配置名称获取对应的key value
     *
     * @param key
     *            配置项
     * @return
     * @throws Exception
     */
    public static KeyValue getEtcdKeyValueByKey(String key) throws Exception {
        List<KeyValue> kvs = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
        if (kvs.size() > 0) {
            return kvs.get(0);
        } else {
            return null;
        }
    }

    /**
     * 新增或者修改指定的配置
     *
     * @param key
     * @param value
     * @return
     */
    public static void putEtcdValueByKey(String key, String value) {
        client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8),
                ByteSequence.from(value.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * 删除指定的配置
     *
     * @param key
     * @return
     */
    public static void deleteEtcdValueByKey(String key) {
        client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8));
    }

    /**
     * V3 api配置初始化和监听
     * @param key 案例:test
     */
    @PostConstruct
    public void init(String key) {
        try {
            // 加载配置
            getConfig(client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs());
            // 启动监听线程
            new Thread(() -> {
                // 对某一个配置进行监听
                Watch.Watcher watcher = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),new WatchListener());
                watcher.close();
            }).start();
        } catch (Exception e) {
           log.error("初始化失败{}",e);
        }
    }

    private String getConfig(List<KeyValue> kvs) {
        if (kvs.size() > 0) {
            String config = kvs.get(0).getKey().toString();
            String value = kvs.get(0).getValue().toString();
            return config;
        } else {
            return null;
        }
    }


    /**
     * 持续监控某个key变化的方法,
     * 执行后如果key有变化会被监控到
     * @param key
     */
    public static void watchEtcdKey(String key) {
        // 最大事件数量
        Integer maxEvents = Integer.MAX_VALUE;
        CountDownLatch latch = new CountDownLatch(maxEvents);
        try {
            ByteSequence watchKey = ByteSequence.from(key, StandardCharsets.UTF_8);
            WatchOption watchOpts = WatchOption.newBuilder().build();
            Watch.Watcher watcher = client.getWatchClient().watch(watchKey, watchOpts, response -> {
                response.getEvents().stream().forEach(e->{
                    KeyValue keyValue = e.getKeyValue();
                    log.info("keyValue:{}",keyValue.toString());
                    WatchEvent.EventType eventType = e.getEventType();
                    log.info("eventType:{}",eventType.toString());
                    KeyValue prevKV = e.getPrevKV();
                    log.info("prevKV:{}",prevKV.toString());
                });
                latch.countDown();
            });
            latch.await();
            watcher.close();
            client.close();
        } catch (Exception e) {
            log.error("获取数据失败{}",e);
        }
    }

}
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.watch.WatchResponse;

public class WatchListener implements Watch.Listener {
    @Override
    public void onNext(WatchResponse watchResponse) {

    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }
}

上一篇: etcd

下一篇: etcd 配置优化