~ubuntu-branches/ubuntu/trusty/monodevelop/trusty-proposed

« back to all changes in this revision

Viewing changes to external/ikvm/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java

  • Committer: Package Import Robot
  • Author(s): Jo Shields
  • Date: 2013-05-12 09:46:03 UTC
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20130512094603-mad323bzcxvmcam0
Tags: upstream-4.0.5+dfsg
ImportĀ upstreamĀ versionĀ 4.0.5+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 
3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 
4
 *
 
5
 * This code is free software; you can redistribute it and/or modify it
 
6
 * under the terms of the GNU General Public License version 2 only, as
 
7
 * published by the Free Software Foundation.  Oracle designates this
 
8
 * particular file as subject to the "Classpath" exception as provided
 
9
 * by Oracle in the LICENSE file that accompanied this code.
 
10
 *
 
11
 * This code is distributed in the hope that it will be useful, but WITHOUT
 
12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 
14
 * version 2 for more details (a copy is included in the LICENSE file that
 
15
 * accompanied this code).
 
16
 *
 
17
 * You should have received a copy of the GNU General Public License version
 
18
 * 2 along with this work; if not, write to the Free Software Foundation,
 
19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 
20
 *
 
21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 
22
 * or visit www.oracle.com if you need additional information or have any
 
23
 * questions.
 
24
 */
 
25
 
 
26
package sun.nio.ch;
 
27
 
 
28
import java.nio.channels.*;
 
29
import java.nio.ByteBuffer;
 
30
import java.nio.BufferOverflowException;
 
31
import java.net.*;
 
32
import java.util.concurrent.*;
 
33
import java.io.FileDescriptor;
 
34
import java.io.IOException;
 
35
 
 
36
/**
 
37
 * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
 
38
 */
 
39
 
 
40
class WindowsAsynchronousSocketChannelImpl
 
41
    extends AsynchronousSocketChannelImpl
 
42
{
 
43
    // maximum vector size for scatter/gather I/O
 
44
    private static final int MAX_WSABUF     = 16;
 
45
 
 
46
    // I/O completion port that the socket is associated with
 
47
    private final Iocp iocp;
 
48
 
 
49
 
 
50
    WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
 
51
        throws IOException
 
52
    {
 
53
        super(iocp);
 
54
 
 
55
        this.iocp = iocp;
 
56
    }
 
57
 
 
58
    WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
 
59
        this(iocp, true);
 
60
    }
 
61
 
 
62
    @Override
 
63
    public AsynchronousChannelGroupImpl group() {
 
64
        return iocp;
 
65
    }
 
66
 
 
67
    // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
 
68
    // accept
 
69
    void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
 
70
        synchronized (stateLock) {
 
71
            state = ST_CONNECTED;
 
72
            this.localAddress = localAddress;
 
73
            this.remoteAddress = remoteAddress;
 
74
        }
 
75
    }
 
76
 
 
77
    @Override
 
78
    void implClose() throws IOException {
 
79
        // close socket (may cause outstanding async I/O operations to fail).
 
80
        SocketDispatcher.closeImpl(fd);
 
81
    }
 
82
 
 
83
    @Override
 
84
    public void onCancel(PendingFuture<?,?> task) {
 
85
        if (task.getContext() instanceof ConnectTask)
 
86
            killConnect();
 
87
        if (task.getContext() instanceof ReadTask)
 
88
            killReading();
 
89
        if (task.getContext() instanceof WriteTask)
 
90
            killWriting();
 
91
    }
 
92
 
 
93
    /**
 
94
     * Implements the task to initiate a connection and the handler to
 
95
     * consume the result when the connection is established (or fails).
 
96
     */
 
97
    private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
 
98
        private final InetSocketAddress remote;
 
99
        private final PendingFuture<Void,A> result;
 
100
 
 
101
        ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
 
102
            this.remote = remote;
 
103
            this.result = result;
 
104
        }
 
105
 
 
106
        private void closeChannel() {
 
107
            try {
 
108
                close();
 
109
            } catch (IOException ignore) { }
 
110
        }
 
