~christopher-hunt08/maus/beam_selection_development

« back to all changes in this revision

Viewing changes to src/common_py/framework/utilities.py

  • Committer: Chris Rogers
  • Date: 2012-10-13 18:43:00 UTC
  • mfrom: (663.6.137 merge)
  • mto: (663.6.204 merge)
  • mto: This revision was merged to the branch mainline in revision 680.
  • Revision ID: chris.rogers@stfc.ac.uk-20121013184300-ry9q81m45dmtgejr
Bring control room branch into line with trunk

Show diffs side-by-side

added

removed

Lines of Context:
18
18
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.
19
19
 
20
20
import socket
 
21
import sys
21
22
 
22
23
from celery.task.control import discard_all
23
24
from celery.task.control import inspect
85
86
            run_number = spill["run_number"]
86
87
        return run_number
87
88
 
 
89
    @staticmethod
 
90
    def get_event_type(event):
 
91
        """
 
92
        Extract event type string from an event.
 
93
 
 
94
        @param event MAUS event document as a dictionary.
 
95
        @return event type (string) or None if none.
 
96
        """
 
97
        event_type = None
 
98
        if event.has_key("maus_event_type"):
 
99
            event_type = event["maus_event_type"]
 
100
        return event_type
 
101
 
 
102
 
 
103
 
88
104
class DocumentStoreUtilities: # pylint: disable=W0232
89
105
    """
90
106
    @class DocumentStoreUtilities.
142
158
        try:
143
159
            doc_store.connect(config) 
144
160
        except Exception as exc:
 
161
            sys.excepthook(*sys.exc_info())
145
162
            raise DocumentStoreException(exc)
146
163
        return doc_store
147
164
 
197
214
            raise RabbitMQException(exc)
198
215
 
199
216
    @staticmethod
200
 
    def birth_celery(transform, config, config_id, timeout = 1000):
 
217
    def birth_celery(transform, config, config_id, run_number, timeout = 1000):
201
218
        """
202
219
        Set new configuration and transforms in Celery nodes, and
203
220
        birth the transforms. An initial ping is done to
221
238
            response = broadcast("birth", arguments={
222
239
                "transform": transform, 
223
240
                "configuration": config,
224
 
                "config_id": config_id}, 
 
241
                "config_id": config_id,
 
242
                "run_number": run_number},
225
243
                reply=True, timeout=timeout, limit=num_nodes)
226
244
        except socket.error as exc:
227
245
            raise RabbitMQException(exc)
228
246
        CeleryUtilities.validate_celery_response(response)
 
247
        run_headers = []
 
248
        for machines in response:
 
249
            for name in machines.keys():
 
250
                run_headers += machines[name]["run_headers"]
 
251
        return run_headers
229
252
 
230
253
    @staticmethod
231
 
    def death_celery():
 
254
    def death_celery(run_number):
232
255
        """
233
256
        Call death on transforms in Celery nodes.
234
257
        @throws RabbitMQException if RabbitMQ cannot be contacted.
236
259
        nodes fails to death.
237
260
        """
238
261
        try:
239
 
            response = broadcast("death", reply=True)
 
262
            response = broadcast("death", arguments={"run_number":run_number},
 
263
                                 reply=True)
240
264
        except socket.error as exc:
241
265
            raise RabbitMQException(exc)
242
266
        CeleryUtilities.validate_celery_response(response)
 
267
        run_footers = []
 
268
        for machines in response:
 
269
            for name in machines.keys():
 
270
                run_footers += machines[name]["run_footers"]
 
271
        return run_footers
243
272
 
244
273
    @staticmethod
245
274
    def validate_celery_response(response):
255
284
        nodes raised an error.
256
285
        """
257
286
        failed_nodes = []
 
287
        print response
258
288
        for node in response:
259
289
            node_id = node.keys()[0]
260
290
            node_status = node[node_id]