关注

【Java】多线程详解

Java 多线程详解

Thread、Runnable、Callable、线程同步、线程池、并发工具类完整教程

目录


一、线程基础

1.1 进程与线程

进程(Process)

  • 定义:程序的一次执行过程,是系统进行资源分配的基本单位
  • 特点:拥有独立的内存空间、代码、数据和系统资源
  • 隔离性:进程之间相互独立,一个进程崩溃不会影响其他进程

线程(Thread)

  • 定义:进程中的一个执行单元,是 CPU 调度的基本单位
  • 特点:共享进程的内存空间和资源
  • 轻量级:线程切换开销远小于进程切换

进程 vs 线程对比:

维度进程线程
资源分配独立的内存空间共享进程内存
通信方式IPC(管道、消息队列等)直接读写共享变量
创建开销大(需要分配资源)小(共享资源)
切换开销大(需要切换地址空间)小(只切换上下文)
影响范围崩溃不影响其他进程崩溃导致整个进程终止
数据共享困难(需要IPC)容易(共享内存)
适用场景独立应用程序同一应用内的并发任务

为什么需要多线程?

  • 提高程序响应速度:UI线程不被阻塞
  • 充分利用CPU:多核CPU并行执行
  • 提高资源利用率:IO等待时CPU可执行其他任务
  • 简化程序结构:将复杂任务分解为多个独立线程

1.2 并发与并行

并发(Concurrency)

  • 定义:多个任务在同一时间段内交替执行
  • 场景:单核CPU通过时间片轮转实现
  • 特点:宏观上同时进行,微观上交替执行
  • 示例:一个人同时处理多个任务(切换处理)

并行(Parallelism)

  • 定义:多个任务在同一时刻真正同时执行
  • 场景:多核CPU同时执行多个线程
  • 特点:真正的同时执行
  • 示例:多个人同时处理不同任务
并发(单核):
时间 →  [任务A] [任务B] [任务A] [任务C] [任务B] ...
        ↑ 快速切换,看起来像同时执行

并行(多核):
核心1:  [任务A] [任务A] [任务A] [任务A] ...
核心2:  [任务B] [任务B] [任务B] [任务B] ...
核心3:  [任务C] [任务C] [任务C] [任务C] ...
        ↑ 真正同时执行

并发 vs 并行对比:

维度并发并行
执行方式交替执行同时执行
CPU要求单核或多核必须多核
目的提高响应速度提高吞吐量
实现时间片轮转多个CPU核心

1.3 多线程的挑战

优势:

  • ✅ 提高程序性能
  • ✅ 改善用户体验
  • ✅ 充分利用硬件资源

挑战:

  • ⚠️ 线程安全问题:多个线程访问共享数据可能导致数据不一致
  • ⚠️ 死锁问题:多个线程相互等待对方释放资源
  • ⚠️ 性能开销:线程创建、切换、同步都有开销
  • ⚠️ 调试困难:并发bug难以重现和调试
  • ⚠️ 复杂性增加:需要考虑同步、通信、资源竞争等问题

二、创建线程

2.1 继承Thread类

public class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();

        t1.setName("线程1");
        t2.setName("线程2");

        t1.start();  // 启动线程
        t2.start();
    }
}

2.2 实现Runnable接口

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + i);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();

        Thread t1 = new Thread(runnable, "线程1");
        Thread t2 = new Thread(runnable, "线程2");

        t1.start();
        t2.start();
    }
}

2.3 实现Callable接口

import java.util.concurrent.*;

public class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        MyCallable callable = new MyCallable();
        FutureTask<Integer> task = new FutureTask<>(callable);

        Thread thread = new Thread(task);
        thread.start();

        // 获取返回值(会阻塞)
        Integer result = task.get();
        System.out.println("结果: " + result);
    }
}

2.4 Lambda表达式创建

public class Main {
    public static void main(String[] args) {
        // 方式1:Runnable
        new Thread(() -> {
            System.out.println("Lambda线程执行");
        }).start();

        // 方式2:Callable
        FutureTask<String> task = new FutureTask<>(() -> {
            return "Lambda返回值";
        });
        new Thread(task).start();
    }
}

2.5 三种方式对比

方式优点缺点使用场景
继承Thread简单直接不能继承其他类简单任务
实现Runnable可继承其他类,资源共享无返回值常规任务
实现Callable有返回值,可抛异常代码复杂需要返回值

2.6 线程创建实战案例

import java.util.concurrent.*;
import java.util.*;

