~ubuntu-branches/ubuntu/saucy/python-happybase/saucy

« back to all changes in this revision

Viewing changes to happybase/table.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2013-05-30 13:56:42 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130530135642-tveld2y1dbkhmuv3
Tags: 0.6-1
* New upstream release (Closes: #712971).
* Ran wrap-and-sort.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
HappyBase table module.
 
3
"""
 
4
 
 
5
import logging
 
6
from numbers import Integral
 
7
from operator import attrgetter
 
8
from struct import Struct
 
9
 
 
10
from .hbase.ttypes import TScan
 
11
from .util import thrift_type_to_dict, str_increment
 
12
from .batch import Batch
 
13
 
 
14
logger = logging.getLogger(__name__)
 
15
 
 
16
make_cell = attrgetter('value')
 
17
make_cell_timestamp = attrgetter('value', 'timestamp')
 
18
pack_i64 = Struct('>q').pack
 
19
 
 
20
 
 
21
def make_row(cell_map, include_timestamp):
 
22
    """Make a row dict for a cell mapping like ttypes.TRowResult.columns."""
 
23
    cellfn = include_timestamp and make_cell_timestamp or make_cell
 
24
    return dict((cn, cellfn(cell)) for cn, cell in cell_map.iteritems())
 
25
 
 
26
 
 
27
class Table(object):
 
28
    """HBase table abstraction class.
 
29
 
 
30
    This class cannot be instantiated directly; use :py:meth:`Connection.table`
 
31
    instead.
 
32
    """
 
33
    def __init__(self, name, connection):
 
34
        self.name = name
 
35
        self.connection = connection
 
36
 
 
37
    def __repr__(self):
 
38
        return '<%s.%s name=%r>' % (
 
39
            __name__,
 
40
            self.__class__.__name__,
 
41
            self.name,
 
42
        )
 
43
 
 
44
    def families(self):
 
45
        """Retrieve the column families for this table.
 
46
 
 
47
        :return: Mapping from column family name to settings dict
 
48
        :rtype: dict
 
49
        """
 
50
        descriptors = self.connection.client.getColumnDescriptors(self.name)
 
51
        families = dict()
 
52
        for name, descriptor in descriptors.items():
 
53
            name = name[:-1]  # drop trailing ':'
 
54
            families[name] = thrift_type_to_dict(descriptor)
 
55
        return families
 
56
 
 
57
    def _column_family_names(self):
 
58
        """Retrieve the column family names for this table (internal use)"""
 
59
        return self.connection.client.getColumnDescriptors(self.name).keys()
 
60
 
 
61
    def regions(self):
 
62
        """Retrieve the regions for this table.
 
63
 
 
64
        :return: regions for this table
 
65
        :rtype: list of dicts
 
66
        """
 
67
        regions = self.connection.client.getTableRegions(self.name)
 
68
        return map(thrift_type_to_dict, regions)
 
69
 
 
70
    #
 
71
    # Data retrieval
 
72
    #
 
73
 
 
74
    def row(self, row, columns=None, timestamp=None, include_timestamp=False):
 
75
        """Retrieve a single row of data.
 
76
 
 
77
        This method retrieves the row with the row key specified in the `row`
 
78
        argument and returns the columns and values for this row as
 
79
        a dictionary.
 
80
 
 
81
        The `row` argument is the row key of the row. If the `columns` argument
 
82
        is specified, only the values for these columns will be returned
 
83
        instead of all available columns. The `columns` argument should be
 
84
        a list or tuple containing strings. Each name can be a column family,
 
85
        such as `cf1` or `cf1:` (the trailing colon is not required), or
 
86
        a column family with a qualifier, such as `cf1:col1`.
 
87
 
 
88
        If specified, the `timestamp` argument specifies the maximum version
 
89
        that results may have. The `include_timestamp` argument specifies
 
90
        whether cells are returned as single values or as `(value, timestamp)`
 
91
        tuples.
 
92
 
 
93
        :param str row: the row key
 
94
        :param list_or_tuple columns: list of columns (optional)
 
95
        :param int timestamp: timestamp (optional)
 
96
        :param bool include_timestamp: whether timestamps are returned
 
97
 
 
98
        :return: Mapping of columns (both qualifier and family) to values
 
99
        :rtype: dict
 
100
        """
 
101
        if columns is not None and not isinstance(columns, (tuple, list)):
 
102
            raise TypeError("'columns' must be a tuple or list")
 
103
 
 
104
        if timestamp is None:
 
105
            rows = self.connection.client.getRowWithColumns(
 
106
                self.name, row, columns)
 
107
        else:
 
108
            if not isinstance(timestamp, Integral):
 
109
                raise TypeError("'timestamp' must be an integer")
 
110
            rows = self.connection.client.getRowWithColumnsTs(
 
111
                self.name, row, columns, timestamp)
 
112
 
 
113
        if not rows:
 
114
            return {}
 
115
 
 
116
        return make_row(rows[0].columns, include_timestamp)
 
117
 
 
118
    def rows(self, rows, columns=None, timestamp=None,
 
119
             include_timestamp=False):
 
120
        """Retrieve multiple rows of data.
 
121
 
 
122
        This method retrieves the rows with the row keys specified in the
 
123
        `rows` argument, which should be should be a list (or tuple) of row
 
124
        keys. The return value is a list of `(row_key, row_dict)` tuples.
 
125
 
 
126
        The `columns`, `timestamp` and `include_timestamp` arguments behave
 
127
        exactly the same as for :py:meth:`row`.
 
128
 
 
129
        :param list rows: list of row keys
 
130
        :param list_or_tuple columns: list of columns (optional)
 
131
        :param int timestamp: timestamp (optional)
 
132
        :param bool include_timestamp: whether timestamps are returned
 
133
 
 
134
        :return: List of mappings (columns to values)
 
135
        :rtype: list of dicts
 
136
        """
 
137
        if columns is not None and not isinstance(columns, (tuple, list)):
 
138
            raise TypeError("'columns' must be a tuple or list")
 
139
 
 
140
        if not rows:
 
141
            # Avoid round-trip if the result is empty anyway
 
142
            return {}
 
143
 
 
144
        if timestamp is None:
 
145
            results = self.connection.client.getRowsWithColumns(
 
146
                self.name, rows, columns)
 
147
        else:
 
148
            if not isinstance(timestamp, Integral):
 
149
                raise TypeError("'timestamp' must be an integer")
 
150
 
 
151
            # Work-around a bug in the HBase Thrift server where the
 
152
            # timestamp is only applied if columns are specified, at
 
153
            # the cost of an extra round-trip.
 
154
            if columns is None:
 
155
                columns = self._column_family_names()
 
156
 
 
157
            results = self.connection.client.getRowsWithColumnsTs(
 
158
                self.name, rows, columns, timestamp)
 
159
 
 
160
        return [(r.row, make_row(r.columns, include_timestamp))
 
161
                for r in results]
 
162
 
 
163
    def cells(self, row, column, versions=None, timestamp=None,
 
164
              include_timestamp=False):
 
165
        """Retrieve multiple versions of a single cell from the table.
 
166
 
 
167
        This method retrieves multiple versions of a cell (if any).
 
168
 
 
169
        The `versions` argument defines how many cell versions to
 
170
        retrieve at most.
 
171
 
 
172
        The `timestamp` and `include_timestamp` arguments behave exactly the
 
173
        same as for :py:meth:`row`.
 
174
 
 
175
        :param str row: the row key
 
176
        :param str column: the column name
 
177
        :param int versions: the maximum number of versions to retrieve
 
178
        :param int timestamp: timestamp (optional)
 
179
        :param bool include_timestamp: whether timestamps are returned
 
180
 
 
181
        :return: cell values
 
182
        :rtype: list of values
 
183
        """
 
184
        if versions is None:
 
185
            versions = (2 ** 31) - 1  # Thrift type is i32
 
186
        elif not isinstance(versions, int):
 
187
            raise TypeError("'versions' parameter must be a number or None")
 
188
        elif versions < 1:
 
189
            raise ValueError(
 
190
                "'versions' parameter must be at least 1 (or None)")
 
191
 
 
192
        if timestamp is None:
 
193
            cells = self.connection.client.getVer(
 
194
                self.name, row, column, versions)
 
195
        else:
 
196
            if not isinstance(timestamp, Integral):
 
197
                raise TypeError("'timestamp' must be an integer")
 
198
            cells = self.connection.client.getVerTs(
 
199
                self.name, row, column, timestamp, versions)
 
200
 
 
201
        if include_timestamp:
 
202
            return map(make_cell_timestamp, cells)
 
203
        else:
 
204
            return map(make_cell, cells)
 
205
 
 
206
    def scan(self, row_start=None, row_stop=None, row_prefix=None,
 
207
             columns=None, filter=None, timestamp=None,
 
208
             include_timestamp=False, batch_size=1000, limit=None):
 
209
        """Create a scanner for data in the table.
 
210
 
 
211
        This method returns an iterable that can be used for looping over the
 
212
        matching rows. Scanners can be created in two ways:
 
213
 
 
214
        * The `row_start` and `row_stop` arguments specify the row keys where
 
215
          the scanner should start and stop. It does not matter whether the
 
216
          table contains any rows with the specified keys: the first row after
 
217
          `row_start` will be the first result, and the last row before
 
218
          `row_stop` will be the last result. Note that the start of the range
 
219
          is inclusive, while the end is exclusive.
 
220
 
 
221
          Both `row_start` and `row_stop` can be `None` to specify the start
 
222
          and the end of the table respectively. If both are omitted, a full
 
223
          table scan is done. Note that this usually results in severe
 
224
          performance problems.
 
225
 
 
226
        * Alternatively, if `row_prefix` is specified, only rows with row keys
 
227
          matching the prefix will be returned. If given, `row_start` and
 
228
          `row_stop` cannot be used.
 
229
 
 
230
        The `columns`, `timestamp` and `include_timestamp` arguments behave
 
231
        exactly the same as for :py:meth:`row`.
 
232
 
 
233
        The `filter` argument may be a filter string that will be applied at
 
234
        the server by the region servers.
 
235
 
 
236
        If `limit` is given, at most `limit` results will be returned.
 
237
 
 
238
        The `batch_size` argument specifies how many results should be
 
239
        retrieved per batch when retrieving results from the scanner. Only set
 
240
        this to a low value (or even 1) if your data is large, since a low
 
241
        batch size results in added round-trips to the server.
 
242
 
 
243
        **Compatibility note:** The `filter` argument is only available when
 
244
        using HBase 0.92 (or up). In HBase 0.90 compatibility mode, specifying
 
245
        a `filter` raises an exception.
 
246
 
 
247
        :param str row_start: the row key to start at (inclusive)
 
248
        :param str row_stop: the row key to stop at (exclusive)
 
249
        :param str row_prefix: a prefix of the row key that must match
 
250
        :param list_or_tuple columns: list of columns (optional)
 
251
        :param str filter: a filter string (optional)
 
252
        :param int timestamp: timestamp (optional)
 
253
        :param bool include_timestamp: whether timestamps are returned
 
254
        :param int batch_size: batch size for retrieving resuls
 
255
 
 
256
        :return: generator yielding the rows matching the scan
 
257
        :rtype: iterable of `(row_key, row_data)` tuples
 
258
        """
 
259
        if batch_size < 1:
 
260
            raise ValueError("'batch_size' must be >= 1")
 
261
 
 
262
        if limit is not None and limit < 1:
 
263
            raise ValueError("'limit' must be >= 1")
 
264
 
 
265
        if row_prefix is not None:
 
266
            if row_start is not None or row_stop is not None:
 
267
                raise TypeError(
 
268
                    "'row_prefix' cannot be combined with 'row_start' "
 
269
                    "or 'row_stop'")
 
270
 
 
271
            row_start = row_prefix
 
272
            row_stop = str_increment(row_prefix)
 
273
 
 
274
        if row_start is None:
 
275
            row_start = ''
 
276
 
 
277
        if self.connection.compat == '0.90':
 
278
            # The scannerOpenWithScan() Thrift function is not
 
279
            # available, so work around it as much as possible with the
 
280
            # other scannerOpen*() Thrift functions
 
281
 
 
282
            if filter is not None:
 
283
                raise NotImplementedError(
 
284
                    "'filter' is not supported in HBase 0.90")
 
285
 
 
286
            if row_stop is None:
 
287
                if timestamp is None:
 
288
                    scan_id = self.connection.client.scannerOpen(
 
289
                        self.name, row_start, columns)
 
290
                else:
 
291
                    scan_id = self.connection.client.scannerOpenTs(
 
292
                        self.name, row_start, columns, timestamp)
 
293
            else:
 
294
                if timestamp is None:
 
295
                    scan_id = self.connection.client.scannerOpenWithStop(
 
296
                        self.name, row_start, row_stop, columns)
 
297
                else:
 
298
                    scan_id = self.connection.client.scannerOpenWithStopTs(
 
299
                        self.name, row_start, row_stop, columns, timestamp)
 
300
 
 
301
        else:
 
302
            # The scan's caching size is set to the batch_size, so that
 
303
            # the HTable on the Java side retrieves rows from the region
 
304
            # servers in the same chunk sizes that it sends out over
 
305
            # Thrift.
 
306
            scan = TScan(
 
307
                startRow=row_start,
 
308
                stopRow=row_stop,
 
309
                timestamp=timestamp,
 
310
                columns=columns,
 
311
                caching=batch_size,
 
312
                filterString=filter,
 
313
            )
 
314
            scan_id = self.connection.client.scannerOpenWithScan(
 
315
                self.name, scan)
 
316
 
 
317
        logger.debug("Opened scanner (id=%d) on '%s'", scan_id, self.name)
 
318
 
 
319
        n_returned = n_fetched = 0
 
320
        try:
 
321
            while True:
 
322
                if limit is None:
 
323
                    how_many = batch_size
 
324
                else:
 
325
                    how_many = min(batch_size, limit - n_returned)
 
326
 
 
327
                if how_many == 1:
 
328
                    items = self.connection.client.scannerGet(scan_id)
 
329
                else:
 
330
                    items = self.connection.client.scannerGetList(
 
331
                        scan_id, how_many)
 
332
 
 
333
                n_fetched += len(items)
 
334
 
 
335
                for n_returned, item in enumerate(items, n_returned + 1):
 
336
                    yield item.row, make_row(item.columns, include_timestamp)
 
337
                    if limit is not None and n_returned == limit:
 
338
                        return
 
339
 
 
340
                # Avoid round-trip when exhausted
 
341
                if len(items) < how_many:
 
342
                    break
 
343
        finally:
 
344
            self.connection.client.scannerClose(scan_id)
 
345
            logger.debug(
 
346
                "Closed scanner (id=%d) on '%s' (%d returned, %d fetched)",
 
347
                scan_id, self.name, n_returned, n_fetched)
 
348
 
 
349
    #
 
350
    # Data manipulation
 
351
    #
 
352
 
 
353
    def put(self, row, data, timestamp=None):
 
354
        """Store data in the table.
 
355
 
 
356
        This method stores the data in the `data` argument for the row
 
357
        specified by `row`. The `data` argument is dictionary that maps columns
 
358
        to values. Column names must include a family and qualifier part, e.g.
 
359
        `cf:col`, though the qualifier part may be the empty string, e.g.
 
360
        `cf:`. The `timestamp` argument is optional.
 
361
 
 
362
        Note that, in many situations, :py:meth:`batch()` is a more appropriate
 
363
        method to manipulate data.
 
364
 
 
365
        :param str row: the row key
 
366
        :param dict data: the data to store
 
367
        :param int timestamp: timestamp (optional)
 
368
        """
 
369
        with self.batch(timestamp=timestamp) as batch:
 
370
            batch.put(row, data)
 
371
 
 
372
    def delete(self, row, columns=None, timestamp=None):
 
373
        """Delete data from the table.
 
374
 
 
375
        This method deletes all columns for the row specified by `row`, or only
 
376
        some columns if the `columns` argument is specified.
 
377
 
 
378
        Note that, in many situations, :py:meth:`batch()` is a more appropriate
 
379
        method to manipulate data.
 
380
 
 
381
        :param str row: the row key
 
382
        :param list_or_tuple columns: list of columns (optional)
 
383
        :param int timestamp: timestamp (optional)
 
384
        """
 
385
        if columns is None:
 
386
            if timestamp is None:
 
387
                self.connection.client.deleteAllRow(self.name, row)
 
388
            else:
 
389
                self.connection.client.deleteAllRowTs(
 
390
                    self.name, row, timestamp)
 
391
        else:
 
392
            with self.batch(timestamp=timestamp) as batch:
 
393
                batch.delete(row, columns)
 
394
 
 
395
    def batch(self, timestamp=None, batch_size=None, transaction=False):
 
396
        """Create a new batch operation for this table.
 
397
 
 
398
        This method returns a new :py:class:`Batch` instance that can be used
 
399
        for mass data manipulation. The `timestamp` argument applies to all
 
400
        puts and deletes on the batch.
 
401
 
 
402
        If given, the `batch_size` argument specifies the maximum batch size
 
403
        after which the batch should send the mutations to the server. By
 
404
        default this is unbounded.
 
405
 
 
406
        The `transaction` argument specifies whether the returned
 
407
        :py:class:`Batch` instance should act in a transaction-like manner when
 
408
        used as context manager in a ``with`` block of code. The `transaction`
 
409
        flag cannot be used in combination with `batch_size`.
 
410
 
 
411
        :param bool transaction: whether this batch should behave like
 
412
                                 a transaction (only useful when used as a
 
413
                                 context manager)
 
414
        :param int batch_size: batch size (optional)
 
415
        :param int timestamp: timestamp (optional)
 
416
 
 
417
        :return: Batch instance
 
418
        :rtype: :py:class:`Batch`
 
419
        """
 
420
        kwargs = locals().copy()
 
421
        del kwargs['self']
 
422
        return Batch(table=self, **kwargs)
 
423
 
 
424
    #
 
425
    # Atomic counters
 
426
    #
 
427
 
 
428
    def counter_get(self, row, column):
 
429
        """Retrieve the current value of a counter column.
 
430
 
 
431
        This method retrieves the current value of a counter column. If the
 
432
        counter column does not exist, this function initialises it to `0`.
 
433
 
 
434
        Note that application code should *never* store a incremented or
 
435
        decremented counter value directly; use the atomic
 
436
        :py:meth:`Table.counter_inc` and :py:meth:`Table.counter_dec` methods
 
437
        for that.
 
438
 
 
439
        :param str row: the row key
 
440
        :param str column: the column name
 
441
 
 
442
        :return: counter value
 
443
        :rtype: int
 
444
        """
 
445
        # Don't query directly, but increment with value=0 so that the counter
 
446
        # is correctly initialised if didn't exist yet.
 
447
        return self.counter_inc(row, column, value=0)
 
448
 
 
449
    def counter_set(self, row, column, value=0):
 
450
        """Set a counter column to a specific value.
 
451
 
 
452
        This method stores a 64-bit signed integer value in the specified
 
453
        column.
 
454
 
 
455
        Note that application code should *never* store a incremented or
 
456
        decremented counter value directly; use the atomic
 
457
        :py:meth:`Table.counter_inc` and :py:meth:`Table.counter_dec` methods
 
458
        for that.
 
459
 
 
460
        :param str row: the row key
 
461
        :param str column: the column name
 
462
        :param int value: the counter value to set
 
463
        """
 
464
        self.put(row, {column: pack_i64(value)})
 
465
 
 
466
    def counter_inc(self, row, column, value=1):
 
467
        """Atomically increment (or decrements) a counter column.
 
468
 
 
469
        This method atomically increments or decrements a counter column in the
 
470
        row specified by `row`. The `value` argument specifies how much the
 
471
        counter should be incremented (for positive values) or decremented (for
 
472
        negative values). If the counter column did not exist, it is
 
473
        automatically initialised to 0 before incrementing it.
 
474
 
 
475
        :param str row: the row key
 
476
        :param str column: the column name
 
477
        :param int value: the amount to increment or decrement by (optional)
 
478
 
 
479
        :return: counter value after incrementing
 
480
        :rtype: int
 
481
        """
 
482
        return self.connection.client.atomicIncrement(
 
483
            self.name, row, column, value)
 
484
 
 
485
    def counter_dec(self, row, column, value=1):
 
486
        """Atomically decrement (or increments) a counter column.
 
487
 
 
488
        This method is a shortcut for calling :py:meth:`Table.counter_inc` with
 
489
        the value negated.
 
490
 
 
491
        :return: counter value after decrementing
 
492
        :rtype: int
 
493
        """
 
494
        return self.counter_inc(row, column, -value)