12. Python processes, coroutines, and asynchronous IO


Because of the global interpreter lock (GIL) in CPython, Python threads cannot run bytecode on several cores at the same time. To make full use of a multi-core CPU for CPU-bound work, we have to rely on multiple processes.

The multiprocessing module provides Process, Queue, Pipe, Lock, RLock, Event, Condition and other components, and its API closely mirrors that of the threading module.

1. Create a process

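from multiprocessing import Process
import time

def func(name):
    time.sleep(1)  # placeholder for real work
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=func, args=('derek',))
    p.start()  # launch the child process
    # p.join()  # uncomment to wait for the child process to finish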

2. Inter-process communication

(1) Queue

Different processes do not share memory, so exchanging data between two processes requires an inter-process communication mechanism. There are two main forms of inter-process communication: queues and pipes.

from multiprocessing import Process, Queue  # this Queue is the process-safe queue

def f(test):
    test.put('11')  # the child process adds data to the queue, which the parent then reads
    test.put('22')

if __name__ == '__main__':
    q = Queue()  # created in the parent process

    p = Process(target=f, args=(q,))  # child process
    p.start()
    print('Get to:', q.get())
    print('Get to:', q.get())
    p.join()

# The parent process hands q to the child process when it creates the child.
# Objects put on the queue are pickled by the sender and unpickled by the receiver,
# which is how the two processes exchange data.

Get to: 11
Get to: 22
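
The pickling step can be looked at on its own, without starting any process. The sketch below only imitates what happens to an object that travels through a queue (the helper name send_through_queue is made up for illustration); it suggests why the receiver gets a copy rather than the sender's object.

```python
import pickle

def send_through_queue(obj):
    """Imitate a trip through a process queue: serialize, then deserialize."""
    payload = pickle.dumps(obj)   # what the sending side does
    return pickle.loads(payload)  # what the receiving side does

original = {"numbers": [1, 2, 3]}
received = send_through_queue(original)
received["numbers"].append(4)  # changing the copy leaves the original alone

print(original)  # {'numbers': [1, 2, 3]}
print(received)  # {'numbers': [1, 2, 3, 4]}
```

This is why a queue exchanges data but does not share it: each side works on its own copy.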

(2) Pipe

The Pipe() function returns a pair of connection objects connected by a pipe, which by default is duplex (two-way).

from multiprocessing import Process, Pipe

def f(conn):
    conn.send('11')  # child sends messages to the parent
    conn.send('22')
    print("from parent:", conn.recv())
    print("from parent:", conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # create the pipe; both ends can send() and recv()

    p = Process(target=f, args=(child_conn,))
    p.start()

    print(parent_conn.recv())  # prints "11"
    print(parent_conn.recv())  # prints "22"
    parent_conn.send("33")  # parent sends a message to the child
    parent_conn.send("44")  # a second message, matching the child's second recv()
    p.join()
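
The two connection objects can also be tried out inside a single process, which makes the duplex behaviour easy to see. This is only a small sketch with made-up variable names; it assumes small messages, which fit in the pipe buffer without a second process reading them.

```python
from multiprocessing import Pipe

# Duplex pipe (the default): both ends can send and receive.
left, right = Pipe()
left.send({"msg": "ping"})
reply_in = right.recv()
right.send("pong")
reply_out = left.recv()

# One-way pipe: the first connection only receives, the second only sends.
reader, writer = Pipe(duplex=False)
writer.send([1, 2, 3])
one_way = reader.recv()

for conn in (left, right, reader, writer):
    conn.close()

print(reply_in, reply_out, one_way)
```

Passing duplex=False is worth considering when data only flows in one direction, since each end then has a single clear role.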


3. Manager

Processes are independent of each other. Queue and Pipe only pass data between processes; they do not share it. A Manager provides data that is shared between processes.

A Manager also supports other shared types, such as Condition, Lock, Namespace, Queue, RLock and Semaphore.

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  # a dictionary that can be shared and passed between processes

        l = manager.list(range(5))  # a list that can be shared and passed between processes
        p_list = []
        for i in range(2):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:  # wait for the child processes to finish
            res.join()
        print(d)
        print(l)

4. Process lock

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()  # only one process may print at a time
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(100):
        Process(target=f, args=(lock, num)).start()  # pass the lock to the function as parameter l
# The lock prevents output from different processes from being mixed up on the screen
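
The same acquire/release pattern can be written with a with statement. The sketch below deliberately swaps in threading.Lock and threads so that it runs in one process; multiprocessing.Lock offers the same acquire(), release() and with support. The function names are illustrative only.

```python
import threading

def add_many(counter, lock, times):
    for _ in range(times):
        with lock:  # acquire() on entry, release() on exit, even on error
            counter["value"] += 1

def run(workers=4, times=10_000):
    counter = {"value": 0}
    lock = threading.Lock()
    threads = [
        threading.Thread(target=add_many, args=(counter, lock, times))
        for _ in range(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter["value"]

print(run())  # 40000
```

Because every increment happens while the lock is held, no update is lost and the total always equals workers * times.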

5. Process pool

The process pool maintains a set of worker processes. Each task is handed to an idle process from the pool; if no process is available, the task waits until one becomes free.

There are several main methods in the process pool:

  1. apply: run a task in a pool process and block until it returns
  2. apply_async: the asynchronous version of apply; it returns immediately and accepts an optional callback
  3. terminate: stop the process pool immediately, without finishing outstanding work
  4. join: the main process waits for all worker processes to exit; it must be called after close or terminate
  5. close: stop accepting new tasks; the worker processes exit once the submitted tasks are done

from multiprocessing import Process, Pool
import time
import os

def Foo(i):
    time.sleep(1)  # simulate some work
    print("in process", os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg, os.getpid())  # the callback runs in the main process

if __name__ == '__main__':  # required for multiprocessing on Windows
    pool = Pool(processes=3)  # at most 3 worker processes run at the same time
    print("Main Process", os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)  # after Foo() finishes, Bar() is called with its return value
        # pool.apply(func=Foo, args=(i,))  # serial
    pool.close()  # must call close() first, then join()
    pool.join()  # wait for the tasks in the pool to finish; without it the program exits before they run
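
The difference between apply, apply_async, close and join can be tried without starting processes. The sketch below swaps in multiprocessing.pool.ThreadPool, which exposes the same methods as Pool but runs tasks in threads; square and run_tasks are names made up for this illustration.

```python
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def run_tasks(values):
    collected = []  # filled by the callback as results arrive
    pool = ThreadPool(processes=3)
    blocking = pool.apply(square, (2,))  # blocks until the result is ready
    handles = [
        pool.apply_async(square, (v,), callback=collected.append)
        for v in values
    ]  # returns immediately with result handles
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the submitted tasks to finish
    results = [h.get() for h in handles]
    return blocking, results, sorted(collected)

print(run_tasks([1, 2, 3]))  # (4, [1, 4, 9], [1, 4, 9])
```

The callback results are sorted before returning because the order in which tasks finish is not guaranteed.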


1 Introduction

Coroutine: concurrency within a single thread, also known as a microthread or fiber. A coroutine is a lightweight user-mode thread, which means it is controlled and scheduled by the user program itself rather than by the operating system.

A coroutine has its own register context and stack. When the program switches between coroutines, it saves the current register context and stack elsewhere, and restores the previously saved register context and stack when switching back.

A coroutine therefore retains the state of its last call (that is, a specific combination of all its local state), and each time it is re-entered it resumes from the state in which it was left.
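
State retention can be seen with a plain Python generator, the simplest coroutine-like construct in the language. This is a minimal sketch (running_average is an illustrative name): the local variables total and count survive between calls to send().

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here; resume when send() is called
        total += value
        count += 1
        average = total / count

avg = running_average()
next(avg)            # advance to the first yield
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(30))  # 20.0
```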

Advantages and disadvantages of using coroutines

Advantages:

  1. Switching between coroutines is cheaper than switching between threads, because it happens at the program level, so coroutines are more lightweight
  2. Concurrency can be achieved within a single thread, keeping the CPU busy while other tasks wait on IO

Disadvantages:

  1. A coroutine runs inside a single thread and cannot use multiple cores by itself. To use them, a program can start multiple processes, each with multiple threads, and run coroutines inside each thread
  2. Because coroutines share a single thread, a blocking call in one coroutine blocks the entire thread
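
The second disadvantage can be illustrated with the standard asyncio module, used here only as a stand-in for the coroutine libraries discussed in this article. In this sketch a worker that never yields represents a blocking coroutine; worker and run are illustrative names.

```python
import asyncio

async def worker(name, steps, log, cooperative):
    for i in range(steps):
        log.append(f"{name}{i}")
        if cooperative:
            await asyncio.sleep(0)  # hand control back to the event loop

async def run(cooperative):
    log = []
    await asyncio.gather(
        worker("a", 2, log, cooperative),
        worker("b", 2, log, cooperative),
    )
    return log

print(asyncio.run(run(True)))   # ['a0', 'b0', 'a1', 'b1']
print(asyncio.run(run(False)))  # ['a0', 'a1', 'b0', 'b1']
```

When the workers yield, their steps interleave; when they do not, the second worker cannot start until the first one has finished.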


Greenlet is a coroutine module implemented in C. Compared with Python's built-in yield, it lets you switch freely between arbitrary functions without first declaring them as generators.

Manual switch

from greenlet import greenlet

def test1():
    print('test1: first part')
    gr2.switch()  # switch to gr2 here and run test2()
    print('test1: second part')
    gr2.switch()  # switch back to where gr2 last stopped

def test2():
    print('test2: first part')
    gr1.switch()  # switch back to where gr1 last stopped
    print('test2: second part')

gr1 = greenlet(test1)  # create coroutine gr1
gr2 = greenlet(test2)  # create coroutine gr2

gr1.switch()  # start running gr1
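
For comparison, a similar hand-off can be sketched with plain generators and a tiny round-robin scheduler. Every function that wants to pause has to contain yield, which is the restriction greenlet removes. The names task and run_round_robin are made up for this example.

```python
from collections import deque

def task(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        yield  # pause and hand control back to the scheduler

def run_round_robin(tasks):
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)      # run the task until its next yield
        except StopIteration:
            continue           # task finished, drop it
        queue.append(current)  # otherwise schedule it again

log = []
run_round_robin([task("a", 2, log), task("b", 2, log)])
print(log)  # [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```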


Gevent is a third-party library that makes it easy to write concurrent synchronous or asynchronous code. Its main building block is the Greenlet, a lightweight coroutine provided to Python as a C extension module.

(1) IO blocking automatic switching

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(2)  # simulate blocking IO
    print('The longest blocking time, run last')

def bar():
    print('running in bar')
    gevent.sleep(1)
    print('foo() is still blocking, here is the second run')

def func3():
    print("running in func3")
    gevent.sleep(0)
    print("The other two are still blocked on IO, so this runs first")

# Create the coroutine instances and wait for all of them to finish
gevent.joinall([
    gevent.spawn(foo),  # spawn starts a coroutine
    gevent.spawn(bar),
    gevent.spawn(func3),
])

# Automatic switch when encountering IO

Running in foo
running in bar
running in func3
The other two are still blocked on IO, so this runs first
foo() is still blocking, here is the second run
The longest blocking time, run last

Since the switch happens automatically during IO operations, gevent needs to modify some of the standard libraries that come with Python. This is done by monkey patching at startup.
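
Monkey patching itself simply means replacing an attribute of a module at runtime. The sketch below is not how gevent is implemented; it only shows the idea by temporarily swapping time.sleep for a function that records the requested delays. fake_sleep_calls and slow_job are illustrative names.

```python
import time

def fake_sleep_calls(func):
    """Run func with time.sleep replaced; return the delays it asked for."""
    calls = []
    original_sleep = time.sleep
    time.sleep = calls.append  # the patch: swap the module attribute
    try:
        func()
    finally:
        time.sleep = original_sleep  # always restore the real function
    return calls

def slow_job():
    time.sleep(5)
    time.sleep(2)

print(fake_sleep_calls(slow_job))  # [5, 2]
```

The code in slow_job is unchanged, yet it no longer blocks, because it looks up time.sleep at call time and finds the replacement.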

(2) Examples of crawlers:

from gevent import monkey
monkey.patch_all()  # patch the blocking IO in the standard library so gevent can switch on it

from urllib import request
import gevent, time

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

# Time spent synchronously
urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/']
time_start = time.time()
for url in urls:
    f(url)
print("Synchronization cost", time.time() - time_start)

# Below is the time spent asynchronously
async_time_start = time.time()
gevent.joinall([gevent.spawn(f, url) for url in urls])
print("Asynchronous cost", time.time() - async_time_start)

GET: https://www.python.org/
48954 bytes received from https://www.python.org/.
GET: https://www.yahoo.com/
491871 bytes received from https://www.yahoo.com/.
GET: https://github.com/
51595 bytes received from https://github.com/.
Synchronization cost 4.928282260894775
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
48954 bytes received from https://www.python.org/.
494958 bytes received from https://www.yahoo.com/.
51599 bytes received from https://github.com/.
Asynchronous cost 1.4920852184295654
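
The gap between the two timings comes from overlapping the waiting periods. A network-free way to see this is sketched below with the standard asyncio module instead of gevent; asyncio.sleep stands in for the network wait, and the site names and delays are invented for the example.

```python
import asyncio
import time

async def fake_fetch(url, delay):
    await asyncio.sleep(delay)  # stands in for waiting on the network
    return f"{url} done"

async def sequential(jobs):
    return [await fake_fetch(url, delay) for url, delay in jobs]

async def concurrent(jobs):
    return await asyncio.gather(*(fake_fetch(url, delay) for url, delay in jobs))

def timed(coro_func, jobs):
    start = time.perf_counter()
    results = asyncio.run(coro_func(jobs))
    return results, time.perf_counter() - start

jobs = [("site-a", 0.1), ("site-b", 0.1), ("site-c", 0.1)]
seq_results, seq_cost = timed(sequential, jobs)
con_results, con_cost = timed(concurrent, jobs)
print("sequential cost", seq_cost)
print("concurrent cost", con_cost)
```

The sequential version waits for the sum of the delays, while the concurrent version waits for roughly the longest single delay.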

IO multiplexing

Detailed explanation: http://www.cnblogs.com/alex3714/articles/5876749.html

selectors module

The selectors module is built on the select module to provide IO multiplexing. Calling selectors.DefaultSelector() automatically selects the most efficient IO multiplexing mechanism available on the platform, in the order of preference epoll > poll > select (kqueue is used where the platform provides it, such as BSD and macOS).
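
Which mechanism was chosen can be checked by looking at the class of the selector object. This is a small sketch; the printed name depends on the operating system.

```python
import selectors

sel = selectors.DefaultSelector()
chosen = type(sel).__name__
print(chosen)  # for example EpollSelector on Linux or KqueueSelector on macOS
print(isinstance(sel, selectors.BaseSelector))  # True
sel.close()
```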

Build a socket server

import selectors
import socket
sel = selectors.DefaultSelector()  # automatically select the best IO multiplexing mechanism for the platform

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    # print('accepted', conn, 'from', addr, mask)
    conn.setblocking(False)  # set to non-blocking IO
    sel.register(conn, selectors.EVENT_READ, read)
    # Register the new connection together with the read callback;
    # read is executed when conn becomes readable

def read(conn, mask):
    data = conn.recv(1024)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)  # set to non-blocking IO
sel.register(sock, selectors.EVENT_READ, accept)
# Register sock together with the accept callback; accept is executed when sock becomes readable

while True:
    events = sel.select()  # blocks by default; returns a list of (key, mask) pairs for the ready connections

    for key, mask in events:
        callback = key.data  # accept for the listening socket, read for a client connection
        callback(key.fileobj, mask)  # key.fileobj is the registered file object: sock or conn
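
The register / select / callback cycle used by the server can be exercised in one process with a connected socket pair, so no port or client is needed. This is a minimal sketch; echo_once and on_readable are illustrative names.

```python
import selectors
import socket

def echo_once(payload=b"ping"):
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()  # two already connected sockets
    received = []

    def on_readable(conn, mask):
        received.append(conn.recv(1024))

    sel.register(b, selectors.EVENT_READ, on_readable)  # data = the callback
    a.sendall(payload)  # makes b readable

    for key, mask in sel.select(timeout=1):
        key.data(key.fileobj, mask)  # call the callback stored at registration

    sel.unregister(b)
    a.close()
    b.close()
    sel.close()
    return received

print(echo_once())  # [b'ping']
```

Storing the callback in the data argument of register() is what lets a single loop serve both the listening socket and every client connection.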

Socket client

import socket

messages = [b'This is the message.',
            b'It will be sent',
            b'in parts.',
            ]
server_address = ('localhost', 9999)

# Create the TCP/IP sockets
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)]
# Connect the sockets to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send the message on every socket
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)

    # Read the responses on every socket
    for s in socks:
        data = s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print('closing socket', s.getsockname())
            s.close()

Reference: https://cloud.tencent.com/developer/article/1091804