~juju/pyjuju/zookeeper-vendor

« back to all changes in this revision

Viewing changes to contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java

  • Committer: Gustavo Niemeyer
  • Date: 2011-05-24 20:53:37 UTC
  • Revision ID: gustavo@niemeyer.net-20110524205337-i11yow5biap5xapo
Importing stock Zookeeper 3.3.3 without jars.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *
 
3
 * Licensed to the Apache Software Foundation (ASF) under one
 
4
 * or more contributor license agreements.  See the NOTICE file
 
5
 * distributed with this work for additional information
 
6
 * regarding copyright ownership.  The ASF licenses this file
 
7
 * to you under the Apache License, Version 2.0 (the
 
8
 * "License"); you may not use this file except in compliance
 
9
 * with the License.  You may obtain a copy of the License at
 
10
 *
 
11
 *   http://www.apache.org/licenses/LICENSE-2.0
 
12
 *
 
13
 * Unless required by applicable law or agreed to in writing,
 
14
 * software distributed under the License is distributed on an
 
15
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 
16
 * KIND, either express or implied.  See the License for the
 
17
 * specific language governing permissions and limitations
 
18
 * under the License.
 
19
 *
 
20
 */
 
21
 
 
22
package org.apache.bookkeeper.bookie;
 
23
 
 
24
import java.io.BufferedReader;
 
25
import java.io.BufferedWriter;
 
26
import java.io.File;
 
27
import java.io.FileInputStream;
 
28
import java.io.FileNotFoundException;
 
29
import java.io.FileOutputStream;
 
30
import java.io.IOException;
 
31
import java.io.InputStreamReader;
 
32
import java.io.OutputStreamWriter;
 
33
import java.io.RandomAccessFile;
 
34
import java.nio.ByteBuffer;
 
35
import java.nio.channels.FileChannel;
 
36
import java.util.Arrays;
 
37
import java.util.Collections;
 
38
import java.util.List;
 
39
import java.util.concurrent.ConcurrentHashMap;
 
40
 
 
41
import org.apache.log4j.Logger;
 
42
 
 
43
/**
 
44
 * This class manages the writing of the bookkeeper entries. All the new
 
45
 * entries are written to a common log. The LedgerCache will have pointers
 
46
 * into files created by this class with offsets into the files to find
 
47
 * the actual ledger entry. The entry log files created by this class are
 
48
 * identified by a long.
 
49
 */
 
50
public class EntryLogger {
 
51
    private static final Logger LOG = Logger.getLogger(EntryLogger.class);
 
52
    private File dirs[];
 
53
    private long logId;
 
54
    /**
 
55
     * The maximum size of a entry logger file.
 
56
     */
 
57
    final static long LOG_SIZE_LIMIT = 2*1024*1024*1024L;
 
58
    private volatile BufferedChannel logChannel;
 
59
    // The ledgers contained in this file, seems to be unsused right now
 
60
    //private HashSet<Long> ledgerMembers = new HashSet<Long>();
 
61
    /**
 
62
     * The 1K block at the head of the entry logger file
 
63
     * that contains the fingerprint and (future) meta-data
 
64
     */
 
65
    final static ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(1024);
 
66
    static {
 
67
        LOGFILE_HEADER.put("BKLO".getBytes());
 
68
    }
 
69
    // this indicates that a write has happened since the last flush
 
70
    private volatile boolean somethingWritten = false;
 
71
    
 
72
    /**
 
73
     * Create an EntryLogger that stores it's log files in the given
 
74
     * directories
 
75
     */
 
76
    public EntryLogger(File dirs[]) throws IOException {
 
77
        this.dirs = dirs;
 
78
        // Find the largest logId
 
79
        for(File f: dirs) {
 
80
            long lastLogId = getLastLogId(f);
 
81
            if (lastLogId >= logId) {
 
82
                logId = lastLogId+1;
 
83
            }
 
84
        }
 
85
        createLogId(logId);
 
86
        //syncThread = new SyncThread();
 
87
        //syncThread.start();
 
88
    }
 
89
    
 
90
    /**
 
91
     * Maps entry log files to open channels.
 
92
     */
 
93
    private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
 
94
    
 
95
    /**
 
96
     * Creates a new log file with the given id.
 
97
     */
 
98
    private void createLogId(long logId) throws IOException {
 
99
        List<File> list = Arrays.asList(dirs);
 
100
        Collections.shuffle(list);
 
101
        File firstDir = list.get(0);
 
102
        if (logChannel != null) {
 
103
            logChannel.flush(true);
 
104
        }
 
105
        logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
 
106
        logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
 
107
        channels.put(logId, logChannel);
 
108
        for(File f: dirs) {
 
109
            setLastLogId(f, logId);
 
110
        }
 
111
    }
 
112
 
 
113
    /**
 
114
     * writes the given id to the "lastId" file in the given directory.
 
115
     */
 
116
    private void setLastLogId(File dir, long logId) throws IOException {
 
117
        FileOutputStream fos;
 
118
        fos = new FileOutputStream(new File(dir, "lastId"));
 
119
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos));
 
