1
from collections import deque
4
logger = logging.getLogger(__name__)
5
debug, info, warn = (logger.debug, logger.info, logger.warn,)
7
class RPCStream(object):
8
def __init__(self, stream):
10
self.pending_requests = {}
11
self.next_request_id = 1
12
self.interrupted = False
13
self.posted_notifications = deque()
16
def post(self, name, args):
17
self.posted_notifications.append((name, args,))
18
self.stream.interrupt()
21
def send(self, method, args, response_cb):
22
request_id = self.next_request_id
24
self.next_request_id = request_id + 1
26
self.stream.send([0, request_id, method, args])
28
self.pending_requests[request_id] = response_cb
31
def loop_start(self, request_cb, notification_cb, error_cb):
37
# - msg[2]: method name
39
debug('received request: %s, %s', msg[2], msg[3])
40
request_cb(msg[2], msg[3], reply_fn(self.stream, msg[1]))
42
# response to a previous request:
44
# - msg[2]: error(if any)
45
# - msg[3]: result(if not errored)
46
debug('received response: %s, %s', msg[2], msg[3])
47
self.pending_requests.pop(msg[1])(msg[2], msg[3])
50
# - msg[1]: event name
52
debug('received notification: %s, %s', msg[1], msg[2])
53
notification_cb(msg[1], msg[2])
55
error = 'Received invalid message %s' % msg
57
raise Exception(error)
59
self.stream.loop_start(msg_cb, error_cb)
61
while self.posted_notifications:
62
notification_cb(*self.posted_notifications.popleft())
63
self.stream.loop_start(msg_cb, error_cb)
67
self.stream.loop_stop()
70
def reply_fn(stream, request_id):
71
def reply(value, error=False):
73
resp = [1, request_id, value, None]
75
resp = [1, request_id, None, value]