3
* Licensed to the Apache Software Foundation (ASF) under one
4
* or more contributor license agreements. See the NOTICE file
5
* distributed with this work for additional information
6
* regarding copyright ownership. The ASF licenses this file
7
* to you under the Apache License, Version 2.0 (the
8
* "License"); you may not use this file except in compliance
9
* with the License. You may obtain a copy of the License at
11
* http://www.apache.org/licenses/LICENSE-2.0
13
* Unless required by applicable law or agreed to in writing,
14
* software distributed under the License is distributed on an
15
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16
* KIND, either express or implied. See the License for the
17
* specific language governing permissions and limitations
22
package org.apache.bookkeeper.bookie;
24
import java.io.BufferedReader;
25
import java.io.BufferedWriter;
27
import java.io.FileInputStream;
28
import java.io.FileNotFoundException;
29
import java.io.FileOutputStream;
30
import java.io.IOException;
31
import java.io.InputStreamReader;
32
import java.io.OutputStreamWriter;
33
import java.io.RandomAccessFile;
34
import java.nio.ByteBuffer;
35
import java.nio.channels.FileChannel;
36
import java.util.Arrays;
37
import java.util.Collections;
38
import java.util.List;
39
import java.util.concurrent.ConcurrentHashMap;
41
import org.apache.log4j.Logger;
44
* This class manages the writing of the bookkeeper entries. All the new
45
* entries are written to a common log. The LedgerCache will have pointers
46
* into files created by this class with offsets into the files to find
47
* the actual ledger entry. The entry log files created by this class are
48
* identified by a long.
50
public class EntryLogger {
51
private static final Logger LOG = Logger.getLogger(EntryLogger.class);
55
* The maximum size of an entry logger file.
57
final static long LOG_SIZE_LIMIT = 2*1024*1024*1024L;
58
private volatile BufferedChannel logChannel;
59
// The ledgers contained in this file; seems to be unused right now
60
//private HashSet<Long> ledgerMembers = new HashSet<Long>();
62
* The 1K block at the head of the entry logger file
63
* that contains the fingerprint and (future) meta-data
65
final static ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(1024);
67
LOGFILE_HEADER.put("BKLO".getBytes());
69
// this indicates that a write has happened since the last flush
70
private volatile boolean somethingWritten = false;
73
* Create an EntryLogger that stores its log files in the given
76
public EntryLogger(File dirs[]) throws IOException {
78
// Find the largest logId
80
long lastLogId = getLastLogId(f);
81
if (lastLogId >= logId) {
86
//syncThread = new SyncThread();
91
* Maps entry log files to open channels.
93
private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
96
* Creates a new log file with the given id.
98
private void createLogId(long logId) throws IOException {
99
List<File> list = Arrays.asList(dirs);
100
Collections.shuffle(list);
101
File firstDir = list.get(0);
102
if (logChannel != null) {
103
logChannel.flush(true);
105
logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
106
logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
107
channels.put(logId, logChannel);
109
setLastLogId(f, logId);
114
* writes the given id to the "lastId" file in the given directory.
116
private void setLastLogId(File dir, long logId) throws IOException {
117
FileOutputStream fos;
118
fos = new FileOutputStream(new File(dir, "lastId"));
119
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos));
121
bw.write(Long.toHexString(logId) + "\n");
126
} catch (IOException e) {
132
* reads id from the "lastId" file in the given directory.
134
private long getLastLogId(File f) {
137
fis = new FileInputStream(new File(f, "lastId"));
138
} catch (FileNotFoundException e) {
141
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
143
String lastIdString = br.readLine();
144
return Long.parseLong(lastIdString);
145
} catch (IOException e) {
147
} catch(NumberFormatException e) {
152
} catch (IOException e) {
157
private void openNewChannel() throws IOException {
158
createLogId(++logId);
161
synchronized void flush() throws IOException {
162
if (logChannel != null) {
163
logChannel.flush(true);
166
synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
167
if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
170
ByteBuffer buff = ByteBuffer.allocate(4);
171
buff.putInt(entry.remaining());
173
logChannel.write(buff);
174
long pos = logChannel.position();
175
logChannel.write(entry);
176
//logChannel.flush(false);
177
somethingWritten = true;
178
return (logId << 32L) | pos;
181
byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
182
long entryLogId = location >> 32L;
183
long pos = location & 0xffffffffL;
184
ByteBuffer sizeBuff = ByteBuffer.allocate(4);
185
pos -= 4; // we want to get the ledgerId and length to check
188
fc = getChannelForLogId(entryLogId);
189
} catch (FileNotFoundException e) {
190
FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
191
newe.setStackTrace(e.getStackTrace());
194
if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
195
throw new IOException("Short read from entrylog " + entryLogId);
199
int entrySize = sizeBuff.getInt();
200
// entrySize does not include the ledgerId
201
if (entrySize > 1024*1024) {
202
LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
205
byte data[] = new byte[entrySize];
206
ByteBuffer buff = ByteBuffer.wrap(data);
207
int rc = fc.read(buff, pos);
208
if ( rc != data.length) {
209
throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
212
long thisLedgerId = buff.getLong();
213
if (thisLedgerId != ledgerId) {
214
throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
216
long thisEntryId = buff.getLong();
217
if (thisEntryId != entryId) {
218
throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry is " + thisEntryId + " not " + entryId);
224
private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
225
BufferedChannel fc = channels.get(entryLogId);
229
File file = findFile(entryLogId);
230
FileChannel newFc = new RandomAccessFile(file, "rw").getChannel();
231
synchronized (channels) {
232
fc = channels.get(entryLogId);
237
fc = new BufferedChannel(newFc, 8192);
238
channels.put(entryLogId, fc);
243
private File findFile(long logId) throws FileNotFoundException {
245
File f = new File(d, Long.toHexString(logId)+".log");
250
throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
253
public void close() {
256
synchronized public boolean testAndClearSomethingWritten() {
258
return somethingWritten;
260
somethingWritten = false;