1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
from collections import deque
import logging
logger = logging.getLogger(__name__)
# Module-level shortcuts to the logger's methods. ``warn`` is bound to
# ``logger.warning`` because ``Logger.warn`` is only a deprecated alias of it;
# the shortcut keeps its ``warn`` name so existing callers are unaffected.
debug, info, warn = (logger.debug, logger.info, logger.warning,)
class RPCStream(object):
    """RPC layer on top of a lower-level message stream.

    Outgoing requests are written as ``[0, request_id, method, args]``.
    Incoming messages are dispatched on their first element: ``0`` is a
    request, ``1`` a response to an earlier request, ``2`` a notification.
    """

    def __init__(self, stream):
        """Wrap ``stream``.

        This class calls ``configure``, ``send``, ``interrupt``,
        ``loop_start`` and ``loop_stop`` on ``stream``.
        """
        self.stream = stream
        # request id -> callback waiting for the matching response
        self.pending_requests = {}
        self.next_request_id = 1
        # NOTE(review): not read or updated anywhere else in this class.
        self.interrupted = False
        self.stopped = False
        # True only while the underlying stream's loop is executing.
        self.running = False
        # (name, args) pairs queued by post() and delivered by _run().
        self.posted_notifications = deque()

    def configure(self, vim):
        """Forward ``vim`` to the underlying stream's ``configure``."""
        self.stream.configure(vim)

    def post(self, name, args):
        """Queue a notification ``(name, args)`` and interrupt the stream.

        The queued pair is handed to ``notification_cb`` by ``_run``.
        Presumably the interrupt makes the stream's loop return so that
        ``_run`` can deliver it -- confirm against the stream
        implementation.
        """
        self.posted_notifications.append((name, args,))
        self.stream.interrupt()

    def send(self, method, args, response_cb):
        """Send a request and register ``response_cb`` for its response.

        ``response_cb`` is later called as ``response_cb(error, result)``
        when a response carrying the same request id is received.
        """
        request_id = self.next_request_id
        # Update request id
        self.next_request_id = request_id + 1
        # Send the request
        self.stream.send([0, request_id, method, args])
        # set the callback
        self.pending_requests[request_id] = response_cb

    def loop_start(self, request_cb, notification_cb, error_cb):
        """Run the stream loop, dispatching every incoming message.

        - ``request_cb(method, args, reply)`` is called for requests;
          ``reply`` is produced by the module-level ``reply_fn``.
        - ``notification_cb(name, args)`` is called for notifications
          (both received ones and those queued through ``post``).
        - ``error_cb`` is passed through to the underlying stream.

        Responses are routed to the callback registered by ``send``.
        Raises ``Exception`` for a message whose type is not 0, 1 or 2 and
        ``KeyError`` for a response whose id has no pending request.
        """
        def msg_cb(msg):
            msg_type = msg[0]
            if msg_type == 0:
                # request
                #   - msg[1]: id
                #   - msg[2]: method name
                #   - msg[3]: arguments
                debug('received request: %s, %s', msg[2], msg[3])
                request_cb(msg[2], msg[3], reply_fn(self.stream, msg[1]))
            elif msg_type == 1:
                # response to a previous request:
                #   - msg[1]: the id
                #   - msg[2]: error(if any)
                #   - msg[3]: result(if not errored)
                debug('received response: %s, %s', msg[2], msg[3])
                self.pending_requests.pop(msg[1])(msg[2], msg[3])
            elif msg_type == 2:
                # notification/event
                #   - msg[1]: event name
                #   - msg[2]: arguments
                debug('received notification: %s, %s', msg[1], msg[2])
                notification_cb(msg[1], msg[2])
            else:
                # Wrap msg in a 1-tuple: a bare tuple message would otherwise
                # be unpacked as the format arguments and break formatting.
                error = 'Received invalid message %s' % (msg,)
                warn(error)
                raise Exception(error)

        self._run(msg_cb, notification_cb, error_cb)
        debug('exiting rpc stream loop')

    def loop_stop(self):
        """Ask the loop to exit, stopping the stream loop if it is running."""
        self.stopped = True
        if self.running:
            self.stream.loop_stop()

    def _run(self, msg_cb, notification_cb, error_cb):
        """Alternate between posted notifications and the stream loop.

        Clears ``stopped`` and then repeats until ``loop_stop`` sets it
        again: a queued notification, if any, is delivered first; otherwise
        control is handed to ``stream.loop_start`` until it returns.
        """
        self.stopped = False
        while not self.stopped:
            if self.posted_notifications:
                notification_cb(*self.posted_notifications.popleft())
                continue
            self.running = True
            try:
                self.stream.loop_start(msg_cb, error_cb)
            finally:
                # Reset even when the stream loop raises, so a later
                # loop_stop() does not try to stop a loop that is not running.
                self.running = False
def reply_fn(stream, request_id):
    """Build the responder for the request identified by ``request_id``.

    The returned ``reply(value, error=False)`` sends one response message
    ``[1, request_id, error, result]`` through ``stream.send``. ``value`` is
    placed in the error slot when ``error`` is truthy and in the result slot
    otherwise; the other slot is ``None``.
    """
    def reply(value, error=False):
        payload = (
            [1, request_id, value, None]
            if error
            else [1, request_id, None, value]
        )
        stream.send(payload)

    return reply
|