31
31
# this exception statement from your version. If you delete this exception
32
32
# statement from all source files in the program, then also delete it here.
40
from deluge.SimpleXMLRPCServer import SimpleXMLRPCServer
41
from deluge.SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
42
from SocketServer import ThreadingMixIn
43
from base64 import decodestring, encodestring
36
"""RPCServer Module"""
44
from twisted.internet.protocol import Factory, Protocol
45
from twisted.internet import ssl, reactor, defer
47
from OpenSSL import crypto, SSL
48
from types import FunctionType
50
import deluge.rencode as rencode
45
51
from deluge.log import LOG as log
46
53
import deluge.component as component
47
54
import deluge.configmanager
50
func._rpcserver_export = True
55
from deluge.core.authmanager import AUTH_LEVEL_NONE, AUTH_LEVEL_DEFAULT
61
def export(auth_level=AUTH_LEVEL_DEFAULT):
63
Decorator function to register an object's method as an RPC. The object
64
will need to be registered with an :class:`RPCServer` to be effective.
66
:param func: the function to export
68
:param auth_level: the auth level required to call this method
72
def wrap(func, *args, **kwargs):
73
func._rpcserver_export = True
74
func._rpcserver_auth_level = auth_level
76
func.__doc__ = "**RPC Exported Function** (*Auth Level: %s*)\n\n" % auth_level
82
if type(auth_level) is FunctionType:
84
auth_level = AUTH_LEVEL_DEFAULT
89
class DelugeError(Exception):
    """
    Base class for errors raised by exported RPC methods.

    The dispatcher does not log these, because they are meant for the client.
    """


class NotAuthorizedError(DelugeError):
    """
    Raised when a session calls a method whose required auth level is higher
    than the session's own auth level.
    """
95
class ServerContextFactory(object):
98
Create an SSL context.
100
This loads the servers cert/private key SSL files for use with the
103
ssl_dir = deluge.configmanager.get_config_dir("ssl")
104
ctx = SSL.Context(SSL.SSLv3_METHOD)
105
ctx.use_certificate_file(os.path.join(ssl_dir, "daemon.cert"))
106
ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey"))
109
class DelugeRPCProtocol(Protocol):
112
def dataReceived(self, data):
114
This method is called whenever data is received from a client. The
115
only message that a client sends to the server is a RPC Request message.
116
If the RPC Request message is valid, then the method is called in a thread
117
with :meth:`dispatch`.
119
:param data: the data from the client. It should be a zlib compressed
125
# We have some data from the last dataReceived() so let's prepend it
126
data = self.__buffer + data
130
dobj = zlib.decompressobj()
132
request = rencode.loads(dobj.decompress(data))
134
log.debug("Received possible invalid message (%r): %s", data, e)
135
# This could be cut-off data, so we'll save this in the buffer
136
# and try to prepend it on the next dataReceived()
140
data = dobj.unused_data
142
if type(request) is not tuple:
143
log.debug("Received invalid message: type is not tuple")
147
log.debug("Received invalid message: there are no items")
152
log.debug("Received invalid rpc request: number of items in request is %s", len(call))
155
# Format the RPCRequest message for debug printing
159
s += ", ".join([str(x) for x in call[2]])
163
s += ", ".join([key + "=" + str(value) for key, value in call[3].items()])
165
except UnicodeEncodeError:
167
#log.debug("RPCRequest had some non-ascii text..")
170
#log.debug("RPCRequest: %s", s)
172
reactor.callLater(0, self.dispatch, *call)
174
def sendData(self, data):
    """
    Sends the data to the client.

    The object is rencoded, zlib-compressed and written to the transport.

    :param data: the object that is to be sent to the client.  This should
        be one of the RPC message types.

    """
    payload = rencode.dumps(data)
    self.transport.write(zlib.compress(payload))
184
def connectionMade(self):
    """
    This method is called when a new client connects.

    Logs the peer address and records the session with the lowest auth
    level; the level is stored on the factory, keyed by the transport's
    session number.
    """
    peer = self.transport.getPeer()
    log.info("Deluge Client connection made from: %s:%s", peer.host, peer.port)
    # Set the initial auth level of this session to AUTH_LEVEL_NONE
    self.factory.authorized_sessions[self.transport.sessionno] = AUTH_LEVEL_NONE
193
def connectionLost(self, reason):
    """
    This method is called when the client is disconnected.

    :param reason: the reason the client disconnected; only ``reason.value``
        is read here, for the log message.

    """
    session_id = self.transport.sessionno

    # We need to remove this session from various dicts.  pop() with a
    # default is used so that a session missing from any of them does not
    # abort the cleanup of the remaining ones.
    for registry in (self.factory.authorized_sessions,
                     self.factory.session_protocols,
                     self.factory.interested_events):
        registry.pop(session_id, None)

    log.info("Deluge client disconnected: %s", reason.value)
