~nchohan/+junk/mytools

« back to all changes in this revision

Viewing changes to sample_apps/httpmr/httpmr/driver.py

  • Committer: root
  • Date: 2010-11-03 07:43:57 UTC
  • Revision ID: root@appscale-image0-20101103074357-xea7ja3sor3x93oc
init

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
 
 
3
"""Simple multithreaded HTTP request driver for HTTPMR.
 
4
 
 
5
Command-line tool for driving HTTPMR operations.  Spawns multiple threads for
 
6
concurrent shard operation, handles statistics collection and operation failure
 
7
retries.
 
8
 
 
9
Sample usage:
 
10
 
 
11
driver.py --httpmr_base=http://your.app.com/httpmr_base_url \
 
12
    --max_operations_inflight=10 \
 
13
    --max_per_operation_failures=10
 
14
"""
 
15
 
 
16
import HTMLParser
 
17
import logging
 
18
import optparse
 
19
import time
 
20
import sys
 
21
import threading
 
22
import urllib
 
23
import urllib2
 
24
import urlparse
 
25
 
 
26
MAP_MASTER_TASK_NAME = "map_master"
 
27
REDUCE_MASTER_TASK_NAME = "reduce_master"
 
28
INTERMEDIATE_DATA_CLEANUP_MASTER_TASK_NAME = "cleanup_master"
 
29
OPERATION_TIMEOUT_SEC = "operation_timeout"
 
30
MIN_OPERATION_TIMEOUT_SEC_VALUE = 0.5
 
31
INFINITE_PARAMETER_VALUE = -1
 
32
 
 
33
 
 
34
class Error(Exception):
 
35
  """Base class for all driver-specific Exceptions."""
 
36
 
 
37
 
 
38
class UncrecoverableOperationError(Error):
 
39
  """Base class for all fatal operation errors."""
 
40
 
 
41
 
 
42
class TooManyTriesError(UncrecoverableOperationError):
 
43
  """An operation has been tried too many times without success."""
 
44
 
 
45
 
 
46
class OperationResult(object):
 
47
  """Simple data object that holds the result of a map or reduce operation.
 
48
  
 
49
  To use, set public instance parameters directly.  Not meant to be used outside
 
50
  the context of this module.
 
51
  """
 
52
  
 
53
  def __init__(self):
 
54
    self.url = None
 
55
    self.next_url = None
 
56
    self.errors = []
 
57
    self.tries = 0
 
58
    self.statistics = {}
 
59
  
 
60
  def __str__(self):
 
61
    return str({"url": self.url,
 
62
                "next_url": self.next_url,
 
63
                "errors": self.errors,
 
64
                "tries": self.tries,
 
65
                "statistics": self.statistics})
 
66
  
 
67
  def ParseStatisticsString(self, statistics_string):
 
68
    logging.debug("Parsing statistics from %s" % statistics_string)
 
69
    self.statistics = {}
 
70
    for line in statistics_string.splitlines():
 
71
      tuple = line.split(" ")
 
72
      if len(tuple) == 2:
 
73
        key = tuple[0]
 
74
        value = float(tuple[1])
 
75
        self.statistics[key] = value
 
76
    logging.debug("Got statistics: %s" % self.statistics)
 
77
 
 
78
 
 
79
class OperationResultHTMLParser(HTMLParser.HTMLParser):
 
80
  """HTMLParser that reads the HTML page from a Map or Reduce operation."""
 
81
  
 
82
  def handle_starttag(self, tag, attrs):
 
83
    if tag == "a":
 
84
      self.handle_start_a_tag(attrs)
 
85
    elif tag == "pre":
 
86
      self.handle_start_pre_tag(attrs)
 
87
    
 
88
  def handle_start_a_tag(self, attrs):
 
89
    """Determine the next operation's URL."""
 
90
    self.url = None
 
91
    for tuple in attrs:
 
92
      if tuple[0] == "href":
 
93
        self.url = tuple[1]
 
94
  
 
95
  def handle_start_pre_tag(self, attrs):
 
