~vila/udd/717204-stop-too-fast

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/python

import codecs
import logging
import os
import random
import signal
from stat import ST_DEV, ST_INO
import subprocess
import sys
import threading
import time

from launchpadlib.errors import HTTPError

sys.path.insert(0, os.path.dirname(__file__))
import icommon

#import httplib2
#httplib2.debuglevel = 1


class WatchedFileHandler(logging.FileHandler):
    """
    A handler for logging to a file, which watches the file
    to see if it has changed while in use. This can happen because of
    usage of programs such as newsyslog and logrotate which perform
    log file rotation. This handler, intended for use under Unix,
    watches the file to see if it has changed since the last emit.
    (A file has changed if its device or inode have changed.)
    If it has changed, the old file stream is closed, and the file
    opened to get a new stream.

    This handler is not appropriate for use under Windows, because
    under Windows open files cannot be moved or renamed - logging
    opens the files with exclusive locks - and so there is no need
    for such a handler. Furthermore, ST_INO is not supported under
    Windows; stat always returns zero for this value.

    This handler is based on a suggestion and patch by Chad J.
    Schroeder.

    The file is identified with a single os.stat() call whose failure is
    treated as "file missing".  Checking for existence first and stat'ing
    afterwards would leave a window in which a log rotation makes the
    stat raise out of emit().
    """
    def __init__(self, filename, mode='a', encoding=None):
        """Open *filename* and remember which (device, inode) it refers to."""
        logging.FileHandler.__init__(self, filename, mode, encoding)
        self.encoding = encoding
        # (-1, -1) never matches a real file, so the first emit() after the
        # file appears will reopen it.
        self.dev, self.ino = self._file_identity() or (-1, -1)

    def _file_identity(self):
        """Return (device, inode) of the log file, or None if it cannot be
        stat'ed (typically because it was just rotated away)."""
        try:
            info = os.stat(self.baseFilename)
        except OSError:
            return None
        return info[ST_DEV], info[ST_INO]

    def _open(self):
        """
        Open the current base file with the (original) mode and encoding.
        Return the resulting stream.
        """
        if self.encoding is None:
            stream = open(self.baseFilename, self.mode)
        else:
            stream = codecs.open(self.baseFilename, self.mode, self.encoding)
        return stream

    def emit(self, record):
        """
        Emit a record.

        First check if the underlying file has changed, and if it
        has, close the old stream and reopen the file to get the
        current stream.
        """
        current = self._file_identity()
        changed = current is None or current != (self.dev, self.ino)
        if changed and self.stream is not None:
            self.stream.close()
            self.stream = self._open()
            if current is None:
                # The file did not exist before the reopen; look again now
                # that _open() has had the chance to create it.
                current = self._file_identity() or (-1, -1)
            self.dev, self.ino = current
        logging.FileHandler.emit(self, record)


# Module logger: everything goes to the debug log, INFO and above also to
# the progress log.  Both files are watched so external log rotation is
# picked up.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

debug_handler = WatchedFileHandler(icommon.debug_log_file)
progress_handler = WatchedFileHandler(icommon.progress_log_file)

formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Debug log: every record.
debug_handler.setLevel(logging.DEBUG)
debug_handler.setFormatter(formatter)

# Progress log: INFO and above only.
progress_handler.setLevel(logging.INFO)
progress_handler.setFormatter(formatter)

logger.addHandler(debug_handler)
logger.addHandler(progress_handler)

logger.info("Starting up")


# Module-level Launchpad handles, created once at import time.
# icommon.get_lp() is defined outside this file; presumably it returns a
# launchpadlib client -- confirm against icommon.
logger.debug("Getting Launchpad object")
lp = icommon.get_lp()
# Distribution objects and their main archives for Debian and Ubuntu.
debian = lp.distributions['debian']
ubuntu = lp.distributions['ubuntu']
d_archive = debian.main_archive
u_archive = ubuntu.main_archive

# NOTE(review): none of the names above, nor this lock, are referenced again
# in the visible part of this file; presumably lp_lock is meant to serialise
# access to `lp` across threads -- confirm before removing.
lp_lock = threading.Lock()


