from array import array import datetime import base64 import json from odoo import http from odoo.http import request from odoo.tools.config import config from pytz import timezone import jwt class Controller(http.Controller): jwt_secret_key = "NTNv7j0TuYARvmNMmWXo6fKvM4o6nvaUi9ryX38ZHL1bkrnD1ObOQ8JAUmHCBq7Iy7otZcyAagBLHVKvvYaIpmMuxmARQ97jUVG16Jkpkp1wXOPsrF9zwew6TpczyHkHgX5EuLg2MeBuiTqJACs1J0apruOOJCggOtkjB4c" def authenticate(self): wsgienv = request.httprequest.environ try: db = wsgienv['HTTP_DB'] username = wsgienv['HTTP_USERNAME'] password = wsgienv['HTTP_PASSWORD'] request.session.authenticate(db, username, password) return True except: try: authorization = wsgienv['HTTP_AUTHORIZATION'] except: authorization = None request.session.authenticate(config.get('db_name'), 'it@fixcomart.co.id', 'Fixcomart378') token = request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '' result = False if authorization == token: result = True user_token = self.verify_user_token() if user_token: result = user_token return result def get_request_params(self, kw, queries): result = { 'valid': True, 'reason': [], 'value': {}, 'query': {} } for key in queries: rules = queries[key] is_number = len([r for r in rules if r == 'number']) > 0 has_alias = [r for r in rules if r.startswith('alias:')] alias = key if len(has_alias) > 0: alias = has_alias[0].replace('alias:', '') has_default = [r for r in rules if r.startswith('default:')] default = None if len(has_default) > 0: default = has_default[0].replace('default:', '') value = kw.get(key, '') if value in ['null', 'undefined']: value = '' for rule in rules: if rule == 'required' and not value: result['reason'].append(key + ' is ' + rule) elif rule == 'number' and value and not value.isdigit(): result['reason'].append(key + ' must be ' + rule) result['query'][key] = value if not value and default: value = default if is_number and value.isdigit(): value = int(value) if not value and not default: value = None result['value'][alias] = value if len(result['reason']) > 0: result['valid'] = False return result def time_to_str(self, object, format): time = '' if isinstance(object, datetime.datetime): time = object.astimezone(timezone('Asia/Jakarta')).strftime(format) return time def response(self, data=[], code=200, description='OK'): response = { 'status': { 'code': code, 'description': description } } if code == 200: response.update({'result': data}) response = json.dumps(response) return request.make_response(response, [ ('Access-Control-Allow-Origin', '*'), ('Access-Control-Allow-Headers', '*'), ('Access-Control-Allow-Methods', '*'), ('Content-Type', 'application/json'), ]) def unauthorized_response(self): return self.response(code=401, description='Unauthorized') def search_filter(self, model: str, kw: dict, query: array = []): """ To search data by default API Params if exist """ limit = kw.get('limit', 0) offset = kw.get('offset', 0) order = kw.get('order', '') return request.env[model].search(query, limit=int(limit), offset=int(offset), order=order) def create_user_token(self, user): return jwt.encode({'id': user.id}, self.jwt_secret_key) def verify_user_token(self): try: token = request.httprequest.environ['HTTP_TOKEN'] user_token = jwt.decode(token, self.jwt_secret_key, algorithms=['HS256']) user = request.env['res.users'].browse([ user_token['id'] ]) if not user: return False data = { 'id': user.id, 'partner_id': None } if user.partner_id: data['partner_id'] = user.partner_id.id return data except: return False def get_partner_child_ids(self, partner_id): parent_partner_id = request.env['res.partner'].search([('id', '=', partner_id)], limit=1) parent_partner_id = parent_partner_id.parent_id.id or parent_partner_id.id partner_childs = request.env['res.partner'].search([('parent_id', '=', int(parent_partner_id))]) partner_child_ids = [v['id'] for v in partner_childs] + [partner_id] return partner_child_ids @http.route('/api/token', auth='public', methods=['GET', 'OPTIONS']) def get_api_token(self, **kw): return self.response(request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '') @http.route('/api/image///', auth='public', methods=['GET']) def get_image(self, model, field, id): model = request.env[model].sudo().search([('id', '=', id)], limit=1) image = model[field] if model[field] else '' return request.make_response(base64.b64decode(image), [('Content-Type', 'image/jpg')])