~ellisonbg/pyxg/trunk

« back to all changes in this revision

Viewing changes to xg.py

  • Committer: Brian Granger
  • Date: 2010-01-04 04:11:12 UTC
  • Revision ID: ellisonbg@gmail.com-20100104041112-bgqid1rzjkykekph
Initial import of pyxg into bzr.  History is lost.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#****************************************************************************
 
2
#       Copyright (C) 2005 Brian Granger <bgranger@scu.edu> and 
 
3
#                          Barry Wark <bwark@u.washington.edu>
 
4
#
 
5
#  Distributed under the terms of the BSD License.  
 
6
#****************************************************************************
 
7
 
 
8
# PyXG-0.1.0, Released 5/3/2005
 
9
 
 
10
"""PyXG provides a python interface to Apple's Xgrid.  
 
11
 
 
12
Xgrid is Apple's software for building and managing clusters of 
 
13
Macintosh computers for use in high performance computation.  
 
14
See http://www.apple.com/server/macosx/features/xgrid.html for more details.
 
15
 
 
16
This module wraps the xgrid command line in Tiger.  It will not work with
 
17
the Technonogy Previews of Xgrid.  The command line is wrapped in this module 
 
18
as the goal is to provide an interface to Xgrid that can be used from an interactive python prompt.  The Cocoa API for Xgrid (XgridFoundation) is 
 
19
based on an event-loop paradigm and is less well suited for interactive work.  If you want to use Xgrid and python from within a Cocoa application, you should use XgridFoundation and PyObjC.
 
20
 
 
21
Features
 
22
========
 
23
 
 
24
    1.  Use Xgrid from within python scripts as well as in interactive python
 
25
        sessions.
 
26
    2.  Create, submit and manage simple (one task) and batch (many task) Xgrid
 
27
        jobs using python's elegant syntax.
 
28
    3.  Work with multiple Xgrid controllers simultaneouly.
 
29
    4.  List available grids for each controller and query their status.
 
30
 
 
31
Quick Start
 
32
===========
 
33
 
 
34
Import xg, create a Connection and Controller object:
 
35
 
 
36
>>> from xg import *
 
37
>>> conn = Connection(hostname='xgrid.work.com',password='secret')
 
38
>>> cont = Controller(conn)
 
39
 
 
40
List the grids managed by the controller:
 
41
 
 
42
>>> cont.gridIDs()
 
43
(0, 3)
 
44
>>> cont.grids()
 
45
[<Grid with gridID = 0>, <Grid with gridID = 3>]
 
46
 
 
47
Work with the default grid, listing active jobs:
 
48
 
 
49
>>> g = cont.grid(0)
 
50
>>> g.jobIDs()
 
51
(229, 230, 231, 232)
 
52
>>> g.printJobs()
 
53
##################################################
 
54
id   Date Started             Status     CPU Power 
 
55
##################################################
 
56
229  2005-12-22 11:18:47 -0800 Finished   0         
 
57
230  2005-12-22 11:18:50 -0800 Finished   0         
 
58
231  2005-12-22 11:18:52 -0800 Finished   0         
 
59
232  2005-12-22 11:18:55 -0800 Finished   0         
 
60
 
 
61
Get a job from the default grid and work with it:
 
62
 
 
63
>>> j = g.job(229)      
 
64
>>> j.printInfo()
 
65
{
 
66
    name:  /usr/bin/cal
 
67
    jobStatus:  Finished
 
68
    taskCount:  1
 
69
    undoneTaskCount:  0
 
70
    percentDone:  100
 
71
}
 
72
>>> j.printInfo(verbose=False)
 
73
229  2005-12-22 11:18:47 -0800 Finished   0   
 
74
>>> j.printSpecification()
 
75
{
 
76
    applicationIdentifier :  com.apple.xgrid.cli
 
77
    taskSpecifications :  {0 = {arguments = (); command = "/usr/bin/cal"; }; }
 
78
    name :  /usr/bin/cal
 
79
    inputFiles :  {}
 
80
    submissionIdentifier :  abc
 
81
}
 
82
 
 
83
Get job results:
 
84
 
 
85
>>> j.results(stdout="job229.out",stderr="job229.err")
 
86
Job stdout saved in file: job229.out 
 
87
 
 
88
Use a Grid object to submit a single task job:
 
89
 
 
90
>>> j = g.submit(cmd='/usr/bin/cal')
 
91
Job submitted with id:  234
 
92
>>> j.printInfo(verbose=False)
 
93
234  2005-12-22 13:09:52 -0800 Finished   0      
 
94
"""
 
95
 
 
96
#####################################################################
 
97
#   Imports                                                         #
 
98
##################################################################### 
 
99
 
 
100
import string
 
101
import os, sys
 
102
import time
 
103
import commands
 
104
import re
 
105
import itertools
 
106
import tempfile
 
107
import os.path
 
108
 
 
109
try:
 
110
    from Foundation import *
 
111
    import objc
 
112
except ImportError, e:
 
113
    print "This module requires PyObjC."
 
114
    raise e
 
115
    
 
116
#####################################################################
 
117
#   Exceptions                                                      #
 
118
#####################################################################  
 
119
 
 
120
class XgridError:
 
121
    """Xgrid exception class."""
 
122
    def __init__(self, err):
 
123
        self.err = err
 
124
        
 
125
    def __repr__(self):
 
126
        return "Xgrid Error: %s" % (self.err)
 
127
    
 
128
class InvalidIdentifier(XgridError):
 
129
    """Xgrid exception for invalid job or grid identifiers."""
 
130
    def __init__(self, id):
 
131
        self.id = id
 
132
    def __repr__(self):
 
133
        return "Invalid Xgrid Identifier: " + str(self.id)
 
134
 
 
135
class InvalidGridIdentifier(InvalidIdentifier):
 
136
    """Invalide grid identifier exception."""
 
137
    def __repr__(self):
 
138
        return "Invalid Grid Identifier: " + str(self.id)
 
139
        
 
140
class InvalidJobIdentifier(InvalidIdentifier):
 
141
    """Invalide job identifier exception."""
 
142
    def __repr__(self):
 
143
        return "Invalid Job Identifier: " + str(self.id)
 
144
        
 
145
class InvalidAction(XgridError):
 
146
    """Invalide action exception."""
 
147
    def __init__(self,action):
 
148
        self.action = action
 
149
    def __repr__(self):
 
150
        return "Invalid Xgrid Action: " + str(self.action)
 
151
 
 
152
class InvalidIdentifierType:
 
153
    """Invalid job or grid identifier type."""
 
154
    def __init__(self,bad_var):
 
155
        self.bad_var = bad_var
 
156
    def __repr__(self):
 
157
        return "Invalid Xgrid Identifier Type: " + str(self.bad_var)
 
158
 
 
159
# Setting this flag causes printing of every Xgrid command that is executed
 
160
PYXGRID_DEBUG = False
 
161
VERSION = '0.2.0'
 
162
 
 
163
#####################################################################
 
164
#   See if there is an Xgrid cluster defined by environment vars    #
 
165
#####################################################################  
 
166
 
 
167
defaultXgridHostname = os.environ.get('XGRID_CONTROLLER_HOSTNAME')
 
168
defaultXgridPassword = os.environ.get('XGRID_CONTROLLER_PASSWORD')
 
169
 
 
170
if not defaultXgridPassword:
 
171
    defaultXgridPassword = ''
 
