~ubuntu-branches/ubuntu/quantal/astk/quantal

« back to all changes in this revision

Viewing changes to ASTK_SERV/asrun/repart.py

  • Committer: Bazaar Package Importer
  • Author(s): Christophe Trophime
  • Date: 2010-04-25 16:43:13 UTC
  • Revision ID: james.westby@ubuntu.com-20100425164313-0s0wtsmbiewbdz53
Tags: upstream-1.8.0
ImportĀ upstreamĀ versionĀ 1.8.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
 
 
3
# ==============================================================================
 
4
# COPYRIGHT (C) 1991 - 2003  EDF R&D                  WWW.CODE-ASTER.ORG
 
5
# THIS PROGRAM IS FREE SOFTWARE; YOU CAN REDISTRIBUTE IT AND/OR MODIFY
 
6
# IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE AS PUBLISHED BY
 
7
# THE FREE SOFTWARE FOUNDATION; EITHER VERSION 2 OF THE LICENSE, OR
 
8
# (AT YOUR OPTION) ANY LATER VERSION.
 
9
#
 
10
# THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT
 
11
# WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF
 
12
# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE GNU
 
13
# GENERAL PUBLIC LICENSE FOR MORE DETAILS.
 
14
#
 
15
# YOU SHOULD HAVE RECEIVED A COPY OF THE GNU GENERAL PUBLIC LICENSE
 
16
# ALONG WITH THIS PROGRAM; IF NOT, WRITE TO EDF R&D CODE_ASTER,
 
17
#    1 AVENUE DU GENERAL DE GAULLE, 92141 CLAMART CEDEX, FRANCE.
 
18
# ==============================================================================
 
19
 
 
20
"""
 
21
Allow to manage jobs on several hosts according to the available
 
22
cpu and memory resources.
 
23
"""
 
24
 
 
25
import os
 
26
import time
 
27
import traceback
 
28
from pprint import pprint, pformat
 
29
 
 
30
from asrun.i18n         import _
 
31
from asrun.mystring     import print3
 
32
from asrun.thread       import Lock
 
33
 
 
34
_DBG_tri   = False
 
35
_DBG_crit  = False
 
36
_DBG_alloc = False
 
37
 
 
38
 
 
39
def isnum(val):
 
40
   return type(val) in (int, long, float)
 
41
 
 
42
 
 
43
# constants
 
44
NORESOURCE, ALLOCATED, OVERLIMIT = [object() for i in range(3)]
 
45
 
 
46
 
 
47
class ResourceManagerError(Exception):
 
48
   pass
 
49
 
 
50
 
 
51
class ResourceManager:
 
52
   """
 
53
   Class to manage resources to run a lot of calculations of several hosts.
 
54
   """
 
55
   lock = Lock()
 
56
 
 
57
   def __init__(self, hostinfo):
 
58
      """
 
59
      Initializations.
 
60
      """
 
61
      self.hostinfo = hostinfo.copy()
 
62
      self.all_hosts = self.hostinfo.keys()
 
63
      
 
64
      self.d_crit = {# numkey,   default, reverse order
 
65
         # criteria
 
66
         'cpurun' : ('000_cpurun',  0,     False, ),
 
67
         'memrun' : ('010_memrun',  0,     False, ),
 
68
         # characteristics of hosts (constant)
 
69
         'mem'    : ('101_mem',     0,     True,  ),
 
70
         'cpu'    : ('102_cpu',     0,     True,  ),
 
71
         'nomjob' : ('998_job',    '',     False, ),
 
72
         'host'   : ('999_host',   '',     False, ),
 
73
      }
 
74
      self.l_crit = [(v[0], v[2]) for v in self.d_crit.values()]
 
75
      self.l_crit.sort()
 
76
      self.job_keys = [k for k, v in self.d_crit.items() if v[0] > '100_']
 
77
      # build the list of all infos
 
78
      self.infos = []
 
79
      self.limit = {}
 
80
      # to store host availability
 
81
      self.host_connection = {}
 
82
      for host, info in self.hostinfo.items():
 
83
         self.host_connection[host] = True
 
84
         info['host'] = host
 
85
         d = {}
 
86
         for crit, val in self.d_crit.items():
 
87
            numkey, default, reverse = val
 
88
            d[numkey] = info.get(crit, default)
 
89
            if numkey > '100_' and numkey < '900_' \
 
90
            and ( \
 
91
               self.limit.get(crit) is None or (reverse and self.limit[crit] < d[numkey]) \
 
92
               or  (not reverse and self.limit[crit] > d[numkey]) \
 
93
            ):
 
94
               self.limit[crit] = d[numkey]
 
95
         self.infos.append(d)
 
