~banking-addons-team/banking-addons/banking-addons-70

« back to all changes in this revision

Viewing changes to account_banking_camt/camt.py

  • Committer: Holger Brunn
  • Author(s): stefan at therp
  • Date: 2013-10-28 14:54:32 UTC
  • mfrom: (185.7.14 7.0-add_camt_import)
  • Revision ID: hbrunn@therp.nl-20131028145432-7knhs856tag5i3v7
[ADD] CAMT.053 parser

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
##############################################################################
 
3
#
 
4
#    Copyright (C) 2013 Therp BV (<http://therp.nl>)
 
5
#    All Rights Reserved
 
6
#
 
7
#    This program is free software: you can redistribute it and/or modify
 
8
#    it under the terms of the GNU Affero General Public License as published by
 
9
#    the Free Software Foundation, either version 3 of the License, or
 
10
#    (at your option) any later version.
 
11
#
 
12
#    This program is distributed in the hope that it will be useful,
 
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
#    GNU Affero General Public License for more details.
 
16
#
 
17
#    You should have received a copy of the GNU Affero General Public License
 
18
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
19
#
 
20
##############################################################################
 
21
 
 
22
from lxml import etree
 
23
from openerp.osv.orm import except_orm
 
24
from account_banking.parsers import models
 
25
from account_banking.parsers.convert import str2date
 
26
 
 
27
bt = models.mem_bank_transaction
 
28
 
 
29
class transaction(models.mem_bank_transaction):
 
30
 
 
31
    def __init__(self, values, *args, **kwargs):
 
32
        super(transaction, self).__init__(*args, **kwargs)
 
33
        for attr in values:
 
34
            setattr(self, attr, values[attr])
 
35
 
 
36
    def is_valid(self):
 
37
        return not self.error_message
 
38
 
 
39
class parser(models.parser):
 
40
    code = 'CAMT'
 
41
    country_code = 'NL'
 
42
    name = 'Generic CAMT Format'
 
43
    doc = '''\
 
44
CAMT Format parser
 
45
'''
 
46
 
 
47
    def tag(self, node):
 
48
        """
 
49
        Return the tag of a node, stripped from its namespace
 
50
        """
 
51
        return node.tag[len(self.ns):]
 
52
 
 
53
    def assert_tag(self, node, expected):
 
54
        """
 
55
        Get node's stripped tag and compare with expected
 
56
        """
 
57
        assert self.tag(node) == expected, (
 
58
            "Expected tag '%s', got '%s' instead" %
 
59
            (self.tag(node), expected))
 
60
 
 
61
    def xpath(self, node, expr):
 
62
        """
 
63
        Wrap namespaces argument into call to Element.xpath():
 
64
 
 
65
        self.xpath(node, './ns:Acct/ns:Id')
 
66
        """
 
67
        return node.xpath(expr, namespaces={'ns': self.ns[1:-1]})
 
68
 
 
69
    def find(self, node, expr):
 
70
        """
 
71
        Like xpath(), but return first result if any or else False
 
72
        
 
73
        Return None to test nodes for being truesy
 
74
        """
 
75
        result = node.xpath(expr, namespaces={'ns': self.ns[1:-1]})
 
76
        if result:
 
77
            return result[0]
 
78
        return None
 
79
 
 
80
    def get_balance_type_node(self, node, balance_type):
 
81
        """
 
82
        :param node: BkToCstmrStmt/Stmt/Bal node
 
83
        :param balance type: one of 'OPBD', 'PRCD', 'ITBD', 'CLBD'
 
84
        """
 
85
        code_expr = './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' % balance_type
 
86
        return self.xpath(node, code_expr)
 
87
    
 
88
    def parse_amount(self, node):
 
89
        """
 
90
        Parse an element that contains both Amount and CreditDebitIndicator
 
91
        
 
92
        :return: signed amount
 
93
        :returntype: float
 
94
        """
 
95
        sign = -1 if node.find(self.ns + 'CdtDbtInd').text == 'DBIT' else 1
 
96
        return sign * float(node.find(self.ns + 'Amt').text)
 
97
        
 
98
    def get_start_balance(self, node):
 
