~ubuntu-branches/ubuntu/precise/hbase/precise

« back to all changes in this revision

Viewing changes to src/java/org/apache/hadoop/hbase/regionserver/MemStore.java

  • Committer: Bazaar Package Importer
  • Author(s): Thomas Koch
  • Date: 2010-05-06 14:20:42 UTC
  • Revision ID: james.westby@ubuntu.com-20100506142042-r3hlvgxdcpb8tynl
Tags: upstream-0.20.4+dfsg1
ImportĀ upstreamĀ versionĀ 0.20.4+dfsg1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * Copyright 2009 The Apache Software Foundation
 
3
 *
 
4
 * Licensed to the Apache Software Foundation (ASF) under one
 
5
 * or more contributor license agreements.  See the NOTICE file
 
6
 * distributed with this work for additional information
 
7
 * regarding copyright ownership.  The ASF licenses this file
 
8
 * to you under the Apache License, Version 2.0 (the
 
9
 * "License"); you may not use this file except in compliance
 
10
 * with the License.  You may obtain a copy of the License at
 
11
 *
 
12
 *     http://www.apache.org/licenses/LICENSE-2.0
 
13
 *
 
14
 * Unless required by applicable law or agreed to in writing, software
 
15
 * distributed under the License is distributed on an "AS IS" BASIS,
 
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
17
 * See the License for the specific language governing permissions and
 
18
 * limitations under the License.
 
19
 */
 
20
 
 
21
package org.apache.hadoop.hbase.regionserver;
 
22
 
 
23
import java.io.IOException;
 
24
import java.lang.management.ManagementFactory;
 
25
import java.lang.management.RuntimeMXBean;
 
26
import java.rmi.UnexpectedException;
 
27
import java.util.ArrayList;
 
28
import java.util.Iterator;
 
29
import java.util.List;
 
30
import java.util.NavigableSet;
 
31
import java.util.Set;
 
32
import java.util.SortedSet;
 
33
import java.util.concurrent.CopyOnWriteArraySet;
 
34
import java.util.concurrent.atomic.AtomicLong;
 
35
import java.util.concurrent.locks.ReentrantReadWriteLock;
 
36
 
 
37
import org.apache.commons.logging.Log;
 
38
import org.apache.commons.logging.LogFactory;
 
39
import org.apache.hadoop.hbase.HConstants;
 
40
import org.apache.hadoop.hbase.KeyValue;
 
41
import org.apache.hadoop.hbase.io.HeapSize;
 
42
import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 
43
import org.apache.hadoop.hbase.util.Bytes;
 
44
import org.apache.hadoop.hbase.util.ClassSize;
 
45
 
 
46
/**
 
47
 * The MemStore holds in-memory modifications to the Store.  Modifications
 
48
 * are {@link KeyValue}s.  When asked to flush, current memstore is moved
 
49
 * to snapshot and is cleared.  We continue to serve edits out of new memstore
 
50
 * and backing snapshot until flusher reports in that the flush succeeded. At
 
51
 * this point we let the snapshot go.
 
52
 * TODO: Adjust size of the memstore when we remove items because they have
 
53
 * been deleted.
 
54
 * TODO: With new KVSLS, need to make sure we update HeapSize with difference
 
55
 * in KV size.
 
56
 */
 
