~ken-vandine/ubuntu/precise/telepathy-gabble/ubuntu

« back to all changes in this revision

Viewing changes to tests/twisted/tubes/accept-muc-dbus-tube.py

  • Committer: Bazaar Package Importer
  • Author(s): Dafydd Harries
  • Date: 2009-08-11 10:25:56 UTC
  • mfrom: (38.2.6 sid)
  • Revision ID: james.westby@ubuntu.com-20090811102556-bh1bc6tch1mmrbxs
New upstream version.

Show diffs side-by-side

added

removed

Lines of Context:
2
2
 
3
3
from servicetest import call_async, EventPattern
4
4
from gabbletest import exec_test, acknowledge_iq, make_muc_presence
5
 
import constants as c
 
5
import constants as cs
6
6
 
7
7
from twisted.words.xish import xpath
8
8
import ns
13
13
    conn.Connect()
14
14
 
15
15
    _, iq_event = q.expect_many(
16
 
        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
 
16
        EventPattern('dbus-signal', signal='StatusChanged',
 
17
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]),
17
18
        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
18
19
            query_name='vCard'))
19
20
 
51
52
    channels = event.args[0]
52
53
    path, props = channels[0]
53
54
 
54
 
    assert props[c.CHANNEL_TYPE] == c.CHANNEL_TYPE_DBUS_TUBE
55
 
    assert props[c.INITIATOR_ID] == 'chat@conf.localhost/bob'
56
 
    bob_handle = props[c.INITIATOR_HANDLE]
57
 
    assert props[c.INTERFACES] == [c.CHANNEL_IFACE_GROUP, c.CHANNEL_IFACE_TUBE]
58
 
    assert props[c.REQUESTED] == False
59
 
    assert props[c.TARGET_ID] == 'chat@conf.localhost'
60
 
    assert props[c.DBUS_TUBE_SERVICE_NAME] == 'com.example.Test'
61
 
    assert props[c.TUBE_PARAMETERS] == {'foo': 'bar'}
62
 
    assert props[c.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [c.SOCKET_ACCESS_CONTROL_CREDENTIALS,
63
 
        c.SOCKET_ACCESS_CONTROL_LOCALHOST]
 
55
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
 
56
    assert props[cs.INITIATOR_ID] == 'chat@conf.localhost/bob'
 
57
    bob_handle = props[cs.INITIATOR_HANDLE]
 
58
    assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP, cs.CHANNEL_IFACE_TUBE]
 
59
    assert props[cs.REQUESTED] == False
 
60
    assert props[cs.TARGET_ID] == 'chat@conf.localhost'
 
61
    assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.Test'
 
62
    assert props[cs.TUBE_PARAMETERS] == {'foo': 'bar'}
 
63
    assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
 
64
        cs.SOCKET_ACCESS_CONTROL_LOCALHOST]
64
65
 
65
66
    tube_chan = bus.get_object(conn.bus_name, path)
66
 
    tube_iface = dbus.Interface(tube_chan, c.CHANNEL_IFACE_TUBE)
67
 
    dbus_tube_iface = dbus.Interface(tube_chan, c.CHANNEL_TYPE_DBUS_TUBE)
68
 
    tube_chan_iface = dbus.Interface(tube_chan, c.CHANNEL)
 
67
    tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_IFACE_TUBE)
 
68
    dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
 
69
    tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
69
70
 
70
71
    # only Bob is in DBusNames
71
 
    dbus_names = tube_chan.Get(c.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=c.PROPERTIES_IFACE)
 
72
    dbus_names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=cs.PROPERTIES_IFACE)
72
73
    assert dbus_names == {bob_handle: bob_bus_name}
73
74
 
74
75
    call_async(q, dbus_tube_iface, 'Accept', access_control)
75
76
 
76
77
    return_event, names_changed, presence_event = q.expect_many(
77
78
        EventPattern('dbus-return', method='Accept'),
78
 
        EventPattern('dbus-signal', signal='DBusNamesChanged', interface=c.CHANNEL_TYPE_DBUS_TUBE),
 
79
        EventPattern('dbus-signal', signal='DBusNamesChanged', interface=cs.CHANNEL_TYPE_DBUS_TUBE),
79
80
        EventPattern('stream-presence', to='chat@conf.localhost/test'))
80
81
 
81
82
    tube_addr = return_event.value[0]
90
91
    assert tube_node['id'] == '1'
91
92
    self_bus_name = tube_node['dbus-name']
92
93
 
93
 
    tubes_self_handle = tube_chan.GetSelfHandle(dbus_interface=c.CHANNEL_IFACE_GROUP)
 
94
    tubes_self_handle = tube_chan.GetSelfHandle(dbus_interface=cs.CHANNEL_IFACE_GROUP)
94
95
    assert tubes_self_handle != 0
95
96
 
96
97
    # both of us are in DBusNames now
97
 
    dbus_names = tube_chan.Get(c.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=c.PROPERTIES_IFACE)
 
98
    dbus_names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=cs.PROPERTIES_IFACE)
98
99
    assert dbus_names == {bob_handle: bob_bus_name, tubes_self_handle: self_bus_name}
99
100
 
100
101
    added, removed = names_changed.args
109
110
if __name__ == '__main__':
110
111
    # We can't use t.exec_dbus_tube_test() as we can use only the muc bytestream
111
112
    exec_test(lambda q, bus, conn, stream:
112
 
        test(q, bus, conn, stream, c.SOCKET_ACCESS_CONTROL_CREDENTIALS))
 
113
        test(q, bus, conn, stream, cs.SOCKET_ACCESS_CONTROL_CREDENTIALS))
113
114
    exec_test(lambda q, bus, conn, stream:
114
 
        test(q, bus, conn, stream, c.SOCKET_ACCESS_CONTROL_LOCALHOST))
 
115
        test(q, bus, conn, stream, cs.SOCKET_ACCESS_CONTROL_LOCALHOST))