public class ThreadCreationPractice {
    public static void main(String[] args) throws Exception {
        // 案例1:多线程计算数组和
        int[] arr = new int[1000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i + 1;
        }

        long sum = parallelSum(arr, 4);
        System.out.println("数组和: " + sum);

        // 案例2:多线程文件处理
        List<String> files = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
        processFilesInParallel(files);

        // 案例3:超时控制
        String result = executeWithTimeout(() -> {
            Thread.sleep(2000);
            return "任务完成";
        }, 3, TimeUnit.SECONDS);
        System.out.println(result);
    }

    // 并行计算数组和
    public static long parallelSum(int[] arr, int threadCount) throws Exception {
        int chunkSize = arr.length / threadCount;
        List<Future<Long>> futures = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int start = i * chunkSize;
            final int end = (i == threadCount - 1) ? arr.length : (i + 1) * chunkSize;

            Future<Long> future = executor.submit(() -> {
                long sum = 0;
                for (int j = start; j < end; j++) {
                    sum += arr[j];
                }
                return sum;
            });
            futures.add(future);
        }

        long totalSum = 0;
        for (Future<Long> future : futures) {
            totalSum += future.get();
        }

        executor.shutdown();
        return totalSum;
    }

    // 并行处理文件
    public static void processFilesInParallel(List<String> files) {
        List<Thread> threads = new ArrayList<>();

        for (String file : files) {
            Thread thread = new Thread(() -> {
                System.out.println("处理文件: " + file + ", 线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);  // 模拟处理
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "FileProcessor-" + file);
            threads.add(thread);
            thread.start();
        }

        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("所有文件处理完成");
    }

    // 带超时的任务执行
    public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);

        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RuntimeException("任务超时");
        } finally {
            executor.shutdown();
        }
    }
}

三、线程生命周期

3.1 线程状态

NEW(新建)
  ↓ start()
RUNNABLE(就绪/运行)
  ↓ 获取锁失败 / wait() / sleep() / join()
BLOCKED(阻塞)/ WAITING(等待)/ TIMED_WAITING(超时等待)
  ↓ 获取锁 / notify() / 超时
RUNNABLE
  ↓ run()执行完毕
TERMINATED(终止)

3.2 线程常用方法

public class ThreadMethodDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("线程执行中");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
        });

        // 启动线程
        thread.start();

        // 线程名称
        System.out.println("线程名: " + thread.getName());

        // 线程状态
        System.out.println("状态: " + thread.getState());

        // 线程优先级(1-10,默认5)
        thread.setPriority(Thread.MAX_PRIORITY);

        // 是否存活
        System.out.println("存活: " + thread.isAlive());

        // 等待线程结束
        thread.join();

        System.out.println("主线程结束");
    }
}

3.3 sleep vs wait

特性sleep()wait()
所属类ThreadObject
释放锁不释放释放
使用位置任何地方同步代码块/方法
唤醒方式自动(超时)notify()/notifyAll()

四、线程同步

4.1 线程安全问题

public class UnsafeDemo {
    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                count++;  // 非原子操作
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                count++;
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("count: " + count);  // 结果小于20000
    }
}

4.2 synchronized关键字

1. 同步方法

public class SafeCounter {
    private int count = 0;

    // 同步方法
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

2. 同步代码块

public class SafeDemo {
    private int count = 0;
    private Object lock = new Object();

    public void increment() {
        synchronized (lock) {
            count++;
        }
    }

    // 同步this对象
    public void decrement() {
        synchronized (this) {
            count--;
        }
    }
}

3. 静态同步方法

public class StaticSync {
    private static int count = 0;

    // 锁的是类对象
    public static synchronized void increment() {
        count++;
    }
}

4.3 Lock接口

import java.util.concurrent.locks.*;

public class LockDemo {
    private int count = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();  // 必须在finally中释放
        }
    }

    // 尝试获取锁
    public void tryIncrement() {
        if (lock.tryLock()) {
            try {
                count++;
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("获取锁失败");
        }
    }
}

4.4 ReadWriteLock

import java.util.concurrent.locks.*;

