Python 中 Flask 接口的增删查改
程序员文章站
2023-10-27 15:17:52
import jsonfrom flask import Flask, requestapp = Flask(__name__)import decimal# 防止json报错class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): return float(o) return super(De...
import json
from flask import Flask, request
app = Flask(__name__)
import decimal
# Keeps json.dumps from failing on decimal.Decimal values.
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``decimal.Decimal`` values as floats."""

    def default(self, o):
        """Return a JSON-serialisable stand-in for *o*.

        ``Decimal`` instances are converted with ``float``; every other
        type is delegated to ``json.JSONEncoder.default``, which raises
        ``TypeError`` for objects it cannot encode.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        return super().default(o)
# Only GET requests are accepted.
@app.route("/getUser", methods=["GET"])
def getUser():
    """Look up one user by the ``my_id`` query parameter.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``data``.
        When ``my_id`` is missing or blank, ``code`` is 5004 and ``msg``
        is ``"param invalid"``. Otherwise ``data`` holds whatever
        ``getUserModel`` returns for that id.
    """
    # Default response body.
    ret_dict = {"code": 200, "msg": "success", "data": False}

    my_id = request.args.get("my_id")
    # Validate the parameter that is actually used: checking only for an
    # empty query string would let a request without ``my_id`` through and
    # hand None to the database lookup.
    if not my_id:
        ret_dict["code"] = 5004
        ret_dict["msg"] = "param invalid"
        return json.dumps(ret_dict, ensure_ascii=False)

    ret_dict["data"] = getUserModel(my_id)
    # DecimalEncoder lets Decimal column values be serialised.
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)
@app.route("/delete", methods=["DELETE"])
def delete():
    """Delete the user identified by the ``my_id`` query parameter.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``data``.
        When ``my_id`` is missing or blank, ``code`` is 5004 and ``msg``
        is ``"param invalid"``; otherwise the default success body is
        returned after ``deleteUserModel`` has run.
    """
    # Default (success) response body.
    ret_dict = {"code": 200, "msg": "success", "data": False}

    my_id = request.args.get("my_id")
    # Same validation as /getUser: do not report success for a request
    # that never named a row to delete.
    if not my_id:
        ret_dict["code"] = 5004
        ret_dict["msg"] = "param invalid"
        return json.dumps(ret_dict, ensure_ascii=False)

    deleteUserModel(my_id)
    return json.dumps(ret_dict, ensure_ascii=False)
# Update
@app.route("/update", methods=["PATCH"])
def update():
    """Update a user from the query parameters of a PATCH request.

    Reads ``my_id``, ``name``, ``player_name`` and ``height`` from the
    query string; parameters that are absent are passed on as ``None``.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``data``.
        When ``my_id`` is missing or blank, ``code`` is 5004 and ``msg``
        is ``"param invalid"``. Otherwise ``data`` holds the return value
        of ``updateuser``.
    """
    # Default response body.
    ret_dict = {"code": 200, "msg": "success", "data": False}

    data = {
        field: request.args.get(field)
        for field in ("my_id", "name", "player_name", "height")
    }
    # Without an id there is no row to update; reject instead of silently
    # reporting success.
    if not data["my_id"]:
        ret_dict["code"] = 5004
        ret_dict["msg"] = "param invalid"
        return json.dumps(ret_dict, ensure_ascii=False)

    ret_dict["data"] = updateuser(data)
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)
@app.route("/add", methods=["POST"])
def add():
    """Create a user from the query parameters of a POST request.

    Reads ``my_id``, ``name``, ``player_name`` and ``height`` from the
    query string (absent parameters become ``None``) and passes them to
    ``adduser``.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``data``, where
        ``data`` holds the return value of ``adduser``.
    """
    # The key must be "data" like in the other handlers; the previous
    # "date" spelling left a stray extra key in every response.
    ret_dict = {"code": 200, "msg": "success", "data": False}

    ad = {
        field: request.args.get(field)
        for field in ("my_id", "name", "player_name", "height")
    }
    ret_dict["data"] = adduser(ad)
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)
import sqlalchemy as sq
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, Float, create_engine
from sqlalchemy.ext.declarative import declarative_base
# Connect to the database. The URL below is a placeholder: substitute the
# real user name, password, host and database name.
engine = create_engine('mysql+pymysql://用户名:密码@数据库ip/数据库名')
# Note: the pymysql package must be installed for this URL to work.
# Create the declarative base class that the ORM model inherits from.
Base = declarative_base()
# ORM model describing a user record.
class Model(Base):
    """Declarative mapping of the ``user`` table."""

    # Name of the mapped table.
    __tablename__ = 'user'

    # Table structure; ``my_id`` is the primary key (primary_key=True).
    my_id = Column(Integer, primary_key=True)
    name = Column(String(20))
    player_name = Column(String(255))
    # NOTE(review): SQLAlchemy's Float takes (precision, asdecimal, ...)
    # positionally, so the 2 here may be read as a truthy ``asdecimal``
    # flag rather than a scale - confirm against the installed version.
    height = Column(Float(3, 2))

    def dict(self):
        """Return this row as a ``{column name: value}`` dictionary."""
        row = {}
        for column in self.__table__.columns:
            row[column.name] = getattr(self, column.name, None)
        return row
