~ubuntu-branches/ubuntu/utopic/telepathy-python/utopic

« back to all changes in this revision

Viewing changes to examples/stream_tube_client.py

  • Committer: Bazaar Package Importer
  • Author(s): Simon McVittie
  • Date: 2008-02-21 10:42:31 UTC
  • mfrom: (1.2.1 upstream) (7.1.10 hardy)
  • Revision ID: james.westby@ubuntu.com-20080221104231-88bloeih42cmsb0x
* New upstream version 0.15.0
* Don't mention Cohoba and telepathy-msn in description (-msn is now
  -butterfly, and Cohoba is obsolete)
* Standards-Version: 3.7.3 (no changes)
* Add XS-Dm-Upload-Allowed: yes so I can upload it in future

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import dbus.glib
 
2
import gobject
 
3
import sys
 
4
import time
 
5
import os
 
6
import socket
 
7
import tempfile
 
8
import random
 
9
import string
 
10
 
 
11
from telepathy.client import (
 
12
        Connection, Channel)
 
13
from telepathy.interfaces import (
 
14
        CONN_INTERFACE, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_TUBES,
 
15
        CHANNEL_TYPE_TEXT)
 
16
from telepathy.constants import (
 
17
        CONNECTION_HANDLE_TYPE_CONTACT,
 
18
        CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_STATUS_CONNECTED,
 
19
        CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_CONNECTING,
 
20
        TUBE_TYPE_DBUS, TUBE_TYPE_STREAM, TUBE_STATE_LOCAL_PENDING,
 
21
        TUBE_STATE_REMOTE_PENDING, TUBE_STATE_OPEN,
 
22
        SOCKET_ADDRESS_TYPE_UNIX, SOCKET_ADDRESS_TYPE_ABSTRACT_UNIX,
 
23
        SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ADDRESS_TYPE_IPV6,
 
24
        SOCKET_ACCESS_CONTROL_LOCALHOST, SOCKET_ACCESS_CONTROL_PORT,
 
25
        SOCKET_ACCESS_CONTROL_NETMASK, SOCKET_ACCESS_CONTROL_CREDENTIALS)
 
26
 
 
27
from account import connection_from_file
 
28
 
 
29
tube_type = {TUBE_TYPE_DBUS: "D-Bus",\
 
30
             TUBE_TYPE_STREAM: "Stream"}
 
31
 
 
32
tube_state = {TUBE_STATE_LOCAL_PENDING : 'local pending',\
 
33
              TUBE_STATE_REMOTE_PENDING : 'remote pending',\
 
34
              TUBE_STATE_OPEN : 'open'}
 
35
 
 
36
SERVICE = "x-example"
 
37
 
 
38
loop = None
 
39
 
 
40
class StreamTubeClient:
 
41
    def __init__(self, account_file, muc_id, contact_id):
 
42
        self.conn = connection_from_file(account_file)
 
43
        self.muc_id = muc_id
 
44
        self.contact_id = contact_id
 
45
 
 
46
        self.joined = False
 
47
 
 
48
        assert self.muc_id is None or self.contact_id is None
 
49
 
 
50
        self.conn[CONN_INTERFACE].connect_to_signal('StatusChanged',
 
51
            self.status_changed_cb)
 
52
        self.conn[CONN_INTERFACE].connect_to_signal("NewChannel",
 
53
                self.new_channel_cb)
 
54
 
 
55
    def run(self):
 
56
        self.conn[CONN_INTERFACE].Connect()
 
57
 
 
58
        loop = gobject.MainLoop()
 
59
        try:
 
60
            loop.run()
 
61
        finally:
 
62
            try:
 
63
                self.conn[CONN_INTERFACE].Disconnect()
 
64
            except:
 
65
                pass
 
66
 
 
67
    def status_changed_cb(self, state, reason):
 
68
        if state == CONNECTION_STATUS_CONNECTING:
 
69
            print 'connecting'
 
70
        elif state == CONNECTION_STATUS_CONNECTED:
 
71
            print 'connected'
 
72
            self.connected_cb()
 
73
        elif state == CONNECTION_STATUS_DISCONNECTED:
 
74
            print 'disconnected'
 
75
            loop.quit()
 
76
 
 
77
    def connected_cb(self):
 
78
        self.self_handle = self.conn[CONN_INTERFACE].GetSelfHandle()
 
