2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
8
from zope.interface import implements, Interface
10
from twisted.protocols import basic
11
from twisted.internet import protocol
12
from twisted.python import log
14
from twisted.cred import error
15
from twisted.cred import portal
16
from twisted.cred import checkers
17
from twisted.cred import credentials
19
class IProtocolUser(Interface):
21
"""Return a list of privileges this user has."""
24
"""Cleanup per-login resources allocated to this avatar"""
27
implements(IProtocolUser)
29
def getPrivileges(self):
33
print "Cleaning up anonymous user resources"
36
implements(IProtocolUser)
38
def getPrivileges(self):
39
return [1, 2, 3, 5, 6]
42
print "Cleaning up regular user resources"
45
implements(IProtocolUser)
47
def getPrivileges(self):
51
print "Cleaning up administrator resources"
53
class Protocol(basic.LineReceiver):
59
def connectionMade(self):
60
self.sendLine("Login with USER <name> followed by PASS <password> or ANON")
61
self.sendLine("Check privileges with PRIVS")
63
def connectionLost(self, reason):
69
def lineReceived(self, line):
70
f = getattr(self, 'cmd_' + line.upper().split()[0])
75
self.sendLine("Wrong number of arguments.")
77
self.sendLine("Server error (probably your fault)")
81
self.portal.login(credentials.Anonymous(), None, IProtocolUser
82
).addCallbacks(self._cbLogin, self._ebLogin
85
self.sendLine("DENIED")
87
def cmd_USER(self, name):
89
self.sendLine("Alright. Now PASS?")
91
def cmd_PASS(self, password):
93
self.sendLine("USER required before PASS")
97
credentials.UsernamePassword(self.user, password),
100
).addCallbacks(self._cbLogin, self._ebLogin
103
self.sendLine("DENIED")
106
self.sendLine("You have the following privileges: ")
107
self.sendLine(" ".join(map(str, self.avatar.getPrivileges())))
109
def _cbLogin(self, (interface, avatar, logout)):
110
assert interface is IProtocolUser
113
self.sendLine("Login successful. Available commands: PRIVS")
115
def _ebLogin(self, failure):
116
failure.trap(error.UnauthorizedLogin)
117
self.sendLine("Login denied! Go away.")
119
class ServerFactory(protocol.ServerFactory):
122
def __init__(self, portal):
125
def buildProtocol(self, addr):
126
p = protocol.ServerFactory.buildProtocol(self, addr)
127
p.portal = self.portal
131
implements(portal.IRealm)
133
def requestAvatar(self, avatarId, mind, *interfaces):
134
if IProtocolUser in interfaces:
135
if avatarId == checkers.ANONYMOUS:
137
elif avatarId.isupper():
138
# Capitalized usernames are administrators.
142
return IProtocolUser, av, av.logout
143
raise NotImplementedError("Only IProtocolUser interface is supported by this realm")
148
c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
149
c.addUser("auser", "thepass")
150
c.addUser("SECONDUSER", "secret")
152
p.registerChecker(checkers.AllowAnonymousAccess())
156
log.startLogging(sys.stdout)
158
from twisted.internet import reactor
159
reactor.listenTCP(4738, f)
162
if __name__ == '__main__':