向redis添加数据
程序员文章站
2024-01-02 11:48:22
from flask import Flask,send_file,send_from_directoryimport timeimport datetimeimport redisimport randomfrom flask import requestimport ioimport osfrom flask import Responseapp = Flask(__name__)r = redis.Redis()task_dick = {}@app.route('/c...
from flask import Flask,send_file,send_from_directory
import time
import datetime
import redis
import random
from flask import request
import io
import os
from flask import Response
app = Flask(__name__)
r = redis.Redis()
task_dick = {}
@app.route('/cc',methods=['POST','GET'])
def get_base64str():
if request.method == 'GET':
print("cc")
return send_from_directory(r"/root", filename="out.mp4",as_attachment=True )
@app.route('/bb',methods=['POST','GET'])
def stream_mp3():
def generate():
path = '/root/out.mp4'
with open(path, 'rb') as fmp3:
data = fmp3.read(1024)
while data:
yield data
data = fmp3.read(1024)
return Response(generate(), mimetype="video/mp4")
@app.route('/cc1',methods=['POST','GET'])
def start22():
if request.method == 'GET':
file=os.path.join("/root","out.mp4")
return send_file(file,as_attachment=True)
@app.route('/add2',methods=['POST','GET'])
def start1():
if request.method == 'GET':
print('redis======')
start_app()
return "ok"
@app.route('/add3',methods=['POST','GET'])
def start():
if request.method == 'GET':
b()
return "ok"
@app.route('/add',methods=['POST','GET'])
def add():
if request.method == 'GET':
print("生成任务时间号==========")
global task_dick
a=random.randint(0,9)
b=random.randint(0,9)
c=random.randint(0,9)
d=random.randint(0,9)
name="task"+str(a)+str(b)+str(c)+str(d)
task_id = str(name)
task_value=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
task_value_str =time_str_2_timestamp(task_value)
task_dick[name]=task_value_str
print("生成的任务",task_dick)
#b()
return 'ok'
def time_str_2_timestamp(time_str):
time_obj = datetime.datetime.strptime(time_str,'%Y-%m-%d %H:%M:%S')
timestamp=time_obj.timestamp()
return timestamp
def start_app():
print('开始遍历,向数据库写入',task_dick)
print("查看全局数据,是否存在",task_dick)
for key in task_dick:
print("存在数据,添加中。。。。。。。")
time_str = task_dick[key]
#time_str1=str(time_str)
print("key=========",key)
print('value======="',time_str)
r.zadd('timed_task',key,time_str)
return "写入ok"
#def b():
from flask import Flask,send_file,send_from_directory
import time
import datetime
import redis
import random
from flask import request
import io
import os
from flask import Response
app = Flask(__name__)
r = redis.Redis()
task_dick = {}
@app.route('/add2',methods=['POST','GET'])
def start1():
if request.method == 'GET':
print('redis======')
start_app()
return "ok"
@app.route('/add3',methods=['POST','GET'])
def start():
if request.method == 'GET':
b()
return "ok"
@app.route('/add',methods=['POST','GET'])
def add():
if request.method == 'GET':
print("生成任务时间号==========")
global task_dick
a=random.randint(0,9)
b=random.randint(0,9)
c=random.randint(0,9)
d=random.randint(0,9)
name="task"+str(a)+str(b)+str(c)+str(d)
task_id = str(name)
task_value=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
task_value_str =time_str_2_timestamp(task_value)
task_dick[name]=task_value_str
print("生成的任务",task_dick)
#b()
return 'ok'
def time_str_2_timestamp(time_str):
time_obj = datetime.datetime.strptime(time_str,'%Y-%m-%d %H:%M:%S')
timestamp=time_obj.timestamp()
return timestamp
def start_app():
print('开始遍历,向数据库写入',task_dick)
print("查看全局数据,是否存在",task_dick)
for key in task_dick:
print("存在数据,添加中。。。。。。。")
time_str = task_dick[key]
#time_str1=str(time_str)
print("key=========",key)
print('value======="',time_str)
r.zadd('timed_task',key,time_str)
return "写入ok"
#def b():
print("启动循环监听redis")
#while True:
#now = datetime.datetime.now()
#print('当前时间---now',now)
# now_lower = now.replace(second=0,microsecond=0)
#print('当前时间去0--now_lower',now_lower)
#now_higher = now_lower + datetime.timedelta(minutes=1)
#print("下一分钟时间·----now_higher",now_higher)
#bb = datetime.timedelta(minutes=1)
#print('拼接的时间尾巴',bb)
#print("时间上线---",now_lower.timestamp())
#print("时间下线---",now_higher.timestamp())
#print("时间类型---",type(now_higher.timestamp()))
#a=now_lower.timestamp()
#b=a-30
#c=a-60
#task_value=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
#task_value_str =time_str_2_timestamp(task_value)
#task_value_str1=task_value_str
#task_value_str2=task_value_str+30
#task_for_this_minute=r.zrangebyscore('timed_task',now_lower.timestamp(),now_higher.timestamp(),0,1000,withscores=True)
#print('现在运行的任务=========',task_for_this_minute)
#time.sleep(5)
if __name__ == '__main__':
app.run(host='192.168.56.102',port='5000',debug=True)
vip = {
'xiaoming':900,
'xiaoname':1000,
'xiaoce':800
}
normal = {
'one':1,
'two':1,
'three':1
}
#r.zadd("game",vip) # 注意没有引号
#a=r.zpopmax("game")
#print(a)
#a1=r.zpopmax("game")
#print(a1)
#a2=r.zpopmax("game")
#print(a3)
本文地址:https://blog.csdn.net/zhaojikun521521/article/details/110859248