LinkedBlockingQueue面试中你被问到过吗?

   日期:2020-11-13     浏览:101    评论:0    
核心提示:1.概述自java5后,jdk增加了concurrent包,concurrent中的BlockingQueue,也就是堵塞队列,BlockingQueue只是一个接口,jdk为其提供了丰富的实现类,适用于不同的场景,这篇讲的是LinkedBlockingQueue。2.简介LinkedBlockingQueue继承了AbstractQueue类和实现了BlockingQueue接口,是一个基于内部链表的有界队列,如果初始化不设置大小,默认设置大小为Integer.MAX_VALUE(无界队列)。锁是基

相关文章

面试官:你手写过堵塞队列吗?
ArrayBlockingQueue讲解及源码解析

1.概述

自java5后,jdk增加了concurrent包,concurrent中的BlockingQueue,也就是堵塞队列,BlockingQueue只是一个接口,jdk为其提供了丰富的实现类,适用于不同的场景,这篇讲的是LinkedBlockingQueue。

2.简介

LinkedBlockingQueue继承了AbstractQueue类和实现了BlockingQueue接口,是一个基于内部链表的有界队列,如果初始化不设置大小,默认设置大小为Integer.MAX_VALUE(无界队列)。锁是基于ReentrantLock实现,和ArrayBlockingQueue不同的是,其有两个锁对象,这就可以实现生产/消费并行。

3.应用

  1. 在实际应用中,应该设置大小,否则变为无界队列,生产者速度大于消费者,则会导致内存溢出。
  2. 不支持null元素,否则报NullPointerException异常。
  3. LinkedBlockingQueue相较于ArrayBlockingQueue更适用于处理高并发,因为实现了锁分离,可以更快的存储数据,但是因为LinkedBlockingQueue多了Node,导致GC需要额外回收一个Node对象。

主要方法


1.插入数据
(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不堵塞)。
(2)offer(E e, long timeout, TimeUnit unit):可以设置等待时间,如果队列已满,则进行等待。超过等待时间,则返回false。
(3)put(E e):无返回值,一直等待,直至队列空出位置。

2.获取数据
(1)poll():如果有数据,出队,如果没有数据,返回null。
(2)poll(long timeout, TimeUnit unit):可以设置等待时间,如果没有数据,则等待,超过等待时间,则返回null。
(3)take():如果有数据,出队。如果没有数据,一直等待(堵塞)。

4.为什么采用链表?

LinkedBlockingQueue通过Node实现了链表存储,链表是可伸缩的,而数组大小是不可变的, 在处理大数据量时,如果不可伸缩,队列已满后,会进行堵塞,如果通过数组进行扩容,只能通过新数组的方式,对资源消耗大。

5.为什么采用单向链表?

因为队列是先进先出的,所以只需要知道下一个节点是谁就行。

6.源码解析

AbstractQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

单向链表

	//单向链表
    static class Node<E> { 
        E item;

        
        Node<E> next;

        Node(E x) { 
            item = x;
        }
    }

成员变量

    
    private final int capacity;

    
    private final AtomicInteger count = new AtomicInteger();

    
    transient Node<E> head;

    
    private transient Node<E> last;

    
    private final ReentrantLock takeLock = new ReentrantLock();

    
    private final Condition notEmpty = takeLock.newCondition();

    
    private final ReentrantLock putLock = new ReentrantLock();

    
    private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue采用的是双锁机制,可以并行执行。多线程环境下,必然存在并发问题,所以采用了AtomicInteger,可以实现原子操作,防止并发问题