120
        try {
 
121
            bw.write(Long.toHexString(logId) + "\n");
 
122
            bw.flush();
 
123
        } finally {
 
124
            try {
 
125
                fos.close();
 
126
            } catch (IOException e) {
 
127
            }
 
128
        }
 
129
    }
 
130
    
 
131
    /**
 
132
     * reads id from the "lastId" file in the given directory.
 
133
     */
 
134
    private long getLastLogId(File f) {
 
135
        FileInputStream fis;
 
136
        try {
 
137
            fis = new FileInputStream(new File(f, "lastId"));
 
138
        } catch (FileNotFoundException e) {
 
139
            return -1;
 
140
        }
 
141
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
 
142
        try {
 
143
            String lastIdString = br.readLine();
 
144
            return Long.parseLong(lastIdString);
 
145
        } catch (IOException e) {
 
146
            return -1;
 
147
        } catch(NumberFormatException e) {
 
148
            return -1;
 
149
        } finally {
 
150
            try {
 
151
                fis.close();
 
152
            } catch (IOException e) {
 
153
            }
 
154
        }
 
155
    }
 
156
    
 
157
    private void openNewChannel() throws IOException {
 
158
        createLogId(++logId);
 
159
    }
 
160
    
 
161
    synchronized void flush() throws IOException {
 
162
        if (logChannel != null) {
 
163
            logChannel.flush(true);
 
164
        }
 
165
    }
 
166
    synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
 
167
        if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
 
168
            openNewChannel();
 
169
        }
 
170
        ByteBuffer buff = ByteBuffer.allocate(4);
 
171
        buff.putInt(entry.remaining());
 
172
        buff.flip();
 
173
        logChannel.write(buff);
 
174
        long pos = logChannel.position();
 
175
        logChannel.write(entry);
 
176
        //logChannel.flush(false);
 
177
        somethingWritten = true;
 
178
        return (logId << 32L) | pos;
 
179
    }
 
180
    
 
181
    byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
 
182
        long entryLogId = location >> 32L;
 
183
        long pos = location & 0xffffffffL;
 
184
        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
 
185
        pos -= 4; // we want to get the ledgerId and length to check
 
186
        BufferedChannel fc;
 
187
        try {
 
188
            fc = getChannelForLogId(entryLogId);
 
189
        } catch (FileNotFoundException e) {
 
190
            FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
 
191
            newe.setStackTrace(e.getStackTrace());
 
192
            throw newe;
 
193
        }
 
194
        if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
 
195
            throw new IOException("Short read from entrylog " + entryLogId);
 
196
        }
 
197
        pos += 4;
 
198
        sizeBuff.flip();
 
199
        int entrySize = sizeBuff.getInt();
 
200
        // entrySize does not include the ledgerId
 
201
        if (entrySize > 1024*1024) {
 
202
            LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
 
203
            
 
204
        }
 
205
        byte data[] = new byte[entrySize];
 
206
        ByteBuffer buff = ByteBuffer.wrap(data);
 
207
        int rc = fc.read(buff, pos);
 
208
        if ( rc != data.length) {
 
209
            throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
 
210
        }
 
211
        buff.flip();
 
212
        long thisLedgerId = buff.getLong();
 
213
        if (thisLedgerId != ledgerId) {
 
214
            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
 
215
        }
 
216
        long thisEntryId = buff.getLong();
 
217
        if (thisEntryId != entryId) {
 
218
            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry is " + thisEntryId + " not " + entryId);
 
219
        }
 
220
        
 
221
        return data;
 
222
    }
 
223
    
 
224
    private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
 
225
        BufferedChannel fc = channels.get(entryLogId);
 
226
        if (fc != null) {
 
227
            return fc;
 
228
        }
 
229
        File file = findFile(entryLogId);
 
230
        FileChannel newFc = new RandomAccessFile(file, "rw").getChannel();
 
231
        synchronized (channels) {
 
232
            fc = channels.get(entryLogId);
 
233
            if (fc != null){
 
234
                newFc.close();
 
235
                return fc;
 
236
            }
 
237
            fc = new BufferedChannel(newFc, 8192);
 
238
            channels.put(entryLogId, fc);
 
239
            return fc;
 
240
        }
 
241
    }
 
242
 
 
243
    private File findFile(long logId) throws FileNotFoundException {
 
244
        for(File d: dirs) {
 
245
            File f = new File(d, Long.toHexString(logId)+".log");
 
246
            if (f.exists()) {
 
247
                return f;
 
248
            }
 
249
        }
 
250
        throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
 
251
    }
 
252
    
 
253
    public void close() {
 
254
    }
 
255
 
 
256
    synchronized public boolean testAndClearSomethingWritten() {
 
257
        try {
 
258
            return somethingWritten;
 
259
        } finally {
 
260
            somethingWritten = false;
 
261
        }
 
262
    }
 
263
 
 
264
}