相关文章
面试官:你手写过堵塞队列吗?
ArrayBlockingQueue讲解及源码解析
1.概述
自java5后,jdk增加了concurrent包,concurrent中的BlockingQueue,也就是堵塞队列,BlockingQueue只是一个接口,jdk为其提供了丰富的实现类,适用于不同的场景,这篇讲的是LinkedBlockingQueue。
2.简介
LinkedBlockingQueue继承了AbstractQueue类和实现了BlockingQueue接口,是一个基于内部链表的有界队列,如果初始化不设置大小,默认设置大小为Integer.MAX_VALUE(无界队列)。锁是基于ReentrantLock实现,和ArrayBlockingQueue不同的是,其有两个锁对象,这就可以实现生产/消费并行。
3.应用
- 在实际应用中,应该设置大小,否则变为无界队列,生产者速度大于消费者,则会导致内存溢出。
- 不支持null元素,否则报NullPointerException异常。
- LinkedBlockingQueue相较于ArrayBlockingQueue更适用于处理高并发,因为实现了锁分离,可以更快的存储数据,但是因为LinkedBlockingQueue多了Node,导致GC需要额外回收一个Node对象。
主要方法
1.插入数据
(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不堵塞)。
(2)offer(E e, long timeout, TimeUnit unit):可以设置等待时间,如果队列已满,则进行等待。超过等待时间,则返回false。
(3)put(E e):无返回值,一直等待,直至队列空出位置。
2.获取数据
(1)poll():如果有数据,出队,如果没有数据,返回null。
(2)poll(long timeout, TimeUnit unit):可以设置等待时间,如果没有数据,则等待,超过等待时间,则返回null。
(3)take():如果有数据,出队。如果没有数据,一直等待(堵塞)。
4.为什么采用链表?
LinkedBlockingQueue通过Node实现了链表存储,链表是可伸缩的,而数组大小是不可变的, 在处理大数据量时,如果不可伸缩,队列已满后,会进行堵塞,如果通过数组进行扩容,只能通过新数组的方式,对资源消耗大。
5.为什么采用单向链表?
因为队列是先进先出的,所以只需要知道下一个节点是谁就行。
6.源码解析
AbstractQueue
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
单向链表
//单向链表
static class Node<E> {
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
成员变量
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue采用的是双锁机制,可以并行执行。多线程环境下,必然存在并发问题,所以采用了AtomicInteger,可以实现原子操作,防止并发问题
构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//创建节点
Node<E> node = new Node<E>(e);
//获取写锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果队列已满
while (count.get() == capacity) {
//堵塞生产者线程
notFull.await();
}
//向队尾添加元素
enqueue(node);
//获取元素数量
c = count.getAndIncrement();
//如果还可以继续继续添加,唤醒下一个消费者线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果队列原来是空的,现在已经增加元素了,所以唤醒读线程
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒消费线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
1.当队列已满,会进行堵塞
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程
offer
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//判断元素是否为空
if (e == null) throw new NullPointerException();
//获取等待时间
long nanos = unit.toNanos(timeout);
int c = -1;
//获取写锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果队列已满
while (count.get() == capacity) {
//如果超过等待时间
if (nanos <= 0)
return false;
//生产线程堵塞nanos时间,也有可能被唤醒,如果超过nanos时间还未被唤醒,则nanos=0,再次循环,就会返回false
nanos = notFull.awaitNanos(nanos);
}
//向队尾添加元素
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//如果还可以继续继续添加,唤醒下一个生产者线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果队列原来是空的,现在已经增加元素了,所以唤醒读线程
if (c == 0)
signalNotEmpty();
return true;
}
public boolean offer(E e) {
//判断元素是否为空
if (e == null) throw new NullPointerException();
//判断队列是否已满
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//获取写锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
//写入元素
enqueue(node);
//获取当前队列大小
c = count.getAndIncrement();
//如果还可以继续继续添加,唤醒下一个生产者线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0,其实已经变为1了,所以唤醒消费线程
//TODO getAndIncrement()返回的是原值,incrementAndGet()返回的是原值+1
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒消费线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
offer方法有两种,一种是堵塞offer方法,另一种是不堵塞offer方法
堵塞offer方法
1.当队列已满,会根据设置的堵塞时间进行堵塞,如果超过堵塞时间,还未被唤醒,则返回false
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程
不堵塞offer方法
1.当队列已满 ,直接返回false
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程
take
public E take() throws InterruptedException {
E x;
int c = -1;
//获取队列元素数量
final AtomicInteger count = this.count;
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空
while (count.get() == 0) {
//堵塞读线程
notEmpty.await();
}
//从队尾获取元素
x = dequeue();
//获取数量
c = count.getAndDecrement();
//如果队列还有元素,唤醒下一个消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果原来队列是满的,唤醒生产者线程
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
1.当队列为空 ,进行堵塞,直至被唤醒
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程
poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
//获取等待时间
long nanos = unit.toNanos(timeout);
//获取队列元素数量
final AtomicInteger count = this.count;
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空
while (count.get() == 0) {
//如果超过等待时间
if (nanos <= 0)
return null;
//消费线程堵塞nanos时间,也有可能被唤醒,如果超过nanos时间还未被唤醒,则nanos=0,再次循环,就会返回false
nanos = notEmpty.awaitNanos(nanos);
}
//从队尾获取元素
x = dequeue();
c = count.getAndDecrement();
//如果队列还有元素,唤醒下一个消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果原来队列是满的,唤醒生产者线程
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
//获取队列元素数量
final AtomicInteger count = this.count;
//如果队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//如果队列元素数量大于0
if (count.get() > 0) {
//从队尾获取元素
x = dequeue();
c = count.getAndDecrement();
//如果队列还有元素,唤醒下一个消费者线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果原来队列是满的,唤醒生产者线程
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
poll方法有两种,一种是堵塞poll方法,另一种是不堵塞poll方法
堵塞poll方法
1.当队列为空 ,会根据设置的堵塞时间进行堵塞,如果超过堵塞时间,还未被唤醒,则返回null
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程
不堵塞poll方法
1.当队列为空 ,直接返回null
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程