111
 
 
112
        private IOException toIOException(Throwable x) {
 
113
            if (x instanceof IOException) {
 
114
                if (x instanceof ClosedChannelException)
 
115
                    x = new AsynchronousCloseException();
 
116
                return (IOException)x;
 
117
            }
 
118
            return new IOException(x);
 
119
        }
 
120
 
 
121
        /**
 
122
         * Invoke after a connection is successfully established.
 
123
         */
 
124
        private void afterConnect() throws IOException {
 
125
            updateConnectContext(fd);
 
126
            synchronized (stateLock) {
 
127
                state = ST_CONNECTED;
 
128
                remoteAddress = remote;
 
129
            }
 
130
        }
 
131
 
 
132
        /**
 
133
         * Task to initiate a connection.
 
134
         */
 
135
        @Override
 
136
        public void run() {
 
137
            Throwable exc = null;
 
138
            try {
 
139
                begin();
 
140
 
 
141
                // synchronize on result to allow this thread handle the case
 
142
                // where the connection is established immediately.
 
143
                synchronized (result) {
 
144
                    // initiate the connection
 
145
                    int n = connect0(fd, Net.isIPv6Available(), remote.getAddress(),
 
146
                                     remote.getPort(), this);
 
147
                    if (n == IOStatus.UNAVAILABLE) {
 
148
                        // connection is pending
 
149
                        return;
 
150
                    }
 
151
 
 
152
                    // connection established immediately
 
153
                    afterConnect();
 
154
                    result.setResult(null);
 
155
                }
 
156
            } catch (Throwable x) {
 
157
                exc = x;
 
158
            } finally {
 
159
                end();
 
160
            }
 
161
 
 
162
            if (exc != null) {
 
163
                closeChannel();
 
164
                result.setFailure(toIOException(exc));
 
165
            }
 
166
            Invoker.invoke(result);
 
167
        }
 
168
 
 
169
        /**
 
170
         * Invoked by handler thread when connection established.
 
171
         */
 
172
        @Override
 
173
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
 
174
            Throwable exc = null;
 
175
            try {
 
176
                begin();
 
177
                afterConnect();
 
178
                result.setResult(null);
 
179
            } catch (Throwable x) {
 
180
                // channel is closed or unable to finish connect
 
181
                exc = x;
 
182
            } finally {
 
183
                end();
 
184
            }
 
185
 
 
186
            // can't close channel while in begin/end block
 
187
            if (exc != null) {
 
188
                closeChannel();
 
189
                result.setFailure(toIOException(exc));
 
190
            }
 
191
 
 
192
            if (canInvokeDirect) {
 
193
                Invoker.invokeUnchecked(result);
 
194
            } else {
 
195
                Invoker.invoke(result);
 
196
            }
 
197
        }
 
198
 
 
199
        /**
 
200
         * Invoked by handler thread when failed to establish connection.
 
201
         */
 
202
        @Override
 
203
        public void failed(int error, IOException x) {
 
204
            if (isOpen()) {
 
205
                closeChannel();
 
206
                result.setFailure(x);
 
207
            } else {
 
208
                result.setFailure(new AsynchronousCloseException());
 
209
            }
 
210
            Invoker.invoke(result);
 
211
        }
 
212
    }
 
213
 
 
214
    @Override
 
215
    <A> Future<Void> implConnect(SocketAddress remote,
 
216
                                 A attachment,
 
217
                                 CompletionHandler<Void,? super A> handler)
 
