Package etl :: Module job
[hide private]
[frames] | no frames]

Source Code for Module etl.job

  1  # -*- encoding: utf-8 -*- 
  2  ############################################################################## 
  3  # 
  4  #    ETL system - Extract, Transform, Load system 
  5  #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved 
  6  #    $Id$ 
  7  # 
  8  #    This program is free software: you can redistribute it and/or modify 
  9  #    it under the terms of the GNU General Public License as published by 
 10  #    the Free Software Foundation, either version 3 of the License, or 
 11  #    (at your option) any later version. 
 12  # 
 13  #    This program is distributed in the hope that it will be useful, 
 14  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  #    GNU General Public License for more details. 
 17  # 
 18  #    You should have received a copy of the GNU General Public License 
 19  #    along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 20  # 
 21  ############################################################################## 
 22  """ 
 23   Defines ETL job with ETL components. 
 24   
 25   Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).  
 26   GNU General Public License. 
 27  """ 
 28  from signal import signal 
 29  import time 
 30  import logger 
 31  import pickle 
 32  import datetime 
 33   
class job(signal):
    """
    Base class of ETL job.

    A job holds a list of ETL components. start() drives it by exhausting
    channel_get() of each end component. The job emits the signals 'start',
    'pause', 'restart', 'stop', 'end' and 'copy'.

    run() connects the action_* handlers below. They log these events and
    store timing statistics (in microseconds) in the ``_cache`` dict of the
    job and of each component. get_statitic_info() formats those statistics
    as a text report.
    """

    # (label, cache key) pairs printed for each transition by
    # get_statitic_info(), in output order.
    _TRANS_STAT_FIELDS = (
        ('Total Inputs', 'total_inputs'),
        ('Total Outputs', 'total_outputs'),
        ('Total Input Process Time', 'input_process_time'),
        ('Total Output Process Time', 'output_process_time'),
        ('Input Process Time per Record', 'input_process_time_per_record'),
        ('Output Process Time per Record', 'output_process_time_per_record'),
    )

    def __init__(self, components=None, name='job'):
        """
        Parameters
        components - list of ETL components of this job (the list object
                     itself is kept, not copied); defaults to a new empty list
        name       - job name used in log messages and statistics
        """
        super(job, self).__init__()
        self.name = name
        # Use a fresh list per instance. A shared mutable default would let
        # add_component() leak components between unrelated jobs.
        if components is None:
            components = []
        self._components = components
        self._cache = {}
        for component in self._components:
            component.job = self
        # Values assigned by this class: open, start, pause, stop, end, close
        self.status = 'open'
        # False, or pickled job data (see write()) that run() resumes instead.
        self.pickle = False
        self.logger = logger.logger()

    def __str__(self):
        """
        Return a multi-line description: a <Job> header line, then one line
        per component, then one line per transition.
        """
        lines = ['<Job name="%s" status="%s">' % (self.name, self.status)]
        lines.extend(str(component) for component in self.get_components())
        lines.extend(str(transition) for transition in self.get_transitions())
        return "\n".join(lines)

    def __copy__(self):
        """
        Copy job instance.

        Each component (including ones without any transition) and each
        transition is copied once. The copied transitions are re-wired to
        the copied components.

        Emits the 'copy' signal on the original job and returns the new job,
        named '<name>(copy)'.
        """
        new_components = {}
        ordered = []

        def copy_of(component):
            # Copy each component only once, remembering first-seen order.
            if component not in new_components:
                new_components[component] = component.copy()
                ordered.append(new_components[component])
            return new_components[component]

        # Copy components without transitions as well. Previously only
        # transition endpoints were copied, so such components were lost.
        for component in self.get_components():
            copy_of(component)

        for transition in self.get_transitions():
            new_tra = transition.copy()
            new_tra.source = copy_of(transition.source)
            new_tra.destination = copy_of(transition.destination)
            new_tra.destination.trans_in.append((new_tra.channel_destination, new_tra))
            new_tra.source.trans_out.append((new_tra.channel_source, new_tra))

        # Pass a real list: under Python 3, dict.values() would be a view,
        # and add_component() could not append to it.
        res = job(ordered, self.name + '(copy)')
        self.signal('copy', {'date': datetime.datetime.today()})
        return res

    def copy(self):
        """Return a copy of this job (same as __copy__())."""
        return self.__copy__()

    def get_components(self):
        """Return the job's component list (the stored list, not a copy)."""
        return self._components

    def add_component(self, component):
        """
        Append component to this job and attach the job to it, as __init__
        does for the initial components.
        """
        self._components.append(component)
        component.job = self

    def get_transitions(self):
        """
        Return the distinct transitions attached to the job's components.

        Both incoming (trans_in) and outgoing (trans_out) transitions are
        collected, each only once, in first-seen order. Entries of trans_in
        and trans_out are (channel, transition) pairs.
        """
        transitions = []
        for component in self.get_components():
            for pairs in (component.trans_in, component.trans_out):
                for _channel, tran in pairs:
                    if tran not in transitions:
                        transitions.append(tran)
        return transitions

    def pause(self):
        """Pause every transition, set status 'pause' and emit 'pause'."""
        for tran in self.get_transitions():
            tran.pause()
        self.status = 'pause'
        self.signal('pause', {'date': datetime.datetime.today()})

    def restart(self):
        """Restart every transition, set status 'start' and emit 'restart'."""
        for tran in self.get_transitions():
            tran.restart()
        self.status = 'start'
        self.signal('restart', {'date': datetime.datetime.today()})

    def start(self):
        """
        Set status 'start', emit 'start' and exhaust channel_get() of every
        end component. The yielded values are discarded.
        """
        self.status = 'start'
        self.signal('start', {'date': datetime.datetime.today()})
        # NOTE(review): presumably channel_get() is a generator that pulls
        # data through the upstream components; iterating it is what runs
        # the job.
        for component in self.get_end_components():
            for _data in component.channel_get():
                pass

    def end(self):
        """Set status 'end' and emit 'end'."""
        self.status = 'end'
        self.signal('end', {'date': datetime.datetime.today()})

    def open(self):
        """Set status 'open' (no signal is emitted)."""
        self.status = 'open'

    def close(self):
        """Set status 'close' (no signal is emitted)."""
        self.status = 'close'

    def stop(self):
        """Stop every transition, set status 'stop' and emit 'stop'."""
        for tran in self.get_transitions():
            tran.stop()
        self.status = 'stop'
        self.signal('stop', {'date': datetime.datetime.today()})

    def get_end_components(self):
        """Return the components whose is_end() is true, in job order."""
        return [component for component in self.get_components()
                if component.is_end()]

    # TODO: make a separate class like job.process and move the functions
    # below into it.

    def write(self):
        """
        Store job instance into pickle object.

        Returns the pickled data (pickle.dumps). Pass it to read(), or
        assign it to self.pickle before run().
        """
        return pickle.dumps(self)

    def read(self, value):
        """
        Read job instance value from pickle object.

        Parameter
        value - either the data returned by write(), or a file-like object
                (anything with a read() method) containing it

        SECURITY: unpickling can execute arbitrary code. Only pass data this
        application wrote itself, never untrusted input.
        """
        # write() returns pickle.dumps() data, which pickle.load() cannot
        # read back. File objects are still accepted for existing callers.
        if hasattr(value, 'read'):
            return pickle.load(value)
        return pickle.loads(value)

    def run(self):
        """
        Execute the job.

        Connects the action_* handlers to this job. Then it either resumes
        the pickled job held in self.pickle (restart + end) or starts and
        ends this job.
        """
        self.register_actions()
        if self.pickle:
            # NOTE(review): the actions were registered on self, not on the
            # unpickled job; confirm whether the restored job keeps its own
            # signal connections.
            restored = self.read(self.pickle)
            restored.restart()
            restored.end()
        else:
            self.start()
            self.end()

    @staticmethod
    def _timing_lines(cache):
        """Return the Start/End/Total Process time report lines for cache."""
        return [
            'Start : %s\n' % (cache.get('start_date', False),),
            'End : %s\n' % (cache.get('end_date', False),),
            'Total Process time : %s\n' % (cache.get('process_time', 0),),
        ]

    def get_statitic_info(self):
        """
        Return a text report of the timing statistics gathered by the
        action_* handlers (times in microseconds).

        Missing entries are reported as False / 0 instead of raising
        KeyError. This happens, for example, for a job that has not run or
        a component that received no transition signals.
        """
        parts = [
            'Statistical Information (process time in microsec):\n',
            '======================================\n',
            'Job : %s\n' % (self.name,),
            '-------------------\n',
        ]
        parts.extend(self._timing_lines(self._cache))
        for component in self.get_components():
            parts.append('\nComponent : %s\n' % (component,))
            parts.append('---------------------------------\n')
            parts.extend(self._timing_lines(component._cache))
            for trans, value in component._cache.get('trans', {}).items():
                parts.append('\nOut Transition : %s\n' % (trans,))
                parts.append('---------------------------------\n')
                for label, field in self._TRANS_STAT_FIELDS:
                    parts.append('%s : %s\n' % (label, value.get(field, 0)))
        return ''.join(parts)

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    get_statistic_info = get_statitic_info

    def register_actions(self):
        """Connect the logging/statistics actions to the job, its components,
        their connectors (when present) and all transitions."""
        self.register_actions_job(self)
        for component in self.get_components():
            self.register_actions_component(component)
            connector = getattr(component, 'connector', None)
            if connector:
                self.register_actions_connector(connector)
        for transition in self.get_transitions():
            self.register_actions_transition(transition)

    def register_actions_job(self, job):
        """Connect the job-level signals of job to the action_job_* handlers."""
        job.signal_connect(job, 'start', self.action_job_start)
        # 'restart' is emitted by restart(); previously it was never connected.
        job.signal_connect(job, 'restart', self.action_job_restart)
        job.signal_connect(job, 'pause', self.action_job_pause)
        job.signal_connect(job, 'stop', self.action_job_stop)
        job.signal_connect(job, 'end', self.action_job_end)
        job.signal_connect(job, 'copy', self.action_job_copy)

    def register_actions_component(self, component):
        """Connect the component signals to the action_component_* handlers."""
        for signal_name, action in (
                ('start', self.action_component_start),
                ('start_input', self.action_component_start_input),
                ('start_output', self.action_component_start_output),
                ('get_input', self.action_component_get_input),
                ('send_output', self.action_component_send_output),
                ('no_input', self.action_component_no_input),
                ('stop', self.action_component_stop),
                ('end', self.action_component_end),
                ('error', self.action_component_error),
                ('warning', self.action_component_warning)):
            component.signal_connect(component, signal_name, action)

    def register_actions_connector(self, connector):
        """Connect the connector signals to the action_connector_* handlers."""
        connector.signal_connect(connector, 'open', self.action_connector_open)
        connector.signal_connect(connector, 'close', self.action_connector_close)
        connector.signal_connect(connector, 'error', self.action_connector_error)

    def register_actions_transition(self, transition):
        """Connect the transition signals to the action_transition_* handlers."""
        transition.signal_connect(transition, 'start', self.action_transition_start)
        transition.signal_connect(transition, 'pause', self.action_transition_pause)
        transition.signal_connect(transition, 'stop', self.action_transition_stop)
        transition.signal_connect(transition, 'end', self.action_transition_end)

    # ------------------------------------------------------------------
    # Helpers shared by the action_* handlers.
    # Handler signature: (key, signal_data, data). key is the emitter;
    # signal_data is the dict passed to signal(); data is unused here.
    # ------------------------------------------------------------------

    @staticmethod
    def _elapsed_microseconds(start_time, end_time):
        """
        Return the total elapsed time from start_time to end_time in
        microseconds, or 0 when either value is missing (False/None).

        timedelta.microseconds is only the sub-second component, so the
        days and seconds components are added back in.
        """
        if not (start_time and end_time):
            return 0
        delta = end_time - start_time
        return (delta.days * 86400 + delta.seconds) * 1000000 + delta.microseconds

    @staticmethod
    def _trans_stats(key, trans):
        """Return (creating if needed) key._cache['trans'][trans]."""
        return key._cache.setdefault('trans', {}).setdefault(trans, {})

    @staticmethod
    def _message(signal_data):
        """
        Return signal_data['message'] as text. When the entry is absent,
        return 'False', matching action_component_error's historical output.
        """
        return '%s' % (signal_data.get('message', False),)

    def _record_transfer(self, key, signal_data, direction):
        """
        Update the per-transition counters for one record.

        direction is 'input' (get_input) or 'output' (send_output). It
        increments total_<direction>s and records the time elapsed since
        start_<direction>, plus that time divided by the record count.
        """
        stats = self._trans_stats(key, signal_data.get('trans', False))
        total = stats.get('total_%ss' % direction, 0) + 1
        current_time = signal_data.get('date', datetime.datetime.today())
        diff = self._elapsed_microseconds(
            stats.get('start_%s' % direction, False), current_time)
        stats.update({
            'total_%ss' % direction: total,
            '%s_process_time' % direction: diff,
            # total >= 1 here, so the division is always safe.
            '%s_process_time_per_record' % direction: diff / total,
        })
        return True

    # ------------------------------------------------------------------
    # Job actions
    # ------------------------------------------------------------------

    def action_job_start(self, key, signal_data=None, data=None):
        """Log that job key started and record its 'start_date'."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> has started now...' % (key.name,))
        key._cache['start_date'] = signal_data.get('date', False)
        return True

    def action_job_restart(self, key, signal_data=None, data=None):
        """Log that job key was restarted."""
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> has restarted now...' % (key.name,))
        return True

    def action_job_pause(self, key, signal_data=None, data=None):
        """Log that job key was paused."""
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> is paused now...' % (key.name,))
        return True

    def action_job_stop(self, key, signal_data=None, data=None):
        """Log that job key was stopped."""
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> has stopped now...' % (key.name,))
        return True

    def action_job_end(self, key, signal_data=None, data=None):
        """Log that job key ended; record 'end_date' and 'process_time'
        (microseconds since 'start_date', 0 if unknown)."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> has ended now...' % (key.name,))
        current_time = signal_data.get('date', datetime.datetime.today())
        key._cache['end_date'] = current_time
        key._cache['process_time'] = self._elapsed_microseconds(
            key._cache.get('start_date', False), current_time)
        return True

    def action_job_copy(self, key, signal_data=None, data=None):
        """Log that job key is being copied."""
        self.logger.notifyChannel("job", logger.LOG_INFO,
                                  'the <%s> is copying now...' % (key.name,))
        return True

    # ------------------------------------------------------------------
    # Connector actions
    # ------------------------------------------------------------------

    def action_connector_open(self, key, signal_data=None, data=None):
        """Log that connector key was opened."""
        self.logger.notifyChannel("connector", logger.LOG_INFO,
                                  'the <%s> is open now...' % (key.name,))
        return True

    def action_connector_close(self, key, signal_data=None, data=None):
        """Log that connector key was closed."""
        self.logger.notifyChannel("connector", logger.LOG_INFO,
                                  'the <%s> is closed now...' % (key.name,))
        return True

    def action_connector_error(self, key, signal_data=None, data=None):
        """Log the error message reported by connector key."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("connector", logger.LOG_ERROR,
                                  '<%s> : %s' % (key.name, self._message(signal_data)))
        return True

    # ------------------------------------------------------------------
    # Component actions
    # ------------------------------------------------------------------

    def action_component_start(self, key, signal_data=None, data=None):
        """Log that component key started and record its 'start_date'."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("component", logger.LOG_INFO,
                                  'the <%s> has started now...' % (key.name,))
        key._cache['start_date'] = signal_data.get('date', False)
        return True

    def action_component_start_input(self, key, signal_data=None, data=None):
        """Record when component key started reading from a transition."""
        signal_data = signal_data or {}
        stats = self._trans_stats(key, signal_data.get('trans', False))
        stats['start_input'] = signal_data.get('date', False)
        return True

    def action_component_start_output(self, key, signal_data=None, data=None):
        """Record when component key started writing to a transition."""
        signal_data = signal_data or {}
        stats = self._trans_stats(key, signal_data.get('trans', False))
        stats['start_output'] = signal_data.get('date', False)
        return True

    def action_component_get_input(self, key, signal_data=None, data=None):
        """Count one input record for the transition and update input timing."""
        return self._record_transfer(key, signal_data or {}, 'input')

    def action_component_send_output(self, key, signal_data=None, data=None):
        """Count one output record for the transition and update output timing."""
        return self._record_transfer(key, signal_data or {}, 'output')

    def action_component_no_input(self, key, signal_data=None, data=None):
        """Warn that component key received no input data."""
        self.logger.notifyChannel("component", logger.LOG_WARNING,
                                  'the <%s> has no input data...' % (key.name,))
        return True

    def action_component_stop(self, key, signal_data=None, data=None):
        """Log that component key was stopped."""
        self.logger.notifyChannel("component", logger.LOG_INFO,
                                  'the <%s> has stopped now...' % (key.name,))
        return True

    def action_component_end(self, key, signal_data=None, data=None):
        """Log that component key ended; record 'end_date' and 'process_time'
        (microseconds since 'start_date', 0 if unknown)."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("component", logger.LOG_INFO,
                                  'the <%s> has ended now...' % (key.name,))
        cache = key._cache
        current_time = signal_data.get('date', datetime.datetime.today())
        cache.update({
            'end_date': current_time,
            'process_time': self._elapsed_microseconds(
                cache.get('start_date', False), current_time),
        })
        return True

    def action_component_error(self, key, signal_data=None, data=None):
        """Log the error message reported by component key."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("component", logger.LOG_ERROR,
                                  '<%s> : %s' % (key.name, self._message(signal_data)))
        return True

    def action_component_warning(self, key, signal_data=None, data=None):
        """Log the warning message reported by component key."""
        signal_data = signal_data or {}
        self.logger.notifyChannel("component", logger.LOG_WARNING,
                                  '<%s> : %s' % (key.name, self._message(signal_data)))
        return True

    # ------------------------------------------------------------------
    # Transition actions
    # ------------------------------------------------------------------

    def action_transition_start(self, key, signal_data=None, data=None):
        """Log that transition key started."""
        self.logger.notifyChannel("transition", logger.LOG_INFO,
                                  'the <%s> to <%s> has started now...' % (key.source.name, key.destination.name))
        return True

    def action_transition_pause(self, key, signal_data=None, data=None):
        """Log that transition key was paused."""
        self.logger.notifyChannel("transition", logger.LOG_INFO,
                                  'the <%s> to <%s> is paused now...' % (key.source.name, key.destination.name))
        return True

    def action_transition_stop(self, key, signal_data=None, data=None):
        """Log that transition key was stopped."""
        self.logger.notifyChannel("transition", logger.LOG_INFO,
                                  'the <%s> to <%s> has stopped now...' % (key.source.name, key.destination.name))
        return True

    def action_transition_end(self, key, signal_data=None, data=None):
        """Log that transition key ended."""
        self.logger.notifyChannel("transition", logger.LOG_INFO,
                                  'the <%s> to <%s> has ended now...' % (key.source.name, key.destination.name))
        return True