96
      # to store job parameters
 
97
      self.history = {}
 
98
 
 
99
   def get(self, host, key, default=None):
 
100
      """
 
101
      Return a current value.
 
102
      """
 
103
      res = default
 
104
      if self.d_crit[key][0] > '100_':
 
105
         res = self.hostinfo[host][key]
 
106
      else:
 
107
         nkey = self.d_crit[key][0]
 
108
         for info in self.infos:
 
109
            if info['999_host'] == host:
 
110
               res = info[nkey]
 
111
               break
 
112
      return res
 
113
 
 
114
   def set(self, host, key, value):
 
115
      """
 
116
      Set a value.
 
117
      """
 
118
      done = False
 
119
      if self.d_crit[key][0] > '100_':
 
120
         raise ResourceManagerError, "can not be changed : '%s'" % key
 
121
      else:
 
122
         nkey = self.d_crit[key][0]
 
123
         for info in self.infos:
 
124
            if info['999_host'] == host:
 
125
               info[nkey] = value
 
126
               done = True
 
127
      if not done:
 
128
         raise ResourceManagerError, "can't set '%s'" % key
 
129
 
 
130
   def add(self, host, key, value):
 
131
      """
 
132
      Add 'value' to the current value.
 
133
      """
 
134
      current = self.get(host, key)
 
135
      self.set(host, key, current + value)
 
136
 
 
137
   def sub(self, host, key, value):
 
138
      """
 
139
      Substract 'value' to the current value.
 
140
      """
 
141
      current = self.get(host, key)
 
142
      self.set(host, key, current - value)
 
143
 
 
144
   def store_job(self, host='unknown', **kwjob):
 
145
      """
 
146
      Store 'kwjob' in history.
 
147
      """
 
148
      if host is None:
 
149
         return
 
150
      dico = kwjob.copy()
 
151
      dico['host'] = host
 
152
      dico['allocated'] = time.strftime('%a %H:%M:%S')
 
153
      self.history[kwjob['nomjob']] = dico
 
154
 
 
155
   def get_job(self, jobname):
 
156
      """
 
157
      Get 'jobname' from history.
 
158
      """
 
159
      dico = self.history.get(jobname)
 
160
      if dico is None:
 
161
         return {}
 
162
      dico['released'] = time.strftime('%a %H:%M:%S')
 
163
      return dico.copy()
 
164
 
 
165
   def get_history(self):
 
166
      """
 
167
      Return a copy of the jobs's 'history'.
 
168
      """
 
169
      dico = self.history.copy()
 
170
      return dico
 
171
 
 
172
   def action(self, what, *args, **kwargs):
 
173
      """
 
174
      Run safely a method which access to infos attribute.
 
175
      """
 
176
      result = None
 
177
      self.lock.acquire()
 
178
      tberr = None
 
179
      try:
 
180
         result = getattr(self, what)(*args, **kwargs)
 
181
      except Exception, err:
 
182
         tberr = traceback.format_exc()
 
183
      self.lock.release()
 
184
      if tberr:
 
185
         raise ResourceManagerError, tberr
 
186
      return result
 
187
 
 
188
   def get_first(self, values=None):
 
189
      """
 
190
      Return the most available host.
 
191
      """
 
192
      if values is None:
 
193
         values = self.infos[:]
 
194
      if len(values) == 0:
 
195
         return {}
 
196
      for crit, rev in self.l_crit:
 
197
         values.sort(reverse=rev)
 
198
         if _DBG_tri:
 
199
            print3(crit)
 
200
            pprint(values)
 
201
         val0 = values[0][crit]
 
202
         new = [values[0],]
 
203
         for info in values[1:]:
 
204
            if info[crit] != val0:
 
205
               break
 
206
            new.append(info)
 
207
         values = new
 
208
         if len(values) == 1:
 
209
            break
 
210
      if _DBG_tri:
 
211
         print3('--- FIN ---')
 
212
         pprint(values)
 
213
      return values[0]
 
214
 
 
215
   def is_connected(self, host):
 
216
      """
 
217
      Tell if 'host' is connected.
 
218
      """
 
219
      return self.host_connection[host]
 
220
 
 
221
   def get_all_connected_hosts(self):
 
222
      """
 
223
      Return all connected hosts.
 
224
      """
 
225
      return [h for h in self.all_hosts if self.is_connected(h)]
 
226
 
 
227
   def suitable_host(self, **kwjob):
 
228
      """
 
229
      Limit infos to capable hosts.
 
230
      """
 
231
      values = []
 
232
      for info in self.infos:
 
233
         if not self.is_connected(info[self.d_crit['host'][0]]):
 
234
            continue
 
235
         isok = True
 