57
public class MemStore implements HeapSize {
 
58
  private static final Log LOG = LogFactory.getLog(MemStore.class);
 
59
 
 
60
  // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
 
61
  // better semantics.  The Map will overwrite if passed a key it already had
 
62
  // whereas the Set will not add new KV if key is same though value might be
 
63
  // different.  Value is not important -- just make sure always same
 
64
  // reference passed.
 
65
  volatile KeyValueSkipListSet kvset;
 
66
 
 
67
  // Snapshot of memstore.  Made for flusher.
 
68
  volatile KeyValueSkipListSet snapshot;
 
69
 
 
70
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
71
 
 
72
  final KeyValue.KVComparator comparator;
 
73
 
 
74
  // Used comparing versions -- same r/c and ts but different type.
 
75
  final KeyValue.KVComparator comparatorIgnoreType;
 
76
 
 
77
  // Used comparing versions -- same r/c and type but different timestamp.
 
78
  final KeyValue.KVComparator comparatorIgnoreTimestamp;
 
79
 
 
80
  // Used to track own heapSize
 
81
  final AtomicLong size;
 
82
  /**
 
83
   * Default constructor. Used for tests.
 
84
   */
 
85
  public MemStore() {
 
86
    this(KeyValue.COMPARATOR);
 
87
  }
 
88
 
 
89
  /**
 
90
   * Constructor.
 
91
   * @param c Comparator
 
92
   */
 
93
  public MemStore(final KeyValue.KVComparator c) {
 
94
    this.comparator = c;
 
95
    this.comparatorIgnoreTimestamp =
 
96
      this.comparator.getComparatorIgnoringTimestamps();
 
97
    this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
 
98
    this.kvset = new KeyValueSkipListSet(c);
 
99
    this.snapshot = new KeyValueSkipListSet(c);
 
100
    this.size = new AtomicLong(DEEP_OVERHEAD);
 
101
  }
 
102
 
 
103
  void dump() {
 
104
    for (KeyValue kv: this.kvset) {
 
105
      LOG.info(kv);
 
106
    }
 
107
    for (KeyValue kv: this.snapshot) {
 
108
      LOG.info(kv);
 
109
    }
 
110
  }
 
111
 
 
112
  /**
 
113
   * Creates a snapshot of the current memstore.
 
114
   * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)}
 
115
   * To get the snapshot made by this method, use {@link #getSnapshot()}
 
116
   */
 
117
  void snapshot() {
 
118
    this.lock.writeLock().lock();
 
119
    try {
 
120
      // If snapshot currently has entries, then flusher failed or didn't call
 
121
      // cleanup.  Log a warning.
 
122
      if (!this.snapshot.isEmpty()) {
 
123
        LOG.warn("Snapshot called again without clearing previous. " +
 
124
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
 
125
      } else {
 
126
        if (!this.kvset.isEmpty()) {
 
127
          this.snapshot = this.kvset;
 
128
          this.kvset = new KeyValueSkipListSet(this.comparator);
 
129
          // Reset heap to not include any keys
 
130
          this.size.set(DEEP_OVERHEAD);
 
131
        }
 
132
      }
 
133
    } finally {
 
134
      this.lock.writeLock().unlock();
 
135
    }
 
136
  }
 
137
 
 
138
  /**
 
139
   * Return the current snapshot.
 
140
   * Called by flusher to get current snapshot made by a previous
 
141
   * call to {@link #snapshot()}
 
142
   * @return Return snapshot.
 
143
   * @see {@link #snapshot()}
 
144
   * @see {@link #clearSnapshot(java.util.SortedSet)}
 
145
   */
 
146
  KeyValueSkipListSet getSnapshot() {
 
147
    return this.snapshot;
 
148
  }
 
149
 
 
150
  /**
 
151
   * The passed snapshot was successfully persisted; it can be let go.
 
152
   * @param ss The snapshot to clean out.
 
153
   * @throws UnexpectedException
 
154
   * @see {@link #snapshot()}
 
155
   */
 
156
  void clearSnapshot(final SortedSet<KeyValue> ss)
 
157
  throws UnexpectedException {
 
158
    this.lock.writeLock().lock();
 
159
    try {
 
160
      if (this.snapshot != ss) {
 
161
        throw new UnexpectedException("Current snapshot is " +
 
162
          this.snapshot + ", was passed " + ss);
 
163
      }
 
164
      // OK. Passed in snapshot is same as current snapshot.  If not-empty,
 
165
      // create a new snapshot and let the old one go.
 
166
      if (!ss.isEmpty()) {
 
167
        this.snapshot = new KeyValueSkipListSet(this.comparator);
 
168
      }
 
169
    } finally {
 
170
      this.lock.writeLock().unlock();
 
171
    }
 
172
  }
 
173
 
 
174
  /**
 
175
   * Write an update
 
176
   * @param kv
 
177
   * @return approximate size of the passed key and value.
 
178
   */
 
179
  long add(final KeyValue kv) {
 
180
    long s = -1;
 
181
    this.lock.readLock().lock();
 
182
    try {
 
183
      s = heapSizeChange(kv, this.kvset.add(kv));
 
184
      this.size.addAndGet(s);
 
185
    } finally {
 
186
      this.lock.readLock().unlock();
 
187
    }
 
188
    return s;
 
189
  }
 
190
 
 
191
  /** 
 
192
   * Write a delete
 
193
   * @param delete
 
194
   * @return approximate size of the passed key and value.
 
195
   */
 
196
  long delete(final KeyValue delete) {
 
197
    long s = 0;
 
198
    this.lock.readLock().lock();
 
199
 
 
200
    try {
 
201
      s += heapSizeChange(delete, this.kvset.add(delete));
 
202
    } finally {
 
203
      this.lock.readLock().unlock();
 
204
    }
 
205
    this.size.addAndGet(s);
 
206
    return s;
 
207
  }
 
208
  
 
209
  /**
 
210
   * @param kv Find the row that comes after this one.  If null, we return the
 
211
   * first.
 
212
   * @return Next row or null if none found.
 
213
   */
 
214
  KeyValue getNextRow(final KeyValue kv) {
 
215
    this.lock.readLock().lock();
 
216
    try {
 
217
      return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
 
218
    } finally {
 
219
      this.lock.readLock().unlock();
 
220
    }
 
221
  }
 
222
 
 
223
  /*
 
224
   * @param a
 
225
   * @param b
 
226
   * @return Return lowest of a or b or null if both a and b are null
 
227
   */
 
228
  private KeyValue getLowest(final KeyValue a, final KeyValue b) {
 
229
    if (a == null) {
 
230
      return b;
 
231
    }
 
232
    if (b == null) {
 
233
      return a;
 
234
    }
 
235
    return comparator.compareRows(a, b) <= 0? a: b;
 
236
  }
 
237
 
 
238
  /*
 
239
   * @param key Find row that follows this one.  If null, return first.
 
240
   * @param map Set to look in for a row beyond <code>row</code>.
 
241
   * @return Next row or null if none found.  If one found, will be a new
 
242
   * KeyValue -- can be destroyed by subsequent calls to this method.
 
243
   */
 
244
  private KeyValue getNextRow(final KeyValue key,
 
245
      final NavigableSet<KeyValue> set) {
 
246
    KeyValue result = null;
 
247
    SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
 
248
    // Iterate until we fall into the next row; i.e. move off current row
 
249
    for (KeyValue kv: tail) {
 
250
      if (comparator.compareRows(kv, key) <= 0)
 
251
        continue;
 
252
      // Note: Not suppressing deletes or expired cells.  Needs to be handled
 
253
      // by higher up functions.
 
254
      result = kv;
 
255
      break;
 
256
    }
 
257
    return result;
 
258
  }
 
259
 
 
260
  /**
 
261
   * @param state column/delete tracking state
 
262
   */
 
263
  void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
 
264
    this.lock.readLock().lock();
 
265
    try {
 
266
      getRowKeyAtOrBefore(kvset, state);
 
267
      getRowKeyAtOrBefore(snapshot, state);
 
268
    } finally {
 
269
      this.lock.readLock().unlock();
 
270
    }
 
271
  }
 
272
 
 
273
  /*
 
274
   * @param set
 
275
   * @param state Accumulates deletes and candidates.
 
276
   */
 
277
  private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
 
278
      final GetClosestRowBeforeTracker state) {
 
279
    if (set.isEmpty()) {
 
280
      return;
 
281
    }
 
282
    if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
 
283
      // Found nothing in row.  Try backing up.
 
284
      getRowKeyBefore(set, state);
 
285
    }
 
