3
"""Simple multithreaded HTTP request driver for HTTPMR.
5
Command-line tool for driving HTTPMR operations. Spawns multiple threads for
6
concurrent shard operation, handles statistics collection and operation failure
11
driver.py --httpmr_base=http://your.app.com/httpmr_base_url \
12
--max_operations_inflight=10 \
13
--max_per_operation_failures=10
26
# Task names appended as ?task=... to select a phase's master page
# (see HTTPMRDriver._GetInitialUrls).
MAP_MASTER_TASK_NAME = "map_master"
REDUCE_MASTER_TASK_NAME = "reduce_master"
INTERMEDIATE_DATA_CLEANUP_MASTER_TASK_NAME = "cleanup_master"

# Query-string parameter carrying the per-operation timeout, and the floor
# it is clamped to when retries shrink it (see _ReduceOperationTimeout).
OPERATION_TIMEOUT_SEC = "operation_timeout"
MIN_OPERATION_TIMEOUT_SEC_VALUE = 0.5

# Sentinel flag value meaning "no limit".
INFINITE_PARAMETER_VALUE = -1
34
class Error(Exception):
  """Base class for all driver-specific Exceptions."""
38
# NOTE(review): the misspelling "Uncrecoverable" is preserved -- this name is
# referenced by except clauses elsewhere in the file and is the public API.
class UncrecoverableOperationError(Error):
  """Base class for all fatal operation errors."""
42
class TooManyTriesError(UncrecoverableOperationError):
  """An operation has been tried too many times without success."""
46
# NOTE(review): corrupted copy -- the bare integers below are line-number
# artifacts from extraction, and several original lines are missing
# (the docstring's closing quotes, __init__, and the `def __str__(self):`
# header for the return below). Recover from upstream before editing.
class OperationResult(object):
47
"""Simple data object that holds the result of a map or reduce operation.
49
To use, set public instance parameters directly. Not meant to be used outside
50
the context of this module.
61
return str({"url": self.url,
62
"next_url": self.next_url,
63
"errors": self.errors,
65
"statistics": self.statistics})
67
def ParseStatisticsString(self, statistics_string):
68
# Parses newline-separated "key value" pairs into self.statistics,
# coercing each value to float.
logging.debug("Parsing statistics from %s" % statistics_string)
70
for line in statistics_string.splitlines():
71
tuple = line.split(" ")
74
# NOTE(review): the line binding `key` (presumably tuple[0]) is missing
# from this copy.
value = float(tuple[1])
75
self.statistics[key] = value
76
logging.debug("Got statistics: %s" % self.statistics)
79
# NOTE(review): corrupted copy -- bare integers are line-number artifacts.
# The tag-dispatch conditionals of handle_starttag and the attrs loop of
# handle_start_a_tag are missing from this copy.
class OperationResultHTMLParser(HTMLParser.HTMLParser):
80
"""HTMLParser that reads the HTML page from a Map or Reduce operation."""
82
def handle_starttag(self, tag, attrs):
84
self.handle_start_a_tag(attrs)
86
self.handle_start_pre_tag(attrs)
88
def handle_start_a_tag(self, attrs):
89
"""Determine the next operation's URL."""
92
# NOTE(review): the loop binding `tuple` over `attrs`, and the assignment
# consuming the href value, are missing here.
if tuple[0] == "href":
95
def handle_start_pre_tag(self, attrs):
    """Read the statistics information from the <pre> tag.

    Sets a marker so that the next character data seen by handle_data is
    captured as the statistics payload.

    Args:
      attrs: (name, value) attribute pairs of the tag (unused).
    """
    self._in_pre_tag = True
99
def handle_data(self, data):
    """Capture character data as the statistics payload.

    Only data seen immediately after a <pre> start tag (flagged by
    handle_start_pre_tag) is stored; the flag is cleared afterwards.
    """
    if hasattr(self, "_in_pre_tag") and self._in_pre_tag:
        self.statistics = data
        self._in_pre_tag = False