172
 
 
173
#####################################################################
 
174
#       Some utilities for running and parsing Xgrid commands       #
 
175
##################################################################### 
 
176
 
 
177
class NSString(objc.Category(NSString)):    
 
178
    def xGridPropertyList(self):
 
179
        """Category to extend NSString.
 
180
        
 
181
        This enables the handling of illegal 'old-style' plists returned by 
 
182
        the xgrid command-line tool.
 
183
        
 
184
        In particular, xgrid returns "old-style" plists that contain dates 
 
185
        that aren't quoted strings.  Because old-style plists can't contain
 
186
        dates in native format (only as quoted strings), the built-in
 
187
        CoreFoundation parser chokes on the output. (A bug has been filed 
 
188
        with apple)
 
189
        
 
190
        xGridPropertyList: uses a compiled RegEx to add quotes around date
 
191
        strings in the xgrid output before passing the result to NSString's
 
192
        propertyList:
 
193
        """
 
194
 
 
195
        str = unicode(self)
 
196
        m = re.compile(r'(?P<prefix>^\s*date.* = )(?P<date>.*?);')
 
197
        lines = str.splitlines()
 
198
        
 
199
        for (i,l) in itertools.izip(itertools.count(), lines):
 
200
            if (m.search(l)):
 
201
                lines[i]=m.sub(r'\g<prefix>"\g<date>";', l)
 
202
        
 
203
        sep = '\n'
 
204
        str = sep.join(lines)
 
205
        
 
206
        return NSString.stringWithString_(str).propertyList()
 
207
 
 
208
def xgridParse(cmd="xgrid -grid list"):
 
209
    """Submits and parses output from the xgrid command line.  
 
210
    
 
211
    The output of the xgrid CLI is a (sometimes illegal) old-style plist.
 
212
    This function runs an xgrid command and parses the output of the command
 
213
    into a valid NSDictionary (a python dict).
 
214
  
 
215
    To handle the illegal plists returned by the xgrid CLI, we use the
 
216
    xGridPropertyList: method of NSString (defined above).
 
217
 
 
218
    See the xgrid man pages for details on the xgrid command.
 
219
    This fuction will return a nested python structure that
 
220
    reflects the output of the xgrid command.
 
221
    """
 
222
 
 
223
    # When set, print the actual commands Xgrid sent
 
224
    if PYXGRID_DEBUG==True:
 
225
        print cmd
 
226
        
 
227
    # Run the xgrid command
 
228
    result = commands.getstatusoutput(cmd)
 
229
 
 
230
    # Check for good exit status (0) and parse output
 
231
    if result[0] == 0:
 
232
        if result[1]:
 
233
            return NSString.stringWithString_(result[1]).xGridPropertyList()
 
234
        else:
 
235
            return {}
 
236
    else:
 
237
        raise XgridError("xgrid command error: %s" % result[0])
 
238
 
 
239
#####################################################################
 
240
#               Other Utilities                                                                                         #
 
241
#####################################################################  
 
242
  
 
243
def processID(id):
 
244
    """Makes sure that the id is a unicode string"""
 
245
    
 
246
    if (isinstance(id,str) or isinstance(id, unicode)):
 
247
        return unicode(id)
 
248
    elif isinstance(id,int):
 
249
        return unicode(id)
 
250
    else:
 
251
        raise InvalidIdentifierType(id)
 
252
        
 
253
#####################################################################
 
254
#               Classes                                                                                                         #
 
255
#####################################################################
 
256
 
 
257
class Connection:
 
258
    """Track information needed to connect to an XGrid controller."""
 
259
    
 
260
    def __init__(self, hostname=0, password=0, kerberos=False):
 
261
        """Create a Connection object to be passed to other objects.
 
262
        
 
263
        To connect to a specific Xgrid controller, create a Connection
 
264
        object and then pass it to the Controller, Grid or Job objects
 
265
        you create.  This class performs no verification of the hostname 
 
266
        or password.  
 
267
        
 
268
        Examples
 
269
        ========
 
270
        
 
271
        Use the controller and password given in environmental vars.
 
272
            
 
273
        >>> cn = Connection()
 
274
            
 
275
        Specify a hostname and password.
 
276
            
 
277
        >>> cn = Connection('xgrid.work.com','topsecret')
 
278
            
 
279
        Use Kerberos.
 
280
            
 
281
        >>> cn = Connection('xgrid.work.com',kerberos=True)
 
282
 
 
283
        Usage
 
284
        =====
 
285
        
 
286
        @param hostname: The hostname of the xgrid controller, like
 
287
            "xgrid.work.com".  If set to 0, it will default to the value set 
 
288
            in the environment variable XGRID_CONTROLLER_HOSTNAME
 
289
        @type  hostname: string
 
290
        @param password: The password of the xgrid controller, like 
 
291
            "mysecret".  If set to 0, it will default to the value set in the
 
292
            environment variable XGRID_CONTROLLER_PASSWORD.  For no password,
 
293
            set it equal to the empty string: password=''.
 
294
        @type  password: string
 
295
        @param kerberos: If True, connect using single sign on (SSO), instead 
 
296
            of a password.  You must have already obtained a kerberos 
 
297
            ticket-granting ticket from the KDC that controlls the kerberos
 
298
            domain containing the Xgrid controller. If kerberos is True, the
 
299
            password is ignored.
 
300
        @type  kerberos: boolean
 
301
        """
 
302
        
 
303
        # Setup the hostname and password
 
304
        if hostname == 0:
 
305
            if defaultXgridHostname:
 
306
                self.hostname = defaultXgridHostname
 
307
            else:
 
308
                raise XgridError('No controller hostname specified')
 
309
        else:
 
310
            self.hostname = hostname
 
311
        
 
312
        if kerberos: # kerberos overrides password
 
313
            self.kerberos = True
 
314
            self.password = False
 
315
        else:
 
316
            self.kerberos = False  
 
317
            if password == 0:
 
318
                self.password = defaultXgridPassword
 
319
            else:
 
320
                self.password = password
 
321
            
 
322
        self._buildConnectString()
 
323
        
 
324
    def _buildConnectString(self):
 
325
        """Builds the connect_string."""
 
326
        self._connectString = '-h %s ' % self.hostname
 
327
        if (self.kerberos):
 
328
            self._connectString = '%s-auth Kerberos ' % self._connectString
 
329
        else:
 
330
            if self.password:
 
331
                self._connectString = '%s-p %s ' % \
 
332
                    (self._connectString, self.password)
 
333
                    
 
334
    def connectString(self):
 
335
        """Returns the connection string to be used in Xgrid commands."""
 
336
        return self._connectString
 
337
            
 
338
class JobManager:
 
339
    """Manage a set of Xgrid jobs."""
 
340
    
 
341
    def __init__(self, gridID=u'0', connection=None, update=0):
 
