当前位置: 首页 > Spring Boot, 缓存 > 正文

基于Spring 的 Redis Sentinel 读写分离 Slave 连接池

目 录
 [ 隐藏 ]

0. 背景

Reids除了配置集群实现高可用之外,对于单机版的Redis,可以通过Master-Slave架构,配合使用Sentinel机制实现高可用架构,
同时客户端可以实现自动失效转移。

类似于JdbcTemplate,Spring中使用RedisTemplate来操作Redis。Spring Boot中只需引入如下Maven依赖,即可自动配置
一个RedisTemplate实例。

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
  <groupid>redis.clients</groupid>
  <artifactid>jedis</artifactid>
  <version>2.9.0</version>
</dependency>

RedisTemplate需要一个RedisConnectionFactory来管理Redis连接。 可以在项目中定义一个RedisSentinelConfiguration给
RedisConnectionFactory,即可生成一个基于Sentinel的连接池,并且实现了自动失效转移:当master失效时,Sentinel自动提升一个slave
成为master保证Redis的master连接高可用。

下面是基于Sentinel的RedisConnectionFactory的典型配置

@Value("${spring.redis.password}")
    private String redisPasswd;

@Bean
public RedisConnectionFactory jedisConnectionFactory() {
    RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
            .master("mymaster") 
            .sentinel("192.168.0.1", 26479)
            .sentinel("192.168.0.2", 26479)
            .sentinel("192.168.0.3", 26479); 
    sentinelConfig.setPassword(RedisPassword.of(redisPasswd)); 
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(sentinelConfig);
    System.out.println(jedisConnectionFactory.getClientConfiguration().getClientName());
    return jedisConnectionFactory;
}

查看 org.springframework.data.redis.connection.jedis.JedisConnectionFactory源码发现,
当配置了RedisSentinelConfiguration后,RedisConnectionFactory会返回一个JedisSentinelPool连接池。该连接池里面所有的连接
都是连接到Master上面的。 同时,在JedisSentinelPool中为每一个Sentinel都配置了+switch-master频道的监听。 当监听到+switch-master消息后
表示发生了master切换,有新的Master产生,然后会重新初始化到新Master的连接池。

至此,我们知道基于Sentinel可以创建RedisConnectionFactory,并可实现自动失效转移,
但RedisConnectionFactory只会创建到Master的连接。 一般情况下,如果所有的连接都是连接到Master上面,Slave就完全当成Master的备份了,造成性能浪费。
通常,Slave只是单纯的复制Master的数据,为避免数据不一致,不应该往Slave写数据,可以在Redis配置文件中配置slave-read-only yes,让Slave拒绝所有的写操作。
于是,对于一个基于Sentinel的Master-Slave Redis 服务器来说,可以将Master配置为可读写服务器,将所有Slave配置为只读服务器来实现读写分离,以充分利用服务器资源,
并提高整个Redis系统的性能。

1. 提出问题

JedisSentinelPool连接池中的连接都是到Master的连接,那么如何获取到Slave的连接池呢? 分析了spring-boot-starter-data-redis和jedis之后,发现,
并没有现成的Slave连接池可以拿来用,于是决定写一个。

2. 分析问题

通过RedisSentinelConfiguration,可以拿到sentinel的IP和端口,就可以连接到sentinel,再调用sentinel slaves mymaster命令,就可以拿到slave的IP和port。
然后就可以创建到slave的连接了。

继续查看JedisFactory源码,了解到其实现了PooledObjectFactory接口,该接口来自org.apache.commons.pool2,由此可见,Jedis连接池是借助Apache
commons.pool2来实现的。

"" 由图看到,JedisConnectionFactory创建一个JedisSentinelPool,JedisSentinelPool创建JedisFactory,JedisFactory实现了PooledObjectFactory接口
,在MakeObject()方法中产生新的Redis连接。 在JedisSentinelPool中定义MasterListener还订阅+switch-master频道,一旦发生Master转移事件,自动作失效转移
重新初始化master连接池。

3. 解决问题

模仿JedisConnectionFactory,JedisSentinelPool,和JedisFactory, 创建JedisSentinelSlaveConnectionFactory,JedisSentinelSlavePool和JedisSentinelSlaveFactory
它们之间的关系,如图UML-2所示。

""

其中,JedisSentinelSlaveConnectionFactory就是可以传递给RedisTemplate的。JedisSentinelSlaveConnectionFactory继承自JedisConnectionFactory
并且覆盖了createRedisSentinelPool方法,在JedisConnectionFactory中,该方法会返回一个JedisSentinelPool,而新的方法会返回JedisSentinelSlavePool。
JedisSentinelSlavePool和JedisSentinelPool都是继承自Pool的。 JedisSentinelSlavePool会生成JedisSentinelSlaveFactory,
JedisSentinelSlaveFactory实现了PooledObjectFactory接口,在public PooledObject makeObject()方法中,通过sentinel连接,
调用sentinel slaves命令,获取所有可用的slave的ip和port,然后随机的创建一个slave连接并返回。

