~ubuntu-branches/ubuntu/lucid/gajim/lucid-security

« back to all changes in this revision

Viewing changes to src/common/zeroconf/client_zeroconf.py

  • Committer: Maia Kozheva
  • Date: 2009-11-25 08:32:36 UTC
  • mfrom: (1.1.16 upstream)
  • Revision ID: sikon@maia-desktop-20091125083236-hkxrujhn3amehuve
Merged new upstream release 0.13

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
##      common/zeroconf/client_zeroconf.py
 
1
##      common/zeroconf/client_zeroconf.py
2
2
##
3
3
## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de>
4
4
##                              2006 Dimitur Kirov <dkirov@gmail.com>
21
21
import common.xmpp
22
22
from common.xmpp.idlequeue import IdleObject
23
23
from common.xmpp import dispatcher_nb, simplexml
24
 
from common.xmpp.client import *
 
24
from common.xmpp.plugin import *
25
25
from common.xmpp.simplexml import ustr
 
26
from common.xmpp.transports_nb import DATA_RECEIVED, DATA_SENT
26
27
from common.zeroconf import zeroconf
27
28
 
28
29
from common.xmpp.protocol import *
36
37
from common.zeroconf import roster_zeroconf
37
38
 
38
39
MAX_BUFF_LEN = 65536
39
 
DATA_RECEIVED = 'DATA RECEIVED'
40
 
DATA_SENT = 'DATA SENT'
41
40
TYPE_SERVER, TYPE_CLIENT = range(2)
42
41
 
43
42
# wait XX sec to establish a connection
58
57
                self.caller = conn_holder.caller
59
58
                self.conn_holder = conn_holder
60
59
 
61
 
        def bind(self):
62
 
                flags = socket.AI_PASSIVE
63
 
                if hasattr(socket, 'AI_ADDRCONFIG'):
 
60
        def bind(self):
 
61
                flags = socket.AI_PASSIVE
 
62
                if hasattr(socket, 'AI_ADDRCONFIG'):
64
63
                        flags |= socket.AI_ADDRCONFIG
65
64
                ai = socket.getaddrinfo(None, self.port, socket.AF_UNSPEC,
66
65
                        socket.SOCK_STREAM, 0, flags)[0]
71
70
                # will fail when port is busy, or we don't have rights to bind
72
71
                try:
73
72
                        self._serv.bind((ai[4][0], self.port))
74
 
                except Exception, e:
 
73
                except Exception:
75
74
                        # unable to bind, show error dialog
76
75
                        return None
77
76
                self._serv.listen(socket.SOMAXCONN)
87
86
        def pollin(self):
88
87
                ''' accept a new incomming connection and notify queue'''
89
88
                sock = self.accept_conn()
90
 
                ''' loop through roster to find who has connected to us'''
 
89
                # loop through roster to find who has connected to us
91
90
                from_jid = None
92
91
                ipaddr = sock[1][0]
93
92
                for jid in self.conn_holder.getRoster().keys():
94
93
                        entry = self.conn_holder.getRoster().getItem(jid)
95
94
                        if (entry['address'] == ipaddr):
96
95
                                from_jid = jid
97
 
                                break;
 
96
                                break
98
97
                P2PClient(sock[0], ipaddr, sock[1][1], self.conn_holder, [], from_jid)
99
98
 
100
99
        def disconnect(self, message=''):
106
105
                self.started = False
107
106
                try:
108
107
                        self._serv.close()
109
 
                except:
 
108
                except socket.error:
110
109
                        pass
111
110
                self.conn_holder.kill_all_connections()
112
111
 
121
120
        on_ok=None, on_not_ok=None):
122
121
                self._owner = self
123
122
                self.Namespace = 'jabber:client'
 
123
                self.protocol_type = 'XMPP'
124
124
                self.defaultNamespace = self.Namespace
125
125
                self._component = 0
126
126
                self._registered_name = None
131
131
                self.Server = host
132
132
                self.on_ok = on_ok
133
133
                self.on_not_ok = on_not_ok
134
 
                self.DBG = 'client'
135
134
                self.Connection = None
136
 
                if gajim.verbose:
137
 
                        debug = ['always', 'nodebuilder']
138
 
                else:
139
 
                        debug = []
140
 
                self._DEBUG = Debug.Debug(debug)
141
 
                self.DEBUG = self._DEBUG.Show
142
 
                self.debug_flags = self._DEBUG.debug_flags
