1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
# Copyright (C) 2004 Anthony Baxter
"""Tests for shtoom.doug
These tests test doug DTMF send and receive
"""
from twisted.trial import unittest, util
from twisted.internet import defer, reactor
from shtoom.doug import VoiceApp
from shtoom.app.doug import DougApplication
from shtoom.doug.events import *
from shtoom.i18n import install as i18n_install
i18n_install()
class TestDougApplication(DougApplication):
    """DougApplication subclass that hands voiceapp outcomes to a test.

    A test stores a Deferred in ``_trial_def``.  The next result or error
    delivered to this application is forwarded to that Deferred on a later
    reactor iteration, and ``_trial_def`` is reset to None so the Deferred
    is fired at most once.
    """

    # Deferred awaiting the next outcome, or None when nothing is waiting.
    _trial_def = None
    needLogging = False
    configFileName = None

    def ringBack(self):
        """Intentionally a no-op."""
        return None

    def acceptErrors(self, cookie, error):
        """Schedule an errback of the pending Deferred, if any, with `error`.

        `cookie` is accepted but not used.
        """
        pending = self._trial_def
        if pending is None:
            return
        # Clear the slot first so a later outcome cannot fire it again.
        self._trial_def = None
        reactor.callLater(0, pending.errback, error)

    def acceptResults(self, cookie, result):
        """Schedule a callback of the pending Deferred, if any, with `result`.

        `cookie` is accepted but not used.
        """
        pending = self._trial_def
        if pending is None:
            return
        # Clear the slot first so a later outcome cannot fire it again.
        self._trial_def = None
        reactor.callLater(0, pending.callback, result)
class NullApp(VoiceApp):
    """Voice application that returns a fixed result as soon as it starts."""

    def __start__(self):
        # No event transitions are registered; the result is returned at once.
        greeting = 'hello world'
        self.returnResult(greeting)
class SimpleListenApp(VoiceApp):
    """Answer an incoming call, then wait for the call to end.

    When ``localDisconnect`` is true this side schedules the hangup itself
    once the call has been answered; otherwise it waits for the other end
    to disconnect.
    """

    localDisconnect = True

    def __start__(self):
        """Wait for a call to start."""
        onStart = (CallStartedEvent, self.started)
        return (onStart,)

    def started(self, evt):
        """Answer the call on the event's leg and wait for the answer."""
        evt.leg.answerCall(self)
        onAnswered = (CallAnsweredEvent, self.callAnswered)
        fallback = (Event, self.unknownEvent)
        return (onAnswered, fallback)

    def callAnswered(self, evt):
        """Optionally schedule a local hangup, then wait for the call to end."""
        if self.localDisconnect:
            leg = evt.leg
            reactor.callLater(0, leg.hangupCall)
        onEnded = (CallEndedEvent, self.done)
        fallback = (Event, self.unknownEvent)
        return (onEnded, fallback)

    def done(self, evt):
        """Return the fixed success result once the call has ended."""
        outcome = 'hello world'
        self.returnResult(outcome)

    def unknownEvent(self, evt):
        """Return the repr of any event that was not expected."""
        description = repr(evt)
        self.returnResult(description)
class SimpleCallApp(VoiceApp):
    """Place a call to ``callURL``, then wait for the call to end.

    When ``localDisconnect`` is true this side schedules the hangup itself
    once the call has been answered; otherwise it waits for the other end
    to disconnect.
    """

    callURL = None
    localDisconnect = False

    def __start__(self):
        """Wait for the start event before placing the call."""
        onStart = (CallStartedEvent, self.started)
        return (onStart,)

    def started(self, evt):
        """Place the outbound call and wait for it to be answered or rejected."""
        target = self.callURL
        self.placeCall(target, 'sip:test@127.0.0.1')
        onAnswered = (CallAnsweredEvent, self.callAnswered)
        onRejected = (CallRejectedEvent, self.callFailed)
        fallback = (Event, self.unknownEvent)
        return (onAnswered, onRejected, fallback)

    def callAnswered(self, evt):
        """Optionally schedule a local hangup, then wait for the call to end."""
        if self.localDisconnect:
            leg = evt.leg
            reactor.callLater(0, leg.hangupCall)
        onEnded = (CallEndedEvent, self.done)
        fallback = (Event, self.unknownEvent)
        return (onEnded, fallback)

    def done(self, evt):
        """Return 'completed' once the call has ended."""
        outcome = 'completed'
        self.returnResult(outcome)

    def unknownEvent(self, evt):
        """Return the repr of any event that was not expected."""
        description = repr(evt)
        self.returnResult(description)

    def callFailed(self, evt):
        """Return 'failed' when the call is rejected."""
        outcome = 'failed'
        self.returnResult(outcome)
class AudioSendingApp(VoiceApp):
    """Placeholder for a voice application that connects to a remote
    address and sends an audio file.

    No behaviour is defined here beyond what VoiceApp provides.
    """
class DTMFReceivingApp(VoiceApp):
    """Placeholder for a voice application that listens for a connection,
    accepts it, and returns any DTMF that is sent.

    No behaviour is defined here beyond what VoiceApp provides.
    """
def getDTMFAudioFile():
    """Return the path of 'dtmftestfile.raw' as a sibling of this module."""
    # Aliased so the local name cannot be confused with twisted.trial's util.
    from twisted.python import util as pathUtil
    audioPath = pathUtil.sibpath(__file__, 'dtmftestfile.raw')
    return audioPath
class Saver:
    """Remember the most recent value passed to save()."""

    # Last saved value; None until save() has been called.
    val = None

    def save(self, v):
        """Store `v` on the instance and return it unchanged.

        Returning the value lets this method be used as a Deferred callback
        without altering the result passed down the chain.
        """
        self.val = v
        return self.val
class DougDTMFTest(unittest.TestCase):
    """Run doug voice applications inside TestDougApplication instances.

    Each application is booted with ``--listenport 0``; where a second
    application has to call the first, the port actually bound is read back
    from ``sipListener``.  Every booted application is stopped in a
    ``finally`` clause so that a failed wait or assertion does not leave
    its SIP listener running for the tests that follow.
    """

    def setUp(self):
        """Force shtoom.nat to use a NullMapper and clear its cache."""
        import shtoom.nat
        shtoom.nat._forceMapper(shtoom.nat.NullMapper())
        shtoom.nat.clearCache()

    def tearDown(self):
        """Remove the forced mapper and clear the shtoom.nat cache again."""
        import shtoom.nat
        shtoom.nat._forceMapper(None)
        shtoom.nat.clearCache()

    def test_nullDougApp(self):
        """An explicitly started NullApp reports 'hello world'."""
        app = TestDougApplication(NullApp)
        app.configFileName = None
        app.boot(args=['--listenport', '0'])
        try:
            app._voiceappArgs = {}
            d = app._trial_def = defer.Deferred()
            s = Saver()
            d.addCallback(s.save)
            # We explicitly start the voiceapp here
            reactor.callLater(0, app.startVoiceApp)
            util.wait(d)
            self.assertEquals(s.val, 'hello world')
        finally:
            # Runs even when the wait or the assertion above fails.
            app.stopSIP()

    def test_callAndStartup(self):
        """A call placed by SimpleCallApp makes the listening NullApp report
        'hello world'.
        """
        # Applications are recorded once booted so the finally clause stops
        # exactly those, in boot order (listener first, then caller).
        booted = []
        try:
            lapp = TestDougApplication(NullApp)
            lapp.boot(args=['--listenport', '0'])
            booted.append(lapp)
            # Now get the port number we actually listened on
            port = lapp.sipListener.getHost().port
            lapp._voiceappArgs = {}
            d = lapp._trial_def = defer.Deferred()
            s = Saver()
            d.addCallback(s.save)
            capp = TestDougApplication(SimpleCallApp)
            capp.boot(args=['--listenport', '0'])
            booted.append(capp)
            capp._voiceappArgs = {'callURL': 'sip:foo@127.0.0.1:%d'%port }
            # Only the listener's result is awaited; the caller has no
            # Deferred registered, so its outcome is ignored.
            reactor.callLater(0, capp.startVoiceApp)
            util.wait(d)
            self.assertEquals(s.val, 'hello world')
        finally:
            for app in booted:
                app.stopSIP()

    def test_callStartupAndConnecting(self):
        """SimpleCallApp calls SimpleListenApp; both sides report their
        normal completion results.
        """
        # Applications are recorded once booted so the finally clause stops
        # exactly those, in boot order (listener first, then caller).
        booted = []
        try:
            lapp = TestDougApplication(SimpleListenApp)
            lapp.boot(args=['--listenport', '0'])
            booted.append(lapp)
            # Now get the port number we actually listened on
            port = lapp.sipListener.getHost().port
            lapp._voiceappArgs = {}
            d1 = lapp._trial_def = defer.Deferred()
            s1 = Saver()
            d1.addCallback(s1.save)
            capp = TestDougApplication(SimpleCallApp)
            capp.boot(args=['--listenport', '0'])
            booted.append(capp)
            capp._voiceappArgs = {'callURL': 'sip:foo@127.0.0.1:%d'%port }
            d2 = capp._trial_def = defer.Deferred()
            s2 = Saver()
            d2.addCallback(s2.save)
            reactor.callLater(0, capp.startVoiceApp)
            util.wait(d1)
            util.wait(d2)
            self.assertEquals(s1.val, 'hello world')
            self.assertEquals(s2.val, 'completed')
        finally:
            for app in booted:
                app.stopSIP()
|