def subprocess_setup():
    """Reset SIGPIPE to its default action.

    Passed as ``preexec_fn`` when spawning the importer so the child does
    not inherit the parent's SIGPIPE disposition.
    """
    default_action = signal.SIG_DFL
    signal.signal(signal.SIGPIPE, default_action)


def pool_base(name):
    """Return the leading part of *name* used as its pool prefix.

    Names starting with "lib" yield their first four characters (fewer if
    the name is shorter); every other name yields its first character.
    An empty name raises IndexError.
    """
    return name[:4] if name.startswith("lib") else name[0]


class ThreadedImporter(threading.Thread):
    """Thread that runs ``import_package.py`` for one package.

    After the thread has finished, ``success`` is True/False according to
    the child's exit status and ``output`` holds its combined
    stdout/stderr.  Both stay None when the child reported (through
    icommon.no_lock_returncode) that the package could not be locked.

    Setting the ``should_stop`` event makes the thread send SIGTERM to the
    child once per polling interval until it exits.
    """

    def __init__(self, package, job_id):
        """Prepare an importer for *package*, tracked as job *job_id*."""
        super(ThreadedImporter, self).__init__()
        self.package = package
        self.job_id = job_id
        self.success = None
        self.output = None
        self.should_stop = threading.Event()

    def run(self):
        """Run the import, wait for it and record its result."""
        logger.info("Trying %s" % self.package)
        # NOTE(review): with shell=True, proc.pid is the pid of the spawned
        # shell, so the SIGTERM below is addressed to the shell rather than
        # necessarily to import_package.py itself -- confirm this is the
        # intended stop behaviour.
        proc = subprocess.Popen("import_package.py %s" % self.package,
            shell=True, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
            preexec_fn=subprocess_setup)
        proc.stdin.close()

        # Drain the pipe while the child runs.  Reading only after the child
        # has exited deadlocks as soon as the child writes more than the
        # pipe can buffer: the child blocks in write() and never exits.
        captured = []

        def drain():
            captured.append(proc.stdout.read())

        reader = threading.Thread(target=drain)
        # Never keep the interpreter alive just for the helper.
        reader.setDaemon(True)
        reader.start()

        while proc.poll() is None:
            if self.should_stop.isSet():
                os.kill(proc.pid, signal.SIGTERM)
            time.sleep(1)
        # As with a plain read(), wait until the pipe reaches end-of-file.
        reader.join()
        if captured:
            output = captured[0]
        else:
            # The read itself failed; report the job with empty output.
            output = ""
        if proc.returncode == icommon.no_lock_returncode:
            logger.info("Couldn't lock %s, skipping" % self.package)
            return
        self.success = (proc.returncode == 0)
        self.output = output



class Stop(Exception):
    """Raised to unwind the controller when it has to stop.

    Used both when a stop has been requested and when another main
    process already holds the main lock.
    """


class AllQueue(object):
    """A Queue that always returns a package, even if we are not sure that
       it is needed.

    Packages already offered are remembered in ``tried`` and excluded from
    the next lookup; once every package has been offered the set is reset
    and the cycle starts again.
    """

    def __init__(self):
        self.tried = set()
        self.packages_db = icommon.PackageDatabase(
                icommon.sqlite_package_file)
        self.status_db = icommon.StatusDatabase(icommon.sqlite_file)

    def next_job(self):
        """Return ``(job_id, package)`` for the next package to import.

        Returns ``(None, None)`` when the package database raises
        StopIteration, or when a complete pass over the packages produced
        nothing that could be started.  Without that second exit an empty
        database, or one whose packages all refuse to start, would keep
        this method spinning forever and block the caller.
        """
        wrapped = False
        while True:
            try:
                to_try = self.packages_db.get_one(self.tried)
            except StopIteration:
                return (None, None)
            if to_try is None:
                # Nothing left outside ``tried``: start a new cycle, but
                # only once per call.
                if wrapped:
                    return (None, None)
                self.tried = set()
                wrapped = True
                continue
            self.tried.add(to_try)
            job_id = self.status_db.start_package(to_try)
            if job_id is not None:
                return (job_id, to_try)