public class ReadWriteLockDemo {
    private int value = 0;
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    // 读操作
    public int read() {
        rwLock.readLock().lock();
        try {
            return value;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // 写操作
    public void write(int value) {
        rwLock.writeLock().lock();
        try {
            this.value = value;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

4.5 volatile关键字

public class VolatileDemo {
    private volatile boolean flag = false;

    public void writer() {
        flag = true;  // 写操作立即可见
    }

    public void reader() {
        while (!flag) {
            // 等待flag变为true
        }
        System.out.println("flag已改变");
    }
}

volatile 特性:

  • ✅ 保证可见性:一个线程修改后,其他线程立即可见
  • ✅ 禁止指令重排序
  • ❌ 不保证原子性:count++ 仍然不是线程安全的

volatile vs synchronized:

特性volatilesynchronized
原子性❌ 不保证✅ 保证
可见性✅ 保证✅ 保证
有序性✅ 保证✅ 保证
性能高(无锁)低(加锁)
使用场景状态标志复合操作

4.6 原子类

import java.util.concurrent.atomic.*;

public class AtomicDemo {
    private AtomicInteger count = new AtomicInteger(0);
    private AtomicLong longValue = new AtomicLong(0);
    private AtomicBoolean flag = new AtomicBoolean(false);

    public void increment() {
        count.incrementAndGet();  // 原子操作:count++
    }

    public void decrement() {
        count.decrementAndGet();  // 原子操作:count--
    }

    public void addValue(int delta) {
        count.addAndGet(delta);  // 原子操作:count += delta
    }

    public boolean compareAndSet(int expect, int update) {
        return count.compareAndSet(expect, update);  // CAS操作
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicDemo demo = new AtomicDemo();

        // 创建10个线程,每个线程增加1000次
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    demo.increment();
                }
            });
            threads[i].start();
        }

        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("最终结果: " + demo.count.get());  // 10000
    }
}

常用原子类:

类型说明示例
AtomicInteger原子整数count.incrementAndGet()
AtomicLong原子长整数value.addAndGet(10)
AtomicBoolean原子布尔flag.compareAndSet(false, true)
AtomicReference<T>原子引用ref.set(newObject)
LongAdder高性能计数器adder.increment()

4.7 死锁问题

public class DeadlockDemo {
    private static Object lock1 = new Object();
    private static Object lock2 = new Object();

    public static void main(String[] args) {
        // 线程1:先获取lock1,再获取lock2
        Thread t1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("线程1获取lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) {}

                synchronized (lock2) {
                    System.out.println("线程1获取lock2");
                }
            }
        });

        // 线程2:先获取lock2,再获取lock1
        Thread t2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("线程2获取lock2");
                try { Thread.sleep(100); } catch (InterruptedException e) {}

                synchronized (lock1) {
                    System.out.println("线程2获取lock1");
                }
            }
        });

        t1.start();
        t2.start();
        // 结果:死锁,两个线程互相等待
    }
}

避免死锁的方法:

  1. 固定加锁顺序
// ✅ 正确:所有线程按相同顺序获取锁
synchronized (lock1) {
    synchronized (lock2) {
        // ...
    }
}
  1. 使用 tryLock 超时
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();

if (lock1.tryLock(100, TimeUnit.MILLISECONDS)) {
    try {
        if (lock2.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                // 业务逻辑
            } finally {
                lock2.unlock();
            }
        }
    } finally {
        lock1.unlock();
    }
}
  1. 使用 Lock 的 lockInterruptibly
try {
    lock.lockInterruptibly();  // 可被中断的锁
    // 业务逻辑
} catch (InterruptedException e) {
    // 处理中断
} finally {
    lock.unlock();
}

4.8 线程安全的单例模式

// 方式1:饿汉式(线程安全,但可能浪费内存)
public class Singleton1 {
    private static final Singleton1 INSTANCE = new Singleton1();

    private Singleton1() {}

    public static Singleton1 getInstance() {
        return INSTANCE;
    }
}

// 方式2:双重检查锁(推荐)
public class Singleton2 {
    private static volatile Singleton2 instance;

    private Singleton2() {}

    public static Singleton2 getInstance() {
        if (instance == null) {  // 第一次检查
            synchronized (Singleton2.class) {
                if (instance == null) {  // 第二次检查
                    instance = new Singleton2();
                }
            }
        }
        return instance;
    }
}

// 方式3:静态内部类(推荐)
public class Singleton3 {
    private Singleton3() {}

    private static class Holder {
        private static final Singleton3 INSTANCE = new Singleton3();
    }

    public static Singleton3 getInstance() {
        return Holder.INSTANCE;
    }
}

// 方式4:枚举(最安全)
public enum Singleton4 {
    INSTANCE;

    public void doSomething() {
        System.out.println("单例方法");
    }
}

五、线程通信

5.1 wait/notify

public class WaitNotifyDemo {
    private static Object lock = new Object();
    private static boolean flag = false;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (!flag) {
                    try {
                        System.out.println("t1等待");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t1继续执行");
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                flag = true;
                System.out.println("t2通知");
                lock.notify();
            }
        });

        t1.start();
        try { Thread.sleep(100); } catch (InterruptedException e) {}
        t2.start();
    }
}

