2
"""Test connecting to an IRC channel via the Requests interface."""
5
from idletest import exec_test, BaseIRCServer
6
from servicetest import EventPattern, call_async, sync_dbus, make_channel_proxy
10
class DelayJoinServer(BaseIRCServer):
    """IRC test server whose JOIN handler deliberately sends nothing back."""

    def handleJOIN(self, args):
        """Handle a JOIN command by doing nothing.

        The reply is left to the test, which calls sendJoin() itself once it
        has issued the second channel request.
        """
        # do nothing; wait for the test to call sendJoin().
        return
15
def test(q, bus, conn, stream):
    """Request the same IRC room through CreateChannel and EnsureChannel.

    CreateChannel is issued for '#idletest'; while the JOIN is still
    unanswered, EnsureChannel is issued with the same request.  After the
    JOIN reply is sent, both calls must return the same channel, a
    NewChannels signal must announce it, and the Requests 'Channels'
    property must list it until the channel is parted and closed.

    Args:
        q: event queue offering expect() and expect_many().
        bus: D-Bus connection handed to sync_dbus().
        conn: proxy for the connection under test.
        stream: server-side stream used to send the JOIN and PART replies.
    """
    # NOTE(review): the start of this wait was missing from the damaged
    # source.  Connect() and the expect_many( opener are reconstructed from
    # the two dangling EventPattern arguments -- confirm against upstream.
    conn.Connect()
    q.expect_many(
        EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
        EventPattern('irc-connected'))
    q.expect('dbus-signal', signal='SelfHandleChanged')
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])

    self_handle = conn.GetSelfHandle()

    # NOTE(review): the closing of this dictionary was missing; the 'sv'
    # signature is assumed because the values are passed as a property
    # map to CreateChannel/EnsureChannel -- confirm.
    request = dbus.Dictionary({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: '#idletest',
    }, signature='sv')

    call_async(q, conn, 'CreateChannel', request,
               dbus_interface=cs.CONN_IFACE_REQUESTS)

    # Idle should try to join the channel.
    q.expect('stream-JOIN')

    # Meanwhile, in another application...
    call_async(q, conn, 'EnsureChannel', request,
               dbus_interface=cs.CONN_IFACE_REQUESTS)

    sync_dbus(bus, q, conn)

    # Now the ircd responds:
    stream.sendJoin('#idletest')

    cc, ec = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-return', method='EnsureChannel'),
    )
    nc = q.expect('dbus-signal', signal='NewChannels')

    path, props = cc.value

    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT
    # NOTE(review): the end of this list was missing from the damaged
    # source; further interface names may have been lost -- confirm.
    expected_interfaces = [
        cs.CHANNEL_IFACE_GROUP,
        cs.CHANNEL_IFACE_PASSWORD,
        cs.TP_AWKWARD_PROPERTIES,
    ]
    assert sorted(props[cs.INTERFACES]) == sorted(expected_interfaces)
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert props[cs.TARGET_ID] == '#idletest'
    assert props[cs.TARGET_HANDLE] == \
        conn.RequestHandles(cs.HT_ROOM, ['#idletest'])[0]
    assert props[cs.REQUESTED]
    assert props[cs.INITIATOR_HANDLE] == self_handle
    assert props[cs.INITIATOR_ID] == \
        conn.InspectHandles(cs.HT_CONTACT, [self_handle])[0]

    # The first element of the EnsureChannel result is not checked on its
    # own here; it is covered by the whole-tuple comparison with ec_ below.
    _yours, ec_path, ec_props = ec.value
    assert ec_path == path
    assert ec_props == props

    # NOTE(review): the assignment of `channels` was missing from the
    # damaged source; it is assumed to be the first argument of the
    # NewChannels signal -- confirm.
    channels = nc.args[0]
    assert len(channels) == 1
    nc_path, nc_props = channels[0]
    assert nc_path == path
    assert nc_props == props

    # A synchronous EnsureChannel for the same request must return exactly
    # what the asynchronous call returned.
    ec_ = conn.EnsureChannel(request,
                             dbus_interface=cs.CONN_IFACE_REQUESTS)
    assert ec.value == ec_

    chans = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
                     dbus_interface=cs.PROPERTIES_IFACE)
    assert len(chans) == 1
    assert chans[0] == (path, props)

    # Leave the room by removing ourselves from the channel's group.
    chan = make_channel_proxy(conn, path, 'Channel')
    chan.RemoveMembers([self_handle], "",
                       dbus_interface=cs.CHANNEL_IFACE_GROUP)

    q.expect('stream-PART')
    stream.sendPart('#idletest', stream.nick)

    q.expect('dbus-signal', signal='Closed')

    # Once closed, the channel must no longer be listed.
    chans = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
                     dbus_interface=cs.PROPERTIES_IFACE)
    assert len(chans) == 0
103
if __name__ == '__main__':
    # Run the scenario against the server that leaves JOIN unanswered.
    exec_test(test, protocol=DelayJoinServer)