218
    {
 
219
        if (!isOpen()) {
 
220
            Throwable exc = new ClosedChannelException();
 
221
            if (handler == null)
 
222
                return CompletedFuture.withFailure(exc);
 
223
            Invoker.invoke(this, handler, attachment, null, exc);
 
224
            return null;
 
225
        }
 
226
 
 
227
        InetSocketAddress isa = Net.checkAddress(remote);
 
228
 
 
229
        // permission check
 
230
        SecurityManager sm = System.getSecurityManager();
 
231
        if (sm != null)
 
232
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 
233
 
 
234
        // check and update state
 
235
        // ConnectEx requires the socket to be bound to a local address
 
236
        IOException bindException = null;
 
237
        synchronized (stateLock) {
 
238
            if (state == ST_CONNECTED)
 
239
                throw new AlreadyConnectedException();
 
240
            if (state == ST_PENDING)
 
241
                throw new ConnectionPendingException();
 
242
            if (localAddress == null) {
 
243
                try {
 
244
                    bind(new InetSocketAddress(0));
 
245
                } catch (IOException x) {
 
246
                    bindException = x;
 
247
                }
 
248
            }
 
249
            if (bindException == null)
 
250
                state = ST_PENDING;
 
251
        }
 
252
 
 
253
        // handle bind failure
 
254
        if (bindException != null) {
 
255
            try {
 
256
                close();
 
257
            } catch (IOException ignore) { }
 
258
            if (handler == null)
 
259
                return CompletedFuture.withFailure(bindException);
 
260
            Invoker.invoke(this, handler, attachment, null, bindException);
 
261
            return null;
 
262
        }
 
263
 
 
264
        // setup task
 
265
        PendingFuture<Void,A> result =
 
266
            new PendingFuture<Void,A>(this, handler, attachment);
 
267
        ConnectTask task = new ConnectTask<A>(isa, result);
 
268
        result.setContext(task);
 
269
 
 
270
        // initiate I/O
 
271
        if (Iocp.supportsThreadAgnosticIo()) {
 
272
            task.run();
 
273
        } else {
 
274
            Invoker.invokeOnThreadInThreadPool(this, task);
 
275
        }
 
276
        return result;
 
277
    }
 
278
 
 
279
    /**
 
280
     * Implements the task to initiate a read and the handler to consume the
 
281
     * result when the read completes.
 
282
     */
 
283
    private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
 
284
        private final ByteBuffer[] bufs;
 
285
        private final int numBufs;
 
286
        private final boolean scatteringRead;
 
287
        private final PendingFuture<V,A> result;
 
288
 
 
289
        // set by run method
 
290
        private ByteBuffer[] shadow;
 
291
 
 
292
        ReadTask(ByteBuffer[] bufs,
 
293
                 boolean scatteringRead,
 
294
                 PendingFuture<V,A> result)
 
295
        {
 
296
            this.bufs = bufs;
 
297
            this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 
298
            this.scatteringRead = scatteringRead;
 
299
            this.result = result;
 
300
        }
 
301
 
 
302
        /**
 
303
         * Invoked prior to read to prepare the WSABUF array. Where necessary,
 
304
         * it substitutes direct buffers with managed buffers.
 
305
         */
 
306
        void prepareBuffers() {
 
307
            shadow = new ByteBuffer[numBufs];
 
308
            for (int i=0; i<numBufs; i++) {
 
309
                ByteBuffer dst = bufs[i];
 
310
                int pos = dst.position();
 
311
                int lim = dst.limit();
 
312
                assert (pos <= lim);
 
313
                int rem = (pos <= lim ? lim - pos : 0);
 
314
                if (!dst.hasArray()) {
 
315
                    // substitute with direct buffer
 
316
                    ByteBuffer bb = ByteBuffer.allocate(rem);
 
317
                    shadow[i] = bb;
 
318
                } else {
 
319
                    shadow[i] = dst;
 
320
                }
 
321
            }
 
322
        }
 
323
 
 
324
        /**
 
325
         * Invoked after a read has completed to update the buffer positions
 
326
         * and release any substituted buffers.
 
327
         */
 
328
        void updateBuffers(int bytesRead) {
 
329
            for (int i=0; i<numBufs; i++) {
 
330
                ByteBuffer nextBuffer = shadow[i];
 
331
                int pos = nextBuffer.position();
 
332
                int len = nextBuffer.remaining();
 
333
                if (bytesRead >= len) {
 
334
                    bytesRead -= len;
 
335
                    int newPosition = pos + len;
 
336
                    try {
 
337
                        nextBuffer.position(newPosition);
 
338
                    } catch (IllegalArgumentException x) {
 
339
                        // position changed by another
 
340
                    }
 
341
                } else { // Buffers not completely filled
 
342
                    if (bytesRead > 0) {
 
343
                        assert(pos + bytesRead < (long)Integer.MAX_VALUE);
 
344
                        int newPosition = pos + bytesRead;
 
345
                        try {
 
346
                            nextBuffer.position(newPosition);
 
347
                        } catch (IllegalArgumentException x) {
 
348
                            // position changed by another
 
349
                        }
 
350
                    }
 
351
                    break;
 
352
                }
 
353
            }
 
354
 
 
355
            // Put results from shadow into the slow buffers
 
356
            for (int i=0; i<numBufs; i++) {
 
357
                if (!bufs[i].hasArray()) {
 
358
                    shadow[i].flip();
 
359
                    try {
 
360
                        bufs[i].put(shadow[i]);
 
361
                    } catch (BufferOverflowException x) {
 
362
                        // position changed by another
 
363
                    }
 
364
                }
 
365
            }
 
366
        }
 