5.2 生产者消费者模式

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {
    private Queue<Integer> queue = new LinkedList<>();
    private int maxSize = 10;

    // 生产者
    public synchronized void produce(int value) throws InterruptedException {
        while (queue.size() == maxSize) {
            wait();  // 队列满,等待
        }
        queue.offer(value);
        System.out.println("生产: " + value + ", 队列大小: " + queue.size());
        notifyAll();  // 通知消费者
    }

    // 消费者
    public synchronized int consume() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();  // 队列空,等待
        }
        int value = queue.poll();
        System.out.println("消费: " + value + ", 队列大小: " + queue.size());
        notifyAll();  // 通知生产者
        return value;
    }
}

// 测试
public class Main {
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        // 生产者线程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    pc.produce(i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费者线程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    pc.consume();
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

六、线程池

6.1 为什么使用线程池

优点:

  • 降低资源消耗(复用线程)
  • 提高响应速度(无需创建线程)
  • 提高线程可管理性

6.2 创建线程池

import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 1. 固定大小线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(3);

        // 2. 单线程线程池
        ExecutorService singlePool = Executors.newSingleThreadExecutor();

        // 3. 缓存线程池
        ExecutorService cachedPool = Executors.newCachedThreadPool();

        // 4. 定时任务线程池
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

        // 提交任务
        fixedPool.execute(() -> {
            System.out.println("任务执行");
        });

        // 关闭线程池
        fixedPool.shutdown();
    }
}

6.3 自定义线程池

import java.util.concurrent.*;

public class CustomThreadPool {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                      // 核心线程数
            5,                      // 最大线程数
            60L,                    // 空闲线程存活时间
            TimeUnit.SECONDS,       // 时间单位
            new LinkedBlockingQueue<>(10),  // 任务队列
            Executors.defaultThreadFactory(),  // 线程工厂
            new ThreadPoolExecutor.AbortPolicy()  // 拒绝策略
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("任务" + taskId + "执行,线程: " +
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

6.4 线程池参数

参数说明
corePoolSize核心线程数
maximumPoolSize最大线程数
keepAliveTime空闲线程存活时间
unit时间单位
workQueue任务队列
threadFactory线程工厂
handler拒绝策略

6.5 拒绝策略

// 1. AbortPolicy:抛出异常(默认)
new ThreadPoolExecutor.AbortPolicy()

// 2. CallerRunsPolicy:调用者线程执行
new ThreadPoolExecutor.CallerRunsPolicy()

// 3. DiscardPolicy:丢弃任务
new ThreadPoolExecutor.DiscardPolicy()

// 4. DiscardOldestPolicy:丢弃最老任务
new ThreadPoolExecutor.DiscardOldestPolicy()

6.6 实战案例:批量下载

import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;

public class DownloadManager {
    private ExecutorService executor;

    public DownloadManager(int threadCount) {
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    public void downloadFiles(List<String> urls) {
        List<Future<Boolean>> futures = new ArrayList<>();

        for (String url : urls) {
            Future<Boolean> future = executor.submit(() -> {
                return downloadFile(url);
            });
            futures.add(future);
        }

        // 等待所有任务完成
        for (int i = 0; i < futures.size(); i++) {
            try {
                boolean success = futures.get(i).get();
                System.out.println("文件" + i + "下载" + (success ? "成功" : "失败"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();
    }

    private boolean downloadFile(String url) {
        System.out.println("下载: " + url + ", 线程: " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);  // 模拟下载
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }
}

// 测试
public class Main {
    public static void main(String[] args) {
        List<String> urls = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            urls.add("http://example.com/file" + i + ".zip");
        }

        DownloadManager manager = new DownloadManager(3);
        manager.downloadFiles(urls);
    }
}

七、并发工具类

7.1 CountDownLatch

用于等待多个线程完成。

import java.util.concurrent.*;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                System.out.println("任务" + taskId + "开始");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务" + taskId + "完成");
                latch.countDown();  // 计数减1
            }).start();
        }

        System.out.println("等待所有任务完成...");
        latch.await();  // 等待计数归零
        System.out.println("所有任务已完成");
    }
}

使用场景:

  • 主线程等待多个子线程完成
  • 多个线程等待某个条件满足后同时开始

7.2 CyclicBarrier

让一组线程到达屏障点时被阻塞,直到最后一个线程到达。

import java.util.concurrent.*;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程已到达屏障,开始下一阶段");
        });

        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程" + taskId + "准备中...");
                    Thread.sleep((taskId + 1) * 1000);
                    System.out.println("线程" + taskId + "已就绪");

                    barrier.await();  // 等待其他线程

                    System.out.println("线程" + taskId + "继续执行");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

CountDownLatch vs CyclicBarrier:

特性CountDownLatchCyclicBarrier
可重用❌ 不可重用✅ 可重用
等待方式一个或多个线程等待所有线程互相等待
计数方式递减到0递增到N
使用场景主线程等待子线程多线程协同工作

7.3 Semaphore

控制同时访问资源的线程数量。

