python线程安全:使用lock或其他技术来实现

Python 线程

允许您同时运行代码的各个部分,从而提高代码效率。但是,如果在不了解线程安全性的情况下将线程引入代码,则可能会遇到争用条件等问题。 可以使用锁、信号量、事件、条件和屏障等工具来解决这些问题。

Python 中的线程处理

在讨论 Python 中的线程之前,、需要知道两个相关术语:

  • 并发性:系统处理多个任务的能力,允许这些任务的执行时间重叠,但不一定同时发生。
  • 并行度:同时执行多个任务,这些任务同时运行以利用多个处理单元,通常是多个 CPU 内核。

Python 的线程是一个并发框架,允许您启动多个并发运行的线程,每个线程执行代码段。这可以提高应用程序的效率和响应能力。当运行多个线程时,Python 解释器会在它们之间切换,将执行控制权移交给每个线程。

 import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)
  • 将创建四个线程来执行函数
for _ in range(4):
        executor.submit(threaded_function)
  • 并且每个工作线程都以“Worker”前缀命名,
thread_name_prefix="Worker"
  • 要运行的函数功能:循环打印分配 变量 的值 0 到 2 。

看一下结果

Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_1. number=1Printing from Worker_2. number=1Printing from Worker_3. number=1


Printing from Worker_0. number=1
Printing from Worker_1. number=2
Printing from Worker_2. number=2
Printing from Worker_0. number=2
Printing from Worker_3. number=2

属性name用于获取当前线程的名字 Worker_0 Worker_1 Worker_2 Worker_3

namethreading.current_thread()

输出中的每一行都表示来自工作线程的调用,worker thread name 后面的数字显示每个线程正在执行的循环的当前迭代。每个线程轮流执行 ,并且执行以并发方式而不是顺序方式进行。

  • Printing from Worker_3. number=2

发生这种情况是因为 Python 解释器执行上下文切换。这意味着 Python 会暂停当前线程的执行状态,并将控制权传递给另一个线程。

当上下文切换时,Python 会保存当前执行状态,以便稍后可以恢复。通过以特定间隔切换执行控制,多个线程可以并发执行代码。

您可以通过在 REPL 中键入以下内容来检查 Python 解释器的上下文切换间隔:


import sys
sys.getswitchinterval()

0.005

输出是一个以秒为单位的数字,表示 Python 解释器的上下文切换间隔。在本例中,它是 0.005 秒或 5 毫秒

可以将 switch interval 视为 Python 解释器检查它是否应切换到另一个线程的频率

此浮点值确定分配给并发运行的 Python 线程的 “timeslices” 的理想持续时间。

线程安全


线程安全是指算法或程序在多个线程同时执行期间能够正常运行的属性。如果代码在多线程环境中运行时具有确定性行为并生成所需的输出,则认为代码是线程安全的。

线程安全问题的发生有两个原因:

  1. 共享的可变数据:线程共享其父进程的内存,因此所有变量和数据结构都在线程之间共享。这可能会导致在处理共享的可更改数据时出现错误。
  2. 非原子操作:当涉及多个步骤的操作被上下文切换中断时,这些操作发生在多线程环境中。如果在操作期间切换线程,这可能会导致意外结果

其实这一点和java中的逻辑是一样的。下面一段java代码多线程修改共享变量count

public class ThreadSyn implements Runnable{
    private  volatile static int count = 0;
    @Override
    public void run() {
        synchronized(this) {
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println(Thread.currentThread().getName() + ":" + (count++));
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
} 

GIL 及其对线程的影响

Python 的全局解释器锁 (GIL) 是一种互斥锁,可保护对 Python 对象的访问,防止多个线程同时执行 Python 字节码。GIL 只允许一个线程在单个时间点执行

当一个操作在单个字节码指令中完成时,它是原子的。由于 GIL 一次只允许一个线程运行,因此这些原子操作不会受到其他线程的干扰。这可确保原子操作通常是线程安全的

以下一个一个Python多络程序修改同一个用户的数据,但是这个程序多次运行的结果是不一致的。

import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")

Final account balance: 500

但是是这个结果是不确定的,原因在于多线程的不安全性。

Python 线程锁进行互斥来保证多线程修改的正确性

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")
  • self.account_lock = threading.Lock()创建唯一锁对象
  • with self.account_lock:使用with语句自动加锁与释放
 with self.account_lock:
            new_balance = self.balance + amount



在这种情况一如何多次重复的加锁会造成死锁

如下代码

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:  # This will cause a deadlock
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)

print(f"Final balance: {account.balance}")

deposit加锁,它调用了_update_balance方法,但是这个方法对同一个对象多次加锁了。最后有可能死锁

如何解决死锁后面再讲

原文链接:,转发请注明来源!