diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/hw_escpos/controllers/main.py | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/hw_escpos/controllers/main.py')
| -rw-r--r-- | addons/hw_escpos/controllers/main.py | 344 |
1 files changed, 344 insertions, 0 deletions
diff --git a/addons/hw_escpos/controllers/main.py b/addons/hw_escpos/controllers/main.py new file mode 100644 index 00000000..b1a01576 --- /dev/null +++ b/addons/hw_escpos/controllers/main.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from __future__ import print_function +import logging +import math +import os +import os.path +import subprocess +import time +import netifaces as ni +import traceback + +try: + from .. escpos import * + from .. escpos.exceptions import * + from .. escpos.printer import Usb +except ImportError: + escpos = printer = None + +from queue import Queue +from threading import Thread, Lock + +try: + import usb.core +except ImportError: + usb = None + +from odoo import http, _ +from odoo.addons.hw_drivers.controllers import proxy + +_logger = logging.getLogger(__name__) + +# workaround https://bugs.launchpad.net/openobject-server/+bug/947231 +# related to http://bugs.python.org/issue7980 +from datetime import datetime +datetime.strptime('2012-01-01', '%Y-%m-%d') + +class EscposDriver(Thread): + def __init__(self): + Thread.__init__(self) + self.queue = Queue() + self.lock = Lock() + self.status = {'status':'connecting', 'messages':[]} + + def connected_usb_devices(self): + connected = [] + + # printers can either define bDeviceClass=7, or they can define one of + # their interfaces with bInterfaceClass=7. This class checks for both. + class FindUsbClass(object): + def __init__(self, usb_class): + self._class = usb_class + def __call__(self, device): + # first, let's check the device + if device.bDeviceClass == self._class: + return True + # transverse all devices and look through their interfaces to + # find a matching class + for cfg in device: + intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class) + + if intf is not None: + return True + + return False + + printers = usb.core.find(find_all=True, custom_match=FindUsbClass(7)) + + # if no printers are found after this step we will take the + # first epson or star device we can find. + # epson + if not printers: + printers = usb.core.find(find_all=True, idVendor=0x04b8) + # star + if not printers: + printers = usb.core.find(find_all=True, idVendor=0x0519) + + for printer in printers: + try: + description = usb.util.get_string(printer, printer.iManufacturer) + " " + usb.util.get_string(printer, printer.iProduct) + except Exception as e: + _logger.error("Can not get printer description: %s" % e) + description = 'Unknown printer' + connected.append({ + 'vendor': printer.idVendor, + 'product': printer.idProduct, + 'name': description + }) + + return connected + + def lockedstart(self): + with self.lock: + if not self.is_alive(): + self.daemon = True + self.start() + + def get_escpos_printer(self): + + printers = self.connected_usb_devices() + if len(printers) > 0: + try: + print_dev = Usb(printers[0]['vendor'], printers[0]['product']) + except HandleDeviceError: + # Escpos printers are now integrated to PrinterDriver, if the IoTBox is printing + # through Cups at the same time, we get an USBError(16, 'Resource busy'). This means + # that the Odoo instance connected to this IoTBox is up to date and no longer uses + # this escpos library. + return None + self.set_status( + 'connected', + "Connected to %s (in=0x%02x,out=0x%02x)" % (printers[0]['name'], print_dev.in_ep, print_dev.out_ep) + ) + return print_dev + else: + self.set_status('disconnected','Printer Not Found') + return None + + def get_status(self): + self.push_task('status') + return self.status + + def open_cashbox(self,printer): + printer.cashdraw(2) + printer.cashdraw(5) + + def set_status(self, status, message = None): + _logger.info(status+' : '+ (message or 'no message')) + if status == self.status['status']: + if message != None and (len(self.status['messages']) == 0 or message != self.status['messages'][-1]): + self.status['messages'].append(message) + else: + self.status['status'] = status + if message: + self.status['messages'] = [message] + else: + self.status['messages'] = [] + + if status == 'error' and message: + _logger.error('ESC/POS Error: %s', message) + elif status == 'disconnected' and message: + _logger.warning('ESC/POS Device Disconnected: %s', message) + + def run(self): + printer = None + if not escpos: + _logger.error('ESC/POS cannot initialize, please verify system dependencies.') + return + while True: + try: + error = True + timestamp, task, data = self.queue.get(True) + + printer = self.get_escpos_printer() + + if printer == None: + if task != 'status': + self.queue.put((timestamp,task,data)) + error = False + time.sleep(5) + continue + elif task == 'receipt': + if timestamp >= time.time() - 1 * 60 * 60: + self.print_receipt_body(printer,data) + printer.cut() + elif task == 'xml_receipt': + if timestamp >= time.time() - 1 * 60 * 60: + printer.receipt(data) + elif task == 'cashbox': + if timestamp >= time.time() - 12: + self.open_cashbox(printer) + elif task == 'status': + pass + error = False + + except NoDeviceError as e: + print("No device found %s" % e) + except HandleDeviceError as e: + printer = None + print("Impossible to handle the device due to previous error %s" % e) + except TicketNotPrinted as e: + print("The ticket does not seems to have been fully printed %s" % e) + except NoStatusError as e: + print("Impossible to get the status of the printer %s" % e) + except Exception as e: + self.set_status('error') + _logger.exception(e) + finally: + if error: + self.queue.put((timestamp, task, data)) + if printer: + printer.close() + printer = None + + def push_task(self,task, data = None): + self.lockedstart() + self.queue.put((time.time(),task,data)) + + def print_receipt_body(self,eprint,receipt): + + def check(string): + return string != True and bool(string) and string.strip() + + def price(amount): + return ("{0:."+str(receipt['precision']['price'])+"f}").format(amount) + + def money(amount): + return ("{0:."+str(receipt['precision']['money'])+"f}").format(amount) + + def quantity(amount): + if math.floor(amount) != amount: + return ("{0:."+str(receipt['precision']['quantity'])+"f}").format(amount) + else: + return str(amount) + + def printline(left, right='', width=40, ratio=0.5, indent=0): + lwidth = int(width * ratio) + rwidth = width - lwidth + lwidth = lwidth - indent + + left = left[:lwidth] + if len(left) != lwidth: + left = left + ' ' * (lwidth - len(left)) + + right = right[-rwidth:] + if len(right) != rwidth: + right = ' ' * (rwidth - len(right)) + right + + return ' ' * indent + left + right + '\n' + + def print_taxes(): + taxes = receipt['tax_details'] + for tax in taxes: + eprint.text(printline(tax['tax']['name'],price(tax['amount']), width=40,ratio=0.6)) + + # Receipt Header + if receipt['company']['logo']: + eprint.set(align='center') + eprint.print_base64_image(receipt['company']['logo']) + eprint.text('\n') + else: + eprint.set(align='center',type='b',height=2,width=2) + eprint.text(receipt['company']['name'] + '\n') + + eprint.set(align='center',type='b') + if check(receipt['company']['contact_address']): + eprint.text(receipt['company']['contact_address'] + '\n') + if check(receipt['company']['phone']): + eprint.text('Tel:' + receipt['company']['phone'] + '\n') + if check(receipt['company']['vat']): + eprint.text('VAT:' + receipt['company']['vat'] + '\n') + if check(receipt['company']['email']): + eprint.text(receipt['company']['email'] + '\n') + if check(receipt['company']['website']): + eprint.text(receipt['company']['website'] + '\n') + if check(receipt['header']): + eprint.text(receipt['header']+'\n') + if check(receipt['cashier']): + eprint.text('-'*32+'\n') + eprint.text('Served by '+receipt['cashier']+'\n') + + # Orderlines + eprint.text('\n\n') + eprint.set(align='center') + for line in receipt['orderlines']: + pricestr = price(line['price_display']) + if line['discount'] == 0 and line['unit_name'] == 'Units' and line['quantity'] == 1: + eprint.text(printline(line['product_name'],pricestr,ratio=0.6)) + else: + eprint.text(printline(line['product_name'],ratio=0.6)) + if line['discount'] != 0: + eprint.text(printline('Discount: '+str(line['discount'])+'%', ratio=0.6, indent=2)) + if line['unit_name'] == 'Units': + eprint.text( printline( quantity(line['quantity']) + ' x ' + price(line['price']), pricestr, ratio=0.6, indent=2)) + else: + eprint.text( printline( quantity(line['quantity']) + line['unit_name'] + ' x ' + price(line['price']), pricestr, ratio=0.6, indent=2)) + + # Subtotal if the taxes are not included + taxincluded = True + if money(receipt['subtotal']) != money(receipt['total_with_tax']): + eprint.text(printline('','-------')); + eprint.text(printline(_('Subtotal'),money(receipt['subtotal']),width=40, ratio=0.6)) + print_taxes() + #eprint.text(printline(_('Taxes'),money(receipt['total_tax']),width=40, ratio=0.6)) + taxincluded = False + + + # Total + eprint.text(printline('','-------')); + eprint.set(align='center',height=2) + eprint.text(printline(_(' TOTAL'),money(receipt['total_with_tax']),width=40, ratio=0.6)) + eprint.text('\n\n'); + + # Paymentlines + eprint.set(align='center') + for line in receipt['paymentlines']: + eprint.text(printline(line['journal'], money(line['amount']), ratio=0.6)) + + eprint.text('\n'); + eprint.set(align='center',height=2) + eprint.text(printline(_(' CHANGE'),money(receipt['change']),width=40, ratio=0.6)) + eprint.set(align='center') + eprint.text('\n'); + + # Extra Payment info + if receipt['total_discount'] != 0: + eprint.text(printline(_('Discounts'),money(receipt['total_discount']),width=40, ratio=0.6)) + if taxincluded: + print_taxes() + #eprint.text(printline(_('Taxes'),money(receipt['total_tax']),width=40, ratio=0.6)) + + # Footer + if check(receipt['footer']): + eprint.text('\n'+receipt['footer']+'\n\n') + eprint.text(receipt['name']+'\n') + eprint.text( str(receipt['date']['date']).zfill(2) + +'/'+ str(receipt['date']['month']+1).zfill(2) + +'/'+ str(receipt['date']['year']).zfill(4) + +' '+ str(receipt['date']['hour']).zfill(2) + +':'+ str(receipt['date']['minute']).zfill(2) ) + + +driver = EscposDriver() + +proxy.proxy_drivers['escpos'] = driver + + +class EscposProxy(proxy.ProxyController): + + @http.route('/hw_proxy/open_cashbox', type='json', auth='none', cors='*') + def open_cashbox(self): + _logger.info('ESC/POS: OPEN CASHBOX') + driver.push_task('cashbox') + + @http.route('/hw_proxy/print_receipt', type='json', auth='none', cors='*') + def print_receipt(self, receipt): + _logger.info('ESC/POS: PRINT RECEIPT') + driver.push_task('receipt',receipt) + + @http.route('/hw_proxy/print_xml_receipt', type='json', auth='none', cors='*') + def print_xml_receipt(self, receipt): + _logger.info('ESC/POS: PRINT XML RECEIPT') + driver.push_task('xml_receipt',receipt) |