236
         for par, val in kwjob.items():
 
237
            if not self.isok(par, val, info):
 
238
               isok = False
 
239
               break
 
240
         if isok:
 
241
            values.append(info)
 
242
      return values
 
243
 
 
244
   def available_host(self, **kwjob):
 
245
      """
 
246
      Return the most available host accepting job parameters.
 
247
      """
 
248
      if _DBG_crit:
 
249
         print3('job parameters : ', end=' ')
 
250
         pprint(kwjob)
 
251
      values = self.suitable_host(**kwjob)
 
252
      if _DBG_crit:
 
253
         print '%d suitable hosts : %s' \
 
254
            % (len(values), [info[self.d_crit['host'][0]] for info in values])
 
255
      avail = self.get_first(values)
 
256
      return avail
 
257
 
 
258
   def isok(self, key, value, info):
 
259
      """
 
260
      Tell if 'value' is under the limit vs 'info[par]'.
 
261
      """
 
262
      if not key in self.job_keys:
 
263
         return False
 
264
      val_ref = info[self.d_crit[key][0]]
 
265
      if not self.d_crit.has_key(key + 'run'):
 
266
         return True
 
267
      val_run = info[self.d_crit[key + 'run'][0]]
 
268
      ok = (value + val_run) <= val_ref
 
269
      if _DBG_crit:
 
270
         rep = '-'
 
271
         if ok: rep = 'ok'
 
272
         print3('%-2s host=%-24s para=%-4s allocated=%-4s requested=%-4s ref=%-4s' % \
 
273
            (rep, info[self.d_crit['host'][0]], key, val_run, value, val_ref))
 
274
      return ok
 
275
 
 
276
   def CheckHosts(self, run):
 
277
      """
 
278
      Check connection to known hosts, update host_connection attribute.
 
279
      """
 
280
      ok = tot = 0
 
281
      for host in self.all_hosts:
 
282
         tot += 1
 
283
         iret, output = run.Shell('echo hello', mach=host)
 
284
         if output.find('hello') < 0:
 
285
            run.DBG(u"CheckHosts failed on %s" % host)
 
286
            self.host_connection[host] = False
 
287
         else:
 
288
            ok += 1
 
289
            run.DBG(u"CheckHosts success on %s" % host)
 
290
            self.host_connection[host] = True
 
291
      return ok, tot
 
292
 
 
293
 
 
294
   def Request(self, **kwjob):
 
295
      """
 
296
      Ask for an available host and block resources.
 
297
      """
 
298
      info = self.action('available_host', **kwjob)
 
299
      host = info.get(self.d_crit['host'][0], None)
 
300
      if _DBG_alloc:
 
301
         print3('job allocated on %s : %s' % (host, pformat(kwjob)))
 
302
      if host is not None:
 
303
         status = ALLOCATED
 
304
         for key in self.job_keys:
 
305
            if isnum(kwjob.get(key)):
 
306
               self.action('add', host, key + 'run', kwjob[key])
 
307
      else:
 
308
         status = NORESOURCE
 
309
         for key, lim in self.limit.items():
 
310
            if kwjob.get(key) and (\
 
311
               (self.d_crit[key][2]     and kwjob[key] > lim) or \
 
312
               (not self.d_crit[key][2] and kwjob[key] < lim) ):
 
313
               status = OVERLIMIT
 
314
               break
 
315
      self.action('store_job', host, **kwjob)
 
316
      if _DBG_alloc:
 
317
         print3(self.Load())
 
318
      return host, status
 
319
 
 
320
   def Free(self, jobname):
 
321
      """
 
322
      Free job resources on 'host'.
 
323
      """
 
324
      kwjob = self.action('get_job', jobname)
 
325
      if not kwjob:
 
326
         return
 
327
      host = kwjob['host']
 
328
      ddbg = {}
 
329
      for key in self.job_keys:
 
330
         if isnum(kwjob.get(key)):
 
331
            self.action('sub', host, key + 'run', kwjob[key])
 
332
            ddbg[key] = kwjob[key]
 
333
      if _DBG_alloc:
 
334
         print3('job released : %s' % (pformat(ddbg)))
 
335
 
 
336
   def repr_history(self):
 
337
      """
 
338
      Return jobs's history.
 
339
      """
 
340
      dico = self.action('get_history')
 
341
      ljob = [(v['allocated'], k) for k,v in dico.items()]
 
342
      ljob.sort()
 
343
      head = '%s %s %s %s %s %s' % (_(u"job").center(14), _(u"host").center(16),
 
344
                                    _(u"started").center(12), _(u"cpu").rjust(6),
 
345
                                    _(u"memory").rjust(6), _(u"ended").rjust(12))
 
