~launchpad-results/launchpad-results/trunk

« back to all changes in this revision

Viewing changes to lib/lpresults/scripts/multiapplication.py

  • Committer: Marc Tardif
  • Date: 2011-09-14 01:25:40 UTC
  • Revision ID: marc.tardif@canonical.com-20110914012540-1gs255vhv6kb0mg4
Added updating of submissions periodically.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 
3
3
__metaclass__ = type
4
4
 
5
 
import shutil
6
 
 
7
 
from os import path
8
 
from urlparse import urlsplit
 
5
__all__ = [
 
6
    "MultiApplication",
 
7
    ]
9
8
 
10
9
from multiprocessing import (
11
10
    Process,
12
11
    Queue,
13
12
    )
14
13
 
15
 
from lpresults.services.application import (
16
 
    Application,
17
 
    ApplicationRunner,
18
 
    )
19
 
from lpresults.services.functor import Functor
 
14
from lpresults.scripts.application import Application
 
15
from lpresults.scripts.functor import Functor
20
16
 
21
17
 
22
18
class MultiApplication(Application):
23
19
 
24
20
    # Application defaults
25
 
    concurrency = 10
26
 
 
27
 
    def __init__(self, concurrency=None, **kwargs):
28
 
        super(MultiApplication, self).__init__(**kwargs)
29
 
 
30
 
        if concurrency is not None:
31
 
            self.concurrency = concurrency
32
 
 
 
21
    default_concurrency = 10
 
22
 
 
23
    def addOptions(self, parser):
 
24
        """See Application."""
 
25
        super(MultiApplication, self).addOptions(parser)
 
26
 
 
27
        parser.add_option("-c", "--concurrency",
 
28
            default=self.default_concurrency,
 
29
            metavar="NUM",
 
30
            type="int",
 
31
            help="Number of processes to run at a time,"
 
32
                " defaults to %default")
 
33
 
 
34
    def parseOptions(self, options, args):
 
35
        """See Application."""
 
36
        super(MultiApplication, self).parseOptions(options, args)
 
37
 
 
38
        self.concurrency = options.concurrency
33
39
        self.input_queue = Queue()
34
40
        self.output_queue = Queue()
35
41
 
36
 
    def getLaunchpad(self, *args, **kwargs):
37
 
        """See Application."""
38
 
        # Cache is broken with multiple processes, so each child process
39
 
        # needs to be given a different cache directory but the same
40
 
        # credentials can be shared safely.
41
 
        #
42
 
        # See: https://bugs.launchpad.net/launchpadlib/+bug/459418
43
 
        index = kwargs.pop("index", None)
44
 
        if index is not None:
45
 
            launchpad_kwargs = self.launchpad_factory._kwargs
46
 
            parent_launchpadlib_dir = launchpad_kwargs["launchpadlib_dir"]
47
 
            child_launchpadlib_dir = path.join(
48
 
                parent_launchpadlib_dir, str(index))
49
 
            kwargs.setdefault("launchpadlib_dir", child_launchpadlib_dir)
50
 
 
51
 
            # Copy the parent cache directory to the child
52
 
            host_name = urlsplit(launchpad_kwargs["service_root"])[1]
53
 
            child_cache_dir = path.join(
54
 
                child_launchpadlib_dir, host_name, "cache")
55
 
            if not path.exists(child_cache_dir):
56
 
                parent_cache_dir = path.join(
57
 
                    parent_launchpadlib_dir, host_name, "cache")
58
 
                shutil.copytree(parent_cache_dir, child_cache_dir)
59
 
 
60
 
        return super(MultiApplication, self).getLaunchpad(*args, **kwargs)
61
 
 
62
42
    def processInput(self):
63
43
        pass
64
44
 
66
46
        pass
67
47
 
68
48
    def processMessages(self, index):
 
49
        """Process each message here. Must be defined."""
69
50
        raise NotImplementedError()
70
51
 
71
 
    def start(self):
72
 
        # Establish an early Launchpad connection to get the credentials
73
 
        self.getLaunchpad()
74
 
 
 
52
    def process(self):
 
53
        """See Application."""
75
54
        if not self.concurrency:
76
55
            self.processInput()
77
56
            self.processMessages(None)
97
76
                for process in processes:
98
77
                    process.terminate()
99
78
                    process.join()
100
 
 
101
 
 
102
 
class MultiApplicationRunner(ApplicationRunner):
103
 
 
104
 
    # Runner defaults
105
 
    application_factory = MultiApplication
106
 
 
107
 
    # Multi defaults
108
 
    default_concurrency = application_factory.concurrency
109
 
 
110
 
    def getParser(self, **kwargs):
111
 
        """See ApplicationRunner."""
112
 
        parser = super(MultiApplicationRunner, self).getParser(**kwargs)
113
 
 
114
 
        parser.add_option("-c", "--concurrency",
115
 
            default=self.default_concurrency,
116
 
            metavar="NUM",
117
 
            type="int",
118
 
            help="Number of processes to run at a time,"
119
 
                " defaults to %default")
120
 
 
121
 
        return parser