 * @param <E> the type of elements held in this collection
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;
    /*
     * This class extends the approach used in FIFO-mode
     * SynchronousQueues. See the internal documentation, as well as
     * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
     * Lea, and Scott
     * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
     *
     * The main extension is to provide different Wait modes for the
     * main "xfer" method that puts or takes items. These don't
     * impact the basic dual-queue logic, but instead control whether
     * or how threads block upon insertion of request or data nodes
     * into the dual queue. It also uses slightly different
     * conventions for tracking whether nodes are off-list or
     * cancelled.
     */

    // Wait modes for xfer method
    static final int NOWAIT  = 0;
    static final int TIMEOUT = 1;
    static final int WAIT    = 2;

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;
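    /*
     * Illustrative sketch only (not part of the original source): how the
     * constants above are typically combined in a timed wait. The helper
     * name and its "blocker"/"deadline" parameters are hypothetical; it
     * assumes the java.util.concurrent.locks.LockSupport facility this
     * class already relies on elsewhere.
     */
    static void timedWaitSketch(Object blocker, long deadline) {
        int spins = maxTimedSpins;
        for (;;) {
            long nanos = deadline - System.nanoTime();
            if (nanos <= 0)
                return;                                 // timed out
            if (spins > 0)
                --spins;                                // spin a little before parking
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(blocker, nanos);  // long wait: park instead
            // else: remaining time too short for parking to pay off; keep looping
        }
    }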
    /**
     * Node class for LinkedTransferQueue. Opportunistically
     * subclasses from AtomicReference to represent item. Uses Object,
     * not E, to allow setting item to "this" after use, to avoid
     * garbage retention. Similarly, setting the next field to this is
     * used as sentinel that node is off list.
     */
    static final class QNode extends AtomicReference<Object> {
        volatile QNode next;          // next node in queue
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;

        QNode(Object item, boolean isData) {
            super(item);
            this.isData = isData;
        }
    /*
     * *** Overview of Dual Queues with Slack ***
     *
     * Dual Queues, introduced by Scherer and Scott
     * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
     * (linked) queues in which nodes may represent either data or
     * requests. When a thread tries to enqueue a data node, but
     * encounters a request node, it instead "matches" and removes it;
     * and vice versa for enqueuing requests. Blocking Dual Queues
     * arrange that threads enqueuing unmatched requests block until
     * other threads provide the match. Dual Synchronous Queues (see
     * Scherer, Lea, & Scott
     * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
     * additionally arrange that threads enqueuing unmatched data also
     * block. Dual Transfer Queues support all of these modes, as
     * dictated by callers.
     *
     * A FIFO dual queue may be implemented using a variation of the
     * Michael & Scott (M&S) lock-free queue algorithm
     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
     * It maintains two pointer fields, "head", pointing to a
     * (matched) node that in turn points to the first actual
     * (unmatched) queue node (or null if empty); and "tail" that
     * points to the last node on the queue (or again null if
     * empty). For example, here is a possible queue with four data
     * elements:
     *
     *  head                tail
     *    |                   |
     *    v                   v
     *    M -> U -> U -> U -> U
     *
     * The M&S queue algorithm is known to be prone to scalability and
     * overhead limitations when maintaining (via CAS) these head and
     * tail pointers. This has led to the development of
     * contention-reducing variants such as elimination arrays (see
     * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
     * optimistic back pointers (see Ladan-Mozes & Shavit
     * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
     * However, the nature of dual queues enables a simpler tactic for
     * improving M&S-style implementations when dual-ness is needed.
     *
     * In a dual queue, each node must atomically maintain its match
     * status. While there are other possible variants, we implement
     * this here as: for a data-mode node, matching entails CASing an
     * "item" field from a non-null data value to null upon match, and
     * vice-versa for request nodes, CASing from null to a data
     * value. (Note that the linearization properties of this style of
     * queue are easy to verify -- elements are made available by
     * linking, and unavailable by matching.) Compared to plain M&S
     * queues, this property of dual queues requires one additional
     * successful atomic operation per enq/deq pair. But it also
     * enables lower cost variants of queue maintenance mechanics. (A
     * variation of this idea applies even for non-dual queues that
     * support deletion of interior elements, such as
     * j.u.c.ConcurrentLinkedQueue.)
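     *
     * As a rough illustration (a sketch, not code from this file),
     * matching an unmatched node of the opposite mode comes down to a
     * single CAS on its item field, whose success is the linearization
     * point; the helper name below is hypothetical:
     *
     *   boolean tryMatchSketch(Node p, Object offered) {
     *       Object x = p.item;
     *       return p.isData
     *           ? (x != null && x != p && p.casItem(x, null))   // consume data
     *           : (x == null && p.casItem(null, offered));      // satisfy request
     *   }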
     *
     * Once a node is matched, its match status can never again
     * change. We may thus arrange that the linked list of them
     * contain a prefix of zero or more matched nodes, followed by a
     * suffix of zero or more unmatched nodes. (Note that we allow
     * both the prefix and suffix to be zero length, which in turn
     * means that we do not use a dummy header.) If we were not
     * concerned with either time or space efficiency, we could
     * correctly perform enqueue and dequeue operations by traversing
     * from a pointer to the initial node; CASing the item of the
     * first unmatched node on match and CASing the next field of the
     * trailing node on appends. (Plus some special-casing when
     * initially empty). While this would be a terrible idea in
     * itself, it does have the benefit of not requiring ANY atomic
     * updates on head/tail fields.
     *
     * We introduce here an approach that lies between the extremes of
     * never versus always updating queue (head and tail) pointers.
     * This offers a tradeoff between sometimes requiring extra
     * traversal steps to locate the first and/or last unmatched
     * nodes, versus the reduced overhead and contention of fewer
     * updates to queue pointers. For example, a possible snapshot of
     * a queue is:
     *
     *  head           tail
     *    |              |
     *    v              v
     *    M -> M -> U -> U -> U -> U
     *
     * The best value for this "slack" (the targeted maximum distance
     * between the value of "head" and the first unmatched node, and
     * similarly for "tail") is an empirical matter. We have found
     * that using very small constants in the range of 1-3 work best
     * over a range of platforms. Larger values introduce increasing
     * costs of cache misses and risks of long traversal chains, while
     * smaller values increase CAS contention and overhead.
     *
     * Dual queues with slack differ from plain M&S dual queues by
     * virtue of only sometimes updating head or tail pointers when
     * matching, appending, or even traversing nodes; in order to
     * maintain a targeted slack. The idea of "sometimes" may be
     * operationalized in several ways. The simplest is to use a
     * per-operation counter incremented on each traversal step, and
     * to try (via CAS) to update the associated queue pointer
     * whenever the count exceeds a threshold. Another, that requires
     * more overhead, is to use random number generators to update
     * with a given probability per traversal step.
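     *
     * A minimal sketch of the counter-plus-threshold variant described
     * just above (illustrative only; SLACK_THRESHOLD is a hypothetical
     * constant, and this class in fact hard-wires a threshold of two
     * rather than counting explicitly):
     *
     *   Node h = head;
     *   int steps = 0;
     *   for (Node p = h; p != null && p.isMatched(); p = p.next) {
     *       if (++steps >= SLACK_THRESHOLD) {
     *           if (casHead(h, p))      // best effort; a failed CAS is fine
     *               h = p;
     *           steps = 0;
     *       }
     *   }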
     *
     * In any strategy along these lines, because CASes updating
     * fields may fail, the actual slack may exceed targeted
     * slack. However, they may be retried at any time to maintain
     * targets. Even when using very small slack values, this
     * approach works well for dual queues because it allows all
     * operations up to the point of matching or appending an item
     * (hence potentially allowing progress by another thread) to be
     * read-only, thus not introducing any further contention. As
     * described below, we implement this by performing slack
     * maintenance retries only after these points.
     *
     * As an accompaniment to such techniques, traversal overhead can
     * be further reduced without increasing contention of head
     * pointer updates: Threads may sometimes shortcut the "next" link
     * path from the current "head" node to be closer to the currently
     * known first unmatched node, and similarly for tail. Again, this
     * may be triggered with using thresholds or randomization.
     *
     * These ideas must be further extended to avoid unbounded amounts
     * of costly-to-reclaim garbage caused by the sequential "next"
     * links of nodes starting at old forgotten head nodes: As first
     * described in detail by Boehm
     * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
     * delays noticing that any arbitrarily old node has become
     * garbage, all newer dead nodes will also be unreclaimed.
     * (Similar issues arise in non-GC environments.) To cope with
     * this in our implementation, upon CASing to advance the head
     * pointer, we set the "next" link of the previous head to point
     * only to itself; thus limiting the length of connected dead lists.
     * (We also take similar care to wipe out possibly garbage
     * retaining values held in other Node fields.) However, doing so
     * adds some further complexity to traversal: If any "next"
     * pointer links to itself, it indicates that the current thread
     * has lagged behind a head-update, and so the traversal must
     * continue from the "head". Traversals trying to find the
     * current tail starting from "tail" may also encounter
     * self-links, in which case they also continue at "head".
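     *
     * Concretely, a traversal step applies the rule sketched below (it
     * mirrors the succ() helper defined later in this class):
     *
     *   Node next = p.next;
     *   p = (p == next) ? head : next;   // a self-link means p is off-list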
     *
     * It is tempting in slack-based scheme to not even use CAS for
     * updates (similarly to Ladan-Mozes & Shavit). However, this
     * cannot be done for head updates under the above link-forgetting
     * mechanics because an update may leave head at a detached node.
     * And while direct writes are possible for tail updates, they
     * increase the risk of long retraversals, and hence long garbage
     * chains, which can be much more costly than is worthwhile
     * considering that the cost difference of performing a CAS vs
     * write is smaller when they are not triggered on each operation
     * (especially considering that writes and CASes equally require
     * additional GC bookkeeping ("write barriers") that are sometimes
     * more costly than the writes themselves because of contention).
     *
     * *** Overview of implementation ***
     *
     * We use a threshold-based approach to updates, with a slack
     * threshold of two -- that is, we update head/tail when the
     * current pointer appears to be two or more steps away from the
     * first/last node. The slack value is hard-wired: a path greater
     * than one is naturally implemented by checking equality of
     * traversal pointers except when the list has only one element,
     * in which case we keep slack threshold at one. Avoiding tracking
     * explicit counts across method calls slightly simplifies an
     * already-messy implementation. Using randomization would
     * probably work better if there were a low-quality dirt-cheap
     * per-thread one available, but even ThreadLocalRandom is too
     * heavy for these purposes.
     *
     * With such a small slack threshold value, it is not worthwhile
     * to augment this with path short-circuiting (i.e., unsplicing
     * interior nodes) except in the case of cancellation/removal (see
     * below).
     *
     * We allow both the head and tail fields to be null before any
     * nodes are enqueued; initializing upon first append. This
     * simplifies some other logic, as well as providing more
     * efficient explicit control paths instead of letting JVMs insert
     * implicit NullPointerExceptions when they are null. While not
     * currently fully implemented, we also leave open the possibility
     * of re-nulling these fields when empty (which is complicated to
     * arrange, for little benefit.)
     *
     * All enqueue/dequeue operations are handled by the single method
     * "xfer" with parameters indicating whether to act as some form
     * of offer, put, poll, take, or transfer (each possibly with
     * timeout). The relative complexity of using one monolithic
     * method outweighs the code bulk and maintenance problems of
     * using separate methods for each case.
     *
     * Operation consists of up to three phases. The first is
     * implemented within method xfer, the second in tryAppend, and
     * the third in method awaitMatch.
     *
     * 1. Try to match an existing node
     *
     *    Starting at head, skip already-matched nodes until finding
     *    an unmatched node of opposite mode, if one exists, in which
     *    case matching it and returning, also if necessary updating
     *    head to one past the matched node (or the node itself if the
     *    list has no other unmatched nodes). If the CAS misses, then
     *    a loop retries advancing head by two steps until either
     *    success or the slack is at most two. By requiring that each
     *    attempt advances head by two (if applicable), we ensure that
     *    the slack does not grow without bound. Traversals also check
     *    if the initial head is now off-list, in which case they
     *    start at the new head.
     *
     *    If no candidates are found and the call was untimed
     *    poll/offer, (argument "how" is NOW) return.
     *
     * 2. Try to append a new node (method tryAppend)
     *
     *    Starting at current tail pointer, find the actual last node
     *    and try to append a new node (or if head was null, establish
     *    the first node). Nodes can be appended only if their
     *    predecessors are either already matched or are of the same
     *    mode. If we detect otherwise, then a new node with opposite
     *    mode must have been appended during traversal, so we must
     *    restart at phase 1. The traversal and update steps are
     *    otherwise similar to phase 1: Retrying upon CAS misses and
     *    checking for staleness. In particular, if a self-link is
     *    encountered, then we can safely jump to a node on the list
     *    by continuing the traversal at current head.
     *
     *    On successful append, if the call was ASYNC, return.
     *
     * 3. Await match or cancellation (method awaitMatch)
     *
     *    Wait for another thread to match node; instead cancelling if
     *    the current thread was interrupted or the wait timed out. On
     *    multiprocessors, we use front-of-queue spinning: If a node
     *    appears to be the first unmatched node in the queue, it
     *    spins a bit before blocking. In either case, before blocking
     *    it tries to unsplice any nodes between the current "head"
     *    and the first unmatched node.
     *
     *    Front-of-queue spinning vastly improves performance of
     *    heavily contended queues. And so long as it is relatively
     *    brief and "quiet", spinning does not much impact performance
     *    of less-contended queues. During spins threads check their
     *    interrupt status and generate a thread-local random number
     *    to decide to occasionally perform a Thread.yield. While
     *    yield has underdefined specs, we assume that it might help,
     *    and will not hurt, in limiting impact of spinning on busy
     *    systems. We also use smaller (1/2) spins for nodes that are
     *    not known to be front but whose predecessors have not
     *    blocked -- these "chained" spins avoid artifacts of
     *    front-of-queue rules which otherwise lead to alternating
     *    nodes spinning vs blocking. Further, front threads that
     *    represent phase changes (from data to request node or vice
     *    versa) compared to their predecessors receive additional
     *    chained spins, reflecting longer paths typically required to
     *    unblock threads during phase changes.
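     *
     * In outline, the per-iteration waiting decision of phase 3 looks
     * like the sketch below (simplified; the real loop in awaitMatch
     * also handles matching, cancellation, timeouts and garbage
     * hygiene):
     *
     *   if (spins < 0)                        // establish spin budget at/near front
     *       spins = spinsFor(pred, s.isData);
     *   else if (spins > 0) {
     *       --spins;
     *       if (randomYields.nextInt(CHAINED_SPINS) == 0)
     *           Thread.yield();               // occasionally yield while spinning
     *   }
     *   else if (s.waiter == null)
     *       s.waiter = w;                     // publish waiter, then recheck
     *   else
     *       LockSupport.park(this);           // out of spins: block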
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
     * may arise due to timed out or interrupted waits, or calls to
     * remove(x) or Iterator.remove. Normally, given a node that was
     * at one time known to be the predecessor of some node s that is
     * to be removed, we can unsplice s by CASing the next field of
     * its predecessor if it still points to s (otherwise s must
     * already have been removed or is now offlist). But there are two
     * situations in which we cannot guarantee to make node s
     * unreachable in this way: (1) If s is the trailing node of list
     * (i.e., with null next), then it is pinned as the target node
     * for appends, so can only be removed later after other nodes are
     * appended. (2) We cannot necessarily unlink s given a
     * predecessor node that is matched (including the case of being
     * cancelled): the predecessor may already be unspliced, in which
     * case some previous reachable node may still point to s.
     * (For further explanation see Herlihy & Shavit "The Art of
     * Multiprocessor Programming" chapter 9). Although, in both
     * cases, we can rule out the need for further action if either s
     * or its predecessor are (or can be made to be) at, or fall off
     * from, the head of list.
     *
     * Without taking these into account, it would be possible for an
     * unbounded number of supposedly removed nodes to remain
     * reachable. Situations leading to such buildup are uncommon but
     * can occur in practice; for example when a series of short timed
     * calls to poll repeatedly time out but never otherwise fall off
     * the list because of an untimed call to take at the front of the
     * queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
     * won't help for case (1) anyway), we record a conservative
     * estimate of possible unsplice failures (in "sweepVotes").
     * We trigger a full sweep when the estimate exceeds a threshold
     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
     * removal failures to tolerate before sweeping through, unlinking
     * cancelled nodes that were not unlinked upon initial removal.
     * We perform sweeps by the thread hitting threshold (rather than
     * background threads or by spreading work to other threads)
     * because in the main contexts in which removal occurs, the
     * caller is already timed-out, cancelled, or performing a
     * potentially O(n) operation (e.g. remove(x)), none of which are
     * time-critical enough to warrant the overhead that alternatives
     * would impose on other threads.
     *
     * Because the sweepVotes estimate is conservative, and because
     * nodes become unlinked "naturally" as they fall off the head of
     * the queue, and because we allow votes to accumulate even while
     * sweeps are in progress, there are typically significantly fewer
     * such nodes than estimated. Choice of a threshold value
     * balances the likelihood of wasted effort and contention, versus
     * providing a worst-case bound on retention of interior nodes in
     * quiescent queues. The value defined below was chosen
     * empirically to balance these under various timeout scenarios.
     *
     * Note that we cannot self-link unlinked interior nodes during
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */
    /** True if on multiprocessor */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue. See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning. Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;
    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use. Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;     // next node in queue
        volatile Thread waiter; // null until waiting

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         * Constructs a new node. Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }
        static final AtomicReferenceFieldUpdater<QNode, QNode>
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
            (QNode.class, QNode.class, "next");

        final boolean casNext(QNode cmp, QNode val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        final void clearNext() {
            nextUpdater.lazySet(this, this);
        }

    /**
     * Padded version of AtomicReference used for head, tail and
     * cleanMe, to alleviate contention across threads CASing one vs
     * the other.
     */
    static final class PaddedAtomicReference<T> extends AtomicReference<T> {
        // enough padding for 64bytes with 4byte refs
        Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
        PaddedAtomicReference(T r) { super(r); }
    }

    /** head of the queue */
    private transient final PaddedAtomicReference<QNode> head;

    /** tail of the queue */
    private transient final PaddedAtomicReference<QNode> tail;

    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    private transient final PaddedAtomicReference<QNode> cleanMe;

    /**
     * Tries to cas nh as new head; if successful, unlink
     * old head's next node to avoid garbage retention.
     */
    private boolean advanceHead(QNode h, QNode nh) {
        if (h == head.get() && head.compareAndSet(h, nh)) {
            h.clearNext(); // forget old next
            return true;
        }
        return false;
    }

    /**
     * Puts or takes an item. Used for most queue operations (except
     * poll() and tryTransfer()). See the similar code in
     * SynchronousQueue for detailed explanation.
     */
        /**
         * Links node to itself to avoid garbage retention. Called
         * only after CASing head field, so uses relaxed write.
         */
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items. Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         */
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        private static final long waiterOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
                waiterOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiter"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    /** head of the queue; null until first enqueue */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;

    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /*
     * Possible values for "how" argument in xfer method.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        // assert item == null || item.getClass() != Node.class;
        return (E) item;
    }
    /*
     * @param e the item or if null, signifies that this is a take
     * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
     * @return an item, or null on failure
     */
    private Object xfer(Object e, int mode, long nanos) {
        boolean isData = (e != null);
        final PaddedAtomicReference<QNode> head = this.head;
        final PaddedAtomicReference<QNode> tail = this.tail;

            QNode t = tail.get();
            QNode h = head.get();

            if (t != null && (t == h || t.isData == isData)) {
                s = new QNode(e, isData);
                    tail.compareAndSet(t, last);
                else if (t.casNext(null, s)) {
                    tail.compareAndSet(t, s);
                    return awaitFulfill(t, s, e, mode, nanos);
                }
            }
            else if (h != null) {
                QNode first = h.next;
                if (t == tail.get() && first != null &&
                    advanceHead(h, first)) {
                    Object x = first.get();
                    if (x != first && first.compareAndSet(x, e)) {
                        LockSupport.unpark(first.waiter);
                        return isData? e : x;
                    }
                }
            }
    }

    /**
     * Implements all queuing methods. See above for explanation.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head) == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
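    /*
     * Sketch of how the public queue operations are expected to reduce
     * to single xfer calls, given the mode constants above. The method
     * bodies below are illustrative only; they are not part of this
     * fragment.
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);               // never blocks: queue is unbounded
    }

    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public E poll() {
        return xfer(null, false, NOW, 0);      // untimed: null if nothing to match
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);      // blocks until a data node is matched
        if (e != null)
            return e;
        Thread.interrupted();                  // clear status before reporting
        throw new InterruptedException();
    }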
    /**
     * Version of xfer for poll() and tryTransfer, which
     * simplifies control paths both here and in xfer.
     */
    private Object fulfill(Object e) {
        boolean isData = (e != null);
        final PaddedAtomicReference<QNode> head = this.head;
        final PaddedAtomicReference<QNode> tail = this.tail;

            QNode t = tail.get();
            QNode h = head.get();

            if (t != null && (t == h || t.isData == isData)) {
                if (t == tail.get()) {
                    tail.compareAndSet(t, last);
                }
            }
            else if (h != null) {
                QNode first = h.next;
                if (t == tail.get() &&
                    advanceHead(h, first)) {
                    Object x = first.get();
                    if (x != first && first.compareAndSet(x, e)) {
                        LockSupport.unpark(first.waiter);
                        return isData? e : x;
                    }
                }
            }
    }

    /**
     * Tries to append node s as tail.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
    /**
     * Spins/blocks until node s is fulfilled or caller gives up,
     * depending on wait mode.
     *
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param pred the predecessor of waiting node
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param nanos timeout value
     * @return matched item, or s if cancelled
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private Object awaitFulfill(QNode pred, QNode s, Object e,
                                int mode, long nanos) {
        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;

    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // set to desired spin count below
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed
            if (w.isInterrupted())
                s.compareAndSet(e, s);
            if (x != e) {                 // Node was matched or cancelled
                advanceHead(pred, s);     // unlink if head
                if (x == s) {              // was cancelled
                else if (x != null) {
                    s.set(s);             // avoid garbage retention
            if (mode == TIMEOUT) {
            Object item = s.item;
            if (item != e) {                  // matched
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel

            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
                long now = System.nanoTime();
                nanos -= now - lastTime;
                if ((nanos -= now - lastTime) > 0)
                    LockSupport.parkNanos(this, nanos);
                s.compareAndSet(e, s); // try to cancel
                LockSupport.park(this);
    /**
     * Returns spin/yield value for a node with given predecessor and
     * data mode. See above for explanation.
     */
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {
            if (pred.isData != haveData)      // phase change
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;
    }
    /* -------------- Traversal methods -------------- */

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node succ(Node p) {
        Node next = p.next;
        return (p == next) ? head : next;
    }

    /**
     * Returns the first unmatched node of the given mode, or null if
     * none. Used by methods isEmpty, hasWaitingConsumer.
     */
    private Node firstOfMode(boolean isData) {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return (p.isData == isData) ? p : null;
        }
        return null;
    }

    /**
     * Returns the item in the first unmatched node with isData; or
     * null if none. Used by peek.
     */
    private E firstDataItem() {
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p)
                    return LinkedTransferQueue.<E>cast(item);
            }
            else if (item == null)
                return null;
        }
        return null;
    }

    /**
     * Traverses and counts unmatched nodes of the given mode.
     * Used by methods size and getWaitingConsumerCount.
     */
    private int countOfMode(boolean data) {
        int count = 0;
        for (Node p = head; p != null; ) {
            if (!p.isMatched()) {
                if (p.isData != data)
                    return 0;
                if (++count == Integer.MAX_VALUE) // saturated
                    break;
            }
    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
            if (spins < 0) {
                QNode h = head.get(); // only spin if at head
                spins = ((h != null && h.next == s) ?
                         ((mode == TIMEOUT) ?
                          maxTimedSpins : maxUntimedSpins) : 0);
            }
            else if (s.waiter == null)
                s.waiter = w;
            else if (mode != TIMEOUT) {
                LockSupport.park(this);
            }
            else if (nanos > spinForTimeoutThreshold) {
                LockSupport.parkNanos(this, nanos);
            }

    /**
     * Returns validated tail for use in cleaning methods.
     */
    private QNode getValidatedTail() {
            QNode h = head.get();
            QNode first = h.next;
            if (first != null && first.next == first) { // help advance
                advanceHead(h, first);
            }
            QNode t = tail.get();
            if (t == tail.get()) {
                tail.compareAndSet(t, last); // help advance
                Object item = s.item;
                if (item != null && item != s) {
                    nextItem = LinkedTransferQueue.<E>cast(item);
                else if (item == null)
                // assert s.isMatched();
                else if ((n = s.next) == null)
    /**
     * Gets rid of cancelled node s with original predecessor pred.
     *
     * @param pred predecessor of cancelled node
     * @param s the cancelled node
     */
    private void clean(QNode pred, QNode s) {
        Thread w = s.waiter;
        if (w != null) {             // Wake up thread
            if (w != Thread.currentThread())
                LockSupport.unpark(w);
        }

        /*
         * At any given time, exactly one node on list cannot be
         * deleted -- the last inserted node. To accommodate this, if
         * we cannot delete s, we save its predecessor as "cleanMe",
         * processing the previously saved version first. At least one
         * of node s or the node previously saved can always be
         * processed, so this always terminates.
         */
        while (pred.next == s) {
            QNode oldpred = reclean();  // First, help get rid of cleanMe
            QNode t = getValidatedTail();
            if (s != t) {               // If not tail, try to unsplice
                QNode sn = s.next;      // s.next == s means s already off list
                if (sn == s || pred.casNext(s, sn))
                    break;
            }
            else if (oldpred == pred || // Already saved
                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
                break; // Postpone cleaning
        }
    }

    /**
     * Tries to unsplice the cancelled node held in cleanMe that was
     * previously uncleanable because it was at tail.
     *
     * @return current cleanMe node (or null)
     */
    private QNode reclean() {
        /*
         * cleanMe is, or at one time was, predecessor of cancelled
         * node s that was the tail so could not be unspliced. If s
         * is no longer the tail, try to unsplice if necessary and
         * make cleanMe slot available. This differs from similar
         * code in clean() because we must check that pred still
         * points to a cancelled node that must be unspliced -- if
         * not, we can (must) clear cleanMe without unsplicing.
         * This can loop only due to contention on casNext or
         * clearing cleanMe.
         */
        QNode pred;
        while ((pred = cleanMe.get()) != null) {
            QNode t = getValidatedTail();
            QNode s = pred.next;
            QNode sn;
            if (s == null || s == pred || s.get() != s ||
                (sn = s.next) == s || pred.casNext(s, sn))
                cleanMe.compareAndSet(pred, null);
            else // s is still tail; cannot clean
                break;
        }
        return pred;
    }
        public final boolean hasNext() {
            return nextNode != null;
        }
        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }
        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    /* -------------- Removal methods -------------- */

    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.forgetContents(); // forget unneeded fields
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s. If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }
    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                p = s;          // Unmatched nodes are never self-linked
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);
        }
    }
    /** Main implementation of remove(Object) */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;
                if ((p = p.next) == pred) { // stale
    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     */
    public LinkedTransferQueue() {
        QNode dummy = new QNode(null, false);
        head = new PaddedAtomicReference<QNode>(dummy);
        tail = new PaddedAtomicReference<QNode>(dummy);
        cleanMe = new PaddedAtomicReference<QNode>(null);
    }