~usb-creator-hackers/usb-creator/trunk

« back to all changes in this revision

Viewing changes to usbcreator/backends/base/backend.py

  • Committer: Benjamin Drung
  • Date: 2022-11-02 13:01:26 UTC
  • Revision ID: benjamin.drung@canonical.com-20221102130126-4z0xyivy5f37dp13
Move to https://code.launchpad.net/~usb-creator-hackers/usb-creator/+git/main

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import usbcreator.install
2
 
from usbcreator import misc
3
 
import logging
4
 
import os
5
 
 
6
 
def abstract(func):
7
 
    def not_implemented(*args):
8
 
        raise NotImplementedError('%s is not implemented by the backend.' %
9
 
                                  func.__name__)
10
 
    return not_implemented
11
 
 
12
 
class Backend:
13
 
    def __init__(self):
14
 
        self.sources = {}
15
 
        self.targets = {}
16
 
        self.current_source = None
17
 
        self.install_thread = None
18
 
 
19
 
    # Public methods.
20
 
 
21
 
    def add_image(self, filename):
22
 
        logging.debug('Backend told to add: %s' % filename)
23
 
        filename = os.path.abspath(os.path.expanduser(filename))
24
 
        if not os.path.isfile(filename):
25
 
            return
26
 
        if filename in self.sources:
27
 
            logging.warn('Source already added.')
28
 
            # TODO evand 2009-07-27: Scroll to source and select.
29
 
            return
30
 
 
31
 
        extension = os.path.splitext(filename)[1]
32
 
        # TODO evand 2009-07-25: What's the equivalent of `file` on Windows?
33
 
        # Going by extension is a bit rough.
34
 
        if not extension:
35
 
            logging.error('File did not have an extension.  '
36
 
                          'Could not determine file type.')
37
 
            # TODO evand 2009-07-26: Error dialog.
38
 
            return
39
 
 
40
 
        extension = extension.lower()
41
 
        if extension == '.iso':
42
 
            label = self._is_casper_cd(filename)
43
 
            if label:
44
 
                self.sources[filename] = {
45
 
                    'device' : filename,
46
 
                    'size' : os.path.getsize(filename),
47
 
                    'label' : label,
48
 
                    'type' : misc.SOURCE_ISO,
49
 
                }
50
 
                if misc.callable(self.source_added_cb):
51
 
                    self.source_added_cb(filename)
52
 
        elif extension == '.img':
53
 
            self.sources[filename] = {
54
 
                'device' : filename,
55
 
                'size' : os.path.getsize(filename),
56
 
                'label' : '',
57
 
                'type' : misc.SOURCE_IMG,
58
 
            }
59
 
            if misc.callable(self.source_added_cb):
60
 
                self.source_added_cb(filename)
61
 
        else:
62
 
            logging.error('Filename extension type not supported.')
63
 
 
64
 
    @abstract
65
 
    def detect_devices(self):
66
 
        pass
67
 
 
68
 
    def set_current_source(self, source):
69
 
        if source == None or source in self.sources:
70
 
            self.current_source = source
71
 
        else:
72
 
            raise KeyError(source)
73
 
        self.update_free()
74
 
 
75
 
    def get_current_source(self):
76
 
        return self.current_source
77
 
 
78
 
    # Common helpers
79
 
 
80
 
    def _device_removed(self, device):
81
 
        logging.debug('Device has been removed from the system: %s' % device)
82
 
        if device in self.sources:
83
 
            if misc.callable(self.source_removed_cb):
84
 
                self.source_removed_cb(device)
85
 
            self.sources.pop(device)
86
 
        elif device in self.targets:
87
 
            if misc.callable(self.target_removed_cb):
88
 
                self.target_removed_cb(device)
89
 
            self.targets.pop(device)
90
 
    
91
 
    # Signals.
92
 
 
93
 
    def source_added_cb(self, drive):
94
 
        pass
95
 
 
96
 
    def target_added_cb(self, drive):
97
 
        pass
98
 
 
99
 
    def source_removed_cb(self, drive):
100
 
        pass
101
 
 
102
 
    def target_removed_cb(self, drive):
103
 
        pass
104
 
 
105
 
    def target_changed_cb(self, udi):
106
 
        pass
107
 
 
108
 
    # Installation signals.
109
 
 
110
 
    def success_cb(self):
111
 
        pass
112
 
 
113
 
    def failure_cb(self, message=None):
114
 
        pass
115
 
 
116
 
    def install_progress_cb(self, complete):
117
 
        pass
118
 
 
119
 
    def install_progress_message_cb(self, message):
120
 
        pass
121
 
 
122
 
    def install_progress_pulse_cb(self):
123
 
        pass
124
 
 
125
 
    def install_progress_pulse_stop_cb(self):
126
 
        pass
127
 
 
128
 
    def retry_cb(self, message):
129
 
        pass
130
 
 
131
 
    def update_free(self):
132
 
        if not self.current_source:
133
 
            return True
134
 
 
135
 
        for k in self.targets:
136
 
            status = self.targets[k]['status']
137
 
            changed = self._update_free(k)            
138
 
            if status == misc.CANNOT_USE:
139
 
                continue
140
 
            if changed and misc.callable(self.target_changed_cb):
141
 
                self.target_changed_cb(k)
142
 
        return True
143
 
 
144
 
    # Internal functions.
145
 
 
146
 
    def _update_free(self, k):
147
 
        # TODO evand 2009-08-28: Replace this mess with inotify watches.
148
 
        # Incorporate accounting for files we will delete.  Defer updates if
149
 
        # sufficient time has not passed since the last update.
150
 
        if not self.current_source:
151
 
            return False
152
 
        current_source = self.sources[self.current_source]
153
 
        changed = False
154
 
        target = self.targets[k]
155
 
 
156
 
        status = target['status']
157
 
        target['status'] = misc.CAN_USE
158
 
        if target['capacity'] < current_source['size']:
159
 
            target['status'] = misc.CANNOT_USE
160
 
        if status != target['status']:
161
 
            changed = True
162
 
        return changed
163
 
 
164
 
    def install(self, source, target, device=None,
165
 
                allow_system_internal=False):
166
 
        logging.debug('Starting install thread.')
167
 
        self.install_thread = usbcreator.install.install(
168
 
            source, target, device=device,
169
 
            allow_system_internal=allow_system_internal)
170
 
        # Connect signals.
171
 
        self.install_thread.success = self.success_cb
172
 
        self.install_thread.failure = self.failure_cb
173
 
        self.install_thread.progress = self.install_progress_cb
174
 
        self.install_thread.progress_message = self.install_progress_message_cb
175
 
        self.install_thread.progress_pulse = self.install_progress_pulse_cb
176
 
        self.install_thread.progress_pulse_stop = self.install_progress_pulse_stop_cb
177
 
        self.install_thread.retry = self.retry_cb
178
 
        self.install_thread.start()
179
 
 
180
 
    def cancel_install(self):
181
 
        logging.debug('cancel_install')
182
 
        if self.install_thread and self.install_thread.is_alive():
183
 
            # TODO evand 2009-07-24: Should set the timeout for join, and
184
 
            # continue to process in a loop until the thread finishes, calling
185
 
            # into the frontend for UI event processing then break.  The
186
 
            # frontend should notify the user that it's canceling by changing
187
 
            # the progress message to "Canceling the installation..." and
188
 
            # disabiling the cancel button.
189
 
            self.install_thread.join()