欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并行计算

程序员文章站 2022-03-07 11:29:54
...

Python有很多库可以支持并行计算。

>>> import threading
>>> def thread_hello():
        other = threading.Thread(target=thread_say_hello, args=())
        other.start()
        thread_say_hello()
>>> def thread_say_hello():
        print('hello from', threading.current_thread().name)
>>> thread_hello()
hello from Thread-1
hello from MainThread

>>> import multiprocessing
>>> def process_hello():
        other = multiprocessing.Process(target=process_say_hello, args=())
        other.start()
        process_say_hello()
>>> def process_say_hello():
        print('hello from', multiprocessing.current_process().name)
>>> process_hello()
hello from MainProcess
hello from Process-1

threadingmultiprocessing库有着类似的API,但是前者只是建立单个线程,后者对多进程封装得更完善,对多核CPU的支持更好。更多可阅读Python标准库08 多线程与同步 (threading包), Python标准库10 多进程初步 (multiprocessing包), Python多进程并发(multiprocessing)

threading模块使用线程,multiprocessing使用进程。其区别不同在于,线程使用同一内存空间,而进程分配有不同的内存空间。因此进程间难以共享对象。但两个线程则有可能同时改写同一内存空间。为防止出现冲突,可以使用GIL保证不会同时执行可能冲突的线程。
更多对比

下面是一个线程冲突的实例

import threading
from time import sleep

counter = [0]

def increment():
    count = counter[0]
    sleep(0) # try to force a switch to the other thread
    counter[0] = count + 1

other = threading.Thread(target=increment, args=())
other.start()
increment()
print('count is now: ', counter[0])

下面是执行过程:

Thread 0                    Thread 1
read counter[0]: 0
                            read counter[0]: 0
calculate 0 + 1: 1
write 1 -> counter[0]
                            calculate 0 + 1: 1
                            write 1 -> counter[0]

问题在于:尽管执行了两次加法,但结果仍然是:1

在Python中,最简单的保证数据同步的方法是使用queue模块的Queue类。

from queue import Queue

queue = Queue()

def synchronized_consume():
    while True:
        print('got an item:', queue.get())  # 得到对象
        queue.task_done()                       # 队列任务结束

def synchronized_produce():
    consumer = threading.Thread(target=synchronized_consume, args=())
    consumer.daemon = True
    consumer.start()
    for i in range(10):
        queue.put(i)           # 加入新对象
    queue.join()               # 确保所有队列任务结束后,退出

synchronized_produce()

如果上面这个办法因为某些原因做不到,那我们可以使用threading模块中的Lock类。

seen = set()
seen_lock = threading.Lock()

def already_seen(item):
    seen_lock.acquire()       # 在Lock类的
    result = True             # acquire方法
    if item not in seen:      # 和release方法
        seen.add(item)        # 之间的代码
        result = False        # 仅能同时被
    seen_lock.release()       # 一个线程访问
    return result

def already_seen(item):
    with seen_lock:
        if item not in seen:
            seen.add(item)
            return False
        return True

还有一个办法是threading模块中的Barrier类。

counters = [0, 0]
barrier = threading.Barrier(2)

def count(thread_num, steps):
    for i in range(steps):
        other = counters[1 - thread_num]
        barrier.wait() # wait for reads to complete
        counters[thread_num] = other + 1
        barrier.wait() # wait for writes to complete

def threaded_count(steps):
    other = threading.Thread(target=count, args=(1, steps))
    other.start()
    count(0, steps)
    print('counters:', counters)

threaded_count(10)

更多参考Python的多线程编程模块 threading 参考17.1. threading — Thread-based parallelism

防止共享数据错误读写的终极机制是完全避免并发地接触同一数据。进程的内存空间的独立性完全符合这一要求。为了解决进程之间的交流问题,multiprocessing模块特别提供了Pipe类。Pipe默认为两条通道,如果传入参数False则为一条通道。

def process_consume(in_pipe):
    while True:
        item = in_pipe.recv()  # 只有接收成功后才会继续执行
        if item is None:
            return
        print('got an item:', item)

def process_produce():
    pipe = multiprocessing.Pipe(False)
    consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
    consumer.start()
    for i in range(10):
        pipe[1].send(i)        # 通过通道发送对象
    pipe[1].send(None) # done signal

process_produce()

在执行并发计算时,程序员往往会犯下错误:

  1. 同步不足(Under-synchronization):一些线程没有被同步
  2. 过度同步(Over-synchronization):某些本可以并发执行的线程,被串行化
  3. 死锁(Deadlock):被同步的进程相互等候对方完成某些步骤才进行下一步,导致程序锁死。一个栗子:
def deadlock(in_pipe, out_pipe):
    item = in_pipe.recv()
    print('got an item:', item)
    out_pipe.send(item + 1)

def create_deadlock():
    pipe = multiprocessing.Pipe()
    other = multiprocessing.Process(target=deadlock, args=(pipe[0], pipe[1]))
    other.start()
    deadlock(pipe[1], pipe[0])

create_deadlock()