~nchohan/appscale/zk3.3.4

« back to all changes in this revision

Viewing changes to AppServer/google/appengine/ext/mapreduce/util.py

  • Committer: Chris Bunch
  • Date: 2012-02-17 08:19:21 UTC
  • mfrom: (787.2.3 appscale-raj-merge)
  • Revision ID: cgb@cs.ucsb.edu-20120217081921-pakidyksaenlpzur
merged with main branch, gaining rabbitmq and upgrades for hbase, cassandra, and hypertable, as well as upgrading to gae 1.6.1 for python and go

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
 
35
35
 
36
36
__all__ = ["for_name", "is_generator_function", "get_short_name", "parse_bool",
37
 
           "create_datastore_write_config"]
38
 
 
39
 
 
 
37
           "create_datastore_write_config",
 
38
           "HugeTask", "HugeTaskHandler"]
 
39
 
 
40
 
 
41
import base64
 
42
import cgi
40
43
import inspect
41
44
import logging
 
45
import zlib
 
46
import urllib
42
47
 
 
48
from google.appengine.api import files
 
49
from google.appengine.api import taskqueue
 
50
from google.appengine.ext import db
43
51
from google.appengine.datastore import datastore_rpc
 
52
from google.appengine.ext.mapreduce import base_handler
44
53
 
45
54
 
46
55
def for_name(fq_name, recursive=False):
177
186
  else:
178
187
 
179
188
    return datastore_rpc.Configuration()
 
189
 
 
190
 
 
191
class _HugeTaskPayload(db.Model):
 
192
  """Model object to store task payload."""
 
193
 
 
194
  payload = db.TextProperty()
 
195
 
 
196
  @classmethod
 
197
  def kind(cls):
 
198
    """Returns entity kind."""
 
199
    return "_GAE_MR_TaskPayload"
 
200
 
 
201
 
 
202
class HugeTask(object):
 
203
  """HugeTask is a taskqueue.Task-like class that can store big payloads.
 
204
 
 
205
  Payloads are stored either in the task payload itself or in the datastore.
 
206
  Task handlers should inherit from HugeTaskHandler class.
 
207
  """
 
208
 
 
209
  PAYLOAD_PARAM = "__payload"
 
210
  PAYLOAD_KEY_PARAM = "__payload_key"
 
211
 
 
212
  MAX_TASK_PAYLOAD = 100000
 
213
  MAX_DB_PAYLOAD = 1000000
 
214
 
 
215
  def __init__(self,
 
216
               url,
 
217
               params,
 
218
               name=None,
 
219
               eta=None,
 
220
               countdown=None):
 
221
    self.url = url
 
222
    self.params = params
 
223
    self.name = name
 
224
    self.eta = eta
 
225
    self.countdown = countdown
 
226
 
 
227
  def add(self, queue_name, transactional=False, parent=None):
 
228
    """Add task to the queue."""
 
229
    payload_str = urllib.urlencode(self.params)
 
230
    if len(payload_str) < self.MAX_TASK_PAYLOAD:
 
231
 
 
232
      task = self.to_task()
 
233
      task.add(queue_name, transactional)
 
234
      return
 
235
 
 
236
    compressed_payload = base64.b64encode(zlib.compress(payload_str))
 
237
 
 
238
    if len(compressed_payload) < self.MAX_TASK_PAYLOAD:
 
239
 
 
240
      task = taskqueue.Task(
 
241
          url=self.url,
 
242
          params={self.PAYLOAD_PARAM: compressed_payload},
 
243
          name=self.name,
 
244
          eta=self.eta,
 
245
          countdown=self.countdown)
 
246
      task.add(queue_name, transactional)
 
247
      return
 
248
 
 
249
    if len(compressed_payload) > self.MAX_DB_PAYLOAD:
 
250
      raise Exception("Payload to big to be stored in database: %s",
 
251
                      len(compressed_payload))
 
252
 
 
253
 
 
254
    if not parent:
 
255
      raise Exception("Huge tasks should specify parent entity.")
 
256
 
 
257
    payload_entity = _HugeTaskPayload(payload=compressed_payload,
 
258
                                      parent=parent)
 
259
 
 
260
    payload_key = payload_entity.put()
 
261
    task = taskqueue.Task(
 
262
        url=self.url,
 
263
        params={self.PAYLOAD_KEY_PARAM: str(payload_key)},
 
264
        name=self.name,
 
265
        eta=self.eta,
 
266
        countdown=self.countdown)
 
267
    task.add(queue_name, transactional)
 
268
 
 
269
  def to_task(self):
 
270
    """Convert to a taskqueue task without doing any kind of encoding."""
 
271
    return taskqueue.Task(
 
272
        url=self.url,
 
273
        params=self.params,
 
274
        name=self.name,
 
275
        eta=self.eta,
 
276
        countdown=self.countdown)
 
277
 
 
278
  @classmethod
 
279
  def decode_payload(cls, payload_dict):
 
280
    if (not payload_dict.get(cls.PAYLOAD_PARAM) and
 
281
        not payload_dict.get(cls.PAYLOAD_KEY_PARAM)):
 
282
        return payload_dict
 
283
 
 
284
    if payload_dict.get(cls.PAYLOAD_PARAM):
 
285
      payload = payload_dict.get(cls.PAYLOAD_PARAM)
 
286
    else:
 
287
      payload_key = payload_dict.get(cls.PAYLOAD_KEY_PARAM)
 
288
      payload_entity = _HugeTaskPayload.get(payload_key)
 
289
      payload = payload_entity.payload
 
290
    payload_str = zlib.decompress(base64.b64decode(payload))
 
291
 
 
292
    result = {}
 
293
    for (name, value) in cgi.parse_qs(payload_str).items():
 
294
      if len(value) == 1:
 
295
        result[name] = value[0]
 
296
      else:
 
297
        result[name] = value
 
298
    return result
 
299
 
 
300
 
 
301
class HugeTaskHandler(base_handler.TaskQueueHandler):
 
302
  """Base handler for processing HugeTasks."""
 
303
 
 
304
  class RequestWrapper(object):
 
305
    def __init__(self, request):
 
306
      self._request = request
 
307
 
 
308
      self.path = self._request.path
 
309
      self.headers = self._request.headers
 
310
 
 
311
      self._encoded = True
 
312
 
 
313
      if (not self._request.get(HugeTask.PAYLOAD_PARAM) and
 
314
          not self._request.get(HugeTask.PAYLOAD_KEY_PARAM)):
 
315
          self._encoded = False
 
316
          return
 
317
      self._params = HugeTask.decode_payload(
 
318
          {HugeTask.PAYLOAD_PARAM:
 
319
              self._request.get(HugeTask.PAYLOAD_PARAM),
 
320
           HugeTask.PAYLOAD_KEY_PARAM:
 
321
              self._request.get(HugeTask.PAYLOAD_KEY_PARAM)})
 
322
 
 
323
    def get(self, name, default=""):
 
324
      if self._encoded:
 
325
        return self._params.get(name, default)
 
326
      else:
 
327
        return self._request.get(name, default)
 
328
 
 
329
    def set(self, name, value):
 
330
      if self._encoded:
 
331
        self._params.set(name, value)
 
332
      else:
 
333
        self._request.set(name, value)
 
334
 
 
335
  def __init__(self, *args, **kwargs):
 
336
    base_handler.TaskQueueHandler.__init__(self, *args, **kwargs)
 
337
 
 
338
  def _setup(self):
 
339
    base_handler.TaskQueueHandler._setup(self)
 
340
    self.request = self.RequestWrapper(self.request)