編程學習網 > 編程語言 > Python > 基于協程的 Python 網絡庫 gevent 介紹教程!
2023
09-21

基于協程的 Python 網絡庫 gevent 介紹教程!


繼續Python協程方面的介紹,這次要講的是gevent,它是一個并發網絡庫。它的協程是基于greenlet的,并基于libev實現快速事件循環(Linux上是epoll,FreeBSD上是kqueue,Mac OS X上是select)。有了gevent,協程的使用將無比簡單,你根本無須像greenlet一樣顯式的切換,每當一個協程阻塞時,程序將自動調度,gevent處理了所有的底層細節。讓我們看個例子來感受下吧!


import gevent


def test1():

    print 12

    gevent.sleep(0)

    print 34


def test2():

    print 56

    gevent.sleep(0)

    print 78


gevent.joinall([

    gevent.spawn(test1),

    gevent.spawn(test2),

])



解釋下,”gevent.spawn()”方法會創建一個新的greenlet協程對象,并運行它?!眊event.joinall()”方法會等待所有傳入的greenlet協程運行結束后再退出,這個方法可以接受一個”timeout”參數來設置超時時間,單位是秒。運行上面的程序,執行順序如下:



先進入協程test1,打印12

遇到”gevent.sleep(0)”時,test1被阻塞,自動切換到協程test2,打印56

之后test2被阻塞,這時test1阻塞已結束,自動切換回test1,打印34

當test1運行完畢返回后,此時test2阻塞已結束,再自動切換回test2,打印78

所有協程執行完畢,程序退出



所以,程序運行下來的輸出就是:



12

56

34

78



注意,這里與上一篇greenlet中第一個例子運行的結果不一樣,greenlet一個協程運行完后,必須顯式切換,不然會返回其父協程。而在gevent中,一個協程運行完后,它會自動調度那些未完成的協程。



我們換一個更有意義的例子:



import gevent

import socket


urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=5)


print [job.value for job in jobs]



我們通過協程分別獲取三個網站的IP地址,由于打開遠程地址會引起IO阻塞,所以gevent會自動調度不同的協程。另外,我們可以通過協程對象的”value”屬性,來獲取協程函數的返回值。



猴子補丁 Monkey patching



細心的朋友們在運行上面例子時會發現,其實程序運行的時間同不用協程是一樣的,是三個網站打開時間的總和??墒抢碚撋蠀f程是非阻塞的,那運行時間應該等于最長的那個網站打開時間呀?其實這是因為Python標準庫里的socket是阻塞式的,DNS解析無法并發,包括像urllib庫也一樣,所以這種情況下用協程完全沒意義。那怎么辦?



一種方法是使用gevent下的socket模塊,我們可以通過”from gevent import socket”來導入。不過更常用的方法是使用猴子布?。∕onkey patching):



from gevent import monkey; monkey.patch_socket()

import gevent

import socket


urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=5)


print [job.value for job in jobs]



上述代碼的第一行就是對socket標準庫打上猴子補丁,此后socket標準庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標準庫也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法將所有標準庫都替換。



from gevent import monkey; monkey.patch_all()



使用猴子補丁褒貶不一,但是官網上還是建議使用”patch_all()”,而且在程序的第一行就執行。



獲取協程狀態



協程狀態有已啟動和已停止,分別可以用協程對象的”started”屬性和”ready()”方法來判斷。對于已停止的協程,可以用”successful()”方法來判斷其是否成功運行且沒拋異常。如果協程執行完有返回值,可以通過”value”屬性來獲取。另外,greenlet協程運行過程中發生的異常是不會被拋出到協程外的,因此需要用協程對象的”exception”屬性來獲取協程中的異常。下面的例子很好的演示了各種方法和屬性的使用。



#coding:utf8

import gevent


def win():

    return 'You win!'


def fail():

    raise Exception('You failed!')


winner = gevent.spawn(win)

loser = gevent.spawn(fail)


print winner.started # True

print loser.started  # True


# 在Greenlet中發生的異常,不會被拋到Greenlet外面。

# 控制臺會打出Stacktrace,但程序不會停止

try:

    gevent.joinall([winner, loser])

except Exception as e:

    # 這段永遠不會被執行

    print 'This will never be reached'


print winner.ready() # True

print loser.ready()  # True


print winner.value # 'You win!'

print loser.value  # None


print winner.successful() # True

print loser.successful()  # False


# 這里可以通過raise loser.exception 或 loser.get()

# 來將協程中的異常拋出

print loser.exception



協程運行超時



之前我們講過在”gevent.joinall()”方法中可以傳入timeout參數來設置超時,我們也可以在全局范圍內設置超時時間:



import gevent

from gevent import Timeout


timeout = Timeout(2)  # 2 seconds

timeout.start()


def wait():

    gevent.sleep(10)


try:

    gevent.spawn(wait).join()

except Timeout:

    print('Could not complete')



