进程

multiprocessing

Python的multiprocessing模块用于实现多进程编程的标准库。它提供了创建和管理进程的功能,使得在Python中可以轻松地利用多核处理器执行并行任务。

Process

multiprocessing模块提供了Process类,用于创建新的子进程。可以通过实例化Process类并传入一个可调用对象(通常是函数)来创建进程。可调用对象将在新的进程中执行。

Process类的初始化语法如下:

Process(group, target, name, args, kwargs, daemon)

参数如下:

  1. group: 进程组。它是一个ProcessGroup 对象的可选参数。默认为None。

  2. target: 要执行的函数或方法。它是必需的参数,用于指定子进程要执行的任务。

  3. name: 进程的名称。它是一个字符串类型的可选参数。如果不提供名称,系统会自动分配一个唯一的名称给进程。

  4. args: 传递给目标函数的参数值。它是一个元组类型的可选参数。如果目标函数需要接收多个参数,可以通过这个参数传递。

  5. kwargs: 传递给目标函数的关键字参数。它是一个字典类型的可选参数。

  6. daemon: 进程是否作为守护进程运行。它是一个布尔类型的可选参数,默认值为 False。如果将守护进程标记为 True,当主进程结束时,守护进程也会随之结束。

下面是一个简单的例子,用于快速掌握Process的使用:

from multiprocessing import Process
​
def func():
    print("Hello from a child process!")
​
if __name__ == '__main__':
    p = Process(target=func)
    p.start() # 启动新的进程
    p.join() # 等待子进程执行完成后才执行下面的语句
    p.kill() # 杀死子进程

以下介绍常见的可调用的Process方法:

方法

说明

start()

启动进程,调用进程中的run()方法。

run()

进程启动时运行的方法,正是它去调用target指定的函数,若自定义线程类,该类一定要实现该方法

terminate()

强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了子进程,那么该子进程在其强制结束后变为僵尸进程;如果该进程还保存了一个锁那么也将不会被释放,进而导致死锁。使用时,要注意。

is_alive()

判断某进程是否存活,存活返回True,否则False。

join(timeout)

主线程等待子线程终止。timeout为可选择超时时间;需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

以下为Process方法中常见的属性:

方法

说明

daemon

默认值为False,如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前

name

进程名称。

pid

进程pid

exitcode

进程运行时为None,如果为-N,表示被信号N结束了。

authkey

进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络连接的底层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功

以下为几个常见的例子:

  1. 并行计算任务: 将一个计算密集型任务拆分成多个子任务,并使用多个进程并行执行。这可以显著提高计算速度。例如,计算素数的示例:

import multiprocessing
​
def is_prime(num):
    """检查一个数字是否为素数"""
    if num < 2:
        return False
    for i in range(2, int(num**0.5) + 1):
        if num % i == 0:
            return False
    return True
