1
package org.apache.lucene.util;
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19
import java.io.IOException;
21
import org.apache.lucene.store.DataInput;
22
import org.apache.lucene.store.IndexOutput;
24
public class ThrottledIndexOutput extends IndexOutput {
25
public static final int DEFAULT_MIN_WRITTEN_BYTES = 1024;
26
private final int bytesPerSecond;
27
private IndexOutput delegate;
28
private long flushDelayMillis;
29
private long closeDelayMillis;
30
private long seekDelayMillis;
31
private long pendingBytes;
32
private long minBytesWritten;
33
private long timeElapsed;
34
private final byte[] bytes = new byte[1];
36
public ThrottledIndexOutput newFromDelegate(IndexOutput output) {
37
return new ThrottledIndexOutput(bytesPerSecond, flushDelayMillis,
38
closeDelayMillis, seekDelayMillis, minBytesWritten, output);
41
/**
 * Creates a throttled output that applies the same delay to flush, close and
 * seek, with {@code DEFAULT_MIN_WRITTEN_BYTES} as the minimum-bytes threshold.
 *
 * @param bytesPerSecond target write rate in bytes per second
 * @param delayInMillis  delay in milliseconds used for flush, close and seek
 * @param delegate       the output to forward to
 */
public ThrottledIndexOutput(int bytesPerSecond, long delayInMillis,
    IndexOutput delegate) {
  this(bytesPerSecond,
      delayInMillis, // flush
      delayInMillis, // close
      delayInMillis, // seek
      DEFAULT_MIN_WRITTEN_BYTES,
      delegate);
}
47
/**
 * Creates a throttled output that applies the same delay to flush, close and
 * seek, with a caller-supplied minimum-bytes threshold.
 *
 * @param bytesPerSecond  target write rate in bytes per second
 * @param delays          delay in milliseconds used for flush, close and seek
 * @param minBytesWritten minimum-bytes threshold handed to the full constructor
 * @param delegate        the output to forward to
 */
public ThrottledIndexOutput(int bytesPerSecond, long delays,
    int minBytesWritten, IndexOutput delegate) {
  this(bytesPerSecond,
      delays, // flush
      delays, // close
      delays, // seek
      minBytesWritten,
      delegate);
}
52
public static final int mBitsToBytes(int mbits) {
53
return mbits * 125000;
56
/**
 * Creates a throttled output with every setting given explicitly.
 *
 * @param bytesPerSecond   target write rate in bytes per second; asserted to be positive
 * @param flushDelayMillis milliseconds slept by flush
 * @param closeDelayMillis milliseconds slept by close
 * @param seekDelayMillis  milliseconds slept by seek
 * @param minBytesWritten  pending-byte threshold consulted by getDelay
 * @param delegate         the output to forward to
 */
public ThrottledIndexOutput(int bytesPerSecond, long flushDelayMillis,
    long closeDelayMillis, long seekDelayMillis, long minBytesWritten,
    IndexOutput delegate) {
  assert bytesPerSecond > 0;

  // Rate settings.
  this.bytesPerSecond = bytesPerSecond;
  this.minBytesWritten = minBytesWritten;

  // Fixed per-operation delays.
  this.flushDelayMillis = flushDelayMillis;
  this.closeDelayMillis = closeDelayMillis;
  this.seekDelayMillis = seekDelayMillis;

  this.delegate = delegate;
}
69
public void flush() throws IOException {
70
sleep(flushDelayMillis);
75
public void close() throws IOException {
77
sleep(closeDelayMillis + getDelay(true));
84
public long getFilePointer() {
85
return delegate.getFilePointer();
89
public void seek(long pos) throws IOException {
90
sleep(seekDelayMillis);
95
public long length() throws IOException {
96
return delegate.length();
100
public void writeByte(byte b) throws IOException {
102
writeBytes(bytes, 0, 1);
106
public void writeBytes(byte[] b, int offset, int length) throws IOException {
107
final long before = System.nanoTime();
108
delegate.writeBytes(b, offset, length);
109
timeElapsed += System.nanoTime() - before;
110
pendingBytes += length;
111
sleep(getDelay(false));
115
protected long getDelay(boolean closing) {
116
if (pendingBytes > 0 && (closing || pendingBytes > minBytesWritten)) {
117
long actualBps = (timeElapsed / pendingBytes) * 1000000000l; // nano to sec
118
if (actualBps > bytesPerSecond) {
119
long expected = (pendingBytes * 1000l / bytesPerSecond) ;
120
final long delay = expected - (timeElapsed / 1000000l) ;
130
private static final void sleep(long ms) {
135
} catch (InterruptedException e) {
136
throw new ThreadInterruptedException(e);
141
public void setLength(long length) throws IOException {
142
delegate.setLength(length);
146
public void copyBytes(DataInput input, long numBytes) throws IOException {
147
delegate.copyBytes(input, numBytes);