# 改
# def updateUser2( name, play_name, heghit):
# user = session.query(Model).filter_by(my_id=my_id).first()
# if user is not None:
# user.name = name
# user.play_name = play_name
# user.heghit = heghit
# session.commit()
# Session factory bound to the engine.
DBSession = sessionmaker(bind=engine)
# Create the session object used by the data-access functions in this file.
session = DBSession()
# Create the tables for the models declared on Base.
Base.metadata.create_all(engine)
# Create
def adduser(ad):
    """Insert a new user row and return it as a dictionary.

    Args:
        ad: Mapping that must contain the keys ``"player_name"``,
            ``"height"`` and ``"name"``. Any other key (such as
            ``"my_id"``) is not passed to the model.

    Returns:
        The stored row as produced by ``Model.dict()``, so callers have
        something meaningful to put in their response.

    Raises:
        KeyError: If one of the required keys is missing from ``ad``.
        Exception: Whatever the commit raised, re-raised after rollback.
    """
    art = Model(
        player_name=ad["player_name"],
        height=ad["height"],
        name=ad["name"],
    )
    session.add(art)
    try:
        session.commit()
    except Exception:
        # The session is shared module-wide; roll back so one failed
        # insert does not leave it unusable for later calls.
        session.rollback()
        raise
    return art.dict()
# Read
def getUserModel(my_id):
    """Fetch the user whose primary key is *my_id*.

    Returns the row as a dictionary (via ``Model.dict()``), or an empty
    dictionary when no such row exists.
    """
    record = session.query(Model).get(my_id)
    return record.dict() if record is not None else {}
# Delete
def deleteUserModel(my_id):
    """Delete the user whose ``my_id`` equals *my_id*, if it exists.

    Returns:
        ``True`` when a row was found and deleted, ``False`` when no row
        matched.

    Raises:
        Exception: Whatever the commit raised, re-raised after rollback.
    """
    lie = session.query(Model).filter_by(my_id=my_id).first()
    if lie is None:
        return False

    session.delete(lie)
    try:
        session.commit()
    except Exception:
        # The session is shared module-wide; roll back so a failed delete
        # does not leave it unusable for later calls.
        session.rollback()
        raise
    return True
# Update
def updateuser(data):
    """Apply a partial update to the user identified by ``data["my_id"]``.

    Args:
        data: Mapping with the key ``"my_id"`` and, optionally, ``"name"``,
            ``"player_name"`` and ``"height"``. Fields that are absent or
            ``None`` are left unchanged, so a request that omits a field
            does not overwrite the stored value.

    Returns:
        The updated row as produced by ``Model.dict()``, or an empty
        dictionary when no row matches ``my_id``.

    Raises:
        KeyError: If ``"my_id"`` is missing from ``data``.
        Exception: Whatever the commit raised, re-raised after rollback.
    """
    user = session.query(Model).filter_by(my_id=data["my_id"]).first()
    if user is None:
        return {}

    for field in ("name", "player_name", "height"):
        value = data.get(field)
        if value is not None:
            setattr(user, field, value)

    try:
        session.commit()
    except Exception:
        # The session is shared module-wide; roll back so a failed update
        # does not leave it unusable for later calls.
        session.rollback()
        raise
    return user.dict()
# Close the session.
def close():
    """Close the module-level SQLAlchemy session."""
    session.close()
# 数据库调用
# row= getUser(session)
# print([row.dict() for row in row])
# updateUser("")
# u.delete()
# adduser(session)
# rows = getUser(session)
# print([row.dict() for row in rows])
# close()
if __name__ == "__main__":
    # Run the Flask application only when this file is executed directly.
    app.run()
本文地址:https://blog.csdn.net/ddym66/article/details/107563934
下一篇: 吃鳕鱼的好处