~blamar/+junk/openstack-api-arrrg

« back to all changes in this revision

Viewing changes to vendor/boto/boto/mapreduce/lqs.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import SocketServer, os, datetime, sys, random, time
 
2
import simplejson
 
3
 
 
4
class LQSCommand:
 
5
 
 
6
    def __init__(self, line):
 
7
        self.raw_line = line
 
8
        self.line = self.raw_line.strip()
 
9
        l = self.line.split(' ')
 
10
        self.name = l[0]
 
11
        if len(l) > 1:
 
12
            self.args = [arg for arg in l[1:] if arg]
 
13
        else:
 
14
            self.args = []
 
15
 
 
16
class LQSMessage(dict):
 
17
 
 
18
    def __init__(self, item=None, args=None, jsonvalue=None):
 
19
        dict.__init__(self)
 
20
        if jsonvalue:
 
21
            self.decode(jsonvalue)
 
22
        else:
 
23
            self['id'] = '%d_%d' % (int(time.time()), int(random.random()*1000000))
 
24
            self['item'] = item
 
25
            self['args'] = args
 
26
 
 
27
    def encode(self):
 
28
        return simplejson.dumps(self)
 
29
 
 
30
    def decode(self, value):
 
31
        self.update(simplejson.loads(value))
 
32
 
 
33
    def is_empty(self):
 
34
        if self['item'] == None:
 
35
            return True
 
36
        return False
 
37
 
 
38
class LQSServer(SocketServer.UDPServer):
 
39
 
 
40
    PORT = 5151
 
41
    TIMEOUT = 30
 
42
    MAXSIZE = 8192
 
43
 
 
44
    def __init__(self, server_address, RequestHandlerClass, iterator, args=None):
 
45
        server_address = (server_address, self.PORT)
 
46
        SocketServer.UDPServer.__init__(self, server_address, RequestHandlerClass)
 
47
        self.count = 0
 
48
        self.iterator = iterator
 
49
        self.args = args
 
50
        self.start = datetime.datetime.now()
 
51
        self.end = None
 
52
        self.extant = []
 
53
 
 
54
class LQSHandler(SocketServer.DatagramRequestHandler):
 
55
 
 
56
    def get_cmd(self):
 
57
        return LQSCommand(self.rfile.readline())
 
58
 
 
59
    def build_msg(self):
 
60
        if not self.server.iterator:
 
61
            return LQSMessage(None)
 
62
        try:
 
63
            item = self.server.iterator.next()
 
64
            msg = LQSMessage(item, self.server.args)
 
65
            return msg
 
66
        except StopIteration:
 
67
            self.server.iterator = None
 
68
            return LQSMessage(None)
 
69
 
 
70
    def respond(self, msg):
 
71
        self.wfile.write(msg.encode())
 
72
 
 
73
    def check_extant(self):
 
74
        if len(self.server.extant) == 0 and not self.server.iterator:
 
75
            self.server.end = datetime.datetime.now()
 
76
            delta = self.server.end - self.server.start
 
77
            print 'Total Processing Time: %s' % delta
 
78
            print 'Total Messages Processed: %d' % self.server.count
 
79
 
 
80
    def do_debug(self, cmd):
 
81
        args = {'extant' : self.server.extant,
 
82
                'count' : self.server.count}
 
83
        msg = LQSMessage('debug', args)
 
84
        self.respond(msg)
 
85
 
 
86
    def do_next(self, cmd):
 
87
        out_msg = self.build_msg()
 
88
        if not out_msg.is_empty():
 
89
            self.server.count += 1
 
90
            self.server.extant.append(out_msg['id'])
 
91
        self.respond(out_msg)
 
92
 
 
93
    def do_delete(self, cmd):
 
94
        if len(cmd.args) != 1:
 
95
            self.error(cmd, 'delete command requires message id')
 
96
        else:
 
97
            mid = cmd.args[0]
 
98
            try:
 
99
                self.server.extant.remove(mid)
 
100
            except ValueError:
 
101
                self.error(cmd, 'message id not found')
 
102
            args = {'deleted' : True}
 
103
            msg = LQSMessage(mid, args)
 
104
            self.respond(msg)
 
105
            self.check_extant()
 
106
 
 
107
    def error(self, cmd, error_msg=None):
 
108
        args = {'error_msg' : error_msg,
 
109
                'cmd_name' : cmd.name,
 
110
                'cmd_args' : cmd.args}
 
111
        msg = LQSMessage('error', args)
 
112
        self.respond(msg)
 
113
 
 
114
    def do_stop(self, cmd):
 
115
        sys.exit(0)
 
116
 
 
117
    def handle(self):
 
118
        cmd = self.get_cmd()
 
119
        if hasattr(self, 'do_%s' % cmd.name):
 
120
            method = getattr(self, 'do_%s' % cmd.name)
 
121
            method(cmd)
 
122
        else:
 
123
            self.error(cmd, 'unrecognized command')
 
124
 
 
125
class PersistHandler(LQSHandler):
 
126
 
 
127
    def build_msg(self):
 
128
        if not self.server.iterator:
 
129
            return LQSMessage(None)
 
130
        try:
 
131
            obj = self.server.iterator.next()
 
132
            msg = LQSMessage(obj.id, self.server.args)
 
133
            return msg
 
134
        except StopIteration:
 
135
            self.server.iterator = None
 
136
            return LQSMessage(None)
 
137
 
 
138
def test_file(path, args=None):
 
139
    l = os.listdir(path)
 
140
    if not args:
 
141
        args = {}
 
142
    args['path'] = path
 
143
    s = LQSServer('', LQSHandler, iter(l), args)
 
144
    print "Awaiting UDP messages on port %d" % s.PORT
 
145
    s.serve_forever()
 
146
 
 
147
def test_simple(n):
 
148
    l = range(0, n)
 
149
    s = LQSServer('', LQSHandler, iter(l), None)
 
150
    print "Awaiting UDP messages on port %d" % s.PORT
 
151
    s.serve_forever()
 
152