~slub.team/goobi-indexserver/3.x

« back to all changes in this revision

Viewing changes to lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java

  • Committer: Sebastian Meyer
  • Date: 2012-08-03 09:12:40 UTC
  • Revision ID: sebastian.meyer@slub-dresden.de-20120803091240-x6861b0vabq1xror
Remove Lucene and Solr source code and add patches instead
Fix Bug #985487: Auto-suggestion for the search interface

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
package org.apache.lucene.store;
2
 
 
3
 
/**
4
 
 * Licensed to the Apache Software Foundation (ASF) under one or more
5
 
 * contributor license agreements. See the NOTICE file distributed with this
6
 
 * work for additional information regarding copyright ownership. The ASF
7
 
 * licenses this file to You under the Apache License, Version 2.0 (the
8
 
 * "License"); you may not use this file except in compliance with the License.
9
 
 * You may obtain a copy of the License at
10
 
 * 
11
 
 * http://www.apache.org/licenses/LICENSE-2.0
12
 
 * 
13
 
 * Unless required by applicable law or agreed to in writing, software
14
 
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
 * License for the specific language governing permissions and limitations under
17
 
 * the License.
18
 
 */
19
 
 
20
 
import java.io.File;
21
 
import java.io.IOException;
22
 
import java.io.FileInputStream;
23
 
import java.io.FileDescriptor;
24
 
import java.io.FileOutputStream;
25
 
import java.nio.ByteBuffer;
26
 
import java.nio.channels.FileChannel;
27
 
 
28
 
import org.apache.lucene.store.Directory; // javadoc
29
 
import org.apache.lucene.store.NativeFSLockFactory; // javadoc
30
 
 
31
 
/**
32
 
 * An {@link Directory} implementation that uses the
33
 
 * Linux-specific O_DIRECT flag to bypass all OS level
34
 
 * caching.  To use this you must compile
35
 
 * NativePosixUtil.cpp (exposes Linux-specific APIs through
36
 
 * JNI) for your platform.
37
 
 *
38
 
 * <p><b>WARNING</b>: this code is very new and quite easily
39
 
 * could contain horrible bugs.  For example, here's one
40
 
 * known issue: if you use seek in IndexOutput, and then
41
 
 * write more than one buffer's worth of bytes, then the
42
 
 * file will be wrong.  Lucene does not do this (only writes
43
 
 * small number of bytes after seek).
44
 
 
45
 
 * @lucene.experimental
46
 
 */
47
 
public class DirectIOLinuxDirectory extends FSDirectory {
48
 
 
49
 
  private final static long ALIGN = 512;
50
 
  private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
51
 
 
52
 
  private final int forcedBufferSize;
53
 
 
54
 
  /** Create a new NIOFSDirectory for the named location.
55
 
   * 
56
 
   * @param path the path of the directory
57
 
   * @param lockFactory the lock factory to use, or null for the default
58
 
   * ({@link NativeFSLockFactory});
59
 
   * @param forcedBufferSize if this is 0, just use Lucene's
60
 
   *    default buffer size; else, force this buffer size.
61
 
   *    For best performance, force the buffer size to
62
 
   *    something fairly large (eg 1 MB), but note that this
63
 
   *    will eat up the JRE's direct buffer storage space
64
 
   * @throws IOException
65
 
   */
66
 
  public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
67
 
    super(path, lockFactory);
68
 
    this.forcedBufferSize = forcedBufferSize;
69
 
  }
70
 
 
71
 
  @Override
72
 
  public IndexInput openInput(String name, int bufferSize) throws IOException {
73
 
    ensureOpen();
74
 
    return new DirectIOLinuxIndexInput(new File(getDirectory(), name), forcedBufferSize == 0 ? bufferSize : forcedBufferSize);
75
 
  }
76
 
 
77
 
  @Override
78
 
  public IndexOutput createOutput(String name) throws IOException {
79
 
    ensureOpen();
80
 
    ensureCanWrite(name);
81
 
    return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), forcedBufferSize == 0 ? BufferedIndexOutput.BUFFER_SIZE : forcedBufferSize);
82
 
  }
83
 
 
84
 
  private final static class DirectIOLinuxIndexOutput extends IndexOutput {
85
 
    private final ByteBuffer buffer;
86
 
    private final FileOutputStream fos;
87
 
    private final FileChannel channel;
88
 
    private final int bufferSize;
89
 
 
90
 
    //private final File path;
91
 
 
92
 
    private int bufferPos;
93
 
    private long filePos;
94
 
    private long fileLength;
95
 
    private boolean isOpen;
96
 
 
97
 
    public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
98
 
      //this.path = path;
99
 
      FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
100
 
      fos = new FileOutputStream(fd);
101
 
      //fos = new FileOutputStream(path);
102
 
      channel = fos.getChannel();
103
 
      buffer = ByteBuffer.allocateDirect(bufferSize);
104
 
      this.bufferSize = bufferSize;
105
 
      isOpen = true;
106
 
    }