构造函数

	
    public LinkedBlockingQueue() { 
        this(Integer.MAX_VALUE);
    }

    
    public LinkedBlockingQueue(int capacity) { 
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    
    public LinkedBlockingQueue(Collection<? extends E> c) { 
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try { 
            int n = 0;
            for (E e : c) { 
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally { 
            putLock.unlock();
        }
    }

put


    public void put(E e) throws InterruptedException { 
        if (e == null) throw new NullPointerException();

        int c = -1;
        //创建节点
        Node<E> node = new Node<E>(e);
        //获取写锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 

            //如果队列已满
            while (count.get() == capacity) { 
                //堵塞生产者线程
                notFull.await();
            }
            //向队尾添加元素
            enqueue(node);
            //获取元素数量
            c = count.getAndIncrement();
            //如果还可以继续继续添加,唤醒下一个消费者线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally { 
            putLock.unlock();
        }
        //如果队列原来是空的,现在已经增加元素了,所以唤醒读线程
        if (c == 0)
            signalNotEmpty();
    }

	
    private void signalNotEmpty() { 
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try { 
            //唤醒消费线程
            notEmpty.signal();
        } finally { 
            takeLock.unlock();
        }
    }
	
	
    private void enqueue(Node<E> node) { 
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

1.当队列已满,会进行堵塞
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程

offer

	public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException { 
        //判断元素是否为空
        if (e == null) throw new NullPointerException();
        //获取等待时间
        long nanos = unit.toNanos(timeout);
        int c = -1;
        //获取写锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
            //如果队列已满
            while (count.get() == capacity) { 
                //如果超过等待时间
                if (nanos <= 0)
                    return false;
                //生产线程堵塞nanos时间,也有可能被唤醒,如果超过nanos时间还未被唤醒,则nanos=0,再次循环,就会返回false
                nanos = notFull.awaitNanos(nanos);
            }
            //向队尾添加元素
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            //如果还可以继续继续添加,唤醒下一个生产者线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally { 
            putLock.unlock();
        }
        //如果队列原来是空的,现在已经增加元素了,所以唤醒读线程
        if (c == 0)
            signalNotEmpty();
        return true;
    }


    public boolean offer(E e) { 
        //判断元素是否为空
        if (e == null) throw new NullPointerException();

        //判断队列是否已满
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        //获取写锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();

        try { 
            if (count.get() < capacity) { 
                //写入元素
                enqueue(node);
                //获取当前队列大小
                c = count.getAndIncrement();
                //如果还可以继续继续添加,唤醒下一个生产者线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally { 
            putLock.unlock();
        }
        //c==0,其实已经变为1了,所以唤醒消费线程
        //TODO getAndIncrement()返回的是原值,incrementAndGet()返回的是原值+1
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
	
    private void signalNotEmpty() { 
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try { 
            //唤醒消费线程
            notEmpty.signal();
        } finally { 
            takeLock.unlock();
        }
    }
    
    
    private void enqueue(Node<E> node) { 
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

offer方法有两种,一种是堵塞offer方法,另一种是不堵塞offer方法
堵塞offer方法
1.当队列已满,会根据设置的堵塞时间进行堵塞,如果超过堵塞时间,还未被唤醒,则返回false
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程

不堵塞offer方法
1.当队列已满 ,直接返回false
2.添加完元素后,如果队列还可以继续添加,则唤醒下一个生产者线程
3.如果队列原来是空的,说明读线程堵塞了,现在已经增加元素了,所以唤醒读线程

take

	public E take() throws InterruptedException { 
        E x;
        int c = -1;
        //获取队列元素数量
        final AtomicInteger count = this.count;
        //获取读锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try { 
            //如果队列为空
            while (count.get() == 0) { 
                //堵塞读线程
                notEmpty.await();
            }
            //从队尾获取元素
            x = dequeue();
            //获取数量
            c = count.getAndDecrement();
            //如果队列还有元素,唤醒下一个消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally { 
            takeLock.unlock();
        }
        //如果原来队列是满的,唤醒生产者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
	
    private void signalNotFull() { 
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try { 
            notFull.signal();
        } finally { 
            putLock.unlock();
        }
    }
    
	
    private E dequeue() { 
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

1.当队列为空 ,进行堵塞,直至被唤醒
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程

poll

	public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        E x = null;
        int c = -1;
        //获取等待时间
        long nanos = unit.toNanos(timeout);
        //获取队列元素数量
        final AtomicInteger count = this.count;
        //获取读锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try { 
            //如果队列为空
            while (count.get() == 0) { 
                //如果超过等待时间
                if (nanos <= 0)
                    return null;
                //消费线程堵塞nanos时间,也有可能被唤醒,如果超过nanos时间还未被唤醒,则nanos=0,再次循环,就会返回false
                nanos = notEmpty.awaitNanos(nanos);
            }
            //从队尾获取元素
            x = dequeue();
            c = count.getAndDecrement();
            //如果队列还有元素,唤醒下一个消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally { 
            takeLock.unlock();
        }
        //如果原来队列是满的,唤醒生产者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() { 
        //获取队列元素数量
        final AtomicInteger count = this.count;
        //如果队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //获取读锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try { 
            //如果队列元素数量大于0
            if (count.get() > 0) { 
                //从队尾获取元素
                x = dequeue();
                c = count.getAndDecrement();
                //如果队列还有元素,唤醒下一个消费者线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally { 
            takeLock.unlock();
        }
        //如果原来队列是满的,唤醒生产者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

	
    private void signalNotFull() { 
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try { 
            notFull.signal();
        } finally { 
            putLock.unlock();
        }
    }

	
    private E dequeue() { 
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

	

poll方法有两种,一种是堵塞poll方法,另一种是不堵塞poll方法
堵塞poll方法
1.当队列为空 ,会根据设置的堵塞时间进行堵塞,如果超过堵塞时间,还未被唤醒,则返回null
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程

不堵塞poll方法
1.当队列为空 ,直接返回null
2.获取到元素后,如果队列还可以继续获取,则唤醒下一个消费者线程
3.如果队列原来是满的,说明写线程堵塞了,现在已经获取元素,队列可以继续添加元素,所以唤醒写线程

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服