~camptocamp/account-financial-tools/add-credit-control-legal-claim-nbi

« back to all changes in this revision

Viewing changes to account_credit_control/policy.py

  • Committer: Guewen Baconnier @ Camptocamp
  • Date: 2012-10-22 11:06:14 UTC
  • Revision ID: guewen.baconnier@camptocamp.com-20121022110614-6dm0mu819ume9gl3
[IMP] renamed module to account_credit_control, wordings, removed scenarios (moved to lp:oerpscenario
(lp:c2c-addons/6.1  rev 89.1.1)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
##############################################################################
 
3
#
 
4
#    Author: Nicolas Bessi
 
5
#    Copyright 2012 Camptocamp SA
 
6
#
 
7
#    This program is free software: you can redistribute it and/or modify
 
8
#    it under the terms of the GNU Affero General Public License as
 
9
#    published by the Free Software Foundation, either version 3 of the
 
10
#    License, or (at your option) any later version.
 
11
#
 
12
#    This program is distributed in the hope that it will be useful,
 
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
#    GNU Affero General Public License for more details.
 
16
#
 
17
#    You should have received a copy of the GNU Affero General Public License
 
18
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
19
#
 
20
##############################################################################
 
21
from openerp.osv.orm import Model, fields
 
22
from openerp.tools.translate import _
 
23
 
 
24
class CreditControlPolicy(Model):
 
25
    """Define a policy of reminder"""
 
26
 
 
27
    _name = "credit.control.policy"
 
28
    _description = """Define a reminder policy"""
 
29
    _columns = {'name': fields.char('Name', required=True, size=128),
 
30
 
 
31
                'level_ids' : fields.one2many('credit.control.policy.level',
 
32
                                                     'policy_id',
 
33
                                                     'Policy Levels'),
 
34
 
 
35
                'do_nothing' : fields.boolean('Do nothing',
 
36
                                              help=('For policies who should not '
 
37
                                                    'generate lines or are obsolete')),
 
38
 
 
39
                'company_id' : fields.many2one('res.company', 'Company')
 
40
                }
 
41
 
 
42
 
 
43
    def _get_account_related_lines(self, cursor, uid, policy_id, lookup_date, lines, context=None):
 
44
        """ We get all the lines related to accounts with given credit policy.
 
45
            We try not to use direct SQL in order to respect security rules.
 
46
            As we define the first set it is important, The date is used to do a prefilter.
 
47
            !!!We take the asumption that only receivable lines have a maturity date
 
48
            and account must be reconcillable"""
 
49
        context = context or {}
 
50
        move_l_obj = self.pool.get('account.move.line')
 
51
        account_obj = self.pool.get('account.account')
 
52
        acc_ids =  account_obj.search(cursor, uid, [('credit_policy_id', '=', policy_id)])
 
53
        if not acc_ids:
 
54
            return lines
 
55
        move_ids =  move_l_obj.search(cursor, uid, [('account_id', 'in', acc_ids),
 
56
                                                    ('date_maturity', '<=', lookup_date),
 
57
                                                    ('reconcile_id', '=', False),
 
58
                                                    ('partner_id', '!=', False)])
 
59
 
 
60
        lines += move_ids
 
61
        return lines
 
62
 
 
63
 
 
64
    def _get_sum_reduce_range(self, cursor, uid, policy_id, lookup_date, lines, model,
 
65
                              move_relation_field, context=None):
 
66
        """ We get all the lines related to the model with given credit policy.
 
67
            We also reduce from the global set (lines) the move line to be excluded.
 
68
            We try not to use direct SQL in order to respect security rules.
 
69
            As we define the first set it is important.
 
70
            The policy relation field MUST be named credit_policy_id
 
71
            and the model must have a relation
 
72
            with account move line.
 
73
            !!! We take the asumption that only receivable lines have a maturity date
 
74
            and account must be reconcillable"""
 
75
        # MARK possible place for a good optimisation
 
76
        context = context or {}
 
77
        my_obj = self.pool.get(model)
 
78
        move_l_obj = self.pool.get('account.move.line')
 
79
        add_obj_ids =  my_obj.search(cursor, uid, [('credit_policy_id', '=', policy_id)])
 
80
        if add_obj_ids:
 
81
            add_lines = move_l_obj.search(cursor, uid, [(move_relation_field, 'in', add_obj_ids),
 
82
                                                        ('date_maturity', '<=', lookup_date),
 
83
                                                        ('partner_id', '!=', False),
 
84
                                                        ('reconcile_id', '=', False)])
 
85
            lines = list(set(lines + add_lines))
 
86
        # we get all the lines that must be excluded at partner_level
 
87
        # from the global set (even the one included at account level)
 
88
        neg_obj_ids =  my_obj.search(cursor, uid, [('credit_policy_id', '!=', policy_id),
 
89
                                                   ('credit_policy_id', '!=', False)])
 
90
        if neg_obj_ids:
 
91
            # should we add ('id', 'in', lines) in domain ? it may give a veeery long SQL...
 
92
            neg_lines = move_l_obj.search(cursor, uid, [(move_relation_field, 'in', neg_obj_ids),
 
93
                                                        ('date_maturity', '<=', lookup_date),
 
94
                                                        ('partner_id', '!=', False),
 
95
                                                        ('reconcile_id', '=', False)])
 
96
            if neg_lines:
 
97
                lines = list(set(lines) - set(neg_lines))
 
98
        return lines
 
99
 
 
100
 
 
101
    def _get_partner_related_lines(self, cursor, uid, policy_id, lookup_date, lines, context=None):
 
102
        return self._get_sum_reduce_range(cursor, uid, policy_id, lookup_date, lines,
 
103
                                          'res.partner', 'partner_id', context=context)
 
104
 
 
105
 
 
106
    def _get_invoice_related_lines(self, cursor, uid, policy_id, lookup_date, lines, context=None):
 
107
        return self._get_sum_reduce_range(cursor, uid, policy_id, lookup_date, lines,
 
108
                                          'account.invoice', 'invoice', context=context)
 
109
 
 
110
 
 
111
    def _get_moves_line_to_process(self, cursor, uid, policy_id, lookup_date, context=None):
 
112
        """Retrive all the move line to be procces for current policy.
 
113
           This function is planned to be use only on one id.
 
114
           Priority of inclustion, exlusion is account, partner, invoice"""
 
115
        context = context or {}
 
116
        lines = []
 
117
        if isinstance(policy_id, list):
 
118
            policy_id = policy_id[0]
 
119
        # order of call MUST be respected priority is account, partner, invoice
 
120
        lines = self._get_account_related_lines(cursor, uid, policy_id,
 
121
                                                lookup_date, lines, context=context)
 
122
        lines = self._get_partner_related_lines(cursor, uid, policy_id,
 
123
                                                lookup_date, lines, context=context)
 
124
        lines = self._get_invoice_related_lines(cursor, uid, policy_id,
 
125
                                                lookup_date, lines, context=context)
 
126
        return lines
 
127
 
 
128
    def _check_lines_policies(self, cursor, uid, policy_id, lines, context=None):
 
129
        """ Check if there is credit line related to same move line but
 
130
            related to an other policy"""
 
131
        context = context or {}
 
132
        if not lines:
 
133
            return []
 
134
        if isinstance(policy_id, list):
 
135
            policy_id = policy_id[0]
 
136
        cursor.execute("SELECT move_line_id FROM credit_control_line"
 
137
                       " WHERE policy_id != %s and move_line_id in %s",
 
138
                       (policy_id, tuple(lines)))
 
139
        res = cursor.fetchall()
 
140
        if res:
 
141
            return [x[0] for x in res]
 
142
        else:
 
143
            return []
 
144
 
 
145
 
 
146
 
 
147
class CreditControlPolicyLevel(Model):
 
148
    """Define a policy level. A level allows to determine if
 
149
    a move line is due and the level of overdue of the line"""
 
150
 
 
151
    _name = "credit.control.policy.level"
 
152
    _order = 'level'
 
153
    _description = """A credit control policy level"""
 
154
    _columns = {'policy_id': fields.many2one('credit.control.policy',
 
155
                                              'Related Policy', required=True),
 
156
                'name': fields.char('Name', size=128, required=True),
 
157
                'level': fields.float('level', required=True),
 
158
 
 
159
                'computation_mode': fields.selection([('net_days', 'Due date'),
 
160
                                                      ('end_of_month', 'Due Date: end of Month'),
 
161
                                                      ('previous_date', 'Previous reminder')],
 
162
                                                     'Compute mode',
 
163
                                                     required=True),
 
164
 
 
165
                'delay_days': fields.integer('Delay in day', required='True'),
 
166
                'mail_template_id': fields.many2one('email.template', 'Mail template',
 
167
                                                    required=True),
 
168
                'canal': fields.selection([('manual', 'Manual'),
 
169
                                           ('mail', 'Mail')],
 
170
                                          'Canal', required=True),
 
171
                'custom_text': fields.text('Custom message', required=True, translate=True),
 
172
                }
 
173
 
 
174
 
 
175
    def _check_level_mode(self, cursor, uid, rids, context=None):
 
176
        """We check that the smallest level is not based
 
177
            on a level using previous_date mode"""
 
178
        if not isinstance(rids, list):
 
179
            rids = [rids]
 
180
        for level in self.browse(cursor, uid, rids, context):
 
181
            smallest_level_id = self.search(cursor, uid, [('policy_id', '=', level.policy_id.id)],
 
182
                                           order='level asc', limit=1, context=context)
 
183
            smallest_level = self.browse(cursor, uid, smallest_level_id[0], context)
 
184
            if smallest_level.computation_mode == 'previous_date':
 
185
                return False
 
186
        return True
 
187
 
 
188
 
 
189
 
 
190
    _sql_constraint = [('unique level',
 
191
                        'UNIQUE (policy_id, level)',
 
192
                        'Level must be unique per policy')]
 
193
 
 
194
    _constraints = [(_check_level_mode,
 
195
                     'The smallest level can not be of type Previous reminder',
 
196
                     ['level'])]
 
197
 
 
198
    def _previous_level(self, cursor, uid, policy_level, context=None):
 
199
        """ For one policy level, returns the id of the previous level
 
200
 
 
201
        If there is no previous level, it returns None, it means that's the
 
202
        first policy level
 
203
 
 
204
        :param browse_record policy_level: policy level
 
205
        :return: previous level id or None if there is no previous level
 
206
        """
 
207
        previous_level_ids = self.search(
 
208
            cursor,
 
209
            uid,
 
210
            [('policy_id', '=', policy_level.policy_id.id),
 
211
             ('level', '<', policy_level.level)],
 
212
            order='level desc',
 
213
            limit=1,
 
214
            context=context)
 
215
        return previous_level_ids[0] if previous_level_ids else None
 
216
 
 
217
    # ----- time related functions ---------
 
218
 
 
219
    def _net_days_get_boundary(self):
 
220
        return " (mv_line.date_maturity + %(delay)s)::date <= date(%(lookup_date)s)"
 
221
 
 
222
    def _end_of_month_get_boundary(self):
 
223
        return ("(date_trunc('MONTH', (mv_line.date_maturity + %(delay)s))+INTERVAL '1 MONTH - 1 day')::date"
 
224
                "<= date(%(lookup_date)s)")
 
225
 
 
226
    def _previous_date_get_boundary(self):
 
227
        return "(cr_line.date + %(delay)s)::date <= date(%(lookup_date)s)"
 
228
 
 
229
    def _get_sql_date_boundary_for_computation_mode(self, cursor, uid, level, lookup_date, context=None):
 
230
        """Return a where clauses statement for the given
 
231
           lookup date and computation mode of the level"""
 
232
        fname = "_%s_get_boundary" % (level.computation_mode,)
 
233
        if hasattr(self, fname):
 
234
            fnc = getattr(self, fname)
 
235
            return fnc()
 
236
        else:
 
237
            raise NotImplementedError(_('Can not get function for computation mode: '
 
238
                                        '%s is not implemented') % (fname,))
 
239
 
 
240
    # -----------------------------------------
 
241
 
 
242
    def _get_first_level_lines(self, cursor, uid, level, lookup_date, lines, context=None):
 
243
        if not lines:
 
244
            return []
 
245
        """Retrieve all the line that are linked to a frist level.
 
246
           We use Raw SQL for perf. Security rule where applied in
 
247
           policy object when line where retrieved"""
 
248
        sql = ("SELECT DISTINCT mv_line.id\n"
 
249
               " FROM account_move_line mv_line\n"
 
250
               " WHERE mv_line.id in %(line_ids)s\n"
 
251
               " AND NOT EXISTS (SELECT cr_line.id from credit_control_line cr_line\n"
 
252
               "                  WHERE cr_line.move_line_id = mv_line.id)")
 
253
        sql += " AND" + self._get_sql_date_boundary_for_computation_mode(
 
254
                cursor, uid, level, lookup_date, context)
 
255
        data_dict = {'lookup_date': lookup_date, 'line_ids': tuple(lines),
 
256
                     'delay': level.delay_days}
 
257
 
 
258
        cursor.execute(sql, data_dict)
 
259
        res = cursor.fetchall()
 
260
        if not res:
 
261
            return []
 
262
        return [x[0] for x in res]
 
263
 
 
264
 
 
265
    def _get_other_level_lines(self, cursor, uid, level, lookup_date, lines, context=None):
 
266
        # We filter line that have a level smaller than current one
 
267
        # TODO if code fits need refactor _get_first_level_lines and _get_other_level_lines
 
268
        # Code is not DRY
 
269
        if not lines:
 
270
            return []
 
271
        sql = ("SELECT mv_line.id\n"
 
272
               " FROM account_move_line mv_line\n"
 
273
               " JOIN  credit_control_line cr_line\n"
 
274
               " ON (mv_line.id = cr_line.move_line_id)\n"
 
275
               " WHERE cr_line.id = (SELECT credit_control_line.id FROM credit_control_line\n"
 
276
               "                            WHERE credit_control_line.move_line_id = mv_line.id\n"
 
277
               "                              ORDER BY credit_control_line.level desc limit 1)\n"
 
278
               " AND cr_line.level = %(level)s\n"
 
279
               " AND mv_line.id in %(line_ids)s\n")
 
280
        sql += " AND " + self._get_sql_date_boundary_for_computation_mode(
 
281
                cursor, uid, level, lookup_date, context)
 
282
        previous_level_id = self._previous_level(
 
283
                cursor, uid, level, context=context)
 
284
        previous_level = self.browse(
 
285
                cursor, uid, previous_level_id, context=context)
 
286
        data_dict =  {'lookup_date': lookup_date, 'line_ids': tuple(lines),
 
287
                     'delay': level.delay_days, 'level': previous_level.level}
 
288
 
 
289
        cursor.execute(sql, data_dict)
 
290
        res = cursor.fetchall()
 
291
        if not res:
 
292
            return []
 
293
        return [x[0] for x in res]
 
294
 
 
295
    def get_level_lines(self, cursor, uid, level_id, lookup_date, lines, context=None):
 
296
        """get all move lines in entry lines that match the current level"""
 
297
        assert not (isinstance(level_id, list) and len(level_id) > 1), "level_id: only one id expected"
 
298
        if isinstance(level_id, list):
 
299
            level_id = level_id[0]
 
300
        matching_lines = []
 
301
        level = self.browse(cursor, uid, level_id, context=context)
 
302
        if self._previous_level(cursor, uid, level, context=context) is None:
 
303
            matching_lines += self._get_first_level_lines(
 
304
                cursor, uid, level, lookup_date, lines, context=context)
 
305
        else:
 
306
            matching_lines += self._get_other_level_lines(
 
307
                cursor, uid, level, lookup_date, lines, context=context)
 
308
 
 
309
        return matching_lines
 
310