一、ReentranLock工具类
Java官方在早期jdk1.5版本就引入了ReentranLock并发工具类,这是一种可重入的独占锁(排它锁),它允许同一个线程多次获取同一个锁而不会被阻塞。相比synchronized,它提供了更灵活的控制能力,如可中断锁、可超时获取锁、条件变量等,用于解决高并发场景下需要灵活控制的业务场景。
下面是使用ReentranLock的演示代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo { private static final ReentrantLock lock = new ReentrantLock(); private static int count = 0;
public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { threads[i] = new Thread(() -> { for (int j = 0; j < 1000; j++) { increment(); } }); threads[i].start(); } for (Thread thread : threads) { thread.join(); } System.out.println("最终计数:" + count); }
private static void increment() { lock.lock(); try { count++; reentrantMethod(); } finally { lock.unlock(); } }
private static void reentrantMethod() { lock.lock(); try { System.out.println("重入方法执行"); } finally { lock.unlock(); } } }
|
公平锁和非公平锁
ReentranLock支持公平锁和非公平锁两种模式,使用方式非常简单:
- 公平锁:线程在获取锁的时候,按照等待的先后顺序获取锁
- 非公平锁:线程在获取锁的时候,不按照等待的先后顺序获取锁,而是随机获取锁。ReentranLock默认是非公平锁
1 2
| ReentranLock lock = new ReentranLock(); ReentranLock lock = new ReentranLock(true);
|
可重入锁
可重入锁又名递归锁,是指同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象需要是同一个对象),不会因为之前已经获取过还没释放而阻塞。Java中ReentranLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他方法、锁嵌套等场景中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class Counter{ private final ReentranLock lock = new ReentranLock();
public void recursiveCall(int num){ lock.lock(); try{ if(num == 0){ return ; } System.out.println("执行递归,num = " + num); recursiveCall(num - 1); } finally{ lock.unlock(); } }
public static void main(String[] args){ Counter test = new Counter(); test.recursiveCall(10); } }
|
结合Condition实现生产者消费者
java.util.concurrent类库中提供Condition类来实现线程之间的协调/调用Condition.await()方法使线程等待,其他线程调用Condition.singal()或Condition.singnalAll()方法唤醒等待的线程。
注意:调用Condition的await()和signal()方法,都必须在lock保护之内。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo { static class Queue { private Object[] items; private int size = 0; private int putIndex = 0; private int takeIndex = 0; private ReentrantLock lock; private Condition notEmpty; private Condition notFull;
public Queue(int capacity) { this.items = new Object[capacity]; lock = new ReentrantLock(); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public void put(Object value) throws InterruptedException { lock.lock(); try { while (size == items.length) { notFull.await(); } items[putIndex] = value; if (++putIndex == items.length) { putIndex = 0; } size++; notEmpty.signal(); System.out.println("producer 生产:" + value); } finally { lock.unlock(); } }
public Object take() throws InterruptedException { lock.lock(); try { while (size == 0) { notEmpty.await(); } Object value = items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) { takeIndex = 0; } size--; notFull.signal(); return value; } finally { lock.unlock(); } } }
static class Producer implements Runnable { private Queue queue;
public Producer(Queue queue) { this.queue = queue; }
@Override public void run() { try { while (true) { Thread.sleep(1000); queue.put(new Random().nextInt(1000)); } } catch (InterruptedException e) { e.printStackTrace(); } } }
static class Consumer implements Runnable { private Queue queue;
public Consumer(Queue queue) { this.queue = queue; }
@Override public void run() { try { while (true) { Thread.sleep(1000); System.out.println("consumer 消费:" + queue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
public static void main(String[] args) { Queue queue = new Queue(5); new Thread(new Producer(queue)).start(); new Thread(new Consumer(queue)).start(); } }
|
二、Semaphore 信号量
用于控制同时访问某个资源的线程数量
应用场景:
- 限流:可以用于限制对共享资源的并发访问数量,以控制系统的流量
- 资源池:可以用于实现资源池,以维护一组有限的共享资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class SemaphoreDemo{ private static Semaphore semaphore = new Semaphore(2); private static Executor executor = new Executor(10); public static void main(String[] args) { for(int i=0;i<10;i++){ executor.execute(()->getProductInfo()); } } public static String getProductInfo(){ try { semaphore.acquire(); log.info("请求服务"); Thread.sleep(2000); } catch(InterruptedException e) { throw new RuntimeException(e); } finally { semaphore.release; } return "返回商品详情信息"; } public static String getProductInfo2(){ if(!semaphore.tryAcquire()){ log.error("请求被流控了"); return "请求被流控了"; } try { log.info("请求服务"); Thread.sleep(2000); } catch(InterruptedException e) { throw new RuntimeException(e); } finally { semaphore.release; } return "返回商品详情信息"; } }
|
三、CountDownLatch 闭锁
同步协助类,允许一个或多个线程等待,直到其他线程完成操作集
核心方法说明
- CountDownLatch(int count):构造方法,初始化计数器计数器值。
- countDown():计数器减 1(线程执行完后调用)。
- await():当前线程阻塞,直到计数器变为 0。
- await(long timeout, TimeUnit unit):带超时时间的等待,超时后即使计数器未到 0 也会继续执行。
- CountDownLatch 的计数器是一次性的,一旦计数器变为 0,再次调用 countDown() 也不会有任何效果。
应用场景:
- 百米赛跑,学生考试统一交卷,商品详情数据汇总。
- 并行任务同步:可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下一步操作。
- 多任务汇总:可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
- 资源初始化:可以用于等待资源的初始化完成,以便在资源初始化后开始使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo2 { public static void main(String[] args) throws InterruptedException { int threadCount = 3; CountDownLatch startSignal = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) { final int runnerId = i + 1; new Thread(() -> { try { System.out.println("选手 " + runnerId + " 准备就绪,等待发令..."); startSignal.await(); System.out.println("选手 " + runnerId + " 开始跑步!"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
Thread.sleep(3000); System.out.println("主线程:各就各位位,预备——跑!"); startSignal.countDown(); } }
|
四、CyclicBarrier 回环栅栏/循环屏障
实现让一组线程等待至某个状态(屏障点)之后再全部同时执行,而且可以被重复使用
核心方法说明:
- CyclicBarrier(int parties):构造方法,指定需要等待的线程数量(parties)。
- CyclicBarrier(int parties, Runnable barrierAction):指定等待线程数 + 屏障动作(所有线程到达后执行)。
- await():当前线程到达屏障并等待,直到所有线程到达或被中断。
- await(long timeout, TimeUnit unit):带超时的等待,超时后抛出 TimeoutException。
- reset():重置屏障计数器,让其可以重新使用(未到达的线程会收到 BrokenBarrierException)。
- getNumberWaiting():获取当前已到达屏障的线程数。
- isBroken():判断屏障是否被打破(如线程中断、超时等)。
应用场景:
- 批量数据处理,人满发车
- 多线程任务:可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操作
- 数据处理:可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo2 { public static void main(String[] args) { int runnerCount = 2; CyclicBarrier barrier = new CyclicBarrier(runnerCount, () -> { System.out.println("===== 本轮轮比赛开始! ====="); });
for (int i = 0; i < runnerCount; i++) { final int runnerId = i + 1; new Thread(() -> { try { for (int round = 1; round <= 3; round++) { System.out.println("第" + round + "轮:运动员" + runnerId + "准备中..."); Thread.sleep((long) (Math.random() * 1000)); System.out.println("第" + round + "轮:运动员" + runnerId + "已就位");
barrier.await();
System.out.println("第" + round + "轮:运动员" + runnerId + "冲刺!"); Thread.sleep(500); } } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
|
五、Exchanger 数据交换器
用于线程间协作的工具类,用于两个线程间交换数据
核心方法说明:
- Exchanger():构造方法,创建一个用于交换数据的同步器。
- exchange(V x):当前线程携带数据 x 到达交换点,阻塞等待另一个线程,交换数据后返回对方的数据。
- exchange(V x, long timeout, TimeUnit unit):带超时时间的交换,超时未完成则抛出 TimeoutException。
应用场景:
- 交易场景(一手交钱,一手交货),对账场景
- 数据交换:在多线程环境中,两个线程可以通过Exchanger进行数据交换
- 数据采集:在数据采集系统中,可以使用Exchanger在采集线程和处理线程间进行数据交换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class ExchangerDemo2 { public static void main(String[] args) { Exchanger<Integer> exchanger = new Exchanger<>();
new Thread(() -> { try { int data = 100; System.out.println("线程A准备交换:" + data + "(最多等3秒)"); Integer received = exchanger.exchange(data, 3, TimeUnit.SECONDS); System.out.println("线程A收到交换的数据:" + received); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { System.out.println("线程A:等待超时,未完成交换"); } }).start();
new Thread(() -> { try { Thread.sleep(5000); int data = 200; System.out.println("线程B准备交换:" + data); Integer received = exchanger.exchange(data); System.out.println("线程B收到交换的数据:" + received); } catch (InterruptedException e) { System.out.println("线程B:交换失败(对方已超时)"); } }).start(); } }
|
六、Phaser 阶段协同器
CyclicBarrier 和 CountDownLatch的进化版,管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行
核心方法:
- Phaser(int parties) 构造方法,指定初始参与线程数(parties)
- register() 注册 1 个新线程参与(返回当前阶段号)
- bulkRegister(int parties) 批量注册多个线程(返回当前阶段号)
- arriveAndAwaitAdvance() 当前线程完成当前阶段,等待其他线程,所有线程到达后进入下一阶段
- arriveAndDeregister() 线程完成当前阶段并注销(不再参与后续阶段),返回当前阶段号
- arrive() 线程完成当前阶段但不等待(用于无需阻塞的场景)
- getPhase() 获取当前阶段号(从 0 开始)
- getRegisteredParties() 获取当前注册的线程数
- isTerminated() 判断是否已终止(onAdvance() 返回 true 后为 true)
- onAdvance(int phase, int parties) 回调方法,阶段切换时执行,返回 true 表示终止,false 继续下一阶段
应用场景:
- 多线程任务分配
- 多级任务流程
- 模拟并行计算
- 阶段性任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import java.util.concurrent.Phaser;
public class PhaserDemo1 { public static void main(String[] args) { int threadCount = 3; Phaser phaser = new Phaser(threadCount) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("\n===== 阶段" + phase + "完成,共" + registeredParties + "个线程参与 ====="); return phase >= 2 || registeredParties == 0; } };
for (int i = 0; i < threadCount; i++) { final int threadId = i + 1; new Thread(() -> { for (int phase = 0; !phaser.isTerminated(); phase++) { System.out.println("线程" + threadId + " 正在执行阶段" + phase); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } phaser.arriveAndAwaitAdvance(); } System.out.println("线程" + threadId + " 所有阶段完成"); }).start(); } } }
|