~ci-train-bot/ubuntu-system-image/system-image-ubuntu-xenial-landing-023

« back to all changes in this revision

Viewing changes to systemimage/download.py

  • Committer: Barry Warsaw
  • Date: 2015-05-08 21:41:15 UTC
  • mfrom: (0.3.2)
  • mto: (240.2.1 system-image)
  • mto: This revision was merged to the branch mainline in revision 241.
  • Revision ID: barry@python.org-20150508214115-7x9811br9w04x9ae
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2013-2014 Canonical Ltd.
 
1
# Copyright (C) 2013-2015 Canonical Ltd.
2
2
# Author: Barry Warsaw <barry@ubuntu.com>
3
3
 
4
4
# This program is free software: you can redistribute it and/or modify
17
17
 
18
18
__all__ = [
19
19
    'Canceled',
20
 
    'DBusDownloadManager',
21
20
    'DuplicateDestinationError',
22
21
    'Record',
 
22
    'get_download_manager',
23
23
    ]
24
24
 
25
25
 
28
28
import logging
29
29
 
30
30
from collections import namedtuple
31
 
from contextlib import suppress
32
31
from io import StringIO
33
32
from pprint import pformat
34
 
from systemimage.config import config
35
 
from systemimage.reactor import Reactor
36
 
from systemimage.settings import Settings
37
 
 
38
 
# The systemimage.testing module will not be available on installed systems
39
 
# unless the system-image-dev binary package is installed, which is not usually
40
 
# the case.  Disable _print() debugging in that case.
41
 
def _print(*args, **kws):
42
 
    with suppress(ImportError):
43
 
        # We must import this here to avoid circular imports.
44
 
        from systemimage.testing.helpers import debug
45
 
        with debug(check_flag=True) as ddlog:
46
 
            ddlog(*args, **kws)
47
 
 
48
 
 
49
 
# Parameterized for testing purposes.
50
 
DOWNLOADER_INTERFACE = 'com.canonical.applications.Downloader'
51
 
MANAGER_INTERFACE = 'com.canonical.applications.DownloadManager'
52
 
OBJECT_NAME = 'com.canonical.applications.Downloader'
53
 
OBJECT_INTERFACE = 'com.canonical.applications.GroupDownload'
54
 
USER_AGENT = 'Ubuntu System Image Upgrade Client; Build {}'
 
33
 
 
34
try:
 
35
    import pycurl
 
36
except ImportError:                                 # pragma: no cover
 
37
    pycurl = None
55
38
 
56
39
 
57
40
log = logging.getLogger('systemimage')
58
41
 
59
42
 
60
 
def _headers():
61
 
    return {'User-Agent': USER_AGENT.format(config.build_number)}
62
 
 
63
 
 
64
43
class Canceled(Exception):
65
44
    """Raised when the download was canceled."""
66
45
 
89
68
        url=url, destination=destination, checksum=checksum)
90
69
 
91
70
 
92
 
class DownloadReactor(Reactor):
93
 
    def __init__(self, bus, callback=None, pausable=False):
94
 
        super().__init__(bus)
95
 
        self._callback = callback
96
 
        self._pausable = pausable
97
 
        self.error = None
98
 
        self.canceled = False
99
 
        self.received = 0
100
 
        self.total = 0
101
 
        self.react_to('canceled')
102
 
        self.react_to('error')
103
 
        self.react_to('finished')
104
 
        self.react_to('paused')
105
 
        self.react_to('progress')
106
 
        self.react_to('resumed')
107
 
        self.react_to('started')
108
 
 
109
 
    def _do_started(self, signal, path, started):
110
 
        _print('STARTED:', started)
111
 
 
112
 
    def _do_finished(self, signal, path, local_paths):
113
 
        _print('FINISHED:', local_paths)
114
 
        self.quit()
115
 
 
116
 
    def _do_error(self, signal, path, error_message):
117
 
        _print('ERROR:', error_message)
118
 
        log.error(error_message)
119
 
        self.error = error_message
120
 
        self.quit()
121
 
 
122
 
    def _do_progress(self, signal, path, received, total):
123
 
        self.received = received
124
 
        self.total = total
125
 
        _print('PROGRESS:', received, total)
126
 
        if self._callback is not None:
127
 
            # Be defensive, so yes, use a bare except.  If an exception occurs
128
 
            # in the callback, log it, but continue onward.
129
 
            try:
130
 
                self._callback(received, total)
