~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/mail/test/pop3testserver.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# -*- test-case-name: twisted.mail.test.test_pop3client -*-
 
3
 
 
4
# Copyright (c) 2009 Twisted Matrix Laboratories.
 
5
# See LICENSE for details.
 
6
 
 
7
from twisted.internet.protocol import Factory
 
8
from twisted.protocols import basic
 
9
from twisted.internet import reactor
 
10
import sys, time
 
11
 
 
12
USER = "test"
 
13
PASS = "twisted"
 
14
 
 
15
PORT = 1100
 
16
 
 
17
SSL_SUPPORT = True
 
18
UIDL_SUPPORT = True
 
19
INVALID_SERVER_RESPONSE = False
 
20
INVALID_CAPABILITY_RESPONSE = False
 
21
INVALID_LOGIN_RESPONSE = False
 
22
DENY_CONNECTION = False
 
23
DROP_CONNECTION = False
 
24
BAD_TLS_RESPONSE = False
 
25
TIMEOUT_RESPONSE = False
 
26
TIMEOUT_DEFERRED = False
 
27
SLOW_GREETING = False
 
28
 
 
29
"""Commands"""
 
30
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
 
31
 
 
32
CAPABILITIES = [
 
33
"TOP",
 
34
"LOGIN-DELAY 180",
 
35
"USER",
 
36
"SASL LOGIN"
 
37
]
 
38
 
 
39
CAPABILITIES_SSL = "STLS"
 
40
CAPABILITIES_UIDL = "UIDL"
 
41
 
 
42
 
 
43
INVALID_RESPONSE = "-ERR Unknown request"
 
44
VALID_RESPONSE = "+OK Command Completed"
 
45
AUTH_DECLINED = "-ERR LOGIN failed"
 
46
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
 
47
TLS_ERROR = "-ERR server side error start TLS handshake"
 
48
LOGOUT_COMPLETE = "+OK quit completed"
 
49
NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
 
50
STAT = "+OK 0 0"
 
51
UIDL = "+OK Unique-ID listing follows\r\n."
 
52
LIST = "+OK Mailbox scan listing follows\r\n."
 
53
CAP_START = "+OK Capability list follows:"
 
54
 
 
55
 
 
56
class POP3TestServer(basic.LineReceiver):
 
57
    def __init__(self, contextFactory = None):
 
58
        self.loggedIn = False
 
59
        self.caps = None
 
60
        self.tmpUser = None
 
61
        self.ctx = contextFactory
 
62
 
 
63
    def sendSTATResp(self, req):
 
64
        self.sendLine(STAT)
 
65
 
 
66
    def sendUIDLResp(self, req):
 
67
        self.sendLine(UIDL)
 
68
 
 
69
    def sendLISTResp(self, req):
 
70
        self.sendLine(LIST)
 
71
 
 
72
    def sendCapabilities(self):
 
73
        if self.caps is None:
 
74
            self.caps = [CAP_START]
 
75
 
 
76
        if UIDL_SUPPORT:
 
77
            self.caps.append(CAPABILITIES_UIDL)
 
78
 
 
79
        if SSL_SUPPORT:
 
80
            self.caps.append(CAPABILITIES_SSL)
 
81
 
 
82
        for cap in CAPABILITIES:
 
83
            self.caps.append(cap)
 
84
        resp = '\r\n'.join(self.caps)
 
85
        resp += "\r\n."
 
86
 
 
87
        self.sendLine(resp)
 
88
 
 
89
 
 
90
    def connectionMade(self):
 
91
        if DENY_CONNECTION:
 
92
            self.disconnect()
 
93
            return
 
94
 
 
95
        if SLOW_GREETING:
 
96
            reactor.callLater(20, self.sendGreeting)
 
97
 
 
98
        else:
 
99
            self.sendGreeting()
 
100
 
 
101
    def sendGreeting(self):
 
102
        self.sendLine(CONNECTION_MADE)
 
103
 
 
104
    def lineReceived(self, line):
 
105
        """Error Conditions"""
 
