1
#****************************************************************************
2
# Copyright (C) 2005 Brian Granger <bgranger@scu.edu> and
3
# Barry Wark <bwark@u.washington.edu>
5
# Distributed under the terms of the BSD License.
6
#****************************************************************************
8
# PyXG-0.1.0, Released 5/3/2005
10
"""PyXG provides a python interface to Apple's Xgrid.
12
Xgrid is Apple's software for building and managing clusters of
13
Macintosh computers for use in high performance computation.
14
See http://www.apple.com/server/macosx/features/xgrid.html for more details.
16
This module wraps the xgrid command line in Tiger. It will not work with
17
the Technology Previews of Xgrid. The command line is wrapped in this module
18
as the goal is to provide an interface to Xgrid that can be used from an interactive python prompt. The Cocoa API for Xgrid (XgridFoundation) is
19
based on an event-loop paradigm and is less well suited for interactive work. If you want to use Xgrid and python from within a Cocoa application, you should use XgridFoundation and PyObjC.
24
1. Use Xgrid from within python scripts as well as in interactive python
26
2. Create, submit and manage simple (one task) and batch (many task) Xgrid
27
jobs using python's elegant syntax.
28
3. Work with multiple Xgrid controllers simultaneously.
29
4. List available grids for each controller and query their status.
34
Import xg, create a Connection and Controller object:
37
>>> conn = Connection(hostname='xgrid.work.com',password='secret')
38
>>> cont = Controller(conn)
40
List the grids managed by the controller:
45
[<Grid with gridID = 0>, <Grid with gridID = 3>]
47
Work with the default grid, listing active jobs:
53
##################################################
54
id Date Started Status CPU Power
55
##################################################
56
229 2005-12-22 11:18:47 -0800 Finished 0
57
230 2005-12-22 11:18:50 -0800 Finished 0
58
231 2005-12-22 11:18:52 -0800 Finished 0
59
232 2005-12-22 11:18:55 -0800 Finished 0
61
Get a job from the default grid and work with it:
72
>>> j.printInfo(verbose=False)
73
229 2005-12-22 11:18:47 -0800 Finished 0
74
>>> j.printSpecification()
76
applicationIdentifier : com.apple.xgrid.cli
77
taskSpecifications : {0 = {arguments = (); command = "/usr/bin/cal"; }; }
80
submissionIdentifier : abc
85
>>> j.results(stdout="job229.out",stderr="job229.err")
86
Job stdout saved in file: job229.out
88
Use a Grid object to submit a single task job:
90
>>> j = g.submit(cmd='/usr/bin/cal')
91
Job submitted with id: 234
92
>>> j.printInfo(verbose=False)
93
234 2005-12-22 13:09:52 -0800 Finished 0
96
#####################################################################
98
#####################################################################
110
from Foundation import *
112
except ImportError, e:
113
print "This module requires PyObjC."
116
#####################################################################
118
#####################################################################
121
"""Xgrid exception class."""
122
def __init__(self, err):
126
return "Xgrid Error: %s" % (self.err)
128
class InvalidIdentifier(XgridError):
129
"""Xgrid exception for invalid job or grid identifiers."""
130
def __init__(self, id):
133
return "Invalid Xgrid Identifier: " + str(self.id)
135
class InvalidGridIdentifier(InvalidIdentifier):
136
"""Invalid grid identifier exception."""
138
return "Invalid Grid Identifier: " + str(self.id)
140
class InvalidJobIdentifier(InvalidIdentifier):
141
"""Invalid job identifier exception."""
143
return "Invalid Job Identifier: " + str(self.id)
145
class InvalidAction(XgridError):
146
"""Invalid action exception."""
147
def __init__(self,action):
150
return "Invalid Xgrid Action: " + str(self.action)
152
class InvalidIdentifierType(Exception):
    """Invalid job or grid identifier type.

    Raised by this module's identifier-normalising code when an id is
    neither a string (str/unicode) nor an int.

    The class previously had no base class, which made it an old-style
    class on Python 2: ``except Exception`` handlers could not catch it
    and it cannot be raised at all on Python 3. Deriving from Exception
    keeps the constructor signature, the ``bad_var`` attribute and the
    string form unchanged.

    @arg bad_var: The offending identifier value.
    """

    def __init__(self, bad_var):
        # Populate ``args`` as well, so generic exception tooling
        # (logging, pickling, repr) sees the offending value.
        Exception.__init__(self, bad_var)
        self.bad_var = bad_var

    def __str__(self):
        return "Invalid Xgrid Identifier Type: " + str(self.bad_var)
159
# Setting this flag causes printing of every Xgrid command that is executed
160
PYXGRID_DEBUG = False
163
#####################################################################
164
# See if there is an Xgrid cluster defined by environment vars #
165
#####################################################################
167
# Controller settings read once, at import time, from the environment.
# The hostname stays None when XGRID_CONTROLLER_HOSTNAME is unset; an unset
# or empty XGRID_CONTROLLER_PASSWORD is normalised to the empty string.
defaultXgridHostname = os.environ.get('XGRID_CONTROLLER_HOSTNAME')
defaultXgridPassword = os.environ.get('XGRID_CONTROLLER_PASSWORD') or ''
173
#####################################################################
174
# Some utilities for running and parsing Xgrid commands #
175
#####################################################################
177
class NSString(objc.Category(NSString)):
178
def xGridPropertyList(self):
179
"""Category to extend NSString.
181
This enables the handling of illegal 'old-style' plists returned by
182
the xgrid command-line tool.
184
In particular, xgrid returns "old-style" plists that contain dates
185
that aren't quoted strings. Because old-style plists can't contain
186
dates in native format (only as quoted strings), the built-in
187
CoreFoundation parser chokes on the output. (A bug has been filed
190
xGridPropertyList: uses a compiled RegEx to add quotes around date
191
strings in the xgrid output before passing the result to NSString's
196
m = re.compile(r'(?P<prefix>^\s*date.* = )(?P<date>.*?);')
197
lines = str.splitlines()
199
for (i,l) in itertools.izip(itertools.count(), lines):
201
lines[i]=m.sub(r'\g<prefix>"\g<date>";', l)
204
str = sep.join(lines)
206
return NSString.stringWithString_(str).propertyList()
208
def xgridParse(cmd="xgrid -grid list"):
209
"""Submits and parses output from the xgrid command line.
211
The output of the xgrid CLI is a (sometimes illegal) old-style plist.
212
This function runs an xgrid command and parses the output of the command
213
into a valid NSDictionary (a python dict).
215
To handle the illegal plists returned by the xgrid CLI, we use the
216
xGridPropertyList: method of NSString (defined above).
218
See the xgrid man pages for details on the xgrid command.
219
This function will return a nested python structure that
220
reflects the output of the xgrid command.
223
# When set, print the actual commands Xgrid sent
224
if PYXGRID_DEBUG==True:
227
# Run the xgrid command
228
result = commands.getstatusoutput(cmd)
230
# Check for good exit status (0) and parse output
233
return NSString.stringWithString_(result[1]).xGridPropertyList()
237
raise XgridError("xgrid command error: %s" % result[0])
239
#####################################################################
241
#####################################################################
244
"""Makes sure that the id is a unicode string"""
246
if (isinstance(id,str) or isinstance(id, unicode)):
248
elif isinstance(id,int):
251
raise InvalidIdentifierType(id)
253
#####################################################################
255
#####################################################################
258
"""Track information needed to connect to an XGrid controller."""
260
def __init__(self, hostname=0, password=0, kerberos=False):
261
"""Create a Connection object to be passed to other objects.
263
To connect to a specific Xgrid controller, create a Connection
264
object and then pass it to the Controller, Grid or Job objects
265
you create. This class performs no verification of the hostname
271
Use the controller and password given in environmental vars.
273
>>> cn = Connection()
275
Specify a hostname and password.
277
>>> cn = Connection('xgrid.work.com','topsecret')
281
>>> cn = Connection('xgrid.work.com',kerberos=True)
286
@param hostname: The hostname of the xgrid controller, like
287
"xgrid.work.com". If set to 0, it will default to the value set
288
in the environment variable XGRID_CONTROLLER_HOSTNAME
289
@type hostname: string
290
@param password: The password of the xgrid controller, like
291
"mysecret". If set to 0, it will default to the value set in the
292
environment variable XGRID_CONTROLLER_PASSWORD. For no password,
293
set it equal to the empty string: password=''.
294
@type password: string
295
@param kerberos: If True, connect using single sign on (SSO), instead
296
of a password. You must have already obtained a kerberos
297
ticket-granting ticket from the KDC that controls the kerberos
298
domain containing the Xgrid controller. If kerberos is True, the
300
@type kerberos: boolean
303
# Setup the hostname and password
305
if defaultXgridHostname:
306
self.hostname = defaultXgridHostname
308
raise XgridError('No controller hostname specified')
310
self.hostname = hostname
312
if kerberos: # kerberos overrides password
314
self.password = False
316
self.kerberos = False
318
self.password = defaultXgridPassword
320
self.password = password
322
self._buildConnectString()
324
def _buildConnectString(self):
325
"""Builds the connect_string."""
326
self._connectString = '-h %s ' % self.hostname
328
self._connectString = '%s-auth Kerberos ' % self._connectString
331
self._connectString = '%s-p %s ' % \
332
(self._connectString, self.password)
334
def connectString(self):
    """Return the option string to splice into xgrid command lines.

    The value is the one stored in self._connectString by
    _buildConnectString(); it is not recomputed here.

    @returns: The stored connection option string.
    """
    connectOptions = self._connectString
    return connectOptions
339
"""Manage a set of Xgrid jobs."""
341
def __init__(self, gridID=u'0', connection=None, update=0):
342
"""Create a JobManager for a given Grid and Connection.
344
This class is mainly designed to be a base class of the Controller
345
and Grid classes, both of which need to manage Xgrid jobs. The class
346
provides basic capabilities to list active jobs and perform various
347
actions on those jobs (stop, restart, resume, suspend, delete). Job
348
submission is handled by the Controller, Grid and Job classes.
353
@arg gridID: The grid identifier of the grid on which the JobManager
354
will manage jobs. Internally, the grid identifier is a unicode
355
string, but gridID can be given in any of the formats u'0', '0'
356
or 0. If gridID=u'0', the JobManager will manage jobs on the
358
@type gridID: unicode, str or int
359
@arg connection: Instance of Connection class. If empty a default
360
Connection object is used.
361
@type connection: Connection
362
@arg update: A boolean flag that determines whether or not the
363
internal state is updated upon creation. This involves a call to
364
the Xgrid controller.
365
@type update: boolean
368
self.gridID = processID(gridID)
370
if connection is None:
371
self._connection = Connection()
373
self._connection = connection
380
def _updateJobs(self):
381
"""Updates the _jobIDs and _jobs instance variables."""
385
gridIDString = u'-gid ' + self.gridID
387
cmd = 'xgrid %s-job list %s' % (self._connection.connectString(),
389
result = xgridParse(cmd)
390
self._checkGridID(result,self.gridID)
391
self._jobIDs = result['jobList']
393
# Now build the array of Job objects
395
for jid in self._jobIDs:
396
self._jobs.append(Job(jid, self._connection))
398
def _checkGridID(self, result, gridID):
399
"""Checks a dictionary for an InvalidGridIdentifier error."""
400
if result.has_key('error'):
401
if result['error']=='InvalidGridIdentifier':
402
raise InvalidGridIdentifier(gridID)
404
def jobs(self, update=1):
405
"""Returns a list of initialized Job objects for all active jobs.
407
@arg update: A boolean flag that determines whether or not the
408
internal state is updated upon creation. This involves a call to
409
the Xgrid controller.
410
@type update: boolean
411
@return: a list of active Job objects.
419
def job(self, jobID=u'999999999', update=1):
420
"""Returns the Job object with job identifier id.
422
@arg jobID: The job identifier. Can be given as unicode, str or int.
423
@type jobID: unicode, str, or int
424
@arg update: A boolean flag that determines whether or not the
425
internal state is updated upon creation. This involves a call to
426
the Xgrid controller.
427
@type update: boolean
428
@return: Initialized Job object.
431
processedID = processID(jobID)
435
if processedID in self._jobIDs:
436
return Job(processedID, self._connection)
438
raise InvalidJobIdentifier(processedID)
440
def jobIDs(self, update=1):
441
"""Returns a tuple of job identifiers for all active jobs.
443
@arg update: A boolean flag that determines whether or not the
444
internal state is updated upon creation. This involves a call to
445
the Xgrid controller.
446
@type update: boolean
447
@returns: Tuple of job identifiers.
455
# Job management methods
457
def perform(self, action, jobIDs):
458
"""Performs an action on a subset of active jobs.
460
@arg action: The action to be performed as a string. Implemented
461
actions are stop, resume, delete, restart, and suspend.
463
@arg jobIDs: Jobs to perform the action on.
464
@type jobIDs: Either the string 'all' or a python sequence of
468
# Validate the action
469
actions = ('stop', 'suspend', 'resume', 'delete', 'restart')
470
if action not in actions:
471
raise InvalidAction(action)
476
jobList = self._jobIDs # list of jobs to act on
477
elif isinstance(jobIDs,tuple) or isinstance(jobIDs,list):
481
raise TypeError, jobIDs
484
tempJob = Job(processID(jid), self._connection)
485
tempJob.perform(action) # this will raise any errors
488
"""Stops all active jobs."""
489
self.perform(action='stop',jobIDs='all')
491
def suspendAll(self):
    """Suspend every active job.

    Delegates to perform() with the 'suspend' action and the special
    job selector 'all'.
    """
    self.perform(jobIDs='all', action='suspend')
496
"""Resumes all active jobs."""
497
self.perform(action='resume',jobIDs='all')
500
"""Deletes all active jobs."""
501
self.perform(action='delete',jobIDs='all')
503
def restartAll(self):
    """Restart every active job.

    Delegates to perform() with the 'restart' action and the special
    job selector 'all'.
    """
    self.perform(jobIDs='all', action='restart')
508
"""Prints information about all active Xgrid jobs."""
511
print "##################################################"
512
print "%-4s %-24s %-10s %-10s" % \
513
("id", "Date Started", "Status", "CPU Power")
514
print "##################################################"
519
"""Manage the grids of a given Xgrid controller."""
521
def __init__(self, connection=None, update=0):
522
"""A class to manage a set of Xgrid grids.
524
This class is meant to be a base class for the Controller class.
525
It provides basic capabilities for listing the available grids.
528
Instance of Connection class. If empty a default Connection object
530
@type connection: Connection
532
A boolean flag that determines whether or not the
533
internal state is updated upon creation. This involves a call to
534
the Xgrid controller.
535
@type update: boolean
538
if connection is None:
539
self._connection = Connection()
541
self._connection = connection
549
def _updateGrids(self):
550
"""Updates the _gridIDs and _grids instance variables."""
552
cmd = 'xgrid %s-grid list' % self._connection.connectString()
553
result = xgridParse(cmd)
554
self._gridIDs = result['gridList']
556
# Now build the array of Grid objects
558
for gridID in self._gridIDs:
559
self._grids.append(Grid(gridID, self._connection))
561
def grids(self, update=1):
562
"""Returns a list of initialized Grid objects.
564
@arg update: A boolean flag that determines whether or not the
565
internal state is updated upon creation. This involves a call to
566
the Xgrid controller.
567
@type update: boolean
574
def grid(self, gridID=u'0', update=1):
575
"""Returns the Grid object with grid identifier gridID.
578
The unicode string identifier of the grid. If no gridID is given,
579
the default grid u'0' is used.
580
@type gridID: unicode, int or str
581
@arg update: A boolean flag that determines whether or not the
582
internal state is updated upon creation. This involves a call to
583
the Xgrid controller.
584
@type update: boolean
587
processedGridID = processID(gridID)
591
if processedGridID in self._gridIDs:
592
return Grid(processedGridID, self._connection)
594
raise InvalidGridIdentifier(gridID)
596
def gridIDs(self, update=1):
597
"""Returns a tuple of grid identifiers for all available grids.
599
@arg update: A boolean flag that determines whether or not the
600
internal state is updated upon creation. This involves a call to
601
the Xgrid controller.
602
@type update: boolean
609
class Controller(JobManager, GridManager):
610
"""A class for working with an Xgrid controller."""
612
def __init__(self, connection=None, update=0):
613
"""This class provides an interface to an Xgrid controller.
615
An Xgrid controller is a single machine that manages a set of
616
grids. Each grid, in turn, consists of a set of agents and
617
jobs running on the agents.
619
This class provides access to the grids and jobs managed by the
620
controller. In Xgrid, both grids and jobs have identifiers, which are
621
unicode strings, like u'0', but this module can take identifiers as
622
strings or integers as well.
624
Controller and Grid objects can be used to submit Xgrid jobs, but the
625
Job class is used to retrieve job results.
627
The Controller is only the JobManager for the default Grid. To access
628
the jobs of other grids, create instances of their Grid objects.
633
>>> cn = Connection('myhost','mypassword')
635
>>> c = Controller(cn)
640
>>> j1 = c.job('1') # Get an initialized Job object with id = '1'
644
>>> c.gridIDs() # List the grid ids
647
>>> c.grid('10') # Get an initialized Grid object with id = '10'
648
<Grid with gridID = 10>
650
>>> c.grid() # Get the Grid object for the default grid
652
@arg connection: Instance of Connection class. If empty a default
653
Connection object is used.
654
@type connection: Connection
656
@arg update: A boolean flag that determines whether or not the
657
internal state is updated upon creation. This involves a call to
658
the Xgrid controller.
659
@type update: boolean
661
JobManager.__init__(self, u'', connection)
662
GridManager.__init__(self, connection)
668
"""Updates all instance variables for active grids and jobs."""
675
def submit(self, cmd, args='', stdin='', indir='', email='',gridID=u'0'):
676
"""Submits a single task job to the specified grid.
678
This is a nonblocking job submission method for a single job
679
with no sub-tasks. For more complicated jobs with sub-tasks, use
680
the batch() method and the JobSpecification class.
682
Job results can be obtained by calling the results() method of the
686
The command to execute as a string. The executable is not
687
copied if the full path is given, otherwise it is.
690
The command line arguments to be passed to the command.
691
@type args: list or str
693
A local file to use as the stdin stream for the job.
696
A local directory to copy to the remote agent.
699
An email to which notification will be sent of various job
703
The identifier of the Grid to which the job will be submitted.
704
If empty, the default grid u'0' is used.
705
@type gridID: unicode, str or int
706
@returns: Initialized Job object for submitted job.
710
j = Job(connection=self._connection)
711
id = j.submit(cmd, args, stdin, indir, email, gridID)
714
def batch(self, specification, gridID=u'0', silent=False):
715
"""Submits a batch job to the specified grid.
717
This is a nonblocking job submission method used for submitting
718
complex multi-task jobs. For single task jobs, use submit().
720
To retrieve job results, use the results() method of the Job object.
723
The job specification of the job, which must be an instance of the
724
JobSpecification class. See the docstring for JobSpecification
726
@type specification: JobSpecification
728
The identifier of the Grid to which the job will be submitted.
729
If empty, the default grid u'0' is used.
730
@type gridID: unicode, str or int
732
If set to True will silence all messages.
733
@type silent: boolean
734
@returns: Initialized Job object for submitted job.
738
j = Job(connection=self._connection)
739
id = j.batch(specification, gridID, silent=silent)
742
class Grid(JobManager):
743
"""A class for working with jobs on a specific Xgrid grid."""
745
def __init__(self, gridID=u'0', connection=None, update=0):
746
"""This class provides an interface to an Xgrid grid.
748
An Xgrid grid is a collection of agents and jobs running on the
749
agents. This class provides access to the jobs running on a grid.
750
Currently, Xgrid does not expose an API for working directly with
751
the agents in a grid.
753
Instances of this class can be obtained using two methods.
755
1. By calling the grid() or grids() methods of the GridManager
756
or Controller classes.
758
2. By creating a new Grid object directly with a valid gridID:
763
The grid identifier of the grid. If gridID is empty the default
764
grid (u'0') will be used.
765
@type gridID: unicode, int or str
767
Instance of Connection class. If empty a default Connection object
769
@type connection: Connection
770
@arg update: A boolean flag that determines whether or not the
771
internal state is updated upon creation. This involves a call to
772
the Xgrid controller.
773
@type update: boolean
775
JobManager.__init__(self, gridID, connection)
787
def _updateInfo(self):
    """Refresh self._info with this grid's attributes.

    Runs ``xgrid ... -grid attributes -gid <gridID>`` through
    xgridParse() and stores the reply's 'gridAttributes' entry.

    @raise InvalidGridIdentifier: Via _checkGridID(), when the reply
        reports that self.gridID is not a valid grid.
    """
    connectOptions = self._connection.connectString()
    reply = xgridParse('xgrid %s-grid attributes -gid %s'
                       % (connectOptions, self.gridID))
    # Validate before indexing: an error reply is checked first.
    self._checkGridID(reply, self.gridID)
    self._info = reply['gridAttributes']
794
def _checkGridID(self,result,gridID):
795
if result.has_key('error'):
796
if result['error']=='InvalidGridIdentifier':
797
raise InvalidGridIdentifier(gridID)
799
def info(self, update=1):
806
def submit(self, cmd, args='', stdin='', indir='', email=''):
807
"""Submits a single task job to the current grid.
809
This is a nonblocking job submission method for a single job
810
with no sub-tasks. For more complicated jobs with sub-tasks, use
811
the batch() method and the JobSpecification class.
813
Job results can be obtained by calling the results() method of the
817
The command to execute as a string. The executable is not
818
copied if the full path is given, otherwise it is.
821
The command line arguments to be passed to the command.
822
@type args: list or str
824
A local file to use as the stdin stream for the job.
827
A local directory to copy to the remote agent.
830
An email to which notification will be sent of various job
833
@returns: Initialized Job object for submitted job.
837
j = Job(connection=self._connection)
838
id = j.submit(cmd, args, stdin, indir, email, self.gridID)
841
def batch(self, specification):
842
"""Submits a batch job to the current grid.
844
This is a nonblocking job submission method used for submitting
845
complex multi-task jobs. For single task jobs, use submit().
847
To retrieve job results, use the results() method of the Job class.
850
The job specification of the job, which must be an instance of the
851
JobSpecification class. See the docstring for JobSpecification
853
@type specification: JobSpecification
854
@returns: Initialized Job object for submitted job.
858
j = Job(connection=self._connection)
859
id = j.batch(specification, self.gridID)
865
result = '<Grid with gridID = %s>' % self.gridID
869
"""A class for working with an Xgrid job."""
871
def __init__(self, jobID=u'999999999', connection=None):
872
"""An Xgrid job class.
874
This class allows a user to work with an Xgrid job. It provides
875
capabilities for starting jobs, managing them and retrieving
878
Job instances are created in two ways:
880
1. By calling the job() or jobs() methods of the Grid or Controller
883
2. By simply creating a new Job object:
885
>>> j = Job(u'200') # Create a new job with id of 200
888
The job identifier of the job. To create a new job, leave blank.
889
@type jobID: unicode, str or int
891
Instance of Connection class. If empty a default Connection object
893
@type connection: Connection
896
self.jobID = processID(jobID)
898
if connection is None:
899
self._connection = Connection()
901
self._connection = connection
903
self._specification = {}
906
# Semi-private methods
908
def _updateInfo(self):
    """Refresh self._info with this job's attributes.

    Runs ``xgrid ... -job attributes -id <jobID>`` through xgridParse()
    and stores the reply's 'jobAttributes' entry.

    @raise InvalidJobIdentifier: Via _checkJobID(), when the reply
        reports that self.jobID is not a valid job.
    """
    connectOptions = self._connection.connectString()
    reply = xgridParse('xgrid %s-job attributes -id %s'
                       % (connectOptions, self.jobID))
    # Validate before indexing: an error reply is checked first.
    self._checkJobID(reply)
    self._info = reply['jobAttributes']
915
def _updateSpecification(self):
    """Refresh self._specification with this job's specification.

    Runs ``xgrid ... -job specification -id <jobID>`` through
    xgridParse() and stores the reply's 'jobSpecification' entry.

    @raise InvalidJobIdentifier: Via _checkJobID(), when the reply
        reports that self.jobID is not a valid job.
    """
    connectOptions = self._connection.connectString()
    reply = xgridParse('xgrid %s-job specification -id %s'
                       % (connectOptions, self.jobID))
    # Validate before indexing: an error reply is checked first.
    self._checkJobID(reply)
    self._specification = reply['jobSpecification']
924
self._updateSpecification()
926
def _checkJobID(self,result):
927
if result.has_key('error'):
928
if result['error']=='InvalidJobIdentifier':
929
raise InvalidJobIdentifier(self.jobID)
931
def _checkGridID(self,result,gridID):
932
if result.has_key('error'):
933
if result['error']=='InvalidGridIdentifier':
934
raise InvalidGridIdentifier(gridID)
938
def specification(self, update=1):
939
"""Return the Xgrid job specification.
941
The Xgrid job specification is the dictionary that Xgrid uses
942
to submit the job. It contains keys that describe the command
943
arguments, directories, etc.
947
self._updateSpecification()
948
return self._specification
950
def info(self, update=1):
951
"""Return the current status information about a job.
953
The job info is a dictionary of keys describing the current state
954
of the job. This includes start/stop dates, name, etc.
956
The method printInfo() prints the info() dictionary in a nice form.
958
@arg update: A boolean flag that determines whether or not the
959
internal state is updated upon creation. This involves a call to
960
the Xgrid controller.
961
@type update: boolean
969
# Job submission and results
971
def results(self, stdout='', outdir='', stderr='', block=10,silent=False):
972
"""Retrieve the results of an Xgrid job.
974
This method provides both a blocking and nonblocking method of
975
getting the results of an Xgrid job. The job does not need to be
976
completed to retrieve the results. Because of this, the results
977
method can be used to get partial results while the job continues
978
to run. It can also automatically name output files.
981
The local file in which to put the stdout stream of the remote job.
982
If this is empty, the method will automatically generate a name in
983
the local directory of the form: xgridjob-jobID.out. This file
984
always is placed in the cwd rather than the outdir
987
The local file in which to put the stderr stream of the remote job.
988
If this is empty, the method will automatically generate a name in
989
the local directory of the form: xgridjob-jobID.err.
992
The local directory in which to put the files retrieved from the
993
remote job. This is only for files other than the stdout and
994
stderr files. When empty, the other files are not brought back.
995
This is to prevent any accidental overwrites of results.
998
Whether or not to block until the job is finished. If block=0,
999
partially completed results are retrieved and the job will
1000
continue to run. If block > 0, the job status is queried every
1001
block seconds and the results are returned when the job
1004
@arg silent: Silence all messages.
1005
@type silent: boolean
1013
so = '-so ' + stdout + ' '
1015
temp_stdout = 'xgridjob-' + self.jobID + '.out'
1016
so = '-so ' + temp_stdout + ' '
1019
out = '-out ' + outdir
1022
se = '-se ' + stderr + ' '
1024
temp_stderr = 'xgridjob-' + self.jobID + '.err'
1025
se = '-se ' + temp_stderr + ' '
1027
cmd = "xgrid %s-job results -id %s %s%s%s" % \
1028
(self._connection.connectString(),self.jobID,so,se,out)
1030
# Block until the results are back!
1033
while not self._info['jobStatus'] == 'Finished':
1036
log = xgridParse(cmd)
1038
log = xgridParse(cmd)
1040
if (not silent) and (len(so) > 0):
1041
print "Job stdout saved in file: " + so[4:]
1045
def submit(self, cmd, args='', stdin='', indir='', email='',gridID=u'0'):
1046
"""Submits a single task job to the specified grid.
1048
This is a nonblocking job submission method for a single job
1049
with no sub-tasks. For more complicated jobs with sub-tasks, use
1052
Job results can be obtained by calling the results() method.
1055
The command to execute as a string. The executable is not
1056
copied if the full path is given, otherwise it is.
1059
The command line arguments to be passed to the command.
1060
@type args: list or str
1062
A local file to use as the stdin stream for the job.
1065
A local directory to copy to the remote agent.
1068
An email to which notification will be sent of various job
1072
The identifier of the Grid to which the job will be submitted.
1073
If empty, the default grid u'0' is used.
1074
@type gridID: unicode, str or int
1075
@returns: Initialized Job object for submitted job.
1079
processedGridID = processID(gridID)
1081
# First build the submit_string
1087
stdinString = '-si ' + stdin + ' '
1089
indirString = '-in ' + indir + ' '
1091
emailString = '-email ' + email + ' '
1093
# Process the arguments
1094
if isinstance(args,str):
1096
elif isinstance(args,list):
1099
argList.append(str(a)+" ")
1100
argString = "".join(argList).strip()
1104
submitString = stdinString + indirString + emailString + \
1105
cmd + ' ' + argString
1107
# Now submit the job and set the job id
1108
#print "Submitting job to grid: ", gridID
1109
cmd = "xgrid %s-gid %s -job submit %s" % \
1110
(self._connection.connectString(), gridID, submitString)
1111
jobinfo = xgridParse(cmd)
1112
self._checkGridID(jobinfo, processedGridID)
1113
self.jobID = jobinfo['jobIdentifier']
1114
print "Job submitted with id: ", self.jobID
1117
def batch(self, specification, gridID=u'0', silent=False):
1118
"""Submits a batch job to the specified grid.
1120
This is a nonblocking job submission method used for submitting
1121
complex multi-task jobs. For single task jobs, use submit().
1123
To retrieve job results, use the results() method.
1126
The job specification of the job, which must be an instance of the
1127
JobSpecification class. See the docstring for JobSpecification
1129
@type specification: JobSpecification
1131
The identifier of the Grid to which the job will be submitted.
1132
If empty, the default grid u'0' is used.
1133
@type gridID: unicode, str or int
1135
If set to True will silence all messages.
1136
@type silent: boolean
1137
@returns: Initialized Job object for submitted job.
1141
if not isinstance(specification, JobSpecification):
1144
processedGridID = processID(gridID)
1146
#job_dict = propertyListFromPythonCollection(specification.jobspec())
1147
jobSpec = specification.jobSpec()
1148
plistFile = tempfile.NamedTemporaryFile().name
1149
jobSpec.writeToFile_atomically_(plistFile,1)
1150
cmd = "xgrid %s-gid %s -job batch %s" % \
1151
(self._connection.connectString(), processedGridID, plistFile)
1152
jobinfo = xgridParse(cmd)
1153
self._checkGridID(jobinfo, processedGridID)
1154
self.jobID = jobinfo['jobIdentifier']
1157
print "Job submitted with id: ", self.jobID
1162
# Job control methods
1164
def perform(self, action):
1165
"""Performs an action on a job.
1168
The action to be performed as a string. Implemented actions
1169
are stop, resume, delete, restart, and suspend.
1173
actions = ('stop', 'suspend', 'resume', 'delete', 'restart')
1174
if action in actions:
1175
cmd = 'xgrid %s-job %s -id %s' % \
1176
(self._connection.connectString(), action, self.jobID)
1177
result = xgridParse(cmd)
1178
self._checkJobID(result)
1179
print "Action %s performed on job %s" % (action,self.jobID)
1180
# If delete reset everything but the controller
1181
if action == 'delete':
1182
self.jobID = u'999999999'
1183
self._specification = {}
1186
raise InvalidAction(action)
1189
"""Stops the job."""
1190
self.perform('stop')
1193
"""Suspends the job."""
1194
self.perform('suspend')
1197
"""Resumes the job."""
1198
self.perform('resume')
1201
"""Deletes the job."""
1202
self.perform('delete')
1205
"""Restarts the job."""
1206
self.perform('restart')
1211
result = '<Job with jobID = %s>' % self.jobID
1214
def printInfo(self, verbose=True):
    """Prints the info() dictionary of a job.

    @arg verbose:
        If true (the default) every key/value pair of the job's info
        dictionary is printed, one pair per line.  If false a single
        summary line with the job id, start date, status and active CPU
        power is printed instead.
    @type verbose: boolean
    """
    # NOTE(review): printSpecification() refreshes its data before
    # printing; confirm whether self._info needs a similar refresh here.
    if not verbose:
        output = "%-4s %-24s %-10s %-10s" % \
            (self.jobID, self._info['dateStarted'],
             self._info['jobStatus'],
             self._info['activeCPUPower'])
        print(output)
    else:
        for key in self._info.keys():
            # One formatted string reproduces the spacing of a
            # comma-separated Python 2 print statement.
            print("%s %s %s %s" % (' ', key, ': ', self._info[key]))
1229
def printSpecification(self):
    """Print the job specification used to submit the job.

    The specification is refreshed through _updateSpecification() and
    then printed one "key : value" pair per line.
    """
    self._updateSpecification()
    for key in self._specification.keys():
        # One formatted string reproduces the spacing of a
        # comma-separated Python 2 print statement.
        print("%s %s %s %s" % (' ', key, ': ', self._specification[key]))
1238
class JobSpecification:
    """A class used for constructing multi-task batch jobs.

    The specification is kept in a mutable dictionary (self._jobDict)
    wrapped in a one-element array (self._jobSpec); jobSpec() returns
    that array.
    """

    def __init__(self):
        """This class is used to setup the plist file for multi-task jobs.
        """
        self._jobDict = NSMutableDictionary.dictionaryWithCapacity_(10)
        self._jobSpec = NSArray.arrayWithObject_(self._jobDict)
        self._jobDict[u'taskSpecifications'] = {}
        # addTask() reads self.nextTask to build task names, so the
        # counter has to exist before the first task is added.
        self.nextTask = 0
        self._jobDict[u'applicationIdentifier'] = u'PyXG'
        self._jobDict[u'schedulerParameters'] = {}
        # NOTE(review): the right-hand side of this assignment is missing
        # from the file as found; u'NO' is assumed as the default - confirm.
        self._jobDict[u'schedulerParameters'][u'tasksMustStartSimultaneously'] \
            = u'NO'

    def _checkSchedulerParameters(self):
        """Creates the schedulerParameters dictionary if it is missing."""
        if u'schedulerParameters' not in self._jobDict:
            self._jobDict[u'schedulerParameters'] = {}

    def _checkInputFiles(self):
        """Creates the inputFiles dictionary if it is missing."""
        if u'inputFiles' not in self._jobDict:
            self._jobDict[u'inputFiles'] = {}

    def jobSpec(self):
        """Returns the full job specification (the array wrapping the
        job dictionary)."""
        return self._jobSpec

    # Job/Task setup methods

    def setName(self, name):
        """Set the name (a string) of the job."""
        self._jobDict[u'name'] = unicode(name)

    def name(self):
        """Returns the job name."""
        return self._jobDict.get(u'name')

    def setEmail(self, email):
        """Set the notification email for the batch job."""
        self._jobDict[u'notificationEmail'] = unicode(email)

    def email(self):
        """Returns the notification email."""
        return self._jobDict.get(u'notificationEmail')

    def setTasksMustStartSimultaneously(self, simul):
        """Sets the tasksMustStartSimultaneously flag.

        @arg simul:
            A true value stores u'YES', a false value stores u'NO'.
        @type simul: boolean
        """
        self._checkSchedulerParameters()
        if simul:
            flag = u'YES'
        else:
            flag = u'NO'
        self._jobDict[u'schedulerParameters'][u'tasksMustStartSimultaneously'] = flag

    def tasksMustStartSimultaneously(self):
        """Returns the value of tasksMustStartSimultaneously."""
        return self._jobDict[u'schedulerParameters'].get(u'tasksMustStartSimultaneously')

    def setMinimumTaskCount(self, count):
        """Sets the min number of tasks that should be started."""
        self._checkSchedulerParameters()
        self._jobDict[u'schedulerParameters'][u'minimumTaskCount'] = count

    def minimumTaskCount(self):
        """Returns the value of minimumTaskCount."""
        return self._jobDict[u'schedulerParameters'].get(u'minimumTaskCount')

    def setDependsOnJobs(self, jobArray):
        """Takes a list of Xgrid job ids that must complete before this job begins."""
        self._checkSchedulerParameters()
        self._jobDict[u'schedulerParameters'][u'dependsOnJobs'] = \
            [unicode(j) for j in jobArray]

    def dependsOnJobs(self):
        """Returns the value of dependsOnJobs."""
        return self._jobDict[u'schedulerParameters'].get(u'dependsOnJobs')

    def addFile(self, localFilePath, fileName, isExecutable=0):
        """Specifies a local file to copy to the Xgrid agents.

        This file is encoded into a base64 string and inserted into the
        job specification dictionary.

        @arg localFilePath:
            The full path of the file on the client (local) computer
        @type localFilePath: unicode or str
        @arg fileName:
            The name to call the file on the agent
        @type fileName: unicode or str
        @arg isExecutable:
            Set to 1 if the file should be executable
        @type isExecutable: boolean
        @raises AssertionError: if localFilePath is not an existing file.
        """
        # Raised explicitly (instead of with an assert statement) so the
        # check is not stripped when Python runs with -O; the exception
        # type seen by callers is unchanged.
        if not os.path.isfile(localFilePath):
            raise AssertionError("File does not exist: %s" % (localFilePath,))
        path = NSString.stringWithString_(unicode(localFilePath)).stringByStandardizingPath()
        data = NSData.dataWithContentsOfFile_(path)
        self._checkInputFiles()
        if isExecutable:
            isExecString = u'YES'
        else:
            isExecString = u'NO'
        self._jobDict[u'inputFiles'][unicode(fileName)] = \
            {u'fileData': data, u'isExecutable': isExecString}

    def delFile(self, fileName):
        """Deletes the file named fileName from the JobSpecification.

        List filenames using the files() method.  Unknown names are
        ignored.
        """
        if u'inputFiles' in self._jobDict:
            key = unicode(fileName)
            if key in self._jobDict[u'inputFiles']:
                del self._jobDict[u'inputFiles'][key]

    def files(self):
        """Returns a list of included filenames (empty if there are none)."""
        f = self._jobDict.get(u'inputFiles')
        if not f:
            return []
        return list(f.keys())

    def addTask(self, cmd, args=u'', env=None, inputStream=u'',
                dependsOnTasks=None):
        """Adds a task to the jobSpecification.

        The task is stored under a generated name ('task0', 'task1', ...);
        optional entries are only written when a non-empty value is given.

        @arg cmd:
            The command to execute as a string.  The executable is not
            copied if the full path is given, otherwise it is.
        @type cmd: unicode or str
        @arg args:
            The command line arguments to be passed to the command.  A
            string is split on whitespace.
        @type args: list or str
        @arg env:
            A Python dictionary of environment variables to use on the agents.
        @type env: dict
        @arg inputStream:
            A local file to send to the agents that will be used as stdin
            for the task
        @type inputStream: unicode or str
        @arg dependsOnTasks:
            A list of task ids that must complete before this one begins
        @type dependsOnTasks: list
        @raises TypeError: if args is neither a string nor a list/tuple.
        """
        # Process the arguments
        if isinstance(args, (str, unicode)):
            # split() without a separator drops the empty strings that
            # repeated spaces (or an empty args string) would produce.
            argList = args.split()
        elif isinstance(args, (list, tuple)):
            argList = list(args)
        else:
            # NOTE(review): the handling of other types is missing from
            # the file as found; rejecting them is an assumption.
            raise TypeError("args must be a string or a list, got %r" % (args,))

        # The name is only consumed once the arguments are known to be valid.
        taskName = unicode('task%i' % self.nextTask)
        self.nextTask += 1

        taskSpec = {}
        taskSpec[u'command'] = unicode(cmd)
        if argList:
            taskSpec[u'arguments'] = [unicode(a) for a in argList]
        if env:
            # Copied so later changes to the caller's dictionary do not
            # leak into the specification.
            taskSpec[u'environment'] = dict(env)
        if inputStream:
            taskSpec[u'inputStream'] = unicode(inputStream)
        if dependsOnTasks:
            taskSpec[u'dependsOnTasks'] = [unicode(t) for t in dependsOnTasks]
        self._jobDict[u'taskSpecifications'][taskName] = taskSpec

    def delTask(self, task):
        """Deletes the task named task.

        List the task names using the tasks() method.  Unknown names are
        ignored.
        """
        key = unicode(task)
        if key in self._jobDict[u'taskSpecifications']:
            del self._jobDict[u'taskSpecifications'][key]

    def tasks(self):
        """Return a list of the task names."""
        return list(self._jobDict[u'taskSpecifications'].keys())

    def printTasks(self):
        """Print the task specifications of all tasks."""
        taskSpecs = self._jobDict[u'taskSpecifications']
        for tid in taskSpecs.keys():
            print(str(tid) + " " + str(taskSpecs[tid]))