36
36
__all__ = ["for_name", "is_generator_function", "get_short_name", "parse_bool",
37
"create_datastore_write_config"]
37
"create_datastore_write_config",
38
"HugeTask", "HugeTaskHandler"]
48
from google.appengine.api import files
49
from google.appengine.api import taskqueue
50
from google.appengine.ext import db
43
51
from google.appengine.datastore import datastore_rpc
52
from google.appengine.ext.mapreduce import base_handler
46
55
def for_name(fq_name, recursive=False):
179
188
return datastore_rpc.Configuration()
191
class _HugeTaskPayload(db.Model):
192
"""Model object to store task payload."""
194
payload = db.TextProperty()
198
"""Returns entity kind."""
199
return "_GAE_MR_TaskPayload"
202
class HugeTask(object):
203
"""HugeTask is a taskqueue.Task-like class that can store big payloads.
205
Payloads are stored either in the task payload itself or in the datastore.
206
Task handlers should inherit from HugeTaskHandler class.
209
PAYLOAD_PARAM = "__payload"
210
PAYLOAD_KEY_PARAM = "__payload_key"
212
MAX_TASK_PAYLOAD = 100000
213
MAX_DB_PAYLOAD = 1000000
225
self.countdown = countdown
227
def add(self, queue_name, transactional=False, parent=None):
228
"""Add task to the queue."""
229
payload_str = urllib.urlencode(self.params)
230
if len(payload_str) < self.MAX_TASK_PAYLOAD:
232
task = self.to_task()
233
task.add(queue_name, transactional)
236
compressed_payload = base64.b64encode(zlib.compress(payload_str))
238
if len(compressed_payload) < self.MAX_TASK_PAYLOAD:
240
task = taskqueue.Task(
242
params={self.PAYLOAD_PARAM: compressed_payload},
245
countdown=self.countdown)
246
task.add(queue_name, transactional)
249
if len(compressed_payload) > self.MAX_DB_PAYLOAD:
250
raise Exception("Payload to big to be stored in database: %s",
251
len(compressed_payload))
255
raise Exception("Huge tasks should specify parent entity.")
257
payload_entity = _HugeTaskPayload(payload=compressed_payload,
260
payload_key = payload_entity.put()
261
task = taskqueue.Task(
263
params={self.PAYLOAD_KEY_PARAM: str(payload_key)},
266
countdown=self.countdown)
267
task.add(queue_name, transactional)
270
"""Convert to a taskqueue task without doing any kind of encoding."""
271
return taskqueue.Task(
276
countdown=self.countdown)
279
def decode_payload(cls, payload_dict):
280
if (not payload_dict.get(cls.PAYLOAD_PARAM) and
281
not payload_dict.get(cls.PAYLOAD_KEY_PARAM)):
284
if payload_dict.get(cls.PAYLOAD_PARAM):
285
payload = payload_dict.get(cls.PAYLOAD_PARAM)
287
payload_key = payload_dict.get(cls.PAYLOAD_KEY_PARAM)
288
payload_entity = _HugeTaskPayload.get(payload_key)
289
payload = payload_entity.payload
290
payload_str = zlib.decompress(base64.b64decode(payload))
293
for (name, value) in cgi.parse_qs(payload_str).items():
295
result[name] = value[0]
301
class HugeTaskHandler(base_handler.TaskQueueHandler):
302
"""Base handler for processing HugeTasks."""
304
class RequestWrapper(object):
305
def __init__(self, request):
306
self._request = request
308
self.path = self._request.path
309
self.headers = self._request.headers
313
if (not self._request.get(HugeTask.PAYLOAD_PARAM) and
314
not self._request.get(HugeTask.PAYLOAD_KEY_PARAM)):
315
self._encoded = False
317
self._params = HugeTask.decode_payload(
318
{HugeTask.PAYLOAD_PARAM:
319
self._request.get(HugeTask.PAYLOAD_PARAM),
320
HugeTask.PAYLOAD_KEY_PARAM:
321
self._request.get(HugeTask.PAYLOAD_KEY_PARAM)})
323
def get(self, name, default=""):
325
return self._params.get(name, default)
327
return self._request.get(name, default)
329
def set(self, name, value):
331
self._params.set(name, value)
333
self._request.set(name, value)
335
def __init__(self, *args, **kwargs):
336
base_handler.TaskQueueHandler.__init__(self, *args, **kwargs)
339
base_handler.TaskQueueHandler._setup(self)
340
self.request = self.RequestWrapper(self.request)