1
# -*- test-case-name: axiom.test.test_userbase -*-
3
from twisted.cred.portal import IRealm
4
from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword
5
from twisted.cred.checkers import ICredentialsChecker, ANONYMOUS
6
from twisted.cred.error import UnauthorizedLogin
7
from twisted.python import log
9
from axiom.substore import SubStore
10
from axiom.item import Item
11
from axiom.attributes import text, bytes, integer, reference, AND
13
from zope.interface import implements, Interface
20
class BadCredentials(UnauthorizedLogin):
    """
    Login failure raised when an account was found but the supplied
    password did not check out against the stored one.
    """
23
class NoSuchUser(UnauthorizedLogin):
    """
    Login failure signalling that the requested user is not known.
    """
26
class DuplicateUser(Exception):
    """
    Raised when an account is added for a username/domain pair that already
    has one.  Instances are constructed with C{(username, domain)}.
    """
31
class LoginAccount(Item):
    """
    A single login identity: a username within a domain, the password used
    to authenticate it, and a reference to the object its avatars are
    adapted from.
    """

    username = text(indexed=True)
    domain = text(indexed=True)  # flipped using dflip, e.g. "com.divmod"

    # The secret handed to credentials.checkPassword() when this account is
    # authenticated.  (``bytes`` here is the axiom attribute type imported at
    # the top of the file, not the builtin.)
    password = bytes()

    # Reference to a thing which can be adapted to implementations for
    # application-level protocols.  In general this is a reference to a
    # SubStore because this is optimized for applications where per-user
    # data is a substantial portion of the cost.
    avatars = reference()

    def __conform__(self, interface):
        """
        Adapt this account to C{interface} by delegating to whatever
        C{avatars} refers to.

        @param interface: the interface being adapted to; it is called with
        C{self.avatars} and a fallback of C{None}.

        @return: whatever C{interface(self.avatars, None)} produces.
        """
        return interface(self.avatars, None)
49
class LoginSystem(Item):
    """
    A realm and credentials checker, stored as an item, which authenticates
    C{user@domain} logins against the L{LoginAccount} items in its store.
    """
    implements(IRealm, ICredentialsChecker)

    credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword)

    typeName = 'login_system'

    loginCount = integer()    # incremented for each avatar handed out
    failedLogins = integer()  # incremented for each password mismatch

    def __init__(self, **kw):
        """
        Create the login system and register it on its store as a powerup
        for both L{IRealm} and L{ICredentialsChecker}.

        @param kw: attribute values forwarded to L{Item.__init__}.
        """
        # Both counters are later bumped with ``+= 1``, so they need an
        # integer to start from.  The attributes are declared without a
        # default (presumably leaving them C{None} -- confirm), hence the
        # explicit zeroes; values supplied by the caller are respected.
        kw.setdefault('loginCount', 0)
        kw.setdefault('failedLogins', 0)
        super(LoginSystem, self).__init__(**kw)
        self.store.powerUp(self, IRealm)
        self.store.powerUp(self, ICredentialsChecker)

    def accountByAddress(self, username, domain):
        """
        Find the account registered for C{username} at C{domain}.

        @type username: C{unicode} without NUL
        @type domain: C{unicode} without NUL

        @return: the first matching L{LoginAccount}, or C{None} if the query
        yields nothing.
        """
        # Domains are stored flipped (see LoginAccount.domain), so flip the
        # one we were given before comparing.
        matches = self.store.query(
            LoginAccount,
            AND(LoginAccount.domain == dflip(domain),
                LoginAccount.username == username))
        for account in matches:
            return account
        return None

    def addAccount(self, username, domain, password):
        """
        Create a new L{LoginAccount}, backed by its own L{SubStore}.

        @param username: the local part of the address; coerced to
        C{unicode}.
        @param domain: the (unflipped) domain; coerced to C{unicode}.
        @param password: the secret to store on the account.

        @raise DuplicateUser: if an account already exists for this
        username and domain.

        @return: the newly created L{LoginAccount}.
        """
        username = unicode(username)
        domain = unicode(domain)
        if self.accountByAddress(username, domain) is not None:
            raise DuplicateUser(username, domain)
        return LoginAccount(store=self.store,
                            username=username,
                            # stored flipped so accountByAddress can find it
                            domain=dflip(domain),
                            password=password,
                            avatars=SubStore(self.store,
                                             ('account', domain, username)))

    def logoutFactory(self, obj):
        """
        Pick the logout callable to hand back alongside an avatar.

        @return: C{obj.logout} if C{obj} has such an attribute, otherwise a
        callable that takes no arguments and does nothing.
        """
        return getattr(obj, 'logout', lambda: None)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Produce an avatar for C{avatarId} providing one of C{interfaces}.

        @param avatarId: L{ANONYMOUS}, or an ID understood by
        C{self.store.getItemByID}.
        @param mind: unused here.
        @param interfaces: candidate interfaces, tried in order.

        @raise NotImplementedError: if the avatar source cannot be adapted
        to any of C{interfaces}.

        @return: a 3-tuple of C{(interface, implementation, logout)}.
        """
        if avatarId is ANONYMOUS:
            # NOTE(review): anonymous logins are assumed to be adapted from
            # the store itself -- confirm.
            av = self.store
        else:
            av = self.store.getItemByID(avatarId)
        for interface in interfaces:
            impl = interface(av, None)
            # None is the fallback we passed in, i.e. no adaptation; move on
            # to the next candidate rather than returning a useless avatar.
            if impl is not None:
                self.loginCount += 1
                return interface, impl, self.logoutFactory(impl)
        raise NotImplementedError()

    def requestAvatarId(self, credentials):
        """
        Authenticate C{credentials}, whose C{username} is of the form
        C{user@domain}.

        @raise NoSuchUser: if the username has no C{@} or no account
        matches it.
        @raise BadCredentials: if the account exists but
        C{credentials.checkPassword} rejects its password.

        @return: the C{storeID} of the matching account, suitable for
        passing to L{requestAvatar}.
        """
        passwordSecure = IUsernameHashedPassword(credentials, None) is not None
        # ^ need to do something with this.  security warning perhaps?

        if '@' not in credentials.username:
            # Report a malformed address as a login failure instead of
            # letting the tuple unpacking below raise ValueError.
            raise NoSuchUser()
        username, domain = credentials.username.split('@', 1)

        username = unicode(username)
        domain = unicode(domain)

        acct = self.accountByAddress(username, domain)
        if acct is None:
            raise NoSuchUser()

        if credentials.checkPassword(acct.password):
            return acct.storeID

        self.failedLogins += 1
        raise BadCredentials()