JedisSentinelSlaveConnectionFactory的createRedisSentinelPool方法

@Override 
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){ 
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig(); 
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()), 
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName()); 
    } 
} 

1) 通过配置RedisSentinelConfiguration传递sentinel配置和master name给JedisSentinelSlaveConnectionFactory,然后sentinel配置和master name
会传递到JedisSentinelSlavePool和JedisSentinelSlaveFactory中
2)创建 JedisSentinelSlavePool,在JedisSentinelSlavePool中启动监听,监听"+switch-master"频道,一旦新master产生,即初始化连接池
3) 连接池有JedisSentinelSlaveFactory来代理,JedisSentinelSlaveFactory实现了PooledObjectFactory
在makeObject()中首先根据配置的Sentinel Set找到一个可用的sentinel连接,然后执行sentinel slaves master_name获取所有slave列表
随机选择一个slave创建连接。 如果连接不成功则重试,最大重试5次,依然不能成功创建连接则抛出异常。
4) 由图uml-2可知,JedisConnectionFactory实现了InitializingBean,Spring会在Bean初始化之后,调用接口方法void afterPropertiesSet() throws Exception;
在这个方法中创建连接池
5) JedisConnectionFactory实现了DisposableBean,会在Spring 容器销毁时,调用public void destroy() 方法销毁连接池

4 实战

4.1 redis-sentinel-slave-connection-factory 工程结构

1) pom文件
<?xml version="1.0" encoding="UTF-8"?> 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
  <modelVersion>4.0.0</modelVersion> 

  <groupId>com.jack.yin</groupId> 
  <artifactId>redis-sentinel-slave-connection-factory</artifactId> 
  <version>1.0-SNAPSHOT</version> 

  <name>spring-boot-starter-redis-readonly-connection-factory</name> 
  <!-- FIXME change it to the project's website --> 
  <url>http://www.example.com</url> 

  <properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <maven.compiler.source>1.8</maven.compiler.source> 
    <maven.compiler.target>1.8</maven.compiler.target> 
  </properties> 

  <dependencies> 
    <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-data-redis</artifactId> 
      <version>2.0.1.RELEASE</version> 
    </dependency> 

    <dependency> 
      <groupId>redis.clients</groupId> 
      <artifactId>jedis</artifactId> 
      <version>2.9.0</version> 
    </dependency> 

    <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.12</version> 
      <scope>test</scope> 
    </dependency> 
  </dependencies> 

  <build> 
    <pluginManagement> 
      <plugins> 
        <plugin> 
          <artifactId>maven-clean-plugin</artifactId> 
          <version>3.0.0</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-resources-plugin</artifactId> 
          <version>3.0.2</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-compiler-plugin</artifactId> 
          <version>3.7.0</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-surefire-plugin</artifactId> 
          <version>2.20.1</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-jar-plugin</artifactId> 
          <version>3.0.2</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-install-plugin</artifactId> 
          <version>2.5.2</version> 
        </plugin> 
        <plugin> 
          <artifactId>maven-deploy-plugin</artifactId> 
          <version>2.8.2</version> 
        </plugin> 
      </plugins> 
    </pluginManagement> 
  </build> 
</project> 
2) JedisSentinelSlaveFactory.java
package redis.clients.jedis; 

import org.apache.commons.pool2.PooledObject; 
import org.apache.commons.pool2.PooledObjectFactory; 
import org.apache.commons.pool2.impl.DefaultPooledObject; 
import redis.clients.jedis.exceptions.InvalidURIException; 
import redis.clients.jedis.exceptions.JedisException; 
import redis.clients.util.JedisURIHelper; 

import javax.net.ssl.HostnameVerifier; 
import javax.net.ssl.SSLParameters; 
import javax.net.ssl.SSLSocketFactory; 
import java.net.URI; 
import java.security.SecureRandom; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.atomic.AtomicReference; 

public class JedisSentinelSlaveFactory implements PooledObjectFactory<Jedis> { 
    private final  String masterName; 
    private final int retryTimeWhenRetrieveSlave = 5; 

    private final AtomicReference<HostAndPort> hostAndPortOfASentinel = new AtomicReference<HostAndPort>(); 
    private final int connectionTimeout; 
    private final int soTimeout; 
    private final String password; 
    private final int database; 
    private final String clientName; 
    private final boolean ssl; 
    private final SSLSocketFactory sslSocketFactory; 
    private SSLParameters sslParameters; 
    private HostnameVerifier hostnameVerifier; 

    public JedisSentinelSlaveFactory(final String host, final int port, final int connectionTimeout, 
                        final int soTimeout, final String password, final int database, final String clientName, 
                        final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, 
                        final HostnameVerifier hostnameVerifier,String masterName) { 
        this.hostAndPortOfASentinel.set(new HostAndPort(host, port)); 
        this.connectionTimeout = connectionTimeout; 
        this.soTimeout = soTimeout; 
        this.password = password; 
        this.database = database; 
        this.clientName = clientName; 
        this.ssl = ssl; 
        this.sslSocketFactory = sslSocketFactory; 
        this.sslParameters = sslParameters; 
        this.hostnameVerifier = hostnameVerifier; 
        this.masterName = masterName; 
    } 

    public JedisSentinelSlaveFactory(final URI uri, final int connectionTimeout, final int soTimeout, 
                        final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, 
                        final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier,String masterName) { 
        if (!JedisURIHelper.isValid(uri)) { 
            throw new InvalidURIException(String.format( 
                "Cannot open Redis connection due invalid URI. %s", uri.toString())); 
        } 

        this.hostAndPortOfASentinel.set(new HostAndPort(uri.getHost(), uri.getPort())); 
        this.connectionTimeout = connectionTimeout; 
        this.soTimeout = soTimeout; 
        this.password = JedisURIHelper.getPassword(uri); 
        this.database = JedisURIHelper.getDBIndex(uri); 
        this.clientName = clientName; 
        this.ssl = ssl; 
        this.sslSocketFactory = sslSocketFactory; 
        this.sslParameters = sslParameters; 
        this.hostnameVerifier = hostnameVerifier; 
        this.masterName = masterName; 
    } 

    public void setHostAndPortOfASentinel(final HostAndPort hostAndPortOfASentinel) { 
        this.hostAndPortOfASentinel.set(hostAndPortOfASentinel); 
    } 

    @Override 
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception { 
        final BinaryJedis jedis = pooledJedis.getObject(); 
        if (jedis.getDB() != database) { 
            jedis.select(database); 
        } 

    } 

    @Override 
    public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception { 
        final BinaryJedis jedis = pooledJedis.getObject(); 
        if (jedis.isConnected()) { 
            try { 
                try { 
                    jedis.quit(); 
                } catch (Exception e) { 
                } 
                jedis.disconnect(); 
            } catch (Exception e) { 

            } 
        } 

    } 

    @Override 
    public PooledObject<Jedis> makeObject() throws Exception { 
        final Jedis jedisSentinel = getASentinel(); 

        List<Map<String,String>> slaves = jedisSentinel.sentinelSlaves(this.masterName); 
        if(slaves == null || slaves.isEmpty()) { 
            throw new JedisException(String.format("No valid slave for master: %s",this.masterName)); 
        } 

        DefaultPooledObject<Jedis> result = tryToGetSlave(slaves); 

        if(null != result) { 
            return result; 
        } else { 
            throw new JedisException(String.format("No valid slave for master: %s, after try %d times.", 
                this.masterName,retryTimeWhenRetrieveSlave)); 
        } 

    } 

    private DefaultPooledObject<Jedis> tryToGetSlave(List<Map<String,String>> slaves) { 
        SecureRandom sr = new SecureRandom(); 
        int retry = retryTimeWhenRetrieveSlave; 
        while(retry >= 0) { 
            retry--; 
            int randomIndex = sr.nextInt(slaves.size()); 
            String host = slaves.get(randomIndex).get("ip"); 
            String port = slaves.get(randomIndex).get("port"); 
            final Jedis jedisSlave = new Jedis(host,Integer.valueOf(port), connectionTimeout,soTimeout, 
                ssl, sslSocketFactory,sslParameters, hostnameVerifier); 
            try { 
                jedisSlave.connect(); 
                if (null != this.password) { 
                    jedisSlave.auth(this.password); 
                } 
                if (database != 0) { 
                    jedisSlave.select(database); 
                } 
                if (clientName != null) { 
                    jedisSlave.clientSetname(clientName); 
                } 
                return  new DefaultPooledObject<Jedis>(jedisSlave); 

            } catch (Exception e) { 
                jedisSlave.close(); 
                slaves.remove(randomIndex); 
                continue; 
            } 
        } 

        return null; 
    } 

    private Jedis getASentinel() { 
        final HostAndPort hostAndPort = this.hostAndPortOfASentinel.get(); 
        final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, 
            soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); 

        try { 
            jedis.connect(); 
        } catch (JedisException je) { 
            jedis.close(); 
            throw je; 
        } 
        return jedis; 
    } 

    @Override 
    public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception { 
        // TODO maybe should select db 0? Not sure right now. 
    } 

    @Override 
    public boolean validateObject(PooledObject<Jedis> pooledJedis) { 
        final BinaryJedis jedis = pooledJedis.getObject(); 
        try { 
            HostAndPort hostAndPort = this.hostAndPortOfASentinel.get(); 

            String connectionHost = jedis.getClient().getHost(); 
            int connectionPort = jedis.getClient().getPort(); 

            return hostAndPort.getHost().equals(connectionHost) 
                && hostAndPort.getPort() == connectionPort && jedis.isConnected() 
                && jedis.ping().equals("PONG"); 
        } catch (final Exception e) { 
            return false; 
        } 
    } 
}
3) JedisSentinelSlavePool.java
package redis.clients.jedis; 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import redis.clients.jedis.exceptions.JedisConnectionException; 
import redis.clients.jedis.exceptions.JedisException; 
import redis.clients.util.Pool; 

