summaryrefslogtreecommitdiff
path: root/addons/hw_drivers/iot_handlers/drivers
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/hw_drivers/iot_handlers/drivers
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/hw_drivers/iot_handlers/drivers')
-rw-r--r--addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py220
-rw-r--r--addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py367
-rw-r--r--addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py271
-rw-r--r--addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py144
-rw-r--r--addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py316
5 files changed, 1318 insertions, 0 deletions
diff --git a/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py b/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py
new file mode 100644
index 00000000..0b79dd5b
--- /dev/null
+++ b/addons/hw_drivers/iot_handlers/drivers/DisplayDriver.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import jinja2
+import json
+import logging
+import netifaces as ni
+import os
+import subprocess
+import threading
+import time
+import urllib3
+
+from odoo import http
+from odoo.addons.hw_drivers.connection_manager import connection_manager
+from odoo.addons.hw_drivers.driver import Driver
+from odoo.addons.hw_drivers.event_manager import event_manager
+from odoo.addons.hw_drivers.main import iot_devices
+from odoo.addons.hw_drivers.tools import helpers
+
+path = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../views'))
+loader = jinja2.FileSystemLoader(path)
+
+jinja_env = jinja2.Environment(loader=loader, autoescape=True)
+jinja_env.filters["json"] = json.dumps
+
+pos_display_template = jinja_env.get_template('pos_display.html')
+
+_logger = logging.getLogger(__name__)
+
+
+class DisplayDriver(Driver):
+ connection_type = 'display'
+
+ def __init__(self, identifier, device):
+ super(DisplayDriver, self).__init__(identifier, device)
+ self.device_type = 'display'
+ self.device_connection = 'hdmi'
+ self.device_name = device['name']
+ self.event_data = threading.Event()
+ self.owner = False
+ self.rendered_html = ''
+ if self.device_identifier != 'distant_display':
+ self._x_screen = device.get('x_screen', '0')
+ self.load_url()
+
+ @classmethod
+ def supported(cls, device):
+ return True # All devices with connection_type == 'display' are supported
+
+ @classmethod
+ def get_default_display(cls):
+ displays = list(filter(lambda d: iot_devices[d].device_type == 'display', iot_devices))
+ return len(displays) and iot_devices[displays[0]]
+
+ def action(self, data):
+ if data.get('action') == "update_url" and self.device_identifier != 'distant_display':
+ self.update_url(data.get('url'))
+ elif data.get('action') == "display_refresh" and self.device_identifier != 'distant_display':
+ self.call_xdotools('F5')
+ elif data.get('action') == "take_control":
+ self.take_control(self.data['owner'], data.get('html'))
+ elif data.get('action') == "customer_facing_display":
+ self.update_customer_facing_display(self.data['owner'], data.get('html'))
+ elif data.get('action') == "get_owner":
+ self.data = {
+ 'value': '',
+ 'owner': self.owner,
+ }
+ event_manager.device_changed(self)
+
+ def run(self):
+ while self.device_identifier != 'distant_display' and not self._stopped.isSet():
+ time.sleep(60)
+ if self.url != 'http://localhost:8069/point_of_sale/display/' + self.device_identifier:
+ # Refresh the page every minute
+ self.call_xdotools('F5')
+
+ def update_url(self, url=None):
+ os.environ['DISPLAY'] = ":0." + self._x_screen
+ os.environ['XAUTHORITY'] = '/run/lightdm/pi/xauthority'
+ firefox_env = os.environ.copy()
+ firefox_env['HOME'] = '/tmp/' + self._x_screen
+ self.url = url or 'http://localhost:8069/point_of_sale/display/' + self.device_identifier
+ new_window = subprocess.call(['xdotool', 'search', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox'])
+ subprocess.Popen(['firefox', self.url], env=firefox_env)
+ if new_window:
+ self.call_xdotools('F11')
+
+ def load_url(self):
+ url = None
+ if helpers.get_odoo_server_url():
+ # disable certifiacte verification
+ urllib3.disable_warnings()
+ http = urllib3.PoolManager(cert_reqs='CERT_NONE')
+ try:
+ response = http.request('GET', "%s/iot/box/%s/display_url" % (helpers.get_odoo_server_url(), helpers.get_mac_address()))
+ if response.status == 200:
+ data = json.loads(response.data.decode('utf8'))
+ url = data[self.device_identifier]
+ except json.decoder.JSONDecodeError:
+ url = response.data.decode('utf8')
+ except Exception:
+ pass
+ return self.update_url(url)
+
+ def call_xdotools(self, keystroke):
+ os.environ['DISPLAY'] = ":0." + self._x_screen
+ os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority"
+ try:
+ subprocess.call(['xdotool', 'search', '--sync', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox', 'key', keystroke])
+ return "xdotool succeeded in stroking " + keystroke
+ except:
+ return "xdotool threw an error, maybe it is not installed on the IoTBox"
+
+ def update_customer_facing_display(self, origin, html=None):
+ if origin == self.owner:
+ self.rendered_html = html
+ self.event_data.set()
+
+ def get_serialized_order(self):
+ # IMPLEMENTATION OF LONGPOLLING
+ # Times out 2 seconds before the JS request does
+ if self.event_data.wait(28):
+ self.event_data.clear()
+ return {'rendered_html': self.rendered_html}
+ return {'rendered_html': False}
+
+ def take_control(self, new_owner, html=None):
+ # ALLOW A CASHIER TO TAKE CONTROL OVER THE POSBOX, IN CASE OF MULTIPLE CASHIER PER DISPLAY
+ self.owner = new_owner
+ self.rendered_html = html
+ self.data = {
+ 'value': '',
+ 'owner': self.owner,
+ }
+ event_manager.device_changed(self)
+ self.event_data.set()
+
+class DisplayController(http.Controller):
+
+ @http.route('/hw_proxy/display_refresh', type='json', auth='none', cors='*')
+ def display_refresh(self):
+ display = DisplayDriver.get_default_display()
+ if display and display.device_identifier != 'distant_display':
+ return display.call_xdotools('F5')
+
+ @http.route('/hw_proxy/customer_facing_display', type='json', auth='none', cors='*')
+ def customer_facing_display(self, html=None):
+ display = DisplayDriver.get_default_display()
+ if display:
+ display.update_customer_facing_display(http.request.httprequest.remote_addr, html)
+ return {'status': 'updated'}
+ return {'status': 'failed'}
+
+ @http.route('/hw_proxy/take_control', type='json', auth='none', cors='*')
+ def take_control(self, html=None):
+ display = DisplayDriver.get_default_display()
+ if display:
+ display.take_control(http.request.httprequest.remote_addr, html)
+ return {
+ 'status': 'success',
+ 'message': 'You now have access to the display',
+ }
+
+ @http.route('/hw_proxy/test_ownership', type='json', auth='none', cors='*')
+ def test_ownership(self):
+ display = DisplayDriver.get_default_display()
+ if display and display.owner == http.request.httprequest.remote_addr:
+ return {'status': 'OWNER'}
+ return {'status': 'NOWNER'}
+
+ @http.route(['/point_of_sale/get_serialized_order', '/point_of_sale/get_serialized_order/<string:display_identifier>'], type='json', auth='none')
+ def get_serialized_order(self, display_identifier=None):
+ if display_identifier:
+ display = iot_devices.get(display_identifier)
+ else:
+ display = DisplayDriver.get_default_display()
+
+ if display:
+ return display.get_serialized_order()
+ return {
+ 'rendered_html': False,
+ 'error': "No display found",
+ }
+
+ @http.route(['/point_of_sale/display', '/point_of_sale/display/<string:display_identifier>'], type='http', auth='none')
+ def display(self, display_identifier=None):
+ cust_js = None
+ interfaces = ni.interfaces()
+
+ with open(os.path.join(os.path.dirname(__file__), "../../static/src/js/worker.js")) as js:
+ cust_js = js.read()
+
+ display_ifaces = []
+ for iface_id in interfaces:
+ if 'wlan' in iface_id or 'eth' in iface_id:
+ iface_obj = ni.ifaddresses(iface_id)
+ ifconfigs = iface_obj.get(ni.AF_INET, [])
+ essid = helpers.get_ssid()
+ for conf in ifconfigs:
+ if conf.get('addr'):
+ display_ifaces.append({
+ 'iface_id': iface_id,
+ 'essid': essid,
+ 'addr': conf.get('addr'),
+ 'icon': 'sitemap' if 'eth' in iface_id else 'wifi',
+ })
+
+ if not display_identifier:
+ display_identifier = DisplayDriver.get_default_display().device_identifier
+
+ return pos_display_template.render({
+ 'title': "Odoo -- Point of Sale",
+ 'breadcrumb': 'POS Client display',
+ 'cust_js': cust_js,
+ 'display_ifaces': display_ifaces,
+ 'display_identifier': display_identifier,
+ 'pairing_code': connection_manager.pairing_code,
+ })
diff --git a/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py
new file mode 100644
index 00000000..29c7b12b
--- /dev/null
+++ b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py
@@ -0,0 +1,367 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import ctypes
+import evdev
+import json
+import logging
+from lxml import etree
+import os
+from pathlib import Path
+from queue import Queue, Empty
+import subprocess
+from threading import Lock
+import time
+import urllib3
+from usb import util
+
+from odoo import http, _
+from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
+from odoo.addons.hw_drivers.driver import Driver
+from odoo.addons.hw_drivers.event_manager import event_manager
+from odoo.addons.hw_drivers.main import iot_devices
+from odoo.addons.hw_drivers.tools import helpers
+
+_logger = logging.getLogger(__name__)
+xlib = ctypes.cdll.LoadLibrary('libX11.so.6')
+
+
+class KeyboardUSBDriver(Driver):
+ connection_type = 'usb'
+ keyboard_layout_groups = []
+ available_layouts = []
+
+ def __init__(self, identifier, device):
+ if not hasattr(KeyboardUSBDriver, 'display'):
+ os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority"
+ KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8"))
+
+ super(KeyboardUSBDriver, self).__init__(identifier, device)
+ self.device_connection = 'direct'
+ self.device_name = self._set_name()
+
+ # from https://github.com/xkbcommon/libxkbcommon/blob/master/test/evdev-scancodes.h
+ self._scancode_to_modifier = {
+ 42: 'left_shift',
+ 54: 'right_shift',
+ 58: 'caps_lock',
+ 69: 'num_lock',
+ 100: 'alt_gr', # right alt
+ }
+ self._tracked_modifiers = {modifier: False for modifier in self._scancode_to_modifier.values()}
+
+ if not KeyboardUSBDriver.available_layouts:
+ KeyboardUSBDriver.load_layouts_list()
+ KeyboardUSBDriver.send_layouts_list()
+
+ for evdev_device in [evdev.InputDevice(path) for path in evdev.list_devices()]:
+ if (device.idVendor == evdev_device.info.vendor) and (device.idProduct == evdev_device.info.product):
+ self.input_device = evdev_device
+
+ self._set_device_type('scanner') if self._is_scanner() else self._set_device_type()
+
+ @classmethod
+ def supported(cls, device):
+ for cfg in device:
+ for itf in cfg:
+ if itf.bInterfaceClass == 3 and itf.bInterfaceProtocol != 2:
+ device.interface_protocol = itf.bInterfaceProtocol
+ return True
+ return False
+
+ @classmethod
+ def get_status(self):
+ """Allows `hw_proxy.Proxy` to retrieve the status of the scanners"""
+ status = 'connected' if any(iot_devices[d].device_type == "scanner" for d in iot_devices) else 'disconnected'
+ return {'status': status, 'messages': ''}
+
+ @classmethod
+ def send_layouts_list(cls):
+ server = helpers.get_odoo_server_url()
+ if server:
+ urllib3.disable_warnings()
+ pm = urllib3.PoolManager(cert_reqs='CERT_NONE')
+ server = server + '/iot/keyboard_layouts'
+ try:
+ pm.request('POST', server, fields={'available_layouts': json.dumps(cls.available_layouts)})
+ except Exception as e:
+ _logger.error('Could not reach configured server')
+ _logger.error('A error encountered : %s ' % e)
+
+ @classmethod
+ def load_layouts_list(cls):
+ tree = etree.parse("/usr/share/X11/xkb/rules/base.xml", etree.XMLParser(ns_clean=True, recover=True))
+ layouts = tree.xpath("//layout")
+ for layout in layouts:
+ layout_name = layout.xpath("./configItem/name")[0].text
+ layout_description = layout.xpath("./configItem/description")[0].text
+ KeyboardUSBDriver.available_layouts.append({
+ 'name': layout_description,
+ 'layout': layout_name,
+ })
+ for variant in layout.xpath("./variantList/variant"):
+ variant_name = variant.xpath("./configItem/name")[0].text
+ variant_description = variant.xpath("./configItem/description")[0].text
+ KeyboardUSBDriver.available_layouts.append({
+ 'name': variant_description,
+ 'layout': layout_name,
+ 'variant': variant_name,
+ })
+
+ def _set_name(self):
+ try:
+ manufacturer = util.get_string(self.dev, self.dev.iManufacturer)
+ product = util.get_string(self.dev, self.dev.iProduct)
+ return ("%s - %s") % (manufacturer, product)
+ except ValueError as e:
+ _logger.warning(e)
+ return _('Unknown input device')
+
+ def action(self, data):
+ if data.get('action', False) == 'update_layout':
+ layout = {
+ 'layout': data.get('layout'),
+ 'variant': data.get('variant'),
+ }
+ self._change_keyboard_layout(layout)
+ self.save_layout(layout)
+ elif data.get('action', False) == 'update_is_scanner':
+ is_scanner = {'is_scanner': data.get('is_scanner')}
+ self.save_is_scanner(is_scanner)
+ else:
+ self.data['value'] = ''
+ event_manager.device_changed(self)
+
+ def run(self):
+ try:
+ for event in self.input_device.read_loop():
+ if self._stopped.isSet():
+ break
+ if event.type == evdev.ecodes.EV_KEY:
+ data = evdev.categorize(event)
+
+ modifier_name = self._scancode_to_modifier.get(data.scancode)
+ if modifier_name:
+ if modifier_name in ('caps_lock', 'num_lock'):
+ if data.keystate == 1:
+ self._tracked_modifiers[modifier_name] = not self._tracked_modifiers[modifier_name]
+ else:
+ self._tracked_modifiers[modifier_name] = bool(data.keystate) # 1 for keydown, 0 for keyup
+ elif data.keystate == 1:
+ self.key_input(data.scancode)
+
+ except Exception as err:
+ _logger.warning(err)
+
+ def _change_keyboard_layout(self, new_layout):
+ """Change the layout of the current device to what is specified in
+ new_layout.
+
+ Args:
+ new_layout (dict): A dict containing two keys:
+ - layout (str): The layout code
+ - variant (str): An optional key to represent the variant of the
+ selected layout
+ """
+ if hasattr(self, 'keyboard_layout'):
+ KeyboardUSBDriver.keyboard_layout_groups.remove(self.keyboard_layout)
+
+ if new_layout:
+ self.keyboard_layout = new_layout.get('layout') or 'us'
+ if new_layout.get('variant'):
+ self.keyboard_layout += "(%s)" % new_layout['variant']
+ else:
+ self.keyboard_layout = 'us'
+
+ KeyboardUSBDriver.keyboard_layout_groups.append(self.keyboard_layout)
+ subprocess.call(["setxkbmap", "-display", ":0.0", ",".join(KeyboardUSBDriver.keyboard_layout_groups)])
+
+ # Close then re-open display to refresh the mapping
+ xlib.XCloseDisplay(KeyboardUSBDriver.display)
+ KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8"))
+
+ def save_layout(self, layout):
+ """Save the layout to a file on the box to read it when restarting it.
+ We need that in order to keep the selected layout after a reboot.
+
+ Args:
+ new_layout (dict): A dict containing two keys:
+ - layout (str): The layout code
+ - variant (str): An optional key to represent the variant of the
+ selected layout
+ """
+ file_path = Path.home() / 'odoo-keyboard-layouts.conf'
+ if file_path.exists():
+ data = json.loads(file_path.read_text())
+ else:
+ data = {}
+ data[self.device_identifier] = layout
+ helpers.write_file('odoo-keyboard-layouts.conf', json.dumps(data))
+
+ def save_is_scanner(self, is_scanner):
+ """Save the type of device.
+ We need that in order to keep the selected type of device after a reboot.
+ """
+ file_path = Path.home() / 'odoo-keyboard-is-scanner.conf'
+ if file_path.exists():
+ data = json.loads(file_path.read_text())
+ else:
+ data = {}
+ data[self.device_identifier] = is_scanner
+ helpers.write_file('odoo-keyboard-is-scanner.conf', json.dumps(data))
+ self._set_device_type('scanner') if is_scanner.get('is_scanner') else self._set_device_type()
+
+ def load_layout(self):
+ """Read the layout from the saved filed and set it as current layout.
+ If no file or no layout is found we use 'us' by default.
+ """
+ file_path = Path.home() / 'odoo-keyboard-layouts.conf'
+ if file_path.exists():
+ data = json.loads(file_path.read_text())
+ layout = data.get(self.device_identifier, {'layout': 'us'})
+ else:
+ layout = {'layout': 'us'}
+ self._change_keyboard_layout(layout)
+
+ def _is_scanner(self):
+ """Read the device type from the saved filed and set it as current type.
+ If no file or no device type is found we try to detect it automatically.
+ """
+ device_name = self.device_name.lower()
+ scanner_name = ['barcode', 'scanner', 'reader']
+ is_scanner = any(x in device_name for x in scanner_name) or self.dev.interface_protocol == '0'
+
+ file_path = Path.home() / 'odoo-keyboard-is-scanner.conf'
+ if file_path.exists():
+ data = json.loads(file_path.read_text())
+ is_scanner = data.get(self.device_identifier, {}).get('is_scanner', is_scanner)
+ return is_scanner
+
+ def _keyboard_input(self, scancode):
+ """Deal with a keyboard input. Send the character corresponding to the
+ pressed key represented by its scancode to the connected Odoo instance.
+
+ Args:
+ scancode (int): The scancode of the pressed key.
+ """
+ self.data['value'] = self._scancode_to_char(scancode)
+ if self.data['value']:
+ event_manager.device_changed(self)
+
+ def _barcode_scanner_input(self, scancode):
+ """Deal with a barcode scanner input. Add the new character scanned to
+ the current barcode or complete the barcode if "Return" is pressed.
+ When a barcode is completed, two tasks are performed:
+ - Send a device_changed update to the event manager to notify the
+ listeners that the value has changed (used in Enterprise).
+ - Add the barcode to the list barcodes that are being queried in
+ Community.
+
+ Args:
+ scancode (int): The scancode of the pressed key.
+ """
+ if scancode == 28: # Return
+ self.data['value'] = self._current_barcode
+ event_manager.device_changed(self)
+ self._barcodes.put((time.time(), self._current_barcode))
+ self._current_barcode = ''
+ else:
+ self._current_barcode += self._scancode_to_char(scancode)
+
+ def _set_device_type(self, device_type='keyboard'):
+ """Modify the device type between 'keyboard' and 'scanner'
+
+ Args:
+ type (string): Type wanted to switch
+ """
+ if device_type == 'scanner':
+ self.device_type = 'scanner'
+ self.key_input = self._barcode_scanner_input
+ self._barcodes = Queue()
+ self._current_barcode = ''
+ self.input_device.grab()
+ self.read_barcode_lock = Lock()
+ else:
+ self.device_type = 'keyboard'
+ self.key_input = self._keyboard_input
+ self.load_layout()
+
+ def _scancode_to_char(self, scancode):
+ """Translate a received scancode to a character depending on the
+ selected keyboard layout and the current state of the keyboard's
+ modifiers.
+
+ Args:
+ scancode (int): The scancode of the pressed key, to be translated to
+ a character
+
+ Returns:
+ str: The translated scancode.
+ """
+ # Scancode -> Keysym : Depends on the keyboard layout
+ group = KeyboardUSBDriver.keyboard_layout_groups.index(self.keyboard_layout)
+ modifiers = self._get_active_modifiers(scancode)
+ keysym = ctypes.c_int(xlib.XkbKeycodeToKeysym(KeyboardUSBDriver.display, scancode + 8, group, modifiers))
+
+ # Translate Keysym to a character
+ key_pressed = ctypes.create_string_buffer(5)
+ xlib.XkbTranslateKeySym(KeyboardUSBDriver.display, ctypes.byref(keysym), 0, ctypes.byref(key_pressed), 5, ctypes.byref(ctypes.c_int()))
+ if key_pressed.value:
+ return key_pressed.value.decode('utf-8')
+ return ''
+
+ def _get_active_modifiers(self, scancode):
+ """Get the state of currently active modifiers.
+
+ Args:
+ scancode (int): The scancode of the key being translated
+
+ Returns:
+ int: The current state of the modifiers:
+ 0 -- Lowercase
+ 1 -- Highercase or (NumLock + key pressed on keypad)
+ 2 -- AltGr
+ 3 -- Highercase + AltGr
+ """
+ modifiers = 0
+ uppercase = (self._tracked_modifiers['right_shift'] or self._tracked_modifiers['left_shift']) ^ self._tracked_modifiers['caps_lock']
+ if uppercase or (scancode in [71, 72, 73, 75, 76, 77, 79, 80, 81, 82, 83] and self._tracked_modifiers['num_lock']):
+ modifiers += 1
+
+ if self._tracked_modifiers['alt_gr']:
+ modifiers += 2
+
+ return modifiers
+
+ def read_next_barcode(self):
+ """Get the value of the last barcode that was scanned but not sent yet
+ and not older than 5 seconds. This function is used in Community, when
+ we don't have access to the IoTLongpolling.
+
+ Returns:
+ str: The next barcode to be read or an empty string.
+ """
+
+ # Previous query still running, stop it by sending a fake barcode
+ if self.read_barcode_lock.locked():
+ self._barcodes.put((time.time(), ""))
+
+ with self.read_barcode_lock:
+ try:
+ timestamp, barcode = self._barcodes.get(True, 55)
+ if timestamp > time.time() - 5:
+ return barcode
+ except Empty:
+ return ''
+
+proxy_drivers['scanner'] = KeyboardUSBDriver
+
+
+class KeyboardUSBController(http.Controller):
+ @http.route('/hw_proxy/scanner', type='json', auth='none', cors='*')
+ def get_barcode(self):
+ scanners = [iot_devices[d] for d in iot_devices if iot_devices[d].device_type == "scanner"]
+ if scanners:
+ return scanners[0].read_next_barcode()
+ time.sleep(5)
+ return None
diff --git a/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py b/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py
new file mode 100644
index 00000000..bbd79f1c
--- /dev/null
+++ b/addons/hw_drivers/iot_handlers/drivers/PrinterDriver.py
@@ -0,0 +1,271 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from base64 import b64decode
+from cups import IPPError, IPP_PRINTER_IDLE, IPP_PRINTER_PROCESSING, IPP_PRINTER_STOPPED
+import dbus
+import io
+import logging
+import netifaces as ni
+import os
+from PIL import Image, ImageOps
+import re
+import subprocess
+import tempfile
+from uuid import getnode as get_mac
+
+from odoo import http
+from odoo.addons.hw_drivers.connection_manager import connection_manager
+from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
+from odoo.addons.hw_drivers.driver import Driver
+from odoo.addons.hw_drivers.event_manager import event_manager
+from odoo.addons.hw_drivers.iot_handlers.interfaces.PrinterInterface import PPDs, conn, cups_lock
+from odoo.addons.hw_drivers.main import iot_devices
+from odoo.addons.hw_drivers.tools import helpers
+
+_logger = logging.getLogger(__name__)
+
+RECEIPT_PRINTER_COMMANDS = {
+ 'star': {
+ 'center': b'\x1b\x1d\x61\x01', # ESC GS a n
+ 'cut': b'\x1b\x64\x02', # ESC d n
+ 'title': b'\x1b\x69\x01\x01%s\x1b\x69\x00\x00', # ESC i n1 n2
+ 'drawers': [b'\x07', b'\x1a'] # BEL & SUB
+ },
+ 'escpos': {
+ 'center': b'\x1b\x61\x01', # ESC a n
+ 'cut': b'\x1d\x56\x41\n', # GS V m
+ 'title': b'\x1b\x21\x30%s\x1b\x21\x00', # ESC ! n
+ 'drawers': [b'\x1b\x3d\x01', b'\x1b\x70\x00\x19\x19', b'\x1b\x70\x01\x19\x19'] # ESC = n then ESC p m t1 t2
+ }
+}
+
+def cups_notification_handler(message, uri, device_identifier, state, reason, accepting_jobs):
+ if device_identifier in iot_devices:
+ reason = reason if reason != 'none' else None
+ state_value = {
+ IPP_PRINTER_IDLE: 'connected',
+ IPP_PRINTER_PROCESSING: 'processing',
+ IPP_PRINTER_STOPPED: 'stopped'
+ }
+ iot_devices[device_identifier].update_status(state_value[state], message, reason)
+
+# Create a Cups subscription if it doesn't exist yet
+try:
+ conn.getSubscriptions('/printers/')
+except IPPError:
+ conn.createSubscription(
+ uri='/printers/',
+ recipient_uri='dbus://',
+ events=['printer-state-changed']
+ )
+
+# Listen for notifications from Cups
+bus = dbus.SystemBus()
+bus.add_signal_receiver(cups_notification_handler, signal_name="PrinterStateChanged", dbus_interface="org.cups.cupsd.Notifier")
+
+
+class PrinterDriver(Driver):
+ connection_type = 'printer'
+
+ def __init__(self, identifier, device):
+ super(PrinterDriver, self).__init__(identifier, device)
+ self.device_type = 'printer'
+ self.device_connection = device['device-class'].lower()
+ self.device_name = device['device-make-and-model']
+ self.state = {
+ 'status': 'connecting',
+ 'message': 'Connecting to printer',
+ 'reason': None,
+ }
+ self.send_status()
+
+ self.receipt_protocol = 'star' if 'STR_T' in device['device-id'] else 'escpos'
+ if 'direct' in self.device_connection and any(cmd in device['device-id'] for cmd in ['CMD:STAR;', 'CMD:ESC/POS;']):
+ self.print_status()
+
+ @classmethod
+ def supported(cls, device):
+ if device.get('supported', False):
+ return True
+ protocol = ['dnssd', 'lpd', 'socket']
+ if any(x in device['url'] for x in protocol) and device['device-make-and-model'] != 'Unknown' or 'direct' in device['device-class']:
+ model = cls.get_device_model(device)
+ ppdFile = ''
+ for ppd in PPDs:
+ if model and model in PPDs[ppd]['ppd-product']:
+ ppdFile = ppd
+ break
+ with cups_lock:
+ if ppdFile:
+ conn.addPrinter(name=device['identifier'], ppdname=ppdFile, device=device['url'])
+ else:
+ conn.addPrinter(name=device['identifier'], device=device['url'])
+ conn.setPrinterInfo(device['identifier'], device['device-make-and-model'])
+ conn.enablePrinter(device['identifier'])
+ conn.acceptJobs(device['identifier'])
+ conn.setPrinterUsersAllowed(device['identifier'], ['all'])
+ conn.addPrinterOptionDefault(device['identifier'], "usb-no-reattach", "true")
+ conn.addPrinterOptionDefault(device['identifier'], "usb-unidir", "true")
+ return True
+ return False
+
+ @classmethod
+ def get_device_model(cls, device):
+ device_model = ""
+ if device.get('device-id'):
+ for device_id in [device_lo for device_lo in device['device-id'].split(';')]:
+ if any(x in device_id for x in ['MDL', 'MODEL']):
+ device_model = device_id.split(':')[1]
+ break
+ elif device.get('device-make-and-model'):
+ device_model = device['device-make-and-model']
+ return re.sub("[\(].*?[\)]", "", device_model).strip()
+
+ @classmethod
+ def get_status(cls):
+ status = 'connected' if any(iot_devices[d].device_type == "printer" and iot_devices[d].device_connection == 'direct' for d in iot_devices) else 'disconnected'
+ return {'status': status, 'messages': ''}
+
+ def action(self, data):
+ if data.get('action') == 'cashbox':
+ self.open_cashbox()
+ elif data.get('action') == 'print_receipt':
+ self.print_receipt(b64decode(data['receipt']))
+ else:
+ self.print_raw(b64decode(data['document']))
+
+ def disconnect(self):
+ self.update_status('disconnected', 'Printer was disconnected')
+ super(PrinterDriver, self).disconnect()
+
+ def update_status(self, status, message, reason=None):
+ """Updates the state of the current printer.
+
+ Args:
+ status (str): The new value of the status
+ message (str): A comprehensive message describing the status
+ reason (str): The reason fo the current status
+ """
+ if self.state['status'] != status or self.state['reason'] != reason:
+ self.state = {
+ 'status': status,
+ 'message': message,
+ 'reason': reason,
+ }
+ self.send_status()
+
+ def send_status(self):
+ """ Sends the current status of the printer to the connected Odoo instance.
+ """
+ self.data = {
+ 'value': '',
+ 'state': self.state,
+ }
+ event_manager.device_changed(self)
+
+ def print_raw(self, data):
+ process = subprocess.Popen(["lp", "-d", self.device_identifier], stdin=subprocess.PIPE)
+ process.communicate(data)
+
+ def print_receipt(self, receipt):
+ im = Image.open(io.BytesIO(receipt))
+
+ # Convert to greyscale then to black and white
+ im = im.convert("L")
+ im = ImageOps.invert(im)
+ im = im.convert("1")
+
+ print_command = getattr(self, 'format_%s' % self.receipt_protocol)(im)
+ self.print_raw(print_command)
+
+ def format_star(self, im):
+ width = int((im.width + 7) / 8)
+
+ raster_init = b'\x1b\x2a\x72\x41'
+ raster_page_length = b'\x1b\x2a\x72\x50\x30\x00'
+ raster_send = b'\x62'
+ raster_close = b'\x1b\x2a\x72\x42'
+
+ raster_data = b''
+ dots = im.tobytes()
+ while len(dots):
+ raster_data += raster_send + width.to_bytes(2, 'little') + dots[:width]
+ dots = dots[width:]
+
+ return raster_init + raster_page_length + raster_data + raster_close
+
+ def format_escpos(self, im):
+ width = int((im.width + 7) / 8)
+
+ raster_send = b'\x1d\x76\x30\x00'
+ max_slice_height = 255
+
+ raster_data = b''
+ dots = im.tobytes()
+ while len(dots):
+ im_slice = dots[:width*max_slice_height]
+ slice_height = int(len(im_slice) / width)
+ raster_data += raster_send + width.to_bytes(2, 'little') + slice_height.to_bytes(2, 'little') + im_slice
+ dots = dots[width*max_slice_height:]
+
+ return raster_data + RECEIPT_PRINTER_COMMANDS['escpos']['cut']
+
+ def print_status(self):
+ """Prints the status ticket of the IoTBox on the current printer."""
+ wlan = ''
+ ip = ''
+ mac = ''
+ homepage = ''
+ pairing_code = ''
+
+ ssid = helpers.get_ssid()
+ wlan = '\nWireless network:\n%s\n\n' % ssid
+
+ interfaces = ni.interfaces()
+ ips = []
+ for iface_id in interfaces:
+ iface_obj = ni.ifaddresses(iface_id)
+ ifconfigs = iface_obj.get(ni.AF_INET, [])
+ for conf in ifconfigs:
+ if conf.get('addr') and conf.get('addr'):
+ ips.append(conf.get('addr'))
+ if len(ips) == 0:
+ ip = '\nERROR: Could not connect to LAN\n\nPlease check that the IoTBox is correc-\ntly connected with a network cable,\n that the LAN is setup with DHCP, and\nthat network addresses are available'
+ elif len(ips) == 1:
+ ip = '\nIP Address:\n%s\n' % ips[0]
+ else:
+ ip = '\nIP Addresses:\n%s\n' % '\n'.join(ips)
+
+ if len(ips) >= 1:
+ ips_filtered = [i for i in ips if i != '127.0.0.1']
+ main_ips = ips_filtered and ips_filtered[0] or '127.0.0.1'
+ mac = '\nMAC Address:\n%s\n' % helpers.get_mac_address()
+ homepage = '\nHomepage:\nhttp://%s:8069\n\n' % main_ips
+
+ code = connection_manager.pairing_code
+ if code:
+ pairing_code = '\nPairing Code:\n%s\n' % code
+
+ commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
+ title = commands['title'] % b'IoTBox Status'
+ self.print_raw(commands['center'] + title + b'\n' + wlan.encode() + mac.encode() + ip.encode() + homepage.encode() + pairing_code.encode() + commands['cut'])
+
+ def open_cashbox(self):
+ """Sends a signal to the current printer to open the connected cashbox."""
+ commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
+ for drawer in commands['drawers']:
+ self.print_raw(drawer)
+
+
+class PrinterController(http.Controller):
+
+ @http.route('/hw_proxy/default_printer_action', type='json', auth='none', cors='*')
+ def default_printer_action(self, data):
+ printer = next((d for d in iot_devices if iot_devices[d].device_type == 'printer' and iot_devices[d].device_connection == 'direct'), None)
+ if printer:
+ iot_devices[printer].action(data)
+ return True
+ return False
+
+proxy_drivers['printer'] = PrinterDriver
diff --git a/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py b/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py
new file mode 100644
index 00000000..412d773e
--- /dev/null
+++ b/addons/hw_drivers/iot_handlers/drivers/SerialBaseDriver.py
@@ -0,0 +1,144 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from collections import namedtuple
+from contextlib import contextmanager
+import logging
+import serial
+from threading import Lock
+import time
+import traceback
+
+from odoo import _
+from odoo.addons.hw_drivers.event_manager import event_manager
+from odoo.addons.hw_drivers.driver import Driver
+
+_logger = logging.getLogger(__name__)
+
+SerialProtocol = namedtuple(
+ 'SerialProtocol',
+ "name baudrate bytesize stopbits parity timeout writeTimeout measureRegexp statusRegexp "
+ "commandTerminator commandDelay measureDelay newMeasureDelay "
+ "measureCommand emptyAnswerValid")
+
+
+@contextmanager
+def serial_connection(path, protocol, is_probing=False):
+ """Opens a serial connection to a device and closes it automatically after use.
+
+ :param path: path to the device
+ :type path: string
+ :param protocol: an object containing the serial protocol to connect to a device
+ :type protocol: namedtuple
+ :param is_probing: a flag thet if set to `True` makes the timeouts longer, defaults to False
+ :type is_probing: bool, optional
+ """
+
+ PROBING_TIMEOUT = 1
+ port_config = {
+ 'baudrate': protocol.baudrate,
+ 'bytesize': protocol.bytesize,
+ 'stopbits': protocol.stopbits,
+ 'parity': protocol.parity,
+ 'timeout': PROBING_TIMEOUT if is_probing else protocol.timeout, # longer timeouts for probing
+ 'writeTimeout': PROBING_TIMEOUT if is_probing else protocol.writeTimeout # longer timeouts for probing
+ }
+ connection = serial.Serial(path, **port_config)
+ yield connection
+ connection.close()
+
+
+class SerialDriver(Driver):
+ """Abstract base class for serial drivers."""
+
+ _protocol = None
+ connection_type = 'serial'
+
+ STATUS_CONNECTED = 'connected'
+ STATUS_ERROR = 'error'
+ STATUS_CONNECTING = 'connecting'
+
+ def __init__(self, identifier, device):
+ """ Attributes initialization method for `SerialDriver`.
+
+ :param device: path to the device
+ :type device: str
+ """
+
+ super(SerialDriver, self).__init__(identifier, device)
+ self._actions = {
+ 'get_status': self._push_status,
+ }
+ self.device_connection = 'serial'
+ self._device_lock = Lock()
+ self._status = {'status': self.STATUS_CONNECTING, 'message_title': '', 'message_body': ''}
+ self._set_name()
+
+ def _get_raw_response(connection):
+ pass
+
+ def _push_status(self):
+ """Updates the current status and pushes it to the frontend."""
+
+ self.data['status'] = self._status
+ event_manager.device_changed(self)
+
+ def _set_name(self):
+ """Tries to build the device's name based on its type and protocol name but falls back on a default name if that doesn't work."""
+
+ try:
+ name = ('%s serial %s' % (self._protocol.name, self.device_type)).title()
+ except Exception:
+ name = 'Unknown Serial Device'
+ self.device_name = name
+
+ def _take_measure(self):
+ pass
+
+ def _do_action(self, data):
+ """Helper function that calls a specific action method on the device.
+
+ :param data: the `_actions` key mapped to the action method we want to call
+ :type data: string
+ """
+
+ try:
+ with self._device_lock:
+ self._actions[data['action']](data)
+ time.sleep(self._protocol.commandDelay)
+ except Exception:
+ msg = _('An error occured while performing action %s on %s') % (data, self.device_name)
+ _logger.exception(msg)
+ self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
+ self._push_status()
+
+ def action(self, data):
+ """Establish a connection with the device if needed and have it perform a specific action.
+
+ :param data: the `_actions` key mapped to the action method we want to call
+ :type data: string
+ """
+
+ if self._connection and self._connection.isOpen():
+ self._do_action(data)
+ else:
+ with serial_connection(self.device_identifier, self._protocol) as connection:
+ self._connection = connection
+ self._do_action(data)
+
+ def run(self):
+ """Continuously gets new measures from the device."""
+
+ try:
+ with serial_connection(self.device_identifier, self._protocol) as connection:
+ self._connection = connection
+ self._status['status'] = self.STATUS_CONNECTED
+ self._push_status()
+ while not self._stopped.isSet():
+ self._take_measure()
+ time.sleep(self._protocol.newMeasureDelay)
+ except Exception:
+ msg = _('Error while reading %s', self.device_name)
+ _logger.exception(msg)
+ self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
+ self._push_status()
diff --git a/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py b/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py
new file mode 100644
index 00000000..cc1d5469
--- /dev/null
+++ b/addons/hw_drivers/iot_handlers/drivers/SerialScaleDriver.py
@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from collections import namedtuple
+import logging
+import re
+import serial
+import threading
+import time
+
+from odoo import http
+from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
+from odoo.addons.hw_drivers.event_manager import event_manager
+from odoo.addons.hw_drivers.iot_handlers.drivers.SerialBaseDriver import SerialDriver, SerialProtocol, serial_connection
+
+
+_logger = logging.getLogger(__name__)
+
+# Only needed to ensure compatibility with older versions of Odoo
+ACTIVE_SCALE = None
+new_weight_event = threading.Event()
+
+ScaleProtocol = namedtuple('ScaleProtocol', SerialProtocol._fields + ('zeroCommand', 'tareCommand', 'clearCommand', 'autoResetWeight'))
+
+# 8217 Mettler-Toledo (Weight-only) Protocol, as described in the scale's Service Manual.
+# e.g. here: https://www.manualslib.com/manual/861274/Mettler-Toledo-Viva.html?page=51#manual
+# Our recommended scale, the Mettler-Toledo "Ariva-S", supports this protocol on
+# both the USB and RS232 ports, it can be configured in the setup menu as protocol option 3.
+# We use the default serial protocol settings, the scale's settings can be configured in the
+# scale's menu anyway.
+Toledo8217Protocol = ScaleProtocol(
+ name='Toledo 8217',
+ baudrate=9600,
+ bytesize=serial.SEVENBITS,
+ stopbits=serial.STOPBITS_ONE,
+ parity=serial.PARITY_EVEN,
+ timeout=1,
+ writeTimeout=1,
+ measureRegexp=b"\x02\\s*([0-9.]+)N?\\r",
+ statusRegexp=b"\x02\\s*(\\?.)\\r",
+ commandDelay=0.2,
+ measureDelay=0.5,
+ newMeasureDelay=0.2,
+ commandTerminator=b'',
+ measureCommand=b'W',
+ zeroCommand=b'Z',
+ tareCommand=b'T',
+ clearCommand=b'C',
+ emptyAnswerValid=False,
+ autoResetWeight=False,
+)
+
+# The ADAM scales have their own RS232 protocol, usually documented in the scale's manual
+# e.g at https://www.adamequipment.com/media/docs/Print%20Publications/Manuals/PDF/AZEXTRA/AZEXTRA-UM.pdf
+# https://www.manualslib.com/manual/879782/Adam-Equipment-Cbd-4.html?page=32#manual
+# Only the baudrate and label format seem to be configurable in the AZExtra series.
+ADAMEquipmentProtocol = ScaleProtocol(
+ name='Adam Equipment',
+ baudrate=4800,
+ bytesize=serial.EIGHTBITS,
+ stopbits=serial.STOPBITS_ONE,
+ parity=serial.PARITY_NONE,
+ timeout=0.2,
+ writeTimeout=0.2,
+ measureRegexp=b"\s*([0-9.]+)kg", # LABEL format 3 + KG in the scale settings, but Label 1/2 should work
+ statusRegexp=None,
+ commandTerminator=b"\r\n",
+ commandDelay=0.2,
+ measureDelay=0.5,
+ # AZExtra beeps every time you ask for a weight that was previously returned!
+ # Adding an extra delay gives the operator a chance to remove the products
+ # before the scale starts beeping. Could not find a way to disable the beeps.
+ newMeasureDelay=5,
+ measureCommand=b'P',
+ zeroCommand=b'Z',
+ tareCommand=b'T',
+ clearCommand=None, # No clear command -> Tare again
+ emptyAnswerValid=True, # AZExtra does not answer unless a new non-zero weight has been detected
+ autoResetWeight=True, # AZExtra will not return 0 after removing products
+)
+
+
+# Ensures compatibility with older versions of Odoo
+class ScaleReadOldRoute(http.Controller):
+ @http.route('/hw_proxy/scale_read', type='json', auth='none', cors='*')
+ def scale_read(self):
+ if ACTIVE_SCALE:
+ return {'weight': ACTIVE_SCALE._scale_read_old_route()}
+ return None
+
+
+class ScaleDriver(SerialDriver):
+ """Abstract base class for scale drivers."""
+ last_sent_value = None
+
+ def __init__(self, identifier, device):
+ super(ScaleDriver, self).__init__(identifier, device)
+ self.device_type = 'scale'
+ self._set_actions()
+ self._is_reading = True
+
+ # Ensures compatibility with older versions of Odoo
+ # Only the last scale connected is kept
+ global ACTIVE_SCALE
+ ACTIVE_SCALE = self
+ proxy_drivers['scale'] = ACTIVE_SCALE
+
+ # Ensures compatibility with older versions of Odoo
+ # and allows using the `ProxyDevice` in the point of sale to retrieve the status
+ def get_status(self):
+ """Allows `hw_proxy.Proxy` to retrieve the status of the scales"""
+
+ status = self._status
+ return {'status': status['status'], 'messages': [status['message_title'], ]}
+
+ def _set_actions(self):
+ """Initializes `self._actions`, a map of action keys sent by the frontend to backend action methods."""
+
+ self._actions.update({
+ 'read_once': self._read_once_action,
+ 'set_zero': self._set_zero_action,
+ 'set_tare': self._set_tare_action,
+ 'clear_tare': self._clear_tare_action,
+ 'start_reading': self._start_reading_action,
+ 'stop_reading': self._stop_reading_action,
+ })
+
+ def _start_reading_action(self, data):
+ """Starts asking for the scale value."""
+ self._is_reading = True
+
+ def _stop_reading_action(self, data):
+ """Stops asking for the scale value."""
+ self._is_reading = False
+
+ def _clear_tare_action(self, data):
+ """Clears the scale current tare weight."""
+
+ # if the protocol has no clear tare command, we can just tare again
+ clearCommand = self._protocol.clearCommand or self._protocol.tareCommand
+ self._connection.write(clearCommand + self._protocol.commandTerminator)
+
+ def _read_once_action(self, data):
+ """Reads the scale current weight value and pushes it to the frontend."""
+
+ self._read_weight()
+ self.last_sent_value = self.data['value']
+ event_manager.device_changed(self)
+
+ def _set_zero_action(self, data):
+ """Makes the weight currently applied to the scale the new zero."""
+
+ self._connection.write(self._protocol.zeroCommand + self._protocol.commandTerminator)
+
+ def _set_tare_action(self, data):
+ """Sets the scale's current weight value as tare weight."""
+
+ self._connection.write(self._protocol.tareCommand + self._protocol.commandTerminator)
+
+ @staticmethod
+ def _get_raw_response(connection):
+ """Gets raw bytes containing the updated value of the device.
+
+ :param connection: a connection to the device's serial port
+ :type connection: pyserial.Serial
+ :return: the raw response to a weight request
+ :rtype: str
+ """
+
+ answer = []
+ while True:
+ char = connection.read(1)
+ if not char:
+ break
+ else:
+ answer.append(bytes(char))
+ return b''.join(answer)
+
+ def _read_weight(self):
+ """Asks for a new weight from the scale, checks if it is valid and, if it is, makes it the current value."""
+
+ protocol = self._protocol
+ self._connection.write(protocol.measureCommand + protocol.commandTerminator)
+ answer = self._get_raw_response(self._connection)
+ match = re.search(self._protocol.measureRegexp, answer)
+ if match:
+ self.data = {
+ 'value': float(match.group(1)),
+ 'status': self._status
+ }
+
+ # Ensures compatibility with older versions of Odoo
+ def _scale_read_old_route(self):
+ """Used when the iot app is not installed"""
+ with self._device_lock:
+ self._read_weight()
+ return self.data['value']
+
+ def _take_measure(self):
+ """Reads the device's weight value, and pushes that value to the frontend."""
+
+ with self._device_lock:
+ self._read_weight()
+ if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR:
+ self.last_sent_value = self.data['value']
+ event_manager.device_changed(self)
+
+
+class Toledo8217Driver(ScaleDriver):
+ """Driver for the Toldedo 8217 serial scale."""
+ _protocol = Toledo8217Protocol
+
+ def __init__(self, identifier, device):
+ super(Toledo8217Driver, self).__init__(identifier, device)
+ self.device_manufacturer = 'Toledo'
+
+ @classmethod
+ def supported(cls, device):
+ """Checks whether the device, which port info is passed as argument, is supported by the driver.
+
+ :param device: path to the device
+ :type device: str
+ :return: whether the device is supported by the driver
+ :rtype: bool
+ """
+
+ protocol = cls._protocol
+
+ try:
+ with serial_connection(device['identifier'], protocol, is_probing=True) as connection:
+ connection.write(b'Ehello' + protocol.commandTerminator)
+ time.sleep(protocol.commandDelay)
+ answer = connection.read(8)
+ if answer == b'\x02E\rhello':
+ connection.write(b'F' + protocol.commandTerminator)
+ return True
+ except serial.serialutil.SerialTimeoutException:
+ pass
+ except Exception:
+ _logger.exception('Error while probing %s with protocol %s' % (device, protocol.name))
+ return False
+
+
+class AdamEquipmentDriver(ScaleDriver):
+ """Driver for the Adam Equipment serial scale."""
+
+ _protocol = ADAMEquipmentProtocol
+ priority = 0 # Test the supported method of this driver last, after all other serial drivers
+
+ def __init__(self, identifier, device):
+ super(AdamEquipmentDriver, self).__init__(identifier, device)
+ self._is_reading = False
+ self._last_weight_time = 0
+ self.device_manufacturer = 'Adam'
+
+ def _check_last_weight_time(self):
+ """The ADAM doesn't make the difference between a value of 0 and "the same value as last time":
+ in both cases it returns an empty string.
+ With this, unless the weight changes, we give the user `TIME_WEIGHT_KEPT` seconds to log the new weight,
+ then change it back to zero to avoid keeping it indefinetely, which could cause issues.
+ In any case the ADAM must always go back to zero before it can weight again.
+ """
+
+ TIME_WEIGHT_KEPT = 10
+
+ if self.data['value'] is None:
+ if time.time() - self._last_weight_time > TIME_WEIGHT_KEPT:
+ self.data['value'] = 0
+ else:
+ self._last_weight_time = time.time()
+
+ def _take_measure(self):
+ """Reads the device's weight value, and pushes that value to the frontend."""
+
+ if self._is_reading:
+ with self._device_lock:
+ self._read_weight()
+ self._check_last_weight_time()
+ if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR:
+ self.last_sent_value = self.data['value']
+ event_manager.device_changed(self)
+ else:
+ time.sleep(0.5)
+
+ # Ensures compatibility with older versions of Odoo
+ def _scale_read_old_route(self):
+ """Used when the iot app is not installed"""
+
+ time.sleep(3)
+ with self._device_lock:
+ self._read_weight()
+ self._check_last_weight_time()
+ return self.data['value']
+
+ @classmethod
+ def supported(cls, device):
+ """Checks whether the device at `device` is supported by the driver.
+
+ :param device: path to the device
+ :type device: str
+ :return: whether the device is supported by the driver
+ :rtype: bool
+ """
+
+ protocol = cls._protocol
+
+ try:
+ with serial_connection(device['identifier'], protocol, is_probing=True) as connection:
+ connection.write(protocol.measureCommand + protocol.commandTerminator)
+ # Checking whether writing to the serial port using the Adam protocol raises a timeout exception is about the only thing we can do.
+ return True
+ except serial.serialutil.SerialTimeoutException:
+ pass
+ except Exception:
+ _logger.exception('Error while probing %s with protocol %s' % (device, protocol.name))
+ return False