class ImportDriver(threading.Thread):

    MAX_THREADS = 6

    def __init__(self):
        super(ImportDriver, self).__init__()
        self.threads = []
        self.stop_requested = threading.Event()
        self.stop_now = threading.Event()
        self.stopped = threading.Event()
        # Avoid a race when starting, we clear this when we actually
        # start
        self.stopped.set()

    def request_stop(self):
        self.stop_requested.set()

    def force_stop(self):
        self.stop_now.set()

    def should_stop(self):
        return self.stop_requested.isSet() or self.must_stop()

    def must_stop(self):
        return self.stop_now.isSet()

    def get_next(self):
        job_id, package = self.status_db.next_job()
        if package is None:
            (job_id, package) = self.queue.next_job()
        return (job_id, package)

    def sleep(self, max):
        for i in range(max):
            if self.should_stop():
                return True
            time.sleep(1)
        return self.should_stop()

    def deep_sleep(self, max):
        for i in range(max):
            if self.must_stop():
                return True
            time.sleep(1)
        return self.must_stop()

    def get_max_threads(self):
        if os.path.exists(icommon.max_threads_file):
            f = open(icommon.max_threads_file)
            try:
                return int(f.read().splitlines()[0].strip())
            except Exception, e:
                logger.warning("Error reading max threads file: %s", str(e))
                return self.MAX_THREADS
            finally:
                f.close()
        else:
            return self.MAX_THREADS

    def package_finished(self, package, job_id, success, output):
        if success:
            logger.info("Success %s: %s" % (package,
                        output.decode("utf-8", "replace").encode("ascii",
                            "replace").replace("\n", " ")))
        else:
            logger.warning("Importing %s failed:\n%s"
                    % (package, output.decode("utf-8", "replace").encode("ascii",
                            "replace")))
        self.status_db.finish_job(package, job_id, success,
                output.decode("utf-8", "replace").encode("utf-8", "replace"))

    def _wait_until_threads_reaches(self, target):
        i = 0
        while len(self.threads) > target:
            if i % 6 == 0:
                logger.info("threads for %s still active"
                        % str([t.package for t in self.threads]))
            removed = self._retire_finished_threads()
            if not removed:
                self.sleep(10)
            i += 1


    def _retire_finished_threads(self):
        removed = False
        for thread in self.threads[:]:
            if not thread.isAlive():
                removed = True
                logger.info("thread for %s finished" % thread.package)
                if thread.success is not None:
                    self.package_finished(thread.package, thread.job_id,
                            thread.success, thread.output)
                self.threads.remove(thread)
            elif self.must_stop():
                thread.should_stop.set()
        return removed


    def run(self):
        self.stopped.clear()
        try:
            self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
            self.queue = AllQueue()
            self.threads = []
            asked_to_stop = False
            while not asked_to_stop:
                self._retire_finished_threads()
                max_threads = self.get_max_threads()
                job_id, next = self.get_next()
                if next is None:
                    logger.debug("No package in queue, sleeping")
                    asked_to_stop = self.sleep(10)
                    continue
                if self.should_stop():
                    asked_to_stop = True
                    continue
                logger.debug("Starting thread for %s" % next)
                new_thread = ThreadedImporter(next, job_id)
                new_thread.start()
                self.threads.append(new_thread)
                self._wait_until_threads_reaches(max_threads-1)
                if self.should_stop():
                    asked_to_stop = True
            # We've been asked to stop
            logger.info("Driver asked to stop")
            self._wait_until_threads_reaches(0)
        except Exception, e:
            logger.critical("Driver hit %s" % str(e))
        finally:
            logger.info("Driver stopping")
            self.stopped.set()