342
        """Create a JobManager for a given Grid and Connection.
 
343
        
 
344
        This class is mainly designed to be a base class of the Conroller
 
345
        and Grid classes, both of which need to manage Xgrid jobs.  The class
 
346
        provides basic capabilities to list active jobs and perform various
 
347
        actions on those jobs (stop, restart, resume, suspend, delete).  Job
 
348
        submission is handled by the Controller, Grid and Job classes.
 
349
        
 
350
        Usage
 
351
        =====
 
352
        
 
353
        @arg gridID: The grid identifier of the grid on which the JobManager
 
354
            will manage jobs.  Internally, the grid identifier is a unicode
 
355
            string, but gridID can be given in any of the formats u'0', '0' 
 
356
            or 0.  If gridID=u'0', the JobManager will manage jobs on the
 
357
            default grid
 
358
        @type gridID: unicode, str or int
 
359
        @arg connection:  Instance of Connection class.  If empty a default
 
360
            Connection object is used.
 
361
        @type connection:  Connection
 
362
        @arg update:  A boolean flag that determines whether or not the 
 
363
            internal state is updated upon creation.  This involves a call to 
 
364
            the Xgrid controller.
 
365
        @type update: boolean
 
366
        """
 
367
        
 
368
        self.gridID = processID(gridID)
 
369
 
 
370
        if connection is None:
 
371
            self._connection = Connection()
 
372
        else:    
 
373
            self._connection = connection
 
374
 
 
375
        self._jobs = []
 
376
        self._jobIDs = ()
 
377
        if update:
 
378
            self._updateJobs()
 
379
            
 
380
    def _updateJobs(self):
 
381
        """Updates the _jobIDs and _jobs instance variables."""
 
382
        
 
383
        gridIDString = u''
 
384
        if self.gridID:
 
385
            gridIDString = u'-gid ' + self.gridID 
 
386
            
 
387
        cmd = 'xgrid %s-job list %s' % (self._connection.connectString(),
 
388
            gridIDString)
 
389
        result = xgridParse(cmd)
 
390
        self._checkGridID(result,self.gridID)
 
391
        self._jobIDs = result['jobList']
 
392
        
 
393
        # Now build the array of Job objects
 
394
        self._jobs = []
 
395
        for jid in self._jobIDs:
 
396
            self._jobs.append(Job(jid, self._connection))
 
397
 
 
398
    def _checkGridID(self, result, gridID):
 
399
        """Checks a dictionary for an InvalidGridIdentifier error."""
 
400
        if result.has_key('error'):
 
401
            if result['error']=='InvalidGridIdentifier':
 
402
                raise InvalidGridIdentifier(gridID)
 
403
 
 
404
    def jobs(self, update=1):
 
405
        """Returns a list of initialized Job objects for all active jobs.
 
406
 
 
407
        @arg update:  A boolean flag that determines whether or not the 
 
408
            internal state is updated upon creation.  This involves a call to 
 
409
            the Xgrid controller.
 
410
        @type update: boolean
 
411
        @return: a lists of active Job objects.
 
412
        @rtype: list
 
413
        """
 
414
 
 
415
        if update:
 
416
            self._updateJobs()
 
417
        return self._jobs
 
418
 
 
419
    def job(self, jobID=u'999999999', update=1):
 
420
        """Returns the Job object with job identifier id.
 
421
 
 
422
        @arg jobID: The job identifier.  Can be given as unicode, str or int.
 
423
        @type jobID: unicode, str, or int
 
424
        @arg update:  A boolean flag that determines whether or not the 
 
425
            internal state is updated upon creation.  This involves a call to 
 
426
            the Xgrid controller.
 
427
        @type update: boolean
 
428
        @return: Initialize Job object.
 
429
        """
 
430
        
 
431
        processedID = processID(jobID)
 
432
        
 
433
        if update:
 
434
            self._updateJobs()
 
435
        if processedID in self._jobIDs:
 
436
            return Job(processedID, self._connection)
 
437
        else:
 
438
            raise InvalidJobIdentifier(processedID)
 
439
 
 
440
    def jobIDs(self, update=1):
 
441
        """Returns a tuple of job identifiers for all active jobs.
 
442
        
 
443
        @arg update:  A boolean flag that determines whether or not the 
 
444
            internal state is updated upon creation.  This involves a call to 
 
445
            the Xgrid controller.
 
446
        @type update: boolean
 
447
        @returns: Tuple of job identifiers.
 
448
        @rtype: tuple
 
449
        """
 
450
        
 
451
        if update:
 
452
            self._updateJobs()
 
453
        return self._jobIDs
 
454
    
 
455
    # Job management methods
 
456
    
 
457
    def perform(self, action, jobIDs):
 
458
        """Performs an action on a subset of active jobs.
 
459
        
 
460
        @arg action:  The action to be performed as a string.  Implemented
 
461
            actions are stop, resume, delete, restart, and suspend.
 
462
        @type action: str
 
463
        @arg jobIDs:  Jobs to perform the action on.  
 
464
        @type jobIDs:  Either the string 'all' or a python sequence of 
 
465
            job identifiers.
 
466
        """
 
467
        
 
468
        # Validate the action
 
469
        actions = ('stop', 'suspend', 'resume', 'delete', 'restart')
 
470
        if action not in actions:
 
471
            raise InvalidAction(action)
 
472
            
 
473
        if jobIDs == 'all':
 
474
            # Delete all jobs
 
475
            self._updateJobs()
 
476
            jobList = self._jobIDs  # list of jobs to act on
 
477
        elif isinstance(jobIDs,tuple) or isinstance(jobIDs,list):
 
478
            # Delete some jobs
 
479
            jobList = jobIDs
 
480
        else:
 
481
            raise TypeError, jobIDs
 
482
 
 
483
        for jid in jobList:
 
484
            tempJob = Job(processID(jid), self._connection)
 
485
            tempJob.perform(action) # this will raise any errors
 
486
        
 
487
    def stopAll(self):
 
488
        """Stops all active jobs."""
 
489
        self.perform(action='stop',jobIDs='all')
 
490
        
 
491
    def suspendAll(self):
 
492
        """Suspends all active jobs."""
 
493
        self.perform(action='suspend',jobIDs='all')        
 
494
 
 
495
    def resumeAll(self):
 
496
        """Resumes all active jobs."""
 
497
        self.perform(action='resume',jobIDs='all')        
 
498
 
 
499
    def deleteAll(self):
 
500
        """Deletes all active jobs."""
 
501
        self.perform(action='delete',jobIDs='all')        
 
502
 
 
503
    def restartAll(self):
 
504
        """Restarts all active jobs."""
 
505
        self.perform(action='restart',jobIDs='all')
 
506
 
 
507
    def printJobs(self):
 
508
        """Prints information about all active Xgrid jobs."""
 
509
    
 
510
        self._updateJobs()
 
511
        print "##################################################"
 
512
        print "%-4s %-24s %-10s %-10s" % \
 
513
            ("id", "Date Started", "Status", "CPU Power")
 
514
        print "##################################################"
 
515
        for j in self._jobs:
 
516
            j.printInfo(0)
 
517
 
 
518
class GridManager:
 
519
    """Manage the grids of a given Xgrid controller."""
 
520
    
 
521
    def __init__(self, connection=None, update=0):
 
522
        """A class to manage a set of Xgrid grids.
 
523
        
 
524
        This class is meant to be a base class for the Controller class.
 
525
        It provides basic capabilities for listing the available grids.
 
526
        
 
527
        @arg connection:
 
528
            Instance of Connection class.  If empty a default Connection object
 
529
            is used.
 
530
        @type connection: Connection
 
531
        @arg update:  
 
532
            A boolean flag that determines whether or not the 
 
533
            internal state is updated upon creation.  This involves a call to 
 
534
            the Xgrid controller.
 
535
        @type update: boolean
 
536
        """
 
537
        
 
538
        if connection is None:
 
539
            self._connection = Connection()
 
