import base64
import hmac
import json
import logging
import time
from hashlib import sha256

import requests

from odoo import api, fields, models, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Path of the Ginee "batch get orders" endpoint. It is used both to build the
# request URL and as part of the string that gets signed.
Request_URI = '/openapi/order/v1/batch-get'

# SECURITY NOTE(review): API credentials are hard-coded in source control.
# They should be moved to configuration (e.g. system parameters) and rotated.
ACCESS_KEY = '24bb6a1ec618ec6a'
SECRET_KEY = '32e4a78ad05ee230'

GINEE_BASE_URL = 'https://api.ginee.com'
# Seconds to wait for Ginee before giving up, so a stalled connection cannot
# block the worker that runs the queue forever.
GINEE_REQUEST_TIMEOUT = 30

# NOTE(review): database ids used for every generated sale order; they are
# specific to one database and presumably belong in configuration.
GINEE_PARTNER_ID = 45
GINEE_WAREHOUSE_ID = 4


def _first_order(json_data):
    """Return the first order dict of a batch-get response.

    The response is expected to look like ``{"data": [{...}, ...]}``. An
    empty dict is returned when that shape is not met, so callers can use
    ``.get()`` without guarding against a missing key or an empty list.
    """
    orders = json_data.get('data') if isinstance(json_data, dict) else None
    if isinstance(orders, list) and orders and isinstance(orders[0], dict):
        return orders[0]
    return {}