99
        """
 
100
        Find the (only) balance node with code OpeningBalance, or
 
101
        the only one with code 'PreviousClosingBalance'
 
102
        or the first balance node with code InterimBalance in
 
103
        the case of preceeding pagination.
 
104
 
 
105
        :param node: BkToCstmrStmt/Stmt/Bal node
 
106
        """
 
107
        nodes = (
 
108
            self.get_balance_type_node(node, 'OPBD') or
 
109
            self.get_balance_type_node(node, 'PRCD') or
 
110
            self.get_balance_type_node(node, 'ITBD'))
 
111
        return self.parse_amount(nodes[0])
 
112
 
 
113
    def get_end_balance(self, node):
 
114
        """
 
115
        Find the (only) balance node with code ClosingBalance, or
 
116
        the second (and last) balance node with code InterimBalance in
 
117
        the case of continued pagination.
 
118
 
 
119
        :param node: BkToCstmrStmt/Stmt/Bal node
 
120
        """
 
121
        nodes = (
 
122
            self.get_balance_type_node(node, 'CLBD') or
 
123
            self.get_balance_type_node(node, 'ITBD'))
 
124
        return self.parse_amount(nodes[-1])
 
125
 
 
126
    def parse_Stmt(self, cr, node):
 
127
        """
 
128
        Parse a single Stmt node.
 
129
 
 
130
        Be sure to craft a unique, but short enough statement identifier,
 
131
        as it is used as the basis of the generated move lines' names
 
132
        which overflow when using the full IBAN and CAMT statement id.
 
133
        """
 
134
        statement = models.mem_bank_statement()
 
135
        statement.local_account = (
 
136
            self.xpath(node, './ns:Acct/ns:Id/ns:IBAN')[0].text
 
137
            if self.xpath(node, './ns:Acct/ns:Id/ns:IBAN')
 
138
            else self.xpath(node, './ns:Acct/ns:Id/ns:Othr/ns:Id')[0].text)
 
139
 
 
140
        identifier = node.find(self.ns + 'Id').text
 
141
        if identifier.upper().startswith('CAMT053'):
 
142
            identifier = identifier[7:]
 
143
        statement.id = self.get_unique_statement_id(
 
144
            cr, "%s-%s" % (
 
145
                self.get_unique_account_identifier(
 
146
                    cr, statement.local_account),
 
147
                identifier)
 
148
            )
 
149
 
 
150
        statement.local_currency = self.xpath(node, './ns:Acct/ns:Ccy')[0].text
 
151
        statement.start_balance = self.get_start_balance(node)
 
152
        statement.end_balance = self.get_end_balance(node)
 
153
        number = 0
 
154
        for Ntry in self.xpath(node, './ns:Ntry'):
 
155
            transaction_detail = self.parse_Ntry(Ntry)
 
156
            if number == 0:
 
157
                # Take the statement date from the first transaction
 
158
                statement.date = str2date(
 
159
                    transaction_detail['execution_date'], "%Y-%m-%d")
 
160
            number += 1
 
161
            transaction_detail['id'] = str(number).zfill(4)
 
162
            statement.transactions.append(
 
163
                transaction(transaction_detail))
 
164
        return statement
 
165
 
 
166
    def get_transfer_type(self, node):
 
167
        """
 
168
        Map entry descriptions to transfer types. To extend with
 
169
        proper mapping from BkTxCd/Domn/Cd/Fmly/Cd to transfer types
 
170
        if we can get our hands on real life samples.
 
171
 
 
172
        For now, leave as a hook for bank specific overrides to map
 
173
        properietary codes from BkTxCd/Prtry/Cd.
 
174
 
 
175
        :param node: Ntry node
 
176
        """
 
177
        return bt.ORDER
 
178
 
 
179
    def parse_Ntry(self, node):
 
180
        """
 
181
        :param node: Ntry node
 
182
        """
 
183
        entry_details = {
 
184
            'execution_date': self.xpath(node, './ns:BookgDt/ns:Dt')[0].text,
 
185
            'effective_date': self.xpath(node, './ns:ValDt/ns:Dt')[0].text,
 
186
            'transfer_type': self.get_transfer_type(node),
 
187
            'transferred_amount': self.parse_amount(node)
 
188
            }
 
