~durga/maus/rel709

« back to all changes in this revision

Viewing changes to src/common_py/framework/input_transform.py

  • Committer: Durga Rajaram
  • Date: 2013-10-01 00:19:57 UTC
  • mfrom: (659.1.74 rc)
  • Revision ID: durga@fnal.gov-20131001001957-iswih60vis9rodw0
Tags: MAUS-v0.7.1
MAUS-v0.7.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
import os
21
21
import json
22
22
import socket
 
23
import time
 
24
 
 
25
import celery.states
 
26
import celery.task.control
23
27
 
24
28
from docstore.DocumentStore import DocumentStoreException
25
29
from framework.utilities import CeleryUtilities
104
108
        self.transformer = transformer
105
109
        self.config_doc = config_doc
106
110
        self.config = json.loads(self.config_doc)
 
111
        # Reconstruction timeout, read from the 'reconstruction_timeout' config key
 
112
        self.timeout = self.config['reconstruction_timeout']
107
113
        # Unique ID for execution.
108
114
        self.config_id = "%s-%s" % (socket.gethostname(), os.getpid())
109
115
        # Current run number (from spills).
128
134
        else:
129
135
            self.collection = self.config["doc_collection_name"]
130
136
 
 
137
    def revoke_celery_tasks(self):
 
138
        """Revoke the celery tasks"""
 
139
        for task in self.celery_tasks:
 
140
            print "    Celery task %s status is %s" % (task.task_id,
 
141
                                                       task.state)
 
142
            celery.task.control.revoke(task.task_id, terminate=True)
 
143
            print "    REVOKED at start of new run"
 
144
        self.celery_tasks = []
 
145
 
131
146
    def poll_celery_tasks(self):
132
147
        """
133
 
        Wait for tasks currently being executed by Celery nodes to 
134
 
        complete.
 
148
        Loop over existing tasks; remove from the queue if they are completed
135
149
        @param self Object reference.
136
150
        @throws RabbitMQException if RabbitMQ cannot be contacted.
137
 
        @throws DocumentStoreException if there is a problem
138
 
        using the document store.
 
151
        @throws DocumentStoreException if there is a problem using the document
 
152
                store.
139
153
        """
140
 
        # Check each submitted Celery task.
141
 
        num_tasks = len(self.celery_tasks)
142
 
        current = 0
143
 
        while (current < num_tasks):
144
 
            result = self.celery_tasks[current]
 
154
        print 'Polling celery queue'
 
155
        temp_queue = []
 
156
        for result in self.celery_tasks:
145
157
            try:
146
 
                # Catch any RabbitMQ errors when querying status.
147
 
                is_successful = result.successful()
148
 
                is_failed = result.failed()
149
158
                result_result = result.result
150
 
                result_traceback = result.traceback
151
159
            except socket.error as exc:
152
160
                raise RabbitMQException(exc)
153
 
            if is_successful:
154
 
                self.celery_tasks.pop(current)
155
 
                num_tasks -= 1
156
 
                spill = result_result
 
161
            print "    Celery task %s status is %s" % (result.task_id,
 
162
                                                       result.state)
 
163
            if result.state in celery.states.READY_STATES: # means finished
 
164
                if result.successful():
 
165
                    spill = result_result
 
166
                    try:
 
167
                        if spill != None:
 
168
                            self.doc_store.put(self.collection,
 
169
                                str(self.spill_process_count), spill)
 
170
                    except Exception as exc:
 
171
                        raise DocumentStoreException(exc)
 
172
                else:
 
173
                    self.spill_fail_count += 1
157
174
                self.spill_process_count += 1
158
 
                print " Celery task %s SUCCESS " % (result.task_id)
159
 
                print "   SAVING to collection %s (with ID %s)" \
160
 
                    % (self.collection, self.spill_process_count)
161
 
                try:
162
 
                    self.doc_store.put(self.collection,
163
 
                        str(self.spill_process_count), spill)
164
 
                except Exception as exc:
165
 
                    raise DocumentStoreException(exc)
166
 
                self.print_counts()
167
 
            elif is_failed:
168
 
                self.celery_tasks.pop(current)
169
 
                self.spill_fail_count += 1
170
 
                num_tasks -= 1
171
 
                print " Celery task %s FAILED : %s : %s" \
172
 
                    % (result.task_id, result_result, result_traceback)
173
 
                self.print_counts()
174
 
            else:
175
 
                current += 1
 
175
                self.print_counts()
 
176
            else: #pending, retry, etc
 
177
                temp_queue.append(result)
 
178
        self.celery_tasks = temp_queue
 
179
        print len(self.celery_tasks), 'tasks remain in queue'
176
180
 
177
181
    def submit_spill_to_celery(self, spill):
178
182
        """
182
186
        @throws RabbitMQException if RabbitMQ cannot be contacted.
183
187
        """
184
188
        try:
185
 
            result = \
186
 
                execute_transform.delay(spill, self.config_id) # pylint:disable=E1101, C0301
 
189
            result = execute_transform.apply_async(
 
190
                      args=[spill, self.config_id]) # pylint:disable=E1101
187
191
        except socket.error as exc:
188
192
            raise RabbitMQException(exc)
189
 
        print "Task ID: %s" % result.task_id
 
193
        print "Submitting task %s" % result.task_id
190
194
        # Save asynchronous result object for checking task status. 
191
195
        self.celery_tasks.append(result)
192
196
 
199
203
        @param run_number New run number.
200
204
        """
201
205
        print "New run detected...waiting for current processing to complete"
202
 
        # Wait for current tasks, from previous run, to complete.
 
206
        # Force kill any tasks from the previous run that have not completed.
203
207
        # This also ensures their timestamps < those of next run.
204
 
        while (len(self.celery_tasks) != 0):
205
 
            self.poll_celery_tasks()
 
208
        time.sleep(10)
 
209
        # Should get hit by timeout in mauscelery.tasks.py
 
210
        self.poll_celery_tasks()
 
211
        # Anything still alive must have stuck
 
212
        self.revoke_celery_tasks()
206
213
        self.run_number = run_number
207
214
        # Configure Celery nodes.
208
215
        print "---------- RUN %d ----------" % self.run_number
221
228
        @param run_number Old run number.
222
229
        @return None
223
230
        """
 
231
        #self.revoke_celery_tasks()
224
232
        run_footer_list = CeleryUtilities.death_celery(self.run_number)
225
233
        for footer in run_footer_list:
226
234
            self.submit_spill_to_celery(footer)