class DetailOrder(models.Model):
    """Queue of Ginee orders to be turned into sale orders.

    ``execute_status`` drives the processing:

    * empty         -> ``process_queue_item`` fetches the order detail
    * detail_order  -> ``process_queue_item_detail`` creates the sale order
    * so_confirm    -> ``process_queue_item_check_print`` polls the label
      print status
    * done / failed -> terminal states set by the steps above
    """

    _name = "detail.order"
    _inherit = ['mail.thread']

    # Raw JSON stored on the record; ``get_order_id`` reads
    # ``payload.orderId`` from it.
    json_ginee = fields.Text('JSON Ginee')
    # Raw JSON response of the batch-get endpoint for this order.
    detail_order = fields.Text()
    execute_status = fields.Selection([
        ('from_webhook', 'From Webhook'),
        ('detail_order', 'Detail Order'),
        ('so_confirm', 'SO Confirm'),
        ('done', 'Done'),
        ('failed', 'Failed'),
    ], 'Execute Status')
    sale_id = fields.Many2one('sale.order', 'Sale Order')
    picking_id = fields.Many2one('stock.picking', 'Picking')
    invoice_id = fields.Many2one('account.move', 'Invoice')
    # JSON document describing the last processing error.
    message_error = fields.Text('Message Error')

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _request_order_detail(self, order_id):
        """POST a signed batch-get request for ``order_id``.

        Returns the ``requests`` response object. Network errors and
        timeouts propagate to the caller.
        """
        headers = {
            'Content-Type': 'application/json',
            'X-Advai-Country': 'ID',
            'Authorization': self.sign_request(),
        }
        payload = {"orderIds": [order_id]}
        return requests.post(
            GINEE_BASE_URL + Request_URI,
            headers=headers,
            data=json.dumps(payload),
            timeout=GINEE_REQUEST_TIMEOUT,
        )

    def _mark_failed(self, error, response_text=None):
        """Flag the record as failed and store the reason in ``message_error``.

        The reason is kept out of ``json_ginee`` so the original payload
        stays available for inspection or a later retry.
        """
        details = {'error': error}
        if response_text is not None:
            details['response'] = response_text
        self.write({
            'execute_status': 'failed',
            'message_error': json.dumps(details),
        })

    def sign_request(self):
        """Build the ``Authorization`` header value for a Ginee request.

        The value is ``ACCESS_KEY:signature`` where the signature is the
        base64-encoded HMAC-SHA256 (keyed with ``SECRET_KEY``) of the string
        ``POST$<Request_URI>$``.
        """
        sign_data = '$'.join(['POST', Request_URI]) + '$'
        digest = hmac.new(
            SECRET_KEY.encode('utf-8'),
            sign_data.encode('utf-8'),
            digestmod=sha256,
        ).digest()
        return ACCESS_KEY + ':' + base64.b64encode(digest).decode('ascii')

    # ------------------------------------------------------------------
    # Step 1: fetch the order detail
    # ------------------------------------------------------------------
    def get_order_id(self):
        """Return ``payload.orderId`` from ``json_ginee``.

        Raises:
            UserError: when the field is empty, is not valid JSON, or does
                not contain an order id.
        """
        try:
            if not self.json_ginee:
                raise UserError(_("No JSON data available"))
            json_data = json.loads(self.json_ginee)
            order_id = (json_data.get('payload') or {}).get('orderId')
            if not order_id:
                raise UserError(_("Order ID not found in JSON data"))
            return order_id
        except json.JSONDecodeError as err:
            raise UserError(_("Invalid JSON format in json_ginee field")) from err
        except UserError:
            # Already a precise message; do not wrap it a second time.
            raise
        except Exception as err:
            raise UserError(_("Error extracting order ID: %s") % str(err)) from err

    def process_queue_item(self, limit=100, max_exec_time=30):
        """Fetch order details for the oldest records with no status yet.

        Args:
            limit: maximum number of records handled in one call.
            max_exec_time: time budget in seconds; once it is exceeded no
                further record is started.
        """
        records = self.search(
            [('execute_status', '=', False)],
            order='create_date asc',
            limit=limit,
        )
        start_time = time.time()
        for rec in records:
            if time.time() - start_time > max_exec_time:
                break
            rec.execute_queue()

    def execute_queue(self):
        """Fetch the order detail from Ginee and store it in ``detail_order``.

        On success the status becomes ``detail_order``. Any failure (bad
        payload, network error, non-200 response) sets the status to
        ``failed`` and records the reason in ``message_error``.
        """
        try:
            order_id = self.get_order_id()
            response = self._request_order_detail(order_id)
            if response.status_code == 200:
                self.write({
                    'detail_order': json.dumps(response.json()),
                    'execute_status': 'detail_order',
                })
            else:
                self._mark_failed(
                    f"Request failed with status code {response.status_code}",
                    response.text,
                )
        except Exception as e:
            # Queue boundary: one bad record must not stop the batch.
            _logger.exception("Fetching Ginee order detail failed for %s", self)
            self._mark_failed(str(e))

    # ------------------------------------------------------------------
    # Step 2: order detail -> sale order
    # ------------------------------------------------------------------
    def get_order_id_detail(self):
        """Return ``(order_id, order_status, label_print_status)``.

        The values are read from the first order in ``detail_order``.

        Raises:
            UserError: when the field is empty, is not valid JSON, or does
                not contain an order id.
        """
        try:
            if not self.detail_order:
                raise UserError(_("No JSON data available"))
            order = _first_order(json.loads(self.detail_order))
            order_id = order.get('orderId')
            if not order_id:
                raise UserError(_("Order ID not found in JSON data"))
            print_info = (order.get('printInfo') or {}).get('labelPrintStatus')
            return order_id, order.get('orderStatus'), print_info
        except json.JSONDecodeError as err:
            raise UserError(_("Invalid JSON format in detail_order field")) from err
        except UserError:
            raise
        except Exception as err:
            raise UserError(_("Error extracting order ID: %s") % str(err)) from err

    def process_queue_item_detail(self, limit=100):
        """Create sale orders for the oldest records in ``detail_order`` status."""
        records = self.search(
            [('execute_status', '=', 'detail_order')],
            order='create_date asc',
            limit=limit,
        )
        for rec in records:
            rec.execute_queue_detail()

    def prepare_data_so(self, json_data):
        """Return the ``sale.order`` header values for a batch-get response."""
        return {
            'partner_id': GINEE_PARTNER_ID,
            'client_order_ref': _first_order(json_data).get('orderId'),
            'warehouse_id': GINEE_WAREHOUSE_ID,
            'picking_policy': 'direct',
        }

    def prepare_data_so_line(self, json_data):
        """Return one2many create commands for the order's items.

        Each item is matched to a product through its ``sku`` against
        ``default_code``.

        Raises:
            UserError: when no product matches an item's SKU.
        """
        Product = self.env['product.product']
        order_lines = []
        for item in _first_order(json_data).get('items') or []:
            sku = item.get('sku')
            product = Product.search([('default_code', '=', sku)], limit=1)
            if not product:
                raise UserError(_("Product not found for SKU: %s") % sku)
            order_lines.append((0, 0, {
                'product_id': product.id,
                'product_uom_qty': item.get('quantity'),
                'price_unit': item.get('actualPrice'),
            }))
        return order_lines

    def execute_queue_detail(self):
        """Create and confirm the sale order for this record.

        Only orders whose status is ``READY_TO_SHIP`` are processed. When
        the label is already ``PRINTED`` the picking quantities are synced
        right away through ``sync_qty_reserved_qty_done``. Orders in any
        other state are left untouched and stay in ``detail_order`` status.
        Any error sets the status to ``failed`` with the reason in
        ``message_error``.
        """
        try:
            json_data = json.loads(self.detail_order)
            data = self.prepare_data_so(json_data)
            order_lines = self.prepare_data_so_line(json_data)
            _order_id, order_status, print_info = self.get_order_id_detail()

            if order_status != 'READY_TO_SHIP':
                return
            if print_info not in ('NOT_PRINTED', 'PRINTED'):
                return

            data['order_line'] = order_lines
            sale_order = self.env['sale.order'].create(data)
            self.sale_id = sale_order.id
            sale_order.action_confirm()

            picking = sale_order.picking_ids[:1]
            if not picking:
                raise UserError(
                    _("No delivery order was generated for sale order %s")
                    % sale_order.name
                )
            self.picking_id = picking.id
            if print_info == 'PRINTED':
                # sync_qty_reserved_qty_done is defined outside this file;
                # presumably it copies reserved quantities to done ones.
                self.picking_id.sync_qty_reserved_qty_done()
            self.execute_status = 'so_confirm'
        except Exception as e:
            # Queue boundary: one bad record must not stop the batch.
            _logger.exception("Creating the sale order failed for %s", self)
            self._mark_failed(str(e))

    # ------------------------------------------------------------------
    # Step 3: poll the label print status
    # ------------------------------------------------------------------
    def get_order_id_check_print(self):
        """Return the order id of the first order in ``detail_order``.

        Raises:
            UserError: when the field is empty, is not valid JSON, or does
                not contain an order id.
        """
        try:
            if not self.detail_order:
                raise UserError(_("No JSON data available"))
            order_id = _first_order(json.loads(self.detail_order)).get('orderId')
            if not order_id:
                raise UserError(_("Order ID not found in JSON data"))
            return order_id
        except json.JSONDecodeError as err:
            raise UserError(_("Invalid JSON format in check_print field")) from err
        except UserError:
            raise
        except Exception as err:
            raise UserError(_("Error extracting order ID: %s") % str(err)) from err

    def process_queue_item_check_print(self, limit=100):
        """Re-check the print status of records whose picking is still open.

        NOTE(review): the domain filters on the picking state only, not on
        ``execute_status``, so records already ``done`` or ``failed`` are
        polled again while their picking is open. Confirm this is intended.
        """
        records = self.search(
            [('picking_id.state', 'not in', ['done', 'cancel'])],
            order='create_date asc',
            limit=limit,
        )
        for rec in records:
            rec.execute_queue_check_print()

    def execute_queue_check_print(self):
        """Refresh the order detail and finish the record once it is printed.

        The fresh response replaces ``detail_order``. When the label print
        status is ``PRINTED`` the picking quantities are synced and the
        status becomes ``done``. Any failure sets the status to ``failed``
        with the reason in ``message_error``.
        """
        try:
            order_id = self.get_order_id_check_print()
            response = self._request_order_detail(order_id)
            if response.status_code != 200:
                self._mark_failed(
                    f"Request failed with status code {response.status_code}",
                    response.text,
                )
                return

            data = response.json()
            self.detail_order = json.dumps(data)
            print_info = _first_order(data).get('printInfo') or {}
            if print_info.get('labelPrintStatus') == 'PRINTED':
                self.picking_id.sync_qty_reserved_qty_done()
                # TODO(review): invoice creation through
                # sale_id.api_create_invoices was disabled in the original.
                self.execute_status = 'done'
        except Exception as e:
            # Queue boundary: one bad record must not stop the batch.
            _logger.exception("Checking the Ginee print status failed for %s", self)
            self._mark_failed(str(e))