540
        else:    
 
541
            self._connection = connection
 
542
            
 
543
        self._grids = []
 
544
        self._gridIDs = ()
 
545
 
 
546
        if update:
 
547
            self._updateGrids()
 
548
 
 
549
    def _updateGrids(self):
 
550
        """Updates the _gridIDs and _grids instance variables."""
 
551
         
 
552
        cmd = 'xgrid %s-grid list' % self._connection.connectString()
 
553
        result = xgridParse(cmd)
 
554
        self._gridIDs = result['gridList']
 
555
               
 
556
        # Now build the array of Grid objects
 
557
        self._grids = []
 
558
        for gridID in self._gridIDs:
 
559
            self._grids.append(Grid(gridID, self._connection)) 
 
560
 
 
561
    def grids(self, update=1):
 
562
        """Returns a list of initialized Grid objects.
 
563
        
 
564
        @arg update:  A boolean flag that determines whether or not the 
 
565
            internal state is updated upon creation.  This involves a call to 
 
566
            the Xgrid controller.
 
567
        @type update: boolean
 
568
        """
 
569
 
 
570
        if update:
 
571
            self._updateGrids()
 
572
        return self._grids
 
573
 
 
574
    def grid(self, gridID=u'0', update=1):
 
575
        """Returns the Grid object with grid identifier gridID.
 
576
                
 
577
        @arg gridID:
 
578
            The unicode string identifier of the grid.  If no gridID is given, 
 
579
            the default grid u'0' is used.
 
580
        @type gridID: unicode, int or str
 
581
        @arg update:  A boolean flag that determines whether or not the 
 
582
            internal state is updated upon creation.  This involves a call to 
 
583
            the Xgrid controller.
 
584
        @type update: boolean
 
585
        """
 
586
        
 
587
        processedGridID = processID(gridID)
 
588
            
 
589
        if update:
 
590
            self._updateGrids()
 
591
        if processedGridID in self._gridIDs:
 
592
            return Grid(processedGridID, self._connection)
 
593
        else:
 
594
            raise InvalidGridIdentifier(gridID)
 
595
 
 
596
    def gridIDs(self, update=1):
 
597
        """Returns a tuple of grid identifiers for all avialable grids.
 
598
        
 
599
        @arg update:  A boolean flag that determines whether or not the 
 
600
            internal state is updated upon creation.  This involves a call to 
 
601
            the Xgrid controller.
 
602
        @type update: boolean
 
603
        """
 
604
        
 
605
        if update:
 
606
            self._updateGrids()
 
607
        return self._gridIDs
 
608
 
 
609
class Controller(JobManager, GridManager):
 
610
    """A class for working with an Xgrid controller."""
 
611
    
 
612
    def __init__(self, connection=None, update=0):
 
613
        """This class provides an interface to an Xgrid controller.
 
614
        
 
615
        An Xgrid controller is a single machine that manages a set of
 
616
        of grids.  Each grid in turn, consists of a set of agents and
 
617
        jobs running on the agents.  
 
618
                
 
619
        This class provides access to the grids and jobs managed by the
 
620
        controller.  In Xgrid, both grids and jobs have identifiers, which are
 
621
        unicode strings, like u'0', but this module can take identifiers as
 
622
        strings or integers as well.  
 
623
        
 
624
        Controller and Grid objects can be used to submit Xgrid jobs, but the
 
625
        Job class is used to retrieve job results.
 
626
                
 
627
        The Controller is only the JobManager for the default Grid.  To access
 
628
        the jobs of other grids, create instances of their Grid objects.
 
629
 
 
630
        Examples
 
631
        ========
 
632
        
 
633
        >>> cn = Connection('myhost','mypassword')
 
634
        
 
635
        >>> c = Controller(cn)
 
636
        
 
637
        >>> c.jobIDs()
 
638
        (1, 2, 3)
 
639
        
 
640
        >>> j1 = c.job('1')     # Get an initialized Job object with id = '1'
 
641
        >>> j1
 
642
        <Job with id = 1>
 
643
        
 
644
        >>> c.grid_ids()        # List the grid ids
 
645
        ('0',)
 
646
        
 
647
        >>> c.grid('10')        # Get an initialized Grid object with id = '10'
 
648
        <Grid with gridID = 10>
 
649
        
 
650
        >>> c.grid()            # Get the Grid boject for the default grid        
 
651
 
 
652
        @arg connection: Instance of Connection class.  If empty a default 
 
653
            Connection object is used.
 
654
        @type connection: Connection
 
655
            
 
656
        @arg update:  A boolean flag that determines whether or not the 
 
657
            internal state is updated upon creation.  This involves a call to 
 
658
            the Xgrid controller.
 
659
        @type update: boolean    
 
660
        """
 
661
        JobManager.__init__(self, u'', connection)
 
662
        GridManager.__init__(self, connection)
 
663
            
 
664
        if update:
 
665
            self._update()
 
666
                  
 
667
    def _update(self):
 
668
        """Updates all instance variables for active grids and jobs."""
 
669
        
 
670
        self._updateGrids()
 
671
        self._updateJobs()
 
672
        
 
673
    # Job Submission
 
674
        
 
675
    def submit(self, cmd, args='', stdin='', indir='', email='',gridID=u'0'):
 
676
        """Submits a single task job to the specified grid.
 
677
        
 
678
        This is a nonblocking job submission method for a single job
 
679
        with no sub-tasks.  For more complicated jobs with sub-tasks, use
 
680
        the batch() method and the JobSpecification class.  
 
681
        
 
682
        Job results can be obtained by calling the results() method of the 
 
683
        Job object.
 
684
                
 
685
        @arg cmd:  
 
686
            The command the execute as a string.  The executable is not
 
687
            copied if the full path is given, otherwise it is.
 
688
        @type cmd: str            
 
689
        @arg args:  
 
690
            The command line arguments to be passed to the command.  
 
691
        @type args: list or str
 
692
        @arg stdin:
 
693
            A local file to use as the stdin stream for the job.
 
694
        @type stdin: str
 
695
        @arg indir:
 
696
            A local directory to copy to the remote agent.
 
697
        @type indir: str
 
698
        @arg email:
 
699
            An email to which notification will be send of various job
 
700
            state changes.
 
701
        @type email: str
 
702
        @arg gridID:
 
703
            The identifier of the Grid to which the job will be submitted.  
 
704
            If empty, the default grid u'0' is used.
 
705
        @type gridID: unicode, str or int
 
706
        @returns: Initialized Job object for sumbitted job.
 
707
        @rtype: Job                        
 
708
        """      
 
709
    
 
710
        j = Job(connection=self._connection)
 
711
        id = j.submit(cmd, args, stdin, indir, email, gridID)
 
712
        return j
 
713
        
 
714
    def batch(self, specification, gridID=u'0', silent=False):
 
715
        """Submits a batch job to the specified grid.
 
716
        
 
717
        This is a nonblocking job submission method used for submitting
 
718
        complex multi-task jobs.  For single task jobs, use submit().  
 
719
        
 
720
        To retrieve job results, use the results() method of the Job object. 
 
721
        
 
722
        @arg specification:
 
723
            The job specification of the job, which must be an instance of the 
 
724
            JobSpecification class.  See the docstring for JobSpecification 
 
725
            for more details.
 
726
        @type specification: JobSpecification
 
727
        @arg gridID:
 
728
            The identifier of the Grid to which the job will be submitted.  
 
729
            If empty, the default grid u'0' is used.
 
730
        @type gridID: unicode, str or int
 
731
        @arg silent:
 
732
            If set to True will slience all messages.
 
733
        @type silent: boolean   
 
734
        @returns: Initialized Job object for sumbitted job.
 
735
        @rtype: Job        
 
736
        """      
 
