1
1
# -*- encoding: utf-8 -*-
2
2
#########################################################################
4
# Copyright (C) 2010 Sébastien Beau #
6
#This program is free software: you can redistribute it and/or modify #
7
#it under the terms of the GNU General Public License as published by #
8
#the Free Software Foundation, either version 3 of the License, or #
9
#(at your option) any later version. #
11
#This program is distributed in the hope that it will be useful, #
12
#but WITHOUT ANY WARRANTY; without even the implied warranty of #
13
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
14
#GNU General Public License for more details. #
16
#You should have received a copy of the GNU General Public License #
17
#along with this program. If not, see <http://www.gnu.org/licenses/>. #
4
# Kettle connector for OpenERP
5
# Copyright (C) 2010 Sébastien Beau <sebastien.beau@akretion.com>
6
# Copyright (C) 2011 Akretion (http://www.akretion.com). All Rights Reserved
7
# @author Alexis de Lattre <alexis.delattre@akretion.com> : some enhancements
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# GNU Affero General Public License for more details.
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
22
#########################################################################
25
# Use this to create KTR temp file http://docs.python.org/dev/library/tempfile.html
26
# When starting the transfo by the wizard, switch to a scroll bar, like when we do an "update modules" in the administration menu
27
# Can we avoid the "Read from filesystem" bool on the kettle.transfo obj ?
20
29
from osv import fields,osv
26
36
from installer import installer
28
38
from tools import config
39
from tools.translate import _
30
41
class kettle_server(osv.osv):
# Model 'kettle.server': describes a Kettle installation through a name, a
# directory, a URL and optional user/password fields.
# NOTE(review): this span reads like a rendered diff. The bare numbers are
# line-number residue, indentation is gone, and several field entries appear
# twice with different labels/sizes (presumably the old and the new revision
# of the same line) -- confirm against the real file before relying on it.
31
42
_name = 'kettle.server'
32
43
_description = 'kettle server'
34
45
# Reads 'kettle_dir' from the first record returned for `ids`, removes the
# substring 'data-integration' from it and hands the result to inst.install().
# NOTE(review): `inst` is not defined in the visible lines -- verify where it
# is created. No return statement is visible either.
def button_install(self, cr, uid, ids, context=None):
36
47
inst.install(self.read(cr, uid, ids, ['kettle_dir'])[0]['kettle_dir'].replace('data-integration', ''))
39
50
# Same directory computation as button_install, passed to
# inst.update_terminatoor().
# NOTE(review): the method name is spelled 'terminatooor' (three o's) while the
# call is 'terminatoor' (two o's); check that views/buttons use the same name.
def button_update_terminatooor(self, cr, uid, ids, context=None):
41
52
inst.update_terminatoor(self.read(cr, uid, ids, ['kettle_dir'])[0]['kettle_dir'].replace('data-integration', ''))
# Field definitions follow (the opening of the enclosing dict is not visible).
45
'name': fields.char('Server Name', size=64, required=True),
46
'kettle_dir': fields.char('Kettle Directory', size=255, required=True),
56
'name': fields.char('Server name', size=64, required=True),
57
'kettle_dir': fields.char('Kettle installation directory', size=256, required=True),
47
58
'url': fields.char('Kettle URL', size=64, required=True, help='URL of Kettle server if any (can be localhost)'),
48
'transformation': fields.one2many('kettle.transformation', 'server_id', 'Transformation'),
49
'user': fields.char('Kettle Server User', size=32),
50
'password': fields.char('Kettle Server Password', size=32),
59
'user': fields.char('Kettle server user', size=32),
60
'password': fields.char('Kettle server password', size=32),
54
64
# Default for kettle_dir: the configured addons_path with '/addons' replaced by
# '/data-integration'.
# NOTE(review): this uses `tools.config`, but the only visible import is
# `from tools import config`; confirm that `tools` itself is imported,
# otherwise evaluating this default fails with a NameError.
'kettle_dir': lambda *a: tools.config['addons_path'].replace('/addons', '/data-integration'),
59
71
class kettle_transformation(osv.osv):
# Model 'kettle.transformation': a Kettle transformation/job stored either as
# a binary 'file' or referenced by 'filename'.
# NOTE(review): rendered-diff residue -- bare numbers are line numbers and the
# 'name' / 'filename' entries appear twice (presumably old and new revision).
# Only one of the revisions shows a 'server_id' link on this model; confirm
# which one is current.
60
72
_name = 'kettle.transformation'
61
73
_description = 'kettle transformation'
64
'name': fields.char('Transformation Name', size=64, required=True),
65
'server_id': fields.many2one('kettle.server', 'Server', required=True),
76
'name': fields.char('Transformation / Job name', size=64, required=True),
66
77
'file': fields.binary('File'),
67
'filename': fields.char('File Name', size=64),
78
# When set, the file is read from disk instead of from the 'file' field
# (see the help text below).
'read_from_filesystem': fields.boolean('Read from filesystem', help="If active, OpenERP will read the file from the filesystem. Otherwise, it will read the file from the 'File' field."),
79
'filename': fields.char('Filename', size=128, help="If the Kettle file is attached, enter the filename. If the Kettle file is read from the filesystem, enter the relative or absolute path to the file."),
70
def error_wizard(self, cr, uid, id, context):
# Raises an osv.except_osv describing a failed Kettle run.
# `id` is the id of an 'ir.attachment' record; its 'description' is read and,
# when it contains "USER_ERROR", raised verbatim under the 'USER_ERROR' title.
# The generic 'KETTLE ERROR' raise below is presumably the else branch (the
# `else:` line is not visible here) -- confirm.
# NOTE(review): the parameter name `id` shadows the builtin.
71
error_description = self.pool.get('ir.attachment').read(cr, uid, id, ['description'], context)['description']
72
if error_description and "USER_ERROR" in error_description:
73
raise osv.except_osv('USER_ERROR', error_description)
75
raise osv.except_osv('KETTLE ERROR', 'An error occurred, please look in the kettle log')
77
def execute_transformation(self, cr, uid, id, log_file_name, attachment_id, context):
# Runs one transformation through pan.sh and stores the resulting log on the
# attachment `attachment_id`.
# Visible steps: decode the stored file, apply the context['filter']
# substitutions, write it under <kettle_dir>/openerp_tmp/, run pan.sh with
# os.system, upload the log to the attachment, delete the log file.
# NOTE(review): the conditions that choose between the three
# `prefixe_log_name` assignments and that guard the error_wizard call are not
# visible (presumably tests on `os_result`) -- confirm against the real file.
78
transfo = self.browse(cr, uid, id, context)
79
kettle_dir = transfo.server_id.kettle_dir
80
filename = transfo.filename
81
logger = netsvc.Logger()
82
# NOTE(review): opened without a `with` block; the explicit close() below is
# skipped if decoding or writing raises.
transformation_temp = open(kettle_dir + '/openerp_tmp/'+ filename, 'w')
84
file_temp = base64.decodestring(transfo.file)
85
# Plain textual replacement of every filter key by its value in the file body.
if context.get('filter', False):
86
for key in context['filter']:
87
file_temp = file_temp.replace(key, context['filter'][key])
88
transformation_temp.write(file_temp)
89
transformation_temp.close()
91
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "start kettle task : open kettle log with tail -f " + kettle_dir +'/openerp_tmp/' + log_file_name)
92
# NOTE(review): the shell command is built by string concatenation and run via
# os.system; kettle_dir, filename and log_file_name are not quoted.
cmd = "cd " + kettle_dir + "; nohup sh pan.sh -file=openerp_tmp/" + filename + " > openerp_tmp/" + log_file_name
93
os_result = os.system(cmd)
96
prefixe_log_name = "[ERROR]"
98
note = self.pool.get('ir.attachment').read(cr, uid, attachment_id, ['description'], context)['description']
99
if note and 'WARNING' in note:
100
prefixe_log_name = "[WARNING]"
102
prefixe_log_name = "[SUCCESS]"
104
# Stores the base64-encoded log as 'Task.log' and renames the attachment with
# the status prefix. The log file handle is never closed explicitly.
self.pool.get('ir.attachment').write(cr, uid, [attachment_id], {'datas': base64.encodestring(open(kettle_dir +"/openerp_tmp/" + log_file_name, 'rb').read()), 'datas_fname': 'Task.log', 'name' : prefixe_log_name + 'TASK_LOG'}, context)
106
os.remove(kettle_dir +"/openerp_tmp/" + log_file_name)
108
self.error_wizard(cr, uid, attachment_id, context)
109
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "kettle task finish with success")
111
82
kettle_transformation()
113
85
class kettle_task(osv.osv):
# Model 'kettle.task': a runnable task that points at a transformation/job and
# carries the options used when it is started (upload/output file, Python
# hooks, last execution date, parameters).
# NOTE(review): rendered-diff residue -- bare numbers are line numbers and most
# field entries appear twice with different labels (presumably old and new
# revision). `_description` is also listed twice. Only one revision shows the
# 'server_id' and 'parameter_ids' fields and only the other shows the
# free-text 'parameters' field; confirm which set is current.
114
86
_name = 'kettle.task'
115
_description = 'kettle task'
87
_description = 'kettle task'
118
'name': fields.char('Task Name', size=64, required=True),
119
'transformation_id': fields.many2one('kettle.transformation', 'Transformation', required=True),
90
'name': fields.char('Task name', size=64, required=True),
91
'server_id': fields.many2one('kettle.server', 'Server', required=True),
92
'transformation_id': fields.many2one('kettle.transformation', 'Transformation / Job', required=True),
120
93
'scheduler': fields.many2one('ir.cron', 'Scheduler', readonly=True),
121
'parameters': fields.text('Parameters'),
122
'upload_file': fields.boolean('Upload File'),
123
'output_file' : fields.boolean('Output File'),
124
'active_python_code' : fields.boolean('Active Python Code'),
125
'python_code_before' : fields.text('Python Code Executed Before Transformation'),
126
'python_code_after' : fields.text('Python Code Executed After Transformation'),
127
'last_date' : fields.datetime('Last Execution'),
94
'upload_file': fields.boolean('Upload file', help="If active, OpenERP will propose to give a file as input for the transformation/job when starting the task"),
95
'output_file' : fields.boolean('Output file', help="If active, OpenERP will store as an attachement the file that has been generated by the Kettle transformation/job"),
96
'active_python_code' : fields.boolean('Active Python code'),
97
'python_code_before' : fields.text('Python code executed before transformation'),
98
'python_code_after' : fields.text('Python code executed after transformation'),
99
'last_date' : fields.datetime('Last execution'),
100
# NOTE(review): this one2many is declared without a label argument, unlike the
# other relational fields in this file -- check whether the ORM requires one.
'parameter_ids': fields.one2many('kettle.parameter', 'task_id'),
130
103
def attach_file_to_task(self, cr, uid, id, datas_fname, attach_name, delete = False, context = None):
# Reads <context['kettle_dir']>/<datas_fname>, removes that file from disk and
# creates an 'ir.attachment' (linked to kettle.task `id` through the
# default_res_* context keys) holding its base64 content.
# Returns the id of the created attachment.
# NOTE(review): the three statements appear twice -- once with string
# concatenation / split("/") and once with os.path helpers -- presumably the
# old and new revision of the same lines.
# NOTE(review): `context` defaults to None but `.update` is called on it; a
# guard may sit in the lines that are not visible here -- confirm.
# NOTE(review): `delete` is not referenced in the visible lines; the file is
# removed by every visible code path.
133
106
context.update({'default_res_id' : id, 'default_res_model': 'kettle.task'})
134
datas = base64.encodestring(open(context['kettle_dir'] + '/' + datas_fname,'rb').read())
135
os.remove(context['kettle_dir'] + '/' + datas_fname)
136
attachment_id = self.pool.get('ir.attachment').create(cr, uid, {'name': attach_name, 'datas': datas, 'datas_fname': datas_fname.split("/").pop()}, context)
107
datas = base64.encodestring(open(os.path.join(context['kettle_dir'], datas_fname), 'rb').read())
108
os.remove(os.path.join(context['kettle_dir'], datas_fname))
109
attachment_id = self.pool.get('ir.attachment').create(cr, uid, {'name': attach_name, 'datas': datas, 'datas_fname': os.path.basename(datas_fname)}, context)
137
110
return attachment_id
139
113
def attach_output_file_to_task(self, cr, uid, id, datas_fname, attach_name, delete = False, context = None):
# Looks in <context['kettle_dir']>/openerp_tmp for a file whose name contains
# the base name of `datas_fname` and attaches it to the task through
# attach_file_to_task; raises osv.except_osv when no such file exists.
# NOTE(review): several statements appear twice (string concatenation vs.
# os.path helpers, and two wordings of the error) -- presumably old and new
# revision. The `else:` before the raise is not visible.
# NOTE(review): the match is a substring test and no `break` is visible, so
# when several directory entries match, the last one listed is used.
# NOTE(review): `dir` and `file` shadow builtins; `context` is indexed although
# it defaults to None.
140
114
filename_completed = False
141
filename = datas_fname.split('/').pop()
142
dir = context['kettle_dir']+'/openerp_tmp'
115
filename = os.path.basename(datas_fname)
116
dir = os.path.join(context['kettle_dir'], 'openerp_tmp')
143
117
files = os.listdir(dir)
144
118
for file in files:
145
119
if filename in file:
146
120
filename_completed = file
147
121
if filename_completed:
148
self.attach_file_to_task(cr, uid, id, 'openerp_tmp/'+ filename_completed, attach_name, delete, context)
122
self.attach_file_to_task(cr, uid, id, os.path.join('openerp_tmp', filename_completed), attach_name, delete, context)
150
raise osv.except_osv('USER ERROR', 'the output file was not found, are you sure that you transformation will give you an output file?')
124
raise osv.except_osv(_('Error'), _('The output file was not found. Are you sure that your transformation/job was supposed to generate an output file?'))
152
127
def execute_python_code(self, cr, uid, id, position, context):
# Executes the Python snippet stored on task `id` in the field
# 'python_code_<position>' (callers in this file pass 'before' and 'after').
# NOTE(review): 'active_python_code' is read, but the test that uses it is not
# visible here (presumably it guards the exec) -- confirm.
# NOTE(review): callers assign the result of this method back to `context`,
# yet no return statement is visible in this span -- confirm it returns
# `context`.
153
128
logger = netsvc.Logger()
154
129
task = self.read(cr, uid, id, ['active_python_code', 'python_code_' + position], context)
157
132
# SECURITY NOTE(review): exec() runs whatever text is stored in the task field
# with the privileges of the server process; access to these fields must be
# restricted to trusted users.
exec(task['python_code_' + position])
158
133
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "python code executed")
137
def error_wizard(self, cr, uid, id, context):
# Raises an osv.except_osv describing a failed Kettle run.
# `id` is the id of an 'ir.attachment' record; when its 'description' contains
# "USER_ERROR" that description is raised under the 'USER_ERROR' title.
# The generic 'KETTLE ERROR' raise is presumably the else branch (the `else:`
# line is not visible here) -- confirm.
# NOTE(review): the two messages are not wrapped in _() although other errors
# raised in this file are; `id` shadows the builtin.
138
error_description = self.pool.get('ir.attachment').read(cr, uid, id, ['description'], context)['description']
139
if error_description and "USER_ERROR" in error_description:
140
raise osv.except_osv('USER_ERROR', error_description)
142
raise osv.except_osv('KETTLE ERROR', 'An error occurred, please look in the kettle log')
145
def run_kettle_task(self, cr, uid, task, parameters, log_file_name, attachment_id, context):
146
'''Execute the Kettle transfo/job'''
# `task` is used as a browse record (task.server_id, task.transformation_id,
# task.id); `parameters` is a dict whose items become '-param:key=value'
# command-line arguments; the log is stored on attachment `attachment_id`.
# NOTE(review): rendered-diff residue -- bare numbers are line numbers and
# several structural lines (try/except/else, the initialisation of
# `cmd_params`, the tests on `os_result`) are not visible in this span.
147
kettle_dir = task.server_id.kettle_dir
148
# Sanity checks on the Kettle directory: it must exist and contain both
# pan.sh and kitchen.sh.
if not os.path.exists(kettle_dir):
149
raise osv.except_osv(_('Error :'), _("The directory for Kettle '%s' doesn't exist on the filesystem.") % kettle_dir)
150
if not os.path.isfile(os.path.join(kettle_dir, u'pan.sh')) or not os.path.isfile(os.path.join(kettle_dir, u'kitchen.sh')):
151
raise osv.except_osv(_('Error :'), _("The directory for Kettle '%s' should contain at least the files kitchen.sh and pan.sh.") % kettle_dir)
152
transfo = task.transformation_id
153
logger = netsvc.Logger()
155
# File on disk: accept the filename as given, or relative to addons_path.
if transfo.read_from_filesystem:
156
if os.path.exists(transfo.filename):
157
path_to_file = transfo.filename
158
elif os.path.exists(os.path.join(config['addons_path'], transfo.filename)):
159
path_to_file = os.path.abspath(os.path.join(config['addons_path'], transfo.filename))
161
raise osv.except_osv(_('Error :'), _("The filename '%s' is not an absolute path nor a relative path that can be accessed from the addons directory.") % transfo.filename)
164
# File stored in the database: decode it and write it to a temporary file
# under <kettle_dir>/openerp_tmp whose name combines db name, port, res id,
# task id, start date and the original base name.
file_temp = base64.decodestring(transfo.file)
165
filename = cr.dbname + '_' + str(config['port']) + '_' + str(context['default_res_id']) + '_' + str(task.id) + '_' + '_DATE_' + context['start_date'].replace(' ', '_') + os.path.split(transfo.filename)[1]
166
path_to_file = os.path.join(kettle_dir, 'openerp_tmp', filename)
167
# NOTE(review): no close() of this handle is visible in this span -- confirm
# it is closed (ideally through a `with` block).
file_temp_fd = open(path_to_file, 'w')
169
file_temp_fd.write(file_temp)
171
logger.notifyChannel('kettle-connector', netsvc.LOG_WARNING, "Can't write Kettle job/transformation '%s' in temporary file '%s'" % (transfo.name, path_to_file))
172
# NOTE(review): `e` is presumably bound by an `except` clause that is not
# visible here.
logger.notifyChannel('kettle-connector', netsvc.LOG_WARNING, str(e))
173
# NOTE(review): here the % formatting is applied INSIDE _(), whereas the other
# errors in this method apply it to the result of _(); the translation lookup
# therefore receives the already formatted text.
raise osv.except_osv(_('Error :'), _("Can't write Kettle job/transformation '%s' in temporary file '%s'" % (transfo.name, path_to_file)))
179
# Each parameter is appended to the command line as ' -param:<key>=<value>'.
for key, value in parameters.items():
180
cmd_params += u' -param:' + key + u'=' + value
183
# Files ending in 'kjb' (case-insensitive) are run with kitchen.sh; pan.sh is
# assigned below, presumably in an else branch that is not visible.
if len(transfo.filename) > 4 and transfo.filename[-3:].lower() == 'kjb':
184
kettle_exec = u'kitchen.sh'
186
kettle_exec = u'pan.sh'
187
logfilename = os.path.join(kettle_dir, 'openerp_tmp', log_file_name)
188
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "Starting Kettle task : you can open Kettle logs with 'tail -f %s'" % logfilename)
189
# We need to 'cd' to the install dir of Kettle until PDI 4.1.1
190
# cf http://jira.pentaho.com/browse/PDI-5076
191
# SECURITY NOTE(review): the command is assembled by concatenation and run
# through the shell; kettle_dir, path_to_file, the parameter names/values and
# logfilename are not quoted, so spaces or shell metacharacters in any of them
# change the command.
cmd = u'cd ' + kettle_dir + u'; nohup sh ' + kettle_exec + u" -file=" + path_to_file + cmd_params + u" > " + logfilename + u" 2>&1"
193
os_result = os.system(cmd.encode(sys.stdout.encoding or 'UTF-8', 'replace'))
196
# The three assignments below select the status prefix of the log attachment;
# the conditions on `os_result` that separate them are not visible here.
prefixe_log_name = "[ERROR]"
198
note = self.pool.get('ir.attachment').read(cr, uid, attachment_id, ['description'], context)['description']
199
if note and 'WARNING' in note:
200
prefixe_log_name = "[WARNING]"
202
prefixe_log_name = "[SUCCESS]"
204
# Stores the base64-encoded log as 'Task.log' on the attachment, then deletes
# the log file from disk.
self.pool.get('ir.attachment').write(cr, uid, [attachment_id], {'datas': base64.encodestring(open(logfilename, 'rb').read()), 'datas_fname': 'Task.log', 'name' : prefixe_log_name + 'TASK_LOG'}, context)
206
os.remove(logfilename)
208
self.error_wizard(cr, uid, attachment_id, context)
209
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "Kettle task successfully executed")
161
213
def start_kettle_task(self, cr, uid, ids, context=None):
# Entry point that runs the tasks identified by `ids`: creates a log
# attachment, prepares the values handed to Kettle, runs the optional Python
# hooks before/after, launches Kettle and attaches input/output files.
# NOTE(review): rendered-diff residue -- two revisions are interleaved below.
# One works on `id` / `task[...]` dicts and fills context['filter'] with
# 'AUTO_REP_*' keys before calling kettle.transformation.execute_transformation;
# the other iterates browse records and fills a `parameters` dict before
# calling self.run_kettle_task. Bare numbers are line numbers and several
# structural lines (loop header of the first revision, `if`/`else` lines, the
# initialisation of `parameters`, the closing of the filter dict, the final
# return) are not visible.
162
214
# NOTE(review): `is None` is the idiomatic test; the assignment that follows
# this test is not visible here.
if context == None:
164
216
logger = netsvc.Logger()
165
217
user = self.pool.get('res.users').browse(cr, uid, uid, context)
167
context.update({'default_res_id' : id, 'default_res_model': 'kettle.task', 'start_date' : time.strftime('%Y-%m-%d %H:%M:%S')})
168
log_file_name = 'TASK_LOG_ID' + str(id) + '_DATE_' + context['start_date'].replace(' ', '_') + ".log"
218
for task in self.browse(cr, uid, ids, context=context):
219
# The start date is formatted with '_' between date and time here, so the
# replace(' ', '_') on the next statement has nothing to replace.
context.update({'default_res_id' : task.id, 'default_res_model': 'kettle.task', 'start_date' : time.strftime('%Y-%m-%d_%H:%M:%S')})
220
log_file_name = 'TASK_LOG_ID' + str(task.id) + '_DATE_' + context['start_date'].replace(' ', '_') + ".log"
169
221
# Placeholder attachment that later receives the Kettle log.
attachment_id = self.pool.get('ir.attachment').create(cr, uid, {'name': log_file_name}, context)
172
# Placeholders substituted textually into the transformation file.
context['filter'] = {
173
'AUTO_REP_db_erp': str(cr.dbname),
174
'AUTO_REP_user_erp': str(user.login),
175
'AUTO_REP_db_pass_erp': str(user.password),
176
'AUTO_REP_kettle_task_id' : str(id),
177
'AUTO_REP_kettle_task_attachment_id' : str(attachment_id),
178
'AUTO_REP_erp_url' : "http://localhost:" + str(config['xmlrpc_port']) + "/xmlrpc"
180
task = self.read(cr, uid, id, ['upload_file', 'parameters', 'transformation_id', 'output_file', 'name', 'last_date'], context)
181
server_id = self.pool.get('kettle.transformation').read(cr, uid, task['transformation_id'][0], ['server_id'])['server_id'][0]
182
context['kettle_dir'] = self.pool.get('kettle.server').read(cr, uid, server_id, ['kettle_dir'], context)['kettle_dir']
184
if task['last_date']:
185
context['filter']['AUTO_REP_last_date'] = task['last_date']
187
if task['output_file']:
188
context['filter'].update({'AUTO_REP_file_out' : str('openerp_tmp/output_'+ task['name'] + context['start_date'])})
190
if task['upload_file']:
191
if not (context and context.get('input_filename',False)):
192
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "the task " + task['name'] + " can't be executed because the anyone File was uploaded")
225
# Connection values handed to Kettle as named parameters.
# SECURITY NOTE(review): the user's password is placed in `parameters`, and
# run_kettle_task appends every parameter to the shell command line as
# '-param:key=value', so it can show up in process listings.
parameters['oerp_db'] = cr.dbname
226
parameters['oerp_user'] = user.login
227
parameters['oerp_pwd'] = user.password
228
parameters['oerp_host'] = 'localhost'
229
parameters['oerp_port'] = str(config['port'])
231
parameters['kettle_task_id'] = str(task.id)
232
parameters['task_attachment_id'] = str(attachment_id)
234
context['kettle_dir'] = task.server_id.kettle_dir
237
# The guards for the next two assignments (presumably tests on task.last_date
# and task.output_file) are not visible here.
parameters['last_date'] = task.last_date
240
parameters['file_out'] = os.path.join(task.server_id.kettle_dir, 'openerp_tmp/output_' + task.name + context['start_date'])
243
if not (context and context.get('input_filename', False)):
244
logger.notifyChannel('kettle-connector', netsvc.LOG_INFO, "The task %s can't be executed because the anyone File was uploaded" % task.name)
195
context['filter'].update({'AUTO_REP_file_in' : str(context['input_filename']), 'AUTO_REP_file_in_name' : str(context['input_filename'])})
197
context = self.execute_python_code(cr, uid, id, 'before', context)
199
# SECURITY NOTE(review): the free-text 'parameters' field is wrapped in braces
# and passed to eval(); whoever can edit that field can run arbitrary code.
context['filter'].update(eval('{' + str(task['parameters'] or '')+ '}'))
200
self.pool.get('kettle.transformation').execute_transformation(cr, uid, task['transformation_id'][0], log_file_name, attachment_id, context)
202
context = self.execute_python_code(cr, uid, id, 'after', context)
204
if context.get('input_filename',False):
205
self.attach_file_to_task(cr, uid, id, context['input_filename'], '[FILE IN] FILE IMPORTED ' + context['start_date'], True, context)
247
parameters['file_in'] = context['input_filename']
249
context = self.execute_python_code(cr, uid, task.id, 'before', context)
251
# Add the params defined on the task. Note that it may override default params
253
for parameter in task.parameter_ids:
254
parameters[parameter.name] = parameter.value
256
self.run_kettle_task(cr, uid, task, parameters, log_file_name, attachment_id, context)
258
context = self.execute_python_code(cr, uid, task.id, 'after', context)
260
if context.get('input_filename', False):
261
self.attach_file_to_task(cr, uid, task.id, context['input_filename'], '[FILE IN] FILE IMPORTED ' + context['start_date'], True, context=context)
207
263
if task['output_file']:
208
self.attach_output_file_to_task(cr, uid, id, context['filter']['AUTO_REP_file_out'], '[FILE OUT] FILE IMPORTED ' + context['start_date'], True, context)
210
self.write(cr, uid, [id], {'last_date' : context['start_date']})
264
self.attach_output_file_to_task(cr, uid, task.id, parameters['file_out'], '[FILE OUT] FILE IMPORTED ' + context['start_date'], True, context=context)
266
# Records the start date of this run as the task's last execution date.
self.write(cr, uid, [task.id], {'last_date' : context['start_date']})
272
class kettle_parameter(osv.osv):
# Model 'kettle.parameter': one name/value pair attached to a kettle.task
# (start_kettle_task copies these pairs into the parameters passed to Kettle).
# NOTE(review): bare numbers are line-number residue from a rendered diff; the
# opening and closing of the dict that holds the field entries are not visible.
273
_name = "kettle.parameter"
274
_description = "Kettle parameters"
277
'task_id': fields.many2one('kettle.task', 'Task'),
278
# NOTE(review): `required` is given the string "True" rather than the boolean
# True used by every other field in this file; it is truthy, so it presumably
# behaves the same, but the boolean is what the other definitions use.
'name': fields.char('Name', size=128, required="True", help="Name of the parameter"),
279
'value': fields.char('Value', size=256, help="Value of the parameter."),
280
# NOTE(review): the help text says the parameter is only visible for this
# user; nothing in the visible lines enforces that -- presumably a record rule
# elsewhere; verify.
'user_id': fields.many2one('res.users', 'User', help="Only visible for this user. This can be usefull for password fields."),
214
286
class kettle_wizard(osv.osv_memory):
215
287
_name = 'kettle.wizard'
216
_description = 'kettle wizard'
288
_description = 'kettle wizard'
219
'upload_file': fields.boolean("Upload File?"),
291
'upload_file': fields.boolean("Upload file?"),
220
292
'file': fields.binary('File'),
221
293
'filename': fields.char('Filename', size=64),