~ubuntu-branches/ubuntu/trusty/txtorcon/trusty-proposed

« back to all changes in this revision

Viewing changes to txtorcon/test/test_util.py

  • Committer: Package Import Robot
  • Author(s): Jérémy Bobbio
  • Date: 2014-01-21 15:10:52 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20140121151052-r1dw06if9912ihbp
Tags: 0.9.1-1
* New upstream release.
* Update debian/watch and verify upstream signature
* Improve debian/README.source

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
import tempfile

from twisted.internet import defer
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IProtocolFactory
from twisted.trial import unittest
from zope.interface import implementer
from zope.interface import implements

from txtorcon.util import process_from_address, delete_file_or_tree, find_keywords, ip_from_int
11
 
 
12
 
 
13
 
class FakeState(object):
    """Minimal stand-in for a Tor state object; only exposes ``tor_pid``."""

    # PID the tests compare against when asking about '(Tor_internal)'.
    tor_pid = 0
15
 
 
16
 
 
17
 
@implementer(IProtocolFactory)
class FakeProtocolFactory(object):
    """Do-nothing IProtocolFactory used only to open a listening socket.

    Declared with the ``@implementer`` class decorator rather than the
    ``implements()`` class-advice call, because the latter raises
    TypeError on Python 3.
    """

    def doStart(self):
        """IProtocolFactory API"""

    def doStop(self):
        """IProtocolFactory API"""

    def buildProtocol(self, addr):
        """IProtocolFactory API"""
        # No protocol is built for incoming connections.
        return None
29
 
 
30
 
 
31
 
class TestIPFromInt(unittest.TestCase):
    """Checks for ``ip_from_int``."""

    def test_cast(self):
        """0x7f000001 must come back as the loopback dotted quad."""
        loopback = 0x7f000001
        dotted = ip_from_int(loopback)
        self.assertEqual(dotted, '127.0.0.1')
35
 
 
36
 
 
37
 
class TestFindKeywords(unittest.TestCase):
    """Checks for ``find_keywords``."""

    def test_filter(self):
        """The '$'-prefixed pair is expected to be absent from the result."""
        tokens = "foo=bar $1234567890=routername baz=quux".split()
        expected = {'foo': 'bar', 'baz': 'quux'}
        self.assertEqual(find_keywords(tokens), expected)
42
 
 
43
 
 
44
 
class TestProcessFromUtil(unittest.TestCase):
    """Checks for ``process_from_address``."""

    def setUp(self):
        self.fakestate = FakeState()

    def test_none(self):
        """A None address yields None."""
        self.assertEqual(process_from_address(None, 80, self.fakestate), None)

    def test_internal(self):
        """'(Tor_internal)' resolves to the state object's tor_pid."""
        pfa = process_from_address('(Tor_internal)', 80, self.fakestate)
        # depends on whether you have psutil installed or not, and on
        # whether your system always has a PID 0 process...
        self.assertEqual(pfa, self.fakestate.tor_pid)

    @defer.inlineCallbacks
    def test_real_addr(self):
        """A socket this process listens on is attributed to our own PID."""
        ## it's apparently frowned upon to use the "real" reactor in
        ## tests, but I was using "nc" before, and I think this is
        ## preferable.
        from twisted.internet import reactor

        ## Bind to port 0 so the OS picks a port that is definitely
        ## free, then ask the listener which port it was given.
        endpoint = TCP4ServerEndpoint(reactor, 0)
        listener = yield endpoint.listen(FakeProtocolFactory())
        port = listener.getHost().port

        try:
            pid = process_from_address('0.0.0.0', port, self.fakestate)
        finally:
            ## stopListening() may return a Deferred; wait for it so the
            ## socket is really closed before the test is considered
            ## finished (otherwise trial can report a dirty reactor).
            yield defer.maybeDeferred(listener.stopListening)

        self.assertEqual(pid, os.getpid())
74
 
 
75
 
 
76
 
class TestDelete(unittest.TestCase):
    """Checks for ``delete_file_or_tree`` on a plain file and on a directory."""

    def test_delete_file(self):
        """A single temporary file no longer exists after deletion."""
        (fd, f) = tempfile.mkstemp()
        try:
            # os.write() needs bytes on Python 3; a bytes literal is
            # also a plain str on Python 2, so this works on both.
            os.write(fd, b'some\ndata\n')
        finally:
            # Always release the descriptor, even if the write fails.
            os.close(fd)
        self.assertTrue(os.path.exists(f))
        delete_file_or_tree(f)
        self.assertFalse(os.path.exists(f))

    def test_delete_tree(self):
        """A temporary directory and the file inside it are both removed."""
        d = tempfile.mkdtemp()
        inner = os.path.join(d, 'foo')
        # 'with' guarantees the handle is closed even if write() raises.
        with open(inner, 'w') as f:
            f.write('foo\n')

        self.assertTrue(os.path.exists(d))
        self.assertTrue(os.path.isdir(d))
        self.assertTrue(os.path.exists(inner))

        delete_file_or_tree(d)

        self.assertFalse(os.path.exists(d))
        self.assertFalse(os.path.exists(inner))