class ImportController(object):
    """Owns the ImportDriver thread and supervises it from the main thread.

    run() takes the main lock, starts the driver and then polls for stop
    conditions: ``stop_requested`` (set through request_stop()) triggers a
    forced stop of the driver, the presence of icommon.stop_file a
    graceful one.
    """

    def __init__(self):
        self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
        self.stop_requested = False
        self.driver = ImportDriver()
        # Set whenever run() is not supervising a driver.
        self.stopped = threading.Event()
        self.stopped.set()

    def request_stop(self):
        """Ask the controller to stop; seen by the next check_stop()."""
        self.stop_requested = True

    def should_graceful_stop(self):
        """Return True if the stop file exists or a stop was requested."""
        if os.path.exists(icommon.stop_file):
            return True
        return self.should_stop()

    def should_stop(self):
        """Return True if request_stop() has been called."""
        return self.stop_requested

    def check_stop(self):
        """Propagate a pending stop to the driver and raise Stop.

        An explicit stop request takes priority and stops the driver
        immediately; the stop file only asks it to finish gracefully.
        Returns normally when no stop is pending.
        """
        if self.should_stop():
            self.driver.stop_now.set()
            raise Stop
        if self.should_graceful_stop():
            self.driver.stop_requested.set()
            raise Stop

    def sleep(self, max):
        """Sleep *max* seconds, checking for a stop every second.

        Raises Stop (through check_stop) as soon as a stop is pending.
        """
        for i in range(max):
            self.check_stop()
            time.sleep(1)
        self.check_stop()

    def add_jobs_for_interrupted(self):
        """Ask the status database to add jobs for interrupted imports."""
        self.status_db.add_jobs_for_interrupted()

    def run(self):
        """Run the driver until it finishes or a stop is detected.

        Raises Stop when another main process holds the lock or when a
        stop was detected while supervising.  In every case the driver is
        waited for before the lock is released.
        """
        lock = icommon.lock_main()
        if lock is None:
            logger.info("Another main process is running")
            raise Stop
        try:
            self.add_jobs_for_interrupted()
            self.stopped.clear()
            started = False
            try:
                self.driver.start()
                started = True
                while True:
                    # driver.stopped is set from construction until the
                    # driver thread clears it at the top of its run(), so
                    # on its own it can still read "stopped" right after
                    # start().  Requiring the thread to be dead as well
                    # keeps a freshly started driver from being taken for
                    # a finished one.
                    if (self.driver.stopped.isSet()
                            and not self.driver.isAlive()):
                        self.stopped.set()
                        break
                    self.sleep(10)
            finally:
                self.driver.stop_requested.set()
                logger.info("Waiting for driver to finish")
                if started:
                    # For the same reason, wait for the thread itself and
                    # not only for the flag before releasing the lock.
                    self.driver.join()
                self.driver.stopped.wait()
                self.stopped.set()
                logger.info("Finished")
        finally:
            lock.close()


# Module-level controller instance; the die() signal handler reaches the
# driver through it, and the script's final try block runs it.
controller = ImportController()


def die(signum, frame):
    """Signal handler: force the driver to stop, wait for it, then exit.

    signum -- number of the signal being handled
    frame  -- current stack frame (unused)

    Exits the process with status 1 through sys.exit().
    """
    logger.info("Received signal")
    # Restore the default action so that a repeated signal is not routed
    # back into this handler.
    signal.signal(signum, signal.SIG_DFL)

    # Forced stop: the driver tells its running importers to stop as well.
    controller.driver.stop_now.set()
    logger.info("Waiting for driver to finish")
    controller.driver.stopped.wait()
    logger.info("Driver finished: stopping")
    sys.exit(1)


# Both interrupt and terminate requests go through die().
signal.signal(signal.SIGINT, die)
signal.signal(signal.SIGTERM, die)

# Run the controller; any failure is logged at CRITICAL level and then
# re-raised so the script still ends with a traceback.
# NOTE(review): Stop derives from Exception, so a Stop raised by
# controller.run() (including "another main process is running") also
# lands in the generic handler below and is logged as critical -- confirm
# that this is intended.
try:
    controller.run()
except HTTPError, e:
    # For Launchpad HTTP errors the response content is what gets logged.
    logger.critical(e.content)
    raise
except Exception, e:
    logger.critical(str(e))
    raise