欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Boot(十三):整合Redis哨兵,集群模式实践

程序员文章站 2022-05-11 08:04:13
前面的两篇文章( "Redis的持久化方案" , "一文掌握Redis的三种集群方案" )分别介绍了Redis的持久化与集群方案 —— 包括主从复制模式、哨兵模式、Cluster模式,其中主从复制模式由于不能自动做故障转移,当节点出现故障时需要人为干预,不满足生产环境的高可用需求,所以在生产环境一般 ......

前面的两篇文章(redis的持久化方案一文掌握redis的三种集群方案)分别介绍了redis的持久化与集群方案 —— 包括主从复制模式、哨兵模式、cluster模式,其中主从复制模式由于不能自动做故障转移,当节点出现故障时需要人为干预,不满足生产环境的高可用需求,所以在生产环境一般使用哨兵模式或cluster模式。那么在spring boot项目中,如何访问这两种模式的redis集群,可能遇到哪些问题,是本文即将介绍的内容。

spring boot 2 整合redis

spring boot中整合redis非常简单,在pom.xml中添加依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>

spring boot 2的spring-boot-starter-data-redis中,默认使用的是lettuce作为redis客户端,它与jedis的主要区别如下:

  1. jedis是同步的,不支持异步,jedis客户端实例不是线程安全的,需要每个线程一个jedis实例,所以一般通过连接池来使用jedis
  2. lettuce是基于netty框架的事件驱动的redis客户端,其方法调用是异步的,lettuce的api也是线程安全的,所以多个线程可以操作单个lettuce连接来完成各种操作,同时lettuce也支持连接池

如果不使用默认的lettuce,使用jedis的话,可以排除lettuce的依赖,手动加入jedis依赖,配置如下

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-data-redis</artifactid>
    <exclusions>
        <exclusion>
            <groupid>io.lettuce</groupid>
            <artifactid>lettuce-core</artifactid>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupid>redis.clients</groupid>
    <artifactid>jedis</artifactid>
    <version>2.9.0</version>
</dependency>

在配置文件application.yml中添加配置(针对单实例)

spring:
  redis:
    host: 192.168.40.201
    port: 6379
    password: passw0rd
    database: 0 # 数据库索引,默认0
    timeout: 5000  # 连接超时,单位ms
    jedis:  # 或lettuce, 连接池配置,springboot2.0中使用jedis或者lettuce配置连接池,默认为lettuce连接池
      pool:
        max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
        max-wait: -1 # 连接池分配连接最大阻塞等待时间(阻塞时间到,抛出异常。使用负值表示无限期阻塞)
        max-idle: 8 # 连接池中的最大空闲连接数
        min-idle: 0 # 连接池中的最小空闲连接数

然后添加配置类。其中@enablecaching注解是为了使@cacheable、@cacheevict、@cacheput、@caching注解生效

@configuration
@enablecaching
public class redisconfig {

    @bean
    public redistemplate<string, object> redistemplate(redisconnectionfactory factory) {
        redistemplate<string, object> template = new redistemplate<>();
        template.setconnectionfactory(factory);

        // 使用jackson2jsonredisserialize 替换默认的jdkserializeable序列化
        jackson2jsonredisserializer jackson2jsonredisserializer = new jackson2jsonredisserializer(object.class);
        objectmapper om = new objectmapper();
        om.setvisibility(propertyaccessor.all, jsonautodetect.visibility.any);
        om.enabledefaulttyping(objectmapper.defaulttyping.non_final);
        jackson2jsonredisserializer.setobjectmapper(om);

        stringredisserializer stringredisserializer = new stringredisserializer();

        // key采用string的序列化方式
        template.setkeyserializer(stringredisserializer);
        // hash的key也采用string的序列化方式
        template.sethashkeyserializer(stringredisserializer);
        // value序列化方式采用jackson
        template.setvalueserializer(jackson2jsonredisserializer);
        // hash的value序列化方式采用jackson
        template.sethashvalueserializer(jackson2jsonredisserializer);
        template.afterpropertiesset();
        return template;
    }
}

