1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Defines ETL job with ETL components.
24
25 Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
26 GNU General Public License.
27 """
28 from signal import signal
29 import time
30 import logger
31 import pickle
32 import datetime
33
35 """
36 Base class of ETL job.
37 """
38
def __init__(self, components=None, name='job'):
    """
    Initialise the job and attach it to each of its components.

    Parameters
    components - list of components belonging to this job; when omitted
                 a new empty list is created for this instance
    name       - name of the job
    """
    super(job, self).__init__()
    self.name = name
    # Use a fresh list per instance: a mutable default argument would be
    # shared by every job created without an explicit component list.
    self._components = [] if components is None else components
    self._cache = {}
    # Give every component a back-reference to the job that owns it.
    for component in self._components:
        component.job = self
    self.status = 'open'
    self.pickle = False
    self.logger = logger.logger()
49
51 res = '<Job name="%s" status="%s">' % (self.name, self.status)
52 components = []
53 trans = []
54 for component in self.get_components():
55 res += "\n" + str(component)
56 for transition in self.get_transitions():
57 res += "\n" + str(transition)
58 return res
59
61 """
62 Copy job instance.
63 """
64 new_outputs = []
65 new_transitions = []
66 new_components = {}
67 for transition in self.get_transitions():
68 new_tra=transition.copy()
69 if transition.source not in new_components:
70 new_components[transition.source] = transition.source.copy()
71 if transition.destination not in new_components:
72 new_components[transition.destination] = transition.destination.copy()
73
74 new_tra.source = new_components[new_tra.source]
75 new_tra.destination = new_components[new_tra.destination]
76 new_tra.destination.trans_in.append((new_tra.channel_destination, new_tra))
77 new_tra.source.trans_out.append((new_tra.channel_source, new_tra))
78 new_transitions.append(new_tra)
79
80 res = job(new_components.values(), self.name + '(copy)')
81 self.signal('copy', {'date': datetime.datetime.today()})
82 return res
83
85 return self.__copy__()
86
88 return self._components
89
92
94 transitions = []
95 for component in self.get_components():
96 for channel, tran_in in component.trans_in:
97 if tran_in not in transitions:
98 transitions.append(tran_in)
99 for channel, tran_out in component.trans_out:
100 if tran_out not in transitions:
101 transitions.append(tran_out)
102 return transitions
103
104
106 for tran in self.get_transitions():
107 tran.pause()
108 self.status = 'pause'
109 self.signal('pause', {'date': datetime.datetime.today()})
110
111
113 for tran in self.get_transitions():
114 tran.restart()
115
116 self.status = 'start'
117 self.signal('restart', {'date': datetime.datetime.today()})
118
125
127 self.status = 'end'
128 self.signal('end', {'date': datetime.datetime.today()})
129
130
133
135 self.status = 'close'
136
138 for tran in self.get_transitions():
139 tran.stop()
140 self.status = 'stop'
141 self.signal('stop', {'date': datetime.datetime.today()})
142
149
150
151
153 """
154 Store job instance into pickle object.
155 """
156 return pickle.dumps(self)
157
def read(self, value):
    """
    Read job instance value from pickle object.

    Parameter
    value - pickle value: either the pickled data itself (for example the
            result of pickle.dumps) or a file-like object open for reading

    Returns the unpickled object.
    """
    # NOTE(review): unpickling can execute arbitrary code; only pass data
    # that comes from a trusted source.
    if hasattr(value, 'read'):
        return pickle.load(value)
    # pickle.load() only accepts file-like objects, so raw pickled data
    # has to be decoded with pickle.loads().
    return pickle.loads(value)
165
167 self.register_actions()
168 if self.pickle:
169 job = self.read(self.pickle)
170 job.restart()
171 job.end()
172 else:
173 self.start()
174 self.end()
175
177 stat_info = 'Statistical Information (process time in microsec):\n'
178 stat_info += '======================================\n'
179 stat_info += 'Job : %s\n' %(self.name)
180 stat_info += '-------------------\n'
181 stat_info += 'Start : %s\n' %(self._cache['start_date'])
182 stat_info += 'End : %s\n' %(self._cache['end_date'])
183 stat_info += 'Total Process time : %s\n' %(self._cache['process_time'])
184 for component in self.get_components():
185 stat_info += '\nComponent : %s\n'%(component)
186 stat_info += '---------------------------------\n'
187 stat_info += 'Start : %s\n' %(component._cache['start_date'])
188 stat_info += 'End : %s\n' %(component._cache['end_date'])
189 stat_info += 'Total Process time : %s\n' %(component._cache['process_time'])
190 for trans,value in component._cache['trans'].items():
191 stat_info += '\nOut Transition : %s\n'%trans
192 stat_info += '---------------------------------\n'
193 stat_info += 'Total Inputs : %s\n'%value.get('total_inputs',0)
194 stat_info += 'Total Outputs : %s\n'%value.get('total_outputs',0)
195 stat_info += 'Total Input Process Time : %s\n'%value.get('input_process_time',0)
196 stat_info += 'Total Output Process Time : %s\n'%value.get('output_process_time',0)
197 stat_info += 'Input Process Time per Record : %s\n'%value.get('input_process_time_per_record',0)
198 stat_info += 'Output Process Time per Record : %s\n'%value.get('output_process_time_per_record',0)
199 return stat_info
200
209
211 job.signal_connect(job, 'start', self.action_job_start)
212 job.signal_connect(job, 'pause', self.action_job_pause)
213 job.signal_connect(job, 'stop', self.action_job_stop)
214 job.signal_connect(job, 'end', self.action_job_end)
215 job.signal_connect(job, 'copy', self.action_job_copy)
216
218 component.signal_connect(component, 'start', self.action_component_start)
219 component.signal_connect(component, 'start_input', self.action_component_start_input)
220 component.signal_connect(component, 'start_output', self.action_component_start_output)
221 component.signal_connect(component, 'get_input', self.action_component_get_input)
222 component.signal_connect(component, 'send_output', self.action_component_send_output)
223 component.signal_connect(component, 'no_input', self.action_component_no_input)
224 component.signal_connect(component, 'stop', self.action_component_stop)
225 component.signal_connect(component, 'end', self.action_component_end)
226 component.signal_connect(component, 'error', self.action_component_error)
227 component.signal_connect(component, 'warning', self.action_component_warning)
228
233
234
236 transition.signal_connect(transition, 'start', self.action_transition_start)
237 transition.signal_connect(transition, 'pause', self.action_transition_pause)
238 transition.signal_connect(transition, 'stop', self.action_transition_stop)
239 transition.signal_connect(transition, 'end', self.action_transition_end)
240
241
243 self.logger.notifyChannel("job", logger.LOG_INFO,
244 'the <' + key.name + '> has started now...')
245 key._cache['start_date'] = signal_data.get('date',False)
246 return True
247
249 self.logger.notifyChannel("job", logger.LOG_INFO,
250 'the <' + key.name + '> has started now...')
251 return True
252
254 self.logger.notifyChannel("job", logger.LOG_INFO,
255 'the <' + key.name + '> is paused now...')
256 return True
257
262
264 self.logger.notifyChannel("job", logger.LOG_INFO,
265 'the <' + key.name + '> has ended now...')
266 start_time = key._cache.get('start_date',False)
267 current_time = signal_data.get('date',datetime.datetime.today())
268 diff = 0
269 if current_time and start_time:
270 diff = (current_time - start_time).microseconds
271
272 key._cache['end_date'] = current_time
273 key._cache['process_time'] = diff
274 return True
275
277 self.logger.notifyChannel("job", logger.LOG_INFO,
278 'the <' + key.name + '> is coping now...')
279 return True
280
282 self.logger.notifyChannel("connector", logger.LOG_INFO,
283 'the <' + key.name + '> is open now...')
284 return True
285
290
295
296
298 self.logger.notifyChannel("component", logger.LOG_INFO,
299 'the <' + key.name + '> has started now...')
300 key._cache['start_date'] = signal_data.get('date',False)
301 return True
302
304 if 'trans' not in key._cache:
305 key._cache['trans'] = {}
306 value = key._cache['trans']
307 trans = signal_data.get('trans',False)
308 if trans not in value:
309 value[trans] = {}
310 value[trans].update({'start_input' : signal_data.get('date',False)})
311 return True
312
314 if 'trans' not in key._cache:
315 key._cache['trans'] = {}
316 value = key._cache['trans']
317 trans = signal_data.get('trans',False)
318 if trans not in value:
319 value[trans] = {}
320 value[trans].update({'start_output' : signal_data.get('date',False)})
321 return True
322
324 if 'trans' not in key._cache:
325 key._cache['trans'] = {}
326 value = key._cache['trans']
327
328 trans = signal_data.get('trans',False)
329 total = value[trans].get('total_inputs',0)
330 total += 1
331 start_time = value[trans].get('start_input',False)
332 current_time = signal_data.get('date',datetime.datetime.today())
333 diff = 0
334 if current_time and start_time:
335 diff = (current_time - start_time).microseconds
336 process_per_record = 0
337 if total :
338 process_per_record = diff / total
339 value[trans].update({'total_inputs' : total, 'input_process_time' : diff ,'input_process_time_per_record' : process_per_record})
340 return True
341
343 if 'trans' not in key._cache:
344 key._cache['trans'] = {}
345 value = key._cache['trans']
346
347 trans = signal_data.get('trans',False)
348 total = value[trans].get('total_outputs',0)
349 total += 1
350 start_time = value[trans].get('start_output',False)
351 current_time = signal_data.get('date',datetime.datetime.today())
352 diff = 0
353 if current_time and start_time:
354 diff = (current_time - start_time).microseconds
355 process_per_record = 0
356 if total :
357 process_per_record = diff / total
358 value[trans].update({'total_outputs' : total, 'output_process_time' : diff ,'output_process_time_per_record' : process_per_record})
359 return True
360
365
367 self.logger.notifyChannel("component", logger.LOG_INFO,
368 'the <' + key.name + '> has stopped now...')
369 return True
370
372 self.logger.notifyChannel("component", logger.LOG_INFO,
373 'the <' + key.name + '> has ended now...')
374
375 value = key._cache
376 start_time = value.get('start_date',False)
377 current_time = signal_data.get('date',datetime.datetime.today())
378 diff = 0
379 if current_time and start_time:
380 diff = (current_time - start_time).microseconds
381 value.update({'end_date' : current_time, 'process_time' : diff})
382 return True
383
388
390 self.logger.notifyChannel("component", logger.LOG_WARNING,
391 '<' + key.name + '> : '+signal_data.get('message', False))
392 return True
393
395 self.logger.notifyChannel("transition", logger.LOG_INFO,
396 'the <%s> to <%s> has started now...'%(key.source.name, key.destination.name))
397 return True
398
400 self.logger.notifyChannel("transition", logger.LOG_INFO,
401 'the <%s> to <%s> has started now...'%(key.source.name, key.destination.name))
402 return True
403
405 self.logger.notifyChannel("transition", logger.LOG_INFO,
406 'the <%s> to <%s> has started now...'%(key.source.name, key.destination.name))
407 return True
408
410 self.logger.notifyChannel("transition", logger.LOG_INFO,
411 'the <%s> to <%s> has started now...'%(key.source.name, key.destination.name))
412 return True
413