"""HTTP API endpoints for user login, registration, activation and profile."""
import json
import logging
import random
import secrets
import string
from difflib import SequenceMatcher

import requests

from odoo import http
from odoo.http import request
from odoo.tools.config import config

from .. import controller

_logger = logging.getLogger(__name__)

# Google endpoint used to resolve an OAuth access token into a user profile.
_GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
# Seconds to wait for Google before giving up; without a timeout
# ``requests.get`` can block the worker indefinitely.
_GOOGLE_USERINFO_TIMEOUT = 10
# Length of the generated reset-password token.
_RESET_TOKEN_LENGTH = 21


class User(controller.Controller):
    """User-related routes served under ``/api/v1/user/...``.

    ``self.response`` and ``self.create_user_token`` are not defined in this
    class; they are expected to come from ``controller.Controller``.
    """

    prefix = '/api/v1/'

    def get_user_by_email(self, email):
        """Return the ``res.users`` recordset whose login equals *email*.

        Both active and inactive users are matched. The recordset is empty
        when no user has that login.
        """
        return request.env['res.users'].search([
            ('login', '=', email),
            ('active', 'in', [True, False]),
        ])

    def response_with_token(self, user):
        """Return the API representation of *user* plus a ``token`` entry."""
        data = request.env['res.users'].sudo().api_single_response(user)
        data['token'] = self.create_user_token(user)
        return data

    @http.route(prefix + 'user/login', auth='public', methods=['POST'],
                csrf=False)
    @controller.Controller.must_authorized()
    def login(self, **kw):
        """Authenticate with ``email`` and ``password``.

        Responds with code 400 when either value is missing, with
        ``reason='NOT_ACTIVE'`` for an inactive account, with
        ``reason='NOT_FOUND'`` when authentication (or building the response)
        fails, and otherwise with ``is_auth=True``, the user's ``role`` and
        the user payload including a token.
        """
        email = kw.get('email')
        password = kw.get('password')
        if not email or not password:
            return self.response(
                code=400, description='email and password is required')

        user = self.get_user_by_email(email)
        if user and not user.active:
            return self.response({
                'is_auth': False,
                'reason': 'NOT_ACTIVE'
            })

        try:
            uid = request.session.authenticate(
                config.get('db_name'), email, password)
            user = request.env['res.users'].browse(uid)

            role = ''
            if user.is_inbound and user.is_outbound:
                role = 'admin'
            elif user.is_outbound:
                role = 'outbound'
            elif user.is_inbound:
                role = 'inbound'

            data = {
                'is_auth': True,
                'role': role,
                'user': self.response_with_token(user),
            }
            return self.response(data)
        except Exception:
            # Any failure is reported to the client as NOT_FOUND, as before,
            # but SystemExit/KeyboardInterrupt are no longer swallowed.
            _logger.debug('Login failed', exc_info=True)
            return self.response({
                'is_auth': False,
                'reason': 'NOT_FOUND'
            })

    @http.route(prefix + 'user/validate-sso', auth='public', methods=['POST'],
                csrf=False)
    @controller.Controller.must_authorized()
    def user_validate_sso(self, **kw):
        """Sign in (creating the user if needed) from a Google access token.

        The token is resolved through Google's userinfo endpoint. If the
        token is missing, the request fails, or the reply lacks ``name`` or
        ``email``, the response is ``is_auth=False`` with
        ``reason='INVALID_TOKEN'``.
        """
        invalid_token = {
            'is_auth': False,
            'reason': 'INVALID_TOKEN'
        }
        access_token = kw.get('access_token')
        if not access_token:
            return self.response(invalid_token)

        try:
            # ``params`` URL-encodes the token instead of concatenating it.
            res_userinfo = requests.get(
                _GOOGLE_USERINFO_URL,
                params={'access_token': access_token},
                timeout=_GOOGLE_USERINFO_TIMEOUT,
            )
            userinfo = json.loads(res_userinfo.text)
            name = userinfo['name']
            email = userinfo['email']
        except (requests.RequestException, ValueError, KeyError, TypeError):
            _logger.debug('Google userinfo lookup failed', exc_info=True)
            return self.response(invalid_token)

        user = self.get_user_by_email(email)
        if not user:
            user_data = {
                'name': name,
                'login': email,
                'oauth_provider_id':
                    request.env.ref('auth_oauth.provider_google').id,
                # NOTE(review): presumably selects the user-type group with
                # id 9 -- confirm against the target database.
                'sel_groups_1_9_10': 9
            }
            user = request.env['res.users'].create(user_data)

        data = {
            'is_auth': True,
            'user': self.response_with_token(user)
        }
        return self.response(data)

    @http.route(prefix + 'user/register', auth='public', methods=['POST'],
                csrf=False)
    @controller.Controller.must_authorized()
    def register(self, **kw):
        """Create an inactive user and send the activation mail.

        Requires ``name``, ``email`` and ``password`` (code 400 otherwise);
        ``company`` and ``phone`` are optional. When the email is already
        used the response carries ``reason='EMAIL_USED'`` (active account) or
        ``reason='NOT_ACTIVE'`` (inactive account, activation mail re-sent).
        """
        name = kw.get('name')
        email = kw.get('email')
        password = kw.get('password')
        if not name or not email or not password:
            return self.response(
                code=400, description='email, name and password is required')

        company = kw.get('company', False)
        phone = kw.get('phone')

        response = {
            'register': False,
            'reason': None
        }

        user = self.get_user_by_email(email)
        if user:
            if user.active:
                response['reason'] = 'EMAIL_USED'
            else:
                user.send_activation_mail()
                response['reason'] = 'NOT_ACTIVE'
            return self.response(response)

        user_data = {
            'name': name,
            'login': email,
            'phone': phone,
            'password': password,
            'active': False,
            # NOTE(review): presumably selects the user-type group with
            # id 9 -- confirm against the target database.
            'sel_groups_1_9_10': 9
        }
        user = request.env['res.users'].create(user_data)
        user.partner_id.email = email

        if company:
            parameter = [
                ('company_type', '=', 'company'),
                ('name', 'ilike', company)
            ]
            match_company = request.env['res.partner'].search(
                parameter, limit=1)
            match_ratio = 0
            if match_company:
                match_ratio = SequenceMatcher(
                    None, match_company.name, company).ratio()

            if match_ratio > 0.8:
                # Close enough to an existing company: record a request to
                # join it instead of linking the user directly.
                request.env['user.company.request'].create({
                    'user_id': user.partner_id.id,
                    'user_company_id': match_company.id,
                    'user_input': company
                })
            else:
                new_company = request.env['res.partner'].create({
                    'name': company
                })
                user.parent_id = new_company.id

        user.send_activation_mail()
        response['register'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation-request', auth='public',
                methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def request_activation_user(self, **kw):
        """Re-send the activation mail for the inactive user with ``email``.

        Responds with ``reason='NOT_FOUND'`` for an unknown email and
        ``reason='ACTIVE'`` when the account is already active.
        """
        email = kw.get('email')
        response = {
            'activation_request': False,
            'reason': None
        }

        user = self.get_user_by_email(email)
        if not user:
            response['reason'] = 'NOT_FOUND'
            return self.response(response)

        if user.active:
            response['reason'] = 'ACTIVE'
            return self.response(response)

        user.send_activation_mail()
        response['activation_request'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation', auth='public', methods=['POST'],
                csrf=False)
    @controller.Controller.must_authorized()
    def activation_user(self, **kw):
        """Activate the inactive user owning the activation ``token``.

        Responds with code 400 when ``token`` is missing and with
        ``reason='INVALID_TOKEN'`` when no inactive user matches it. On
        success the token is cleared and the user payload is returned.
        """
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        response = {
            'activation': False,
            'reason': None,
            'user': None
        }

        user = request.env['res.users'].search(
            [('activation_token', '=', token), ('active', '=', False)],
            limit=1)
        if not user:
            response['reason'] = 'INVALID_TOKEN'
            return self.response(response)

        user.active = True
        user.activation_token = ''
        response.update({
            'activation': True,
            'user': self.response_with_token(user)
        })
        return self.response(response)

    @http.route(prefix + 'user/activation-token', auth='public',
                methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_token(self, **kw):
        """Alias route that delegates to :meth:`activation_user`."""
        return self.activation_user(**kw)

    @http.route(prefix + 'user/activation-otp', auth='public',
                methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_otp(self, **kw):
        """Activate the user with ``email`` when ``otp`` matches their code.

        Responds with ``reason='INVALID_OTP'`` when the user is unknown, the
        OTP is missing, or the OTP does not match.
        """
        email = kw.get('email')
        otp = kw.get('otp')
        response = {
            'activation': False,
            'reason': None,
            'user': None
        }

        user = self.get_user_by_email(email)
        # Reject unknown users and missing/falsy OTPs explicitly so that an
        # unset ``otp_code`` can never compare equal to the submitted value.
        if not user or not otp or user.otp_code != otp:
            response['reason'] = 'INVALID_OTP'
            return self.response(response)

        user.active = True
        response.update({
            'activation': True,
            'user': self.response_with_token(user)
        })
        return self.response(response)

    @http.route(prefix + 'user/forgot-password', auth='public',
                methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def forgot_password_user(self, **kw):
        """Generate and store a reset-password token for ``email``.

        Responds with ``reason='NOT_FOUND'`` for an unknown email; otherwise
        returns the new token and the user payload.
        """
        email = kw.get('email')
        user = self.get_user_by_email(email)
        if not user:
            return self.response({'success': False, 'reason': 'NOT_FOUND'})

        # The token authorizes a password change, so it must come from a
        # cryptographically secure source rather than ``random``.
        token_source = string.ascii_letters + string.digits
        user.reset_password_token = ''.join(
            secrets.choice(token_source) for _ in range(_RESET_TOKEN_LENGTH))

        # NOTE(review): the reset token is returned to the caller of this
        # endpoint; presumably a trusted client delivers it to the user --
        # confirm this is intended.
        return self.response({
            'success': True,
            'token': user.reset_password_token,
            'user': request.env['res.users'].api_single_response(user)
        })

    @http.route(prefix + 'user/reset-password', auth='public',
                methods=['POST', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def reset_password_user(self, **kw):
        """Set a new ``password`` for the user owning the reset ``token``.

        Responds with code 400 when ``token`` is missing, with
        ``reason='INVALID_TOKEN'`` when no user matches it, and with code 400
        when ``password`` is missing or empty. On success the token is
        cleared.
        """
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        user = request.env['res.users'].search(
            [('reset_password_token', '=', token),
             ('active', 'in', [False, True])],
            limit=1)
        if not user:
            return self.response({'success': False, 'reason': 'INVALID_TOKEN'})

        password = kw.get('password', '')
        if not password:
            # Do not consume the token or write an empty password.
            return self.response(
                code=400, description='password is required')

        user.password = password
        user.reset_password_token = ''
        return self.response({
            'success': True,
            'user': request.env['res.users'].api_single_response(user)
        })

    # NOTE(review): this route has no ``id`` segment although the handler
    # reads ``kw['id']``; a URL converter may have been lost -- confirm.
    @http.route(prefix + 'user/', auth='public', methods=['PUT', 'OPTIONS'],
                csrf=False)
    @controller.Controller.must_authorized()
    def update_user(self, **kw):
        """Update name, phone, mobile and/or password of the user ``id``.

        Only fields present in the request (including empty strings) are
        written. Responds with code 404 when the user does not exist.
        """
        # NOTE(review): nothing visible here checks that the caller owns the
        # targeted user; presumably ``must_authorized`` covers it -- confirm.
        user_id = kw.get('id')
        user = request.env['res.users'].search(
            [('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        allowed_field = ['name', 'phone', 'mobile', 'password']
        for field in allowed_field:
            field_value = kw.get(field)
            if field_value or field_value == '':
                user[field] = field_value

        return self.response({
            'user': self.response_with_token(user)
        })

    # NOTE(review): the double slash suggests a lost ``id`` URL segment;
    # the handler reads ``kw['id']`` -- confirm the intended route.
    @http.route(prefix + 'user//address', auth='public',
                methods=['GET', 'OPTIONS'])
    @controller.Controller.must_authorized()
    def get_user_address_by_id(self, **kw):
        """Return the addresses of user ``id``, most recently written first.

        The list covers the user's own partner and its child partners.
        Responds with code 404 when the user does not exist.
        """
        user_id = kw.get('id')
        user = request.env['res.users'].search(
            [('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        partner_ids = [user.partner_id.id] + [
            child.id for child in user.child_ids]
        partners = request.env['res.partner'].search(
            [('id', 'in', partner_ids)], order='write_date DESC')
        address = [
            request.env['res.users'].api_address_response(partner)
            for partner in partners
        ]
        return self.response(address)