737
    
 
738
        j = Job(connection=self._connection)
 
739
        id = j.batch(specification, gridID, silent=silent)
 
740
        return j
 
741
        
 
742
class Grid(JobManager):
 
743
    """A class for working with jobs on a specific Xgrid grid."""
 
744
    
 
745
    def __init__(self, gridID=u'0', connection=None, update=0):
 
746
        """This class provides an interface to an Xgrid grid.
 
747
        
 
748
        An Xgrid grid is a collection of agents and jobs running on the
 
749
        agents.  This class provides access to the jobs running on a grid.
 
750
        Currently, Xgrid does not expose an API for working directly with
 
751
        the agents in a grid.
 
752
        
 
753
        Instances of this class can be obtained using two methods.
 
754
        
 
755
            1. By calling the grid() or grids() methods of the GridManager 
 
756
            or Controller classes.
 
757
 
 
758
            2. By creating a new Grid object directly with a valid gridID:
 
759
            
 
760
        >>> g = Grid(u'0')
 
761
            
 
762
        @arg gridID:
 
763
            The grid identifier of the grid. If gridID is empty the default 
 
764
            grid (u'0') will be used.
 
765
        @type gridID: unicode, int or str
 
766
        @arg connection:
 
767
            Instance of Connection class.  If empty a default Connection object
 
768
            is used.
 
769
        @type connection: Connection
 
770
        @arg update:  A boolean flag that determines whether or not the 
 
771
            internal state is updated upon creation.  This involves a call to 
 
772
            the Xgrid controller.
 
773
        @type update: boolean    
 
774
        """
 
775
        JobManager.__init__(self, gridID, connection)
 
776
        
 
777
        self._info = {}
 
778
        if update:
 
779
            self._update()
 
780
        
 
781
    # Private methods
 
782
        
 
783
    def _update(self):
 
784
        self._updateJobs()
 
785
        self._updateInfo()
 
786
            
 
787
    def _updateInfo(self):
 
788
        cmd = 'xgrid %s-grid attributes -gid %s' % \
 
789
            (self._connection.connectString(), self.gridID)
 
790
        result = xgridParse(cmd)
 
791
        self._checkGridID(result,self.gridID)
 
792
        self._info = result['gridAttributes']
 
793
 
 
794
    def _checkGridID(self,result,gridID):
 
795
        if result.has_key('error'):
 
796
            if result['error']=='InvalidGridIdentifier':
 
797
                raise InvalidGridIdentifier(gridID)
 
798
 
 
799
    def info(self, update=1):
 
800
        if update:
 
801
            self._updateInfo()
 
802
        return self._info
 
803
 
 
804
    # Job Submission
 
805
        
 
806
    def submit(self, cmd, args='', stdin='', indir='', email=''):
 
807
        """Submits a single task job to the current grid.
 
808
        
 
809
        This is a nonblocking job submission method for a single job
 
810
        with no sub-tasks.  For more complicated jobs with sub-tasks, use
 
811
        the batch() method and the JobSpecification class.  
 
812
        
 
813
        Job results can be obtained by calling the results() method of the 
 
814
        Job object.
 
815
                
 
816
        @arg cmd:  
 
817
            The command the execute as a string.  The executable is not
 
818
            copied if the full path is given, otherwise it is.
 
819
        @type cmd: str            
 
820
        @arg args:  
 
821
            The command line arguments to be passed to the command.  
 
822
        @type args: list or str
 
823
        @arg stdin:
 
824
            A local file to use as the stdin stream for the job.
 
825
        @type stdin: str
 
826
        @arg indir:
 
827
            A local directory to copy to the remote agent.
 
828
        @type indir: str
 
829
        @arg email:
 
830
            An email to which notification will be send of various job
 
831
            state changes.
 
832
        @type email: str
 
833
        @returns: Initialized Job object for sumbitted job.
 
834
        @rtype: Job                           
 
835
        """      
 
836
 
 
837
        j = Job(connection=self._connection)
 
838
        id = j.submit(cmd, args, stdin, indir, email, self.gridID)
 
839
        return j
 
840
        
 
841
    def batch(self, specification):
 
842
        """Submits a batch job to the current grid.
 
843
        
 
844
        This is a nonblocking job submission method used for submitting
 
845
        complex multi-task jobs.  For single task jobs, use submit().  
 
846
        
 
847
        To retrieve job results, use the results() method of the Job class. 
 
848
        
 
849
        @arg specification:
 
850
            The job specification of the job, which must be an instance of the 
 
851
            JobSpecification class.  See the docstring for JobSpecification 
 
852
            for more details.
 
853
        @type specification: JobSpecification
 
854
        @returns: Initialized Job object for sumbitted job.
 
855
        @rtype: Job        
 
856
        """  
 
857
            
 
858
        j = Job(connection=self._connection)
 
859
        id = j.batch(specification, self.gridID)
 
860
        return j
 
861
        
 
862
    # Other methods
 
863
 
 
864
    def __repr__(self):
 
865
        result = '<Grid with gridID = %s>' % self.gridID
 
866
        return result
 
867
        
 
868
class Job:
 
869
    """A class for working with an Xgrid job."""
 
870
    
 
871
    def __init__(self, jobID=u'999999999', connection=None):
 
872
        """An Xgrid job class.
 
873
        
 
874
        This class allows a user to work with an Xgrid job.  It provides
 
875
        capabilities for starting jobs, managing them and retrieving 
 
876
        their results.
 
877
        
 
878
        Job instances are created in two ways:
 
879
        
 
880
            1. By calling the job() or jobs() methods of the Grid or Controller
 
881
            classes.  
 
882
           
 
883
            2. By simply creating a new Job object:
 
884
          
 
885
        >>> j = Job(u'200')    # Create a new job with id of 200
 
886
        
 
887
        @arg jobID:
 
888
            The job identifier of the job.  To create a new job, leave blank.
 
889
        @type jobID: unicode, str or int
 
890
        @arg connection:
 
891
            Instance of Connection class.  If empty a default Connection object
 
892
            is used.
 
893
        @type connection: Connection
 
894
        """
 
895
 
 
896
        self.jobID = processID(jobID)
 
897
 
 
898
        if connection is None:
 
899
            self._connection = Connection()
 
900
        else:    
 
901
            self._connection = connection
 
902
 
 
903
        self._specification = {}
 
904
        self._info ={}
 
905
 
 
906
    # Semi-private methods
 
907
 
 
908
    def _updateInfo(self):
 
909
        cmd = 'xgrid %s-job attributes -id %s' % \
 
910
            (self._connection.connectString(), self.jobID)
 
911
        result = xgridParse(cmd)
 
912
        self._checkJobID(result)
 
913
        self._info = result['jobAttributes']
 
914
        
 
915
    def _updateSpecification(self):
 
916
        cmd = 'xgrid %s-job specification -id %s' % \
 
917
            (self._connection.connectString(), self.jobID)
 
918
        result = xgridParse(cmd)
 
919
        self._checkJobID(result)
 