211
def dispatch(self, request_id, method, args, kwargs):
213
This method is run when a RPC Request is made. It will run the local method
214
and will send either a RPC Response or RPC Error back to the client.
216
:param request_id: the request_id from the client (sent in the RPC Request)
217
:type request_id: int
218
:param method: the local method to call. It must be registered with
219
the :class:`RPCServer`.
221
:param args: the arguments to pass to `method`
223
:param kwargs: the keyword-arguments to pass to `method`
229
Sends an error response with the contents of the exception that was raised.
231
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
236
(exceptionType.__name__,
237
exceptionValue.args[0] if len(exceptionValue.args) == 1 else "",
238
"".join(traceback.format_tb(exceptionTraceback)))
241
if method == "daemon.login":
242
# This is a special case and used in the initial connection process
243
# We need to authenticate the user here
245
ret = component.get("AuthManager").authorize(*args, **kwargs)
247
self.factory.authorized_sessions[self.transport.sessionno] = ret
248
self.factory.session_protocols[self.transport.sessionno] = self
253
self.sendData((RPC_RESPONSE, request_id, (ret)))
255
self.transport.loseConnection()
258
elif method == "daemon.set_event_interest" and self.transport.sessionno in self.factory.authorized_sessions:
259
# This special case is to allow clients to set which events they are
260
# interested in receiving.
261
# We are expecting a sequence from the client.
263
if self.transport.sessionno not in self.factory.interested_events:
264
self.factory.interested_events[self.transport.sessionno] = []
265
self.factory.interested_events[self.transport.sessionno].extend(args[0])
269
self.sendData((RPC_RESPONSE, request_id, (True)))
273
if method in self.factory.methods and self.transport.sessionno in self.factory.authorized_sessions:
275
method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level
276
auth_level = self.factory.authorized_sessions[self.transport.sessionno]
277
if auth_level < method_auth_requirement:
278
# This session is not allowed to call this method
279
log.debug("Session %s is trying to call a method it is not authorized to call!", self.transport.sessionno)
280
raise NotAuthorizedError("Auth level too low: %s < %s" % (auth_level, method_auth_requirement))
281
ret = self.factory.methods[method](*args, **kwargs)
284
# Don't bother printing out DelugeErrors, because they are just for the client
285
if not isinstance(e, DelugeError):
286
log.exception("Exception calling RPC request: %s", e)
288
# Check if the return value is a deferred, since we'll need to
289
# wait for it to fire before sending the RPC_RESPONSE
290
if isinstance(ret, defer.Deferred):
291
def on_success(result):
292
self.sendData((RPC_RESPONSE, request_id, result))
295
def on_fail(failure):
297
failure.raiseException()
302
ret.addCallbacks(on_success, on_fail)
304
self.sendData((RPC_RESPONSE, request_id, ret))
53
306
class RPCServer(component.Component):
54
def __init__(self, port):
308
This class is used to handle rpc requests from the client. Objects are
309
registered with this class and their methods are exported using the export
312
:param port: the port the RPCServer will listen on
314
:param interface: the interface to listen on, this may override the `allow_remote` setting
316
:param allow_remote: set True if the server should allow remote connections
317
:type allow_remote: bool
318
:param listen: if False, will not start listening.. This is only useful in Classic Mode
322
def __init__(self, port=58846, interface="", allow_remote=False, listen=True):
55
323
component.Component.__init__(self, "RPCServer")
58
self.config = deluge.configmanager.ConfigManager("core.conf")
61
port = self.config["daemon_port"]
63
if self.config["allow_remote"]:
325
self.factory = Factory()
326
self.factory.protocol = DelugeRPCProtocol
327
# Holds the registered methods
328
self.factory.methods = {}
329
# Holds the session_ids and auth levels
330
self.factory.authorized_sessions = {}
331
# Holds the protocol objects with the session_id as key
332
self.factory.session_protocols = {}
333
# Holds the interested event list for the sessions
334
self.factory.interested_events = {}
66
342
hostname = "localhost"
68
# Setup the xmlrpc server
347
log.info("Starting DelugeRPC server %s:%s", hostname, port)
349
# Check for SSL keys and generate some if needed
70
log.info("Starting XMLRPC server %s:%s", hostname, port)
71
self.server = XMLRPCServer((hostname, port),
72
requestHandler=BasicAuthXMLRPCRequestHandler,
353
reactor.listenSSL(port, self.factory, ServerContextFactory(), interface=hostname)
75
354
except Exception, e:
76
355
log.info("Daemon already running or port not available..")
80
self.server.register_multicall_functions()
81
self.server.register_introspection_functions()
83
self.server.socket.setblocking(False)
85
gobject.io_add_watch(self.server.socket.fileno(), gobject.IO_IN | gobject.IO_OUT | gobject.IO_PRI | gobject.IO_ERR | gobject.IO_HUP, self._on_socket_activity)
87
def _on_socket_activity(self, source, condition):
88
"""This gets called when there is activity on the socket, ie, data to read
90
self.server.handle_request()
93
359
def register_object(self, obj, name=None):
361
Registers an object to export its rpc methods. These methods should
362
be exported with the export decorator prior to registering the object.
364
:param obj: the object that we want to export
366
:param name: the name to use, if None, it will be the class name of the object
95
name = obj.__class__.__name__
370
name = obj.__class__.__name__.lower()
97
372
for d in dir(obj):
100
375
if getattr(getattr(obj, d), '_rpcserver_export', False):
101
376
log.debug("Registering method: %s", name + "." + d)
102
self.server.register_function(getattr(obj, d), name + "." + d)
104
class XMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
    """Threaded XML-RPC server that remembers the address of the last client."""

    def get_request(self):
        """Get the request and client address from the socket.

        We override this so that we can get the ip address of the client.
        The first element of the accepted address is stored on
        ``self.client_address``.

        :returns: the ``(request, client_address)`` pair from ``accept()``
        """
        request, client_address = self.socket.accept()
        self.client_address = client_address[0]
        return (request, client_address)
113
class BasicAuthXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
115
if "authorization" in self.headers:
116
auth = self.headers['authorization']
117
auth = auth.replace("Basic ","")
118
decoded_auth = decodestring(auth)
119
# Check authentication here
120
if component.get("AuthManager").authorize(*decoded_auth.split(":")):
121
# User authorized, call the real do_POST now
122
return SimpleXMLRPCRequestHandler.do_POST(self)
124
# if cannot authenticate, end the connection
125
self.send_response(401)
377
self.factory.methods[name + "." + d] = getattr(obj, d)
379
def get_object_method(self, name):
    """
    Returns a registered method.

    :param name: the name of the method, usually in the form of 'object.method'

    :returns: the object stored under `name` in the factory's method table

    :raises KeyError: if `name` is not registered

    """
    return self.factory.methods[name]
393
def get_method_list(self):
    """
    Returns a list of the exported methods.

    :returns: the exported methods
    :rtype: list

    """
    # list() gives a real list of the names on both Python 2 and Python 3.
    return list(self.factory.methods)
402
def emit_event(self, event):
    """
    Emits the event to interested clients.

    :param event: the event to emit
    :type event: :class:`deluge.event.DelugeEvent`

    """
    log.debug("intevents: %s", self.factory.interested_events)
    # Find sessions interested in this event
    for session_id, interest in self.factory.interested_events.items():
        if event.name not in interest:
            continue

        # NOTE(review): the dispatcher appears to accept
        # "daemon.set_event_interest" from a session that has not logged in
        # yet, while the protocol object is only stored on login.  Skip such
        # sessions instead of raising KeyError and aborting delivery to the
        # remaining clients.
        protocol = self.factory.session_protocols.get(session_id)
        if protocol is None:
            continue

        log.debug("Emit Event: %s %s", event.name, event.args)
        # This session is interested so send a RPC_EVENT
        protocol.sendData((RPC_EVENT, event.name, event.args))
419
def check_ssl_keys():
421
Check for SSL cert/key and create them if necessary
423
ssl_dir = deluge.configmanager.get_config_dir("ssl")
424
if not os.path.exists(ssl_dir):
425
# The ssl folder doesn't exist so we need to create it
429
for f in ("daemon.pkey", "daemon.cert"):
430
if not os.path.exists(os.path.join(ssl_dir, f)):
434
def generate_ssl_keys():
436
This method generates a new SSL key/cert.
441
pkey.generate_key(crypto.TYPE_RSA, 1024)
443
# Generate cert request
444
req = crypto.X509Req()
445
subj = req.get_subject()
446
setattr(subj, "CN", "Deluge Daemon")
448
req.sign(pkey, digest)
450
# Generate certificate
452
cert.set_serial_number(0)
453
cert.gmtime_adj_notBefore(0)
454
cert.gmtime_adj_notAfter(60*60*24*365*5) # Five Years
455
cert.set_issuer(req.get_subject())
456
cert.set_subject(req.get_subject())
457
cert.set_pubkey(req.get_pubkey())
458
cert.sign(pkey, digest)
461
ssl_dir = deluge.configmanager.get_config_dir("ssl")
462
open(os.path.join(ssl_dir, "daemon.pkey"), "w").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
463
open(os.path.join(ssl_dir, "daemon.cert"), "w").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
464
# Make the files only readable by this user
465
for f in ("daemon.pkey", "daemon.cert"):
466
os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE)