104
108
self.transformer = transformer
105
109
self.config_doc = config_doc
106
110
self.config = json.loads(self.config_doc)
111
# Timeout, read from the 'reconstruction_timeout' configuration entry
112
self.timeout = self.config['reconstruction_timeout']
107
113
# Unique ID for execution.
108
114
self.config_id = "%s-%s" % (socket.gethostname(), os.getpid())
109
115
# Current run number (from spills).
129
135
self.collection = self.config["doc_collection_name"]
137
def revoke_celery_tasks(self):
138
"""Revoke the celery tasks"""
139
for task in self.celery_tasks:
140
print " Celery task %s status is %s" % (task.task_id,
142
celery.task.control.revoke(task.task_id, terminate=True)
143
print " REVOKED at start of new run"
144
self.celery_tasks = []
131
146
def poll_celery_tasks(self):
133
Wait for tasks currently being executed by Celery nodes to
148
Loop over existing tasks; remove from the queue if they are completed
135
149
@param self Object reference.
136
150
@throws RabbitMQException if RabbitMQ cannot be contacted.
137
@throws DocumentStoreException if there is a problem
138
using the document store.
151
@throws DocumentStoreException if there is a problem using the document
140
# Check each submitted Celery task.
141
num_tasks = len(self.celery_tasks)
143
while (current < num_tasks):
144
result = self.celery_tasks[current]
154
print 'Polling celery queue'
156
for result in self.celery_tasks:
146
# Catch any RabbitMQ errors when querying status.
147
is_successful = result.successful()
148
is_failed = result.failed()
149
158
result_result = result.result
150
result_traceback = result.traceback
151
159
except socket.error as exc:
152
160
raise RabbitMQException(exc)
154
self.celery_tasks.pop(current)
156
spill = result_result
161
print " Celery task %s status is %s" % (result.task_id,
163
if result.state in celery.states.READY_STATES: # means finished
164
if result.successful():
165
spill = result_result
168
self.doc_store.put(self.collection,
169
str(self.spill_process_count), spill)
170
except Exception as exc:
171
raise DocumentStoreException(exc)
173
self.spill_fail_count += 1
157
174
self.spill_process_count += 1
158
print " Celery task %s SUCCESS " % (result.task_id)
159
print " SAVING to collection %s (with ID %s)" \
160
% (self.collection, self.spill_process_count)
162
self.doc_store.put(self.collection,
163
str(self.spill_process_count), spill)
164
except Exception as exc:
165
raise DocumentStoreException(exc)
168
self.celery_tasks.pop(current)
169
self.spill_fail_count += 1
171
print " Celery task %s FAILED : %s : %s" \
172
% (result.task_id, result_result, result_traceback)
176
else: #pending, retry, etc
177
temp_queue.append(result)
178
self.celery_tasks = temp_queue
179
print len(self.celery_tasks), 'tasks remain in queue'
177
181
def submit_spill_to_celery(self, spill):
182
186
@throws RabbitMQException if RabbitMQ cannot be contacted.
186
execute_transform.delay(spill, self.config_id) # pylint:disable=E1101, C0301
189
result = execute_transform.apply_async(
190
args=[spill, self.config_id]) # pylint:disable=E1101
187
191
except socket.error as exc:
188
192
raise RabbitMQException(exc)
189
print "Task ID: %s" % result.task_id
193
print "Submitting task %s" % result.task_id
190
194
# Save asynchronous result object for checking task status.
191
195
self.celery_tasks.append(result)
199
203
@param run_number New run number.
201
205
print "New run detected...waiting for current processing to complete"
202
# Wait for current tasks, from previous run, to complete.
206
# Force-kill any tasks left over from the previous run.
203
207
# This also ensures their timestamps < those of next run.
204
while (len(self.celery_tasks) != 0):
205
self.poll_celery_tasks()
209
# Should get hit by timeout in mauscelery.tasks.py
210
self.poll_celery_tasks()
211
# Anything still queued after this poll is presumed stuck, so revoke it
212
self.revoke_celery_tasks()
206
213
self.run_number = run_number
207
214
# Configure Celery nodes.
208
215
print "---------- RUN %d ----------" % self.run_number