11
from telepathy.client import (
13
from telepathy.interfaces import (
14
CONN_INTERFACE, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_TUBES,
16
from telepathy.constants import (
17
CONNECTION_HANDLE_TYPE_CONTACT,
18
CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_STATUS_CONNECTED,
19
CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_CONNECTING,
20
TUBE_TYPE_DBUS, TUBE_TYPE_STREAM, TUBE_STATE_LOCAL_PENDING,
21
TUBE_STATE_REMOTE_PENDING, TUBE_STATE_OPEN,
22
SOCKET_ADDRESS_TYPE_UNIX, SOCKET_ADDRESS_TYPE_ABSTRACT_UNIX,
23
SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ADDRESS_TYPE_IPV6,
24
SOCKET_ACCESS_CONTROL_LOCALHOST, SOCKET_ACCESS_CONTROL_PORT,
25
SOCKET_ACCESS_CONTROL_NETMASK, SOCKET_ACCESS_CONTROL_CREDENTIALS)
27
from account import connection_from_file
29
tube_type = {TUBE_TYPE_DBUS: "D-Bus",\
30
TUBE_TYPE_STREAM: "Stream"}
32
tube_state = {TUBE_STATE_LOCAL_PENDING : 'local pending',\
33
TUBE_STATE_REMOTE_PENDING : 'remote pending',\
34
TUBE_STATE_OPEN : 'open'}
40
class StreamTubeClient:
41
def __init__(self, account_file, muc_id, contact_id):
42
self.conn = connection_from_file(account_file)
44
self.contact_id = contact_id
48
assert self.muc_id is None or self.contact_id is None
50
self.conn[CONN_INTERFACE].connect_to_signal('StatusChanged',
51
self.status_changed_cb)
52
self.conn[CONN_INTERFACE].connect_to_signal("NewChannel",
56
self.conn[CONN_INTERFACE].Connect()
58
loop = gobject.MainLoop()
63
self.conn[CONN_INTERFACE].Disconnect()
67
def status_changed_cb(self, state, reason):
68
if state == CONNECTION_STATUS_CONNECTING:
70
elif state == CONNECTION_STATUS_CONNECTED:
73
elif state == CONNECTION_STATUS_DISCONNECTED:
77
def connected_cb(self):
78
self.self_handle = self.conn[CONN_INTERFACE].GetSelfHandle()
81
# workaround to be sure that the muc service is fully resolved in
85
print "join muc", self.muc_id
86
handle = self.conn[CONN_INTERFACE].RequestHandles(
87
CONNECTION_HANDLE_TYPE_ROOM, [self.muc_id])[0]
89
chan_path = self.conn[CONN_INTERFACE].RequestChannel(
90
CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM,
93
self.channel_text = Channel(self.conn.dbus_proxy.bus_name, chan_path)
95
self.self_handle = self.channel_text[CHANNEL_INTERFACE_GROUP].GetSelfHandle()
96
self.channel_text[CHANNEL_INTERFACE_GROUP].connect_to_signal(
97
"MembersChanged", self.text_channel_members_changed_cb)
99
chan_path = self.conn[CONN_INTERFACE].RequestChannel(
100
CHANNEL_TYPE_TUBES, CONNECTION_HANDLE_TYPE_ROOM,
102
self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name, chan_path)
104
if self.self_handle in self.channel_text[CHANNEL_INTERFACE_GROUP].GetMembers():
108
def new_channel_cb(self, object_path, channel_type, handle_type, handle,
110
if channel_type == CHANNEL_TYPE_TUBES:
111
self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name,
114
self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
115
"TubeStateChanged", self.tube_state_changed_cb)
116
self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
117
"NewTube", self.new_tube_cb)
118
self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
119
"TubeClosed", self.tube_closed_cb)
120
self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
121
"StreamTubeNewConnection",
122
self.stream_tube_new_connection_cb)
124
for tube in self.channel_tubes[CHANNEL_TYPE_TUBES].ListTubes():
125
id, initiator, type, service, params, state = (tube[0],
126
tube[1], tube[2], tube[3], tube[4], tube[5])
127
self.new_tube_cb(id, initiator, type, service, params, state)
129
def text_channel_members_changed_cb(self, message, added, removed,
130
local_pending, remote_pending, actor, reason):
131
if self.self_handle in added and not self.joined:
135
def muc_joined(self):
138
def new_tube_cb(self, id, initiator, type, service, params, state):
139
initiator_id = self.conn[CONN_INTERFACE].InspectHandles(
140
CONNECTION_HANDLE_TYPE_CONTACT, [initiator])[0]
142
print "new %s tube (%d) offered by %s. Service: %s. State: %s" % (
143
tube_type[type], id, initiator_id, service, tube_state[state])
145
if state == TUBE_STATE_OPEN:
148
def tube_opened(self, id):
151
def tube_state_changed_cb(self, id, state):
152
if state == TUBE_STATE_OPEN:
155
def tube_closed_cb(self, id):
156
print "tube closed", id
158
def stream_tube_new_connection_cb(self, id, handle):
159
print "new socket connection on tube %u from %s" % (id,
160
self.conn[CONN_INTERFACE].InspectHandles(
161
CONNECTION_HANDLE_TYPE_CONTACT, [handle])[0])
163
class StreamTubeInitiatorClient(StreamTubeClient):
164
def __init__(self, account_file, muc_id, contact_id, socket_address=None):
165
StreamTubeClient.__init__(self, account_file, muc_id, contact_id)
167
if socket_address is None:
168
self.server = TrivialStreamServer()
170
socket_address = self.server.socket_address
171
self.socket_address = (socket_address[0],
172
dbus.UInt16(socket_address[1]))
174
print "Will export socket", socket_address
175
self.socket_address = socket_address
177
def offer_tube(self):
178
params = {"login": "badger", "a_int" : 69}
180
id = self.channel_tubes[CHANNEL_TYPE_TUBES].OfferStreamTube(SERVICE,
181
params, SOCKET_ADDRESS_TYPE_IPV4, self.socket_address,
182
SOCKET_ACCESS_CONTROL_LOCALHOST, "")
184
class StreamTubeJoinerClient(StreamTubeClient):
185
def __init__(self, account_file, muc_id, contact_id, connect_trivial_client):
186
StreamTubeClient.__init__(self, account_file, muc_id, contact_id)
188
self.tube_accepted = False
189
self.connect_trivial_client = connect_trivial_client
191
def new_tube_cb(self, id, initiator, type, service, params, state):
192
StreamTubeClient.new_tube_cb(self, id, initiator, type, service, params, state)
194
if state == TUBE_STATE_LOCAL_PENDING and service == SERVICE and\
195
not self.tube_accepted:
196
print "accept tube", id
197
self.tube_accepted = True
198
self.channel_tubes[CHANNEL_TYPE_TUBES].AcceptStreamTube(id,
199
SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, "")
201
def tube_opened(self, id):
202
StreamTubeClient.tube_opened(self, id)
204
address_type, address = self.channel_tubes[CHANNEL_TYPE_TUBES].GetStreamTubeSocketAddress(
205
id, byte_arrays=True)
206
print "tube opened. Clients can connect to", address
208
if self.connect_trivial_client:
209
self.client = TrivialStreamClient(address)
210
self.client.connect()
213
def __init__(self, socket_address=None):
214
self.socket_address = socket_address
216
def read_socket(self, s):
220
print "received:", data
221
except socket.error, e:
225
def write_socket(self, s, msg):
229
except socket.error, e:
233
class TrivialStreamServer(TrivialStream):
235
TrivialStream.__init__(self)
238
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
241
s.bind(("127.0.0.1", 0))
243
self.socket_address = s.getsockname()
244
print "Trivial Server lauched on socket", self.socket_address
247
gobject.timeout_add(1000, self.accept_client, s)
249
def accept_client(self, s):
251
s2, addr = s.accept()
254
self.handle_client(s2)
256
except socket.timeout:
259
def handle_client(self, s):
260
gobject.timeout_add(5000, self.write_socket, s, "hi !")
262
class TrivialStreamClient(TrivialStream):
263
def __init__(self, socket_address):
264
TrivialStream.__init__(self, socket_address)
267
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
268
s.connect(self.socket_address)
269
print "Trivial client connected to", self.socket_address
270
gobject.timeout_add(1000, self.read_socket, s)