上例中,我們將超時設為2秒,此后所有協程的運行,如果超過兩秒就會拋出”Timeout”異常。我們也可以將超時設置在with語句內,這樣該設置只在with語句塊中有效:



with Timeout(1):

    gevent.sleep(10)



此外,我們可以指定超時所拋出的異常,來替換默認的”Timeout”異常。比如下例中超時就會拋出我們自定義的”TooLong”異常。



class TooLong(Exception):

    pass


with Timeout(1, TooLong):

    gevent.sleep(10)

協程間通訊

greenlet協程間的異步通訊可以使用事件(Event)對象。該對象的”wait()”方法可以阻塞當前協程,而”set()”方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒后設置evt事件,所有的waiter協程即被喚醒。

#coding:utf8

import gevent

from gevent.event import Event


evt = Event()


def setter():

    print 'Wait for me'

    gevent.sleep(3)  # 3秒后喚醒所有在evt上等待的協程

    print "Ok, I'm done"

    evt.set()  # 喚醒


def waiter():

    print "I'll wait for you"

    evt.wait()  # 等待

    print 'Finish waiting'


gevent.joinall([

    gevent.spawn(setter),

    gevent.spawn(waiter),

    gevent.spawn(waiter),

    gevent.spawn(waiter),

    gevent.spawn(waiter),

    gevent.spawn(waiter)

])

除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞消息。讓我們將上例中的setter和waiter作如下改動:

from gevent.event import AsyncResult

aevt = AsyncResult()


def setter():

    print 'Wait for me'

    gevent.sleep(3)  # 3秒后喚醒所有在evt上等待的協程

    print "Ok, I'm done"

    aevt.set('Hello!')  # 喚醒,并傳遞消息


def waiter():

    print("I'll wait for you")

    message = aevt.get()  # 等待,并在喚醒時獲取消息

    print 'Got wake up message: %s' % message

隊列 Queue

隊列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取隊列中的元素。gevent的隊列對象可以讓greenlet協程之間安全的訪問。運行下面的程序,你會看到3個消費者會分別消費隊列中的產品,且消費過的產品不會被另一個消費者再取到:

import gevent

from gevent.queue import Queue


products = Queue()


def consumer(name):

    while not products.empty():

        print '%s got product %s' % (name, products.get())

        gevent.sleep(0)


    print '%s Quit'


def producer():

    for i in xrange(1, 10):

        products.put(i)


gevent.joinall([

    gevent.spawn(producer),

    gevent.spawn(consumer, 'steve'),

    gevent.spawn(consumer, 'john'),

    gevent.spawn(consumer, 'nancy'),

])

put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調用get方法時隊列為空,則拋出”gevent.queue.Empty”異常。

信號量

信號量可以用來限制協程并發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取信號量,而release就是釋放。當所有信號量都已被獲取,那剩余的協程就只能等待任一協程釋放信號量后才能得以運行:

import gevent

from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker(n):

    sem.acquire()

    print('Worker %i acquired semaphore' % n)

    gevent.sleep(0)

    sem.release()

    print('Worker %i released semaphore' % n)


gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])

上面的例子中,我們初始化了”BoundedSemaphore”信號量,并將其個數定為2。所以同一個時間,只能有兩個worker協程被調度。程序運行后的結果如下:

Worker 0 acquired semaphore

Worker 1 acquired semaphore

Worker 0 released semaphore

Worker 1 released semaphore

Worker 2 acquired semaphore

Worker 3 acquired semaphore

Worker 2 released semaphore

Worker 3 released semaphore

Worker 4 acquired semaphore

Worker 4 released semaphore

Worker 5 acquired semaphore

Worker 5 released semaphore

如果信號量個數為1,那就等同于同步鎖。

協程本地變量

同線程類似,協程也有本地變量,也就是只在當前協程內可被訪問的變量:

import gevent

from gevent.local import local


data = local()


def f1():

    data.x = 1

    print data.x


def f2():

    try:

        print data.x

    except AttributeError:

        print 'x is not visible'


gevent.joinall([

    gevent.spawn(f1),

    gevent.spawn(f2)

])

通過將變量存放在local對象中,即可將其的作用域限制在當前協程內,當其他協程要訪問該變量時,就會拋出異常。不同協程間可以有重名的本地變量,而且互相不影響。因為協程本地變量的實現,就是將其存放在以的”greenlet.getcurrent()”的返回為鍵值的私有的命名空間內。

實際應用

講到這里,大家肯定很想看一個gevent的實際應用吧,這里有一個簡單的聊天室程序,基于Flask實現,大家可以參考下。

以上就是基于協程的 Python 網絡庫 gevent 介紹教程!的詳細內容,想要了解更多Python教程歡迎持續關注編程學習網。

掃碼二維碼 獲取免費視頻學習資料

Python編程學習

查 看2022高級編程視頻教程免費獲取