diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/hw_drivers/iot_handlers/drivers | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/hw_drivers/iot_handlers/drivers')
5 files changed, 1318 insertions, 0 deletions
diff --git a/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py b/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py new file mode 100644 index 00000000..0b79dd5b --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import jinja2 +import json +import logging +import netifaces as ni +import os +import subprocess +import threading +import time +import urllib3 + +from odoo import http +from odoo.addons.hw_drivers.connection_manager import connection_manager +from odoo.addons.hw_drivers.driver import Driver +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.main import iot_devices +from odoo.addons.hw_drivers.tools import helpers + +path = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../views')) +loader = jinja2.FileSystemLoader(path) + +jinja_env = jinja2.Environment(loader=loader, autoescape=True) +jinja_env.filters["json"] = json.dumps + +pos_display_template = jinja_env.get_template('pos_display.html') + +_logger = logging.getLogger(__name__) + + +class DisplayDriver(Driver): + connection_type = 'display' + + def __init__(self, identifier, device): + super(DisplayDriver, self).__init__(identifier, device) + self.device_type = 'display' + self.device_connection = 'hdmi' + self.device_name = device['name'] + self.event_data = threading.Event() + self.owner = False + self.rendered_html = '' + if self.device_identifier != 'distant_display': + self._x_screen = device.get('x_screen', '0') + self.load_url() + + @classmethod + def supported(cls, device): + return True # All devices with connection_type == 'display' are supported + + @classmethod + def get_default_display(cls): + displays = list(filter(lambda d: iot_devices[d].device_type == 'display', iot_devices)) + return len(displays) and iot_devices[displays[0]] + + def action(self, data): + if data.get('action') == "update_url" and self.device_identifier != 'distant_display': + self.update_url(data.get('url')) + elif data.get('action') == "display_refresh" and self.device_identifier != 'distant_display': + self.call_xdotools('F5') + elif data.get('action') == "take_control": + self.take_control(self.data['owner'], data.get('html')) + elif data.get('action') == "customer_facing_display": + self.update_customer_facing_display(self.data['owner'], data.get('html')) + elif data.get('action') == "get_owner": + self.data = { + 'value': '', + 'owner': self.owner, + } + event_manager.device_changed(self) + + def run(self): + while self.device_identifier != 'distant_display' and not self._stopped.isSet(): + time.sleep(60) + if self.url != 'http://localhost:8069/point_of_sale/display/' + self.device_identifier: + # Refresh the page every minute + self.call_xdotools('F5') + + def update_url(self, url=None): + os.environ['DISPLAY'] = ":0." + self._x_screen + os.environ['XAUTHORITY'] = '/run/lightdm/pi/xauthority' + firefox_env = os.environ.copy() + firefox_env['HOME'] = '/tmp/' + self._x_screen + self.url = url or 'http://localhost:8069/point_of_sale/display/' + self.device_identifier + new_window = subprocess.call(['xdotool', 'search', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox']) + subprocess.Popen(['firefox', self.url], env=firefox_env) + if new_window: + self.call_xdotools('F11') + + def load_url(self): + url = None + if helpers.get_odoo_server_url(): + # disable certifiacte verification + urllib3.disable_warnings() + http = urllib3.PoolManager(cert_reqs='CERT_NONE') + try: + response = http.request('GET', "%s/iot/box/%s/display_url" % (helpers.get_odoo_server_url(), helpers.get_mac_address())) + if response.status == 200: + data = json.loads(response.data.decode('utf8')) + url = data[self.device_identifier] + except json.decoder.JSONDecodeError: + url = response.data.decode('utf8') + except Exception: + pass + return self.update_url(url) + + def call_xdotools(self, keystroke): + os.environ['DISPLAY'] = ":0." + self._x_screen + os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority" + try: + subprocess.call(['xdotool', 'search', '--sync', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox', 'key', keystroke]) + return "xdotool succeeded in stroking " + keystroke + except: + return "xdotool threw an error, maybe it is not installed on the IoTBox" + + def update_customer_facing_display(self, origin, html=None): + if origin == self.owner: + self.rendered_html = html + self.event_data.set() + + def get_serialized_order(self): + # IMPLEMENTATION OF LONGPOLLING + # Times out 2 seconds before the JS request does + if self.event_data.wait(28): + self.event_data.clear() + return {'rendered_html': self.rendered_html} + return {'rendered_html': False} + + def take_control(self, new_owner, html=None): + # ALLOW A CASHIER TO TAKE CONTROL OVER THE POSBOX, IN CASE OF MULTIPLE CASHIER PER DISPLAY + self.owner = new_owner + self.rendered_html = html + self.data = { + 'value': '', + 'owner': self.owner, + } + event_manager.device_changed(self) + self.event_data.set() + +class DisplayController(http.Controller): + + @http.route('/hw_proxy/display_refresh', type='json', auth='none', cors='*') + def display_refresh(self): + display = DisplayDriver.get_default_display() + if display and display.device_identifier != 'distant_display': + return display.call_xdotools('F5') + + @http.route('/hw_proxy/customer_facing_display', type='json', auth='none', cors='*') + def customer_facing_display(self, html=None): + display = DisplayDriver.get_default_display() + if display: + display.update_customer_facing_display(http.request.httprequest.remote_addr, html) + return {'status': 'updated'} + return {'status': 'failed'} + + @http.route('/hw_proxy/take_control', type='json', auth='none', cors='*') + def take_control(self, html=None): + display = DisplayDriver.get_default_display() + if display: + display.take_control(http.request.httprequest.remote_addr, html) + return { + 'status': 'success', + 'message': 'You now have access to the display', + } + + @http.route('/hw_proxy/test_ownership', type='json', auth='none', cors='*') + def test_ownership(self): + display = DisplayDriver.get_default_display() + if display and display.owner == http.request.httprequest.remote_addr: + return {'status': 'OWNER'} + return {'status': 'NOWNER'} + + @http.route(['/point_of_sale/get_serialized_order', '/point_of_sale/get_serialized_order/<string:display_identifier>'], type='json', auth='none') + def get_serialized_order(self, display_identifier=None): + if display_identifier: + display = iot_devices.get(display_identifier) + else: + display = DisplayDriver.get_default_display() + + if display: + return display.get_serialized_order() + return { + 'rendered_html': False, + 'error': "No display found", + } + + @http.route(['/point_of_sale/display', '/point_of_sale/display/<string:display_identifier>'], type='http', auth='none') + def display(self, display_identifier=None): + cust_js = None + interfaces = ni.interfaces() + + with open(os.path.join(os.path.dirname(__file__), "../../static/src/js/worker.js")) as js: + cust_js = js.read() + + display_ifaces = [] + for iface_id in interfaces: + if 'wlan' in iface_id or 'eth' in iface_id: + iface_obj = ni.ifaddresses(iface_id) + ifconfigs = iface_obj.get(ni.AF_INET, []) + essid = helpers.get_ssid() + for conf in ifconfigs: + if conf.get('addr'): + display_ifaces.append({ + 'iface_id': iface_id, + 'essid': essid, + 'addr': conf.get('addr'), + 'icon': 'sitemap' if 'eth' in iface_id else 'wifi', + }) + + if not display_identifier: + display_identifier = DisplayDriver.get_default_display().device_identifier + + return pos_display_template.render({ + 'title': "Odoo -- Point of Sale", + 'breadcrumb': 'POS Client display', + 'cust_js': cust_js, + 'display_ifaces': display_ifaces, + 'display_identifier': display_identifier, + 'pairing_code': connection_manager.pairing_code, + }) diff --git a/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py new file mode 100644 index 00000000..29c7b12b --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py @@ -0,0 +1,367 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import ctypes +import evdev +import json +import logging +from lxml import etree +import os +from pathlib import Path +from queue import Queue, Empty +import subprocess +from threading import Lock +import time +import urllib3 +from usb import util + +from odoo import http, _ +from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers +from odoo.addons.hw_drivers.driver import Driver +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.main import iot_devices +from odoo.addons.hw_drivers.tools import helpers + +_logger = logging.getLogger(__name__) +xlib = ctypes.cdll.LoadLibrary('libX11.so.6') + + +class KeyboardUSBDriver(Driver): + connection_type = 'usb' + keyboard_layout_groups = [] + available_layouts = [] + + def __init__(self, identifier, device): + if not hasattr(KeyboardUSBDriver, 'display'): + os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority" + KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8")) + + super(KeyboardUSBDriver, self).__init__(identifier, device) + self.device_connection = 'direct' + self.device_name = self._set_name() + + # from https://github.com/xkbcommon/libxkbcommon/blob/master/test/evdev-scancodes.h + self._scancode_to_modifier = { + 42: 'left_shift', + 54: 'right_shift', + 58: 'caps_lock', + 69: 'num_lock', + 100: 'alt_gr', # right alt + } + self._tracked_modifiers = {modifier: False for modifier in self._scancode_to_modifier.values()} + + if not KeyboardUSBDriver.available_layouts: + KeyboardUSBDriver.load_layouts_list() + KeyboardUSBDriver.send_layouts_list() + + for evdev_device in [evdev.InputDevice(path) for path in evdev.list_devices()]: + if (device.idVendor == evdev_device.info.vendor) and (device.idProduct == evdev_device.info.product): + self.input_device = evdev_device + + self._set_device_type('scanner') if self._is_scanner() else self._set_device_type() + + @classmethod + def supported(cls, device): + for cfg in device: + for itf in cfg: + if itf.bInterfaceClass == 3 and itf.bInterfaceProtocol != 2: + device.interface_protocol = itf.bInterfaceProtocol + return True + return False + + @classmethod + def get_status(self): + """Allows `hw_proxy.Proxy` to retrieve the status of the scanners""" + status = 'connected' if any(iot_devices[d].device_type == "scanner" for d in iot_devices) else 'disconnected' + return {'status': status, 'messages': ''} + + @classmethod + def send_layouts_list(cls): + server = helpers.get_odoo_server_url() + if server: + urllib3.disable_warnings() + pm = urllib3.PoolManager(cert_reqs='CERT_NONE') + server = server + '/iot/keyboard_layouts' + try: + pm.request('POST', server, fields={'available_layouts': json.dumps(cls.available_layouts)}) + except Exception as e: + _logger.error('Could not reach configured server') + _logger.error('A error encountered : %s ' % e) + + @classmethod + def load_layouts_list(cls): + tree = etree.parse("/usr/share/X11/xkb/rules/base.xml", etree.XMLParser(ns_clean=True, recover=True)) + layouts = tree.xpath("//layout") + for layout in layouts: + layout_name = layout.xpath("./configItem/name")[0].text + layout_description = layout.xpath("./configItem/description")[0].text + KeyboardUSBDriver.available_layouts.append({ + 'name': layout_description, + 'layout': layout_name, + }) + for variant in layout.xpath("./variantList/variant"): + variant_name = variant.xpath("./configItem/name")[0].text + variant_description = variant.xpath("./configItem/description")[0].text + KeyboardUSBDriver.available_layouts.append({ + 'name': variant_description, + 'layout': layout_name, + 'variant': variant_name, + }) + + def _set_name(self): + try: + manufacturer = util.get_string(self.dev, self.dev.iManufacturer) + product = util.get_string(self.dev, self.dev.iProduct) + return ("%s - %s") % (manufacturer, product) + except ValueError as e: + _logger.warning(e) + return _('Unknown input device') + + def action(self, data): + if data.get('action', False) == 'update_layout': + layout = { + 'layout': data.get('layout'), + 'variant': data.get('variant'), + } + self._change_keyboard_layout(layout) + self.save_layout(layout) + elif data.get('action', False) == 'update_is_scanner': + is_scanner = {'is_scanner': data.get('is_scanner')} + self.save_is_scanner(is_scanner) + else: + self.data['value'] = '' + event_manager.device_changed(self) + + def run(self): + try: + for event in self.input_device.read_loop(): + if self._stopped.isSet(): + break + if event.type == evdev.ecodes.EV_KEY: + data = evdev.categorize(event) + + modifier_name = self._scancode_to_modifier.get(data.scancode) + if modifier_name: + if modifier_name in ('caps_lock', 'num_lock'): + if data.keystate == 1: + self._tracked_modifiers[modifier_name] = not self._tracked_modifiers[modifier_name] + else: + self._tracked_modifiers[modifier_name] = bool(data.keystate) # 1 for keydown, 0 for keyup + elif data.keystate == 1: + self.key_input(data.scancode) + + except Exception as err: + _logger.warning(err) + + def _change_keyboard_layout(self, new_layout): + """Change the layout of the current device to what is specified in + new_layout. + + Args: + new_layout (dict): A dict containing two keys: + - layout (str): The layout code + - variant (str): An optional key to represent the variant of the + selected layout + """ + if hasattr(self, 'keyboard_layout'): + KeyboardUSBDriver.keyboard_layout_groups.remove(self.keyboard_layout) + + if new_layout: + self.keyboard_layout = new_layout.get('layout') or 'us' + if new_layout.get('variant'): + self.keyboard_layout += "(%s)" % new_layout['variant'] + else: + self.keyboard_layout = 'us' + + KeyboardUSBDriver.keyboard_layout_groups.append(self.keyboard_layout) + subprocess.call(["setxkbmap", "-display", ":0.0", ",".join(KeyboardUSBDriver.keyboard_layout_groups)]) + + # Close then re-open display to refresh the mapping + xlib.XCloseDisplay(KeyboardUSBDriver.display) + KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8")) + + def save_layout(self, layout): + """Save the layout to a file on the box to read it when restarting it. + We need that in order to keep the selected layout after a reboot. + + Args: + new_layout (dict): A dict containing two keys: + - layout (str): The layout code + - variant (str): An optional key to represent the variant of the + selected layout + """ + file_path = Path.home() / 'odoo-keyboard-layouts.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + else: + data = {} + data[self.device_identifier] = layout + helpers.write_file('odoo-keyboard-layouts.conf', json.dumps(data)) + + def save_is_scanner(self, is_scanner): + """Save the type of device. + We need that in order to keep the selected type of device after a reboot. + """ + file_path = Path.home() / 'odoo-keyboard-is-scanner.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + else: + data = {} + data[self.device_identifier] = is_scanner + helpers.write_file('odoo-keyboard-is-scanner.conf', json.dumps(data)) + self._set_device_type('scanner') if is_scanner.get('is_scanner') else self._set_device_type() + + def load_layout(self): + """Read the layout from the saved filed and set it as current layout. + If no file or no layout is found we use 'us' by default. + """ + file_path = Path.home() / 'odoo-keyboard-layouts.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + layout = data.get(self.device_identifier, {'layout': 'us'}) + else: + layout = {'layout': 'us'} + self._change_keyboard_layout(layout) + + def _is_scanner(self): + """Read the device type from the saved filed and set it as current type. + If no file or no device type is found we try to detect it automatically. + """ + device_name = self.device_name.lower() + scanner_name = ['barcode', 'scanner', 'reader'] + is_scanner = any(x in device_name for x in scanner_name) or self.dev.interface_protocol == '0' + + file_path = Path.home() / 'odoo-keyboard-is-scanner.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + is_scanner = data.get(self.device_identifier, {}).get('is_scanner', is_scanner) + return is_scanner + + def _keyboard_input(self, scancode): + """Deal with a keyboard input. Send the character corresponding to the + pressed key represented by its scancode to the connected Odoo instance. + + Args: + scancode (int): The scancode of the pressed key. + """ + self.data['value'] = self._scancode_to_char(scancode) + if self.data['value']: + event_manager.device_changed(self) + + def _barcode_scanner_input(self, scancode): + """Deal with a barcode scanner input. Add the new character scanned to + the current barcode or complete the barcode if "Return" is pressed. + When a barcode is completed, two tasks are performed: + - Send a device_changed update to the event manager to notify the + listeners that the value has changed (used in Enterprise). + - Add the barcode to the list barcodes that are being queried in + Community. + + Args: + scancode (int): The scancode of the pressed key. + """ + if scancode == 28: # Return + self.data['value'] = self._current_barcode + event_manager.device_changed(self) + self._barcodes.put((time.time(), self._current_barcode)) + self._current_barcode = '' + else: + self._current_barcode += self._scancode_to_char(scancode) + + def _set_device_type(self, device_type='keyboard'): + """Modify the device type between 'keyboard' and 'scanner' + + Args: + type (string): Type wanted to switch + """ + if device_type == 'scanner': + self.device_type = 'scanner' + self.key_input = self._barcode_scanner_input + self._barcodes = Queue() + self._current_barcode = '' + self.input_device.grab() + self.read_barcode_lock = Lock() + else: + self.device_type = 'keyboard' + self.key_input = self._keyboard_input + self.load_layout() + + def _scancode_to_char(self, scancode): + """Translate a received scancode to a character depending on the + selected keyboard layout and the current state of the keyboard's + modifiers. + + Args: + scancode (int): The scancode of the pressed key, to be translated to + a character + + Returns: + str: The translated scancode. + """ + # Scancode -> Keysym : Depends on the keyboard layout + group = KeyboardUSBDriver.keyboard_layout_groups.index(self.keyboard_layout) + modifiers = self._get_active_modifiers(scancode) + keysym = ctypes.c_int(xlib.XkbKeycodeToKeysym(KeyboardUSBDriver.display, scancode + 8, group, modifiers)) + + # Translate Keysym to a character + key_pressed = ctypes.create_string_buffer(5) + xlib.XkbTranslateKeySym(KeyboardUSBDriver.display, ctypes.byref(keysym), 0, ctypes.byref(key_pressed), 5, ctypes.byref(ctypes.c_int())) + if key_pressed.value: + return key_pressed.value.decode('utf-8') + return '' + + def _get_active_modifiers(self, scancode): + """Get the state of currently active modifiers. + + Args: + scancode (int): The scancode of the key being translated + + Returns: + int: The current state of the modifiers: + 0 -- Lowercase + 1 -- Highercase or (NumLock + key pressed on keypad) + 2 -- AltGr + 3 -- Highercase + AltGr + """ + modifiers = 0 + uppercase = (self._tracked_modifiers['right_shift'] or self._tracked_modifiers['left_shift']) ^ self._tracked_modifiers['caps_lock'] + if uppercase or (scancode in [71, 72, 73, 75, 76, 77, 79, 80, 81, 82, 83] and self._tracked_modifiers['num_lock']): + modifiers += 1 + + if self._tracked_modifiers['alt_gr']: + modifiers += 2 + + return modifiers + + def read_next_barcode(self): + """Get the value of the last barcode that was scanned but not sent yet + and not older than 5 seconds. This function is used in Community, when + we don't have access to the IoTLongpolling. + + Returns: + str: The next barcode to be read or an empty string. + """ + + # Previous query still running, stop it by sending a fake barcode + if self.read_barcode_lock.locked(): + self._barcodes.put((time.time(), "")) + + with self.read_barcode_lock: + try: + timestamp, barcode = self._barcodes.get(True, 55) + if timestamp > time.time() - 5: + return barcode + except Empty: + return '' + +proxy_drivers['scanner'] = KeyboardUSBDriver + + +class KeyboardUSBController(http.Controller): + @http.route('/hw_proxy/scanner', type='json', auth='none', cors='*') + def get_barcode(self): + scanners = [iot_devices[d] for d in iot_devices if iot_devices[d].device_type == "scanner"] + if scanners: + return scanners[0].read_next_barcode() + time.sleep(5) + return None diff --git a/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py b/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py new file mode 100644 index 00000000..bbd79f1c --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py @@ -0,0 +1,271 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from base64 import b64decode +from cups import IPPError, IPP_PRINTER_IDLE, IPP_PRINTER_PROCESSING, IPP_PRINTER_STOPPED +import dbus +import io +import logging +import netifaces as ni +import os +from PIL import Image, ImageOps +import re +import subprocess +import tempfile +from uuid import getnode as get_mac + +from odoo import http +from odoo.addons.hw_drivers.connection_manager import connection_manager +from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers +from odoo.addons.hw_drivers.driver import Driver +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.iot_handlers.interfaces.PrinterInterface import PPDs, conn, cups_lock +from odoo.addons.hw_drivers.main import iot_devices +from odoo.addons.hw_drivers.tools import helpers + +_logger = logging.getLogger(__name__) + +RECEIPT_PRINTER_COMMANDS = { + 'star': { + 'center': b'\x1b\x1d\x61\x01', # ESC GS a n + 'cut': b'\x1b\x64\x02', # ESC d n + 'title': b'\x1b\x69\x01\x01%s\x1b\x69\x00\x00', # ESC i n1 n2 + 'drawers': [b'\x07', b'\x1a'] # BEL & SUB + }, + 'escpos': { + 'center': b'\x1b\x61\x01', # ESC a n + 'cut': b'\x1d\x56\x41\n', # GS V m + 'title': b'\x1b\x21\x30%s\x1b\x21\x00', # ESC ! n + 'drawers': [b'\x1b\x3d\x01', b'\x1b\x70\x00\x19\x19', b'\x1b\x70\x01\x19\x19'] # ESC = n then ESC p m t1 t2 + } +} + +def cups_notification_handler(message, uri, device_identifier, state, reason, accepting_jobs): + if device_identifier in iot_devices: + reason = reason if reason != 'none' else None + state_value = { + IPP_PRINTER_IDLE: 'connected', + IPP_PRINTER_PROCESSING: 'processing', + IPP_PRINTER_STOPPED: 'stopped' + } + iot_devices[device_identifier].update_status(state_value[state], message, reason) + +# Create a Cups subscription if it doesn't exist yet +try: + conn.getSubscriptions('/printers/') +except IPPError: + conn.createSubscription( + uri='/printers/', + recipient_uri='dbus://', + events=['printer-state-changed'] + ) + +# Listen for notifications from Cups +bus = dbus.SystemBus() +bus.add_signal_receiver(cups_notification_handler, signal_name="PrinterStateChanged", dbus_interface="org.cups.cupsd.Notifier") + + +class PrinterDriver(Driver): + connection_type = 'printer' + + def __init__(self, identifier, device): + super(PrinterDriver, self).__init__(identifier, device) + self.device_type = 'printer' + self.device_connection = device['device-class'].lower() + self.device_name = device['device-make-and-model'] + self.state = { + 'status': 'connecting', + 'message': 'Connecting to printer', + 'reason': None, + } + self.send_status() + + self.receipt_protocol = 'star' if 'STR_T' in device['device-id'] else 'escpos' + if 'direct' in self.device_connection and any(cmd in device['device-id'] for cmd in ['CMD:STAR;', 'CMD:ESC/POS;']): + self.print_status() + + @classmethod + def supported(cls, device): + if device.get('supported', False): + return True + protocol = ['dnssd', 'lpd', 'socket'] + if any(x in device['url'] for x in protocol) and device['device-make-and-model'] != 'Unknown' or 'direct' in device['device-class']: + model = cls.get_device_model(device) + ppdFile = '' + for ppd in PPDs: + if model and model in PPDs[ppd]['ppd-product']: + ppdFile = ppd + break + with cups_lock: + if ppdFile: + conn.addPrinter(name=device['identifier'], ppdname=ppdFile, device=device['url']) + else: + conn.addPrinter(name=device['identifier'], device=device['url']) + conn.setPrinterInfo(device['identifier'], device['device-make-and-model']) + conn.enablePrinter(device['identifier']) + conn.acceptJobs(device['identifier']) + conn.setPrinterUsersAllowed(device['identifier'], ['all']) + conn.addPrinterOptionDefault(device['identifier'], "usb-no-reattach", "true") + conn.addPrinterOptionDefault(device['identifier'], "usb-unidir", "true") + return True + return False + + @classmethod + def get_device_model(cls, device): + device_model = "" + if device.get('device-id'): + for device_id in [device_lo for device_lo in device['device-id'].split(';')]: + if any(x in device_id for x in ['MDL', 'MODEL']): + device_model = device_id.split(':')[1] + break + elif device.get('device-make-and-model'): + device_model = device['device-make-and-model'] + return re.sub("[\(].*?[\)]", "", device_model).strip() + + @classmethod + def get_status(cls): + status = 'connected' if any(iot_devices[d].device_type == "printer" and iot_devices[d].device_connection == 'direct' for d in iot_devices) else 'disconnected' + return {'status': status, 'messages': ''} + + def action(self, data): + if data.get('action') == 'cashbox': + self.open_cashbox() + elif data.get('action') == 'print_receipt': + self.print_receipt(b64decode(data['receipt'])) + else: + self.print_raw(b64decode(data['document'])) + + def disconnect(self): + self.update_status('disconnected', 'Printer was disconnected') + super(PrinterDriver, self).disconnect() + + def update_status(self, status, message, reason=None): + """Updates the state of the current printer. + + Args: + status (str): The new value of the status + message (str): A comprehensive message describing the status + reason (str): The reason fo the current status + """ + if self.state['status'] != status or self.state['reason'] != reason: + self.state = { + 'status': status, + 'message': message, + 'reason': reason, + } + self.send_status() + + def send_status(self): + """ Sends the current status of the printer to the connected Odoo instance. + """ + self.data = { + 'value': '', + 'state': self.state, + } + event_manager.device_changed(self) + + def print_raw(self, data): + process = subprocess.Popen(["lp", "-d", self.device_identifier], stdin=subprocess.PIPE) + process.communicate(data) + + def print_receipt(self, receipt): + im = Image.open(io.BytesIO(receipt)) + + # Convert to greyscale then to black and white + im = im.convert("L") + im = ImageOps.invert(im) + im = im.convert("1") + + print_command = getattr(self, 'format_%s' % self.receipt_protocol)(im) + self.print_raw(print_command) + + def format_star(self, im): + width = int((im.width + 7) / 8) + + raster_init = b'\x1b\x2a\x72\x41' + raster_page_length = b'\x1b\x2a\x72\x50\x30\x00' + raster_send = b'\x62' + raster_close = b'\x1b\x2a\x72\x42' + + raster_data = b'' + dots = im.tobytes() + while len(dots): + raster_data += raster_send + width.to_bytes(2, 'little') + dots[:width] + dots = dots[width:] + + return raster_init + raster_page_length + raster_data + raster_close + + def format_escpos(self, im): + width = int((im.width + 7) / 8) + + raster_send = b'\x1d\x76\x30\x00' + max_slice_height = 255 + + raster_data = b'' + dots = im.tobytes() + while len(dots): + im_slice = dots[:width*max_slice_height] + slice_height = int(len(im_slice) / width) + raster_data += raster_send + width.to_bytes(2, 'little') + slice_height.to_bytes(2, 'little') + im_slice + dots = dots[width*max_slice_height:] + + return raster_data + RECEIPT_PRINTER_COMMANDS['escpos']['cut'] + + def print_status(self): + """Prints the status ticket of the IoTBox on the current printer.""" + wlan = '' + ip = '' + mac = '' + homepage = '' + pairing_code = '' + + ssid = helpers.get_ssid() + wlan = '\nWireless network:\n%s\n\n' % ssid + + interfaces = ni.interfaces() + ips = [] + for iface_id in interfaces: + iface_obj = ni.ifaddresses(iface_id) + ifconfigs = iface_obj.get(ni.AF_INET, []) + for conf in ifconfigs: + if conf.get('addr') and conf.get('addr'): + ips.append(conf.get('addr')) + if len(ips) == 0: + ip = '\nERROR: Could not connect to LAN\n\nPlease check that the IoTBox is correc-\ntly connected with a network cable,\n that the LAN is setup with DHCP, and\nthat network addresses are available' + elif len(ips) == 1: + ip = '\nIP Address:\n%s\n' % ips[0] + else: + ip = '\nIP Addresses:\n%s\n' % '\n'.join(ips) + + if len(ips) >= 1: + ips_filtered = [i for i in ips if i != '127.0.0.1'] + main_ips = ips_filtered and ips_filtered[0] or '127.0.0.1' + mac = '\nMAC Address:\n%s\n' % helpers.get_mac_address() + homepage = '\nHomepage:\nhttp://%s:8069\n\n' % main_ips + + code = connection_manager.pairing_code + if code: + pairing_code = '\nPairing Code:\n%s\n' % code + + commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol] + title = commands['title'] % b'IoTBox Status' + self.print_raw(commands['center'] + title + b'\n' + wlan.encode() + mac.encode() + ip.encode() + homepage.encode() + pairing_code.encode() + commands['cut']) + + def open_cashbox(self): + """Sends a signal to the current printer to open the connected cashbox.""" + commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol] + for drawer in commands['drawers']: + self.print_raw(drawer) + + +class PrinterController(http.Controller): + + @http.route('/hw_proxy/default_printer_action', type='json', auth='none', cors='*') + def default_printer_action(self, data): + printer = next((d for d in iot_devices if iot_devices[d].device_type == 'printer' and iot_devices[d].device_connection == 'direct'), None) + if printer: + iot_devices[printer].action(data) + return True + return False + +proxy_drivers['printer'] = PrinterDriver diff --git a/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py b/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py new file mode 100644 index 00000000..412d773e --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from collections import namedtuple +from contextlib import contextmanager +import logging +import serial +from threading import Lock +import time +import traceback + +from odoo import _ +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.driver import Driver + +_logger = logging.getLogger(__name__) + +SerialProtocol = namedtuple( + 'SerialProtocol', + "name baudrate bytesize stopbits parity timeout writeTimeout measureRegexp statusRegexp " + "commandTerminator commandDelay measureDelay newMeasureDelay " + "measureCommand emptyAnswerValid") + + +@contextmanager +def serial_connection(path, protocol, is_probing=False): + """Opens a serial connection to a device and closes it automatically after use. + + :param path: path to the device + :type path: string + :param protocol: an object containing the serial protocol to connect to a device + :type protocol: namedtuple + :param is_probing: a flag thet if set to `True` makes the timeouts longer, defaults to False + :type is_probing: bool, optional + """ + + PROBING_TIMEOUT = 1 + port_config = { + 'baudrate': protocol.baudrate, + 'bytesize': protocol.bytesize, + 'stopbits': protocol.stopbits, + 'parity': protocol.parity, + 'timeout': PROBING_TIMEOUT if is_probing else protocol.timeout, # longer timeouts for probing + 'writeTimeout': PROBING_TIMEOUT if is_probing else protocol.writeTimeout # longer timeouts for probing + } + connection = serial.Serial(path, **port_config) + yield connection + connection.close() + + +class SerialDriver(Driver): + """Abstract base class for serial drivers.""" + + _protocol = None + connection_type = 'serial' + + STATUS_CONNECTED = 'connected' + STATUS_ERROR = 'error' + STATUS_CONNECTING = 'connecting' + + def __init__(self, identifier, device): + """ Attributes initialization method for `SerialDriver`. + + :param device: path to the device + :type device: str + """ + + super(SerialDriver, self).__init__(identifier, device) + self._actions = { + 'get_status': self._push_status, + } + self.device_connection = 'serial' + self._device_lock = Lock() + self._status = {'status': self.STATUS_CONNECTING, 'message_title': '', 'message_body': ''} + self._set_name() + + def _get_raw_response(connection): + pass + + def _push_status(self): + """Updates the current status and pushes it to the frontend.""" + + self.data['status'] = self._status + event_manager.device_changed(self) + + def _set_name(self): + """Tries to build the device's name based on its type and protocol name but falls back on a default name if that doesn't work.""" + + try: + name = ('%s serial %s' % (self._protocol.name, self.device_type)).title() + except Exception: + name = 'Unknown Serial Device' + self.device_name = name + + def _take_measure(self): + pass + + def _do_action(self, data): + """Helper function that calls a specific action method on the device. + + :param data: the `_actions` key mapped to the action method we want to call + :type data: string + """ + + try: + with self._device_lock: + self._actions[data['action']](data) + time.sleep(self._protocol.commandDelay) + except Exception: + msg = _('An error occured while performing action %s on %s') % (data, self.device_name) + _logger.exception(msg) + self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()} + self._push_status() + + def action(self, data): + """Establish a connection with the device if needed and have it perform a specific action. + + :param data: the `_actions` key mapped to the action method we want to call + :type data: string + """ + + if self._connection and self._connection.isOpen(): + self._do_action(data) + else: + with serial_connection(self.device_identifier, self._protocol) as connection: + self._connection = connection + self._do_action(data) + + def run(self): + """Continuously gets new measures from the device.""" + + try: + with serial_connection(self.device_identifier, self._protocol) as connection: + self._connection = connection + self._status['status'] = self.STATUS_CONNECTED + self._push_status() + while not self._stopped.isSet(): + self._take_measure() + time.sleep(self._protocol.newMeasureDelay) + except Exception: + msg = _('Error while reading %s', self.device_name) + _logger.exception(msg) + self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()} + self._push_status() diff --git a/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py b/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py new file mode 100644 index 00000000..cc1d5469 --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py @@ -0,0 +1,316 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from collections import namedtuple +import logging +import re +import serial +import threading +import time + +from odoo import http +from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.iot_handlers.drivers.SerialBaseDriver import SerialDriver, SerialProtocol, serial_connection + + +_logger = logging.getLogger(__name__) + +# Only needed to ensure compatibility with older versions of Odoo +ACTIVE_SCALE = None +new_weight_event = threading.Event() + +ScaleProtocol = namedtuple('ScaleProtocol', SerialProtocol._fields + ('zeroCommand', 'tareCommand', 'clearCommand', 'autoResetWeight')) + +# 8217 Mettler-Toledo (Weight-only) Protocol, as described in the scale's Service Manual. +# e.g. here: https://www.manualslib.com/manual/861274/Mettler-Toledo-Viva.html?page=51#manual +# Our recommended scale, the Mettler-Toledo "Ariva-S", supports this protocol on +# both the USB and RS232 ports, it can be configured in the setup menu as protocol option 3. +# We use the default serial protocol settings, the scale's settings can be configured in the +# scale's menu anyway. +Toledo8217Protocol = ScaleProtocol( + name='Toledo 8217', + baudrate=9600, + bytesize=serial.SEVENBITS, + stopbits=serial.STOPBITS_ONE, + parity=serial.PARITY_EVEN, + timeout=1, + writeTimeout=1, + measureRegexp=b"\x02\\s*([0-9.]+)N?\\r", + statusRegexp=b"\x02\\s*(\\?.)\\r", + commandDelay=0.2, + measureDelay=0.5, + newMeasureDelay=0.2, + commandTerminator=b'', + measureCommand=b'W', + zeroCommand=b'Z', + tareCommand=b'T', + clearCommand=b'C', + emptyAnswerValid=False, + autoResetWeight=False, +) + +# The ADAM scales have their own RS232 protocol, usually documented in the scale's manual +# e.g at https://www.adamequipment.com/media/docs/Print%20Publications/Manuals/PDF/AZEXTRA/AZEXTRA-UM.pdf +# https://www.manualslib.com/manual/879782/Adam-Equipment-Cbd-4.html?page=32#manual +# Only the baudrate and label format seem to be configurable in the AZExtra series. +ADAMEquipmentProtocol = ScaleProtocol( + name='Adam Equipment', + baudrate=4800, + bytesize=serial.EIGHTBITS, + stopbits=serial.STOPBITS_ONE, + parity=serial.PARITY_NONE, + timeout=0.2, + writeTimeout=0.2, + measureRegexp=b"\s*([0-9.]+)kg", # LABEL format 3 + KG in the scale settings, but Label 1/2 should work + statusRegexp=None, + commandTerminator=b"\r\n", + commandDelay=0.2, + measureDelay=0.5, + # AZExtra beeps every time you ask for a weight that was previously returned! + # Adding an extra delay gives the operator a chance to remove the products + # before the scale starts beeping. Could not find a way to disable the beeps. + newMeasureDelay=5, + measureCommand=b'P', + zeroCommand=b'Z', + tareCommand=b'T', + clearCommand=None, # No clear command -> Tare again + emptyAnswerValid=True, # AZExtra does not answer unless a new non-zero weight has been detected + autoResetWeight=True, # AZExtra will not return 0 after removing products +) + + +# Ensures compatibility with older versions of Odoo +class ScaleReadOldRoute(http.Controller): + @http.route('/hw_proxy/scale_read', type='json', auth='none', cors='*') + def scale_read(self): + if ACTIVE_SCALE: + return {'weight': ACTIVE_SCALE._scale_read_old_route()} + return None + + +class ScaleDriver(SerialDriver): + """Abstract base class for scale drivers.""" + last_sent_value = None + + def __init__(self, identifier, device): + super(ScaleDriver, self).__init__(identifier, device) + self.device_type = 'scale' + self._set_actions() + self._is_reading = True + + # Ensures compatibility with older versions of Odoo + # Only the last scale connected is kept + global ACTIVE_SCALE + ACTIVE_SCALE = self + proxy_drivers['scale'] = ACTIVE_SCALE + + # Ensures compatibility with older versions of Odoo + # and allows using the `ProxyDevice` in the point of sale to retrieve the status + def get_status(self): + """Allows `hw_proxy.Proxy` to retrieve the status of the scales""" + + status = self._status + return {'status': status['status'], 'messages': [status['message_title'], ]} + + def _set_actions(self): + """Initializes `self._actions`, a map of action keys sent by the frontend to backend action methods.""" + + self._actions.update({ + 'read_once': self._read_once_action, + 'set_zero': self._set_zero_action, + 'set_tare': self._set_tare_action, + 'clear_tare': self._clear_tare_action, + 'start_reading': self._start_reading_action, + 'stop_reading': self._stop_reading_action, + }) + + def _start_reading_action(self, data): + """Starts asking for the scale value.""" + self._is_reading = True + + def _stop_reading_action(self, data): + """Stops asking for the scale value.""" + self._is_reading = False + + def _clear_tare_action(self, data): + """Clears the scale current tare weight.""" + + # if the protocol has no clear tare command, we can just tare again + clearCommand = self._protocol.clearCommand or self._protocol.tareCommand + self._connection.write(clearCommand + self._protocol.commandTerminator) + + def _read_once_action(self, data): + """Reads the scale current weight value and pushes it to the frontend.""" + + self._read_weight() + self.last_sent_value = self.data['value'] + event_manager.device_changed(self) + + def _set_zero_action(self, data): + """Makes the weight currently applied to the scale the new zero.""" + + self._connection.write(self._protocol.zeroCommand + self._protocol.commandTerminator) + + def _set_tare_action(self, data): + """Sets the scale's current weight value as tare weight.""" + + self._connection.write(self._protocol.tareCommand + self._protocol.commandTerminator) + + @staticmethod + def _get_raw_response(connection): + """Gets raw bytes containing the updated value of the device. + + :param connection: a connection to the device's serial port + :type connection: pyserial.Serial + :return: the raw response to a weight request + :rtype: str + """ + + answer = [] + while True: + char = connection.read(1) + if not char: + break + else: + answer.append(bytes(char)) + return b''.join(answer) + + def _read_weight(self): + """Asks for a new weight from the scale, checks if it is valid and, if it is, makes it the current value.""" + + protocol = self._protocol + self._connection.write(protocol.measureCommand + protocol.commandTerminator) + answer = self._get_raw_response(self._connection) + match = re.search(self._protocol.measureRegexp, answer) + if match: + self.data = { + 'value': float(match.group(1)), + 'status': self._status + } + + # Ensures compatibility with older versions of Odoo + def _scale_read_old_route(self): + """Used when the iot app is not installed""" + with self._device_lock: + self._read_weight() + return self.data['value'] + + def _take_measure(self): + """Reads the device's weight value, and pushes that value to the frontend.""" + + with self._device_lock: + self._read_weight() + if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR: + self.last_sent_value = self.data['value'] + event_manager.device_changed(self) + + +class Toledo8217Driver(ScaleDriver): + """Driver for the Toldedo 8217 serial scale.""" + _protocol = Toledo8217Protocol + + def __init__(self, identifier, device): + super(Toledo8217Driver, self).__init__(identifier, device) + self.device_manufacturer = 'Toledo' + + @classmethod + def supported(cls, device): + """Checks whether the device, which port info is passed as argument, is supported by the driver. + + :param device: path to the device + :type device: str + :return: whether the device is supported by the driver + :rtype: bool + """ + + protocol = cls._protocol + + try: + with serial_connection(device['identifier'], protocol, is_probing=True) as connection: + connection.write(b'Ehello' + protocol.commandTerminator) + time.sleep(protocol.commandDelay) + answer = connection.read(8) + if answer == b'\x02E\rhello': + connection.write(b'F' + protocol.commandTerminator) + return True + except serial.serialutil.SerialTimeoutException: + pass + except Exception: + _logger.exception('Error while probing %s with protocol %s' % (device, protocol.name)) + return False + + +class AdamEquipmentDriver(ScaleDriver): + """Driver for the Adam Equipment serial scale.""" + + _protocol = ADAMEquipmentProtocol + priority = 0 # Test the supported method of this driver last, after all other serial drivers + + def __init__(self, identifier, device): + super(AdamEquipmentDriver, self).__init__(identifier, device) + self._is_reading = False + self._last_weight_time = 0 + self.device_manufacturer = 'Adam' + + def _check_last_weight_time(self): + """The ADAM doesn't make the difference between a value of 0 and "the same value as last time": + in both cases it returns an empty string. + With this, unless the weight changes, we give the user `TIME_WEIGHT_KEPT` seconds to log the new weight, + then change it back to zero to avoid keeping it indefinetely, which could cause issues. + In any case the ADAM must always go back to zero before it can weight again. + """ + + TIME_WEIGHT_KEPT = 10 + + if self.data['value'] is None: + if time.time() - self._last_weight_time > TIME_WEIGHT_KEPT: + self.data['value'] = 0 + else: + self._last_weight_time = time.time() + + def _take_measure(self): + """Reads the device's weight value, and pushes that value to the frontend.""" + + if self._is_reading: + with self._device_lock: + self._read_weight() + self._check_last_weight_time() + if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR: + self.last_sent_value = self.data['value'] + event_manager.device_changed(self) + else: + time.sleep(0.5) + + # Ensures compatibility with older versions of Odoo + def _scale_read_old_route(self): + """Used when the iot app is not installed""" + + time.sleep(3) + with self._device_lock: + self._read_weight() + self._check_last_weight_time() + return self.data['value'] + + @classmethod + def supported(cls, device): + """Checks whether the device at `device` is supported by the driver. + + :param device: path to the device + :type device: str + :return: whether the device is supported by the driver + :rtype: bool + """ + + protocol = cls._protocol + + try: + with serial_connection(device['identifier'], protocol, is_probing=True) as connection: + connection.write(protocol.measureCommand + protocol.commandTerminator) + # Checking whether writing to the serial port using the Adam protocol raises a timeout exception is about the only thing we can do. + return True + except serial.serialutil.SerialTimeoutException: + pass + except Exception: + _logger.exception('Error while probing %s with protocol %s' % (device, protocol.name)) + return False |
