import base64 import datetime import functools import io import json from array import array from io import BytesIO import jwt from odoo import http from odoo.http import request from odoo.modules import get_module_resource from odoo.tools.config import config from PIL import Image from PIL.WebPImagePlugin import Image from PIL import features from pytz import timezone class Controller(http.Controller): jwt_secret_key = "NTNv7j0TuYARvmNMmWXo6fKvM4o6nvaUi9ryX38ZHL1bkrnD1ObOQ8JAUmHCBq7Iy7otZcyAagBLHVKvvYaIpmMuxmARQ97jUVG16Jkpkp1wXOPsrF9zwew6TpczyHkHgX5EuLg2MeBuiTqJACs1J0apruOOJCggOtkjB" @staticmethod def must_authorized(private=False, private_key=''): def wrapper(func): @functools.wraps(func) def inner_wrapper(*args, **kwargs): self = args[0] auth = self.authenticate() if not auth: return self.unauthorized_response() self.set_user_pricelist_ctx() if private: auth_key = None if isinstance(auth, bool) else auth[private_key] param_key = int(kwargs.get(private_key, -1)) if auth_key != param_key: return self.unauthorized_response() return func(*args, **kwargs) return inner_wrapper return wrapper def authenticate(self): wsgienv = request.httprequest.environ try: db = wsgienv['HTTP_DB'] username = wsgienv['HTTP_USERNAME'] password = wsgienv['HTTP_PASSWORD'] request.session.authenticate(db, username, password) return True except: try: authorization = wsgienv['HTTP_AUTHORIZATION'] except: authorization = None request.session.authenticate(config.get('db_name'), 'it@fixcomart.co.id', 'Fixcomart378') token = request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '' result = False if authorization == token: result = True user_token = self.verify_user_token() if user_token: result = user_token return result def set_user_pricelist_ctx(self): user_token = self.authenticate() pricelist = request.env['product.pricelist'].new() if isinstance(user_token, dict): partner = request.env['res.partner'].browse(user_token['partner_id']) if partner: pricelist = partner.property_product_pricelist context = request.env.context.copy() context.update({'user_pricelist': pricelist}) request.env.context = context def get_request_params(self, kw, queries): result = { 'valid': True, 'reason': [], 'value': {}, 'query': {} } for key, rules in queries.items(): is_number = 'number' in rules is_boolean = 'boolean' in rules is_exclude_if_null = 'exclude_if_null' in rules alias = next((r.replace('alias:', '') for r in rules if r.startswith('alias:')), key) default = next((r.replace('default:', '') for r in rules if r.startswith('default:')), None) value = kw.get(key, '') if value in ['null', 'undefined']: value = '' if 'required' in rules and not value: result['reason'].append(f"{key} is required") if is_number and value and not value.isdigit(): result['reason'].append(f"{key} must be a number") if is_boolean and value and value.lower() not in ['true', 'false', '1', '0']: result['reason'].append(f"{key} must be a boolean") result['query'][key] = value if not value and default: value = default if is_number and value.isdigit(): value = int(value) if is_boolean: value = value.lower() in ['true', '1'] if not value and is_exclude_if_null: continue result['value'][alias] = value result['valid'] = not result['reason'] return result def time_to_str(self, object, format): time = '' if isinstance(object, datetime.datetime): time = object.astimezone(timezone('Asia/Jakarta')).strftime(format) return time def response(self, data=[], code=200, description='OK', headers=[]): response = { 'status': { 'code': code, 'description': description } } if code == 200: response.update({'result': data}) response = json.dumps(response) return request.make_response(response, [ ('Access-Control-Allow-Origin', '*'), ('Access-Control-Allow-Headers', '*'), ('Access-Control-Allow-Methods', '*'), ('Content-Type', 'application/json'), ] + headers) def unauthorized_response(self): return self.response(code=401, description='Unauthorized') def search_filter(self, model: str, kw: dict, query: array = []): """ To search data by default API Params if exist """ limit = kw.get('limit', 0) offset = kw.get('offset', 0) order = kw.get('order', '') return request.env[model].search(query, limit=int(limit), offset=int(offset), order=order) def create_user_token(self, user): return jwt.encode({'id': user.id}, self.jwt_secret_key) def verify_user_token(self): try: token = request.httprequest.environ['HTTP_TOKEN'] user_token = jwt.decode(token, self.jwt_secret_key, algorithms=['HS256']) user = request.env['res.users'].browse([ user_token['id'] ]) if not user: return False data = { 'user_id': user.id, 'partner_id': None } if user.partner_id: data['partner_id'] = user.partner_id.id return data except: return False def get_partner_child_ids(self, partner_id): partner = request.env['res.partner'].search([('id', '=', partner_id)], limit=1) partner_child_ids = [x['id'] for x in partner.child_ids] + [partner.id] if partner.parent_id: partner_child_ids += [x['id'] for x in partner.parent_id.child_ids] partner_child_ids += [partner.parent_id.id] return partner_child_ids @http.route('/api/token', auth='public', methods=['GET', 'OPTIONS']) def get_api_token(self, **kw): return self.response(request.env['ir.config_parameter'].sudo().get_param('rest_api_token') or '') @http.route('/api/ip-address', auth='public', methods=['GET', 'OPTIONS']) def get_ip_address(self): address = request.httprequest.remote_addr address = address if address != '127.0.0.1' else False return self.response(address) @http.route('/api/image///', auth='public', methods=['GET']) def get_image(self, model, field, id, **kw): model_name = model model = request.env[model].sudo().search([('id', '=', id)], limit=1) image = model[field] if field in model else '' if model_name in ['product.template']: version = '1' if field in ['image_256', 'image_512', 'image_1024', 'image_1920'] else '2' ratio = kw.get('ratio', '') variant = kw.get('variant', False) image = model[field] if version == '1' else model['image_256'] if not variant: image = self.add_watermark_to_image(image, ratio, version) # image = self.convert_to_webp(image) response_headers = [ ('Content-Type', 'image/jpg'), ('Cache-Control', 'public, max-age=3600') ] return request.make_response( base64.b64decode(image), response_headers ) def convert_to_webp(self, image_base64): """Convert image from base64 to WebP format and return base64 WebP.""" try: print(f"Image base64 length: {len(image_base64)}") # Decode Base64 to Bytes image_data = base64.b64decode(image_base64) image = Image.open(BytesIO(image_data)) if image.format == "PNG" and image.mode != "RGBA": image = image.convert("RGBA") # Convert to WebP with BytesIO() as output: image.save(output, format="WEBP", quality=85) webp_data = output.getvalue() # Encode back to Base64 return base64.b64encode(webp_data).decode('utf-8') except Exception as e: print(f"Error details: {e}") # If conversion fails, return the original image request.env.cr.rollback() # Rollback any transactions return image_base64 def add_watermark_to_image(self, image, ratio, version = '1'): if not image: return '' LOGO_FILENAME = { '1': 'logo-indoteknik-gray.png', '2': 'logo-indoteknik-link.png' } logo_path = get_module_resource('indoteknik_api', 'static', 'src', 'images', LOGO_FILENAME.get(version)) logo_img = Image.open(logo_path).convert('RGBA') img_data = io.BytesIO(base64.b64decode(image)) img = Image.open(img_data).convert('RGBA') img_width, img_height = img.size longest_wh = max(img_height, img_width) # Resize logo image logo_img_w = img_width // 2.2 logo_img_h = img_height // 2.2 new_img = img if ratio == 'square': new_img = Image.new('RGBA', (longest_wh, longest_wh), (255, 255, 255, 255)) paste_x = (longest_wh - img_width) // 2 paste_y = (longest_wh - img_height) // 2 new_img.paste(img, (paste_x, paste_y), img) if version == '2': logo_img_w = img_width // 1.8 logo_img_h = img_height // 1.8 logo_img.thumbnail((logo_img_w, logo_img_h)) # Add watermark new_img.paste(logo_img, (12, 10), logo_img) buffered = io.BytesIO() new_img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode('utf-8')