79
 
 
80
    def join_muc(self):
 
81
        # workaround to be sure that the muc service is fully resolved in
 
82
        # Salut.
 
83
        time.sleep(2)
 
84
 
 
85
        print "join muc", self.muc_id
 
86
        handle = self.conn[CONN_INTERFACE].RequestHandles(
 
87
            CONNECTION_HANDLE_TYPE_ROOM, [self.muc_id])[0]
 
88
 
 
89
        chan_path = self.conn[CONN_INTERFACE].RequestChannel(
 
90
            CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM,
 
91
            handle, True)
 
92
 
 
93
        self.channel_text = Channel(self.conn.dbus_proxy.bus_name, chan_path)
 
94
 
 
95
        self.self_handle = self.channel_text[CHANNEL_INTERFACE_GROUP].GetSelfHandle()
 
96
        self.channel_text[CHANNEL_INTERFACE_GROUP].connect_to_signal(
 
97
                "MembersChanged", self.text_channel_members_changed_cb)
 
98
 
 
99
        chan_path = self.conn[CONN_INTERFACE].RequestChannel(
 
100
            CHANNEL_TYPE_TUBES, CONNECTION_HANDLE_TYPE_ROOM,
 
101
            handle, True)
 
102
        self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name, chan_path)
 
103
 
 
104
        if self.self_handle in self.channel_text[CHANNEL_INTERFACE_GROUP].GetMembers():
 
105
            self.joined = True
 
106
            self.muc_joined()
 
107
 
 
108
    def new_channel_cb(self, object_path, channel_type, handle_type, handle,
 
109
        suppress_handler):
 
110
      if channel_type == CHANNEL_TYPE_TUBES:
 
111
            self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name,
 
112
                    object_path)
 
113
 
 
114
            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
 
115
                    "TubeStateChanged", self.tube_state_changed_cb)
 
116
            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
 
117
                    "NewTube", self.new_tube_cb)
 
118
            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
 
119
                    "TubeClosed", self.tube_closed_cb)
 
120
            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
 
121
                   "StreamTubeNewConnection",
 
122
                   self.stream_tube_new_connection_cb)
 
123
 
 
124
            for tube in self.channel_tubes[CHANNEL_TYPE_TUBES].ListTubes():
 
125
                id, initiator, type, service, params, state = (tube[0],
 
126
                        tube[1], tube[2], tube[3], tube[4], tube[5])
 
127
                self.new_tube_cb(id, initiator, type, service, params, state)
 
128
 
 
129
    def text_channel_members_changed_cb(self, message, added, removed,
 
130
            local_pending, remote_pending, actor, reason):
 
131
        if self.self_handle in added and not self.joined:
 
132
            self.joined = True
 
133
            self.muc_joined()
 
134
 
 
135
    def muc_joined(self):
 
136
        pass
 
137
 
 
138
    def new_tube_cb(self, id, initiator, type, service, params, state):
 
139
        initiator_id = self.conn[CONN_INTERFACE].InspectHandles(
 
140
                CONNECTION_HANDLE_TYPE_CONTACT, [initiator])[0]
 
141
 
 
142
        print "new %s tube (%d) offered by %s. Service: %s. State: %s" % (
 
143
                tube_type[type], id, initiator_id, service, tube_state[state])
 
144
 
 
145
        if state == TUBE_STATE_OPEN:
 
146
            self.tube_opened(id)
 
147
 
 
148
    def tube_opened(self, id):
 
149
        pass
 
150
 
 
151
    def tube_state_changed_cb(self, id, state):
 
152
        if state == TUBE_STATE_OPEN:
 
153
            self.tube_opened(id)
 
154
 
 
155
    def tube_closed_cb(self, id):
 
156
        print "tube closed", id
 
157
 
 
158
    def stream_tube_new_connection_cb(self, id, handle):
 
159
       print "new socket connection on tube %u from %s" % (id,
 
160
               self.conn[CONN_INTERFACE].InspectHandles(
 
161
                   CONNECTION_HANDLE_TYPE_CONTACT, [handle])[0])
 
162
 
 
163
class StreamTubeInitiatorClient(StreamTubeClient):
 
164
    def __init__(self, account_file, muc_id, contact_id, socket_address=None):
 
165
        StreamTubeClient.__init__(self, account_file, muc_id, contact_id)
 
166
 
 
167
        if socket_address is None:
 