286
  }
 
287
 
 
288
  /*
 
289
   * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
 
290
   * we have been passed the first possible key on a row.  As we walk forward
 
291
   * we accumulate deletes until we hit a candidate on the row at which point
 
292
   * we return.
 
293
   * @param set
 
294
   * @param firstOnRow First possible key on this row.
 
295
   * @param state
 
296
   * @return True if we found a candidate walking this row.
 
297
   */
 
298
  private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
 
299
      final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
 
300
    boolean foundCandidate = false;
 
301
    SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
 
302
    if (tail.isEmpty()) return foundCandidate;
 
303
    for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
 
304
      KeyValue kv = i.next();
 
305
      // Did we go beyond the target row? If so break.
 
306
      if (state.isTooFar(kv, firstOnRow)) break;
 
307
      if (state.isExpired(kv)) {
 
308
        i.remove();
 
309
        continue;
 
310
      }
 
311
      // If we added something, this row is a contender. break.
 
312
      if (state.handle(kv)) {
 
313
        foundCandidate = true;
 
314
        break;
 
315
      }
 
316
    }
 
317
    return foundCandidate;
 
318
  }
 
319
 
 
320
  /*
 
321
   * Walk backwards through the passed set a row at a time until we run out of
 
322
   * set or until we get a candidate.
 
323
   * @param set
 
324
   * @param state
 
325
   */
 
