欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python:sqlalchemy连接数据库

程序员文章站 2024-03-02 19:40:22
...

安装:

pip install sqlalchemy
# 安装数据库驱动:
pip install pymysql
pip install cx_oracle

举例:(在url后面加入?charset=utf8可以防止乱码)

from sqlalchemy import create_engine
engine=create_engine('mysql+pymysql://username:[email protected]:port/dbname', echo=True)  #echo=True 打印sql语句信息

create_engine接受一个url,格式为:

# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
# 常用的
engine = create_engine('sqlite:///:memory:', echo=True)     # sqlite内存
engine = create_engine('sqlite:///./cnblogblog.db',echo=True) # sqlite文件
engine = create_engine("mysql+pymysql://username:[email protected]:port/dbname",echo=True) # mysql+pymysql
engine = create_engine('mssql+pymssql://username:[email protected]:port/dbname',echo=True) # mssql+pymssql
engine = create_engine('postgresql://scott:[email protected]:5432/dbname') # postgresql示例
engine = create_engine('oracle://scott:[email protected]:1521/sidname') # oracle
engine = create_engine('oracle+cx_oracle://scott:[email protected]') #pdb就可以用tns连接

简单demo:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base 

engine = create_engine('oracle://spark:[email protected]',echo=True) #echo要求打印sql语句等调试信息
session_maker = sessionmaker(bind=engine)
session = session_maker()

Base = declarative_base()
#对应一张表
class Student(Base):  
    __tablename__ = 'STUDENT'
    id = Column('STUID', Integer, primary_key=True)
    name = Column('STUNAME', String(32), nullable=False)
    age = Column('STUAGE', Integer)
    def __repr__(self):
        return '<Student(id:%s, name:%s, age:%s)>' % (self.id, self.name, self.age)

Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。
queryObject = session.query(Student).order_by(Student.id.desc())
for ins in queryObject:
    print(ins.id, ins.name, ins.age)

'''
4 hey 24
3 lwtxxs 27
2 gyb 89
1 ns 23
'''

将查询结果映射为DataFrame:

import pandas as pd
df = pd.read_sql(session.query(Student).filter(Student.id > 1).statement, engine) 
print(df)

'''
   STUID STUNAME  STUAGE
0      4     hey      24
1      2     gyb      89
2      3  lwtxxs      27
'''

查询:

session的query方法除了可以接受Base子类对象作为参数外,还可以是字段,如:

query = session.query(Student.name, Student.age) # query为一个sqlalchemy.orm.query.Query对象
for stu_name, stu_age in query:
    print(stu_name, stu_age)

查询条件filter:

# = / like
query.filter(Student.name == 'wendy')
query.filter(Student.name.like('%ed%'))

# in
query.filter(Student.name.in_(['wendy', 'jack']))
query.filter(Student.name.in_(
        session.query(User.name).filter(User.name.like('%ed%'))
))
# not in
query.filter(~Student.name.in_(['ed', 'wendy', 'jack']))

# is null / is not null
query.filter(Student.name == None)
query.filter(Student.name.is_(None))
query.filter(Student.name != None)
query.filter(Student.name.isnot(None))

# and
from sqlalchemy import and_, or_
query.filter(and_(Student.name == 'ed', Student.age != 23))
query.filter(Student.name == 'ed', Student.age != 23)
query.filter(Student.name == 'ed').filter(Student.age != 23)
# or
query.filter(or_(Student.name == 'ed', Student.name == 'wendy'))

# match
query.filter(Student.name.match('wendy'))

Query的方法:

all()方法以列表形式返回结果集:

from sqlalchemy import or_, and_
queryObject = session.query(Student).filter(or_(Student.id == 1, Student.id == 2))
print(queryObject.all())    # [<Student(id:1, name:ns, age:23)>, <Student(id:2, name:gyb, age:89)>]
queryObject = session.query(Student.name).filter(or_(Student.id == 1, Student.id == 2))
print(queryObject.all())    # [('ns',), ('gyb',)]

first()方法返回单个结果。(若结果集为空则返回None

print(queryObject.first())  # ('ns',)

one()方法返回单个结果,与first()方法不同的是:当结果集中没有元素或有多于一个元素会抛出异常。
one_or_none()方法同one()一样,不同是结果集为空则返回None,为多个抛出异常。

查询数量:

from sqlalchemy import func
session.query(func.count(Student.id)).scalar() # SELECT count("STUDENT"."STUID") AS count_1 FROM "STUDENT"

分组:

session.query(func.count(Student.id), Student.name).group_by(Student.name).all()

嵌套SQL语句:

from sqlalchemy import text
query = session.query(Student.id, Student.name).filter(text('stuid>2'))
query = session.query('stuid', 'stuname', 'stuage').from_statement(\
text("select * from student where stuname=:stuname")).params(stuname='hey').all()    #[(4, 'hey', 24)]