Python 多线程问题
引言
最近在学习Python爬虫,发现Scrapy框架是解决了Python多线程问题,不用自己去理解多线程,只需遵循它的框架来写就行了。但是在我用requests爬虫的时候,就要自己编写多线程的逻辑了,还是需要去查资料好好理解一下的。
个人理解
1. python的多线程常用的有两种:
(1) 正常的启动多个thread,每个线程跑一个任务,进程间用队列queue通信。具体实现如下:
def put_to_queue(arg1,arg2):
......
# queue有两种存数据的方法,一种是put(),这种方法在队列满的时,会一直等待,直到队列有
# 空位置可以放数据。另一种是put_nowait(),这种方法会在队列满时,抛出异常。同样取数据
# 也有get()和get_nowait(),get_nowait()会在队列空时,抛出异常。
queue.put_nowait(data)
def get_from_queue(arg1):
......
while True:
try:
data = queue.get_nowait() # 循环取出数据,如果队列为空,则抛出异常
......
queue.task_done() # 操作完成,标记该数据已从队列取出
except:
print('queue is empty wait for a while')
if __name__ == '__main__':
......
put_thread = Thread(target=put_to_queue,args=(arg1,arg2)) # 一个生产者
put_thread.setDaemon(True) # 守护进程,当主程序执行完,该线程自动结束
put_thread.start()
time.sleep(10) # 队列存数据线程先跑10秒,让队列里有初始数据量,防止队列里没数据,join()直接不阻塞了
for i in range(3): # 多个消费者
get_thread = Thread(target=get_from_queue,args=(arg1,)) # 注意单个参数的使用形式
get_thread.setDaemon(True)
get_thread.start()
# join()将主程序阻塞,直到queue里的所有数据都标记task_done,并且queue为空时才放通
# 因为这里的模式是生产者速度慢,消费者速度快,有可能导致
# 还没生产出来就已经消费完了,导致所有数据都被标记了task_done,而且queue为空了,这就
# 使join()直接放通了,解决办法先让存数据的线程跑一段时间,queue里有足够的初始数据量
queue.join()
......
相关的说明和注意事项都在注释中,这种多线程方式还是最适合生产者速度快,消费者速度慢,并且queue有一定的初始数据量的情况下,这样join()就不会误放通了。
(2) 使用multiprocessing.dummy中的Pool线程池
from multiprocessing.dummy import Pool as ThreadPool
......
pool = ThreadPool(10) # 线程池中最大线程数
pool.map(download, datas) # download为每个线程中跑的任务,datas为所有需要处理的数据
# 一行代码实现多线程,大致相当于下面的代码
# for data in datas:
# Thread(target=download,args=(data,)).start()
这种使用线程池的多线程的优势在于编写简单,但是只适用于所有需要处理的数据都已经生成,而且相对于正常写线程来说不够灵活
2. 涉及到多线程的数据库存储问题,那么就必须提到锁。
多线程在读数据库时,会有资源的竞争问题。因为一个connection在同一时间只能被一个线程占用,其他线程没办法读取。
一个解决办法就是开启多个connection,每个线程使用一个,但是这样的话就太浪费资源,因为一个线程不可能总是占用connection读取数据库。
最好的办法就是多个线程共用一个connection。当一个线程读取数据后,将connection给下一个线程使用,这就需要锁来实现了。简单来说,锁就是在数据库读写时加个标志,当需要读取数据库时,先锁定,读取完成后再解锁,然后另一个线程要读取数据库之前,先判断锁的状态,如果是锁定状态,就先等待,等到解锁后再读取数据库。具体实现如下:
def save_data_to_db(self, data):
while self.mutex == 1: # connetion正在被其他线程使用,需要等待
time.sleep(1)
print('db connect is using...')
self.mutex = 1 # 当没被锁定时,可以读取数据库。读取之前先锁定
try:
with self.db.cursor() as cursor:
sql = '...'
cursor.execute(sql)
self.db.commit()
self.mutex = 0 # 解锁
except Exception as e:
print('save_data_to_db fail,error:' + str(e))
例子
https://github.com/AmazingUU/sogou_word_spider
上面是一个用requests实现搜狗词库爬虫的例子,用到了多线程,可以当作例子参考。至于爬虫逻辑上的问题,在我的前一篇文章已经讲了,这里就不赘述了。项目觉得还可以的话,给个star哦
上一篇: PHP中面向对象之Static关键字详解
下一篇: 爬虫实战--爬取CPU天梯榜单