~chris-rogers/maus/emr_mc_digitization

« back to all changes in this revision

Viewing changes to src/common_py/framework/utilities.py

Merge from jackson devel branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
MAUS framework utilities module.
 
3
"""
 
4
 
 
5
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
 
6
#
 
7
#  MAUS is free software: you can redistribute it and/or modify
 
8
#  it under the terms of the GNU General Public License as published by
 
9
#  the Free Software Foundation, either version 3 of the License, or
 
10
#  (at your option) any later version.
 
11
#
 
12
#  MAUS is distributed in the hope that it will be useful,
 
13
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
#  GNU General Public License for more details.
 
16
#
 
17
#  You should have received a copy of the GNU General Public License
 
18
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.
 
19
 
 
20
import socket
 
21
 
 
22
from celery.task.control import discard_all
 
23
from celery.task.control import inspect
 
24
from celery.task.control import broadcast
 
25
from docstore.DocumentStore import DocumentStore
 
26
from docstore.DocumentStore import DocumentStoreException
 
27
 
 
28
class DataflowUtilities: # pylint: disable=W0232
 
29
    """
 
30
    @class DataflowUtilities.
 
31
    Dataflow-related utility functions.
 
32
    """
 
33
 
 
34
    @staticmethod
 
35
    def buffer_input(input_emitter, number_of_inputs):
 
36
        """
 
37
        Buffer an input stream of strings by only reading up to the
 
38
        first N inputs into memory. Returns an array of inputs. 
 
39
 
 
40
        @param input_emitter Input stream of strings.
 
41
        @param number_of_inputs Number of inputs to read.
 
42
        @return array of number_of_inputs inputs.
 
43
        """
 
44
        my_buffer = []
 
45
        for i in range(number_of_inputs):  # pylint: disable=W0612
 
46
            try:
 
47
                value = next(input_emitter)
 
48
                my_buffer.append(value.encode('ascii'))
 
49
            except StopIteration:
 
50
                return my_buffer
 
51
        return my_buffer
 
52
 
 
53
    @staticmethod
 
54
    def is_start_of_run(spill):
 
55
        """
 
56
        Return true if spill represents a start of a run i.e. it has
 
57
        "daq_event_type" with value "start_of_run".
 
58
        @param spill Spill document as a dictionary.
 
59
        @return True or False.
 
60
        """
 
61
        return (spill.has_key("daq_event_type") and
 
62
            spill["daq_event_type"] == "start_of_run")
 
63
 
 
64
    @staticmethod
 
65
    def is_end_of_run(spill):
 
66
        """
 
67
        Return true if spill represents a end of a run i.e. it has
 
68
        "daq_event_type" with value "end_of_run".
 
69
        @param spill Spill document as a dictionary.
 
70
        @return True or False.
 
71
        """
 
72
        return (spill.has_key("daq_event_type") and
 
73
            spill["daq_event_type"] == "end_of_run")
 
74
 
 
75
    @staticmethod
 
76
    def get_run_number(spill):
 
77
        """
 
78
        Extract run number from spill. Assumes spill has a 
 
79
        "run_num" entry.
 
80
        @param spill Spill document as a dictionary.
 
81
        @return run number or None if none.
 
82
        """
 
83
        run_number = None
 
84
        if spill.has_key("run_num"):
 
85
            run_number = spill["run_num"]
 
86
        return run_number
 
87
 
 
88
class DocumentStoreUtilities: # pylint: disable=W0232
 
89
    """
 
90
    @class DocumentStoreUtilities.
 
91
    Document store-related utility functions.
 
92
    """
 
93
 
 
94
    @staticmethod
 
95
    def setup_doc_store(config):
 
96
        """
 
97
        Set up document store. The document store is configured via
 
98
        the following parameter in the JSON configuration:
 
99
 
 
100
        -doc_store_class - document store class name. This value
 
101
         is assumed to be of form "modulename.classname".
 
102
 
 
103
        It is assumed that the class can be loaded by execution of 
 
104
        statements analogous to "import classname from modulename".
 
105
        The "connect" method of the document store is then invoked to 
 
106
        initialise the connection.
 
107
 
 
108
        @param config JSON configuration.
 
109
        @return document store.
 
110
        @throws KeyError. If there is no doc_store_class in the JSON
 