​
if __name__ == '__main__':
    numbers = [2, 3, 4, 5, 6, 7, 8, 9, 10]
    processes = []
    
    for number in numbers:
        p = multiprocessing.Process(target=is_prime, args=(number,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
  1. 并行执行独立的任务: 使用多个进程同时执行独立的任务,提高整体处理速度。例如,使用进程下载多个文件:

import multiprocessing
import urllib.request
​
def download_file(url, filename):
    """下载文件"""
    urllib.request.urlretrieve(url, filename)
    print(f'Downloaded: {filename}')
​
if __name__ == '__main__':
    urls = [
        ('http://example.com/file1.txt', 'file1.txt'),
        ('http://example.com/file2.txt', 'file2.txt'),
        ('http://example.com/file3.txt', 'file3.txt')
    ]
    processes = []
    
    for url, filename in urls:
        p = multiprocessing.Process(target=download_file, args=(url, filename))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

进程通信(Queue/Pipe)

多个进程之间需要进行数据交换和通信时,multiprocessing模块提供了多种方式。其中最常用的是使用队列(Queue)和管道(Pipe)。

队列可以通过multiprocessing.Queue类来创建,进程可以使用put()方法将数据放入队列,而其他进程可以使用get()方法从队列中获取数据。

使用multiprocessing.Queue()创建一个空的队列对象。

以下是使用队列进行进程间通信的示例:

from multiprocessing import Process, Queue
​
def worker(q):
    data = q.get()
    print("Received data:", data)
​
if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
​
    q.put("Hello from the main process!")
    p.join()

下面是Queue类的一些常用方法:

方法

说明

put(obj, block, timeout)

将一个项目放入队列。如果队列已满且block参数为True,则会阻塞直到队列有空间可用。如果block参数为False,则会立即引发Full异常。timeout参数指定在阻塞模式下等待的时间(以秒为单位)。

get(block, timeout)

从队列中获取并移除一个项目。如果队列为空且block参数为True,则会阻塞直到队列有项目可用。如果block参数为False,则会立即引发Empty异常。timeout参数指定在阻塞模式下等待的时间(以秒为单位)。

empty()

判断队列是否为空。

full()

判断队列是否已满。

qsize()

返回队列中当前项目的数量。

close()

关闭队列。关闭后,不能再往队列中放入或获取项目。

join_thread()

等待队列关联的线程终止。这对于确保所有数据都被处理完毕非常有用。

除了队列,multiprocessing模块还提供了Pipe类,它提供了一种双向通信通道,可以用于在两个进程之间传递数据。

以下是使用管道进行进程间通信的示例:

import multiprocessing
​
def sender(pipe):
    """发送数据的进程"""
    message = 'Hello from sender!'
    pipe.send(message)  # 发送数据到管道
    pipe.close()  # 关闭管道连接
​
def receiver(pipe):
    """接收数据的进程"""
    message = pipe.recv()  # 从管道接收数据
    print(f'Message received: {message}')
    pipe.close()  # 关闭管道连接
​
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()  # 创建管道
    
    # 创建发送数据的进程
    sender_process = multiprocessing.Process(target=sender, args=(parent_conn,))
    
    # 创建接收数据的进程
    receiver_process = multiprocessing.Process(target=receiver, args=(child_conn,))
    
    # 启动进程
    sender_process.start()
    receiver_process.start()
    
    # 等待进程执行完毕
    sender_process.join()
    receiver_process.join()

Pipe的初始化语法如下:

multiprocessing.Pipe(duplex)

改语法用于创建一个管道对象。duplex参数指定管道的类型,如果duplex为True,则为全双工管道,可以同时进行读取和写入;如果duplex为False,则为半双工管道,只能在一个方向上进行读取或写入。

下面是Pipe类的一些常用方法:

方法

说明

send(obj)

向管道发送一个对象。

recv()

从管道接收一个对象。

close()

关闭管道。关闭后,不能再进行发送或接收操作。

poll(timeout)

检查管道是否有数据可接收,如果有则返回True,否则返回False。timeout参数指定在非阻塞模式下等待的时间(以秒为单位)。

fileno()

返回管道的文件描述符。在某些情况下,可以使用该文件描述符进行更高级的操作。

进程池(Pool)

在某些情况下,我们可能需要同时创建多个进程来执行任务。multiprocessing模块提供了Pool类,它可以方便地创建进程池。

使用进程池可以有效地管理多个进程的创建和执行,尤其在处理大量数据或需要执行密集计算的场景中特别有用。

Pool类的初始化语法如下:

multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild)

以下是multiprocessing.Pool()函数中参数的作用:

  1. processes:指定进程池中的进程数。默认值为None,表示使用系统的 CPU 核心数。可以设置一个整数值来限制进程池中的进程数。

  2. initializer:指定一个可调用对象,在每个进程启动时调用。可以用于执行一些初始化操作。

  3. initargs:传递给initializer函数的参数,以元组的形式提供。

  4. maxtasksperchild:指定每个工作进程在完成一定数量的任务后将会被替换。默认值为None,表示工作进程将一直存在。可以使用一个整数值来设置任务数的阈值。

以下是创建进程池的示例:

import multiprocessing

def initialize(arg1, arg2):
    # 使用arg1和arg2执行初始化操作的代码
    print(arg1, arg2)
    
# 创建进程数为4的进程池,并在每个进程启动时调用initialize函数,并传递参数。每个工作进程完成10个任务后将被替换
pool = multiprocessing.Pool(processes=4, initializer=initialize, initargs=(1, 2), maxtasksperchild=10)

multiprocessing.Pool()函数常见的方法如下:

方法

说明

apply()

提交一个任务到进程池,并阻塞地等待任务执行完成,然后返回结果。

apply_async()

提交一个任务到进程池,并非阻塞地等待任务执行完成。返回一个AsyncResult对象,可以使用get()方法获取任务的结果。

map()

并行处理一个可迭代对象中的多个任务,并返回结果列表。任务将按顺序执行,直到完成。

imap()

并行处理一个可迭代对象中的多个任务,并返回一个迭代器,可以逐个获取任务的结果。任务将按顺序执行。

imap_unordered()

与imap()方法类似,但是任务的结果将按照完成的顺序返回,而不是按照提交的顺序。

close()

停止接受新的任务。已提交但尚未执行的任务将继续执行。

join()

阻塞地等待所有任务执行完成,然后关闭进程池。

  1. 使用apply_async()提交任务并获取结果

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    result = pool.apply_async(square, (5,))
    print(result.get())  # 输出: 25
    pool.close()
    pool.join()
  1. 使用map()方法并行处理多个任务

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.map(square, numbers)
    print(results)  # 输出: [1, 4, 9, 16, 25]
    pool.close()
    pool.join()
  1. 使用imap()方法迭代获取任务结果

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.imap(square, numbers)
    for result in results:
        print(result)  # 逐个输出: 1, 4, 9, 16, 25
    pool.close()
    pool.join()

进程池(ProcessPoolExecutor)

上面使用了Pool去创建进程池,从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor(进程池)ProcessPoolExecutor(进程池)两个类。

下面是一个使用ProcessPoolExecutor创建进程池的示例:

import concurrent.futures

# 定义一个任务函数
def task(name):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(2)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    # 提交任务给进程池
    future1 = executor.submit(task, "Task 1")
    future2 = executor.submit(task, "Task 2")
    future3 = executor.submit(task, "Task 3")

    # 获取任务的返回值
    result1 = future1.result()
    result2 = future2.result()
    result3 = future3.result()

    print(f"任务返回值:{result1}, {result2}, {result3}")
  1. 使用with语句 ,通过ProcessPoolExecutor构造实例,同时传入max_workers参数来设置进程池中最多能同时运行的进程数目。

  2. 使用submit函数来提交进程需要执行的任务到进程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

  3. 通过使用done()方法判断该任务是否结束。

  4. 使用result()方法可以获取任务的返回值。

在进行submit的时候,即使提交了超过max_workers标识的最大运行进程数量,其他进程会等待这些进程运行完成,详情看如下代码:

from concurrent.futures import ThreadPoolExecutor

# 定义一个任务函数
def task(name, wait_time):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(wait_time)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建进程池
with ThreadPoolExecutor(max_workers=20) as executor:
    # 提交任务给进程池
    Thread_list = [executor.submit(task, i, 2) for i in range(100)]

result_list = []
for thread in Thread_list:
    result_list.append(thread.result())

print(f"任务返回值:{result_list}")

可以看到,我们提交了100个进程,但是实际运行的时候是20个进程,其他进程会等待这20个进程运行完毕后再运行。

虽然进程的运行的顺序是随机的,但是上面代码中,我们用list封装之后,返回的结果依然是按照我们想要的顺序去返回。这样子就可以保证多个进程并发运行后依然可以按照顺序得到我们想要的结果。

锁(Lock)

在多进程编程中,多个进程可能会同时访问共享的资源,为了避免竞争条件和数据不一致问题,可以使用锁(Lock)来同步进程的访问。

multiprocessing模块提供了Lock类,可以使用multiprocessing.Lock()创建锁对象,并在需要时使用acquire()方法获取锁,使用release()方法释放锁。

以下是使用锁的示例:

from multiprocessing import Process, Lock

def func(lock):
    lock.acquire()
    try:
        print("Hello from a locked process!")
    finally:
        lock.release()

if __name__ == '__main__':
    lock = Lock()
    processes = []

    for _ in range(5):
        p = Process(target=func, args=(lock,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

在子进程中,使用lock.acquire()获取锁,然后执行需要同步的代码,最后使用lock.release()释放锁。通过使用锁,可以确保一次只有一个进程可以访问被锁定的代码块,从而避免数据竞争和不一致性。

线程

threading

threading是Python标准库中用于多线程编程的模块。它提供了线程相关的类和函数,使得在Python中可以方便地创建和管理多线程应用程序。

Thread

Thread类是threading模块中最重要的类之一,用于表示一个线程。通过创建Thread类的实例,可以在程序中启动一个新线程。

创建Thread的语法如下:

Thread(target, args, kwargs)
  1. target: 指定线程要执行的函数

  2. argskwargs: 可选参数,用于传递给目标函数的参数。

threading的简单使用举例如下:

import threading
import time

def print_numbers(count):
    for i in range(count):
        print(i)
        time.sleep(1)

# 创建线程实例
thread = threading.Thread(target=print_numbers, args=(3,))

# 启动线程
thread.start()

# 等待线程执行完成
thread.join()

print("Thread execution completed.")

下面是Thread类常见的方法,注意,在python中线程并不能手动杀死,只能等待其自行结束

方法

说明

start()

启动线程,并开始执行目标函数。

join(timeout=None)

等待线程执行完成。默认情况下,join()方法将阻塞调用线程,直到被调用线程执行完成。可以设置timeout参数来指定等待的时间。

is_alive()

检查线程是否正在执行。

下面是Thread类常见的属性:

属性

说明

name

表示线程的名称。

ident

表示线程的标识符。

daemon

表示表示线程是否为守护线程

线程通信

下面是一些常用的线程通信机制:

共享变量:多个线程可以访问和修改同一个全局变量或对象,通过对变量的读写操作来进行通信。但是需要注意的是,共享变量可能存在竞争条件(race condition),需要采取同步机制(如锁)来保证线程安全。

import threading

# 共享变量
shared_variable = 0
lock = threading.Lock()

def increment():
    global shared_variable
    lock.acquire()
    try:
        shared_variable += 1
    finally:
        lock.release()

def decrement():
    global shared_variable
    lock.acquire()
    try:
        shared_variable -= 1
    finally:
        lock.release()

# 创建线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print(shared_variable)

队列(Queue):queue.Queue提供了一个线程安全的队列实现,可以用于在线程之间传递数据。线程可以往队列中放入数据,也可以从队列中获取数据。这种方式是线程安全的,无需手动添加锁机制。

import threading
import queue

# 创建队列
shared_queue = queue.Queue()

def producer():
    for i in range(5):
        shared_queue.put(i)

def consumer():
    while True:
        item = shared_queue.get()
        if item is None:
            break
        print(item)

# 创建线程
thread1 = threading.Thread(target=producer)
thread2 = threading.Thread(target=consumer)

# 启动线程
thread1.start()
thread2.start()

# 等待生产者线程结束
thread1.join()

# 队列中放入停止标志
shared_queue.put(None)

# 等待消费者线程结束
thread2.join()

事件(Event):threading.Event提供了一个简单的线程间通信方式,一个线程可以等待事件的触发,另一个线程可以触发事件。通过事件可以实现线程的同步和协作。

import threading

# 创建事件
event = threading.Event()

def wait_for_event():
    print("等待事件触发")
    event.wait()  # 等待事件
    print("事件已触发")

def trigger_event():
    import time
    time.sleep(3)  # 延迟3秒
    print("触发事件")
    event.set()  # 触发事件

# 创建线程
thread1 = threading.Thread(target=wait_for_event)
thread2 = threading.Thread(target=trigger_event)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

线程池(ThreadPoolExecutor)

从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor(线程池)ProcessPoolExecutor(进程池)两个类。

下面是一个使用ThreadPoolExecutor创建线程池的示例:

import concurrent.futures

# 定义一个任务函数
def task(name):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(2)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务给线程池
    future1 = executor.submit(task, "Task 1")
    future2 = executor.submit(task, "Task 2")
    future3 = executor.submit(task, "Task 3")

    # 获取任务的返回值
    result1 = future1.result()
    result2 = future2.result()
    result3 = future3.result()

    print(f"任务返回值:{result1}, {result2}, {result3}")
  1. 使用with语句 ,通过ThreadPoolExecutor构造实例,同时传入max_workers参数来设置线程池中最多能同时运行的线程数目。

  2. 使用submit函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

  3. 通过使用done()方法判断该任务是否结束。

  4. 使用result()方法可以获取任务的返回值。

在进行submit的时候,即使提交了超过max_workers标识的最大运行线程数量,其他线程会等待这些进程运行完成,详情看如下代码:

from concurrent.futures import ThreadPoolExecutor

# 定义一个任务函数
def task(name, wait_time):
    print(f"任务 {name} 开始执行")
    import time
    time.sleep(wait_time)  # 模拟任务执行时间
    print(f"任务 {name} 执行完成")
    return name

# 创建线程池
Thread_list = []
with ThreadPoolExecutor(max_workers=20) as executor:
    # 提交任务给线程池
    for i in range(100):
        Thread_list.append(executor.submit(task, i, 2))

result_list = []
for thread in Thread_list:
    result_list.append(thread.result())

print(f"任务返回值:{result_list}")

可以看到,我们提交了100个线程,但是实际运行的时候是20个线程,其他线程会等待这20个线程运行完毕后再运行。

虽然线程的运行的顺序是随机的,但是上面代码中,我们用list封装之后,返回的结果依然是按照我们想要的顺序去返回。这样子就可以保证多个线程并发运行后依然可以按照顺序得到我们想要的结果。

线程锁(Thread Lock)是一种线程同步机制,用于确保在多个线程同时访问共享资源时,通过使用线程锁,可以确保同一时间只有一个线程可以访问临界区(Critical Section)代码,从而保证了共享资源的安全性,避免并发访问导致的数据不一致或竞态条件(Race Condition)问题。

但需要注意的是,过度使用线程锁可能导致线程的串行执行,从而降低并发性能。因此,在设计并发程序时,需要权衡锁的使用和粒度,避免不必要的锁竞争。

在Python中,可以使用threading.Lock类来创建一个线程锁对象,通过调用锁对象的acquire方法获取锁,在临界区代码执行完成后调用release方法释放锁。

下面是一个例子,用于展示threading.Lock类的使用。

import threading

# 共享变量
shared_variable = 0
lock = threading.Lock()

def increment():
    global shared_variable
    lock.acquire()  # 获取锁
    try:
        shared_variable += 1
    finally:
        lock.release()  # 释放锁

def decrement():
    global shared_variable
    lock.acquire()  # 获取锁
    try:
        shared_variable -= 1
    finally:
        lock.release()  # 释放锁

# 创建线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print(shared_variable)

协程

Python协程(coroutine)是一种轻量级的并发编程技术,它可以在单线程中实现多个函数(或者子程序)之间的协作。使用协程可以实现高效的异步编程和并发任务处理。协程的特点是可以方便地在函数之间切换执行,并且不需要像线程或进程那样引入显式的锁或同步机制来处理并发访问问题。通过使用协程,可以更好地利用计算资源,提高程序的性能和效率。

asyncio

Python的协程可以通过asyncio模块提供支持。协程函数使用async关键字定义,内部使用await关键字来挂起执行,让出控制权给其他协程或者主程序,等待某些操作完成后再继续执行。asyncio用于编写基于事件循环(event loop)和协程(coroutine)的并发代码。它可以处理异步I/O操作、并发任务和网络通信等事务。

以下是asyncio的主要组件和功能:

  • 事件循环(Event Loop):asyncio基于事件循环模型,通过一个单线程的事件循环来调度协程的执行。事件循环负责处理并发任务的调度和事件的分发,协程在事件循环中注册并等待执行。

  • 协程(Coroutine):使用asyncawait关键字定义的异步函数。协程是可以被挂起和恢复的轻量级执行单元,可以在执行过程中让出控制权并等待异步操作完成。协程之间可以方便地切换执行,提供了一种高效的并发编程模型。

  • 异步I/O操作:asyncio提供了各种用于异步I/O操作的工具和API。例如,asyncio.open()可以异步地打开文件,asyncio.wait()可以等待多个异步任务完成,asyncio.sleep()可以创建一个暂停执行的延时操作等等。

  • 协程调度器(Coroutine Scheduler):事件循环作为协程的调度器,负责协程的调度和执行。它根据协程的状态和I/O可用性来决定哪个协程继续执行,哪个协程被挂起等待。调度器还处理协程之间的通信和同步。

  • 异步上下文管理器(Asynchronous Context Manager):asyncio提供了async withasync for语法来支持异步上下文管理器,类似于常规上下文管理器的功能,但在异步代码中更适用。

举个简单的例子:

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("World")

async def main():
    await asyncio.gather(hello(), hello(), hello())

asyncio.run(main())

在定义协程函数时,使用async关键字定义协程函数,并在需要的地方使用await关键字进行挂起操作。

通过asyncio.run()函数来运行main()协程函数,它会创建一个事件循环(event loop)并执行协程。协程函数的执行是非阻塞的,因此在执行await asyncio.sleep(1)的时候不会阻塞主程序的其他任务。

需要注意的是,协程只在异步上下文中才能运行,例如在使用asyncio库的情况下。在其他情况下,协程函数的执行将退化为普通函数的执行,不能实现真正的并发。

接下来我们可以再举几个常用的例子来深入认识asyncio

并发任务处理

asyncio可以处理并发任务,提高程序的执行效率。通过asyncio.gather()可以同时运行多个协程任务。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)
    print("Task 1 completed")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(2)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

异步文件操作

使用asyncio实现异步文件的读写操作。通过asyncio.open()的文件读写方法,可以在进行文件操作时不阻塞主程序的执行。

import asyncio

async def read_file(filename):
    try:
        async with asyncio.open(filename, "r") as f:
            contents = await f.read()
            print(f"Read contents: {contents}")
    except FileNotFoundError:
        print(f"File {filename} not found")

async def write_file(filename, contents):
    async with asyncio.open(filename, "w") as f:
        await f.write(contents)
        print(f"File {filename} written")

async def main():
    await write_file("test.txt", "Hello, world!")
    await read_file("test.txt")

asyncio.run(main())

异步HTTP请求

asyncio可以执行异步的HTTP请求,从而提高网络通信的效率。可以使用aiohttp库(使用pip install aiohttp安装该第三方库)来进行HTTP请求的管理和处理。

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://api.example.com"
    response = await fetch(url)
    print(response)

asyncio.run(main())

定时任务

asyncio可以执行定时任务,例如定期执行某个协程函数或者执行一些定时操作。可以使用asyncio.sleep()asyncio.ensure_future()来实现定时任务。

import asyncio

async def periodic_task():
    while True:
        print("Periodic task executed")
        await asyncio.sleep(1)

async def main():
    task = asyncio.ensure_future(periodic_task())
    await asyncio.sleep(5)
    task.cancel()

asyncio.run(main())

Gevent

gevent是一个基于greenlet的第三方库,提供了对协程的支持和并发编程的能力。它允许开发者使用类似于标准线程的方式来编写异步代码,从而简化并发编程的复杂性。

安装

pip install gevent

常用API

  1. spawn(func, *args, **kwargs):创建一个协程对象并执行指定的函数。可以传递额外的参数给函数。返回一个Greenlet对象,可以用于等待协程执行完毕。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet = gevent.spawn(my_coroutine, "A")
  1. join(timeout=None, raise_error=False):等待协程对象执行完毕。可以设置超时时间和是否抛出错误。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet = gevent.spawn(my_coroutine, "A")
greenlet.join()
  1. joinall(greenlets, timeout=None, raise_error=False):等待所有协程对象执行完毕。可以传递一个可迭代的协程对象列表,设置超时时间和是否抛出错误。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

greenlet1 = gevent.spawn(my_coroutine, "A")
greenlet2 = gevent.spawn(my_coroutine, "B")
gevent.joinall([greenlet1, greenlet2])
  1. sleep(seconds):暂停当前协程的执行,让出CPU时间片给其他协程。指定暂停的时间,单位为秒。

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

gevent.spawn(my_coroutine, "A")
gevent.sleep(2)
  1. get():获取协程对象的返回值。如果协程还没有执行完毕,调用该方法会阻塞。

import gevent

def my_coroutine():
    gevent.sleep(1)
    return "Hello, world!"

greenlet = gevent.spawn(my_coroutine)
result = greenlet.get()
print(result)

构造协程函数

使用gevent的spawn方法可以创建并执行协程函数

import gevent

def my_coroutine(name):
    print(f"Coroutine {name} started")
    gevent.sleep(1)
    print(f"Coroutine {name} completed")

gevent.spawn(my_coroutine, "A")
gevent.spawn(my_coroutine, "B")
gevent.joinall()

gevent.spawn()创建两个协程对象,分别传入不同的名称,然后使用gevent.joinall()等待所有协程执行完毕。

I/O操作的并发处理

gevent可以用于并发处理I/O操作,例如并发地执行网络请求。

import gevent
import requests

def fetch(url):
    response = requests.get(url)
    print(f"Fetched {url}, status code: {response.status_code}")

urls = ["https://www.bing.com", "https://www.baidu.com", "https://www.python.org"]

jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)

协程间的通信和同步

gevent提供了一些同步原语来实现协程间的通信和同步,例如Event、Semaphore和Queue等。

import gevent
from gevent.queue import Queue
​
def producer(queue):
    for i in range(1, 6):
        queue.put(f"Message {i}")
        print(f"Produced Message {i}")
        gevent.sleep(1)
​
def consumer(queue):
    while not queue.empty():
        message = queue.get()
        print(f"Consumed {message}")
        gevent.sleep(0.5)
​
queue = Queue()
producer_coroutine = gevent.spawn(producer, queue)
consumer_coroutine = gevent.spawn(consumer, queue)
gevent.joinall([producer_coroutine, consumer_coroutine])
文章作者: Vsoapmac
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 soap的会员制餐厅
python python基础
喜欢就支持一下吧