Python3 Tornado 限制 IP在时间段内请求次数
程序员文章站
2022-04-02 11:04:52
原因:有个接口服务需要限制接口请求次数。处理逻辑:1、新建空列表;2、有IP 访问则记录 IP、时间、次数; eg:[{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]3、判断当前时间和列表中的IP访问时间是否相同: ①、相同则增加访问次数; ②、不同则更改原有的访问时间和次数;4、同一秒访问10次则限制3分钟后再......
原因:有个接口服务需要限制接口请求次数。
处理逻辑:
1、新建空列表;
2、有IP 访问则记录 IP、时间、次数; eg: [{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]
3、判断当前时间和列表中的IP访问时间是否相同:
①、相同则增加访问次数;
②、不同则更改原有的访问时间和次数;
4、同一秒访问10次则限制3分钟后再访问;
5、防止列表一直存储着信息,需要在限制的时候清空,防止占满内存。
代码:
import datetime # 时间包
#第三方 格式化时间包 pip install python-dateutil
from dateutil.relativedelta import relativedelta
expire_time = '' # 过期时间
iplist = [] # 存放访问的IP 格式 [{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]
def counts(host):
global expire_time, iplist
key_list = [] # 临时存储列表中的IP键
index_list = {} # 临时存储列表中的下标值
datetimes = datetime.datetime.now()
nowtime = datetimes.strftime("%Y-%m-%d %H:%M:%S") # 转化成字符串便于存储
print('nowtime', nowtime)
if expire_time == '': # 判断无过期时间
if iplist: # 判断有访问IP
for index, item in enumerate(iplist): # 将原有的访问IP遍历出来
tmp_key = tuple(item.keys())[0] # 取出第一层字典键
if tmp_key not in key_list: # 不在临时IP存储列表中
key_list.append(tmp_key) # 不在临时IP列表则加入
tmp_key_dict = {tmp_key: index} # 拼接新临时存储IP下标,方便从IPlist中取值
index_list.update(tmp_key_dict) # 更新到临时存储IP字典中
print('iplist', iplist)
print("key_list", key_list)
print("index_list", index_list)
if host in key_list: # 在列表中则判断时间和次数
index = index_list[host] # 取IP在IPList中的下标
item = iplist[index] # 取出IPList对应的字典值
times_key = tuple(item[host].keys())[0] # 取出IPList的第二层时间键
# 转为时间格式进行时间间隔计算
time_key = datetime.datetime.strptime(times_key, '%Y-%m-%d %H:%M:%S')
# print("时间格式",type(datetimes - time_key),datetimes - time_key)
# >>> 时间格式 <class 'datetime.timedelta'> 0:00:00.213644
# datetime.timedelta 格式数据通过 .seconds取相差的秒数,取分钟及其他类型会报错,有了解的大佬欢迎在评论区指教!
if (datetimes - time_key).seconds == 0: # 时间相同 则判断是否超过10次
if item[host][times_key] >= 10: # 同一秒访问等于或大于10 就不允许请求接口了
# 设置过期时间
expire_time = (datetimes + relativedelta(minutes=+3)).strftime("%Y-%m-%d %H:%M:%S")
iplist = [] # 重置记录列表
# 返回限制结果
return {'code': -1, 'msg': "Error", 'data': "访问频率过快,请3分钟后再访问!"}
else: # 不等于10次 则增加该IP请求次数
num = item[host].get(times_key, 1) + 1 # 取出该字典的次数+1
item[host][times_key] = num # 重新赋值给该IP
else: # 时间不同则重新给IP计数
item[host] = {nowtime: 1} # 重置该IP访问次数
else: # 表示第一次访问接口
tmp_data = {host: {nowtime: 1}} # 构建该IP的访问次数
iplist.append(tmp_data) # 添加到列表中
else: # 判断无访问IP即表示第一次访问接口
tmp_data = {host: {nowtime: 1}} # 构建该IP的访问次数
iplist.append(tmp_data) # 添加到列表中
else: # 限制时间已过就可以访问接口
if nowtime == expire_time:
tmp_data = {host: {nowtime: 1}} # 构建该IP的访问次数
iplist.append(tmp_data) # 添加到列表中
else: #限制时间未过则需要返回限制结果,否则返回None
iplist = [] # 重置记录列表
return {'code': -1, 'msg': "Error", 'data': "访问频率过快,请3分钟后再访问!"}
if __name__ == '__main__':
for i in range(11):
print('遍历的次数:【%s】' % i)
print(counts("192.168.1.124"))
print(counts("192.168.1.125"))
本文地址:https://blog.csdn.net/qq_42142258/article/details/110877394