logo

Python Processing 和 Threading

wangzf / 2021-12-30


目录

进程、线程、并发、并行、高并发

多任务执行

进程(Process)、线程(Threading)

并发

并发是指在同一时刻只能有一条指令执行, 但多个进程指令被快速轮换执行, 使得在宏观上具有多个 进程/线程 执行的效果

并行

并行指在同一时刻有多条指令(任务)在多个 CUP 处理器 上同时执行

高并发

Python 多进程

在使用 multiprocessing 库实现多进程之前, 我们先来了解一下操作系统相关的知识.

import os
print("Process (%s) start..." % os.getpid())
# Only works on Unix/Linux/Mac
pid = os.fock()
if pid == 0:
    print(f"I am child process ({os.getpid()}) and my parent is {os.getppid()}.")
else:
    print(f"I ({os.getpid()}) just created a child process ({pid}).")

multiprocessing–基于进程的并行

概述

由于 Python 是跨平台的, 自然也应该提供一个跨平台的多进程支持. multiprocessing 模块就是跨平台版本的多进程模块. multiprocessing 模块提供了一个 Process 类来代表一个进程对象.

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
   print("Run child process %s (%s)..." % (name, os.getpid()))

if __name__ == "__main__":
   print("Parent process %s." % os.getpid())
   p = Process(target = run_proc, args = ("test",))
   print("Child process will start.")
   p.start()
   p.join()
   print("Child process end.")

Process 类

multiprocessing 中, 通过创建一个 Process 对象然后调用它的 start() 方法来生成进程. Processthreading.Thread API 相同. 一个简单的多进程程序示例是:

from multiprocessing import Process

def f(name):
   print("hello", name)

if __name__ == "__main__":
   p = Process(target = f, args = ("bob",))
   p.start()
   p.join()

要显示所涉及的各个进程 ID, 这是一个扩展示例:

from multiprocessing import Process
import os

def info(title):
   print(title)
   print("module name:", __name__)
   print("parent process:", os.getppid())
   print("process id:", os.getpid())

def f(name):
   info("function f")
   print("hello", name)

if __name__ == "__main__":
   info("main line")
   p = Process(target = f, args = ("bob",))
   p.start()
   p.join()

Pool

如果要启动大量的子进程, 可以用进程池的方式批量创建子进程.

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
   print("Run task %s (%s)..." % ())

子进程

进程间通信

Process 之间肯定是需要通信的, 操作系统提供了很多机制来实现进程间的通信. Python 的 multiprocessing 模块包装了底层机制, 提供了 Queue、Pipes 等多种方式来交换数据.

以 Queue 为例, 在父进程中创建两个子进程, 一个往 Queue 里写数据, 一个从 Queue 里读数据.

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
   print('Process to write: %s' % os.getpid())
   for value in ['A', 'B', 'C']:
      print('Put %s to queue...' % value)
      q.put(value)
      time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
   print('Process to read: %s' % os.getpid())
   while True:
      value = q.get(True)
      print('Get %s from queue.' % value)

if __name__=='__main__':
   # 父进程创建Queue, 并传给各个子进程: 
   q = Queue()
   pw = Process(target=write, args=(q,))
   pr = Process(target=read, args=(q,))
   # 启动子进程pw, 写入:
   pw.start()
   # 启动子进程pr, 读取:
   pr.start()
   # 等待pw结束:
   pw.join()
   # pr进程里是死循环, 无法等待其结束, 只能强行终止:
   pr.terminate()
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Python 多线程

调用 Thread 类的构造器创建线程

继承Thread类创建线程类

IO 编程

IO 编程

异步 IO

消息模型其实早在应用在桌面应用程序中了. 一个GUI程序的主线程就负责不停地读取消息并处理消息. 所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中, 然后由GUI程序的主线程处理.

由于GUI线程处理键盘、鼠标等消息的速度非常快, 所以用户感觉不到延迟. 某些时候, GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长, 此时, 用户会感觉到整个GUI程序停止响应了, 敲键盘、点鼠标都没有反应. 这种情况说明在消息模型中, 处理一个消息必须非常迅速, 否则, 主线程将无法及时处理消息队列中的其他消息, 导致程序看上去停止响应.

老张爱喝茶, 废话不说, 煮开水. 出场人物: 老张, 水壶两把(普通水壶, 简称水壶; 会响的水壶, 简称响水壶).

所谓同步异步, 只是对于水壶而言:

虽然都能干活, 但响水壶可以在自己完工之后, 提示老张水开了. 这是普通水壶所不能及的. 同步只能让调用者去轮询自己(情况2中), 造成老张效率的低下.

情况 1 和情况 3 中老张就是阻塞的, 媳妇喊他都不知道. 虽然 3 中响水壶是异步的, 可对于立等的老张没有太大的意义. 所以一般异步是配合非阻塞使用的, 这样才能发挥异步的效用.

协程

子程序调用总是一个入口, 一次返回, 调用顺序是明确的, 而协程的调用和子程序不同. 协程看上去也是子程序, 但执行过程中, 在子程序内部可中断, 然后转而执行别的子程序, 在适当的时候再返回来接着执行.

因为协程是一个线程执行, 那怎么利用多核CPU呢? 最简单的方法是多进程+协程, 既充分利用多核, 又充分发挥协程的高效率, 可获得极高的性能.

Python 对协程的支持是通过 generator 实现的, 在 generator 中, 不但可以通过 for 循环来迭代, 还可以不断调用 next() 函数获取由 yield 语句返回的下一个值. 但是 Python 的 yield 不但可以返回一个值, 它还可以接收调用者发出的参数

asyncio、async/await、aiohttp

asyncio

asyncio 是 Python3.4 一如的标准库, 直接内置了对异步 IO 的支持. asyncio 的编程模型就是一个消息循环. 从 asyncio 模块中直接获取一个 EventLoop 的引用, 然后把需要执行的协程扔到 EventLoop 中执行, 就实现了异步 IO.

import asyncio

@asyncio.coroutine
def hello():
    print("Hello, world!")
    # 异步调用 asyncio.sleep(1)
    r = yield from asyncio.sleep(1)
    print("Hello, again!")

# 获取 EventLoop
loop = asyncio.get_event_loop()
# 执行 coroutine
loop.run_until_complete(hello())
loop.close()

@asyncio.coroutine把一个generator标记为coroutine类型, 然后, 我们就把这个coroutine扔到EventLoop中执行.

hello()会首先打印出Hello world!, 然后, yield from语法可以让我们方便地调用另一个generator. 由于asyncio.sleep()也是一个coroutine, 所以线程不会等待asyncio.sleep(), 而是直接中断并执行下一个消息循环. 当asyncio.sleep()返回时, 线程就可以从yield from拿到返回值(此处是None), 然后接着执行下一行语句.

把asyncio.sleep(1)看成是一个耗时1秒的IO操作, 在此期间, 主线程并未等待, 而是去执行EventLoop中其他可以执行的coroutine了, 因此可以实现并发执行.

import threading
import asyncio

@asyncio.coroutine
def hello():
   print("Hello, world! (%s)" % threading.currentThread())
   yield from asyncio.sleep(1)
   print("Hello again! (%s)" % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暂停约1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

async/await

用 asyncio 提供的 @asyncio.coroutine 可以把一个 generator 标记为 coroutine 类型, 然后在 coroutine 内部用 yield from 调用另一个 coroutine 实现异步操作.

为了简化并更好地标识异步 IO, 从 Python3.5 开始引入了新的语法 async 和 await, 可以让 coroutine 的代码更简洁易读.

请注意, async 和 await 是针对 coroutine 的新语法, 要使用新的语法, 只需要两步简单的替换:

示例:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")
async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

aiohttp

asyncio 可以实现单线程并发 IO 操作. 如果仅用在客户端, 发挥的威力不大. 如果把 asyncio 用在服务器端, 例如 Web 服务器, 由于 HTTP 连接就是 IO 操作, 因此可以用单线程 + coroutine 实现多用户的高并发支持.

asyncio 实现了 TCP、UDP、SSL 等协议, aiohttp 则是基于 asyncio 实现的 HTTP 框架.

$ pip install aiohttp

代码

import asyncio
from aiohttp import web
import async

async def index(request):
await asyncio.sleep(0.5)
return web.Response(body = b"<h1>Index</h1>")

async def hello(request):
await asyncio.sleep(0.5)
text = f"<h1>hello, {request.match_info["name"]}!</h1>"
return web.Response(body = text.encode("utf-8"))

async def init(loop):
app = web.Application(loop = loop)
app.router.add_router("GET", "/", index)
app.router.add_router("GET", "/hello/{name}", hello)
srv = await loop.create_server(app.make_handler(), "127.0.0.1", 8000)
print("Server started at http://127.0.0.1:8000...")
return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

参考资料

https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376