367
 
 
368
        void releaseBuffers() {
 
369
        }
 
370
 
 
371
        @Override
 
372
        @SuppressWarnings("unchecked")
 
373
        public void run() {
 
374
            boolean prepared = false;
 
375
            boolean pending = false;
 
376
 
 
377
            try {
 
378
                begin();
 
379
 
 
380
                // substitute direct buffers
 
381
                prepareBuffers();
 
382
                prepared = true;
 
383
 
 
384
                // initiate read
 
385
                int n = read0(fd, shadow, this);
 
386
                if (n == IOStatus.UNAVAILABLE) {
 
387
                    // I/O is pending
 
388
                    pending = true;
 
389
                    return;
 
390
                }
 
391
                if (n == IOStatus.EOF) {
 
392
                    // input shutdown
 
393
                    enableReading();
 
394
                    if (scatteringRead) {
 
395
                        result.setResult((V)Long.valueOf(-1L));
 
396
                    } else {
 
397
                        result.setResult((V)Integer.valueOf(-1));
 
398
                    }
 
399
                }
 
400
                // read completed immediately
 
401
                if (n == 0) {
 
402
                    n = -1; // EOF
 
403
                } else {
 
404
                    updateBuffers(n);
 
405
                }
 
406
                releaseBuffers();
 
407
                enableReading();
 
408
                if (scatteringRead) {
 
409
                    result.setResult((V)Long.valueOf(n));
 
410
                } else {
 
411
                    result.setResult((V)Integer.valueOf(n));
 
412
                }
 
413
            } catch (Throwable x) {
 
414
                // failed to initiate read
 
415
                // reset read flag before releasing waiters
 
416
                enableReading();
 
417
                if (x instanceof ClosedChannelException)
 
418
                    x = new AsynchronousCloseException();
 
419
                if (!(x instanceof IOException))
 
420
                    x = new IOException(x);
 
421
                result.setFailure(x);
 
422
            } finally {
 
423
                // release resources if I/O not pending
 
424
                if (!pending) {
 
425
                    if (prepared)
 
426
                        releaseBuffers();
 
427
                }
 
428
                end();
 
429
            }
 
430
 
 
431
            // invoke completion handler
 
432
            Invoker.invoke(result);
 
433
        }
 
434
 
 
435
        /**
 
436
         * Executed when the I/O has completed
 
437
         */
 
438
        @Override
 
439
        @SuppressWarnings("unchecked")
 
440
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
 
441
            if (bytesTransferred == 0) {
 
442
                bytesTransferred = -1;  // EOF
 
443
            } else {
 
444
                updateBuffers(bytesTransferred);
 
445
            }
 
446
 
 
447
            // return direct buffer to cache if substituted
 
448
            releaseBuffers();
 
449
 
 
450
            // release waiters if not already released by timeout
 
451
            synchronized (result) {
 
452
                if (result.isDone())
 
453
                    return;
 
454
                enableReading();
 
455
                if (scatteringRead) {
 
456
                    result.setResult((V)Long.valueOf(bytesTransferred));
 
457
                } else {
 
458
                    result.setResult((V)Integer.valueOf(bytesTransferred));
 
459
                }
 
460
            }
 
461
            if (canInvokeDirect) {
 
462
                Invoker.invokeUnchecked(result);
 
463
            } else {
 
464
                Invoker.invoke(result);
 
465
            }
 
466
        }
 
467
 
 
468
        @Override
 