96
    """Read the statistics information from the <pre> tag."""
 
97
    self._in_pre_tag = True
 
98
  
 
99
  def handle_data(self, data):
 
100
    if hasattr(self, "_in_pre_tag") and self._in_pre_tag:
 
101
      self.statistics = data
 
102
      self._in_pre_tag = False
 
103
 
 
104
 
 
105
class MasterPageResultHTMLParser(HTMLParser.HTMLParser):
 
106
  """HTMLParser that reads the HTML page from a Master page."""
 
107
 
 
108
  def Init(self):
 
109
    self.urls = []
 
110
  
 
111
  def handle_starttag(self, tag, attrs):
 
112
    if tag == "a":
 
113
      self.handle_start_a_tag(attrs)
 
114
    
 
115
  def handle_start_a_tag(self, attrs):
 
116
    """Read the 'href' attribute from an 'a' tag, and add it to the list of URLs
 
117
    
 
118
    The master page for any HTTPMR operation master page lists a set of <a>
 
119
    tags, each representing the first operation of the relevant shard.  Each of
 
120
    these links should be retained and used to populate the initial set of
 
121
    operation threads.
 
122
    """ 
 
123
    logging.debug("Reading 'a' tag: %s" % attrs)
 
124
    for tuple in attrs:
 
125
      if tuple[0] == "href":
 
126
        self.urls.append(tuple[1])
 
127
 
 
128
 
 
129
class OperationThread(threading.Thread):
 
130
  """An OperationThread handles the execution and retry of an HTTP request.
 
131
  
 
132
  The OperationThread handles executing and retrying an HTTP request to a single
 
133
  Map or Reduce operation.  Once the thread has successfully completed its
 
134
  operation (successfully fetched the url assigned via #SetUrl and parsed the
 
135
  operation result page HTML), the callback set via #SetOperationCallback is
 
136
  invoked.  If there is an unrecoverable error (i.e., too many operation
 
137
  failures), the callback set via #SetUnrecoverableErrorCallback is invoked.
 
138
  """
 
139
  
 
140
  def SetOperationCallback(self, callback, **kwargs):
 
141
    """Set the callback that will be invoked when this operation is finished.
 
142
    
 
143
    args:
 
144
      callback: A callable that takes one parameter, the OperationResult
 
145
          constructed by this thread when the operation has completed, and the
 
146
          supplied keyword arguments.
 
147
      kwargs: Keyword arguments that should be passed to the callback.
 
148
    """
 
149
    self.operation_callback = callback
 
150
    self.operation_callback_kwargs = kwargs
 
151
  
 
152
  def SetUnrecoverableErrorCallback(self, callback, **kwargs):
 
153
    """Set the callback that will be invoked on unrecoverable errors.
 
154
    
 
155
    args:
 
156
      callback: A callable that takes the failed URL as its first argument, the
 
157
          unrecoverable exception as its second, and the supplied keyword
 
158
          arguments.
 
159
      kwargs: The keyword arguments that should be supplied to the callback.
 
160
    """
 
161
    self.error_callback = callback
 
162
    self.error_callback_kwargs = kwargs
 
163
  
 
164
  def SetMaxTries(self, max_tries):
 
165
    """Set the maximum number of tries that the operation can be performed.
 
166
    
 
167
    If the operation is attempted unsuccessfully more than this number of times,
 
168
    the operation is considered to fail and a TooManyTriesError is handed to the
 
169
    unrecoverable error callback.
 
170
    """
 
171
    self.max_tries = max_tries
 
172
  
 
173
  def SetUrl(self, url):
 
174
    """Specify the URL that this operation should operate on."""
 
175
    self.url = url
 
176
    self._cancel = False
 
177
  
 
178
  def run(self):
 
179
    """Fetch the URL, retry on failures, invoke error or operation callbacks."""
 
180
    assert self.url is not None
 
181
    assert self.operation_callback is not None
 
182
    assert self.error_callback is not None
 
183
    self.html = None
 
