#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
# Copyright (c) 2006-2008, R Oudkerk
#
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
26
import itertools
import logging
import os
import shutil
import subprocess
import sys
import tarfile

import Queue

try:
    import cPickle as pickle
except ImportError:
    import pickle

from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool
40
_logger = logging.getLogger('distributing')
43
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
44
_handler = logging.StreamHandler()
45
_handler.setFormatter(_formatter)
46
_logger.addHandler(_handler)
56
#
# Get number of cpus
#

try:
    slot_count = cpu_count()
except NotImplementedError:
    # cpu_count() raises NotImplementedError on platforms it cannot
    # determine; fall back to a single slot.  (Catching `NotImplemented`
    # -- the singleton, not the exception type -- would never match.)
    slot_count = 1
61
# Manager type which spawns subprocesses
64
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Replaced with 'Host-<addr>:<port>' by `from_address()`
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Return a process proxy that will run `target(*args, **kwargs)`
        on the host served by this manager.
        '''
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        # target/args/kwargs are shipped pickled and unpickled remotely
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager attached to an already-running server at `address`.
        '''
        manager = cls(address, authkey)
        # 'dummy' transaction just verifies the server is reachable
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        # Ask the remote server to shut itself down
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
106
# Process subclass representing a process on (possibly) a remote machine
109
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        # `data` is the pickled (target, args, kwargs) tuple;
        # `main_path` is the basename of the parent's main script (or None)
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data            # read back in _bootstrap()
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        # Unpickle lazily, in the child, so the parent never imports target
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity
127
# Let `HostManager.Process()` create `RemoteProcess` objects on the managed
# host through the `_RemoteProcess` proxy type used above.
HostManager.register('_RemoteProcess', RemoteProcess)
130
# A Pool class that uses a cluster
133
class DistributedPool(pool.Pool):
    '''
    Pool subclass whose workers are spread over the slots of a `Cluster`.
    '''
    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        # Worker processes are created via the cluster's round-robin factory
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        # Use manager-backed queues so remote workers can reach them
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Flush pending tasks and wake `size` waiting workers with
        # sentinels so they can exit.
        inqueue.set_contents([None] * size)
152
# Manager type which starts host managers on other machines
155
def LocalProcess(**kwds):
    '''
    Return an ordinary `Process` whose name is prefixed with "localhost/"
    so it matches the "hostname/Process-..." naming used for remote slots.
    '''
    p = Process(**kwds)
    p.set_name('localhost/' + p.name)
    return p
160
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        # This module itself must be importable on every host
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            # Ship sources, not compiled bytecode
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        '''
        Start the local manager, bootstrap a `HostManager` on every
        remote host and build the slot list.
        '''
        managers.SyncManager.start(self)

        # Remote host managers report back over this listener
        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                # Replies may arrive out of order -- match on index `i`
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Expose the instance-level shutdown defined below instead of the
        # Finalize callback installed by SyncManager.start()
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        # Stop remote host managers first, then the local SyncManager
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''Create a process on the next slot, round-robin over all slots.'''
        slot = self._slot_iterator.next()
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''Return a pool whose workers are spread over the cluster.'''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
233
# Queue subclass used by distributed pool
236
class SettableQueue(Queue.Queue):
    '''
    Queue subclass whose contents can be replaced wholesale.

    Used by `DistributedPool` so outstanding tasks can be flushed and
    waiting workers released with sentinel values.
    '''
    def empty(self):
        return not self.queue

    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.extend(contents)
            # notify_all (available since Python 2.6) rather than the
            # deprecated notifyAll alias
            self.not_empty.notify_all()
        finally:
            self.not_empty.release()
252
# Make `SettableQueue` instances creatable through a `Cluster` manager
# (used by DistributedPool._setup_queues via `_SettableQueue`).
Cluster.register('_SettableQueue', SettableQueue)
255
# Class representing a notional cpu in the cluster
259
class Slot(object):
    '''
    Represents a notional cpu in the cluster.

    A slot remembers the host it belongs to and exposes that host's
    process factory.
    '''
    def __init__(self, host):
        self.host = host
        self.Process = host.Process
269
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        # None means "ask the host" -- filled in by Cluster.start()
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        '''
        Bootstrap a HostManager server on this host.

        `index` identifies this host in the cluster's hostlist;
        `address` is where the new manager should report back to.
        '''
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            data = dict(
                # typo "Boostrapping" in the original fixed
                name='BootstrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            # Hand the bootstrap data to the remote `main()` over stdin
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()
304
#
# Copy files to remote directory, returning name of directory
#

# Python source run remotely (via `ssh host python -c ...`): it unpacks a
# gzipped tar stream arriving on stdin into a fresh temporary directory and
# prints that directory's name so the caller can read it back.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
317
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a fresh temporary directory on `host` over ssh and
    return the (remote) name of that directory.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    # Stream the files as a gzipped tar archive into the remote unzipper
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    # Closing stdin lets the remote side finish and print the directory name
    p.stdin.close()
    return p.stdout.read().rstrip()
330
#
# Code which runs a host manager
#

def main():
    '''
    Entry point executed (over ssh) on a remote host: serve a
    `HostManager` and report its address back to the parent cluster.
    '''
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set up logging and process state before creating any children
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc for this process
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function to remove the shipped files on exit
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()