469
        public void failed(int error, IOException x) {
 
470
            // return direct buffer to cache if substituted
 
471
            releaseBuffers();
 
472
 
 
473
            // release waiters if not already released by timeout
 
474
            if (!isOpen())
 
475
                x = new AsynchronousCloseException();
 
476
 
 
477
            synchronized (result) {
 
478
                if (result.isDone())
 
479
                    return;
 
480
                enableReading();
 
481
                result.setFailure(x);
 
482
            }
 
483
            Invoker.invoke(result);
 
484
        }
 
485
 
 
486
        /**
 
487
         * Invoked if timeout expires before it is cancelled
 
488
         */
 
489
        void timeout() {
 
490
            // synchronize on result as the I/O could complete/fail
 
491
            synchronized (result) {
 
492
                if (result.isDone())
 
493
                    return;
 
494
 
 
495
                // kill further reading before releasing waiters
 
496
                enableReading(true);
 
497
                result.setFailure(new InterruptedByTimeoutException());
 
498
            }
 
499
 
 
500
            // invoke handler without any locks
 
501
            Invoker.invoke(result);
 
502
        }
 
503
    }
 
504
 
 
505
    @Override
 
506
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
 
507
                                            ByteBuffer dst,
 
508
                                            ByteBuffer[] dsts,
 
509
                                            long timeout,
 
510
                                            TimeUnit unit,
 
511
                                            A attachment,
 
512
                                            CompletionHandler<V,? super A> handler)
 
513
    {
 
514
        // setup task
 
515
        PendingFuture<V,A> result =
 
516
            new PendingFuture<V,A>(this, handler, attachment);
 
517
        ByteBuffer[] bufs;
 
518
        if (isScatteringRead) {
 
519
            bufs = dsts;
 
520
        } else {
 
521
            bufs = new ByteBuffer[1];
 
522
            bufs[0] = dst;
 
523
        }
 
524
        final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
 
525
        result.setContext(readTask);
 
526
 
 
527
        // schedule timeout
 
528
        if (timeout > 0L) {
 
529
            Future<?> timeoutTask = iocp.schedule(new Runnable() {
 
530
                public void run() {
 
531
                    readTask.timeout();
 
532
                }
 
533
            }, timeout, unit);
 
534
            result.setTimeoutTask(timeoutTask);
 
535
        }
 
536
 
 
537
        // initiate I/O
 
538
        if (Iocp.supportsThreadAgnosticIo()) {
 
539
            readTask.run();
 
540
        } else {
 
541
            Invoker.invokeOnThreadInThreadPool(this, readTask);
 
542
        }
 
543
        return result;
 
544
    }
 
545
 
 
546
    /**
 
547
     * Implements the task to initiate a write and the handler to consume the
 
548
     * result when the write completes.
 
549
     */
 
550
    private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
 
551
        private final ByteBuffer[] bufs;
 
552
        private final int numBufs;
 
553
        private final boolean gatheringWrite;
 
554
        private final PendingFuture<V,A> result;
 
555
 
 
556
        // set by run method
 
557
        private ByteBuffer[] shadow;
 
558
 
 
559
        WriteTask(ByteBuffer[] bufs,
 
560
                  boolean gatheringWrite,
 
561
                  PendingFuture<V,A> result)
 
562
        {
 
563
            this.bufs = bufs;
 
564
            this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 
565
            this.gatheringWrite = gatheringWrite;
 
566
            this.result = result;
 
567
        }
 
568
 
 
569
        /**
 
570
         * Invoked prior to write to prepare the WSABUF array. Where necessary,
 
571
         * it substitutes direct buffers with managed buffers.
 
572
         */
 
573
        void prepareBuffers() {
 
574
            shadow = new ByteBuffer[numBufs];
 
575
            for (int i=0; i<numBufs; i++) {
 
576
                ByteBuffer src = bufs[i];
 
577
                int pos = src.position();
 
578
                int lim = src.limit();
 
579
                assert (pos <= lim);
 
580
                int rem = (pos <= lim ? lim - pos : 0);
 
581
                if (!src.hasArray()) {
 
582
                    // substitute with direct buffer
 
583
                    ByteBuffer bb = ByteBuffer.allocate(rem);
 
584
                    bb.put(src);
 
585
                    bb.flip();
 
586
                    src.position(pos);  // leave heap buffer untouched for now
 
587
                    shadow[i] = bb;
 
588
                } else {
 
589
                    shadow[i] = src;
 
590
                }
 
591
            }
 
592
        }
 
593
 
 
594
        /**
 
595
         * Invoked after a write has completed to update the buffer positions
 
596
         * and release any substituted buffers.
 
597
         */
 
598
        void updateBuffers(int bytesWritten) {
 
599
            // Notify the buffers how many bytes were taken
 
600
            for (int i=0; i<numBufs; i++) {
 
601
                ByteBuffer nextBuffer = bufs[i];
 
602
                int pos = nextBuffer.position();
 
603
                int lim = nextBuffer.limit();
 
604
                int len = (pos <= lim ? lim - pos : lim);
 
605
                if (bytesWritten >= len) {
 
606
                    bytesWritten -= len;
 
607
                    int newPosition = pos + len;
 
608
                    try {
 
609
                        nextBuffer.position(newPosition);
 
610
                    } catch (IllegalArgumentException x) {
 
611
                        // position changed by someone else
 
612
                    }
 
613
                } else { // Buffers not completely filled
 
614
                    if (bytesWritten > 0) {
 
615
                        assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
 
616
                        int newPosition = pos + bytesWritten;
 
617
                        try {
 
618
                            nextBuffer.position(newPosition);
 
619
                        } catch (IllegalArgumentException x) {
 
620
                            // position changed by someone else
 
621
                        }
 
622
                    }
 
623
                    break;
 
624
                }
 
625
            }
 
626
        }
 
627
 
 
628
        void releaseBuffers() {
 
629
        }
 
630
 
 
631
        @Override
 
632
        //@SuppressWarnings("unchecked")
 
633
        public void run() {
 
634
            boolean prepared = false;
 
635
            boolean pending = false;
 
636
            boolean shutdown = false;
 
637
 
 
638
            try {
 
639
                begin();
 
640
 
 
641
                // substitute direct buffers
 
642
                prepareBuffers();
 
643
                prepared = true;
 
644
 
 
645
                int n = write0(fd, shadow, this);
 
646
                if (n == IOStatus.UNAVAILABLE) {
 
647
                    // I/O is pending
 
648
                    pending = true;
 
649
                    return;
 
650
                }
 
651
                if (n == IOStatus.EOF) {
 
652
                    // special case for shutdown output
 
653
                    shutdown = true;
 
654
                    throw new ClosedChannelException();
 
655
                }
 
656
                // write completed immediately
 
657
                updateBuffers(n);
 
658
                releaseBuffers();
 
659
                enableWriting();
 
660
                if (gatheringWrite) {
 
661
                    result.setResult((V)Long.valueOf(n));
 
662
                } else {
 
663
                    result.setResult((V)Integer.valueOf(n));
 
664
                }
 
665
            } catch (Throwable x) {
 
666
                // write failed. Enable writing before releasing waiters.
 
667
                enableWriting();
 
668
                if (!shutdown && (x instanceof ClosedChannelException))
 
669
                    x = new AsynchronousCloseException();
 
670
                if (!(x instanceof IOException))
 
671
                    x = new IOException(x);
 
672
                result.setFailure(x);
 
673
            } finally {
 
674
                // release resources if I/O not pending
 
675
                if (!pending) {
 
676
                    if (prepared)
 
677
                        releaseBuffers();
 
678
                }
 
679
                end();
 
680
            }
 
681
 
 
682
            // invoke completion handler
 
683
            Invoker.invoke(result);
 
684
        }
 
685
 
 
686
        /**
 
687
         * Executed when the I/O has completed
 
688
         */
 
689
        @Override
 
690
        @SuppressWarnings("unchecked")
 
691
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
 
692
            updateBuffers(bytesTransferred);
 
693
 
 
694
            // return direct buffer to cache if substituted
 
695
            releaseBuffers();
 
696
 
 
697
            // release waiters if not already released by timeout
 
698
            synchronized (result) {
 
699
                if (result.isDone())
 
700
                    return;
 
701
                enableWriting();
 
702
                if (gatheringWrite) {
 
703
                    result.setResult((V)Long.valueOf(bytesTransferred));
 
704
                } else {
 
705
                    result.setResult((V)Integer.valueOf(bytesTransferred));
 
706
                }
 
707
            }
 
708
            if (canInvokeDirect) {
 
709
                Invoker.invokeUnchecked(result);
 
710
            } else {
 
711
                Invoker.invoke(result);
 
712
            }
 
713
        }
 
714
 
 
715
        @Override
 
716
        public void failed(int error, IOException x) {
 
717
            // return direct buffer to cache if substituted
 
718
            releaseBuffers();
 
719
 
 
720
            // release waiters if not already released by timeout
 
721
            if (!isOpen())
 
722
                x = new AsynchronousCloseException();
 
723
 
 
724
            synchronized (result) {
 
725
                if (result.isDone())
 
726
                    return;
 
727
                enableWriting();
 
728
                result.setFailure(x);
 
729
            }
 
730
            Invoker.invoke(result);
 
731
        }
 
732
 
 
733
        /**
 
734
         * Invoked if timeout expires before it is cancelled
 
735
         */
 
736
        void timeout() {
 
737
            // synchronize on result as the I/O could complete/fail
 
738
            synchronized (result) {
 
739
                if (result.isDone())
 
740
                    return;
 
741
 
 
742
                // kill further writing before releasing waiters
 
743
                enableWriting(true);
 
744
                result.setFailure(new InterruptedByTimeoutException());
 
745
            }
 
746
 
 
747
            // invoke handler without any locks
 
748
            Invoker.invoke(result);
 
749
        }
 
750
    }
 
751
 
 
752
    @Override
 
753
    <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
 
754
                                             ByteBuffer src,
 
755
                                             ByteBuffer[] srcs,
 
756
                                             long timeout,
 
757
                                             TimeUnit unit,
 
758
                                             A attachment,
 
759
                                             CompletionHandler<V,? super A> handler)
 
760
    {
 
761
        // setup task
 
762
        PendingFuture<V,A> result =
 
763
            new PendingFuture<V,A>(this, handler, attachment);
 
764
        ByteBuffer[] bufs;
 
765
        if (gatheringWrite) {
 
766
            bufs = srcs;
 
767
        } else {
 
768
            bufs = new ByteBuffer[1];
 
769
            bufs[0] = src;
 
770
        }
 
771
        final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
 
772
        result.setContext(writeTask);
 
773
 
 
774
        // schedule timeout
 
775
        if (timeout > 0L) {
 
776
            Future<?> timeoutTask = iocp.schedule(new Runnable() {
 
777
                public void run() {
 
778
                    writeTask.timeout();
 
779
                }
 
780
            }, timeout, unit);
 
781
            result.setTimeoutTask(timeoutTask);
 
782
        }
 
783
 
 
784
        // initiate I/O (can only be done from thread in thread pool)
 
785
        // initiate I/O
 
786
        if (Iocp.supportsThreadAgnosticIo()) {
 
787
            writeTask.run();
 
788
        } else {
 
789
            Invoker.invokeOnThreadInThreadPool(this, writeTask);
 
790
        }
 
791
        return result;
 
792
    }
 
793
 
 
794
    // -- Native methods --
 
795
 
 
796
    private static native void initIDs();
 
797
 
 
798
    private static native int connect0(FileDescriptor fd, boolean preferIPv6,
 
799
        InetAddress remote, int remotePort, Iocp.ResultHandler handler) throws IOException;
 
800
 
 
801
    private static native void updateConnectContext(FileDescriptor fd) throws IOException;
 
802
 
 
803
    private static native int read0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
 
804
        throws IOException;
 
805
 
 
806
    private static native int write0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
 
807
        throws IOException;
 
808
 
 
809
    private static native void shutdown0(long socket, int how) throws IOException;
 
810
 
 
811
    private static native void closesocket0(long socket) throws IOException;
 
812
 
 
813
    static {
 
814
        Util.load();
 
815
        initIDs();
 
816
    }
 
817
}