1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2012 TeMPO Consulting, MSF. All Rights Reserved
# Developer: Olivier DOSSMANN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from osv import osv
from osv import fields
import os.path
from base64 import decodestring
from tempfile import NamedTemporaryFile
from zipfile import ZipFile as zf
import csv
from tools.misc import ustr
from tools.translate import _
from tools import config
import time
import sys
from account_override import ACCOUNT_RESTRICTED_AREA
class hr_payroll_import_period(osv.osv):
    """
    Couple made of an accounting period and a "field" code.

    The payroll import wizard defined in this file searches this model and
    refuses to import payroll entries when a record already exists for the
    same period and the same field.
    """
    _name = 'hr.payroll.import.period'
    _description = 'Payroll Import Periods'

    _columns = {
        # Code the period was validated for (compared with the 'field' value of the import)
        'field': fields.char(
            string='Field',
            size=255,
            required=True,
            readonly=True,
        ),
        # Accounting period concerned by the validation
        'period_id': fields.many2one(
            'account.period',
            string="Period",
            required=True,
            readonly=True,
        ),
    }

    # Only one record is allowed per (period, field) couple
    _sql_constraints = [
        (
            'period_uniq',
            'unique (period_id, field)',
            'This period have already been validated!',
        ),
    ]


# Old-style OpenERP registration: the class is instantiated at import time
hr_payroll_import_period()
class hr_payroll_import(osv.osv_memory):
    """
    Wizard that imports payroll entries from a password-protected ZIP file.

    The ZIP file is expected to contain an 'envoi.ini' file (from which the
    "field" code is read) and a CSV file holding one payroll line per row.
    Each row becomes a draft 'hr.payroll.msf' record.
    """
    _name = 'hr.payroll.import'
    _description = 'Payroll Import'

    _columns = {
        'file': fields.binary(string="File", filters="*.zip", required=True),
        'filename': fields.char(string="Imported filename", size=256),
        'date_format': fields.selection([('%d/%m/%Y', 'dd/mm/yyyy'), ('%m-%d-%Y', 'mm-dd-yyyy'), ('%d-%m-%y', 'dd-mm-yy'), ('%d-%m-%Y', 'dd-mm-yyyy'), ('%d/%m/%y', 'dd/mm/yy')], "Date format", required=True, help="This is the date format used in the Homère file in order to recognize them."),
    }

    def update_payroll_entries(self, cr, uid, data='', field='', date_format='%d/%m/%Y', context=None):
        """
        Create one draft payroll entry (hr.payroll.msf) from one CSV line.

        :param data: sequence of 13 columns, in this order: accounting code,
            description, second description, third, expense, receipt, project,
            financing line, financing contract, date, currency, project,
            analytic line. Only accounting code, description, second
            description, third, expense, receipt, date and currency are used.
        :param field: code the import is done for (checked against
            hr.payroll.import.period and stored on the created entry)
        :param date_format: strptime() format of the date column
        :param context: unused here; the entry is always created with
            context {'from': 'import'}
        :return: tuple (done, amount, created) where "amount" is
            expense - receipt rounded to 2 digits and "created" the number of
            created entries. (False, 0.0, 0) is returned when "data" is empty.
        :raise osv.except_osv: on any invalid or unknown value in the line
        """
        # Some verifications
        if not context:
            context = {}
        # verify that some data exists
        if not data:
            return False, 0.0, 0
        if not field:
            raise osv.except_osv(_('Error'), _('No field given for payroll import!'))
        # Prepare some values
        employee_id = False
        name = ''
        ref = ''
        destination_id = False
        period_obj = self.pool.get('account.period')
        account_obj = self.pool.get('account.account')
        employee_obj = self.pool.get('hr.employee')
        # zip() wraps each column into a 1-tuple: values are read with [0] below.
        # A line that does not have exactly 13 columns is reported to the user
        # instead of ending in a raw ValueError.
        try:
            (accounting_code, description, second_description, third, expense, receipt,
             _project, _financing_line, _financing_contract, date, currency,
             _project_bis, _analytic_line) = zip(data)
        except ValueError:
            raise osv.except_osv(_('Error'), _('Wrong number of columns in line (13 expected): %s') % (data,))
        # Check period
        if not date or not date[0]:
            raise osv.except_osv(_('Warning'), _('A date is missing!'))
        try:
            parsed_date = time.strptime(date[0], date_format)
        except ValueError:
            raise osv.except_osv(_('Error'), _('Wrong format for date: %s') % date[0])
        line_date = time.strftime('%Y-%m-%d', parsed_date)
        period_ids = period_obj.get_period_from_date(cr, uid, line_date)
        if not period_ids:
            raise osv.except_osv(_('Warning'), _('No open period found for given date: %s') % (line_date,))
        if len(period_ids) > 1:
            raise osv.except_osv(_('Warning'), _('More than one period found for given date: %s') % (line_date,))
        period_id = period_ids[0]
        period = period_obj.browse(cr, uid, period_id)
        # Check that period have not been inserted in database yet
        period_validated_ids = self.pool.get('hr.payroll.import.period').search(cr, uid, [('period_id', '=', period_id), ('field', '=', field)])
        if period_validated_ids:
            raise osv.except_osv(_('Error'), _('Payroll entries have already been validated for: %s in this period: "%s"!') % (field, period.name,))
        # Check that account exists in OpenERP
        if not accounting_code or not accounting_code[0]:
            raise osv.except_osv(_('Warning'), _('One accounting code is missing!'))
        account_code = ustr(accounting_code[0])
        account_ids = account_obj.search(cr, uid, [('code', '=', account_code)])
        if not account_ids:
            raise osv.except_osv(_('Warning'), _('The accounting code \'%s\' doesn\'t exist!') % (account_code,))
        if len(account_ids) > 1:
            raise osv.except_osv(_('Warning'), _('There is more than one account that have \'%s\' code!') % (account_code,))
        # Fetch DEBIT/CREDIT
        debit = 0.0
        credit = 0.0
        if expense and expense[0]:
            debit = float(expense[0])
        if receipt and receipt[0]:
            credit = float(receipt[0])
        amount = round(debit - credit, 2)
        # Verify account type
        # if view type, raise an error
        account = account_obj.browse(cr, uid, account_ids[0])
        if account.type == 'view':
            raise osv.except_osv(_('Warning'), _('This account is a view type account: %s') % (account_code,))
        # Check if it's a payroll rounding line
        is_payroll_rounding = bool(third and third[0] and ustr(third[0]) == 'SAGA_BALANCE')
        # Check if it's a counterpart line (In HOMERE import, it seems to be lines that have a filled in column "third")
        is_counterpart = bool(third and third[0])
        # For non counterpart lines, check expected accounts
        if not is_counterpart:
            if not account_obj.search(cr, uid, ACCOUNT_RESTRICTED_AREA['payroll_lines'] + [('id', '=', account.id)]):
                raise osv.except_osv(_('Warning'), _('This account is not authorized: %s') % (account.code,))
        # If account is analytic-a-holic, fetch employee ID
        if account.is_analytic_addicted:
            # Add default destination from account
            if not account.default_destination_id:
                raise osv.except_osv(_('Warning'), _('No default Destination defined for this account: %s') % (account.code or '',))
            destination_id = account.default_destination_id.id or False
            if second_description and second_description[0] and not is_payroll_rounding:
                if not is_counterpart:
                    # fetch employee ID: last word of the second description
                    employee_identification_id = ustr(second_description[0]).split(' ')[-1]
                    employee_ids = employee_obj.search(cr, uid, [('identification_id', '=', employee_identification_id)])
                    if not employee_ids:
                        employee_name = ustr(second_description[0]).replace(employee_identification_id, '')
                        raise osv.except_osv(_('Error'), _('No employee found for this code: %s (%s).\nDEBIT: %s.\nCREDIT: %s.') % (employee_identification_id, employee_name, debit, credit,))
                    if len(employee_ids) > 1:
                        raise osv.except_osv(_('Error'), _('More than one employee have the same identification ID: %s') % (employee_identification_id,))
                    employee_id = employee_ids[0]
                # NOTE(review): the file reached review without indentation; the
                # description/reference computation below is assumed to apply to
                # counterpart lines too (same level as the employee lookup) - confirm.
                # Create description
                name = 'Salary ' + time.strftime('%b %Y', parsed_date)
                # Create reference: what follows "<month><separator><year>" in the description
                date_format_separator = '/'
                if '-' in date_format:
                    date_format_separator = '-'
                separator = time.strftime('%m' + date_format_separator + '%Y', parsed_date)
                if description and description[0]:
                    description_parts = ustr(description[0]).split(separator)
                    if len(description_parts) > 1:
                        ref = description_parts[1]
        # Fetch description
        if not name:
            name = description and description[0] and ustr(description[0]) or ''
        if is_payroll_rounding:
            name = 'Payroll rounding'
        if not employee_id:
            if second_description and second_description[0]:
                ref = ustr(second_description[0])
        # Check if currency exists
        if not currency or not currency[0]:
            raise osv.except_osv(_('Warning'), _('One currency is missing!'))
        currency_name = ustr(currency[0])
        currency_ids = self.pool.get('res.currency').search(cr, uid, [('name', '=', currency_name), ('active', '=', True)])
        if not currency_ids:
            raise osv.except_osv(_('Error'), _('No \'%s\' currency or non-active currency.') % (currency_name,))
        if len(currency_ids) > 1:
            raise osv.except_osv(_('Error'), _('More than one currency \'%s\' found.') % (currency_name,))
        currency_id = currency_ids[0]
        # Create the payroll entry
        vals = {
            'date': line_date,
            'document_date': line_date,
            'period_id': period_id,
            'employee_id': employee_id,
            'name': name,
            'ref': ref,
            'account_id': account.id,
            'amount': amount,
            'currency_id': currency_id,
            'state': 'draft',
            'field': field,
            'destination_id': destination_id,
        }
        # Retrieve analytic distribution from employee
        if employee_id:
            analytic_fields = ['cost_center_id', 'funding_pool_id', 'free1_id', 'free2_id']
            employee_data = employee_obj.read(cr, uid, employee_id, analytic_fields)
            for analytic_field in analytic_fields:
                # many2one values are read as (id, name) couples: keep the id only
                value = employee_data and employee_data.get(analytic_field, False) or False
                vals[analytic_field] = value and value[0] or False
        # Write payroll entry
        created = 0
        res = self.pool.get('hr.payroll.msf').create(cr, uid, vals, context={'from': 'import'})
        if res:
            created += 1
        return True, amount, created

    def _get_homere_password(self, cr, uid):
        """
        Read the ZIP password from the first line of the 'homere.conf' file.

        The file is searched in config['root_path'] on Windows and in
        '~/tmp' on other platforms. Its first line is base64 decoded.

        :return: decoded password
        :raise osv.except_osv: if the file does not exist or is empty
        """
        if sys.platform.startswith('win'):
            homere_file = os.path.join(config['root_path'], 'homere.conf')
        else:
            homere_file = os.path.join(os.path.expanduser('~'), 'tmp/homere.conf')  # relative path from user directory to homere password file
        # Search homere password file
        if not os.path.exists(homere_file):
            raise osv.except_osv(_("Error"), _("File '%s' doesn't exist!") % (homere_file,))
        # Read homere file (always closed, even when reading fails)
        homere_file_data = open(homere_file, 'rb')
        try:
            pwd = homere_file_data.readline()
        finally:
            homere_file_data.close()
        if not pwd:
            raise osv.except_osv(_("Error"), _("File '%s' is empty !") % (homere_file,))
        return pwd.decode('base64')

    def _read_homere_field(self, zipobj, password):
        """
        Return the PAYS option of the [DEFAUT] section of 'envoi.ini'.

        :param zipobj: opened ZipFile containing an 'envoi.ini' member
        :param password: password used to read the ZIP members
        :raise osv.except_osv: if the file cannot be read or the option is empty
        """
        try:
            import ConfigParser
            parser = ConfigParser.SafeConfigParser()
            parser.readfp(zipobj.open('envoi.ini', 'r', password))
            field = parser.get('DEFAUT', 'PAYS')
        except Exception:
            raise osv.except_osv(_('Error'), _('Could not read envoi.ini file in given ZIP file.'))
        if not field:
            raise osv.except_osv(_('Warning'), _('Field not found in envoi.ini file.'))
        return field

    def button_validate(self, cr, uid, ids, context=None):
        """
        Open ZIP file, take the CSV file into and parse it to import payroll entries

        For each wizard: the uploaded file is written to a temporary file,
        opened as a ZIP, the "field" code is read from 'envoi.ini' and each
        CSV row (header skipped) is given to update_payroll_entries(). If the
        sum of imported amounts is not zero, the difference is written on the
        draft 'Payroll rounding' entry.

        :return: action opening a hr.payroll.import.confirmation record
        :raise osv.except_osv: if draft payroll entries already exist, or on
            any problem with the given file
        """
        # Do verifications
        if not context:
            context = {}
        payroll_obj = self.pool.get('hr.payroll.msf')
        # Verify that no draft payroll entries exists
        line_ids = payroll_obj.search(cr, uid, [('state', '=', 'draft')])
        if line_ids:
            raise osv.except_osv(_('Error'), _('You cannot import payroll entries. Please validate first draft payroll entries!'))
        # Prepare some values
        file_ext_separator = '.'
        file_ext = "csv"
        message = _("Payroll import failed.")
        res = False
        created = 0
        processed = 0
        xyargv = self._get_homere_password(cr, uid)
        filename = ""
        # Browse all given wizard
        for wiz in self.browse(cr, uid, ids):
            if not wiz.file:
                raise osv.except_osv(_('Error'), _('Nothing to import.'))
            # Decode file string into a temporary file (ZipFile needs a real file)
            fileobj = NamedTemporaryFile('w+b', delete=False)
            tmp_path = fileobj.name
            zipobj = None
            try:
                try:
                    fileobj.write(decodestring(wiz.file))
                finally:
                    fileobj.close()
                try:
                    zipobj = zf(tmp_path, 'r')
                except Exception:
                    raise osv.except_osv(_('Error'), _('Given file is not a zip file!'))
                filename = wiz.filename or ""
                namelist = zipobj.namelist()
                if namelist:
                    # Search CSV (the last member with a "csv" extension wins)
                    csvfile = None
                    for name in namelist:
                        if name.split(file_ext_separator)[-1] == file_ext:
                            csvfile = name
                    if 'envoi.ini' not in namelist:
                        raise osv.except_osv(_('Warning'), _('No envoi.ini file found in given ZIP file!'))
                    # Read information from 'envoi.ini' file
                    field = self._read_homere_field(zipobj, xyargv)
                    # Read CSV file
                    if not csvfile:
                        raise osv.except_osv(_('Error'), _('Right CSV is not present in this zip file. Please use "File > File sending > Monthly" in Homère.'))
                    try:
                        reader = csv.reader(zipobj.open(csvfile, 'r', xyargv), delimiter=';', quotechar='"', doublequote=False, escapechar='\\')
                        # Skip header line
                        next(reader)
                    except Exception:
                        raise osv.except_osv(_('Error'), _('Problem to read given file.'))
                    res = True
                    res_amount = 0.0
                    for line in reader:
                        processed += 1
                        update, amount, nb_created = self.update_payroll_entries(cr, uid, line, field, wiz.date_format)
                        res_amount += round(amount, 2)
                        if not update:
                            res = False
                        created += nb_created
                    # Check balance
                    if round(res_amount, 2) != 0.0:
                        # adapt difference by writing on payroll rounding line
                        pr_ids = payroll_obj.search(cr, uid, [('state', '=', 'draft'), ('name', '=', 'Payroll rounding')])
                        if not pr_ids:
                            raise osv.except_osv(_('Error'), _('An error occured on balance and no payroll rounding line found.'))
                        # Fetch Payroll rounding amount
                        pr = payroll_obj.browse(cr, uid, pr_ids[0])
                        # To compute new amount, you should:
                        # - take payroll rounding amount
                        # - take the opposite of res_amount (wich is the current difference)
                        # - add both
                        new_amount = round(pr.amount, 2) + (-1 * round(res_amount, 2))
                        payroll_obj.write(cr, uid, pr_ids[0], {'amount': round(new_amount, 2),})
            finally:
                # Close the archive first (needed on Windows before removing the
                # file), then remove the temporary copy of the payroll archive.
                if zipobj is not None:
                    zipobj.close()
                try:
                    os.remove(tmp_path)
                except OSError:
                    # Best effort: a cleanup problem must not hide the import result
                    pass
        if res:
            message = _("Payroll import successful")
        context.update({'message': message})

        view_id = self.pool.get('ir.model.data').get_object_reference(cr, uid, 'msf_homere_interface', 'payroll_import_confirmation')
        view_id = view_id and view_id[1] or False

        # This is to redirect to Payroll Tree View
        context.update({'from': 'payroll_import'})

        res_id = self.pool.get('hr.payroll.import.confirmation').create(cr, uid, {'filename': filename, 'created': created, 'total': processed, 'state': 'payroll'}, context=context)

        return {
            'name': 'Payroll Import Confirmation',
            'type': 'ir.actions.act_window',
            'res_model': 'hr.payroll.import.confirmation',
            'view_mode': 'form',
            'view_type': 'form',
            'view_id': [view_id],
            'res_id': res_id,
            'target': 'new',
            'context': context,
        }


# Old-style OpenERP registration: the class is instantiated at import time
hr_payroll_import()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|