346
      fmt  = '%(job_)-14s %(host_)-16s %(allocated)12s %(cpu)6d %(mem)6d %(released)12s'
 
347
      dini = { 'job_' : '', 'host_' : '', 'allocated' : '', 'cpu' : 0, 'mem' : 0, 'released' : '' }
 
348
      txt = [head,]
 
349
      for start, job in ljob:
 
350
         d = dini.copy()
 
351
         d.update(dico[job])
 
352
         d.update({ 'job_' : job[:14], 'host_' : d['host'][:16] })
 
353
         txt.append(fmt % d)
 
354
      return os.linesep.join(txt)
 
355
 
 
356
   def Load(self):
 
357
      """
 
358
      Return current load.
 
359
      """
 
360
      infos = self.infos[:]
 
361
      lkey = ['cpu', 'mem']
 
362
      head= '    host      cpu    mem'
 
363
      fmt = '%(host_)-16s %(cpu)6d %(mem)6d'
 
364
      txt = [head,]
 
365
      for info in infos:
 
366
         d = { 'host_' : info[self.d_crit['host'][0]][:16] }
 
367
         for k in lkey:
 
368
            d[k] = info[self.d_crit[k + 'run'][0]]
 
369
         txt.append(fmt % d)
 
370
      return os.linesep.join(txt)
 
371
 
 
372
   def GetConfig(self, host):
 
373
      """
 
374
      Return the configuration of 'host'.
 
375
      """
 
376
      return self.hostinfo.get(host, {}).copy()
 
377
 
 
378
   def get_sum(self, key):
 
379
      """
 
380
      Return the sum of a resource.
 
381
      """
 
382
      return sum([self.get(host, key, 0) for host in self.get_all_connected_hosts()])
 
383
 
 
384
 
 
385
class PureLocalResource(ResourceManager):
 
386
   """
 
387
   Derived class to run only on local host.
 
388
   """
 
389
   def __init__(self, run):
 
390
      """Initialization using ASTER_RUN object."""
 
391
      host = run.GetHostName().split('.')[0]
 
392
      cpu  = run.GetCpuInfo('numcpu')
 
393
      mem  = run.GetMemInfo('memtotal') or 99e3
 
394
      c = { host : { 'host' : host, 'cpu' : cpu, 'mem' : mem }}
 
395
      ResourceManager.__init__(self, hostinfo=c)
 
396
 
 
397
 
 
398
if __name__ == '__main__':
 
399
   # unittest
 
400
   if False:
 
401
      from asrun.run import AsRunFactory
 
402
      run=AsRunFactory('/opt/aster/ASTK/ASTK_SERV')
 
403
      rc = PureLocalResource(run)
 
404
   else:
 
405
      from asrun.rcfile import parse_config
 
406
      c = parse_config('../unittest/astout/hostfile')
 
407
      #for v in c.values():
 
408
      #   del v['mem']
 
409
      rc = ResourceManager(c)
 
410
   
 
411
   if False:
 
412
      print3('cpu =', rc.action('get', 'cli70cx', 'cpu'))
 
413
      print3('mem =', rc.action('get', 'cli70cx', 'mem'))
 
414
      print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
 
415
      
 
416
      assert rc.isok('mem', 384, rc.infos[0]) is True
 
417
      rc.action('add', 'cli70cx', 'memrun', 256)
 
418
      print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
 
419
      assert rc.isok('mem', 384, rc.infos[0]) is False
 
420
      rc.action('sub', 'cli70cx', 'memrun', 256)
 
421
      print3('mem used =', rc.action('get', 'cli70cx', 'memrun'))
 
422
 
 
423
   host1, status = rc.Request(nomjob='job1', cpu=1, mem=256)
 
424
   #time.sleep(1)
 
425
   host, status = rc.Request(cpu=1, mem=256, time=44)
 
426
   assert host is None and status == NORESOURCE
 
427
   host, status = rc.Request(cpu=1, mem=2048, time=14)
 
428
   assert host is None and status == OVERLIMIT
 
429
   host2, status = rc.Request(nomjob='job2', cpu=1, mem=128)
 
430
   #time.sleep(1)
 
431
   print3()
 
432
   rc.Free('job2')
 
433
   print3()
 
434
   host3, status = rc.Request(nomjob='job3', cpu=2, mem=64)
 
435
   #time.sleep(1)
 
436
 
 
437
   rc.Free('job1')
 
438
   #time.sleep(1)
 
439
   host4, status = rc.Request(nomjob='job4', cpu=1)
 
440
   rc.Free('job3')
 
441
   rc.Free('job4')
 
442
 
 
443
   print3(rc.repr_history())
 
444