1
"""RPC Implemention, originally written for the Python Idle IDE
3
For security reasons, GvR requested that Idle's Python execution server process
4
connect to the Idle process, which listens for the connection. Since Idle has
5
has only one client per server, this was not a limitation.
7
+---------------------------------+ +-------------+
8
| SocketServer.BaseRequestHandler | | SocketIO |
9
+---------------------------------+ +-------------+
15
| + -------------------+ |
17
+-------------------------+ +-----------------+
18
| RPCHandler | | RPCClient |
19
| [attribute of RPCServer]| | |
20
+-------------------------+ +-----------------+
22
The RPCServer handler class is expected to provide register/unregister methods.
23
RPCHandler inherits the mix-in class SocketIO, which provides these methods.
25
See the Idle run.main() docstring for further information on how this was
36
import cPickle as pickle
45
def unpickle_code(ms):
46
co = marshal.loads(ms)
47
assert isinstance(co, types.CodeType)
51
assert isinstance(co, types.CodeType)
52
ms = marshal.dumps(co)
53
return unpickle_code, (ms,)
55
# XXX KBK 24Aug02 function pickling capability not used in Idle
56
# def unpickle_function(ms):
59
# def pickle_function(fn):
60
# assert isinstance(fn, type.FunctionType)
63
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
64
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
67
LOCALHOST = '127.0.0.1'
69
class RPCServer(SocketServer.TCPServer):
71
def __init__(self, addr, handlerclass=None):
72
if handlerclass is None:
73
handlerclass = RPCHandler
74
SocketServer.TCPServer.__init__(self, addr, handlerclass)
76
def server_bind(self):
77
"Override TCPServer method, no bind() phase for connecting entity"
80
def server_activate(self):
81
"""Override TCPServer method, connect() instead of listen()
83
Due to the reversed connection, self.server_address is actually the
84
address of the Idle Client to which we are connecting.
87
self.socket.connect(self.server_address)
89
def get_request(self):
90
"Override TCPServer method, return already connected socket"
91
return self.socket, self.server_address
93
def handle_error(self, request, client_address):
94
"""Override TCPServer method
96
Error message goes to __stderr__. No error message if exiting
97
normally or socket raised EOF. Other exceptions not handled in
98
server code will cause os._exit.
107
print>>erf, '\n' + '-'*40
108
print>>erf, 'Unhandled server exception!'
109
print>>erf, 'Thread: %s' % threading.currentThread().getName()
110
print>>erf, 'Client Address: ', client_address
111
print>>erf, 'Request: ', repr(request)
112
traceback.print_exc(file=erf)
113
print>>erf, '\n*** Unrecoverable, server exiting!'
117
#----------------- end class RPCServer --------------------
120
request_queue = Queue.Queue(0)
121
response_queue = Queue.Queue(0)
124
class SocketIO(object):
128
def __init__(self, sock, objtable=None, debugging=None):
129
self.sockthread = threading.currentThread()
130
if debugging is not None:
131
self.debugging = debugging
134
objtable = objecttable
135
self.objtable = objtable
146
"override for specific exit action"
149
def debug(self, *args):
150
if not self.debugging:
152
s = self.location + " " + str(threading.currentThread().getName())
155
print>>sys.__stderr__, s
157
def register(self, oid, object):
158
self.objtable[oid] = object
160
def unregister(self, oid):
162
del self.objtable[oid]
166
def localcall(self, seq, request):
167
self.debug("localcall:", request)
169
how, (oid, methodname, args, kwargs) = request
171
return ("ERROR", "Bad request format")
172
if not self.objtable.has_key(oid):
173
return ("ERROR", "Unknown object id: %r" % (oid,))
174
obj = self.objtable[oid]
175
if methodname == "__methods__":
177
_getmethods(obj, methods)
178
return ("OK", methods)
179
if methodname == "__attributes__":
181
_getattributes(obj, attributes)
182
return ("OK", attributes)
183
if not hasattr(obj, methodname):
184
return ("ERROR", "Unsupported method name: %r" % (methodname,))
185
method = getattr(obj, methodname)
188
ret = method(*args, **kwargs)
189
if isinstance(ret, RemoteObject):
193
request_queue.put((seq, (method, args, kwargs)))
194
return("QUEUED", None)
196
return ("ERROR", "Unsupported message type: %s" % how)
202
msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
203
" Object: %s \n Method: %s \n Args: %s\n"
204
print>>sys.__stderr__, msg % (oid, method, args)
205
traceback.print_exc(file=sys.__stderr__)
206
return ("EXCEPTION", None)
208
def remotecall(self, oid, methodname, args, kwargs):
209
self.debug("remotecall:asynccall: ", oid, methodname)
210
seq = self.asynccall(oid, methodname, args, kwargs)
211
return self.asyncreturn(seq)
213
def remotequeue(self, oid, methodname, args, kwargs):
214
self.debug("remotequeue:asyncqueue: ", oid, methodname)
215
seq = self.asyncqueue(oid, methodname, args, kwargs)
216
return self.asyncreturn(seq)
218
def asynccall(self, oid, methodname, args, kwargs):
219
request = ("CALL", (oid, methodname, args, kwargs))
221
if threading.currentThread() != self.sockthread:
222
cvar = threading.Condition()
223
self.cvars[seq] = cvar
224
self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
225
self.putmessage((seq, request))
228
def asyncqueue(self, oid, methodname, args, kwargs):
229
request = ("QUEUE", (oid, methodname, args, kwargs))
231
if threading.currentThread() != self.sockthread:
232
cvar = threading.Condition()
233
self.cvars[seq] = cvar
234
self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
235
self.putmessage((seq, request))
238
def asyncreturn(self, seq):
239
self.debug("asyncreturn:%d:call getresponse(): " % seq)
240
response = self.getresponse(seq, wait=0.05)
241
self.debug(("asyncreturn:%d:response: " % seq), response)
242
return self.decoderesponse(response)
244
def decoderesponse(self, response):
250
if how == "EXCEPTION":
251
self.debug("decoderesponse: EXCEPTION")
254
self.debug("decoderesponse: EOF")
255
self.decode_interrupthook()
258
self.debug("decoderesponse: Internal ERROR:", what)
259
raise RuntimeError, what
260
raise SystemError, (how, what)
262
def decode_interrupthook(self):
267
"""Listen on socket until I/O not ready or EOF
269
pollresponse() will loop looking for seq number None, which
270
never comes, and exit on EOFError.
274
self.getresponse(myseq=None, wait=0.05)
276
self.debug("mainloop:return")
279
def getresponse(self, myseq, wait):
280
response = self._getresponse(myseq, wait)
281
if response is not None:
284
response = how, self._proxify(what)
287
def _proxify(self, obj):
288
if isinstance(obj, RemoteProxy):
289
return RPCProxy(self, obj.oid)
290
if isinstance(obj, types.ListType):
291
return map(self._proxify, obj)
292
# XXX Check for other types -- not currently needed
295
def _getresponse(self, myseq, wait):
296
self.debug("_getresponse:myseq:", myseq)
297
if threading.currentThread() is self.sockthread:
298
# this thread does all reading of requests or responses
300
response = self.pollresponse(myseq, wait)
301
if response is not None:
304
# wait for notification from socket handling thread
305
cvar = self.cvars[myseq]
307
while not self.responses.has_key(myseq):
309
response = self.responses[myseq]
310
self.debug("_getresponse:%s: thread woke up: response: %s" %
312
del self.responses[myseq]
313
del self.cvars[myseq]
318
self.nextseq = seq = self.nextseq + 2
321
def putmessage(self, message):
322
self.debug("putmessage:%d:" % message[0])
324
s = pickle.dumps(message)
325
except pickle.PicklingError:
326
print >>sys.__stderr__, "Cannot pickle:", repr(message)
328
s = struct.pack("<i", len(s)) + s
331
r, w, x = select.select([], [self.sock], [])
332
n = self.sock.send(s[:BUFSIZE])
333
except (AttributeError, TypeError):
334
raise IOError, "socket no longer exists"
342
bufstate = 0 # meaning: 0 => reading count; 1 => reading data
344
def pollpacket(self, wait):
346
if len(self.buffer) < self.bufneed:
347
r, w, x = select.select([self.sock.fileno()], [], [], wait)
351
s = self.sock.recv(BUFSIZE)
358
return self._stage1()
361
if self.bufstate == 0 and len(self.buffer) >= 4:
363
self.buffer = self.buffer[4:]
364
self.bufneed = struct.unpack("<i", s)[0]
368
if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
369
packet = self.buffer[:self.bufneed]
370
self.buffer = self.buffer[self.bufneed:]
375
def pollmessage(self, wait):
376
packet = self.pollpacket(wait)
380
message = pickle.loads(packet)
381
except pickle.UnpicklingError:
382
print >>sys.__stderr__, "-----------------------"
383
print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
384
traceback.print_stack(file=sys.__stderr__)
385
print >>sys.__stderr__, "-----------------------"
389
def pollresponse(self, myseq, wait):
390
"""Handle messages received on the socket.
392
Some messages received may be asynchronous 'call' or 'queue' requests,
393
and some may be responses for other threads.
395
'call' requests are passed to self.localcall() with the expectation of
396
immediate execution, during which time the socket is not serviced.
398
'queue' requests are used for tasks (which may block or hang) to be
399
processed in a different thread. These requests are fed into
400
request_queue by self.localcall(). Responses to queued requests are
401
taken from response_queue and sent across the link with the associated
402
sequence numbers. Messages in the queues are (sequence_number,
403
request/response) tuples and code using this module removing messages
404
from the request_queue is responsible for returning the correct
405
sequence number in the response_queue.
407
pollresponse() will loop until a response message with the myseq
408
sequence number is received, and will save other responses in
409
self.responses and notify the owning thread.
413
# send queued response if there is one available
415
qmsg = response_queue.get(0)
420
message = (seq, ('OK', response))
421
self.putmessage(message)
422
# poll for message on link
424
message = self.pollmessage(wait)
425
if message is None: # socket not ready
430
except AttributeError:
434
self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
435
# process or queue a request
436
if how in ("CALL", "QUEUE"):
437
self.debug("pollresponse:%d:localcall:call:" % seq)
438
response = self.localcall(seq, resq)
439
self.debug("pollresponse:%d:localcall:response:%s"
442
self.putmessage((seq, response))
444
# don't acknowledge the 'queue' request!
447
# return if completed message transaction
450
# must be a response for a different thread:
452
cv = self.cvars.get(seq, None)
453
# response involving unknown sequence number is discarded,
454
# probably intended for prior incarnation of server
457
self.responses[seq] = resq
462
def handle_EOF(self):
463
"action taken upon link being closed by peer"
465
self.debug("handle_EOF")
466
for key in self.cvars:
469
self.responses[key] = ('EOF', None)
472
# call our (possibly overridden) exit function
476
"Classes using rpc client/server can override to augment EOF action"
479
#----------------- end class SocketIO --------------------
481
class RemoteObject(object):
487
objecttable[oid] = obj
488
return RemoteProxy(oid)
490
class RemoteProxy(object):
492
def __init__(self, oid):
495
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
498
location = "#S" # Server
500
def __init__(self, sock, addr, svr):
501
svr.current_handler = self ## cgt xxx
502
SocketIO.__init__(self, sock)
503
SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
506
"handle() method required by SocketServer"
509
def get_remote_proxy(self, oid):
510
return RPCProxy(self, oid)
512
class RPCClient(SocketIO):
515
location = "#C" # Client
517
nextseq = 1 # Requests coming from the client are odd numbered
519
def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
520
self.listening_sock = socket.socket(family, type)
521
self.listening_sock.setsockopt(socket.SOL_SOCKET,
522
socket.SO_REUSEADDR, 1)
523
self.listening_sock.bind(address)
524
self.listening_sock.listen(1)
527
working_sock, address = self.listening_sock.accept()
529
print>>sys.__stderr__, "****** Connection request from ", address
530
if address[0] == LOCALHOST:
531
SocketIO.__init__(self, working_sock)
533
print>>sys.__stderr__, "** Invalid host: ", address
536
def get_remote_proxy(self, oid):
537
return RPCProxy(self, oid)
539
class RPCProxy(object):
544
def __init__(self, sockio, oid):
548
def __getattr__(self, name):
549
if self.__methods is None:
551
if self.__methods.get(name):
552
return MethodProxy(self.sockio, self.oid, name)
553
if self.__attributes is None:
554
self.__getattributes()
555
if self.__attributes.has_key(name):
556
value = self.sockio.remotecall(self.oid, '__getattribute__',
560
raise AttributeError, name
562
def __getattributes(self):
563
self.__attributes = self.sockio.remotecall(self.oid,
564
"__attributes__", (), {})
566
def __getmethods(self):
567
self.__methods = self.sockio.remotecall(self.oid,
568
"__methods__", (), {})
570
def _getmethods(obj, methods):
571
# Helper to get a list of methods from an object
572
# Adds names to dictionary argument 'methods'
573
for name in dir(obj):
574
attr = getattr(obj, name)
577
if type(obj) == types.InstanceType:
578
_getmethods(obj.__class__, methods)
579
if type(obj) == types.ClassType:
580
for super in obj.__bases__:
581
_getmethods(super, methods)
583
def _getattributes(obj, attributes):
584
for name in dir(obj):
585
attr = getattr(obj, name)
586
if not callable(attr):
589
class MethodProxy(object):
591
def __init__(self, sockio, oid, name):
596
def __call__(self, *args, **kwargs):
597
value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
601
# XXX KBK 09Sep03 We need a proper unit test for this module. Previously
602
# existing test code was removed at Rev 1.27.