欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Django使用Redis(自动缓存装饰器及封装好的工具类)

程序员文章站 2022-03-31 11:57:40
【Django基础】Django使用Redis,自动缓存装饰器,有则获取,无则自动缓存...

pip安装包
pip install django-redis4.8.0
pip install python-memcached
1.58

django setting.py 配置:

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_NO = 11
REDIS_PASSWORD = 'Adminunicorn.yt'  # 线上
REDIS_TIMEOUT = 60 * 60 * 24 * 1
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://' + REDIS_HOST + ':' + str(REDIS_PORT) + '/' + str(REDIS_NO),
        "OPTIONS": {
            "PASSWORD": REDIS_PASSWORD,
            "IGNORE_EXCEPTIONS": True,  # memcached 异常行为
            "CONNECTION_POOL_KWARGS": {"max_connections": 100},  # 配置默认连接池
            "PICKLE_VERSION": -3,
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        },
    },
}

封装好的缓存工具类

# -*- coding:utf-8 -*-
"""
Created on 2018/12/03

@author: cjh
缓存工具类,操作redis
"""
import json
import datetime
from django.core.cache import cache


def exist_cache(key):
    """
    判断缓存数据是否存在
    :param key: 唯一标识
    :return: 存在结果
    """

    is_exist = False
    if key in cache:
        is_exist = True

    return is_exist


def clear_cache(key):
    """
    清除缓存数据
    :param key:唯一标识
    """
    if key in cache:
        cache.delete(key)


def clear_pattern_cache(fuzzy_key):
    """
    清除缓存数据
    :param fuzzy_key:模糊标识
    :return:
    """
    cache.delete_pattern("*%s*" % fuzzy_key)


def write_data_to_cache(key, value, expire=None):
    """
    将数据直接写入缓存
    :param key:唯一标识
    :param value:写入的数据
    :param expire:失效时间(单位为秒),默认为不失效
    """
    cache.set(key, value, timeout=expire)


def write_json_to_cache(key, value, expire=None):
    """
    将数据转为json格式写入缓存
    :param key:唯一标识
    :param value:写入的数据
    :param expire:失效时间(单位为秒),默认为不失效
    """
    cache.set(key, json.dumps(value, cls=CJsonEncoder), timeout=expire)


def write_counter_to_cache(key, step=1, expire=None):
    """
    计数器写入缓存
    :param key:唯一标识
    :param step:计数器每次递增的值(支持负数进行递减),默认每次递增1
    :param expire:失效时间(单位为秒),默认为不失效
    """
    if key in cache:
        cache.incr(key, step)
    else:
        cache.set(key, 1, expire)


def read_pattern_data_from_cache(fuzzy_key):
    """
    读取缓存
    :param fuzzy_key:模糊标识
    :return:缓存数据
    """
    keys = cache.keys("*%s*" % fuzzy_key)
    data = []
    for key in keys:
        data.append({
            'key': key,
            'value': cache.get(key)
        })
    return data


def read_data_from_cache(key):
    """
    直接读取缓存
    :param key:唯一标识
    :return:缓存数据
    """
    if key in cache:
        data = cache.get(key)
    else:
        data = None
    return data


def read_json_from_cache(key):
    """
    读取json格式缓存
    :param key:唯一标识
    :return:缓存进行json解析后数据
    """
    if key in cache:
        data = json.loads(cache.get(key))
    else:
        data = None
    return data


class CJsonEncoder(json.JSONEncoder):
    """
    重写构造json类,遇到日期特殊处理
    """

    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, datetime.date):
            return obj.strftime("%Y-%m-%d")
        else:
            return json.JSONEncoder.default(self, obj)


def redis_caches():
    """
    不失效缓存,key为加载此装饰器的函数名
    :param:
    :return:
    """

    def _deco(func):
        def __deco(*args, **kwargs):
            redis_key = func.__name__
            cache_info = read_data_from_cache(redis_key)
            if not cache_info:
                result = func(*args, **kwargs)
                write_data_to_cache(redis_key, result)
            else:
                result = cache_info
            return result

        return __deco

    return _deco

缓存装饰器使用方式

自己定义数据获取方式,使用上面的装饰器,即可实现自动缓存,直接调用就会拿到数据,当缓存中没有的时候,会先自动写入缓存,再返回缓存中的值。

@redis_caches()
def get_lot_device_type():
    """
        获取物联网设备及类型
    :return:
    """
    equip_dict = dict()
    for i in LotDevice.objects.filter(status='a').select_related('type'):
        if i.gateway_address in equip_dict:
            continue
        else:
            equip_dict[str(i.gateway_address)] = {'id': i.id,
                                                  'name': i.name,
                                                  'address': i.address,
                                                  'gateway_address': i.gateway_address,
                                                  'type_id': i.type_id,
                                                  'type_name': i.type.name if i.type else '',
                                                  'type_code': i.type.code if i.type else '',
                                                  'type_name_eng': i.type.name_eng if i.type else '',
                                                  }

    return equip_dict

本文地址:https://blog.csdn.net/weixin_37923411/article/details/107095100