~ubuntu-branches/ubuntu/karmic/zeroinstall-injector/karmic

« back to all changes in this revision

Viewing changes to tests/testdownload.py

  • Committer: Bazaar Package Importer
  • Author(s): Thomas Leonard
  • Date: 2007-01-23 21:50:46 UTC
  • Revision ID: james.westby@ubuntu.com-20070123215046-3ya2x81i99m5ya8r
Tags: upstream-0.25
Import upstream version 0.25

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python2.3
# Tests for downloading feeds, keys and archives from a local test server.
import sys, tempfile, os, shutil
from StringIO import StringIO
import unittest, signal
from logging import getLogger, DEBUG, INFO
# Uncomment to see debug-level log output while the tests run.
#getLogger().setLevel(DEBUG)

# Search the parent directory first; this must happen before the
# zeroinstall import below (presumably so the in-tree package is the
# one under test -- confirm against the source layout).
sys.path.insert(0, '..')

from zeroinstall.injector import model, basedir, autopolicy, gpg, iface_cache, download
import data

import server
 
14
 
 
15
class Reply:
        """Minimal stand-in for sys.stdin that always answers with one fixed line.

        The tests assign an instance to sys.stdin so that any code calling
        sys.stdin.readline() receives a canned answer such as "Y\\n" or "N\\n".
        Only readline() is provided.
        """

        def __init__(self, reply):
                """Remember the canned answer.

                reply -- the string returned by every call to readline()
                """
                self.reply = reply

        def readline(self):
                """Return the canned answer.

                The answer is not consumed, so repeated calls keep returning
                the same string.
                """
                return self.reply
 
21
 
 
22
class TestDownload(unittest.TestCase):
        """Download tests run against the helper HTTP server on localhost:8000.

        Each test tells server.handle_requests() which files to serve, keeps
        the returned child process id in self.child so that tearDown() can stop
        it, and then checks the error raised by the code under test.

        Test methods are described with comments rather than docstrings so
        that unittest's verbose output keeps showing the method names.

        Caught exceptions are read with sys.exc_info()[1] instead of being
        bound in the except clause, because the binding syntax differs between
        Python 2 and Python 3 while sys.exc_info() parses on both.
        """

        def setUp(self):
                """Create private config, cache and GnuPG homes and reset shared state."""
                # Tests replace sys.stdin with a Reply (or None); remember the
                # real stream so that tearDown() can put it back.
                self.old_stdin = sys.stdin

                # mkdtemp() creates each directory atomically and accessible
                # only to the current user, which avoids the race between
                # choosing a name with mktemp() and creating it with mkdir().
                self.config_home = tempfile.mkdtemp()
                self.cache_home = tempfile.mkdtemp()
                os.environ['XDG_CONFIG_HOME'] = self.config_home
                os.environ['XDG_CACHE_HOME'] = self.cache_home
                os.environ['XDG_CACHE_DIRS'] = ''
                # Reload basedir now that the XDG_* variables have changed.
                reload(basedir)

                if 'DISPLAY' in os.environ:
                        del os.environ['DISPLAY']

                self.gnupg_home = tempfile.mkdtemp()
                os.environ['GNUPGHOME'] = self.gnupg_home

                # Import the test signing key into the fresh GnuPG home.
                stream = tempfile.TemporaryFile()
                stream.write(data.thomas_key)
                stream.seek(0)
                gpg.import_key(stream)

                # Reset module-level state shared between tests.
                iface_cache.iface_cache.__init__()
                download._downloads = {}
                self.child = None

        def tearDown(self):
                """Restore stdin, stop the server child (if any) and delete the temp dirs."""
                sys.stdin = self.old_stdin
                try:
                        if self.child is not None:
                                os.kill(self.child, signal.SIGTERM)
                                os.waitpid(self.child, 0)
                                self.child = None
                finally:
                        # Clean up even if stopping the child failed.
                        shutil.rmtree(self.config_home)
                        shutil.rmtree(self.cache_home)
                        shutil.rmtree(self.gnupg_home)

        # Answering "N" at the prompt must fail with "Not signed with a
        # trusted key".
        def testRejectKey(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
                        policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False)
                        assert policy.need_download()
                        sys.stdin = Reply("N\n")
                        try:
                                policy.download_and_execute(['Hello'])
                                assert 0
                        except model.SafeException:
                                if "Not signed with a trusted key" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # Same as testRejectKey, but served from 'Hello.xml'.
        def testRejectKeyXML(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
                        policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
                        assert policy.need_download()
                        sys.stdin = Reply("N\n")
                        try:
                                policy.download_and_execute(['Hello'])
                                assert 0
                        except model.SafeException:
                                if "Not signed with a trusted key" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # 'cli --import' with a "Y" answer must leave the key trusted, and a
        # second import must succeed with no stdin available at all.
        def testImport(self):
                fingerprint = 'DE937DD411906ACF7C263B396FCF121BE2390E0B'
                old_out = sys.stdout
                try:
                        from zeroinstall.injector import cli
                        from zeroinstall.injector.trust import trust_db
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
                        sys.stdin = Reply("Y\n")
                        try:
                                assert not trust_db.is_trusted(fingerprint)
                                try:
                                        cli.main(['--import', 'Hello'])
                                        assert 0
                                except SystemExit:
                                        assert sys.exc_info()[1].code == 0
                                assert trust_db.is_trusted(fingerprint)
                                # Shouldn't need to prompt the second time
                                sys.stdin = None
                                try:
                                        cli.main(['--import', 'Hello'])
                                        assert 0
                                except SystemExit:
                                        assert sys.exc_info()[1].code == 0
                        except model.SafeException:
                                if "HelloWorld/Missing" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # Answering "Y" lets the download proceed; running with main='Missing'
        # must then fail with an error mentioning "HelloWorld/Missing".
        def testAcceptKey(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
                        policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False)
                        assert policy.need_download()
                        sys.stdin = Reply("Y\n")
                        try:
                                policy.download_and_execute(['Hello'], main = 'Missing')
                                assert 0
                        except model.SafeException:
                                if "HelloWorld/Missing" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # Local feed 'Recipe.xml'; the two archives are passed to the server as
        # one tuple (presumably one step that may be requested in either order
        # -- confirm in server.py). Execution must fail on "HelloWorld/Missing".
        def testRecipe(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
                        policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
                        try:
                                policy.download_and_execute([])
                                assert False
                        except model.SafeException:
                                if "HelloWorld/Missing" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # Local feed 'Autopackage.xml' served 'HelloWorld.autopackage';
        # execution must fail on "HelloWorld/Missing".
        def testAutopackage(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('HelloWorld.autopackage')
                        policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
                        try:
                                policy.download_and_execute([])
                                assert False
                        except model.SafeException:
                                if "HelloWorld/Missing" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out

        # With the server given '*' (meaning defined in server.py; presumably
        # it makes the download fail), the recipe must raise a DownloadError
        # mentioning "Connection".
        def testRecipeFailure(self):
                old_out = sys.stdout
                try:
                        sys.stdout = StringIO()
                        self.child = server.handle_requests('*')
                        policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
                        try:
                                policy.download_and_execute([])
                                assert False
                        except download.DownloadError:
                                if "Connection" not in str(sys.exc_info()[1]):
                                        raise
                finally:
                        sys.stdout = old_out
 
179
 
 
180
# Suite of every test* method in TestDownload, exposed at module level.
suite = unittest.makeSuite(TestDownload)
if __name__ == '__main__':
        # When run as a script, always use unittest's verbose output.
        sys.argv.append('-v')
        unittest.main()