2
MAUS framework utilities module.
5
# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
7
# MAUS is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
12
# MAUS is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU General Public License for more details.
17
# You should have received a copy of the GNU General Public License
18
# along with MAUS. If not, see <http://www.gnu.org/licenses/>.
22
from celery.task.control import discard_all
23
from celery.task.control import inspect
24
from celery.task.control import broadcast
25
from docstore.DocumentStore import DocumentStore
26
from docstore.DocumentStore import DocumentStoreException
28
class DataflowUtilities: # pylint: disable=W0232
30
@class DataflowUtilities.
31
Dataflow-related utility functions.
35
def buffer_input(input_emitter, number_of_inputs):
37
Buffer an input stream of strings by only reading up to the
38
first N inputs into memory. Returns an array of inputs.
40
@param input_emitter Input stream of strings.
41
@param number_of_inputs Number of inputs to read.
42
@return array of number_of_inputs inputs.
45
for i in range(number_of_inputs): # pylint: disable=W0612
47
value = next(input_emitter)
48
my_buffer.append(value.encode('ascii'))
54
def is_start_of_run(spill):
56
Return true if spill represents a start of a run i.e. it has
57
"daq_event_type" with value "start_of_run".
58
@param spill Spill document as a dictionary.
59
@return True or False.
61
return (spill.has_key("daq_event_type") and
62
spill["daq_event_type"] == "start_of_run")
65
def is_end_of_run(spill):
67
Return true if spill represents a end of a run i.e. it has
68
"daq_event_type" with value "end_of_run".
69
@param spill Spill document as a dictionary.
70
@return True or False.
72
return (spill.has_key("daq_event_type") and
73
spill["daq_event_type"] == "end_of_run")
76
def get_run_number(spill):
78
Extract run number from spill. Assumes spill has a
80
@param spill Spill document as a dictionary.
81
@return run number or None if none.
84
if spill.has_key("run_num"):
85
run_number = spill["run_num"]
88
class DocumentStoreUtilities: # pylint: disable=W0232
90
@class DocumentStoreUtilities.
91
Document store-related utility functions.
95
def setup_doc_store(config):
97
Set up document store. The document store is configured via
98
the following parameter in the JSON configuration:
100
-doc_store_class - document store class name. This value
101
is assumed to be of form "modulename.classname".
103
It is assumed that the class can be loaded by execution of
104
statements analogous to "import classname from modulename".
105
The "connect" method of the document store is then invoked to
106
initialise the connection.
108
@param config JSON configuration.
109
@return document store.
110
@throws KeyError. If there is no doc_store_class in the JSON
112
@throws ValueError. If the module name do not exist,
113
or the class is not in the module.
114
@throws TypeError If doc_store_class does not sub-class
115
docstore.DocumentStore.DocumentStore.
116
@throws DocumentStoreException if there is a problem connecting
117
to the document store.
119
# Get class and bust up into module path and class name.
120
doc_store_class = config["doc_store_class"]
121
path = doc_store_class.split(".")
122
doc_store_class = path.pop()
123
import_path = ".".join(path)
125
module_object = __import__(import_path)
127
# Dynamically import the module.
128
for sub_module in path:
129
module_object = getattr(module_object, sub_module)
131
class_object = getattr(module_object, doc_store_class)
132
# Create instance of class object.
133
doc_store = class_object()
135
raise ValueError("Module %s does not exist or has a problem" \
138
if (not isinstance(doc_store, DocumentStore)):
139
raise TypeError("Document store class %s does not implement %s" \
140
% (doc_store_class, DocumentStore))
141
# Connect to the document store.
143
doc_store.connect(config)
144
except Exception as exc:
145
raise DocumentStoreException(exc)
149
def create_doc_store_collection(doc_store, collection):
151
Create a collection in the document store. If it exists already
153
@param doc_store Document store.
154
@param collection Collection name.
155
@throws DocumentStoreException if there is a problem.
158
if (doc_store.has_collection(collection)):
159
doc_store.delete_collection(collection)
160
doc_store.create_collection(collection)
161
except Exception as exc:
162
raise DocumentStoreException(exc)
164
class CeleryUtilities: # pylint: disable=W0232
166
@class CeleryUtilities.
167
Celery-related utility functions.
171
def ping_celery_nodes():
173
Check for active Celery nodes.
174
@return number of active nodes.
175
@throws RabbitMQException if RabbitMQ cannot be contacted.
176
@throws NoCeleryNodeException if no Celery nodes.
178
inspection = inspect()
180
active_nodes = inspection.active()
181
except socket.error as exc:
182
raise RabbitMQException(exc)
183
if (active_nodes == None):
184
raise NoCeleryNodeException()
185
return len(active_nodes)
188
def discard_celery_jobs():
190
Discard all currently pending Celery jobs.
191
@throws RabbitMQException if RabbitMQ cannot be contacted.
192
@throws NoCeleryNodeException if no Celery nodes.
196
except socket.error as exc:
197
raise RabbitMQException(exc)
200
def birth_celery(transform, config, config_id, timeout = 1000):
202
Set new configuration and transforms in Celery nodes, and
203
birth the transforms. An initial ping is done to
204
identify the number of live nodes.
205
Each node is given up to 1000s to reply
206
but if they all reply then the method returns sooner.
207
@param transform Either a single name can be given - representing
208
a single transform - or a list of transforms - representing a
209
MapPyGroup. Sub-lists are treated as nested MapPyGroups. If None
210
then the current transform is deathed and rebirthed.
211
@param config Valid JSON configuration document
212
@param config_id Configuration ID from client.
213
@param timeout Time to wait for replies.
214
@return results from Celery.
215
@throws RabbitMQException if RabbitMQ cannot be contacted.
216
@throws CeleryNodeException if one or more Celery
217
nodes fails to configure or birth.
219
num_nodes = CeleryUtilities.ping_celery_nodes()
221
response = broadcast("birth", arguments={
222
"transform": transform,
223
"configuration": config,
224
"config_id": config_id},
225
reply=True, timeout=timeout, limit=num_nodes)
226
except socket.error as exc:
227
raise RabbitMQException(exc)
228
CeleryUtilities.validate_celery_response(response)
233
Call death on transforms in Celery nodes.
234
@throws RabbitMQException if RabbitMQ cannot be contacted.
235
@throws CeleryNodeException if one or more Celery
236
nodes fails to death.
239
response = broadcast("death", reply=True)
240
except socket.error as exc:
241
raise RabbitMQException(exc)
242
CeleryUtilities.validate_celery_response(response)
245
def validate_celery_response(response):
247
Validate the status from a Celery confgure or death
248
broadcast. Give a dictionary of node statuses, indexed
249
by node name, if any status has a "status" key
250
with value "error" then an exception is thrown. All
251
the nodes are checked and any exception records all the
253
@param response Status from Celery broadcast.
254
@throws CeleryNodeException if one or more Celery
255
nodes raised an error.
258
for node in response:
259
node_id = node.keys()[0]
260
node_status = node[node_id]
261
if node_status["status"] == "error":
262
failed_nodes.append((node_id, node_status))
263
if (len(failed_nodes) > 0):
264
raise CeleryNodeException(failed_nodes)
266
class RabbitMQException(Exception):
267
""" Exception raised if RabbitMQ cannot be contacted. """
269
def __init__(self, exception):
271
Constructor. Overrides Exception.__init__.
272
@param self Object reference.
273
@param exception Wrapped exception
275
Exception.__init__(self)
276
self.exception = exception
280
Return string representation. Overrides Exception.__str__.
281
@param self Object reference.
284
return "RabbitMQ cannot be contacted. Problem is %s" \
287
class NoCeleryNodeException(Exception):
288
""" Exception raised if no Celery nodes are available. """
292
Return string representation. Overrides Exception.__str__.
293
@param self Object reference.
296
return "No Celery nodes are available"
298
class CeleryNodeException(Exception):
300
Exception raised if Celery nodes fail to configure, birth
304
def __init__(self, node_status = []): # pylint:disable = W0102
306
Constructor. Overrides Exception.__init__.
307
@param self Object reference.
308
@param node_status List of tuples of form (node_id, node_status)
309
with failure information.
311
Exception.__init__(self)
312
self.node_status = node_status
316
Return string representation. Overrides Exception.__str__.
317
@param self Object reference.
320
return "Celery node(s) failed to configure: %s" \