summaryrefslogtreecommitdiff
path: root/addons/account_edi_proxy_client/models/account_edi_proxy_user.py
blob: 9e71a9a3c7c75b9082620a1f791676a68389b85e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from odoo import models, fields, _
from odoo.exceptions import UserError
from .account_edi_proxy_auth import OdooEdiProxyAuth

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.fernet import Fernet
import requests
import uuid
import base64
import logging


# Module-level logger; _renew_token uses it to record errors returned by the proxy.
_logger = logging.getLogger(__name__)


# Base URL of the EDI proxy; the endpoint paths ('/iap/account_edi/1/...') are appended to it.
SERVER_URL = 'https://l10n-it-edi.api.odoo.com'
# Passed as the `timeout` argument of requests.post in _make_request (seconds, per the requests API).
TIMEOUT = 30


class AccountEdiProxyError(Exception):
    """Error raised when a call to the EDI proxy fails.

    :param code: identifier of the failure (for example ``'connection_error'``).
    :param message: optional human-readable description; when it is falsy the
                    code is used as the exception text instead.
    """

    def __init__(self, code, message=False):
        # The exception text falls back on the code when no message is given.
        text = message if message else code
        super().__init__(text)
        self.message = message
        self.code = code


class AccountEdiProxyClientUser(models.Model):
    """Represents a user of the proxy for an electronic invoicing format.

    An edi_proxy_user has a unique identification on a specific format (for example, the VAT for Peppol) which
    allows identifying them when receiving a document addressed to them. It is linked to a specific company on
    a specific Odoo database.
    It also owns a private key with which each file must be decrypted (the proxy encrypts all the files with
    the matching public key).
    """
    _name = 'account_edi_proxy_client.user'
    _description = 'Account EDI proxy user'

    # Set to False by _make_request when the proxy answers with the 'no_such_user' error.
    active = fields.Boolean(default=True)
    # Identifier returned by the proxy at registration ('id_client' of the create_user response).
    id_client = fields.Char(required=True, index=True)
    company_id = fields.Many2one('res.company', string='Company', required=True,
        default=lambda self: self.env.company)
    edi_format_id = fields.Many2one('account.edi.format', required=True)
    edi_format_code = fields.Char(related='edi_format_id.code', readonly=True)
    edi_identification = fields.Char(required=True, help="The unique id that identifies this user for on the edi format, typically the vat")
    # Base64-encoded PEM (PKCS8, unencrypted) private key generated in _register_proxy_user;
    # _decrypt_data uses it to decrypt the symmetric key received with each document.
    private_key = fields.Binary(required=True, attachment=False, groups="base.group_system", help="The key to encrypt all the user's data")
    # Token returned by the proxy at registration and replaced by _renew_token.
    refresh_token = fields.Char(groups="base.group_system")

    # An id_client is unique across all users; an edi_identification is unique per EDI format.
    _sql_constraints = [
        ('unique_id_client', 'unique(id_client)', 'This id_client is already used on another user.'),
        ('unique_edi_identification_per_format', 'unique(edi_identification, edi_format_id)', 'This edi identification is already assigned to a user'),
    ]

    def _make_request(self, url, params=False):
        ''' Post a JSON-RPC call to the proxy and return the ``result`` of its response.

        The generic elements of the response are handled here: transport errors, JSON-RPC errors
        and proxy errors (among which an expired refresh token, which triggers a renewal followed
        by a retry of the same request).

        :param url:     The full url of the proxy endpoint to contact.
        :param params:  The dict sent as JSON-RPC ``params`` (an empty dict when falsy).
        :return:        The ``result`` entry of the response, stripped of its ``proxy_error`` key.
        :raises AccountEdiProxyError: with code 'connection_error' when the request fails or the
                        response holds an 'error' entry, or with the code given by the proxy when
                        the result holds a 'proxy_error' that is not an expired refresh token.
        :raises Exception: when the 'account_edi_proxy_client.demo' config parameter is set.
        '''
        payload = dict(
            jsonrpc='2.0',
            method='call',
            params=params or {},
            id=uuid.uuid4().hex,
        )

        demo_mode = self.env['ir.config_parameter'].get_param('account_edi_proxy_client.demo', False)
        if demo_mode:
            # Last barrier : in case the demo mode is not handled by the caller, we block access.
            raise Exception("Can't access the proxy in demo mode")

        try:
            http_reply = requests.post(
                url,
                json=payload,
                timeout=TIMEOUT,
                headers={'content-type': 'application/json'},
                auth=OdooEdiProxyAuth(user=self),
            )
            # Decoding stays inside the try: ValueError presumably covers a body that is not valid JSON.
            answer = http_reply.json()
        except (ValueError, requests.exceptions.ConnectionError, requests.exceptions.MissingSchema, requests.exceptions.Timeout, requests.exceptions.HTTPError):
            raise AccountEdiProxyError('connection_error',
                _('The url that this service requested returned an error. The url it tried to contact was %s', url))

        if 'error' in answer:
            rpc_error = answer['error']
            detail = rpc_error['message']
            if rpc_error['code'] == 404:
                message = _('The url that this service does not exist. The url it tried to contact was %s', url)
            else:
                message = _('The url that this service requested returned an error. The url it tried to contact was %s. %s', url, detail)
            raise AccountEdiProxyError('connection_error', message)

        result = answer['result']
        proxy_error = result.pop('proxy_error', False)
        if not proxy_error:
            return result

        error_code = proxy_error['code']
        if error_code == 'refresh_token_expired':
            # Get a fresh token, then replay the very same request.
            self._renew_token()
            return self._make_request(url, params)
        if error_code == 'no_such_user':
            # This error is also raised if the user didn't exchange data and someone else claimed the edi_identification.
            self.active = False
        raise AccountEdiProxyError(error_code, proxy_error['message'] or False)

    def _register_proxy_user(self, company, edi_format, edi_identification):
        ''' Generate the public_key/private_key that will be used to encrypt the file, send a request to the proxy
        to register the user with the public key and create the user with the private key.

        :param company: the company of the user.
        :param edi_format: the edi format (its ``code`` is sent to the proxy, its ``id`` is stored on the user).
        :param edi_identification: The unique ID that identifies this user on this edi network and to which the files will be addressed.
                                   Typically the vat.
        :raises UserError: if the request to the proxy fails or if its result holds an 'error' entry.
        '''
        # public_exponent=65537 is a default value that should be used most of the time, as per the documentation of cryptography.
        # key_size=2048 is considered a reasonable default key size, as per the documentation of cryptography.
        # see https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        public_key = private_key.public_key()
        public_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        if self.env['ir.config_parameter'].get_param('account_edi_proxy_client.demo', False):
            # simulate registration
            response = {'id_client': 'demo', 'refresh_token': 'demo'}
        else:
            try:
                response = self._make_request(SERVER_URL + '/iap/account_edi/1/create_user', params={
                    'dbuuid': company.env['ir.config_parameter'].get_param('database.uuid'),
                    'company_id': company.id,
                    'edi_format_code': edi_format.code,
                    'edi_identification': edi_identification,
                    # b64encode() returns bytes, which the standard json encoder cannot serialize
                    # (TypeError): the JSON-RPC params need a str.
                    'public_key': base64.b64encode(public_pem).decode(),
                })
            except AccountEdiProxyError as e:
                raise UserError(e.message)
            if 'error' in response:
                raise UserError(response['error'])

        # The private key never leaves the database: it is stored on the new user record.
        self.create({
            'id_client': response['id_client'],
            'company_id': company.id,
            'edi_format_id': edi_format.id,
            'edi_identification': edi_identification,
            'private_key': base64.b64encode(private_pem),
            'refresh_token': response['refresh_token'],
        })

    def _renew_token(self):
        ''' Ask the proxy for a new refresh token and store it on the user.

        Requests to the proxy are made with a refresh token that expires after 24h, so that multiple
        databases cannot keep using the same credentials. This method is called when a request failed
        because the refresh token expired.

        :raises UserError: if the result returned by the proxy holds an 'error' entry.
        '''
        renewal = self._make_request(SERVER_URL + '/iap/account_edi/1/renew_token')
        if 'error' not in renewal:
            self.refresh_token = renewal['refresh_token']
            return

        # Can happen if the database was duplicated and the refresh_token was refreshed by the other database.
        # We don't want two databases to be able to query the proxy with the same user
        # because it could lead to inconsistent data.
        _logger.error(renewal['error'])
        raise UserError('Proxy error, please contact Odoo (code: 3)')

    def _decrypt_data(self, data, symmetric_key):
        ''' Decrypt the data received from the proxy.

        The data is encrypted with a symmetric (Fernet) key, which is itself encrypted with the public key
        matching self.private_key. The symmetric key is therefore decrypted first, then used on the data.

        :param data:            The base64-encoded data to decrypt.
        :param symmetric_key:   The base64-encoded symmetric key, encrypted with self.private_key.public_key()
        :return:                The decrypted data.
        '''
        pem = base64.b64decode(self.private_key)
        rsa_key = serialization.load_pem_private_key(pem, password=None, backend=default_backend())

        encrypted_key = base64.b64decode(symmetric_key)
        # OAEP padding with SHA256 for both the mask generation function and the hash.
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        fernet_key = rsa_key.decrypt(encrypted_key, oaep)

        return Fernet(fernet_key).decrypt(base64.b64decode(data))