1
# -*- coding: utf-8 -*-
3
# ==============================================================================
4
# COPYRIGHT (C) 1991 - 2003 EDF R&D WWW.CODE-ASTER.ORG
5
# THIS PROGRAM IS FREE SOFTWARE; YOU CAN REDISTRIBUTE IT AND/OR MODIFY
6
# IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE AS PUBLISHED BY
7
# THE FREE SOFTWARE FOUNDATION; EITHER VERSION 2 OF THE LICENSE, OR
8
# (AT YOUR OPTION) ANY LATER VERSION.
10
# THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT
11
# WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF
12
# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE GNU
13
# GENERAL PUBLIC LICENSE FOR MORE DETAILS.
15
# YOU SHOULD HAVE RECEIVED A COPY OF THE GNU GENERAL PUBLIC LICENSE
16
# ALONG WITH THIS PROGRAM; IF NOT, WRITE TO EDF R&D CODE_ASTER,
17
# 1 AVENUE DU GENERAL DE GAULLE, 92141 CLAMART CEDEX, FRANCE.
18
# ==============================================================================
21
Manage jobs on several hosts according to the available
22
cpu and memory resources.
28
from pprint import pprint, pformat
30
from asrun.i18n import _
31
from asrun.mystring import print3
32
from asrun.thread import Lock
40
def isnum(val):
    """Tell if 'val' is a plain number.

    Only objects whose exact type is `int`, `long` (Python 2) or `float`
    qualify; the exact type is compared, so subclasses such as `bool`
    are rejected.
    """
    try:
        numeric_types = (int, long, float)
    except NameError:
        # `long` does not exist on Python 3, where `int` is unbounded.
        numeric_types = (int, float)
    return type(val) in numeric_types
44
# Three distinct sentinel objects; they are meant to be compared with the
# status value returned alongside the host by a resource request.
NORESOURCE, ALLOCATED, OVERLIMIT = object(), object(), object()
47
class ResourceManagerError(Exception):
    """Error raised when a ResourceManager operation fails."""
