~cmiller/ubuntu/quantal/deluge/fix-parameter-move-storage

« back to all changes in this revision

Viewing changes to deluge/core/rpcserver.py

  • Committer: Bazaar Package Importer
  • Author(s): Cristian Greco
  • Date: 2009-11-13 02:39:45 UTC
  • mfrom: (4.1.7 squeeze)
  • Revision ID: james.westby@ubuntu.com-20091113023945-te1bybo2912ejzuc
Tags: 1.2.0~rc3-4
* debian/control: bump build-dep on python-setuptools to (>= 0.6c9).
* debian/patches:
  - 25_r5921_fastresume_files.patch
    new, should fix problems with fresh configs;
  - 30_r5931_ipc_lockfile.patch:
    new, should fix an issue where Deluge will fail to start if there is a
    stale ipc lockfile. (Closes: #555849)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#
2
2
# rpcserver.py
3
3
#
4
 
# Copyright (C) 2008 Andrew Resch <andrewresch@gmail.com>
 
4
# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com>
5
5
#
6
6
# Deluge is free software.
7
7
#
31
31
#    this exception statement from your version. If you delete this exception
32
32
#    statement from all source files in the program, then also delete it here.
33
33
#
34
 
 
35
34
#
36
35
 
37
 
 
38
 
import gobject
39
 
 
40
 
from deluge.SimpleXMLRPCServer import SimpleXMLRPCServer
41
 
from deluge.SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
42
 
from SocketServer import ThreadingMixIn
43
 
from base64 import decodestring, encodestring
44
 
 
 
36
"""RPCServer Module"""
 
37
 
 
38
import sys
 
39
import zlib
 
40
import os
 
41
import stat
 
42
import traceback
 
43
 
 
44
from twisted.internet.protocol import Factory, Protocol
 
45
from twisted.internet import ssl, reactor, defer
 
46
 
 
47
from OpenSSL import crypto, SSL
 
48
from types import FunctionType
 
49
 
 
50
import deluge.rencode as rencode
45
51
from deluge.log import LOG as log
 
52
 
46
53
import deluge.component as component
47
54
import deluge.configmanager
48
 
 
49
 
def export(func):
50
 
    func._rpcserver_export = True
51
 
    return func
 
55
from deluge.core.authmanager import AUTH_LEVEL_NONE, AUTH_LEVEL_DEFAULT
 
56
 
 
57
RPC_RESPONSE = 1
 
58
RPC_ERROR = 2
 
59
RPC_EVENT = 3
 
60
 
 
61
def export(auth_level=AUTH_LEVEL_DEFAULT):
 
62
    """
 
63
    Decorator function to register an object's method as an RPC.  The object
 
64
    will need to be registered with an :class:`RPCServer` to be effective.
 
65
 
 
66
    :param func: the function to export
 
67
    :type func: function
 
68
    :param auth_level: the auth level required to call this method
 
69
    :type auth_level: int
 
70
 
 
71
    """
 
72
    def wrap(func, *args, **kwargs):
 
73
        func._rpcserver_export = True
 
74
        func._rpcserver_auth_level = auth_level
 
75
        doc = func.__doc__
 
76
        func.__doc__ = "**RPC Exported Function** (*Auth Level: %s*)\n\n" % auth_level
 
77
        if doc:
 
78
            func.__doc__ += doc
 
79
 
 
80
        return func
 
81
 
 
82
    if type(auth_level) is FunctionType:
 
83
        func = auth_level
 
84
        auth_level = AUTH_LEVEL_DEFAULT
 
85
        return wrap(func)
 
86
    else:
 
87
        return wrap
 
88
 
 
89
class DelugeError(Exception):
 
90
    pass
 
91
 
 
92
class NotAuthorizedError(DelugeError):
 
93
    pass
 
94
 
 
95
class ServerContextFactory(object):
 
96
    def getContext(self):
 
97
        """
 
98
        Create an SSL context.
 
99
 
 
100
        This loads the servers cert/private key SSL files for use with the
 
101
        SSL transport.
 
102
        """
 
103
        ssl_dir = deluge.configmanager.get_config_dir("ssl")
 
104
        ctx = SSL.Context(SSL.SSLv3_METHOD)
 
105
        ctx.use_certificate_file(os.path.join(ssl_dir, "daemon.cert"))
 
106
        ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey"))
 
107
        return ctx
 
108
 
 
109
class DelugeRPCProtocol(Protocol):
 
110
    __buffer = None
 
111
 
 
112
    def dataReceived(self, data):
 
113
        """
 
114
        This method is called whenever data is received from a client.  The
 
115
        only message that a client sends to the server is a RPC Request message.
 
116
        If the RPC Request message is valid, then the method is called in a thread
 
117
        with :meth:`dispatch`.
 
118
 
 
119
        :param data: the data from the client. It should be a zlib compressed
 
120
            rencoded string.
 
121
        :type data: str
 
122
 
 
123
        """
 
124
        if self.__buffer:
 
125
            # We have some data from the last dataReceived() so lets prepend it
 
126
            data = self.__buffer + data
 
127
            self.__buffer = None
 
128
 
 
129
        while data:
 
130
            dobj = zlib.decompressobj()
 
131
            try:
 
132
                request = rencode.loads(dobj.decompress(data))
 
133
            except Exception, e:
 
134
                log.debug("Received possible invalid message (%r): %s", data, e)
 
135
                # This could be cut-off data, so we'll save this in the buffer
 
136
                # and try to prepend it on the next dataReceived()
 
137
                self.__buffer = data
 
138
                return
 
139
            else:
 
140
                data = dobj.unused_data
 
141
 
 
142
            if type(request) is not tuple:
 
143
                log.debug("Received invalid message: type is not tuple")
 
144
                return
 
145
 
 
146
            if len(request) < 1:
 
147
                log.debug("Received invalid message: there are no items")
 
148
                return
 
149
 
 
150
            for call in request:
 
151
                if len(call) != 4:
 
152
                    log.debug("Received invalid rpc request: number of items in request is %s", len(call))
 
153
                    continue
 
154
 
 
155
                # Format the RPCRequest message for debug printing
 
156
                try:
 
157
                    s = call[1] + "("
 
158
                    if call[2]:
 
159
                        s += ", ".join([str(x) for x in call[2]])
 
160
                    if call[3]:
 
161
                        if call[2]:
 
162
                            s += ", "
 
163
                        s += ", ".join([key + "=" + str(value) for key, value in call[3].items()])
 
164
                    s += ")"
 
165
                except UnicodeEncodeError:
 
166
                    pass
 
167
                    #log.debug("RPCRequest had some non-ascii text..")
 
168
                else:
 
169
                    pass
 
170
                    #log.debug("RPCRequest: %s", s)
 
171
 
 
172
                reactor.callLater(0, self.dispatch, *call)
 
173
 
 
174
    def sendData(self, data):
 
175
        """
 
176
        Sends the data to the client.
 
177
 
 
178
        :param data: the object that is to be sent to the client.  This should
 
179
            be one of the RPC message types.
 
180
 
 
181
        """
 
182
        self.transport.write(zlib.compress(rencode.dumps(data)))
 
183
 
 
184
    def connectionMade(self):
 
185
        """
 
186
        This method is called when a new client connects.
 
187
        """
 
188
        peer = self.transport.getPeer()
 
189
        log.info("Deluge Client connection made from: %s:%s", peer.host, peer.port)
 
190
        # Set the initial auth level of this session to AUTH_LEVEL_NONE
 
191
        self.factory.authorized_sessions[self.transport.sessionno] = AUTH_LEVEL_NONE
 
192
 
 
193
    def connectionLost(self, reason):
 
194
        """
 
195
        This method is called when the client is disconnected.
 
196
 
 
197
        :param reason: the reason the client disconnected.
 
198
        :type reason: str
 
199
 
 
200
        """
 
201
 
 
202
        # We need to remove this session from various dicts
 
203
        del self.factory.authorized_sessions[self.transport.sessionno]
 
204
        if self.transport.sessionno in self.factory.session_protocols:
 
205
            del self.factory.session_protocols[self.transport.sessionno]
 
206
        if self.transport.sessionno in self.factory.interested_events:
 
207
            del self.factory.interested_events[self.transport.sessionno]
 
208
 
 
209
        log.info("Deluge client disconnected: %s", reason.value)
 
210
 
 
211
    def dispatch(self, request_id, method, args, kwargs):
 
212
        """
 
213
        This method is run when a RPC Request is made.  It will run the local method
 
214
        and will send either a RPC Response or RPC Error back to the client.
 
215
 
 
216
        :param request_id: the request_id from the client (sent in the RPC Request)
 
217
        :type request_id: int
 
218
        :param method: the local method to call. It must be registered with
 
219
            the :class:`RPCServer`.
 
220
        :type method: str
 
221
        :param args: the arguments to pass to `method`
 
222
        :type args: list
 
223
        :param kwargs: the keyword-arguments to pass to `method`
 
224
        :type kwargs: dict
 
225
 
 
226
        """
 
227
        def sendError():
 
228
            """
 
229
            Sends an error response with the contents of the exception that was raised.
 
230
            """
 
231
            exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
 
232
 
 
233
            self.sendData((
 
234
                RPC_ERROR,
 
235
                request_id,
 
236
                (exceptionType.__name__,
 
237
                exceptionValue.args[0] if len(exceptionValue.args) == 1 else "",
 
238
                "".join(traceback.format_tb(exceptionTraceback)))
 
239
            ))
 
240
 
 
241
        if method == "daemon.login":
 
242
            # This is a special case and used in the initial connection process
 
243
            # We need to authenticate the user here
 
244
            try:
 
245
                ret = component.get("AuthManager").authorize(*args, **kwargs)
 
246
                if ret:
 
247
                    self.factory.authorized_sessions[self.transport.sessionno] = ret
 
248
                    self.factory.session_protocols[self.transport.sessionno] = self
 
249
            except Exception, e:
 
250
                sendError()
 
251
                log.exception(e)
 
252
            else:
 
253
                self.sendData((RPC_RESPONSE, request_id, (ret)))
 
254
                if not ret:
 
255
                    self.transport.loseConnection()
 
256
            finally:
 
257
                return
 
258
        elif method == "daemon.set_event_interest" and self.transport.sessionno in self.factory.authorized_sessions:
 
259
            # This special case is to allow clients to set which events they are
 
260
            # interested in receiving.
 
261
            # We are expecting a sequence from the client.
 
262
            try:
 
263
                if self.transport.sessionno not in self.factory.interested_events:
 
264
                    self.factory.interested_events[self.transport.sessionno] = []
 
265
                self.factory.interested_events[self.transport.sessionno].extend(args[0])
 
266
            except Exception, e:
 
267
                sendError()
 
268
            else:
 
269
                self.sendData((RPC_RESPONSE, request_id, (True)))
 
270
            finally:
 
271
                return
 
272
 
 
273
        if method in self.factory.methods and self.transport.sessionno in self.factory.authorized_sessions:
 
274
            try:
 
275
                method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level
 
276
                auth_level = self.factory.authorized_sessions[self.transport.sessionno]
 
277
                if auth_level < method_auth_requirement:
 
278
                    # This session is not allowed to call this method
 
279
                    log.debug("Session %s is trying to call a method it is not authorized to call!", self.transport.sessionno)
 
280
                    raise NotAuthorizedError("Auth level too low: %s < %s" % (auth_level, method_auth_requirement))
 
281
                ret = self.factory.methods[method](*args, **kwargs)
 
282
            except Exception, e:
 
283
                sendError()
 
284
                # Don't bother printing out DelugeErrors, because they are just for the client
 
285
                if not isinstance(e, DelugeError):
 
286
                    log.exception("Exception calling RPC request: %s", e)
 
287
            else:
 
288
                # Check if the return value is a deferred, since we'll need to
 
289
                # wait for it to fire before sending the RPC_RESPONSE
 
290
                if isinstance(ret, defer.Deferred):
 
291
                    def on_success(result):
 
292
                        self.sendData((RPC_RESPONSE, request_id, result))
 
293
                        return result
 
294
 
 
295
                    def on_fail(failure):
 
296
                        try:
 
297
                            failure.raiseException()
 
298
                        except Exception, e:
 
299
                            sendError()
 
300
                        return failure
 
301
 
 
302
                    ret.addCallbacks(on_success, on_fail)
 
303
                else:
 
304
                    self.sendData((RPC_RESPONSE, request_id, ret))
52
305
 
53
306
class RPCServer(component.Component):
54
 
    def __init__(self, port):
 
307
    """
 
308
    This class is used to handle rpc requests from the client.  Objects are
 
309
    registered with this class and their methods are exported using the export
 
310
    decorator.
 
311
 
 
312
    :param port: the port the RPCServer will listen on
 
313
    :type port: int
 
314
    :param interface: the interface to listen on, this may override the `allow_remote` setting
 
315
    :type interface: str
 
316
    :param allow_remote: set True if the server should allow remote connections
 
317
    :type allow_remote: bool
 
318
    :param listen: if False, will not start listening.. This is only useful in Classic Mode
 
319
    :type listen: bool
 
320
    """
 
321
 
 
322
    def __init__(self, port=58846, interface="", allow_remote=False, listen=True):
55
323
        component.Component.__init__(self, "RPCServer")
56
324
 
57
 
        # Get config
58
 
        self.config = deluge.configmanager.ConfigManager("core.conf")
59
 
 
60
 
        if port == None:
61
 
            port = self.config["daemon_port"]
62
 
 
63
 
        if self.config["allow_remote"]:
 
325
        self.factory = Factory()
 
326
        self.factory.protocol = DelugeRPCProtocol
 
327
        # Holds the registered methods
 
328
        self.factory.methods = {}
 
329
        # Holds the session_ids and auth levels
 
330
        self.factory.authorized_sessions = {}
 
331
        # Holds the protocol objects with the session_id as key
 
332
        self.factory.session_protocols = {}
 
333
        # Holds the interested event list for the sessions
 
334
        self.factory.interested_events = {}
 
335
 
 
336
        if not listen:
 
337
            return
 
338
 
 
339
        if allow_remote:
64
340
            hostname = ""
65
341
        else:
66
342
            hostname = "localhost"
67
343
 
68
 
        # Setup the xmlrpc server
 
344
        if interface:
 
345
            hostname = interface
 
346
 
 
347
        log.info("Starting DelugeRPC server %s:%s", hostname, port)
 
348
 
 
349
        # Check for SSL keys and generate some if needed
 
350
        check_ssl_keys()
 
351
 
69
352
        try:
70
 
            log.info("Starting XMLRPC server %s:%s", hostname, port)
71
 
            self.server = XMLRPCServer((hostname, port),
72
 
                requestHandler=BasicAuthXMLRPCRequestHandler,
73
 
                logRequests=False,
74
 
                allow_none=True)
 
353
            reactor.listenSSL(port, self.factory, ServerContextFactory(), interface=hostname)
75
354
        except Exception, e:
76
355
            log.info("Daemon already running or port not available..")
77
356
            log.error(e)
78
357
            sys.exit(0)
79
358
 
80
 
        self.server.register_multicall_functions()
81
 
        self.server.register_introspection_functions()
82
 
 
83
 
        self.server.socket.setblocking(False)
84
 
 
85
 
        gobject.io_add_watch(self.server.socket.fileno(), gobject.IO_IN | gobject.IO_OUT | gobject.IO_PRI | gobject.IO_ERR | gobject.IO_HUP, self._on_socket_activity)
86
 
 
87
 
    def _on_socket_activity(self, source, condition):
88
 
        """This gets called when there is activity on the socket, ie, data to read
89
 
        or to write."""
90
 
        self.server.handle_request()
91
 
        return True
92
 
 
93
359
    def register_object(self, obj, name=None):
 
360
        """
 
361
        Registers an object to export it's rpc methods.  These methods should
 
362
        be exported with the export decorator prior to registering the object.
 
363
 
 
364
        :param obj: the object that we want to export
 
365
        :type obj: object
 
366
        :param name: the name to use, if None, it will be the class name of the object
 
367
        :type name: str
 
368
        """
94
369
        if not name:
95
 
            name = obj.__class__.__name__
 
370
            name = obj.__class__.__name__.lower()
96
371
 
97
372
        for d in dir(obj):
98
373
            if d[0] == "_":
99
374
                continue
100
375
            if getattr(getattr(obj, d), '_rpcserver_export', False):
101
376
                log.debug("Registering method: %s", name + "." + d)
102
 
                self.server.register_function(getattr(obj, d), name + "." + d)
103
 
 
104
 
class XMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
105
 
    def get_request(self):
106
 
        """Get the request and client address from the socket.
107
 
            We override this so that we can get the ip address of the client.
108
 
        """
109
 
        request, client_address = self.socket.accept()
110
 
        self.client_address = client_address[0]
111
 
        return (request, client_address)
112
 
 
113
 
class BasicAuthXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
114
 
    def do_POST(self):
115
 
        if "authorization" in self.headers:
116
 
            auth = self.headers['authorization']
117
 
            auth = auth.replace("Basic ","")
118
 
            decoded_auth = decodestring(auth)
119
 
            # Check authentication here
120
 
            if component.get("AuthManager").authorize(*decoded_auth.split(":")):
121
 
                # User authorized, call the real do_POST now
122
 
                return SimpleXMLRPCRequestHandler.do_POST(self)
123
 
 
124
 
        # if cannot authenticate, end the connection
125
 
        self.send_response(401)
126
 
        self.end_headers()
 
377
                self.factory.methods[name + "." + d] = getattr(obj, d)
 
378
 
 
379
    def get_object_method(self, name):
 
380
        """
 
381
        Returns a registered method.
 
382
 
 
383
        :param name: the name of the method, usually in the form of 'object.method'
 
384
        :type name: str
 
385
 
 
386
        :returns: method
 
387
 
 
388
        :raises KeyError: if `name` is not registered
 
389
 
 
390
        """
 
391
        return self.factory.methods[name]
 
392
 
 
393
    def get_method_list(self):
 
394
        """
 
395
        Returns a list of the exported methods.
 
396
 
 
397
        :returns: the exported methods
 
398
        :rtype: list
 
399
        """
 
400
        return self.factory.methods.keys()
 
401
 
 
402
    def emit_event(self, event):
 
403
        """
 
404
        Emits the event to interested clients.
 
405
 
 
406
        :param event: the event to emit
 
407
        :type event: :class:`deluge.event.DelugeEvent`
 
408
        """
 
409
        log.debug("intevents: %s", self.factory.interested_events)
 
410
        # Find sessions interested in this event
 
411
        for session_id, interest in self.factory.interested_events.iteritems():
 
412
            if event.name in interest:
 
413
                log.debug("Emit Event: %s %s", event.name, event.args)
 
414
                # This session is interested so send a RPC_EVENT
 
415
                self.factory.session_protocols[session_id].sendData(
 
416
                    (RPC_EVENT, event.name, event.args)
 
417
                )
 
418
 
 
419
def check_ssl_keys():
 
420
    """
 
421
    Check for SSL cert/key and create them if necessary
 
422
    """
 
423
    ssl_dir = deluge.configmanager.get_config_dir("ssl")
 
424
    if not os.path.exists(ssl_dir):
 
425
        # The ssl folder doesn't exist so we need to create it
 
426
        os.makedirs(ssl_dir)
 
427
        generate_ssl_keys()
 
428
    else:
 
429
        for f in ("daemon.pkey", "daemon.cert"):
 
430
            if not os.path.exists(os.path.join(ssl_dir, f)):
 
431
                generate_ssl_keys()
 
432
                break
 
433
 
 
434
def generate_ssl_keys():
 
435
    """
 
436
    This method generates a new SSL key/cert.
 
437
    """
 
438
    digest = "md5"
 
439
    # Generate key pair
 
440
    pkey = crypto.PKey()
 
441
    pkey.generate_key(crypto.TYPE_RSA, 1024)
 
442
 
 
443
    # Generate cert request
 
444
    req = crypto.X509Req()
 
445
    subj = req.get_subject()
 
446
    setattr(subj, "CN", "Deluge Daemon")
 
447
    req.set_pubkey(pkey)
 
448
    req.sign(pkey, digest)
 
449
 
 
450
    # Generate certificate
 
451
    cert = crypto.X509()
 
452
    cert.set_serial_number(0)
 
453
    cert.gmtime_adj_notBefore(0)
 
454
    cert.gmtime_adj_notAfter(60*60*24*365*5) # Five Years
 
455
    cert.set_issuer(req.get_subject())
 
456
    cert.set_subject(req.get_subject())
 
457
    cert.set_pubkey(req.get_pubkey())
 
458
    cert.sign(pkey, digest)
 
459
 
 
460
    # Write out files
 
461
    ssl_dir = deluge.configmanager.get_config_dir("ssl")
 
462
    open(os.path.join(ssl_dir, "daemon.pkey"), "w").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
 
463
    open(os.path.join(ssl_dir, "daemon.cert"), "w").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
 
464
    # Make the files only readable by this user
 
465
    for f in ("daemon.pkey", "daemon.cert"):
 
466
        os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE)