import base64
import json
import logging
import mimetypes
import random
import secrets
import string
from difflib import SequenceMatcher

import requests

from odoo import http
from odoo.http import request
from odoo.tools.config import config

from .. import controller

_logger = logging.getLogger(__name__)


class User(controller.Controller):
    """HTTP endpoints under ``/api/v1/user`` for account management.

    Covers login (password and Google access token), registration,
    activation, password reset, profile updates, address listing and
    switching an account to a business (company) account.
    """

    prefix = '/api/v1/'

    # Record ids written onto every partner created through this API.
    # NOTE(review): these are database-specific ids; confirm they are valid
    # in every environment this controller is deployed to.
    _DEFAULT_PARTNER_USER_ID = 3222
    _DEFAULT_RECEIVABLE_ACCOUNT_ID = 395
    _DEFAULT_PAYABLE_ACCOUNT_ID = 438

    # Placeholder tax identifiers used when the caller supplies none.
    _PLACEHOLDER_NPWP = '00.000.000.0-000.000'
    _PLACEHOLDER_SPPKP = '-'

    _GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
    _GOOGLE_TIMEOUT_SECONDS = 10

    _RESET_TOKEN_LENGTH = 21

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def get_user_by_email(self, email):
        """Return the ``res.users`` record(s) whose login is *email*.

        Both active and inactive users are searched.
        """
        return request.env['res.users'].search([
            ('login', '=', email),
            ('active', 'in', [True, False])
        ])

    def response_with_token(self, user, **overrides):
        """Build the API payload for *user* and add a fresh token.

        :param user: ``res.users`` record to serialise.
        :param overrides: optional extra key/values merged into the payload
            before the token is added (so ``token`` cannot be overridden).
        :return: dict produced by ``api_single_response`` plus ``token``.
        """
        data = request.env['res.users'].sudo().api_single_response(user)
        # NOTE(review): update_user_tempo passes payment_tempo='Review' here;
        # merging it at the top level of the payload is an assumption --
        # confirm the key the client expects.
        data.update(overrides)
        data['token'] = self.create_user_token(user)
        return data

    @staticmethod
    def _to_int(value):
        """Convert a form value to ``int``; empty or invalid input gives 0."""
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    def _parse_company_form(self, kw):
        """Collect the company-related form fields from the request kwargs."""
        return {
            'npwp': kw.get('npwp'),
            'npwp_document': kw.get('npwp_document', False),
            'npwp_filename': kw.get('npwp_filename', 'npwp_document'),
            'sppkp': kw.get('sppkp', False),
            'sppkp_document': kw.get('sppkp_document', False),
            'sppkp_filename': kw.get('sppkp_filename', 'sppkp_document'),
            'email_partner': kw.get('email_partner'),
            'business_name': kw.get('business_name', False),
            'industry_id': self._to_int(kw.get('industry_id', '0')),
            'company_type_id': self._to_int(kw.get('company_type_id', '0')),
            'alamat_wajib_pajak': kw.get('alamat_wajib_pajak', False),
            'alamat_bisnis': kw.get('alamat_bisnis', False),
            'nama_wajib_pajak': kw.get('nama_wajib_pajak', False),
            'is_pkp': kw.get('is_pkp'),
        }

    def _apply_individual_defaults(self, partner, taxpayer_name):
        """Write the non-PKP defaults used for individual accounts."""
        partner.write({
            'customer_type': 'nonpkp',
            'npwp': self._PLACEHOLDER_NPWP,
            'sppkp': self._PLACEHOLDER_SPPKP,
            'nama_wajib_pajak': taxpayer_name,
            'user_id': self._DEFAULT_PARTNER_USER_ID,
            'property_account_receivable_id': self._DEFAULT_RECEIVABLE_ACCOUNT_ID,
            'property_account_payable_id': self._DEFAULT_PAYABLE_ACCOUNT_ID,
        })

    def _create_company_request(self, user, company_id, business_name):
        """File a ``user.company.request`` linking *user*'s partner to a company."""
        return request.env['user.company.request'].create({
            'user_id': user.partner_id.id,
            'user_company_id': company_id,
            'user_input': business_name
        })

    def _attach_partner_document(self, partner, label, encoded_document, filename):
        """Attach a base64 document to *partner* and post it in the chatter.

        Does nothing when *encoded_document* is empty. The payload is decoded
        and re-encoded, so malformed base64 raises instead of being stored.
        """
        if not encoded_document:
            return
        mimetype, _ = mimetypes.guess_type(filename)
        raw_document = base64.b64decode(encoded_document)
        attachment = request.env['ir.attachment'].create({
            'name': label + ' Document',
            'type': 'binary',
            'datas': base64.b64encode(raw_document),
            'res_model': 'res.partner',
            'res_id': partner.id,
            'mimetype': mimetype or 'application/octet-stream'
        })
        partner.message_post(body=label + ' Uploaded', attachment_ids=[attachment.id])

    def _create_company_with_request(self, user, form):
        """Create an inactive company partner from *form* and request access.

        Creates the ``res.partner`` company, files the company request for
        *user* and attaches the NPWP / SPPKP documents when provided.
        """
        business_name = form['business_name']
        is_pkp = form['is_pkp']
        npwp = form['npwp']
        sppkp = form['sppkp']
        nama_wajib_pajak = form['nama_wajib_pajak']
        alamat_wajib_pajak = form['alamat_wajib_pajak']

        # Non-PKP companies may omit taxpayer data; fall back to business data.
        if is_pkp == 'false':
            nama_wajib_pajak = nama_wajib_pajak or business_name
            alamat_wajib_pajak = alamat_wajib_pajak or form['alamat_bisnis']

        new_company = request.env['res.partner'].create({
            'name': business_name,
            'company_type_id': form['company_type_id'] or False,
            'industry_id': form['industry_id'],
            'customer_type': 'pkp' if is_pkp == 'true' else 'nonpkp',
            'npwp': npwp if is_pkp == 'true' else (npwp or self._PLACEHOLDER_NPWP),
            'sppkp': sppkp if is_pkp == 'true' else (sppkp or self._PLACEHOLDER_SPPKP),
            'nama_wajib_pajak': nama_wajib_pajak,
            'alamat_lengkap_text': alamat_wajib_pajak,
            'email': form['email_partner'],
            'street': form['alamat_bisnis'],
            'company_type': 'company',
            'user_id': self._DEFAULT_PARTNER_USER_ID,
            'property_account_receivable_id': self._DEFAULT_RECEIVABLE_ACCOUNT_ID,
            'property_account_payable_id': self._DEFAULT_PAYABLE_ACCOUNT_ID,
            'active': False,
        })
        self._create_company_request(user, new_company.id, business_name)
        self._attach_partner_document(
            new_company, 'NPWP', form['npwp_document'], form['npwp_filename'])
        self._attach_partner_document(
            new_company, 'SPPKP', form['sppkp_document'], form['sppkp_filename'])
        return new_company

    def _find_similar_company_id(self, business_name):
        """Return the id of the closest-named partner, or ``None``.

        Runs a SQL query using the ``levenshtein`` function and keeps
        partners whose name is within an edit distance of 3.
        """
        query = """
            SELECT id, name, levenshtein(name::text, %s) AS distance
            FROM res_partner
            WHERE levenshtein(name::text, %s) < 3
            ORDER BY distance ASC
            LIMIT 1
        """
        request.env.cr.execute(query, (business_name, business_name))
        row = request.env.cr.fetchone()
        return row[0] if row else None

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    @http.route(prefix + 'user/login', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def login(self, **kw):
        """Authenticate with email and password.

        Responds 400 when either field is missing, ``NOT_ACTIVE`` for an
        inactive user, ``NOT_FOUND`` when authentication fails, otherwise the
        user payload with a token and a role derived from the
        ``is_inbound`` / ``is_outbound`` flags.
        """
        email = kw.get('email')
        password = kw.get('password')

        if not email or not password:
            return self.response(code=400, description='email and password is required')

        user = self.get_user_by_email(email)
        if user and not user.active:
            return self.response({
                'is_auth': False,
                'reason': 'NOT_ACTIVE'
            })

        try:
            uid = request.session.authenticate(
                config.get('db_name'), email, password)
            user = request.env['res.users'].browse(uid)

            role = ''
            if user.is_inbound and user.is_outbound:
                role = 'admin'
            elif user.is_outbound:
                role = 'outbound'
            elif user.is_inbound:
                role = 'inbound'

            return self.response({
                'is_auth': True,
                'role': role,
                'user': self.response_with_token(user),
            })
        except Exception:
            # Any failure is reported to the client as a failed login, but is
            # logged so genuine errors are not silently hidden.
            _logger.info('Login failed for %s', email, exc_info=True)
            return self.response({
                'is_auth': False,
                'reason': 'NOT_FOUND'
            })

    @http.route(prefix + 'user/validate-sso', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def user_validate_sso(self, **kw):
        """Log in (creating the user if needed) from a Google access token.

        The token is exchanged for the Google userinfo document; a missing or
        rejected token yields ``INVALID_TOKEN``.
        """
        invalid_token = {
            'is_auth': False,
            'reason': 'INVALID_TOKEN'
        }

        access_token = kw.get('access_token')
        if not access_token:
            return self.response(invalid_token)

        try:
            # ``params`` URL-encodes the token; the timeout keeps a stalled
            # upstream call from blocking the worker indefinitely.
            res_userinfo = requests.get(
                self._GOOGLE_USERINFO_URL,
                params={'access_token': access_token},
                timeout=self._GOOGLE_TIMEOUT_SECONDS,
            )
            userinfo = json.loads(res_userinfo.text)
            name = userinfo['name']
            email = userinfo['email']
        except (requests.RequestException, ValueError, KeyError, TypeError):
            return self.response(invalid_token)

        user = self.get_user_by_email(email)
        if not user:
            user = request.env['res.users'].create({
                'name': name,
                'login': email,
                'oauth_provider_id': request.env.ref('auth_oauth.provider_google').id,
                'sel_groups_1_9_10': 9
            })
            user.partner_id.email = email
            self._apply_individual_defaults(user.partner_id, user.name)

        return self.response({
            'is_auth': True,
            'user': self.response_with_token(user)
        })

    # ------------------------------------------------------------------
    # Registration and activation
    # ------------------------------------------------------------------

    @http.route(prefix + 'user/register', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def register(self, **kw):
        """Register a new, inactive user and send the activation mail.

        For ``type_acc == 'business'`` the user is linked, through a
        ``user.company.request``, either to an existing partner with a
        similar name or to a newly created inactive company.
        """
        name = kw.get('name')
        email = kw.get('email')
        password = kw.get('password')
        phone = kw.get('phone')

        form = self._parse_company_form(kw)
        business_name = form['business_name']
        type_acc = kw.get('type_acc', 'individu') or 'individu'

        if not name or not email or not password:
            return self.response(code=400, description='email, name, and password are required')

        response = {
            'register': False,
            'reason': None
        }

        user = self.get_user_by_email(email)
        if user:
            if user.active:
                response['reason'] = 'EMAIL_USED'
            else:
                user.send_activation_mail()
                response['reason'] = 'NOT_ACTIVE'
            return self.response(response)

        user = request.env['res.users'].create({
            'name': name,
            'login': email,
            'mobile': phone,
            'phone': phone,
            'password': password,
            'active': False,
            'sel_groups_1_9_10': 9
        })
        user.partner_id.email = email
        user.partner_id.mobile = phone

        if type_acc == 'business' and business_name:
            match_company_id = self._find_similar_company_id(business_name)
            if match_company_id:
                self._create_company_request(user, match_company_id, business_name)
            else:
                self._create_company_with_request(user, form)

        if type_acc == 'individu':
            self._apply_individual_defaults(user.partner_id, name)

        user.send_activation_mail()

        response['register'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation-request', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def request_activation_user(self, **kw):
        """Re-send the activation mail to an existing inactive user."""
        email = kw.get('email')
        response = {
            'activation_request': False,
            'reason': None
        }

        user = self.get_user_by_email(email)
        if not user:
            response['reason'] = 'NOT_FOUND'
            return self.response(response)

        if user.active:
            response['reason'] = 'ACTIVE'
            return self.response(response)

        user.send_activation_mail()
        response['activation_request'] = True
        return self.response(response)

    @http.route(prefix + 'user/activation', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user(self, **kw):
        """Activate the inactive user that owns the given activation token."""
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        response = {
            'activation': False,
            'reason': None,
            'user': None
        }

        user = request.env['res.users'].search(
            [('activation_token', '=', token), ('active', '=', False)], limit=1)
        if not user:
            response['reason'] = 'INVALID_TOKEN'
            return self.response(response)

        user.active = True
        user.activation_token = ''
        response.update({
            'activation': True,
            'user': self.response_with_token(user)
        })
        return self.response(response)

    @http.route(prefix + 'user/activation-token', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_token(self, **kw):
        """Alias of :meth:`activation_user` exposed on a second route."""
        return self.activation_user(**kw)

    @http.route(prefix + 'user/activation-otp', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def activation_user_with_otp(self, **kw):
        """Activate a user by email and OTP code.

        After activation, a pending company request (if any) is reflected on
        the user and the company-request mail is sent while it is unapproved.
        """
        email = kw.get('email')
        otp = kw.get('otp')
        response = {
            'activation': False,
            'reason': None,
            'user': None
        }

        user = self.get_user_by_email(email)
        # Reject explicitly when there is no user or no OTP instead of relying
        # on the comparison of empty values.
        if not user or not otp or user.otp_code != otp:
            response['reason'] = 'INVALID_OTP'
            return self.response(response)

        user.active = True
        response.update({
            'activation': True,
            'user': self.response_with_token(user)
        })

        company_request = request.env['user.company.request'].search(
            [('user_id', '=', user.partner_id.id)], limit=1)
        if company_request:
            user.parent_name = company_request.user_input
            if not company_request.is_approve:
                user.send_company_request_mail()
        return self.response(response)

    # ------------------------------------------------------------------
    # Password reset
    # ------------------------------------------------------------------

    @http.route(prefix + 'user/forgot-password', auth='public', methods=['POST'], csrf=False)
    @controller.Controller.must_authorized()
    def forgot_password_user(self, **kw):
        """Generate a reset-password token for the user and return it."""
        email = kw.get('email')
        user = self.get_user_by_email(email)
        if not user:
            return self.response({'success': False, 'reason': 'NOT_FOUND'})

        # The token grants a password change, so it must come from a
        # cryptographically secure source rather than ``random``.
        token_source = string.ascii_letters + string.digits
        user.reset_password_token = ''.join(
            secrets.choice(token_source) for _ in range(self._RESET_TOKEN_LENGTH))
        return self.response({
            'success': True,
            'token': user.reset_password_token,
            'user': request.env['res.users'].api_single_response(user)
        })

    @http.route(prefix + 'user/reset-password', auth='public', methods=['POST', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def reset_password_user(self, **kw):
        """Set a new password for the user that owns the reset token.

        Responds 400 when the token or the new password is missing, and
        ``INVALID_TOKEN`` when no user holds the token.
        """
        token = kw.get('token')
        if not token:
            return self.response(code=400, description='token is required')

        password = kw.get('password', '')
        if not password:
            # Never consume the token to store an empty password.
            return self.response(code=400, description='password is required')

        user = request.env['res.users'].search(
            [('reset_password_token', '=', token), ('active', 'in', [False, True])], limit=1)
        if not user:
            return self.response({'success': False, 'reason': 'INVALID_TOKEN'})

        user.password = password
        user.reset_password_token = ''
        return self.response({
            'success': True,
            'user': request.env['res.users'].api_single_response(user)
        })

    # ------------------------------------------------------------------
    # Profile
    # ------------------------------------------------------------------
    # NOTE(review): the handlers below read ``kw['id']``, so their routes
    # declare an ``<id>`` path segment; confirm this matches the URLs the
    # clients call.

    @http.route(prefix + 'user/<id>', auth='public', methods=['PUT', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def update_user(self, **kw):
        """Update name, phone, mobile and/or password of a user."""
        user_id = kw.get('id')
        user = request.env['res.users'].search([('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        for field in ('name', 'phone', 'mobile', 'password'):
            field_value = kw.get(field)
            if field == 'password' and not field_value:
                # An empty string may clear other fields, never the password.
                continue
            if field_value or field_value == '':
                user[field] = field_value

        return self.response({
            'user': self.response_with_token(user)
        })

    @http.route(prefix + 'user/<id>/after_request_tempo', auth='public', methods=['PUT', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def update_user_tempo(self, **kw):
        """Return the user payload with ``payment_tempo`` set to ``'Review'``."""
        user_id = kw.get('id')
        user = request.env['res.users'].search([('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        return self.response({
            'user': self.response_with_token(user, payment_tempo='Review')
        })

    @http.route(prefix + 'user/<id>/address', auth='public', methods=['GET', 'OPTIONS'])
    @controller.Controller.must_authorized()
    def get_user_address_by_id(self, **kw):
        """List the addresses of a user (own partner plus child partners).

        Results are ordered by most recently written first.
        """
        user_id = kw.get('id')
        user = request.env['res.users'].search([('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        partner_ids = [user.partner_id.id] + user.child_ids.ids
        partners = request.env['res.partner'].search(
            [('id', 'in', partner_ids)], order='write_date DESC')
        address = [
            request.env['res.users'].api_address_response(partner)
            for partner in partners
        ]
        return self.response(address)

    # ------------------------------------------------------------------
    # Switch to business account
    # ------------------------------------------------------------------

    @http.route(prefix + 'user/<id>/switch', auth='public', methods=['PUT', 'OPTIONS'], csrf=False)
    @controller.Controller.must_authorized()
    def switch_account(self, **kw):
        """Request switching a user to a business account.

        When ``type_acc == 'business'`` the user is linked, through a
        ``user.company.request``, to an existing company whose name is more
        than 80% similar to ``business_name``, or to a new inactive company.
        """
        user_id = kw.get('id')
        user = request.env['res.users'].search([('id', '=', user_id)], limit=1)
        if not user:
            return self.response(code=404, description='User not found')

        response = {
            'switch': False,
            'reason': None
        }

        form = self._parse_company_form(kw)
        business_name = form['business_name']
        type_acc = kw.get('type_acc', False)

        if type_acc == 'business':
            if not business_name:
                return self.response(code=400, description='business_name is required')

            match_company = request.env['res.partner'].search([
                ('company_type', '=', 'company'),
                ('name', 'ilike', business_name)
            ], limit=1)

            match_ratio = 0
            if match_company:
                match_ratio = SequenceMatcher(
                    None, match_company.name, business_name).ratio()

            if match_ratio > 0.8:
                self._create_company_request(user, match_company.id, business_name)
            else:
                self._create_company_with_request(user, form)

            # Only report a pending switch when a request was actually filed.
            response['switch'] = 'Pending'

        return self.response(response)

    @http.route(prefix + 'user/<id>/switch_progres', auth='public', methods=['GET', 'OPTIONS'], csrf=False)
    # NOTE(review): authorization is disabled on this endpoint; confirm that
    # is intentional before re-enabling.
    # @controller.Controller.must_authorized()
    def switch_account_progres(self, **kw):
        """Report the status of the user's company request.

        ``status`` is ``'unknown'`` when there is no request, ``'pending'``
        while ``is_approve`` is unset, otherwise the ``is_approve`` value.
        """
        try:
            user_id = int(kw.get('id'))
        except (TypeError, ValueError):
            return self.response(code=400, description='id must be an integer')

        response = {
            'status': 'unknown'
        }

        user = request.env['res.users'].search([('id', '=', user_id)], limit=1)
        if not user:
            return self.response(response)

        # Company requests are stored against the user's partner id (see
        # _create_company_request), so look them up the same way.
        # NOTE(review): confirm clients pass the res.users id in the URL.
        company_request = request.env['user.company.request'].search(
            [('user_id', '=', user.partner_id.id)], limit=1)

        if company_request:
            # Read the is_approve value; unset means still pending.
            response['status'] = company_request.is_approve or 'pending'

        return self.response(response)