上述配置类注入了自定义的redistemplate<string, object>, 替换redisautoconfiguration中自动配置的redistemplate<object, object>类(redisautoconfiguration另外还自动配置了stringredistemplate)。

此时,我们可以通过定义一个基于redistemplate的工具类,或通过在service层添加@cacheable、@cacheevict、@cacheput、@caching注解来使用缓存。比如定义一个redisservice类,封装常用的redis操作方法,

@component
@slf4j
public class redisservice {

    @autowired
    private redistemplate<string, object> redistemplate;

    /**
     * 指定缓存失效时间
     *
     * @param key 键
     * @param time 时间(秒)
     * @return
     */
    public boolean expire(string key, long time) {
        try {
            if (time > 0) {
                redistemplate.expire(key, time, timeunit.seconds);
            }
            return true;
        } catch (exception e) {
            log.error("exception when expire key {}. ", key, e);
            return false;
        }
    }

    /**
     * 根据key获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getexpire(string key) {
        return redistemplate.getexpire(key, timeunit.seconds);
    }

    /**
     * 判断key是否存在
     *
     * @param key  键
     * @return true 存在 false不存在
     */
    public boolean haskey(string key) {
        try {
            return redistemplate.haskey(key);
        } catch (exception e) {
            log.error("exception when check key {}. ", key, e);
            return false;
        }
    }

   ...
}

出于篇幅,完整代码请查阅本文示例源码:

或在service层使用注解,如

@service
@cacheconfig(cachenames = "users")
public class userservice {

    private static map<string, user> usermap = new hashmap<>();

    @cacheput(key = "#user.username")
    public user adduser(user user){
        user.setuid(uuid.randomuuid().tostring());
        system.out.println("add user: " + user);
        usermap.put(user.getusername(), user);
        return user;
    }

    @caching(put = {
            @cacheput( key = "#user.username"),
            @cacheput( key = "#user.uid")
    })
    public user adduser2(user user) {
        user.setuid(uuid.randomuuid().tostring());
        system.out.println("add user2: " + user);
        usermap.put(user.getusername(), user);
        return user;
    }
    ...
}

spring boot 2 整合redis哨兵模式

spring boot 2 整合redis哨兵模式除了配置稍有差异,其它与整合单实例模式类似,配置示例为

spring:
  redis:
    password: passw0rd
    timeout: 5000
    sentinel:
      master: mymaster
      nodes: 192.168.40.201:26379,192.168.40.201:36379,192.168.40.201:46379 # 哨兵的ip:port列表
    jedis: # 或lettuce
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

完整示例可查阅源码:

上述配置只指定了哨兵节点的地址与master的名称,但redis客户端最终访问操作的是master节点,那么redis客户端是如何获取master节点的地址,并在发生故障转移时,如何自动切换master地址的呢?我们以jedis连接池为例,通过源码来揭开其内部实现的神秘面纱。

在 jedissentinelpool 类的构造函数中,对连接池做了初始化,如下

 public jedissentinelpool(string mastername, set<string> sentinels,
      final genericobjectpoolconfig poolconfig, final int connectiontimeout, final int sotimeout,
      final string password, final int database, final string clientname) {
    this.poolconfig = poolconfig;
    this.connectiontimeout = connectiontimeout;
    this.sotimeout = sotimeout;
    this.password = password;
    this.database = database;
    this.clientname = clientname;

    hostandport master = initsentinels(sentinels, mastername);
    initpool(master);
 }