143
 
                self.debug_flags.append(self.DBG)
144
135
                self.sock_hash = None
145
136
                if _sock:
146
137
                        self.sock_type = TYPE_SERVER
165
156
                                        if on_not_ok:
166
157
                                                on_not_ok('Connection to host could not be established.')
167
158
                                        return
168
 
                                if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
169
 
                                        self.conn_holder.number_of_awaiting_messages[self.fd] += 1
 
159
                                thread_id = stanza.getThread()
 
160
                                id_ = stanza.getID()
 
161
                                if not id_:
 
162
                                        id_ = self.Dispatcher.getAnID()
 
163
                                if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
 
164
                                        self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_,
 
165
                                                thread_id))
170
166
                                else:
171
 
                                        self.conn_holder.number_of_awaiting_messages[self.fd] = 1
 
167
                                        self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_,
 
168
                                                thread_id)]
 
169
 
 
170
                self.on_responses = {}
172
171
 
173
172
        def add_stanza(self, stanza, is_message=False):
174
173
                if self.Connection:
179
178
                        self.stanzaqueue.append((stanza, is_message))
180
179
 
181
180
                if is_message:
182
 
                        if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
183
 
                                self.conn_holder.number_of_awaiting_messages[self.fd] += 1
 
181
                        thread_id = stanza.getThread()
 
182
                        id_ = stanza.getID()
 
183
                        if not id_:
 
184
                                id_ = self.Dispatcher.getAnID()
 
185
                        if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
 
186
                                self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_,
 
187
                                        thread_id))
184
188
                        else:
185
 
                                self.conn_holder.number_of_awaiting_messages[self.fd] = 1
 
189
                                self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_,
 
190
                                        thread_id)]
186
191
 
187
192
                return True
188
193
 
189
194
        def on_message_sent(self, connection_id):
190
 
                self.conn_holder.number_of_awaiting_messages[connection_id] -= 1
 
195
                id_, thread_id = \
 
196
                        self.conn_holder.ids_of_awaiting_messages[connection_id].pop(0)
 
197
                if self.on_ok:
 
198
                        self.on_ok(id_)
 
199
                        # use on_ok only on first message. For others it's called in
 
200
                        # ClientZeroconf
 
201
                        self.on_ok = None
191
202
 
192
203
        def on_connect(self, conn):
193
204
                self.Connection = conn
194
205
                self.Connection.PlugIn(self)
195
206
                dispatcher_nb.Dispatcher().PlugIn(self)
196
207
                self._register_handlers()
197
 
                if self.on_ok:
198
 
                        self.on_ok()
199
208
 
200
209
        def StreamInit(self):
201
210
                ''' Send an initial stream header. '''
203
212
                self.Dispatcher.Stream._dispatch_depth = 2
204
213
                self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch
205
214
                self.Dispatcher.Stream.stream_header_received = self._check_stream_start
206
 
                self.debug_flags.append(simplexml.DBG_NODEBUILDER)
207
 
                self.Dispatcher.Stream.DEBUG = self.DEBUG
208
215
                self.Dispatcher.Stream.features = None
209
216
                if self.sock_type == TYPE_CLIENT:
210
217
                        self.send_stream_header()
221
228
                        self.Dispatcher._metastream)[:-2])
222
229
 
223
230
        def _check_stream_start(self, ns, tag, attrs):
224
 
                if ns<>NS_STREAMS or tag<>'stream':
225
 
                        self.Connection.DEBUG('Incorrect stream start: (%s,%s).Terminating! ' \
226
 
                                % (tag, ns), 'error')
 
231
                if ns != NS_STREAMS or tag != 'stream':
 
232
                        log.error('Incorrect stream start: (%s,%s).Terminating!' % (tag, ns), 'error')
227
233
                        self.Connection.disconnect()
228
234
                        if self.on_not_ok:
229
235
                                self.on_not_ok('Connection to host could not be established: Incorrect answer from server.')
246
252
 
247
253
        def on_disconnect(self):
248
254
                if self.conn_holder:
249
 
                        if self.conn_holder.number_of_awaiting_messages.has_key(self.fd):
250
 
                                del self.conn_holder.number_of_awaiting_messages[self.fd]
 
255
                        if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
 
256
                                del self.conn_holder.ids_of_awaiting_messages[self.fd]
251
257
                        self.conn_holder.remove_connection(self.sock_hash)