326
  private void getRowKeyBefore(NavigableSet<KeyValue> set,
 
327
      final GetClosestRowBeforeTracker state) {
 
328
    KeyValue firstOnRow = state.getTargetKey();
 
329
    for (Member p = memberOfPreviousRow(set, state, firstOnRow);
 
330
        p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
 
331
      // Make sure we don't fall out of our table.
 
332
      if (!state.isTargetTable(p.kv)) break;
 
333
      // Stop looking if we've exited the better candidate range.
 
334
      if (!state.isBetterCandidate(p.kv)) break;
 
335
      // Make into firstOnRow
 
336
      firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
 
337
      // If we find something, break;
 
338
      if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
 
339
    }
 
340
  }
 
341
 
 
342
  /*
 
343
   * Immutable data structure to hold member found in set and the set it was
 
344
   * found in.  Include set because it is carrying context.
 
345
   */
 
346
  private static class Member {
 
347
    final KeyValue kv;
 
348
    final NavigableSet<KeyValue> set;
 
349
    Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
 
350
      this.kv = kv;
 
351
      this.set = s;
 
352
    }
 
353
  }
 
354
 
 
355
  /*
 
356
   * @param set Set to walk back in.  Pass a first in row or we'll return
 
357
   * same row (loop).
 
358
   * @param state Utility and context.
 
359
   * @param firstOnRow First item on the row after the one we want to find a
 
360
   * member in.
 
361
   * @return Null or member of row previous to <code>firstOnRow</code>
 
362
   */
 
363
  private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
 
364
      final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
 
365
    NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
 
366
    if (head.isEmpty()) return null;
 
367
    for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
 
368
      KeyValue found = i.next();
 
369
      if (state.isExpired(found)) {
 
370
        i.remove();
 
371
        continue;
 
372
      }
 
373
      return new Member(head, found);
 
374
    }
 
375
    return null;
 
376
  }
 
377
 
 
378
  /**
 
379
   * @return scanner on memstore and snapshot in this order.
 
380
   */
 
381
  KeyValueScanner [] getScanners() {
 
382
    this.lock.readLock().lock();
 
383
    try {
 
384
      KeyValueScanner [] scanners = new KeyValueScanner[1];
 
385
      scanners[0] = new MemStoreScanner();
 
386
      return scanners;
 
387
    } finally {
 
388
      this.lock.readLock().unlock();
 
389
    }
 
390
  }
 
391
 
 
392
  //
 
