import base64
import datetime
import functools
import json
from array import array

import jwt
from pytz import timezone

from odoo import http
from odoo.http import request
from odoo.tools.config import config


class Controller(http.Controller):
    """Base REST controller: authentication helpers, parameter parsing,
    JSON response formatting and a few utility endpoints."""

    # SECURITY NOTE(review): the JWT signing secret is hard-coded in source.
    # Anyone with read access to this file can forge user tokens. Consider
    # loading it from server configuration and rotating this value.
    jwt_secret_key = "NTNv7j0TuYARvmNMmWXo6fKvM4o6nvaUi9ryX38ZHL1bkrnD1ObOQ8JAUmHCBq7Iy7otZcyAagBLHVKvvYaIpmMuxmARQ97jUVG16Jkpkp1wXOPsrF9zwew6TpczyHkHgX5EuLg2MeBuiTqJACs1J0apruOOJCggOtkjB4c"

    @staticmethod
    def must_authorized(private=False, private_key=''):
        """Build a decorator that rejects unauthenticated requests.

        The decorated callable must receive the controller instance as its
        first positional argument.

        :param private: when true, additionally require that the value stored
            under ``private_key`` in the authentication result equals the
            keyword argument of the same name passed to the handler.
        :param private_key: key looked up both in the authentication result
            and in the handler's keyword arguments.
        :return: a decorator; the wrapped handler returns the 401 response
            instead of running when a check fails.
        """
        def wrapper(func):
            @functools.wraps(func)
            def inner_wrapper(*args, **kwargs):
                self = args[0]
                auth = self.authenticate()
                if not auth:
                    return self.unauthorized_response()
                if private:
                    try:
                        auth_key = int(auth[private_key])
                        param_key = int(kwargs.get(private_key, -1))
                    except (TypeError, KeyError, ValueError):
                        # authenticate() may return plain True (no per-user
                        # data), the key may be absent or None, or the
                        # request parameter may not be numeric. None of these
                        # can prove ownership, so refuse instead of crashing.
                        return self.unauthorized_response()
                    if auth_key != param_key:
                        return self.unauthorized_response()
                return func(*args, **kwargs)
            return inner_wrapper
        return wrapper

    def authenticate(self):
        """Authenticate the current request.

        Tries, in order:

        1. ``DB`` / ``USERNAME`` / ``PASSWORD`` request headers.
        2. The ``Authorization`` header compared with the ``rest_api_token``
           system parameter.
        3. A JWT in the ``Token`` header (see ``verify_user_token``).

        :return: ``True`` for header-credential or shared-token
            authentication, a dict with ``user_id`` / ``partner_id`` when a
            valid user token is present (this takes precedence over the
            shared token), otherwise ``False``.
        """
        wsgienv = request.httprequest.environ
        try:
            db = wsgienv['HTTP_DB']
            username = wsgienv['HTTP_USERNAME']
            password = wsgienv['HTTP_PASSWORD']
            request.session.authenticate(db, username, password)
            return True
        except Exception:
            # Missing headers or rejected credentials: fall back to the
            # token-based checks below.
            pass

        authorization = wsgienv.get('HTTP_AUTHORIZATION')

        # SECURITY NOTE(review): hard-coded service-account credentials, and
        # the session is authenticated with them before any token has been
        # checked. Consider moving them to server configuration.
        request.session.authenticate(config.get('db_name'), 'it@fixcomart.co.id', 'Fixcomart378')
        token = request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or ''

        result = False
        # Require a configured (non-empty) token; otherwise an empty
        # Authorization header would match the '' fallback above.
        if token and authorization == token:
            result = True

        user_token = self.verify_user_token()
        if user_token:
            result = user_token
        return result

    def user_pricelist(self):
        """Return the pricelist of the partner linked to the user token.

        :return: the partner's ``property_product_pricelist`` when the request
            carries a valid user token whose partner record is non-empty,
            otherwise ``False``.
        """
        user_token = self.authenticate()
        if not isinstance(user_token, dict):
            return False
        partner = request.env['res.partner'].browse(user_token['partner_id'])
        if not partner:
            return False
        return partner.property_product_pricelist

    def get_request_params(self, kw, queries):
        """Validate and normalise request parameters against simple rules.

        :param kw: mapping of raw request parameters.
        :param queries: mapping of parameter name to a list of rule strings.
            Rules handled here: ``required``, ``number``, ``exclude_if_null``,
            ``alias:<name>`` and ``default:<value>``.
        :return: dict with ``valid`` (bool), ``reason`` (list of error
            messages), ``query`` (value per original key, before the default
            is applied) and ``value`` (final value per alias).
        """
        result = {
            'valid': True,
            'reason': [],
            'value': {},
            'query': {}
        }
        for key, rules in queries.items():
            is_number = 'number' in rules
            is_exclude_if_null = 'exclude_if_null' in rules
            alias = next((r.replace('alias:', '') for r in rules if r.startswith('alias:')), key)
            default = next((r.replace('default:', '') for r in rules if r.startswith('default:')), None)

            value = kw.get(key, '')
            # Treat the literal strings 'null' / 'undefined' as "not provided".
            if value in ['null', 'undefined']:
                value = ''

            if 'required' in rules and not value:
                result['reason'].append(f"{key} is required")
            if is_number and value and not value.isdigit():
                result['reason'].append(f"{key} must be a number")

            result['query'][key] = value

            if not value and default:
                value = default
            if is_number and value.isdigit():
                value = int(value)

            # Any falsy value is skipped here, which includes a numeric 0.
            if not value and is_exclude_if_null:
                continue
            result['value'][alias] = value

        result['valid'] = not result['reason']
        return result

    def time_to_str(self, object, format):
        """Format a datetime in the Asia/Jakarta timezone.

        :param object: value to format; anything that is not a
            ``datetime.datetime`` yields an empty string.
        :param format: ``strftime`` format string.
        :return: the formatted time, or ``''``.
        """
        if not isinstance(object, datetime.datetime):
            return ''
        return object.astimezone(timezone('Asia/Jakarta')).strftime(format)

    def response(self, data=(), code=200, description='OK', headers=()):
        """Build a JSON HTTP response with permissive CORS headers.

        :param data: payload placed under ``result``; only included when
            ``code`` is 200. The default serialises as an empty JSON array.
        :param code: status code reported inside the JSON body.
        :param description: status description reported inside the JSON body.
        :param headers: extra ``(name, value)`` header pairs to append.
        :return: the object returned by ``request.make_response``.
        """
        payload = {
            'status': {
                'code': code,
                'description': description
            }
        }
        if code == 200:
            payload['result'] = data
        base_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Headers', '*'),
            ('Access-Control-Allow-Methods', '*'),
            ('Content-Type', 'application/json'),
        ]
        return request.make_response(json.dumps(payload), base_headers + list(headers))

    def unauthorized_response(self):
        """Return the standard 401 ``Unauthorized`` JSON response."""
        return self.response(code=401, description='Unauthorized')

    def search_filter(self, model: str, kw: dict, query: list = None):
        """Search ``model`` applying the default API paging parameters.

        :param model: name of the model to search.
        :param kw: request parameters; ``limit``, ``offset`` and ``order`` are
            read when present (defaults ``0``, ``0`` and ``''``).
        :param query: search domain; defaults to an empty domain.
        :return: the result of the model's ``search`` call.
        """
        if query is None:
            query = []
        limit = kw.get('limit', 0)
        offset = kw.get('offset', 0)
        order = kw.get('order', '')
        return request.env[model].search(query, limit=int(limit), offset=int(offset), order=order)

    def create_user_token(self, user):
        """Return a JWT carrying ``user.id``, signed with ``jwt_secret_key``.

        The payload contains only the ``id`` claim (no expiry is set here).
        """
        # Algorithm stated explicitly so it visibly matches verify_user_token.
        return jwt.encode({'id': user.id}, self.jwt_secret_key, algorithm='HS256')

    def verify_user_token(self):
        """Decode the JWT from the ``Token`` request header.

        :return: ``{'user_id': ..., 'partner_id': ...}`` for a valid token
            that refers to an existing user (``partner_id`` is ``None`` when
            the user has no partner), otherwise ``False``.
        """
        try:
            token = request.httprequest.environ['HTTP_TOKEN']
            user_token = jwt.decode(token, self.jwt_secret_key, algorithms=['HS256'])
            # browse() alone yields a non-empty recordset even for a deleted
            # id; exists() keeps only records that are really in the database.
            user = request.env['res.users'].browse([user_token['id']]).exists()
            if not user:
                return False
            data = {
                'user_id': user.id,
                'partner_id': None
            }
            if user.partner_id:
                data['partner_id'] = user.partner_id.id
            return data
        except Exception:
            # Missing header, invalid/forged token or lookup failure: treat
            # every failure as "no user token".
            return False

    def get_partner_child_ids(self, partner_id):
        """Collect the ids of a partner, its children, its parent and the
        parent's children.

        :param partner_id: id of the partner to start from.
        :return: list of partner ids (may contain duplicates); an empty list
            when no partner with that id is found.
        """
        partner = request.env['res.partner'].search([('id', '=', partner_id)], limit=1)
        if not partner:
            # Avoid returning [False], which callers could feed into a domain.
            return []
        partner_child_ids = partner.child_ids.ids + [partner.id]
        if partner.parent_id:
            partner_child_ids += partner.parent_id.child_ids.ids
            partner_child_ids.append(partner.parent_id.id)
        return partner_child_ids

    # SECURITY NOTE(review): this public route returns the shared
    # ``rest_api_token`` to any caller, which is the same value
    # ``authenticate`` compares the Authorization header against.
    @http.route('/api/token', auth='public', methods=['GET', 'OPTIONS'])
    def get_api_token(self, **kw):
        """Return the ``rest_api_token`` system parameter (or ``''``)."""
        return self.response(request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '')

    @http.route('/api/ip-address', auth='public', methods=['GET', 'OPTIONS'])
    def get_ip_address(self):
        """Return the caller's remote address, or ``False`` for 127.0.0.1."""
        address = request.httprequest.remote_addr
        if address == '127.0.0.1':
            address = False
        return self.response(address)

    # NOTE(review): the route path contains no placeholders although the
    # handler takes ``model``, ``field`` and ``id``; it looks like
    # ``<model>/<field>/<id>`` segments may have been lost -- confirm against
    # the deployed route before changing the literal.
    # SECURITY NOTE(review): reads an arbitrary field of an arbitrary model
    # through sudo() on a public route; consider restricting the allowed
    # models and fields.
    @http.route('/api/image///', auth='public', methods=['GET'])
    def get_image(self, model, field, id):
        """Serve the base64-encoded content of ``field`` on the record
        ``id`` of ``model`` as an image response.

        An empty body is returned when the record is not found or the field
        is empty.
        """
        record = request.env[model].sudo().search([('id', '=', id)], limit=1)
        image = record[field] or ''
        request.env['user.activity.log'].record_activity()
        return request.make_response(base64.b64decode(image), [('Content-Type', 'image/jpg')])