2
* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
28
import java.nio.channels.*;
29
import java.util.concurrent.*;
30
import java.nio.ByteBuffer;
31
import java.nio.BufferOverflowException;
32
import java.io.IOException;
33
import java.io.FileDescriptor;
34
import cli.System.AsyncCallback;
35
import cli.System.IAsyncResult;
36
import cli.System.IO.FileStream;
37
import cli.System.IO.SeekOrigin;
40
* Windows implementation of AsynchronousFileChannel using overlapped I/O.
43
public class WindowsAsynchronousFileChannelImpl
44
extends AsynchronousFileChannelImpl
47
// error when EOF is detected asynchronously.
48
private static final int ERROR_HANDLE_EOF = 38;
50
// Lazy initialization of default I/O completion port
51
// Holder for the default completion port: the static field below is assigned
// from defaultIocp() when this nested class is initialized.
private static class DefaultIocpHolder {
52
// the shared default port; open() reads it as DefaultIocpHolder.defaultIocp
static final Iocp defaultIocp = defaultIocp();
53
// Creates an Iocp over ThreadPool.createDefault() and starts it.
private static Iocp defaultIocp() {
55
return new Iocp(null, ThreadPool.createDefault()).start();
56
} catch (IOException ioe) {
57
// NOTE(review): the matching "try {" and the remainder of this handler are not
// visible in this listing; presumably the InternalError is given ioe as its
// cause and thrown — confirm against the full source.
InternalError e = new InternalError();
64
// Used for force/truncate/size methods
65
private static final FileDispatcher nd = new FileDispatcherImpl();
67
// I/O completion port (group)
68
private final Iocp iocp;
70
private final boolean isDefaultIocp;
73
// Private constructor; open() is the caller visible in this file.
// Passes fdObj, the reading/writing flags and the port's executor to the
// superclass and records whether the port is the shared default one.
// NOTE(review): only the first and last parameter lines are visible in this
// listing; reading, writing and iocp are used in the body, so presumably they
// are the parameters in between — confirm against the full source.
private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
77
boolean isDefaultIocp)
80
// completion handlers use the executor owned by the completion port
super(fdObj, reading, writing, iocp.executor());
82
this.isDefaultIocp = isDefaultIocp;
85
// Factory for this channel type: selects a completion port (the shared default
// one or a new one started over the supplied pool) and constructs the channel
// with it.
// NOTE(review): the parameter lines after fdo are not visible in this listing;
// reading, writing and pool are used below.
public static AsynchronousFileChannel open(FileDescriptor fdo,
92
// records which of the two ports below was chosen; handed to the constructor
boolean isDefaultIocp;
94
// shared default port, created on first use by DefaultIocpHolder
iocp = DefaultIocpHolder.defaultIocp;
97
// dedicated port created and started for the caller's pool
// NOTE(review): the condition choosing between these two assignments is not
// visible here; presumably pool == null selects the default port — confirm.
iocp = new Iocp(null, pool).start();
98
isDefaultIocp = false;
102
WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
103
} catch (IOException x) {
104
// error binding to port so need to close it (if created for this channel)
// NOTE(review): the cleanup statements that follow this comment are not visible in this listing.
112
// Closes the channel. Steps visible in this listing: take the close
// write-lock, return early when the channel is already closed, release the
// lock, invalidate every file lock held through this channel, and detach the
// completion port from its thread pool.
// NOTE(review): several original lines are missing here (the residual line
// numbers jump from 123 to 128), including whatever closes the underlying
// file — confirm against the full source before editing.
public void close() throws IOException {
113
closeLock.writeLock().lock();
116
return; // already closed
119
closeLock.writeLock().unlock();
122
// invalidate all locks held for this channel
invalidateAllLocks();
128
// for the non-default group close the port
// NOTE(review): the guard for this call is not visible; per the comment above
// it presumably tests !isDefaultIocp — confirm.
iocp.detachFromThreadPool();
134
// Accessor for the channel group this channel belongs to.
// NOTE(review): the method body is not visible in this listing; presumably it
// returns the iocp field — confirm.
public AsynchronousChannelGroupImpl group() {
139
* Translates Throwable to IOException
141
private static IOException toIOException(Throwable x) {
142
if (x instanceof cli.System.ArgumentException) {
143
return new IOException(x.getMessage());
145
if (x instanceof cli.System.IO.IOException) {
146
return new IOException(x.getMessage());
148
if (x instanceof IOException) {
149
if (x instanceof ClosedChannelException)
150
x = new AsynchronousCloseException();
151
return (IOException)x;
153
return new IOException(x);
157
// Returns the size of the file as reported by the FileDispatcher (nd) for this
// channel's file descriptor.
// NOTE(review): lines surrounding the return statement are missing from this listing.
public long size() throws IOException {
160
return nd.size(fdObj);
167
// Truncates the file to the given size through the FileDispatcher (nd).
// Throws IllegalArgumentException("Negative size") and
// NonWritableChannelException; the conditions guarding those two throws are
// not visible in this listing.
public AsynchronousFileChannel truncate(long size) throws IOException {
169
throw new IllegalArgumentException("Negative size");
171
throw new NonWritableChannelException();
174
// compares the requested size with the current file size
// NOTE(review): the statement guarded by this test is not visible; presumably
// the method returns without truncating when the request is larger — confirm.
if (size > nd.size(fdObj))
176
nd.truncate(fdObj, size);
184
// Forces updates to the file out through the FileDispatcher (nd); the metaData
// flag is passed straight through to nd.force.
// NOTE(review): lines surrounding this call are missing from this listing.
public void force(boolean metaData) throws IOException {
187
nd.force(fdObj, metaData);
193
// -- file locking --
196
* Task that initiates locking operation and handles completion result.
198
// Task that tries to lock a region of the file through FileStream.Lock and
// publishes the outcome on a PendingFuture: the FileLockImpl on success, an
// IOException on failure.
// NOTE(review): many lines of this class are missing from this listing (the
// residual line numbers have gaps), including the run() header and the loop
// that the "try again next iteration" comment refers to.
private class LockTask<A> implements Runnable, Iocp.ResultHandler {
199
// start of the region passed to FileStream.Lock
private final long position;
200
// lock-table entry for this request; supplies the region size and is the result value
private final FileLockImpl fli;
201
// future (and completion handler, if any) to complete
private final PendingFuture<FileLock,A> result;
203
LockTask(long position,
205
PendingFuture<FileLock,A> result)
207
this.position = position;
209
this.result = result;
214
FileStream fs = (FileStream)fdObj.getStream();
220
// never executes; presumably present so that javac accepts the
// catch (cli.System.IO.IOException) clause below — confirm
if (false) throw new cli.System.IO.IOException();
221
fs.Lock(position, fli.size());
222
result.setResult(fli);
224
// NOTE(review): "_" is not a legal identifier from Java 9 onwards
} catch (cli.System.IO.IOException _) {
225
// we failed to acquire the lock, try again next iteration
227
} catch (Throwable x) {
228
// lock failed or channel closed
removeFromFileLockTable(fli);
230
result.setFailure(toIOException(x));
234
// pause 100 ms; presumably the delay between lock attempts — loop structure not visible here
cli.System.Threading.Thread.Sleep(100);
237
// invoke completion handler
Invoker.invoke(result);
242
// Iocp.ResultHandler callback: completes the future with the lock and invokes
// the handler, using invokeUnchecked when canInvokeDirect is true.
public void completed(int bytesTransferred, boolean canInvokeDirect) {
243
// release waiters and invoke completion handler
result.setResult(fli);
245
if (canInvokeDirect) {
246
Invoker.invokeUnchecked(result);
248
Invoker.invoke(result);
253
// Iocp.ResultHandler callback: rolls back the lock-table entry and fails the
// future, either with x or with an AsynchronousCloseException.
// NOTE(review): the condition choosing between the two setFailure calls is not
// visible; presumably it tests whether the channel is still open — confirm.
public void failed(int error, IOException x) {
254
// lock not acquired so remove from lock table
removeFromFileLockTable(fli);
259
result.setFailure(x);
261
result.setFailure(new AsynchronousCloseException());
263
Invoker.invoke(result);
268
// Starts an asynchronous lock request: checks the channel's access mode
// against the requested lock type, registers the region in the file-lock
// table, then hands a LockTask to a pool thread.
// Throws NonReadableChannelException for a shared lock on a channel not open
// for reading and NonWritableChannelException for an exclusive lock on a
// channel not open for writing.
// NOTE(review): the size and attachment parameter lines are not visible in
// this listing, although both names are used below.
<A> Future<FileLock> implLock(final long position,
270
final boolean shared,
272
final CompletionHandler<FileLock,? super A> handler)
274
if (shared && !reading)
275
throw new NonReadableChannelException();
276
if (!shared && !writing)
277
throw new NonWritableChannelException();
280
FileLockImpl fli = addToFileLockTable(position, size, shared);
282
// failure path reporting a closed channel, either as a failed future or
// through the handler
// NOTE(review): the guards around the next three statements are not visible;
// presumably fli == null and handler == null — confirm.
Throwable exc = new ClosedChannelException();
284
return CompletedFuture.withFailure(exc);
285
Invoker.invoke(this, handler, attachment, null, exc);
289
// create Future and task that will be invoked to acquire lock
PendingFuture<FileLock,A> result =
291
new PendingFuture<FileLock,A>(this, handler, attachment);
292
// NOTE(review): raw type; LockTask<A> would avoid the unchecked warning
LockTask lockTask = new LockTask<A>(position, fli, result);
293
result.setContext(lockTask);
299
// presumably tracks whether the hand-off below succeeded so the lock-table
// entry can be rolled back otherwise — the try/finally is not visible here
boolean executed = false;
301
Invoker.invokeOnThreadInThreadPool(this, lockTask);
306
removeFromFileLockTable(fli);
313
static final int NO_LOCK = -1; // Failed to lock
314
static final int LOCKED = 0; // Obtained requested lock
317
// Attempts to lock the given region of the file through FileStream.Lock.
// Throws NonReadableChannelException for a shared lock on a channel not open
// for reading, NonWritableChannelException for an exclusive lock on a channel
// not open for writing, and ClosedChannelException (guard not visible here).
// NOTE(review): the return statements and the try/finally structure are
// missing from this listing, so the success and failure return values cannot
// be confirmed from here.
public FileLock tryLock(long position, long size, boolean shared)
320
if (shared && !reading)
321
throw new NonReadableChannelException();
322
if (!shared && !writing)
323
throw new NonWritableChannelException();
326
final FileLockImpl fli = addToFileLockTable(position, size, shared);
328
throw new ClosedChannelException();
330
// presumably set once Lock succeeds so the table entry is kept — assignment not visible
boolean gotLock = false;
333
// try to acquire the lock
// the next statement never executes; presumably present so that javac accepts
// the catch (cli.System.IO.IOException) clause below — confirm
if (false) throw new cli.System.IO.IOException();
337
FileStream fs = (FileStream)fdObj.getStream();
338
fs.Lock(position, size);
340
// NOTE(review): "_" is not a legal identifier from Java 9 onwards
} catch (cli.System.IO.IOException _) {
349
// rollback of the lock-table entry; its guard is not visible in this listing
removeFromFileLockTable(fli);
355
protected void implRelease(FileLockImpl fli) throws IOException {
357
if (false) throw new cli.System.IO.IOException();
358
FileStream fs = (FileStream)fdObj.getStream();
359
fs.Unlock(fli.position(), fli.size());
360
} catch (cli.System.IO.IOException x) {
361
if (!FileDispatcherImpl.NotLockedHack.isErrorNotLocked(x)) {
362
throw new IOException(x.getMessage());
368
* Task that initiates read operation and handles completion result.
370
private class ReadTask<A> implements Runnable, Iocp.ResultHandler, AsyncCallback.Method {
371
private final ByteBuffer dst;
372
private final int pos, rem; // buffer position/remaining
373
private final long position; // file position
374
private final PendingFuture<Integer,A> result;
376
// set to dst if direct; otherwise set to substituted direct buffer
377
private volatile ByteBuffer buf;
379
ReadTask(ByteBuffer dst,
383
PendingFuture<Integer,A> result)
388
this.position = position;
389
this.result = result;
392
void updatePosition(int bytesTransferred) {
393
// if the I/O succeeded then adjust buffer position
394
if (bytesTransferred > 0) {
397
dst.position(pos + bytesTransferred);
398
} catch (IllegalArgumentException x) {
399
// someone has changed the position; ignore
402
// had to substitute direct buffer
403
buf.position(bytesTransferred).flip();
406
} catch (BufferOverflowException x) {
407
// someone has changed the position; ignore
415
// Substitute an array backed buffer if not
416
if (dst.hasArray()) {
419
buf = ByteBuffer.allocate(rem);
426
FileStream fs = (FileStream)fdObj.getStream();
427
fs.Seek(position, SeekOrigin.wrap(SeekOrigin.Begin));
428
fs.BeginRead(buf.array(), buf.arrayOffset() + pos, rem, new AsyncCallback(this), null);
431
} catch (Throwable x) {
432
// failed to initiate read
433
result.setFailure(toIOException(x));
438
// invoke completion handler
439
Invoker.invoke(result);
442
public void Invoke(IAsyncResult ar) {
444
FileStream fs = (FileStream)fdObj.getStream();
445
completed(fs.EndRead(ar), false);
446
} catch (Throwable x) {
447
failed(0, toIOException(x));
452
* Executed when the I/O has completed
455
public void completed(int bytesTransferred, boolean canInvokeDirect) {
456
updatePosition(bytesTransferred);
458
// release waiters and invoke completion handler
459
result.setResult(bytesTransferred);
460
if (canInvokeDirect) {
461
Invoker.invokeUnchecked(result);
463
Invoker.invoke(result);
468
public void failed(int error, IOException x) {
469
// if EOF detected asynchronously then it is reported as error
470
if (error == ERROR_HANDLE_EOF) {
471
completed(-1, false);
475
result.setFailure(x);
477
result.setFailure(new AsynchronousCloseException());
479
Invoker.invoke(result);
485
// Starts an asynchronous read into dst: validates the request, handles the
// closed-channel and no-space cases up front, then creates a ReadTask and
// either runs it or hands it to a pool thread.
// Throws NonReadableChannelException, IllegalArgumentException("Negative
// position") and IllegalArgumentException("Read-only buffer").
// NOTE(review): the position and attachment parameter lines, several guards
// and the return statements are missing from this listing.
<A> Future<Integer> implRead(ByteBuffer dst,
488
CompletionHandler<Integer,? super A> handler)
491
throw new NonReadableChannelException();
493
throw new IllegalArgumentException("Negative position");
494
if (dst.isReadOnly())
495
throw new IllegalArgumentException("Read-only buffer");
497
// check if channel is closed
// NOTE(review): the guards for the next three statements are not visible;
// presumably !isOpen() and handler == null — confirm.
Throwable exc = new ClosedChannelException();
501
return CompletedFuture.withFailure(exc);
502
Invoker.invoke(this, handler, attachment, null, exc);
506
// snapshot of the buffer state; rem is clamped to 0 if position exceeds limit
int pos = dst.position();
507
int lim = dst.limit();
509
int rem = (pos <= lim ? lim - pos : 0);
511
// no space remaining
// completes immediately with a result of 0, as a future or through the handler
return CompletedFuture.withResult(0);
515
Invoker.invoke(this, handler, attachment, 0, null);
519
// create Future and task that initiates read
PendingFuture<Integer,A> result =
521
new PendingFuture<Integer,A>(this, handler, attachment);
522
// NOTE(review): raw type; ReadTask<A> would avoid the unchecked warning
ReadTask readTask = new ReadTask<A>(dst, pos, rem, position, result);
523
result.setContext(readTask);
526
// NOTE(review): the branch taken when this test is true is not visible;
// presumably the task is run on the calling thread — confirm.
if (Iocp.supportsThreadAgnosticIo()) {
529
Invoker.invokeOnThreadInThreadPool(this, readTask);
535
* Task that initiates write operation and handles completion result.
537
private class WriteTask<A> implements Runnable, Iocp.ResultHandler, AsyncCallback.Method {
538
private final ByteBuffer src;
539
private final int pos, rem; // buffer position/remaining
540
private final long position; // file position
541
private final PendingFuture<Integer,A> result;
543
// set to src if direct; otherwise set to substituted direct buffer
544
private volatile ByteBuffer buf;
546
WriteTask(ByteBuffer src,
550
PendingFuture<Integer,A> result)
555
this.position = position;
556
this.result = result;
559
void updatePosition(int bytesTransferred) {
560
// if the I/O succeeded then adjust buffer position
561
if (bytesTransferred > 0) {
563
src.position(pos + bytesTransferred);
564
} catch (IllegalArgumentException x) {
565
// someone has changed the position
572
// Substitute an array backed buffer if not
573
if (src.hasArray()) {
576
buf = ByteBuffer.allocate(rem);
579
// temporarily restore position as we don't know how many bytes
587
// initiate the write
588
FileStream fs = (FileStream)fdObj.getStream();
589
fs.Seek(position, SeekOrigin.wrap(SeekOrigin.Begin));
590
fs.BeginWrite(buf.array(), buf.arrayOffset() + pos, rem, new AsyncCallback(this), null);
593
} catch (Throwable x) {
594
// failed to initiate read:
595
result.setFailure(toIOException(x));
601
// invoke completion handler
602
Invoker.invoke(result);
605
public void Invoke(IAsyncResult ar) {
607
FileStream fs = (FileStream)fdObj.getStream();
609
completed(rem, false);
610
} catch (Throwable x) {
611
failed(0, toIOException(x));
616
* Executed when the I/O has completed
619
public void completed(int bytesTransferred, boolean canInvokeDirect) {
620
updatePosition(bytesTransferred);
622
// release waiters and invoke completion handler
623
result.setResult(bytesTransferred);
624
if (canInvokeDirect) {
625
Invoker.invokeUnchecked(result);
627
Invoker.invoke(result);
632
public void failed(int error, IOException x) {
633
// release waiters and invoker completion handler
635
result.setFailure(x);
637
result.setFailure(new AsynchronousCloseException());
639
Invoker.invoke(result);
643
<A> Future<Integer> implWrite(ByteBuffer src,
646
CompletionHandler<Integer,? super A> handler)
649
throw new NonWritableChannelException();
651
throw new IllegalArgumentException("Negative position");
653
// check if channel is closed
655
Throwable exc = new ClosedChannelException();
657
return CompletedFuture.withFailure(exc);
658
Invoker.invoke(this, handler, attachment, null, exc);
662
int pos = src.position();
663
int lim = src.limit();
665
int rem = (pos <= lim ? lim - pos : 0);
670
return CompletedFuture.withResult(0);
671
Invoker.invoke(this, handler, attachment, 0, null);
675
// create Future and task to initiate write
676
PendingFuture<Integer,A> result =
677
new PendingFuture<Integer,A>(this, handler, attachment);
678
WriteTask writeTask = new WriteTask<A>(src, pos, rem, position, result);
679
result.setContext(writeTask);
682
if (Iocp.supportsThreadAgnosticIo()) {
685
Invoker.invokeOnThreadInThreadPool(this, writeTask);