~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/idlelib/rpc.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""RPC Implemention, originally written for the Python Idle IDE
 
2
 
 
3
For security reasons, GvR requested that Idle's Python execution server process
 
4
connect to the Idle process, which listens for the connection.  Since Idle has
 
5
has only one client per server, this was not a limitation.
 
6
 
 
7
   +---------------------------------+ +-------------+
 
8
   | SocketServer.BaseRequestHandler | | SocketIO    |
 
9
   +---------------------------------+ +-------------+
 
10
                   ^                   | register()  |
 
11
                   |                   | unregister()|
 
12
                   |                   +-------------+
 
13
                   |                      ^  ^
 
14
                   |                      |  |
 
15
                   | + -------------------+  |
 
16
                   | |                       |
 
17
   +-------------------------+        +-----------------+
 
18
   | RPCHandler              |        | RPCClient       |
 
19
   | [attribute of RPCServer]|        |                 |
 
20
   +-------------------------+        +-----------------+
 
21
 
 
22
The RPCServer handler class is expected to provide register/unregister methods.
 
23
RPCHandler inherits the mix-in class SocketIO, which provides these methods.
 
24
 
 
25
See the Idle run.main() docstring for further information on how this was
 
26
accomplished in Idle.
 
27
 
 
28
"""
 
29
 
 
30
import sys
 
31
import os
 
32
import socket
 
33
import select
 
34
import SocketServer
 
35
import struct
 
36
import cPickle as pickle
 
37
import threading
 
38
import Queue
 
39
import traceback
 
40
import copy_reg
 
41
import types
 
42
import marshal
 
43
 
 
44
 
 
45
def unpickle_code(ms):
 
46
    co = marshal.loads(ms)
 
47
    assert isinstance(co, types.CodeType)
 
48
    return co
 
49
 
 
50
def pickle_code(co):
 
51
    assert isinstance(co, types.CodeType)
 
52
    ms = marshal.dumps(co)
 
53
    return unpickle_code, (ms,)
 
54
 
 
55
# XXX KBK 24Aug02 function pickling capability not used in Idle
 
56
#  def unpickle_function(ms):
 
57
#      return ms
 
58
 
 
59
#  def pickle_function(fn):
 
60
#      assert isinstance(fn, type.FunctionType)
 
61
#      return repr(fn)
 
62
 
 
63
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
 
64
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
 
65
 
 
66
BUFSIZE = 8*1024
 
67
LOCALHOST = '127.0.0.1'
 
68
 
 
69
class RPCServer(SocketServer.TCPServer):
 
70
 
 
71
    def __init__(self, addr, handlerclass=None):
 
72
        if handlerclass is None:
 
73
            handlerclass = RPCHandler
 
74
        SocketServer.TCPServer.__init__(self, addr, handlerclass)
 
75
 
 
76
    def server_bind(self):
 
77
        "Override TCPServer method, no bind() phase for connecting entity"
 
78
        pass
 
79
 
 
80
    def server_activate(self):
 
81
        """Override TCPServer method, connect() instead of listen()
 
82
 
 
83
        Due to the reversed connection, self.server_address is actually the
 
84
        address of the Idle Client to which we are connecting.
 
85
 
 
86
        """
 
87
        self.socket.connect(self.server_address)
 
88
 
 
89
    def get_request(self):
 
90
        "Override TCPServer method, return already connected socket"
 
91
        return self.socket, self.server_address
 
92
 
 
93
    def handle_error(self, request, client_address):
 
94
        """Override TCPServer method
 
95
 
 
96
        Error message goes to __stderr__.  No error message if exiting
 
97
        normally or socket raised EOF.  Other exceptions not handled in
 
98
        server code will cause os._exit.
 