393
  // HBASE-880/1249/1304
 
394
  //
 
395
 
 
396
  /**
 
397
   * Perform a single-row Get on the  and snapshot, placing results
 
398
   * into the specified KV list.
 
399
   * <p>
 
400
   * This will return true if it is determined that the query is complete
 
401
   * and it is not necessary to check any storefiles after this.
 
402
   * <p>
 
403
   * Otherwise, it will return false and you should continue on.
 
404
   * @param matcher Column matcher
 
405
   * @param result List to add results to
 
406
   * @return true if done with store (early-out), false if not
 
407
   */
 
408
  public boolean get(QueryMatcher matcher, List<KeyValue> result) {
 
409
    this.lock.readLock().lock();
 
410
    try {
 
411
      if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
 
412
        return true;
 
413
      }
 
414
      matcher.update();
 
415
      return internalGet(this.snapshot, matcher, result) || matcher.isDone();
 
416
    } finally {
 
417
      this.lock.readLock().unlock();
 
418
    }
 
419
  }
 
420
 
 
421
  /**
 
422
   * Gets from either the memstore or the snapshop, and returns a code
 
423
   * to let you know which is which.
 
424
   *
 
425
   * @param matcher query matcher
 
426
   * @param result puts results here
 
427
   * @return 1 == memstore, 2 == snapshot, 0 == none
 
428
   */
 
429
  int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
 
430
    this.lock.readLock().lock();
 
431
    try {
 
432
      boolean fromMemstore = internalGet(this.kvset, matcher, result);
 
433
      if (fromMemstore || matcher.isDone())
 
434
        return 1;
 
435
 
 
436
      matcher.update();
 
437
      boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
 
438
      if (fromSnapshot || matcher.isDone())
 
439
        return 2;
 
440
 
 
441
      return 0;
 
442
    } finally {
 
443
      this.lock.readLock().unlock();
 
444
    }
 
445
  }
 
446
 
 
447
  /**
 
448
   * Small utility functions for use by Store.incrementColumnValue
 
449
   * _only_ under the threat of pain and everlasting race conditions.
 
450
   */
 
451
  void readLockLock() {
 
452
    this.lock.readLock().lock();
 
453
  }
 
454
  void readLockUnlock() {
 
455
    this.lock.readLock().unlock();
 
456
  }
 
457
  
 
458
  /**
 
459
   *
 
460
   * @param set memstore or snapshot
 
461
   * @param matcher query matcher
 
462
   * @param result list to add results to
 
463
   * @return true if done with store (early-out), false if not
 
464
   */
 
465
  boolean internalGet(final NavigableSet<KeyValue> set,
 
466
      final QueryMatcher matcher, final List<KeyValue> result) {
 
467
    if(set.isEmpty()) return false;
 
468
    // Seek to startKey
 
469
    SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
 
470
    for (KeyValue kv : tail) {
 
471
      QueryMatcher.MatchCode res = matcher.match(kv);
 
472
      switch(res) {
 
473
        case INCLUDE:
 
474
          result.add(kv);
 
475
          break;
 
476
        case SKIP:
 
477
          break;
 
478
        case NEXT:
 
479
          return false;
 
480
        case DONE:
 
481
          return true;
 
482
        default:
 
483
          throw new RuntimeException("Unexpected " + res);
 
484
      }
 
485
    }
 
486
    return false;
 
487
  }
 
488
  
 
489
 
 
490
  /*
 
491
   * MemStoreScanner implements the KeyValueScanner.
 
492
   * It lets the caller scan the contents of a memstore -- both current
 
493
   * map and snapshot.
 
494
   * This behaves as if it were a real scanner but does not maintain position.
 
495
   */
 
