8
from urlparse import urlsplit
10
9
from multiprocessing import (
15
from lpresults.services.application import (
19
from lpresults.services.functor import Functor
14
from lpresults.scripts.application import Application
15
from lpresults.scripts.functor import Functor
22
18
class MultiApplication(Application):
24
20
# Application defaults
27
def __init__(self, concurrency=None, **kwargs):
    """Initialise the application, optionally overriding concurrency.

    :param concurrency: Value to store on the instance as
        ``concurrency``. When None the attribute is not assigned here.
    :param kwargs: Forwarded unchanged to the parent initialiser.
    """
    parent = super(MultiApplication, self)
    parent.__init__(**kwargs)

    # Nothing to override: leave whatever ``concurrency`` already
    # resolves to on this object.
    if concurrency is None:
        return

    self.concurrency = concurrency
21
# Default value handed to the -c/--concurrency option in addOptions.
default_concurrency = 10
23
def addOptions(self, parser):
    """See Application.

    Registers the -c/--concurrency option after the inherited options.

    :param parser: Option parser to extend; it is first passed to the
        parent addOptions.
    """
    super(MultiApplication, self).addOptions(parser)

    # Without an explicit type, optparse stores command line values as
    # strings, so "-c 4" would yield "4" while the default stays an
    # integer.  Declaring the type keeps both cases consistent.
    parser.add_option("-c", "--concurrency",
        type="int",
        default=self.default_concurrency,
        help="Number of processes to run at a time,"
            " defaults to %default")
34
def parseOptions(self, options, args):
    """See Application.

    After the parent has processed the options, records the requested
    concurrency and creates a fresh input queue and output queue.

    :param options: Parsed options; its ``concurrency`` attribute is
        copied onto this object.
    :param args: Positional arguments, only forwarded to the parent.
    """
    parent = super(MultiApplication, self)
    parent.parseOptions(options, args)

    self.concurrency = options.concurrency

    # The input queue is created first, then the output queue.
    self.input_queue, self.output_queue = Queue(), Queue()
36
def getLaunchpad(self, *args, **kwargs):
    """See Application.

    Defaults ``launchpadlib_dir`` to a per-index subdirectory of the
    parent's launchpadlib directory and seeds that subdirectory with a
    copy of the parent's cache before delegating to the parent class.

    :param index: Optional keyword removed from kwargs; its string form
        names the subdirectory (so an omitted index yields "None").
    :return: The result of the parent getLaunchpad called with the
        remaining arguments.
    """
    # Cache is broken with multiple processes, so each child process
    # needs to be given a different cache directory but the same
    # credentials can be shared safely.
    # See: https://bugs.launchpad.net/launchpadlib/+bug/459418
    index = kwargs.pop("index", None)

    launchpad_kwargs = self.launchpad_factory._kwargs
    parent_launchpadlib_dir = launchpad_kwargs["launchpadlib_dir"]
    child_launchpadlib_dir = path.join(
        parent_launchpadlib_dir, str(index))
    # An explicit launchpadlib_dir from the caller takes precedence.
    kwargs.setdefault("launchpadlib_dir", child_launchpadlib_dir)

    # Copy the parent cache directory to the child
    host_name = urlsplit(launchpad_kwargs["service_root"])[1]
    child_cache_dir = path.join(
        child_launchpadlib_dir, host_name, "cache")
    parent_cache_dir = path.join(
        parent_launchpadlib_dir, host_name, "cache")

    # copytree raises when its source is missing, so only seed the
    # child when the parent actually has a cache; an existing child
    # cache is never overwritten.
    if (not path.exists(child_cache_dir)
            and path.isdir(parent_cache_dir)):
        shutil.copytree(parent_cache_dir, child_cache_dir)

    return super(MultiApplication, self).getLaunchpad(*args, **kwargs)
62
42
def processInput(self):
97
76
for process in processes:
98
77
process.terminate()
102
class MultiApplicationRunner(ApplicationRunner):
105
# Application class tied to this runner.
# NOTE(review): presumably instantiated by ApplicationRunner - confirm.
application_factory = MultiApplication
108
# Default for the -c/--concurrency option registered in getParser, read
# from the application class when this class body is executed.
# NOTE(review): requires a class-level ``concurrency`` attribute on
# MultiApplication (or a base class); verify that it is defined.
default_concurrency = application_factory.concurrency
110
def getParser(self, **kwargs):
111
"""See ApplicationRunner."""
112
parser = super(MultiApplicationRunner, self).getParser(**kwargs)
114
parser.add_option("-c", "--concurrency",
115
default=self.default_concurrency,
118
help="Number of processes to run at a time,"
119
" defaults to %default")