51
class ResourceManager:
53
Class to manage resources to run many calculations on several hosts.
57
def __init__(self, hostinfo):
61
self.hostinfo = hostinfo.copy()
62
self.all_hosts = self.hostinfo.keys()
64
self.d_crit = {# numkey, default, reverse order
66
'cpurun' : ('000_cpurun', 0, False, ),
67
'memrun' : ('010_memrun', 0, False, ),
68
# characteristics of hosts (constant)
69
'mem' : ('101_mem', 0, True, ),
70
'cpu' : ('102_cpu', 0, True, ),
71
'nomjob' : ('998_job', '', False, ),
72
'host' : ('999_host', '', False, ),
74
self.l_crit = [(v[0], v[2]) for v in self.d_crit.values()]
76
self.job_keys = [k for k, v in self.d_crit.items() if v[0] > '100_']
77
# build the list of all infos
80
# to store host availability
81
self.host_connection = {}
82
for host, info in self.hostinfo.items():
83
self.host_connection[host] = True
86
for crit, val in self.d_crit.items():
87
numkey, default, reverse = val
88
d[numkey] = info.get(crit, default)
89
if numkey > '100_' and numkey < '900_' \
91
self.limit.get(crit) is None or (reverse and self.limit[crit] < d[numkey]) \
92
or (not reverse and self.limit[crit] > d[numkey]) \
94
self.limit[crit] = d[numkey]
96
# to store job parameters
99
def get(self, host, key, default=None):
101
Return a current value.
104
if self.d_crit[key][0] > '100_':
105
res = self.hostinfo[host][key]
107
nkey = self.d_crit[key][0]
108
for info in self.infos:
109
if info['999_host'] == host:
114
def set(self, host, key, value):
119
if self.d_crit[key][0] > '100_':
120
raise ResourceManagerError, "can not be changed : '%s'" % key
122
nkey = self.d_crit[key][0]
123
for info in self.infos:
124
if info['999_host'] == host:
128
raise ResourceManagerError, "can't set '%s'" % key
130
def add(self, host, key, value):
    """
    Add 'value' to the current value.

    The current value of 'key' for 'host' is read through `self.get`
    and the incremented value is written back through `self.set`.
    """
    self.set(host, key, self.get(host, key) + value)
137
def sub(self, host, key, value):
    """
    Subtract 'value' from the current value.

    The current value of 'key' for 'host' is read through `self.get`
    and the decremented value is written back through `self.set`.
    """
    self.set(host, key, self.get(host, key) - value)
144
def store_job(self, host='unknown', **kwjob):
146
Store 'kwjob' in history.
152
dico['allocated'] = time.strftime('%a %H:%M:%S')
153
self.history[kwjob['nomjob']] = dico
155
def get_job(self, jobname):
157
Get 'jobname' from history.
159
dico = self.history.get(jobname)
162
dico['released'] = time.strftime('%a %H:%M:%S')
165
def get_history(self):
    """
    Return a copy of the jobs' history.

    The copy is shallow: the returned dict is a new object, but the
    per-job entries are the same objects as those stored in
    `self.history`.
    """
    return self.history.copy()
172
def action(self, what, *args, **kwargs):
174
Safely run a method which accesses the infos attribute.
180
result = getattr(self, what)(*args, **kwargs)
181
except Exception, err:
182
tberr = traceback.format_exc()
185
raise ResourceManagerError, tberr
188
def get_first(self, values=None):
190
Return the most available host.
193
values = self.infos[:]
196
for crit, rev in self.l_crit:
197
values.sort(reverse=rev)
201
val0 = values[0][crit]
203
for info in values[1:]:
204
if info[crit] != val0:
211
print3('--- FIN ---')
215
def is_connected(self, host):
    """
    Tell if 'host' is connected.

    Looks 'host' up in `self.host_connection`; an unknown host raises
    KeyError.
    """
    return self.host_connection[host]
221
def get_all_connected_hosts(self):
    """
    Return all connected hosts.

    The result is a new list holding the entries of `self.all_hosts`
    for which `self.is_connected` is true, in the same order.
    """
    return [host for host in self.all_hosts if self.is_connected(host)]
227
def suitable_host(self, **kwjob):
229
Limit infos to capable hosts.
232
for info in self.infos:
233
if not self.is_connected(info[self.d_crit['host'][0]]):
236
for par, val in kwjob.items():
237
if not self.isok(par, val, info):
244
def available_host(self, **kwjob):
246
Return the most available host accepting job parameters.
249
print3('job parameters : ', end=' ')
251
values = self.suitable_host(**kwjob)
253
print '%d suitable hosts : %s' \
254
% (len(values), [info[self.d_crit['host'][0]] for info in values])
255
avail = self.get_first(values)
258
def isok(self, key, value, info):
260
Tell if 'value' is under the limit vs 'info[par]'.
262
if not key in self.job_keys:
264
val_ref = info[self.d_crit[key][0]]
265
if not self.d_crit.has_key(key + 'run'):
267
val_run = info[self.d_crit[key + 'run'][0]]
268
ok = (value + val_run) <= val_ref
272
print3('%-2s host=%-24s para=%-4s allocated=%-4s requested=%-4s ref=%-4s' % \
273
(rep, info[self.d_crit['host'][0]], key, val_run, value, val_ref))
276
def CheckHosts(self, run):
278
Check connection to known hosts, update host_connection attribute.
281
for host in self.all_hosts:
283
iret, output = run.Shell('echo hello', mach=host)
284
if output.find('hello') < 0:
285
run.DBG(u"CheckHosts failed on %s" % host)
286
self.host_connection[host] = False
289
run.DBG(u"CheckHosts success on %s" % host)
290
self.host_connection[host] = True
294
def Request(self, **kwjob):
296
Ask for an available host and block resources.
298
info = self.action('available_host', **kwjob)
299
host = info.get(self.d_crit['host'][0], None)
301
print3('job allocated on %s : %s' % (host, pformat(kwjob)))
304
for key in self.job_keys:
305
if isnum(kwjob.get(key)):
306
self.action('add', host, key + 'run', kwjob[key])
309
for key, lim in self.limit.items():
310
if kwjob.get(key) and (\
311
(self.d_crit[key][2] and kwjob[key] > lim) or \
312
(not self.d_crit[key][2] and kwjob[key] < lim) ):
315
self.action('store_job', host, **kwjob)
320
def Free(self, jobname):
322
Free job resources on 'host'.
324
kwjob = self.action('get_job', jobname)
329
for key in self.job_keys:
330
if isnum(kwjob.get(key)):
331
self.action('sub', host, key + 'run', kwjob[key])
332
ddbg[key] = kwjob[key]
334
print3('job released : %s' % (pformat(ddbg)))
336
def repr_history(self):
338
Return the jobs' history.
340
dico = self.action('get_history')
341
ljob = [(v['allocated'], k) for k,v in dico.items()]
343
head = '%s %s %s %s %s %s' % (_(u"job").center(14), _(u"host").center(16),
344
_(u"started").center(12), _(u"cpu").rjust(6),
345
_(u"memory").rjust(6), _(u"ended").rjust(12))
346
fmt = '%(job_)-14s %(host_)-16s %(allocated)12s %(cpu)6d %(mem)6d %(released)12s'
347
dini = { 'job_' : '', 'host_' : '', 'allocated' : '', 'cpu' : 0, 'mem' : 0, 'released' : '' }
349
for start, job in ljob:
352
d.update({ 'job_' : job[:14], 'host_' : d['host'][:16] })
354
return os.linesep.join(txt)
360
infos = self.infos[:]
361
lkey = ['cpu', 'mem']
362
head= ' host cpu mem'
363
fmt = '%(host_)-16s %(cpu)6d %(mem)6d'
366
d = { 'host_' : info[self.d_crit['host'][0]][:16] }
368
d[k] = info[self.d_crit[k + 'run'][0]]
370
return os.linesep.join(txt)
372
def GetConfig(self, host):
    """
    Return the configuration of 'host'.

    The result is a (shallow) copy of the `self.hostinfo` entry, so the
    caller can modify it without altering the manager's own table.
    An unknown host yields an empty dict.
    """
    return self.hostinfo.get(host, {}).copy()
378
def get_sum(self, key):
    """
    Return the sum of a resource.

    Adds up `self.get(host, key, 0)` over the hosts returned by
    `self.get_all_connected_hosts()`; the result is 0 when that list
    is empty.
    """
    return sum(self.get(host, key, 0)
               for host in self.get_all_connected_hosts())
385
class PureLocalResource(ResourceManager):
    """
    Derived class to run only on local host.
    """

    def __init__(self, run):
        """Initialization using ASTER_RUN object.

        Builds a one-entry host table for the local machine from values
        queried on 'run' and passes it to ResourceManager.__init__.
        """
        # Keep only the short host name (text before the first dot).
        local_host = run.GetHostName().split('.')[0]
        cpu_count = run.GetCpuInfo('numcpu')
        # Fall back to 99e3 when the memory query returns a falsy value.
        total_mem = run.GetMemInfo('memtotal') or 99e3
        hostinfo = {
            local_host: {
                'host': local_host,
                'cpu': cpu_count,
                'mem': total_mem,
            },
        }
        ResourceManager.__init__(self, hostinfo=hostinfo)
398
if __name__ == '__main__':
401
from asrun.run import AsRunFactory
402
run=AsRunFactory('/opt/aster/ASTK/ASTK_SERV')
403
rc = PureLocalResource(run)
405
from asrun.rcfile import parse_config
406
c = parse_config('../unittest/astout/hostfile')
407
#for v in c.values():
409
rc = ResourceManager(c)
412
print3('cpu =', rc.action('get', 'cli70cx', 'cpu'))
413
print3('mem =', rc.action('get', 'cli70cx', 'mem'))
414
print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
416
assert rc.isok('mem', 384, rc.infos[0]) is True
417
rc.action('add', 'cli70cx', 'memrun', 256)
418
print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
419
assert rc.isok('mem', 384, rc.infos[0]) is False
420
rc.action('sub', 'cli70cx', 'memrun', 256)
421
print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
423
host1, status = rc.Request(nomjob='job1', cpu=1, mem=256)
425
host, status = rc.Request(cpu=1, mem=256, time=44)
426
assert host is None and status == NORESOURCE
427
host, status = rc.Request(cpu=1, mem=2048, time=14)
428
assert host is None and status == OVERLIMIT
429
host2, status = rc.Request(nomjob='job2', cpu=1, mem=128)
434
host3, status = rc.Request(nomjob='job3', cpu=2, mem=64)
439
host4, status = rc.Request(nomjob='job4', cpu=1)
443
print3(rc.repr_history())