496
  protected class MemStoreScanner implements KeyValueScanner {
 
497
    // Next row information for either kvset or snapshot
 
498
    private KeyValue kvsetNextRow = null;
 
499
    private KeyValue snapshotNextRow = null;
 
500
 
 
501
    // iterator based scanning.
 
502
    Iterator<KeyValue> kvsetIt;
 
503
    Iterator<KeyValue> snapshotIt;
 
504
 
 
505
    /*
 
506
    Some notes...
 
507
 
 
508
     So memstorescanner is fixed at creation time. this includes pointers/iterators into
 
509
    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
 
510
    snapshot is moved.  since kvset is null there is no point on reseeking on both,
 
511
      we can save us the trouble. During the snapshot->hfile transition, the memstore
 
512
      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
 
513
      potentially do something smarter by adjusting the existing memstore scanner.
 
514
 
 
515
      But there is a greater problem here, that being once a scanner has progressed
 
516
      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
 
517
      if a scan lasts a little while, there is a chance for new entries in kvset to
 
518
      become available but we will never see them.  This needs to be handled at the
 
519
      StoreScanner level with coordination with MemStoreScanner.
 
520
 
 
521
    */
 
522
 
 
523
 
 
524
    MemStoreScanner() {
 
525
      super();
 
526
 
 
527
      //DebugPrint.println(" MS new@" + hashCode());
 
528
    }
 
529
 
 
530
    protected KeyValue getNext(Iterator<KeyValue> it) {
 
531
      KeyValue ret = null;
 
532
      long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
 
533
      //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
 
534
 
 
535
      while (ret == null && it.hasNext()) {
 
536
        KeyValue v = it.next();
 
537
        if (v.getMemstoreTS() <= readPoint) {
 
538
          ret = v;
 
539
        }
 
540
      }
 
541
      return ret;
 
542
    }
 
543
 
 
544
    public synchronized boolean seek(KeyValue key) {
 
545
      if (key == null) {
 
546
        close();
 
547
        return false;
 
548
      }
 
549
 
 
550
      // kvset and snapshot will never be empty.
 
551
      // if tailSet cant find anything, SS is empty (not null).
 
552
      SortedSet<KeyValue> kvTail = kvset.tailSet(key);
 
553
      SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
 
554
 
 
555
      kvsetIt = kvTail.iterator();
 
556
      snapshotIt = snapshotTail.iterator();
 
557
 
 
558
      kvsetNextRow = getNext(kvsetIt);
 
559
      snapshotNextRow = getNext(snapshotIt);
 
560
      long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
 
561
 
 
562
      //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
 
563
      //    kvset.size() + " threadread = " + readPoint);
 
564
      //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
 
565
      //    snapshot.size() + " threadread = " + readPoint);
 
566
 
 
567
      KeyValue lowest = getLowest();
 
568
 
 
569
      // has data := (lowest != null)
 
570
      return lowest != null;
 
571
    }
 
572
 
 
573
    public synchronized KeyValue peek() {
 
574
      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
 
575
      return getLowest();
 
576
    }
 
577
 
 
578
 
 
579
    public synchronized KeyValue next() {
 
580
      KeyValue theNext = getLowest();
 
581
 
 
582
      if (theNext == null) {
 
583
          return null;
 
584
      }
 
585
 
 
586
      // Advance one of the iterators
 
587
      if (theNext == kvsetNextRow) {
 
588
        kvsetNextRow = getNext(kvsetIt);
 
589
      } else {
 
590
        snapshotNextRow = getNext(snapshotIt);
 
591
      }
 
592
      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
 
593
      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
 
594
      //    getLowest() + " threadpoint=" + readpoint);
 
595
 
 
596
      return theNext;
 
597
    }
 
598
 
 
599
    protected KeyValue getLowest() {
 
600
      return getLower(kvsetNextRow,
 
601
          snapshotNextRow);
 
602
    }
 
603
 
 
604
    /*
 
605
     * Returns the lower of the two key values, or null if they are both null.
 
606
     * This uses comparator.compare() to compare the KeyValue using the memstore
 
607
     * comparator.
 
608
     */
 
609
    protected KeyValue getLower(KeyValue first, KeyValue second) {
 
610
      if (first == null && second == null) {
 
611
        return null;
 
612
      }
 
613
      if (first != null && second != null) {
 
614
        int compare = comparator.compare(first, second);
 
615
        return (compare <= 0 ? first : second);
 
616
      }
 
617
      return (first != null ? first : second);
 
618
    }
 
619
 
 
620
    public synchronized void close() {
 
621
      // Accelerate the GC a bit perhaps?
 
622
      this.kvsetIt = null;
 
623
      this.snapshotIt = null;
 
624
 
 
625
      this.kvsetNextRow = null;
 
626
      this.snapshotNextRow = null;
 
627
    }
 
628
  }
 