106
 
 
107
        uline = line.upper()
 
108
        find = lambda s: uline.find(s) != -1
 
109
 
 
110
        if TIMEOUT_RESPONSE:
 
111
            # Do not respond to clients request
 
112
            return
 
113
 
 
114
        if DROP_CONNECTION:
 
115
            self.disconnect()
 
116
            return
 
117
 
 
118
        elif find("CAPA"):
 
119
            if INVALID_CAPABILITY_RESPONSE:
 
120
                self.sendLine(INVALID_RESPONSE)
 
121
            else:
 
122
                self.sendCapabilities()
 
123
 
 
124
        elif find("STLS") and SSL_SUPPORT:
 
125
            self.startTLS()
 
126
 
 
127
        elif find("USER"):
 
128
            if INVALID_LOGIN_RESPONSE:
 
129
                self.sendLine(INVALID_RESPONSE)
 
130
                return
 
131
 
 
132
            resp = None
 
133
            try:
 
134
                self.tmpUser = line.split(" ")[1]
 
135
                resp = VALID_RESPONSE
 
136
            except:
 
137
                resp = AUTH_DECLINED
 
138
 
 
139
            self.sendLine(resp)
 
140
 
 
141
        elif find("PASS"):
 
142
            resp = None
 
143
            try:
 
144
                pwd = line.split(" ")[1]
 
145
 
 
146
                if self.tmpUser is None or pwd is None:
 
147
                    resp = AUTH_DECLINED
 
148
                elif self.tmpUser == USER and pwd == PASS:
 
149
                    resp = AUTH_ACCEPTED
 
150
                    self.loggedIn = True
 
151
                else:
 
152
                    resp = AUTH_DECLINED
 
153
            except:
 
154
                resp = AUTH_DECLINED
 
155
 
 
156
            self.sendLine(resp)
 
157
 
 
158
        elif find("QUIT"):
 
159
            self.loggedIn = False
 
160
            self.sendLine(LOGOUT_COMPLETE)
 
161
            self.disconnect()
 
162
 
 
163
        elif INVALID_SERVER_RESPONSE:
 
164
            self.sendLine(INVALID_RESPONSE)
 
165
 
 
166
        elif not self.loggedIn:
 
167
            self.sendLine(NOT_LOGGED_IN)
 
168
 
 
169
        elif find("NOOP"):
 
170
            self.sendLine(VALID_RESPONSE)
 
171
 
 
172
        elif find("STAT"):
 
173
            if TIMEOUT_DEFERRED:
 
174
                return
 
175
            self.sendLine(STAT)
 
176
 
 
177
        elif find("LIST"):
 
178
            if TIMEOUT_DEFERRED:
 
179
                return
 
180
            self.sendLine(LIST)
 
181
 
 
182
        elif find("UIDL"):
 
183
            if TIMEOUT_DEFERRED:
 
184
                return
 
185
            elif not UIDL_SUPPORT:
 
186
                self.sendLine(INVALID_RESPONSE)
 
187
                return
 
188
 
 
189
            self.sendLine(UIDL)
 
190
 
 
191
    def startTLS(self):
 
192
        if self.ctx is None:
 
193
            self.getContext()
 
194
 
 
195
        if SSL_SUPPORT and self.ctx is not None:
 
196
            self.sendLine('+OK Begin TLS negotiation now')
 
197
            self.transport.startTLS(self.ctx)
 
198
        else:
 
199
            self.sendLine('-ERR TLS not available')
 
200
 
 
201
    def disconnect(self):
 
202
        self.transport.loseConnection()
 
203
 
 
204
    def getContext(self):
 
205
        try:
 
206
            from twisted.internet import ssl
 
207
        except ImportError:
 
208
           self.ctx = None
 
209
        else:
 
210
            self.ctx = ssl.ClientContextFactory()
 
211
            self.ctx.method = ssl.SSL.TLSv1_METHOD
 
