22
22
from common.xmpp.idlequeue import IdleObject
23
23
from common.xmpp import dispatcher_nb, simplexml
24
from common.xmpp.client import *
24
from common.xmpp.plugin import *
25
25
from common.xmpp.simplexml import ustr
26
from common.xmpp.transports_nb import DATA_RECEIVED, DATA_SENT
26
27
from common.zeroconf import zeroconf
28
29
from common.xmpp.protocol import *
36
37
from common.zeroconf import roster_zeroconf
38
39
MAX_BUFF_LEN = 65536
39
DATA_RECEIVED = 'DATA RECEIVED'
40
DATA_SENT = 'DATA SENT'
41
40
TYPE_SERVER, TYPE_CLIENT = range(2)
43
42
# wait XX sec to establish a connection
58
57
self.caller = conn_holder.caller
59
58
self.conn_holder = conn_holder
62
flags = socket.AI_PASSIVE
63
if hasattr(socket, 'AI_ADDRCONFIG'):
61
flags = socket.AI_PASSIVE
62
if hasattr(socket, 'AI_ADDRCONFIG'):
64
63
flags |= socket.AI_ADDRCONFIG
65
64
ai = socket.getaddrinfo(None, self.port, socket.AF_UNSPEC,
66
65
socket.SOCK_STREAM, 0, flags)[0]
71
70
# will fail when port is busy, or we don't have rights to bind
73
72
self._serv.bind((ai[4][0], self.port))
75
74
# unable to bind, show error dialog
77
76
self._serv.listen(socket.SOMAXCONN)
88
87
''' accept a new incomming connection and notify queue'''
89
88
sock = self.accept_conn()
90
''' loop through roster to find who has connected to us'''
89
# loop through roster to find who has connected to us
92
91
ipaddr = sock[1][0]
93
92
for jid in self.conn_holder.getRoster().keys():
94
93
entry = self.conn_holder.getRoster().getItem(jid)
95
94
if (entry['address'] == ipaddr):
98
97
P2PClient(sock[0], ipaddr, sock[1][1], self.conn_holder, [], from_jid)
100
99
def disconnect(self, message=''):
121
120
on_ok=None, on_not_ok=None):
122
121
self._owner = self
123
122
self.Namespace = 'jabber:client'
123
self.protocol_type = 'XMPP'
124
124
self.defaultNamespace = self.Namespace
125
125
self._component = 0
126
126
self._registered_name = None
131
131
self.Server = host
132
132
self.on_ok = on_ok
133
133
self.on_not_ok = on_not_ok
135
134
self.Connection = None
137
debug = ['always', 'nodebuilder']
140
self._DEBUG = Debug.Debug(debug)
141
self.DEBUG = self._DEBUG.Show
142
self.debug_flags = self._DEBUG.debug_flags
143
self.debug_flags.append(self.DBG)
144
135
self.sock_hash = None
146
137
self.sock_type = TYPE_SERVER
166
157
on_not_ok('Connection to host could not be established.')
168
if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
169
self.conn_holder.number_of_awaiting_messages[self.fd] += 1
159
thread_id = stanza.getThread()
162
id_ = self.Dispatcher.getAnID()
163
if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
164
self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_,
171
self.conn_holder.number_of_awaiting_messages[self.fd] = 1
167
self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_,
170
self.on_responses = {}
173
172
def add_stanza(self, stanza, is_message=False):
174
173
if self.Connection:
179
178
self.stanzaqueue.append((stanza, is_message))
182
if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
183
self.conn_holder.number_of_awaiting_messages[self.fd] += 1
181
thread_id = stanza.getThread()
184
id_ = self.Dispatcher.getAnID()
185
if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
186
self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_,
185
self.conn_holder.number_of_awaiting_messages[self.fd] = 1
189
self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_,
189
194
def on_message_sent(self, connection_id):
190
self.conn_holder.number_of_awaiting_messages[connection_id] -= 1
196
self.conn_holder.ids_of_awaiting_messages[connection_id].pop(0)
199
# use on_ok only on first message. For others it's called in
192
203
def on_connect(self, conn):
193
204
self.Connection = conn
194
205
self.Connection.PlugIn(self)
195
206
dispatcher_nb.Dispatcher().PlugIn(self)
196
207
self._register_handlers()
200
209
def StreamInit(self):
201
210
''' Send an initial stream header. '''
203
212
self.Dispatcher.Stream._dispatch_depth = 2
204
213
self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch
205
214
self.Dispatcher.Stream.stream_header_received = self._check_stream_start
206
self.debug_flags.append(simplexml.DBG_NODEBUILDER)
207
self.Dispatcher.Stream.DEBUG = self.DEBUG
208
215
self.Dispatcher.Stream.features = None
209
216
if self.sock_type == TYPE_CLIENT:
210
217
self.send_stream_header()
221
228
self.Dispatcher._metastream)[:-2])
223
230
def _check_stream_start(self, ns, tag, attrs):
224
if ns<>NS_STREAMS or tag<>'stream':
225
self.Connection.DEBUG('Incorrect stream start: (%s,%s).Terminating! ' \
226
% (tag, ns), 'error')
231
if ns != NS_STREAMS or tag != 'stream':
232
log.error('Incorrect stream start: (%s,%s).Terminating!' % (tag, ns), 'error')
227
233
self.Connection.disconnect()
228
234
if self.on_not_ok:
229
235
self.on_not_ok('Connection to host could not be established: Incorrect answer from server.')
247
253
def on_disconnect(self):
248
254
if self.conn_holder:
249
if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
250
del self.conn_holder.number_of_awaiting_messages[self.fd]
255
if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
256
del self.conn_holder.ids_of_awaiting_messages[self.fd]
251
257
self.conn_holder.remove_connection(self.sock_hash)
252
258
if self.__dict__.has_key('Dispatcher'):
253
259
self.Dispatcher.PlugOut()
278
284
self.onreceive(None)
287
def remove_timeout(self):
281
290
def _register_handlers(self):
282
291
self.RegisterHandler('message', lambda conn, data:self._caller._messageCB(
283
292
self.Server, conn, data))
293
302
common.xmpp.NS_BYTESTREAM)
294
303
self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error',
295
304
common.xmpp.NS_BYTESTREAM)
305
self.RegisterHandler('iq', self._caller._DiscoverItemsGetCB, 'get',
306
common.xmpp.NS_DISCO_ITEMS)
297
308
class P2PConnection(IdleObject, PlugIn):
298
309
def __init__(self, sock_hash, _sock, host=None, port=None, caller=None,
342
352
self._sock = socket.socket(*ai[:3])
343
353
self._sock.setblocking(False)
344
354
self._server = ai[4]
346
356
if sys.exc_value[0] != errno.EINPROGRESS:
347
357
# for all errors, we try other addresses
348
358
self.connect_to_next_ip()
403
413
self._plug_idle()
405
415
def read_timeout(self):
406
if self.client.conn_holder.number_of_awaiting_messages.has_key(self.fd) \
407
and self.client.conn_holder.number_of_awaiting_messages[self.fd] > 0:
408
self.client._caller.dispatch('MSGERROR',[unicode(self.client.to), -1,
409
_('Connection to host could not be established: Timeout while sending data.'), None, None])
410
self.client.conn_holder.number_of_awaiting_messages[self.fd] = 0
416
ids = self.client.conn_holder.ids_of_awaiting_messages
417
if self.fd in ids and len(ids[self.fd]) > 0:
418
for (id_, thread_id) in ids[self.fd]:
419
self._owner.Dispatcher.Event('', DATA_ERROR, (self.client.to,
413
424
def do_connect(self):
473
484
if self._owner.sock_type == TYPE_CLIENT:
474
485
self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
475
486
if received.strip():
476
self.DEBUG(received, 'got')
487
log.debug('received: %s', received)
477
488
if hasattr(self._owner, 'Dispatcher'):
478
489
self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
479
490
self.on_receive(received)
481
492
# This should never happed, so we need the debug
482
self.DEBUG('Unhandled data received: %s' % received,'error')
493
log.error('Unhandled data received: %s' % received)
483
494
self.disconnect()
486
def onreceive(self, recv_handler):
488
if hasattr(self._owner, 'Dispatcher'):
489
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
491
self.on_receive = None
493
_tmp = self.on_receive
494
# make sure this cb is not overriden by recursive calls
495
if not recv_handler(None) and _tmp == self.on_receive:
496
self.on_receive = recv_handler
498
def disconnect(self):
497
def disconnect(self, message=''):
499
498
''' Closes the socket. '''
500
499
gajim.idlequeue.remove_timeout(self.fd)
501
500
gajim.idlequeue.unplug_idle(self.fd)
503
502
self._sock.shutdown(socket.SHUT_RDWR)
504
503
self._sock.close()
506
505
# socket is already closed
555
554
def _on_send(self):
556
555
if self.sent_data and self.sent_data.strip():
557
self.DEBUG(self.sent_data,'sent')
556
log.debug('sent: %s' % self.sent_data)
558
557
if hasattr(self._owner, 'Dispatcher'):
559
558
self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
560
559
self.sent_data = None
563
562
self.buff_is_message = False
565
564
def _on_send_failure(self):
566
self.DEBUG("Socket error while sending data",'error')
567
self._owner.disconnected()
565
log.error('Socket error while sending data')
566
self._owner.on_disconnect()
568
567
self.sent_data = None
570
569
class ClientZeroconf:
578
577
self.ip_to_hash = {}
579
578
self.hash_to_port = {}
580
579
self.listener = None
581
self.number_of_awaiting_messages = {}
580
self.ids_of_awaiting_messages = {}
583
582
def connect(self, show, msg):
584
583
self.port = self.start_listener(self.caller.port)
699
698
# look for hashed connections
700
699
if to in self.recipient_to_hash:
701
700
conn = self.connections[self.recipient_to_hash[to]]
702
if not stanza.getID():
703
id_ = conn.Dispatcher.getAnID()
701
id_ = stanza.getID() or ''
705
702
if conn.add_stanza(stanza, is_message):
710
707
if item['address'] in self.ip_to_hash:
711
hash = self.ip_to_hash[item['address']]
712
if self.hash_to_port[hash] == item['port']:
713
conn = self.connections[hash]
714
if not stanza.getID():
715
id_ = conn.Dispatcher.getAnID()
708
hash_ = self.ip_to_hash[item['address']]
709
if self.hash_to_port[hash_] == item['port']:
710
conn = self.connections[hash_]
711
id_ = stanza.getID() or ''
717
712
if conn.add_stanza(stanza, is_message):
722
717
# otherwise open new connection
723
718
if not stanza.getID():
725
720
P2PClient(None, item['address'], item['port'], self,
726
721
[(stanza, is_message)], to, on_ok=on_ok, on_not_ok=on_not_ok)
723
def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
725
Send stanza and wait for recipient's response to it. Will call transports
726
on_timeout callback if response is not retrieved in time.
728
Be aware: Only timeout of latest call of SendAndWait is active.
730
# if timeout is None:
731
# timeout = DEFAULT_TIMEOUT_SECONDS
734
# self._owner.set_timeout(timeout)
737
if to in self.recipient_to_hash:
738
conn = self.connections[self.recipient_to_hash[to]]
739
elif item['address'] in self.ip_to_hash:
740
hash_ = self.ip_to_hash[item['address']]
741
if self.hash_to_port[hash_] == item['port']:
742
conn = self.connections[hash_]
744
conn.Dispatcher.on_responses[_waitid] = (func, args)
745
conn.onreceive(conn.Dispatcher._WaitForData)
746
conn.Dispatcher._expected[_waitid] = None
747
self.send(stanza, on_ok=on_ok)
749
def SendAndCallForResponse(self, stanza, func=None, args=None):
750
''' Put stanza on the wire and call back when recipient replies.
751
Additional callback arguments can be specified in args. '''
752
self.SendAndWaitForResponse(stanza, 0, func, args)