import java.security.InvalidParameterException; 
import java.util.Arrays; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Set; 
import java.util.concurrent.atomic.AtomicBoolean; 

public class JedisSentinelSlavePool extends Pool<Jedis> { 
    private final  String masterName; 

    protected GenericObjectPoolConfig poolConfig; 

    protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT; 
    protected int soTimeout = Protocol.DEFAULT_TIMEOUT; 

    protected String password; 

    protected int database = Protocol.DEFAULT_DATABASE; 

    protected String clientName; 

    protected final Set<JedisSentinelSlavePool.MasterListener> masterListeners = new HashSet<JedisSentinelSlavePool.MasterListener>(); 

    protected Logger logger = LoggerFactory.getLogger(JedisSentinelSlavePool.class.getName()); 

    private volatile JedisSentinelSlaveFactory factory; 
    private volatile HostAndPort currentSentinel; 

    private Set<String> sentinels; 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig) { 
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, 
            Protocol.DEFAULT_DATABASE); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels) { 
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, 
            Protocol.DEFAULT_DATABASE); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, String password) { 
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password) { 
        this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, final int timeout) { 
        this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, final String password) { 
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password, 
                             final int database) { 
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password, 
                             final int database, final String clientName) { 
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout, 
                             final String password, final int database) { 
        this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null); 
    } 

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, 
                             final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, 
                             final String password, final int database, final String clientName) { 
        this.poolConfig = poolConfig; 
        this.connectionTimeout = connectionTimeout; 
        this.soTimeout = soTimeout; 
        this.password = password; 
        this.database = database; 
        this.clientName = clientName; 
        this.masterName = masterName; 
        this.sentinels = sentinels; 

        HostAndPort aSentinel = initsentinels(this.sentinels, masterName); 
        initPool(aSentinel); 
    } 

    public void destroy() { 
        for (JedisSentinelSlavePool.MasterListener m : masterListeners) { 
            m.shutdown(); 
        } 

        super.destroy(); 
    } 

    public HostAndPort getCurrentSentinel() { 
        return currentSentinel; 
    } 

    private void initPool(HostAndPort sentinel) { 
        if (!sentinel.equals(currentSentinel)) { 
            currentSentinel = sentinel; 
            if (factory == null) { 
                factory = new JedisSentinelSlaveFactory(sentinel.getHost(), sentinel.getPort(), connectionTimeout, 
                    soTimeout, password, database, clientName, false, null, null, null,masterName); 
                initPool(poolConfig, factory); 
            } else { 
                factory.setHostAndPortOfASentinel(currentSentinel); 
                // although we clear the pool, we still have to check the 
                // returned object 
                // in getResource, this call only clears idle instances, not 
                // borrowed instances 
                internalPool.clear(); 
            } 

            logger.info("Created JedisPool to sentinel at " + sentinel); 
        } 
    } 

    private HostAndPort initsentinels(Set<String> sentinels, final String masterName) { 

        HostAndPort aSentinel = null; 
        boolean sentinelAvailable = false; 

        logger.info("Trying to find a valid sentinel from available Sentinels..."); 

        for (String sentinelStr : sentinels) { 
            final HostAndPort hap = HostAndPort.parseString(sentinelStr); 

            logger.info("Connecting to Sentinel " + hap); 

            Jedis jedis = null; 
            try { 
                jedis = new Jedis(hap.getHost(), hap.getPort()); 
                sentinelAvailable = true; 

                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); 
                if (masterAddr == null || masterAddr.size() != 2) { 
                    logger.warn("Can not get master addr from sentinel, master name: " + masterName 
                        + ". Sentinel: " + hap + "."); 
                    continue; 
                } 

                aSentinel = hap; 
                logger.info("Found a Redis Sentinel at " + aSentinel); 
                break; 
            } catch (JedisException e) { 
                logger.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e 
                    + ". Trying next one."); 
            } finally { 
                if (jedis != null) { 
                    jedis.close(); 
                } 
            } 
        } 

        if (aSentinel == null) { 
            if (sentinelAvailable) { 
                // can connect to sentinel, but master name seems to not monitored 
                throw new JedisException("Can connect to sentinel, but " + masterName 
                    + " seems to be not monitored..."); 
            } else { 
                throw new JedisConnectionException("All sentinels down, cannot determine where is " 
                    + masterName + " master is running..."); 
            } 
        } 

        logger.info("Found Redis sentinel running at " + aSentinel + ", starting Sentinel listeners..."); 

        for (String sentinel : sentinels) { 
            final HostAndPort hap = HostAndPort.parseString(sentinel); 
            JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort()); 
            // whether MasterListener threads are alive or not, process can be stopped 
            masterListener.setDaemon(true); 
            masterListeners.add(masterListener); 
            masterListener.start(); 
        } 

        return aSentinel; 
    } 

    /** 
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be 
     *             done using @see {@link redis.clients.jedis.Jedis#close()} 
     */ 
    @Override 
    @Deprecated 
    public void returnBrokenResource(final Jedis resource) { 
        if (resource != null) { 
            returnBrokenResourceObject(resource); 
        } 
    } 

    /** 
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be 
     *             done using @see {@link redis.clients.jedis.Jedis#close()} 
     */ 
    @Override 
    @Deprecated 
    public void returnResource(final Jedis resource) { 
        if (resource != null) { 
            resource.resetState(); 
            returnResourceObject(resource); 
        } 
    } 

    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) { 
        String host = getMasterAddrByNameResult.get(0); 
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1)); 

        return new HostAndPort(host, port); 
    } 

    protected class MasterListener extends Thread { 

        protected String masterName; 
        protected String host; 
        protected int port; 
        protected long subscribeRetryWaitTimeMillis = 5000; 
        protected volatile Jedis j; 
        protected AtomicBoolean running = new AtomicBoolean(false); 

        protected MasterListener() { 
        } 

        public MasterListener(String masterName, String host, int port) { 
            super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); 
            this.masterName = masterName; 
            this.host = host; 
            this.port = port; 
        } 

        public MasterListener(String masterName, String host, int port, 
                              long subscribeRetryWaitTimeMillis) { 
            this(masterName, host, port); 
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; 
        } 

        @Override 
        public void run() { 

            running.set(true); 

            while (running.get()) { 

                j = new Jedis(host, port); 

                try { 
                    // double check that it is not being shutdown 
                    if (!running.get()) { 
                        break; 
                    } 

                    j.subscribe(new SentinelSlaveChangePubSub(), "+switch-master","+slave","+sdown","+odown","+reboot"); 

                } catch (JedisConnectionException e) { 

                    if (running.get()) { 
                        logger.error("Lost connection to Sentinel at " + host + ":" + port 
                            + ". Sleeping 5000ms and retrying.", e); 
                        try { 
                            Thread.sleep(subscribeRetryWaitTimeMillis); 
                        } catch (InterruptedException e1) { 
                            logger.info( "Sleep interrupted: ", e1); 
                        } 
                    } else { 
                        logger.info("Unsubscribing from Sentinel at " + host + ":" + port); 
                    } 
                } finally { 
                    j.close(); 
                } 
            } 
        } 

        public void shutdown() { 
            try { 
                logger.info("Shutting down listener on " + host + ":" + port); 
                running.set(false); 
                // This isn't good, the Jedis object is not thread safe 
                if (j != null) { 
                    j.disconnect(); 
                } 
            } catch (Exception e) { 
                logger.error("Caught exception while shutting down: ", e); 
            } 
        } 

        private class SentinelSlaveChangePubSub extends JedisPubSub { 
            @Override 
            public void onMessage(String channel, String message) { 
                if(masterName==null) { 
                    logger.error("Master Name is null!"); 
                    throw new InvalidParameterException("Master Name is null!"); 
                } 
                logger.info("Get message on chanel: "  + channel + " published: " + message + "." +   " current sentinel " + host + ":" + port ); 

                String[] msg = message.split(" "); 
                List<String> msgList = Arrays.asList(msg); 
                if(msgList.isEmpty()) {return;} 
                boolean needResetPool = false; 
                if( masterName.equalsIgnoreCase(msgList.get(0))) { //message from channel +switch-master 
                    //message looks like [+switch-master mymaster 192.168.0.2 6479 192.168.0.1 6479] 
                    needResetPool = true; 
                } 
                int tmpIndex = msgList.indexOf("@") + 1; 
                //message looks like  [+reboot slave 192.168.0.3:6479 192.168.0.3 6479 @ mymaster 192.168.0.1 6479] 
                if(tmpIndex >0 && masterName.equalsIgnoreCase(msgList.get(tmpIndex)) ) { //message from other channels 
                    needResetPool = true; 
                } 
                if(needResetPool) { 
                    HostAndPort aSentinel = initsentinels(sentinels, masterName); 
                    initPool(aSentinel); 
                } else { 
                    logger.info("message is not for master " + masterName); 
                } 

            } 
        } 
    } 
}
4) JedisSentinelSlaveConnectionFactory.java
package redis.clients.jedis; 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig; 
import org.springframework.data.redis.connection.RedisNode; 
import org.springframework.data.redis.connection.RedisSentinelConfiguration; 
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; 
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; 
import org.springframework.lang.Nullable; 
import org.springframework.util.CollectionUtils; 
import redis.clients.util.Pool; 

