前言
本文将和大家一起探讨python的多线程并发编程(上篇),使用内置基本库threading
来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。
本文为python并发编程的第七篇,上一篇文章地址如下:
python:并发编程(六)_Lion King的博客-CSDN博客
下一篇文章地址如下:
python:并发编程(八)_Lion King的博客-CSDN博客
一、快速开始
官方文档:threading --- 基于线程的并行 — Python 3.11.4 文档
1、线程本地数据
threading
模块提供了线程本地数据(Thread-local data)的功能,可以让每个线程拥有自己独立的数据空间,不同线程之间的数据互不干扰。
可以使用threading.local()
函数创建一个线程本地数据对象。每个线程通过访问这个对象来获取和设置自己独立的数据。每个线程对这个对象的操作都是针对自己的数据空间,不会影响其他线程。
下面是一个简单的示例:
import threading
# 创建线程本地数据对象
local_data = threading.local()
def set_data(value):
# 设置线程本地数据
local_data.value = value
def get_data():
# 获取线程本地数据
return local_data.value
def worker():
# 在各个线程中设置和获取线程本地数据
set_data(threading.current_thread().name)
print(get_data())
# 创建多个线程并启动
threads = []
for i in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
# 等待所有线程执行完成
for t in threads:
t.join()
每个线程在执行worker
函数时,会将当前线程的名称作为数据设置到线程本地数据对象中,并在之后获取并打印出来。由于每个线程都有自己独立的数据空间,所以每个线程获取到的数据都是自己设置的,互不干扰。
2、线程对象
from threading import Thread
# 创建一个新的线程,并指定目标函数为print,参数为1
t = Thread(target=print, args=[1])
# 启动新线程,开始执行目标函数
t.start()
start()可以使用run()
方法代替,然而,需要注意的是,调用run()
方法并不会启动一个新线程来执行目标函数,而是在当前线程中直接调用目标函数。所以,要启动一个新线程并执行目标函数,应该使用start()
方法,而不是run()
方法。
在Python中,threading
模块提供了Thread
类来创建和操作线程对象。线程对象用于执行并发的代码块,使得多个代码块可以同时执行。
以下是一个简单的示例:
import threading
# 线程执行的目标函数
def worker():
print("Thread executing")
# 创建线程对象
t = threading.Thread(target=worker)
# 启动线程
t.start()
# 等待线程执行完成
t.join()
print("Thread finished")
上述代码创建了一个线程对象t
,指定了目标函数worker
作为线程要执行的内容。然后通过调用start()
方法来启动线程,线程开始执行目标函数中的代码块。在示例中,目标函数只是简单地打印一条消息。
使用join()
方法可以让主线程等待子线程执行完成。这样确保主线程在子线程完成后再继续执行。最后,主线程打印一条消息来表示线程执行结束。
通过Thread
类,你可以根据需要创建多个线程对象来执行并发的任务。每个线程对象都有自己的执行上下文,包括线程的状态、代码指针等。你可以使用线程对象的方法来控制线程的行为,如启动、暂停、终止等。
需要注意的是,线程对象不是直接调用目标函数,而是通过start()
方法启动线程,由Python解释器在适当的时候调用目标函数。这样可以确保线程的正确启动和执行。
另外,线程对象还提供了其他一些方法和属性,如is_alive()
用于检查线程是否处于活动状态,name
属性用于获取线程的名称等。通过这些方法和属性,可以更加灵活地控制和管理线程的行为。
3、锁对象
在threading
模块中,锁对象用于控制多个线程之间对共享资源的访问。通过锁对象,可以确保在任意时刻只有一个线程可以访问共享资源,从而避免竞争条件和数据不一致的问题。
在Python中,可以使用Lock
类来创建锁对象。下面是一个简单的示例:
import threading
# 创建锁对象
lock = threading.Lock()
# 共享资源
shared_resource = 0
# 线程函数
def worker():
global shared_resource
# 获取锁
lock.acquire()
try:
# 访问共享资源
shared_resource += 1
finally:
# 释放锁
lock.release()
# 创建多个线程并启动
threads = []
for _ in range(5):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
# 等待所有线程执行完成
for t in threads:
t.join()
print("Shared resource:", shared_resource)
在示例中,我们创建了一个锁对象lock
,用于控制对shared_resource
这个共享变量的访问。在每个线程的执行过程中,通过调用acquire()
方法来获取锁,这样只有一个线程能够成功获取锁,其他线程会被阻塞。然后在try...finally
块中访问共享资源,并在最后使用release()
方法释放锁,以便其他线程可以继续访问。
通过使用锁对象,我们确保了对共享资源的安全访问,避免了多个线程同时修改共享资源导致的数据不一致问题。只有获取到锁的线程才能访问共享资源,其他线程在等待期间将被阻塞。这种机制确保了线程之间的同步和互斥。
需要注意的是,为了避免死锁情况的发生,我们应该遵循一定的锁使用规则,如避免在持有锁的情况下调用阻塞的操作,避免嵌套使用锁等。合理地使用锁对象可以确保线程间的数据一致性和并发安全性。
4、递归锁对象
在threading
模块中,除了普通的锁对象Lock
外,还提供了递归锁对象RLock
,也称为可重入锁。
递归锁和普通锁类似,用于控制多个线程对共享资源的访问。但与普通锁不同的是,递归锁允许同一个线程多次获取同一个锁,而不会发生死锁。
递归锁在同一个线程多次获取锁时会维护一个锁的层数。每次线程成功获取锁时,层数加1;每次释放锁时,层数减1。只有当锁的层数为0时,其他线程才能获取该锁。
下面是一个简单的示例:
import threading
# 创建递归锁对象
lock = threading.RLock()
# 共享资源
shared_resource = 0
# 递归函数
def recursive_function(count):
global shared_resource
# 获取递归锁
lock.acquire()
try:
if count > 0:
shared_resource += 1
recursive_function(count - 1)
finally:
# 释放递归锁
lock.release()
# 创建并启动线程
t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()
print("Shared resource:", shared_resource)
在示例中,我们创建了一个递归锁对象lock
,然后定义了一个递归函数recursive_function
。在递归函数中,通过调用acquire()
方法获取递归锁,并在递归过程中多次调用自身。递归函数在每次调用时会增加shared_resource
的值,直到递归结束。
由于使用的是递归锁,同一个线程可以多次获取锁而不会发生死锁。在每次递归调用时,线程会成功获取锁,并在递归结束后释放锁。这样可以保证线程对共享资源的安全访问。
需要注意的是,递归锁的使用需要谨慎,确保在每次获取锁后都会相应地释放锁,避免层数不平衡导致的死锁情况。递归锁主要用于处理复杂的嵌套锁场景,确保线程在多层嵌套中能够正确获取和释放锁。
5、条件对象
在threading
模块中,条件对象(Condition
)是一种用于线程间通信和同步的高级工具。条件对象提供了一种机制,允许一个或多个线程等待某个条件的发生,当条件满足时,线程可以继续执行。
条件对象基于锁对象(Lock
)实现,并与锁对象配合使用。它提供了wait()
、notify()
和notify_all()
等方法,用于线程之间的等待和通知。
以下是条件对象的基本用法示例:
import threading
# 创建条件对象和关联的锁对象
condition = threading.Condition()
# 共享资源
shared_resource = []
# 生产者函数
def producer():
global shared_resource
with condition:
# 检查条件是否满足,如果满足则等待
while shared_resource:
condition.wait()
# 生产一个新的元素
new_item = len(shared_resource) + 1
shared_resource.append(new_item)
# 通知等待的消费者线程
condition.notify()
# 消费者函数
def consumer():
global shared_resource
with condition:
# 检查条件是否满足,如果不满足则等待
while not shared_resource:
condition.wait()
# 消费最后一个元素
consumed_item = shared_resource.pop()
# 通知等待的生产者线程
condition.notify()
# 打印消费的元素
print("Consumed item:", consumed_item)
# 创建并启动生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
上述代码实现了一个简单的生产者-消费者模型,使用了threading.Condition
对象来进行线程间的同步和通信。
代码中的condition = threading.Condition()
创建了一个条件对象和关联的锁对象。
生产者函数producer()
和消费者函数consumer()
都使用了with condition:
语句来获取条件对象的锁,并在块内执行线程的操作。
在生产者函数中,首先检查共享资源shared_resource
是否为空,如果为空,则调用condition.wait()
进入等待状态,直到有消费者线程通知生产者线程继续执行。然后,生产一个新的元素,并将其添加到共享资源中,最后调用condition.notify()
通知等待的消费者线程。
在消费者函数中,也是首先检查共享资源是否为空,如果为空,则调用condition.wait()
进入等待状态,直到有生产者线程通知消费者线程继续执行。然后,从共享资源中取出最后一个元素,调用condition.notify()
通知等待的生产者线程,并打印消费的元素。
最后,创建并启动生产者和消费者线程,通过调用start()
方法启动线程,并使用join()
方法等待线程执行完成。文章来源:https://uudwc.com/A/vPoa
这样,生产者和消费者线程就可以通过条件对象和关联的锁对象进行同步和通信,确保在共享资源不满足特定条件时线程进入等待状态,并在条件满足时唤醒等待的线程继续执行。文章来源地址https://uudwc.com/A/vPoa