920
        self._specification = result['jobSpecification']
 
921
        
 
922
    def _update(self):
 
923
        self._updateInfo()
 
924
        self._updateSpecification()
 
925
 
 
926
    def _checkJobID(self,result):
 
927
        if result.has_key('error'):
 
928
            if result['error']=='InvalidJobIdentifier':
 
929
                raise InvalidJobIdentifier(self.jobID)
 
930
 
 
931
    def _checkGridID(self,result,gridID):
 
932
        if result.has_key('error'):
 
933
            if result['error']=='InvalidGridIdentifier':
 
934
                raise InvalidGridIdentifier(gridID)
 
935
                         
 
936
    # Get methods
 
937
                                                      
 
938
    def specification(self, update=1):
 
939
        """Return the Xgrid job specification.
 
940
        
 
941
        The Xgrid job specification is the dictionary that Xgrid uses
 
942
        to submit the job.  It contains keys that describe the command
 
943
        arguments, directories, etc.
 
944
        """
 
945
        
 
946
        if update:
 
947
            self._updateSpecification()
 
948
        return self._specification
 
949
        
 
950
    def info(self, update=1):
 
951
        """Return the current status information about a job.
 
952
        
 
953
        The job info is a dictionary of keys describing the current state
 
954
        of the job.  This includes start/stop dates, name, etc.
 
955
        
 
956
        The method printInfo() prints the info() dictionary in a nice form.
 
957
        
 
958
        @arg update:  A boolean flag that determines whether or not the 
 
959
            internal state is updated upon creation.  This involves a call to 
 
960
            the Xgrid controller.
 
961
        @type update: boolean    
 
962
 
 
963
        """
 
964
        
 
965
        if update:
 
966
            self._updateInfo()
 
967
        return self._info
 
968
        
 
969
    # Job submission and results
 
970
 
 
971
    def results(self, stdout='', outdir='', stderr='', block=10,silent=False):
 
972
        """Retrive the results of an Xgrid job.
 
973
        
 
974
        This method provides both a blocking and nonblocking method of 
 
975
        getting the results of an Xgrid job.  The job does not need to be 
 
976
        completed to retrieve the results.  Because of this, the results 
 
977
        method can be used to get partial results while the job continues 
 
978
        to run.  It can also automatically name output files.
 
979
        
 
980
        @arg stdout:
 
981
            The local file in which to put the stdout stream of the remote job.
 
982
            If this is empty, the method will automatically generate a name in
 
983
            the local directory of the form: xgridjob-jobID.out.  This file 
 
984
            always is placed in the cwd rather than the outdir
 
985
        @type stdout: str   
 
986
        @arg stderr:
 
987
            The local file in which to put the stderr stream of the remote job.
 
988
            If this is empty, the method will automatically generate a name in
 
989
            the local directory of the form: xgridjob-jobID.err.  
 
990
        @type stderr: str
 
991
        @arg outdir:
 
992
            The local directory in which to put the files retrieved from the
 
993
            remote job.  This is only for files other than the stdout and
 
994
            stderr files.  When empty, the other files are not brought back.
 
995
            This is to prevent any accidental overwrites of results.
 
996
        @type outdir: str
 
997
        @arg block:
 
998
            Whether or not to block until the job is finished.  If block=0, 
 
999
            partially completed results are retrieved and the job will
 
1000
            continue to run.  If block > 0, the job status is queried every
 
1001
            block seconds and the results are returned when the job 
 
1002
            is completed.
 
1003
        @type block: int
 
1004
        @arg silent: Silence all messages.
 
1005
        @type silent: boolean            
 
1006
        """
 
1007
            
 
1008
        so = ''
 
1009
        se = ''
 
1010
        out = ''
 
1011
        
 
1012
        if stdout:
 
1013
            so = '-so ' + stdout + ' '
 
1014
        else:
 
1015
            temp_stdout = 'xgridjob-' + self.jobID + '.out'
 
1016
            so = '-so ' + temp_stdout + ' '
 
1017
            
 
1018
        if outdir:
 
1019
            out = '-out ' + outdir
 
1020
            
 
1021
        if stderr:
 
1022
            se = '-se ' + stderr + ' '
 
1023
        else:
 
1024
            temp_stderr = 'xgridjob-' + self.jobID + '.err'
 
1025
            se = '-se ' + temp_stderr + ' '        
 
1026
            
 
1027
        cmd = "xgrid %s-job results -id %s %s%s%s" % \
 
1028
            (self._connection.connectString(),self.jobID,so,se,out)
 
1029
        
 
1030
        # Block until the results are back! 
 
1031
        self._updateInfo()
 
1032
        if block:
 
1033
            while not self._info['jobStatus'] == 'Finished':
 
1034
                time.sleep(block)
 
1035
                self._updateInfo()
 
1036
            log = xgridParse(cmd)
 
1037
        else:
 
1038
            log = xgridParse(cmd)
 
1039
            
 
1040
        if (not silent) and (len(so) > 0):
 
1041
            print "Job stdout saved in file: " + so[4:]
 
1042
                    
 
1043
    # Job Submission
 
1044
 
 
1045
    def submit(self, cmd, args='', stdin='', indir='', email='',gridID=u'0'): 
 
1046
        """Submits a single task job to the specified grid.
 
1047
        
 
1048
        This is a nonblocking job submission method for a single job
 
1049
        with no sub-tasks.  For more complicated jobs with sub-tasks, use
 
1050
        the batch() method.  
 
1051
        
 
1052
        Job results can be obtained by calling the results() method.
 
1053
                
 
1054
        @arg cmd:  
 
1055
            The command the execute as a string.  The executable is not
 
1056
            copied if the full path is given, otherwise it is.
 
1057
        @type cmd: str            
 
1058
        @arg args:  
 
1059
            The command line arguments to be passed to the command.  
 
1060
        @type args: list or str
 
1061
        @arg stdin:
 
1062
            A local file to use as the stdin stream for the job.
 
1063
        @type stdin: str
 
1064
        @arg indir:
 
1065
            A local directory to copy to the remote agent.
 
1066
        @type indir: str
 
1067
        @arg email:
 
1068
            An email to which notification will be send of various job
 
1069
            state changes.
 
1070
        @type email: str
 
1071
        @arg gridID:
 
1072
            The identifier of the Grid to which the job will be submitted.  
 
1073
            If empty, the default grid u'0' is used.
 
1074
        @type gridID: unicode, str or int
 
1075
        @returns: Initialized Job object for sumbitted job.
 
1076
        @rtype: Job                           
 
1077
        """      
 
1078
 
 
1079
        processedGridID = processID(gridID)
 
1080
            
 
1081
        # First build the submit_string
 
1082
        submitString = ''
 
1083
        stdinString = ''
 
1084
        indirString = ''
 
1085
        emailString = ''
 
1086
        if stdin:
 
1087
            stdinString = '-si ' + stdin + ' '
 
1088
        if indir:
 
1089
            indirString = '-in ' + indir + ' '
 
1090
        if email:
 
1091
            emailString = '-email ' + email + ' '
 
1092
 
 
1093
        # Process the arguments
 
1094
        if isinstance(args,str):
 
1095
            argString = args
 
1096
        elif isinstance(args,list):
 
1097
            argList = []
 
1098
            for a in args:
 
1099
                argList.append(str(a)+" ")
 
1100
            argString = "".join(argList).strip()
 
1101
        else:
 
