import base64
import datetime
import functools
import io
import json
import logging
from array import array

import jwt
from odoo import http
from odoo.http import request
from odoo.modules import get_module_resource
from odoo.tools.config import config
from PIL import Image
from pytz import timezone

_logger = logging.getLogger(__name__)


class Controller(http.Controller):
    """Base REST controller: authentication helpers, JSON responses and image routes."""

    # NOTE(security): the signing secret is hard-coded in source. Anyone with
    # read access to the code can forge user tokens; consider loading it from
    # server configuration instead.
    jwt_secret_key = "NTNv7j0TuYARvmNMmWXo6fKvM4o6nvaUi9ryX38ZHL1bkrnD1ObOQ8JAUmHCBq7Iy7otZcyAagBLHVKvvYaIpmMuxmARQ97jUVG16Jkpkp1wXOPsrF9zwew6TpczyHkHgX5EuLg2MeBuiTqJACs1J0apruOOJCggOtkjB"

    @staticmethod
    def must_authorized(private=False, private_key=''):
        """Build a decorator that rejects unauthenticated calls with a 401 response.

        The decorated callable must be a method of this controller (its first
        positional argument is used as ``self``).

        :param private: when true, the request must additionally carry a user
            token whose ``private_key`` entry equals the integer value of the
            keyword argument of the same name.
        :param private_key: key looked up both in the decoded user token
            (``'user_id'`` or ``'partner_id'``) and in the handler's kwargs.
        """
        def wrapper(func):
            @functools.wraps(func)
            def inner_wrapper(*args, **kwargs):
                controller = args[0]
                auth = controller.authenticate()
                if not auth:
                    return controller.unauthorized_response()
                # NOTE: set_user_pricelist_ctx() runs authenticate() a second time.
                controller.set_user_pricelist_ctx()
                if private:
                    # A plain ``True`` means "API token only": no user identity,
                    # so private resources can never match.
                    auth_key = auth.get(private_key) if isinstance(auth, dict) else None
                    try:
                        param_key = int(kwargs.get(private_key, -1))
                    except (TypeError, ValueError):
                        # A non-numeric identifier cannot match any user.
                        return controller.unauthorized_response()
                    if auth_key != param_key:
                        return controller.unauthorized_response()
                return func(*args, **kwargs)
            return inner_wrapper
        return wrapper

    def authenticate(self):
        """Authenticate the current request.

        Order of attempts:

        1. ``DB`` / ``USERNAME`` / ``PASSWORD`` request headers: log the session
           in with them and return ``True``.
        2. Otherwise log the session in with the built-in service account, then
           accept either a valid user JWT (``Token`` header) or an
           ``Authorization`` header equal to the ``rest_api_token`` parameter.

        :return: ``True`` for header credentials or a matching API token, the
            dict produced by :meth:`verify_user_token` for a valid user token,
            ``False`` otherwise.
        """
        wsgienv = request.httprequest.environ
        credentials = tuple(
            wsgienv.get(header)
            for header in ('HTTP_DB', 'HTTP_USERNAME', 'HTTP_PASSWORD')
        )
        if all(value is not None for value in credentials):
            try:
                request.session.authenticate(*credentials)
                return True
            except Exception:
                # Deliberate best effort: fall back to token authentication.
                _logger.debug("Header credential login failed", exc_info=True)

        authorization = wsgienv.get('HTTP_AUTHORIZATION')
        # NOTE(security): service-account credentials are hard-coded in source;
        # they should be moved to server configuration.
        request.session.authenticate(config.get('db_name'), 'it@fixcomart.co.id', 'P@ssvv0rd755')
        token = request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or ''

        user_token = self.verify_user_token()
        if user_token:
            return user_token
        # Require a configured token so that an empty Authorization header can
        # never match an unset ``rest_api_token``.
        return bool(token) and authorization == token

    def set_user_pricelist_ctx(self):
        """Store the requesting user's pricelist in the environment context.

        The context key ``user_pricelist`` holds the partner's
        ``property_product_pricelist`` when the request carries a valid user
        token, otherwise a new (unsaved) ``product.pricelist`` record.
        """
        user_token = self.authenticate()
        pricelist = request.env['product.pricelist'].new()
        if isinstance(user_token, dict):
            partner = request.env['res.partner'].browse(user_token['partner_id'])
            if partner:
                pricelist = partner.property_product_pricelist
        request.env.context = dict(request.env.context, user_pricelist=pricelist)

    @staticmethod
    def _rule_option(rules, prefix, fallback):
        """Return the text after ``prefix`` in the first matching rule, else ``fallback``."""
        for rule in rules:
            if rule.startswith(prefix):
                return rule[len(prefix):]
        return fallback

    def get_request_params(self, kw, queries):
        """Validate and normalise request parameters against per-key rules.

        Supported rules: ``required``, ``number``, ``exclude_if_null``,
        ``alias:<name>`` and ``default:<value>``.

        :param kw: raw request parameters.
        :param queries: mapping of parameter name to its list of rules.
        :return: dict with ``valid`` (bool), ``reason`` (list of error
            messages), ``query`` (raw values by original key) and ``value``
            (normalised values by alias).
        """
        result = {
            'valid': True,
            'reason': [],
            'value': {},
            'query': {},
        }
        for key, rules in queries.items():
            is_number = 'number' in rules
            is_exclude_if_null = 'exclude_if_null' in rules
            alias = self._rule_option(rules, 'alias:', key)
            default = self._rule_option(rules, 'default:', None)

            value = kw.get(key, '')
            # Literal strings some clients send for empty values.
            if value in ('null', 'undefined'):
                value = ''

            if 'required' in rules and not value:
                result['reason'].append(f"{key} is required")
            if is_number and value and not value.isdigit():
                result['reason'].append(f"{key} must be a number")

            # ``query`` keeps the raw value, before default/number coercion.
            result['query'][key] = value

            if not value and default:
                value = default
            if is_number and value.isdigit():
                value = int(value)
            # NOTE: a numeric 0 is falsy and is therefore excluded as well.
            if not value and is_exclude_if_null:
                continue
            result['value'][alias] = value

        result['valid'] = not result['reason']
        return result

    def time_to_str(self, object, format):
        """Format a datetime in the Asia/Jakarta timezone.

        :return: the formatted string, or ``''`` when ``object`` is not a
            ``datetime.datetime``.
        """
        if not isinstance(object, datetime.datetime):
            return ''
        return object.astimezone(timezone('Asia/Jakarta')).strftime(format)

    def response(self, data=(), code=200, description='OK', headers=None):
        """Build a JSON HTTP response with permissive CORS headers.

        :param data: payload placed under ``result`` (only when ``code`` is
            200). The immutable default serialises to ``[]``.
        :param code: status code reported inside the JSON body.
        :param description: status description reported inside the JSON body.
        :param headers: optional extra ``(name, value)`` header pairs.
        """
        payload = {
            'status': {
                'code': code,
                'description': description,
            }
        }
        if code == 200:
            payload['result'] = data
        base_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Headers', '*'),
            ('Access-Control-Allow-Methods', '*'),
            ('Content-Type', 'application/json'),
        ]
        extra_headers = list(headers) if headers else []
        return request.make_response(json.dumps(payload), base_headers + extra_headers)

    def unauthorized_response(self):
        """Return the standard JSON body with status code 401."""
        _logger.debug("Returning unauthorized response")
        return self.response(code=401, description='Unauthorized')

    def search_filter(self, model: str, kw: dict, query: list = None):
        """Search ``model`` applying the default API params if they exist.

        :param model: model name to search.
        :param kw: request parameters; ``limit``, ``offset`` and ``order`` are
            read from it (defaults: 0, 0 and ``''``).
        :param query: search domain; defaults to an empty domain.
        """
        domain = [] if query is None else query
        limit = int(kw.get('limit', 0))
        offset = int(kw.get('offset', 0))
        order = kw.get('order', '')
        return request.env[model].search(domain, limit=limit, offset=offset, order=order)

    def create_user_token(self, user):
        """Return a JWT carrying ``user.id``, signed with ``jwt_secret_key``."""
        # Algorithm stated explicitly so it always matches verify_user_token().
        return jwt.encode({'id': user.id}, self.jwt_secret_key, algorithm='HS256')

    def verify_user_token(self):
        """Decode the ``Token`` request header into user information.

        :return: ``{'user_id': int, 'partner_id': int or None}`` for a valid
            token of an existing user, ``False`` otherwise.
        """
        token = request.httprequest.environ.get('HTTP_TOKEN')
        if not token:
            return False
        try:
            payload = jwt.decode(token, self.jwt_secret_key, algorithms=['HS256'])
            # browse() never checks the database; exists() drops deleted users.
            user = request.env['res.users'].browse([payload['id']]).exists()
            if not user:
                return False
            return {
                'user_id': user.id,
                'partner_id': user.partner_id.id or None,
            }
        except Exception:
            # Any decoding or lookup failure means the token is not usable.
            return False

    def get_partner_child_ids(self, partner_id):
        """Return ids of the partner, its children, its parent and its siblings.

        The list may contain duplicates (the partner is also one of its
        parent's children).
        """
        partner = request.env['res.partner'].search([('id', '=', partner_id)], limit=1)
        partner_child_ids = partner.child_ids.ids + [partner.id]
        parent = partner.parent_id
        if parent:
            partner_child_ids += parent.child_ids.ids + [parent.id]
        return partner_child_ids

    # NOTE(review): this public route returns the same token that
    # authenticate() accepts in the Authorization header; confirm intended.
    @http.route('/api/token', auth='public', methods=['GET', 'OPTIONS'])
    def get_api_token(self, **kw):
        """Return the ``rest_api_token`` system parameter (``''`` when unset)."""
        return self.response(request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '')

    @http.route('/api/ip-address', auth='public', methods=['GET', 'OPTIONS'])
    def get_ip_address(self):
        """Return the caller's remote address, or ``False`` for 127.0.0.1."""
        address = request.httprequest.remote_addr
        address = address if address != '127.0.0.1' else False
        return self.response(address)

    # The URL placeholders must match the handler's positional parameters.
    # NOTE(review): untyped converters assumed; confirm against API clients.
    @http.route('/api/image/<model>/<field>/<id>', auth='public', methods=['GET'])
    def get_image(self, model, field, id, **kw):
        """Serve a base64 image field of a record as binary content.

        For ``product.template`` the image is watermarked unless the
        ``variant`` query parameter is set; fields other than the
        ``image_256`` .. ``image_1920`` family fall back to ``image_256``
        with the version '2' logo.
        """
        model_name = model
        if model_name not in request.env:
            return request.not_found()
        # NOTE(security): unauthenticated sudo() read of any field on any
        # model named in the URL; consider restricting to an allow-list.
        record = request.env[model_name].sudo().search([('id', '=', id)], limit=1)
        image = record[field] if field in record else ''

        if model_name in ['product.template']:
            version = '1' if field in ['image_256', 'image_512', 'image_1024', 'image_1920'] else '2'
            ratio = kw.get('ratio', '')
            variant = kw.get('variant', False)
            image = record[field] if version == '1' else record['image_256']
            if not variant:
                image = self.add_watermark_to_image(image, ratio, version)

        # NOTE(review): watermarked output is PNG, yet the declared type is
        # always 'image/jpg'; confirm whether clients rely on this.
        response_headers = [
            ('Content-Type', 'image/jpg'),
            ('Cache-Control', 'public, max-age=3600'),
        ]
        # Empty binary fields read as False, which b64decode cannot handle.
        return request.make_response(base64.b64decode(image or ''), response_headers)

    def add_watermark_to_image(self, image, ratio, version='1'):
        """Paste the company logo onto a base64-encoded image.

        :param image: base64-encoded source image; falsy input returns ``''``.
        :param ratio: ``'square'`` pads the image onto a white square canvas.
        :param version: selects the logo file and its relative size.
        :return: base64 string of the resulting PNG.
        """
        if not image:
            return ''

        logo_filenames = {
            '1': 'logo-indoteknik-gray.png',
            '2': 'logo-indoteknik-link.png',
        }
        logo_path = get_module_resource(
            'indoteknik_api', 'static', 'src', 'images', logo_filenames.get(version)
        )
        # convert() returns an independent copy, so the sources can be closed.
        with Image.open(logo_path) as logo_source:
            logo_img = logo_source.convert('RGBA')
        with Image.open(io.BytesIO(base64.b64decode(image))) as image_source:
            img = image_source.convert('RGBA')

        img_width, img_height = img.size

        # Logo bounding box relative to the source image, as whole pixels
        # (at least 1) so thumbnail() always receives a valid size.
        divisor = 1.8 if version == '2' else 2.2
        logo_box = (
            max(1, int(img_width // divisor)),
            max(1, int(img_height // divisor)),
        )

        new_img = img
        if ratio == 'square':
            longest_wh = max(img_height, img_width)
            new_img = Image.new('RGBA', (longest_wh, longest_wh), (255, 255, 255, 255))
            paste_x = (longest_wh - img_width) // 2
            paste_y = (longest_wh - img_height) // 2
            new_img.paste(img, (paste_x, paste_y), img)

        logo_img.thumbnail(logo_box)

        # Add watermark near the top-left corner, using the logo's own alpha.
        new_img.paste(logo_img, (12, 10), logo_img)

        buffered = io.BytesIO()
        new_img.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')