JUC随笔

RenentrantReadWriteLock

       16 bit          16 bit

+------------------------------------+
|ReadLockCounts | WriteLockCounts |
+------------------------------------+

CountDownLatch 和 CyclicBarrier 区别

实现方式

  1. CountDownLatch 通过 共享锁实现

  2. CyclicBarrier 通过 ReentrantLock + 条件等待队列 实现;

用法

  1. CountDownLatch 一般用于某个线程等待多个线程到达某个执行点后,然后再接着执行。

  2. CyclicBarrier 则是多个线程相互等待直至所有线程都到达执行点,然后再接着执行.

  3. CountDownLatch不可以重用,而CyclicBarrier可以重用

AbstractQueuedSynchronizer

Node中的waitStatus说明

  • CANCELLED (1):

由于超时,导致节点被取消,或者节点中的线程被中断。
节点永远不会离开此状态。具体地说,具有已取消节点的线程永远不会再阻塞。

  • SIGNAL (-1):

此节点的后继节点被(或将很快)阻塞(通过park),因此当前节点在释放或取消时必须unpark其后继节点。为了避免竞争,acquire方法必须首先指示它们需要一个信号,然后重试原子acquire ,然后在失败时阻塞。

  • CONDITION (-2):

此节点当前处于条件队列中。在传输之前(Conditon被唤醒,重新加入同步队列),它不会用作同步队列节点,此时状态将设置为0。(此处使用此值与该字段的其他用途无关,但简化了机制。)

  • PROPAGATE (-3):

“releaseShared”应该传播到其他节点。这是在“doReleaseShared”中设置的(仅针对头节点),以确保传播继续,即使其他操作已经介入。

对于普通同步节点,该字段被初始化为0,对于条件节点,该字段被初始化为CONDITION。使用CAS修改它(或者在可能的情况下,无条件的volatile写操作)。

//用于创建同步等待队列节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 用于创建条件队列节点
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}

节点入队列操作

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//假如这里设置失败说明已存在tail节点,下次循环获取到tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 先设置当前入队列节点的前继节点为"Tail"节点
node.prev = t;
//假如这里tail节点设置失败,说明,tail节点被其他线程更新了,所有继续循环直到成功为止.
if (compareAndSetTail(t, node)) {
//设置之前的tail节点的后继节点为新的tail节点
t.next = node;
return t;
}
}
}
}

为当前线程和给定模式创建节点并将其入队。

/**
* 为当前线程和给定模式创建节点并将其入队。
*
* Creates and enqueues node for current thread and given mode.
*
* EXCLUSIVE:独占模式,SHARED:共享模式
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//已存在tail节点
if (pred != null) {
//设置当前节点的前继节点为tail节点
node.prev = pred;
//设置tail节点为当前入队列节点,假如这里设置失败如何处理
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 用于上述快速入队列失败的后备操作,下面这个方法通过自旋,保证入队列成功
enq(node);
return node;
}

设置头节点


//将队列头设置为节点,从而出队。仅由acquire方法调用。为了GC和抑制不必要的信号和遍历,还会清空未使用的字段。
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

唤醒后继节点

/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//当前节点状态小于0,则把当前节点状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/

//找到实际的未被取消不为空的后继节点.从tail向head遍历
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

取消Acquire


/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 节点为空,不执行操作
if (node == null)
return;

//Help for GC
node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
//找到未取消的前置节点
while (pred.waitStatus > 0){
// pre = pred.prev;
// node.prev = pred;
node.prev = pred = pred.prev;
}

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.

// 设置当前节点为已取消
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
// 上面的unparkSuccessor 为什么向前遍历,估计就是因为这个,node.next=node. 如果正向遍历,可能就是死循环了
node.next = node; // help GC
}
}