import base64
import json
import logging
import random
import secrets
import string
from difflib import SequenceMatcher

import requests

from odoo import http
from odoo.exceptions import AccessDenied
from odoo.http import request
from odoo.tools.config import config

from .. import controller

_logger = logging.getLogger(__name__)

# Endpoint queried to turn a Google OAuth access token into a user profile.
_GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
# Seconds to wait for Google before giving up, so a stalled call cannot
# block the worker indefinitely.
_GOOGLE_USERINFO_TIMEOUT = 10

# Alphabet and length of the generated reset-password token.
_RESET_TOKEN_ALPHABET = string.ascii_letters + string.digits
_RESET_TOKEN_LENGTH = 21

# A company typed at registration is treated as an existing company when its
# similarity ratio to the best database match is strictly above this value.
_COMPANY_MATCH_THRESHOLD = 0.8


class User(controller.Controller):
    """HTTP endpoints for user login, registration, activation and profile.

    Every route is mounted below ``prefix`` and wrapped with
    ``controller.Controller.must_authorized()``.
    """

    prefix = '/api/v1/'

    def get_user_by_email(self, email):
        """Return the ``res.users`` record whose login equals *email*.

        Inactive users are included. The result is an empty recordset when
        nothing matches.
        """
        return request.env['res.users'].search([
            ('login', '=', email),
            ('active', 'in', [True, False]),
        ], limit=1)

    def response_with_token(self, user):
        """Return the API representation of *user* plus a fresh ``token``."""
        data = request.env['res.users'].sudo().api_single_response(user)
        data['token'] = self.create_user_token(user)
        return data

    def _find_user_by_id(self, raw_id):
        """Return the user with id *raw_id*, or an empty recordset.

        *raw_id* comes straight from the request, so anything that cannot be
        parsed as an integer is treated as "not found" instead of being
        passed to the ORM.
        """
        users = request.env['res.users']
        try:
            user_id = int(raw_id)
        except (TypeError, ValueError):
            return users
        return users.search([('id', '=', user_id)], limit=1)

    def _link_company(self, user, company):
        """Attach the company name typed at registration to *user*.

        When the closest existing company is similar enough, a
        ``user.company.request`` is created; otherwise a new partner is
        created and set as the user's parent.
        """
        partners = request.env['res.partner']
        match_company = partners.search([
            ('company_type', '=', 'company'),
            ('name', 'ilike', company),
        ], limit=1)

        match_ratio = 0
        if match_company:
            match_ratio = SequenceMatcher(
                None, match_company.name, company).ratio()

        if match_ratio > _COMPANY_MATCH_THRESHOLD:
            request.env['user.company.request'].create({
                'user_id': user.partner_id.id,
                'user_company_id': match_company.id,
                'user_input': company,
            })
            return

        # NOTE(review): the new partner is created with only a name, while
        # the lookup above filters on company_type = 'company' -- confirm
        # that partners created here can be matched by later registrations.
        new_company = partners.create({'name': company})
        user.parent_id = new_company.id

    @http.route(prefix + 'user/login', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def login(self, **kw):
        """Authenticate with ``email`` and ``password``.

        Responds with code 400 when either field is missing. Otherwise the
        payload carries ``is_auth``; on failure ``reason`` is ``NOT_ACTIVE``
        or ``NOT_FOUND``, on success ``role`` and ``user`` are included.
        """
        email = kw.get('email')
        password = kw.get('password')
        if not email or not password:
            return self.response(code=400, description='email and password is required')

        user = self.get_user_by_email(email)
        if user and not user.active:
            return self.response({'is_auth': False, 'reason': 'NOT_ACTIVE'})

        # Only rejected credentials are reported as NOT_FOUND; unrelated
        # errors are no longer disguised as a failed login.
        try:
            uid = request.session.authenticate(
                config.get('db_name'), email, password)
        except AccessDenied:
            uid = False
        if not uid:
            return self.response({'is_auth': False, 'reason': 'NOT_FOUND'})

        user = request.env['res.users'].browse(uid)
        if user.is_inbound and user.is_outbound:
            role = 'admin'
        elif user.is_outbound:
            role = 'outbound'
        elif user.is_inbound:
            role = 'inbound'
        else:
            role = ''

        return self.response({
            'is_auth': True,
            'role': role,
            'user': self.response_with_token(user),
        })

    @http.route(prefix + 'user/validate-sso', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def user_validate_sso(self, **kw):
        """Sign in (creating the user if needed) from a Google access token.

        The token is exchanged for the Google profile; a failure to obtain
        ``name`` and ``email`` yields ``reason: INVALID_TOKEN``.
        """
        invalid_token = {'is_auth': False, 'reason': 'INVALID_TOKEN'}

        access_token = kw.get('access_token')
        if not access_token:
            return self.response(invalid_token)

        try:
            # ``params`` URL-encodes the token instead of concatenating it.
            res_userinfo = requests.get(
                _GOOGLE_USERINFO_URL,
                params={'access_token': access_token},
                timeout=_GOOGLE_USERINFO_TIMEOUT,
            )
            userinfo = json.loads(res_userinfo.text)
            name = userinfo['name']
            email = userinfo['email']
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Network failure, non-JSON body, or a payload without the
            # expected keys (e.g. an error document).
            _logger.warning('Google userinfo lookup failed', exc_info=True)
            return self.response(invalid_token)

        user = self.get_user_by_email(email)
        if not user:
            user = request.env['res.users'].create({
                'name': name,
                'login': email,
                'oauth_provider_id': request.env.ref('auth_oauth.provider_google').id,
                'sel_groups_1_9_10': 9,
            })

        return self.response({
            'is_auth': True,
            'user': self.response_with_token(user),
        })

    @http.route(prefix + 'user/register', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def register(self, **kw):
        """Register an inactive user and send the activation mail.

        Requires ``name``, ``email`` and ``password`` (code 400 otherwise);
        ``company`` and ``phone`` are optional. When the email already
        exists, ``reason`` is ``EMAIL_USED`` for an active user, or
        ``NOT_ACTIVE`` (and the activation mail is re-sent) for an inactive
        one.
        """
        name = kw.get('name')
        email = kw.get('email')
        password = kw.get('password')
        if not name or not email or not password:
            return self.response(code=400, description='email, name and password is required')

        company = kw.get('company', False)
        phone = kw.get('phone')
        response = {'register': False, 'reason': None}

        user = self.get_user_by_email(email)
        if user:
            if user.active:
                response['reason'] = 'EMAIL_USED'
            else:
                user.send_activation_mail()
                response['reason'] = 'NOT_ACTIVE'
            return self.response(response)

        user = request.env['res.users'].create({
            'name': name,
            'login': email,
            'phone': phone,
            'password': password,
            'active': False,
            'sel_groups_1_9_10': 9,
        })
        user.partner_id.email = email

        if company:
            self._link_company(user, company)

        user.send_activation_mail()
        response['register'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation-request', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def request_activation_user(self, **kw):
        """Re-send the activation mail to the inactive user with ``email``.

        ``reason`` is ``NOT_FOUND`` for an unknown email and ``ACTIVE`` when
        the user is already active.
        """
        email = kw.get('email')
        response = {'activation_request': False, 'reason': None}

        user = self.get_user_by_email(email)
        if not user:
            response['reason'] = 'NOT_FOUND'
            return self.response(response)
        if user.active:
            response['reason'] = 'ACTIVE'
            return self.response(response)

        user.send_activation_mail()
        response['activation_request'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user(self, **kw):
        """Activate the inactive user holding the activation ``token``.

        Responds with code 400 when ``token`` is missing and with
        ``reason: INVALID_TOKEN`` when no inactive user matches. The token
        is cleared once it has been used.
        """
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        response = {'activation': False, 'reason': None, 'user': None}
        user = request.env['res.users'].search([
            ('activation_token', '=', token),
            ('active', '=', False),
        ], limit=1)
        if not user:
            response['reason'] = 'INVALID_TOKEN'
            return self.response(response)

        user.active = True
        user.activation_token = ''
        response.update({
            'activation': True,
            'user': self.response_with_token(user),
        })
        return self.response(response)

    @http.route(prefix + 'user/activation-token', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_token(self, **kw):
        """Alias route that delegates to :meth:`activation_user`."""
        return self.activation_user(**kw)

    @http.route(prefix + 'user/activation-otp', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_otp(self, **kw):
        """Activate the user with ``email`` when ``otp`` equals their code.

        Any mismatch, including an unknown email or a missing ``otp``,
        yields ``reason: INVALID_OTP``.
        """
        email = kw.get('email')
        otp = kw.get('otp')
        response = {'activation': False, 'reason': None, 'user': None}

        user = self.get_user_by_email(email)
        # Reject unknown users and missing codes explicitly rather than
        # relying on how an empty recordset's field compares with the input.
        if not user or not otp or user.otp_code != otp:
            response['reason'] = 'INVALID_OTP'
            return self.response(response)

        user.active = True
        response.update({
            'activation': True,
            'user': self.response_with_token(user),
        })
        return self.response(response)

    @http.route(prefix + 'user/forgot-password', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def forgot_password_user(self, **kw):
        """Generate and store a reset-password token for ``email``.

        Responds with ``reason: NOT_FOUND`` for an unknown email; otherwise
        returns the new token together with the user's API representation.
        """
        email = kw.get('email')
        user = self.get_user_by_email(email)
        if not user:
            return self.response({'success': False, 'reason': 'NOT_FOUND'})

        # The token authorises a password change, so it must come from a
        # cryptographically secure source rather than ``random``.
        user.reset_password_token = ''.join(
            secrets.choice(_RESET_TOKEN_ALPHABET)
            for _ in range(_RESET_TOKEN_LENGTH))

        # NOTE(review): the token is returned to the API caller -- confirm
        # that the caller is a trusted backend responsible for delivering it.
        return self.response({
            'success': True,
            'token': user.reset_password_token,
            'user': request.env['res.users'].api_single_response(user),
        })

    @http.route(prefix + 'user/reset-password', auth='public', methods=['POST', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def reset_password_user(self, **kw):
        """Set a new ``password`` for the user holding the reset ``token``.

        Responds with code 400 when ``token`` or ``password`` is missing and
        with ``reason: INVALID_TOKEN`` when no user matches. The token is
        cleared once it has been used.
        """
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        # Without this check a valid token combined with a missing password
        # would overwrite the password with an empty string.
        password = kw.get('password')
        if not password:
            return self.response(code=400, description='password is required')

        user = request.env['res.users'].search([
            ('reset_password_token', '=', token),
            ('active', 'in', [False, True]),
        ], limit=1)
        if not user:
            return self.response({'success': False, 'reason': 'INVALID_TOKEN'})

        user.password = password
        user.reset_password_token = ''
        return self.response({
            'success': True,
            'user': request.env['res.users'].api_single_response(user),
        })

    # NOTE(review): this route ends in 'user/' while the handler reads an
    # ``id`` argument -- a '<id>' path converter may have been lost; confirm
    # against the deployed routing before changing the string.
    @http.route(prefix + 'user/', auth='public', methods=['PUT', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def update_user(self, **kw):
        """Update ``name``, ``phone``, ``mobile`` and/or ``password``.

        Only fields present in the request are written. An empty string
        clears ``name``, ``phone`` or ``mobile``; an empty ``password`` is
        ignored. Responds with code 404 when the user does not exist.
        """
        user = self._find_user_by_id(kw.get('id'))
        if not user:
            return self.response(code=404, description='User not found')

        values = {}
        for field in ('name', 'phone', 'mobile', 'password'):
            field_value = kw.get(field)
            if field_value is None:
                continue
            if field == 'password' and not field_value:
                # Never replace an existing password with a blank one.
                continue
            if field_value or field_value == '':
                values[field] = field_value
        if values:
            user.write(values)

        return self.response({'user': self.response_with_token(user)})

    # NOTE(review): the double slash in 'user//address' looks like a lost
    # '<id>' path converter -- confirm against the deployed routing before
    # changing the string.
    @http.route(prefix + 'user//address', auth='public', methods=['GET', 'OPTIONS'])
    @controller.Controller.must_authorized()
    def get_user_address_by_id(self, **kw):
        """Return the addresses of a user and of their child partners.

        Results are ordered by most recent ``write_date`` first. Responds
        with code 404 when the user does not exist.
        """
        user = self._find_user_by_id(kw.get('id'))
        if not user:
            return self.response(code=404, description='User not found')

        partner_ids = [user.partner_id.id] + user.child_ids.ids
        partners = request.env['res.partner'].search(
            [('id', 'in', partner_ids)], order='write_date DESC')
        users_model = request.env['res.users']
        address = [users_model.api_address_response(partner)
                   for partner in partners]
        return self.response(address)