168
            self.server = TrivialStreamServer()
 
169
            self.server.run()
 
170
            socket_address = self.server.socket_address
 
171
            self.socket_address = (socket_address[0],
 
172
                    dbus.UInt16(socket_address[1]))
 
173
        else:
 
174
            print "Will export socket", socket_address
 
175
            self.socket_address = socket_address
 
176
 
 
177
    def offer_tube(self):
 
178
        params = {"login": "badger", "a_int" : 69}
 
179
        print "offer tube"
 
180
        id = self.channel_tubes[CHANNEL_TYPE_TUBES].OfferStreamTube(SERVICE,
 
181
                params, SOCKET_ADDRESS_TYPE_IPV4, self.socket_address,
 
182
                SOCKET_ACCESS_CONTROL_LOCALHOST, "")
 
183
 
 
184
class StreamTubeJoinerClient(StreamTubeClient):
 
185
    def __init__(self, account_file, muc_id, contact_id, connect_trivial_client):
 
186
        StreamTubeClient.__init__(self, account_file, muc_id, contact_id)
 
187
 
 
188
        self.tube_accepted = False
 
189
        self.connect_trivial_client = connect_trivial_client
 
190
 
 
191
    def new_tube_cb(self, id, initiator, type, service, params, state):
 
192
        StreamTubeClient.new_tube_cb(self, id, initiator, type, service, params, state)
 
193
 
 
194
        if state == TUBE_STATE_LOCAL_PENDING and service == SERVICE and\
 
195
                not self.tube_accepted:
 
196
            print "accept tube", id
 
197
            self.tube_accepted = True
 
198
            self.channel_tubes[CHANNEL_TYPE_TUBES].AcceptStreamTube(id,
 
199
                    SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, "")
 
200
 
 
201
    def tube_opened(self, id):
 
202
        StreamTubeClient.tube_opened(self, id)
 
203
 
 
204
        address_type, address = self.channel_tubes[CHANNEL_TYPE_TUBES].GetStreamTubeSocketAddress(
 
205
                id, byte_arrays=True)
 
206
        print "tube opened. Clients can connect to", address
 
207
 
 
208
        if self.connect_trivial_client:
 
209
            self.client = TrivialStreamClient(address)
 
210
            self.client.connect()
 
211
 
 
212
class TrivialStream:
 
213
    def __init__(self, socket_address=None):
 
214
        self.socket_address = socket_address
 
215
 
 
216
    def read_socket(self, s):
 
217
        try:
 
218
            data = s.recv(1024)
 
219
            if len(data) > 0:
 
220
                print "received:", data
 
221
        except socket.error, e:
 
222
            pass
 
223
        return True
 
224
 
 
225
    def write_socket(self, s, msg):
 
226
        print "send:", msg
 
227
        try:
 
228
            s = s.send(msg)
 
229
        except socket.error, e:
 
230
            pass
 
231
        return True
 
232
 
 
233
class TrivialStreamServer(TrivialStream):
 
234
    def __init__(self):
 
235
        TrivialStream.__init__(self)
 
236
 
 
237
    def run(self):
 
238
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
239
        s.setblocking(1)
 
240
        s.settimeout(0.1)
 
241
        s.bind(("127.0.0.1", 0))
 
242
 
 
243
        self.socket_address = s.getsockname()
 
244
        print "Trivial Server lauched on socket", self.socket_address
 
245
        s.listen(1)
 
246
 
 
247
        gobject.timeout_add(1000, self.accept_client, s)
 
248
 
 
249
    def accept_client(self, s):
 
250
        try:
 
251
            s2, addr = s.accept()
 
252
            s2.setblocking(1)
 
253
            s2.setblocking(0.1)
 
254
            self.handle_client(s2)
 
255
            return True
 
256
        except socket.timeout:
 
257
            return True
 
258
 
 
259
    def handle_client(self, s):
 
260
        gobject.timeout_add(5000, self.write_socket, s, "hi !")
 
261
 
 
262
class TrivialStreamClient(TrivialStream):
 
263
    def __init__(self, socket_address):
 
264
        TrivialStream.__init__(self, socket_address)
 
265
 
 
266
    def connect(self):
 
267
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
268
        s.connect(self.socket_address)
 
269
        print "Trivial client connected to", self.socket_address
 
270
        gobject.timeout_add(1000, self.read_socket, s)