在现代软件开发中,并发编程是提高程序性能和响应能力的重要手段。Python提供了三种主要的并发编程方式:线程(Threading)、进程(Multiprocessing)和协程(Coroutines)

基础概念

进程

进程是操作系统进行资源分配和调度的基本单位。

核心特征

  • 拥有独立的内存空间(代码、数据、堆栈)

  • 包含至少一个执行线程

  • 系统资源分配的基本单元(CPU时间、内存、文件句柄等)

  • 进程间相互隔离,一个进程的崩溃不会影响其他进程

  • 进程间通信需要特殊机制(管道、消息队列、共享内存等)

在操作系统中的表现

  • 每个进程有唯一的进程ID(PID)

  • 进程上下文切换涉及内存映射、寄存器状态、打开文件等完整状态的保存和恢复

线程

线程是进程内的执行单元,是CPU调度的基本单位。

核心特征

  • 同一进程内的多个线程共享进程的内存空间和资源

  • 每个线程有独立的执行栈和寄存器状态

  • 线程切换开销小于进程切换

  • 线程间可以直接访问共享数据,但也需要同步机制来避免竞争条件

与进程的关系

  • 线程存在于进程内部,是进程的"子执行流"

  • 一个进程可以包含多个线程,这些线程并发执行

  • 主线程结束通常会导致整个进程结束

协程

协程是一种用户态的轻量级线程,由程序控制而非操作系统内核调度,并且不需要像线程或进程那样引入显式的锁或同步机制来处理并发访问问题。

核心特征

  • 在单个线程内实现多个执行流的切换

  • 切换由程序显式控制(通过await、yield等关键字)

  • 切换不涉及内核态操作,开销极小

  • 共享所在线程的所有资源

  • 通常用于异步I/O操作和高并发场景

与传统线程的区别

  • 线程由操作系统调度,协程由程序控制调度

  • 线程是抢占式的,协程是协作式的

  • 线程切换需要内核介入,协程切换完全在用户空间完成

三者的关系与区别

进程 (独立资源单位)
│
├── 线程1 (CPU调度单位)
│   ├── 协程A
│   ├── 协程B
│   └── 协程C
│
├── 线程2
│   ├── 协程D
│   └── 协程E
│
└── 线程3
    └── 协程F

关键区别如下:

  1. 资源分配:进程有独立资源,线程共享进程资源,协程共享线程资源

  2. 调度方式:进程和线程由操作系统调度,协程由程序控制调度

  3. 切换开销:进程 > 线程 > 协程

  4. 数据共享:进程间共享复杂,线程间共享简单,协程间共享最简单

  5. 并发粒度:进程提供宏观并发,线程提供中观并发,协程提供微观并发

三者的最佳应用场景

进程的最佳应用场景

  1. CPU密集型计算:需要大量CPU计算(如流体动力学模拟、天气预报模型、科学计算、数据挖掘和机器学习算法等CPU利用率较高的任务),计算过程中很少等待I/O

  2. 需要进程隔离的任务:任务需要高度稳定性,一个任务失败不应影响其他任务,如:

    1. 数据处理流水线的不同阶段

    2. 并行运行的独立微服务

    3. 需要不同运行环境的任务

线程的最佳应用场景

  1. I/O密集型任务:任务主要时间花在等待I/O操作上(如读取或写入文件到硬盘、通过网络发送和接收数据,涉及网络协议和数据包的传输、键盘输入,鼠标点击,屏幕输出、输出到控制台或从控制台读取数据等任务)

  2. 需要共享状态的并发任务:多个任务需要频繁访问和修改共享数据(如多个任务修改同一份文件)

  3. GUI应用程序:需要保持界面响应,同时执行后台任务

协程的最佳应用场景

  1. 高并发网络I/O:需要同时处理成千上万的网络连接(http请求)

  2. 实时数据处理流:需要处理连续的数据流,多个处理阶段可以并行(数据的生产、处理、消费三个阶段同时进行)

  3. Web服务器和API网关:需要高效处理大量并发的短时请求(后端服务器)

进程(multiprocessing)

Python的multiprocessing模块用于实现多进程编程的标准库。它提供了创建和管理进程的功能,使得在Python中可以轻松地利用多核处理器执行并行任务。

Process

multiprocessing模块提供了Process类,用于创建新的子进程。可以通过实例化Process类并传入一个可调用对象(通常是函数)来创建进程。可调用对象将在新的进程中执行。

Process类的初始化语法如下:

Process(group, target, name, args, kwargs, daemon)