107
 
 
108
 
    @Override
109
 
    public void writeByte(byte b) throws IOException {
110
 
      assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
111
 
      buffer.put(b);
112
 
      if (++bufferPos == bufferSize) {
113
 
        dump();
114
 
      }
115
 
    }
116
 
 
117
 
    @Override
118
 
    public void writeBytes(byte[] src, int offset, int len) throws IOException {
119
 
      int toWrite = len;
120
 
      while(true) {
121
 
        final int left = bufferSize - bufferPos;
122
 
        if (left <= toWrite) {
123
 
          buffer.put(src, offset, left);
124
 
          toWrite -= left;
125
 
          offset += left;
126
 
          bufferPos = bufferSize;
127
 
          dump();
128
 
        } else {
129
 
          buffer.put(src, offset, toWrite);
130
 
          bufferPos += toWrite;
131
 
          break;
132
 
        }
133
 
      }
134
 
    }
135
 
 
136
 
    //@Override
137
 
    //public void setLength() throws IOException {
138
 
    //   TODO -- how to impl this?  neither FOS nor
139
 
    //   FileChannel provides an API?
140
 
    //}
141
 
 
142
 
    @Override
143
 
    public void flush() throws IOException {
144
 
      // TODO -- I don't think this method is necessary?
145
 
    }
146
 
 
147
 
    private void dump() throws IOException {
148
 
      buffer.flip();
149
 
      final long limit = filePos + buffer.limit();
150
 
      if (limit > fileLength) {
151
 
        // this dump extends the file
152
 
        fileLength = limit;
153
 
      } else {
154
 
        // we had seek'd back & wrote some changes
155
 
      }
156
 
 
157
 
      // must always round to next block
158
 
      buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
159
 
 
160
 
      assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
161
 
      assert (filePos & ALIGN_NOT_MASK) == filePos;
162
 
      //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
163
 
      channel.write(buffer, filePos);
164
 
      filePos += bufferPos;
165
 
      bufferPos = 0;
166
 
      buffer.clear();
167
 
      //System.out.println("dump: done");
168
 
 
169
 
      // TODO: the case where we'd seek'd back, wrote an
170
 
      // entire buffer, we must here read the next buffer;
171
 
      // likely Lucene won't trip on this since we only
172
 
      // write smallish amounts on seeking back
173
 
    }
174
 
 
175
 
    @Override
176
 
    public long getFilePointer() {
177
 
      return filePos + bufferPos;
178
 
    }
179
 
 
180
 
    // TODO: seek is fragile at best; it can only properly
181
 
    // handle seek & then change bytes that fit entirely
182
 
    // within one buffer
183
 
    @Override
184
 
    public void seek(long pos) throws IOException {
185
 
      if (pos != getFilePointer()) {
186
 
        dump();
187
 
        final long alignedPos = pos & ALIGN_NOT_MASK;
188
 
        filePos = alignedPos;
189
 
        int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
190
 
        if (n < bufferSize) {
191
 
          buffer.limit(n);
192
 
        }
193
 
        //System.out.println("seek refill=" + n);
194
 
        final int delta = (int) (pos - alignedPos);
195
 
        buffer.position(delta);
196
 
        bufferPos = delta;
197
 
      }
198
 
    }
199
 
 
200
 
    @Override
201
 
    public long length() throws IOException {
202
 
      return fileLength;
203
 
    }
204
 
 
205
 
    @Override
206
 
    public void close() throws IOException {
207
 
      if (isOpen) {
208
 
        isOpen = false;
209
 
        try {
210
 
          dump();
211
 
        } finally {
212
 
          try {
213
 
            //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
214
 
            channel.truncate(fileLength);
215
 
            //System.out.println("  now: " + channel.size());
216
 
          } finally {
217
 
            try {
218
 
              channel.close();
219
 
            } finally {
220
 
              fos.close();
221
 
              //System.out.println("  final len=" + path.length());
222
 
            }
223
 
          }
224
 
        }
225
 
      }
226
 
    }
227
 
  }
228
 
 
229
 
  private final static class DirectIOLinuxIndexInput extends IndexInput {
230
 
    private final ByteBuffer buffer;
231
 
    private final FileInputStream fis;
232
 
    private final FileChannel channel;
233
 
    private final int bufferSize;
234
 
 
235
 
    private boolean isOpen;
236
 
    private boolean isClone;
237
 
    private long filePos;
238
 
    private int bufferPos;
239
 
 
240
 
    public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
241
 
      super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
242
 
      FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
243
 
      fis = new FileInputStream(fd);
244
 
      channel = fis.getChannel();
245
 
      this.bufferSize = bufferSize;
246
 
      buffer = ByteBuffer.allocateDirect(bufferSize);
247
 
      isOpen = true;
248
 
      isClone = false;
249
 
      filePos = -bufferSize;
250
 
      bufferPos = bufferSize;
251
 
      //System.out.println("D open " + path + " this=" + this);
252
 
    }
253
 
 
254
 
    // for clone
255
 
    public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
256
 
      super(other.toString());
257
 
      this.fis = null;
258
 
      channel = other.channel;
259
 
      this.bufferSize = other.bufferSize;
260
 
      buffer = ByteBuffer.allocateDirect(bufferSize);
261
 
      filePos = -bufferSize;
262
 
      bufferPos = bufferSize;
263
 
      isOpen = true;
264
 
      isClone = true;
265
 
      //System.out.println("D clone this=" + this);
266
 
      seek(other.getFilePointer());
267
 
    }
268
 
 
269
 
    @Override
270
 
    public void close() throws IOException {
271
 
      if (isOpen && !isClone) {
272
 
        try {
273
 
          channel.close();
274
 
        } finally {
275
 
          if (!isClone) {
276
 
            fis.close();
277
 
          }
278
 
        }
279
 
      }
280
 
    }
281
 
 
282
 
    @Override
283
 
    public long getFilePointer() {
284
 
      return filePos + bufferPos;
285
 
    }
286
 
 
287
 
    @Override
288
 
    public void seek(long pos) throws IOException {
289
 
      if (pos != getFilePointer()) {
290
 
        final long alignedPos = pos & ALIGN_NOT_MASK;
291
 
        //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
292
 
        filePos = alignedPos-bufferSize;
293
 
        refill();
294
 
        
295
 
        final int delta = (int) (pos - alignedPos);
296
 
        buffer.position(delta);
297
 
        bufferPos = delta;
298
 
      }
299
 
    }
300
 
 
301
 
    @Override
302
 
    public long length() {
303
 
      try {
304
 
        return channel.size();
305
 
      } catch (IOException ioe) {
306
 
        throw new RuntimeException("IOException during length(): " + this, ioe);
307
 
      }
308
 
    }
309
 
 
310
 
    @Override
311
 
    public byte readByte() throws IOException {
312
 
      // NOTE: we don't guard against EOF here... ie the
313
 
      // "final" buffer will typically be filled to less
314
 
      // than bufferSize
315
 
      if (bufferPos == bufferSize) {
316
 
        refill();
317
 
      }
318
 
      assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
319
 
      bufferPos++;
320
 
      return buffer.get();
321
 
    }
322
 
 
323
 
    private void refill() throws IOException {
324
 
      buffer.clear();
325
 
      filePos += bufferSize;
326
 
      bufferPos = 0;
327
 
      assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
328
 
      //System.out.println("X refill filePos=" + filePos);
329
 
      int n;
330
 
      try {
331
 
        n = channel.read(buffer, filePos);
332
 
      } catch (IOException ioe) {
333
 
        IOException newIOE = new IOException(ioe.getMessage() + ": " + this);
334
 
        newIOE.initCause(ioe);
335
 
        throw newIOE;
336
 
      }
337
 
      if (n < 0) {
338
 
        throw new IOException("eof: " + this);
339
 
      }
340
 
      buffer.rewind();
341
 
    }
342
 
 
343
 
    @Override
344
 
    public void readBytes(byte[] dst, int offset, int len) throws IOException {
345
 
      int toRead = len;
346
 
      //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
347
 
      while(true) {
348
 
        final int left = bufferSize - bufferPos;
349
 
        if (left < toRead) {
350
 
          //System.out.println("  copy " + left);
351
 
          buffer.get(dst, offset, left);
352
 
          toRead -= left;
353
 
          offset += left;
354
 
          refill();
355
 
        } else {
356
 
          //System.out.println("  copy " + toRead);
357
 
          buffer.get(dst, offset, toRead);
358
 
          bufferPos += toRead;
359
 
          //System.out.println("  readBytes done");
360
 
          break;
361
 
        }
362
 
      }
363
 
    }
364
 
 
365
 
    @Override
366
 
    public Object clone() {
367
 
      try {
368
 
        return new DirectIOLinuxIndexInput(this);
369
 
      } catch (IOException ioe) {
370
 
        throw new RuntimeException("IOException during clone: " + this, ioe);
371
 
      }
372
 
    }
373
 
  }
374
 
}