~ubuntu-branches/debian/sid/scala/sid

« back to all changes in this revision

Viewing changes to src/forkjoin/scala/concurrent/forkjoin/LinkedTransferQueue.java

  • Committer: Package Import Robot
  • Author(s): Emmanuel Bourg, Mehdi Dogguy, Lucas Satabin, Frank S. Thomas, Emmanuel Bourg
  • Date: 2015-06-05 23:52:59 UTC
  • mfrom: (1.2.11)
  • Revision ID: package-import@ubuntu.com-20150605235259-wk00vgk83dh8o19g
Tags: 2.10.5-1
* Team upload.

[ Mehdi Dogguy ]
* New upstream release (Closes: #744278).

[ Lucas Satabin ]
* Update patches
* Update the clean target
* Update paths of elements to install
* Update watch file

[ Frank S. Thomas ]
* Remove myself from Uploaders.

[ Emmanuel Bourg ]
* The package has been adopted by the Java Team (Closes: #754935)
* Patched the build to avoid downloading libraries from the Internet
* Replaced the minified JavaScript files with unobfuscated ones
* No longer build scala-partest.jar until diffutils is packaged or replaced
* debian/watch: Fixed the versions matched (x.y.z instead of x.y.z..z)
* debian/rules:
  - Added the missing get-orig-source target (Closes: #724704)
  - Improved the clean target
* debian/control:
  - Build depend on scala (>= 2.10) and bnd
  - Use canonical URLs for the Vcs-* fields
  - Standards-Version updated to 3.9.6 (no changes)
* Switch to debhelper level 9

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
2
 * Written by Doug Lea with assistance from members of JCP JSR-166
3
3
 * Expert Group and released to the public domain, as explained at
4
 
 * http://creativecommons.org/licenses/publicdomain
 
4
 * http://creativecommons.org/publicdomain/zero/1.0/
5
5
 */
6
6
 
7
7
package scala.concurrent.forkjoin;
8
 
import java.util.concurrent.*;
9
 
import java.util.concurrent.locks.*;
10
 
import java.util.concurrent.atomic.*;
11
 
import java.util.*;
12
 
import java.io.*;
13
 
import sun.misc.Unsafe;
14
 
import java.lang.reflect.*;
 
8
 
 
9
import java.util.AbstractQueue;
 
10
import java.util.Collection;
 
11
import java.util.Iterator;
 
12
import java.util.NoSuchElementException;
 
13
import java.util.Queue;
 
14
import java.util.concurrent.TimeUnit;
 
15
import java.util.concurrent.locks.LockSupport;
15
16
 
16
17
/**
17
 
 * An unbounded {@linkplain TransferQueue} based on linked nodes.
 
18
 * An unbounded {@link TransferQueue} based on linked nodes.
18
19
 * This queue orders elements FIFO (first-in-first-out) with respect
19
20
 * to any given producer.  The <em>head</em> of the queue is that
20
21
 * element that has been on the queue the longest time for some
21
22
 * producer.  The <em>tail</em> of the queue is that element that has
22
23
 * been on the queue the shortest time for some producer.
23
24
 *
24
 
 * <p>Beware that, unlike in most collections, the {@code size}
25
 
 * method is <em>NOT</em> a constant-time operation. Because of the
 
25
 * <p>Beware that, unlike in most collections, the {@code size} method
 
26
 * is <em>NOT</em> a constant-time operation. Because of the
26
27
 * asynchronous nature of these queues, determining the current number
27
 
 * of elements requires a traversal of the elements.
 
28
 * of elements requires a traversal of the elements, and so may report
 
29
 * inaccurate results if this collection is modified during traversal.
 
30
 * Additionally, the bulk operations {@code addAll},
 
31
 * {@code removeAll}, {@code retainAll}, {@code containsAll},
 
32
 * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
 
33
 * to be performed atomically. For example, an iterator operating
 
34
 * concurrently with an {@code addAll} operation might view only some
 
35
 * of the added elements.
28
36
 *
29
37
 * <p>This class and its iterator implement all of the
30
38
 * <em>optional</em> methods of the {@link Collection} and {@link
44
52
 * @since 1.7
45
53
 * @author Doug Lea
46
54
 * @param <E> the type of elements held in this collection
47
 
 *
48
55
 */
49
56
public class LinkedTransferQueue<E> extends AbstractQueue<E>
50
57
    implements TransferQueue<E>, java.io.Serializable {
51
58
    private static final long serialVersionUID = -3223113410248163686L;
52
59
 
53
60
    /*
54
 
     * This class extends the approach used in FIFO-mode
55
 
     * SynchronousQueues. See the internal documentation, as well as
56
 
     * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57
 
     * Lea & Scott
58
 
     * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59
 
     *
60
 
     * The main extension is to provide different Wait modes for the
61
 
     * main "xfer" method that puts or takes items.  These don't
62
 
     * impact the basic dual-queue logic, but instead control whether
63
 
     * or how threads block upon insertion of request or data nodes
64
 
     * into the dual queue. It also uses slightly different
65
 
     * conventions for tracking whether nodes are off-list or
66
 
     * cancelled.
67
 
     */
68
 
 
69
 
    // Wait modes for xfer method
70
 
    static final int NOWAIT  = 0;
71
 
    static final int TIMEOUT = 1;
72
 
    static final int WAIT    = 2;
73
 
 
74
 
    /** The number of CPUs, for spin control */
75
 
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
76
 
 
77
 
    /**
78
 
     * The number of times to spin before blocking in timed waits.
79
 
     * The value is empirically derived -- it works well across a
80
 
     * variety of processors and OSes. Empirically, the best value
81
 
     * seems not to vary with number of CPUs (beyond 2) so is just
82
 
     * a constant.
83
 
     */
84
 
    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
85
 
 
86
 
    /**
87
 
     * The number of times to spin before blocking in untimed waits.
88
 
     * This is greater than timed value because untimed waits spin
89
 
     * faster since they don't need to check times on each spin.
90
 
     */
91
 
    static final int maxUntimedSpins = maxTimedSpins * 16;
92
 
 
93
 
    /**
94
 
     * The number of nanoseconds for which it is faster to spin
95
 
     * rather than to use timed park. A rough estimate suffices.
96
 
     */
97
 
    static final long spinForTimeoutThreshold = 1000L;
98
 
 
99
 
    /**
100
 
     * Node class for LinkedTransferQueue. Opportunistically
101
 
     * subclasses from AtomicReference to represent item. Uses Object,
102
 
     * not E, to allow setting item to "this" after use, to avoid
103
 
     * garbage retention. Similarly, setting the next field to this is
104
 
     * used as sentinel that node is off list.
105
 
     */
106
 
    static final class QNode extends AtomicReference<Object> {
107
 
        volatile QNode next;
108
 
        volatile Thread waiter;       // to control park/unpark
109
 
        final boolean isData;
110
 
        QNode(Object item, boolean isData) {
111
 
            super(item);
 
61
     * *** Overview of Dual Queues with Slack ***
 
62
     *
 
63
     * Dual Queues, introduced by Scherer and Scott
 
64
     * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
 
65
     * (linked) queues in which nodes may represent either data or
 
66
     * requests.  When a thread tries to enqueue a data node, but
 
67
     * encounters a request node, it instead "matches" and removes it;
 
68
     * and vice versa for enqueuing requests. Blocking Dual Queues
 
69
     * arrange that threads enqueuing unmatched requests block until
 
70
     * other threads provide the match. Dual Synchronous Queues (see
 
71
     * Scherer, Lea, & Scott
 
72
     * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
 
73
     * additionally arrange that threads enqueuing unmatched data also
 
74
     * block.  Dual Transfer Queues support all of these modes, as
 
75
     * dictated by callers.
 
76
     *
 
77
     * A FIFO dual queue may be implemented using a variation of the
 
78
     * Michael & Scott (M&S) lock-free queue algorithm
 
79
     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
 
80
     * It maintains two pointer fields, "head", pointing to a
 
81
     * (matched) node that in turn points to the first actual
 
82
     * (unmatched) queue node (or null if empty); and "tail" that
 
83
     * points to the last node on the queue (or again null if
 
84
     * empty). For example, here is a possible queue with four data
 
85
     * elements:
 
86
     *
 
87
     *  head                tail
 
88
     *    |                   |
 
89
     *    v                   v
 
90
     *    M -> U -> U -> U -> U
 
91
     *
 
92
     * The M&S queue algorithm is known to be prone to scalability and
 
93
     * overhead limitations when maintaining (via CAS) these head and
 
94
     * tail pointers. This has led to the development of
 
95
     * contention-reducing variants such as elimination arrays (see
 
96
     * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
 
97
     * optimistic back pointers (see Ladan-Mozes & Shavit
 
98
     * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
 
99
     * However, the nature of dual queues enables a simpler tactic for
 
100
     * improving M&S-style implementations when dual-ness is needed.
 
101
     *
 
102
     * In a dual queue, each node must atomically maintain its match
 
103
     * status. While there are other possible variants, we implement
 
104
     * this here as: for a data-mode node, matching entails CASing an
 
105
     * "item" field from a non-null data value to null upon match, and
 
106
     * vice-versa for request nodes, CASing from null to a data
 
107
     * value. (Note that the linearization properties of this style of
 
108
     * queue are easy to verify -- elements are made available by
 
109
     * linking, and unavailable by matching.) Compared to plain M&S
 
110
     * queues, this property of dual queues requires one additional
 
111
     * successful atomic operation per enq/deq pair. But it also
 
112
     * enables lower cost variants of queue maintenance mechanics. (A
 
113
     * variation of this idea applies even for non-dual queues that
 
114
     * support deletion of interior elements, such as
 
115
     * j.u.c.ConcurrentLinkedQueue.)
 
116
     *
 
117
     * Once a node is matched, its match status can never again
 
118
     * change.  We may thus arrange that the linked list of them
 
119
     * contain a prefix of zero or more matched nodes, followed by a
 
120
     * suffix of zero or more unmatched nodes. (Note that we allow
 
121
     * both the prefix and suffix to be zero length, which in turn
 
122
     * means that we do not use a dummy header.)  If we were not
 
123
     * concerned with either time or space efficiency, we could
 
124
     * correctly perform enqueue and dequeue operations by traversing
 
125
     * from a pointer to the initial node; CASing the item of the
 
126
     * first unmatched node on match and CASing the next field of the
 
127
     * trailing node on appends. (Plus some special-casing when
 
128
     * initially empty).  While this would be a terrible idea in
 
129
     * itself, it does have the benefit of not requiring ANY atomic
 
130
     * updates on head/tail fields.
 
131
     *
 
132
     * We introduce here an approach that lies between the extremes of
 
133
     * never versus always updating queue (head and tail) pointers.
 
134
     * This offers a tradeoff between sometimes requiring extra
 
135
     * traversal steps to locate the first and/or last unmatched
 
136
     * nodes, versus the reduced overhead and contention of fewer
 
137
     * updates to queue pointers. For example, a possible snapshot of
 
138
     * a queue is:
 
139
     *
 
140
     *  head           tail
 
141
     *    |              |
 
142
     *    v              v
 
143
     *    M -> M -> U -> U -> U -> U
 
144
     *
 
145
     * The best value for this "slack" (the targeted maximum distance
 
146
     * between the value of "head" and the first unmatched node, and
 
147
     * similarly for "tail") is an empirical matter. We have found
 
148
     * that using very small constants in the range of 1-3 work best
 
149
     * over a range of platforms. Larger values introduce increasing
 
150
     * costs of cache misses and risks of long traversal chains, while
 
151
     * smaller values increase CAS contention and overhead.
 
152
     *
 
153
     * Dual queues with slack differ from plain M&S dual queues by
 
154
     * virtue of only sometimes updating head or tail pointers when
 
155
     * matching, appending, or even traversing nodes; in order to
 
156
     * maintain a targeted slack.  The idea of "sometimes" may be
 
157
     * operationalized in several ways. The simplest is to use a
 
158
     * per-operation counter incremented on each traversal step, and
 
159
     * to try (via CAS) to update the associated queue pointer
 
160
     * whenever the count exceeds a threshold. Another, that requires
 
161
     * more overhead, is to use random number generators to update
 
162
     * with a given probability per traversal step.
 
163
     *
 
164
     * In any strategy along these lines, because CASes updating
 
165
     * fields may fail, the actual slack may exceed targeted
 
166
     * slack. However, they may be retried at any time to maintain
 
167
     * targets.  Even when using very small slack values, this
 
168
     * approach works well for dual queues because it allows all
 
169
     * operations up to the point of matching or appending an item
 
170
     * (hence potentially allowing progress by another thread) to be
 
171
     * read-only, thus not introducing any further contention. As
 
172
     * described below, we implement this by performing slack
 
173
     * maintenance retries only after these points.
 
174
     *
 
175
     * As an accompaniment to such techniques, traversal overhead can
 
176
     * be further reduced without increasing contention of head
 
177
     * pointer updates: Threads may sometimes shortcut the "next" link
 
178
     * path from the current "head" node to be closer to the currently
 
179
     * known first unmatched node, and similarly for tail. Again, this
 
180
     * may be triggered with using thresholds or randomization.
 
181
     *
 
182
     * These ideas must be further extended to avoid unbounded amounts
 
183
     * of costly-to-reclaim garbage caused by the sequential "next"
 
184
     * links of nodes starting at old forgotten head nodes: As first
 
185
     * described in detail by Boehm
 
186
     * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
 
187
     * delays noticing that any arbitrarily old node has become
 
188
     * garbage, all newer dead nodes will also be unreclaimed.
 
189
     * (Similar issues arise in non-GC environments.)  To cope with
 
190
     * this in our implementation, upon CASing to advance the head
 
191
     * pointer, we set the "next" link of the previous head to point
 
192
     * only to itself; thus limiting the length of connected dead lists.
 
193
     * (We also take similar care to wipe out possibly garbage
 
194
     * retaining values held in other Node fields.)  However, doing so
 
195
     * adds some further complexity to traversal: If any "next"
 
196
     * pointer links to itself, it indicates that the current thread
 
197
     * has lagged behind a head-update, and so the traversal must
 
198
     * continue from the "head".  Traversals trying to find the
 
199
     * current tail starting from "tail" may also encounter
 
200
     * self-links, in which case they also continue at "head".
 
201
     *
 
202
     * It is tempting in slack-based scheme to not even use CAS for
 
203
     * updates (similarly to Ladan-Mozes & Shavit). However, this
 
204
     * cannot be done for head updates under the above link-forgetting
 
205
     * mechanics because an update may leave head at a detached node.
 
206
     * And while direct writes are possible for tail updates, they
 
207
     * increase the risk of long retraversals, and hence long garbage
 
208
     * chains, which can be much more costly than is worthwhile
 
209
     * considering that the cost difference of performing a CAS vs
 
210
     * write is smaller when they are not triggered on each operation
 
211
     * (especially considering that writes and CASes equally require
 
212
     * additional GC bookkeeping ("write barriers") that are sometimes
 
213
     * more costly than the writes themselves because of contention).
 
214
     *
 
215
     * *** Overview of implementation ***
 
216
     *
 
217
     * We use a threshold-based approach to updates, with a slack
 
218
     * threshold of two -- that is, we update head/tail when the
 
219
     * current pointer appears to be two or more steps away from the
 
220
     * first/last node. The slack value is hard-wired: a path greater
 
221
     * than one is naturally implemented by checking equality of
 
222
     * traversal pointers except when the list has only one element,
 
223
     * in which case we keep slack threshold at one. Avoiding tracking
 
224
     * explicit counts across method calls slightly simplifies an
 
225
     * already-messy implementation. Using randomization would
 
226
     * probably work better if there were a low-quality dirt-cheap
 
227
     * per-thread one available, but even ThreadLocalRandom is too
 
228
     * heavy for these purposes.
 
229
     *
 
230
     * With such a small slack threshold value, it is not worthwhile
 
231
     * to augment this with path short-circuiting (i.e., unsplicing
 
232
     * interior nodes) except in the case of cancellation/removal (see
 
233
     * below).
 
234
     *
 
235
     * We allow both the head and tail fields to be null before any
 
236
     * nodes are enqueued; initializing upon first append.  This
 
237
     * simplifies some other logic, as well as providing more
 
238
     * efficient explicit control paths instead of letting JVMs insert
 
239
     * implicit NullPointerExceptions when they are null.  While not
 
240
     * currently fully implemented, we also leave open the possibility
 
241
     * of re-nulling these fields when empty (which is complicated to
 
242
     * arrange, for little benefit.)
 
243
     *
 
244
     * All enqueue/dequeue operations are handled by the single method
 
245
     * "xfer" with parameters indicating whether to act as some form
 
246
     * of offer, put, poll, take, or transfer (each possibly with
 
247
     * timeout). The relative complexity of using one monolithic
 
248
     * method outweighs the code bulk and maintenance problems of
 
249
     * using separate methods for each case.
 
250
     *
 
251
     * Operation consists of up to three phases. The first is
 
252
     * implemented within method xfer, the second in tryAppend, and
 
253
     * the third in method awaitMatch.
 
254
     *
 
255
     * 1. Try to match an existing node
 
256
     *
 
257
     *    Starting at head, skip already-matched nodes until finding
 
258
     *    an unmatched node of opposite mode, if one exists, in which
 
259
     *    case matching it and returning, also if necessary updating
 
260
     *    head to one past the matched node (or the node itself if the
 
261
     *    list has no other unmatched nodes). If the CAS misses, then
 
262
     *    a loop retries advancing head by two steps until either
 
263
     *    success or the slack is at most two. By requiring that each
 
264
     *    attempt advances head by two (if applicable), we ensure that
 
265
     *    the slack does not grow without bound. Traversals also check
 
266
     *    if the initial head is now off-list, in which case they
 
267
     *    start at the new head.
 
268
     *
 
269
     *    If no candidates are found and the call was untimed
 
270
     *    poll/offer, (argument "how" is NOW) return.
 
271
     *
 
272
     * 2. Try to append a new node (method tryAppend)
 
273
     *
 
274
     *    Starting at current tail pointer, find the actual last node
 
275
     *    and try to append a new node (or if head was null, establish
 
276
     *    the first node). Nodes can be appended only if their
 
277
     *    predecessors are either already matched or are of the same
 
278
     *    mode. If we detect otherwise, then a new node with opposite
 
279
     *    mode must have been appended during traversal, so we must
 
280
     *    restart at phase 1. The traversal and update steps are
 
281
     *    otherwise similar to phase 1: Retrying upon CAS misses and
 
282
     *    checking for staleness.  In particular, if a self-link is
 
283
     *    encountered, then we can safely jump to a node on the list
 
284
     *    by continuing the traversal at current head.
 
285
     *
 
286
     *    On successful append, if the call was ASYNC, return.
 
287
     *
 
288
     * 3. Await match or cancellation (method awaitMatch)
 
289
     *
 
290
     *    Wait for another thread to match node; instead cancelling if
 
291
     *    the current thread was interrupted or the wait timed out. On
 
292
     *    multiprocessors, we use front-of-queue spinning: If a node
 
293
     *    appears to be the first unmatched node in the queue, it
 
294
     *    spins a bit before blocking. In either case, before blocking
 
295
     *    it tries to unsplice any nodes between the current "head"
 
296
     *    and the first unmatched node.
 
297
     *
 
298
     *    Front-of-queue spinning vastly improves performance of
 
299
     *    heavily contended queues. And so long as it is relatively
 
300
     *    brief and "quiet", spinning does not much impact performance
 
301
     *    of less-contended queues.  During spins threads check their
 
302
     *    interrupt status and generate a thread-local random number
 
303
     *    to decide to occasionally perform a Thread.yield. While
 
304
     *    yield has underdefined specs, we assume that it might help,
 
305
     *    and will not hurt, in limiting impact of spinning on busy
 
306
     *    systems.  We also use smaller (1/2) spins for nodes that are
 
307
     *    not known to be front but whose predecessors have not
 
308
     *    blocked -- these "chained" spins avoid artifacts of
 
309
     *    front-of-queue rules which otherwise lead to alternating
 
310
     *    nodes spinning vs blocking. Further, front threads that
 
311
     *    represent phase changes (from data to request node or vice
 
312
     *    versa) compared to their predecessors receive additional
 
313
     *    chained spins, reflecting longer paths typically required to
 
314
     *    unblock threads during phase changes.
 
315
     *
 
316
     *
 
317
     * ** Unlinking removed interior nodes **
 
318
     *
 
319
     * In addition to minimizing garbage retention via self-linking
 
320
     * described above, we also unlink removed interior nodes. These
 
321
     * may arise due to timed out or interrupted waits, or calls to
 
322
     * remove(x) or Iterator.remove.  Normally, given a node that was
 
323
     * at one time known to be the predecessor of some node s that is
 
324
     * to be removed, we can unsplice s by CASing the next field of
 
325
     * its predecessor if it still points to s (otherwise s must
 
326
     * already have been removed or is now offlist). But there are two
 
327
     * situations in which we cannot guarantee to make node s
 
328
     * unreachable in this way: (1) If s is the trailing node of list
 
329
     * (i.e., with null next), then it is pinned as the target node
 
330
     * for appends, so can only be removed later after other nodes are
 
331
     * appended. (2) We cannot necessarily unlink s given a
 
332
     * predecessor node that is matched (including the case of being
 
333
     * cancelled): the predecessor may already be unspliced, in which
 
334
     * case some previous reachable node may still point to s.
 
335
     * (For further explanation see Herlihy & Shavit "The Art of
 
336
     * Multiprocessor Programming" chapter 9).  Although, in both
 
337
     * cases, we can rule out the need for further action if either s
 
338
     * or its predecessor are (or can be made to be) at, or fall off
 
339
     * from, the head of list.
 
340
     *
 
341
     * Without taking these into account, it would be possible for an
 
342
     * unbounded number of supposedly removed nodes to remain
 
343
     * reachable.  Situations leading to such buildup are uncommon but
 
344
     * can occur in practice; for example when a series of short timed
 
345
     * calls to poll repeatedly time out but never otherwise fall off
 
346
     * the list because of an untimed call to take at the front of the
 
347
     * queue.
 
348
     *
 
349
     * When these cases arise, rather than always retraversing the
 
350
     * entire list to find an actual predecessor to unlink (which
 
351
     * won't help for case (1) anyway), we record a conservative
 
352
     * estimate of possible unsplice failures (in "sweepVotes").
 
353
     * We trigger a full sweep when the estimate exceeds a threshold
 
354
     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
 
355
     * removal failures to tolerate before sweeping through, unlinking
 
356
     * cancelled nodes that were not unlinked upon initial removal.
 
357
     * We perform sweeps by the thread hitting threshold (rather than
 
358
     * background threads or by spreading work to other threads)
 
359
     * because in the main contexts in which removal occurs, the
 
360
     * caller is already timed-out, cancelled, or performing a
 
361
     * potentially O(n) operation (e.g. remove(x)), none of which are
 
362
     * time-critical enough to warrant the overhead that alternatives
 
363
     * would impose on other threads.
 
364
     *
 
365
     * Because the sweepVotes estimate is conservative, and because
 
366
     * nodes become unlinked "naturally" as they fall off the head of
 
367
     * the queue, and because we allow votes to accumulate even while
 
368
     * sweeps are in progress, there are typically significantly fewer
 
369
     * such nodes than estimated.  Choice of a threshold value
 
370
     * balances the likelihood of wasted effort and contention, versus
 
371
     * providing a worst-case bound on retention of interior nodes in
 
372
     * quiescent queues. The value defined below was chosen
 
373
     * empirically to balance these under various timeout scenarios.
 
374
     *
 
375
     * Note that we cannot self-link unlinked interior nodes during
 
376
     * sweeps. However, the associated garbage chains terminate when
 
377
     * some successor ultimately falls off the head of the list and is
 
378
     * self-linked.
 
379
     */
 
380
 
 
381
    /** True if on multiprocessor */
 
382
    private static final boolean MP =
 
383
        Runtime.getRuntime().availableProcessors() > 1;
 
384
 
 
385
    /**
 
386
     * The number of times to spin (with randomly interspersed calls
 
387
     * to Thread.yield) on multiprocessor before blocking when a node
 
388
     * is apparently the first waiter in the queue.  See above for
 
389
     * explanation. Must be a power of two. The value is empirically
 
390
     * derived -- it works pretty well across a variety of processors,
 
391
     * numbers of CPUs, and OSes.
 
392
     */
 
393
    private static final int FRONT_SPINS   = 1 << 7;
 
394
 
 
395
    /**
 
396
     * The number of times to spin before blocking when a node is
 
397
     * preceded by another node that is apparently spinning.  Also
 
398
     * serves as an increment to FRONT_SPINS on phase changes, and as
 
399
     * base average frequency for yielding during spins. Must be a
 
400
     * power of two.
 
401
     */
 
402
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
 
403
 
 
404
    /**
 
405
     * The maximum number of estimated removal failures (sweepVotes)
 
406
     * to tolerate before sweeping through the queue unlinking
 
407
     * cancelled nodes that were not unlinked upon initial
 
408
     * removal. See above for explanation. The value must be at least
 
409
     * two to avoid useless sweeps when removing trailing nodes.
 
410
     */
 
411
    static final int SWEEP_THRESHOLD = 32;
 
412
 
 
413
    /**
 
414
     * Queue nodes. Uses Object, not E, for items to allow forgetting
 
415
     * them after use.  Relies heavily on Unsafe mechanics to minimize
 
416
     * unnecessary ordering constraints: Writes that are intrinsically
 
417
     * ordered wrt other accesses or CASes use simple relaxed forms.
 
418
     */
 
419
    static final class Node {
 
420
        final boolean isData;   // false if this is a request node
 
421
        volatile Object item;   // initially non-null if isData; CASed to match
 
422
        volatile Node next;
 
423
        volatile Thread waiter; // null until waiting
 
424
 
 
425
        // CAS methods for fields
 
426
        final boolean casNext(Node cmp, Node val) {
 
427
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 
428
        }
 
429
 
 
430
        final boolean casItem(Object cmp, Object val) {
 
431
            // assert cmp == null || cmp.getClass() != Node.class;
 
432
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 
433
        }
 
434
 
 
435
        /**
 
436
         * Constructs a new node.  Uses relaxed write because item can
 
437
         * only be seen after publication via casNext.
 
438
         */
 
439
        Node(Object item, boolean isData) {
 
440
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
112
441
            this.isData = isData;
113
442
        }
114
443
 
115
 
        static final AtomicReferenceFieldUpdater<QNode, QNode>
116
 
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
117
 
            (QNode.class, QNode.class, "next");
118
 
 
119
 
        final boolean casNext(QNode cmp, QNode val) {
120
 
            return nextUpdater.compareAndSet(this, cmp, val);
121
 
        }
122
 
 
123
 
        final void clearNext() {
124
 
            nextUpdater.lazySet(this, this);
125
 
        }
126
 
 
127
 
    }
128
 
 
129
 
    /**
130
 
     * Padded version of AtomicReference used for head, tail and
131
 
     * cleanMe, to alleviate contention across threads CASing one vs
132
 
     * the other.
133
 
     */
134
 
    static final class PaddedAtomicReference<T> extends AtomicReference<T> {
135
 
        // enough padding for 64bytes with 4byte refs
136
 
        Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
137
 
        PaddedAtomicReference(T r) { super(r); }
138
 
    }
139
 
 
140
 
 
141
 
    /** head of the queue */
142
 
    private transient final PaddedAtomicReference<QNode> head;
143
 
    /** tail of the queue */
144
 
    private transient final PaddedAtomicReference<QNode> tail;
145
 
 
146
 
    /**
147
 
     * Reference to a cancelled node that might not yet have been
148
 
     * unlinked from queue because it was the last inserted node
149
 
     * when it cancelled.
150
 
     */
151
 
    private transient final PaddedAtomicReference<QNode> cleanMe;
152
 
 
153
 
    /**
154
 
     * Tries to cas nh as new head; if successful, unlink
155
 
     * old head's next node to avoid garbage retention.
156
 
     */
157
 
    private boolean advanceHead(QNode h, QNode nh) {
158
 
        if (h == head.get() && head.compareAndSet(h, nh)) {
159
 
            h.clearNext(); // forget old next
160
 
            return true;
161
 
        }
162
 
        return false;
163
 
    }
164
 
 
165
 
    /**
166
 
     * Puts or takes an item. Used for most queue operations (except
167
 
     * poll() and tryTransfer()). See the similar code in
168
 
     * SynchronousQueue for detailed explanation.
 
444
        /**
 
445
         * Links node to itself to avoid garbage retention.  Called
 
446
         * only after CASing head field, so uses relaxed write.
 
447
         */
 
448
        final void forgetNext() {
 
449
            UNSAFE.putObject(this, nextOffset, this);
 
450
        }
 
451
 
 
452
        /**
 
453
         * Sets item to self and waiter to null, to avoid garbage
 
454
         * retention after matching or cancelling. Uses relaxed writes
 
455
         * because order is already constrained in the only calling
 
456
         * contexts: item is forgotten only after volatile/atomic
 
457
         * mechanics that extract items.  Similarly, clearing waiter
 
458
         * follows either CAS or return from park (if ever parked;
 
459
         * else we don't care).
 
460
         */
 
461
        final void forgetContents() {
 
462
            UNSAFE.putObject(this, itemOffset, this);
 
463
            UNSAFE.putObject(this, waiterOffset, null);
 
464
        }
 
465
 
 
466
        /**
 
467
         * Returns true if this node has been matched, including the
 
468
         * case of artificial matches due to cancellation.
 
469
         */
 
470
        final boolean isMatched() {
 
471
            Object x = item;
 
472
            return (x == this) || ((x == null) == isData);
 
473
        }
 
474
 
 
475
        /**
 
476
         * Returns true if this is an unmatched request node.
 
477
         */
 
478
        final boolean isUnmatchedRequest() {
 
479
            return !isData && item == null;
 
480
        }
 
481
 
 
482
        /**
 
483
         * Returns true if a node with the given mode cannot be
 
484
         * appended to this node because this node is unmatched and
 
485
         * has opposite data mode.
 
486
         */
 
487
        final boolean cannotPrecede(boolean haveData) {
 
488
            boolean d = isData;
 
489
            Object x;
 
490
            return d != haveData && (x = item) != this && (x != null) == d;
 
491
        }
 
492
 
 
493
        /**
 
494
         * Tries to artificially match a data node -- used by remove.
 
495
         */
 
496
        final boolean tryMatchData() {
 
497
            // assert isData;
 
498
            Object x = item;
 
499
            if (x != null && x != this && casItem(x, null)) {
 
500
                LockSupport.unpark(waiter);
 
501
                return true;
 
502
            }
 
503
            return false;
 
504
        }
 
505
 
 
506
        private static final long serialVersionUID = -3375979862319811754L;
 
507
 
 
508
        // Unsafe mechanics
 
509
        private static final sun.misc.Unsafe UNSAFE;
 
510
        private static final long itemOffset;
 
511
        private static final long nextOffset;
 
512
        private static final long waiterOffset;
 
513
        static {
 
514
            try {
 
515
                UNSAFE = getUnsafe();
 
516
                Class<?> k = Node.class;
 
517
                itemOffset = UNSAFE.objectFieldOffset
 
518
                    (k.getDeclaredField("item"));
 
519
                nextOffset = UNSAFE.objectFieldOffset
 
520
                    (k.getDeclaredField("next"));
 
521
                waiterOffset = UNSAFE.objectFieldOffset
 
522
                    (k.getDeclaredField("waiter"));
 
523
            } catch (Exception e) {
 
524
                throw new Error(e);
 
525
            }
 
526
        }
 
527
    }
 
528
 
 
529
    /** head of the queue; null until first enqueue */
 
530
    transient volatile Node head;
 
531
 
 
532
    /** tail of the queue; null until first append */
 
533
    private transient volatile Node tail;
 
534
 
 
535
    /** The number of apparent failures to unsplice removed nodes */
 
536
    private transient volatile int sweepVotes;
 
537
 
 
538
    // CAS methods for fields
 
539
    private boolean casTail(Node cmp, Node val) {
 
540
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
 
541
    }
 
542
 
 
543
    private boolean casHead(Node cmp, Node val) {
 
544
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
 
545
    }
 
546
 
 
547
    private boolean casSweepVotes(int cmp, int val) {
 
548
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
 
549
    }
 
550
 
 
551
    /*
 
552
     * Possible values for "how" argument in xfer method.
 
553
     */
 
554
    private static final int NOW   = 0; // for untimed poll, tryTransfer
 
555
    private static final int ASYNC = 1; // for offer, put, add
 
556
    private static final int SYNC  = 2; // for transfer, take
 
557
    private static final int TIMED = 3; // for timed poll, tryTransfer
 
558
 
 
559
    @SuppressWarnings("unchecked")
 
560
    static <E> E cast(Object item) {
 
561
        // assert item == null || item.getClass() != Node.class;
 
562
        return (E) item;
 
563
    }
 
564
 
 
565
    /**
 
566
     * Implements all queuing methods. See above for explanation.
169
567
     *
170
 
     * @param e the item or if null, signifies that this is a take
171
 
     * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
172
 
     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
173
 
     * @return an item, or null on failure
 
568
     * @param e the item or null for take
 
569
     * @param haveData true if this is a put, else a take
 
570
     * @param how NOW, ASYNC, SYNC, or TIMED
 
571
     * @param nanos timeout in nanosecs, used only if mode is TIMED
 
572
     * @return an item if matched, else e
 
573
     * @throws NullPointerException if haveData mode but e is null
174
574
     */
175
 
    private Object xfer(Object e, int mode, long nanos) {
176
 
        boolean isData = (e != null);
177
 
        QNode s = null;
178
 
        final PaddedAtomicReference<QNode> head = this.head;
179
 
        final PaddedAtomicReference<QNode> tail = this.tail;
180
 
 
181
 
        for (;;) {
182
 
            QNode t = tail.get();
183
 
            QNode h = head.get();
184
 
 
185
 
            if (t != null && (t == h || t.isData == isData)) {
 
575
    private E xfer(E e, boolean haveData, int how, long nanos) {
 
576
        if (haveData && (e == null))
 
577
            throw new NullPointerException();
 
578
        Node s = null;                        // the node to append, if needed
 
579
 
 
580
        retry:
 
581
        for (;;) {                            // restart on append race
 
582
 
 
583
            for (Node h = head, p = h; p != null;) { // find & match first node
 
584
                boolean isData = p.isData;
 
585
                Object item = p.item;
 
586
                if (item != p && (item != null) == isData) { // unmatched
 
587
                    if (isData == haveData)   // can't match
 
588
                        break;
 
589
                    if (p.casItem(item, e)) { // match
 
590
                        for (Node q = p; q != h;) {
 
591
                            Node n = q.next;  // update by 2 unless singleton
 
592
                            if (head == h && casHead(h, n == null ? q : n)) {
 
593
                                h.forgetNext();
 
594
                                break;
 
595
                            }                 // advance and retry
 
596
                            if ((h = head)   == null ||
 
597
                                (q = h.next) == null || !q.isMatched())
 
598
                                break;        // unless slack < 2
 
599
                        }
 
600
                        LockSupport.unpark(p.waiter);
 
601
                        return LinkedTransferQueue.<E>cast(item);
 
602
                    }
 
603
                }
 
604
                Node n = p.next;
 
605
                p = (p != n) ? n : (h = head); // Use head if p offlist
 
606
            }
 
607
 
 
608
            if (how != NOW) {                 // No matches available
186
609
                if (s == null)
187
 
                    s = new QNode(e, isData);
188
 
                QNode last = t.next;
189
 
                if (last != null) {
190
 
                    if (t == tail.get())
191
 
                        tail.compareAndSet(t, last);
192
 
                }
193
 
                else if (t.casNext(null, s)) {
194
 
                    tail.compareAndSet(t, s);
195
 
                    return awaitFulfill(t, s, e, mode, nanos);
196
 
                }
197
 
            }
198
 
 
199
 
            else if (h != null) {
200
 
                QNode first = h.next;
201
 
                if (t == tail.get() && first != null &&
202
 
                    advanceHead(h, first)) {
203
 
                    Object x = first.get();
204
 
                    if (x != first && first.compareAndSet(x, e)) {
205
 
                        LockSupport.unpark(first.waiter);
206
 
                        return isData? e : x;
207
 
                    }
208
 
                }
209
 
            }
 
610
                    s = new Node(e, haveData);
 
611
                Node pred = tryAppend(s, haveData);
 
612
                if (pred == null)
 
613
                    continue retry;           // lost race vs opposite mode
 
614
                if (how != ASYNC)
 
615
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
 
616
            }
 
617
            return e; // not waiting
210
618
        }
211
619
    }
212
620
 
213
 
 
214
621
    /**
215
 
     * Version of xfer for poll() and tryTransfer, which
216
 
     * simplifies control paths both here and in xfer.
 
622
     * Tries to append node s as tail.
 
623
     *
 
624
     * @param s the node to append
 
625
     * @param haveData true if appending in data mode
 
626
     * @return null on failure due to losing race with append in
 
627
     * different mode, else s's predecessor, or s itself if no
 
628
     * predecessor
217
629
     */
218
 
    private Object fulfill(Object e) {
219
 
        boolean isData = (e != null);
220
 
        final PaddedAtomicReference<QNode> head = this.head;
221
 
        final PaddedAtomicReference<QNode> tail = this.tail;
222
 
 
223
 
        for (;;) {
224
 
            QNode t = tail.get();
225
 
            QNode h = head.get();
226
 
 
227
 
            if (t != null && (t == h || t.isData == isData)) {
228
 
                QNode last = t.next;
229
 
                if (t == tail.get()) {
230
 
                    if (last != null)
231
 
                        tail.compareAndSet(t, last);
232
 
                    else
233
 
                        return null;
234
 
                }
 
630
    private Node tryAppend(Node s, boolean haveData) {
 
631
        for (Node t = tail, p = t;;) {        // move p to last node and append
 
632
            Node n, u;                        // temps for reads of next & tail
 
633
            if (p == null && (p = head) == null) {
 
634
                if (casHead(null, s))
 
635
                    return s;                 // initialize
235
636
            }
236
 
            else if (h != null) {
237
 
                QNode first = h.next;
238
 
                if (t == tail.get() &&
239
 
                    first != null &&
240
 
                    advanceHead(h, first)) {
241
 
                    Object x = first.get();
242
 
                    if (x != first && first.compareAndSet(x, e)) {
243
 
                        LockSupport.unpark(first.waiter);
244
 
                        return isData? e : x;
245
 
                    }
 
637
            else if (p.cannotPrecede(haveData))
 
638
                return null;                  // lost race vs opposite mode
 
639
            else if ((n = p.next) != null)    // not last; keep traversing
 
640
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
 
641
                    (p != n) ? n : null;      // restart if off list
 
642
            else if (!p.casNext(null, s))
 
643
                p = p.next;                   // re-read on CAS failure
 
644
            else {
 
645
                if (p != t) {                 // update if slack now >= 2
 
646
                    while ((tail != t || !casTail(t, s)) &&
 
647
                           (t = tail)   != null &&
 
648
                           (s = t.next) != null && // advance and retry
 
649
                           (s = s.next) != null && s != t);
246
650
                }
 
651
                return p;
247
652
            }
248
653
        }
249
654
    }
250
655
 
251
656
    /**
252
 
     * Spins/blocks until node s is fulfilled or caller gives up,
253
 
     * depending on wait mode.
 
657
     * Spins/yields/blocks until node s is matched or caller gives up.
254
658
     *
255
 
     * @param pred the predecessor of waiting node
256
659
     * @param s the waiting node
 
660
     * @param pred the predecessor of s, or s itself if it has no
 
661
     * predecessor, or null if unknown (the null case does not occur
 
662
     * in any current calls but may in possible future extensions)
257
663
     * @param e the comparison value for checking match
258
 
     * @param mode mode
259
 
     * @param nanos timeout value
260
 
     * @return matched item, or s if cancelled
 
664
     * @param timed if true, wait only until timeout elapses
 
665
     * @param nanos timeout in nanosecs, used only if timed is true
 
666
     * @return matched item, or e if unmatched on interrupt or timeout
261
667
     */
262
 
    private Object awaitFulfill(QNode pred, QNode s, Object e,
263
 
                                int mode, long nanos) {
264
 
        if (mode == NOWAIT)
265
 
            return null;
266
 
 
267
 
        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
 
668
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
 
669
        long lastTime = timed ? System.nanoTime() : 0L;
268
670
        Thread w = Thread.currentThread();
269
 
        int spins = -1; // set to desired spin count below
 
671
        int spins = -1; // initialized after first item and cancel checks
 
672
        ThreadLocalRandom randomYields = null; // bound if needed
 
673
 
270
674
        for (;;) {
271
 
            if (w.isInterrupted())
272
 
                s.compareAndSet(e, s);
273
 
            Object x = s.get();
274
 
            if (x != e) {                 // Node was matched or cancelled
275
 
                advanceHead(pred, s);     // unlink if head
276
 
                if (x == s) {             // was cancelled
277
 
                    clean(pred, s);
278
 
                    return null;
279
 
                }
280
 
                else if (x != null) {
281
 
                    s.set(s);             // avoid garbage retention
282
 
                    return x;
283
 
                }
284
 
                else
285
 
                    return e;
286
 
            }
287
 
            if (mode == TIMEOUT) {
 
675
            Object item = s.item;
 
676
            if (item != e) {                  // matched
 
677
                // assert item != s;
 
678
                s.forgetContents();           // avoid garbage
 
679
                return LinkedTransferQueue.<E>cast(item);
 
680
            }
 
681
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
 
682
                    s.casItem(e, s)) {        // cancel
 
683
                unsplice(pred, s);
 
684
                return e;
 
685
            }
 
686
 
 
687
            if (spins < 0) {                  // establish spins at/near front
 
688
                if ((spins = spinsFor(pred, s.isData)) > 0)
 
689
                    randomYields = ThreadLocalRandom.current();
 
690
            }
 
691
            else if (spins > 0) {             // spin
 
692
                --spins;
 
693
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
 
694
                    Thread.yield();           // occasionally yield
 
695
            }
 
696
            else if (s.waiter == null) {
 
697
                s.waiter = w;                 // request unpark then recheck
 
698
            }
 
699
            else if (timed) {
288
700
                long now = System.nanoTime();
289
 
                nanos -= now - lastTime;
 
701
                if ((nanos -= now - lastTime) > 0)
 
702
                    LockSupport.parkNanos(this, nanos);
290
703
                lastTime = now;
291
 
                if (nanos <= 0) {
292
 
                    s.compareAndSet(e, s); // try to cancel
 
704
            }
 
705
            else {
 
706
                LockSupport.park(this);
 
707
            }
 
708
        }
 
709
    }
 
710
 
 
711
    /**
 
712
     * Returns spin/yield value for a node with given predecessor and
 
713
     * data mode. See above for explanation.
 
714
     */
 
715
    private static int spinsFor(Node pred, boolean haveData) {
 
716
        if (MP && pred != null) {
 
717
            if (pred.isData != haveData)      // phase change
 
718
                return FRONT_SPINS + CHAINED_SPINS;
 
719
            if (pred.isMatched())             // probably at front
 
720
                return FRONT_SPINS;
 
721
            if (pred.waiter == null)          // pred apparently spinning
 
722
                return CHAINED_SPINS;
 
723
        }
 
724
        return 0;
 
725
    }
 
726
 
 
727
    /* -------------- Traversal methods -------------- */
 
728
 
 
729
    /**
 
730
     * Returns the successor of p, or the head node if p.next has been
 
731
     * linked to self, which will only be true if traversing with a
 
732
     * stale pointer that is now off the list.
 
733
     */
 
734
    final Node succ(Node p) {
 
735
        Node next = p.next;
 
736
        return (p == next) ? head : next;
 
737
    }
 
738
 
 
739
    /**
 
740
     * Returns the first unmatched node of the given mode, or null if
 
741
     * none.  Used by methods isEmpty, hasWaitingConsumer.
 
742
     */
 
743
    private Node firstOfMode(boolean isData) {
 
744
        for (Node p = head; p != null; p = succ(p)) {
 
745
            if (!p.isMatched())
 
746
                return (p.isData == isData) ? p : null;
 
747
        }
 
748
        return null;
 
749
    }
 
750
 
 
751
    /**
 
752
     * Returns the item in the first unmatched node with isData; or
 
753
     * null if none.  Used by peek.
 
754
     */
 
755
    private E firstDataItem() {
 
756
        for (Node p = head; p != null; p = succ(p)) {
 
757
            Object item = p.item;
 
758
            if (p.isData) {
 
759
                if (item != null && item != p)
 
760
                    return LinkedTransferQueue.<E>cast(item);
 
761
            }
 
762
            else if (item == null)
 
763
                return null;
 
764
        }
 
765
        return null;
 
766
    }
 
767
 
 
768
    /**
 
769
     * Traverses and counts unmatched nodes of the given mode.
 
770
     * Used by methods size and getWaitingConsumerCount.
 
771
     */
 
772
    private int countOfMode(boolean data) {
 
773
        int count = 0;
 
774
        for (Node p = head; p != null; ) {
 
775
            if (!p.isMatched()) {
 
776
                if (p.isData != data)
 
777
                    return 0;
 
778
                if (++count == Integer.MAX_VALUE) // saturated
 
779
                    break;
 
780
            }
 
781
            Node n = p.next;
 
782
            if (n != p)
 
783
                p = n;
 
784
            else {
 
785
                count = 0;
 
786
                p = head;
 
787
            }
 
788
        }
 
789
        return count;
 
790
    }
 
791
 
 
792
    final class Itr implements Iterator<E> {
 
793
        private Node nextNode;   // next node to return item for
 
794
        private E nextItem;      // the corresponding item
 
795
        private Node lastRet;    // last returned node, to support remove
 
796
        private Node lastPred;   // predecessor to unlink lastRet
 
797
 
 
798
        /**
 
799
         * Moves to next node after prev, or first node if prev null.
 
800
         */
 
801
        private void advance(Node prev) {
 
802
            /*
 
803
             * To track and avoid buildup of deleted nodes in the face
 
804
             * of calls to both Queue.remove and Itr.remove, we must
 
805
             * include variants of unsplice and sweep upon each
 
806
             * advance: Upon Itr.remove, we may need to catch up links
 
807
             * from lastPred, and upon other removes, we might need to
 
808
             * skip ahead from stale nodes and unsplice deleted ones
 
809
             * found while advancing.
 
810
             */
 
811
 
 
812
            Node r, b; // reset lastPred upon possible deletion of lastRet
 
813
            if ((r = lastRet) != null && !r.isMatched())
 
814
                lastPred = r;    // next lastPred is old lastRet
 
815
            else if ((b = lastPred) == null || b.isMatched())
 
816
                lastPred = null; // at start of list
 
817
            else {
 
818
                Node s, n;       // help with removal of lastPred.next
 
819
                while ((s = b.next) != null &&
 
820
                       s != b && s.isMatched() &&
 
821
                       (n = s.next) != null && n != s)
 
822
                    b.casNext(s, n);
 
823
            }
 
824
 
 
825
            this.lastRet = prev;
 
826
 
 
827
            for (Node p = prev, s, n;;) {
 
828
                s = (p == null) ? head : p.next;
 
829
                if (s == null)
 
830
                    break;
 
831
                else if (s == p) {
 
832
                    p = null;
293
833
                    continue;
294
834
                }
295
 
            }
296
 
            if (spins < 0) {
297
 
                QNode h = head.get(); // only spin if at head
298
 
                spins = ((h != null && h.next == s) ?
299
 
                         (mode == TIMEOUT?
300
 
                          maxTimedSpins : maxUntimedSpins) : 0);
301
 
            }
302
 
            if (spins > 0)
303
 
                --spins;
304
 
            else if (s.waiter == null)
305
 
                s.waiter = w;
306
 
            else if (mode != TIMEOUT) {
307
 
                LockSupport.park(this);
308
 
                s.waiter = null;
309
 
                spins = -1;
310
 
            }
311
 
            else if (nanos > spinForTimeoutThreshold) {
312
 
                LockSupport.parkNanos(this, nanos);
313
 
                s.waiter = null;
314
 
                spins = -1;
315
 
            }
316
 
        }
317
 
    }
318
 
 
319
 
    /**
320
 
     * Returns validated tail for use in cleaning methods.
321
 
     */
322
 
    private QNode getValidatedTail() {
323
 
        for (;;) {
324
 
            QNode h = head.get();
325
 
            QNode first = h.next;
326
 
            if (first != null && first.next == first) { // help advance
327
 
                advanceHead(h, first);
328
 
                continue;
329
 
            }
330
 
            QNode t = tail.get();
331
 
            QNode last = t.next;
332
 
            if (t == tail.get()) {
333
 
                if (last != null)
334
 
                    tail.compareAndSet(t, last); // help advance
 
835
                Object item = s.item;
 
836
                if (s.isData) {
 
837
                    if (item != null && item != s) {
 
838
                        nextItem = LinkedTransferQueue.<E>cast(item);
 
839
                        nextNode = s;
 
840
                        return;
 
841
                    }
 
842
                }
 
843
                else if (item == null)
 
844
                    break;
 
845
                // assert s.isMatched();
 
846
                if (p == null)
 
847
                    p = s;
 
848
                else if ((n = s.next) == null)
 
849
                    break;
 
850
                else if (s == n)
 
851
                    p = null;
335
852
                else
336
 
                    return t;
337
 
            }
338
 
        }
339
 
    }
340
 
 
341
 
    /**
342
 
     * Gets rid of cancelled node s with original predecessor pred.
343
 
     *
344
 
     * @param pred predecessor of cancelled node
345
 
     * @param s the cancelled node
346
 
     */
347
 
    private void clean(QNode pred, QNode s) {
348
 
        Thread w = s.waiter;
349
 
        if (w != null) {             // Wake up thread
350
 
            s.waiter = null;
351
 
            if (w != Thread.currentThread())
352
 
                LockSupport.unpark(w);
353
 
        }
354
 
 
355
 
        if (pred == null)
356
 
            return;
357
 
 
358
 
        /*
359
 
         * At any given time, exactly one node on list cannot be
360
 
         * deleted -- the last inserted node. To accommodate this, if
361
 
         * we cannot delete s, we save its predecessor as "cleanMe",
362
 
         * processing the previously saved version first. At least one
363
 
         * of node s or the node previously saved can always be
364
 
         * processed, so this always terminates.
365
 
         */
366
 
        while (pred.next == s) {
367
 
            QNode oldpred = reclean();  // First, help get rid of cleanMe
368
 
            QNode t = getValidatedTail();
369
 
            if (s != t) {               // If not tail, try to unsplice
370
 
                QNode sn = s.next;      // s.next == s means s already off list
371
 
                if (sn == s || pred.casNext(s, sn))
372
 
                    break;
373
 
            }
374
 
            else if (oldpred == pred || // Already saved
375
 
                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
376
 
                break;                  // Postpone cleaning
377
 
        }
378
 
    }
379
 
 
380
 
    /**
381
 
     * Tries to unsplice the cancelled node held in cleanMe that was
382
 
     * previously uncleanable because it was at tail.
383
 
     *
384
 
     * @return current cleanMe node (or null)
385
 
     */
386
 
    private QNode reclean() {
387
 
        /*
388
 
         * cleanMe is, or at one time was, predecessor of cancelled
389
 
         * node s that was the tail so could not be unspliced.  If s
390
 
         * is no longer the tail, try to unsplice if necessary and
391
 
         * make cleanMe slot available.  This differs from similar
392
 
         * code in clean() because we must check that pred still
393
 
         * points to a cancelled node that must be unspliced -- if
394
 
         * not, we can (must) clear cleanMe without unsplicing.
395
 
         * This can loop only due to contention on casNext or
396
 
         * clearing cleanMe.
397
 
         */
398
 
        QNode pred;
399
 
        while ((pred = cleanMe.get()) != null) {
400
 
            QNode t = getValidatedTail();
401
 
            QNode s = pred.next;
402
 
            if (s != t) {
403
 
                QNode sn;
404
 
                if (s == null || s == pred || s.get() != s ||
405
 
                    (sn = s.next) == s || pred.casNext(s, sn))
406
 
                    cleanMe.compareAndSet(pred, null);
407
 
            }
408
 
            else // s is still tail; cannot clean
 
853
                    p.casNext(s, n);
 
854
            }
 
855
            nextNode = null;
 
856
            nextItem = null;
 
857
        }
 
858
 
 
859
        Itr() {
 
860
            advance(null);
 
861
        }
 
862
 
 
863
        public final boolean hasNext() {
 
864
            return nextNode != null;
 
865
        }
 
866
 
 
867
        public final E next() {
 
868
            Node p = nextNode;
 
869
            if (p == null) throw new NoSuchElementException();
 
870
            E e = nextItem;
 
871
            advance(p);
 
872
            return e;
 
873
        }
 
874
 
 
875
        public final void remove() {
 
876
            final Node lastRet = this.lastRet;
 
877
            if (lastRet == null)
 
878
                throw new IllegalStateException();
 
879
            this.lastRet = null;
 
880
            if (lastRet.tryMatchData())
 
881
                unsplice(lastPred, lastRet);
 
882
        }
 
883
    }
 
884
 
 
885
    /* -------------- Removal methods -------------- */
 
886
 
 
887
    /**
 
888
     * Unsplices (now or later) the given deleted/cancelled node with
 
889
     * the given predecessor.
 
890
     *
 
891
     * @param pred a node that was at one time known to be the
 
892
     * predecessor of s, or null or s itself if s is/was at head
 
893
     * @param s the node to be unspliced
 
894
     */
 
895
    final void unsplice(Node pred, Node s) {
 
896
        s.forgetContents(); // forget unneeded fields
 
897
        /*
 
898
         * See above for rationale. Briefly: if pred still points to
 
899
         * s, try to unlink s.  If s cannot be unlinked, because it is
 
900
         * trailing node or pred might be unlinked, and neither pred
 
901
         * nor s are head or offlist, add to sweepVotes, and if enough
 
902
         * votes have accumulated, sweep.
 
903
         */
 
904
        if (pred != null && pred != s && pred.next == s) {
 
905
            Node n = s.next;
 
906
            if (n == null ||
 
907
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
 
908
                for (;;) {               // check if at, or could be, head
 
909
                    Node h = head;
 
910
                    if (h == pred || h == s || h == null)
 
911
                        return;          // at head or list empty
 
912
                    if (!h.isMatched())
 
913
                        break;
 
914
                    Node hn = h.next;
 
915
                    if (hn == null)
 
916
                        return;          // now empty
 
917
                    if (hn != h && casHead(h, hn))
 
918
                        h.forgetNext();  // advance head
 
919
                }
 
920
                if (pred.next != pred && s.next != s) { // recheck if offlist
 
921
                    for (;;) {           // sweep now if enough votes
 
922
                        int v = sweepVotes;
 
923
                        if (v < SWEEP_THRESHOLD) {
 
924
                            if (casSweepVotes(v, v + 1))
 
925
                                break;
 
926
                        }
 
927
                        else if (casSweepVotes(v, 0)) {
 
928
                            sweep();
 
929
                            break;
 
930
                        }
 
931
                    }
 
932
                }
 
933
            }
 
934
        }
 
935
    }
 
936
 
 
937
    /**
 
938
     * Unlinks matched (typically cancelled) nodes encountered in a
 
939
     * traversal from head.
 
940
     */
 
941
    private void sweep() {
 
942
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
 
943
            if (!s.isMatched())
 
944
                // Unmatched nodes are never self-linked
 
945
                p = s;
 
946
            else if ((n = s.next) == null) // trailing node is pinned
409
947
                break;
410
 
        }
411
 
        return pred;
412
 
    }
 
948
            else if (s == n)    // stale
 
949
                // No need to also check for p == s, since that implies s == n
 
950
                p = head;
 
951
            else
 
952
                p.casNext(s, n);
 
953
        }
 
954
    }
 
955
 
 
956
    /**
 
957
     * Main implementation of remove(Object)
 
958
     */
 
959
    private boolean findAndRemove(Object e) {
 
960
        if (e != null) {
 
961
            for (Node pred = null, p = head; p != null; ) {
 
962
                Object item = p.item;
 
963
                if (p.isData) {
 
964
                    if (item != null && item != p && e.equals(item) &&
 
965
                        p.tryMatchData()) {
 
966
                        unsplice(pred, p);
 
967
                        return true;
 
968
                    }
 
969
                }
 
970
                else if (item == null)
 
971
                    break;
 
972
                pred = p;
 
973
                if ((p = p.next) == pred) { // stale
 
974
                    pred = null;
 
975
                    p = head;
 
976
                }
 
977
            }
 
978
        }
 
979
        return false;
 
980
    }
 
981
 
413
982
 
414
983
    /**
415
984
     * Creates an initially empty {@code LinkedTransferQueue}.
416
985
     */
417
986
    public LinkedTransferQueue() {
418
 
        QNode dummy = new QNode(null, false);
419
 
        head = new PaddedAtomicReference<QNode>(dummy);
420
 
        tail = new PaddedAtomicReference<QNode>(dummy);
421
 
        cleanMe = new PaddedAtomicReference<QNode>(null);
422
987
    }
423
988
 
424
989
    /**
435
1000
        addAll(c);
436
1001
    }
437
1002
 
438
 
    public void put(E e) throws InterruptedException {
439
 
        if (e == null) throw new NullPointerException();
440
 
        if (Thread.interrupted()) throw new InterruptedException();
441
 
        xfer(e, NOWAIT, 0);
 
1003
    /**
 
1004
     * Inserts the specified element at the tail of this queue.
 
1005
     * As the queue is unbounded, this method will never block.
 
1006
     *
 
1007
     * @throws NullPointerException if the specified element is null
 
1008
     */
 
1009
    public void put(E e) {
 
1010
        xfer(e, true, ASYNC, 0);
442
1011
    }
443
1012
 
444
 
    public boolean offer(E e, long timeout, TimeUnit unit)
445
 
        throws InterruptedException {
446
 
        if (e == null) throw new NullPointerException();
447
 
        if (Thread.interrupted()) throw new InterruptedException();
448
 
        xfer(e, NOWAIT, 0);
 
1013
    /**
 
1014
     * Inserts the specified element at the tail of this queue.
 
1015
     * As the queue is unbounded, this method will never block or
 
1016
     * return {@code false}.
 
1017
     *
 
1018
     * @return {@code true} (as specified by
 
1019
     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
 
1020
     *  BlockingQueue.offer})
 
1021
     * @throws NullPointerException if the specified element is null
 
1022
     */
 
1023
    public boolean offer(E e, long timeout, TimeUnit unit) {
 
1024
        xfer(e, true, ASYNC, 0);
449
1025
        return true;
450
1026
    }
451
1027
 
 
1028
    /**
 
1029
     * Inserts the specified element at the tail of this queue.
 
1030
     * As the queue is unbounded, this method will never return {@code false}.
 
1031
     *
 
1032
     * @return {@code true} (as specified by {@link Queue#offer})
 
1033
     * @throws NullPointerException if the specified element is null
 
1034
     */
452
1035
    public boolean offer(E e) {
453
 
        if (e == null) throw new NullPointerException();
454
 
        xfer(e, NOWAIT, 0);
 
1036
        xfer(e, true, ASYNC, 0);
455
1037
        return true;
456
1038
    }
457
1039
 
 
1040
    /**
 
1041
     * Inserts the specified element at the tail of this queue.
 
1042
     * As the queue is unbounded, this method will never throw
 
1043
     * {@link IllegalStateException} or return {@code false}.
 
1044
     *
 
1045
     * @return {@code true} (as specified by {@link Collection#add})
 
1046
     * @throws NullPointerException if the specified element is null
 
1047
     */
458
1048
    public boolean add(E e) {
459
 
        if (e == null) throw new NullPointerException();
460
 
        xfer(e, NOWAIT, 0);
 
1049
        xfer(e, true, ASYNC, 0);
461
1050
        return true;
462
1051
    }
463
1052
 
 
1053
    /**
 
1054
     * Transfers the element to a waiting consumer immediately, if possible.
 
1055
     *
 
1056
     * <p>More precisely, transfers the specified element immediately
 
1057
     * if there exists a consumer already waiting to receive it (in
 
1058
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
 
1059
     * otherwise returning {@code false} without enqueuing the element.
 
1060
     *
 
1061
     * @throws NullPointerException if the specified element is null
 
1062
     */
 
1063
    public boolean tryTransfer(E e) {
 
1064
        return xfer(e, true, NOW, 0) == null;
 
1065
    }
 
1066
 
 
1067
    /**
 
1068
     * Transfers the element to a consumer, waiting if necessary to do so.
 
1069
     *
 
1070
     * <p>More precisely, transfers the specified element immediately
 
1071
     * if there exists a consumer already waiting to receive it (in
 
1072
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
 
1073
     * else inserts the specified element at the tail of this queue
 
1074
     * and waits until the element is received by a consumer.
 
1075
     *
 
1076
     * @throws NullPointerException if the specified element is null
 
1077
     */
464
1078
    public void transfer(E e) throws InterruptedException {
465
 
        if (e == null) throw new NullPointerException();
466
 
        if (xfer(e, WAIT, 0) == null) {
467
 
            Thread.interrupted();
 
1079
        if (xfer(e, true, SYNC, 0) != null) {
 
1080
            Thread.interrupted(); // failure possible only due to interrupt
468
1081
            throw new InterruptedException();
469
1082
        }
470
1083
    }
471
1084
 
 
1085
    /**
 
1086
     * Transfers the element to a consumer if it is possible to do so
 
1087
     * before the timeout elapses.
 
1088
     *
 
1089
     * <p>More precisely, transfers the specified element immediately
 
1090
     * if there exists a consumer already waiting to receive it (in
 
1091
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
 
1092
     * else inserts the specified element at the tail of this queue
 
1093
     * and waits until the element is received by a consumer,
 
1094
     * returning {@code false} if the specified wait time elapses
 
1095
     * before the element can be transferred.
 
1096
     *
 
1097
     * @throws NullPointerException if the specified element is null
 
1098
     */
472
1099
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
473
1100
        throws InterruptedException {
474
 
        if (e == null) throw new NullPointerException();
475
 
        if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
 
1101
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
476
1102
            return true;
477
1103
        if (!Thread.interrupted())
478
1104
            return false;
479
1105
        throw new InterruptedException();
480
1106
    }
481
1107
 
482
 
    public boolean tryTransfer(E e) {
483
 
        if (e == null) throw new NullPointerException();
484
 
        return fulfill(e) != null;
485
 
    }
486
 
 
487
1108
    public E take() throws InterruptedException {
488
 
        Object e = xfer(null, WAIT, 0);
 
1109
        E e = xfer(null, false, SYNC, 0);
489
1110
        if (e != null)
490
 
            return (E)e;
 
1111
            return e;
491
1112
        Thread.interrupted();
492
1113
        throw new InterruptedException();
493
1114
    }
494
1115
 
495
1116
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
496
 
        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
 
1117
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
497
1118
        if (e != null || !Thread.interrupted())
498
 
            return (E)e;
 
1119
            return e;
499
1120
        throw new InterruptedException();
500
1121
    }
501
1122
 
502
1123
    public E poll() {
503
 
        return (E)fulfill(null);
 
1124
        return xfer(null, false, NOW, 0);
504
1125
    }
505
1126
 
 
1127
    /**
 
1128
     * @throws NullPointerException     {@inheritDoc}
 
1129
     * @throws IllegalArgumentException {@inheritDoc}
 
1130
     */
506
1131
    public int drainTo(Collection<? super E> c) {
507
1132
        if (c == null)
508
1133
            throw new NullPointerException();
509
1134
        if (c == this)
510
1135
            throw new IllegalArgumentException();
511
1136
        int n = 0;
512
 
        E e;
513
 
        while ( (e = poll()) != null) {
 
1137
        for (E e; (e = poll()) != null;) {
514
1138
            c.add(e);
515
1139
            ++n;
516
1140
        }
517
1141
        return n;
518
1142
    }
519
1143
 
 
1144
    /**
 
1145
     * @throws NullPointerException     {@inheritDoc}
 
1146
     * @throws IllegalArgumentException {@inheritDoc}
 
1147
     */
520
1148
    public int drainTo(Collection<? super E> c, int maxElements) {
521
1149
        if (c == null)
522
1150
            throw new NullPointerException();
523
1151
        if (c == this)
524
1152
            throw new IllegalArgumentException();
525
1153
        int n = 0;
526
 
        E e;
527
 
        while (n < maxElements && (e = poll()) != null) {
 
1154
        for (E e; n < maxElements && (e = poll()) != null;) {
528
1155
            c.add(e);
529
1156
            ++n;
530
1157
        }
531
1158
        return n;
532
1159
    }
533
1160
 
534
 
    // Traversal-based methods
535
 
 
536
1161
    /**
537
 
     * Returns head after performing any outstanding helping steps.
 
1162
     * Returns an iterator over the elements in this queue in proper sequence.
 
1163
     * The elements will be returned in order from first (head) to last (tail).
 
1164
     *
 
1165
     * <p>The returned iterator is a "weakly consistent" iterator that
 
1166
     * will never throw {@link java.util.ConcurrentModificationException
 
1167
     * ConcurrentModificationException}, and guarantees to traverse
 
1168
     * elements as they existed upon construction of the iterator, and
 
1169
     * may (but is not guaranteed to) reflect any modifications
 
1170
     * subsequent to construction.
 
1171
     *
 
1172
     * @return an iterator over the elements in this queue in proper sequence
538
1173
     */
539
 
    private QNode traversalHead() {
540
 
        for (;;) {
541
 
            QNode t = tail.get();
542
 
            QNode h = head.get();
543
 
            if (h != null && t != null) {
544
 
                QNode last = t.next;
545
 
                QNode first = h.next;
546
 
                if (t == tail.get()) {
547
 
                    if (last != null)
548
 
                        tail.compareAndSet(t, last);
549
 
                    else if (first != null) {
550
 
                        Object x = first.get();
551
 
                        if (x == first)
552
 
                            advanceHead(h, first);
553
 
                        else
554
 
                            return h;
555
 
                    }
556
 
                    else
557
 
                        return h;
558
 
                }
559
 
            }
560
 
            reclean();
561
 
        }
562
 
    }
563
 
 
564
 
 
565
1174
    public Iterator<E> iterator() {
566
1175
        return new Itr();
567
1176
    }
568
1177
 
569
 
    /**
570
 
     * Iterators. Basic strategy is to traverse list, treating
571
 
     * non-data (i.e., request) nodes as terminating list.
572
 
     * Once a valid data node is found, the item is cached
573
 
     * so that the next call to next() will return it even
574
 
     * if subsequently removed.
575
 
     */
576
 
    class Itr implements Iterator<E> {
577
 
        QNode next;        // node to return next
578
 
        QNode pnext;       // predecessor of next
579
 
        QNode snext;       // successor of next
580
 
        QNode curr;        // last returned node, for remove()
581
 
        QNode pcurr;       // predecessor of curr, for remove()
582
 
        E nextItem;        // Cache of next item, once commited to in next
583
 
 
584
 
        Itr() {
585
 
            findNext();
586
 
        }
587
 
 
588
 
        /**
589
 
         * Ensures next points to next valid node, or null if none.
590
 
         */
591
 
        void findNext() {
592
 
            for (;;) {
593
 
                QNode pred = pnext;
594
 
                QNode q = next;
595
 
                if (pred == null || pred == q) {
596
 
                    pred = traversalHead();
597
 
                    q = pred.next;
598
 
                }
599
 
                if (q == null || !q.isData) {
600
 
                    next = null;
601
 
                    return;
602
 
                }
603
 
                Object x = q.get();
604
 
                QNode s = q.next;
605
 
                if (x != null && q != x && q != s) {
606
 
                    nextItem = (E)x;
607
 
                    snext = s;
608
 
                    pnext = pred;
609
 
                    next = q;
610
 
                    return;
611
 
                }
612
 
                pnext = q;
613
 
                next = s;
614
 
            }
615
 
        }
616
 
 
617
 
        public boolean hasNext() {
618
 
            return next != null;
619
 
        }
620
 
 
621
 
        public E next() {
622
 
            if (next == null) throw new NoSuchElementException();
623
 
            pcurr = pnext;
624
 
            curr = next;
625
 
            pnext = next;
626
 
            next = snext;
627
 
            E x = nextItem;
628
 
            findNext();
629
 
            return x;
630
 
        }
631
 
 
632
 
        public void remove() {
633
 
            QNode p = curr;
634
 
            if (p == null)
635
 
                throw new IllegalStateException();
636
 
            Object x = p.get();
637
 
            if (x != null && x != p && p.compareAndSet(x, p))
638
 
                clean(pcurr, p);
639
 
        }
640
 
    }
641
 
 
642
1178
    public E peek() {
643
 
        for (;;) {
644
 
            QNode h = traversalHead();
645
 
            QNode p = h.next;
646
 
            if (p == null)
647
 
                return null;
648
 
            Object x = p.get();
649
 
            if (p != x) {
650
 
                if (!p.isData)
651
 
                    return null;
652
 
                if (x != null)
653
 
                    return (E)x;
654
 
            }
655
 
        }
 
1179
        return firstDataItem();
656
1180
    }
657
1181
 
 
1182
    /**
 
1183
     * Returns {@code true} if this queue contains no elements.
 
1184
     *
 
1185
     * @return {@code true} if this queue contains no elements
 
1186
     */
658
1187
    public boolean isEmpty() {
659
 
        for (;;) {
660
 
            QNode h = traversalHead();
661
 
            QNode p = h.next;
662
 
            if (p == null)
663
 
                return true;
664
 
            Object x = p.get();
665
 
            if (p != x) {
666
 
                if (!p.isData)
667
 
                    return true;
668
 
                if (x != null)
669
 
                    return false;
670
 
            }
 
1188
        for (Node p = head; p != null; p = succ(p)) {
 
1189
            if (!p.isMatched())
 
1190
                return !p.isData;
671
1191
        }
 
1192
        return true;
672
1193
    }
673
1194
 
674
1195
    public boolean hasWaitingConsumer() {
675
 
        for (;;) {
676
 
            QNode h = traversalHead();
677
 
            QNode p = h.next;
678
 
            if (p == null)
679
 
                return false;
680
 
            Object x = p.get();
681
 
            if (p != x)
682
 
                return !p.isData;
683
 
        }
 
1196
        return firstOfMode(false) != null;
684
1197
    }
685
1198
 
686
1199
    /**
696
1209
     * @return the number of elements in this queue
697
1210
     */
698
1211
    public int size() {
699
 
        int count = 0;
700
 
        QNode h = traversalHead();
701
 
        for (QNode p = h.next; p != null && p.isData; p = p.next) {
702
 
            Object x = p.get();
703
 
            if (x != null && x != p) {
704
 
                if (++count == Integer.MAX_VALUE) // saturated
705
 
                    break;
706
 
            }
707
 
        }
708
 
        return count;
 
1212
        return countOfMode(true);
709
1213
    }
710
1214
 
711
1215
    public int getWaitingConsumerCount() {
712
 
        int count = 0;
713
 
        QNode h = traversalHead();
714
 
        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
715
 
            if (p.get() == null) {
716
 
                if (++count == Integer.MAX_VALUE)
717
 
                    break;
 
1216
        return countOfMode(false);
 
1217
    }
 
1218
 
 
1219
    /**
 
1220
     * Removes a single instance of the specified element from this queue,
 
1221
     * if it is present.  More formally, removes an element {@code e} such
 
1222
     * that {@code o.equals(e)}, if this queue contains one or more such
 
1223
     * elements.
 
1224
     * Returns {@code true} if this queue contained the specified element
 
1225
     * (or equivalently, if this queue changed as a result of the call).
 
1226
     *
 
1227
     * @param o element to be removed from this queue, if present
 
1228
     * @return {@code true} if this queue changed as a result of the call
 
1229
     */
 
1230
    public boolean remove(Object o) {
 
1231
        return findAndRemove(o);
 
1232
    }
 
1233
 
 
1234
    /**
 
1235
     * Returns {@code true} if this queue contains the specified element.
 
1236
     * More formally, returns {@code true} if and only if this queue contains
 
1237
     * at least one element {@code e} such that {@code o.equals(e)}.
 
1238
     *
 
1239
     * @param o object to be checked for containment in this queue
 
1240
     * @return {@code true} if this queue contains the specified element
 
1241
     */
 
1242
    public boolean contains(Object o) {
 
1243
        if (o == null) return false;
 
1244
        for (Node p = head; p != null; p = succ(p)) {
 
1245
            Object item = p.item;
 
1246
            if (p.isData) {
 
1247
                if (item != null && item != p && o.equals(item))
 
1248
                    return true;
718
1249
            }
 
1250
            else if (item == null)
 
1251
                break;
719
1252
        }
720
 
        return count;
 
1253
        return false;
721
1254
    }
722
1255
 
 
1256
    /**
 
1257
     * Always returns {@code Integer.MAX_VALUE} because a
 
1258
     * {@code LinkedTransferQueue} is not capacity constrained.
 
1259
     *
 
1260
     * @return {@code Integer.MAX_VALUE} (as specified by
 
1261
     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
 
1262
     *         BlockingQueue.remainingCapacity})
 
1263
     */
723
1264
    public int remainingCapacity() {
724
1265
        return Integer.MAX_VALUE;
725
1266
    }
726
1267
 
727
 
    public boolean remove(Object o) {
728
 
        if (o == null)
729
 
            return false;
730
 
        for (;;) {
731
 
            QNode pred = traversalHead();
732
 
            for (;;) {
733
 
                QNode q = pred.next;
734
 
                if (q == null || !q.isData)
735
 
                    return false;
736
 
                if (q == pred) // restart
737
 
                    break;
738
 
                Object x = q.get();
739
 
                if (x != null && x != q && o.equals(x) &&
740
 
                    q.compareAndSet(x, q)) {
741
 
                    clean(pred, q);
742
 
                    return true;
743
 
                }
744
 
                pred = q;
745
 
            }
746
 
        }
747
 
    }
748
 
 
749
1268
    /**
750
 
     * Save the state to a stream (that is, serialize it).
 
1269
     * Saves the state to a stream (that is, serializes it).
751
1270
     *
752
1271
     * @serialData All of the elements (each an {@code E}) in
753
1272
     * the proper order, followed by a null
763
1282
    }
764
1283
 
765
1284
    /**
766
 
     * Reconstitute the Queue instance from a stream (that is,
767
 
     * deserialize it).
 
1285
     * Reconstitutes the Queue instance from a stream (that is,
 
1286
     * deserializes it).
 
1287
     *
768
1288
     * @param s the stream
769
1289
     */
770
1290
    private void readObject(java.io.ObjectInputStream s)
771
1291
        throws java.io.IOException, ClassNotFoundException {
772
1292
        s.defaultReadObject();
773
 
        resetHeadAndTail();
774
1293
        for (;;) {
775
 
            E item = (E)s.readObject();
 
1294
            @SuppressWarnings("unchecked")
 
1295
            E item = (E) s.readObject();
776
1296
            if (item == null)
777
1297
                break;
778
1298
            else
780
1300
        }
781
1301
    }
782
1302
 
783
 
 
784
 
    // Support for resetting head/tail while deserializing
785
 
    private void resetHeadAndTail() {
786
 
        QNode dummy = new QNode(null, false);
787
 
        _unsafe.putObjectVolatile(this, headOffset,
788
 
                                  new PaddedAtomicReference<QNode>(dummy));
789
 
        _unsafe.putObjectVolatile(this, tailOffset,
790
 
                                  new PaddedAtomicReference<QNode>(dummy));
791
 
        _unsafe.putObjectVolatile(this, cleanMeOffset,
792
 
                                  new PaddedAtomicReference<QNode>(null));
793
 
    }
794
 
 
795
 
    // Temporary Unsafe mechanics for preliminary release
796
 
    private static Unsafe getUnsafe() throws Throwable {
797
 
        try {
798
 
            return Unsafe.getUnsafe();
799
 
        } catch (SecurityException se) {
800
 
            try {
801
 
                return java.security.AccessController.doPrivileged
802
 
                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
803
 
                        public Unsafe run() throws Exception {
804
 
                            return getUnsafePrivileged();
805
 
                        }});
806
 
            } catch (java.security.PrivilegedActionException e) {
807
 
                throw e.getCause();
808
 
            }
809
 
        }
810
 
    }
811
 
 
812
 
    private static Unsafe getUnsafePrivileged()
813
 
            throws NoSuchFieldException, IllegalAccessException {
814
 
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
815
 
        f.setAccessible(true);
816
 
        return (Unsafe) f.get(null);
817
 
    }
818
 
 
819
 
    private static long fieldOffset(String fieldName)
820
 
            throws NoSuchFieldException {
821
 
        return _unsafe.objectFieldOffset
822
 
            (LinkedTransferQueue.class.getDeclaredField(fieldName));
823
 
    }
824
 
 
825
 
    private static final Unsafe _unsafe;
 
1303
    // Unsafe mechanics
 
1304
 
 
1305
    private static final sun.misc.Unsafe UNSAFE;
826
1306
    private static final long headOffset;
827
1307
    private static final long tailOffset;
828
 
    private static final long cleanMeOffset;
 
1308
    private static final long sweepVotesOffset;
829
1309
    static {
830
1310
        try {
831
 
            _unsafe = getUnsafe();
832
 
            headOffset = fieldOffset("head");
833
 
            tailOffset = fieldOffset("tail");
834
 
            cleanMeOffset = fieldOffset("cleanMe");
835
 
        } catch (Throwable e) {
836
 
            throw new RuntimeException("Could not initialize intrinsics", e);
 
1311
            UNSAFE = getUnsafe();
 
1312
            Class<?> k = LinkedTransferQueue.class;
 
1313
            headOffset = UNSAFE.objectFieldOffset
 
1314
                (k.getDeclaredField("head"));
 
1315
            tailOffset = UNSAFE.objectFieldOffset
 
1316
                (k.getDeclaredField("tail"));
 
1317
            sweepVotesOffset = UNSAFE.objectFieldOffset
 
1318
                (k.getDeclaredField("sweepVotes"));
 
1319
        } catch (Exception e) {
 
1320
            throw new Error(e);
837
1321
        }
838
1322
    }
839
1323
 
 
1324
    /**
 
1325
     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
 
1326
     * Replace with a simple call to Unsafe.getUnsafe when integrating
 
1327
     * into a jdk.
 
1328
     *
 
1329
     * @return a sun.misc.Unsafe
 
1330
     */
 
1331
    static sun.misc.Unsafe getUnsafe() {
 
1332
        return scala.concurrent.util.Unsafe.instance;
 
1333
    }
 
1334
 
840
1335
}