2
HappyBase Batch module.
5
from collections import defaultdict
7
from numbers import Integral
9
from .hbase.ttypes import BatchMutation, Mutation
11
logger = logging.getLogger(__name__)
15
"""Batch mutation class.
17
This class cannot be instantiated directly; use :py:meth:`Table.batch`
20
def __init__(self, table, timestamp=None, batch_size=None,
22
"""Initialise a new Batch instance."""
23
if not (timestamp is None or isinstance(timestamp, Integral)):
24
raise TypeError("'timestamp' must be an integer or None")
26
if batch_size is not None:
28
raise TypeError("'transaction' cannot be used when "
29
"'batch_size' is specified")
30
if not batch_size > 0:
31
raise ValueError("'batch_size' must be > 0")
34
self._batch_size = batch_size
35
self._timestamp = timestamp
36
self._transaction = transaction
38
self._reset_mutations()
40
def _reset_mutations(self):
    """Reset the internal mutation buffer.

    Rebinds ``self._mutations`` to a fresh, empty ``defaultdict(list)`` and
    sets ``self._mutation_count`` back to 0, so anything buffered before the
    call is discarded.
    """
    # A new defaultdict (rather than clearing in place) means a missing key
    # yields an empty list that callers can extend directly.
    self._mutations = defaultdict(list)
    self._mutation_count = 0
46
"""Send the batch to the server."""
47
bms = [BatchMutation(row, m) for row, m in self._mutations.iteritems()]
51
logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
52
self._table.name, self._mutation_count, len(bms))
53
if self._timestamp is None:
54
self._table.connection.client.mutateRows(self._table.name, bms)
56
self._table.connection.client.mutateRowsTs(
57
self._table.name, bms, self._timestamp)
59
self._reset_mutations()
65
def put(self, row, data):
66
"""Store data in the table.
68
See :py:meth:`Table.put` for a description of the `row` and `data`
71
self._mutations[row].extend(
72
Mutation(isDelete=False, column=column, value=value)
73
for column, value in data.iteritems())
75
self._mutation_count += len(data)
76
if self._batch_size and self._mutation_count >= self._batch_size:
79
def delete(self, row, columns=None):
80
"""Delete data from the table.
82
See :py:meth:`Table.delete` for a description of the `row` and
85
# Work-around Thrift API limitation: the mutation API can only
86
# delete specified columns, not complete rows, so just list the
87
# column families once and cache them for later use in the same
90
if self._families is None:
91
self._families = self._table._column_family_names()
92
columns = self._families
94
self._mutations[row].extend(
95
Mutation(isDelete=True, column=column) for column in columns)
97
self._mutation_count += len(columns)
98
if self._batch_size and self._mutation_count >= self._batch_size:
102
# Context manager methods
106
"""Called upon entering a ``with`` block"""
109
def __exit__(self, exc_type, exc_value, traceback):
110
"""Called upon exiting a ``with`` block"""
111
# If the 'with' block raises an exception, the batch will not be
112
# sent to the server.
113
if self._transaction and exc_type is not None: