~azzar1/update-notifier/hide-livepatch-indicator-if-disabled

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
#!/usr/bin/python3

import hashlib
import http.server as SimpleHTTPServer
import os
import random
import shutil
import socketserver
import sys
import time
import tempfile
import unittest

from mock import patch
from multiprocessing import Process

sys.path.insert(0, "data")
sys.path.insert(1, "../data")
import package_data_downloader  # nopep8


class TestHttpd(Process):
    def __init__(self, port, servdir):
        super(TestHttpd, self).__init__()
        self.port = port
        self.servdir = servdir

    def run(self):
        os.chdir(self.servdir)
        Handler = SimpleHTTPServer.SimpleHTTPRequestHandler
        httpd = socketserver.TCPServer(("localhost", self.port), Handler)
        httpd.serve_forever()


class DownloadTestCase(unittest.TestCase):

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self._setup_canary_file()
        self.PORT = random.randint(1025, 60000)
        self.p = TestHttpd(self.PORT, self.tempdir)
        self.p.start()

    def tearDown(self):
        self.p.terminate()
        shutil.rmtree(self.tempdir)

    def _setup_canary_file(self):
        self.canary_text = "meep"
        self.canary_file = os.path.join(self.tempdir, "canary-file.txt")
        with open(self.canary_file, "w") as f:
            f.write(self.canary_text)
        self.canary_hashsum = hashlib.sha256(
            self.canary_text.encode('utf-8')).hexdigest()

    def test_download_file(self):
        package_data_downloader.STAMPDIR = self.tempdir
        os.makedirs(os.path.join(self.tempdir, "partial"))
        uri = "http://localhost:%s/%s" % (
            self.PORT, os.path.basename(self.canary_file))
        destfile = package_data_downloader.download_file(
            uri, self.canary_hashsum)
        self.assertNotEqual(destfile, None)
        self.assertEqual(hashlib.sha256(
            open(destfile).read().encode('utf-8')).hexdigest(),
            self.canary_hashsum)


