summaryrefslogtreecommitdiff
path: root/addons/auth_totp/models/res_users.py
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/auth_totp/models/res_users.py
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/auth_totp/models/res_users.py')
-rw-r--r--addons/auth_totp/models/res_users.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/addons/auth_totp/models/res_users.py b/addons/auth_totp/models/res_users.py
new file mode 100644
index 00000000..5ba901a2
--- /dev/null
+++ b/addons/auth_totp/models/res_users.py
@@ -0,0 +1,238 @@
+# -*- coding: utf-8 -*-
+import base64
+import functools
+import hmac
+import io
+import logging
+import os
+import re
+import struct
+import time
+
+import werkzeug.urls
+
+from odoo import _, api, fields, models
+from odoo.addons.base.models.res_users import check_identity
+from odoo.exceptions import AccessDenied, UserError
+from odoo.http import request, db_list
+
+_logger = logging.getLogger(__name__)
+
+compress = functools.partial(re.sub, r'\s', '')
+class Users(models.Model):
+ _inherit = 'res.users'
+
+ totp_secret = fields.Char(copy=False, groups=fields.NO_ACCESS)
+ totp_enabled = fields.Boolean(string="Two-factor authentication", compute='_compute_totp_enabled')
+
+ def __init__(self, pool, cr):
+ init_res = super().__init__(pool, cr)
+ type(self).SELF_READABLE_FIELDS = self.SELF_READABLE_FIELDS + ['totp_enabled']
+ return init_res
+
+ def _mfa_url(self):
+ r = super()._mfa_url()
+ if r is not None:
+ return r
+ if self.totp_enabled:
+ return '/web/login/totp'
+
+ @api.depends('totp_secret')
+ def _compute_totp_enabled(self):
+ for r, v in zip(self, self.sudo()):
+ r.totp_enabled = bool(v.totp_secret)
+
+ def _rpc_api_keys_only(self):
+ # 2FA enabled means we can't allow password-based RPC
+ self.ensure_one()
+ return self.totp_enabled or super()._rpc_api_keys_only()
+
+ def _get_session_token_fields(self):
+ return super()._get_session_token_fields() | {'totp_secret'}
+
+ def _totp_check(self, code):
+ sudo = self.sudo()
+ key = base64.b32decode(sudo.totp_secret)
+ match = TOTP(key).match(code)
+ if match is None:
+ _logger.info("2FA check: FAIL for %s %r", self, self.login)
+ raise AccessDenied()
+ _logger.info("2FA check: SUCCESS for %s %r", self, self.login)
+
+ def _totp_try_setting(self, secret, code):
+ if self.totp_enabled or self != self.env.user:
+ _logger.info("2FA enable: REJECT for %s %r", self, self.login)
+ return False
+
+ secret = compress(secret).upper()
+ match = TOTP(base64.b32decode(secret)).match(code)
+ if match is None:
+ _logger.info("2FA enable: REJECT CODE for %s %r", self, self.login)
+ return False
+
+ self.sudo().totp_secret = secret
+ if request:
+ self.flush()
+ # update session token so the user does not get logged out (cache cleared by change)
+ new_token = self.env.user._compute_session_token(request.session.sid)
+ request.session.session_token = new_token
+
+ _logger.info("2FA enable: SUCCESS for %s %r", self, self.login)
+ return True
+
+ @check_identity
+ def totp_disable(self):
+ logins = ', '.join(map(repr, self.mapped('login')))
+ if not (self == self.env.user or self.env.user._is_admin() or self.env.su):
+ _logger.info("2FA disable: REJECT for %s (%s) by uid #%s", self, logins, self.env.user.id)
+ return False
+
+ self.sudo().write({'totp_secret': False})
+ if request and self == self.env.user:
+ self.flush()
+ # update session token so the user does not get logged out (cache cleared by change)
+ new_token = self.env.user._compute_session_token(request.session.sid)
+ request.session.session_token = new_token
+
+ _logger.info("2FA disable: SUCCESS for %s (%s) by uid #%s", self, logins, self.env.user.id)
+ return {
+ 'type': 'ir.actions.client',
+ 'tag': 'display_notification',
+ 'params': {
+ 'type': 'warning',
+ 'message': _("Two-factor authentication disabled for user(s) %s", logins),
+ 'next': {'type': 'ir.actions.act_window_close'},
+ }
+ }
+
+ @check_identity
+ def totp_enable_wizard(self):
+ if self.env.user != self:
+ raise UserError(_("Two-factor authentication can only be enabled for yourself"))
+
+ if self.totp_enabled:
+ raise UserError(_("Two-factor authentication already enabled"))
+
+ secret_bytes_count = TOTP_SECRET_SIZE // 8
+ secret = base64.b32encode(os.urandom(secret_bytes_count)).decode()
+ # format secret in groups of 4 characters for readability
+ secret = ' '.join(map(''.join, zip(*[iter(secret)]*4)))
+ w = self.env['auth_totp.wizard'].create({
+ 'user_id': self.id,
+ 'secret': secret,
+ })
+ return {
+ 'type': 'ir.actions.act_window',
+ 'target': 'new',
+ 'res_model': 'auth_totp.wizard',
+ 'name': _("Enable Two-Factor Authentication"),
+ 'res_id': w.id,
+ 'views': [(False, 'form')],
+ }
+
+class TOTPWizard(models.TransientModel):
+ _name = 'auth_totp.wizard'
+ _description = "Two-Factor Setup Wizard"
+
+ user_id = fields.Many2one('res.users', required=True, readonly=True)
+ secret = fields.Char(required=True, readonly=True)
+ url = fields.Char(store=True, readonly=True, compute='_compute_qrcode')
+ qrcode = fields.Binary(
+ attachment=False, store=True, readonly=True,
+ compute='_compute_qrcode',
+ )
+ code = fields.Char(string="Verification Code", size=7)
+
+ @api.depends('user_id.login', 'user_id.company_id.display_name', 'secret')
+ def _compute_qrcode(self):
+ # TODO: make "issuer" configurable through config parameter?
+ global_issuer = request and request.httprequest.host.split(':', 1)[0]
+ for w in self:
+ issuer = global_issuer or w.user_id.company_id.display_name
+ w.url = url = werkzeug.urls.url_unparse((
+ 'otpauth', 'totp',
+ werkzeug.urls.url_quote(f'{issuer}:{w.user_id.login}', safe=':'),
+ werkzeug.urls.url_encode({
+ 'secret': compress(w.secret),
+ 'issuer': issuer,
+ # apparently a lowercase hash name is anathema to google
+ # authenticator (error) and passlib (no token)
+ 'algorithm': ALGORITHM.upper(),
+ 'digits': DIGITS,
+ 'period': TIMESTEP,
+ }), ''
+ ))
+
+ data = io.BytesIO()
+ import qrcode
+ qrcode.make(url.encode(), box_size=4).save(data, optimise=True, format='PNG')
+ w.qrcode = base64.b64encode(data.getvalue()).decode()
+
+ @check_identity
+ def enable(self):
+ try:
+ c = int(compress(self.code))
+ except ValueError:
+ raise UserError(_("The verification code should only contain numbers"))
+ if self.user_id._totp_try_setting(self.secret, c):
+ self.secret = '' # empty it, because why keep it until GC?
+ return {
+ 'type': 'ir.actions.client',
+ 'tag': 'display_notification',
+ 'params': {
+ 'type': 'success',
+ 'message': _("Two-factor authentication is now enabled."),
+ 'next': {'type': 'ir.actions.act_window_close'},
+ }
+ }
+ raise UserError(_('Verification failed, please double-check the 6-digit code'))
+
+# 160 bits, as recommended by HOTP RFC 4226, section 4, R6.
+# Google Auth uses 80 bits by default but supports 160.
+TOTP_SECRET_SIZE = 160
+
+# The algorithm (and key URI format) allows customising these parameters but
+# google authenticator doesn't support it
+# https://github.com/google/google-authenticator/wiki/Key-Uri-Format
+ALGORITHM = 'sha1'
+DIGITS = 6
+TIMESTEP = 30
+
+class TOTP:
+ def __init__(self, key):
+ self._key = key
+
+ def match(self, code, t=None, window=TIMESTEP):
+ """
+ :param code: authenticator code to check against this key
+ :param int t: current timestamp (seconds)
+ :param int window: fuzz window to account for slow fingers, network
+ latency, desynchronised clocks, ..., every code
+ valid between t-window an t+window is considered
+ valid
+ """
+ if t is None:
+ t = time.time()
+
+ low = int((t - window) / TIMESTEP)
+ high = int((t + window) / TIMESTEP) + 1
+
+ return next((
+ counter for counter in range(low, high)
+ if hotp(self._key, counter) == code
+ ), None)
+
+def hotp(secret, counter):
+ # C is the 64b counter encoded in big-endian
+ C = struct.pack(">Q", counter)
+ mac = hmac.new(secret, msg=C, digestmod=ALGORITHM).digest()
+ # the data offset is the last nibble of the hash
+ offset = mac[-1] & 0xF
+ # code is the 4 bytes at the offset interpreted as a 31b big-endian uint
+ # (31b to avoid sign concerns). This effectively limits digits to 9 and
+ # hard-limits it to 10: each digit is normally worth 3.32 bits but the
+ # 10th is only worth 1.1 (9 digits encode 29.9 bits).
+ code = struct.unpack_from('>I', mac, offset)[0] & 0x7FFFFFFF
+ r = code % (10 ** DIGITS)
+ # NOTE: use text / bytes instead of int?
+ return r