美文网首页大数据 爬虫Python AI SqlPython小哥哥
Python协程进阶,原来实现一个事件循环可以如此简单!

Python协程进阶,原来实现一个事件循环可以如此简单!

作者: 14e61d025165 | 来源:发表于2019-06-29 15:00 被阅读0次

引言

目前很多公司选择将python项目使用golang重构,很大一方面原因是因为golang的并发能力,golang自带的语法糖支持使并发编程变的相对简单,也更能充分的使用多核CPU的计算资源。

相应的,python长期受制于GIL,无法在多线程时使用多核CPU,所以一直以来在谈及python的缺陷时,性能总是无法回避的一个问题。当然,一些python著名的第三方组织也一直通过各种手段来改善python的并发性能,如twisted的异步模型使用事件驱动机制来提升python性能,著名的爬虫框架scrapy便是以twisted作为底层网络库来开发的,还有gevent,它使用greenlet在用户态完成栈和上下文切换来减少切换带来的性能损耗,同样还有著名的web协程框架tornado,他使用生成器来保存协程上下文及状态,使用原生的python语法实现了协程。但从python3.4开始python引入asyncio标准库,随后又在3.5引入async/await关键字,从根本上规范了python异步编程标准,使python异步编程逐渐流行起来。

关于什么是python协程,相信网上已经有了不少资料,但是只描述抽象的上层建筑难免会让人接受困难,本文希望可以通过从最简单的代码和逻辑,使用最基础的数据结构,从实现出发,带领大家理解什么是python协程。

首先需要补充一些基础知识

什么是生成器

我们都应该听说过迭代器,这在很多语言中都有类似的概念,简单的说,迭代器就是可以被迭代的对象,对其使用next操作可以返回一个元素,通常多次迭代后迭代器会中止,此时迭代器无法再使用。比如python中可以通过iter方法来将一个列表转换成迭代器:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">In [1]: lst = [1, 2, 3]

Python学习交流群:1004391443
In [2]: iterator = iter(lst)
In [3]: next(iterator)
Out[3]: 1
In [4]: next(iterator)
Out[4]: 2
In [5]: next(iterator)
Out[5]: 3
In [6]: next(iterator)


StopIteration Traceback (most recent call last)
<ipython-input-7-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:
</pre>

生成器可以看作是迭代器的子类,同时提供了比迭代器更强大的功能,python中,可以使用yield关键字使函数返回生成器对象。

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">In [8]: def fun():
...: yield 1
...: yield 2
...: yield 3
...:
In [9]: iterator = fun()
In [10]: next(iterator)
Out[10]: 1
In [11]: next(iterator)
Out[11]: 2
In [12]: next(iterator)
Out[12]: 3
In [13]: next(iterator)


StopIteration Traceback (most recent call last)
<ipython-input-13-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:
</pre>

每次next调用, fun函数只执行四分之一,如果我们拥有多个生成器对象, 按照一定规则 可控的对他分别调用next,生成器每次的暂停都保存了执行进度和内部状态。如果将这三个生成器理解成协程,那不正是我们熟悉的协程间的切换?

事件循环

所以,我们可以想象,现在有一个循环和一个生成器列表,每次循环,我们都将所有的生成器进行一次调用,所有生成器交替执行。如下:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">In [16]: gen_list = [fun(), fun(), fun()]
In [17]: while True:
...: for gen in gen_list:
...: print(next(gen))
...:
1
1
1
2
2
2
3
3
3


StopIteration Traceback (most recent call last)
<ipython-input-17-f2c1d557da29> in <module>()
1 while True:
2 for gen in gen_list:
----> 3 print(next(gen))
4
StopIteration:
</pre>

当然,我们还可以换一种写法,将生成器的每一步都当成是一次调用,把生成器包装成一个Handle对象,每次调用handle对象的call来完成生成器的调用,同时,我们还可以在调用完成后做一些准备来控制下一次调用的时间,将Handle对应放到一个scheduled_list里面:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def fun():
print("step1")
yield
print("step2")
yield
print("step3")
yield
scheduled_list = []
class Handle(object):
def init(self, gen):
self.gen = gen
def call(self):
next(self.gen)
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
if name == "main":
loop(fun(), fun(), fun())
</pre>

协程中的阻塞

在有了以上的基础后,我们来分析上面提到的切换规则,什么时候应该切换协程(生成器)?显而易见,当遇到阻塞时,我们才需要切换协程,以避免CPU的浪费。我将阻塞分为了以下三种:

  1. IO调用,如socket,file,pipe等。
  2. 人为制造的阻塞,如sleep。
  3. 异步调用。

