1
import SocketServer, os, datetime, sys, random, time
6
def __init__(self, line):
8
self.line = self.raw_line.strip()
9
l = self.line.split(' ')
12
self.args = [arg for arg in l[1:] if arg]
16
class LQSMessage(dict):
18
def __init__(self, item=None, args=None, jsonvalue=None):
21
self.decode(jsonvalue)
23
self['id'] = '%d_%d' % (int(time.time()), int(random.random()*1000000))
28
return simplejson.dumps(self)
30
def decode(self, value):
    """Merge the JSON document *value* into ``self``.

    *value* is parsed with ``simplejson.loads`` and the parsed result is
    handed to ``self.update``; nothing is returned.
    """
    self.update(simplejson.loads(value))
34
if self['item'] == None:
38
class LQSServer(SocketServer.UDPServer):
44
def __init__(self, server_address, RequestHandlerClass, iterator, args=None):
45
server_address = (server_address, self.PORT)
46
SocketServer.UDPServer.__init__(self, server_address, RequestHandlerClass)
48
self.iterator = iterator
50
self.start = datetime.datetime.now()
54
class LQSHandler(SocketServer.DatagramRequestHandler):
57
return LQSCommand(self.rfile.readline())
60
if not self.server.iterator:
61
return LQSMessage(None)
63
item = self.server.iterator.next()
64
msg = LQSMessage(item, self.server.args)
67
self.server.iterator = None
68
return LQSMessage(None)
70
def respond(self, msg):
    """Write the encoded form of *msg* to the handler's output stream.

    *msg* must provide an ``encode()`` method; its return value is passed
    unchanged to ``self.wfile.write``.
    """
    self.wfile.write(msg.encode())
73
def check_extant(self):
    """Print run totals once no work remains.

    When ``self.server.extant`` is empty and ``self.server.iterator`` is
    falsy, the current time is stored in ``self.server.end`` and two lines
    are printed: the elapsed time since ``self.server.start`` and the value
    of ``self.server.count``. In every other case the method does nothing.
    """
    server = self.server
    # Still waiting on outstanding ids, or the source can yield more items.
    if server.extant or server.iterator:
        return
    server.end = datetime.datetime.now()
    delta = server.end - server.start
    # A single parenthesised argument parses the same way as a Python 2
    # print statement and as a Python 3 call, so the output is unchanged.
    print('Total Processing Time: %s' % delta)
    print('Total Messages Processed: %d' % server.count)
80
def do_debug(self, cmd):
81
args = {'extant' : self.server.extant,
82
'count' : self.server.count}
83
msg = LQSMessage('debug', args)
86
def do_next(self, cmd):
87
out_msg = self.build_msg()
88
if not out_msg.is_empty():
89
self.server.count += 1
90
self.server.extant.append(out_msg['id'])
93
def do_delete(self, cmd):
94
if len(cmd.args) != 1:
95
self.error(cmd, 'delete command requires message id')
99
self.server.extant.remove(mid)
101
self.error(cmd, 'message id not found')
102
args = {'deleted' : True}
103
msg = LQSMessage(mid, args)
107
def error(self, cmd, error_msg=None):
108
args = {'error_msg' : error_msg,
109
'cmd_name' : cmd.name,
110
'cmd_args' : cmd.args}
111
msg = LQSMessage('error', args)
114
def do_stop(self, cmd):
119
if hasattr(self, 'do_%s' % cmd.name):
120
method = getattr(self, 'do_%s' % cmd.name)
123
self.error(cmd, 'unrecognized command')
125
class PersistHandler(LQSHandler):
128
if not self.server.iterator:
129
return LQSMessage(None)
131
obj = self.server.iterator.next()
132
msg = LQSMessage(obj.id, self.server.args)
134
except StopIteration:
135
self.server.iterator = None
136
return LQSMessage(None)
138
def test_file(path, args=None):
143
s = LQSServer('', LQSHandler, iter(l), args)
144
print "Awaiting UDP messages on port %d" % s.PORT
149
s = LQSServer('', LQSHandler, iter(l), None)
150
print "Awaiting UDP messages on port %d" % s.PORT