Java 多线程详解
Thread、Runnable、Callable、线程同步、线程池、并发工具类完整教程
目录
一、线程基础
1.1 进程与线程
进程(Process)
- 定义:程序的一次执行过程,是系统进行资源分配的基本单位
- 特点:拥有独立的内存空间、代码、数据和系统资源
- 隔离性:进程之间相互独立,一个进程崩溃不会影响其他进程
线程(Thread)
- 定义:进程中的一个执行单元,是 CPU 调度的基本单位
- 特点:共享进程的内存空间和资源
- 轻量级:线程切换开销远小于进程切换
进程 vs 线程对比:
| 维度 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 独立的内存空间 | 共享进程内存 |
| 通信方式 | IPC(管道、消息队列等) | 直接读写共享变量 |
| 创建开销 | 大(需要分配资源) | 小(共享资源) |
| 切换开销 | 大(需要切换地址空间) | 小(只切换上下文) |
| 影响范围 | 崩溃不影响其他进程 | 崩溃导致整个进程终止 |
| 数据共享 | 困难(需要IPC) | 容易(共享内存) |
| 适用场景 | 独立应用程序 | 同一应用内的并发任务 |
为什么需要多线程?
- ✅ 提高程序响应速度:UI线程不被阻塞
- ✅ 充分利用CPU:多核CPU并行执行
- ✅ 提高资源利用率:IO等待时CPU可执行其他任务
- ✅ 简化程序结构:将复杂任务分解为多个独立线程
1.2 并发与并行
并发(Concurrency)
- 定义:多个任务在同一时间段内交替执行
- 场景:单核CPU通过时间片轮转实现
- 特点:宏观上同时进行,微观上交替执行
- 示例:一个人同时处理多个任务(切换处理)
并行(Parallelism)
- 定义:多个任务在同一时刻真正同时执行
- 场景:多核CPU同时执行多个线程
- 特点:真正的同时执行
- 示例:多个人同时处理不同任务
并发(单核):
时间 → [任务A] [任务B] [任务A] [任务C] [任务B] ...
↑ 快速切换,看起来像同时执行
并行(多核):
核心1: [任务A] [任务A] [任务A] [任务A] ...
核心2: [任务B] [任务B] [任务B] [任务B] ...
核心3: [任务C] [任务C] [任务C] [任务C] ...
↑ 真正同时执行
并发 vs 并行对比:
| 维度 | 并发 | 并行 |
|---|---|---|
| 执行方式 | 交替执行 | 同时执行 |
| CPU要求 | 单核或多核 | 必须多核 |
| 目的 | 提高响应速度 | 提高吞吐量 |
| 实现 | 时间片轮转 | 多个CPU核心 |
1.3 多线程的挑战
优势:
- ✅ 提高程序性能
- ✅ 改善用户体验
- ✅ 充分利用硬件资源
挑战:
- ⚠️ 线程安全问题:多个线程访问共享数据可能导致数据不一致
- ⚠️ 死锁问题:多个线程相互等待对方释放资源
- ⚠️ 性能开销:线程创建、切换、同步都有开销
- ⚠️ 调试困难:并发bug难以重现和调试
- ⚠️ 复杂性增加:需要考虑同步、通信、资源竞争等问题
二、创建线程
2.1 继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
t1.setName("线程1");
t2.setName("线程2");
t1.start(); // 启动线程
t2.start();
}
}
2.2 实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
}
}
}
public class Main {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
Thread t1 = new Thread(runnable, "线程1");
Thread t2 = new Thread(runnable, "线程2");
t1.start();
t2.start();
}
}
2.3 实现Callable接口
import java.util.concurrent.*;
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
}
public class Main {
public static void main(String[] args) throws Exception {
MyCallable callable = new MyCallable();
FutureTask<Integer> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
thread.start();
// 获取返回值(会阻塞)
Integer result = task.get();
System.out.println("结果: " + result);
}
}
2.4 Lambda表达式创建
public class Main {
public static void main(String[] args) {
// 方式1:Runnable
new Thread(() -> {
System.out.println("Lambda线程执行");
}).start();
// 方式2:Callable
FutureTask<String> task = new FutureTask<>(() -> {
return "Lambda返回值";
});
new Thread(task).start();
}
}
2.5 三种方式对比
| 方式 | 优点 | 缺点 | 使用场景 |
|---|---|---|---|
| 继承Thread | 简单直接 | 不能继承其他类 | 简单任务 |
| 实现Runnable | 可继承其他类,资源共享 | 无返回值 | 常规任务 |
| 实现Callable | 有返回值,可抛异常 | 代码复杂 | 需要返回值 |
2.6 线程创建实战案例
import java.util.concurrent.*;
import java.util.*;
public class ThreadCreationPractice {
public static void main(String[] args) throws Exception {
// 案例1:多线程计算数组和
int[] arr = new int[1000];
for (int i = 0; i < arr.length; i++) {
arr[i] = i + 1;
}
long sum = parallelSum(arr, 4);
System.out.println("数组和: " + sum);
// 案例2:多线程文件处理
List<String> files = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
processFilesInParallel(files);
// 案例3:超时控制
String result = executeWithTimeout(() -> {
Thread.sleep(2000);
return "任务完成";
}, 3, TimeUnit.SECONDS);
System.out.println(result);
}
// 并行计算数组和
public static long parallelSum(int[] arr, int threadCount) throws Exception {
int chunkSize = arr.length / threadCount;
List<Future<Long>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? arr.length : (i + 1) * chunkSize;
Future<Long> future = executor.submit(() -> {
long sum = 0;
for (int j = start; j < end; j++) {
sum += arr[j];
}
return sum;
});
futures.add(future);
}
long totalSum = 0;
for (Future<Long> future : futures) {
totalSum += future.get();
}
executor.shutdown();
return totalSum;
}
// 并行处理文件
public static void processFilesInParallel(List<String> files) {
List<Thread> threads = new ArrayList<>();
for (String file : files) {
Thread thread = new Thread(() -> {
System.out.println("处理文件: " + file + ", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟处理
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "FileProcessor-" + file);
threads.add(thread);
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("所有文件处理完成");
}
// 带超时的任务执行
public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(task);
try {
return future.get(timeout, unit);
} catch (TimeoutException e) {
future.cancel(true);
throw new RuntimeException("任务超时");
} finally {
executor.shutdown();
}
}
}
三、线程生命周期
3.1 线程状态
NEW(新建)
↓ start()
RUNNABLE(就绪/运行)
↓ 获取锁失败 / wait() / sleep() / join()
BLOCKED(阻塞)/ WAITING(等待)/ TIMED_WAITING(超时等待)
↓ 获取锁 / notify() / 超时
RUNNABLE
↓ run()执行完毕
TERMINATED(终止)
3.2 线程常用方法
public class ThreadMethodDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("线程执行中");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
});
// 启动线程
thread.start();
// 线程名称
System.out.println("线程名: " + thread.getName());
// 线程状态
System.out.println("状态: " + thread.getState());
// 线程优先级(1-10,默认5)
thread.setPriority(Thread.MAX_PRIORITY);
// 是否存活
System.out.println("存活: " + thread.isAlive());
// 等待线程结束
thread.join();
System.out.println("主线程结束");
}
}
3.3 sleep vs wait
| 特性 | sleep() | wait() |
|---|---|---|
| 所属类 | Thread | Object |
| 释放锁 | 不释放 | 释放 |
| 使用位置 | 任何地方 | 同步代码块/方法 |
| 唤醒方式 | 自动(超时) | notify()/notifyAll() |
四、线程同步
4.1 线程安全问题
public class UnsafeDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++; // 非原子操作
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count: " + count); // 结果小于20000
}
}
4.2 synchronized关键字
1. 同步方法
public class SafeCounter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
2. 同步代码块
public class SafeDemo {
private int count = 0;
private Object lock = new Object();
public void increment() {
synchronized (lock) {
count++;
}
}
// 同步this对象
public void decrement() {
synchronized (this) {
count--;
}
}
}
3. 静态同步方法
public class StaticSync {
private static int count = 0;
// 锁的是类对象
public static synchronized void increment() {
count++;
}
}
4.3 Lock接口
import java.util.concurrent.locks.*;
public class LockDemo {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 必须在finally中释放
}
}
// 尝试获取锁
public void tryIncrement() {
if (lock.tryLock()) {
try {
count++;
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁失败");
}
}
}
4.4 ReadWriteLock
import java.util.concurrent.locks.*;
public class ReadWriteLockDemo {
private int value = 0;
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读操作
public int read() {
rwLock.readLock().lock();
try {
return value;
} finally {
rwLock.readLock().unlock();
}
}
// 写操作
public void write(int value) {
rwLock.writeLock().lock();
try {
this.value = value;
} finally {
rwLock.writeLock().unlock();
}
}
}
4.5 volatile关键字
public class VolatileDemo {
private volatile boolean flag = false;
public void writer() {
flag = true; // 写操作立即可见
}
public void reader() {
while (!flag) {
// 等待flag变为true
}
System.out.println("flag已改变");
}
}
volatile 特性:
- ✅ 保证可见性:一个线程修改后,其他线程立即可见
- ✅ 禁止指令重排序
- ❌ 不保证原子性:
count++仍然不是线程安全的
volatile vs synchronized:
| 特性 | volatile | synchronized |
|---|---|---|
| 原子性 | ❌ 不保证 | ✅ 保证 |
| 可见性 | ✅ 保证 | ✅ 保证 |
| 有序性 | ✅ 保证 | ✅ 保证 |
| 性能 | 高(无锁) | 低(加锁) |
| 使用场景 | 状态标志 | 复合操作 |
4.6 原子类
import java.util.concurrent.atomic.*;
public class AtomicDemo {
private AtomicInteger count = new AtomicInteger(0);
private AtomicLong longValue = new AtomicLong(0);
private AtomicBoolean flag = new AtomicBoolean(false);
public void increment() {
count.incrementAndGet(); // 原子操作:count++
}
public void decrement() {
count.decrementAndGet(); // 原子操作:count--
}
public void addValue(int delta) {
count.addAndGet(delta); // 原子操作:count += delta
}
public boolean compareAndSet(int expect, int update) {
return count.compareAndSet(expect, update); // CAS操作
}
public static void main(String[] args) throws InterruptedException {
AtomicDemo demo = new AtomicDemo();
// 创建10个线程,每个线程增加1000次
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increment();
}
});
threads[i].start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + demo.count.get()); // 10000
}
}
常用原子类:
| 类型 | 说明 | 示例 |
|---|---|---|
AtomicInteger | 原子整数 | count.incrementAndGet() |
AtomicLong | 原子长整数 | value.addAndGet(10) |
AtomicBoolean | 原子布尔 | flag.compareAndSet(false, true) |
AtomicReference<T> | 原子引用 | ref.set(newObject) |
LongAdder | 高性能计数器 | adder.increment() |
4.7 死锁问题
public class DeadlockDemo {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
public static void main(String[] args) {
// 线程1:先获取lock1,再获取lock2
Thread t1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("线程1获取lock1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock2) {
System.out.println("线程1获取lock2");
}
}
});
// 线程2:先获取lock2,再获取lock1
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("线程2获取lock2");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock1) {
System.out.println("线程2获取lock1");
}
}
});
t1.start();
t2.start();
// 结果:死锁,两个线程互相等待
}
}
避免死锁的方法:
- 固定加锁顺序
// ✅ 正确:所有线程按相同顺序获取锁
synchronized (lock1) {
synchronized (lock2) {
// ...
}
}
- 使用 tryLock 超时
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
if (lock1.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
if (lock2.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
// 业务逻辑
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
- 使用 Lock 的 lockInterruptibly
try {
lock.lockInterruptibly(); // 可被中断的锁
// 业务逻辑
} catch (InterruptedException e) {
// 处理中断
} finally {
lock.unlock();
}
4.8 线程安全的单例模式
// 方式1:饿汉式(线程安全,但可能浪费内存)
public class Singleton1 {
private static final Singleton1 INSTANCE = new Singleton1();
private Singleton1() {}
public static Singleton1 getInstance() {
return INSTANCE;
}
}
// 方式2:双重检查锁(推荐)
public class Singleton2 {
private static volatile Singleton2 instance;
private Singleton2() {}
public static Singleton2 getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton2.class) {
if (instance == null) { // 第二次检查
instance = new Singleton2();
}
}
}
return instance;
}
}
// 方式3:静态内部类(推荐)
public class Singleton3 {
private Singleton3() {}
private static class Holder {
private static final Singleton3 INSTANCE = new Singleton3();
}
public static Singleton3 getInstance() {
return Holder.INSTANCE;
}
}
// 方式4:枚举(最安全)
public enum Singleton4 {
INSTANCE;
public void doSomething() {
System.out.println("单例方法");
}
}
五、线程通信
5.1 wait/notify
public class WaitNotifyDemo {
private static Object lock = new Object();
private static boolean flag = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!flag) {
try {
System.out.println("t1等待");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t1继续执行");
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
flag = true;
System.out.println("t2通知");
lock.notify();
}
});
t1.start();
try { Thread.sleep(100); } catch (InterruptedException e) {}
t2.start();
}
}
5.2 生产者消费者模式
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumer {
private Queue<Integer> queue = new LinkedList<>();
private int maxSize = 10;
// 生产者
public synchronized void produce(int value) throws InterruptedException {
while (queue.size() == maxSize) {
wait(); // 队列满,等待
}
queue.offer(value);
System.out.println("生产: " + value + ", 队列大小: " + queue.size());
notifyAll(); // 通知消费者
}
// 消费者
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 队列空,等待
}
int value = queue.poll();
System.out.println("消费: " + value + ", 队列大小: " + queue.size());
notifyAll(); // 通知生产者
return value;
}
}
// 测试
public class Main {
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
pc.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 消费者线程
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
pc.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
六、线程池
6.1 为什么使用线程池
优点:
- 降低资源消耗(复用线程)
- 提高响应速度(无需创建线程)
- 提高线程可管理性
6.2 创建线程池
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
// 2. 单线程线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 3. 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 4. 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 提交任务
fixedPool.execute(() -> {
System.out.println("任务执行");
});
// 关闭线程池
fixedPool.shutdown();
}
}
6.3 自定义线程池
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(10), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("任务" + taskId + "执行,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
6.4 线程池参数
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数 |
| maximumPoolSize | 最大线程数 |
| keepAliveTime | 空闲线程存活时间 |
| unit | 时间单位 |
| workQueue | 任务队列 |
| threadFactory | 线程工厂 |
| handler | 拒绝策略 |
6.5 拒绝策略
// 1. AbortPolicy:抛出异常(默认)
new ThreadPoolExecutor.AbortPolicy()
// 2. CallerRunsPolicy:调用者线程执行
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. DiscardPolicy:丢弃任务
new ThreadPoolExecutor.DiscardPolicy()
// 4. DiscardOldestPolicy:丢弃最老任务
new ThreadPoolExecutor.DiscardOldestPolicy()
6.6 实战案例:批量下载
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class DownloadManager {
private ExecutorService executor;
public DownloadManager(int threadCount) {
this.executor = Executors.newFixedThreadPool(threadCount);
}
public void downloadFiles(List<String> urls) {
List<Future<Boolean>> futures = new ArrayList<>();
for (String url : urls) {
Future<Boolean> future = executor.submit(() -> {
return downloadFile(url);
});
futures.add(future);
}
// 等待所有任务完成
for (int i = 0; i < futures.size(); i++) {
try {
boolean success = futures.get(i).get();
System.out.println("文件" + i + "下载" + (success ? "成功" : "失败"));
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
}
private boolean downloadFile(String url) {
System.out.println("下载: " + url + ", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟下载
} catch (InterruptedException e) {
return false;
}
return true;
}
}
// 测试
public class Main {
public static void main(String[] args) {
List<String> urls = new ArrayList<>();
for (int i = 0; i < 10; i++) {
urls.add("http://example.com/file" + i + ".zip");
}
DownloadManager manager = new DownloadManager(3);
manager.downloadFiles(urls);
}
}
七、并发工具类
7.1 CountDownLatch
用于等待多个线程完成。
import java.util.concurrent.*;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
System.out.println("任务" + taskId + "开始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务" + taskId + "完成");
latch.countDown(); // 计数减1
}).start();
}
System.out.println("等待所有任务完成...");
latch.await(); // 等待计数归零
System.out.println("所有任务已完成");
}
}
使用场景:
- 主线程等待多个子线程完成
- 多个线程等待某个条件满足后同时开始
7.2 CyclicBarrier
让一组线程到达屏障点时被阻塞,直到最后一个线程到达。
import java.util.concurrent.*;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障,开始下一阶段");
});
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("线程" + taskId + "准备中...");
Thread.sleep((taskId + 1) * 1000);
System.out.println("线程" + taskId + "已就绪");
barrier.await(); // 等待其他线程
System.out.println("线程" + taskId + "继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
CountDownLatch vs CyclicBarrier:
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 可重用 | ❌ 不可重用 | ✅ 可重用 |
| 等待方式 | 一个或多个线程等待 | 所有线程互相等待 |
| 计数方式 | 递减到0 | 递增到N |
| 使用场景 | 主线程等待子线程 | 多线程协同工作 |
7.3 Semaphore
控制同时访问资源的线程数量。
import java.util.concurrent.*;
public class SemaphoreDemo {
public static void main(String[] args) {
// 只允许3个线程同时访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程" + taskId + "获得许可,开始执行");
Thread.sleep(2000);
System.out.println("线程" + taskId + "执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
使用场景:
- 限流:控制并发访问数量
- 资源池:管理有限资源(如数据库连接池)
7.4 并发集合
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// 1. ConcurrentHashMap:线程安全的HashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
map.putIfAbsent("key2", 2); // 如果不存在则添加
// 2. CopyOnWriteArrayList:读多写少场景
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");
// 3. ConcurrentLinkedQueue:无界非阻塞队列
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("task1");
String task = queue.poll();
// 4. BlockingQueue:阻塞队列
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
try {
blockingQueue.put("item"); // 队列满时阻塞
String item = blockingQueue.take(); // 队列空时阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
并发集合选择:
| 场景 | 推荐集合 | 说明 |
|---|---|---|
| 高并发Map | ConcurrentHashMap | 分段锁,性能好 |
| 读多写少List | CopyOnWriteArrayList | 写时复制 |
| 无界队列 | ConcurrentLinkedQueue | 非阻塞 |
| 有界队列 | LinkedBlockingQueue | 阻塞 |
| 优先级队列 | PriorityBlockingQueue | 按优先级 |
| 延迟队列 | DelayQueue | 延迟获取 |
7.5 CompletableFuture
异步编程的强大工具(JDK 8+)。
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 1. 异步执行任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行中...");
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "结果1";
});
// 2. 任务完成后的回调
future1.thenAccept(result -> {
System.out.println("任务完成,结果: " + result);
});
// 3. 链式调用
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 10;
}).thenApply(num -> {
return num * 2; // 20
}).thenApply(num -> {
return num + 5; // 25
});
System.out.println("链式结果: " + future2.get());
// 4. 组合多个异步任务
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "任务1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (InterruptedException e) {}
return "任务2";
});
// 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
allTasks.get();
System.out.println("所有任务完成");
// 等待任意一个任务完成
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1, task2);
System.out.println("最快完成的任务: " + anyTask.get());
// 5. 异常处理
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务失败");
}
return "成功";
}).exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "默认值";
});
System.out.println("结果: " + future3.get());
}
}
CompletableFuture 常用方法:
| 方法 | 说明 | 示例 |
|---|---|---|
supplyAsync() | 异步执行有返回值任务 | supplyAsync(() -> "result") |
runAsync() | 异步执行无返回值任务 | runAsync(() -> {...}) |
thenApply() | 转换结果 | thenApply(x -> x * 2) |
thenAccept() | 消费结果 | thenAccept(System.out::println) |
thenCompose() | 组合Future | thenCompose(x -> anotherFuture) |
allOf() | 等待所有完成 | allOf(f1, f2, f3) |
anyOf() | 等待任意完成 | anyOf(f1, f2, f3) |
exceptionally() | 异常处理 | exceptionally(ex -> defaultValue) |
7.6 实战:并行处理大数据
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelProcessingDemo {
public static void main(String[] args) throws Exception {
// 模拟大量数据
List<Integer> data = IntStream.range(0, 1000000)
.boxed()
.collect(Collectors.toList());
// 方式1:使用线程池
long start1 = System.currentTimeMillis();
int result1 = processWithThreadPool(data);
long end1 = System.currentTimeMillis();
System.out.println("线程池处理耗时: " + (end1 - start1) + "ms, 结果: " + result1);
// 方式2:使用并行流
long start2 = System.currentTimeMillis();
int result2 = data.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n * 2)
.sum();
long end2 = System.currentTimeMillis();
System.out.println("并行流处理耗时: " + (end2 - start2) + "ms, 结果: " + result2);
// 方式3:使用 CompletableFuture
long start3 = System.currentTimeMillis();
int result3 = processWithCompletableFuture(data);
long end3 = System.currentTimeMillis();
System.out.println("CompletableFuture处理耗时: " + (end3 - start3) + "ms, 结果: " + result3);
}
// 使用线程池处理
private static int processWithThreadPool(List<Integer> data) throws Exception {
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
int chunkSize = data.size() / threadCount;
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;
Future<Integer> future = executor.submit(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
int n = data.get(j);
if (n % 2 == 0) {
sum += n * 2;
}
}
return sum;
});
futures.add(future);
}
int totalSum = 0;
for (Future<Integer> future : futures) {
totalSum += future.get();
}
executor.shutdown();
return totalSum;
}
// 使用 CompletableFuture 处理
private static int processWithCompletableFuture(List<Integer> data) throws Exception {
int threadCount = Runtime.getRuntime().availableProcessors();
int chunkSize = data.size() / threadCount;
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
int n = data.get(j);
if (n % 2 == 0) {
sum += n * 2;
}
}
return sum;
});
futures.add(future);
}
CompletableFuture<Void> allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.get();
return futures.stream()
.mapToInt(f -> {
try {
return f.get();
} catch (Exception e) {
return 0;
}
})
.sum();
}
}
八、最佳实践
8.1 线程池配置
import java.util.concurrent.*;
public class ThreadPoolBestPractice {
// ❌ 不推荐:使用 Executors 创建(可能导致OOM)
ExecutorService bad1 = Executors.newFixedThreadPool(10); // 无界队列
ExecutorService bad2 = Executors.newCachedThreadPool(); // 无限线程
// ✅ 推荐:手动创建,明确参数
ThreadPoolExecutor good = new ThreadPoolExecutor(
4, // 核心线程数
8, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(100), // 有界队列
new ThreadFactory() { // 自定义线程工厂
private int count = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("MyPool-" + count++);
thread.setUncaughtExceptionHandler((t, e) -> {
System.err.println("线程" + t.getName() + "异常: " + e.getMessage());
});
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
线程池大小计算:
| 任务类型 | 推荐大小 | 说明 |
|---|---|---|
| CPU密集型 | N + 1 | N = CPU核心数 |
| IO密集型 | 2N | 或 N / (1 - 阻塞系数) |
| 混合型 | 根据实际测试调整 | 监控CPU和IO使用率 |
// 获取CPU核心数
int cpuCount = Runtime.getRuntime().availableProcessors();
// CPU密集型任务
int cpuIntensivePoolSize = cpuCount + 1;
// IO密集型任务(假设阻塞系数0.9)
int ioIntensivePoolSize = cpuCount * 2;
// 或更精确:cpuCount / (1 - 0.9) = cpuCount * 10
8.2 异常处理
import java.util.concurrent.*;
public class ExceptionHandlingDemo {
public static void main(String[] args) {
// 方式1:try-catch 捕获
Thread t1 = new Thread(() -> {
try {
int result = 10 / 0;
} catch (Exception e) {
System.err.println("捕获异常: " + e.getMessage());
}
});
// 方式2:设置未捕获异常处理器
Thread t2 = new Thread(() -> {
int result = 10 / 0; // 抛出异常
});
t2.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("线程" + thread.getName() + "异常: " + throwable.getMessage());
});
// 方式3:线程池的 afterExecute
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.err.println("任务执行异常: " + t.getMessage());
}
}
};
// 方式4:Future.get() 捕获
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(() -> {
return 10 / 0;
});
try {
future.get();
} catch (ExecutionException e) {
System.err.println("任务异常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.start();
t2.start();
executor.shutdown();
pool.shutdown();
}
}
8.3 避免常见陷阱
public class CommonPitfalls {
// ❌ 陷阱1:直接调用 run() 而不是 start()
public void pitfall1() {
Thread thread = new Thread(() -> System.out.println("执行"));
thread.run(); // ❌ 错误:在当前线程执行,不会创建新线程
thread.start(); // ✅ 正确:创建新线程执行
}
// ❌ 陷阱2:忘记释放锁
private Object lock = new Object();
public void pitfall2() {
synchronized (lock) {
// 业务逻辑
if (someCondition()) {
return; // ❌ 可能导致锁未释放
}
}
// ✅ 使用 try-finally 确保释放
}
// ❌ 陷阱3:在循环中创建线程
public void pitfall3() {
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 任务
}).start(); // ❌ 创建大量线程,资源浪费
}
// ✅ 使用线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
// 任务
});
}
executor.shutdown();
}
// ❌ 陷阱4:共享可变状态
private int count = 0; // 共享变量
public void pitfall4() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
count++; // ❌ 非线程安全
}).start();
}
// ✅ 使用原子类或同步
private AtomicInteger safeCount = new AtomicInteger(0);
safeCount.incrementAndGet();
}
// ❌ 陷阱5:忘记关闭线程池
public void pitfall5() {
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.execute(() -> {
// 任务
});
// ❌ 忘记关闭,程序不会退出
// ✅ 正确关闭
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
private boolean someCondition() { return true; }
}
8.4 性能优化技巧
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class PerformanceOptimization {
// 1. 使用 ThreadLocal 避免同步
private static ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
public String formatDate(Date date) {
return dateFormat.get().format(date); // 每个线程独立实例
}
// 2. 使用 LongAdder 替代 AtomicLong(高并发计数)
private LongAdder counter = new LongAdder(); // 性能更好
public void increment() {
counter.increment();
}
public long getCount() {
return counter.sum();
}
// 3. 减小锁粒度
private Object lock1 = new Object();
private Object lock2 = new Object();
public void fineLock() {
synchronized (lock1) {
// 操作1
}
synchronized (lock2) {
// 操作2(独立的锁)
}
}
// 4. 使用读写锁
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private Map<String, String> cache = new HashMap<>();
public String read(String key) {
rwLock.readLock().lock();
try {
return cache.get(key); // 多个读线程可并发
} finally {
rwLock.readLock().unlock();
}
}
public void write(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value); // 写操作独占
} finally {
rwLock.writeLock().unlock();
}
}
// 5. 批量操作减少锁竞争
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void batchProcess() {
List<String> batch = new ArrayList<>();
queue.drainTo(batch, 100); // 一次取出多个,减少锁竞争
for (String item : batch) {
// 处理
}
}
}
8.5 监控和调试
import java.lang.management.*;
import java.util.concurrent.*;
public class ThreadMonitoring {
public static void main(String[] args) {
// 1. 获取线程信息
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
System.out.println("当前线程数: " + threadMXBean.getThreadCount());
System.out.println("峰值线程数: " + threadMXBean.getPeakThreadCount());
System.out.println("总启动线程数: " + threadMXBean.getTotalStartedThreadCount());
// 2. 检测死锁
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("检测到死锁,涉及线程数: " + deadlockedThreads.length);
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads);
for (ThreadInfo info : threadInfos) {
System.out.println("死锁线程: " + info.getThreadName());
}
}
// 3. 监控线程池状态
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
// 4. 打印所有线程堆栈
Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : allThreads.entrySet()) {
Thread thread = entry.getKey();
System.out.println("\n线程: " + thread.getName() + ", 状态: " + thread.getState());
for (StackTraceElement element : entry.getValue()) {
System.out.println(" " + element);
}
}
executor.shutdown();
}
}
九、快速参考
线程创建选择
需要返回值?
├─ 是 → Callable
└─ 否 → 需要继承其他类?
├─ 是 → Runnable
└─ 否 → Thread或Runnable
同步方式选择
| 场景 | 推荐方式 | 说明 |
|---|---|---|
| 简单同步 | synchronized | 简单易用 |
| 需要尝试获取锁 | Lock.tryLock() | 可超时 |
| 读多写少 | ReadWriteLock | 读并发 |
| 可见性保证 | volatile | 轻量级 |
| 原子操作 | AtomicXxx | 无锁 |
| 高并发计数 | LongAdder | 性能最好 |
线程池选择
| 场景 | 线程池类型 | 说明 |
|---|---|---|
| 固定任务数 | FixedThreadPool | 固定线程数 |
| 单线程顺序执行 | SingleThreadExecutor | 顺序执行 |
| 大量短任务 | CachedThreadPool | 动态扩展 |
| 定时任务 | ScheduledThreadPool | 定时调度 |
| 自定义需求 | ThreadPoolExecutor | 完全控制 |
并发工具选择
| 需求 | 工具类 | 使用场景 |
|---|---|---|
| 等待多个线程完成 | CountDownLatch | 主线程等待 |
| 多线程协同 | CyclicBarrier | 互相等待 |
| 限流 | Semaphore | 控制并发数 |
| 线程安全Map | ConcurrentHashMap | 高并发读写 |
| 线程安全List | CopyOnWriteArrayList | 读多写少 |
| 阻塞队列 | BlockingQueue | 生产消费 |
| 异步编程 | CompletableFuture | 链式调用 |
常用方法速查
Thread 类:
thread.start() // 启动线程
thread.join() // 等待线程结束
thread.interrupt() // 中断线程
Thread.sleep(millis) // 休眠
Thread.yield() // 让出CPU
thread.setName(name) // 设置名称
thread.setPriority(priority)// 设置优先级
thread.isAlive() // 是否存活
thread.getState() // 获取状态
Lock 接口:
lock.lock() // 获取锁
lock.unlock() // 释放锁
lock.tryLock() // 尝试获取锁
lock.tryLock(time, unit) // 超时获取锁
lock.lockInterruptibly() // 可中断获取锁
线程池:
executor.execute(runnable) // 执行任务(无返回值)
executor.submit(callable) // 提交任务(有返回值)
executor.shutdown() // 优雅关闭
executor.shutdownNow() // 立即关闭
executor.awaitTermination() // 等待终止
最佳实践清单
✅ 推荐做法:
- 优先使用线程池而非直接创建线程
- 给线程和线程池命名,便于调试
- 使用有界队列防止内存溢出
- 合理设置线程池大小(CPU密集型:N+1,IO密集型:2N)
- 使用
volatile保证可见性 - 使用原子类实现线程安全计数
- 使用
ThreadLocal避免共享状态 - 捕获线程异常,避免线程终止
- 使用
try-finally确保锁释放 - 优先使用 JUC 工具类而非自己实现
❌ 避免做法:
- 不要使用
Executors创建线程池(可能OOM) - 不要在循环中创建大量线程
- 不要忘记关闭线程池
- 不要直接调用
run()方法 - 不要在持有锁时执行耗时操作
- 不要使用
stop()、suspend()、resume()(已废弃) - 不要忽略
InterruptedException - 不要在
finally块中使用return
十、总结
本文档全面介绍了 Java 多线程编程的核心知识:
1. 线程基础
- 进程与线程的区别
- 并发与并行的概念
- 多线程的优势和挑战
2. 创建线程
- 继承 Thread 类
- 实现 Runnable 接口
- 实现 Callable 接口
- Lambda 表达式创建
- 实战案例:并行计算、文件处理、超时控制
3. 线程生命周期
- 6种线程状态
- 常用方法(start、join、sleep、interrupt)
- sleep vs wait 的区别
4. 线程同步
- synchronized 关键字(方法、代码块、静态)
- Lock 接口(ReentrantLock、ReadWriteLock)
- volatile 关键字
- 原子类(AtomicInteger、LongAdder)
- 死锁问题及解决方案
- 线程安全的单例模式
5. 线程通信
- wait/notify 机制
- 生产者消费者模式
6. 线程池
- 为什么使用线程池
- 4种预定义线程池
- 自定义线程池(ThreadPoolExecutor)
- 线程池参数和拒绝策略
- 实战案例:批量下载
7. 并发工具类
- CountDownLatch:等待多个线程完成
- CyclicBarrier:多线程协同
- Semaphore:限流控制
- 并发集合(ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue)
- CompletableFuture:异步编程
- 实战案例:并行处理大数据
8. 最佳实践
- 线程池配置和大小计算
- 异常处理(4种方式)
- 避免常见陷阱(5个典型错误)
- 性能优化技巧(ThreadLocal、LongAdder、读写锁)
- 监控和调试(死锁检测、线程池监控)
关键要点
| 主题 | 核心内容 |
|---|---|
| 线程创建 | 优先使用 Runnable/Callable,避免继承 Thread |
| 线程安全 | 使用 synchronized、Lock、原子类保证安全 |
| 线程池 | 手动创建 ThreadPoolExecutor,避免 Executors |
| 并发工具 | 善用 JUC 工具类,不要重复造轮子 |
| 性能优化 | ThreadLocal、LongAdder、读写锁、批量操作 |
| 异常处理 | 必须捕获异常,避免线程意外终止 |
| 资源管理 | 及时关闭线程池,使用 try-finally 释放锁 |
学习路径
- 入门阶段:掌握线程创建、生命周期、基本同步
- 进阶阶段:理解线程池、Lock、原子类、并发集合
- 高级阶段:掌握 CompletableFuture、性能优化、问题排查
- 实战阶段:在项目中应用,积累经验
推荐资源
- 《Java并发编程实战》(Brian Goetz)
- 《Java并发编程的艺术》(方腾飞)
- Java官方文档 - Concurrency
- Doug Lea 的并发编程网站
下一步
- 深入学习 JVM 内存模型(JMM)
- 了解 happens-before 原则
- 掌握无锁编程(CAS)
- 学习分布式并发(分布式锁、消息队列)
- 实践高并发系统设计
记住:并发编程是一门实践的艺术,需要在实际项目中不断积累经验。从简单开始,逐步深入,注重代码质量和性能优化。
转载自CSDN-专业IT技术社区
原文链接:https://blog.csdn.net/m0_48502770/article/details/156058273