假设,在我们的生成器内有一次socket调用,我们不知道它多久会ready,我们希望不等待它的返回,切换到其它协程运行,等其准备好之后再切换回来,该怎么办?

有同学可能会想到了,将socket注册到epoll上。如下:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done(sock.recv(1024))
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
class Future(object):
def init(self):
self.callbacks = []
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def init(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlersfd
poll.unregister(fd)
del handlers[fd]
if name == "main":
loop(fun(), fun(), fun())
</pre>

这一步引入一个新的对象Future,他用来代指未来即将发生的调用,通过epoll上注册的事件,触发了它的完成,完成之后执行了将handle对象放回scheduled_list, 可从而切回了协程。

那么,人为制造的阻塞我们怎么切换协程呢?这里,我们又引入了一个新的对象Timeout

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []

创建一个timeout_list

timeout_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
def sleep(sec):
future = Future()
timeout = Timeout(sec, future.set_done)
timeout_list.append(timeout)
return future
class Timeout(object):
def init(self, timeout, callback):
self.deadline = time.time() + timeout
self.callback = callback
def call(self):
self.callback(None)
class Future(object):
def init(self):
self.callbacks = []
self.value = None
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def init(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
deadline = time.time()
for timeout in timeout_list[:]:
if timeout.deadline <= deadline:
timeout.call()
timeout_list.remove(timeout)
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
for timeout in timeout_list:
wait_time = timeout.deadline - deadline
if wait_time <= 0:
wait_time = 0
default_timeout = min(default_timeout, wait_time)
if not scheduled_list and not timeout_list and not handlers:
break
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlersfd
poll.unregister(fd)
del handlers[fd]
if name == "main":
loop(fun(), fun(), fun())
</pre>

通过创建一个Timeout对象,我们在deadline时触发了其回调,使Future完成,从而完成了协程的切换。

由以上两点,我们可以大致观察出一个规律,创建Future对象,切出协程,在合适的时机(如socket ready或到达deadline/timeout)让他完成,切入协程,这才是协程切换的关键所在,由此,我们可以使用Future来管理各种异步调用。

如,我们在python编码时遇到了一个计算密集型的函数,由于python单进程无法利用多核,我们可以创建一个子进程来处理计算,同时关联到一个Future中:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
future = Future()
from multiprocessing import Process
Process(target=long_time_call, args=(future, )).start()
yield future
def long_time_call(future):
#...
future.set_done()
</pre>

当协程执行到第三步时,遇到了长时间运行的函数调用,我们创建了一个Future,关联到一个子进程中,并在子进程完成时设置future完成,在子进程完成之前,父进程已完成协程的切出,将执行权交给其它协程执行。

这个地方遗漏了一个细节,当没有其它协程可以执行时,epoll会被设置成超时时间=10000,因而陷入到长时间的睡眠中,而子进程完成后需要切入协程,但父进程已经被epoll阻塞掉,如何唤醒主进程继续执行该协程呢?业界通用的做法是,创建一个管道,在切出协程时让epoll监听读fd,子进程完成后,往管道中写入一个字符,epoll监听的读fd 马上变成ready,因此epoll解除阻塞,事件循环得以继续执行。

当然,异步调用不仅仅可以使用子进程,子线程、远程计算框架都可以通过这种方式执行。

讲到这里,大家应该基本明白了一个协程函数是如何工作的了。下表可帮助我们从线程的角度理解协程

<tt-image data-tteditor-tag="tteditorTag" contenteditable="false" class="syl1561791282744" data-render-status="finished" data-syl-blot="image" style="box-sizing: border-box; cursor: text; color: rgb(34, 34, 34); font-family: "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei", "WenQuanYi Micro Hei", "Helvetica Neue", Arial, sans-serif; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; white-space: pre-wrap; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-style: initial; text-decoration-color: initial; display: block;"> image

<input class="pgc-img-caption-ipt" placeholder="图片描述(最多50字)" value="" style="box-sizing: border-box; outline: 0px; color: rgb(102, 102, 102); position: absolute; left: 187.5px; transform: translateX(-50%); padding: 6px 7px; max-width: 100%; width: 375px; text-align: center; cursor: text; font-size: 12px; line-height: 1.5; background-color: rgb(255, 255, 255); background-image: none; border: 0px solid rgb(217, 217, 217); border-radius: 4px; transition: all 0.2s cubic-bezier(0.645, 0.045, 0.355, 1) 0s;"></tt-image>

上面的表格表述线程和协程的一一对应关系,最后一栏可能还需要举例解释一下:

我们知道一个线程执行过程中,会嵌套调用多个函数,如:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def foo():
print("in foo")

def bar():
print("in bar")
def fun():
bar()
foo()

if name == "main":
fun()
</pre>

那么生成器如何嵌套调用呢?python3.4之前,嵌套的生成器只能这么使用:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def foo():
print("in foo")
yield
def bar():
print("in bar")
yield
def fun():
for i in bar():
yield i
for i in foo():
yield i
if name == "main":
for i in fun():
pass
</pre>

python3.4之后引入了新的语法糖yield from,简化了调用方式:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def foo():
print("in foo")
yield
def bar():
print("in bar")
yield
def fun():
yield from bar()
yield from foo()

if name == "main":
for i in fun():
pass
</pre>

yield from可以驱动子生成器,来逐一返回子生成器中的值,将嵌套的生成器打平。值得一提的是,yield from才是await的真实身份。

让我们用最初的例子来编写一个嵌套了的子生成器函数的协程demo。我们将fun生成器抽离成2种阻塞操作,并封装好:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def read(sock):
future = Future()
def handler():
buf = sock.recv(1024)
future.set_done(buf)
add_handler(sock.fileno(), handler, 0x001)
yield future
return future.get_result()
def sleep(sec):
future = Future()
timeout = Timeout(sec, future.set_done)
timeout_list.append(timeout)
yield future
</pre>

有了这两个基础函数之后,我们就可以自由的编写我们协程了

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">def coroutine(num):
client = socket.socket()
client.connect(("", 1234))
print(f"coroutine_{num} start")
buffer = yield from read(client)
print(f"coroutine_{num} recv: ", buffer)
yield from sleep(3)
print(f"coroutine_{num} wake up ")
client.close()
if name == "main":
loop(coroutine(1), coroutine(2))
</pre>

我们创建了两个协程,其中调用了一次socket读和一个睡眠,让我们看一下执行效果:

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">coroutine_1 start
coroutine_2 start
coroutine_2 recv: b'test'
coroutine_1 recv: b'test'
coroutine_2 wake up
coroutine_1 wake up
</pre>

两个协程异步交替执行。

asyncio的使用

相信看完上面的例子之后,大家应该对python协程的实现有了初步的认识,那标准的python协程如何使用呢?

<pre spellcheck="false" style="box-sizing: border-box; margin: 5px 0px; padding: 5px 10px; border: 0px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-variant-numeric: inherit; font-variant-east-asian: inherit; font-weight: 400; font-stretch: inherit; font-size: 16px; line-height: inherit; font-family: inherit; vertical-align: baseline; cursor: text; counter-reset: list-1 0 list-2 0 list-3 0 list-4 0 list-5 0 list-6 0 list-7 0 list-8 0 list-9 0; background-color: rgb(240, 240, 240); border-radius: 3px; white-space: pre-wrap; color: rgb(34, 34, 34); letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">import socket
import asyncio
async def read(sock):
loop = asyncio.get_event_loop()
future = loop.create_future()
def handler():
buf = sock.recv(1024)
future.set_result(buf)
loop.remove_reader(sock.fileno())
loop.add_reader(sock.fileno(), handler)
await future
return future.result()
async def coroutine(num):
client = socket.socket()
client.connect(("", 1234))
print(f"coroutine_{num} start")
buffer = await read(client)
print(f"coroutine_{num} recv: ", buffer)
await asyncio.sleep(3)
print(f"coroutine_{num} wake up ")
client.close()
if name == "main":
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(coroutine(1), coroutine(2)))
</pre>

几乎和我们的实现一模一样,其中await取代了yield from, 协程显式使用async来声明。

python协程的应用

python协程优势

通过上述例子我们可以很容易看出,python协程具有以下特点:

  1. 超级轻量,不需要维护协程栈,所有的上下文执行状态都被维护在了生成器中。
  2. 切换自由,通过yield from(python3.5以后是await)随意切换协程,协程切换完全可控以至于几乎不用加锁。
  3. 并发能力强,并发上限理论上取决于IO多路复用可注册的文件描述符的极限。

缺点

  1. 还是只能用到单核。
  2. 由于协程切换是非抢占式的,所以如果业务是CPU密集型的,可能其它协程长时间得不到执行。

综上所述,在使用python的高并发场景下,python多进程+协程是最优的解决方案。

相关文章

网友评论

    本文标题:Python协程进阶,原来实现一个事件循环可以如此简单!

    本文链接:https://www.haomeiwen.com/subject/kpdpcctx.html