184
 
 
185
    logging.info("Starting operation on %s." % self.url)
 
186
    
 
187
    self.results = OperationResult()
 
188
    self.results.url = self.url
 
189
    
 
190
    try:
 
191
      if self._cancel:
 
192
        return
 
193
      self.html = self._FetchWithRetries(self.url, self.max_tries)
 
194
      logging.debug("Retrieved HTML %s" % self.html)
 
195
    except UncrecoverableOperationError, e:
 
196
      self.error_callback(self.url, e, **self.error_callback_kwargs)
 
197
    self._PopulateResults()
 
198
    if not self._cancel:
 
199
      self.operation_callback(self.results, **self.operation_callback_kwargs)
 
200
  
 
201
  def Cancel(self):
 
202
    self._cancel = True
 
203
  
 
204
  def _FetchWithRetries(self, url, max_tries):
 
205
    tries = 0
 
206
    while tries < max_tries or max_tries == INFINITE_PARAMETER_VALUE:
 
207
      try:
 
208
        tries += 1
 
209
        self.results.tries = tries
 
210
        return self._Fetch(url)
 
211
      except urllib2.HTTPError, e:
 
212
        logging.warning("HTTPError on fetch of %s: %s" % (url, str(e)))
 
213
        url = self._ReduceOperationTimeout(url)
 
214
        self.results.errors.append(e)
 
215
        self._WaitForRetry(tries)
 
216
    raise TooManyTriesError("Too many tries on URL %s" % url)
 
217
  
 
218
  def _WaitForRetry(self, tries):
 
219
    wait_time_sec = min(30 * tries, 600)
 
220
    logging.info("Sleeping for %s seconds." % wait_time_sec)
 
221
    time.sleep(wait_time_sec)
 
222
    
 
223
  def _Fetch(self, url):
 
224
    safe_url = self._GetSafeUrl(url)
 
225
    logging.debug("Fetching %s" % safe_url)
 
226
    f = urllib2.urlopen(safe_url)
 
227
    contents = f.read()
 
228
    f.close()
 
229
    return contents
 
230
  
 
231
  def _GetSafeUrl(self, url):
 
232
    parts = urlparse.urlsplit(url)
 
233
    safe_query = \
 
234
        urllib.quote(parts.query).replace("%26", "&").replace("%3D", "=")
 
235
    parts = (parts.scheme,
 
236
             parts.netloc,
 
237
             parts.path,
 
238
             safe_query,
 
239
             parts.fragment)
 
240
    return urlparse.urlunsplit(parts)
 
241
 
 
242
  def _ReduceOperationTimeout(self, url):
 
243
    current_timeout = None
 
244
    # TODO: Hand URL parameter parsing off to a library, here and elsewhere
 
245
    params = url.split("?")[1]
 
246
    for key_value in params.split("&"):
 
247
      (key, value) = key_value.split("=", 2)
 
248
      if key == OPERATION_TIMEOUT_SEC:
 
249
        # At this point current_timeout is a string
 
250
        current_timeout = value
 
251
    if current_timeout is not None:
 
252
      new_timeout = max(float(current_timeout) - 1,
 
253
                        MIN_OPERATION_TIMEOUT_SEC_VALUE)
 
254
      return url.replace("%s=%s" % (OPERATION_TIMEOUT_SEC, current_timeout),
 
255
                         "%s=%s" % (OPERATION_TIMEOUT_SEC, new_timeout))
 
256
    else:
 
257
      logging.warning("Could not parse the operation timeout from URL '%s', "
 
258
                      "operation retry with original timeout value." % url)
 
259
      return url
 
260
  
 
261
  def _PopulateResults(self):
 
262
    parser = OperationResultHTMLParser()
 
263
    parser.feed(self.html)
 
264
    parser.close()
 
265
    
 
266
    self.results.next_url = None
 
267
    if hasattr(parser, "url"):
 
268
      self.results.next_url = parser.url
 
269
    if hasattr(parser, "statistics"):
 
270
      self.results.ParseStatisticsString(parser.statistics)
 
