~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/mail/test/pop3testserver.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
# -*- test-case-name: twisted.mail.test.test_pop3client -*-
 
3
 
 
4
from twisted.internet.protocol import Factory
 
5
from twisted.protocols import basic
 
6
from twisted.internet import reactor
 
7
import sys, time
 
8
 
 
9
USER = "test"
 
10
PASS = "twisted"
 
11
 
 
12
PORT = 1100
 
13
 
 
14
SSL_SUPPORT = True
 
15
UIDL_SUPPORT = True
 
16
INVALID_SERVER_RESPONSE = False
 
17
INVALID_CAPABILITY_RESPONSE = False
 
18
INVALID_LOGIN_RESPONSE = False
 
19
DENY_CONNECTION = False
 
20
DROP_CONNECTION = False
 
21
BAD_TLS_RESPONSE = False
 
22
TIMEOUT_RESPONSE = False
 
23
TIMEOUT_DEFERRED = False
 
24
SLOW_GREETING = False
 
25
 
 
26
"""Commands"""
 
27
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
 
28
 
 
29
CAPABILITIES = [
 
30
"TOP",
 
31
"LOGIN-DELAY 180",
 
32
"USER",
 
33
"SASL LOGIN"
 
34
]
 
35
 
 
36
CAPABILITIES_SSL = "STLS"
 
37
CAPABILITIES_UIDL = "UIDL"
 
38
 
 
39
 
 
40
INVALID_RESPONSE = "-ERR Unknown request"
 
41
VALID_RESPONSE = "+OK Command Completed"
 
42
AUTH_DECLINED = "-ERR LOGIN failed"
 
43
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
 
44
TLS_ERROR = "-ERR server side error start TLS handshake"
 
45
LOGOUT_COMPLETE = "+OK quit completed"
 
46
NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
 
47
STAT = "+OK 0 0"
 
48
UIDL = "+OK Unique-ID listing follows\r\n."
 
49
LIST = "+OK Mailbox scan listing follows\r\n."
 
50
CAP_START = "+OK Capability list follows:"
 
51
 
 
52
 
 
53
class POP3TestServer(basic.LineReceiver):
 
54
    def __init__(self, contextFactory = None):
 
55
        self.loggedIn = False
 
56
        self.caps = None
 
57
        self.tmpUser = None
 
58
        self.ctx = contextFactory
 
59
 
 
60
    def sendSTATResp(self, req):
 
61
        self.sendLine(STAT)
 
62
 
 
63
    def sendUIDLResp(self, req):
 
64
        self.sendLine(UIDL)
 
65
 
 
66
    def sendLISTResp(self, req):
 
67
        self.sendLine(LIST)
 
68
 
 
69
    def sendCapabilities(self):
 
70
        if self.caps is None:
 
71
            self.caps = [CAP_START]
 
72
 
 
73
        if UIDL_SUPPORT:
 
74
            self.caps.append(CAPABILITIES_UIDL)
 
75
 
 
76
        if SSL_SUPPORT:
 
77
            self.caps.append(CAPABILITIES_SSL)
 
78
 
 
79
        for cap in CAPABILITIES:
 
80
            self.caps.append(cap)
 
81
        resp = '\r\n'.join(self.caps)
 
82
        resp += "\r\n."
 
83
 
 
84
        self.sendLine(resp)
 
85
 
 
86
 
 
87
    def connectionMade(self):
 
88
        if DENY_CONNECTION:
 
89
            self.disconnect()
 
90
            return
 
91
 
 
92
        if SLOW_GREETING:
 
93
            reactor.callLater(20, self.sendGreeting)
 
94
 
 
95
        else:
 
96
            self.sendGreeting()
 
97
 
 
98
    def sendGreeting(self):
 
99
        self.sendLine(CONNECTION_MADE)
 
100
 
 
101
    def lineReceived(self, line):
 
102
        """Error Conditions"""
 
103
 
 
104
        uline = line.upper()
 
