0x0 前言
最近一直在研究 Python 中的多进程、多线程和模块 Cythonize
编译兼容等问题,踩了不少坑,在这里记录一下。
本文使用的 Cython
模块版本为 0.29.33
和 3.0.0a11
。
0x1 pydantic 与 Cython 的兼容性问题
pydantic
是 Python 中一个方便好用的数据模型管理模块,可借助类型标注等方法实现复杂数据类型的建模。然而,当模块需要进行 Cythonize
编译时,以下写法在 0.29.33
版本的 Cython
模块中会出现问题。
from pydantic import BaseModel
class TestModel(BaseModel):
num1: Optional[int] = None # ConfigError
报错信息如下:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "test.py", line 4, in init test
File "pydantic/main.py", line 222, in pydantic.main.ModelMetaclass.__new__
File "pydantic/fields.py", line 506, in pydantic.fields.ModelField.infer
File "pydantic/fields.py", line 436, in pydantic.fields.ModelField.__init__
File "pydantic/fields.py", line 546, in pydantic.fields.ModelField.prepare
File "pydantic/fields.py", line 578, in pydantic.fields.ModelField._set_default_and_type
pydantic.errors.ConfigError: unable to infer type for attribute "num1"
目前初步判断是 Cython
对 Optional
类型标注未完全支持导致的。理论上说,TestModel
类的 num1
成员变量类型应为 int
,但 Optional
的存在使其变为了可变类型 [int, NoneType]
,导致 Cythonize
后的 num1
变量类型无法正确推断。
更换 Cython
模块版本为 3.0.0a11
后,该问题解决。
0x2 模块 Cythonize 后的线程切换问题
在 Python 的 _thread
模块中,提供了一个方法 _thread.interrupt_main()
,可由子线程调用,向主线程发送一个 SIGINT
信号,从而使主线程产生 KeyboardInterrupt
异常,起到中断主线程执行的作用。
该方法与以下操作理论上是等价的:
import os
import signal
# 对子线程来说,getpid() 方法返回的就是当前进程的 PID
# SIGINT 信号默认情况下会被主线程处理
os.kill(os.getpid(), signal.SIGINT)
另一方面,在正常的 Python 解释器中,因全局解释器锁 GIL 的存在,一个解释器同一时刻只能有一个线程在真正执行。GIL 锁的释放(即线程切换)的条件主要包括两种:
- 线程进行需要等待的操作(如 I/O 操作及
time.sleep()
等),主动释放 GIL 锁 - 线程时间片耗尽,由解释器强制释放 GIL 锁
然而,当代码采用 Cython
模块进行 Cythonize 编译后,目前来看,GIL 锁释放的第二条件不再生效,即解释器在线程时间片耗尽后不再强制释放 GIL 锁。这导致的一个问题就是,如果某一线程在执行 CPU 密集型计算任务,则它无法被中断,其它线程也因此陷入长时间等待而无法运行。
按照之前的设计,每个子进程会开启一条监控线程(MonitorThread
),用于通过管道发送和接收数据,实现与主进程的通信。然而,因 Cythonize
后的 GIL 锁问题,若子进程的某条线程 长时间执行计算操作且无任何 I/O 操作,将会导致监控线程被持续阻塞,进而无法正常完成进程间通信。
例如,当主线程写法如下时,将会导致监控线程无法正常运行:
th = threading.Thread(target=mon_thread, daemon=True)
th.start()
i = 0
while True:
i += 1 # 持续密集计算
由于无法脱离对 Cython
模块的依赖,目前想到的针对该问题的缓解措施主要包括:
- 子进程中尽量避免长时间的 CPU 密集计算任务
- 对于密集计算且无任何 I/O 操作的任务,适当在循环中穿插
time.sleep()
或其它 I/O 操作出让 CPU - 主进程通过
pipe.poll()
方法等待子进程的监控线程返回数据的超时时间适当增大,重试次数适当增加