Spring Boot Redis: compressing data with gzip and Snappy

程序员文章站 2024-03-13 23:53:10

1 Introduction

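For how to combine Spring Boot with Redis, see my earlier article: https://blog.****.net/zzhongcy/article/details/102584028

This article covers the case where individual Redis values in production are large enough that it is worth compressing them before they are written to Redis.

Pros and cons of compressing the data:

  1. Pro 1: compression reduces the amount of data stored in Redis, which can increase Redis throughput
  2. Pro 2: compression reduces network bandwidth
  3. Con: compression increases CPU usage on the application side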

2 Spring Boot Redis configuration

There are two ways to configure it, shown below.

2.1 Option 1: RedisTemplate configuration

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embed type information in the JSON so values deserialize to their original classes.
        // enableDefaultTyping is deprecated since Jackson 2.10; activateDefaultTyping (see 2.2) replaces it.
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // Keys are serialized as Strings
        template.setKeySerializer(stringRedisSerializer);

        // Hash keys are serialized as Strings too
        template.setHashKeySerializer(stringRedisSerializer);

        // Values are serialized with Jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash values are serialized with Jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();
        return template;
    }
}

The line that decides how values are stored is:

// Values are serialized with Jackson
template.setValueSerializer(jackson2JsonRedisSerializer);

2.2 Option 2: RedisCacheConfiguration configuration

@Bean
public RedisCacheConfiguration redisCacheConfiguration (long defaultExpiration ) {

    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);

    // Configure the ObjectMapper (field visibility and embedded type information)
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.activateDefaultTyping(
                    LaissezFaireSubTypeValidator.instance,
                    ObjectMapper.DefaultTyping.NON_FINAL,
                    JsonTypeInfo.As.WRAPPER_ARRAY);
    
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

    // Build the cache configuration
    RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
    return config.entryTtl( Duration.ofSeconds( defaultExpiration ) )
         .serializeKeysWith( SerializationPair.fromSerializer( new StringRedisSerializer() ) )
         .serializeValuesWith( SerializationPair.fromSerializer( jackson2JsonRedisSerializer) );
}

The value serialization is set by:

      .serializeValuesWith( SerializationPair.fromSerializer( jackson2JsonRedisSerializer) )

      This uses a Jackson2JsonRedisSerializer configured with a custom ObjectMapper,

      so the stored JSON contains type strings such as java.util.ArrayList and com.server.model.class.

You can also use the generic GenericJackson2JsonRedisSerializer with its default settings:

      .serializeValuesWith( SerializationPair.fromSerializer( new GenericJackson2JsonRedisSerializer() ) )

       In that case the stored JSON contains type strings such as java.util.ArrayList and com.server.model.class, plus an @class field.

 

3 Data compression

3.1 gzip compression

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.io.IOUtils;   // assumed to be Apache Commons IO
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

public class RedisSerializerGzip extends JdkSerializationRedisSerializer {

    @Override
    public Object deserialize(byte[] bytes) {
        // Redis returns null for a missing key; there is nothing to decompress
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        return super.deserialize(decompress(bytes));
    }

    @Override
    public byte[] serialize(Object object) {
        return compress(super.serialize(object));
    }

    private byte[] compress(byte[] content) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
            gzipOutputStream.write(content);
            //gzipOutputStream.flush();   // flush() alone does not complete the stream; a compressing stream needs close() or finish()
            gzipOutputStream.close();     // calls finish() internally

            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Unable to compress data", e);
        }
    }

    private byte[] decompress(byte[] contentBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream stream = new GZIPInputStream(new ByteArrayInputStream(contentBytes))) {
            IOUtils.copy(stream, out);
        } catch (IOException e) {
            throw new SerializationException("Unable to decompress data", e);
        }
        return out.toByteArray();
    }

}

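Note:

//gzipOutputStream.flush();     // flush() alone does not complete the stream; a compressing stream needs close() or finish()
gzipOutputStream.close();   // calls finish() internally

Otherwise the compressed data is incomplete and decompression fails with: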

java.io.EOFException : Unexpected end of ZLIB input stream

at java.util.zip.InflaterInputStream.fill( InflaterInputStream.java:240)

at java.util.zip.InflaterInputStream.read( InflaterInputStream.java:158)

at java.util.zip.GZIPInputStream.read( GZIPInputStream.java:117)
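
The flush-versus-close behaviour described above can be reproduced without Redis. The following is a minimal sketch using only the JDK; the class name GzipCloseDemo and its helper methods are hypothetical names chosen for this illustration, not part of the serializer above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; it only illustrates the note above.
final class GzipCloseDemo {

    /** Compresses data; when finish is false the stream is flushed but never closed. */
    static byte[] gzip(byte[] data, boolean finish) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gz = new GZIPOutputStream(sink);
            gz.write(data);
            if (finish) {
                gz.close();   // close() calls finish(): remaining data and the gzip trailer are written
            } else {
                gz.flush();   // pushes out only what the deflater has already produced
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Decompresses a gzip byte array, wrapping any IOException. */
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** True when decompression fails because the gzip stream ends early. */
    static boolean failsWithEof(byte[] compressed) {
        try {
            gunzip(compressed);
            return false;
        } catch (UncheckedIOException e) {
            return e.getCause() instanceof EOFException;
        }
    }

    public static void main(String[] args) {
        byte[] data = "some value destined for redis".getBytes(StandardCharsets.UTF_8);
        System.out.println("closed stream fails:  " + failsWithEof(gzip(data, true)));
        System.out.println("flushed stream fails: " + failsWithEof(gzip(data, false)));
    }
}
```

The closed stream should round-trip cleanly, while the stream that was only flushed should fail with the EOFException shown in the stack trace.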

 

3.1.1 Decompressing with a custom buffer:

/**
     * GZIP decompression
     * 
     * @param bytes
     * @return
     */
    public static byte[] uncompress(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        try {
            GZIPInputStream ungzip = new GZIPInputStream(in);
            byte[] buffer = new byte[256];
            int n;
            while ((n = ungzip.read(buffer)) >= 0) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            ApiLogger.error("gzip uncompress error.", e);
        }
 
        return out.toByteArray();
    }
 

    /**
     * 
     * @param bytes
     * @param encoding
     * @return
     */
    public static String uncompressToString(byte[] bytes, String encoding) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        try {
            GZIPInputStream ungzip = new GZIPInputStream(in);
            byte[] buffer = new byte[256];
            int n;
            while ((n = ungzip.read(buffer)) >= 0) {
                out.write(buffer, 0, n);
            }
            return out.toString(encoding);
        } catch (IOException e) {
            ApiLogger.error("gzip uncompress to string error.", e);
        }
        return null;
    }

3.1.2 Reading the IOUtils.copy source

Internally, IOUtils.copy is implemented with the same kind of read loop as while ((n = ungzip.read(buffer)) >= 0).

public static int copy(InputStream input, OutputStream output) throws IOException {
        long count = copyLarge(input, output);
        return count > 2147483647L ? -1 : (int)count;
    }

    public static long copyLarge(InputStream input, OutputStream output) throws IOException {
        byte[] buffer = new byte[4096];
        long count = 0L;

        int n;
        for(boolean var5 = false; -1 != (n = input.read(buffer)); count += (long)n) {
            output.write(buffer, 0, n);
        }

        return count;
    }

The only difference is that IOUtils.copy uses a 4 KB internal buffer; a larger buffer may improve read speed.
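
To experiment with the buffer size, the copy loop can take it as a parameter. This is a minimal JDK-only sketch; BufferedGunzip and its methods are hypothetical names, and whether a larger buffer actually helps depends on the payload and should be measured rather than assumed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: gzip decompression with a caller-chosen buffer size.
final class BufferedGunzip {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    static byte[] gunzip(byte[] compressed, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        if (compressed == null || compressed.length == 0) {
            return new byte[0];
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // The second constructor argument sizes GZIPInputStream's internal input buffer
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed), bufferSize)) {
            byte[] buffer = new byte[bufferSize];
            int n;
            // Same loop shape as IOUtils.copyLarge, with a configurable buffer
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

For example, BufferedGunzip.gunzip(bytes, 64 * 1024) reads with a 64 KB buffer instead of 4 KB.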

 

3.2 Snappy compression

import java.io.IOException;

import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.xerial.snappy.Snappy;

public class RedisSerializerSnappy extends JdkSerializationRedisSerializer {

    // Optional delegate; when null, JDK serialization from the superclass is used
    private final RedisSerializer<Object> innerSerializer;

    public RedisSerializerSnappy(RedisSerializer<Object> innerSerializer) {
        this.innerSerializer = innerSerializer;
    }

    @Override
    public Object deserialize(byte[] bytes) {
        // Redis returns null for a missing key; there is nothing to decompress
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        byte[] raw = decompress(bytes);
        return innerSerializer != null ? innerSerializer.deserialize(raw) : super.deserialize(raw);
    }

    @Override
    public byte[] serialize(Object object) {
        // Serialize exactly once, then compress the resulting bytes
        byte[] raw = innerSerializer != null ? innerSerializer.serialize(object) : super.serialize(object);
        return raw == null ? null : compress(raw);
    }

    private byte[] compress(byte[] content) {
        try {
            return Snappy.compress(content);
        } catch (IOException e) {
            throw new SerializationException("Unable to compress data", e);
        }
    }

    private byte[] decompress(byte[] contentBytes) {
        try {
            return Snappy.uncompress(contentBytes);
        } catch (IOException e) {
            throw new SerializationException("Unable to decompress data", e);
        }
    }
}

3.3 Enabling compression

Only the RedisTemplate configuration is shown here; for RedisCacheConfiguration, follow the description in section 2.2.

@Bean
    public RedisTemplate<Object, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // Set a custom serializer that will compress/decompress data to/from redis
        RedisSerializerGzip serializerGzip = new RedisSerializerGzip();
        template.setValueSerializer(serializerGzip);
        template.setHashValueSerializer(serializerGzip);


        // Alternative: use the Snappy serializer instead
        //RedisSerializerSnappy serializerSnappy = new RedisSerializerSnappy(null);
        //template.setValueSerializer(serializerSnappy);
        //template.setHashValueSerializer(serializerSnappy);

        return template;
    }

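With this in place, values are compressed before they are written to Redis and decompressed when they are read back.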

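4 Summary

4.1 Compression ratio results

I ran a simple test of the compression ratio (note: the ratio depends heavily on the content being compressed; this test simply used a string as the sample):

  • 1. Jackson2JsonRedisSerializer (no compression), data length: 13.70 KB
  • 2. RedisSerializerGzip, data length: 9.19 KB
  • 3. RedisSerializerSnappy, data length: 13.37 KB

In this test RedisSerializerGzip compressed the data noticeably more than RedisSerializerSnappy, because Snappy is designed for compression speed rather than compression ratio. Which one to use depends on your project.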
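
The point that the ratio depends on the content can be checked with a small experiment. This is a minimal sketch using only JDK gzip (Snappy is left out because it needs an external library); GzipRatioDemo, its sample payload, and the fixed random seed are all assumptions made for illustration, not the data used in the test above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo: compare gzip ratios for repetitive and random content.
final class GzipRatioDemo {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Compressed size divided by original size; lower means better compression. */
    static double ratio(byte[] original) {
        return (double) gzip(original).length / original.length;
    }

    /** Builds a repetitive JSON-like payload, similar in spirit to cached objects. */
    static byte[] repetitivePayload(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("{\"id\":").append(i)
              .append(",\"name\":\"user-").append(i)
              .append("\",\"city\":\"Shanghai\"},");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Builds a payload of pseudo-random bytes, which gzip cannot shrink. */
    static byte[] randomPayload(int size) {
        byte[] bytes = new byte[size];
        new Random(42).nextBytes(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        System.out.printf("repetitive JSON ratio: %.3f%n", ratio(repetitivePayload(500)));
        System.out.printf("random bytes ratio:    %.3f%n", ratio(randomPayload(16 * 1024)));
    }
}
```

Repetitive, JSON-like values shrink a lot, while already-random (or already-compressed) values do not shrink at all, so it is worth measuring with your own data before enabling compression.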

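4.2 Performance comparison:

See one of my earlier articles:

  Comparison and summary of the gzip/snappy/lzo/bzip2 compression formats: https://blog.****.net/zzhongcy/article/details/89375346

Format   Compression ratio   Compression speed   Decompression speed
gzip     13.4%               21 MB/s             118 MB/s
lzo      20.5%               135 MB/s            410 MB/s
snappy   22.2%               172 MB/s            409 MB/s
bzip2    13.2%               2.4 MB/s            9.5 MB/s

Whether to compress the data, and which compression format to use, has a significant effect on performance.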
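
The speed-versus-ratio trade-off in the table can be felt with the JDK alone. The sketch below does not benchmark gzip, lzo, snappy, or bzip2; as a stand-in it compares two java.util.zip.Deflater levels (BEST_SPEED and BEST_COMPRESSION) on a made-up payload. DeflateLevelDemo and its methods are hypothetical names, and a single timed run like this is only indicative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical demo: same algorithm, different levels, different speed/size balance.
final class DeflateLevelDemo {

    static byte[] samplePayload(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("{\"orderId\":").append(i).append(",\"status\":\"PAID\",\"amount\":")
              .append(i % 97).append(".50},");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    static byte[] deflate(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(sink, deflater)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end();   // release native resources of the custom Deflater
        }
        return sink.toByteArray();
    }

    static byte[] inflate(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = samplePayload(20_000);
        for (int level : new int[] {Deflater.BEST_SPEED, Deflater.BEST_COMPRESSION}) {
            long start = System.nanoTime();
            byte[] packed = deflate(data, level);
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.printf("level %d: %d -> %d bytes in %d us%n",
                    level, data.length, packed.length, micros);
        }
    }
}
```

For a serious comparison, a benchmarking harness with warm-up runs and payloads taken from your own cache would give more trustworthy numbers.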

 

5 Other serializers

5.1 FST serialization

import org.nustaq.serialization.FSTConfiguration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

/**
 * Description: FST serialization
 */
public class FstSerializer<T> implements RedisSerializer<T> {

    private static FSTConfiguration configuration = FSTConfiguration.createStructConfiguration();
    private Class<T> clazz;

    public FstSerializer(Class<T> clazz) {
        super();
        this.clazz = clazz;
        configuration.registerClass(clazz);
    }

    @Override
    public byte[] serialize(T t) throws SerializationException {
        return configuration.asByteArray(t);
    }

    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        return (T) configuration.asObject(bytes);
    }
}

5.2 Kryo serialization

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.esotericsoftware.kryo.util.Pool;
import de.javakaffee.kryoserializers.ArraysAsListSerializer;
import de.javakaffee.kryoserializers.BitSetSerializer;
import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
import de.javakaffee.kryoserializers.JdkProxySerializer;
import de.javakaffee.kryoserializers.RegexSerializer;
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
import de.javakaffee.kryoserializers.URISerializer;
import de.javakaffee.kryoserializers.UUIDSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

import java.awt.print.Book;
import java.io.ByteArrayInputStream;
import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/**
 * Description: Kryo serialization.<br>
 */
public class KryoSerializer<T> implements RedisSerializer<T> {
    private static final int BUFFER_SIZE = 2048;
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private static Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, 8) {
        @Override
        protected Kryo create() {
            Kryo kryo = new Kryo();
            kryo.setRegistrationRequired(false);
            kryo.register(Book.class);
            kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
            kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
            kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
            kryo.register(InvocationHandler.class, new JdkProxySerializer());
            kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
            kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
            kryo.register(Pattern.class, new RegexSerializer());
            kryo.register(BitSet.class, new BitSetSerializer());
            kryo.register(URI.class, new URISerializer());
            kryo.register(UUID.class, new UUIDSerializer());
            UnmodifiableCollectionsSerializer.registerSerializers(kryo);
            SynchronizedCollectionsSerializer.registerSerializers(kryo);
            kryo.register(HashMap.class);
            kryo.register(ArrayList.class);
            kryo.register(LinkedList.class);
            kryo.register(HashSet.class);
            kryo.register(TreeSet.class);
            kryo.register(Hashtable.class);
            kryo.register(Date.class);
            kryo.register(Calendar.class);
            kryo.register(ConcurrentHashMap.class);
            kryo.register(SimpleDateFormat.class);
            kryo.register(GregorianCalendar.class);
            kryo.register(Vector.class);
            kryo.register(BitSet.class);
            kryo.register(StringBuffer.class);
            kryo.register(StringBuilder.class);
            kryo.register(Object.class);
            kryo.register(Object[].class);
            kryo.register(String[].class);
            kryo.register(byte[].class);
            kryo.register(char[].class);
            kryo.register(int[].class);
            kryo.register(float[].class);
            kryo.register(double[].class);
            return kryo;
        }
    };
    private static Pool<Output> outputPool = new Pool<Output>(true, false, 16) {
        @Override
        protected Output create() {
            return new Output(BUFFER_SIZE, -1);
        }
    };
    private static Pool<Input> inputPool = new Pool<Input>(true, false, 16) {
        @Override
        protected Input create() {
            return new ByteBufferInput(BUFFER_SIZE);
        }
    };
    private Class<T> clazz;

    public KryoSerializer(Class<T> clazz) {
        super();
        this.clazz = clazz;
    }

    @Override
    public byte[] serialize(T t) throws SerializationException {
        if (null == t) {
            return EMPTY_BYTE_ARRAY;
        }
        Kryo kryo = null;
        Output output = null;
        byte[] bytes;
        try {
            output = outputPool.obtain();
            kryo = kryoPool.obtain();
            kryo.writeClassAndObject(output, t);
            output.flush();
            return output.toBytes();
        } finally {
            if (output != null) {
                outputPool.free(output);
            }
            if (kryo != null) {
                kryoPool.free(kryo);
            }
        }
    }

    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        if (null == bytes || bytes.length <= 0) {
            return null;
        }
        Kryo kryo = null;
        Input input = null;
        try {
            input = inputPool.obtain();
            input.setInputStream(new ByteArrayInputStream(bytes));
            kryo = kryoPool.obtain();
            return (T) kryo.readClassAndObject(input);
        } finally {
            if (input != null) {
                inputPool.free(input);
            }
            if (kryo != null) {
                kryoPool.free(kryo);
            }
        }
    }
}

 

6 References

https://*.com/questions/58829724/adding-compression-to-spring-data-redis-with-lettuceconnectionfactory

https://github.com/cboursinos/java-spring-redis-compression-snappy-kryo

https://ld246.com/article/1532328272348

https://gitee.com/SoftMeng/spring-boot-skill/tree/master/redis-serializer-line
