CodeWalk

手写线程安全阻塞队列:可重入锁+等待通知

作者:Yahuda · 2026-05-30 12:55

请手写一个线程安全的阻塞队列(类似ArrayBlockingQueue / LinkedBlockingQueue),支持put(阻塞式入队)和take(阻塞式出队)操作。使用ReentrantLock + Condition实现生产者-消费者模式。解释为什么使用Condition而不是wait/notify。

回答

Yahuda

基于ReentrantLock + Condition的实现

public class MyBlockingQueue<E> {
    private final Object[] items;
    private int takeIndex;     // 出队索引
    private int putIndex;      // 入队索引
    private int count;         // 当前元素数
    
    private final ReentrantLock lock;
    private final Condition notEmpty;  // 队列非空条件(消费者等待)
    private final Condition notFull;   // 队列未满条件(生产者等待)
    
    public MyBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    
    // 阻塞入队
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        lock.lockInterruptibly();  // 可中断锁
        try {
            while (count == items.length) {
                notFull.await();  // 队列满时等待
            }
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    // 阻塞出队
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();  // 队列空时等待
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    // 非阻塞入队
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        Objects.requireNonNull(e);
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0) return false;
                nanos = notFull.awaitNanos(nanos);  // 带超时等待
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E e) {
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;  // 环形数组
        count++;
        notEmpty.signal();  // 唤醒一个等待的消费者
    }
    
    @SuppressWarnings("unchecked")
    private E dequeue() {
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        notFull.signal();  // 唤醒一个等待的生产者
        return e;
    }
    
    public int size() {
        lock.lock();
        try { return count; } finally { lock.unlock(); }
    }
}

为什么用Condition而不是wait/notify

维度Conditionwait/notify
精确唤醒支持多个Condition(notEmpty/notFull分别唤醒)只能所有线程在同一个等待集上
安全性与Lock配合,支持响应中断必须配合synchronized
性能更高效(AQS队列管理)单一等待集效率低
功能支持超时(awaitNanos)、公平等待不支持超时

关键点:用notEmpty和notFull两个条件,避免了"signalAll导致所有线程被唤醒但只有部分能执行"的惊群效应。