252
258
                if self.__dict__.has_key('Dispatcher'):
253
259
                        self.Dispatcher.PlugOut()
278
284
                self.onreceive(None)
279
285
                return True
280
286
 
 
287
        def remove_timeout(self):
 
288
                pass
 
289
 
281
290
        def _register_handlers(self):
282
291
                self.RegisterHandler('message', lambda conn, data:self._caller._messageCB(
283
292
                        self.Server, conn, data))
293
302
                        common.xmpp.NS_BYTESTREAM)
294
303
                self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error',
295
304
                        common.xmpp.NS_BYTESTREAM)
 
305
                self.RegisterHandler('iq', self._caller._DiscoverItemsGetCB, 'get',
 
306
                        common.xmpp.NS_DISCO_ITEMS)
296
307
 
297
308
class P2PConnection(IdleObject, PlugIn):
298
309
        def __init__(self, sock_hash, _sock, host=None, port=None, caller=None,
300
311
                IdleObject.__init__(self)
301
312
                self._owner = client
302
313
                PlugIn.__init__(self)
303
 
                self.DBG_LINE = 'socket'
304
314
                self.sendqueue = []
305
315
                self.sendbuff = None
306
316
                self.buff_is_message = False
342
352
                        self._sock = socket.socket(*ai[:3])
343
353
                        self._sock.setblocking(False)
344
354
                        self._server = ai[4]
345
 
                except:
 
355
                except socket.error:
346
356
                        if sys.exc_value[0] != errno.EINPROGRESS:
347
357
                                # for all errors, we try other addresses
348
358
                                self.connect_to_next_ip()
403
413
                self._plug_idle()
404
414
 
405
415
        def read_timeout(self):
406
 
                if self.client.conn_holder.number_of_awaiting_messages.has_key(self.fd) \
407
 
                and self.client.conn_holder.number_of_awaiting_messages[self.fd] > 0:
408
 
                        self.client._caller.dispatch('MSGERROR',[unicode(self.client.to), -1,
409
 
                                _('Connection to host could not be established: Timeout while sending data.'), None, None])
410
 
                        self.client.conn_holder.number_of_awaiting_messages[self.fd] = 0
 
416
                ids = self.client.conn_holder.ids_of_awaiting_messages
 
417
                if self.fd in ids and len(ids[self.fd]) > 0:
 
418
                        for (id_, thread_id) in ids[self.fd]:
 
419
                                self._owner.Dispatcher.Event('', DATA_ERROR, (self.client.to,
 
420
                                        thread_id))
 
421
                        ids[self.fd] = []
411
422
                self.pollend()
412
423
 
413
424
        def do_connect(self):
473
484
                        if self._owner.sock_type == TYPE_CLIENT:
474
485
                                self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
475
486
                        if received.strip():
476
 
                                self.DEBUG(received, 'got')
 
487
                                log.debug('received: %s', received)
477
488
                        if hasattr(self._owner, 'Dispatcher'):
478
489
                                self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
479
490
                        self.on_receive(received)
480
491
                else:
481
492
                        # This should never happed, so we need the debug
482
 
                        self.DEBUG('Unhandled data received: %s' % received,'error')
 
493
                        log.error('Unhandled data received: %s' % received)
483
494
                        self.disconnect()
484
495
                return True
485
496
 
486
 
        def onreceive(self, recv_handler):
487
 
                if not recv_handler:
488
 
                        if hasattr(self._owner, 'Dispatcher'):
489
 
                                self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
490
 
                        else:
491
 
                                self.on_receive = None
492
 
                        return
493
 
                _tmp = self.on_receive
494
 
                # make sure this cb is not overriden by recursive calls
495
 
                if not recv_handler(None) and _tmp == self.on_receive:
496
 
                        self.on_receive = recv_handler
497
 
 
498
 
        def disconnect(self):
 
497
        def disconnect(self, message=''):
499
498
                ''' Closes the socket. '''
500
499
                gajim.idlequeue.remove_timeout(self.fd)
501
500
                gajim.idlequeue.unplug_idle(self.fd)
502
501
                try:
503
502
                        self._sock.shutdown(socket.SHUT_RDWR)
504
503
                        self._sock.close()
505
 
                except:
 
504
                except socket.error:
506
505
                        # socket is already closed
507
506
                        pass
508
507
                self.fd = -1
554
553
 
555
554
        def _on_send(self):
556
555
                if self.sent_data and self.sent_data.strip():
557
 
                        self.DEBUG(self.sent_data,'sent')
 
556
                        log.debug('sent: %s' % self.sent_data)
558
557
                        if hasattr(self._owner, 'Dispatcher'):
559
558
                                self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
560
559
                self.sent_data = None
563
562
                        self.buff_is_message = False
564
563
 
565
564
        def _on_send_failure(self):
566
 
                self.DEBUG("Socket error while sending data",'error')
567
 
                self._owner.disconnected()
 
565
                log.error('Socket error while sending data')
 
566
                self._owner.on_disconnect()
568
567
                self.sent_data = None
569
568
 
570
569
class ClientZeroconf:
578
577
                self.ip_to_hash = {}
579
578
                self.hash_to_port = {}
580
579
                self.listener = None
581
 
                self.number_of_awaiting_messages = {}
 
580
                self.ids_of_awaiting_messages = {}
582
581
 
583
582
        def connect(self, show, msg):
584
583
                self.port = self.start_listener(self.caller.port)
699
698
                # look for hashed connections
700
699
                if to in self.recipient_to_hash:
701
700
                        conn = self.connections[self.recipient_to_hash[to]]
702
 
                        if not stanza.getID():
703
 
                                id_ = conn.Dispatcher.getAnID()
704
 
                                stanza.setID(id_)
 
701
                        id_ = stanza.getID() or ''
705
702
                        if conn.add_stanza(stanza, is_message):
706
703
                                if on_ok:
707
 
                                        on_ok()
708
 
                                return id
 
704
                                        on_ok(id_)
 
705
                                return
709
706
 
710
707
                if item['address'] in self.ip_to_hash:
711
 
                        hash = self.ip_to_hash[item['address']]
712
 
                        if self.hash_to_port[hash] == item['port']:
713
 
                                conn = self.connections[hash]
714
 
                                if not stanza.getID():
715
 
                                        id_ = conn.Dispatcher.getAnID()
716
 
                                        stanza.setID(id_)
 
708
                        hash_ = self.ip_to_hash[item['address']]
 
709
                        if self.hash_to_port[hash_] == item['port']:
 
710
                                conn = self.connections[hash_]
 
711
                                id_ = stanza.getID() or ''
717
712
                                if conn.add_stanza(stanza, is_message):
718
713
                                        if on_ok:
719
 
                                                on_ok()
720
 
                                        return id
 
714
                                                on_ok(id_)
 
715
                                        return
721
716
 
722
717
                # otherwise open new connection
723
718
                if not stanza.getID():
725
720
                P2PClient(None, item['address'], item['port'], self,
726
721
                        [(stanza, is_message)], to, on_ok=on_ok, on_not_ok=on_not_ok)
727
722
 
728
 
                return 'zero'
 
723
        def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
 
724
                '''
 
725
                Send stanza and wait for recipient's response to it. Will call transports
 
726
                on_timeout callback if response is not retrieved in time.
 
727
 
 
728
                Be aware: Only timeout of latest call of SendAndWait is active.
 
729
                '''
 
730
#               if timeout is None:
 
731
#                       timeout = DEFAULT_TIMEOUT_SECONDS
 
732
                def on_ok(_waitid):
 
733
#                       if timeout:
 
734
#                               self._owner.set_timeout(timeout)
 
735
                        to = stanza.getTo()
 
736
                        conn = None
 
737
                        if to in self.recipient_to_hash:
 
738
                                conn = self.connections[self.recipient_to_hash[to]]
 
739
                        elif item['address'] in self.ip_to_hash:
 
740
                                hash_ = self.ip_to_hash[item['address']]
 
741
                                if self.hash_to_port[hash_] == item['port']:
 
742
                                        conn = self.connections[hash_]
 
743
                        if func:
 
744
                                conn.Dispatcher.on_responses[_waitid] = (func, args)
 
745
                        conn.onreceive(conn.Dispatcher._WaitForData)
 
746
                        conn.Dispatcher._expected[_waitid] = None
 
747
                self.send(stanza, on_ok=on_ok)
 
748
 
 
749
        def SendAndCallForResponse(self, stanza, func=None, args=None):
 
750
                ''' Put stanza on the wire and call back when recipient replies.
 
751
                        Additional callback arguments can be specified in args. '''
 
752
                self.SendAndWaitForResponse(stanza, 0, func, args)
729
753
 
730
754
# vim: se ts=3: