1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2011 MSF, TeMPO consulting
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from osv import osv
from osv import fields
from tools.translate import _
from time import strftime
class analytic_line(osv.osv):
_name = "account.analytic.line"
_inherit = "account.analytic.line"
def _get_fake_is_fp_compat_with(self, cr, uid, ids, field_name, args, context=None):
"""
Fake method for 'is_fp_compat_with' field
"""
res = {}
for i in ids:
res[i] = ''
return res
def _search_is_fp_compat_with(self, cr, uid, obj, name, args, context=None):
"""
Return domain that permit to give all analytic line compatible with a given FP.
"""
if not args:
return []
res = []
# We just support '=' operator
for arg in args:
if not arg[1]:
raise osv.except_osv(_('Warning'), _('Some search args are missing!'))
if arg[1] not in ['=',]:
raise osv.except_osv(_('Warning'), _('This filter is not implemented yet!'))
if not arg[2]:
raise osv.except_osv(_('Warning'), _('Some search args are missing!'))
analytic_account = self.pool.get('account.analytic.account').browse(cr, uid, arg[2])
tuple_list = [x.account_id and x.destination_id and (x.account_id.id, x.destination_id.id) for x in analytic_account.tuple_destination_account_ids]
cost_center_ids = [x and x.id for x in analytic_account.cost_center_ids]
for cc in cost_center_ids:
for t in tuple_list:
if res:
res = ['|'] + res
res.append('&')
res.append('&')
res.append(('cost_center_id', '=', cc))
res.append(('general_account_id', '=', t[0]))
res.append(('destination_id', '=', t[1]))
return res
def _journal_type_get(self, cr, uid, context=None):
"""
Get journal types
"""
return self.pool.get('account.analytic.journal').get_journal_type(cr, uid, context)
def _get_entry_sequence(self, cr, uid, ids, field_names, args, context=None):
"""
Give right entry sequence. Either move_id.move_id.name,
or commitment_line_id.commit_id.name, or
if the line was imported, the stored name
"""
if not context:
context = {}
res = {}
for l in self.browse(cr, uid, ids, context):
res[l.id] = ''
if l.move_id:
res[l.id] = l.move_id.move_id.name
elif l.commitment_line_id:
res[l.id] = l.commitment_line_id.commit_id.name
elif l.imported_commitment:
res[l.id] = l.imported_entry_sequence
elif not l.move_id:
# UF-2217
# on create the value is inserted by a sql query, so we can retreive it after the insertion
# the field has store=True so we don't create a loop
# on write the value is not updated by the query, the method always returns the value set at creation
res[l.id] = l.entry_sequence
return res
def _get_period_id(self, cr, uid, ids, field_name, args, context=None):
"""
Fetch period_id from:
- move_id
- commitment_line_id
"""
# Checks
if not context:
context = {}
# Prepare some values
res = {}
period_obj = self.pool.get('account.period')
for al in self.browse(cr, uid, ids, context):
res[al.id] = False
# UTP-943: Since this ticket, we search period regarding analytic line posting date.
period_ids = period_obj.get_period_from_date(cr, uid, date=al.date)
if period_ids:
res[al.id] = period_ids[0]
return res
def _search_period_id(self, cr, uid, obj, name, args, context=None):
"""
Search period regarding date.
First fetch period date_start and date_stop.
Then check that analytic line have a posting date bewteen these two date.
Finally do this check as "OR" for each given period.
Examples:
- Just january:
['&', ('date', '>=', '2013-01-01'), ('date', '<=', '2013-01-31')]
- January + February:
['|', '&', ('date', '>=', '2013-01-01'), ('date', '<=', '2013-01-31'), '&', ('date', '>=', '2013-02-01'), ('date', '<=', '2013-02-28')]
- January + February + March
['|', '|', '&', ('date', '>=', '2013-01-01'), ('date', '<=', '2013-01-31'), '&', ('date', '>=', '2013-02-01'), ('date', '<=', '2013-02-28'), '&', ('date', '>=', '2013-03-01'), ('date', '<=', '2013-03-31')]
"""
# Checks
if not context:
context = {}
if not args:
return []
new_args = []
period_obj = self.pool.get('account.period')
for arg in args:
if len(arg) == 3 and arg[1] in ['=', 'in']:
periods = arg[2]
if isinstance(periods, (int, long)):
periods = [periods]
if len(periods) > 1:
for _ in range(len(periods) - 1):
new_args.append('|')
for p_id in periods:
period = period_obj.browse(cr, uid, [p_id])[0]
new_args.append('&')
new_args.append(('date', '>=', period.date_start))
new_args.append(('date', '<=', period.date_stop))
return new_args
def _get_from_commitment_line(self, cr, uid, ids, field_name, args, context=None):
"""
Check if line comes from a 'engagement' journal type. If yes, True. Otherwise False.
"""
if context is None:
context = {}
res = {}
for al in self.browse(cr, uid, ids, context=context):
res[al.id] = False
if al.journal_id.type == 'engagement':
res[al.id] = True
return res
def _get_is_unposted(self, cr, uid, ids, field_name, args, context=None):
"""
Check journal entry state. If unposted: True, otherwise False.
A line that comes from a commitment cannot be posted. So it's always to False.
"""
if context is None:
context = {}
res = {}
for al in self.browse(cr, uid, ids, context=context):
res[al.id] = False
if al.move_state != 'posted' and al.journal_id.type != 'engagement':
res[al.id] = True
return res
_columns = {
'commitment_line_id': fields.many2one('account.commitment.line', string='Commitment Voucher Line', ondelete='cascade'),
'is_fp_compat_with': fields.function(_get_fake_is_fp_compat_with, fnct_search=_search_is_fp_compat_with, method=True, type="char", size=254, string="Is compatible with some FP?"),
'move_state': fields.related('move_id', 'move_id', 'state', type='selection', size=64, relation="account.move.line", selection=[('draft', 'Unposted'), ('posted', 'Posted')], string='Journal Entry state', readonly=True, help="Indicates that this line come from an Unposted Journal Entry."),
'journal_type': fields.related('journal_id', 'type', type='selection', selection=_journal_type_get, string="Journal Type", readonly=True, \
help="Indicates the Journal Type of the Analytic journal item"),
'entry_sequence': fields.function(_get_entry_sequence, method=True, type='text', string="Entry Sequence", readonly=True, store=True),
'period_id': fields.function(_get_period_id, fnct_search=_search_period_id, method=True, string="Period", readonly=True, type="many2one", relation="account.period", store=False),
'from_commitment_line': fields.function(_get_from_commitment_line, method=True, type='boolean', string="Commitment?"),
'is_unposted': fields.function(_get_is_unposted, method=True, type='boolean', string="Unposted?"),
'imported_commitment': fields.boolean(string="From imported commitment?"),
'imported_entry_sequence': fields.text("Imported Entry Sequence"),
}
_defaults = {
'imported_commitment': lambda *a: False,
}
def create(self, cr, uid, vals, context=None):
"""
Check date for given date and given account_id
"""
# Some verifications
if not context:
context = {}
# Default behaviour
res = super(analytic_line, self).create(cr, uid, vals, context=context)
# Check soft/hard closed contract
sql = """SELECT fcc.id
FROM financing_contract_funding_pool_line fcfpl, account_analytic_account a, financing_contract_format fcf, financing_contract_contract fcc
WHERE fcfpl.funding_pool_id = a.id
AND fcfpl.contract_id = fcf.id
AND fcc.format_id = fcf.id
AND a.id = %s
AND fcc.state in ('soft_closed', 'hard_closed');"""
cr.execute(sql, tuple([vals.get('account_id')]))
sql_res = cr.fetchall()
if sql_res:
account = self.pool.get('account.analytic.account').browse(cr, uid, vals.get('account_id'))
contract = self.pool.get('financing.contract.contract').browse(cr, uid, sql_res[0][0])
raise osv.except_osv(_('Warning'), _('Selected Funding Pool analytic account (%s) is blocked by a soft/hard closed contract: %s') % (account and account.code or '', contract and contract.name or ''))
return res
def update_account(self, cr, uid, ids, account_id, date=False, context=None):
"""
Update account on given analytic lines with account_id on given date
"""
# Some verifications
if not context:
context = {}
if isinstance(ids, (int, long)):
ids = [ids]
if not account_id:
return False
if not date:
date = strftime('%Y-%m-%d')
# Prepare some value
account = self.pool.get('account.analytic.account').browse(cr, uid, [account_id], context)[0]
context.update({'from': 'mass_reallocation'}) # this permits reallocation to be accepted when rewrite analaytic lines
correction_journal_ids = self.pool.get('account.analytic.journal').search(cr, uid, [('type', '=', 'correction'), ('is_current_instance', '=', True)])
correction_journal_id = correction_journal_ids and correction_journal_ids[0] or False
if not correction_journal_id:
raise osv.except_osv(_('Error'), _('No analytic journal found for corrections!'))
# Process lines
for aline in self.browse(cr, uid, ids, context=context):
if account.category in ['OC', 'DEST']:
# Period verification
period = aline.move_id and aline.move_id.period_id or False
# Prepare some values
fieldname = 'cost_center_id'
if account.category == 'DEST':
fieldname = 'destination_id'
# if period is not closed, so override line.
if period and period.state not in ['done', 'mission-closed']:
# Update account # Date: UTP-943 speak about original date for non closed periods
self.write(cr, uid, [aline.id], {fieldname: account_id, 'date': aline.date,
'source_date': aline.source_date or aline.date}, context=context)
# else reverse line before recreating them with right values
else:
# First reverse line
rev_ids = self.pool.get('account.analytic.line').reverse(cr, uid, [aline.id], posting_date=date)
# UTP-943: Shoud have a correction journal on these lines
self.pool.get('account.analytic.line').write(cr, uid, rev_ids, {'journal_id': correction_journal_id, 'is_reversal': True, 'reversal_origin': aline.id, 'last_corrected_id': False})
# UTP-943: Check that period is open
correction_period_ids = self.pool.get('account.period').get_period_from_date(cr, uid, date, context=context)
if not correction_period_ids:
raise osv.except_osv(_('Error'), _('No period found for this date: %s') % (date,))
for p in self.pool.get('account.period').browse(cr, uid, correction_period_ids, context=context):
if p.state != 'draft':
raise osv.except_osv(_('Error'), _('Period (%s) is not open.') % (p.name,))
# then create new lines
cor_ids = self.pool.get('account.analytic.line').copy(cr, uid, aline.id, {fieldname: account_id, 'date': date,
'source_date': aline.source_date or aline.date, 'journal_id': correction_journal_id}, context=context)
self.pool.get('account.analytic.line').write(cr, uid, cor_ids, {'last_corrected_id': aline.id})
# finally flag analytic line as reallocated
self.pool.get('account.analytic.line').write(cr, uid, [aline.id], {'is_reallocated': True})
else:
# Update account
self.write(cr, uid, [aline.id], {'account_id': account_id}, context=context)
# Set line as corrected upstream if we are in COORDO/HQ instance
self.pool.get('account.move.line').corrected_upstream_marker(cr, uid, [aline.move_id.id], context=context)
return True
def check_analytic_account(self, cr, uid, ids, account_id, context=None):
"""
Analytic distribution validity verification with given account for given ids.
Return all valid ids.
"""
# Some verifications
if not context:
context = {}
if isinstance(ids, (int, long)):
ids = [ids]
# Prepare some value
account = self.pool.get('account.analytic.account').read(cr, uid, account_id, ['category', 'date_start', 'date'], context=context)
account_type = account and account.get('category', False) or False
res = []
if not account_type:
return res
try:
msf_private_fund = self.pool.get('ir.model.data').get_object_reference(cr, uid, 'analytic_distribution',
'analytic_account_msf_private_funds')[1]
except ValueError:
msf_private_fund = 0
expired_date_ids = []
date_start = account and account.get('date_start', False) or False
date_stop = account and account.get('date', False) or False
# Date verification for all lines and fetch all necessary elements sorted by analytic distribution
for aline in self.browse(cr, uid, ids):
# UTP-800: Change date comparison regarding FP. If FP, use document date. Otherwise use date.
aline_cmp_date = aline.date
if account_type == 'FUNDING':
aline_cmp_date = aline.document_date
# Add line to expired_date if date is not in date_start - date_stop
if (date_start and aline_cmp_date < date_start) or (date_stop and aline_cmp_date > date_stop):
expired_date_ids.append(aline.id)
# Process regarding account_type
if account_type == 'OC':
for aline in self.browse(cr, uid, ids):
# Verify that:
# - the line doesn't have any draft/open contract
check_accounts = self.pool.get('account.analytic.account').is_blocked_by_a_contract(cr, uid, [aline.account_id.id])
if check_accounts and aline.account_id.id in check_accounts:
continue
if aline.account_id and aline.account_id.id == msf_private_fund:
res.append(aline.id)
elif aline.account_id and aline.cost_center_id and aline.account_id.cost_center_ids:
if account_id in [x and x.id for x in aline.account_id.cost_center_ids] or aline.account_id.id == msf_private_fund:
res.append(aline.id)
elif account_type == 'FUNDING':
fp = self.pool.get('account.analytic.account').read(cr, uid, account_id, ['cost_center_ids', 'tuple_destination_account_ids'], context=context)
cc_ids = fp and fp.get('cost_center_ids', []) or []
tuple_destination_account_ids = fp and fp.get('tuple_destination_account_ids', []) or []
tuple_list = [x.account_id and x.destination_id and (x.account_id.id, x.destination_id.id) for x in self.pool.get('account.destination.link').browse(cr, uid, tuple_destination_account_ids)]
# Browse all analytic line to verify them
for aline in self.browse(cr, uid, ids):
# Verify that:
# - the line doesn't have any draft/open contract
check_accounts = self.pool.get('account.analytic.account').is_blocked_by_a_contract(cr, uid, [aline.account_id.id])
if check_accounts and aline.account_id.id in check_accounts:
continue
# No verification if account is MSF Private Fund because of its compatibility with all elements.
if account_id == msf_private_fund:
res.append(aline.id)
continue
# Verify that:
# - the line have a cost_center_id field (we expect it's a line with a funding pool account)
# - the cost_center is in compatible cost center from the new funding pool
# - the general account is in compatible account/destination tuple
# - the destination is in compatible account/destination tuple
if aline.cost_center_id and aline.cost_center_id.id in cc_ids and aline.general_account_id and aline.destination_id and (aline.general_account_id.id, aline.destination_id.id) in tuple_list:
res.append(aline.id)
else:
# Case of FREE1 and FREE2 lines
for i in ids:
res.append(i)
# Delete elements that are in expired_date_ids
for e in expired_date_ids:
if e in res:
res.remove(e)
return res
analytic_line()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|