欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python MySQL的API

程序员文章站 2022-05-29 11:34:41
...

1. 安装:

pip install mysqlclient

2. 连接

import MySQLdb

# 连接
def get_conn(self):
	try:
		self.conn = MySQLdb.connect(
			host='127.0.0.1',
			user='root',
			passwd='',
			db='news',
			port=3308,
			charset='utf8'
		)
	except MySQLdb.Error as e:
		print('Error: %s' % e)

# 关闭连接
def close_conn(self):
	try:
		if self.conn:
			# 关闭链接
			self.conn.close()
	except MySQLdb.Error as e:
		print('Error: %s' % e)	

3. 查询:

def get_one(self):
	# 准备SQL
	sql = 'SELECT * FROM `news` WHERE `types` = %s ORDER BY `created_at` DESC;'
	# 找到cursor
	cursor = self.conn.cursor()
	# 执行SQL
	cursor.execute(sql, ('百家', ))
	# print(dir(cursor))
	# 拿到结果
	rest = dict(zip([k[0] for k in cursor.description], cursor.fetchone()))
	# 处理数据
	# 关闭cursor/链接
	cursor.close()
	self.close_conn()
	return rest

def get_more(self):
	# 准备SQL
	sql = 'SELECT * FROM `news` WHERE `types` = %s ORDER BY `created_at` DESC;'
	# 找到cursor
	cursor = self.conn.cursor()
	# 执行SQL
	cursor.execute(sql, ('百家', ))
	# print(dir(cursor))
	# 拿到结果
	rest = [dict(zip([k[0] for k in cursor.description], row))
		for row in cursor.fetchall()]
	# 处理数据
	# 关闭cursor/链接
	cursor.close()
	self.close_conn()
	return rest

4. 插入数据:

def add_one(self):
	''' 插入数据 '''
	# 受影响的行数
	row_count = 0
	try:
		# 准备SQL
		sql = (
			"INSERT INTO `news`(`title`,`image`, `content`, `types`, `is_valid`) VALUE"
			"(%s, %s, %s, %s, %s);"
		)
		# 获取链接和cursor
		cursor = self.conn.cursor()
		# 执行sql
		# 提交数据到数据库
		cursor.execute(sql, ('标题11', '/static/img/news/01.png', '新闻内容5', '推荐', 1))
		# cursor.execute(sql, ('标题12', '/static/img/news/01.png', '新闻内容6', '推荐', 1))
		# 提交事务
		self.conn.commit()
	except :
		print('error')
		# 回滚事务
		self.conn.rollback()
	# 执行最后一条SQL影响的行数
	row_count = cursor.rowcount
	# 关闭cursor和链接
	cursor.close()
	self.close_conn()
	# row_count > 0 表示成功
	return row_count > 0

5. 在 scrapy 的 pipeline 使用 twisted 异步插入数据到 MySQL 中

使用 twisted.enterprise.adbapi.ConnectionPool

 

class MysqlTwistedPipline(object):
	def __init__(self, dbpool):
		self.dbpool = dbpool  #接收

	@classmethod 
	def from_settings(cls, settings):  
		dbparms = dict(  # 将它们转为dict[],才可以将经传给connectionpool函数
			host = settings["MYSQL_HOST"],
			db = settings["MYSQL_DBNAME"],
			user = settings["MYSQL_USER"],
			passwd = settings["MYSQL_PASSWORD"],
			charset='utf8',
			cursorclass=MySQLdb.cursors.DictCursor,
			use_unicode=True,
		)
		dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)#  
		return cls(dbpool) 

	def process_item(self, item, spider):
		#使用twisted将mysql插入变成异步执行
		query = self.dbpool.runInteraction(self.do_insert, item) 
		# 这个函数的第一个参数就是我们自己定义的函数,它会将这个函数变成异步的,我们就是要在这里写插入的函数
		# 第二个参数就是item,即我们要插入的数据
		query.addErrback(self.handle_error, item, spider) #处理异常

	def handle_error(self, failure, item, spider):
		#处理异步插入的异常
		print (failure)

	def do_insert(self, cursor, item):
		#执行具体的插入
		#根据不同的item 构建不同的sql语句并插入到mysql中
		# insert_sql, params = item.get_insert_sql()
		# cursor.execute(insert_sql, params)
		insert_sql = """
			insert into jobbole_article(title, url, create_date, fav_nums)
			VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(fav_nums)
		"""
		cursor.execute(insert_sql,(item["title"], item["url"], item["create_date"], item["fav_nums"]))