1102
            raise TypeError
 
1103
        
 
1104
        submitString = stdinString + indirString + emailString + \
 
1105
            cmd + ' ' + argString
 
1106
        
 
1107
        # Now submit the job and set the job id
 
1108
        #print "Submitting job to grid: ", gridID
 
1109
        cmd = "xgrid %s-gid %s -job submit %s" % \
 
1110
            (self._connection.connectString(), gridID, submitString)
 
1111
        jobinfo = xgridParse(cmd)
 
1112
        self._checkGridID(jobinfo, processedGridID) 
 
1113
        self.jobID = jobinfo['jobIdentifier']
 
1114
        print "Job submitted with id: ", self.jobID
 
1115
        return self.jobID
 
1116
        
 
1117
    def batch(self, specification, gridID=u'0', silent=False):
 
1118
        """Submits a batch job to the specified grid.
 
1119
        
 
1120
        This is a nonblocking job submission method used for submitting
 
1121
        complex multi-task jobs.  For single task jobs, use submit(). 
 
1122
        
 
1123
        To retrieve job results, use the results() method.
 
1124
         
 
1125
        @arg specification:
 
1126
            The job specification of the job, which must be an instance of the 
 
1127
            JobSpecification class.  See the docstring for JobSpecification 
 
1128
            for more details.
 
1129
        @type specification: JobSpecification
 
1130
        @arg gridID:
 
1131
            The identifier of the Grid to which the job will be submitted.  
 
1132
            If empty, the default grid u'0' is used.
 
1133
        @type gridID: unicode, str or int
 
1134
        @arg silent:
 
1135
            If set to True will slience all messages.
 
1136
        @type silent: boolean   
 
1137
        @returns: Initialized Job object for sumbitted job.
 
1138
        @rtype: Job        
 
1139
        """      
 
1140
 
 
1141
        if not isinstance(specification, JobSpecification):
 
1142
            raise XgridError
 
1143
 
 
1144
        processedGridID = processID(gridID)
 
1145
 
 
1146
        #job_dict = propertyListFromPythonCollection(specification.jobspec())
 
1147
        jobSpec = specification.jobSpec()
 
1148
        plistFile = tempfile.NamedTemporaryFile().name
 
1149
        jobSpec.writeToFile_atomically_(plistFile,1)
 
1150
        cmd = "xgrid %s-gid %s -job batch %s" % \
 
1151
            (self._connection.connectString(), processedGridID, plistFile)
 
1152
        jobinfo = xgridParse(cmd)
 
1153
        self._checkGridID(jobinfo, processedGridID)
 
1154
        self.jobID = jobinfo['jobIdentifier']
 
1155
        
 
1156
        if not silent:
 
1157
            print "Job submitted with id: ", self.jobID
 
1158
        
 
1159
        return self.jobID
 
1160
 
 
1161
 
 
1162
    # Job control methods
 
1163
            
 
1164
    def perform(self, action):
 
1165
        """Performs an action on a job.
 
1166
        
 
1167
        @arg action:
 
1168
            The action to be performed as a string.  Implemented actions
 
1169
            are stop, resume, delete, restart, and suspend.
 
1170
        @type action: str
 
1171
        """
 
1172
 
 
1173
        actions = ('stop', 'suspend', 'resume', 'delete', 'restart')
 
1174
        if action in actions:
 
1175
            cmd = 'xgrid %s-job %s -id %s' % \
 
1176
                (self._connection.connectString(), action, self.jobID)           
 
1177
            result = xgridParse(cmd)
 
1178
            self._checkJobID(result)
 
1179
            print "Action %s performed on job %s" % (action,self.jobID)
 
1180
            # If delete reset everything but the controller
 
1181
            if action == 'delete':
 
1182
                self.jobID = u'999999999'
 
1183
                self._specification = {}
 
1184
                self._info ={}
 
1185
        else:
 
1186
            raise InvalidAction(action)
 
1187
            
 
1188
    def stop(self):
 
1189
        """Stops the job."""
 
1190
        self.perform('stop')
 
1191
        
 
1192
    def suspend(self):
 
1193
        """Suspends the job."""
 
1194
        self.perform('suspend')
 
1195
        
 
1196
    def resume(self):
 
1197
        """Resumes the job."""
 
1198
        self.perform('resume')
 
1199
        
 
1200
    def delete(self):
 
1201
        """Deletes the job."""
 
1202
        self.perform('delete')
 
1203
 
 
1204
    def restart(self):
 
1205
        """Restarts the job."""
 
1206
        self.perform('restart')
 
1207
        
 
1208
    # Other methods
 
1209
    
 
1210
    def __repr__(self):
 
1211
        result = '<Job with jobID = %s>' % self.jobID
 
1212
        return result
 
1213
        
 
1214
    def printInfo(self,verbose=True):
 
1215
        "Prints the info() dictionary of a job."""
 
1216
        self._updateInfo()
 
1217
        if verbose == False:
 
1218
            output = "%-4s %-24s %-10s %-10s" % \
 
1219
                (self.jobID, self._info['dateStarted'],
 
1220
                 self._info['jobStatus'],
 
1221
                 self._info['activeCPUPower'])
 
1222
            print output
 
1223
        elif verbose == True:
 
1224
            print "{"
 
1225
            for key in self._info.keys():
 
1226
                print '   ', key, ': ', self._info[key]
 
1227
            print "}"
 
1228
    
 
1229
    def printSpecification(self):
 
1230
        """Print the job specification used to submit the job."""
 
1231
            
 
1232
        self._updateSpecification()
 
1233
        print "{"
 
1234
        for key in self._specification.keys():
 
1235
            print '   ', key, ': ', self._specification[key]
 
1236
        print "}"
 
1237
        
 
1238
class JobSpecification:
 
1239
    """A class used for constructing multi-task batch jobs."""
 
1240
    
 
1241
    def __init__(self):
 
1242
        """This class is used to setup the plist file for multi-task jobs.
 
1243
        """
 
1244
        self._jobDict = NSMutableDictionary.dictionaryWithCapacity_(10)
 
1245
        self._jobSpec = NSArray.arrayWithObject_(self._jobDict)
 
1246
        self._jobDict[u'taskSpecifications'] = {}
 
1247
        #self.tasks = []
 
1248
        self.nextTask = 0
 
1249
        self._jobDict[u'applicationIdentifier'] = u'PyXG'
 
1250
        self._jobDict[u'schedulerParameters'] = {}
 
1251
        self._jobDict[u'schedulerParameters'][u'tasksMustStartSimultaneously'] \
 
1252
            = u'NO'
 
1253
        
 
1254
    # Utility methods
 
1255
 
 
1256
    def _checkSchedulerParameters(self):
 
1257
        if not self._jobDict.has_key(u'schedulerParameters'):
 
1258
            self._jobDict[u'schedulerParameters'] = {}
 
1259
                            
 
1260
    def _checkInputFiles(self):
 
1261
        if not self._jobDict.has_key(u'inputFiles'):
 
1262
            self._jobDict[u'inputFiles'] = {}
 
1263
 
 
1264
    def jobSpec(self):
 
1265
        """Prints the full job specification dictionary."""
 
1266
        return self._jobSpec
 
1267
 
 
1268
    # Job/Task setup methods
 
1269
        
 
1270
    def setName(self, name):
 
1271
        """Set the name (a string) of the job."""
 
1272
        self._jobDict[u'name'] = unicode(name)
 
