~soren/nova/iptables-security-groups

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_ident.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
"""
 
6
Test cases for twisted.protocols.ident module.
 
7
"""
 
8
 
 
9
import struct
 
10
 
 
11
from twisted.protocols import ident
 
12
from twisted.python import failure
 
13
from twisted.internet import error
 
14
from twisted.internet import defer
 
15
 
 
16
from twisted.trial import unittest
 
17
from twisted.test.proto_helpers import StringTransport
 
18
 
 
19
 
 
20
 
 
21
class ClassParserTestCase(unittest.TestCase):
 
22
    """
 
23
    Test parsing of ident responses.
 
24
    """
 
25
 
 
26
    def setUp(self):
 
27
        """
 
28
        Create a ident client used in tests.
 
29
        """
 
30
        self.client = ident.IdentClient()
 
31
 
 
32
 
 
33
    def test_indentError(self):
 
34
        """
 
35
        'UNKNOWN-ERROR' error should map to the L{ident.IdentError} exception.
 
36
        """
 
37
        d = defer.Deferred()
 
38
        self.client.queries.append((d, 123, 456))
 
39
        self.client.lineReceived('123, 456 : ERROR : UNKNOWN-ERROR')
 
40
        return self.assertFailure(d, ident.IdentError)
 
41
 
 
42
 
 
43
    def test_noUSerError(self):
 
44
        """
 
45
        'NO-USER' error should map to the L{ident.NoUser} exception.
 
46
        """
 
47
        d = defer.Deferred()
 
48
        self.client.queries.append((d, 234, 456))
 
49
        self.client.lineReceived('234, 456 : ERROR : NO-USER')
 
50
        return self.assertFailure(d, ident.NoUser)
 
51
 
 
52
 
 
53
    def test_invalidPortError(self):
 
54
        """
 
55
        'INVALID-PORT' error should map to the L{ident.InvalidPort} exception.
 
56
        """
 
57
        d = defer.Deferred()
 
58
        self.client.queries.append((d, 345, 567))
 
59
        self.client.lineReceived('345, 567 :  ERROR : INVALID-PORT')
 
60
        return self.assertFailure(d, ident.InvalidPort)
 
61
 
 
62
 
 
63
    def test_hiddenUserError(self):
 
64
        """
 
65
        'HIDDEN-USER' error should map to the L{ident.HiddenUser} exception.
 
66
        """
 
67
        d = defer.Deferred()
 
68
        self.client.queries.append((d, 567, 789))
 
69
        self.client.lineReceived('567, 789 : ERROR : HIDDEN-USER')
 
70
        return self.assertFailure(d, ident.HiddenUser)
 
71
 
 
72
 
 
73
    def test_lostConnection(self):
 
74
        """
 
75
        A pending query which failed because of a ConnectionLost should
 
76
        receive an L{ident.IdentError}.
 
77
        """
 
78
        d = defer.Deferred()
 
79
        self.client.queries.append((d, 765, 432))
 
80
        self.client.connectionLost(failure.Failure(error.ConnectionLost()))
 
81
        return self.assertFailure(d, ident.IdentError)
 
82
 
 
83
 
 
84
 
 
85
class TestIdentServer(ident.IdentServer):
 
86
    def lookup(self, serverAddress, clientAddress):
 
87
        return self.resultValue
 
88
 
 
89
 
 
90
class TestErrorIdentServer(ident.IdentServer):
 
91
    def lookup(self, serverAddress, clientAddress):
 
92
        raise self.exceptionType()
 
93
 
 
94
 
 
95
class NewException(RuntimeError):
 
96
    pass
 
97
 
 
98
 
 
99
class ServerParserTestCase(unittest.TestCase):
 
100
    def testErrors(self):
 
101
        p = TestErrorIdentServer()
 
102
        p.makeConnection(StringTransport())
 
103
        L = []
 
104
        p.sendLine = L.append
 
105
 
 
106
        p.exceptionType = ident.IdentError
 
107
        p.lineReceived('123, 345')
 
108
        self.assertEquals(L[0], '123, 345 : ERROR : UNKNOWN-ERROR')
 
109
 
 
110
        p.exceptionType = ident.NoUser
 
111
        p.lineReceived('432, 210')
 