131
 
            except:
132
 
                log.exception('Exception in progress callback')
133
 
 
134
 
    def _do_canceled(self, signal, path, canceled):
135
 
        # Why would we get this signal if it *wasn't* canceled?  Anyway,
136
 
        # this'll be a D-Bus data type so convert it to a vanilla Python
137
 
        # boolean.
138
 
        _print('CANCELED:', canceled)
139
 
        self.canceled = bool(canceled)
140
 
        self.quit()
141
 
 
142
 
    def _do_paused(self, signal, path, paused):
143
 
        _print('PAUSE:', paused, self._pausable)
144
 
        send_paused = self._pausable and config.dbus_service is not None
145
 
        if send_paused:                             # pragma: no branch
146
 
            # We could plumb through the `service` object from service.py (the
147
 
            # main entry point for system-image-dbus), but that's actually a
148
 
            # bit of a pain, so do the expedient thing and grab the interface
149
 
            # here.
150
 
            percentage = (int(self.received / self.total * 100.0)
151
 
                          if self.total > 0 else 0)
152
 
            config.dbus_service.UpdatePaused(percentage)
153
 
 
154
 
    def _do_resumed(self, signal, path, resumed):
155
 
        _print('RESUME:', resumed)
156
 
        # There currently is no UpdateResumed() signal.
157
 
 
158
 
    def _default(self, *args, **kws):
159
 
        _print('SIGNAL:', args, kws)                # pragma: no cover
160
 
 
161
 
 
162
 
class DBusDownloadManager:
163
 
    def __init__(self, callback=None):
 
71
class DownloadManagerBase:
 
72
    """Base class for all download managers."""
 
73
 
 
74
    def __init__(self):
164
75
        """
165
76
        :param callback: If given, a function that is called every so often
166
77
            during downloading.
168
79
            of bytes received so far, and the total amount of bytes to be
169
80
            downloaded.
170
81
        """
171
 
        self._iface = None
 
82
        # This is a list of functions that are called every so often during
 
83
        # downloading.  Functions in this list take two arguments, the number
 
84
        # of bytes received so far, and the total amount of bytes to be
 
85
        # downloaded.
 
86
        self.callbacks = []
 
87
        self.total = 0
 
88
        self.received = 0
172
89
        self._queued_cancel = False
173
 
        self.callback = callback
174
90
 
175
91
    def __repr__(self): # pragma: no cover
176
 
        return '<DBusDownloadManager at 0x{:x}>'.format(id(self))
 
92
        return '<{} at 0x{:x}>'.format(self.__class__.__name__, id(self))
 
93
 
 
94
    def _get_download_records(self, downloads):
 
95
        """Convert the downloads items to download records."""
 
96
        records = [item if isinstance(item, _RecordType) else Record(*item)
 
97
                   for item in downloads]
 
98
        destinations = set(record.destination for record in records)
 
99
        # Check for duplicate destinations, specifically for a local file path
 
100
        # coming from two different sources.  It's okay if there are duplicate
 
101
        # destination records in the download request, but each of those must
 
102
        # be specified by the same source url and have the same checksum.
 
103
        #
 
104
        # An easy quick check just asks if the set of destinations is smaller
 
105
        # than the total number of requested downloads.  It can't be larger.
 
106
        # If it *is* smaller, then there are some duplicates, however the
 
107
        # duplicates may be legitimate, so look at the details.
 
108
        #
 
109
        # Note though that we cannot pass duplicate destinations to udm, so we
 
110
        # have to filter out legitimate duplicates.  That's fine since they
 
111
        # really are pointing to the same file, and will end up in the
 
112
        # destination location.
 
113
        if len(destinations) < len(downloads):
 
114
            by_destination = dict()
 
115
            unique_downloads = set()
 
116
            for record in records:
 
117
                by_destination.setdefault(record.destination, set()).add(
 
118
                    record)
 
119
                unique_downloads.add(record)
 
120
            duplicates = []
 
121
            for dst, seen in by_destination.items():
 
122
                if len(seen) > 1:
 
123
                    # Tuples will look better in the pretty-printed output.
 
124
                    duplicates.append(
 
125
                        (dst, sorted(tuple(dup) for dup in seen)))
 
126
            if len(duplicates) > 0:
 
127
                raise DuplicateDestinationError(sorted(duplicates))
 
128
            # Uniquify the downloads.
 
129
            records = list(unique_downloads)
 
