# -*- coding: utf-8 -*-

# This file is part of emesene.
#
# Emesene is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# emesene is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with emesene; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import base64
import struct

# flag 0x10 set, file storage
import tempfile

# flag 0x10 not set, memory storage
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

import gobject

import emesenelib.common as common
import emesenelib.p2p.transfers as msn_p2p # TODO
import emesenelib.p2p.slp as msn_slp # TODO
class P2PUser(gobject.GObject):
    '''this class manages the creation of objects to handle the p2p
    messages and connect the necessary signals'''

    __gsignals__ = {
        # BinHeader, SLPMessage, message
        'invite-message-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # BinHeader, SLPMessage, message
        'msnp2p-message-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # BinHeader, file object holding the received data
        'msnp2p-file-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # when a receiver/sender/whatever is created
        'new-p2p-session': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, )),

        # this signal is emitted when the actual image is received
        'custom-emoticon-data-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        'display-picture-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        'wink-data-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, context, sender
        'file-transfer-invite': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, context, sender
        'file-transfer-accepted': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, context, sender
        'file-transfer-canceled': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, context, rc, sender
        'file-transfer-complete': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, ) * 4),

        'transfer-failed': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, ) * 2),

        # session id, data offset
        'transfer-progress': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, sender, producer
        'webcam-invite': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_BOOLEAN,
             gobject.TYPE_PYOBJECT)),

        # This signal is sent when the other user accepts webcam
        'webcam-ack': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT,)),

        'webcam-accepted': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        'webcam-canceled': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT,)),

        'webcam-failed': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        # session, sender, frame
        'webcam-frame-received': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT)),

        'webcam-frame-ready': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
            (gobject.TYPE_INT, gobject.TYPE_PYOBJECT, gobject.TYPE_INT, gobject.TYPE_INT)),
    }

    def __init__(self, manager, mail):
        '''Create the p2p state for the contact identified by mail.

        manager must expose a "msn" attribute; its msnObjectsManager,
        getSwitchboard() and on_*_received callbacks are used here and
        in the other methods of this class.'''
        gobject.GObject.__init__(self)

        self.manager = manager
        self.mail = str(mail)
        self.object_manager = manager.msn.msnObjectsManager
        self.last_requested_msnobj = None # to avoid duplicates

        # transports added by register(); get_transport() rotates them
        self.transports = []
        self._output_connected = False

        self.dchandler = None

        self.connect('display-picture-received',
            self.manager.msn.on_display_picture_received)
        self.connect('custom-emoticon-data-received',
            self.manager.msn.on_custom_emoticon_received)
        self.connect('wink-data-received',
            self.manager.msn.on_wink_received)

        # multipart messages that haven't been received/sent completely yet
        # file object could be a file or a StringIO
        self.incoming_pending_messages = {} # {identifier: file object}
        # {identifier: (header, data, footer, callback)}
        self.outgoing_pending_messages = {}
        self.outgoing_identifier_list = [] # this will evolve to priorities

        # per-user identifiers, let's see if this works
        self.my_identifier = msn_p2p.random_number(50000)
        #self.remote_identifier = 0 # i think we can avoid this better

    def __repr__(self):
        '''return a short description that includes the contact mail'''
        return "<P2P User: " + self.mail + ">"

    def _get_output_connected(self):
        '''getter of the output_connected property'''
        return self._output_connected

    def _set_output_connected(self, value):
        '''Notifies all registered transports of the value change'''
        transports = self.transports
        if not transports:
            # nothing registered yet: notify the switchboard instead
            transports = [self.manager.msn.getSwitchboard(self.mail)]

        for transport in transports:
            transport.p2p_set_output_connected(value)

        self._output_connected = value

    output_connected = property(fget=_get_output_connected,
        fset=_set_output_connected)

    # transport layer stuff

    def receive_message(self, body):
        '''called from the transport itself, body is the raw chunk,
        including headers, SLP, data, footer, etc

        Chunks of a multipart message are accumulated in
        incoming_pending_messages until the last one arrives; complete
        messages are acknowledged and emitted as 'msnp2p-file-received',
        'invite-message-received' or 'msnp2p-message-received'.'''

        bin_header = get_bin_header(body)

        # NOTE(review): the statement guarded by the initial
        # "flag == ACK_FLAG" test was lost. ACKs are assumed to flow
        # through, since the acknowledge step below explicitly skips
        # them - confirm against the upstream source.

        rc = None # if it's in a file

        if bin_header.total_data_size > bin_header.message_length and \
           bin_header.flag != BinHeader.RAK_FLAG: # don't parse these

            common.debug("partial message", "p2p")

            identifier = bin_header.identifier

            if bin_header.flag & BinHeader.EACH_FLAG:
                self.emit('transfer-progress', bin_header.session_id,
                    bin_header.data_offset)

            if identifier in self.incoming_pending_messages:
                fileobj = self.incoming_pending_messages[identifier]
                common.debug("follow up", "p2p")

                chunk = get_data_chunk(body, bin_header)
                fileobj.write(chunk)

                size = bin_header.data_offset + bin_header.message_length

                if size < bin_header.total_data_size:
                    # don't process, it's not ready yet
                    return

                common.debug("its complete!", "p2p")

                if bin_header.flag & BinHeader.STORAGE_FLAG:
                    rc = fileobj
                else:
                    # build a looong message. this is a bit ugly
                    bin_header.data_offset = 0
                    bin_header.message_length = bin_header.total_data_size
                    body = str(bin_header) + fileobj.getvalue() + '\x00' * 4

                del self.incoming_pending_messages[identifier]
            else:
                common.debug("first message", "p2p")

                chunk = get_data_chunk(body, bin_header)
                if bin_header.flag & BinHeader.STORAGE_FLAG:
                    newfile = tempfile.TemporaryFile()
                else:
                    newfile = StringIO()

                newfile.write(chunk)
                self.incoming_pending_messages[identifier] = newfile
                # wait for the remaining chunks
                return

        if bin_header.flag & BinHeader.STORAGE_FLAG and rc is None:
            # if the header says it's a file, it must be a file, even if
            # it is a single chunk. ok, yes, i know it's a hint, but
            # files and long invites can't be handled the same way
            rc = tempfile.TemporaryFile()
            rc.write(get_data_chunk(body, bin_header))

        if bin_header.flag != BinHeader.ACK_FLAG:
            common.debug("acking", 'p2p')
            self.send_acknowledge(bin_header)

        # session and flag 0 means slp
        if body and bin_header.session_id == 0 and (bin_header.flag & 0xff) == 0:
            try:
                slp = msn_slp.SLPMessage(body[48:-4])
            except msn_slp.SLPError:
                # since we are not 100% sure the condition is right
                # but we should reply with 500 internal server error
                slp = msn_slp.SLPMessage()
            #assert str(slp) == body[48:-4]
        else:
            slp = msn_slp.SLPMessage()

        if bin_header.flag & BinHeader.STORAGE_FLAG:
            common.debug(" ** FILE RECEIVED", 'p2p')

            # send the last progress
            if bin_header.flag & BinHeader.EACH_FLAG:
                self.emit('transfer-progress', bin_header.session_id,
                    bin_header.data_offset)

            # NOTE(review): rewinding is assumed so that handlers read the
            # data from the start - confirm against the upstream source
            rc.seek(0)
            self.emit('msnp2p-file-received', bin_header, rc)
        elif slp and slp.method.startswith('INVITE'):
            common.debug(" ** INVITE", 'p2p')
            self.emit('invite-message-received', bin_header, slp, body)
        else:
            common.debug("** MESSAGE", 'p2p')
            self.emit('msnp2p-message-received', bin_header, slp, body)

    def send_acknowledge(self, bin):
        '''Builds an acknowledge message for the header bin and sends it'''
        ack = BinHeader()
        ack.session_id = bin.session_id
        ack.identifier = self.next_id()
        ack.total_data_size = bin.total_data_size
        ack.flag = BinHeader.ACK_FLAG
        ack.acknowledged_identifier = bin.identifier
        ack.acknowledged_unique_id = bin.acknowledged_identifier
        ack.acknowledged_data_size = bin.total_data_size
        self.send_message(self, str(ack) + msn_p2p.Base.FOOTER)

    def next_id(self):
        '''increment the local identifier and return the new value'''
        self.my_identifier += 1
        return self.my_identifier

    def do_invite_message_received(self, bin_header, slp, message):
        '''method called when an invite message is received
        in the switchboard

        Session requests create the matching handler (file receiver,
        webcam handler or msnobj sender); transport requests/responses
        go to the direct connection handler.'''

        if slp.content_type == msn_slp.SLPMessage.CONTENT_SESSION_REQ:
            if 'Context' not in slp.body:
                common.debug('no context in p2p invite', 'p2p')
                return

            euf_guid = slp.body['EUF-GUID']

            if euf_guid == msn_slp.SLPMessage.EUFGUID_FILE:
                rawcontext = slp.body['Context']
                context = msn_p2p.FTContext(base64.b64decode(rawcontext))
                # create a ft receiver here
                msn_p2p.FTReceiver(self, context, slp, bin_header)

            elif euf_guid == msn_slp.SLPMessage.EUFGUID_WEBCAM:
                msn_p2p.WebcamHandler(False, False, self, slp)

            elif euf_guid == msn_slp.SLPMessage.EUFGUID_WEBCAM_ASK:
                msn_p2p.WebcamHandler(False, True, self, slp)

            else:
                data = self.get_data_by_context(slp.body['Context'])
                # NOTE(review): get_data_by_context returns None for an
                # unknown msnobj; skipping the Sender in that case is an
                # assumption - confirm against the upstream source
                if data is not None:
                    msn_p2p.Sender(self, bin_header, slp, data)

        elif slp.content_type in (msn_slp.SLPMessage.CONTENT_TRANS_REQ,
                                  msn_slp.SLPMessage.CONTENT_TRANS_RESP):
            # its a direct connect invite
            if not self.dchandler:
                self.dchandler = msn_p2p.DCHandler(self, bin_header, slp)
            else:
                self.dchandler.handle_invite(bin_header, slp)

        else:
            common.debug('invalid invite message', 'p2p')

    def get_data_by_context(self, context):
        '''receive a base64 encoded msnobj and try to
        return a file object opened for binary reading with the data;
        if the msnobj doesnt exist or its file cannot be opened,
        return None'''

        try:
            msnobj = self.object_manager.getByContext(context)
            return open(msnobj.filename, 'rb')
        except IOError:
            common.debug('invalid msnobj in get_data_by_context', 'p2p')
            return None
        except (TypeError, AttributeError):
            common.debug('invalid msnobj in get_data_by_context', 'p2p')
            return None

    def send_message(self, obj, message):
        '''method called when a p2p message is ready to be sent
        it will choose between switchboard or direct connection,
        and start those connections if needed

        Messages too long for a single transport message are queued
        through on_msnp2p_file_ready and sent in chunks.'''

        transport = self.get_transport()

        # 52 = 48 bytes of binary header + 4 bytes of footer
        if len(message) > transport.MESSAGE_LIMIT + 52:
            bin_header = get_bin_header(message)
            rc = StringIO(message[48:-4])
            footer = message[-4:]
            return self.on_msnp2p_file_ready(obj, None,
                rc, footer, None, bin_header)

        transport.p2p_send(message, self.mail)

    def on_msnp2p_message_ready(self, obj, slp_message, session_id):#XXX footer
        '''called when a p2p/slp message is ready to be sent'''

        body = str(slp_message)

        header = BinHeader()
        header.session_id = int(session_id)
        header.identifier = self.next_id()
        header.acknowledged_identifier = msn_p2p.random_number(50000)
        header.total_data_size = len(body)
        header.message_length = len(body)
        self.send_message(obj,
            str(header) + body + msn_p2p.Base.FOOTER)

    def on_msnp2p_file_ready(self, obj, flags, data, footer, callback,
                             baseheader=None):
        '''this method starts sending a file via p2p, abstracting most stuff
        like offsets, select() on sockets, etc

        data is a seekable file object; baseheader, when given, is reused
        as the header of every chunk, otherwise a new header is built from
        obj.session_id and flags.'''

        # go to the end, get the size, go back to the start
        # i tested it in a 134mb file after flushing disk cache, it's fast
        data.seek(0, 2)
        data_size = data.tell()
        data.seek(0)

        identifier = self.next_id()
        if baseheader is None:
            header = BinHeader()
            header.session_id = int(obj.session_id)
            header.identifier = identifier
            header.flag = flags
            header.acknowledged_identifier = msn_p2p.random_number(50000)
        else:
            header = baseheader
            header.identifier = identifier
        header.total_data_size = data_size

        # send_chunk() passes obj=None to send_message, so tolerate it here
        if obj is not None:
            obj.current_transfer = int(identifier)

        self.outgoing_pending_messages[identifier] = \
            (header, data, footer, callback)
        self.outgoing_identifier_list.append(identifier)

        # this allows the mainloop work while sending data
        self.output_connected = True

    def output_ready(self, transport):
        '''send one chunk of the next pending outgoing message,
        rotating between the pending identifiers'''
        if not self.outgoing_identifier_list:
            self.output_connected = False
            return

        identifier = self.outgoing_identifier_list.pop(0)
        self.outgoing_identifier_list.append(identifier)
        if not self.send_chunk(identifier):
            self.outgoing_identifier_list.remove(identifier)
            if not self.outgoing_identifier_list:
                self.output_connected = False

    def send_chunk(self, identifier):
        '''Send the next chunk of a pending message

        Return True if a chunk was sent, False when the message is
        unknown or has been sent completely.'''
        if identifier not in self.outgoing_pending_messages:
            return False

        transport = self.get_transport()

        header, data, footer, callback = \
            self.outgoing_pending_messages[identifier]

        header.data_offset = data.tell()
        chunk = data.read(transport.MESSAGE_LIMIT)
        header.message_length = len(chunk)
        #header.print_fields()
        self.emit("transfer-progress", header.session_id, header.data_offset)

        if chunk:
            self.send_message(None, str(header) + chunk + footer)
            return True

        # TODO: handle when we receive a NAK after this
        if callback:
            # NOTE(review): the original invocation was lost; both a plain
            # callable and a (func, arg, ...) sequence are accepted here -
            # confirm against the callers
            if callable(callback):
                callback()
            else:
                callback[0](*callback[1:])

        if identifier in self.outgoing_pending_messages:
            del self.outgoing_pending_messages[identifier]

        return False

    # transports management stuff

    def get_transport(self):
        '''Returns an usable transport from the list.
        That list rotates by default.
        If it's empty, a (new) switchboard is returned (which is added
        to the transport list when it's ready'''

        if self.transports:
            transport = self.transports.pop(0)
            self.transports.append(transport)
            return transport

        return self.manager.msn.getSwitchboard(self.mail)

    def register(self, transport):
        '''Adds a working transport to the list'''
        transport.p2p_set_output_connected(self.output_connected)
        self.transports.append(transport)

    def unregister(self, transport):
        '''Removes a transport from the list'''
        transport.p2p_set_output_connected(False)
        if transport in self.transports:
            self.transports.remove(transport)

    def start_new_conv(self):
        '''Tells the GUI that a new conv started
        A bit unstable, may open more than one window''' #FIXME, h4x
        switchboard = self.manager.msn.getSwitchboard(self.mail)
        self.manager.msn.emit('new-conversation', self.mail, switchboard)
