目录
协程
引子
本章的主题是基于单线程来实现并发,即只用一个线程(很明显可以使用的CPU只有1个)情况下实现并发,为此我们需要回顾下并发的本质:切换+保存状态。
CPU正在运行一个任务,会在两种情况下切走去执行其他任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它。
其中第二种情况并不能提升效率,只是为了让CPU能够雨露均沾,实现看起来所有任务都被‘同时’执行的效果,如果多个任务全是纯计算的,这种切换反而会降低效率。为此我们用yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:
1.yield可以保存状态,yield的状态保存和操作系统的保存线程状态很像。但是yield是代码级别控制的,更轻量级2.send可以把一个函数的结果传递到另一个函数,以实现单线程程序之间的切换
单纯的切换反而会影响效率
# 串行执行import timedef consumer(res): ''' 任务1:接收数据,处理数据 :return: ''' passdef producer(): ''' 任务2:产生数据 :return: ''' res = [] for i in range(100000000): res.append(i) return res# 串行执行start = time.time()res = producer()consumer(res)stop = time.time()print('time,',stop-start) # time, 12.530351638793945
让我们基于yield来实现并发执行
# yield并发执行import timedef consumer(): ''' 任务1:接收数据,处理数据 :return: ''' while True: x = yielddef producer(): ''' 任务2:产生数据 :return: ''' g = consumer() next(g) for i in range(100000000): g.send(i)start = time.time()# 基于yield保存状态,实现两个任务之间的来回切换,即并发的效果# PS:如果把每个任务都加上打印,那么明显的看到两个任务是你一次我一次,即并发执行的producer()stop = time.time()print('time:',stop-start) # time: 10.936712741851807
第一种情况的切换,在任务一遇到IO操作时吗,切换到任务二运行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。
yield并不能实现遇到IO切换
# 当yield遇到IO阻塞时# 当yield遇到IO阻塞时import timedef consumer(): ''' 处理数据 :return: ''' while True: x = yield()def procuder(): ''' 产生数据 :return: ''' g = consumer() next(g) for i in range(100000000): g.send(i) time.sleep(0.01)start_time = time.time()procuder()stop_time = time.time()print(stop_time-start_time)
对于单线程下,我们不可避免程序中出现IO操作,但如果我们能在自己的程序中(即用户级别程序,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到IO阻塞时就切换到另一个任务去计算,这样就保证了该线程能够最大限度的处于就绪状态,即随时可以被CPU执行的状态,相当于我们在用户级别将自己的IO操作最大限度的隐藏了起来,从而可以迷惑操作系统,让其看到:该线程好像一直在计算,IO比较少,从而更多的将CPU执行权限分配给我们的线程。
协程的本质上就是在单线程下,由用户自己去控制一个任务遇到IO阻塞就切换另一个任务去执行,以此来提高工作效率,为了实现它,我们需要寻找一种可以同时满足以下条件的解决方案:1.可以控制多个任务之间的切换,切换之前将任务的状态保存起来,以便重新运作时,可以基于暂停的位置继续执行2.可以检测IO操作,在遇到IO操作的情况下才发生协程
协程介绍
协程:是单线程下的并发,又称微线程,一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到IO或执行时间过长就会被迫交出CPU执行权限,切换其他线程执行)2.单线程内开启协程,一旦遇到IO,就会从应用级别(而非操作系统)控制切换,以此来提升效率(非IO操作的切换与提升效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
1.协程的开销会更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级2.单线程内就可以实现并发的效果,最大限度的利用CPU
缺点如下:
1.协程的本质是单线程下,所以无法利用多核优势,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程2.协程指的是单个线程,因此当协程出现阻塞,将会阻塞整个线程
总结协程特点
1.必须在只有一个单线程内实现并发2.修改共享数据不需要加锁3.用户程序里自己保存多个控制流的上下文4.一个协程遇到IO操作会自动切换到其他协程
greenlet模块
如果我们在单个线程内20个任务,想要实现多个任务之间的切换,使用yield生成器的方式过于麻烦,(需要先得到初始化一次的生成器,然后再调用send),而使用greenlet模块就可以非常简单的实现20个任务直接的切换。
安装: pip3 install greenlet# 使用greenlet模块from greenlet import greenletdef eat(name): print('%s is 1 eat'%name) g2.switch('肖亚飞') print('%s is 2 eat'%name) g2.switch()def run(name): print('%s is 1 run'%name) g1.switch() print('%s is 2 run'%name)g1 = greenlet(eat)g2 = greenlet(run)g1.switch('肖亚飞') # 需要在第一次执行的时候传入参数,后来就不用了# 运行结果肖亚飞 is 1 eat肖亚飞 is 1 run肖亚飞 is 2 eat肖亚飞 is 2 run
单纯的切换(在没有IO情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度,以下同一段代码的两种执行方式:
# 顺序执行import timedef f1(): res = 1 for i in range(100000000): res += 1def f2(): res = 1 for i in range(100000000): res *= istart = time.time()f1()f2()stop = time.time()print(stop-start) # 9.733676195144653# 切换执行from greenlet import greenletimport timedef f1(): res = 1 for i in range(100000000): res += i g2.switch()def f2(): res = 1 for i in range(100000000): res *= 1 g1.switch()start = time.time()g1 = greenlet(f1)g2 = greenlet(f2)g1.switch()stop = time.time()print(stop-start) # 53.54862999916077
greenlet虽然提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2 ,如此才能提高效率,这就用到了gevent模块了。
gevent模块
安装: pip3 install gevent
gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是greenlet,它是以 C扩展模块形式接入python的轻量级协程,greenlet全部运行在主程序操作系统进程的内部,但它们都被协作式调度。
# 用法g1 = gevent.spawn(func,1,2,x=3)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以使位置参数或者关键字参数,都是传递给函数eat的g2 = gevent.spawn(func2)g1.join() # 等待g1结束g2.joiin() # 等待g2结束或者上述两步合作成一步:gevent.joinall([g1,g2])g1.value # 拿到func1的值
遇到IO阻塞时会自动切换任务
# 遇到IO时会自动切换任务import geventdef eat(name): print('%s eat 1 '%name) gevent.sleep(2) print('%s eat 2 '%name)def run(name): print('%s run 1 '%name) gevent.sleep(2) print('%s run 2 '%name)g1 = gevent.spawn(eat,'肖亚飞')g2 = gevent.spawn(eat,'大胖')g1.join()g2.join()print('主')# 运行结果肖亚飞 eat 1 大胖 eat 1 肖亚飞 eat 2 大胖 eat 2 主
上面的例子中,gevent.sleep(2)主要是模拟gevent碰到IO阻塞而切换,但是time.sleep(2)或者其他的阻塞,gevent是不识别的,所以需要在文件开始打一行补丁:
from gevent import monkey;monkey.patch_all()
这行补丁必须放在被打补丁者 的前面,如time,socket模块的前面,或者我们干脆可以理解成补丁需要放在文件的最开头。
import geventfrom gevent import monkey;monkey.patch_all()import timedef eat(name): print('%s eat 1 '%name) time.sleep(2) print('%s eat 2 '%name)def run(name): print('%s run 1 '%name) time.sleep(2) print('%s run 2 '%name)g1 = gevent.spawn(eat,'肖亚飞')g2 = gevent.spawn(eat,'大胖')g1.join()g2.join()print('主')# 运行结果肖亚飞 eat 1 大胖 eat 1 肖亚飞 eat 2 大胖 eat 2 主
我们可以使用threading.current_thread.getName()来查看每个g1和g2的名字,查看的结果为:Dummy Thread n,即假线程。