30
30
from collections import namedtuple
31
from contextlib import suppress
32
31
from io import StringIO
33
32
from pprint import pformat
34
from systemimage.config import config
35
from systemimage.reactor import Reactor
36
from systemimage.settings import Settings
38
# The systemimage.testing module will not be available on installed systems
39
# unless the system-image-dev binary package is installed, which is not usually
40
# the case. Disable _print() debugging in that case.
41
def _print(*args, **kws):
42
with suppress(ImportError):
43
# We must import this here to avoid circular imports.
44
from systemimage.testing.helpers import debug
45
with debug(check_flag=True) as ddlog:
49
# Parameterized for testing purposes.
50
DOWNLOADER_INTERFACE = 'com.canonical.applications.Downloader'
51
MANAGER_INTERFACE = 'com.canonical.applications.DownloadManager'
52
OBJECT_NAME = 'com.canonical.applications.Downloader'
53
OBJECT_INTERFACE = 'com.canonical.applications.GroupDownload'
54
USER_AGENT = 'Ubuntu System Image Upgrade Client; Build {}'
36
except ImportError: # pragma: no cover
57
40
log = logging.getLogger('systemimage')
61
return {'User-Agent': USER_AGENT.format(config.build_number)}
64
43
class Canceled(Exception):
65
44
"""Raised when the download was canceled."""
89
68
url=url, destination=destination, checksum=checksum)
92
class DownloadReactor(Reactor):
93
def __init__(self, bus, callback=None, pausable=False):
95
self._callback = callback
96
self._pausable = pausable
101
self.react_to('canceled')
102
self.react_to('error')
103
self.react_to('finished')
104
self.react_to('paused')
105
self.react_to('progress')
106
self.react_to('resumed')
107
self.react_to('started')
109
def _do_started(self, signal, path, started):
110
_print('STARTED:', started)
112
def _do_finished(self, signal, path, local_paths):
113
_print('FINISHED:', local_paths)
116
def _do_error(self, signal, path, error_message):
117
_print('ERROR:', error_message)
118
log.error(error_message)
119
self.error = error_message
122
def _do_progress(self, signal, path, received, total):
123
self.received = received
125
_print('PROGRESS:', received, total)
126
if self._callback is not None:
127
# Be defensive, so yes, use a bare except. If an exception occurs
128
# in the callback, log it, but continue onward.
130
self._callback(received, total)
132
log.exception('Exception in progress callback')
134
def _do_canceled(self, signal, path, canceled):
135
# Why would we get this signal if it *wasn't* canceled? Anyway,
136
# this'll be a D-Bus data type so converted it to a vanilla Python
138
_print('CANCELED:', canceled)
139
self.canceled = bool(canceled)
142
def _do_paused(self, signal, path, paused):
143
_print('PAUSE:', paused, self._pausable)
144
send_paused = self._pausable and config.dbus_service is not None
145
if send_paused: # pragma: no branch
146
# We could plumb through the `service` object from service.py (the
147
# main entry point for system-image-dbus, but that's actually a
148
# bit of a pain, so do the expedient thing and grab the interface
150
percentage = (int(self.received / self.total * 100.0)
151
if self.total > 0 else 0)
152
config.dbus_service.UpdatePaused(percentage)
154
def _do_resumed(self, signal, path, resumed):
155
_print('RESUME:', resumed)
156
# There currently is no UpdateResumed() signal.
158
def _default(self, *args, **kws):
159
_print('SIGNAL:', args, kws) # pragma: no cover
162
class DBusDownloadManager:
163
def __init__(self, callback=None):
71
class DownloadManagerBase:
72
"""Base class for all download managers."""
165
76
:param callback: If given, a function that is called every so often
166
77
during downloading.
168
79
of bytes received so far, and the total amount of bytes to be
82
# This is a list of functions that are called every so often during
83
# downloading. Functions in this list take two arguments, the number
84
# of bytes received so far, and the total amount of bytes to be
172
89
self._queued_cancel = False
173
self.callback = callback
175
91
def __repr__(self): # pragma: no cover
176
return '<DBusDownloadManager at 0x{:x}>'.format(id(self))
92
return '<{} at 0x{:x}>'.format(self.__class__.__name__, id(self))
94
def _get_download_records(self, downloads):
95
"""Convert the downloads items to download records."""
96
records = [item if isinstance(item, _RecordType) else Record(*item)
97
for item in downloads]
98
destinations = set(record.destination for record in records)
99
# Check for duplicate destinations, specifically for a local file path
100
# coming from two different sources. It's okay if there are duplicate
101
# destination records in the download request, but each of those must
102
# be specified by the same source url and have the same checksum.
104
# An easy quick check just asks if the set of destinations is smaller
105
# than the total number of requested downloads. It can't be larger.
106
# If it *is* smaller, then there are some duplicates, however the
107
# duplicates may be legitimate, so look at the details.
109
# Note though that we cannot pass duplicates destinations to udm, so we
110
# have to filter out legitimate duplicates. That's fine since they
111
# really are pointing to the same file, and will end up in the
112
# destination location.
113
if len(destinations) < len(downloads):
114
by_destination = dict()
115
unique_downloads = set()
116
for record in records:
117
by_destination.setdefault(record.destination, set()).add(
119
unique_downloads.add(record)
121
for dst, seen in by_destination.items():
123
# Tuples will look better in the pretty-printed output.
125
(dst, sorted(tuple(dup) for dup in seen)))
126
if len(duplicates) > 0:
127
raise DuplicateDestinationError(sorted(duplicates))
128
# Uniquify the downloads.
129
records = list(unique_downloads)
132
def _do_callback(self):
133
# Be defensive, so yes, use a bare except. If an exception occurs in
134
# the callback, log it, but continue onward.
135
for callback in self.callbacks:
137
callback(self.received, self.total)
139
log.exception('Exception in progress callback')
142
"""Cancel any current downloads."""
143
self._queued_cancel = True
146
"""Pause the download, but only if one is in progress."""
147
pass # pragma: no cover
150
"""Resume the download, but only if one is in progress."""
151
pass # pragma: no cover
153
def _get_files(self, records, pausable):
154
raise NotImplementedError # pragma: no cover
178
156
def get_files(self, downloads, *, pausable=False):
179
157
"""Download a bunch of files concurrently.
204
182
:raises: DuplicateDestinationError if more than one source url is
205
183
downloaded to the same destination file.
207
assert self._iface is None
208
185
if self._queued_cancel:
209
186
# A cancel is queued, so don't actually download anything.
211
188
if len(downloads) == 0:
212
189
# Nothing to download. See LP: #1245597.
214
# Convert the downloads items to download records.
215
records = [item if isinstance(item, _RecordType) else Record(*item)
216
for item in downloads]
217
destinations = set(record.destination for record in records)
218
# Check for duplicate destinations, specifically for a local file path
219
# coming from two different sources. It's okay if there are duplicate
220
# destination records in the download request, but each of those must
221
# be specified by the same source url and have the same checksum.
223
# An easy quick check just asks if the set of destinations is smaller
224
# than the total number of requested downloads. It can't be larger.
225
# If it *is* smaller, then there are some duplicates, however the
226
# duplicates may be legitimate, so look at the details.
228
# Note though that we cannot pass duplicates destinations to udm,
229
# so we have to filter out legitimate duplicates. That's fine since
230
# they really are pointing to the same file, and will end up in the
231
# destination location.
232
if len(destinations) < len(downloads):
233
by_destination = dict()
234
unique_downloads = set()
235
for record in records:
236
by_destination.setdefault(record.destination, set()).add(
238
unique_downloads.add(record)
240
for dst, seen in by_destination.items():
242
# Tuples will look better in the pretty-printed output.
244
(dst, sorted(tuple(dup) for dup in seen)))
245
if len(duplicates) > 0:
246
raise DuplicateDestinationError(sorted(duplicates))
247
# Uniquify the downloads.
248
records = list(unique_downloads)
249
bus = dbus.SystemBus()
250
service = bus.get_object(DOWNLOADER_INTERFACE, '/')
251
iface = dbus.Interface(service, MANAGER_INTERFACE)
252
# Better logging of the requested downloads.
191
records = self._get_download_records(downloads)
192
# Better logging of the requested downloads. However, we want the
193
# entire block of multiline log output to appear under a single
254
196
print('[0x{:x}] Requesting group download:'.format(id(self)), file=fp)
255
197
for record in records:
259
201
print('\t{} [{}] -> {}'.format(*record), file=fp)
260
202
log.info('{}'.format(fp.getvalue()))
261
object_path = iface.createDownloadGroup(
264
False, # Don't allow GSM yet.
265
# https://bugs.freedesktop.org/show_bug.cgi?id=55594
266
dbus.Dictionary(signature='sv'),
268
download = bus.get_object(OBJECT_NAME, object_path)
269
self._iface = dbus.Interface(download, OBJECT_INTERFACE)
270
# Are GSM downloads allowed? Yes, except if auto_download is set to 1
272
allow_gsm = Settings().get('auto_download') != '1'
273
DBusDownloadManager._set_gsm(self._iface, allow_gsm=allow_gsm)
274
# Start the download.
275
reactor = DownloadReactor(bus, self.callback, pausable)
276
reactor.schedule(self._iface.start)
277
log.info('[0x{:x}] Running group download reactor', id(self))
279
# This download is complete so the object path is no longer
280
# applicable. Setting this to None will cause subsequent cancels to
283
log.info('[0x{:x}] Group download reactor done', id(self))
284
if reactor.error is not None:
285
log.error('Reactor error: {}'.format(reactor.error))
287
log.info('Reactor canceled')
288
# Report any other problems.
289
if reactor.error is not None:
290
raise FileNotFoundError(reactor.error)
293
if reactor.timed_out:
296
for record in records:
297
assert os.path.exists(record.destination), (
298
'Missing destination: {}'.format(record))
301
def _set_gsm(iface, *, allow_gsm):
302
# This is a separate method for easier testing via mocks.
303
iface.allowGSMDownload(allow_gsm)
306
"""Cancel any current downloads."""
307
if self._iface is None:
308
# Since there's no download in progress right now, there's nothing
309
# to cancel. Setting this flag queues the cancel signal once the
310
# reactor starts running again. Yes, this is a bit weird, but if
311
# we don't do it this way, the caller will immediately get a
312
# Canceled exception, which isn't helpful because it's expecting
313
# one when the next download begins.
314
self._queued_cancel = True
203
self._get_files(records, pausable)
206
def get_download_manager(*args):
207
# We have to avoid circular imports since both download managers import
208
# various things from this module.
209
from systemimage.curl import CurlDownloadManager
210
from systemimage.udm import DOWNLOADER_INTERFACE, UDMDownloadManager
211
# Detect if we have ubuntu-download-manager.
213
# Use PyCURL based downloader if no udm is found, or if the environment
214
# variable is set. However, if we're told to use PyCURL and it's
215
# unavailable, throw an exception.
217
use_pycurl = os.environ.get('SYSTEMIMAGE_PYCURL')
218
if use_pycurl is None:
219
# Auto-detect. For backward compatibility, use udm if it's available,
220
# otherwise use PyCURL.
222
bus = dbus.SystemBus()
223
bus.get_object(DOWNLOADER_INTERFACE, '/')
225
except dbus.exceptions.DBusException:
226
udm_available = False
228
cls = UDMDownloadManager
230
raise ImportError('No module named {}'.format('pycurl'))
319
"""Pause the download, but only if one is in progress."""
320
if self._iface is not None: # pragma: no branch
324
"""Resume the download, but only if one is in progress."""
325
if self._iface is not None: # pragma: no branch
232
cls = CurlDownloadManager
234
cls = (CurlDownloadManager
235
if use_pycurl.lower() in ('1', 'yes', 'true')
236
else UDMDownloadManager)