import java.util.concurrent.*;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 只允许3个线程同时访问
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    semaphore.acquire();  // 获取许可
                    System.out.println("线程" + taskId + "获得许可,开始执行");
                    Thread.sleep(2000);
                    System.out.println("线程" + taskId + "执行完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();  // 释放许可
                }
            }).start();
        }
    }
}

使用场景:

  • 限流:控制并发访问数量
  • 资源池:管理有限资源(如数据库连接池)

7.4 并发集合

import java.util.concurrent.*;
import java.util.*;

public class ConcurrentCollectionsDemo {
    public static void main(String[] args) {
        // 1. ConcurrentHashMap:线程安全的HashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("key1", 1);
        map.putIfAbsent("key2", 2);  // 如果不存在则添加

        // 2. CopyOnWriteArrayList:读多写少场景
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("item1");
        list.add("item2");

        // 3. ConcurrentLinkedQueue:无界非阻塞队列
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("task1");
        String task = queue.poll();

        // 4. BlockingQueue:阻塞队列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
        try {
            blockingQueue.put("item");  // 队列满时阻塞
            String item = blockingQueue.take();  // 队列空时阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

并发集合选择:

场景推荐集合说明
高并发MapConcurrentHashMap分段锁,性能好
读多写少ListCopyOnWriteArrayList写时复制
无界队列ConcurrentLinkedQueue非阻塞
有界队列LinkedBlockingQueue阻塞
优先级队列PriorityBlockingQueue按优先级
延迟队列DelayQueue延迟获取

7.5 CompletableFuture

异步编程的强大工具(JDK 8+)。

import java.util.concurrent.*;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 1. 异步执行任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务执行中...");
            try { Thread.sleep(1000); } catch (InterruptedException e) {}
            return "结果1";
        });

        // 2. 任务完成后的回调
        future1.thenAccept(result -> {
            System.out.println("任务完成,结果: " + result);
        });

        // 3. 链式调用
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            return 10;
        }).thenApply(num -> {
            return num * 2;  // 20
        }).thenApply(num -> {
            return num + 5;  // 25
        });
        System.out.println("链式结果: " + future2.get());

        // 4. 组合多个异步任务
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) {}
            return "任务1";
        });

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(2000); } catch (InterruptedException e) {}
            return "任务2";
        });

        // 等待所有任务完成
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
        allTasks.get();
        System.out.println("所有任务完成");

        // 等待任意一个任务完成
        CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1, task2);
        System.out.println("最快完成的任务: " + anyTask.get());

        // 5. 异常处理
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("任务失败");
            }
            return "成功";
        }).exceptionally(ex -> {
            System.out.println("捕获异常: " + ex.getMessage());
            return "默认值";
        });

        System.out.println("结果: " + future3.get());
    }
}

CompletableFuture 常用方法:

方法说明示例
supplyAsync()异步执行有返回值任务supplyAsync(() -> "result")
runAsync()异步执行无返回值任务runAsync(() -> {...})
thenApply()转换结果thenApply(x -> x * 2)
thenAccept()消费结果thenAccept(System.out::println)
thenCompose()组合FuturethenCompose(x -> anotherFuture)
allOf()等待所有完成allOf(f1, f2, f3)
anyOf()等待任意完成anyOf(f1, f2, f3)
exceptionally()异常处理exceptionally(ex -> defaultValue)

7.6 实战:并行处理大数据

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ParallelProcessingDemo {
    public static void main(String[] args) throws Exception {
        // 模拟大量数据
        List<Integer> data = IntStream.range(0, 1000000)
                                      .boxed()
                                      .collect(Collectors.toList());

        // 方式1:使用线程池
        long start1 = System.currentTimeMillis();
        int result1 = processWithThreadPool(data);
        long end1 = System.currentTimeMillis();
        System.out.println("线程池处理耗时: " + (end1 - start1) + "ms, 结果: " + result1);

        // 方式2:使用并行流
        long start2 = System.currentTimeMillis();
        int result2 = data.parallelStream()
                          .filter(n -> n % 2 == 0)
                          .mapToInt(n -> n * 2)
                          .sum();
        long end2 = System.currentTimeMillis();
        System.out.println("并行流处理耗时: " + (end2 - start2) + "ms, 结果: " + result2);

        // 方式3:使用 CompletableFuture
        long start3 = System.currentTimeMillis();
        int result3 = processWithCompletableFuture(data);
        long end3 = System.currentTimeMillis();
        System.out.println("CompletableFuture处理耗时: " + (end3 - start3) + "ms, 结果: " + result3);
    }

    // 使用线程池处理
    private static int processWithThreadPool(List<Integer> data) throws Exception {
        int threadCount = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        int chunkSize = data.size() / threadCount;

        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            final int start = i * chunkSize;
            final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;

            Future<Integer> future = executor.submit(() -> {
                int sum = 0;
                for (int j = start; j < end; j++) {
                    int n = data.get(j);
                    if (n % 2 == 0) {
                        sum += n * 2;
                    }
                }
                return sum;
            });
            futures.add(future);
        }

        int totalSum = 0;
        for (Future<Integer> future : futures) {
            totalSum += future.get();
        }

        executor.shutdown();
        return totalSum;
    }

    // 使用 CompletableFuture 处理
    private static int processWithCompletableFuture(List<Integer> data) throws Exception {
        int threadCount = Runtime.getRuntime().availableProcessors();
        int chunkSize = data.size() / threadCount;

        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            final int start = i * chunkSize;
            final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;

            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                int sum = 0;
                for (int j = start; j < end; j++) {
                    int n = data.get(j);
                    if (n % 2 == 0) {
                        sum += n * 2;
                    }
                }
                return sum;
            });
            futures.add(future);
        }

        CompletableFuture<Void> allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );
        allOf.get();

        return futures.stream()
                      .mapToInt(f -> {
                          try {
                              return f.get();
                          } catch (Exception e) {
                              return 0;
                          }
                      })
                      .sum();
    }
}