import javax.net.ssl.HostnameVerifier; 
import javax.net.ssl.SSLParameters; 
import javax.net.ssl.SSLSocketFactory; 
import java.time.Duration; 
import java.util.*; 

public class JedisSentinelSlaveConnectionFactory extends JedisConnectionFactory { 
    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig) { 
        super(sentinelConfig); 
    } 

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisClientConfiguration clientConfig){ 
        super(sentinelConfig,clientConfig); 
    } 

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisPoolConfig poolConfig) { 
        super(sentinelConfig,poolConfig); 
    } 

    @Override 
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){ 
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig(); 
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()), 
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName()); 
    } 

    private int getConnectTimeout() { 
        return Math.toIntExact(getClientConfiguration().getConnectTimeout().toMillis()); 
    } 

    private Set<String> convertToJedisSentinelSet(Collection<RedisNode> nodes) { 

        if (CollectionUtils.isEmpty(nodes)) { 
            return Collections.emptySet(); 
        } 

        Set<String> convertedNodes = new LinkedHashSet<>(nodes.size()); 
        for (RedisNode node : nodes) { 
            if (node != null) { 
                convertedNodes.add(node.asString()); 
            } 
        } 
        return convertedNodes; 
    } 

    private int getReadTimeout() { 
        return Math.toIntExact(getClientConfiguration().getReadTimeout().toMillis()); 
    } 

    static class MutableJedisClientConfiguration implements JedisClientConfiguration { 

        private boolean useSsl; 
        private @Nullable 
        SSLSocketFactory sslSocketFactory; 
        private @Nullable 
        SSLParameters sslParameters; 
        private @Nullable 
        HostnameVerifier hostnameVerifier; 
        private boolean usePooling = true; 
        private GenericObjectPoolConfig poolConfig = new JedisPoolConfig(); 
        private @Nullable 
        String clientName; 
        private Duration readTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT); 
        private Duration connectTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT); 

        public static JedisClientConfiguration create(JedisShardInfo shardInfo) { 

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration(); 
            configuration.setShardInfo(shardInfo); 
            return configuration; 
        } 

        public static JedisClientConfiguration create(GenericObjectPoolConfig jedisPoolConfig) { 

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration(); 
            configuration.setPoolConfig(jedisPoolConfig); 
            return configuration; 
        } 

        /* (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUseSsl() 
         */ 
        @Override 
        public boolean isUseSsl() { 
            return useSsl; 
        } 

        public void setUseSsl(boolean useSsl) { 
            this.useSsl = useSsl; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslSocketFactory() 
         */ 
        @Override 
        public Optional<SSLSocketFactory> getSslSocketFactory() { 
            return Optional.ofNullable(sslSocketFactory); 
        } 

        public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) { 
            this.sslSocketFactory = sslSocketFactory; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters() 
         */ 
        @Override 
        public Optional<SSLParameters> getSslParameters() { 
            return Optional.ofNullable(sslParameters); 
        } 

        public void setSslParameters(SSLParameters sslParameters) { 
            this.sslParameters = sslParameters; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getHostnameVerifier() 
         */ 
        @Override 
        public Optional<HostnameVerifier> getHostnameVerifier() { 
            return Optional.ofNullable(hostnameVerifier); 
        } 

        public void setHostnameVerifier(HostnameVerifier hostnameVerifier) { 
            this.hostnameVerifier = hostnameVerifier; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUsePooling() 
         */ 
        @Override 
        public boolean isUsePooling() { 
            return usePooling; 
        } 

        public void setUsePooling(boolean usePooling) { 
            this.usePooling = usePooling; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getPoolConfig() 
         */ 
        @Override 
        public Optional<GenericObjectPoolConfig> getPoolConfig() { 
            return Optional.ofNullable(poolConfig); 
        } 

        public void setPoolConfig(GenericObjectPoolConfig poolConfig) { 
            this.poolConfig = poolConfig; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getClientName() 
         */ 
        @Override 
        public Optional<String> getClientName() { 
            return Optional.ofNullable(clientName); 
        } 

        public void setClientName(String clientName) { 
            this.clientName = clientName; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getReadTimeout() 
         */ 
        @Override 
        public Duration getReadTimeout() { 
            return readTimeout; 
        } 

        public void setReadTimeout(Duration readTimeout) { 
            this.readTimeout = readTimeout; 
        } 

        /* 
         * (non-Javadoc) 
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getConnectTimeout() 
         */ 
        @Override 
        public Duration getConnectTimeout() { 
            return connectTimeout; 
        } 

        public void setConnectTimeout(Duration connectTimeout) { 
            this.connectTimeout = connectTimeout; 
        } 

        public void setShardInfo(JedisShardInfo shardInfo) { 

            setSslSocketFactory(shardInfo.getSslSocketFactory()); 
            setSslParameters(shardInfo.getSslParameters()); 
            setHostnameVerifier(shardInfo.getHostnameVerifier()); 
            setUseSsl(shardInfo.getSsl()); 
            setConnectTimeout(Duration.ofMillis(shardInfo.getConnectionTimeout())); 
            setReadTimeout(Duration.ofMillis(shardInfo.getSoTimeout())); 
        } 
    } 
}

4.2 测试

在应用中,只需配置如下的JedisSentinelSlaveConnectionFactory,Spring Boot会自动配置一个
RedisTemplate redisTemplate和StringRedisTemplate stringRedisTemplate;
在代码中使用@Autowired注入即可。

@Bean 
    public RedisConnectionFactory jedisConnectionFactory() { 
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration() 
                .master("mymaster") 
                .sentinel("192.168.0.1", 26479) 
                .sentinel("192.168.0.2", 26479) 
                .sentinel("192.168.0.3", 26479); 
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd)); 
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder() 
            .clientName("MyRedisClient") 
            .build(); 
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory; 
    } 
1) pom.xml
<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 

<groupId>com.example</groupId> 
<artifactId>demo</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<packaging>jar</packaging> 

<name>demo</name> 
<description>Demo project for Spring Boot</description> 

<parent> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-parent</artifactId> 
<version>2.0.1.RELEASE</version> 
<relativePath/> <!-- lookup parent from repository --> 
</parent> 

<properties> 
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
<java.version>1.8</java.version> 
</properties> 

<dependencies> 
<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-data-redis</artifactId> 
</dependency> 

<dependency> 
<groupId>redis.clients</groupId> 
<artifactId>jedis</artifactId> 
<version>2.9.0</version> 
</dependency> 

<dependency> 
<groupId>com.jack.yin</groupId> 
<artifactId>redis-sentinel-slave-connection-factory</artifactId> 
<version>1.0-SNAPSHOT</version> 
</dependency> 

<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-test</artifactId> 
<scope>test</scope> 
</dependency> 
</dependencies> 

<build> 
<plugins> 
<plugin> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-maven-plugin</artifactId> 
</plugin> 
</plugins> 
</build> 

</project> 
2) RedisConfiguration.java
package com.jack.yin.redis.configuration; 