130
        return records
 
131
 
 
132
    def _do_callback(self):
 
133
        # Be defensive, so yes, use a bare except.  If an exception occurs in
 
134
        # the callback, log it, but continue onward.
 
135
        for callback in self.callbacks:
 
136
            try:
 
137
                callback(self.received, self.total)
 
138
            except:
 
139
                log.exception('Exception in progress callback')
 
140
 
 
141
    def cancel(self):
 
142
        """Cancel any current downloads."""
 
143
        self._queued_cancel = True
 
144
 
 
145
    def pause(self):
 
146
        """Pause the download, but only if one is in progress."""
 
147
        pass                                        # pragma: no cover
 
148
 
 
149
    def resume(self):
 
150
        """Resume the download, but only if one is in progress."""
 
151
        pass                                        # pragma: no cover
 
152
 
 
153
    def _get_files(self, records, pausable):
 
154
        raise NotImplementedError                   # pragma: no cover
177
155
 
178
156
    def get_files(self, downloads, *, pausable=False):
179
157
        """Download a bunch of files concurrently.
204
182
        :raises: DuplicateDestinationError if more than one source url is
205
183
            downloaded to the same destination file.
206
184
        """
207
 
        assert self._iface is None
208
185
        if self._queued_cancel:
209
186
            # A cancel is queued, so don't actually download anything.
210
187
            raise Canceled
211
188
        if len(downloads) == 0:
212
189
            # Nothing to download.  See LP: #1245597.
213
190
            return
214
 
        # Convert the downloads items to download records.
215
 
        records = [item if isinstance(item, _RecordType) else Record(*item)
216
 
                   for item in downloads]
217
 
        destinations = set(record.destination for record in records)
218
 
        # Check for duplicate destinations, specifically for a local file path
219
 
        # coming from two different sources.  It's okay if there are duplicate
220
 
        # destination records in the download request, but each of those must
221
 
        # be specified by the same source url and have the same checksum.
222
 
        #
223
 
        # An easy quick check just asks if the set of destinations is smaller
224
 
        # than the total number of requested downloads.  It can't be larger.
225
 
        # If it *is* smaller, then there are some duplicates, however the
226
 
        # duplicates may be legitimate, so look at the details.
227
 
        #
228
 
        # Note though that we cannot pass duplicate destinations to udm,
229
 
        # so we have to filter out legitimate duplicates.  That's fine since
230
 
        # they really are pointing to the same file, and will end up in the
231
 
        # destination location.
232
 
        if len(destinations) < len(downloads):
233
 
            by_destination = dict()
234
 
            unique_downloads = set()
235
 
            for record in records:
236
 
                by_destination.setdefault(record.destination, set()).add(
237
 
                    record)
238
 
                unique_downloads.add(record)
239
 
            duplicates = []
240
 
            for dst, seen in by_destination.items():
241
 
                if len(seen) > 1:
242
 
                    # Tuples will look better in the pretty-printed output.
243
 
                    duplicates.append(
244
 
                        (dst, sorted(tuple(dup) for dup in seen)))
245
 
            if len(duplicates) > 0:
246
 
                raise DuplicateDestinationError(sorted(duplicates))
247
 
            # Uniquify the downloads.
248
 
            records = list(unique_downloads)
249
 
        bus = dbus.SystemBus()
250
 
        service = bus.get_object(DOWNLOADER_INTERFACE, '/')
251
 
        iface = dbus.Interface(service, MANAGER_INTERFACE)
252
 
        # Better logging of the requested downloads.
 
191
        records = self._get_download_records(downloads)
 
192
        # Better logging of the requested downloads.  However, we want the
 
193
        # entire block of multiline log output to appear under a single
 
194
        # timestamp.
253
195
        fp = StringIO()
254
196
        print('[0x{:x}] Requesting group download:'.format(id(self)), file=fp)
255
197
        for record in records:
258
200
            else:
259
201
                print('\t{} [{}] -> {}'.format(*record), file=fp)
260
202
        log.info('{}'.format(fp.getvalue()))
261
 
        object_path = iface.createDownloadGroup(
262
 
            records,
263
 
            'sha256',
264
 
            False,        # Don't allow GSM yet.
265
 
            # https://bugs.freedesktop.org/show_bug.cgi?id=55594
266
 
            dbus.Dictionary(signature='sv'),
267
 
            _headers())
268
 
        download = bus.get_object(OBJECT_NAME, object_path)
