~ubuntu-branches/ubuntu/lucid/python2.6/lucid

« back to all changes in this revision

Viewing changes to Doc/includes/mp_distributing.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-03-11 13:30:19 UTC
  • mto: (10.1.13 sid)
  • mto: This revision was merged to the branch mainline in revision 44.
  • Revision ID: james.westby@ubuntu.com-20100311133019-sblbooa3uqrkoe70
Tags: upstream-2.6.5~rc2
ImportĀ upstreamĀ versionĀ 2.6.5~rc2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
# Module to allow spawning of processes on foreign host
3
 
#
4
 
# Depends on `multiprocessing` package -- tested with `processing-0.60`
5
 
#
6
 
# Copyright (c) 2006-2008, R Oudkerk
7
 
# All rights reserved.
8
 
#
9
 
 
10
 
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
11
 
 
12
 
#
13
 
# Imports
14
 
#
15
 
 
16
 
import sys
17
 
import os
18
 
import tarfile
19
 
import shutil
20
 
import subprocess
21
 
import logging
22
 
import itertools
23
 
import Queue
24
 
 
25
 
try:
26
 
    import cPickle as pickle
27
 
except ImportError:
28
 
    import pickle
29
 
 
30
 
from multiprocessing import Process, current_process, cpu_count
31
 
from multiprocessing import util, managers, connection, forking, pool
32
 
 
33
 
#
34
 
# Logging
35
 
#
36
 
 
37
 
def get_logger():
38
 
    return _logger
39
 
 
40
 
_logger = logging.getLogger('distributing')
41
 
_logger.propagate = 0
42
 
 
43
 
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
44
 
_handler = logging.StreamHandler()
45
 
_handler.setFormatter(_formatter)
46
 
_logger.addHandler(_handler)
47
 
 
48
 
info = _logger.info
49
 
debug = _logger.debug
50
 
 
51
 
#
52
 
# Get number of cpus
53
 
#
54
 
 
55
 
try:
56
 
    slot_count = cpu_count()
57
 
except NotImplemented:
58
 
    slot_count = 1
59
 
 
60
 
#
61
 
# Manager type which spawns subprocesses
62
 
#
63
 
 
64
 
class HostManager(managers.SyncManager):
65
 
    '''
66
 
    Manager type used for spawning processes on a (presumably) foreign host
67
 
    '''
68
 
    def __init__(self, address, authkey):
69
 
        managers.SyncManager.__init__(self, address, authkey)
70
 
        self._name = 'Host-unknown'
71
 
 
72
 
    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
73
 
        if hasattr(sys.modules['__main__'], '__file__'):
74
 
            main_path = os.path.basename(sys.modules['__main__'].__file__)
75
 
        else:
76
 
            main_path = None
77
 
        data = pickle.dumps((target, args, kwargs))
78
 
        p = self._RemoteProcess(data, main_path)
79
 
        if name is None:
80
 
            temp = self._name.split('Host-')[-1] + '/Process-%s'
81
 
            name = temp % ':'.join(map(str, p.get_identity()))
82
 
        p.set_name(name)
83
 
        return p
84
 
 
85
 
    @classmethod
86
 
    def from_address(cls, address, authkey):
87
 
        manager = cls(address, authkey)
88
 
        managers.transact(address, authkey, 'dummy')
89
 
        manager._state.value = managers.State.STARTED
90
 
        manager._name = 'Host-%s:%s' % manager.address
91
 
        manager.shutdown = util.Finalize(
92
 
            manager, HostManager._finalize_host,
93
 
            args=(manager._address, manager._authkey, manager._name),
94
 
            exitpriority=-10
95
 
            )
96
 
        return manager
97
 
 
98
 
    @staticmethod
99
 
    def _finalize_host(address, authkey, name):
100
 
        managers.transact(address, authkey, 'shutdown')
101
 
 
102
 
    def __repr__(self):
103
 
        return '<Host(%s)>' % self._name
104
 
 
105
 
#
106
 
# Process subclass representing a process on (possibly) a remote machine
107
 
#
108
 
 
109
 
class RemoteProcess(Process):
110
 
    '''
111
 
    Represents a process started on a remote host
112
 
    '''
113
 
    def __init__(self, data, main_path):
114
 
        assert not main_path or os.path.basename(main_path) == main_path
115
 
        Process.__init__(self)
116
 
        self._data = data
117
 
        self._main_path = main_path
118
 
 
119
 
    def _bootstrap(self):
120
 
        forking.prepare({'main_path': self._main_path})