import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.data.redis.connection.RedisConnectionFactory; 
import org.springframework.data.redis.connection.RedisPassword; 
import org.springframework.data.redis.connection.RedisSentinelConfiguration; 
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; 
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; 
import redis.clients.jedis.JedisPoolConfig; 
import redis.clients.jedis.JedisSentinelSlaveConnectionFactory; 

@Configuration 
public class RedisConfiguration { 
    @Value("${spring.redis.password}") 
    private String redisPasswd; 

    @Bean 
    public RedisConnectionFactory jedisConnectionFactory() { 
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration() 
                .master("mymaster") 
                .sentinel("192.168.0.1", 26479) 
                .sentinel("192.168.0.2", 26479) 
                .sentinel("192.168.0.3", 26479); 
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd)); 
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder() 
            .clientName("MyRedisClient") 
            .build(); 
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory; 
    } 

} 
3) RedisDemoApplication.java
package com.jack.yin.redis.demo; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 

@SpringBootApplication(scanBasePackages = "com.jack.yin.redis") 
public class RedisDemoApplication { 

  public static void main(String[] args) { 
    SpringApplication.run(RedisDemoApplication.class, args); 
  } 
} 
4) DemoApplicationTests.java
package com.jack.yin.redis.demo; 

import org.junit.Test; 
import org.junit.runner.RunWith; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.test.context.SpringBootTest; 
import org.springframework.data.redis.core.RedisTemplate; 
import org.springframework.data.redis.core.StringRedisTemplate; 
import org.springframework.test.context.junit4.SpringRunner; 