105
# NOTE(review): corrupted copy -- bare integers are line-number artifacts.
# The `if tag == "a":` guard of handle_starttag, the docstring close, and
# the loop binding `tuple` in handle_start_a_tag are missing from this copy.
class MasterPageResultHTMLParser(HTMLParser.HTMLParser):
106
"""HTMLParser that reads the HTML page from a Master page."""
111
def handle_starttag(self, tag, attrs):
113
self.handle_start_a_tag(attrs)
115
def handle_start_a_tag(self, attrs):
116
"""Read the 'href' attribute from an 'a' tag, and add it to the list of URLs
118
The master page for any HTTPMR operation master page lists a set of <a>
119
tags, each representing the first operation of the relevant shard. Each of
120
these links should be retained and used to populate the initial set of
123
logging.debug("Reading 'a' tag: %s" % attrs)
125
if tuple[0] == "href":
126
self.urls.append(tuple[1])
129
# NOTE(review): corrupted copy -- bare integers are line-number artifacts and
# the class docstring's closing quotes are missing below.
class OperationThread(threading.Thread):
130
"""An OperationThread handles the execution and retry of an HTTP request.
132
The OperationThread handles executing and retrying an HTTP request to a single
133
Map or Reduce operation. Once the thread has successfully completed its
134
operation (successfully fetched the url assigned via #SetUrl and parsed the
135
operation result page HTML), the callback set via #SetOperationCallback is
136
invoked. If there is an unrecoverable error (i.e., too many operation
137
failures), the callback set via #SetUnrecoverableErrorCallback is invoked.
140
def SetOperationCallback(self, callback, **kwargs):
    """Set the callback that will be invoked when this operation is finished.

    Args:
      callback: A callable that takes one parameter, the OperationResult
        constructed by this thread when the operation has completed, and the
        supplied keyword arguments.
      kwargs: Keyword arguments that should be passed to the callback.
    """
    self.operation_callback = callback
    self.operation_callback_kwargs = kwargs
152
def SetUnrecoverableErrorCallback(self, callback, **kwargs):
    """Set the callback that will be invoked on unrecoverable errors.

    Args:
      callback: A callable that takes the failed URL as its first argument, the
        unrecoverable exception as its second, and the supplied keyword
        arguments.
      kwargs: The keyword arguments that should be supplied to the callback.
    """
    self.error_callback = callback
    self.error_callback_kwargs = kwargs
164
def SetMaxTries(self, max_tries):
    """Set the maximum number of tries that the operation can be performed.

    If the operation is attempted unsuccessfully more than this number of
    times, the operation is considered to fail and a TooManyTriesError is
    handed to the unrecoverable error callback.
    """
    self.max_tries = max_tries
173
def SetUrl(self, url):
174
"""Specify the URL that this operation should operate on."""
# NOTE(review): corrupted copy -- bare integers are line-number artifacts.
# The body of SetUrl (presumably `self.url = url`) and the `def run(self):`
# header for the docstring below are missing from this copy.
179
"""Fetch the URL, retry on failures, invoke error or operation callbacks."""
180
assert self.url is not None
181
assert self.operation_callback is not None
182
assert self.error_callback is not None
185
logging.info("Starting operation on %s." % self.url)
187
self.results = OperationResult()
188
self.results.url = self.url
193
# NOTE(review): the `try:` enclosing this fetch (paired with the except
# clause below) is missing from this copy.
self.html = self._FetchWithRetries(self.url, self.max_tries)
194
logging.debug("Retrieved HTML %s" % self.html)
195
except UncrecoverableOperationError, e:
196
self.error_callback(self.url, e, **self.error_callback_kwargs)
197
self._PopulateResults()
199
self.operation_callback(self.results, **self.operation_callback_kwargs)
204
def _FetchWithRetries(self, url, max_tries):
206
# Retry loop: on an HTTPError, shrink the operation-timeout parameter,
# record the error, back off, and try again; raise TooManyTriesError once
# max_tries is exhausted (max_tries == INFINITE_PARAMETER_VALUE never
# gives up).
# NOTE(review): the initialization/increment of `tries` and the inner
# `try:` for the except clause below are missing from this copy.
while tries < max_tries or max_tries == INFINITE_PARAMETER_VALUE:
209
self.results.tries = tries
210
return self._Fetch(url)
211
except urllib2.HTTPError, e:
212
logging.warning("HTTPError on fetch of %s: %s" % (url, str(e)))
213
url = self._ReduceOperationTimeout(url)
214
self.results.errors.append(e)
215
self._WaitForRetry(tries)
216
raise TooManyTriesError("Too many tries on URL %s" % url)
218
def _WaitForRetry(self, tries):
219
wait_time_sec = min(30 * tries, 600)
220
logging.info("Sleeping for %s seconds." % wait_time_sec)
221
time.sleep(wait_time_sec)
223
def _Fetch(self, url):
224
# Single fetch attempt: escape the URL, then issue the HTTP request.
safe_url = self._GetSafeUrl(url)
225
logging.debug("Fetching %s" % safe_url)
226
f = urllib2.urlopen(safe_url)
231
# NOTE(review): corrupted copy -- bare integers are line-number artifacts.
# The read/return of the response above, _GetSafeUrl's docstring, the
# binding of the quoted query expression below, and the remaining tuple
# components are missing from this copy.
def _GetSafeUrl(self, url):
232
parts = urlparse.urlsplit(url)
234
# Re-quotes the query string while preserving '&' and '=' separators.
urllib.quote(parts.query).replace("%26", "&").replace("%3D", "=")
235
parts = (parts.scheme,
240
return urlparse.urlunsplit(parts)
242
def _ReduceOperationTimeout(self, url):
243
# Lowers the operation_timeout query parameter by one second (floored at
# MIN_OPERATION_TIMEOUT_SEC_VALUE) so the server does less work per
# request on the next retry.
current_timeout = None
244
# TODO: Hand URL parameter parsing off to a library, here and elsewhere
245
params = url.split("?")[1]
246
for key_value in params.split("&"):
247
(key, value) = key_value.split("=", 2)
248
if key == OPERATION_TIMEOUT_SEC:
249
# At this point current_timeout is a string
250
current_timeout = value
251
if current_timeout is not None:
252
new_timeout = max(float(current_timeout) - 1,
253
MIN_OPERATION_TIMEOUT_SEC_VALUE)
254
return url.replace("%s=%s" % (OPERATION_TIMEOUT_SEC, current_timeout),
255
"%s=%s" % (OPERATION_TIMEOUT_SEC, new_timeout))
257
logging.warning("Could not parse the operation timeout from URL '%s', "
258
"operation retry with original timeout value." % url)
# NOTE(review): corrupted copy -- the fallback return (presumably
# `return url`) is missing here; bare integers are line-number artifacts.
261
def _PopulateResults(self):
262
# Parses the fetched operation page: the next-shard URL comes from the
# parser's `url` attribute, the statistics payload from its `statistics`
# attribute (both optional).
parser = OperationResultHTMLParser()
263
parser.feed(self.html)
266
# NOTE(review): corrupted copy -- the lines between feed() and here
# (possibly parser.close()) are missing; bare integers are line-number
# artifacts.
self.results.next_url = None
267
if hasattr(parser, "url"):
268
self.results.next_url = parser.url
269
if hasattr(parser, "statistics"):
270
self.results.ParseStatisticsString(parser.statistics)
273
# NOTE(review): corrupted copy -- bare integers are line-number artifacts.
# The class docstring and the `def __init__(self, httpmr_base, ...)` header
# for the parameter defaults below are missing from this copy.
class HTTPMRDriver(object):
280
max_operation_tries=-1,
281
max_operations_inflight=-1):
282
self.httpmr_base = httpmr_base
283
self.max_operation_tries = max_operation_tries
284
self.max_operations_inflight = max_operations_inflight
286
self.lock = threading.Lock()
289
# NOTE(review): the method header (the driver's public entry point) for
# this docstring is missing from this copy.
"""Begin the Driver's Map - Reduce - Cleanup phase.
291
It is important to use this method as the primary entry point, as it may
292
be utilized in the future to precompute optimal operation parameters in the
295
logging.info("Beginning HTTPMR Driver Run with base URL %s" %
299
def _HandleUnrecoverableOperationError(self, url, error):
300
# Logs the fatal failure, then (per the log line below) skips ahead to the
# cleanup phase.
logging.error("Unrecoverable error on url %s: %s; %s" %
301
(url, type(error), error))
302
# NOTE(review): corrupted copy -- the body of this loop is missing; bare
# integers throughout are line-number artifacts.
for thread in HTTPMRDriver.threads:
304
logging.info("Going to cleanup.")
308
# NOTE(review): the method header enclosing this _LaunchPhase call (the
# map-phase kickoff) is missing from this copy.
self._LaunchPhase(MAP_MASTER_TASK_NAME, self._AllMapOperationsComplete)
310
def _AllMapOperationsComplete(self):
311
logging.info("Done Mapping!")
315
self._LaunchPhase(REDUCE_MASTER_TASK_NAME,
316
self._AllReduceOperationsComplete)
318
def _AllReduceOperationsComplete(self):
319
logging.info("Done Reducing!")
323
self._LaunchPhase(INTERMEDIATE_DATA_CLEANUP_MASTER_TASK_NAME,
324
self._AllCleanupOperationsComplete)
326
def _AllCleanupOperationsComplete(self):
327
logging.info("Done Cleaning Up!")
328
logging.debug("Results: %s" % self.results)
329
logging.info("Comprehensive Results: %s" % self._GetAggregateResults())
331
def _GetAggregateResults(self):
331
# Merges the per-operation statistics dicts into one aggregate dict via
# the reduce below.
# NOTE(review): corrupted copy -- the header of the AddDicts helper, the
# initialization of sum_dict, and the closing of the reduce() call are
# missing; bare integers are line-number artifacts.
sum_dict[key] = a[key] + b[key]
338
return reduce(AddDicts, map(lambda result: result.statistics,
341
def _LaunchPhase(self, phase_task_name, all_operations_complete_callback):
342
# Fetches the phase's master page, creates one OperationThread per shard
# URL, runs up to max_operations_inflight of them immediately, and queues
# the remainder in threads_pending.
logging.info("Starting %s phase." % phase_task_name)
343
base_urls = self._GetInitialUrls(phase_task_name)
344
logging.debug("Initial URLs: %s" % ", ".join(base_urls))
345
self.threads_inflight = 0
346
for url in base_urls:
347
thread = self._CreateOperationThread(url,
348
all_operations_complete_callback)
349
HTTPMRDriver.threads.append(thread)
350
if (self.threads_inflight < self.max_operations_inflight or
351
self.max_operations_inflight == INFINITE_PARAMETER_VALUE):
352
self.threads_inflight += 1
355
# NOTE(review): corrupted copy -- the lines starting the thread in the
# branch above are missing; bare integers are line-number artifacts.
HTTPMRDriver.threads_pending.append(thread)
357
def _GetInitialUrls(self, task):
358
# Fetches the master page for `task` and parses the shard URLs out of it.
url = "%s?task=%s" % (self.httpmr_base, task)
359
html = urllib2.urlopen(url).read()
360
parser = MasterPageResultHTMLParser()
# NOTE(review): the remainder of this method (feeding the parser and
# returning its collected URLs) is missing from this copy.
366
def _CreateOperationThread(self, url, all_operations_complete_callback):
367
# Builds (but does not start) an OperationThread wired to this driver's
# completion and error callbacks.
# NOTE(review): corrupted copy -- the SetUrl call and the trailing
# `return thread` are missing; bare integers are line-number artifacts.
thread = OperationThread()
369
# TODO(peterdolan): Make the maximum operation tries configurable via a
370
# command-line parameter
371
thread.SetMaxTries(self.max_operation_tries)
372
thread.SetOperationCallback(
373
self._HandleThreadCompletion,
374
all_operations_complete_callback=all_operations_complete_callback)
375
thread.SetUnrecoverableErrorCallback(
376
self._HandleUnrecoverableOperationError)
379
def _HandleThreadCompletion(self, results, all_operations_complete_callback):
381
# Records a finished operation, queues a follow-up thread when the
# operation reported a continuation URL, starts the next pending thread,
# and fires the phase-complete callback once nothing is left in flight.
# NOTE(review): corrupted copy -- lines are missing around the inflight
# bookkeeping (possibly self.lock usage and thread.start() calls); bare
# integers are line-number artifacts.
self.threads_inflight -= 1
383
self.results.append(results)
384
if results.next_url is not None:
385
logging.debug("Initializing new thread to handle %s" % results.next_url)
386
thread = self._CreateOperationThread(results.next_url,
387
all_operations_complete_callback)
388
HTTPMRDriver.threads_pending.insert(0, thread)
390
if HTTPMRDriver.threads_pending:
391
logging.debug("Starting the next pending thread.")
392
thread = self.threads_pending.pop()
393
self.threads_inflight += 1
396
if not self.threads_inflight:
397
logging.debug("All threads completed for this phase.")
398
all_operations_complete_callback()
403
# Command-line entry point: configure logging, parse flags, construct the
# driver, and kick off the run.
# NOTE(review): corrupted copy -- bare integers are line-number artifacts and
# several lines are missing (the enclosing function header if any, some
# add_option arguments, the cleanup-only branch body, and the body of the
# __main__ guard).
logging.basicConfig(level=logging.INFO,
404
format='%(asctime)s %(levelname)-8s %(message)s',
405
datefmt='%a, %d %b %Y %H:%M:%S',
407
options_parser = optparse.OptionParser()
408
options_parser.add_option("-b",
413
help="The base URL of the HTTPMR operation.")
414
options_parser.add_option("-i",
415
"--max_operations_inflight",
418
dest="max_operations_inflight",
420
help="The maximum number of operations to keep "
421
+ "simultaneously inflight. -1 for inf.")
422
options_parser.add_option("-f",
423
"--max_per_operation_failures",
426
dest="max_per_operation_failures",
428
help="The maximum number of times any given "
429
+ "operation can fail before a fatal error is"
430
+ " thrown. -1 for inf.")
431
options_parser.add_option("-c",
436
help="Only execute the intermediate data cleanup "
438
(options, args) = options_parser.parse_args()
440
# Note: max_per_operation_failures is passed as the driver's
# max_operation_tries positional parameter.
driver = HTTPMRDriver(options.httpmr_base,
441
options.max_per_operation_failures,
442
options.max_operations_inflight)
443
if options.cleanup_only:
449
if __name__ == "__main__":
b'\\ No newline at end of file'