~ubuntu-branches/ubuntu/raring/telepathy-idle/raring-proposed

« back to all changes in this revision

Viewing changes to tests/twisted/channels/requests-muc.py

  • Committer: Bazaar Package Importer
  • Author(s): Jonny Lamb
  • Date: 2009-09-14 14:46:37 UTC
  • mfrom: (1.2.3 upstream)
  • Revision ID: james.westby@ubuntu.com-20090914144637-y15kev7u8prmllee
Tags: 0.1.5-1
* New upstream release.
* debian/rules: Pass --disable-silent-rules to get more verbose build logs.
* debian/control: Upped Standards-Version. (no changes)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Test connecting to a IRC channel via the Requests interface
 
3
"""
 
4
 
 
5
from idletest import exec_test, BaseIRCServer
 
6
from servicetest import EventPattern, call_async, sync_dbus, make_channel_proxy
 
7
import constants as cs
 
8
import dbus
 
9
 
 
10
class DelayJoinServer(BaseIRCServer):
 
11
    def handleJOIN(self, args):
 
12
        # do nothing; wait for the test to call sendJoin().
 
13
        return
 
14
 
 
15
def test(q, bus, conn, stream):
 
16
    conn.Connect()
 
17
    q.expect_many(
 
18
            EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
 
19
            EventPattern('irc-connected'))
 
20
    q.expect('dbus-signal', signal='SelfHandleChanged')
 
21
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
 
22
 
 
23
    self_handle = conn.GetSelfHandle()
 
24
 
 
25
    request = dbus.Dictionary({
 
26
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
 
27
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
 
28
        cs.TARGET_ID: '#idletest',
 
29
    }, signature='sv')
 
30
 
 
31
    call_async(q, conn, 'CreateChannel', request,
 
32
        dbus_interface=cs.CONN_IFACE_REQUESTS)
 
33
 
 
34
    # Idle should try to join the channel.
 
35
    q.expect('stream-JOIN')
 
36
 
 
37
    # Meanwhile, in another application...
 
38
 
 
39
    call_async(q, conn, 'EnsureChannel', request,
 
40
        dbus_interface=cs.CONN_IFACE_REQUESTS)
 
41
 
 
42
    sync_dbus(bus, q, conn)
 
43
 
 
44
    # Now the ircd responds:
 
45
    stream.sendJoin('#idletest')
 
46
 
 
47
    cc, ec = q.expect_many(
 
48
        EventPattern('dbus-return', method='CreateChannel'),
 
49
        EventPattern('dbus-return', method='EnsureChannel'),
 
50
        )
 
51
    nc = q.expect('dbus-signal', signal='NewChannels')
 
52
 
 
53
    path, props = cc.value
 
54
 
 
55
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT
 
56
    assert sorted(props[cs.INTERFACES]) == \
 
57
        sorted([cs.CHANNEL_IFACE_GROUP,
 
58
                cs.CHANNEL_IFACE_PASSWORD,
 
59
                cs.TP_AWKWARD_PROPERTIES,
 
60
               ])
 
61
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
 
62
    assert props[cs.TARGET_ID] == '#idletest'
 
63
    assert props[cs.TARGET_HANDLE] == \
 
64
        conn.RequestHandles(cs.HT_ROOM, ['#idletest'])[0]
 
65
    assert props[cs.REQUESTED]
 
66
    assert props[cs.INITIATOR_HANDLE] == self_handle
 
67
    assert props[cs.INITIATOR_ID] == \
 
68
        conn.InspectHandles(cs.HT_CONTACT, [self_handle])[0]
 
69
 
 
70
    ec_yours, ec_path, ec_props = ec.value
 
71
    assert not ec_yours
 
72
    assert ec_path == path
 
73
    assert ec_props == props
 
74
 
 
75
    channels = nc.args[0]
 
76
    assert len(channels) == 1
 
77
    nc_path, nc_props = channels[0]
 
78
    assert nc_path == path
 
79
    assert nc_props == props
 
80
 
 
81
    # And again?
 
82
    ec_ = conn.EnsureChannel(request,
 
83
        dbus_interface=cs.CONN_IFACE_REQUESTS)
 
84
    assert ec.value == ec_
 
85
 
 
86
    chans = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
 
87
        dbus_interface=cs.PROPERTIES_IFACE)
 
88
    assert len(chans) == 1
 
89
    assert chans[0] == (path, props)
 
90
 
 
91
    chan = make_channel_proxy(conn, path, 'Channel')
 
92
    chan.RemoveMembers([self_handle], "", dbus_interface=cs.CHANNEL_IFACE_GROUP)
 
93
 
 
94
    q.expect('stream-PART')
 
95
    stream.sendPart('#idletest', stream.nick)
 
96
 
 
97
    q.expect('dbus-signal', signal='Closed')
 
98
 
 
99
    chans = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
 
100
        dbus_interface=cs.PROPERTIES_IFACE)
 
101
    assert len(chans) == 0
 
102
 
 
103
if __name__ == '__main__':
 
104
    exec_test(test, protocol=DelayJoinServer)
 
105