99
 
 
100
        """
 
101
        try:
 
102
            raise
 
103
        except SystemExit:
 
104
            raise
 
105
        except:
 
106
            erf = sys.__stderr__
 
107
            print>>erf, '\n' + '-'*40
 
108
            print>>erf, 'Unhandled server exception!'
 
109
            print>>erf, 'Thread: %s' % threading.currentThread().getName()
 
110
            print>>erf, 'Client Address: ', client_address
 
111
            print>>erf, 'Request: ', repr(request)
 
112
            traceback.print_exc(file=erf)
 
113
            print>>erf, '\n*** Unrecoverable, server exiting!'
 
114
            print>>erf, '-'*40
 
115
            os._exit(0)
 
116
 
 
117
#----------------- end class RPCServer --------------------
 
118
 
 
119
objecttable = {}
 
120
request_queue = Queue.Queue(0)
 
121
response_queue = Queue.Queue(0)
 
122
 
 
123
 
 
124
class SocketIO(object):
 
125
 
 
126
    nextseq = 0
 
127
 
 
128
    def __init__(self, sock, objtable=None, debugging=None):
 
129
        self.sockthread = threading.currentThread()
 
130
        if debugging is not None:
 
131
            self.debugging = debugging
 
132
        self.sock = sock
 
133
        if objtable is None:
 
134
            objtable = objecttable
 
135
        self.objtable = objtable
 
136
        self.responses = {}
 
137
        self.cvars = {}
 
138
 
 
139
    def close(self):
 
140
        sock = self.sock
 
141
        self.sock = None
 
142
        if sock is not None:
 
143
            sock.close()
 
144
 
 
145
    def exithook(self):
 
146
        "override for specific exit action"
 
147
        os._exit()
 
148
 
 
149
    def debug(self, *args):
 
150
        if not self.debugging:
 
151
            return
 
152
        s = self.location + " " + str(threading.currentThread().getName())
 
153
        for a in args:
 
154
            s = s + " " + str(a)
 
155
        print>>sys.__stderr__, s
 
156
 
 
157
    def register(self, oid, object):
 
158
        self.objtable[oid] = object
 
159
 
 
160
    def unregister(self, oid):
 
161
        try:
 
162
            del self.objtable[oid]
 
163
        except KeyError:
 
164
            pass
 
165
 
 
166
    def localcall(self, seq, request):
 
167
        self.debug("localcall:", request)
 
168
        try:
 
169
            how, (oid, methodname, args, kwargs) = request
 
170
        except TypeError:
 
171
            return ("ERROR", "Bad request format")
 
172
        if not self.objtable.has_key(oid):
 
173
            return ("ERROR", "Unknown object id: %r" % (oid,))
 
174
        obj = self.objtable[oid]
 
175
        if methodname == "__methods__":
 
176
            methods = {}
 
177
            _getmethods(obj, methods)
 
178
            return ("OK", methods)
 
179
        if methodname == "__attributes__":
 
180
            attributes = {}
 
181
            _getattributes(obj, attributes)
 
182
            return ("OK", attributes)
 
183
        if not hasattr(obj, methodname):
 
184
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
 
185
        method = getattr(obj, methodname)
 
186
        try:
 
187
            if how == 'CALL':
 
188
                ret = method(*args, **kwargs)
 
189
                if isinstance(ret, RemoteObject):
 
190
                    ret = remoteref(ret)
 
191
                return ("OK", ret)
 
192
            elif how == 'QUEUE':
 
193
                request_queue.put((seq, (method, args, kwargs)))
 
194
                return("QUEUED", None)
 
195
            else:
 
196
                return ("ERROR", "Unsupported message type: %s" % how)
 
197
        except SystemExit:
 
198
            raise
 
199
        except socket.error:
 
200
            raise
 
201
        except:
 
202
            msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
 
203
                  " Object: %s \n Method: %s \n Args: %s\n"
 
204
            print>>sys.__stderr__, msg % (oid, method, args)
 
205
            traceback.print_exc(file=sys.__stderr__)
 
206
            return ("EXCEPTION", None)
 
207
 
 
208
    def remotecall(self, oid, methodname, args, kwargs):
 
209
        self.debug("remotecall:asynccall: ", oid, methodname)
 
210
        seq = self.asynccall(oid, methodname, args, kwargs)
 
211
        return self.asyncreturn(seq)
 
212
 
 
213
    def remotequeue(self, oid, methodname, args, kwargs):
 
214
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
 
215
        seq = self.asyncqueue(oid, methodname, args, kwargs)
 
216
        return self.asyncreturn(seq)
 
217
 
 
218
    def asynccall(self, oid, methodname, args, kwargs):
 
219
        request = ("CALL", (oid, methodname, args, kwargs))
 
220
        seq = self.newseq()
 
221
        if threading.currentThread() != self.sockthread:
 
222
            cvar = threading.Condition()
 
223
            self.cvars[seq] = cvar
 
224
        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
 
225
        self.putmessage((seq, request))
 
226
        return seq
 
227
 
 
228
    def asyncqueue(self, oid, methodname, args, kwargs):
 
229
        request = ("QUEUE", (oid, methodname, args, kwargs))
 
230
        seq = self.newseq()
 
231
        if threading.currentThread() != self.sockthread:
 
232
            cvar = threading.Condition()
 
233
            self.cvars[seq] = cvar
 
234
        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
 
235
        self.putmessage((seq, request))
 
236
        return seq
 
237
 
 
238
    def asyncreturn(self, seq):
 
239
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
 
240
        response = self.getresponse(seq, wait=0.05)
 
241
        self.debug(("asyncreturn:%d:response: " % seq), response)
 
242
        return self.decoderesponse(response)
 
243
 
 
244
    def decoderesponse(self, response):
 
245
        how, what = response
 
246
        if how == "OK":
 
247
            return what
 
248
        if how == "QUEUED":
 
249
            return None
 
250
        if how == "EXCEPTION":
 
251
            self.debug("decoderesponse: EXCEPTION")
 
252
            return None
 
253
        if how == "EOF":
 
254
            self.debug("decoderesponse: EOF")
 
255
            self.decode_interrupthook()
 
256
            return None
 
257
        if how == "ERROR":
 
258
            self.debug("decoderesponse: Internal ERROR:", what)
 
259
            raise RuntimeError, what
 
260
        raise SystemError, (how, what)
 
261
 
 
262
    def decode_interrupthook(self):
 
263
        ""
 
264
        raise EOFError
 
265
 
 
266
    def mainloop(self):
 
267
        """Listen on socket until I/O not ready or EOF
 
