A better MT940 has become available on PyPI

This commit is contained in:
Raphael Michel
2015-10-18 18:05:47 +02:00
parent ae4d102288
commit 705d8bd931
5 changed files with 22 additions and 318 deletions

View File

@@ -1,6 +1,6 @@
[run]
source = pretix
omit = */migrations/*,*/urls.py,*/tests/*,*/testdummy/*,*/admin.py,*/mt940.py,pretix/wsgi.py,pretix/settings.py
omit = */migrations/*,*/urls.py,*/tests/*,*/testdummy/*,*/admin.py,pretix/wsgi.py,pretix/settings.py
[report]
exclude_lines =

View File

@@ -1,274 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2013, Cédric Krier
# Copyright (c) 2014-2015, Nicolas Évrard
# Copyright (c) 2013-2015, B2CK
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the <organization> nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""a parser for MT940 files
"""
__version__ = '0.2'
__all__ = ['MT940', 'rabo_description', 'abn_amro_description',
'ing_description']
import datetime
import re
from collections import defaultdict, namedtuple
from decimal import Decimal
SECTIONS = {
'begin': [':940:'],
'statement': [':20:'],
'account': [':25:'],
'information': [':28:', ':28C:'],
'start_balance': [':60F:'],
'transaction': [':61:'],
'description': [':86:'],
'end_balance': [':62F:'],
}
def _parse_date(date):
return datetime.datetime.strptime(date, '%y%m%d').date()
def _parse_amount(amount, sign='C'):
amount = Decimal(amount.replace(',', '.'))
if sign in ('D', 'RC'):
return -amount
return amount
TRANSACTION_RE = re.compile(r"""
(?P<date>\d{6})
(?P<booking>\d{4})?
(?P<sign>D|C|RC|RD)
(?P<code>\w)?? # ING skips this mandatory field
(?P<amount>(\d|,){1,15})
(?P<id>\w{4})
(?P<reference>.{0,34})""", re.VERBOSE)
class MT940(object):
def __init__(self, name):
self.statements = []
with open(name, 'rU') as f:
values = defaultdict(str)
transactions = []
for line in self._readline(f):
for name, sections in SECTIONS.iteritems():
if name == 'begin':
continue
for section in sections:
if line.startswith(section):
if name in values and name == 'statement':
self._set_statement(values, transactions)
if name.endswith('_balance'):
values[name] = self._get_balance(
line[len(section):])
elif name == 'transaction':
transactions.append(
self._get_transaction(line[len(section):]))
elif name == 'description':
transactions[-1] = (transactions[-1][:-1]
+ (line[len(section):],))
else:
values[name] += line[len(section):]
if values:
self._set_statement(values, transactions)
@staticmethod
def _readline(f):
buf = []
for line in f:
line = line.strip('\n')
if buf:
if (line.startswith(':')
or line.startswith('-')):
yield '\n'.join(buf)
del buf[:]
buf.append(line)
if buf:
yield '\n'.join(buf)
@staticmethod
def _get_balance(balance):
date = _parse_date(balance[1:7])
amount = _parse_amount(balance[10:], balance[0])
return Balance(date=date, amount=amount, currency=balance[7:10])
@staticmethod
def _get_transaction(transaction):
lines = transaction.splitlines()
if len(lines) == 1:
transaction, = lines
additional_data = None
else:
transaction, additional_data = lines
transaction = TRANSACTION_RE.match(transaction)
date = _parse_date(transaction.group('date'))
if transaction.group('booking'):
booking = _parse_date(
transaction.group('date')[:2]
+ transaction.group('booking'))
else:
booking = None
amount = _parse_amount(transaction.group('amount'),
transaction.group('sign'))
id_ = transaction.group('id')
reference = transaction.group('reference')
reference, _, institution_reference = reference.partition('//')
return (date, booking, amount, id_, reference,
institution_reference, additional_data, '')
def _set_statement(self, values, transactions):
self.statements.append(
Statement(
transactions=[Transaction(*t) for t in transactions],
**values))
values.clear()
del transactions[:]
Statement = namedtuple('Statement', ['statement', 'account', 'information',
'start_balance', 'transactions', 'end_balance'])
Balance = namedtuple('Balance', ['date', 'amount', 'currency'])
Transaction = namedtuple('Transaction', ['date', 'booking', 'amount', 'id',
'reference', 'institution_reference', 'additional_data',
'description'])
def _find_swift_tags(tags, description):
values = {}
for tag, name in tags:
if description.startswith(tag):
description = description[len(tag):]
try:
i = description.index('/')
except ValueError:
i = len(description)
values[name] = description[:i]
description = description[i:]
if not description:
break
return values
RABO_TAGS = [
('/MARF/', 'marf'),
('/EREF/', 'eref'),
('/PREF/', 'pref'),
('/BENM/', 'benm'),
('/ORDP/', 'ordp'),
('/NAME/', 'name'),
('/ID/', 'id'),
('/ADDR/', 'addr'),
('/REMI/', 'remi'),
('/CDTRREFTP//CD/SCOR/ISSR/CUR/CDTRREF/', 'cdtrref'),
('/CSID/', 'csid'),
('/ISDT/', 'isdt'),
('/RTRN/', 'rtrn'),
]
def rabo_description(description):
"Return dictionnary with Rabo informations"
description = ''.join(description.splitlines())
return _find_swift_tags(RABO_TAGS, description)
ABN_AMRO_ACCOUNT = re.compile(r"""
^([0-9]{1,3}\.[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,3})""", re.VERBOSE)
ABN_AMRO_GIRO = re.compile(r"""
^GIRO\ +([0-9]+)""", re.VERBOSE)
ABN_AMRO_TAGS = [
('/TRTP/', 'trtp'),
('/IBAN/', 'iban'),
('/BIC/', 'bic'),
('/CSID', 'csid'),
('/NAME/', 'name'),
('/REMI/', 'remi'),
('/EREF/', 'eref'),
('/ORDP//ID/', 'ordp'),
('/BENM//ID/', 'benm'),
]
def abn_amro_description(description):
"Retrun dictionnary with ABN AMRO informations"
description = ''.join(description.splitlines())
values = {}
m = ABN_AMRO_ACCOUNT.match(description)
if m:
values['account'] = m.group(1).replace('.', '')
m = ABN_AMRO_GIRO.match(description)
if m:
values['account'] = m.group(1)
values.update(_find_swift_tags(ABN_AMRO_TAGS, description))
return values
ING_TAGS = re.compile(r'/(RTRN|EREF|PREF|MARF|CSID|CNTP|REMI|PURP|ULT[CD])/')
ING_TAGS_DEFINITION = {
'RTRN': ('rtrn', []),
'EREF': ('eref', []),
'PREF': ('pref', []),
'MARF': ('marf', []),
'CSID': ('csid', []),
'CNTP': ('cntp', ['account_number', 'bic', 'name', 'city']),
'REMI': ('remi', ['code', 'issuer', 'remittance_info']),
'PURP': ('purp', []),
'ULTC': ('ultc', ['name', 'id']),
'ULTD': ('ultd', ['name', 'id']),
}
def ing_description(description):
"Return dictionnary with ING informations"
description = ''.join(description.splitlines())
values = {}
ing_tags = iter(ING_TAGS.split(description)[1:])
for tag, tag_value in zip(ing_tags, ing_tags):
tag_value = tag_value[:-1]
name, subfields = ING_TAGS_DEFINITION[tag]
if not subfields:
values[name] = tag_value
continue
values[name] = {}
if 'name' in subfields or 'remittance_info' in subfields:
special_tag = 'name' if 'name' in subfields else 'remittance_info'
tag_idx = subfields.index(special_tag)
subtags = tag_value.split('/', tag_idx)
for sf_name, sf_value in zip(subfields[:tag_idx], subtags[:-1]):
values[name][sf_name] = sf_value
subtags = subtags[-1].rsplit('/', len(subfields) - tag_idx - 1)
for sf_name, sf_value in zip(subfields[tag_idx:], subtags):
values[name][sf_name] = sf_value
else:
subtags = tag_value.split('/')
for sf_name, sf_value in zip(subfields, subtags):
values[name][sf_name] = sf_value
return values

View File

@@ -1,54 +1,26 @@
import io
from collections import defaultdict
from decimal import Decimal
from . import mt940
class MT940(mt940.MT940):
def __init__(self, f):
# Default implementation only takes a filename, but our file object
# is not necessarily a file on the disk.
self.statements = []
values = defaultdict(str)
transactions = []
for line in self._readline(f):
for name, sections in mt940.SECTIONS.items():
if name == 'begin':
continue
for section in sections:
if line.startswith(section):
if name in values and name == 'statement':
self._set_statement(values, transactions)
if name.endswith('_balance'):
values[name] = self._get_balance(
line[len(section):])
elif name == 'transaction':
transactions.append(
self._get_transaction(line[len(section):]))
elif name == 'description':
transactions[-1] = (transactions[-1][:-1]
+ (line[len(section):],))
else:
values[name] += line[len(section):]
if values:
self._set_statement(values, transactions)
import mt940
def parse(file):
data = file.read()
try:
import chardet
charset = chardet.detect(data)['encoding']
except ImportError:
charset = file.charset
data = data.decode(charset or 'utf-8')
mt = MT940(io.StringIO(data))
mt = mt940.parse(io.StringIO(data.strip()))
result = []
for statement in mt.statements:
for t in statement.transactions:
result.append({
'reference': t.reference + '\n' + t.description,
'amount': str(t.amount),
'date': t.booking.isoformat(),
})
for t in mt:
result.append({
'reference': "\n".join([
t.data.get(f) for f in ('transaction_details', 'customer_reference', 'bank_reference',
'extra_details') if t.data.get(f, '')]),
'amount': str(t.data['amount'].amount.quantize(Decimal('.01'))),
'date': t.data['date'].isoformat()
})
return result

View File

@@ -98,7 +98,12 @@ class ImportView(EventPermissionRequiredMixin, TemplateView):
return self.get(*self.args, **self.kwargs)
def process_mt940(self):
return self.confirm_view(mt940import.parse(self.request.FILES.get('file')))
try:
return self.confirm_view(mt940import.parse(self.request.FILES.get('file')))
except:
logger.exception('Failed to import MT940 file')
messages.error(self.request, _('We were unable to process your input.'))
return self.redirect_back()
@cached_property
def hbci_form(self):
@@ -129,8 +134,8 @@ class ImportView(EventPermissionRequiredMixin, TemplateView):
hint = self.request.event.settings.get('banktransfer_csvhint', as_type=dict)
try:
parsed = csvimport.parse(data, hint)
except csvimport.HintMismatchError as e: # TODO: narrow down
logger.error('Import using stored hint failed: ' + str(e))
except csvimport.HintMismatchError: # TODO: narrow down
logger.exception('Import using stored hint failed')
else:
return self.confirm_view(parsed)

View File

@@ -1,3 +1,4 @@
chardet>=2.3,<3
defusedxml
mt-940