python MySQL的API
程序员文章站
2022-05-29 11:34:41
...
1. 安装:
pip install mysqlclient
2. 连接
import MySQLdb
# 连接
def get_conn(self):
try:
self.conn = MySQLdb.connect(
host='127.0.0.1',
user='root',
passwd='',
db='news',
port=3308,
charset='utf8'
)
except MySQLdb.Error as e:
print('Error: %s' % e)
# 关闭连接
def close_conn(self):
try:
if self.conn:
# 关闭链接
self.conn.close()
except MySQLdb.Error as e:
print('Error: %s' % e)
3. 查询:
def get_one(self):
# 准备SQL
sql = 'SELECT * FROM `news` WHERE `types` = %s ORDER BY `created_at` DESC;'
# 找到cursor
cursor = self.conn.cursor()
# 执行SQL
cursor.execute(sql, ('百家', ))
# print(dir(cursor))
# 拿到结果
rest = dict(zip([k[0] for k in cursor.description], cursor.fetchone()))
# 处理数据
# 关闭cursor/链接
cursor.close()
self.close_conn()
return rest
def get_more(self):
# 准备SQL
sql = 'SELECT * FROM `news` WHERE `types` = %s ORDER BY `created_at` DESC;'
# 找到cursor
cursor = self.conn.cursor()
# 执行SQL
cursor.execute(sql, ('百家', ))
# print(dir(cursor))
# 拿到结果
rest = [dict(zip([k[0] for k in cursor.description], row))
for row in cursor.fetchall()]
# 处理数据
# 关闭cursor/链接
cursor.close()
self.close_conn()
return rest
4. 插入数据:
def add_one(self):
''' 插入数据 '''
# 受影响的行数
row_count = 0
try:
# 准备SQL
sql = (
"INSERT INTO `news`(`title`,`image`, `content`, `types`, `is_valid`) VALUE"
"(%s, %s, %s, %s, %s);"
)
# 获取链接和cursor
cursor = self.conn.cursor()
# 执行sql
# 提交数据到数据库
cursor.execute(sql, ('标题11', '/static/img/news/01.png', '新闻内容5', '推荐', 1))
# cursor.execute(sql, ('标题12', '/static/img/news/01.png', '新闻内容6', '推荐', 1))
# 提交事务
self.conn.commit()
except :
print('error')
# 回滚事务
self.conn.rollback()
# 执行最后一条SQL影响的行数
row_count = cursor.rowcount
# 关闭cursor和链接
cursor.close()
self.close_conn()
# row_count > 0 表示成功
return row_count > 0
5. 在 scrapy 的 pipeline 使用 twisted 异步插入数据到 MySQL 中
使用 twisted.enterprise.adbapi.ConnectionPool
class MysqlTwistedPipline(object):
def __init__(self, dbpool):
self.dbpool = dbpool #接收
@classmethod
def from_settings(cls, settings):
dbparms = dict( # 将它们转为dict[],才可以将经传给connectionpool函数
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)#
return cls(dbpool)
def process_item(self, item, spider):
#使用twisted将mysql插入变成异步执行
query = self.dbpool.runInteraction(self.do_insert, item)
# 这个函数的第一个参数就是我们自己定义的函数,它会将这个函数变成异步的,我们就是要在这里写插入的函数
# 第二个参数就是item,即我们要插入的数据
query.addErrback(self.handle_error, item, spider) #处理异常
def handle_error(self, failure, item, spider):
#处理异步插入的异常
print (failure)
def do_insert(self, cursor, item):
#执行具体的插入
#根据不同的item 构建不同的sql语句并插入到mysql中
# insert_sql, params = item.get_insert_sql()
# cursor.execute(insert_sql, params)
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(fav_nums)
"""
cursor.execute(insert_sql,(item["title"], item["url"], item["create_date"], item["fav_nums"]))
上一篇: windows10安装Prophet
下一篇: django.core.exceptions.ImproperlyConfigured: mysqlclient 1.3.13 or newer is required; you have 0.9.3
推荐阅读
-
Mysql 8.0版本导出的sql文件在Mysql 5.5中运行出错
-
用Python复制文件的9个方法
-
搞清楚 Python 的迭代器、可迭代对象、生成器
-
让Python给你讲笑话、段子,一个有趣的Python案例
-
Python文字转换语音,让你的文字会「说话」,抠脚大汉秒变撒娇萌妹
-
天气变冷了,用Python给你的爱人制作一个天气提醒小助手
-
Java日期时间API系列8-----Jdk8中java.time包中的新的日期时间API类的LocalDate源码分析
-
开始你的api:NetApiStarter
-
有哪些你不知道的Python小工具
-
Python入门(一个有趣的画图例子实战)你肯定不会