268
 
 
269
        pollresponse() will loop looking for seq number None, which
 
270
        never comes, and exit on EOFError.
 
271
 
 
272
        """
 
273
        try:
 
274
            self.getresponse(myseq=None, wait=0.05)
 
275
        except EOFError:
 
276
            self.debug("mainloop:return")
 
277
            return
 
278
 
 
279
    def getresponse(self, myseq, wait):
 
280
        response = self._getresponse(myseq, wait)
 
281
        if response is not None:
 
282
            how, what = response
 
283
            if how == "OK":
 
284
                response = how, self._proxify(what)
 
285
        return response
 
286
 
 
287
    def _proxify(self, obj):
 
288
        if isinstance(obj, RemoteProxy):
 
289
            return RPCProxy(self, obj.oid)
 
290
        if isinstance(obj, types.ListType):
 
291
            return map(self._proxify, obj)
 
292
        # XXX Check for other types -- not currently needed
 
293
        return obj
 
294
 
 
295
    def _getresponse(self, myseq, wait):
 
296
        self.debug("_getresponse:myseq:", myseq)
 
297
        if threading.currentThread() is self.sockthread:
 
298
            # this thread does all reading of requests or responses
 
299
            while 1:
 
300
                response = self.pollresponse(myseq, wait)
 
301
                if response is not None:
 
302
                    return response
 
303
        else:
 
304
            # wait for notification from socket handling thread
 
305
            cvar = self.cvars[myseq]
 
306
            cvar.acquire()
 
307
            while not self.responses.has_key(myseq):
 
308
                cvar.wait()
 
309
            response = self.responses[myseq]
 
310
            self.debug("_getresponse:%s: thread woke up: response: %s" %
 
311
                       (myseq, response))
 
312
            del self.responses[myseq]
 
313
            del self.cvars[myseq]
 
314
            cvar.release()
 
315
            return response
 
316
 
 
317
    def newseq(self):
 
318
        self.nextseq = seq = self.nextseq + 2
 
319
        return seq
 
320
 
 
321
    def putmessage(self, message):
 
322
        self.debug("putmessage:%d:" % message[0])
 
323
        try:
 
324
            s = pickle.dumps(message)
 
325
        except pickle.PicklingError:
 
326
            print >>sys.__stderr__, "Cannot pickle:", repr(message)
 
327
            raise
 
328
        s = struct.pack("<i", len(s)) + s
 
329
        while len(s) > 0:
 
330
            try:
 
331
                r, w, x = select.select([], [self.sock], [])
 
332
                n = self.sock.send(s[:BUFSIZE])
 
333
            except (AttributeError, TypeError):
 
334
                raise IOError, "socket no longer exists"
 
335
            except socket.error:
 
336
                raise
 
337
            else:
 
338
                s = s[n:]
 
339
 
 
340
    buffer = ""
 
341
    bufneed = 4
 
342
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data
 
343
 
 
344
    def pollpacket(self, wait):
 
345
        self._stage0()
 
346
        if len(self.buffer) < self.bufneed:
 
347
            r, w, x = select.select([self.sock.fileno()], [], [], wait)
 
348
            if len(r) == 0:
 
349
                return None
 
350
            try:
 
351
                s = self.sock.recv(BUFSIZE)
 
352
            except socket.error:
 
353
                raise EOFError
 
354
            if len(s) == 0:
 
355
                raise EOFError
 
356
            self.buffer += s
 
357
            self._stage0()
 
358
        return self._stage1()
 
359
 
 
360
    def _stage0(self):
 
361
        if self.bufstate == 0 and len(self.buffer) >= 4:
 
362
            s = self.buffer[:4]
 
363
            self.buffer = self.buffer[4:]
 
364
            self.bufneed = struct.unpack("<i", s)[0]
 
365
            self.bufstate = 1
 
366
 
 
367
    def _stage1(self):
 
368
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
 
369
            packet = self.buffer[:self.bufneed]
 
370
            self.buffer = self.buffer[self.bufneed:]
 
371
            self.bufneed = 4
 
372
            self.bufstate = 0
 
373
            return packet
 
374
 
 
375
    def pollmessage(self, wait):
 
376
        packet = self.pollpacket(wait)
 
377
        if packet is None:
 
378
            return None
 
379
        try:
 
380
            message = pickle.loads(packet)
 
381
        except pickle.UnpicklingError:
 
382
            print >>sys.__stderr__, "-----------------------"
 
383
            print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
 
384
            traceback.print_stack(file=sys.__stderr__)
 
385
            print >>sys.__stderr__, "-----------------------"
 
386
            raise
 
387
        return message
 
388
 
 
389
    def pollresponse(self, myseq, wait):
 
390
        """Handle messages received on the socket.
 