212
 
 
213
 
 
214
usage = """popServer.py [arg] (default is Standard POP Server with no messages)
 
215
no_ssl  - Start with no SSL support
 
216
no_uidl - Start with no UIDL support
 
217
bad_resp - Send a non-RFC compliant response to the Client
 
218
bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request
 
219
bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request
 
220
deny - Deny the connection
 
221
drop - Drop the connection after sending the greeting
 
222
bad_tls - Send a bad response to a STARTTLS
 
223
timeout - Do not return a response to a Client request
 
224
to_deferred - Do not return a response on a 'Select' request. This
 
225
              will test Deferred callback handling
 
226
slow - Wait 20 seconds after the connection is made to return a Server Greeting
 
227
"""
 
228
 
 
229
def printMessage(msg):
 
230
    print "Server Starting in %s mode" % msg
 
231
 
 
232
def processArg(arg):
 
233
 
 
234
    if arg.lower() == 'no_ssl':
 
235
        global SSL_SUPPORT
 
236
        SSL_SUPPORT = False
 
237
        printMessage("NON-SSL")
 
238
 
 
239
    elif arg.lower() == 'no_uidl':
 
240
        global UIDL_SUPPORT
 
241
        UIDL_SUPPORT = False
 
242
        printMessage("NON-UIDL")
 
243
 
 
244
    elif arg.lower() == 'bad_resp':
 
245
        global INVALID_SERVER_RESPONSE
 
246
        INVALID_SERVER_RESPONSE = True
 
247
        printMessage("Invalid Server Response")
 
248
 
 
249
    elif arg.lower() == 'bad_cap_resp':
 
250
        global INVALID_CAPABILITY_RESPONSE
 
251
        INVALID_CAPABILITY_RESPONSE = True
 
252
        printMessage("Invalid Capability Response")
 
253
 
 
254
    elif arg.lower() == 'bad_login_resp':
 
255
        global INVALID_LOGIN_RESPONSE
 
256
        INVALID_LOGIN_RESPONSE = True
 
257
        printMessage("Invalid Capability Response")
 
258
 
 
259
    elif arg.lower() == 'deny':
 
260
        global DENY_CONNECTION
 
261
        DENY_CONNECTION = True
 
262
        printMessage("Deny Connection")
 
263
 
 
264
    elif arg.lower() == 'drop':
 
265
        global DROP_CONNECTION
 
266
        DROP_CONNECTION = True
 
267
        printMessage("Drop Connection")
 
268
 
 
269
 
 
270
    elif arg.lower() == 'bad_tls':
 
271
        global BAD_TLS_RESPONSE
 
272
        BAD_TLS_RESPONSE = True
 
273
        printMessage("Bad TLS Response")
 
274
 
 
275
    elif arg.lower() == 'timeout':
 
276
        global TIMEOUT_RESPONSE
 
277
        TIMEOUT_RESPONSE = True
 
278
        printMessage("Timeout Response")
 
279
 
 
280
    elif arg.lower() == 'to_deferred':
 
281
        global TIMEOUT_DEFERRED
 
282
        TIMEOUT_DEFERRED = True
 
283
        printMessage("Timeout Deferred Response")
 
284
 
 
285
    elif arg.lower() == 'slow':
 
286
        global SLOW_GREETING
 
287
        SLOW_GREETING = True
 
288
        printMessage("Slow Greeting")
 
289
 
 
290
    elif arg.lower() == '--help':
 
291
        print usage
 
292
        sys.exit()
 
293
 
 
294
    else:
 
295
        print usage
 
296
        sys.exit()
 
297
 
 
298
def main():
 
299
 
 
300
    if len(sys.argv) < 2:
 
301
        printMessage("POP3 with no messages")
 
302
    else:
 
303
        args = sys.argv[1:]
 
304
 
 
305
        for arg in args:
 
306
            processArg(arg)
 
307
 
 
308
    f = Factory()
 
309
    f.protocol = POP3TestServer
 
310
    reactor.listenTCP(PORT, f)
 
311
    reactor.run()
 
312
 
 
313
if __name__ == '__main__':
 
314
    main()