欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python 多线程问题

程序员文章站 2022-01-28 21:53:47
...

引言

最近在学习Python爬虫,发现Scrapy框架是解决了Python多线程问题,不用自己去理解多线程,只需遵循它的框架来写就行了。但是在我用requests爬虫的时候,就要自己编写多线程的逻辑了,还是需要去查资料好好理解一下的。

个人理解

1. python的多线程常用的有两种:

(1) 正常的启动多个thread,每个线程跑一个任务,进程间用队列queue通信。具体实现如下:
def put_to_queue(arg1,arg2):
	......
	# queue有两种存数据的方法,一种是put(),这种方法在队列满的时,会一直等待,直到队列有
	# 空位置可以放数据。另一种是put_nowait(),这种方法会在队列满时,抛出异常。同样取数据
	# 也有get()和get_nowait(),get_nowait()会在队列空时,抛出异常。
	queue.put_nowait(data)
	
def get_from_queue(arg1):
	......
	while True:
		try:
			data = queue.get_nowait() # 循环取出数据,如果队列为空,则抛出异常
			......
			queue.task_done() # 操作完成,标记该数据已从队列取出
		except:
			print('queue is empty wait for a while')

if __name__ == '__main__':
	......
	put_thread = Thread(target=put_to_queue,args=(arg1,arg2)) # 一个生产者
	put_thread.setDaemon(True) # 守护进程,当主程序执行完,该线程自动结束
	put_thread.start()

	time.sleep(10) # 队列存数据线程先跑10秒,让队列里有初始数据量,防止队列里没数据,join()直接不阻塞了
	
	for i in range(3): # 多个消费者
		get_thread = Thread(target=get_from_queue,args=(arg1,)) # 注意单个参数的使用形式
		get_thread.setDaemon(True)
		get_thread.start()

	# join()将主程序阻塞,直到queue里的所有数据都标记task_done,并且queue为空时才放通
	# 因为这里的模式是生产者速度慢,消费者速度快,有可能导致
	# 还没生产出来就已经消费完了,导致所有数据都被标记了task_done,而且queue为空了,这就
	# 使join()直接放通了,解决办法先让存数据的线程跑一段时间,queue里有足够的初始数据量
	queue.join()
	......

相关的说明和注意事项都在注释中,这种多线程方式还是最适合生产者速度快,消费者速度慢,并且queue有一定的初始数据量的情况下,这样join()就不会误放通了。

(2) 使用multiprocessing.dummy中的Pool线程池
from multiprocessing.dummy import Pool as ThreadPool
	......
	pool = ThreadPool(10) # 线程池中最大线程数
	pool.map(download, datas) # download为每个线程中跑的任务,datas为所有需要处理的数据
	
    # 一行代码实现多线程,大致相当于下面的代码
    # for data in datas:
    #   Thread(target=download,args=(data,)).start()

这种使用线程池的多线程的优势在于编写简单,但是只适用于所有需要处理的数据都已经生成,而且相对于正常写线程来说不够灵活

2. 涉及到多线程的数据库存储问题,那么就必须提到锁。

多线程在读数据库时,会有资源的竞争问题。因为一个connection在同一时间只能被一个线程占用,其他线程没办法读取。
一个解决办法就是开启多个connection,每个线程使用一个,但是这样的话就太浪费资源,因为一个线程不可能总是占用connection读取数据库。
最好的办法就是多个线程共用一个connection。当一个线程读取数据后,将connection给下一个线程使用,这就需要锁来实现了。简单来说,锁就是在数据库读写时加个标志,当需要读取数据库时,先锁定,读取完成后再解锁,然后另一个线程要读取数据库之前,先判断锁的状态,如果是锁定状态,就先等待,等到解锁后再读取数据库。具体实现如下:

	def save_data_to_db(self, data):	
		while self.mutex == 1:  # connetion正在被其他线程使用,需要等待
            time.sleep(1)
            print('db connect is using...')
        self.mutex = 1  # 当没被锁定时,可以读取数据库。读取之前先锁定
        try:
            with self.db.cursor() as cursor:
                sql = '...'
                cursor.execute(sql)
                self.db.commit()
                self.mutex = 0  # 解锁
        except Exception as e:
            print('save_data_to_db fail,error:' + str(e))

例子

https://github.com/AmazingUU/sogou_word_spider
上面是一个用requests实现搜狗词库爬虫的例子,用到了多线程,可以当作例子参考。至于爬虫逻辑上的问题,在我的前一篇文章已经讲了,这里就不赘述了。项目觉得还可以的话,给个star哦

相关标签: 爬虫