private hostandport initsentinels(set<string> sentinels, final string mastername) {

    for (string sentinel : sentinels) {
      final hostandport hap = hostandport.parsestring(sentinel);

      log.fine("connecting to sentinel " + hap);

      jedis jedis = null;
      try {
        jedis = new jedis(hap.gethost(), hap.getport());

        list<string> masteraddr = jedis.sentinelgetmasteraddrbyname(mastername);

        // connected to sentinel...
        sentinelavailable = true;

        if (masteraddr == null || masteraddr.size() != 2) {
          log.warning("can not get master addr, master name: " + mastername + ". sentinel: " + hap
              + ".");
          continue;
        }

        master = tohostandport(masteraddr);
        log.fine("found redis master at " + master);
        break;
      } catch (jedisexception e) {
        // resolves #1036, it should handle jedisexception there's another chance
        // of raising jedisdataexception
        log.warning("cannot get master address from sentinel running @ " + hap + ". reason: " + e
            + ". trying next one.");
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
    //省略了非关键代码

    for (string sentinel : sentinels) {
      final hostandport hap = hostandport.parsestring(sentinel);
      masterlistener masterlistener = new masterlistener(mastername, hap.gethost(), hap.getport());
      // whether masterlistener threads are alive or not, process can be stopped
      masterlistener.setdaemon(true);
      masterlisteners.add(masterlistener);
      masterlistener.start();
    }

    return master;
  }

initsentinels 方法中主要干了两件事:

  1. 遍历哨兵节点,通过get-master-addr-by-name命令获取master节点的地址信息,找到了就退出循环。get-master-addr-by-name命令执行结果如下所示
[root@dev-server-1 master-slave]# redis-cli -p 26379
127.0.0.1:26379> sentinel get-master-addr-by-name mymaster
1) "192.168.40.201"
2) "7001"
127.0.0.1:26379>
  1. 对每一个哨兵节点通过一个 masterlistener 进行监听(redis的发布订阅功能),订阅哨兵节点+switch-master频道,当发生故障转移时,客户端能收到哨兵的通知,通过重新初始化连接池,完成主节点的切换。
    masterlistener.run方法中监听哨兵部分代码如下
 j.subscribe(new jedispubsub() {
            @override
            public void onmessage(string channel, string message) {
              log.fine("sentinel " + host + ":" + port + " published: " + message + ".");

              string[] switchmastermsg = message.split(" ");

              if (switchmastermsg.length > 3) {

                if (mastername.equals(switchmastermsg[0])) {
                  initpool(tohostandport(arrays.aslist(switchmastermsg[3], switchmastermsg[4])));
                } else {
                  log.fine("ignoring message on +switch-master for master name "
                      + switchmastermsg[0] + ", our master name is " + mastername);
                }

              } else {
                log.severe("invalid message received on sentinel " + host + ":" + port
                    + " on channel +switch-master: " + message);
              }
            }
          }, "+switch-master");

initpool 方法如下:如果发现新的master节点与当前的master不同,则重新初始化。

private void initpool(hostandport master) {
    if (!master.equals(currenthostmaster)) {
      currenthostmaster = master;
      if (factory == null) {
        factory = new jedisfactory(master.gethost(), master.getport(), connectiontimeout,
            sotimeout, password, database, clientname, false, null, null, null);
        initpool(poolconfig, factory);
      } else {
        factory.sethostandport(currenthostmaster);
        // although we clear the pool, we still have to check the
        // returned object
        // in getresource, this call only clears idle instances, not
        // borrowed instances
        internalpool.clear();
      }

      log.info("created jedispool to master at " + master);
    }
  }

通过以上两步,jedis客户端在只知道哨兵地址的情况下便能获得master节点的地址信息,并且当发生故障转移时能自动切换到新的master节点地址。

spring boot 2 整合redis cluster模式

spring boot 2 整合redis cluster模式除了配置稍有差异,其它与整合单实例模式也类似,配置示例为

spring:
  redis:
    password: passw0rd
    timeout: 5000
    database: 0
    cluster:
      nodes: 192.168.40.201:7100,192.168.40.201:7200,192.168.40.201:7300,192.168.40.201:7400,192.168.40.201:7500,192.168.40.201:7600
      max-redirects: 3  # 重定向的最大次数
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

完整示例可查阅源码:

一文掌握redis的三种集群方案 中已经介绍了cluster模式访问的基本原理,可以通过任意节点跳转到目标节点执行命令,上面配置中 max-redirects 控制在集群中跳转的最大次数。

