summaryrefslogtreecommitdiff
path: root/addons/hw_escpos/controllers
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/hw_escpos/controllers
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/hw_escpos/controllers')
-rw-r--r--addons/hw_escpos/controllers/__init__.py4
-rw-r--r--addons/hw_escpos/controllers/main.py344
2 files changed, 348 insertions, 0 deletions
diff --git a/addons/hw_escpos/controllers/__init__.py b/addons/hw_escpos/controllers/__init__.py
new file mode 100644
index 00000000..5d4b25db
--- /dev/null
+++ b/addons/hw_escpos/controllers/__init__.py
@@ -0,0 +1,4 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from . import main
diff --git a/addons/hw_escpos/controllers/main.py b/addons/hw_escpos/controllers/main.py
new file mode 100644
index 00000000..b1a01576
--- /dev/null
+++ b/addons/hw_escpos/controllers/main.py
@@ -0,0 +1,344 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from __future__ import print_function
+import logging
+import math
+import os
+import os.path
+import subprocess
+import time
+import netifaces as ni
+import traceback
+
+try:
+ from .. escpos import *
+ from .. escpos.exceptions import *
+ from .. escpos.printer import Usb
+except ImportError:
+ escpos = printer = None
+
+from queue import Queue
+from threading import Thread, Lock
+
+try:
+ import usb.core
+except ImportError:
+ usb = None
+
+from odoo import http, _
+from odoo.addons.hw_drivers.controllers import proxy
+
+_logger = logging.getLogger(__name__)
+
+# workaround https://bugs.launchpad.net/openobject-server/+bug/947231
+# related to http://bugs.python.org/issue7980
+from datetime import datetime
+datetime.strptime('2012-01-01', '%Y-%m-%d')
+
+class EscposDriver(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.queue = Queue()
+ self.lock = Lock()
+ self.status = {'status':'connecting', 'messages':[]}
+
+ def connected_usb_devices(self):
+ connected = []
+
+ # printers can either define bDeviceClass=7, or they can define one of
+ # their interfaces with bInterfaceClass=7. This class checks for both.
+ class FindUsbClass(object):
+ def __init__(self, usb_class):
+ self._class = usb_class
+ def __call__(self, device):
+ # first, let's check the device
+ if device.bDeviceClass == self._class:
+ return True
+ # transverse all devices and look through their interfaces to
+ # find a matching class
+ for cfg in device:
+ intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
+
+ if intf is not None:
+ return True
+
+ return False
+
+ printers = usb.core.find(find_all=True, custom_match=FindUsbClass(7))
+
+ # if no printers are found after this step we will take the
+ # first epson or star device we can find.
+ # epson
+ if not printers:
+ printers = usb.core.find(find_all=True, idVendor=0x04b8)
+ # star
+ if not printers:
+ printers = usb.core.find(find_all=True, idVendor=0x0519)
+
+ for printer in printers:
+ try:
+ description = usb.util.get_string(printer, printer.iManufacturer) + " " + usb.util.get_string(printer, printer.iProduct)
+ except Exception as e:
+ _logger.error("Can not get printer description: %s" % e)
+ description = 'Unknown printer'
+ connected.append({
+ 'vendor': printer.idVendor,
+ 'product': printer.idProduct,
+ 'name': description
+ })
+
+ return connected
+
+ def lockedstart(self):
+ with self.lock:
+ if not self.is_alive():
+ self.daemon = True
+ self.start()
+
+ def get_escpos_printer(self):
+
+ printers = self.connected_usb_devices()
+ if len(printers) > 0:
+ try:
+ print_dev = Usb(printers[0]['vendor'], printers[0]['product'])
+ except HandleDeviceError:
+ # Escpos printers are now integrated to PrinterDriver, if the IoTBox is printing
+ # through Cups at the same time, we get an USBError(16, 'Resource busy'). This means
+ # that the Odoo instance connected to this IoTBox is up to date and no longer uses
+ # this escpos library.
+ return None
+ self.set_status(
+ 'connected',
+ "Connected to %s (in=0x%02x,out=0x%02x)" % (printers[0]['name'], print_dev.in_ep, print_dev.out_ep)
+ )
+ return print_dev
+ else:
+ self.set_status('disconnected','Printer Not Found')
+ return None
+
+ def get_status(self):
+ self.push_task('status')
+ return self.status
+
+ def open_cashbox(self,printer):
+ printer.cashdraw(2)
+ printer.cashdraw(5)
+
+ def set_status(self, status, message = None):
+ _logger.info(status+' : '+ (message or 'no message'))
+ if status == self.status['status']:
+ if message != None and (len(self.status['messages']) == 0 or message != self.status['messages'][-1]):
+ self.status['messages'].append(message)
+ else:
+ self.status['status'] = status
+ if message:
+ self.status['messages'] = [message]
+ else:
+ self.status['messages'] = []
+
+ if status == 'error' and message:
+ _logger.error('ESC/POS Error: %s', message)
+ elif status == 'disconnected' and message:
+ _logger.warning('ESC/POS Device Disconnected: %s', message)
+
+ def run(self):
+ printer = None
+ if not escpos:
+ _logger.error('ESC/POS cannot initialize, please verify system dependencies.')
+ return
+ while True:
+ try:
+ error = True
+ timestamp, task, data = self.queue.get(True)
+
+ printer = self.get_escpos_printer()
+
+ if printer == None:
+ if task != 'status':
+ self.queue.put((timestamp,task,data))
+ error = False
+ time.sleep(5)
+ continue
+ elif task == 'receipt':
+ if timestamp >= time.time() - 1 * 60 * 60:
+ self.print_receipt_body(printer,data)
+ printer.cut()
+ elif task == 'xml_receipt':
+ if timestamp >= time.time() - 1 * 60 * 60:
+ printer.receipt(data)
+ elif task == 'cashbox':
+ if timestamp >= time.time() - 12:
+ self.open_cashbox(printer)
+ elif task == 'status':
+ pass
+ error = False
+
+ except NoDeviceError as e:
+ print("No device found %s" % e)
+ except HandleDeviceError as e:
+ printer = None
+ print("Impossible to handle the device due to previous error %s" % e)
+ except TicketNotPrinted as e:
+ print("The ticket does not seems to have been fully printed %s" % e)
+ except NoStatusError as e:
+ print("Impossible to get the status of the printer %s" % e)
+ except Exception as e:
+ self.set_status('error')
+ _logger.exception(e)
+ finally:
+ if error:
+ self.queue.put((timestamp, task, data))
+ if printer:
+ printer.close()
+ printer = None
+
+ def push_task(self,task, data = None):
+ self.lockedstart()
+ self.queue.put((time.time(),task,data))
+
+ def print_receipt_body(self,eprint,receipt):
+
+ def check(string):
+ return string != True and bool(string) and string.strip()
+
+ def price(amount):
+ return ("{0:."+str(receipt['precision']['price'])+"f}").format(amount)
+
+ def money(amount):
+ return ("{0:."+str(receipt['precision']['money'])+"f}").format(amount)
+
+ def quantity(amount):
+ if math.floor(amount) != amount:
+ return ("{0:."+str(receipt['precision']['quantity'])+"f}").format(amount)
+ else:
+ return str(amount)
+
+ def printline(left, right='', width=40, ratio=0.5, indent=0):
+ lwidth = int(width * ratio)
+ rwidth = width - lwidth
+ lwidth = lwidth - indent
+
+ left = left[:lwidth]
+ if len(left) != lwidth:
+ left = left + ' ' * (lwidth - len(left))
+
+ right = right[-rwidth:]
+ if len(right) != rwidth:
+ right = ' ' * (rwidth - len(right)) + right
+
+ return ' ' * indent + left + right + '\n'
+
+ def print_taxes():
+ taxes = receipt['tax_details']
+ for tax in taxes:
+ eprint.text(printline(tax['tax']['name'],price(tax['amount']), width=40,ratio=0.6))
+
+ # Receipt Header
+ if receipt['company']['logo']:
+ eprint.set(align='center')
+ eprint.print_base64_image(receipt['company']['logo'])
+ eprint.text('\n')
+ else:
+ eprint.set(align='center',type='b',height=2,width=2)
+ eprint.text(receipt['company']['name'] + '\n')
+
+ eprint.set(align='center',type='b')
+ if check(receipt['company']['contact_address']):
+ eprint.text(receipt['company']['contact_address'] + '\n')
+ if check(receipt['company']['phone']):
+ eprint.text('Tel:' + receipt['company']['phone'] + '\n')
+ if check(receipt['company']['vat']):
+ eprint.text('VAT:' + receipt['company']['vat'] + '\n')
+ if check(receipt['company']['email']):
+ eprint.text(receipt['company']['email'] + '\n')
+ if check(receipt['company']['website']):
+ eprint.text(receipt['company']['website'] + '\n')
+ if check(receipt['header']):
+ eprint.text(receipt['header']+'\n')
+ if check(receipt['cashier']):
+ eprint.text('-'*32+'\n')
+ eprint.text('Served by '+receipt['cashier']+'\n')
+
+ # Orderlines
+ eprint.text('\n\n')
+ eprint.set(align='center')
+ for line in receipt['orderlines']:
+ pricestr = price(line['price_display'])
+ if line['discount'] == 0 and line['unit_name'] == 'Units' and line['quantity'] == 1:
+ eprint.text(printline(line['product_name'],pricestr,ratio=0.6))
+ else:
+ eprint.text(printline(line['product_name'],ratio=0.6))
+ if line['discount'] != 0:
+ eprint.text(printline('Discount: '+str(line['discount'])+'%', ratio=0.6, indent=2))
+ if line['unit_name'] == 'Units':
+ eprint.text( printline( quantity(line['quantity']) + ' x ' + price(line['price']), pricestr, ratio=0.6, indent=2))
+ else:
+ eprint.text( printline( quantity(line['quantity']) + line['unit_name'] + ' x ' + price(line['price']), pricestr, ratio=0.6, indent=2))
+
+ # Subtotal if the taxes are not included
+ taxincluded = True
+ if money(receipt['subtotal']) != money(receipt['total_with_tax']):
+ eprint.text(printline('','-------'));
+ eprint.text(printline(_('Subtotal'),money(receipt['subtotal']),width=40, ratio=0.6))
+ print_taxes()
+ #eprint.text(printline(_('Taxes'),money(receipt['total_tax']),width=40, ratio=0.6))
+ taxincluded = False
+
+
+ # Total
+ eprint.text(printline('','-------'));
+ eprint.set(align='center',height=2)
+ eprint.text(printline(_(' TOTAL'),money(receipt['total_with_tax']),width=40, ratio=0.6))
+ eprint.text('\n\n');
+
+ # Paymentlines
+ eprint.set(align='center')
+ for line in receipt['paymentlines']:
+ eprint.text(printline(line['journal'], money(line['amount']), ratio=0.6))
+
+ eprint.text('\n');
+ eprint.set(align='center',height=2)
+ eprint.text(printline(_(' CHANGE'),money(receipt['change']),width=40, ratio=0.6))
+ eprint.set(align='center')
+ eprint.text('\n');
+
+ # Extra Payment info
+ if receipt['total_discount'] != 0:
+ eprint.text(printline(_('Discounts'),money(receipt['total_discount']),width=40, ratio=0.6))
+ if taxincluded:
+ print_taxes()
+ #eprint.text(printline(_('Taxes'),money(receipt['total_tax']),width=40, ratio=0.6))
+
+ # Footer
+ if check(receipt['footer']):
+ eprint.text('\n'+receipt['footer']+'\n\n')
+ eprint.text(receipt['name']+'\n')
+ eprint.text( str(receipt['date']['date']).zfill(2)
+ +'/'+ str(receipt['date']['month']+1).zfill(2)
+ +'/'+ str(receipt['date']['year']).zfill(4)
+ +' '+ str(receipt['date']['hour']).zfill(2)
+ +':'+ str(receipt['date']['minute']).zfill(2) )
+
+
+driver = EscposDriver()
+
+proxy.proxy_drivers['escpos'] = driver
+
+
+class EscposProxy(proxy.ProxyController):
+
+ @http.route('/hw_proxy/open_cashbox', type='json', auth='none', cors='*')
+ def open_cashbox(self):
+ _logger.info('ESC/POS: OPEN CASHBOX')
+ driver.push_task('cashbox')
+
+ @http.route('/hw_proxy/print_receipt', type='json', auth='none', cors='*')
+ def print_receipt(self, receipt):
+ _logger.info('ESC/POS: PRINT RECEIPT')
+ driver.push_task('receipt',receipt)
+
+ @http.route('/hw_proxy/print_xml_receipt', type='json', auth='none', cors='*')
+ def print_xml_receipt(self, receipt):
+ _logger.info('ESC/POS: PRINT XML RECEIPT')
+ driver.push_task('xml_receipt',receipt)