~ubuntu-branches/ubuntu/trusty/python-happybase/trusty

« back to all changes in this revision

Viewing changes to happybase/batch.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2013-05-30 13:56:42 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130530135642-tveld2y1dbkhmuv3
Tags: 0.6-1
* New upstream release (Closes: #712971).
* Ran wrap-and-sort.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
HappyBase Batch module.
 
3
"""
 
4
 
 
5
from collections import defaultdict
 
6
import logging
 
7
from numbers import Integral
 
8
 
 
9
from .hbase.ttypes import BatchMutation, Mutation
 
10
 
 
11
logger = logging.getLogger(__name__)
 
12
 
 
13
 
 
14
class Batch(object):
 
15
    """Batch mutation class.
 
16
 
 
17
    This class cannot be instantiated directly; use :py:meth:`Table.batch`
 
18
    instead.
 
19
    """
 
20
    def __init__(self, table, timestamp=None, batch_size=None,
 
21
                 transaction=False):
 
22
        """Initialise a new Batch instance."""
 
23
        if not (timestamp is None or isinstance(timestamp, Integral)):
 
24
            raise TypeError("'timestamp' must be an integer or None")
 
25
 
 
26
        if batch_size is not None:
 
27
            if transaction:
 
28
                raise TypeError("'transaction' cannot be used when "
 
29
                                "'batch_size' is specified")
 
30
            if not batch_size > 0:
 
31
                raise ValueError("'batch_size' must be > 0")
 
32
 
 
33
        self._table = table
 
34
        self._batch_size = batch_size
 
35
        self._timestamp = timestamp
 
36
        self._transaction = transaction
 
37
        self._families = None
 
38
        self._reset_mutations()
 
39
 
 
40
    def _reset_mutations(self):
 
41
        """Reset the internal mutation buffer."""
 
42
        self._mutations = defaultdict(list)
 
43
        self._mutation_count = 0
 
44
 
 
45
    def send(self):
 
46
        """Send the batch to the server."""
 
47
        bms = [BatchMutation(row, m) for row, m in self._mutations.iteritems()]
 
48
        if not bms:
 
49
            return
 
50
 
 
51
        logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
 
52
                     self._table.name, self._mutation_count, len(bms))
 
53
        if self._timestamp is None:
 
54
            self._table.connection.client.mutateRows(self._table.name, bms)
 
55
        else:
 
56
            self._table.connection.client.mutateRowsTs(
 
57
                self._table.name, bms, self._timestamp)
 
58
 
 
59
        self._reset_mutations()
 
60
 
 
61
    #
 
62
    # Mutation methods
 
63
    #
 
64
 
 
65
    def put(self, row, data):
 
66
        """Store data in the table.
 
67
 
 
68
        See :py:meth:`Table.put` for a description of the `row` and `data`
 
69
        arguments.
 
70
        """
 
71
        self._mutations[row].extend(
 
72
            Mutation(isDelete=False, column=column, value=value)
 
73
            for column, value in data.iteritems())
 
74
 
 
75
        self._mutation_count += len(data)
 
76
        if self._batch_size and self._mutation_count >= self._batch_size:
 
77
            self.send()
 
78
 
 
79
    def delete(self, row, columns=None):
 
80
        """Delete data from the table.
 
81
 
 
82
        See :py:meth:`Table.delete` for a description of the `row` and
 
83
        `columns` arguments.
 
84
        """
 
85
        # Work-around Thrift API limitation: the mutation API can only
 
86
        # delete specified columns, not complete rows, so just list the
 
87
        # column families once and cache them for later use in the same
 
88
        # transaction.
 
89
        if columns is None:
 
90
            if self._families is None:
 
91
                self._families = self._table._column_family_names()
 
92
            columns = self._families
 
93
 
 
94
        self._mutations[row].extend(
 
95
            Mutation(isDelete=True, column=column) for column in columns)
 
96
 
 
97
        self._mutation_count += len(columns)
 
98
        if self._batch_size and self._mutation_count >= self._batch_size:
 
99
            self.send()
 
100
 
 
101
    #
 
102
    # Context manager methods
 
103
    #
 
104
 
 
105
    def __enter__(self):
 
106
        """Called upon entering a ``with`` block"""
 
107
        return self
 
108
 
 
109
    def __exit__(self, exc_type, exc_value, traceback):
 
110
        """Called upon exiting a ``with`` block"""
 
111
        # If the 'with' block raises an exception, the batch will not be
 
112
        # sent to the server.
 
113
        if self._transaction and exc_type is not None:
 
114
            return
 
115
 
 
116
        self.send()