105
        find = lambda s: uline.find(s) != -1
 
106
 
 
107
        if TIMEOUT_RESPONSE:
 
108
            # Do not respond to clients request
 
109
            return
 
110
 
 
111
        if DROP_CONNECTION:
 
112
            self.disconnect()
 
113
            return
 
114
 
 
115
        elif find("CAPA"):
 
116
            if INVALID_CAPABILITY_RESPONSE:
 
117
                self.sendLine(INVALID_RESPONSE)
 
118
            else:
 
119
                self.sendCapabilities()
 
120
 
 
121
        elif find("STLS") and SSL_SUPPORT:
 
122
            self.startTLS()
 
123
 
 
124
        elif find("USER"):
 
125
            if INVALID_LOGIN_RESPONSE:
 
126
                self.sendLine(INVALID_RESPONSE)
 
127
                return
 
128
 
 
129
            resp = None
 
130
            try:
 
131
                self.tmpUser = line.split(" ")[1]
 
132
                resp = VALID_RESPONSE
 
133
            except:
 
134
                resp = AUTH_DECLINED
 
135
 
 
136
            self.sendLine(resp)
 
137
 
 
138
        elif find("PASS"):
 
139
            resp = None
 
140
            try:
 
141
                pwd = line.split(" ")[1]
 
142
 
 
143
                if self.tmpUser is None or pwd is None:
 
144
                    resp = AUTH_DECLINED
 
145
                elif self.tmpUser == USER and pwd == PASS:
 
146
                    resp = AUTH_ACCEPTED
 
147
                    self.loggedIn = True
 
148
                else:
 
149
                    resp = AUTH_DECLINED
 
150
            except:
 
151
                resp = AUTH_DECLINED
 
152
 
 
153
            self.sendLine(resp)
 
154
 
 
155
        elif find("QUIT"):
 
156
            self.loggedIn = False
 
157
            self.sendLine(LOGOUT_COMPLETE)
 
158
            self.disconnect()
 
159
 
 
160
        elif INVALID_SERVER_RESPONSE:
 
161
            self.sendLine(INVALID_RESPONSE)
 
162
 
 
163
        elif not self.loggedIn:
 
164
            self.sendLine(NOT_LOGGED_IN)
 
165
 
 
166
        elif find("NOOP"):
 
167
            self.sendLine(VALID_RESPONSE)
 
168
 
 
169
        elif find("STAT"):
 
170
            if TIMEOUT_DEFERRED:
 
171
                return
 
172
            self.sendLine(STAT)
 
173
 
 
174
        elif find("LIST"):
 
175
            if TIMEOUT_DEFERRED:
 
176
                return
 
177
            self.sendLine(LIST)
 
178
 
 
179
        elif find("UIDL"):
 
180
            if TIMEOUT_DEFERRED:
 
181
                return
 
182
            elif not UIDL_SUPPORT:
 
183
                self.sendLine(INVALID_RESPONSE)
 
184
                return
 
185
 
 
186
            self.sendLine(UIDL)
 
187
 
 
188
    def startTLS(self):
 
189
        if self.ctx is None:
 
190
            self.getContext()
 
191
 
 
192
        if SSL_SUPPORT and self.ctx is not None:
 
193
            self.sendLine('+OK Begin TLS negotiation now')
 
194
            self.transport.startTLS(self.ctx)
 
195
        else:
 
196
            self.sendLine('-ERR TLS not available')
 
197
 
 
198
    def disconnect(self):
 
199
        self.transport.loseConnection()
 
200
 
 
201
    def getContext(self):
 
202
        try:
 
203
            from twisted.internet import ssl
 
204
        except ImportError:
 
205
           self.ctx = None
 
206
        else:
 
207
            self.ctx = ssl.ClientContextFactory()
 
208
            self.ctx.method = ssl.SSL.TLSv1_METHOD
 
