首先Node的数据结构:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next; //并没有使用volatile 关键词进行修饰
Node(E x) { item = x; }
}
ps: 突然发现学习源码使用jdk7 比较好,没有8 的函数式风格的内容。更方便
LinkedBlockingQueue中的状态信息如下:
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0); //count 被final 关键字修饰。
/**
* Head of linked list.
* Invariant: head.item == null
*/
private transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); //确保放入的元素是非空的
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //响应中断
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await(); //具有阻塞性
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); //使用了signal() 提高性能, put的时候对notFull 条件队列中阻塞的线程进行唤醒。
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); // put 的时候, 会在c == 0 的情况下对 notEmpty 条件队列中阻塞的线程进行唤醒,当时这个需要获取takeLock锁。
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); //同样对放入队列中的元素进行非空检查
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false; //这里可以看出,如果队列满的情况下,直接返回false, 并不会把数据放入到队列中。 剩余的和put() 方法类似。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout); // 使用到了策略模式,TimeUnit的使用。
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); // 支持超时的操作 !!!!
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) //如果队列为空直接返回null,
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public boolean contains(Object o) {
if (o == null) return false;
fullyLock(); // 像contains(Object o) 和 remove(Object o) 这样的操作需要同时获取putLock 和 takeLock 这两个锁。
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
ps: 还有一些方法是在AbstractQueue中定义的,这里只是对LinkedBlockingQueue 中使用到的数据结构进行分析!
分享到:
相关推荐
linkedblockingqueue测试程序
LinkedBlockingQueue 首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
单向链表源码 博文链接:https://uule.iteye.com/blog/1561753
LinkedBlockingQueuejava.pdf
囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue(ArrayBlockingQueue、...
主要介绍了详细分析Java并发集合LinkedBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
JDK容器学习之Queue:LinkedBlockingQueue
NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943
并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
NULL 博文链接:https://kanpiaoxue.iteye.com/blog/2101309
bitset源码Java源码分析 基础集合列表 ArrayList (done) Vector (done) LinkedList (done) Stack (done) ReferenceQueue (done) ArrayDeque (done) Set HashSet (done) TreeSet (done) LinkedHashSet (done) BitSet ...
LinkedBlockingQueue LinkedBlockingDeque :scroll: 主要介绍LeetCode上面的算法译文,以及面试过程中遇到的实际编码问题总结。 :locked: :file_folder: :laptop: :globe_showing_Asia-Australia: :floppy_...
LinkedBlockingQueue :由容器/列表支持的有界阻塞队列 ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues 用法 非阻塞API queue , _ := NewArrayBlock
3:对线程池的基本使用及其部分源码的分析(注意:这里的源码分析是基于jdk1.6;) a:线程池的状态 volatile int runState; static final int RUNNING = 0; 运行状态 static final int SHUTDOWN = 1; 关闭状态;...
LinkedBlockingQueue以及redis队列写入mysql实例
java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和...