参数如下:

  1. group: 进程组。它是一个ProcessGroup 对象的可选参数。默认为None。

  2. target: 要执行的函数或方法。它是必需的参数,用于指定子进程要执行的任务。

  3. name: 进程的名称。它是一个字符串类型的可选参数。如果不提供名称,系统会自动分配一个唯一的名称给进程。

  4. args: 传递给目标函数的参数值。它是一个元组类型的可选参数。如果目标函数需要接收多个参数,可以通过这个参数传递。

  5. kwargs: 传递给目标函数的关键字参数。它是一个字典类型的可选参数。

  6. daemon: 进程是否作为守护进程运行。它是一个布尔类型的可选参数,默认值为 False。如果将守护进程标记为 True,当主进程结束时,守护进程也会随之结束。

下面是一个简单的例子,用于快速掌握Process的使用:

from multiprocessing import Process
​
def func():
    print("Hello from a child process!")
​
if __name__ == '__main__':
    p = Process(target=func)
    p.start() # 启动新的进程
    p.join() # 等待子进程执行完成后才执行下面的语句
    p.kill() # 杀死子进程

以下介绍常见的可调用的Process方法:

方法

说明

start()

启动进程,调用进程中的run()方法。

run()

进程启动时运行的方法,正是它去调用target指定的函数,若自定义线程类,该类一定要实现该方法

terminate()

强制终止进程,不会进行任何清理操作。

如果该进程终止前,创建了子进程,那么该子进程在其强制结束后变为僵尸进程;

如果该进程还保存了一个锁那么也将不会被释放,进而导致死锁。

使用时,要注意。

is_alive()

判断某进程是否存活,存活返回True,否则False。

join(timeout)

主线程等待子线程终止。timeout为可选择超时时间;

需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

以下为Process方法中常见的属性:

方法

说明

daemon

默认值为False,如果设置为True,代表该进程为后台守护进程;

当该进程的父进程终止时,该进程也随之终止;

并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前

name

进程名称。

pid

进程pid

exitcode

进程运行时为None,如果为-N,表示被信号N结束了。

authkey

进程身份验证,默认是由os.urandom()随机生成32字符的字符串。

这个键的用途是设计涉及网络连接的底层进程间的通信提供安全性,

这类连接只有在具有相同身份验证才能成功

以下为几个常见的例子:

  1. 并行计算任务: 将一个计算密集型任务拆分成多个子任务,并使用多个进程并行执行。这可以显著提高计算速度。例如,计算素数的示例:

import multiprocessing
​
def is_prime(num):
    """检查一个数字是否为素数"""
    if num < 2:
        return False
    for i in range(2, int(num**0.5) + 1):
        if num % i == 0:
            return False
    return True
