欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识。
您现在的位置是: 首页

SQLAlchemy 使用总结

程序员文章站 2024-03-02 21:59:16
...
    # coding: utf-8

    from sqlalchemy import * 
    import tushare as ts
    import pandas as pd
    from sqlalchemy.orm import sessionmaker,mapper
    from datetime import *

    # NOTE: the credentials/host part of this URL was destroyed by e-mail
    # obfuscation when the page was extracted ("[email protected]").
    # "password" and "localhost" are placeholders -- substitute real values.
    engine = create_engine('mysql+pymysql://root:password@localhost/mystock?charset=utf8')

    #%%  1  Hand-written SQL approach
    # Raw SQL string; the value 2 is passed as the parameter for the %s placeholder.
    result = engine.execute('select * from stock_basics where pe < %s', 2)

    # SQLAlchemy recommends wrapping SQL strings in text(): the same parameter
    # syntax can then be used across different databases. Parameters must be
    # introduced with a colon (":pe") and are supplied through a dict.
    result = engine.execute(text('select * from stock_basics where pe < :pe'), {'pe': 2})

    # When iterating over result, every row is a RowProxy object. Fields can be
    # read very flexibly: by index, by column name, or even as an attribute.
    # rowproxy[0] == rowproxy['id'] == rowproxy.id
    ans = result.fetchall() # fetch all rows
    ans1 = pd.DataFrame(ans) # convert the rows into a DataFrame

    #  Transaction handling
    # begin() returns the Transaction object that owns commit()/rollback(),
    # so a reference to it must be kept.
    conn = engine.connect()
    trans = conn.begin()
    try:
        dosomething(conn)  # placeholder for the real work done on this connection
        trans.commit()
    except Exception:
        # Undo any partial work, then re-raise so the failure is not silently lost.
        trans.rollback()
        raise
    finally:
        # Always release the connection, whether or not the work succeeded.
        conn.close()


    #%%  SQL-expressions in Python approach
    # Bind a MetaData to the engine; reflect=True asks it to load the existing
    # table definitions from the database.
    meta = MetaData(bind=engine, reflect=True)
    # Look up the reflected table by name.
    table = meta.tables['stock_basics']
    # Build "SELECT ... WHERE pe < 2" from Python expressions and materialise the rows.
    result2 = list(engine.execute(table.select(table.c.pe < 2)))   # pe is a column of stock_basics

    #%% ORM approach -- the table needs to have a primary key
    engine.echo = True  # We want to see the SQL we're creating
    metadata = MetaData(engine)

    # The stock_basics table already exists, so no need to redefine it. Just
    # load it from the database using the "autoload" feature.
    # NOTE(review): the variable is called `users` although it holds the
    # stock_basics table.
    users = Table('stock_basics', metadata, autoload=True)

    def run(stmt):
        """Execute *stmt* and print every row of its result, one per line."""
        for record in stmt.execute():
            print(record)

    # Most WHERE clauses can be constructed via normal comparisons
    s = users.select(users.c.code == '000001')
    run(s)
    s = users.select(users.c.pe < 1)  # pe is a column of stock_basics
    # Execute the statement directly and fetch every matching row.
    rs = s.execute().fetchall()
    ans2 = pd.DataFrame(rs)    # convert the result into a DataFrame

    # NOTE(review): `users` is bound to the stock_basics table at this point;
    # the examples below presumably assume a table with `age` and `name`
    # columns -- confirm against the actual schema. The statements are only
    # built here, never executed.

    # Python keywords like "and", "or", and "not" can't be overloaded, so
    # SQLAlchemy uses functions instead
    s = users.select(and_(users.c.age < 40, users.c.name != 'Mary'))
    s = users.select(or_(users.c.age < 40, users.c.name != 'Mary'))
    s = users.select(not_(users.c.name == 'Susan'))

    # Or you could use &, | and ~ -- but watch out for priority!
    s = users.select((users.c.age < 40) & (users.c.name != 'Mary'))  # best to add parentheses; mind operator precedence
    s = users.select((users.c.age < 40) | (users.c.name != 'Mary'))
    s = users.select(~(users.c.name == 'Susan'))

    # There's other functions too, such as "like", "startswith", "endswith"
    s = users.select(users.c.name.startswith('M'))
    s = users.select(users.c.name.like('%a%'))
    s = users.select(users.c.name.endswith('n'))

    # The "in" and "between" operations are also available
    s = users.select(users.c.age.between(30, 39))
    # Extra underscore after "in" to avoid conflict with Python keyword.
    # in_() takes ONE sequence of candidate values, not separate positional
    # arguments, so the values are wrapped in a list.
    s = users.select(users.c.name.in_(['Mary', 'Susan']))

    # If you want to call an SQL function, use "func"
    s = users.select(func.substr(users.c.name, 2, 1) == 'a')

    # You don't have to call select() on a table; it's got a bare form
    # (first argument: list of tables/columns to select; second: WHERE clause)
    s = select([users], users.c.name != 'Carl')
    s = select([users.c.name, users.c.age], users.c.name != 'Carl')

    # This can be handy for things like count()
    # NOTE(review): `users` still refers to stock_basics here; presumably a
    # `user_id` column is assumed -- confirm.
    s = select([func.count(users.c.user_id)])
    # Here's how to do count(*)
    # ("*" names no column, so the table is given explicitly via from_obj)
    s = select([func.count("*")], from_obj=[users])
    #%% Multi-table (join) queries
    #    Two tables are used from here on: users and emails.
    users = Table('users', metadata,
        Column('user_id', Integer, primary_key=True),
        Column('name', String(40)),
        Column('age', Integer),)
    users.create()
    # The emails definition used to be commented out, which made every query
    # below fail with NameError. Only the definition is needed to *build* the
    # statements; call emails.create() as well if the table does not exist yet.
    # String is given an explicit (arbitrary) length because MySQL cannot
    # create a VARCHAR column without one.
    emails = Table('emails', metadata,
        Column('email_id', Integer, primary_key=True),
        Column('address', String(100)),
        Column('user_id', Integer, ForeignKey('users.user_id')),)
    # Join the two tables through an explicit WHERE condition.
    s = select([users, emails], emails.c.user_id == users.c.user_id)
    # Select only some of the columns
    s = select([users.c.name, emails.c.address], emails.c.user_id == users.c.user_id)
    # Join query based on the foreign key (the ON clause is derived from it)
    s = join(users, emails).select()
    # Use outerjoin to list every user, whether or not they have an e-mail
    s = outerjoin(users, emails).select()

    #%% Map database tables onto Python classes
    users = Table('users', metadata, autoload=True)
    # These are the empty classes that will become our data classes
    class User(object):
        pass

    # Classical mapping: attach the users table's columns to User.
    usermapper = mapper(User, users)
    # DBSession was used without ever being defined. Build the session
    # factory from the already-imported sessionmaker, bound to the engine.
    DBSession = sessionmaker(bind=engine)
    session = DBSession()
    #  Queries -----------------
    # NOTE(review): the users table defined earlier in this file names its key
    # column `user_id`, while these examples use `User.id` -- presumably they
    # assume a table whose primary key is called `id`; confirm against the schema.
    query = session.query(User)
    print(query) # show the SQL statement
    print(query.statement) # same as above
    for user in query: # the query is run when it is iterated
        print(user.name)
    print(query.all()) # returns a list-like object
    print(query.first().name) # first() returns None when no record exists
    # print(query.one().name) # raises if there is no row, or more than one row
    print(query.filter(User.id == 2).first().name)
    print(query.get(2).name) # fetch by primary key; equivalent to the line above
    print(query.filter('id = 2').first().name) # string expressions are supported
    query2 = session.query(User.name)
    print(query2.all()) # each row is a tuple
    print(query2.limit(1).all()) # return at most 1 record
    print(query2.offset(1).all()) # start returning from the 2nd record
    print(query2.order_by(User.name).all())
    print(query2.order_by('name').all())
    print(query2.order_by(User.name.desc()).all())
    print(query2.order_by('name desc').all())
    print(session.query(User.id).order_by(User.name.desc(), User.id).all())
    print(query2.filter(User.id == 1).scalar()) # if a record exists, return the first element of the first record
    print(session.query('id').select_from(User).filter('id = 1').scalar())
    print(query2.filter(User.id > 1, User.name != 'a').scalar()) # and
    query3 = query2.filter(User.id > 1) # chained filter() calls are also combined with AND
    query3 = query3.filter(User.name != 'a')
    print(query3.scalar())
    print(query2.filter(or_(User.id == 1, User.id == 2)).all()) # or
    print(query2.filter(User.id.in_((1, 2))).all()) # in
    query4 = session.query(User.id)
    print(query4.filter(User.name == None).scalar())
    print(query4.filter('name is null').scalar())
    print(query4.filter(not_(User.name == None)).all()) # not
    print(query4.filter(User.name != None).all())
    print(query4.count())
    print(session.query(func.count('*')).select_from(User).scalar())
    print(session.query(func.count('1')).select_from(User).scalar())
    print(session.query(func.count(User.id)).scalar())
    print(session.query(func.count('*')).filter(User.id > 0).scalar()) # filter() references User, so the table need not be specified
    print(session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1) # limit() can be used to cap what count() returns
    print(session.query(func.sum(User.id)).scalar())
    print(session.query(func.now()).scalar()) # func can be followed by any function name, as long as the database supports it
    print(session.query(func.current_timestamp()).scalar())
    print(session.query(func.md5(User.name)).filter(User.id == 1).scalar())
    # Update / delete ------
    # Bulk UPDATE issued directly through the query.
    query.filter(User.id == 1).update({User.name: 'c'})
    user = query.get(1)
    print(user.name)
    user.name = 'd'
    session.flush() # write to the database, but do not commit
    print(query.get(1).name)
    session.delete(user)
    session.flush()
    session.rollback()  # roll back everything flushed so far
    query.filter(User.id == 1).delete()
    session.commit()  # commit: persist to the database
    # Was a Python 2 print statement; the call form matches the rest of the
    # script and behaves the same for a single argument on Python 2 and 3.
    print(query.get(1))
    session.close()  # close the session


# Filter examples: and / or / in / not in / not.
# The original snippet used Python 2 print statements and a `query1` that was
# never defined; it is taken here to be a plain query over the mapped User class.
query1 = session.query(User)


def _print_users(rows):
    """Print the id, name and age of each mapped row, one row per line."""
    for u in rows:
        # One pre-formatted string, so the output is the same on Python 2 and 3.
        print('Id:\t%s\tname:\t%s\tage:\t%s' % (u.id, u.name, u.age))


#and
print('And:')
_print_users(query1.filter(User.id>80,User.age>25).all())

#or
print('Or:')
_print_users(query1.filter(or_(User.id>88,User.age>29)).all())

#in
print('in:')
_print_users(query1.filter(User.id.in_((79,81,90))).all())

#not in
print('not in:')
_print_users(query1.filter(User.id.notin_((81,82,88))).all())

#not
print('not:')
_print_users(query1.filter(not_(or_(User.name=='Tom',User.name==u'小花'))).all())
相关标签: sqlalchemy