269
 
        self._iface = dbus.Interface(download, OBJECT_INTERFACE)
270
 
        # Are GSM downloads allowed?  Yes, except if auto_download is set to 1
271
 
        # (i.e. wifi-only).
272
 
        allow_gsm = Settings().get('auto_download') != '1'
273
 
        DBusDownloadManager._set_gsm(self._iface, allow_gsm=allow_gsm)
274
 
        # Start the download.
275
 
        reactor = DownloadReactor(bus, self.callback, pausable)
276
 
        reactor.schedule(self._iface.start)
277
 
        log.info('[0x{:x}] Running group download reactor', id(self))
278
 
        reactor.run()
279
 
        # This download is complete so the object path is no longer
280
 
        # applicable.  Setting this to None will cause subsequent cancels to
281
 
        # be queued.
282
 
        self._iface = None
283
 
        log.info('[0x{:x}] Group download reactor done', id(self))
284
 
        if reactor.error is not None:
285
 
            log.error('Reactor error: {}'.format(reactor.error))
286
 
        if reactor.canceled:
287
 
            log.info('Reactor canceled')
288
 
        # Report any other problems.
289
 
        if reactor.error is not None:
290
 
            raise FileNotFoundError(reactor.error)
291
 
        if reactor.canceled:
292
 
            raise Canceled
293
 
        if reactor.timed_out:
294
 
            raise TimeoutError
295
 
        # For sanity.
296
 
        for record in records:
297
 
            assert os.path.exists(record.destination), (
298
 
                'Missing destination: {}'.format(record))
299
 
 
300
 
    @staticmethod
301
 
    def _set_gsm(iface, *, allow_gsm):
302
 
        # This is a separate method for easier testing via mocks.
303
 
        iface.allowGSMDownload(allow_gsm)
304
 
 
305
 
    def cancel(self):
306
 
        """Cancel any current downloads."""
307
 
        if self._iface is None:
308
 
            # Since there's no download in progress right now, there's nothing
309
 
            # to cancel.  Setting this flag queues the cancel signal once the
310
 
            # reactor starts running again.  Yes, this is a bit weird, but if
311
 
            # we don't do it this way, the caller will immediately get a
312
 
            # Canceled exception, which isn't helpful because it's expecting
313
 
            # one when the next download begins.
314
 
            self._queued_cancel = True
 
203
        self._get_files(records, pausable)
 
204
 
 
205
 
 
206
def get_download_manager(*args):
 
207
    # We have to avoid circular imports since both download managers import
 
208
    # various things from this module.
 
209
    from systemimage.curl import CurlDownloadManager
 
210
    from systemimage.udm import DOWNLOADER_INTERFACE, UDMDownloadManager
 
211
    # Detect if we have ubuntu-download-manager.
 
212
    #
 
213
    # Use PyCURL based downloader if no udm is found, or if the environment
 
214
    # variable is set.  However, if we're told to use PyCURL and it's
 
215
    # unavailable, throw an exception.
 
216
    cls = None
 
217
    use_pycurl = os.environ.get('SYSTEMIMAGE_PYCURL')
 
218
    if use_pycurl is None:
 
219
        # Auto-detect.  For backward compatibility, use udm if it's available,
 
220
        # otherwise use PyCURL.
 
221
        try:
 
222
            bus = dbus.SystemBus()
 
223
            bus.get_object(DOWNLOADER_INTERFACE, '/')
 
224
            udm_available = True
 
225
        except dbus.exceptions.DBusException:
 
226
            udm_available = False
 
227
        if udm_available:
 
228
            cls = UDMDownloadManager
 
229
        elif pycurl is None:
 
230
            raise ImportError('No module named {}'.format('pycurl'))
315
231
        else:
316
 
            self._iface.cancel()
317
 
 
318
 
    def pause(self):
319
 
        """Pause the download, but only if one is in progress."""
320
 
        if self._iface is not None:                 # pragma: no branch
321
 
            self._iface.pause()
322
 
 
323
 
    def resume(self):
324
 
        """Resume the download, but only if one is in progress."""
325
 
        if self._iface is not None:                 # pragma: no branch
326
 
            self._iface.resume()
 
232
            cls = CurlDownloadManager
 
233
    else:
 
234
        cls = (CurlDownloadManager
 
235
               if use_pycurl.lower() in ('1', 'yes', 'true')
 
236
               else UDMDownloadManager)
 
237
    return cls(*args)