111
        configuration.
 
112
        @throws ValueError. If the module name do not exist,
 
113
        or the class is not in the module.
 
114
        @throws TypeError If doc_store_class does not sub-class
 
115
        docstore.DocumentStore.DocumentStore.
 
116
        @throws DocumentStoreException if there is a problem connecting
 
117
        to the document store.
 
118
        """
 
119
        # Get class and bust up into module path and class name.
 
120
        doc_store_class = config["doc_store_class"]
 
121
        path = doc_store_class.split(".")
 
122
        doc_store_class = path.pop()
 
123
        import_path = ".".join(path)
 
124
        try:
 
125
            module_object = __import__(import_path)
 
126
            path.pop(0)
 
127
            # Dynamically import the module.
 
128
            for sub_module in path:
 
129
                module_object = getattr(module_object, sub_module) 
 
130
            # Get class object.
 
131
            class_object = getattr(module_object, doc_store_class)
 
132
            # Create instance of class object.
 
133
            doc_store = class_object()
 
134
        except:
 
135
            raise ValueError("Module %s does not exist or has a problem" \
 
136
                % doc_store_class)
 
137
        # Validate.
 
138
        if (not isinstance(doc_store, DocumentStore)):
 
139
            raise TypeError("Document store class %s does not implement %s" \
 
140
                % (doc_store_class, DocumentStore))
 
141
        # Connect to the document store.
 
142
        try:
 
143
            doc_store.connect(config) 
 
144
        except Exception as exc:
 
145
            raise DocumentStoreException(exc)
 
146
        return doc_store
 
147
 
 
148
    @staticmethod
 
149
    def create_doc_store_collection(doc_store, collection):
 
150
        """
 
151
        Create a collection in the document store. If it exists already
 
152
        then it is deleted.
 
153
        @param doc_store Document store.
 
154
        @param collection Collection name.
 
155
        @throws DocumentStoreException if there is a problem.
 
156
        """
 
157
        try:
 
158
            if (doc_store.has_collection(collection)):
 
159
                doc_store.delete_collection(collection)
 
160
            doc_store.create_collection(collection)
 
161
        except Exception as exc:
 
162
            raise DocumentStoreException(exc)
 
163
 
 
164
class CeleryUtilities: # pylint: disable=W0232
 
165
    """
 
166
    @class CeleryUtilities.
 
167
    Celery-related utility functions.
 
168
    """
 
169
 
 
170
    @staticmethod
 
171
    def ping_celery_nodes():
 
172
        """
 
173
        Check for active Celery nodes.
 
174
        @return number of active nodes.
 
175
        @throws RabbitMQException if RabbitMQ cannot be contacted.
 
176
        @throws NoCeleryNodeException if no Celery nodes.
 
177
        """
 
178
        inspection = inspect()
 
179
        try:
 
180
            active_nodes = inspection.active()
 
181
        except socket.error as exc:
 
182
            raise RabbitMQException(exc)
 
183
        if (active_nodes == None):
 
184
            raise NoCeleryNodeException()
 
185
        return len(active_nodes)
 
186
 
 
187
    @staticmethod
 
188
    def discard_celery_jobs():
 
189
        """
 
190
        Discard all currently pending Celery jobs.
 
191
        @throws RabbitMQException if RabbitMQ cannot be contacted.
 
192
        @throws NoCeleryNodeException if no Celery nodes.
 
193
        """
 
194
        try:
 
195
            discard_all()
 
196
        except socket.error as exc:
 
197
            raise RabbitMQException(exc)
 
198
 
 
199
    @staticmethod
 
200
    def birth_celery(transform, config, config_id, timeout = 1000):
 
201
        """
 
202
        Set new configuration and transforms in Celery nodes, and
 
203
        birth the transforms. An initial ping is done to
 
204
        identify the number of live nodes. 
 
205
        Each node is given up to 1000s to reply
 
206
        but if they all reply then the method returns sooner.
 
207
        @param transform Either a single name can be given - representing
 
208
        a single transform - or a list of transforms - representing a
 
209
        MapPyGroup. Sub-lists are treated as nested MapPyGroups. If None
 
210
        then the current transform is deathed and rebirthed.  
 
211
        @param config Valid JSON configuration document
 
212
        @param config_id Configuration ID from client.
 
