63
63
MausTransform.initialize(MausConfiguration.transform)
64
64
MausTransform.birth(MausConfiguration.configuration)
66
# Bind the callback method to the Celery worker_process_init signal.
66
67
worker_process_init.connect(worker_process_init_callback)
68
def process_birth(config_id, transform, configuration):
69
def process_birth(pids, config_id, transform, configuration):
70
71
Create and birth a new transform. This is invoked in a sub-process
71
72
via a call from the Celery master process. Any existing transform
72
is death-ed first. If the new configuration ID is equal to the
73
current configuration ID then this is a no-op.
74
@aram pids List of process IDs whose process_birth method has been
75
invoked. If this process is in the list then this method just returns
74
77
@param config_id Configuration ID from client.
75
78
@param transform Either a single name can be given - representing
76
79
a single transform - or a list of transforms - representing a
77
80
MapPyGroup. Sub-lists are treated as nested MapPyGroups. If None
78
then the current transform isdeathed and rebirthed.
81
then the current transform is deathed and rebirthed.
79
82
@param configuration Valid JSON configuration document.
80
83
@return status of (PID, None) if all went well or (PID,
81
84
{"error":ERROR, "message":MESSAGE}) if an exception arose. PID is
86
89
logger = logging.getLogger(__name__)
87
# Only update if the configuration config_id is new.
88
if (MausConfiguration.config_id != config_id):
90
# Check if processed already.
91
if (not os.getpid() in pids):
90
93
if logger.isEnabledFor(logging.INFO):
91
94
logger.info("Birthing transform %s" % transform)
103
106
logger.debug("Status: %s " % status)
104
107
return (os.getpid(), status)
109
def process_death(pids):
108
111
Execute death on the current transform. This is invoked in a
109
112
sub-process via a call from the Celery master process. If death
110
113
has already been invoked then this is a no-op.
114
@aram pids List of process IDs whose process_birth method has been
115
invoked. If this process is in the list then this method just returns
111
117
@return status of (PID, None) if all went well or (PID,
112
118
{"error":ERROR, "message":MESSAGE}) if an exception arose. PID is
113
119
the sub-process ID. This lets the master process know that the
117
123
logger = logging.getLogger(__name__)
118
# Only call if the transform is not already dead.
119
if (not MausTransform.is_dead):
121
if logger.isEnabledFor(logging.INFO):
122
logger.info("Deathing transform")
123
MausTransform.death()
124
except Exception as exc: # pylint:disable = W0703
126
status["error"] = str(exc.__class__)
127
status["message"] = str(exc)
124
# Check if processed already.
125
if (not os.getpid() in pids):
126
# Only call if the transform is not already dead.
127
if (not MausTransform.is_dead):
129
if logger.isEnabledFor(logging.INFO):
130
logger.info("Deathing transform")
131
MausTransform.death()
132
except Exception as exc: # pylint:disable = W0703
134
status["error"] = str(exc.__class__)
135
status["message"] = str(exc)
128
136
if logger.isEnabledFor(logging.DEBUG):
129
137
logger.debug("Status: %s " % status)
130
138
return (os.getpid(), status)