112
        self.assertEquals(L[1], '432, 210 : ERROR : NO-USER')
 
113
 
 
114
        p.exceptionType = ident.InvalidPort
 
115
        p.lineReceived('987, 654')
 
116
        self.assertEquals(L[2], '987, 654 : ERROR : INVALID-PORT')
 
117
 
 
118
        p.exceptionType = ident.HiddenUser
 
119
        p.lineReceived('756, 827')
 
120
        self.assertEquals(L[3], '756, 827 : ERROR : HIDDEN-USER')
 
121
 
 
122
        p.exceptionType = NewException
 
123
        p.lineReceived('987, 789')
 
124
        self.assertEquals(L[4], '987, 789 : ERROR : UNKNOWN-ERROR')
 
125
        errs = self.flushLoggedErrors(NewException)
 
126
        self.assertEquals(len(errs), 1)
 
127
 
 
128
        for port in -1, 0, 65536, 65537:
 
129
            del L[:]
 
130
            p.lineReceived('%d, 5' % (port,))
 
131
            p.lineReceived('5, %d' % (port,))
 
132
            self.assertEquals(
 
133
                L, ['%d, 5 : ERROR : INVALID-PORT' % (port,),
 
134
                    '5, %d : ERROR : INVALID-PORT' % (port,)])
 
135
 
 
136
    def testSuccess(self):
 
137
        p = TestIdentServer()
 
138
        p.makeConnection(StringTransport())
 
139
        L = []
 
140
        p.sendLine = L.append
 
141
 
 
142
        p.resultValue = ('SYS', 'USER')
 
143
        p.lineReceived('123, 456')
 
144
        self.assertEquals(L[0], '123, 456 : USERID : SYS : USER')
 
145
 
 
146
 
 
147
if struct.pack('=L', 1)[0] == '\x01':
 
148
    _addr1 = '0100007F'
 
149
    _addr2 = '04030201'
 
150
else:
 
151
    _addr1 = '7F000001'
 
152
    _addr2 = '01020304'
 
153
 
 
154
 
 
155
class ProcMixinTestCase(unittest.TestCase):
 
156
    line = ('4: %s:0019 %s:02FA 0A 00000000:00000000 '
 
157
            '00:00000000 00000000     0        0 10927 1 f72a5b80 '
 
158
            '3000 0 0 2 -1') % (_addr1, _addr2)
 
159
 
 
160
    def testDottedQuadFromHexString(self):
 
161
        p = ident.ProcServerMixin()
 
162
        self.assertEquals(p.dottedQuadFromHexString(_addr1), '127.0.0.1')
 
163
 
 
164
    def testUnpackAddress(self):
 
165
        p = ident.ProcServerMixin()
 
166
        self.assertEquals(p.unpackAddress(_addr1 + ':0277'),
 
167
                          ('127.0.0.1', 631))
 
168
 
 
169
    def testLineParser(self):
 
170
        p = ident.ProcServerMixin()
 
171
        self.assertEquals(
 
172
            p.parseLine(self.line),
 
173
            (('127.0.0.1', 25), ('1.2.3.4', 762), 0))
 
174
 
 
175
    def testExistingAddress(self):
 
176
        username = []
 
177
        p = ident.ProcServerMixin()
 
178
        p.entries = lambda: iter([self.line])
 
179
        p.getUsername = lambda uid: (username.append(uid), 'root')[1]
 
180
        self.assertEquals(
 
181
            p.lookup(('127.0.0.1', 25), ('1.2.3.4', 762)),
 
182
            (p.SYSTEM_NAME, 'root'))
 
183
        self.assertEquals(username, [0])
 
184
 
 
185
    def testNonExistingAddress(self):
 
186
        p = ident.ProcServerMixin()
 
187
        p.entries = lambda: iter([self.line])
 
188
        self.assertRaises(ident.NoUser, p.lookup, ('127.0.0.1', 26),
 
189
                                                  ('1.2.3.4', 762))
 
190
        self.assertRaises(ident.NoUser, p.lookup, ('127.0.0.1', 25),
 
191
                                                  ('1.2.3.5', 762))
 
192
        self.assertRaises(ident.NoUser, p.lookup, ('127.0.0.1', 25),
 
193
                                                  ('1.2.3.4', 763))
 
194