121
 
        self._target, self._args, self._kwargs = pickle.loads(self._data)
122
 
        return Process._bootstrap(self)
123
 
 
124
 
    def get_identity(self):
125
 
        return self._identity
126
 
 
127
 
HostManager.register('_RemoteProcess', RemoteProcess)
128
 
 
129
 
#
130
 
# A Pool class that uses a cluster
131
 
#
132
 
 
133
 
class DistributedPool(pool.Pool):
134
 
 
135
 
    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
136
 
        self._cluster = cluster
137
 
        self.Process = cluster.Process
138
 
        pool.Pool.__init__(self, processes or len(cluster),
139
 
                           initializer, initargs)
140
 
 
141
 
    def _setup_queues(self):
142
 
        self._inqueue = self._cluster._SettableQueue()
143
 
        self._outqueue = self._cluster._SettableQueue()
144
 
        self._quick_put = self._inqueue.put
145
 
        self._quick_get = self._outqueue.get
146
 
 
147
 
    @staticmethod
148
 
    def _help_stuff_finish(inqueue, task_handler, size):
149
 
        inqueue.set_contents([None] * size)
150
 
 
151
 
#
152
 
# Manager type which starts host managers on other machines
153
 
#
154
 
 
155
 
def LocalProcess(**kwds):
156
 
    p = Process(**kwds)
157
 
    p.set_name('localhost/' + p.name)
158
 
    return p
159
 
 
160
 
class Cluster(managers.SyncManager):
161
 
    '''
162
 
    Represents collection of slots running on various hosts.
163
 
 
164
 
    `Cluster` is a subclass of `SyncManager` so it allows creation of
165
 
    various types of shared objects.
166
 
    '''
167
 
    def __init__(self, hostlist, modules):
168
 
        managers.SyncManager.__init__(self, address=('localhost', 0))
169
 
        self._hostlist = hostlist
170
 
        self._modules = modules
171
 
        if __name__ not in modules:
172
 
            modules.append(__name__)
173
 
        files = [sys.modules[name].__file__ for name in modules]
174
 
        for i, file in enumerate(files):
175
 
            if file.endswith('.pyc') or file.endswith('.pyo'):
176
 
                files[i] = file[:-4] + '.py'
177
 
        self._files = [os.path.abspath(file) for file in files]
178
 
 
179
 
    def start(self):
180
 
        managers.SyncManager.start(self)
181
 
 
182
 
        l = connection.Listener(family='AF_INET', authkey=self._authkey)
183
 
 
184
 
        for i, host in enumerate(self._hostlist):
185
 
            host._start_manager(i, self._authkey, l.address, self._files)
186
 
 
187
 
        for host in self._hostlist:
188
 
            if host.hostname != 'localhost':
189
 
                conn = l.accept()
190
 
                i, address, cpus = conn.recv()
191
 
                conn.close()
192
 
                other_host = self._hostlist[i]
193
 
                other_host.manager = HostManager.from_address(address,
194
 
                                                              self._authkey)
195
 
                other_host.slots = other_host.slots or cpus
196
 
                other_host.Process = other_host.manager.Process
197
 
            else:
198
 
                host.slots = host.slots or slot_count
199
 
                host.Process = LocalProcess
200
 
 
201
 
        self._slotlist = [
202
 
            Slot(host) for host in self._hostlist for i in range(host.slots)
203
 
            ]
204
 
        self._slot_iterator = itertools.cycle(self._slotlist)
205
 
        self._base_shutdown = self.shutdown
206
 
        del self.shutdown
207
 
 
208
 
    def shutdown(self):
209
 
        for host in self._hostlist:
210
 
            if host.hostname != 'localhost':
211
 
                host.manager.shutdown()
212
 
        self._base_shutdown()
213
 
 
214
 
    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
215
 
        slot = self._slot_iterator.next()
216
 
        return slot.Process(
217
 
            group=group, target=target, name=name, args=args, kwargs=kwargs
218
 
            )
219
 
 
220
 
    def Pool(self, processes=None, initializer=None, initargs=()):
221
 
        return DistributedPool(self, processes, initializer, initargs)
222
 
 
223
 
    def __getitem__(self, i):
224
 
        return self._slotlist[i]
225
 
 
226
 
    def __len__(self):
227
 
        return len(self._slotlist)
228
 
 
229
 
    def __iter__(self):
230
 
        return iter(self._slotlist)
231
 
 
232
 
#
233
 
# Queue subclass used by distributed pool
234
 
#
235
 
 
236
 
class SettableQueue(Queue.Queue):
237
 
    def empty(self):