import java.util.Enumeration; 
import java.util.logging.ConsoleHandler; 
import java.util.logging.Level; 
import java.util.logging.LogManager; 
import java.util.logging.Logger; 

@RunWith(SpringRunner.class) 
@SpringBootTest(classes=RedisDemoApplication.class) 
public class DemoApplicationTests { 

@Autowired 
private RedisTemplate<String,String> redisTemplate; 
@Autowired 
private StringRedisTemplate stringRedisTemplate; 

    protected Logger log = Logger.getLogger(getClass().getName()); 

    @Test 
    public void testGetAndSet() throws  Exception{ 
    System.out.println(redisTemplate.opsForValue().get("hello")); 
    redisTemplate.opsForValue().set("set-key","don't allowed to set"); 
    //org.springframework.dao.InvalidDataAccessApiUsageException: READONLY You can't write against a read only slave.; 
    System.out.println(redisTemplate.opsForValue().get("sss")); 
    System.out.println(redisTemplate.opsForValue().get("bbb")); 
  } 

} 

5. 总结

优点:
连接池中的连接是随机建立的到所有slave的连接
当监测到master失效转移会自动初始化连接池,确保不会连接到master上
新增slave时可以自动被发现
slave下线会被自动侦测到,然后重新初始化连接池,确保不会连接到已经下线的slave
缺点:
reids slave 需要设置slave-read-only yes
slave同步master数据需要时间,在一个短暂时间内master和slave数据会不一致

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/spring-boot/2683.html | 边城网事

