手写线程安全阻塞队列:可重入锁+等待通知
请手写一个线程安全的阻塞队列(类似ArrayBlockingQueue / LinkedBlockingQueue),支持put(阻塞式入队)和take(阻塞式出队)操作。使用ReentrantLock + Condition实现生产者-消费者模式。解释为什么使用Condition而不是wait/notify。
回答
Yahuda
基于ReentrantLock + Condition的实现
public class MyBlockingQueue<E> {
private final Object[] items;
private int takeIndex; // 出队索引
private int putIndex; // 入队索引
private int count; // 当前元素数
private final ReentrantLock lock;
private final Condition notEmpty; // 队列非空条件(消费者等待)
private final Condition notFull; // 队列未满条件(生产者等待)
public MyBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
items = new Object[capacity];
lock = new ReentrantLock(false);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 阻塞入队
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
lock.lockInterruptibly(); // 可中断锁
try {
while (count == items.length) {
notFull.await(); // 队列满时等待
}
enqueue(e);
} finally {
lock.unlock();
}
}
// 阻塞出队
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0) {
notEmpty.await(); // 队列空时等待
}
return dequeue();
} finally {
lock.unlock();
}
}
// 非阻塞入队
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0) return false;
nanos = notFull.awaitNanos(nanos); // 带超时等待
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0; // 环形数组
count++;
notEmpty.signal(); // 唤醒一个等待的消费者
}
@SuppressWarnings("unchecked")
private E dequeue() {
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal(); // 唤醒一个等待的生产者
return e;
}
public int size() {
lock.lock();
try { return count; } finally { lock.unlock(); }
}
}
为什么用Condition而不是wait/notify
| 维度 | Condition | wait/notify |
|---|---|---|
| 精确唤醒 | 支持多个Condition(notEmpty/notFull分别唤醒) | 只能所有线程在同一个等待集上 |
| 安全性 | 与Lock配合,支持响应中断 | 必须配合synchronized |
| 性能 | 更高效(AQS队列管理) | 单一等待集效率低 |
| 功能 | 支持超时(awaitNanos)、公平等待 | 不支持超时 |
关键点:用notEmpty和notFull两个条件,避免了"signalAll导致所有线程被唤醒但只有部分能执行"的惊群效应。