209
 
 
210
 
 
211
usage = """popServer.py [arg] (default is Standard POP Server with no messages)
 
212
no_ssl  - Start with no SSL support
 
213
no_uidl - Start with no UIDL support
 
214
bad_resp - Send a non-RFC compliant response to the Client
 
215
bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request
 
216
bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request
 
217
deny - Deny the connection
 
218
drop - Drop the connection after sending the greeting
 
219
bad_tls - Send a bad response to a STARTTLS
 
220
timeout - Do not return a response to a Client request
 
221
to_deferred - Do not return a response on a 'Select' request. This
 
222
              will test Deferred callback handling
 
223
slow - Wait 20 seconds after the connection is made to return a Server Greeting
 
224
"""
 
225
 
 
226
def printMessage(msg):
 
227
    print "Server Starting in %s mode" % msg
 
228
 
 
229
def processArg(arg):
 
230
 
 
231
    if arg.lower() == 'no_ssl':
 
232
        global SSL_SUPPORT
 
233
        SSL_SUPPORT = False
 
234
        printMessage("NON-SSL")
 
235
 
 
236
    elif arg.lower() == 'no_uidl':
 
237
        global UIDL_SUPPORT
 
238
        UIDL_SUPPORT = False
 
239
        printMessage("NON-UIDL")
 
240
 
 
241
    elif arg.lower() == 'bad_resp':
 
242
        global INVALID_SERVER_RESPONSE
 
243
        INVALID_SERVER_RESPONSE = True
 
244
        printMessage("Invalid Server Response")
 
245
 
 
246
    elif arg.lower() == 'bad_cap_resp':
 
247
        global INVALID_CAPABILITY_RESPONSE
 
248
        INVALID_CAPABILITY_RESPONSE = True
 
249
        printMessage("Invalid Capability Response")
 
250
 
 
251
    elif arg.lower() == 'bad_login_resp':
 
252
        global INVALID_LOGIN_RESPONSE
 
253
        INVALID_LOGIN_RESPONSE = True
 
254
        printMessage("Invalid Capability Response")
 
255
 
 
256
    elif arg.lower() == 'deny':
 
257
        global DENY_CONNECTION
 
258
        DENY_CONNECTION = True
 
259
        printMessage("Deny Connection")
 
260
 
 
261
    elif arg.lower() == 'drop':
 
262
        global DROP_CONNECTION
 
263
        DROP_CONNECTION = True
 
264
        printMessage("Drop Connection")
 
265
 
 
266
 
 
267
    elif arg.lower() == 'bad_tls':
 
268
        global BAD_TLS_RESPONSE
 
269
        BAD_TLS_RESPONSE = True
 
270
        printMessage("Bad TLS Response")
 
271
 
 
272
    elif arg.lower() == 'timeout':
 
273
        global TIMEOUT_RESPONSE
 
274
        TIMEOUT_RESPONSE = True
 
275
        printMessage("Timeout Response")
 
276
 
 
277
    elif arg.lower() == 'to_deferred':
 
278
        global TIMEOUT_DEFERRED
 
279
        TIMEOUT_DEFERRED = True
 
280
        printMessage("Timeout Deferred Response")
 
281
 
 
282
    elif arg.lower() == 'slow':
 
283
        global SLOW_GREETING
 
284
        SLOW_GREETING = True
 
285
        printMessage("Slow Greeting")
 
286
 
 
287
    elif arg.lower() == '--help':
 
288
        print usage
 
289
        sys.exit()
 
290
 
 
291
    else:
 
292
        print usage
 
293
        sys.exit()
 
294
 
 
295
def main():
 
296
 
 
297
    if len(sys.argv) < 2:
 
298
        printMessage("POP3 with no messages")
 
299
    else:
 
300
        args = sys.argv[1:]
 
301
 
 
302
        for arg in args:
 
303
            processArg(arg)
 
304
 
 
305
    f = Factory()
 
306
    f.protocol = POP3TestServer
 
307
    reactor.listenTCP(PORT, f)
 
308
    reactor.run()
 
309
 
 
310
if __name__ == '__main__':
 
311
    main()