class ProcessDownloadRequestsTestCase(unittest.TestCase):

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self._setup_canary_file()
        self.PORT = random.randint(1025, 60000)
        self.p = TestHttpd(self.PORT, self.tempdir)
        self.p.start()
        self.tempdir = tempfile.mkdtemp()
        # stampdir
        stampdir = os.path.join(self.tempdir, "stampdir")
        os.makedirs(stampdir)
        package_data_downloader.STAMPDIR = stampdir
        os.makedirs(os.path.join(stampdir, "partial"))
        # datadir
        datadir = os.path.join(self.tempdir, "datadir")
        os.makedirs(datadir)
        package_data_downloader.DATADIR = datadir
        # override the location of the notifier files
        notifierdir = os.path.join(self.tempdir, "notifierdir")
        os.makedirs(notifierdir)
        package_data_downloader.NOTIFIER_FILE = \
            os.path.join(notifierdir, "data-downloads-failed")
        package_data_downloader.NOTIFIER_PERMANENT_FILE = \
            package_data_downloader.NOTIFIER_FILE + "-permanently"

    def tearDown(self):
        self.p.terminate()
        shutil.rmtree(self.tempdir)

    def _setup_canary_file(self):
        self.canary_text = "meep"
        self.canary_file = os.path.join(self.tempdir, "canary-file.txt")
        with open(self.canary_file, "w") as f:
            f.write(self.canary_text)
        self.canary_hashsum = hashlib.sha256(
            self.canary_text.encode('utf-8')).hexdigest()

    def _setup_hook_file(self, filename, script="/bin/true"):
        with open(os.path.join(package_data_downloader.DATADIR,
                  filename), "w") as foo:
            foo.write("Url: http://localhost:%s/%s\n" %
                      (self.PORT, os.path.basename(self.canary_file)))
            foo.write("Sha256: %s\n" % self.canary_hashsum)
            foo.write("\n")
            foo.write("Script: %s" % script)

    def test_hookfile_older_than_stampfile(self):
        # hook file older than stamp file nothing happens
        # create the hook file
        hookfile = "older-hookfile"
        self._setup_hook_file(hookfile)
        time.sleep(0.01)
        # create the stamp file
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        orig_stamp_time = os.stat(stampfile).st_mtime
        package_data_downloader.process_download_requests()
        new_stamp_time = os.stat(stampfile).st_mtime
        self.assertEqual(orig_stamp_time, new_stamp_time)
        # confirm failure files not created
        self.assertFalse(os.path.exists(
            os.path.join(package_data_downloader.STAMPDIR,
                         "%s.permanent-failure" % hookfile)))
        self.assertFalse(os.path.exists(package_data_downloader.NOTIFIER_FILE))

    def test_stampfile_older_than_hookfile(self):
        # hook file newer than stampfile, download, run script, update
        # stampfile
        hookfile = "older-stampfile"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        orig_stamp_date = os.stat(stampfile).st_mtime
        time.sleep(0.01)
        # create the hook file
        self._setup_hook_file(hookfile)
        package_data_downloader.process_download_requests()
        # the download succeeded so the stamp file is touched
        new_stamp_date = os.stat(stampfile).st_mtime
        self.assertGreater(new_stamp_date, orig_stamp_date)
        # confirm failure files not created
        self.assertFalse(os.path.exists(
            os.path.join(package_data_downloader.STAMPDIR,
                         "%s.permanent-failure" % hookfile)))
        self.assertFalse(os.path.exists(package_data_downloader.NOTIFIER_FILE))

    def test_only_retry_failure(self):
        # two hook files but only has failed
        hookfile = "success"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
            pass
        orig_stamp_date = os.stat(stampfile).st_mtime
        time.sleep(0.01)
        # create the hook file
        self._setup_hook_file(hookfile)
        fail_hookfile = "failure"
        # the stampfile indicates a failure
        fail_stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                      "%s.failed" % fail_hookfile)
        with open(fail_stampfile, "w"):
            pass
        time.sleep(0.01)
        # create the hook file
        self._setup_hook_file(fail_hookfile)
        # create an empty notifier file
        with open(package_data_downloader.NOTIFIER_FILE, "w"):
            pass
        package_data_downloader.process_download_requests()
        new_stamp_date = os.stat(stampfile).st_mtime
        # if the downloaded succeeded it shouldn't be done again
        self.assertEqual(new_stamp_date, orig_stamp_date)

    def test_hookfile_script_fails(self):
        # script in the hook file fails, failure recorded as permanent
        hookfile = "script-failure"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        time.sleep(0.01)
        # create the hook file
        self._setup_hook_file(hookfile, "/bin/false")
        package_data_downloader.process_download_requests()
        # script failures are considered permanent
        self.assertTrue(os.path.exists(
            os.path.join(package_data_downloader.STAMPDIR,
                         "%s.permanent-failure" % hookfile)))

    def test_stampfile_and_notifierfile(self):
        # notifier file removed on successful download
        hookfile = "stampfile_and_notifierfile"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        time.sleep(0.01)
        self._setup_hook_file(hookfile)
        # create an empty notifier file
        with open(package_data_downloader.NOTIFIER_FILE, "w"):
            pass
        package_data_downloader.process_download_requests()
        self.assertFalse(os.path.exists(package_data_downloader.NOTIFIER_FILE))

    def test_stampfile_notifierfile_and_notifierpermanent(self):
        # both notifier files removed on successful download
        hookfile = "stampfile_notifierfile_and_notifierpermanentfile"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        time.sleep(0.01)
        self._setup_hook_file(hookfile)
        # create empty notifier files
        with open(package_data_downloader.NOTIFIER_FILE, "w"):
            pass
        with open(package_data_downloader.NOTIFIER_PERMANENT_FILE, "w"):
            pass
        package_data_downloader.process_download_requests()
        self.assertFalse(os.path.exists(package_data_downloader.NOTIFIER_FILE))
        self.assertFalse(os.path.exists(
            package_data_downloader.NOTIFIER_PERMANENT_FILE))

    def test_stampfile_and_notifierpermanent(self):
        # permanent notifier file removed on successful download
        hookfile = "stampfile_and_permanentnotifierfile"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        time.sleep(0.01)
        self._setup_hook_file(hookfile)
        # create an empty permanent notifier file
        with open(package_data_downloader.NOTIFIER_PERMANENT_FILE, "w"):
            pass
        package_data_downloader.process_download_requests()
        self.assertFalse(os.path.exists(
            package_data_downloader.NOTIFIER_PERMANENT_FILE))

    def test_hookfile_download_failure(self):
        # can't download the file, non-permanent failure created
        hookfile = "download-failure"
        stampfile = os.path.join(package_data_downloader.STAMPDIR,
                                 hookfile)
        with open(stampfile, "w"):
                pass
        time.sleep(0.01)
        # overwrite canary file to create a failure
        self.canary_file = "not-there.txt"
        # create the hook file
        self._setup_hook_file(hookfile)
        package_data_downloader.process_download_requests()
        self.assertTrue(os.path.exists(
            os.path.join(package_data_downloader.STAMPDIR,
                         "%s.failed" % hookfile)))
        self.assertFalse(os.path.exists(
            os.path.join(package_data_downloader.STAMPDIR,
                         "%s.permanent-failure" % hookfile)))

    def test_hookfile_notifierfile_download_failure(self):
        # can't download the file, notifier file stays around
        hookfile = "download-failure-with-notifierfile"
        # overwrite canary file to create a failure
        self.canary_file = "not-there.txt"
        # create the hook file
        self._setup_hook_file(hookfile)
        # create an empty notifier file
        with open(package_data_downloader.NOTIFIER_FILE, "w"):
            pass
        package_data_downloader.process_download_requests()
        # because there was a failure it shouldn't be removed
        self.assertTrue(os.path.exists(
            package_data_downloader.NOTIFIER_FILE))


