欢迎您访问程序员文章站!本站旨在为大家提供、分享程序员计算机编程知识。
您现在的位置是: 首页

Python sqlalchemy增删改查,多表查询join操作

程序员文章站 2024-03-03 22:44:10
...

sqlalchemy对象:

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import BIGINT
from sqlalchemy import INTEGER
from sqlalchemy import String
from db_manager import TTDModel


class CertTB(TTDModel):
    """Certificate table, mapped to ``cert_info``.

    Instances can be converted to a plain dict with :meth:`to_dict`, which
    relies on the class-level ``__table__`` attribute (presumably supplied by
    the declarative base ``TTDModel`` -- confirm in ``db_manager``).
    """
    __tablename__ = 'cert_info'
    uuid = Column(BIGINT, primary_key=True)
    create_user_name = Column(String(255), default=None)
    create_user_uuid = Column(BIGINT, default=None)
    file_paths = Column(String(255), default=None)
    file_ids = Column(String(255), default=None)
    cert_no = Column(String(255), default=None)
    cert_type = Column(INTEGER, default=None)  # 1:t1 2:t2
    validity_start_time = Column(DateTime, default=None)
    validity_end_time = Column(DateTime, default=None)

    def __init__(self):
        # Do NOT assign ``self.__table__`` here: an instance attribute would
        # shadow the class-level table and make get_columns()/to_dict() fail
        # with AttributeError on None.
        super().__init__()

    def get_columns(self):
        """Return the names of all columns of the mapped table."""
        return [col.name for col in self.__table__.columns]

    def to_dict(self):
        """Return a ``{column_name: value}`` dict for this instance."""
        return {col: getattr(self, col) for col in self.get_columns()}

db:

# Connection-pool settings handed to create_engine().
_POOL_OPTIONS = {
    "pool_size": 100,
    "max_overflow": 0,
    "pool_timeout": 60,  # timeout
    "pool_pre_ping": True,  # check a connection is valid before taking it from the pool
}
engine = create_engine(DB_URI, **_POOL_OPTIONS)

# Session factory bound to the engine, and the session object used by the snippets below.
Session = sessionmaker(bind=engine)
db = Session()

1.添加/新增

		# 获取用户的请求参数
		cType = request.form['cType']
        cNo = request.form['cNo']
        startDate = request.form['startDate']
        endDate = request.form['endDate']
        filePaths = request.form.get("filePaths");
        if filePaths:
            filePaths = request.form['filePaths']
		
		#组装对象数据
        cert = CertTB()
        cert.cert_no = cNo
        cert.cert_type = cType
        if filePaths:
            cert.file_paths = filePaths
        cert.validity_start_time = date_format.get_date_str_ymdhms(startDate)
        cert.validity_end_time = date_format.get_date_str_ymdhms(endDate)
        db.add(cert)   # 添加对象数据到数据库
        db.commit()   # 提交
        db.close()		# 关闭数据库连接

2 更新
①查询结果直接更新

# Update the rows matched by the query directly, without loading them first.
# The chain is parenthesised because a comment may not follow a trailing
# backslash line continuation (that is a SyntaxError).
(
    db.query(Certificate)  # query certificates
    .filter(Certificate.uuid == cert_id)  # filter condition
    .update({
        Certificate.cNumber: cert_code,  # the part after ":" is the new value
        Certificate.operationCer: cert_type,
        Certificate.startDate: approve_date,
        Certificate.cExpireDate: end_date,
    })
)
# NOTE(review): flush() does not commit; the change is only persisted once the
# surrounding code calls db.commit() -- confirm the caller does so.
db.flush()

②查询结果,更新结果,提交

		uuid = request.form['certId']
        cType = request.form['cType']
        cNo = request.form['cNo']
        startDate = request.form['startDate']
        endDate = request.form['endDate']
        filePaths = request.form.get("filePaths");
        if filePaths:
            filePaths = request.form['filePaths']

        cert = db.query(CertTB).filter(CertTB.uuid == uuid).first()
        if not cert:
            return make_api_respone(201, '未找到相关技工认证证书', {}, None)
        cert.cert_no = cNo
        cert.cert_type = cType
        if filePaths:
            cert.file_paths = filePaths
        cert.validity_start_time = startDate   # 设置新数据
        cert.validity_end_time = endDate    # 设置新数据
        db.commit()  # 提交更新
        db.close()

3 删除

# Select the CerAttachment rows whose cerId equals cert_id, then delete them.
attachment_query = db.query(CerAttachment).filter(CerAttachment.cerId == cert_id)
attachment_query.delete()
db.flush()  # flush the session

4 查询
单表查询比较简单,这里不再介绍,直接看多表查询(join 操作)。【提示:join 在数据量较大时比较耗时,应按需使用,不要滥用】

	hlog.info("查询count") 
    count = db.query(EvInfo.uuid.label("evId"), EvInfo.evOrder, ProjectInfo.name, ProjectInfo.address) \
        .filter(and_(ProjectInfo.city == city_id, ProjectInfo.name.like("%" + project_name + "%"))) \
        .join(ProjectInfo, EvInfo.projectId == ProjectInfo.uuid) \
        .count()
    hlog.info("批量查询电梯")
    data = db.query(EvInfo.uuid.label("evId"), EvInfo.evOrder, ProjectInfo.name, ProjectInfo.address) \
        .filter(and_(ProjectInfo.city == city_id, ProjectInfo.name.like("%" + project_name + "%"))) \
        .join(ProjectInfo, EvInfo.projectId == ProjectInfo.uuid) \		# join 项目表
        .limit(page_size) \   # page_size 分页—每页的数量
        .offset((page_num - 1) * page_size) \    # page_num 当前页数
        .all()

下面这段代码查询在 CLocation 中没有对应记录的 WoInfo(使用 NOT EXISTS 子查询):

# Condition: no CLocation row exists whose woId equals this WoInfo's uuid.
no_location = ~exists().where(CLocation.woId == WoInfo.uuid)

# One page of the user's WoInfo rows (joined with CElevator) that satisfy it.
data = (
    db.query(WoInfo.uuid.label("woId"), WoInfo.type.label('woType'), CElevator.evId, WoInfo.date)
    .filter(and_(WoInfo.userId == user_id, no_location))
    .join(CElevator, CElevator.woId == WoInfo.uuid)
    .limit(page_size)
    .offset((page - 1) * page_size)
    .all()
)