189
        TxDtls = self.xpath(node, './ns:NtryDtls/ns:TxDtls')
 
190
        if len(TxDtls) == 1:
 
191
            vals = self.parse_TxDtls(TxDtls[0], entry_details)
 
192
        else:
 
193
            vals = entry_details
 
194
        return vals
 
195
 
 
196
    def get_party_values(self, TxDtls):
 
197
        """
 
198
        Determine to get either the debtor or creditor party node
 
199
        and extract the available data from it
 
200
        """
 
201
        vals = {}
 
202
        party_type = self.find(
 
203
            TxDtls, '../../ns:CdtDbtInd').text == 'CRDT' and 'Dbtr' or 'Cdtr'
 
204
        party_node = self.find(TxDtls, './ns:RltdPties/ns:%s' % party_type)
 
205
        account_node = self.find(
 
206
            TxDtls, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type)
 
207
        bic_node = self.find(
 
208
            TxDtls,
 
209
            './ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type)
 
210
        if party_node is not None:
 
211
            name_node = self.find(party_node, './ns:Nm')
 
212
            vals['remote_owner'] = (
 
213
                name_node.text if name_node is not None else False)
 
214
            country_node = self.find(party_node, './ns:PstlAdr/ns:Ctry')
 
215
            vals['remote_owner_country'] = (
 
216
                country_node.text if country_node is not None else False)
 
217
            address_node = self.find(party_node, './ns:PstlAdr/ns:AdrLine')
 
218
            if address_node is not None:
 
219
                vals['remote_owner_address'] = [address_node.text]
 
220
        if account_node is not None:
 
221
            iban_node = self.find(account_node, './ns:IBAN')
 
222
            if iban_node is not None:
 
223
                vals['remote_account'] = iban_node.text
 
224
                if bic_node is not None:
 
225
                    vals['remote_bank_bic'] = bic_node.text
 
226
            else:
 
227
                domestic_node = self.find(account_node, './ns:Othr/ns:Id')
 
228
                vals['remote_account'] = (
 
229
                    domestic_node.text if domestic_node is not None else False)
 
230
        return vals
 
231
 
 
232
    def parse_TxDtls(self, TxDtls, entry_values):
 
233
        """
 
234
        Parse a single TxDtls node
 
235
        """
 
236
        vals = dict(entry_values)
 
237
        unstructured = self.xpath(TxDtls, './ns:RmtInf/ns:Ustrd')
 
238
        if unstructured:
 
239
            vals['message'] = ' '.join([x.text for x in unstructured])
 
240
        structured = self.find(
 
241
            TxDtls, './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref')
 
242
        if structured is None or not structured.text:
 
243
            structured = self.find(TxDtls, './ns:Refs/ns:EndToEndId') 
 
244
        if structured is not None:
 
245
            vals['reference'] = structured.text
 
246
        else:
 
247
            if vals.get('message'):
 
248
                vals['reference'] = vals['message']
 
249
        vals.update(self.get_party_values(TxDtls))
 
250
        return vals
 
251
 
 
252
    def check_version(self):
 
253
        """
 
254
        Sanity check the document's namespace
 
255
        """
 
256
        if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.'):
 
257
            raise except_orm(
 
258
                "Error",
 
259
                "This does not seem to be a CAMT format bank statement.")
 
260
 
 
261
        if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.053.'):
 
262
            raise except_orm(
 
263
                "Error",
 
264
                "Only CAMT.053 is supported at the moment.")
 
265
        return True
 
266
 
 
267
    def parse(self, cr, data):
 
268
        """
 
269
        Parse a CAMT053 XML file
 
270
        """
 
271
        root = etree.fromstring(data)
 
272
        self.ns = root.tag[:root.tag.index("}") + 1]
 
273
        self.check_version()
 
274
        self.assert_tag(root[0][0], 'GrpHdr')
 
275
        statements = []
 
276
        for node in root[0][1:]:
 
277
            statements.append(self.parse_Stmt(cr, node))
 
278
        return statements