查看jedisclusterconnection的execute方法,

public object execute(string command, byte[]... args) {

    assert.notnull(command, "command must not be null!");
    assert.notnull(args, "args must not be null!");

    return clustercommandexecutor
            .executecommandonarbitrarynode((jedisclustercommandcallback<object>) client -> jedisclientutils.execute(command,
                    empty_2d_byte_array, args, () -> client))
            .getvalue();
}

集群命令的执行是通过clustercommandexecutor.executecommandonarbitrarynode来实现的,

public <t> noderesult<t> executecommandonarbitrarynode(clustercommandcallback<?, t> cmd) {

    assert.notnull(cmd, "clustercommandcallback must not be null!");
    list<redisclusternode> nodes = new arraylist<>(getclustertopology().getactivenodes());
    return executecommandonsinglenode(cmd, nodes.get(new random().nextint(nodes.size())));
}

private <s, t> noderesult<t> executecommandonsinglenode(clustercommandcallback<s, t> cmd, redisclusternode node,
        int redirectcount) {

    assert.notnull(cmd, "clustercommandcallback must not be null!");
    assert.notnull(node, "redisclusternode must not be null!");

    if (redirectcount > maxredirects) {
        throw new toomanyclusterredirectionsexception(string.format(
                "cannot follow cluster redirects over more than %s legs. please consider increasing the number of redirects to follow. current value is: %s.",
                redirectcount, maxredirects));
    }

    redisclusternode nodetouse = lookupnode(node);

    s client = this.resourceprovider.getresourceforspecificnode(nodetouse);
    assert.notnull(client, "could not acquire resource for node. is your cluster info up to date?");

    try {
        return new noderesult<>(node, cmd.doincluster(client));
    } catch (runtimeexception ex) {

        runtimeexception translatedexception = converttodataaccessexception(ex);
        if (translatedexception instanceof clusterredirectexception) {
            clusterredirectexception cre = (clusterredirectexception) translatedexception;
            return executecommandonsinglenode(cmd,
                    topologyprovider.gettopology().lookup(cre.gettargethost(), cre.gettargetport()), redirectcount + 1);
        } else {
            throw translatedexception != null ? translatedexception : ex;
        }
    } finally {
        this.resourceprovider.returnresourceforspecificnode(nodetouse, client);
    }
}

上述代码逻辑如下

  1. 从集群节点列表中随机选择一个节点
  2. 从该节点获取一个客户端连接(如果配置了连接池,从连接池中获取),执行命令
  3. 如果抛出clusterredirectexception异常,则跳转到返回的目标节点上执行
  4. 如果跳转次数大于配置的值 max-redirects, 则抛出toomanyclusterredirectionsexception异常

可能遇到的问题

  1. redis连接超时

检查服务是否正常启动(比如 ps -ef|grep redis查看进程,netstat -ano|grep 6379查看端口是否起来,以及日志文件),如果正常启动,则查看redis服务器是否开启防火墙,关闭防火墙或配置通行端口。

  1. cluster模式下,报连接到127.0.0.1被拒绝错误,如 connection refused: no further information: /127.0.0.1:7600

这是因为在redis.conf中配置 bind 0.0.0.0bind 127.0.0.1导致,需要改为具体在外部可访问的ip,如 bind 192.168.40.201。如果之前已经起了集群,并产生了数据,则修改redis.conf文件后,还需要修改cluster-config-file文件,将127.0.0.1替换为bind 的具体ip,然后重启。

  1. master挂了,slave升级成为master,重启master,不能正常同步新的master数据

如果设置了密码,需要在master, slave的配置文件中都配置masterauth password

相关阅读:

  1. redis的持久化方案
  2. 一文掌握redis的三种集群方案

作者:空山新雨,一枚仍在学习路上的it老兵
近期作者写了几十篇技术博客,内容包括java、spring boot、spring cloud、docker,技术管理心得等
欢迎关注作者微信公众号:空山新雨的技术空间,一起学习成长

Spring Boot(十三):整合Redis哨兵,集群模式实践