~sophie-middleton08/maus/devel

« back to all changes in this revision

Viewing changes to src/common_py/mauscelery/mausprocess.py

  • Committer: Chris Rogers
  • Date: 2012-04-16 13:33:25 UTC
  • mfrom: (659.1.17 release-candidate)
  • Revision ID: chris.rogers@stfc.ac.uk-20120416133325-11tyj7pb7xs7m37m
ReleaseĀ 0.2.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
    MausTransform.initialize(MausConfiguration.transform)
64
64
    MausTransform.birth(MausConfiguration.configuration)
65
65
 
 
66
# Bind the callback method to the Celery worker_process_init signal.
66
67
worker_process_init.connect(worker_process_init_callback) 
67
68
 
68
 
def process_birth(config_id, transform, configuration):
 
69
def process_birth(pids, config_id, transform, configuration):
69
70
    """
70
71
    Create and birth a new transform. This is invoked in a sub-process
71
72
    via a call from the Celery master process. Any existing transform
72
 
    is death-ed first. If the new configuration ID is equal to the
73
 
    current configuration ID then this is a no-op.
 
73
    is death-ed first. 
 
74
    @aram pids List of process IDs whose process_birth method has been
 
75
    invoked. If this process is in the list then this method just returns
 
76
    (PID, None).
74
77
    @param config_id Configuration ID from client.
75
78
    @param transform Either a single name can be given - representing
76
79
    a single transform - or a list of transforms - representing a
77
80
    MapPyGroup. Sub-lists are treated as nested MapPyGroups. If None
78
 
    then the current transform isdeathed and rebirthed.  
 
81
    then the current transform is deathed and rebirthed.  
79
82
    @param configuration Valid JSON configuration document.
80
83
    @return status of (PID, None) if all went well or (PID,
81
84
    {"error":ERROR, "message":MESSAGE}) if an exception arose. PID is
84
87
    """
85
88
    status = None
86
89
    logger = logging.getLogger(__name__)
87
 
    # Only update if the configuration config_id is new.
88
 
    if (MausConfiguration.config_id != config_id):
 
90
    # Check if processed already.  
 
91
    if (not os.getpid() in pids):
89
92
        try:
90
93
            if logger.isEnabledFor(logging.INFO):
91
94
                logger.info("Birthing transform %s" % transform)
103
106
        logger.debug("Status: %s " % status)
104
107
    return (os.getpid(), status)
105
108
 
106
 
def process_death():
 
109
def process_death(pids):
107
110
    """
108
111
    Execute death on the current transform. This is invoked in a
109
112
    sub-process via a call from the Celery master process. If death
110
113
    has already been invoked then this is a no-op. 
 
114
    @aram pids List of process IDs whose process_birth method has been
 
115
    invoked. If this process is in the list then this method just returns
 
116
    (PID, None).
111
117
    @return status of (PID, None) if all went well or (PID,
112
118
    {"error":ERROR, "message":MESSAGE}) if an exception arose. PID is
113
119
    the sub-process ID. This lets the master process know that the
115
121
    """
116
122
    status = None
117
123
    logger = logging.getLogger(__name__)
118
 
    # Only call if the transform is not already dead.
119
 
    if (not MausTransform.is_dead):
120
 
        try:
121
 
            if logger.isEnabledFor(logging.INFO):
122
 
                logger.info("Deathing transform")
123
 
            MausTransform.death() 
124
 
        except Exception as exc: # pylint:disable = W0703
125
 
            status = {}
126
 
            status["error"] = str(exc.__class__)
127
 
            status["message"] = str(exc)
 
124
    # Check if processed already.
 
125
    if (not os.getpid() in pids):
 
126
        # Only call if the transform is not already dead.
 
127
        if (not MausTransform.is_dead):
 
128
            try:
 
129
                if logger.isEnabledFor(logging.INFO):
 
130
                    logger.info("Deathing transform")
 
131
                MausTransform.death() 
 
132
            except Exception as exc: # pylint:disable = W0703
 
133
                status = {}
 
134
                status["error"] = str(exc.__class__)
 
135
                status["message"] = str(exc)
128
136
    if logger.isEnabledFor(logging.DEBUG):
129
137
        logger.debug("Status: %s " % status)
130
138
    return (os.getpid(), status)