213
        @param timeout Time to wait for replies.
 
214
        @return results from Celery.
 
215
        @throws RabbitMQException if RabbitMQ cannot be contacted.
 
216
        @throws CeleryNodeException if one or more Celery 
 
217
        nodes fails to configure or birth.
 
218
        """
 
219
        num_nodes = CeleryUtilities.ping_celery_nodes()
 
220
        try:
 
221
            response = broadcast("birth", arguments={
 
222
                "transform": transform, 
 
223
                "configuration": config,
 
224
                "config_id": config_id}, 
 
225
                reply=True, timeout=timeout, limit=num_nodes)
 
226
        except socket.error as exc:
 
227
            raise RabbitMQException(exc)
 
228
        CeleryUtilities.validate_celery_response(response)
 
229
 
 
230
    @staticmethod
 
231
    def death_celery():
 
232
        """
 
233
        Call death on transforms in Celery nodes.
 
234
        @throws RabbitMQException if RabbitMQ cannot be contacted.
 
235
        @throws CeleryNodeException if one or more Celery 
 
236
        nodes fails to death.
 
237
        """
 
238
        try:
 
239
            response = broadcast("death", reply=True)
 
240
        except socket.error as exc:
 
241
            raise RabbitMQException(exc)
 
242
        CeleryUtilities.validate_celery_response(response)
 
243
 
 
244
    @staticmethod
 
245
    def validate_celery_response(response):
 
246
        """
 
247
        Validate the status from a Celery confgure or death 
 
248
        broadcast. Give a dictionary of node statuses, indexed
 
249
        by node name, if any status has a "status" key 
 
250
        with value "error" then an exception is thrown. All
 
251
        the nodes are checked and any exception records all the
 
252
        problematic nodes.
 
253
        @param response Status from Celery broadcast.
 
254
        @throws CeleryNodeException if one or more Celery 
 
255
        nodes raised an error.
 
256
        """
 
257
        failed_nodes = []
 
258
        for node in response:
 
259
            node_id = node.keys()[0]
 
260
            node_status = node[node_id]
 
261
            if node_status["status"] == "error":
 
262
                failed_nodes.append((node_id, node_status))
 
263
        if (len(failed_nodes) > 0):
 
264
            raise CeleryNodeException(failed_nodes)
 
265
 
 
266
class RabbitMQException(Exception):
 
267
    """ Exception raised if RabbitMQ cannot be contacted. """
 
268
 
 
269
    def __init__(self, exception):
 
270
        """
 
271
        Constructor. Overrides Exception.__init__.
 
272
        @param self Object reference.
 
273
        @param exception Wrapped exception
 
274
        """
 
275
        Exception.__init__(self)
 
276
        self.exception = exception
 
277
 
 
278
    def __str__(self):
 
279
        """
 
280
        Return string representation. Overrides Exception.__str__.
 
281
        @param self Object reference.
 
282
        @return string.
 
283
        """
 
284
        return "RabbitMQ cannot be contacted. Problem is %s" \
 
285
            % self.exception
 
286
 
 
287
class NoCeleryNodeException(Exception):
 
288
    """ Exception raised if no Celery nodes are available. """
 
289
 
 
290
    def __str__(self):
 
291
        """
 
292
        Return string representation. Overrides Exception.__str__.
 
293
        @param self Object reference.
 
294
        @return string.
 
295
        """
 
296
        return "No Celery nodes are available"
 
297
 
 
298
class CeleryNodeException(Exception):
 
299
    """ 
 
300
    Exception raised if Celery nodes fail to configure, birth
 
301
    or death.
 
302
    """
 
303
 
 
304
    def __init__(self, node_status = []):  # pylint:disable = W0102
 
305
        """
 
306
        Constructor. Overrides Exception.__init__.
 
307
        @param self Object reference.
 
308
        @param node_status List of tuples of form (node_id, node_status)
 
309
        with failure information.
 
310
        """
 
311
        Exception.__init__(self)
 
312
        self.node_status = node_status
 
313
 
 
314
    def __str__(self):
 
315
        """
 
316
        Return string representation. Overrides Exception.__str__.
 
317
        @param self Object reference.
 
318
        @return string.
 
319
        """
 
320
        return "Celery node(s) failed to configure: %s" \
 
321
            % self.node_status