八、最佳实践

8.1 线程池配置

import java.util.concurrent.*;

public class ThreadPoolBestPractice {
    // ❌ 不推荐:使用 Executors 创建(可能导致OOM)
    ExecutorService bad1 = Executors.newFixedThreadPool(10);  // 无界队列
    ExecutorService bad2 = Executors.newCachedThreadPool();   // 无限线程

    // ✅ 推荐:手动创建,明确参数
    ThreadPoolExecutor good = new ThreadPoolExecutor(
        4,                                      // 核心线程数
        8,                                      // 最大线程数
        60L, TimeUnit.SECONDS,                  // 空闲线程存活时间
        new LinkedBlockingQueue<>(100),         // 有界队列
        new ThreadFactory() {                   // 自定义线程工厂
            private int count = 0;
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("MyPool-" + count++);
                thread.setUncaughtExceptionHandler((t, e) -> {
                    System.err.println("线程" + t.getName() + "异常: " + e.getMessage());
                });
                return thread;
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
    );
}

线程池大小计算:

任务类型推荐大小说明
CPU密集型N + 1N = CPU核心数
IO密集型2N或 N / (1 - 阻塞系数)
混合型根据实际测试调整监控CPU和IO使用率
// 获取CPU核心数
int cpuCount = Runtime.getRuntime().availableProcessors();

// CPU密集型任务
int cpuIntensivePoolSize = cpuCount + 1;

// IO密集型任务(假设阻塞系数0.9)
int ioIntensivePoolSize = cpuCount * 2;
// 或更精确:cpuCount / (1 - 0.9) = cpuCount * 10

8.2 异常处理

import java.util.concurrent.*;

public class ExceptionHandlingDemo {
    public static void main(String[] args) {
        // 方式1:try-catch 捕获
        Thread t1 = new Thread(() -> {
            try {
                int result = 10 / 0;
            } catch (Exception e) {
                System.err.println("捕获异常: " + e.getMessage());
            }
        });

        // 方式2:设置未捕获异常处理器
        Thread t2 = new Thread(() -> {
            int result = 10 / 0;  // 抛出异常
        });
        t2.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("线程" + thread.getName() + "异常: " + throwable.getMessage());
        });

        // 方式3:线程池的 afterExecute
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        ) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null) {
                    System.err.println("任务执行异常: " + t.getMessage());
                }
            }
        };

        // 方式4:Future.get() 捕获
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<Integer> future = pool.submit(() -> {
            return 10 / 0;
        });

        try {
            future.get();
        } catch (ExecutionException e) {
            System.err.println("任务异常: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t1.start();
        t2.start();
        executor.shutdown();
        pool.shutdown();
    }
}

8.3 避免常见陷阱

public class CommonPitfalls {
    // ❌ 陷阱1:直接调用 run() 而不是 start()
    public void pitfall1() {
        Thread thread = new Thread(() -> System.out.println("执行"));
        thread.run();  // ❌ 错误:在当前线程执行,不会创建新线程
        thread.start();  // ✅ 正确:创建新线程执行
    }

    // ❌ 陷阱2:忘记释放锁
    private Object lock = new Object();
    public void pitfall2() {
        synchronized (lock) {
            // 业务逻辑
            if (someCondition()) {
                return;  // ❌ 可能导致锁未释放
            }
        }
        // ✅ 使用 try-finally 确保释放
    }

    // ❌ 陷阱3:在循环中创建线程
    public void pitfall3() {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                // 任务
            }).start();  // ❌ 创建大量线程,资源浪费
        }

        // ✅ 使用线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executor.execute(() -> {
                // 任务
            });
        }
        executor.shutdown();
    }

    // ❌ 陷阱4:共享可变状态
    private int count = 0;  // 共享变量
    public void pitfall4() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                count++;  // ❌ 非线程安全
            }).start();
        }

        // ✅ 使用原子类或同步
        private AtomicInteger safeCount = new AtomicInteger(0);
        safeCount.incrementAndGet();
    }

    // ❌ 陷阱5:忘记关闭线程池
    public void pitfall5() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        executor.execute(() -> {
            // 任务
        });
        // ❌ 忘记关闭,程序不会退出

        // ✅ 正确关闭
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }

    private boolean someCondition() { return true; }
}

8.4 性能优化技巧

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class PerformanceOptimization {
    // 1. 使用 ThreadLocal 避免同步
    private static ThreadLocal<SimpleDateFormat> dateFormat =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

    public String formatDate(Date date) {
        return dateFormat.get().format(date);  // 每个线程独立实例
    }

    // 2. 使用 LongAdder 替代 AtomicLong(高并发计数)
    private LongAdder counter = new LongAdder();  // 性能更好

    public void increment() {
        counter.increment();
    }

    public long getCount() {
        return counter.sum();
    }

    // 3. 减小锁粒度
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void fineLock() {
        synchronized (lock1) {
            // 操作1
        }
        synchronized (lock2) {
            // 操作2(独立的锁)
        }
    }

    // 4. 使用读写锁
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private Map<String, String> cache = new HashMap<>();

    public String read(String key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);  // 多个读线程可并发
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void write(String key, String value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);  // 写操作独占
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // 5. 批量操作减少锁竞争
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void batchProcess() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, 100);  // 一次取出多个,减少锁竞争

        for (String item : batch) {
            // 处理
        }
    }
}

8.5 监控和调试

import java.lang.management.*;
import java.util.concurrent.*;

public class ThreadMonitoring {
    public static void main(String[] args) {
        // 1. 获取线程信息
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        System.out.println("当前线程数: " + threadMXBean.getThreadCount());
        System.out.println("峰值线程数: " + threadMXBean.getPeakThreadCount());
        System.out.println("总启动线程数: " + threadMXBean.getTotalStartedThreadCount());

        // 2. 检测死锁
        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
        if (deadlockedThreads != null) {
            System.out.println("检测到死锁,涉及线程数: " + deadlockedThreads.length);
            ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads);
            for (ThreadInfo info : threadInfos) {
                System.out.println("死锁线程: " + info.getThreadName());
            }
        }

        // 3. 监控线程池状态
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        System.out.println("活跃线程数: " + executor.getActiveCount());
        System.out.println("核心线程数: " + executor.getCorePoolSize());
        System.out.println("最大线程数: " + executor.getMaximumPoolSize());
        System.out.println("当前线程数: " + executor.getPoolSize());
        System.out.println("队列大小: " + executor.getQueue().size());
        System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
        System.out.println("总任务数: " + executor.getTaskCount());

        // 4. 打印所有线程堆栈
        Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : allThreads.entrySet()) {
            Thread thread = entry.getKey();
            System.out.println("\n线程: " + thread.getName() + ", 状态: " + thread.getState());
            for (StackTraceElement element : entry.getValue()) {
                System.out.println("  " + element);
            }
        }

        executor.shutdown();
    }
}

九、快速参考

线程创建选择

需要返回值?
├─ 是 → Callable
└─ 否 → 需要继承其他类?
         ├─ 是 → Runnable
         └─ 否 → Thread或Runnable

同步方式选择

场景推荐方式说明
简单同步synchronized简单易用
需要尝试获取锁Lock.tryLock()可超时
读多写少ReadWriteLock读并发
可见性保证volatile轻量级
原子操作AtomicXxx无锁
高并发计数LongAdder性能最好

线程池选择

场景线程池类型说明
固定任务数FixedThreadPool固定线程数
单线程顺序执行SingleThreadExecutor顺序执行
大量短任务CachedThreadPool动态扩展
定时任务ScheduledThreadPool定时调度
自定义需求ThreadPoolExecutor完全控制

并发工具选择

需求工具类使用场景
等待多个线程完成CountDownLatch主线程等待
多线程协同CyclicBarrier互相等待
限流Semaphore控制并发数
线程安全MapConcurrentHashMap高并发读写
线程安全ListCopyOnWriteArrayList读多写少
阻塞队列BlockingQueue生产消费
异步编程CompletableFuture链式调用

常用方法速查

Thread 类:

thread.start()              // 启动线程
thread.join()               // 等待线程结束
thread.interrupt()          // 中断线程
Thread.sleep(millis)        // 休眠
Thread.yield()              // 让出CPU
thread.setName(name)        // 设置名称
thread.setPriority(priority)// 设置优先级
thread.isAlive()            // 是否存活
thread.getState()           // 获取状态

Lock 接口:

lock.lock()                 // 获取锁
lock.unlock()               // 释放锁
lock.tryLock()              // 尝试获取锁
lock.tryLock(time, unit)    // 超时获取锁
lock.lockInterruptibly()    // 可中断获取锁

线程池:

executor.execute(runnable)  // 执行任务(无返回值)
executor.submit(callable)   // 提交任务(有返回值)
executor.shutdown()         // 优雅关闭
executor.shutdownNow()      // 立即关闭
executor.awaitTermination() // 等待终止

最佳实践清单

✅ 推荐做法:

  • 优先使用线程池而非直接创建线程
  • 给线程和线程池命名,便于调试
  • 使用有界队列防止内存溢出
  • 合理设置线程池大小(CPU密集型:N+1,IO密集型:2N)
  • 使用 volatile 保证可见性
  • 使用原子类实现线程安全计数
  • 使用 ThreadLocal 避免共享状态
  • 捕获线程异常,避免线程终止
  • 使用 try-finally 确保锁释放
  • 优先使用 JUC 工具类而非自己实现

❌ 避免做法:

  • 不要使用 Executors 创建线程池(可能OOM)
  • 不要在循环中创建大量线程
  • 不要忘记关闭线程池
  • 不要直接调用 run() 方法
  • 不要在持有锁时执行耗时操作
  • 不要使用 stop()suspend()resume()(已废弃)
  • 不要忽略 InterruptedException
  • 不要在 finally 块中使用 return

十、总结

本文档全面介绍了 Java 多线程编程的核心知识:

1. 线程基础

  • 进程与线程的区别
  • 并发与并行的概念
  • 多线程的优势和挑战

2. 创建线程

  • 继承 Thread 类
  • 实现 Runnable 接口
  • 实现 Callable 接口
  • Lambda 表达式创建
  • 实战案例:并行计算、文件处理、超时控制

3. 线程生命周期

  • 6种线程状态
  • 常用方法(start、join、sleep、interrupt)
  • sleep vs wait 的区别

4. 线程同步

  • synchronized 关键字(方法、代码块、静态)
  • Lock 接口(ReentrantLock、ReadWriteLock)
  • volatile 关键字
  • 原子类(AtomicInteger、LongAdder)
  • 死锁问题及解决方案
  • 线程安全的单例模式

5. 线程通信

  • wait/notify 机制
  • 生产者消费者模式

6. 线程池

  • 为什么使用线程池
  • 4种预定义线程池
  • 自定义线程池(ThreadPoolExecutor)
  • 线程池参数和拒绝策略
  • 实战案例:批量下载

7. 并发工具类

  • CountDownLatch:等待多个线程完成
  • CyclicBarrier:多线程协同
  • Semaphore:限流控制
  • 并发集合(ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue)
  • CompletableFuture:异步编程
  • 实战案例:并行处理大数据

8. 最佳实践

  • 线程池配置和大小计算
  • 异常处理(4种方式)
  • 避免常见陷阱(5个典型错误)
  • 性能优化技巧(ThreadLocal、LongAdder、读写锁)
  • 监控和调试(死锁检测、线程池监控)

关键要点

主题核心内容
线程创建优先使用 Runnable/Callable,避免继承 Thread
线程安全使用 synchronized、Lock、原子类保证安全
线程池手动创建 ThreadPoolExecutor,避免 Executors
并发工具善用 JUC 工具类,不要重复造轮子
性能优化ThreadLocal、LongAdder、读写锁、批量操作
异常处理必须捕获异常,避免线程意外终止
资源管理及时关闭线程池,使用 try-finally 释放锁

学习路径

  1. 入门阶段:掌握线程创建、生命周期、基本同步
  2. 进阶阶段:理解线程池、Lock、原子类、并发集合
  3. 高级阶段:掌握 CompletableFuture、性能优化、问题排查
  4. 实战阶段:在项目中应用,积累经验

推荐资源

下一步

  • 深入学习 JVM 内存模型(JMM)
  • 了解 happens-before 原则
  • 掌握无锁编程(CAS)
  • 学习分布式并发(分布式锁、消息队列)
  • 实践高并发系统设计

记住:并发编程是一门实践的艺术,需要在实际项目中不断积累经验。从简单开始,逐步深入,注重代码质量和性能优化。

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/m0_48502770/article/details/156058273

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--