summaryrefslogtreecommitdiff
path: root/addons/hw_escpos/escpos/printer.py
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/hw_escpos/escpos/printer.py
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/hw_escpos/escpos/printer.py')
-rw-r--r--addons/hw_escpos/escpos/printer.py228
1 files changed, 228 insertions, 0 deletions
diff --git a/addons/hw_escpos/escpos/printer.py b/addons/hw_escpos/escpos/printer.py
new file mode 100644
index 00000000..4e27f6ba
--- /dev/null
+++ b/addons/hw_escpos/escpos/printer.py
@@ -0,0 +1,228 @@
+#!/usr/bin/python
+
+from __future__ import print_function
+import serial
+import socket
+import usb.core
+import usb.util
+
+from .escpos import *
+from .constants import *
+from .exceptions import *
+from time import sleep
+
+class Usb(Escpos):
+ """ Define USB printer """
+
+ def __init__(self, idVendor, idProduct, interface=0, in_ep=None, out_ep=None):
+ """
+ @param idVendor : Vendor ID
+ @param idProduct : Product ID
+ @param interface : USB device interface
+ @param in_ep : Input end point
+ @param out_ep : Output end point
+ """
+
+ self.errorText = "ERROR PRINTER\n\n\n\n\n\n"+PAPER_FULL_CUT
+
+ self.idVendor = idVendor
+ self.idProduct = idProduct
+ self.interface = interface
+ self.in_ep = in_ep
+ self.out_ep = out_ep
+
+ # pyusb dropped the 'interface' parameter from usb.Device.write() at 1.0.0b2
+ # https://github.com/pyusb/pyusb/commit/20cd8c1f79b24082ec999c022b56c3febedc0964#diff-b5a4f98a864952f0f55d569dd14695b7L293
+ if usb.version_info < (1, 0, 0) or (usb.version_info == (1, 0, 0) and usb.version_info[3] in ("a1", "a2", "a3", "b1")):
+ self.write_kwargs = dict(interface=self.interface)
+ else:
+ self.write_kwargs = {}
+
+ self.open()
+
+ def open(self):
+ """ Search device on USB tree and set is as escpos device """
+
+ self.device = usb.core.find(idVendor=self.idVendor, idProduct=self.idProduct)
+ if self.device is None:
+ raise NoDeviceError()
+ try:
+ if self.device.is_kernel_driver_active(self.interface):
+ self.device.detach_kernel_driver(self.interface)
+ self.device.set_configuration()
+ usb.util.claim_interface(self.device, self.interface)
+
+ cfg = self.device.get_active_configuration()
+ intf = cfg[(0,0)] # first interface
+ if self.in_ep is None:
+ # Attempt to detect IN/OUT endpoint addresses
+ try:
+ is_IN = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
+ is_OUT = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
+ endpoint_in = usb.util.find_descriptor(intf, custom_match=is_IN)
+ endpoint_out = usb.util.find_descriptor(intf, custom_match=is_OUT)
+ self.in_ep = endpoint_in.bEndpointAddress
+ self.out_ep = endpoint_out.bEndpointAddress
+ except usb.core.USBError:
+ # default values for officially supported printers
+ self.in_ep = 0x82
+ self.out_ep = 0x01
+
+ except usb.core.USBError as e:
+ raise HandleDeviceError(e)
+
+ def close(self):
+ i = 0
+ while True:
+ try:
+ if not self.device.is_kernel_driver_active(self.interface):
+ usb.util.release_interface(self.device, self.interface)
+ self.device.attach_kernel_driver(self.interface)
+ usb.util.dispose_resources(self.device)
+ else:
+ self.device = None
+ return True
+ except usb.core.USBError as e:
+ i += 1
+ if i > 10:
+ return False
+
+ sleep(0.1)
+
+ def _raw(self, msg):
+ """ Print any command sent in raw format """
+ if len(msg) != self.device.write(self.out_ep, msg, timeout=5000, **self.write_kwargs):
+ self.device.write(self.out_ep, self.errorText, **self.write_kwargs)
+ raise TicketNotPrinted()
+
+ def __extract_status(self):
+ maxiterate = 0
+ rep = None
+ while rep == None:
+ maxiterate += 1
+ if maxiterate > 10000:
+ raise NoStatusError()
+ r = self.device.read(self.in_ep, 20, self.interface).tolist()
+ while len(r):
+ rep = r.pop()
+ return rep
+
+ def get_printer_status(self):
+ status = {
+ 'printer': {},
+ 'offline': {},
+ 'error' : {},
+ 'paper' : {},
+ }
+
+ self.device.write(self.out_ep, DLE_EOT_PRINTER, **self.write_kwargs)
+ printer = self.__extract_status()
+ self.device.write(self.out_ep, DLE_EOT_OFFLINE, **self.write_kwargs)
+ offline = self.__extract_status()
+ self.device.write(self.out_ep, DLE_EOT_ERROR, **self.write_kwargs)
+ error = self.__extract_status()
+ self.device.write(self.out_ep, DLE_EOT_PAPER, **self.write_kwargs)
+ paper = self.__extract_status()
+
+ status['printer']['status_code'] = printer
+ status['printer']['status_error'] = not ((printer & 147) == 18)
+ status['printer']['online'] = not bool(printer & 8)
+ status['printer']['recovery'] = bool(printer & 32)
+ status['printer']['paper_feed_on'] = bool(printer & 64)
+ status['printer']['drawer_pin_high'] = bool(printer & 4)
+ status['offline']['status_code'] = offline
+ status['offline']['status_error'] = not ((offline & 147) == 18)
+ status['offline']['cover_open'] = bool(offline & 4)
+ status['offline']['paper_feed_on'] = bool(offline & 8)
+ status['offline']['paper'] = not bool(offline & 32)
+ status['offline']['error'] = bool(offline & 64)
+ status['error']['status_code'] = error
+ status['error']['status_error'] = not ((error & 147) == 18)
+ status['error']['recoverable'] = bool(error & 4)
+ status['error']['autocutter'] = bool(error & 8)
+ status['error']['unrecoverable'] = bool(error & 32)
+ status['error']['auto_recoverable'] = not bool(error & 64)
+ status['paper']['status_code'] = paper
+ status['paper']['status_error'] = not ((paper & 147) == 18)
+ status['paper']['near_end'] = bool(paper & 12)
+ status['paper']['present'] = not bool(paper & 96)
+
+ return status
+
+ def __del__(self):
+ """ Release USB interface """
+ if self.device:
+ self.close()
+ self.device = None
+
+
+
+class Serial(Escpos):
+ """ Define Serial printer """
+
+ def __init__(self, devfile="/dev/ttyS0", baudrate=9600, bytesize=8, timeout=1):
+ """
+ @param devfile : Device file under dev filesystem
+ @param baudrate : Baud rate for serial transmission
+ @param bytesize : Serial buffer size
+ @param timeout : Read/Write timeout
+ """
+ self.devfile = devfile
+ self.baudrate = baudrate
+ self.bytesize = bytesize
+ self.timeout = timeout
+ self.open()
+
+
+ def open(self):
+ """ Setup serial port and set is as escpos device """
+ self.device = serial.Serial(port=self.devfile, baudrate=self.baudrate, bytesize=self.bytesize, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout, dsrdtr=True)
+
+ if self.device is not None:
+ print("Serial printer enabled")
+ else:
+ print("Unable to open serial printer on: %s" % self.devfile)
+
+
+ def _raw(self, msg):
+ """ Print any command sent in raw format """
+ self.device.write(msg)
+
+
+ def __del__(self):
+ """ Close Serial interface """
+ if self.device is not None:
+ self.device.close()
+
+
+
+class Network(Escpos):
+ """ Define Network printer """
+
+ def __init__(self,host,port=9100):
+ """
+ @param host : Printer's hostname or IP address
+ @param port : Port to write to
+ """
+ self.host = host
+ self.port = port
+ self.open()
+
+
+ def open(self):
+ """ Open TCP socket and set it as escpos device """
+ self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.device.connect((self.host, self.port))
+
+ if self.device is None:
+ print("Could not open socket for %s" % self.host)
+
+
+ def _raw(self, msg):
+ self.device.send(msg)
+
+
+ def __del__(self):
+ """ Close TCP connection """
+ self.device.close()
+