20
20
# along with this program; if not, write to the Free Software
21
21
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24
Protocol transport module.
26
This module contains classes used by the library to connect to the msn server
27
it includes, for example, the direct connection transport as well as an HTTP
28
polling transport ideal in firewalled environments."""
23
"""Network Transport Layer
25
This module provides an abstraction of the transport to be used to communicate
26
with the MSN servers, actually MSN servers can communicate either through
27
direct connection using TCP/1863 or using TCP/80 by tunnelling the protocol
28
inside HTTP POST requests.
30
The classes of this module are structured as follows:
31
G{classtree BaseTransport}"""
39
from consts import ServerType
41
logger = logging.getLogger('Connection')
40
__all__=['ServerType', 'DirectConnection']
42
logger = logging.getLogger('Transport')
44
class ServerType(object):
49
TransportError = gnet.IoError
43
51
class BaseTransport(gobject.GObject):
44
"""Abstract Base Class that models a connection to an MSN service"""
52
"""Abstract Base Class that models a connection to the MSN service; this
53
abstraction is used to build various transports that expose the same
54
interface, basically a transport is created using its constructor then it
55
simply emits signals when network events (or even abstracted network events)
56
occur, for example a Transport that successfully connected to the MSN
57
service will emit a connection-success signal, and when that transport
58
received a meaningful message it would emit a command-received signal.
60
@ivar server: the server being used to connect to
61
@type server: tuple(host, port)
63
@ivar server_type: the server that we are connecting to, either
64
Notification or switchboard.
65
@type server_type: L{ServerType}
67
@ivar proxies: proxies that we can use to connect
68
@type proxies: dict(type => L{gnet.proxy.ProxyInfos})
70
@ivar transaction_id: the current transaction ID
71
@type transaction_id: integer
74
@cvar connection-failure: signal emitted when the connection fails
75
@type connection-failure: ()
77
@cvar connection-success: signal emitted when the connection succeeds
78
@type connection-success: ()
80
@cvar connection-reset: signal emitted when the connection is being
82
@type connection-reset: ()
84
@cvar connection-lost: signal emitted when the connection was lost
85
@type connection-lost: ()
87
@cvar command-received: signal emitted when a command is received
88
@type command-received: FIXME
90
@cvar command-sent: signal emitted when a command was successfully
91
transmitted to the server
92
@type command-sent: FIXME
94
@undocumented: __gsignals__"""
47
97
"connection-failure" : (gobject.SIGNAL_RUN_FIRST,
51
101
"connection-success" : (gobject.SIGNAL_RUN_FIRST,
52
102
gobject.TYPE_NONE,
243
320
self._receiver.delimiter = "\r\n"
246
if cmd.name in structure.Command.PAYLOAD_COMMANDS:
247
payload_len = int(cmd.arguments[-1])
323
if cmd.name in msnp.Command.INCOMING_PAYLOAD or \
324
(cmd.is_error() and (cmd.arguments is not None) and len(cmd.arguments) > 0):
326
payload_len = int(cmd.arguments[-1])
248
329
if payload_len > 0:
249
330
self.__pending_chunk = chunk
250
331
self._receiver.delimiter = payload_len
253
333
logger.debug('<<< ' + repr(cmd))
254
self.emit("command-received", cmd)
334
if cmd.name == 'QNG':
335
self.__handle_ping_reply(cmd)
337
self.emit("command-received", cmd)
256
338
gobject.type_register(DirectConnection)
259
class HTTPPollTransportStatus(object):
260
"""Transport status."""
263
"""The transport is open."""
264
OPENING_REQUESTED = 1
265
"""The transport is being opened due to user request."""
267
"""The transport is being opened due to auto reconnection process."""
269
"""The transport lost its connection."""
271
"""The transport is being closed."""
273
"""The transport is closed."""
275
class HTTPPollTransactionStatus(object):
276
"""Data transfer status"""
280
IGNORING = 3 # to deal with HTTP/1.1 100 Continue
282
341
class HTTPPollConnection(BaseTransport):
283
"""Implements an http connection to the net"""
342
"""Implements an HTTP polling transport, basically it encapsulates the MSNP
343
commands into an HTTP request, and receives responses by polling a specific
287
345
def __init__(self, server, server_type=ServerType.NOTIFICATION, proxies={}):
288
self._target_host = server[0]
346
self._target_server = server
289
347
server = ("gateway.messenger.hotmail.com", 80)
290
348
BaseTransport.__init__(self, server, server_type, proxies)
292
if 'http' in self.proxies:
293
transport = gio.network.TCPClient(proxies['http'].host, proxies['http'].port)
295
transport = gio.network.TCPClient(server[0], server[1])
296
transport.connect("notify::status", self.__on_status_change)
297
transport.connect("error", lambda t, msg: self.emit("connection-failure"))
299
receiver = gio.ChunkReceiver(transport)
300
receiver.connect("received", self.__on_received)
302
self._receiver = receiver
303
self._receiver.delimiter = "\r\n"
304
self._transport = transport
349
self._setup_transport()
306
351
self._command_queue = []
308
self._transport_status = HTTPPollTransportStatus.CLOSED
309
self._transaction_status = HTTPPollTransactionStatus.NONE
310
self._session_open = False # used to send Action=open
312
self.__clear_response_handler()
315
self._disable_poll = True
317
__init__.__doc__ = BaseTransport.__init__.__doc__
352
self._waiting_for_response = False # are we waiting for a response
353
self._session_id = None
356
def _setup_transport(self):
358
proxies = self.proxies
359
if 'http' in proxies:
360
transport = gnet.protocol.HTTP(server[0], server[1], proxies['http'])
362
transport = gnet.protocol.HTTP(server[0], server[1])
363
transport.connect("response-received", self.__on_received)
364
transport.connect("request-sent", self.__on_sent)
365
transport.connect("error", self.__on_error)
366
self._transport = transport
320
368
def establish_connection(self):
321
369
logger.debug('<-> Connecting to %s:%d' % self.server)
322
self._transport.open()
370
self._polling_source_id = gobject.timeout_add(5000, self._poll)
371
self.emit("connection-success")
324
373
def lose_connection(self):
325
self._transport_status = HTTPPollTransportStatus.CLOSING
326
self._session_open = False
327
self._transaction_status = HTTPPollTransactionStatus.NONE
328
self._transport.close()
374
gobject.source_remove(self._polling_source_id)
375
del self._polling_source_id
377
self.emit("connection-lost", None)
330
def reset_connection(self, server=None): #TODO: HTTPPollTransportStatus.RESETING ?
380
def reset_connection(self, server=None):
332
self._transport.set_property("host", server[0])
333
self._transport.set_property("port", server[1])
335
self._transport.close()
336
self._session_open = False
337
self._transaction_status = HTTPPollTransactionStatus.NONE
338
self._transport.open()
340
def send_command(self, command, increment=True, callback=None, cb_args=()):
341
self.__queue_command(command, callback, cb_args)
382
self._target_server = server
383
self.emit("connection-reset")
385
def send_command(self, command, increment=True, callback=None, *cb_args):
386
self._command_queue.append((command, increment, callback, cb_args))
389
def _send_command(self):
390
if len(self._command_queue) == 0 or self._waiting_for_response:
392
command, increment = self._command_queue[0][0:2]
393
resource = "/gateway/gateway.dll"
396
"Accept-Language": "en-us",
397
#"User-Agent": "MSMSGS",
398
"Connection": "Keep-Alive",
399
"Pragma": "no-cache",
400
"Content-Type": "application/x-msn-messenger",
401
"Proxy-Connection": "Keep-Alive"
404
str_command = str(command)
405
if self._session_id is None:
406
resource += "?Action=open&Server=%s&IP=%s" % (self.server_type,
407
self._target_server[0])
408
elif command == None:# Polling the server for queued messages
409
resource += "?Action=poll&SessionID=%s" % self._session_id
412
resource += "?SessionID=%s" % self._session_id
414
self._transport.request(resource, headers, str_command, "POST")
415
self._waiting_for_response = True
417
if command is not None:
418
logger.debug('>>> ' + repr(command))
343
421
self._increment_transaction_id()
345
def __queue_command(self, command, callback, cb_args):
346
self._command_queue.append((command, callback, cb_args))
347
self.__process_command_queue()
349
def __pop_command(self):
350
return self._command_queue.pop(0)
424
if not self._waiting_for_response:
425
self.send_command(None)
352
def __send_command(self, command, callback=None, cb_args=()):
353
logger.debug('>>> ' + repr(command))
354
host = self.server[0]
355
strcmd = str(command)
357
if not self._session_open:
358
params = "Action=open&Server=%s&IP=%s" % (self.server_type, self._target_host)
359
self._session_open = True
360
elif command == None: # Polling the server for queued messages
361
assert(self._transaction_status == HTTPPollTransactionStatus.NONE)
362
params = "Action=poll&SessionID=%s" % self.session_id
365
assert(self._transaction_status == HTTPPollTransactionStatus.NONE)
366
params = "SessionID=%s" % self.session_id
368
action = "POST http://%s/gateway/gateway.dll?%s HTTP/1.1" % (host, params)
371
headers.append("Accept: */*")
372
headers.append("Accept-Language: en-us")
373
headers.append("User-Agent: MSMSGS")
374
headers.append("Host: " + host)
375
headers.append("Proxy-Connection: Keep-Alive")
376
headers.append("Connection: Keep-Alive")
377
headers.append("Pragma: no-cache")
378
headers.append("Content-Type: application/x-msn-messenger")
379
headers.append("Content-Length: %d" % len(strcmd))
380
if 'http' in self.proxies and self.proxies['http'].user:
381
auth = base64.encodestring(self.proxies['http'].user + ':' + self.proxies['http'].password)
382
headers.append("Proxy-authorization: Basic " + auth)
384
http_envelope = action + "\r\n" + "\r\n".join(headers) + "\r\n"
386
self._transaction_status = HTTPPollTransactionStatus.SENDING
387
command_sent_cb_args = (command, callback, cb_args)
388
self._transport.send(http_envelope + "\r\n" + strcmd,
389
self.__on_command_sent, command_sent_cb_args)
390
blob = http_envelope + "\r\n" + strcmd
392
#for line in blob.split('\r\n'):
393
# print '\t>>> ', line
394
#print '----------------------------------------------------------'
396
def __process_command_queue(self):
397
s = HTTPPollTransportStatus
398
if self._transport_status == s.OPEN:
399
if self._transaction_status != HTTPPollTransactionStatus.NONE:
402
if len(self._command_queue) > 0:
403
cmd = self.__pop_command()
404
self.__send_command(*cmd)
405
elif not self._disable_poll:
406
self.__send_command(None) # Poll (WARN: don't use self.__poll())
407
elif self._transport_status == s.CLOSED or \
408
self._transport_status == s.LOST:
409
self.establish_connection()
428
def __on_error(self, transport, reason):
430
self.emit("connection-lost", reason)
432
def __on_received(self, transport, http_response):
433
if http_response.status == 403:
434
self.emit("connection-lost", TransportError.PROXY_FORBIDDEN)
435
self.lose_connection()
436
if 'X-MSN-Messenger' in http_response.headers:
437
data = http_response.headers['X-MSN-Messenger'].split(";")
439
key, value = [p.strip() for p in elem.split('=', 1)]
440
if key == 'SessionID':
441
self._session_id = value
443
self.server = (value, self.server[1])
444
self._setup_transport()
445
elif key == 'Session'and value == 'close':
446
#self.lose_connection()
449
self._waiting_for_response = False
451
commands = http_response.body
452
while len(commands) != 0:
453
commands = self.__extract_command(commands)
457
def __on_sent(self, transport, http_request):
412
458
if len(self._command_queue) == 0:
413
self.__process_command_queue()
417
def __on_command_sent(self, command, user_callback, user_cb_args):
418
self.emit("command-sent", command)
420
user_callback(*user_cb_args)
422
def __on_status_change(self, transport, param):
423
status = transport.get_property("status")
424
s = HTTPPollTransportStatus
425
if status == gio.STATUS_OPEN:
426
if self._transport_status == s.OPENING_REQUESTED:
427
self._transport_status = s.OPEN
428
self.emit("connection-success")
429
self._poll_id = gobject.timeout_add(self.POLL_DELAY, self.__poll)
430
elif self._transport_status == s.OPENING_AUTO:
431
self._transport_status = s.OPEN
432
self.__process_command_queue()
434
elif status == gio.STATUS_OPENING:
435
if self._transport_status == s.CLOSED:
436
self._transport_status = s.OPENING_REQUESTED
437
elif self._transport_status == s.LOST:
438
self._transport_status = s.OPENING_AUTO
440
elif status == gio.STATUS_CLOSED:
441
if self._transport_status == s.OPEN:
442
self._transport_status = s.LOST
443
elif self._transport_status == s.CLOSING:
444
self._transport_status = s.CLOSED
445
self.emit("connection-lost")
446
self._disable_poll = True
447
gobject.source_remove(self._poll_id)
449
def __on_received(self, receiver, chunk):
450
#print '\t<<< ', chunk
451
if self._transaction_status == HTTPPollTransactionStatus.SENDING and \
452
chunk[:4] == 'HTTP': # HTTP status line
453
chunk = chunk.split()
454
if chunk[1] == '100':
455
self._transaction_status = HTTPPollTransactionStatus.IGNORING
456
elif chunk[1] == '200':
457
self._transaction_status = HTTPPollTransactionStatus.RECEIVING
459
self.emit("connection-failure")
460
self.lose_connection()
461
elif not self.http_body_pending: # headers
463
header, value = [p.strip() for p in chunk.split(':', 1)]
464
if header == 'Content-Length':
465
self.content_length = int(value)
466
elif header == 'X-MSN-Messenger':
467
for elem in value.split(';'):
468
key, val = [p.strip() for p in elem.split('=', 1)]
469
if key == 'SessionID':
470
self.session_id = val
472
self.server = (val, self.server[1])
473
#if 'http' not in self.proxies: # TODO: shall we reset the connection ?
474
# self._transport.set_property("host", val)
475
elif key == 'Session'and val == 'close':
476
self.emit("connection-lost")
477
self.lose_connection()
478
else: # empty line separating headers from body
479
if self._transaction_status == HTTPPollTransactionStatus.IGNORING:
480
self._transaction_status = HTTPPollTransactionStatus.SENDING
482
if self.content_length > 0:
483
self.http_body_pending = True
484
self._receiver.delimiter = self.content_length
486
self.__clear_response_handler()
489
if self.content_length == 0:
490
# The message was an empty response to a poll,
491
# there is nothing to retrieve from the server
493
elif len(self.data) != 0:
494
while len(self.data) != 0:
495
self.data = self.__extract_command(self.data)
497
# We got a complete response from the server, we can
498
# send the next command or reconnect to the server
500
self.__clear_response_handler()
501
self.__process_command_queue()
460
command, increment, callback, cb_args = self._command_queue.pop(0)
461
if command is not None:
464
self.emit("command-sent", command)
503
466
def __extract_command(self, data):
504
467
first, rest = data.split('\r\n', 1)
505
cmd = structure.Command()
506
469
cmd.parse(first.strip())
507
if cmd.name == 'USR' and cmd.arguments[0] == 'OK':
508
self._disable_poll = False
509
if cmd.name in structure.Command.PAYLOAD_COMMANDS:
510
payload_len = int(cmd.arguments[-1])
470
if cmd.name in msnp.Command.INCOMING_PAYLOAD or \
471
(cmd.is_error() and (cmd.arguments is not None) and len(cmd.arguments) > 0):
473
payload_len = int(cmd.arguments[-1])
511
476
if payload_len > 0:
512
477
cmd.payload = rest[:payload_len].strip()
513
478
logger.debug('<<< ' + repr(cmd))