~nchohan/appscale/zk3.3.4

« back to all changes in this revision

Viewing changes to AppServer/google/appengine/ext/mapreduce/model.py

  • Committer: Chris Bunch
  • Date: 2012-02-17 08:19:21 UTC
  • mfrom: (787.2.3 appscale-raj-merge)
  • Revision ID: cgb@cs.ucsb.edu-20120217081921-pakidyksaenlpzur
merged with main branch, gaining rabbitmq and upgrades for hbase, cassandra, and hypertable, as well as upgrading to gae 1.6.1 for python and go

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
import datetime
50
50
import logging
51
51
import math
 
52
import os
52
53
import random
53
 
import simplejson
 
54
try:
 
55
  import json as simplejson
 
56
except ImportError:
 
57
  import simplejson
54
58
import time
55
59
import types
56
60
 
60
64
from google.appengine.ext.mapreduce import context
61
65
from google.appengine.ext.mapreduce import hooks
62
66
from google.appengine.ext.mapreduce import util
63
 
from graphy.backends import google_chart_api
64
 
 
65
 
 
66
 
 
67
 
_DEFAULT_PROCESSING_RATE_PER_SEC = 100
 
67
from google.appengine._internal.graphy.backends import google_chart_api
 
68
 
 
69
 
 
70
 
 
71
 
 
72
_DEFAULT_PROCESSING_RATE_PER_SEC = 1000000
68
73
 
69
74
 
70
75
_DEFAULT_SHARD_COUNT = 8
85
90
    Returns:
86
91
      json representation as string.
87
92
    """
88
 
    return simplejson.dumps(self.to_json(), sort_keys=True)
 
93
    json = self.to_json()
 
94
    try:
 
95
      return simplejson.dumps(json, sort_keys=True)
 
96
    except:
 
97
      logging.exception("Could not serialize JSON: %r", json)
 
98
      raise
89
99
 
90
100
  @classmethod
91
101
  def from_json_str(cls, json_str):
208
218
_FUTURE_TIME = 2**34
209
219
 
210
220
 
211
 
def _get_descending_key(gettime=time.time, getrandint=random.randint):
 
221
def _get_descending_key(gettime=time.time):
212
222
  """Returns a key name lexically ordered by time descending.
213
223
 
214
224
  This lets us have a key name for use with Datastore entities which returns
217
227
 
218
228
  Args:
219
229
    gettime: Used for testing.
220
 
    getrandint: Used for testing.
221
230
 
222
231
  Returns:
223
232
    A string with a time descending key.
224
233
  """
225
234
  now_descending = int((_FUTURE_TIME - gettime()) * 100)
226
 
  tie_breaker = getrandint(0, 100)
227
 
  return "%d%d" % (now_descending, tie_breaker)
 
235
  request_id_hash = os.environ.get("REQUEST_ID_HASH")
 
236
  if not request_id_hash:
 
237
    request_id_hash = str(random.getrandbits(32))
 
238
  return "%d%s" % (now_descending, request_id_hash)
228
239
 
229
240
 
230
241
class CountersMap(JsonMixin):
587
598
    Returns:
588
599
      Datastore Key that can be used to fetch the MapreduceState.
589
600
    """
590
 
    return db.Key.from_path(cls.kind(), mapreduce_id)
 
601
    return db.Key.from_path(cls.kind(), str(mapreduce_id))
591
602
 
592
603
  @classmethod
593
604
  def get_by_job_id(cls, mapreduce_id):
799
810
    return cls.get_by_key_name(shard_id)
800
811
 
801
812
  @classmethod
 
813
  def find_by_mapreduce_state(cls, mapreduce_state):
 
814
    """Find all shard states for given mapreduce.
 
815
 
 
816
    Args:
 
817
      mapreduce_state: MapreduceState instance
 
818
 
 
819
    Returns:
 
820
      iterable of all ShardState for given mapreduce.
 
821
    """
 
822
    keys = []
 
823
    for i in range(mapreduce_state.mapreduce_spec.mapper.shard_count):
 
824
      shard_id = cls.shard_id_from_number(mapreduce_state.key().name(), i)
 
825
      keys.append(cls.get_key_by_shard_id(shard_id))
 
826
    return [state for state in db.get(keys) if state]
 
827
 
 
828
  @classmethod
802
829
  def find_by_mapreduce_id(cls, mapreduce_id):
803
 
    """Find all shard states for given mapreduce.
804
 
 
805
 
    Args:
806
 
      mapreduce_id: mapreduce id.
807
 
 
808
 
    Returns:
809
 
      iterable of all ShardState for given mapreduce id.
810
 
    """
811
 
    return cls.all().filter("mapreduce_id =", mapreduce_id).fetch(99999)
 
830
    logging.error(
 
831
        "ShardState.find_by_mapreduce_id method may be inconsistent. " +
 
832
        "ShardState.find_by_mapreduce_state should be used instead.")
 
833
    return cls.all().filter(
 
834
        "mapreduce_id =", mapreduce_id).fetch(99999)
812
835
 
813
836
  @classmethod
814
837
  def create_new(cls, mapreduce_id, shard_number):