
一、理论基础1.1 线程与进程的区别进程拥有独立的内存空间、数据栈等系统开销大进程间通信复杂。线程依附于进程共享进程的内存空间创建和切换开销小但需要自己处理共享数据的同步问题。1.2 Python 的 GIL全局解释器锁CPython 解释器中有一个 GIL同一时刻只允许一个线程执行 Python 字节码。因此Python 的多线程并不能利用多核 CPU 并行计算计算密集型任务反而可能变慢。适用场景I/O 密集型任务网络请求、文件读写、用户输入等线程在等待 I/O 时会释放 GIL从而真正并发。1.3 两个线程模块threadPython 2 中的名字Python 3 中为_thread底层、原始不推荐直接使用。threading基于_thread封装提供了更强大的Thread类和各种同步工具推荐使用。二、实操环境准备Python 3.6任何较新版本都可以任意代码编辑器VS Code、PyCharm、甚至记事本命令行 / 终端验证 Python 版本python --version # 应显示 3.x三、实操步骤 1创建并启动线程两种方式3.1 方式一函数式使用threading.Thread(target...)新建文件thread_func.py写入以下代码import threading import time def print_time(thread_name, delay, counter): 线程函数每隔 delay 秒打印一次当前时间重复 counter 次 while counter 0: time.sleep(delay) print(f{thread_name}: {time.ctime()}) counter - 1 # 创建两个线程 t1 threading.Thread(targetprint_time, args(Thread-1, 2, 5)) t2 threading.Thread(targetprint_time, args(Thread-2, 4, 5)) # 启动线程 t1.start() t2.start() # 等待两个线程都结束否则主线程可能提前退出 t1.join() t2.join() print(所有线程执行完毕主线程退出。)运行python thread_func.py预期输出时间会变化但顺序类似Thread-1: Thu Jun 11 15:42:17 2026 Thread-1: Thu Jun 11 15:42:19 2026 Thread-2: Thu Jun 11 15:42:19 2026 Thread-1: Thu Jun 11 15:42:21 2026 Thread-2: Thu Jun 11 15:42:23 2026 ...说明threading.Thread(target函数名, args元组)创建线程对象。start()使线程开始运行调用run()默认run()会执行target指定的函数。join()阻塞主线程直到被调用的线程结束避免主线程提前退出。3.2 方式二继承threading.Thread类新建文件thread_class.pyimport threading import time class MyThread(threading.Thread): def __init__(self, thread_id, name, delay, counter): super().__init__() # 调用父类初始化 self.thread_id thread_id self.name name self.delay delay self.counter counter def run(self): 线程启动后自动执行的方法 print(f开始线程{self.name}) while self.counter 0: time.sleep(self.delay) print(f{self.name}: {time.ctime()}) self.counter - 1 print(f退出线程{self.name}) # 创建线程实例 thread1 MyThread(1, Thread-1, 1, 5) thread2 MyThread(2, Thread-2, 2, 5) thread1.start() thread2.start() thread1.join() thread2.join() print(主线程结束)运行python thread_class.py效果与函数式类似但更符合面向对象风格适合需要保存更多状态或复用的场景。四、实操步骤 2线程同步使用锁当多个线程修改同一份数据时会发生“竞态条件”。下面用两个线程同时对一个全局变量加 1000000 次来演示问题然后用锁解决。4.1 无锁的错误示例新建race_condition.pyimport threading # 共享资源 counter 0 def increment(thread_name, times): global counter for _ in range(times): counter 1 # 这不是原子操作可能被线程切换中断 # 创建两个线程每个加 1000000 次 t1 threading.Thread(targetincrement, args(A, 1000000)) t2 threading.Thread(targetincrement, args(B, 1000000)) t1.start() t2.start() t1.join() t2.join() print(f期望结果: 2000000) print(f实际结果: {counter})运行几次你会发现结果往往小于 2000000如 1845214 等这就是因为线程交替执行时丢失了更新。4.2 使用锁threading.Lock修复新建with_lock.pyimport threading counter 0 lock threading.Lock() # 创建一个锁对象 def increment_safe(thread_name, times): global counter for _ in range(times): lock.acquire() # 获取锁如果已被其他线程持有则阻塞 # 以下临界区代码同一时刻只有一个线程能执行 counter 1 lock.release() # 释放锁 # 也可以使用 with 语句更推荐 def increment_safe_with(thread_name, times): global counter for _ in range(times): with lock: # 自动 acquire 和 release counter 1 t1 threading.Thread(targetincrement_safe, args(A, 1000000)) t2 threading.Thread(targetincrement_safe, args(B, 1000000)) t1.start() t2.start() t1.join() t2.join() print(f加锁后的结果: {counter}) # 稳定输出 2000000说明lock.acquire()和lock.release()之间的代码称为“临界区”一次只允许一个线程进入。使用with lock:更加简洁安全避免忘记释放锁。五、实操步骤 3线程间通信 - 队列Queuequeue.Queue是线程安全的 FIFO 队列非常适合生产者-消费者模式。示例三个工人线程处理一个任务队列新建queue_demo.pyimport threading import queue import time import random # 任务队列最大长度 5 task_queue queue.Queue(maxsize5) def producer(name, total_tasks): 生产者往队列中放任务 for i in range(total_tasks): task f任务-{i} task_queue.put(task) # 如果队列满会阻塞直到有空间 print(f{name} 生产了 {task}) time.sleep(random.uniform(0.2, 0.5)) # 生产结束后发送“毒丸”信号通知消费者退出数量等于消费者个数 for _ in range(3): task_queue.put(None) def consumer(name): 消费者从队列中取任务并处理 while True: task task_queue.get() # 如果队列空会阻塞直到有新任务 if task is None: # 收到毒丸退出 break print(f{name} 开始处理 {task}) time.sleep(random.uniform(0.3, 0.7)) # 模拟处理耗时 print(f{name} 完成 {task}) task_queue.task_done() # 告诉队列该任务已完成 # 创建线程 prod threading.Thread(targetproducer, args(生产者, 10)) cons1 threading.Thread(targetconsumer, args(消费者-1,)) cons2 threading.Thread(targetconsumer, args(消费者-2,)) cons3 threading.Thread(targetconsumer, args(消费者-3,)) # 启动消费者先启动会阻塞等待任务 cons1.start() cons2.start() cons3.start() # 启动生产者 prod.start() # 等待生产者结束 prod.join() # 等待所有任务被处理队列为空且 task_done 调用次数匹配 task_queue.join() print(所有任务处理完毕)运行bashpython queue_demo.py你会看到三个消费者并发地从队列中取出任务处理整个过程自动同步无需手动加锁。关键方法put(item)放入元素队列满时阻塞get()取出元素队列空时阻塞task_done()表示之前取出的任务已完成join()阻塞直到队列中所有元素都被task_done()确认六、实操步骤 4使用threading的其他常用功能6.1 获取当前线程信息import threading def show_info(): t threading.current_thread() print(f当前线程: {t.name}, ID: {t.ident}, 是否存活: {t.is_alive()}) t threading.Thread(targetshow_info, name我的线程) t.start() t.join()运行结果为6.2 线程枚举print(f活动线程数: {threading.active_count()}) for t in threading.enumerate(): print(t.name)6.3 守护线程Daemon Thread主线程结束时守护线程会自动被终止。适合后台监控任务。d threading.Thread(targetsome_task, daemonTrue) d.start() # 主线程结束后d 会立即被强制结束七、注意事项与最佳实践场景推荐做法避免使用_thread模块始终使用threading和queue共享可变数据使用Lock、RLock或Queue生产者-消费者模型优先使用queue.Queue不要自己用锁实现线程池使用concurrent.futures.ThreadPoolExecutor计算密集型任务改用multiprocessing模块利用多核 CPU线程间通信使用Queue或Event、Condition等八、扩展学习线程池ThreadPoolExecutor对于大量短期任务频繁创建销毁线程开销大线程池可以复用线程。from concurrent.futures import ThreadPoolExecutor import time def task(n): time.sleep(1) return n * n with ThreadPoolExecutor(max_workers3) as executor: results executor.map(task, [1, 2, 3, 4, 5]) print(list(results)) # [1, 4, 9, 16, 25]map方法自动将迭代器的元素分配给线程池中的线程执行非常方便