271
  
 
272
 
 
273
class HTTPMRDriver(object):
 
274
  
 
275
  threads = []
 
276
  threads_pending = []
 
277
  
 
278
  def __init__(self,
 
279
               httpmr_base,
 
280
               max_operation_tries=-1,
 
281
               max_operations_inflight=-1):
 
282
    self.httpmr_base = httpmr_base
 
283
    self.max_operation_tries = max_operation_tries
 
284
    self.max_operations_inflight = max_operations_inflight
 
285
    self.results = []
 
286
    self.lock = threading.Lock()
 
287
    
 
288
  def Run(self):
 
289
    """Begin the Driver's Map - Reduce - Cleanup phase.
 
290
    
 
291
    It is important to use this method as the primary entry point, as it may
 
292
    be utilized in the future to precompute optimal operation parameters in the
 
293
    future.
 
294
    """
 
295
    logging.info("Beginning HTTPMR Driver Run with base URL %s" %
 
296
                 self.httpmr_base)
 
297
    self.Map()
 
298
 
 
299
  def _HandleUnrecoverableOperationError(self, url, error):
 
300
    logging.error("Unrecoverable error on url %s: %s; %s" %
 
301
                  (url, type(error), error))
 
302
    for thread in HTTPMRDriver.threads:
 
303
      thread.Cancel()
 
304
    logging.info("Going to cleanup.")
 
305
    self.Cleanup()
 
306
    
 
307
  def Map(self):
 
308
    self._LaunchPhase(MAP_MASTER_TASK_NAME, self._AllMapOperationsComplete)
 
309
  
 
310
  def _AllMapOperationsComplete(self):
 
311
    logging.info("Done Mapping!")
 
312
    self.Reduce()
 
313
  
 
314
  def Reduce(self):
 
315
    self._LaunchPhase(REDUCE_MASTER_TASK_NAME,
 
316
                      self._AllReduceOperationsComplete)
 
317
  
 
318
  def _AllReduceOperationsComplete(self):
 
319
    logging.info("Done Reducing!")
 
320
    self.Cleanup()
 
321
    
 
322
  def Cleanup(self):
 
323
    self._LaunchPhase(INTERMEDIATE_DATA_CLEANUP_MASTER_TASK_NAME,
 
324
                      self._AllCleanupOperationsComplete)
 
325
    
 
326
  def _AllCleanupOperationsComplete(self):
 
327
    logging.info("Done Cleaning Up!")
 
328
    logging.debug("Results: %s" % self.results)
 
329
    logging.info("Comprehensive Results: %s" % self._GetAggregateResults())
 
330
  
 
331
  def _GetAggregateResults(self):
 
332
    def AddDicts(a, b):
 
333
      sum_dict = {}
 
334
      for key in a:
 
335
        if key in b:
 
336
          sum_dict[key] = a[key] + b[key]
 
337
      return sum_dict
 
338
    return reduce(AddDicts, map(lambda result: result.statistics,
 
339
                                self.results))
 
340
  
 
341
  def _LaunchPhase(self, phase_task_name, all_operations_complete_callback):
 
342
    logging.info("Starting %s phase." % phase_task_name)
 
343
    base_urls = self._GetInitialUrls(phase_task_name)
 
344
    logging.debug("Initial URLs: %s" % ", ".join(base_urls))
 
345
    self.threads_inflight = 0
 
346
    for url in base_urls:
 
347
      thread = self._CreateOperationThread(url,
 
348
                                           all_operations_complete_callback)
 
349
      HTTPMRDriver.threads.append(thread)
 
350
      if (self.threads_inflight < self.max_operations_inflight or
 
351
          self.max_operations_inflight == INFINITE_PARAMETER_VALUE):
 
352
        self.threads_inflight += 1
 
353
        thread.start()
 
354
      else:
 
355
        HTTPMRDriver.threads_pending.append(thread)
 
356
 
 
357
  def _GetInitialUrls(self, task):
 
358
    url = "%s?task=%s" % (self.httpmr_base, task) 
 