1273
        
 
1274
    def name(self):
 
1275
        """Returns the job name."""
 
1276
        return self._jobDict.get(u'name')
 
1277
        
 
1278
    def setEmail(self, email):
 
1279
        """Set the notification email for the batch job."""
 
1280
        self._jobDict[u'notificationEmail'] = unicode(email)
 
1281
        
 
1282
    def email(self):
 
1283
        """Returns the notification email."""
 
1284
        return self._jobDict.get(u'notificationEmail')
 
1285
        
 
1286
    def setTasksMustStartSimultaneously(self, simul):
 
1287
        """Sets the tasksMustStartSimultanously flag."""
 
1288
 
 
1289
        if(simul):
 
1290
            self._jobDict[u'schedulerParameters'][u'tasksMustStartSimultaneously'] = u'YES'
 
1291
        else:
 
1292
            self._jobDict[u'schedulerParameters'][u'tasksMustStartSimultaneously'] = u'NO'
 
1293
    
 
1294
    def tasksMustStartSimultaneously(self):
 
1295
        """Returns the value of tasksMustStartSimultaneously."""
 
1296
        return self._jobDict[u'schedulerParameters'].get(u'tasksMustStartSimultaneously')
 
1297
            
 
1298
    def setMinimumTaskCount(self,count):
 
1299
        """Sets the min number of tasks that should be started."""
 
1300
        #self._checkSchedulerParameters()
 
1301
        self._jobDict[u'schedulerParameters'][u'minimumTaskCount'] = count
 
1302
    
 
1303
    def minimumTaskCount(self):
 
1304
        """Returns the value of minimumTaskCount."""
 
1305
        return self._jobDict[u'schedulerParameters'].get(u'minimumTaskCount')
 
1306
        
 
1307
    def setDependsOnJobs(self,jobArray):
 
1308
        """Takes a list of Xgrid job ids that must complete before this job begins."""
 
1309
        #self._checkSchedulerParameters()
 
1310
        self._jobDict[u'schedulerParameters'][u'dependsOnJobs'] = \
 
1311
            [unicode(j) for j in jobArray]
 
1312
        
 
1313
    def dependsOnJobs(self):
 
1314
        """Returns the value of dependsOnJobs."""
 
1315
        return self._jobDict[u'schedulerParameters'].get(u'dependsOnJobs') 
 
1316
        
 
1317
    def addFile(self, localFilePath, fileName, isExecutable=0):
 
1318
        """Specifies a local file to copy to the Xgrid agents.
 
1319
        
 
1320
        This file is encoded into a base64 string and inserted into the
 
1321
        job specification dictionary.
 
1322
        
 
1323
        @arg localFilePath:
 
1324
            The full path of the file on the client (local) computer
 
1325
        @type localFilePath: unicode or str
 
1326
        @arg fileName:
 
1327
            The name to call the file on the agent
 
1328
        @type fileName: unicode or str
 
1329
        @arg isExecutable:
 
1330
            Set to 1 if the file should be executable
 
1331
        @type isExecutable: boolean
 
1332
        """
 
1333
        
 
1334
        assert os.path.isfile(localFilePath), "File does not exist: %s" % localFilePath
 
1335
        path = NSString.stringWithString_(unicode(localFilePath)).stringByStandardizingPath()
 
1336
        data = NSData.dataWithContentsOfFile_(path)
 
1337
        self._checkInputFiles()
 
1338
        if isExecutable:
 
1339
            isExecString = u'YES'
 
1340
        else:
 
1341
            isExecString = u'NO'
 
1342
        self._jobDict[u'inputFiles'][unicode(fileName)] = \
 
1343
            {u'fileData':data,u'isExecutable':isExecString}
 
1344
    
 
1345
    def delFile(self,fileName):
 
1346
        """Deletes the file named filename from the JobSpecification.
 
1347
        
 
1348
        List filenames using the flies() method.
 
1349
        """
 
1350
        if self._jobDict.has_key(u'inputFiles'):
 
1351
            if self._jobDict[u'inputFiles'].has_key(unicode(fileName)):
 
1352
                del self._jobDict[u'inputFiles'][unicode(fileName)]
 
1353
                
 
1354
    def files(self):
 
1355
        """Prints a list of included filenames."""
 
1356
        f = self._jobDict.get(u'inputFiles')
 
1357
        if f:
 
1358
            return f.keys()
 
1359
            
 
1360
    def addTask(self, cmd, args=u'', env={}, inputStream=u'', 
 
1361
        dependsOnTasks=[]):
 
1362
        """Adds a task to the jobSpecification.
 
1363
        
 
1364
        @arg cmd:  
 
1365
            The command the execute as a string.  The executable is not
 
1366
            copied if the full path is given, otherwise it is.
 
1367
        @type cmd: str            
 
1368
        @arg args:  
 
1369
            The command line arguments to be passed to the command.  
 
1370
        @type args: list or str
 
1371
        @arg env:
 
1372
            A Python dictionary of environment variables to use on the agents.
 
1373
        @type env: unicode or str
 
1374
        @arg inputStream:
 
1375
            A local file to send to the agents that will be used a stdin for the 
 
1376
            task
 
1377
        @type inputStream: unicode or str
 
1378
        @arg dependsOnTasks:
 
1379
            A list of task ids that must complete before this one begins
 
1380
        @type dependsOnTasks: list
 
1381
        """
 
1382
        taskSpec = {}
 
1383
        taskName = unicode('task%i' % self.nextTask)
 
1384
        self.nextTask += 1
 
1385
        
 
1386
        # Process the arguments
 
1387
        if isinstance(args,str) or isinstance(args,unicode):
 
1388
            argList = args.split(' ')
 
1389
        elif isinstance(args,list):
 
1390
            argList = args
 
1391
        else:
 
1392
            raise TypeError
 
1393
        
 
1394
        taskSpec[u'command'] = unicode(cmd)
 
1395
        if args:
 
1396
            taskSpec[u'arguments'] = [unicode(a) for a in argList]
 
1397
        if env:
 
1398
            taskSpec[u'environment'] = env
 
1399
        if inputStream:
 
1400
            taskSpec[u'inputStream'] = unicode(inputStream)
 
1401
        if dependsOnTasks:
 
1402
            taskSpec[u'dependsOnTasks'] = dependsOnTasks
 
1403
        self._jobDict[u'taskSpecifications'][taskName] = taskSpec
 
1404
        
 
1405
    def copyTask(self):
 
1406
        pass
 
1407
        
 
1408
    def delTask(self,task):
 
1409
        """Deletes the task named task.
 
1410
        
 
1411
        List the task names using the tasks() method.
 
1412
        """
 
1413
        if self._jobDict[u'taskSpecifications'].has_key(unicode(task)):
 
1414
            del self._jobDict[u'taskSpecifications'][unicode(task)]        
 
1415
        
 
1416
    def editTask(self):
 
1417
        pass
 
1418
        
 
1419
    def tasks(self):
 
1420
        """Return a list of the task names."""
 
1421
        return self._jobDict[u'taskSpecifications'].keys()    
 
1422
        
 
1423
    def printTasks(self):
 
1424
        """Print the task specifications of all tasks."""
 
1425
        for tid in self._jobDict[u'taskSpecifications'].keys():
 
1426
            print str(tid) + "  " + str(self._jobDict[u'taskSpecifications'][tid])
 
1427