391
 
 
392
        Some messages received may be asynchronous 'call' or 'queue' requests,
 
393
        and some may be responses for other threads.
 
394
 
 
395
        'call' requests are passed to self.localcall() with the expectation of
 
396
        immediate execution, during which time the socket is not serviced.
 
397
 
 
398
        'queue' requests are used for tasks (which may block or hang) to be
 
399
        processed in a different thread.  These requests are fed into
 
400
        request_queue by self.localcall().  Responses to queued requests are
 
401
        taken from response_queue and sent across the link with the associated
 
402
        sequence numbers.  Messages in the queues are (sequence_number,
 
403
        request/response) tuples and code using this module removing messages
 
404
        from the request_queue is responsible for returning the correct
 
405
        sequence number in the response_queue.
 
406
 
 
407
        pollresponse() will loop until a response message with the myseq
 
408
        sequence number is received, and will save other responses in
 
409
        self.responses and notify the owning thread.
 
410
 
 
411
        """
 
412
        while 1:
 
413
            # send queued response if there is one available
 
414
            try:
 
415
                qmsg = response_queue.get(0)
 
416
            except Queue.Empty:
 
417
                pass
 
418
            else:
 
419
                seq, response = qmsg
 
420
                message = (seq, ('OK', response))
 
421
                self.putmessage(message)
 
422
            # poll for message on link
 
423
            try:
 
424
                message = self.pollmessage(wait)
 
425
                if message is None:  # socket not ready
 
426
                    return None
 
427
            except EOFError:
 
428
                self.handle_EOF()
 
429
                return None
 
430
            except AttributeError:
 
431
                return None
 
432
            seq, resq = message
 
433
            how = resq[0]
 
434
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
 
435
            # process or queue a request
 
436
            if how in ("CALL", "QUEUE"):
 
437
                self.debug("pollresponse:%d:localcall:call:" % seq)
 
438
                response = self.localcall(seq, resq)
 
439
                self.debug("pollresponse:%d:localcall:response:%s"
 
440
                           % (seq, response))
 
441
                if how == "CALL":
 
442
                    self.putmessage((seq, response))
 
443
                elif how == "QUEUE":
 
444
                    # don't acknowledge the 'queue' request!
 
445
                    pass
 
446
                continue
 
447
            # return if completed message transaction
 
448
            elif seq == myseq:
 
449
                return resq
 
450
            # must be a response for a different thread:
 
451
            else:
 
452
                cv = self.cvars.get(seq, None)
 
453
                # response involving unknown sequence number is discarded,
 
454
                # probably intended for prior incarnation of server
 
455
                if cv is not None:
 
456
                    cv.acquire()
 
457
                    self.responses[seq] = resq
 
458
                    cv.notify()
 
459
                    cv.release()
 
460
                continue
 
461
 
 
462
    def handle_EOF(self):
 
463
        "action taken upon link being closed by peer"
 
464
        self.EOFhook()
 
465
        self.debug("handle_EOF")
 
466
        for key in self.cvars:
 
467
            cv = self.cvars[key]
 
468
            cv.acquire()
 
469
            self.responses[key] = ('EOF', None)
 
470
            cv.notify()
 
471
            cv.release()
 
472
        # call our (possibly overridden) exit function
 
473
        self.exithook()
 
474
 
 
475
    def EOFhook(self):
 
476
        "Classes using rpc client/server can override to augment EOF action"
 
477
        pass
 
478
 
 
479
#----------------- end class SocketIO --------------------
 
480
 
 
481
class RemoteObject(object):
 
482
    # Token mix-in class
 
483
    pass
 
484
 
 
485
def remoteref(obj):
 
486
    oid = id(obj)
 
487
    objecttable[oid] = obj
 
488
    return RemoteProxy(oid)
 
489
 
 
490
class RemoteProxy(object):
 
491
 
 
492
    def __init__(self, oid):
 
493
        self.oid = oid
 
494
 
 
495
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
 
496
 
 
497
    debugging = False
 
498
    location = "#S"  # Server
 
499
 
 
500
    def __init__(self, sock, addr, svr):
 
501
        svr.current_handler = self ## cgt xxx
 
502
        SocketIO.__init__(self, sock)
 
503
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
 
504
 
 
505
    def handle(self):
 
506
        "handle() method required by SocketServer"
 
507
        self.mainloop()
 
508
 
 
509
    def get_remote_proxy(self, oid):
 
510
        return RPCProxy(self, oid)
 
511
 
 
512
class RPCClient(SocketIO):
 
513
 
 
514
    debugging = False
 
515
    location = "#C"  # Client
 
516
 
 
517
    nextseq = 1 # Requests coming from the client are odd numbered
 
518
 
 
519
    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
 
520
        self.listening_sock = socket.socket(family, type)
 
521
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
 
522
                                       socket.SO_REUSEADDR, 1)
 
523
        self.listening_sock.bind(address)
 
524
        self.listening_sock.listen(1)
 
525
 
 
526
    def accept(self):
 
527
        working_sock, address = self.listening_sock.accept()
 
528
        if self.debugging:
 
529
            print>>sys.__stderr__, "****** Connection request from ", address
 
530
        if address[0] == LOCALHOST:
 
531
            SocketIO.__init__(self, working_sock)
 
532
        else:
 
533
            print>>sys.__stderr__, "** Invalid host: ", address
 
534
            raise socket.error
 
535
 
 
536
    def get_remote_proxy(self, oid):
 
537
        return RPCProxy(self, oid)
 
538
 
 
539
class RPCProxy(object):
 
540
 
 
541
    __methods = None
 
542
    __attributes = None
 
543
 
 
544
    def __init__(self, sockio, oid):
 
545
        self.sockio = sockio
 
546
        self.oid = oid
 
547
 
 
548
    def __getattr__(self, name):
 
549
        if self.__methods is None:
 
550
            self.__getmethods()
 
551
        if self.__methods.get(name):
 
552
            return MethodProxy(self.sockio, self.oid, name)
 
553
        if self.__attributes is None:
 
554
            self.__getattributes()
 
555
        if self.__attributes.has_key(name):
 
556
            value = self.sockio.remotecall(self.oid, '__getattribute__',
 
557
                                           (name,), {})
 
558
            return value
 
559
        else:
 
560
            raise AttributeError, name
 
561
 
 
562
    def __getattributes(self):
 
563
        self.__attributes = self.sockio.remotecall(self.oid,
 
564
                                                "__attributes__", (), {})
 
565
 
 
566
    def __getmethods(self):
 
567
        self.__methods = self.sockio.remotecall(self.oid,
 
568
                                                "__methods__", (), {})
 
569
 
 
570
def _getmethods(obj, methods):
 
571
    # Helper to get a list of methods from an object
 
572
    # Adds names to dictionary argument 'methods'
 
573
    for name in dir(obj):
 
574
        attr = getattr(obj, name)
 
575
        if callable(attr):
 
576
            methods[name] = 1
 
577
    if type(obj) == types.InstanceType:
 
578
        _getmethods(obj.__class__, methods)
 
579
    if type(obj) == types.ClassType:
 
580
        for super in obj.__bases__:
 
581
            _getmethods(super, methods)
 
582
 
 
583
def _getattributes(obj, attributes):
 
584
    for name in dir(obj):
 
585
        attr = getattr(obj, name)
 
586
        if not callable(attr):
 
587
            attributes[name] = 1
 
588
 
 
589
class MethodProxy(object):
 
590
 
 
591
    def __init__(self, sockio, oid, name):
 
592
        self.sockio = sockio
 
593
        self.oid = oid
 
594
        self.name = name
 
595
 
 
596
    def __call__(self, *args, **kwargs):
 
597
        value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
 
598
        return value
 
599
 
 
600
 
 
601
# XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
 
602
#                  existing test code was removed at Rev 1.27.