2
* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
28
import java.nio.channels.*;
29
import java.nio.ByteBuffer;
30
import java.nio.BufferOverflowException;
32
import java.util.concurrent.*;
33
import java.io.FileDescriptor;
34
import java.io.IOException;
37
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
40
class WindowsAsynchronousSocketChannelImpl
41
extends AsynchronousSocketChannelImpl
43
// maximum vector size for scatter/gather I/O
44
private static final int MAX_WSABUF = 16;
46
// I/O completion port that the socket is associated with
47
private final Iocp iocp;
50
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
58
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
63
public AsynchronousChannelGroupImpl group() {
67
// invoked by WindowsAsynchronousServerSocketChannelImpl when a new connection is accepted
69
void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
70
synchronized (stateLock) {
72
this.localAddress = localAddress;
73
this.remoteAddress = remoteAddress;
78
void implClose() throws IOException {
79
// close socket (may cause outstanding async I/O operations to fail).
80
SocketDispatcher.closeImpl(fd);
84
public void onCancel(PendingFuture<?,?> task) {
85
if (task.getContext() instanceof ConnectTask)
87
if (task.getContext() instanceof ReadTask)
89
if (task.getContext() instanceof WriteTask)
94
* Implements the task to initiate a connection and the handler to
95
* consume the result when the connection is established (or fails).
97
private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
98
private final InetSocketAddress remote;
99
private final PendingFuture<Void,A> result;
101
ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
102
this.remote = remote;
103
this.result = result;
106
private void closeChannel() {
109
} catch (IOException ignore) { }
112
private IOException toIOException(Throwable x) {
113
if (x instanceof IOException) {
114
if (x instanceof ClosedChannelException)
115
x = new AsynchronousCloseException();
116
return (IOException)x;
118
return new IOException(x);
122
* Invoke after a connection is successfully established.
124
private void afterConnect() throws IOException {
125
updateConnectContext(fd);
126
synchronized (stateLock) {
127
state = ST_CONNECTED;
128
remoteAddress = remote;
133
* Task to initiate a connection.
137
Throwable exc = null;
141
// synchronize on result to allow this thread to handle the case
142
// where the connection is established immediately.
143
synchronized (result) {
144
// initiate the connection
145
int n = connect0(fd, Net.isIPv6Available(), remote.getAddress(),
146
remote.getPort(), this);
147
if (n == IOStatus.UNAVAILABLE) {
148
// connection is pending
152
// connection established immediately
154
result.setResult(null);
156
} catch (Throwable x) {
164
result.setFailure(toIOException(exc));
166
Invoker.invoke(result);
170
* Invoked by handler thread when connection established.
173
public void completed(int bytesTransferred, boolean canInvokeDirect) {
174
Throwable exc = null;
178
result.setResult(null);
179
} catch (Throwable x) {
180
// channel is closed or unable to finish connect
186
// can't close channel while in begin/end block
189
result.setFailure(toIOException(exc));
192
if (canInvokeDirect) {
193
Invoker.invokeUnchecked(result);
195
Invoker.invoke(result);
200
* Invoked by handler thread when failed to establish connection.
203
public void failed(int error, IOException x) {
206
result.setFailure(x);
208
result.setFailure(new AsynchronousCloseException());
210
Invoker.invoke(result);
215
<A> Future<Void> implConnect(SocketAddress remote,
217
CompletionHandler<Void,? super A> handler)
220
Throwable exc = new ClosedChannelException();
222
return CompletedFuture.withFailure(exc);
223
Invoker.invoke(this, handler, attachment, null, exc);
227
InetSocketAddress isa = Net.checkAddress(remote);
230
SecurityManager sm = System.getSecurityManager();
232
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
234
// check and update state
235
// ConnectEx requires the socket to be bound to a local address
236
IOException bindException = null;
237
synchronized (stateLock) {
238
if (state == ST_CONNECTED)
239
throw new AlreadyConnectedException();
240
if (state == ST_PENDING)
241
throw new ConnectionPendingException();
242
if (localAddress == null) {
244
bind(new InetSocketAddress(0));
245
} catch (IOException x) {
249
if (bindException == null)
253
// handle bind failure
254
if (bindException != null) {
257
} catch (IOException ignore) { }
259
return CompletedFuture.withFailure(bindException);
260
Invoker.invoke(this, handler, attachment, null, bindException);
265
PendingFuture<Void,A> result =
266
new PendingFuture<Void,A>(this, handler, attachment);
267
ConnectTask task = new ConnectTask<A>(isa, result);
268
result.setContext(task);
271
if (Iocp.supportsThreadAgnosticIo()) {
274
Invoker.invokeOnThreadInThreadPool(this, task);
280
* Implements the task to initiate a read and the handler to consume the
281
* result when the read completes.
283
private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
284
private final ByteBuffer[] bufs;
285
private final int numBufs;
286
private final boolean scatteringRead;
287
private final PendingFuture<V,A> result;
290
private ByteBuffer[] shadow;
292
ReadTask(ByteBuffer[] bufs,
293
boolean scatteringRead,
294
PendingFuture<V,A> result)
297
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
298
this.scatteringRead = scatteringRead;
299
this.result = result;
303
* Invoked prior to read to prepare the WSABUF array. Where necessary,
304
* it substitutes direct buffers with managed buffers.
306
void prepareBuffers() {
307
shadow = new ByteBuffer[numBufs];
308
for (int i=0; i<numBufs; i++) {
309
ByteBuffer dst = bufs[i];
310
int pos = dst.position();
311
int lim = dst.limit();
313
int rem = (pos <= lim ? lim - pos : 0);
314
if (!dst.hasArray()) {
315
// substitute with an array-backed (heap) buffer of the same remaining size
316
ByteBuffer bb = ByteBuffer.allocate(rem);
325
* Invoked after a read has completed to update the buffer positions
326
* and release any substituted buffers.
328
void updateBuffers(int bytesRead) {
329
for (int i=0; i<numBufs; i++) {
330
ByteBuffer nextBuffer = shadow[i];
331
int pos = nextBuffer.position();
332
int len = nextBuffer.remaining();
333
if (bytesRead >= len) {
335
int newPosition = pos + len;
337
nextBuffer.position(newPosition);
338
} catch (IllegalArgumentException x) {
339
// position changed by someone else
341
} else { // Buffers not completely filled
343
assert(pos + bytesRead < (long)Integer.MAX_VALUE);
344
int newPosition = pos + bytesRead;
346
nextBuffer.position(newPosition);
347
} catch (IllegalArgumentException x) {
348
// position changed by someone else
355
// Copy results from the shadow buffers back into the original buffers that have no backing array
356
for (int i=0; i<numBufs; i++) {
357
if (!bufs[i].hasArray()) {
360
bufs[i].put(shadow[i]);
361
} catch (BufferOverflowException x) {
362
// position changed by someone else
368
void releaseBuffers() {
372
@SuppressWarnings("unchecked")
374
boolean prepared = false;
375
boolean pending = false;
380
// substitute direct buffers
385
int n = read0(fd, shadow, this);
386
if (n == IOStatus.UNAVAILABLE) {
391
if (n == IOStatus.EOF) {
394
if (scatteringRead) {
395
result.setResult((V)Long.valueOf(-1L));
397
result.setResult((V)Integer.valueOf(-1));
400
// read completed immediately
408
if (scatteringRead) {
409
result.setResult((V)Long.valueOf(n));
411
result.setResult((V)Integer.valueOf(n));
413
} catch (Throwable x) {
414
// failed to initiate read
415
// reset read flag before releasing waiters
417
if (x instanceof ClosedChannelException)
418
x = new AsynchronousCloseException();
419
if (!(x instanceof IOException))
420
x = new IOException(x);
421
result.setFailure(x);
423
// release resources if I/O not pending
431
// invoke completion handler
432
Invoker.invoke(result);
436
* Executed when the I/O has completed
439
@SuppressWarnings("unchecked")
440
public void completed(int bytesTransferred, boolean canInvokeDirect) {
441
if (bytesTransferred == 0) {
442
bytesTransferred = -1; // EOF
444
updateBuffers(bytesTransferred);
447
// return direct buffer to cache if substituted
450
// release waiters if not already released by timeout
451
synchronized (result) {
455
if (scatteringRead) {
456
result.setResult((V)Long.valueOf(bytesTransferred));
458
result.setResult((V)Integer.valueOf(bytesTransferred));
461
if (canInvokeDirect) {
462
Invoker.invokeUnchecked(result);
464
Invoker.invoke(result);
469
public void failed(int error, IOException x) {
470
// return direct buffer to cache if substituted
473
// release waiters if not already released by timeout
475
x = new AsynchronousCloseException();
477
synchronized (result) {
481
result.setFailure(x);
483
Invoker.invoke(result);
487
* Invoked if timeout expires before it is cancelled
490
// synchronize on result as the I/O could complete/fail
491
synchronized (result) {
495
// kill further reading before releasing waiters
497
result.setFailure(new InterruptedByTimeoutException());
500
// invoke handler without any locks
501
Invoker.invoke(result);
506
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
512
CompletionHandler<V,? super A> handler)
515
PendingFuture<V,A> result =
516
new PendingFuture<V,A>(this, handler, attachment);
518
if (isScatteringRead) {
521
bufs = new ByteBuffer[1];
524
final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
525
result.setContext(readTask);
529
Future<?> timeoutTask = iocp.schedule(new Runnable() {
534
result.setTimeoutTask(timeoutTask);
538
if (Iocp.supportsThreadAgnosticIo()) {
541
Invoker.invokeOnThreadInThreadPool(this, readTask);
547
* Implements the task to initiate a write and the handler to consume the
548
* result when the write completes.
550
private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
551
private final ByteBuffer[] bufs;
552
private final int numBufs;
553
private final boolean gatheringWrite;
554
private final PendingFuture<V,A> result;
557
private ByteBuffer[] shadow;
559
WriteTask(ByteBuffer[] bufs,
560
boolean gatheringWrite,
561
PendingFuture<V,A> result)
564
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
565
this.gatheringWrite = gatheringWrite;
566
this.result = result;
570
* Invoked prior to write to prepare the WSABUF array. Where necessary,
571
* it substitutes direct buffers with managed buffers.
573
void prepareBuffers() {
574
shadow = new ByteBuffer[numBufs];
575
for (int i=0; i<numBufs; i++) {
576
ByteBuffer src = bufs[i];
577
int pos = src.position();
578
int lim = src.limit();
580
int rem = (pos <= lim ? lim - pos : 0);
581
if (!src.hasArray()) {
582
// substitute with an array-backed (heap) buffer of the same remaining size
583
ByteBuffer bb = ByteBuffer.allocate(rem);
586
src.position(pos); // leave heap buffer untouched for now
595
* Invoked after a write has completed to update the buffer positions
596
* and release any substituted buffers.
598
void updateBuffers(int bytesWritten) {
599
// Notify the buffers how many bytes were taken
600
for (int i=0; i<numBufs; i++) {
601
ByteBuffer nextBuffer = bufs[i];
602
int pos = nextBuffer.position();
603
int lim = nextBuffer.limit();
604
int len = (pos <= lim ? lim - pos : lim);
605
if (bytesWritten >= len) {
607
int newPosition = pos + len;
609
nextBuffer.position(newPosition);
610
} catch (IllegalArgumentException x) {
611
// position changed by someone else
613
} else { // Buffers not completely filled
614
if (bytesWritten > 0) {
615
assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
616
int newPosition = pos + bytesWritten;
618
nextBuffer.position(newPosition);
619
} catch (IllegalArgumentException x) {
620
// position changed by someone else
628
void releaseBuffers() {
632
//@SuppressWarnings("unchecked")
634
boolean prepared = false;
635
boolean pending = false;
636
boolean shutdown = false;
641
// substitute direct buffers
645
int n = write0(fd, shadow, this);
646
if (n == IOStatus.UNAVAILABLE) {
651
if (n == IOStatus.EOF) {
652
// special case for shutdown output
654
throw new ClosedChannelException();
656
// write completed immediately
660
if (gatheringWrite) {
661
result.setResult((V)Long.valueOf(n));
663
result.setResult((V)Integer.valueOf(n));
665
} catch (Throwable x) {
666
// write failed. Enable writing before releasing waiters.
668
if (!shutdown && (x instanceof ClosedChannelException))
669
x = new AsynchronousCloseException();
670
if (!(x instanceof IOException))
671
x = new IOException(x);
672
result.setFailure(x);
674
// release resources if I/O not pending
682
// invoke completion handler
683
Invoker.invoke(result);
687
* Executed when the I/O has completed
690
@SuppressWarnings("unchecked")
691
public void completed(int bytesTransferred, boolean canInvokeDirect) {
692
updateBuffers(bytesTransferred);
694
// return direct buffer to cache if substituted
697
// release waiters if not already released by timeout
698
synchronized (result) {
702
if (gatheringWrite) {
703
result.setResult((V)Long.valueOf(bytesTransferred));
705
result.setResult((V)Integer.valueOf(bytesTransferred));
708
if (canInvokeDirect) {
709
Invoker.invokeUnchecked(result);
711
Invoker.invoke(result);
716
public void failed(int error, IOException x) {
717
// return direct buffer to cache if substituted
720
// release waiters if not already released by timeout
722
x = new AsynchronousCloseException();
724
synchronized (result) {
728
result.setFailure(x);
730
Invoker.invoke(result);
734
* Invoked if timeout expires before it is cancelled
737
// synchronize on result as the I/O could complete/fail
738
synchronized (result) {
742
// kill further writing before releasing waiters
744
result.setFailure(new InterruptedByTimeoutException());
747
// invoke handler without any locks
748
Invoker.invoke(result);
753
<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
759
CompletionHandler<V,? super A> handler)
762
PendingFuture<V,A> result =
763
new PendingFuture<V,A>(this, handler, attachment);
765
if (gatheringWrite) {
768
bufs = new ByteBuffer[1];
771
final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
772
result.setContext(writeTask);
776
Future<?> timeoutTask = iocp.schedule(new Runnable() {
781
result.setTimeoutTask(timeoutTask);
784
// initiate I/O (can only be done from thread in thread pool)
786
if (Iocp.supportsThreadAgnosticIo()) {
789
Invoker.invokeOnThreadInThreadPool(this, writeTask);
794
// -- Native methods --
796
private static native void initIDs();
798
private static native int connect0(FileDescriptor fd, boolean preferIPv6,
799
InetAddress remote, int remotePort, Iocp.ResultHandler handler) throws IOException;
801
private static native void updateConnectContext(FileDescriptor fd) throws IOException;
803
private static native int read0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
806
private static native int write0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
809
private static native void shutdown0(long socket, int how) throws IOException;
811
private static native void closesocket0(long socket) throws IOException;