​
if __name__ == '__main__':
    numbers = [2, 3, 4, 5, 6, 7, 8, 9, 10]
    processes = []
    
    for number in numbers:
        p = multiprocessing.Process(target=is_prime, args=(number,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
  1. 并行执行独立的任务: 使用多个进程同时执行独立的任务,提高整体处理速度。例如,使用进程下载多个文件:

import multiprocessing
import urllib.request
​
def download_file(url, filename):
    """下载文件"""
    urllib.request.urlretrieve(url, filename)
    print(f'Downloaded: {filename}')
​
if __name__ == '__main__':
    urls = [
        ('http://example.com/file1.txt', 'file1.txt'),
        ('http://example.com/file2.txt', 'file2.txt'),
        ('http://example.com/file3.txt', 'file3.txt')
    ]
    processes = []
    
    for url, filename in urls:
        p = multiprocessing.Process(target=download_file, args=(url, filename))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

进程通信(Queue/Pipe)

多个进程之间需要进行数据交换和通信时,multiprocessing模块提供了多种方式。其中最常用的是使用队列(Queue)和管道(Pipe)。

队列可以通过multiprocessing.Queue类来创建,进程可以使用put()方法将数据放入队列,而其他进程可以使用get()方法从队列中获取数据。

使用multiprocessing.Queue()创建一个空的队列对象。

以下是使用队列进行进程间通信的示例:

from multiprocessing import Process, Queue
​
def worker(q):
    data = q.get()
    print("Received data:", data)
​
if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
​
    q.put("Hello from the main process!")
    p.join()

下面是Queue类的一些常用方法:

方法

说明

put(obj, block, timeout)

将一个项目放入队列。如果队列已满且block参数为True,则会阻塞直到队列有空间可用。

如果block参数为False,则会立即引发Full异常。

timeout参数指定在阻塞模式下等待的时间(以秒为单位)。

get(block, timeout)

从队列中获取并移除一个项目。

如果队列为空且block参数为True,则会阻塞直到队列有项目可用。

如果block参数为False,则会立即引发Empty异常。

timeout参数指定在阻塞模式下等待的时间(以秒为单位)。

empty()

判断队列是否为空。

full()

判断队列是否已满。

qsize()

返回队列中当前项目的数量。

close()

关闭队列。关闭后,不能再往队列中放入或获取项目。

join_thread()

等待队列关联的线程终止。这对于确保所有数据都被处理完毕非常有用。

除了队列,multiprocessing模块还提供了Pipe类,它提供了一种双向通信通道,可以用于在两个进程之间传递数据。

以下是使用管道进行进程间通信的示例:

import multiprocessing
​
def sender(pipe):
    """发送数据的进程"""
    message = 'Hello from sender!'
    pipe.send(message)  # 发送数据到管道
    pipe.close()  # 关闭管道连接
​
def receiver(pipe):
    """接收数据的进程"""
    message = pipe.recv()  # 从管道接收数据
    print(f'Message received: {message}')
    pipe.close()  # 关闭管道连接
​
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()  # 创建管道
    
    # 创建发送数据的进程
    sender_process = multiprocessing.Process(target=sender, args=(parent_conn,))
    
    # 创建接收数据的进程
    receiver_process = multiprocessing.Process(target=receiver, args=(child_conn,))
    
    # 启动进程
    sender_process.start()
    receiver_process.start()
    
    # 等待进程执行完毕
    sender_process.join()
    receiver_process.join()

Pipe的初始化语法如下:

multiprocessing.Pipe(duplex)

改语法用于创建一个管道对象。duplex参数指定管道的类型,如果duplex为True,则为全双工管道,可以同时进行读取和写入;如果duplex为False,则为半双工管道,只能在一个方向上进行读取或写入。

下面是Pipe类的一些常用方法:

方法

说明

send(obj)

向管道发送一个对象。

recv()

从管道接收一个对象。

close()

关闭管道。关闭后,不能再进行发送或接收操作。

poll(timeout)

检查管道是否有数据可接收,如果有则返回True,否则返回False。

timeout参数指定在非阻塞模式下等待的时间(以秒为单

位)。

fileno()

返回管道的文件描述符。在某些情况下,可以使用该文件描述符

进行更高级的操作。

进程池(Pool)

在某些情况下,我们可能需要同时创建多个进程来执行任务。multiprocessing模块提供了Pool类,它可以方便地创建进程池。

使用进程池可以有效地管理多个进程的创建和执行,尤其在处理大量数据或需要执行密集计算的场景中特别有用。

Pool类的初始化语法如下:

multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild)

以下是multiprocessing.Pool()函数中参数的作用:

  1. processes:指定进程池中的进程数。默认值为None,表示使用系统的 CPU 核心数。可以设置一个整数值来限制进程池中的进程数。

  2. initializer:指定一个可调用对象,在每个进程启动时调用。可以用于执行一些初始化操作。

  3. initargs:传递给initializer函数的参数,以元组的形式提供。

  4. maxtasksperchild:指定每个工作进程在完成一定数量的任务后将会被替换。默认值为None,表示工作进程将一直存在。可以使用一个整数值来设置任务数的阈值。

以下是创建进程池的示例:

import multiprocessing

def initialize(arg1, arg2):
    # 使用arg1和arg2执行初始化操作的代码
    print(arg1, arg2)
    
# 创建进程数为4的进程池,并在每个进程启动时调用initialize函数,并传递参数。每个工作进程完成10个任务后将被替换
pool = multiprocessing.Pool(processes=4, initializer=initialize, initargs=(1, 2), maxtasksperchild=10)

multiprocessing.Pool()函数常见的方法如下:

方法

说明

apply()

提交一个任务到进程池,并阻塞地等待任务执行完成,然后返回结果。

apply_async()

提交一个任务到进程池,并非阻塞地等待任务执行完成。

返回一个AsyncResult对象,可以使用get()方法获取任务的结果。

map()

并行处理一个可迭代对象中的多个任务,并返回结果列表。任务将按顺序执行,直到完成。

imap()

并行处理一个可迭代对象中的多个任务,并返回一个迭代器,可以逐个获取任务的结果。

任务将按顺序执行。

imap_unordered()

与imap()方法类似,但是任务的结果将按照完成的顺序返回,而不是按照提交的顺序。

close()

停止接受新的任务。已提交但尚未执行的任务将继续执行。

join()

阻塞地等待所有任务执行完成,然后关闭进程池。

  1. 使用apply_async()提交任务并获取结果

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    result = pool.apply_async(square, (5,))
    print(result.get())  # 输出: 25
    pool.close()
    pool.join()
  1. 使用map()方法并行处理多个任务

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.map(square, numbers)
    print(results)  # 输出: [1, 4, 9, 16, 25]
    pool.close()
    pool.join()
  1. 使用imap()方法迭代获取任务结果

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.imap(square, numbers)
    for result in results:
        print(result)  # 逐个输出: 1, 4, 9, 16, 25
    pool.close()
    pool.join()

进程池(ProcessPoolExecutor)

上面使用了Pool去创建进程池,从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor(进程池)ProcessPoolExecutor(进程池)两个类。比较推荐使用该类进行线程池的创建

下面是一个使用ProcessPoolExecutor创建进程池的示例:

import concurrent.futures

# 定义一个任务函数
def task(name):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(2)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    # 提交任务给进程池
    future1 = executor.submit(task, "Task 1")
    future2 = executor.submit(task, "Task 2")
    future3 = executor.submit(task, "Task 3")

    # 获取任务的返回值
    result1 = future1.result()
    result2 = future2.result()
    result3 = future3.result()

    print(f"任务返回值:{result1}, {result2}, {result3}")
  1. 使用with语句 ,通过ProcessPoolExecutor构造实例,同时传入max_workers参数来设置进程池中最多能同时运行的进程数目。

  2. 使用submit函数来提交进程需要执行的任务到进程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

  3. 通过使用done()方法判断该任务是否结束。

  4. 使用result()方法可以获取任务的返回值。

在进行submit的时候,即使提交了超过max_workers标识的最大运行进程数量,其他进程会等待这些进程运行完成,详情看如下代码:

from concurrent.futures import ThreadPoolExecutor

# 定义一个任务函数
def task(name, wait_time):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(wait_time)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建进程池
with ThreadPoolExecutor(max_workers=20) as executor:
    # 提交任务给进程池
    Thread_list = [executor.submit(task, i, 2) for i in range(100)]

result_list = []
for thread in Thread_list:
    result_list.append(thread.result())

print(f"任务返回值:{result_list}")

可以看到,我们提交了100个进程,但是实际运行的时候是20个进程,其他进程会等待这20个进程运行完毕后再运行。

虽然进程的运行的顺序是随机的,但是上面代码中,我们用list封装之后,返回的结果依然是按照我们想要的顺序去返回。这样子就可以保证多个进程并发运行后依然可以按照顺序得到我们想要的结果。

锁(Lock)

在多进程编程中,多个进程可能会同时访问共享的资源,为了避免竞争条件和数据不一致问题,可以使用锁(Lock)来同步进程的访问。

multiprocessing模块提供了Lock类,可以使用multiprocessing.Lock()创建锁对象,并在需要时使用acquire()方法获取锁,使用release()方法释放锁。

以下是使用锁的示例:

from multiprocessing import Process, Lock

def func(lock):
    lock.acquire()
    try:
        print("Hello from a locked process!")
    finally:
        lock.release()

if __name__ == '__main__':
    lock = Lock()
    processes = []

    for _ in range(5):
        p = Process(target=func, args=(lock,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

在子进程中,使用lock.acquire()获取锁,然后执行需要同步的代码,最后使用lock.release()释放锁。通过使用锁,可以确保一次只有一个进程可以访问被锁定的代码块,从而避免数据竞争和不一致性。

线程(threading)

threading是 Python 标准库中用于多线程编程的模块。它提供了线程相关的类和函数,使得在 Python 中可以方便地创建和管理多线程应用程序。

Thread

Threadthreading模块中最重要的类之一,用于表示一个线程。通过创Thread类的实例,可以在程序中启动一个新线程。

创建Thread的语法如下:

Thread(target, args, kwargs)
  1. target: 指定线程要执行的函数

  2. argskwargs: 可选参数,用于传递给目标函数的参数。

threading的简单使用举例如下:

def print_numbers(count, thread_name, pramB, pramC=None):
    for i in range(count):
        print(i)
        time.sleep(1)
        print(f"线程名: {thread_name}, 参数B: {pramB}, 参数C: {pramC}")

# 创建线程实例
# target中传入的是函数名, thread会调用该函数
# args中的参数需要以列表或元组的形式传入, 并且根据函数定义的参数顺序来传入参数, 默认的可以不传, 就和正常的传参是一致的
threadA = threading.Thread(target=print_numbers, args=[4, "A线程", "参数A"])
threadB = threading.Thread(target=print_numbers, args=(4, "B线程", "参数B"))

# 启动线程
threadA.start()
threadB.start()

# 使用is_alive()方法检查线程是否仍在运行
if threadA.is_alive():
    print("A线程正在运行......")
else:
    print("A线程运行完毕")

if threadB.is_alive():
    print("B线程正在运行......")
    
else:
    print("B线程运行完毕")

# 等待线程执行完成(如果不使用join, 主进程会提前结束(即运行python文件的这个线程会结束, 而python开启的这个线程会继续运行直到结束为止))
threadA.join()
threadB.join()

print("所有线程运行均已完毕")

下面是Thread类常见的方法,注意,在python中线程并不能手动杀死,只能等待其自行结束

方法

说明

start()

启动线程,并开始执行目标函数。

join(timeout=None)

等待线程执行完成。

默认情况下,join()方法将阻塞调用线程,直到被调用线程执行完成。

可以设置timeout参数来指定等待的时间。

is_alive()

检查线程是否正在执行。

下面是Thread类常见的属性:

属性

说明

name

表示线程的名称。

ident

表示线程的标识符。

daemon

表示表示线程是否为守护线程

守护线程的创建

守护线程是一种在后台运行的线程,当所有非守护线程(包括主线程)结束时,无论守护线程是否执行完成,它们都会自动退出。

守护线程的实际应用场景如下:

  1. 后台监控和心跳检测

  2. 日志记录器

  3. 缓存刷新器

在python中,创建守护线程的有两种方法

第一种方法,创建时设daemon参数

import threading
import time

def daemon_task():
    while True:
        print("守护线程正在运行...")
        time.sleep(1)

# 创建守护线程
daemon_thread = threading.Thread(target=daemon_task, daemon=True)
daemon_thread.start()

# 主线程工作
print("主线程开始工作")
time.sleep(3)
print("主线程结束,守护线程将自动退出")

第二种方法,创建后设daemon属性

import threading
import time

def daemon_task():
    while True:
        print("守护线程正在运行...")
        time.sleep(1)

# 创建线程
daemon_thread = threading.Thread(target=daemon_task)
# 设置为守护线程(必须要在线程开启前创建)
daemon_thread.daemon = True
daemon_thread.start()

# 主线程工作
print("主线程开始工作")
time.sleep(3)
print("主线程结束,守护线程将自动退出")

注意:

  1. 创建守护线程的方法如果在ipynb中执行会失效,因为ipynb在执行完单元格后并未真正关闭了线程

  2. 守护线程被终止时不会执行 finally 块或进行资源清理

  3. 对于需要清理资源的守护线程,使用事件机制(详情请看”线程通信“)

锁(Lock

线程锁(Thread Lock)是一种线程同步机制,用于确保在多个线程同时访问共享资源时,通过使用线程锁,可以确保同一时间只有一个线程可以访问临界区(Critical Section)代码,从而保证了共享资源的安全性,避免并发访问导致的数据不一致或竞态条件(Race Condition)问题。

但需要注意的是,过度使用线程锁可能导致线程的串行执行,从而降低并发性能。因此,在设计并发程序时,需要权衡锁的使用和粒度,避免不必要的锁竞争。

在Python中,可以使用threading.Lock类来创建一个线程锁对象,通过调用锁对象的acquire方法获取锁,在临界区代码执行完成后调用release方法释放锁。

下面是一个例子,用于展示threading.Lock类的使用。

import threading

eu = ExcelUtils(FilePathUtils.path_joint_str(project_dir, "resources", "test_threading.xlsx"), "test_threading")
engine = eu.get_engine()

lock = threading.Lock() # 初始化锁

def write_excel(thread_name, write_row):
    lock.acquire() # 获取锁
    try:
        engine.insert_value("A", write_row, DateTimeUtils.get_day(pattern=DateTimeUtils.ACCURACY_TIME), False)
        engine.insert_value("B", write_row, thread_name, False)
        engine.insert_value("C", write_row, randint(0, 10000000), False)
        engine.save()
    finally:
        lock.release() # 释放锁

# 创建10个线程并开启它们
threads = []
write_row = 2
for i in range(10):
    thread = threading.Thread(target=write_excel, args=(f"Thread {i}", write_row))
    thread.start()
    threads.append(thread)
    write_row += 1

# 等待所有线程结束
for thread in threads:
    thread.join()

engine.close()

线程通信

下面是一些常用的线程通信机制:

共享变量

多个线程可以访问和修改同一个全局变量或对象,通过对变量的读写操作来进行通信。但是需要注意的是,共享变量可能存在竞争条件(race condition),需要采取同步机制(如锁)来保证线程安全。

import threading

# 共享变量
shared_variable = 0
lock = threading.Lock()

def increment():
    global shared_variable
    lock.acquire()
    try:
        shared_variable += 1
    finally:
        lock.release()

def decrement():
    global shared_variable
    lock.acquire()
    try:
        shared_variable -= 1
    finally:
        lock.release()

# 创建线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print(shared_variable)

队列(Queue)

queue.Queue提供了一个线程安全的队列实现,可以用于在线程之间传递数据。线程可以往队列中放入数据,也可以从队列中获取数据。这种方式是线程安全的,无需手动添加锁机制。

import threading
from queue import Queue

# 创建队列
shared_queue = Queue()

for i in range(20):
    shared_queue.put(i)

def provider():
    # 队列中的元素也可以由线程进行输入
    for i in range(20):
        shared_queue.put(i)

def consumer():
    # 判断队列是否为空
    while not shared_queue.empty():
        item = shared_queue.get() # 每一次调用get()方法, 队列中的元素就会减少一个
        print(item)

# 创建线程
threadA = threading.Thread(target=consumer)
threadB = threading.Thread(target=consumer)
provider_thread = threading.Thread(target=provider)

# 等带线程生产元素结束
provider_thread.start()
provider_thread.join()

# 启动线程消耗队列中的元素
threadA.start()
threadB.start()

threadA.join()
threadB.join()

事件(Event)

threading.Event提供了一个简单的线程间通信方式,一个线程可以等待事件的触发,另一个线程可以触发事件。通过事件可以实现线程的同步和协作。

import time
import threading

# 创建事件
event = threading.Event()

def wait_for_event(thread_name):
    print(f"线程{thread_name}正在等待其他线程触发事件")
    event.wait() # 等待事件
    print(f"其他线程的事件已触发, 线程{thread_name}会继续执行代码")

def trigger_event(thread_name):
    time.sleep(3)
    print(f"{thread_name}触发事件")
    event.set() # 触发事件

# 创建线程
threadA = threading.Thread(target=wait_for_event, args=("ThreadA", ))
threadB = threading.Thread(target=trigger_event, args=("ThreadB", ))

# 启动线程
threadA.start()
threadB.start()

# 等待线程结束
threadA.join()
threadB.join()

可以看到,threadA 在等待其他事件触发后,才会继续执行代码

线程池(ThreadPoolExecutor)

从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor(线程池)ProcessPoolExecutor(进程池)两个类。

下面是一个使ThreadPoolExecutor创建线程池的示例:

from concurrent.futures import ThreadPoolExecutor

eu = ExcelUtils(FilePathUtils.path_joint_str(project_dir, "resources", "test_threading.xlsx"), "test_threading")
engine = eu.get_engine()

lock = threading.Lock() # 初始化锁

# 定义一个任务函数
def write_excel(thread_name, write_row):
    lock.acquire() # 获取锁
    try:
        engine.insert_value("A", write_row, DateTimeUtils.get_day(pattern=DateTimeUtils.ACCURACY_TIME), False)
        engine.insert_value("B", write_row, thread_name, False)
        engine.insert_value("C", write_row, randint(0, 10000000), False)
        engine.save()
    finally:
        lock.release() # 释放锁
    return "save success"

# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    write_row = 2
    for i in range(10):
        args=(f"Thread {i}", write_row)
        # 提交任务给线程池
        future = executor.submit(write_excel, *args)
        # 等待线程完成并获取任务的返回值(返回值是线程函数的返回值, 如果没有任何返回值就是None)
        result = future.result()
        write_row += 1
        print(f"任务返回值: {result}")

engine.close()

在进行submit的时候,即使提交了超过max_workers标识的最大运行线程数量,其他线程会等待这些进程运行完成,详情看如下代码:

from concurrent.futures import ThreadPoolExecutor

# 定义一个任务函数
def task(name, wait_time):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(wait_time)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建线程池
Thread_list = []
with ThreadPoolExecutor(max_workers=20) as executor:
    # 提交任务给线程池
    for i in range(100):
        Thread_list.append(executor.submit(task, i, 2))

result_list = []
for thread in Thread_list:
    result_list.append(thread.result())

print(f"任务返回值:{result_list}")

可以看到,我们提交了100个线程,但是实际运行的时候是20个线程,其他线程会等待这20个线程运行完毕后再运行。

虽然线程的运行的顺序是随机的,但是上面代码中,我们用list封装之后,返回的结果依然是按照我们想要的顺序去返回。这样子就可以保证多个线程并发运行后依然可以按照顺序得到我们想要的结果。

concurrent.futures.as_completed

as_completedconcurrent.futures模块中一个极其实用的工具,它用于异步获取并发任务的完成结果

它接收一个 Future list/tuple/dict,返回一个迭代器,按任务完成的顺序产出结果

它适合处理多个任务,谁先完成现处理谁

from concurrent.futures import ThreadPoolExecutor, as_completed

eu = ExcelUtils(FilePathUtils.path_joint_str(project_dir, "resources", "test_threading.xlsx"), "test_threading")
engine = eu.get_engine()

lock = threading.Lock() # 初始化锁

# 定义一个任务函数
def write_excel(thread_name, write_row):
    lock.acquire() # 获取锁
    try:
        engine.insert_value("A", write_row, DateTimeUtils.get_day(pattern=DateTimeUtils.ACCURACY_TIME), False)
        engine.insert_value("B", write_row, thread_name, False)
        engine.insert_value("C", write_row, randint(0, 10000000), False)
        engine.save()
    finally:
        lock.release() # 释放锁
    return "save success"

# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    write_row = 2
    futures = []
    for i in range(10):
        args=(f"Thread {i}", write_row)
        # 提交任务给线程池
        future = executor.submit(write_excel, *args)
        futures.append(future)
        write_row += 1

for future in as_completed(futures):
    # 获取任务的返回值(返回值是线程函数的返回值, 如果没有任何返回值就是None)
    result = future.result()
    print(f"任务返回值: {result}")

engine.close()

concurrent.futures.wait

wait 是 concurrent.futures 模块中的核心同步工具。它是比 join() 更强大的批量等待机制,可以批量等待并发任务,并根据不同条件灵活控制等待的结束时机。

它会等待一组并发任务达到指定状态,而不是逐个等待:

  • FIRST_COMPLETED:任意一个完成

  • FIRST_EXCEPTION:任意一个异常

  • ALL_COMPLETED:全部完成

完成后,会返回(done, not_done)两个集合

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED

eu = ExcelUtils(FilePathUtils.path_joint_str(project_dir, "resources", "test_threading.xlsx"), "test_threading")
engine = eu.get_engine()

lock = threading.Lock() # 初始化锁

# 定义一个任务函数
def write_excel(thread_name, write_row):
    lock.acquire() # 获取锁
    try:
        engine.insert_value("A", write_row, DateTimeUtils.get_day(pattern=DateTimeUtils.ACCURACY_TIME), False)
        engine.insert_value("B", write_row, thread_name, False)
        engine.insert_value("C", write_row, randint(0, 10000000), False)
        engine.save()
    finally:
        lock.release() # 释放锁
    return "save success"

# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    write_row = 2
    futures = []
    for i in range(10):
        args=(f"Thread {i}", write_row)
        # 提交任务给线程池
        future = executor.submit(write_excel, *args)
        futures.append(future)
        write_row += 1
    # 执行等待策略
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print("已经完成的任务有{len(done)}个, 未完成的任务有{len(not_done)}个")

engine.close()

协程(asyncio

Python的协程可以通过asyncio模块提供支持。协程函数使用async关键字定义,内部使用await关键字来挂起执行,让出控制权给其他协程或者主程序,等待某些操作完成后再继续执行。asyncio用于编写基于事件循环(event loop)和协程(coroutine)的并发代码。它可以处理异步I/O操作、并发任务和网络通信等事务。

以下是asyncio的主要组件和功能:

  • 事件循环(Event Loop)asyncio基于事件循环模型,通过一个单线程的事件循环来调度协程的执行。事件循环负责处理并发任务的调度和事件的分发,协程在事件循环中注册并等待执行。

  • 协程(Coroutine):使用asyncawait关键字定义的异步函数。协程是可以被挂起和恢复的轻量级执行单元,可以在执行过程中让出控制权并等待异步操作完成。协程之间可以方便地切换执行,提供了一种高效的并发编程模型。

  • 异步I/O操作:asyncio提供了各种用于异步I/O操作的工具和API。例如,asyncio.open()可以异步地打开文件,asyncio.wait()可以等待多个异步任务完成,asyncio.sleep()可以创建一个暂停执行的延时操作等等。

  • 协程调度器(Coroutine Scheduler):事件循环作为协程的调度器,负责协程的调度和执行。它根据协程的状态和I/O可用性来决定哪个协程继续执行,哪个协程被挂起等待。调度器还处理协程之间的通信和同步。

  • 异步上下文管理器(Asynchronous Context Manager):asyncio提供了async withasync for语法来支持异步上下文管理器,类似于常规上下文管理器的功能,但在异步代码中更适用。

举个简单的例子:

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("World")

async def main():
    await asyncio.gather(hello(), hello(), hello())

asyncio.run(main())

在定义协程函数时,使用async关键字定义协程函数,并在需要的地方使用await关键字进行挂起操作。

通过asyncio.run()函数来运行main()协程函数,它会创建一个事件循环(event loop)并执行协程。协程函数的执行是非阻塞的,因此在执行await asyncio.sleep(1)的时候不会阻塞主程序的其他任务。

需要注意的是,协程只在异步上下文中才能运行,例如在使用asyncio库的情况下。在其他情况下,协程函数的执行将退化为普通函数的执行,不能实现真正的并发。

接下来我们可以再举几个常用的例子来深入认识asyncio

并发任务处理

asyncio可以处理并发任务,提高程序的执行效率。通过asyncio.gather()可以同时运行多个协程任务。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)
    print("Task 1 completed")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(2)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

异步文件操作

使用asyncio实现异步文件的读写操作。通过asyncio.open()的文件读写方法,可以在进行文件操作时不阻塞主程序的执行。

import asyncio

async def read_file(filename):
    try:
        async with asyncio.open(filename, "r") as f:
            contents = await f.read()
            print(f"Read contents: {contents}")
    except FileNotFoundError:
        print(f"File {filename} not found")

async def write_file(filename, contents):
    async with asyncio.open(filename, "w") as f:
        await f.write(contents)
        print(f"File {filename} written")

async def main():
    await write_file("test.txt", "Hello, world!")
    await read_file("test.txt")

asyncio.run(main())

异步HTTP请求

asyncio可以执行异步的HTTP请求,从而提高网络通信的效率。可以使用aiohttp库(使用pip install aiohttp安装该第三方库)来进行HTTP请求的管理和处理。

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://api.example.com"
    response = await fetch(url)
    print(response)

asyncio.run(main())

定时任务

asyncio可以执行定时任务,例如定期执行某个协程函数或者执行一些定时操作。可以使用asyncio.sleep()asyncio.ensure_future()来实现定时任务。

import asyncio

async def periodic_task():
    while True:
        print("Periodic task executed")
        await asyncio.sleep(1)

async def main():
    task = asyncio.ensure_future(periodic_task())
    await asyncio.sleep(5)
    task.cancel()

asyncio.run(main())

协程(Gevent)

gevent是一个基于greenlet的第三方库,提供了对协程的支持和并发编程的能力。它允许开发者使用类似于标准线程的方式来编写异步代码,从而简化并发编程的复杂性。

安装

pip install gevent

常用API

  1. spawn(func, *args, **kwargs):创建一个协程对象并执行指定的函数。可以传递额外的参数给函数。返回一个Greenlet对象,可以用于等待协程执行完毕。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet = gevent.spawn(my_coroutine, "A")
  1. join(timeout=None, raise_error=False):等待协程对象执行完毕。可以设置超时时间和是否抛出错误。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet = gevent.spawn(my_coroutine, "A")
greenlet.join()
  1. joinall(greenlets, timeout=None, raise_error=False):等待所有协程对象执行完毕。可以传递一个可迭代的协程对象列表,设置超时时间和是否抛出错误。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet1 = gevent.spawn(my_coroutine, "A")
greenlet2 = gevent.spawn(my_coroutine, "B")
gevent.joinall([greenlet1, greenlet2])
  1. sleep(seconds):暂停当前协程的执行,让出CPU时间片给其他协程。指定暂停的时间,单位为秒。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

gevent.spawn(my_coroutine, "A")
gevent.sleep(2)
  1. get():获取协程对象的返回值。如果协程还没有执行完毕,调用该方法会阻塞。

import gevent

def my_coroutine():
    gevent.sleep(1)
    return "Hello, world!"

greenlet = gevent.spawn(my_coroutine)
result = greenlet.get()
print(result)

构造协程函数

使用gevent的spawn方法可以创建并执行协程函数

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

gevent.spawn(my_coroutine, "A")
gevent.spawn(my_coroutine, "B")
gevent.joinall()

gevent.spawn()创建两个协程对象,分别传入不同的名称,然后使用gevent.joinall()等待所有协程执行完毕。

I/O操作的并发处理

gevent可以用于并发处理I/O操作,例如并发地执行网络请求。

import gevent
import requests

def fetch(url):
    response = requests.get(url)
    print(f"Fetched {url}, status code: {response.status_code}")

urls = ["https://www.bing.com", "https://www.baidu.com", "https://www.python.org"]

jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)

协程间的通信和同步

gevent提供了一些同步原语来实现协程间的通信和同步,例如Event、Semaphore和Queue等。

import gevent
from gevent.queue import Queue
​
def producer(queue):
    for i in range(1, 6):
        queue.put(f"Message {i}")
        print(f"Produced Message {i}")
        gevent.sleep(1)
​
def consumer(queue):
    while not queue.empty():
        message = queue.get()
        print(f"Consumed {message}")
        gevent.sleep(0.5)
​
queue = Queue()
producer_coroutine = gevent.spawn(producer, queue)
consumer_coroutine = gevent.spawn(consumer, queue)
gevent.joinall([producer_coroutine, consumer_coroutine])

文章作者: Vsoapmac
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 soap的会员制餐厅
python python基础
喜欢就支持一下吧