该日志由 边城网事 于2018年05月29日发表在 Spring Boot, 缓存 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 基于Spring 的 Redis Sentinel 读写分离 Slave 连接池 | 边城网事

基于Spring 的 Redis Sentinel 读写分离 Slave 连接池:目前有12 条留言

  1. 0楼
    thomas:

    方法 validateObject() 需要重新改写,验证连接,不能用从节点的host、port与哨兵节点的host、port作比较

    2020-03-30 15:14 [回复]
    • 边城网事:

      感谢指出问题.
      感觉这篇文章有不少人看了,更新了下格式,方便阅读.

      2020-05-27 14:48 [回复]
  2. 0楼
    thomas:

    SentinelSlaveChangePubSub中,当master节点挂掉,主从切换,needResetPool为true时,调用方法initsentinels(),因为旧的监听器没有清空,会导致masterListeners数量一直增加

    2020-03-27 16:48 [回复]
    • thomas:

      上述问题解决:可以加一个判断,在初始化保证MasterListener的创建,后续调用initSentinels()时,不在创建listener;
      新问题2:从slave请求数据,当请求次数达到最大连接数时,无响应;可能存在连接使用后未归还or连接未能重复利用

      2020-03-27 17:13 [回复]
      • thomas:

        针对问题2,可在类JedisSentinelSlavePool中,重写方法getResource():

        @Override
        public Jedis getResource() {
        while (true) {
        Jedis jedis = super.getResource();
        jedis.setDataSource(this);

        // get a reference because it can change concurrently
        final HostAndPort sentinel = currentSentinel;
        Jedis jedisSentinel = new Jedis(sentinel.getHost(), sentinel.getPort());
        List<Map> slaves = jedisSentinel.sentinelSlaves(this.masterName);
        logger.debug(“sentinel 节点中,从节点状态:{}”, JSON.toJSONString(slaves));
        final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient()
        .getPort());

        //从节点只读连接池中获取的连接是否有效
        //如果从节点已不存在
        if(slaves.isEmpty()){
        logger.debug(“从节点为空,slaves is empty,当前连接需回收!host:{},port:{}”,connection.getHost(),connection.getPort());
        returnBrokenResource(jedis);
        }
        //是否在当前从节点列表中
        boolean isExist = false;
        for(Map slaveInfo:slaves){
        if(slaveInfo.get(“ip”).equals(connection.getHost())
        && slaveInfo.get(“port”).equals(String.valueOf(connection.getPort()))){
        isExist = true;
        if(slaveInfo.get(“flags”).contains(“s_down”)
        || slaveInfo.get(“master-link-status”).contains(“err”)){
        logger.debug(“从节点已不存在,当前连接需回收!host:{},port:{}”,connection.getHost(),connection.getPort());
        returnBrokenResource(jedis);
        break;
        }
        logger.debug(“获取slave连接,host:{},port:{}”,connection.getHost(),connection.getPort());
        return jedis;
        }
        }

        if(!isExist){
        returnBrokenResource(jedis);
        }
        }
        }

        2020-03-27 19:10 [回复]
      • thomas:

        1.因为sentinel的心跳频率为1秒,所以在这1秒内获取的只读连接可能为无效,可在读异常时切换到master连接;
        2.哨兵模式下,redis slave 应该不需要设置 slave-read-only yes ; 自测发现,即使不设置也能保证只读模式;

        2020-03-27 19:19 [回复]
  3. 0楼
    路过贵宝地:

    有个问题请教下,redis slaves设置slave-read-only yes 后当主从切换时候,备机变成主机会影响写入么。

    2019-09-30 16:23 [回复]
  4. 0楼
    路过贵宝地:

    有个问题请教下。

    2019-09-30 16:21 [回复]
  5. 0楼
    jikeou:

    JedisSentinelSlavePool 根本就没有去获取slave的信息

    2018-07-20 13:53 [回复]
    • 边城网事:

      连接池有JedisSentinelSlaveFactory来代理,JedisSentinelSlaveFactory实现了PooledObjectFactory
      在makeObject()中首先根据配置的Sentinel Set找到一个可用的sentinel连接,然后执行sentinel slaves master_name获取所有slave列表
      随机选择一个slave创建连接。

      2018-07-21 13:21 [回复]
    • 边城网事:

      看下 JedisSentinelSlaveFactory.makeObject()方法就明白了

      2018-07-21 13:30 [回复]

发表评论

快捷键:Ctrl+Enter