class BinHeader(object):
    '''This class represents the bin header

    The header is 48 bytes long, laid out as described by FORMAT
    (little endian): session id, identifier, data offset, total data
    size, message length, flag, acknowledged identifier, acknowledged
    unique id and acknowledged data size.'''

    FORMAT = '<LLQQLLLLQ'

    NO_FLAG = 0x0 # no flags specified
    NAK_FLAG = 0x1 # negative acknowledge
    ACK_FLAG = 0x2 # acknowledge
    RAK_FLAG = 0x4 # request acknowledge
    BINERROR_FLAG = 0x8 # error on the binary level
    STORAGE_FLAG = 0x10 # if set it should be saved in a file
    EACH_FLAG = 0x20 # if we should emit a progress signal
    CANCEL_FLAG = 0x40 # cancel
    ERROR_FLAG = 0x80 # error
    FILE_FLAG = 0x1000000 # data of a file

    def __init__(self, data=None):
        '''Create a zeroed header; if data is given, parse it with fill()'''

        self.session_id = 0 # dw
        self.identifier = 0 # dw
        self.data_offset = 0 # qw
        self.total_data_size = 0 # qw
        self.message_length = 0 # dw
        self.flag = 0 # dw
        self.acknowledged_identifier = 0 # dw1
        self.acknowledged_unique_id = 0 # dw2
        self.acknowledged_data_size = 0 # qw1

        if data is not None:
            self.fill(data)

    def print_fields(self):
        '''print the binary fields'''

        # single-argument print(...) behaves the same on python 2 and 3
        print('SessionID: ' + str(self.session_id))
        print('Identifier: ' + str(self.identifier))
        print('Data offset: ' + str(self.data_offset))
        print('Total data size: ' + str(self.total_data_size))
        print('Message length: ' + str(self.message_length))
        print('Flag: ' + str(self.flag))
        print('Acknowledged identifier: ' + str(self.acknowledged_identifier))
        print('Acknowledged unique ID: ' + str(self.acknowledged_unique_id))
        print('Acknowledged data size: ' + str(self.acknowledged_data_size))

    def fill(self, data):
        '''Parse and save in attributes the contents of data

        Only the first 48 bytes are read; struct.error is raised if
        data is shorter than that.'''

        # from __future__ import braces
        (
            self.session_id,
            self.identifier,
            self.data_offset,
            self.total_data_size,
            self.message_length,
            self.flag,
            self.acknowledged_identifier,
            self.acknowledged_unique_id,
            self.acknowledged_data_size
        ) = struct.unpack(BinHeader.FORMAT, data[:48])

    def __str__(self):
        '''return the representation of this object

        This is the packed 48 byte header (a byte string).'''

        return struct.pack(BinHeader.FORMAT,
            self.session_id,
            self.identifier,
            self.data_offset,
            self.total_data_size,
            self.message_length,
            self.flag,
            self.acknowledged_identifier,
            self.acknowledged_unique_id,
            self.acknowledged_data_size)