359
    html = urllib2.urlopen(url).read()
 
360
    parser = MasterPageResultHTMLParser()
 
361
    parser.Init()
 
362
    parser.feed(html)
 
363
    parser.close()
 
364
    return parser.urls
 
365
 
 
366
  def _CreateOperationThread(self, url, all_operations_complete_callback):
 
367
    thread = OperationThread()
 
368
    thread.SetUrl(url)
 
369
    # TODO(peterdolan): Make the maximum operation tries configurable via a
 
370
    # command-line parameter
 
371
    thread.SetMaxTries(self.max_operation_tries)
 
372
    thread.SetOperationCallback(
 
373
        self._HandleThreadCompletion,
 
374
        all_operations_complete_callback=all_operations_complete_callback)
 
375
    thread.SetUnrecoverableErrorCallback(
 
376
        self._HandleUnrecoverableOperationError)
 
377
    return thread
 
378
  
 
379
  def _HandleThreadCompletion(self, results, all_operations_complete_callback):
 
380
    self.lock.acquire()
 
381
    self.threads_inflight -= 1
 
382
    
 
383
    self.results.append(results)
 
384
    if results.next_url is not None:
 
385
      logging.debug("Initializing new thread to handle %s" % results.next_url)
 
386
      thread = self._CreateOperationThread(results.next_url,
 
387
                                           all_operations_complete_callback)
 
388
      HTTPMRDriver.threads_pending.insert(0, thread)
 
389
 
 
390
    if HTTPMRDriver.threads_pending:
 
391
      logging.debug("Starting the next pending thread.")
 
392
      thread = self.threads_pending.pop()
 
393
      self.threads_inflight += 1
 
394
      thread.start()
 
395
    
 
396
    if not self.threads_inflight:
 
397
      logging.debug("All threads completed for this phase.")
 
398
      all_operations_complete_callback()
 
399
    self.lock.release()
 
400
  
 
401
 
 
402
def main():
 
403
  logging.basicConfig(level=logging.INFO,
 
404
                      format='%(asctime)s %(levelname)-8s %(message)s',
 
405
                      datefmt='%a, %d %b %Y %H:%M:%S',
 
406
                      stream=sys.stdout)
 
407
  options_parser = optparse.OptionParser()
 
408
  options_parser.add_option("-b",
 
409
                            "--httpmr_base",
 
410
                            action="store",
 
411
                            type="string",
 
412
                            dest="httpmr_base",
 
413
                            help="The base URL of the HTTPMR operation.")
 
414
  options_parser.add_option("-i",
 
415
                            "--max_operations_inflight",
 
416
                            action="store",
 
417
                            type="int",
 
418
                            dest="max_operations_inflight",
 
419
                            default=-1,
 
420
                            help="The maximum number of operations to keep "
 
421
                                + "simultaneously inflight.  -1 for inf.")
 
422
  options_parser.add_option("-f",
 
423
                            "--max_per_operation_failures",
 
424
                            action="store",
 
425
                            type="int",
 
426
                            dest="max_per_operation_failures",
 
427
                            default=-1,
 
428
                            help="The maximum number of times any given "
 
429
                                + "operation can fail before a fatal error is"
 
430
                                + " thrown.  -1 for inf.")
 
431
  options_parser.add_option("-c",
 
432
                            "--cleanup_only",
 
433
                            action="store_true",
 
434
                            dest="cleanup_only",
 
435
                            default=False,
 
436
                            help="Only execute the intermediate data cleanup "
 
437
                                + "phase.")
 
438
  (options, args) = options_parser.parse_args()
 
439
  
 
440
  driver = HTTPMRDriver(options.httpmr_base,
 
441
                        options.max_per_operation_failures,
 
442
                        options.max_operations_inflight)
 
443
  if options.cleanup_only:
 
444
    driver.Cleanup()
 
445
  else:
 
446
    driver.Run()
 
447
 
 
448
 
 
449
if __name__ == "__main__":
 
450
  main()
 
 
b'\\ No newline at end of file'