欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

day10_进程、协程、异步IO、多路复用

程序员文章站 2022-05-14 19:31:34
...

进程

之前我们了解的线程,接下来我们学习多进程,进程之间是相互独立的,python是启动进程的时候,是启动的是原生进程。进程是没有GIL锁的,而且不存在锁的概念,进程之间的数据式不能共享的,而线程是可以的。
线程的使用场景:
1、IO操作:不占用cpu的操作,比如:从磁盘上读块数据,从网络读块数据,从内存上读块数据都算是io的操作。
2、计算是占用cpu的,比如:计算1+1。线程利用上下文切换,也是消耗资源的。如果大量计算,用多线程就不快了,线程之前的来回切换,运算不要用。
3、python的多线程不适合cpu的密集型操作的任务。但是,适合io密集型的任务,比如:socket_server 的使用。

进程的定义

用muliprocessing这个包中的Process来定义多进程,跟定义多线程差不多。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process#导入进程模块
import time

def run(name):
    time.sleep(2)
    print('hello',name)

if __name__ == '__main__':
    p_obj_list = list()#存放进程对象
    for i in range(10):#启动10个进程
        p=Process(target=run,args=('li{0}'.format(i),))#产生一个进程实例
        p.start()#启动进程
        p_obj_list.append(p)

        for p in p_obj_list:
            p.join()#等待结束进程

进程中嵌入线程

在进程中去嵌入线程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from  multiprocessing import Process
import time,threading

def thead_run(name):#定义线程的方法
    print("{0}:{1}".format(name,threading.get_ident()))

def run(name):
    time.sleep(2)
    print('hello',name)
    t= threading.Thread(target=thead_run,args=(name,))#嵌入线程
    t.start()#执行线程

if __name__ == '__main__':
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run,args=("li{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

父子进程

每个子进程都是由一个父进程启动的,即便是我们这种程序也是有一个父进程的

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:',__name__)
    print('parent process:',os.getppid())
    print('proocess id:',os.getpid())
    print('\n\n')

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello',name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
#输出
main process line
module name: __main__
parent process: 20987
proocess id: 21004



function f
module name: __main__
parent process: 21004
proocess id: 21006



hello bob

在Linux上执行这个父进程就是terminal自己,init 0所有的进程都是它启动的。


进程间数据交互

我们知道不同进程之间内存是不共享的,要想实现两个进程间的通信,咋办呢?我们用什么知识来解决呐?今天就来说说进程间的通信。
之前我们说了queue,这个是线程queue,它的主要目的是两个线程之间的数据,一个是生产者,一个是消费者的模型,而且你必须是线程。只能在这个主线程内的其他线程访问,出了这个进程,你就不能被访问了。

进程访问queue
from multiprocessing import Process
import queue

def f(qq):
    qq.put([42, None, 'hello'])

if __name__ == '__main__':
    q = queue.Queue()  #把这个q传给了子进程
    p = Process(target=f, args=(q,))  #子进程访问父进程的q
    p.start()
    print(q.get())
    p.join()

报错,进程类型错误。

线程访问queue
import queue,threading


def f(qq):
    qq.put([42, None, 'hello'])


if __name__ == '__main__':
    q = queue.Queue()  # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))  # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()
#输出
[42, None, 'hello']

很明显是可以的。由此得出,线程queue传给子进程是不可以的,你以为传给它了,其实传不了,如果你想传的话,必须是进程Queue。

Queue

这个Queue是用于进程之间的数据通信,使用方法跟threading里的queue差不多。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process,Queue


def f(qq):
    qq.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()  # 把这个q传给了子进程
    p = Process(target=f, args=(q,))  # 子进程访问父进程的q
    p.start()
    print(q.get())
    p.join()
#输出
[42, None, 'hello']

父进程的q是怎么传给子进程的?
父进程相当于克隆一个Q,把自己的Q克隆了一份交给子进程,子进程这个时候往Q里面放了一份数据,然后父进程又能实际的获取到。但是你克隆了一份是不是就和父进程没有关系了,为什么还能联系在一起呢?但是实际上:等于这两个Q里面的数据又把它序列化了,序列化到一个中间的地方,类似于翻译,然后反序列化给这个父进程这边来了,其实这两个Q就是通过pickle来序列化的,不是一个真正的Q。
小结:
两个线程之间可以修改一个数据,不加锁,可能就会出错。现在进程中的Queue,是实现了数据的传递,不是在修改同一份数据,只是实现一个进程的数据传给了另外一个进程。


进程间数据交互及共享

之前我们讲述了进程之间的是通过进程中的Queue,来进行数据共享的,其实还有一种方式实现数据共享,那就是管道,pipe,以及数据共享manger。

数据通信

1、Pipe()函数
管道函数会返回由管道双方连接的一组连接对象,该管道默认是双向的(双向的)。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])  #发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn=Pipe()#管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f,args=(child_conn,))
    p.start()
    print(parent_conn.recv())#接收子进程的消息
    p.join()

2、接收多次和发送多次
上面我们只看到发送一次,接收一次,我们现在来看看,父进程和子进程都可以发送和接收。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])  #发送消息给父进程
    conn.send('li ya nan niu bi')
    print(conn.recv())#接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn=Pipe()#管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f,args=(child_conn,))
    p.start()
    print(parent_conn.recv())#接收子进程的消息
    print(parent_conn.recv())#接收子进程的消息2次
    parent_conn.send('父亲发给你的')

    p.join()
#输出
[42, None, 'hello']
li ya nan niu bi
父亲发给你的

如果子进程如果向父进程发送两次,但是父进程接收三次,那么会一直卡在那边。

共享

1、manger
manger可以完成数据间的共享

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process,Manager
import os

def f(d,l):
    d[os.getpid()]=os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':

    with Manager() as manager:
        d = manager.dict()#声明一个字典,注意,这个字典是用manger声明的,不是用dict()声明的
        l = manager.list(range(5))#声明一个列表
        p_list=[]
        for i in range(10):
            p = Process(target=f,args=(d,l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典,就是dict(),这个时候生产的对象就是d。
这边要不要加锁?
答案:不用加锁,因为这个manger已经帮你加锁了,它就默认不允许两个进程同时修改一份数据。两个进程没有办法同时修改一份数据,进程之间是独立的,它自己也要加锁,因为它把自己的东西同时copy好几份,跟刚刚的那个Queue一样,copy10个字典最终合成一个字典。


进程池的使用

进程同步

1、进程锁
通过multiprocessing中的Lock模块来实现进程锁。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Process,Lock

def f(l,i):
    l.acquire()#加锁
    try:
        print('hello world',i)
    finally:
        l.release()#释放锁

if __name__ == '__main__':
    lock= Lock()#定义锁
    for num in range(10):
        Process(target=f,args=(lock,num,)).start()#把锁传入进程中

有个疑问,就是进程中不是相互独立的吗?为啥还要加锁呐?
虽然每个进程都是独立运行的,但是问题来了,它们共享一块屏幕。这个锁存在的意义就是屏幕共享。如果也想抱着打印数据,而我想也想打印数据的情况,就有可能乱套了,然后通过这个锁来控制,去打印的时候,这个屏幕只有我独占,导致屏幕不会乱。
代码中的name == “main“是干嘛用的?
它的作用是为了区分你是主动执行这个脚本还是从别的地方把它当做一个模块去调用。如果主动执行这个脚本,那么name == “main” 以下的代码就会执行,如果其他模块导入的,则在其他模块中不执行name == “main“以下的代码。这边一般在一个模块中测试用的,但是其他模块又不能执行这部分代码。

进程池

作用:防止启动太多的进程把系统干趴下,才有了进程池的限制,比如说,你起了100个进程,会变慢,起一个进程相当于克隆一个父进程,父进程占1G内存空间,100个进程就占101G的内存,所以进程池就是在同一时间允许多个进程在cpu上运行。
1、apply
这个说明是同步执行的,也就是串行执行的

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan


from multiprocessing import Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('in process',os.getpid())#打印进程号
    return i+100
if __name__ == '__main__':
    pool = Pool(processes=5) #设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=Foo,args=(i,))#同步执行挂起进程
    print('end')
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

注:这边的5个进程只是挂起了,并没有真正的去执行,但是执行的时候,只能执行这个5个进程。这个apply是同步的,也就是串行的,一般不用。
2、apply_async
异步执行,也就是并行执行。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan


from multiprocessing import Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('in process',os.getpid())#打印进程号
    return i+100
if __name__ == '__main__':
    pool = Pool(processes=5) #设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=Foo,args=(i,))#采用异步的方式,加进程池
    print('end')
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

注:最后的pool.join()不能省略,不然的话,这个pool的进程池就直接关闭了, 它不会等待执行的结果的。
3、异步下的回调函数
在计算机中,这个程序执行完毕之后,再回调过来执行这个Bar。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from multiprocessing import Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('in process',os.getpid())#打印子进程号
    return i+100

def Bar(arg):
    print('-->exec done:',arg,os.getpid())

if __name__ == '__main__':
    pool = Pool(processes=5)
    print('主进程',os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo,args=(i,),callback=Bar)#执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
#输出
主进程 44045
end
in process 44051
in process 44047
in process 44048
in process 44049
in process 44050
-->exec done: 100 44045
-->exec done: 101 44045
-->exec done: 102 44045
-->exec done: 103 44045
-->exec done: 104 44045
in process 44051
in process 44047
in process 44048
in process 44049
in process 44050
-->exec done: 105 44045
-->exec done: 106 44045
-->exec done: 107 44045
-->exec done: 108 44045
-->exec done: 109 44045

1、回调函数说明fun=Foo干不完就不执行bar函数,等Foo执行完就去执行Bar
2、这个回调函数是主进程去调用的,而不是每个子进程去调用的。
使用场景
回调函数的用处:
比如说你从各个机器上备份完毕,在回调函数中自动写一个脚本,说备份完毕。
回调函数是主进程调用的原因?
如果是子进程去调用这个回调函数,有多少个子进程就有多少个连接,如果是主进程的话,只需要一次长连接就可以了,这个效率就高了。


协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

1、无需线程上下文切换的开销
2、无需原子操作锁定及同步的开销
“原子操作(atomic operation)是不需要synchronized”,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
3、方便切换控制流,简化编程模型
4、高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
1、无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
2、进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。


yield实现协程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan
import time

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield  # yield设置生成器
        print("[{0}] is eating baozi {1}".format(name, new_baozi))


def producer():
    r = con.__next__()  # 调用生成器
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)  # 唤醒生成器,并且向生成器传值
        con2.send(n)
        time.sleep(1)
        print("\033[32m[producer]\033[0m is making baozi {0}".format(n))


if __name__ == '__main__':
    con = consumer("c1")  # 创建一个生成器c1
    con2 = consumer("c2")  # 创建一个生产器C2
    p = producer()

send有两个作用?
①唤醒生产器 ②给yield传一个值,就是yield接收到的这个值。这个说明yield在被唤醒的时候可以接收数据。
怎么实现我们的单线程实现并发的效果呢?
遇到IO操作就切换,IO比较耗时,协程之所以能处理大并发,就是IO操作会挤掉大量的时间。没有IO操作的话,整个程序只有cpu在运算了,因为cpu很快,所以你感觉是在并发执行的。
IO操作完成了,程序什么时候切回去?
IO操作一旦完成,我们就自动切回去。

手动实现切换IO

1、greenlet实现手动切换
通过自带方法swith去手动切换IO

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()#切换到test2
    print(34)
    gr2.switch()#切换到test2

def test2():
    print(56)
    gr1.switch()#切换到test1
    print(78)




gr1 = greenlet(test1)#启动一个协程
gr2 = greenlet(test2)#启动一个协程
gr1.switch()#切换到test1

执行的步骤:
day10_进程、协程、异步IO、多路复用
所以输出:

12
56
34
78

gr1.switch(),它现在不是遇到IO就切换,就是你手动切换,就像刚才所讲的yield,next一下,就是跟那个意思差不多,就是切换一下。


小结

1、cpu值认识线程,协程cpu是不认识的,是用户自己控制的,cpu根本都不知道它们的存在。
2、线程的上下文切换保存在cpu的寄存器中,但是协程拥有自己的寄存上下文和栈。
3、协程是串行的,无需锁。
符合协程的条件:
1、必须在只有一个单线程里实现并发
2、修改共享数据不需加锁
3、用户程序里自己保存多个控制流的上下文栈
4、一个协程遇到IO操作自动切换到其它协程


协程遇到IO操作自动切换

感觉确实用着比generator还简单了,但好像还没有解决一个问题,就是遇到IO操作,自动切换,对不对?所以我们接下来就说说如何遇到IO就切换,不得不提到一个模块Gevent。
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

Gevent

gevent是第三方库,需要pip install一下

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

import gevent

def foo():
    print('running in foo')
    gevent.sleep(2)#模仿io操作,一遇到io操作就切换
    print('Explicit context switch to foo again')

def bar():
    print("Explicit context to bar")
    gevent.sleep(1)
    print("Implicit context switch back to bar")

def fun3():
    print("running fun3")
    gevent.sleep(0)  # 虽然是0秒,但是会触发一次切换
    print("running fun3 again")

gevent.joinall(
    [
        gevent.spawn(foo),#生成协程
        gevent.spawn(bar),
        gevent.spawn(fun3)
    ]
)
#输出
running in foo
Explicit context to bar
running fun3
running fun3 again
Implicit context switch back to bar
Explicit context switch to foo again

遇到IO就切换了,整体程序还需要花2秒钟执行的,最长的就是2秒,如果是串行的话就是3秒。
逻辑图:
day10_进程、协程、异步IO、多路复用
当foo遇到sleep(2)的时候,切自动切换到bar函数,执行遇到sleep(1)的时候自动切换到fun3函数,遇到sleep(0)又自动切换到foo。这个时候sleep(2)还没有执行完毕,又切换到bar的sleep(1)这边,发现又没有执行完毕,就有执行fun3这边,发现sleep(0)执行完毕,则继续执行,然后又切换到foo,发现sleep(2)又没有执行完毕,就切换到bar的sleep(1)这边,发现执行完了,有切回到foo这边,执行完毕。
用途:
比如说你现在又50处IO,然后总共加起来串行的的话,要花100秒,但是50处IO最长的那个IO只花了5秒钟,那代表中你的这个程序就是协程最多5秒就执行完毕


协程gevent并发爬网页

刚刚只是在理论上讲述了gevent遇到io自动切换,下面我们就来实际操作一下,在实战过程中我们用协程大面积的爬虫,看看如何用gevent去实现并发的效果的
1、串行爬网页
我们先来看看串行效果的爬网页的代码,看看消耗多长时间

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from urllib import request#简单的爬虫模块,复杂的不用这个
import gevent,time


def f(url):
    print('GET:%s'%url)
    resp = request.urlopen(url)#request.urlopen()函数
    data = resp.read()#读取爬到的数据
    print('%d bytes received from %s.'%(len(data),url))

urls=[
    'https://www.mi.com/',
    'https://www.meizu.com/',
    'https://www.smartisan.com/'
]

time_start = time.time()

for url in urls:
    f(url)

print('同步时间',time.time()-time_start)
#输出
GET:https://www.mi.com/
299798 bytes received from https://www.mi.com/.
GET:https://www.meizu.com/
128522 bytes received from https://www.meizu.com/.
GET:https://www.smartisan.com/
3026 bytes received from https://www.smartisan.com/.
同步时间 0.8214950561523438

2、gevent协程爬虫
刚刚是串行的执行的,我们现在用gevent并发执行一下,看看效果。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from urllib import request#简单的爬虫模块,复杂的不用这个
import gevent,time

def f(url):
    print('GET:%s'%url)
    resp = request.urlopen(url)#request.urlopen()函数
    data = resp.read()#读取爬到的数据
    print('%d bytes received from %s.'%(len(data),url))
async_time_start = time.time()
gevent.joinall(#用gevent启动协程
    [
        gevent.spawn(f,'https://www.mi.com/'),#第二个值是传入参数,之前我们没有讲,因为前面没有传参
        gevent.spawn(f,'https://www.meizu.com/'),
        gevent.spawn(f,'https://www.smartisan.com/'),
    ]
)

print('异步时间',time.time()-async_time_start)
#输出
GET:https://www.mi.com/
299798 bytes received from https://www.mi.com/.
GET:https://www.meizu.com/
128522 bytes received from https://www.meizu.com/.
GET:https://www.smartisan.com/
3026 bytes received from https://www.smartisan.com/.
异步时间 1.0542449951171875

为啥我用了并发,执行的时间没有缩短,反而变的更长了呢?
其实urllib默认跟gevent是没有关系的。urllib现在默认,如果你要通过gevent来去调用,它就是阻塞,gevent现在检测不到urllib的IO操作。它都不知道urllib进行了IO操作,所以它都不会进行切换,所以它就串行了。所以这个urllib和我们之前学的socket交给gevent不好使,因为gevent它不知道你进行了IO操作,所以就会卡住。

并发爬网页

既然上面那种情况都不行,那怎么让gevent知道urllib正在进行IO操作呢?
答:打补丁,通过导入monkey,来打这个补丁,在程序中什么都不写,就添加一行monkey.patch()即可

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from urllib import request#简单的爬虫模块,复杂的不用这个
import gevent,time
from  gevent import monkey#导入monkey

monkey.patch_all()#把当前程序的所有的io操作给我单独的作上标记,且就执行这一句即可

def f(url):
    print('GET:%s'%url)
    resp = request.urlopen(url)#request.urlopen()函数
    data = resp.read()#读取爬到的数据
    print('%d bytes received from %s.'%(len(data),url))

urls=[
    'https://www.mi.com/',
    'https://www.meizu.com/',
    'https://www.smartisan.com/'
]

time_start = time.time()

for url in urls:
    f(url)

print('同步时间',time.time()-time_start)

async_time_start = time.time()
gevent.joinall(
    [
        gevent.spawn(f,'https://www.mi.com/'),
        gevent.spawn(f,'https://www.meizu.com/'),
        gevent.spawn(f,'https://www.smartisan.com/'),
    ]
)

print('异步时间',time.time()-async_time_start)
#输出
GET:https://www.mi.com/
299798 bytes received from https://www.mi.com/.
GET:https://www.meizu.com/
128522 bytes received from https://www.meizu.com/.
GET:https://www.smartisan.com/
3026 bytes received from https://www.smartisan.com/.
同步时间 1.1604030132293701
GET:https://www.mi.com/
GET:https://www.meizu.com/
GET:https://www.smartisan.com/
299798 bytes received from https://www.mi.com/.
3026 bytes received from https://www.smartisan.com/.
128522 bytes received from https://www.meizu.com/.
异步时间 0.7338526248931885

看到效果了吧,其实差距不大,还有一个原因就是网络的原因也有。总之这个是需要通过打补丁的。其实就是说通过打补丁来检测到它有urllib,它就把urllib里面所有涉及到的有可能进行IO操作的地方直接花在前面加一个标记,这个标记就相当于gevent.sleep(),所以把urllib变成一个一有阻塞,它就切换了。
注意了,gevent.sleep()是模拟IO操作的,标记的意思是,这边是IO操作,遇到阻塞就切换。


gevent实现单线程下的多socket并发

server端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

import sys,gevent,socket,time
from  gevent import socket,monkey
monkey.patch_all()

def server(port):
    s=socket.socket()
    s.bind(('0.0.0.0',port))
    s.listen(500)
    while True:
        cli,addr = s.accept()
        gevent.spawn(hadle_request,cli)#协程

def hadle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print('recv:',data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as e:
        print(e)
    finally:
        conn.close()
if __name__ == '__main__':
    server(6969)

client端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

import socket
HOST = 'localhost'
PORT = 6969
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"),encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    print('Received', repr(data))
s.close()

事件驱动介绍

通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式

图文详解

在UI编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?
方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:
1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
所以,该方式是非常不好的。
方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
day10_进程、协程、异步IO、多路复用


事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

day10_进程、协程、异步IO、多路复用
在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
1、程序中有许多任务,而且…
2、任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
3、在等待事件到来时,某些任务会阻塞。
当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。


阻塞IO, 非阻塞IO, 同步IO,异步IO介绍

1、我们之前讲了IO操作什么时候切换回来呢?
我们刚刚讲了回调函数,这个回调函数是当你的程序一遇到IO操作,再一切换,这个切换的时候,切换之前你等着IO操作完了再回来
2、IO 为什么不阻塞呐?
因为IO操作是用操作系统完成的,咋们用户读一个文件,你以为自己的程序打开一个文件,然后去把文件的内容读出来。其实不然,是你的操作系统的调度接口打开这个文件,然后把这个数据读会开,其实是操作系统负责IO的控制。
3、我怎么切换回来?
加一个回调函数,就是我去切换之前,调操作系统IO接口的时候,告诉操作系统,说你处理完了之后,调一下这个回调函数,这个回调函数就会通知我,通知我了就代表执行完了,我就回来把这个IO拿到了,所以就是通过这个事件驱动的方式。出现这个IO操作,我就注册这个事件,就是IO事件交给操作系统,操作系统内部有一个队列,处理完了吧结果返回给你,通知回调函数通知你。
day10_进程、协程、异步IO、多路复用


IO多路复用

1、概念
在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O
2、用户空间与内核空间
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
3、进程切换
为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。
从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。
总而言之就是很耗资源。
进程控制块(Processing Control Block),是操作系统核心中一种数据结构,主要表示进程状态。其作用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB通常是系统内存占用区中的一个连续存区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息
4、进程阻塞
正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
5、文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
6、缓存I/O
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。


小结

1、操作系统本身运行也需要内存空间,所以你8G内存,往往使用,达不到8G。
2、只有内核是操作系统本身访问的,其他的程序是不能直接访问内核的,只有通过操作系统访问。

socket好几次send都合在一块了,黏包了?

为了减少从内核态到用户态的数据来回的copy,因为如果你打开一个文件,你读到内存里,你以为是直接读到你的用户的内存里面,其实是先读到缓存里面,也就是内核的缓存里面,然后再由内核帮你把这份数据copy到用户的内存里面。就是为了避免这里的来回copy,耗资源,这样的话效率就高。

别人发了一个数据,你是怎么接收到的?

别人发来数据不是发到你用户的socket程序里面,而是先发到操作系统的IO接口里,然后操作系统IO接口,其实读到了内核空间里面,再把这份数据copy到你的用户的内存空间里面的,数据要从内核态copy到用户态。这里面是cpu调copy这个接口需要消耗cpu,最主要的是内存的开销。都是在内存中的,只是有一个内核空间和用户空间,数据需要从内核把这个数据转给你。
day10_进程、协程、异步IO、多路复用


阻塞IO, 非阻塞IO, 同步IO,异步IO介绍2

IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。


阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
day10_进程、协程、异步IO、多路复用
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。


非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

day10_进程、协程、异步IO、多路复用

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。


I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

day10_进程、协程、异步IO、多路复用
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。


异步 I/O(asynchronous IO)

linux下的asynchronous IO其实用得很少。先看一下它的流程:

day10_进程、协程、异步IO、多路复用
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。


小结

blocking和non-blocking的区别
调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
synchronous IO和asynchronous IO的区别
在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
–A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
–An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。
有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。
而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IO Model的比较如图所示—>
day10_进程、协程、异步IO、多路复用


select poll epoll IO多路复用介绍及实现

select 负责监控和检测很多个socket连接,与下面这个内核态到用户态没什么关系,它只是说监控连接有一个连接,然后其中100个连接0k了,它就会返回。

IO多路复用

IO多路复用中包括 select、pool、epoll,这些都属于同步,还不属于异步。

1、select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll

直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速**这个文件描述符,当进程调用epoll_wait()时便得到通知。

sellect、poll、epoll三者的区别表

day10_进程、协程、异步IO、多路复用


select IO多路复用代码实例

之前我们讲了select、poll、epoll的介绍,我们今天先来说说select的IO多路复用代码实现。我们先来根据之前图说说select的原理。
Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

elect实现非阻塞连接

通过select是非阻塞的,accept和recive都不阻塞了,没有值就会报错。就是说没有连接就不要让它走server.accept()这一步。让select帮助去检测这个100个连接。所以说只要有连接,有活动了,有数据才去accept。服务端代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan
import select,socket
server = socket.socket()
server.bind(("localhost",9000))
server.listen(1000)
server.setblocking(False)#设置为非阻塞
inputs = [server,]  #一开始只有自己的连接,所以一开始把自己的连接发到列表中
outputs=[]
while True:
    #exceptional表示如果inputs列表中出现异常,会输出到这个exceptional中
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)
    # print(readable,writeable,exceptional)
    for r in readable:
        if r is server:  #代表一个新连接
            conn,addr = r.accept()
            print("来了一个新连接",addr)
            inputs.append(conn) #因为这个新建立的连接还没有发数据过来,现在就接收的话程序就报错了
            #所以要想实现这个客户端发数据过来时server端能知道,就需要让让select再检测这个conn
        else:
            data = r.recv(1024)
            print("收到数据:",data)
            r.send(data)
            print("send done....")


#

select用法
select(rlist, wlist, xlist, timeout=None)
-rlist:读列表
-wlist:写列表
-xlist:异常列表
select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select()

#读取连接的列表
inputs = [server,]
#写入连接的列表
outputs = []

这个inputs和outputs是什么鬼?
打个比方,就是想让内核帮我去检测100个连接,你传给它一个列表,就是需要检测的列表,就是这个inputs,就是说你想有多少检测的连接放在这个列表里面,然后交给select,就相当于交给内核了。
接下来我们看看上述代码中这行代码的用法:

readable,writeable,exceptional = select.select(inputs,outputs,inputs)

select中inputs,outputs这个好理解,那么第三个为什么也是inputs?
答:这个的意思是如果我inputs列表中有100个连接,这100个连接中有5个断了,出现了异常,就会输入到exceptional里面去,但是这个100个连接存在inputs里面,所以传入inputs。

nputs列表解释

我们知道用select这种方式,服务端收不到数据式不阻塞的,所以只有当客户端发数据的时候,内核通知你收数据,再把它监测列表中的连接,下一次循坏的时候,如果客户端判断这个连接活了,说明就有数据了。

for r in readable:
        if r is server:  #代表一个新连接
            conn,addr = r.accept()
            print("来了一个新连接",addr)
            inputs.append(conn) #因为这个新建立的连接还没有发数据过来,现在就接收的话程序就报错了
            #所以要想实现这个客户端发数据过来时server端能知道,就需要让让select再检测这个conn

代码中的这几行说明,如果是新连接的客户端,就把它加入到inputs列表中,交给select去循环监测。
但是这边有一个问题,我如果发现是新的客户端连接,那么我的inputs列表中判断就是已经存在的连接还是server?
答:input中已经有一个server了,在加一个连接的话就是inputs=[server,conn],两个有一个返回,有100个活动,就有100个连接conn,但是select给我们作了封装,inputs返回的就是一个,如果是conn活跃了,返回的就是conn,底层返回的还是两个,一个是server,一个是conn。但是select内部已经帮我们循环了一遍了,所以返回的这个时候就是conn,用户有可能是server,也有可能是conn,谁活动说不准。如果活动的是conn,说明客户发送数据过来了,你接收数据就行了,如果是server,代表又来了一个新连接,所以我来一个新连接,我就再建立一个新连接,把这个连接放到这个inputs列表中,如果返回的是conn,我就直接接数据。


逻辑图

day10_进程、协程、异步IO、多路复用

IO多路复用升级版

outputs的功能就是你发给select是什么链接,下次就给你这些链接,这些链接输出到writeable中。

outputs = []

readable,writeable,exceptional = select.select(inputs,outputs,inputs)

把链接发到outputs中,下一次select的时候,outputs就会把它返回,然后我就可以把它发回去。

升级版代码

就是我不想立刻发给客户端,需要等一会再发给客户端。那我要等到说明时候呐?我先扔到队列中,一会再发,每一个对了连接单独搞一个队列,放到队列中然后实现下一次循环。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

import select, socket, queue

server = socket.socket()
server.bind(("localhost", 9000))
server.listen(1000)
server.setblocking(False)  # 设置为非阻塞
msg_dic = dict()  # 生成一个队列字典
inputs = [server, ]  # 一开始只有自己的连接,所以一开始把自己的连接发到列表中
outputs = []
while True:
    # exceptional表示如果inputs列表中出现异常,会输出到这个exceptional中
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    # print(readable,writeable,exceptional)
    for r in readable:
        if r is server:  # 代表一个新连接
            conn, addr = r.accept()
            print("来了一个新连接", addr)
            inputs.append(conn)
            msg_dic[conn] = queue.Queue()  # 初始化一个队列,后面存要返回给这个客户端的数据
        else:
            data = r.recv(1024)
            print("收到数据:", data)
            msg_dic[r].put(data)
            outputs.append(r)
            # r.send(data)
            # print("send done....")
    for w in writeable:  # 要返回给客户端的链接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)  # 返回给客户端的源数据
        outputs.remove(w)  # 确保下次循环的时候writeable,不返回这个已经处理完的这个连接了

    for e in exceptional:  # 处理异常的连接
        if e in outputs:  # 因为e不一定在outputs,所以先要判断
            outputs.remove(e)
        inputs.remove(e)  # 删除inputs中异常连接
        del msg_dic[e]  # 删除此连接对应的队列

exceptional表示当这个连接出现问题了,那我服务端就不需要检测它了,那就要把链接的实例从inputs里面,outputs里面全都删除掉

完整代码

server端:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里

    for s in readable: #每个s就是一个socket

        if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
            #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
            #新连接进来了,接受这个连接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
            #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
            #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的

            message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送

        else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
            #客户端的数据过来了,在这接收
            data = s.recv(1024)
            if data:
                print("收到来自[%s]的数据:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
                if s not  in outputs:
                    outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端


            else:#如果收不到data代表什么呢? 代表客户端断开了呀
                print("客户端断开了",s)

                if s in outputs:
                    outputs.remove(s) #清理已断开的连接

                inputs.remove(s) #清理已断开的连接

                del message_queues[s] ##清理已断开的连接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

client端:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'


import socket
import sys

messages = [ b'This is the message. ',
            b'It will be sent ',
            b'in parts.',
            ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )


epoll IO多路复用实现单线程支持上万并发代码实例

之前我们就讲了select的这种方式,使用的是轮询方式去监测客户端的连接,效率比较低下,我们今天来聊聊epoll的方式,这种效率更高,但是这种方式在Windows下不支持,在Linux是支持的,那就不得不说下面的一个模块selectors。

selectors模块

selectors 默认是epoll,如果找不到epoll的话,比如说windows操作系统,就会找select

selectors模块使用

selectors实现非阻塞连接
默认它是阻塞的,只要不阻塞了,就代表肯定有活动的数据,我就循环这个events。

import selectors,socket

sel = selectors.DefaultSelector()

def accept(sock,mask):
    "接收客户端信息实例"
    conn,addr = sock.accept()
    print("accepted",conn,'from',addr)
    conn.setblocking(False)
    sel.register(conn,selectors.EVENT_READ,read)  #新连接注册read回调函数

def read(conn,mask):
    "接收客户端的数据"
    data = conn.recv(1024)
    if data:
        print("echoing",repr(data),'to',conn)
        conn.send(data)
    else:
        print("closing",conn)
        sel.unregister(conn)
        conn.close()

server = socket.socket()
server.bind(('localhost',9999))
server.listen(500)
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept)  #注册事件,只要来一个连接就调accept这个函数,
#sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,]

while True:
    events = sel.select()  #这个select,看起来是select,有可能调用的是epoll,看你操作系统是Windows的还是Linux的
                          #默认阻塞,有活动连接就返回活动连接列表
    print("事件:",events)
    for key,mask in events:
        callback = key.data #相当于调accept了
        callback(key.fileobj,mask)  #key.fileobj=文件句柄
selectors用法描述

①定义一个对象

sel = selectors.DefaultSelector()

②注册一个事件
说明:注册事件,只要来一个连接就调accept这个函数,就相当于之前select的用法,sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是一样的。

sel.register(server,selectors.EVENT_READ,accept)

③循环事件
这边这个events输出的是什么?

while True:
    events = sel.select()  #这个select,看起来是select,有可能调用的是epoll,看你操作系统是Windows的还是Linux的
    print("事件:",events)
    for key,mask in events:
        callback = key.data #相当于调accept了
        callback(key.fileobj,mask)  #key.fileobj=文件句柄

打印events的结果:

事件: [(SelectorKey(fileobj=<socket.socket fd=276, family=AddressFamily.AF_INET,
type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=276, events=1,
data=<function accept at 0x000002722BB1D510>), 1)]
accepted <socket.socket fd=352, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM,
proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 63829)> from ('127.0.0.1', 63829)

事件: [(SelectorKey(fileobj=<socket.socket fd=352, family=AddressFamily.AF_INET,
type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1',
63829)>, fd=352, events=1, data=<function read at 0x000002722BB1D840>), 1)]
echoing b'ls' to <socket.socket fd=352, family=AddressFamily.AF_INET,
type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1',
63829)>

这样很容易明白了:

callback = key.data #第一次调用的是accept,第二次调用的是read
callback(key.fileobj,mask)  #key.fileobj=文件句柄

高并发的客户端代码

这个在Windows上试的时候是select,最好在Linux上试,它默认支持的是epoll

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan

import socket, sys

messages = [b'This is the message. ',
            b'It will be sent ',
            b'in parts.',
            ]
server_address = ('localhost', 9999)

# Create a TCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname())
相关标签: 异步