def get_bin_header(message):
    '''receive a msnp2p message and return a BinHeader
    instance with the bin header of the message
    if you send crap, you will get a nice BinHeader full
    of zeros'''

    try:
        return BinHeader(message[:48])
    except (struct.error, TypeError):
        # shorter than 48 bytes (struct.error) or not sliceable at all
        # (TypeError): honour the documented contract
        return BinHeader()
def get_data_chunk(message, bin):
    '''return the data chunk in message, an empty string if
    something went bad

    The chunk is the bin.message_length bytes that follow the 48 byte
    binary header.'''

    try:
        return message[48:48 + bin.message_length]
    except TypeError:
        # message is not sliceable or the length is not an integer;
        # slicing past the end never raises, it just yields less data
        return ''
def compare_binheaders(b1, b2):
    '''print the fields of two bin headers side by side, one field
    per row, between two separator lines'''
    def compare_field(name, attr):
        '''print one row: the label and the value of attr in b1 and b2'''
        # single-argument print(...) behaves the same on python 2 and 3
        print('%-17s %10s %10s' % (name, getattr(b1, attr), getattr(b2, attr)))

    print('-------------------------------------------\n')
    compare_field('SessionID:', 'session_id')
    compare_field('Identifier:', 'identifier')
    compare_field('Data offset:', 'data_offset')
    compare_field('Total data size:', 'total_data_size')
    compare_field('Message length:', 'message_length')
    compare_field('Flag:', 'flag')
    compare_field('Ackd identifier:', 'acknowledged_identifier')
    compare_field('Ackd unique ID:', 'acknowledged_unique_id')
    compare_field('Ackd data size:', 'acknowledged_data_size')
    print('-------------------------------------------\n')