2
HappyBase table module.
6
from numbers import Integral
7
from operator import attrgetter
8
from struct import Struct
10
from .hbase.ttypes import TScan
11
from .util import thrift_type_to_dict, str_increment
12
from .batch import Batch
14
logger = logging.getLogger(__name__)
16
make_cell = attrgetter('value')
17
make_cell_timestamp = attrgetter('value', 'timestamp')
18
pack_i64 = Struct('>q').pack
21
def make_row(cell_map, include_timestamp):
22
"""Make a row dict for a cell mapping like ttypes.TRowResult.columns."""
23
cellfn = include_timestamp and make_cell_timestamp or make_cell
24
return dict((cn, cellfn(cell)) for cn, cell in cell_map.iteritems())
28
"""HBase table abstraction class.
30
This class cannot be instantiated directly; use :py:meth:`Connection.table`
33
def __init__(self, name, connection):
35
self.connection = connection
38
return '<%s.%s name=%r>' % (
40
self.__class__.__name__,
45
"""Retrieve the column families for this table.
47
:return: Mapping from column family name to settings dict
50
descriptors = self.connection.client.getColumnDescriptors(self.name)
52
for name, descriptor in descriptors.items():
53
name = name[:-1] # drop trailing ':'
54
families[name] = thrift_type_to_dict(descriptor)
57
def _column_family_names(self):
58
"""Retrieve the column family names for this table (internal use)"""
59
return self.connection.client.getColumnDescriptors(self.name).keys()
62
"""Retrieve the regions for this table.
64
:return: regions for this table
67
regions = self.connection.client.getTableRegions(self.name)
68
return map(thrift_type_to_dict, regions)
74
def row(self, row, columns=None, timestamp=None, include_timestamp=False):
75
"""Retrieve a single row of data.
77
This method retrieves the row with the row key specified in the `row`
78
argument and returns the columns and values for this row as
81
The `row` argument is the row key of the row. If the `columns` argument
82
is specified, only the values for these columns will be returned
83
instead of all available columns. The `columns` argument should be
84
a list or tuple containing strings. Each name can be a column family,
85
such as `cf1` or `cf1:` (the trailing colon is not required), or
86
a column family with a qualifier, such as `cf1:col1`.
88
If specified, the `timestamp` argument specifies the maximum version
89
that results may have. The `include_timestamp` argument specifies
90
whether cells are returned as single values or as `(value, timestamp)`
93
:param str row: the row key
94
:param list_or_tuple columns: list of columns (optional)
95
:param int timestamp: timestamp (optional)
96
:param bool include_timestamp: whether timestamps are returned
98
:return: Mapping of columns (both qualifier and family) to values
101
if columns is not None and not isinstance(columns, (tuple, list)):
102
raise TypeError("'columns' must be a tuple or list")
104
if timestamp is None:
105
rows = self.connection.client.getRowWithColumns(
106
self.name, row, columns)
108
if not isinstance(timestamp, Integral):
109
raise TypeError("'timestamp' must be an integer")
110
rows = self.connection.client.getRowWithColumnsTs(
111
self.name, row, columns, timestamp)
116
return make_row(rows[0].columns, include_timestamp)
118
def rows(self, rows, columns=None, timestamp=None,
119
include_timestamp=False):
120
"""Retrieve multiple rows of data.
122
This method retrieves the rows with the row keys specified in the
123
`rows` argument, which should be should be a list (or tuple) of row
124
keys. The return value is a list of `(row_key, row_dict)` tuples.
126
The `columns`, `timestamp` and `include_timestamp` arguments behave
127
exactly the same as for :py:meth:`row`.
129
:param list rows: list of row keys
130
:param list_or_tuple columns: list of columns (optional)
131
:param int timestamp: timestamp (optional)
132
:param bool include_timestamp: whether timestamps are returned
134
:return: List of mappings (columns to values)
135
:rtype: list of dicts
137
if columns is not None and not isinstance(columns, (tuple, list)):
138
raise TypeError("'columns' must be a tuple or list")
141
# Avoid round-trip if the result is empty anyway
144
if timestamp is None:
145
results = self.connection.client.getRowsWithColumns(
146
self.name, rows, columns)
148
if not isinstance(timestamp, Integral):
149
raise TypeError("'timestamp' must be an integer")
151
# Work-around a bug in the HBase Thrift server where the
152
# timestamp is only applied if columns are specified, at
153
# the cost of an extra round-trip.
155
columns = self._column_family_names()
157
results = self.connection.client.getRowsWithColumnsTs(
158
self.name, rows, columns, timestamp)
160
return [(r.row, make_row(r.columns, include_timestamp))
163
def cells(self, row, column, versions=None, timestamp=None,
164
include_timestamp=False):
165
"""Retrieve multiple versions of a single cell from the table.
167
This method retrieves multiple versions of a cell (if any).
169
The `versions` argument defines how many cell versions to
172
The `timestamp` and `include_timestamp` arguments behave exactly the
173
same as for :py:meth:`row`.
175
:param str row: the row key
176
:param str column: the column name
177
:param int versions: the maximum number of versions to retrieve
178
:param int timestamp: timestamp (optional)
179
:param bool include_timestamp: whether timestamps are returned
182
:rtype: list of values
185
versions = (2 ** 31) - 1 # Thrift type is i32
186
elif not isinstance(versions, int):
187
raise TypeError("'versions' parameter must be a number or None")
190
"'versions' parameter must be at least 1 (or None)")
192
if timestamp is None:
193
cells = self.connection.client.getVer(
194
self.name, row, column, versions)
196
if not isinstance(timestamp, Integral):
197
raise TypeError("'timestamp' must be an integer")
198
cells = self.connection.client.getVerTs(
199
self.name, row, column, timestamp, versions)
201
if include_timestamp:
202
return map(make_cell_timestamp, cells)
204
return map(make_cell, cells)
206
def scan(self, row_start=None, row_stop=None, row_prefix=None,
207
columns=None, filter=None, timestamp=None,
208
include_timestamp=False, batch_size=1000, limit=None):
209
"""Create a scanner for data in the table.
211
This method returns an iterable that can be used for looping over the
212
matching rows. Scanners can be created in two ways:
214
* The `row_start` and `row_stop` arguments specify the row keys where
215
the scanner should start and stop. It does not matter whether the
216
table contains any rows with the specified keys: the first row after
217
`row_start` will be the first result, and the last row before
218
`row_stop` will be the last result. Note that the start of the range
219
is inclusive, while the end is exclusive.
221
Both `row_start` and `row_stop` can be `None` to specify the start
222
and the end of the table respectively. If both are omitted, a full
223
table scan is done. Note that this usually results in severe
224
performance problems.
226
* Alternatively, if `row_prefix` is specified, only rows with row keys
227
matching the prefix will be returned. If given, `row_start` and
228
`row_stop` cannot be used.
230
The `columns`, `timestamp` and `include_timestamp` arguments behave
231
exactly the same as for :py:meth:`row`.
233
The `filter` argument may be a filter string that will be applied at
234
the server by the region servers.
236
If `limit` is given, at most `limit` results will be returned.
238
The `batch_size` argument specifies how many results should be
239
retrieved per batch when retrieving results from the scanner. Only set
240
this to a low value (or even 1) if your data is large, since a low
241
batch size results in added round-trips to the server.
243
**Compatibility note:** The `filter` argument is only available when
244
using HBase 0.92 (or up). In HBase 0.90 compatibility mode, specifying
245
a `filter` raises an exception.
247
:param str row_start: the row key to start at (inclusive)
248
:param str row_stop: the row key to stop at (exclusive)
249
:param str row_prefix: a prefix of the row key that must match
250
:param list_or_tuple columns: list of columns (optional)
251
:param str filter: a filter string (optional)
252
:param int timestamp: timestamp (optional)
253
:param bool include_timestamp: whether timestamps are returned
254
:param int batch_size: batch size for retrieving resuls
256
:return: generator yielding the rows matching the scan
257
:rtype: iterable of `(row_key, row_data)` tuples
260
raise ValueError("'batch_size' must be >= 1")
262
if limit is not None and limit < 1:
263
raise ValueError("'limit' must be >= 1")
265
if row_prefix is not None:
266
if row_start is not None or row_stop is not None:
268
"'row_prefix' cannot be combined with 'row_start' "
271
row_start = row_prefix
272
row_stop = str_increment(row_prefix)
274
if row_start is None:
277
if self.connection.compat == '0.90':
278
# The scannerOpenWithScan() Thrift function is not
279
# available, so work around it as much as possible with the
280
# other scannerOpen*() Thrift functions
282
if filter is not None:
283
raise NotImplementedError(
284
"'filter' is not supported in HBase 0.90")
287
if timestamp is None:
288
scan_id = self.connection.client.scannerOpen(
289
self.name, row_start, columns)
291
scan_id = self.connection.client.scannerOpenTs(
292
self.name, row_start, columns, timestamp)
294
if timestamp is None:
295
scan_id = self.connection.client.scannerOpenWithStop(
296
self.name, row_start, row_stop, columns)
298
scan_id = self.connection.client.scannerOpenWithStopTs(
299
self.name, row_start, row_stop, columns, timestamp)
302
# The scan's caching size is set to the batch_size, so that
303
# the HTable on the Java side retrieves rows from the region
304
# servers in the same chunk sizes that it sends out over
314
scan_id = self.connection.client.scannerOpenWithScan(
317
logger.debug("Opened scanner (id=%d) on '%s'", scan_id, self.name)
319
n_returned = n_fetched = 0
323
how_many = batch_size
325
how_many = min(batch_size, limit - n_returned)
328
items = self.connection.client.scannerGet(scan_id)
330
items = self.connection.client.scannerGetList(
333
n_fetched += len(items)
335
for n_returned, item in enumerate(items, n_returned + 1):
336
yield item.row, make_row(item.columns, include_timestamp)
337
if limit is not None and n_returned == limit:
340
# Avoid round-trip when exhausted
341
if len(items) < how_many:
344
self.connection.client.scannerClose(scan_id)
346
"Closed scanner (id=%d) on '%s' (%d returned, %d fetched)",
347
scan_id, self.name, n_returned, n_fetched)
353
def put(self, row, data, timestamp=None):
354
"""Store data in the table.
356
This method stores the data in the `data` argument for the row
357
specified by `row`. The `data` argument is dictionary that maps columns
358
to values. Column names must include a family and qualifier part, e.g.
359
`cf:col`, though the qualifier part may be the empty string, e.g.
360
`cf:`. The `timestamp` argument is optional.
362
Note that, in many situations, :py:meth:`batch()` is a more appropriate
363
method to manipulate data.
365
:param str row: the row key
366
:param dict data: the data to store
367
:param int timestamp: timestamp (optional)
369
with self.batch(timestamp=timestamp) as batch:
372
def delete(self, row, columns=None, timestamp=None):
373
"""Delete data from the table.
375
This method deletes all columns for the row specified by `row`, or only
376
some columns if the `columns` argument is specified.
378
Note that, in many situations, :py:meth:`batch()` is a more appropriate
379
method to manipulate data.
381
:param str row: the row key
382
:param list_or_tuple columns: list of columns (optional)
383
:param int timestamp: timestamp (optional)
386
if timestamp is None:
387
self.connection.client.deleteAllRow(self.name, row)
389
self.connection.client.deleteAllRowTs(
390
self.name, row, timestamp)
392
with self.batch(timestamp=timestamp) as batch:
393
batch.delete(row, columns)
395
def batch(self, timestamp=None, batch_size=None, transaction=False):
396
"""Create a new batch operation for this table.
398
This method returns a new :py:class:`Batch` instance that can be used
399
for mass data manipulation. The `timestamp` argument applies to all
400
puts and deletes on the batch.
402
If given, the `batch_size` argument specifies the maximum batch size
403
after which the batch should send the mutations to the server. By
404
default this is unbounded.
406
The `transaction` argument specifies whether the returned
407
:py:class:`Batch` instance should act in a transaction-like manner when
408
used as context manager in a ``with`` block of code. The `transaction`
409
flag cannot be used in combination with `batch_size`.
411
:param bool transaction: whether this batch should behave like
412
a transaction (only useful when used as a
414
:param int batch_size: batch size (optional)
415
:param int timestamp: timestamp (optional)
417
:return: Batch instance
418
:rtype: :py:class:`Batch`
420
kwargs = locals().copy()
422
return Batch(table=self, **kwargs)
428
def counter_get(self, row, column):
429
"""Retrieve the current value of a counter column.
431
This method retrieves the current value of a counter column. If the
432
counter column does not exist, this function initialises it to `0`.
434
Note that application code should *never* store a incremented or
435
decremented counter value directly; use the atomic
436
:py:meth:`Table.counter_inc` and :py:meth:`Table.counter_dec` methods
439
:param str row: the row key
440
:param str column: the column name
442
:return: counter value
445
# Don't query directly, but increment with value=0 so that the counter
446
# is correctly initialised if didn't exist yet.
447
return self.counter_inc(row, column, value=0)
449
def counter_set(self, row, column, value=0):
450
"""Set a counter column to a specific value.
452
This method stores a 64-bit signed integer value in the specified
455
Note that application code should *never* store a incremented or
456
decremented counter value directly; use the atomic
457
:py:meth:`Table.counter_inc` and :py:meth:`Table.counter_dec` methods
460
:param str row: the row key
461
:param str column: the column name
462
:param int value: the counter value to set
464
self.put(row, {column: pack_i64(value)})
466
def counter_inc(self, row, column, value=1):
467
"""Atomically increment (or decrements) a counter column.
469
This method atomically increments or decrements a counter column in the
470
row specified by `row`. The `value` argument specifies how much the
471
counter should be incremented (for positive values) or decremented (for
472
negative values). If the counter column did not exist, it is
473
automatically initialised to 0 before incrementing it.
475
:param str row: the row key
476
:param str column: the column name
477
:param int value: the amount to increment or decrement by (optional)
479
:return: counter value after incrementing
482
return self.connection.client.atomicIncrement(
483
self.name, row, column, value)
485
def counter_dec(self, row, column, value=1):
486
"""Atomically decrement (or increments) a counter column.
488
This method is a shortcut for calling :py:meth:`Table.counter_inc` with
491
:return: counter value after decrementing
494
return self.counter_inc(row, column, -value)