238
 
        return not self.queue
239
 
    def full(self):
240
 
        return self.maxsize > 0 and len(self.queue) == self.maxsize
241
 
    def set_contents(self, contents):
242
 
        # length of contents must be at least as large as the number of
243
 
        # threads which have potentially called get()
244
 
        self.not_empty.acquire()
245
 
        try:
246
 
            self.queue.clear()
247
 
            self.queue.extend(contents)
248
 
            self.not_empty.notifyAll()
249
 
        finally:
250
 
            self.not_empty.release()
251
 
 
252
 
Cluster.register('_SettableQueue', SettableQueue)
253
 
 
254
 
#
255
 
# Class representing a notional cpu in the cluster
256
 
#
257
 
 
258
 
class Slot(object):
259
 
    def __init__(self, host):
260
 
        self.host = host
261
 
        self.Process = host.Process
262
 
 
263
 
#
264
 
# Host
265
 
#
266
 
 
267
 
class Host(object):
268
 
    '''
269
 
    Represents a host to use as a node in a cluster.
270
 
 
271
 
    `hostname` gives the name of the host.  If hostname is not
272
 
    "localhost" then ssh is used to log in to the host.  To log in as
273
 
    a different user use a host name of the form
274
 
    "username@somewhere.org"
275
 
 
276
 
    `slots` is used to specify the number of slots for processes on
277
 
    the host.  This affects how often processes will be allocated to
278
 
    this host.  Normally this should be equal to the number of cpus on
279
 
    that host.
280
 
    '''
281
 
    def __init__(self, hostname, slots=None):
282
 
        self.hostname = hostname
283
 
        self.slots = slots
284
 
 
285
 
    def _start_manager(self, index, authkey, address, files):
286
 
        if self.hostname != 'localhost':
287
 
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
288
 
            debug('startup files copied to %s:%s', self.hostname, tempdir)
289
 
            p = subprocess.Popen(
290
 
                ['ssh', self.hostname, 'python', '-c',
291
 
                 '"import os; os.chdir(%r); '
292
 
                 'from distributing import main; main()"' % tempdir],
293
 
                stdin=subprocess.PIPE
294
 
                )
295
 
            data = dict(
296
 
                name='BoostrappingHost', index=index,
297
 
                dist_log_level=_logger.getEffectiveLevel(),
298
 
                dir=tempdir, authkey=str(authkey), parent_address=address
299
 
                )
300
 
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
301
 
            p.stdin.close()
302
 
 
303
 
#
304
 
# Copy files to remote directory, returning name of directory
305
 
#
306
 
 
307
 
unzip_code = '''"
308
 
import tempfile, os, sys, tarfile
309
 
tempdir = tempfile.mkdtemp(prefix='distrib-')
310
 
os.chdir(tempdir)
311
 
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
312
 
for ti in tf:
313
 
    tf.extract(ti)
314
 
print tempdir
315
 
"'''
316
 
 
317
 
def copy_to_remote_temporary_directory(host, files):
318
 
    p = subprocess.Popen(
319
 
        ['ssh', host, 'python', '-c', unzip_code],
320
 
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
321
 
        )
322
 
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
323
 
    for name in files:
324
 
        tf.add(name, os.path.basename(name))
325
 
    tf.close()
326
 
    p.stdin.close()
327
 
    return p.stdout.read().rstrip()
328
 
 
329
 
#
330
 
# Code which runs a host manager
331
 
#
332
 
 
333
 
def main():
334
 
    # get data from parent over stdin
335
 
    data = pickle.load(sys.stdin)
336
 
    sys.stdin.close()
337
 
 
338
 
    # set some stuff
339
 
    _logger.setLevel(data['dist_log_level'])
340
 
    forking.prepare(data)
341
 
 
342
 
    # create server for a `HostManager` object
343
 
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
344
 
    current_process()._server = server
345
 
 
346
 
    # report server address and number of cpus back to parent
347
 
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
348
 
    conn.send((data['index'], server.address, slot_count))
349
 
    conn.close()
350
 
 
351
 
    # set name etc
352
 
    current_process().set_name('Host-%s:%s' % server.address)
353
 
    util._run_after_forkers()
354
 
 
355
 
    # register a cleanup function
356
 
    def cleanup(directory):
357
 
        debug('removing directory %s', directory)
358
 
        shutil.rmtree(directory)
359
 
        debug('shutting down host manager')
360
 
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
361
 
 
362
 
    # start host manager
363
 
    debug('remote host manager starting in %s', data['dir'])
364
 
    server.serve_forever()