8
8
from idletest import exec_test
9
9
from servicetest import EventPattern, call_async
10
from constants import *
12
13
def test(q, bus, conn, stream):
14
15
q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
15
16
CHANNEL_NAME = '#idletest'
16
room_handles = conn.RequestHandles(2, [CHANNEL_NAME])
17
call_async(q, conn, 'RequestChannel', 'org.freedesktop.Telepathy.Channel.Type.Text', 2, room_handles[0], True)
17
room_handles = conn.RequestHandles(HT_ROOM, [CHANNEL_NAME])
18
call_async(q, conn, 'RequestChannel', CHANNEL_TYPE_TEXT, HT_ROOM,
19
room_handles[0], True)
19
21
ret = q.expect('dbus-return', method='RequestChannel')
20
22
q.expect('dbus-signal', signal='MembersChanged')
21
23
chan = bus.get_object(conn.bus_name, ret.value[0])
23
text_chan = dbus.Interface(chan,
24
u'org.freedesktop.Telepathy.Channel.Type.Text')
25
text_chan = dbus.Interface(chan, CHANNEL_TYPE_TEXT)
26
27
# send a whole bunch of messages in a row and make sure they get delivered
27
28
# in the proper order
30
for i in range(NUM_MESSAGES):
29
31
call_async(q, text_chan, 'Send', 0, str(i))
32
q.expect('irc-privmsg', data={'message':str(i),'recipient':CHANNEL_NAME})
33
for i in range(NUM_MESSAGES):
34
message = q.expect('stream-PRIVMSG')
35
assert message.data[0] == CHANNEL_NAME
36
assert message.data[1].rstrip('\r\n') == str(i)
34
38
call_async(q, conn, 'Disconnect')