1
package org.apache.lucene.store;
4
* Licensed to the Apache Software Foundation (ASF) under one or more
5
* contributor license agreements. See the NOTICE file distributed with this
6
* work for additional information regarding copyright ownership. The ASF
7
* licenses this file to You under the Apache License, Version 2.0 (the
8
* "License"); you may not use this file except in compliance with the License.
9
* You may obtain a copy of the License at
11
* http://www.apache.org/licenses/LICENSE-2.0
13
* Unless required by applicable law or agreed to in writing, software
14
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
* License for the specific language governing permissions and limitations under
21
import java.io.IOException;
22
import java.io.FileInputStream;
23
import java.io.FileDescriptor;
24
import java.io.FileOutputStream;
25
import java.nio.ByteBuffer;
26
import java.nio.channels.FileChannel;
28
import org.apache.lucene.store.Directory; // javadoc
29
import org.apache.lucene.store.NativeFSLockFactory; // javadoc
32
* A {@link Directory} implementation that uses the
33
* Linux-specific O_DIRECT flag to bypass all OS level
34
* caching. To use this you must compile
35
* NativePosixUtil.cpp (exposes Linux-specific APIs through
36
* JNI) for your platform.
38
* <p><b>WARNING</b>: this code is very new and quite easily
39
* could contain horrible bugs. For example, here's one
40
* known issue: if you use seek in IndexOutput, and then
41
* write more than one buffer's worth of bytes, then the
42
* file will be wrong. Lucene does not do this (only writes
43
* a small number of bytes after seek).
45
* @lucene.experimental
47
public class DirectIOLinuxDirectory extends FSDirectory {
49
private final static long ALIGN = 512;
50
private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
52
private final int forcedBufferSize;
54
/** Create a new DirectIOLinuxDirectory for the named location.
56
* @param path the path of the directory
57
* @param lockFactory the lock factory to use, or null for the default
58
* ({@link NativeFSLockFactory});
59
* @param forcedBufferSize if this is 0, just use Lucene's
60
* default buffer size; else, force this buffer size.
61
* For best performance, force the buffer size to
62
* something fairly large (eg 1 MB), but note that this
63
* will eat up the JRE's direct buffer storage space
66
public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
67
super(path, lockFactory);
68
this.forcedBufferSize = forcedBufferSize;
72
public IndexInput openInput(String name, int bufferSize) throws IOException {
74
return new DirectIOLinuxIndexInput(new File(getDirectory(), name), forcedBufferSize == 0 ? bufferSize : forcedBufferSize);
78
public IndexOutput createOutput(String name) throws IOException {
81
return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), forcedBufferSize == 0 ? BufferedIndexOutput.BUFFER_SIZE : forcedBufferSize);
84
private final static class DirectIOLinuxIndexOutput extends IndexOutput {
85
private final ByteBuffer buffer;
86
private final FileOutputStream fos;
87
private final FileChannel channel;
88
private final int bufferSize;
90
//private final File path;
92
private int bufferPos;
94
private long fileLength;
95
private boolean isOpen;
97
public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
99
FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
100
fos = new FileOutputStream(fd);
101
//fos = new FileOutputStream(path);
102
channel = fos.getChannel();
103
buffer = ByteBuffer.allocateDirect(bufferSize);
104
this.bufferSize = bufferSize;
109
public void writeByte(byte b) throws IOException {
110
assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
112
if (++bufferPos == bufferSize) {
118
public void writeBytes(byte[] src, int offset, int len) throws IOException {
121
final int left = bufferSize - bufferPos;
122
if (left <= toWrite) {
123
buffer.put(src, offset, left);
126
bufferPos = bufferSize;
129
buffer.put(src, offset, toWrite);
130
bufferPos += toWrite;
137
//public void setLength() throws IOException {
138
// TODO -- how to impl this? neither FOS nor
139
// FileChannel provides an API?
143
public void flush() throws IOException {
144
// TODO -- I don't think this method is necessary?
147
private void dump() throws IOException {
149
final long limit = filePos + buffer.limit();
150
if (limit > fileLength) {
151
// this dump extends the file
154
// we had seeked back and written some changes
157
// must always round to next block
158
buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
160
assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
161
assert (filePos & ALIGN_NOT_MASK) == filePos;
162
//System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
163
channel.write(buffer, filePos);
164
filePos += bufferPos;
167
//System.out.println("dump: done");
169
// TODO: handle the case where we seeked back and wrote an
170
// entire buffer; here we must then read the next buffer.
171
// Lucene likely won't trip on this since it only
172
// writes smallish amounts after seeking back
176
public long getFilePointer() {
177
return filePos + bufferPos;
180
// TODO: seek is fragile at best; it can only properly
181
// handle seek & then change bytes that fit entirely
184
public void seek(long pos) throws IOException {
185
if (pos != getFilePointer()) {
187
final long alignedPos = pos & ALIGN_NOT_MASK;
188
filePos = alignedPos;
189
int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
190
if (n < bufferSize) {
193
//System.out.println("seek refill=" + n);
194
final int delta = (int) (pos - alignedPos);
195
buffer.position(delta);
201
public long length() throws IOException {
206
public void close() throws IOException {
213
//System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
214
channel.truncate(fileLength);
215
//System.out.println(" now: " + channel.size());
221
//System.out.println(" final len=" + path.length());
229
private final static class DirectIOLinuxIndexInput extends IndexInput {
230
private final ByteBuffer buffer;
231
private final FileInputStream fis;
232
private final FileChannel channel;
233
private final int bufferSize;
235
private boolean isOpen;
236
private boolean isClone;
237
private long filePos;
238
private int bufferPos;
240
public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
241
super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
242
FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
243
fis = new FileInputStream(fd);
244
channel = fis.getChannel();
245
this.bufferSize = bufferSize;
246
buffer = ByteBuffer.allocateDirect(bufferSize);
249
filePos = -bufferSize;
250
bufferPos = bufferSize;
251
//System.out.println("D open " + path + " this=" + this);
255
public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
256
super(other.toString());
258
channel = other.channel;
259
this.bufferSize = other.bufferSize;
260
buffer = ByteBuffer.allocateDirect(bufferSize);
261
filePos = -bufferSize;
262
bufferPos = bufferSize;
265
//System.out.println("D clone this=" + this);
266
seek(other.getFilePointer());
270
public void close() throws IOException {
271
if (isOpen && !isClone) {
283
public long getFilePointer() {
284
return filePos + bufferPos;
288
public void seek(long pos) throws IOException {
289
if (pos != getFilePointer()) {
290
final long alignedPos = pos & ALIGN_NOT_MASK;
291
//System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
292
filePos = alignedPos-bufferSize;
295
final int delta = (int) (pos - alignedPos);
296
buffer.position(delta);
302
public long length() {
304
return channel.size();
305
} catch (IOException ioe) {
306
throw new RuntimeException("IOException during length(): " + this, ioe);
311
public byte readByte() throws IOException {
312
// NOTE: we don't guard against EOF here... ie the
313
// "final" buffer will typically be filled to less
315
if (bufferPos == bufferSize) {
318
assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
323
private void refill() throws IOException {
325
filePos += bufferSize;
327
assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
328
//System.out.println("X refill filePos=" + filePos);
331
n = channel.read(buffer, filePos);
332
} catch (IOException ioe) {
333
IOException newIOE = new IOException(ioe.getMessage() + ": " + this);
334
newIOE.initCause(ioe);
338
throw new IOException("eof: " + this);
344
public void readBytes(byte[] dst, int offset, int len) throws IOException {
346
//System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
348
final int left = bufferSize - bufferPos;
350
//System.out.println(" copy " + left);
351
buffer.get(dst, offset, left);
356
//System.out.println(" copy " + toRead);
357
buffer.get(dst, offset, toRead);
359
//System.out.println(" readBytes done");
366
public Object clone() {
368
return new DirectIOLinuxIndexInput(this);
369
} catch (IOException ioe) {
370
throw new RuntimeException("IOException during clone: " + this, ioe);