//用于创建同步等待队列节点 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
// 用于创建条件队列节点 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
节点入队列操作
private Node enq(final Node node) { for (;;) { Nodet= tail; if (t == null) { // Must initialize //假如这里设置失败说明已存在tail节点,下次循环获取到tail if (compareAndSetHead(newNode())) tail = head; } else { // 先设置当前入队列节点的前继节点为"Tail"节点 node.prev = t; //假如这里tail节点设置失败,说明,tail节点被其他线程更新了,所有继续循环直到成功为止. if (compareAndSetTail(t, node)) { //设置之前的tail节点的后继节点为新的tail节点 t.next = node; return t; } } } }
为当前线程和给定模式创建节点并将其入队。
/** * 为当前线程和给定模式创建节点并将其入队。 * * Creates and enqueues node for current thread and given mode. * * EXCLUSIVE:独占模式,SHARED:共享模式 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred= tail; //已存在tail节点 if (pred != null) { //设置当前节点的前继节点为tail节点 node.prev = pred; //设置tail节点为当前入队列节点,假如这里设置失败如何处理 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 用于上述快速入队列失败的后备操作,下面这个方法通过自旋,保证入队列成功 enq(node); return node; }
设置头节点
//将队列头设置为节点,从而出队。仅由acquire方法调用。为了GC和抑制不必要的信号和遍历,还会清空未使用的字段。 /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ privatevoidsetHead(Node node) { head = node; node.thread = null; node.prev = null; }
唤醒后继节点
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; //当前节点状态小于0,则把当前节点状态置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//找到实际的未被取消不为空的后继节点.从tail向head遍历 Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
取消Acquire
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ privatevoidcancelAcquire(Node node) { // Ignore if node doesn't exist // 节点为空,不执行操作 if (node == null) return;
//Help for GC node.thread = null;
// Skip cancelled predecessors Nodepred= node.prev; //找到未取消的前置节点 while (pred.waitStatus > 0){ // pre = pred.prev; // node.prev = pred; node.prev = pred = pred.prev; }
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. NodepredNext= pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads.
// 设置当前节点为已取消 node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Nodenext= node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } // 上面的unparkSuccessor 为什么向前遍历,估计就是因为这个,node.next=node. 如果正向遍历,可能就是死循环了 node.next = node; // help GC } }