class PackageDataDownloaderTestCase(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()
        # stampdir
        stampdir = os.path.join(self.tmpdir, "stampdir")
        os.makedirs(stampdir)
        package_data_downloader.STAMPDIR = stampdir
        # datadir
        datadir = os.path.join(self.tmpdir, "datadir")
        os.makedirs(datadir)
        package_data_downloader.DATADIR = datadir

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def test_wrong_template_translations(self):
        package_data_downloader.NOTIFIER_SOURCE_FILE = \
            'data/package-data-downloads-failed.in'
        package_data_downloader.NOTIFIER_FILE = \
            self.tmpdir + "/data-downloads-failed"
        package_data_downloader.NOTIFIER_PERMANENT_SOURCE_FILE = \
            'data/package-data-downloads-failed-permanently.in'
        package_data_downloader.NOTIFIER_PERMANENT_FILE = \
            package_data_downloader.NOTIFIER_FILE + '-permanently'
        package_data_downloader.trigger_update_notifier([], False)
        package_data_downloader.trigger_update_notifier([], True)

    def test_permanently_failed(self):
        # create a bunch of files using the provided mechanism
        test_files = ["foo.permanent-failure", "bar.failure", "baz"]
        for f in test_files:
            package_data_downloader.create_or_update_stampfile(
                os.path.join(package_data_downloader.STAMPDIR, f))
        self.assertEqual(
            sorted(os.listdir(package_data_downloader.STAMPDIR)),
            sorted(test_files))
        # test hook_is_permanently_failed()
        self.assertTrue(
            package_data_downloader.hook_is_permanently_failed("foo"))
        self.assertFalse(
            package_data_downloader.hook_is_permanently_failed("bar"))
        self.assertFalse(
            package_data_downloader.hook_is_permanently_failed("baz"))
        # existing_permanent_failures()
        self.assertEqual(
            package_data_downloader.existing_permanent_failures(),
            ["foo"])

    def test_hook_aged_out(self):
        # test to see if a hook has failed for more than 3 days
        # create a failure file
        with open(os.path.join(package_data_downloader.STAMPDIR,
                               "aged.failed"), "w"):
            pass
        from datetime import datetime, timedelta
        # patch datetime so we think its the future
        with patch('package_data_downloader.datetime') as mock_datetime:
            thefuture = (datetime.now() + timedelta(days=3))
            mock_datetime.now.return_value = thefuture
            mock_datetime.fromtimestamp.side_effect = \
                lambda *args, **kw: datetime.fromtimestamp(*args, **kw)
            self.assertEqual(package_data_downloader.datetime.now(),
                             thefuture)
            self.assertTrue(package_data_downloader.hook_aged_out("aged"))

    def test_mark_hook_failed(self):
        # prepare
        package_data_downloader.create_or_update_stampfile(
            os.path.join(package_data_downloader.STAMPDIR, "foo"))
        # temp failure
        package_data_downloader.mark_hook_failed("foo")
        self.assertEqual(os.listdir(package_data_downloader.STAMPDIR),
                         ["foo.failed"])
        self.assertFalse(
            package_data_downloader.hook_is_permanently_failed("foo"))
        # permanent
        package_data_downloader.mark_hook_failed("foo", permanent=True)
        self.assertEqual(os.listdir(package_data_downloader.STAMPDIR),
                         ["foo.permanent-failure"])
        self.assertTrue(
            package_data_downloader.hook_is_permanently_failed("foo"))

    def test_get_hook_file_names(self):
        # test that .dpkg-* is ignored
        test_datadir_files = ["foo.dpkg-new", "bar"]
        for name in test_datadir_files:
            with open(os.path.join(package_data_downloader.DATADIR, name),
                      "w"):
                pass
        self.assertEqual(package_data_downloader.get_hook_file_names(),
                         ["bar"])

    def test_trigger_update_notifier(self):
        # write to tmpdir
        package_data_downloader.NOTIFIER_FILE = os.path.join(
            self.tmpdir, "data-downloads-failed")
        # point to local repo file
        package_data_downloader.NOTIFIER_SOURCE_FILE = \
            "data/package-data-downloads-failed.in"
        package_data_downloader.trigger_update_notifier(
            failures=["foo", "bar"], permanent=False)
        data = open(package_data_downloader.NOTIFIER_FILE).read()
        self.assertEqual(data, """Priority: High
_Name: Failure to download extra data files
Terminal: True
Command: pkexec /usr/lib/update-notifier/package-data-downloader
_Description:
 The following packages requested additional data downloads after package
 installation, but the data could not be downloaded or could not be
 processed.
 .
 .
   foo, bar
 .
 .
 The download will be attempted again later, or you can try the download
 again now.  Running this command requires an active Internet connection.
""")


if __name__ == "__main__":
    unittest.main()