629
 
 
630
  public final static long FIXED_OVERHEAD = ClassSize.align(
 
631
      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
 
632
  
 
633
  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
 
634
      ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
 
635
      ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
 
636
      (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
637
 
 
638
  /*
 
639
   * Calculate how the MemStore size has changed.  Includes overhead of the
 
640
   * backing Map.
 
641
   * @param kv
 
642
   * @param notpresent True if the kv was NOT present in the set.
 
643
   * @return Size
 
644
   */
 
645
  long heapSizeChange(final KeyValue kv, final boolean notpresent) {
 
646
    return notpresent ? 
 
647
        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
 
648
        0;
 
649
  }
 
650
  
 
651
  /**
 
652
   * Get the entire heap usage for this MemStore not including keys in the
 
653
   * snapshot.
 
654
   */
 
655
  @Override
 
656
  public long heapSize() {
 
657
    return size.get();
 
658
  }
 
659
  
 
660
  /**
 
661
   * Get the heap usage of KVs in this MemStore.
 
662
   */
 
663
  public long keySize() {
 
664
    return heapSize() - DEEP_OVERHEAD;
 
665
  }
 
666
 
 
667
  /**
 
668
   * Get an estimate of the number of key values stored in this store.
 
669
   *
 
670
   * @return the number of key/values in this memstore.
 
671
   */
 
672
  public int numKeyValues() {
 
673
    return kvset.size() + snapshot.size();
 
674
  }
 
675
 
 
676
  /**
 
677
   * Code to help figure if our approximation of object heap sizes is close
 
678
   * enough.  See hbase-900.  Fills memstores then waits so user can heap
 
679
   * dump and bring up resultant hprof in something like jprofiler which
 
680
   * allows you get 'deep size' on objects.
 
681
   * @param args main args
 
682
   */
 
683
  public static void main(String [] args) {
 
684
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
 
685
    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
 
686
      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
 
687
    LOG.info("vmInputArguments=" + runtime.getInputArguments());
 
688
    MemStore memstore1 = new MemStore();
 
689
    // TODO: x32 vs x64
 
690
    long size = 0;
 
691
    final int count = 10000;
 
692
    byte [] column = Bytes.toBytes("col:umn");
 
693
    for (int i = 0; i < count; i++) {
 
694
      // Give each its own ts
 
695
      size += memstore1.add(new KeyValue(Bytes.toBytes(i), column, i));
 
696
    }
 
697
    LOG.info("memstore1 estimated size=" + size);
 
698
    for (int i = 0; i < count; i++) {
 
699
      size += memstore1.add(new KeyValue(Bytes.toBytes(i), column, i));
 
700
    }
 
701
    LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
 
702
    // Make a variably sized memstore.
 
703
    MemStore memstore2 = new MemStore();
 
704
    for (int i = 0; i < count; i++) {
 
705
      size += memstore2.add(new KeyValue(Bytes.toBytes(i), column, i,
 
706
        new byte[i]));
 
707
    }
 
708
    LOG.info("memstore2 estimated size=" + size);
 
709
    final int seconds = 30;
 
710
    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
 
711
    for (int i = 0; i < seconds; i++) {
 
712
      // Thread.sleep(1000);
 
713
    }
 
714
    LOG.info("Exiting.");
 
715
  }
 
716
}