~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/doc/core/examples/cred.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
 
 
7
import sys
 
8
from zope.interface import implements, Interface
 
9
 
 
10
from twisted.protocols import basic
 
11
from twisted.internet import protocol
 
12
from twisted.python import log
 
13
 
 
14
from twisted.cred import error
 
15
from twisted.cred import portal
 
16
from twisted.cred import checkers
 
17
from twisted.cred import credentials
 
18
 
 
19
class IProtocolUser(Interface):
 
20
    def getPrivileges():
 
21
        """Return a list of privileges this user has."""
 
22
 
 
23
    def logout():
 
24
        """Cleanup per-login resources allocated to this avatar"""
 
25
 
 
26
class AnonymousUser:
 
27
    implements(IProtocolUser)
 
28
    
 
29
    def getPrivileges(self):
 
30
        return [1, 2, 3]
 
31
 
 
32
    def logout(self):
 
33
        print "Cleaning up anonymous user resources"
 
34
 
 
35
class RegularUser:
 
36
    implements(IProtocolUser)
 
37
    
 
38
    def getPrivileges(self):
 
39
        return [1, 2, 3, 5, 6]
 
40
 
 
41
    def logout(self):
 
42
        print "Cleaning up regular user resources"
 
43
 
 
44
class Administrator:
 
45
    implements(IProtocolUser)
 
46
    
 
47
    def getPrivileges(self):
 
48
        return range(50)
 
49
 
 
50
    def logout(self):
 
51
        print "Cleaning up administrator resources"
 
52
 
 
53
class Protocol(basic.LineReceiver):
 
54
    user = None
 
55
    portal = None
 
56
    avatar = None
 
57
    logout = None
 
58
 
 
59
    def connectionMade(self):
 
60
        self.sendLine("Login with USER <name> followed by PASS <password> or ANON")
 
61
        self.sendLine("Check privileges with PRIVS")
 
62
 
 
63
    def connectionLost(self, reason):
 
64
        if self.logout:
 
65
            self.logout()
 
66
            self.avatar = None
 
67
            self.logout = None
 
68
    
 
69
    def lineReceived(self, line):
 
70
        f = getattr(self, 'cmd_' + line.upper().split()[0])
 
71
        if f:
 
72
            try:
 
73
                f(*line.split()[1:])
 
74
            except TypeError:
 
75
                self.sendLine("Wrong number of arguments.")
 
76
            except:
 
77
                self.sendLine("Server error (probably your fault)")
 
78
 
 
79
    def cmd_ANON(self):
 
80
        if self.portal:
 
81
            self.portal.login(credentials.Anonymous(), None, IProtocolUser
 
82
                ).addCallbacks(self._cbLogin, self._ebLogin
 
83
                )
 
84
        else:
 
85
            self.sendLine("DENIED")
 
86
    
 
87
    def cmd_USER(self, name):
 
88
        self.user = name
 
89
        self.sendLine("Alright.  Now PASS?")
 
90
    
 
91
    def cmd_PASS(self, password):
 
92
        if not self.user:
 
93
            self.sendLine("USER required before PASS")
 
94
        else:
 
95
            if self.portal:
 
96
                self.portal.login(
 
97
                    credentials.UsernamePassword(self.user, password),
 
98
                    None,
 
99
                    IProtocolUser
 
100
                ).addCallbacks(self._cbLogin, self._ebLogin
 
101
                )
 
102
            else:
 
103
                self.sendLine("DENIED")
 
104
 
 
105
    def cmd_PRIVS(self):
 
106
        self.sendLine("You have the following privileges: ")
 
107
        self.sendLine(" ".join(map(str, self.avatar.getPrivileges())))
 
108
 
 
109
    def _cbLogin(self, (interface, avatar, logout)):
 
110
        assert interface is IProtocolUser
 
111
        self.avatar = avatar
 
112
        self.logout = logout
 
113
        self.sendLine("Login successful.  Available commands: PRIVS")
 
114
    
 
115
    def _ebLogin(self, failure):
 
116
        failure.trap(error.UnauthorizedLogin)
 
117
        self.sendLine("Login denied!  Go away.")
 
118
 
 
119
class ServerFactory(protocol.ServerFactory):
 
120
    protocol = Protocol
 
121
    
 
122
    def __init__(self, portal):
 
123
        self.portal = portal
 
124
    
 
125
    def buildProtocol(self, addr):
 
126
        p = protocol.ServerFactory.buildProtocol(self, addr)
 
127
        p.portal = self.portal
 
128
        return p
 
129
 
 
130
class Realm:
 
131
    implements(portal.IRealm)
 
132
 
 
133
    def requestAvatar(self, avatarId, mind, *interfaces):
 
134
        if IProtocolUser in interfaces:
 
135
            if avatarId == checkers.ANONYMOUS:
 
136
                av = AnonymousUser()
 
137
            elif avatarId.isupper():
 
138
                # Capitalized usernames are administrators.
 
139
                av = Administrator()
 
140
            else:
 
141
                av = RegularUser()
 
142
            return IProtocolUser, av, av.logout
 
143
        raise NotImplementedError("Only IProtocolUser interface is supported by this realm")
 
144
 
 
145
def main():
 
146
    r = Realm()
 
147
    p = portal.Portal(r)
 
148
    c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
 
149
    c.addUser("auser", "thepass")
 
150
    c.addUser("SECONDUSER", "secret")
 
151
    p.registerChecker(c)
 
152
    p.registerChecker(checkers.AllowAnonymousAccess())
 
153
    
 
154
    f = ServerFactory(p)
 
155
 
 
156
    log.startLogging(sys.stdout)
 
157
 
 
158
    from twisted.internet import reactor
 
159
    reactor.listenTCP(4738, f)
 
160
    reactor.run()
 
161
 
 
162
if __name__ == '__main__':
 
163
    main()