2
* Copyright 2009 The Apache Software Foundation
4
* Licensed to the Apache Software Foundation (ASF) under one
5
* or more contributor license agreements. See the NOTICE file
6
* distributed with this work for additional information
7
* regarding copyright ownership. The ASF licenses this file
8
* to you under the Apache License, Version 2.0 (the
9
* "License"); you may not use this file except in compliance
10
* with the License. You may obtain a copy of the License at
12
* http://www.apache.org/licenses/LICENSE-2.0
14
* Unless required by applicable law or agreed to in writing, software
15
* distributed under the License is distributed on an "AS IS" BASIS,
16
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
* See the License for the specific language governing permissions and
18
* limitations under the License.
21
package org.apache.hadoop.hbase.regionserver;
23
import java.io.IOException;
24
import java.lang.management.ManagementFactory;
25
import java.lang.management.RuntimeMXBean;
26
import java.rmi.UnexpectedException;
27
import java.util.ArrayList;
28
import java.util.Iterator;
29
import java.util.List;
30
import java.util.NavigableSet;
32
import java.util.SortedSet;
33
import java.util.concurrent.CopyOnWriteArraySet;
34
import java.util.concurrent.atomic.AtomicLong;
35
import java.util.concurrent.locks.ReentrantReadWriteLock;
37
import org.apache.commons.logging.Log;
38
import org.apache.commons.logging.LogFactory;
39
import org.apache.hadoop.hbase.HConstants;
40
import org.apache.hadoop.hbase.KeyValue;
41
import org.apache.hadoop.hbase.io.HeapSize;
42
import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
43
import org.apache.hadoop.hbase.util.Bytes;
44
import org.apache.hadoop.hbase.util.ClassSize;
47
* The MemStore holds in-memory modifications to the Store. Modifications
48
* are {@link KeyValue}s. When asked to flush, current memstore is moved
49
* to snapshot and is cleared. We continue to serve edits out of new memstore
50
* and backing snapshot until flusher reports in that the flush succeeded. At
51
* this point we let the snapshot go.
52
* TODO: Adjust size of the memstore when we remove items because they have
54
* TODO: With new KVSLS, need to make sure we update HeapSize with difference
57
public class MemStore implements HeapSize {
58
private static final Log LOG = LogFactory.getLog(MemStore.class);
60
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
61
// better semantics. The Map will overwrite if passed a key it already had
62
// whereas the Set will not add new KV if key is same though value might be
63
// different. Value is not important -- just make sure always same
65
volatile KeyValueSkipListSet kvset;
67
// Snapshot of memstore. Made for flusher.
68
volatile KeyValueSkipListSet snapshot;
70
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
72
final KeyValue.KVComparator comparator;
74
// Used comparing versions -- same r/c and ts but different type.
75
final KeyValue.KVComparator comparatorIgnoreType;
77
// Used comparing versions -- same r/c and type but different timestamp.
78
final KeyValue.KVComparator comparatorIgnoreTimestamp;
80
// Used to track own heapSize
81
final AtomicLong size;
83
* Default constructor. Used for tests.
86
this(KeyValue.COMPARATOR);
93
public MemStore(final KeyValue.KVComparator c) {
95
this.comparatorIgnoreTimestamp =
96
this.comparator.getComparatorIgnoringTimestamps();
97
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
98
this.kvset = new KeyValueSkipListSet(c);
99
this.snapshot = new KeyValueSkipListSet(c);
100
this.size = new AtomicLong(DEEP_OVERHEAD);
104
for (KeyValue kv: this.kvset) {
107
for (KeyValue kv: this.snapshot) {
113
* Creates a snapshot of the current memstore.
114
* Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)}
115
* To get the snapshot made by this method, use {@link #getSnapshot()}
118
this.lock.writeLock().lock();
120
// If snapshot currently has entries, then flusher failed or didn't call
121
// cleanup. Log a warning.
122
if (!this.snapshot.isEmpty()) {
123
LOG.warn("Snapshot called again without clearing previous. " +
124
"Doing nothing. Another ongoing flush or did we fail last attempt?");
126
if (!this.kvset.isEmpty()) {
127
this.snapshot = this.kvset;
128
this.kvset = new KeyValueSkipListSet(this.comparator);
129
// Reset heap to not include any keys
130
this.size.set(DEEP_OVERHEAD);
134
this.lock.writeLock().unlock();
139
* Return the current snapshot.
140
* Called by flusher to get current snapshot made by a previous
141
* call to {@link #snapshot()}
142
* @return Return snapshot.
143
* @see #snapshot()
144
* @see #clearSnapshot(java.util.SortedSet)
146
KeyValueSkipListSet getSnapshot() {
147
return this.snapshot;
151
* The passed snapshot was successfully persisted; it can be let go.
152
* @param ss The snapshot to clean out.
153
* @throws UnexpectedException if <code>ss</code> is not the current snapshot
154
* @see #snapshot()
156
void clearSnapshot(final SortedSet<KeyValue> ss)
157
throws UnexpectedException {
158
this.lock.writeLock().lock();
160
if (this.snapshot != ss) {
161
throw new UnexpectedException("Current snapshot is " +
162
this.snapshot + ", was passed " + ss);
164
// OK. Passed in snapshot is same as current snapshot. If not-empty,
165
// create a new snapshot and let the old one go.
167
this.snapshot = new KeyValueSkipListSet(this.comparator);
170
this.lock.writeLock().unlock();
177
* @return approximate size of the passed key and value.
179
long add(final KeyValue kv) {
181
this.lock.readLock().lock();
183
s = heapSizeChange(kv, this.kvset.add(kv));
184
this.size.addAndGet(s);
186
this.lock.readLock().unlock();
194
* @return approximate size of the passed key and value.
196
long delete(final KeyValue delete) {
198
this.lock.readLock().lock();
201
s += heapSizeChange(delete, this.kvset.add(delete));
203
this.lock.readLock().unlock();
205
this.size.addAndGet(s);
210
* @param kv Find the row that comes after this one. If null, we return the
212
* @return Next row or null if none found.
214
KeyValue getNextRow(final KeyValue kv) {
215
this.lock.readLock().lock();
217
return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
219
this.lock.readLock().unlock();
226
* @return Return lowest of a or b or null if both a and b are null
228
private KeyValue getLowest(final KeyValue a, final KeyValue b) {
235
return comparator.compareRows(a, b) <= 0? a: b;
239
* @param key Find row that follows this one. If null, return first.
240
* @param set Set to look in for a row beyond <code>key</code>.
241
* @return Next row or null if none found. If one found, will be a new
242
* KeyValue -- can be destroyed by subsequent calls to this method.
244
private KeyValue getNextRow(final KeyValue key,
245
final NavigableSet<KeyValue> set) {
246
KeyValue result = null;
247
SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
248
// Iterate until we fall into the next row; i.e. move off current row
249
for (KeyValue kv: tail) {
250
if (comparator.compareRows(kv, key) <= 0)
252
// Note: Not suppressing deletes or expired cells. Needs to be handled
253
// by higher up functions.
261
* @param state column/delete tracking state
263
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
264
this.lock.readLock().lock();
266
getRowKeyAtOrBefore(kvset, state);
267
getRowKeyAtOrBefore(snapshot, state);
269
this.lock.readLock().unlock();
275
* @param state Accumulates deletes and candidates.
277
private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
278
final GetClosestRowBeforeTracker state) {
282
if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
283
// Found nothing in row. Try backing up.
284
getRowKeyBefore(set, state);
289
* Walk forward in a row from <code>firstOnRow</code>. Presumption is that
290
* we have been passed the first possible key on a row. As we walk forward
291
* we accumulate deletes until we hit a candidate on the row at which point
294
* @param firstOnRow First possible key on this row.
296
* @return True if we found a candidate walking this row.
298
private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
299
final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
300
boolean foundCandidate = false;
301
SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
302
if (tail.isEmpty()) return foundCandidate;
303
for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
304
KeyValue kv = i.next();
305
// Did we go beyond the target row? If so break.
306
if (state.isTooFar(kv, firstOnRow)) break;
307
if (state.isExpired(kv)) {
311
// If we added something, this row is a contender. break.
312
if (state.handle(kv)) {
313
foundCandidate = true;
317
return foundCandidate;
321
* Walk backwards through the passed set a row at a time until we run out of
322
* set or until we get a candidate.
326
private void getRowKeyBefore(NavigableSet<KeyValue> set,
327
final GetClosestRowBeforeTracker state) {
328
KeyValue firstOnRow = state.getTargetKey();
329
for (Member p = memberOfPreviousRow(set, state, firstOnRow);
330
p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
331
// Make sure we don't fall out of our table.
332
if (!state.isTargetTable(p.kv)) break;
333
// Stop looking if we've exited the better candidate range.
334
if (!state.isBetterCandidate(p.kv)) break;
335
// Make into firstOnRow
336
firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
337
// If we find something, break;
338
if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
343
* Immutable data structure to hold member found in set and the set it was
344
* found in. Include set because it is carrying context.
346
private static class Member {
348
final NavigableSet<KeyValue> set;
349
Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
356
* @param set Set to walk back in. Pass a first in row or we'll return
358
* @param state Utility and context.
359
* @param firstOnRow First item on the row after the one we want to find a
361
* @return Null or member of row previous to <code>firstOnRow</code>
363
private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
364
final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
365
NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
366
if (head.isEmpty()) return null;
367
for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
368
KeyValue found = i.next();
369
if (state.isExpired(found)) {
373
return new Member(head, found);
379
* @return scanner on memstore and snapshot in this order.
381
KeyValueScanner [] getScanners() {
382
this.lock.readLock().lock();
384
KeyValueScanner [] scanners = new KeyValueScanner[1];
385
scanners[0] = new MemStoreScanner();
388
this.lock.readLock().unlock();
393
// HBASE-880/1249/1304
397
* Perform a single-row Get on the memstore and snapshot, placing results
398
* into the specified KV list.
400
* This will return true if it is determined that the query is complete
401
* and it is not necessary to check any storefiles after this.
403
* Otherwise, it will return false and you should continue on.
404
* @param matcher Column matcher
405
* @param result List to add results to
406
* @return true if done with store (early-out), false if not
408
public boolean get(QueryMatcher matcher, List<KeyValue> result) {
409
this.lock.readLock().lock();
411
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
415
return internalGet(this.snapshot, matcher, result) || matcher.isDone();
417
this.lock.readLock().unlock();
422
* Gets from either the memstore or the snapshot, and returns a code
423
* to let you know which is which.
425
* @param matcher query matcher
426
* @param result puts results here
427
* @return 1 == memstore, 2 == snapshot, 0 == none
429
int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
430
this.lock.readLock().lock();
432
boolean fromMemstore = internalGet(this.kvset, matcher, result);
433
if (fromMemstore || matcher.isDone())
437
boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
438
if (fromSnapshot || matcher.isDone())
443
this.lock.readLock().unlock();
448
* Small utility functions for use by Store.incrementColumnValue
449
* _only_ under the threat of pain and everlasting race conditions.
451
void readLockLock() {
452
this.lock.readLock().lock();
454
void readLockUnlock() {
455
this.lock.readLock().unlock();
460
* @param set memstore or snapshot
461
* @param matcher query matcher
462
* @param result list to add results to
463
* @return true if done with store (early-out), false if not
465
boolean internalGet(final NavigableSet<KeyValue> set,
466
final QueryMatcher matcher, final List<KeyValue> result) {
467
if(set.isEmpty()) return false;
469
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
470
for (KeyValue kv : tail) {
471
QueryMatcher.MatchCode res = matcher.match(kv);
483
throw new RuntimeException("Unexpected " + res);
491
* MemStoreScanner implements the KeyValueScanner.
492
* It lets the caller scan the contents of a memstore -- both current
494
* This behaves as if it were a real scanner but does not maintain position.
496
protected class MemStoreScanner implements KeyValueScanner {
497
// Next row information for either kvset or snapshot
498
private KeyValue kvsetNextRow = null;
499
private KeyValue snapshotNextRow = null;
501
// iterator based scanning.
502
Iterator<KeyValue> kvsetIt;
503
Iterator<KeyValue> snapshotIt;
508
So the MemStoreScanner is fixed at creation time. This includes pointers/iterators into
509
existing kvset/snapshot. During a snapshot creation, the kvset is null, and the
510
snapshot is moved. Since kvset is null there is no point in reseeking on both;
511
we can save ourselves the trouble. During the snapshot->hfile transition, the memstore
512
scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
513
potentially do something smarter by adjusting the existing memstore scanner.
515
But there is a greater problem here, that being once a scanner has progressed
516
during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
517
If a scan lasts a little while, there is a chance for new entries in kvset to
518
become available but we will never see them. This needs to be handled at the
519
StoreScanner level with coordination with MemStoreScanner.
527
//DebugPrint.println(" MS new@" + hashCode());
530
protected KeyValue getNext(Iterator<KeyValue> it) {
532
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
533
//DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
535
while (ret == null && it.hasNext()) {
536
KeyValue v = it.next();
537
if (v.getMemstoreTS() <= readPoint) {
544
public synchronized boolean seek(KeyValue key) {
550
// kvset and snapshot will never be null.
551
// If tailSet can't find anything, the returned SortedSet is empty (not null).
552
SortedSet<KeyValue> kvTail = kvset.tailSet(key);
553
SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
555
kvsetIt = kvTail.iterator();
556
snapshotIt = snapshotTail.iterator();
558
kvsetNextRow = getNext(kvsetIt);
559
snapshotNextRow = getNext(snapshotIt);
560
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
562
//DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
563
// kvset.size() + " threadread = " + readPoint);
564
//DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
565
// snapshot.size() + " threadread = " + readPoint);
567
KeyValue lowest = getLowest();
569
// has data := (lowest != null)
570
return lowest != null;
573
public synchronized KeyValue peek() {
574
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
579
public synchronized KeyValue next() {
580
KeyValue theNext = getLowest();
582
if (theNext == null) {
586
// Advance one of the iterators
587
if (theNext == kvsetNextRow) {
588
kvsetNextRow = getNext(kvsetIt);
590
snapshotNextRow = getNext(snapshotIt);
592
//long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
593
//DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
594
// getLowest() + " threadpoint=" + readpoint);
599
protected KeyValue getLowest() {
600
return getLower(kvsetNextRow,
605
* Returns the lower of the two key values, or null if they are both null.
606
* This uses comparator.compare() to compare the KeyValue using the memstore
609
protected KeyValue getLower(KeyValue first, KeyValue second) {
610
if (first == null && second == null) {
613
if (first != null && second != null) {
614
int compare = comparator.compare(first, second);
615
return (compare <= 0 ? first : second);
617
return (first != null ? first : second);
620
public synchronized void close() {
621
// Accelerate the GC a bit perhaps?
623
this.snapshotIt = null;
625
this.kvsetNextRow = null;
626
this.snapshotNextRow = null;
630
public final static long FIXED_OVERHEAD = ClassSize.align(
631
ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
633
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
634
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
635
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
636
(2 * ClassSize.CONCURRENT_SKIPLISTMAP));
639
* Calculate how the MemStore size has changed. Includes overhead of the
642
* @param notpresent True if the kv was NOT present in the set.
645
long heapSizeChange(final KeyValue kv, final boolean notpresent) {
647
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
652
* Get the entire heap usage for this MemStore not including keys in the
656
public long heapSize() {
661
* Get the heap usage of KVs in this MemStore.
663
public long keySize() {
664
return heapSize() - DEEP_OVERHEAD;
668
* Get an estimate of the number of key values stored in this store.
670
* @return the number of key/values in this memstore.
672
public int numKeyValues() {
673
return kvset.size() + snapshot.size();
677
* Code to help figure if our approximation of object heap sizes is close
678
* enough. See hbase-900. Fills memstores then waits so user can heap
679
* dump and bring up resultant hprof in something like jprofiler which
680
* allows you to get 'deep size' on objects.
681
* @param args main args
683
public static void main(String [] args) {
684
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
685
LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
686
runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
687
LOG.info("vmInputArguments=" + runtime.getInputArguments());
688
MemStore memstore1 = new MemStore();
691
final int count = 10000;
692
byte [] column = Bytes.toBytes("col:umn");
693
for (int i = 0; i < count; i++) {
694
// Give each its own ts
695
size += memstore1.add(new KeyValue(Bytes.toBytes(i), column, i));
697
LOG.info("memstore1 estimated size=" + size);
698
for (int i = 0; i < count; i++) {
699
size += memstore1.add(new KeyValue(Bytes.toBytes(i), column, i));
701
LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
702
// Make a variably sized memstore.
703
MemStore memstore2 = new MemStore();
704
for (int i = 0; i < count; i++) {
705
size += memstore2.add(new KeyValue(Bytes.toBytes(i), column, i,
708
LOG.info("memstore2 estimated size=" + size);
709
final int seconds = 30;
710
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
711
for (int i = 0; i < seconds; i++) {
712
// Thread.sleep(1000);
714
LOG.info("Exiting.");