diff options
Diffstat (limited to 'addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py')
| -rw-r--r-- | addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py | 367 |
1 files changed, 367 insertions, 0 deletions
diff --git a/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py new file mode 100644 index 00000000..29c7b12b --- /dev/null +++ b/addons/hw_drivers/iot_handlers/drivers/KeyboardUSBDriver.py @@ -0,0 +1,367 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import ctypes +import evdev +import json +import logging +from lxml import etree +import os +from pathlib import Path +from queue import Queue, Empty +import subprocess +from threading import Lock +import time +import urllib3 +from usb import util + +from odoo import http, _ +from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers +from odoo.addons.hw_drivers.driver import Driver +from odoo.addons.hw_drivers.event_manager import event_manager +from odoo.addons.hw_drivers.main import iot_devices +from odoo.addons.hw_drivers.tools import helpers + +_logger = logging.getLogger(__name__) +xlib = ctypes.cdll.LoadLibrary('libX11.so.6') + + +class KeyboardUSBDriver(Driver): + connection_type = 'usb' + keyboard_layout_groups = [] + available_layouts = [] + + def __init__(self, identifier, device): + if not hasattr(KeyboardUSBDriver, 'display'): + os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority" + KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8")) + + super(KeyboardUSBDriver, self).__init__(identifier, device) + self.device_connection = 'direct' + self.device_name = self._set_name() + + # from https://github.com/xkbcommon/libxkbcommon/blob/master/test/evdev-scancodes.h + self._scancode_to_modifier = { + 42: 'left_shift', + 54: 'right_shift', + 58: 'caps_lock', + 69: 'num_lock', + 100: 'alt_gr', # right alt + } + self._tracked_modifiers = {modifier: False for modifier in self._scancode_to_modifier.values()} + + if not KeyboardUSBDriver.available_layouts: + KeyboardUSBDriver.load_layouts_list() + KeyboardUSBDriver.send_layouts_list() + + for evdev_device in [evdev.InputDevice(path) for path in evdev.list_devices()]: + if (device.idVendor == evdev_device.info.vendor) and (device.idProduct == evdev_device.info.product): + self.input_device = evdev_device + + self._set_device_type('scanner') if self._is_scanner() else self._set_device_type() + + @classmethod + def supported(cls, device): + for cfg in device: + for itf in cfg: + if itf.bInterfaceClass == 3 and itf.bInterfaceProtocol != 2: + device.interface_protocol = itf.bInterfaceProtocol + return True + return False + + @classmethod + def get_status(self): + """Allows `hw_proxy.Proxy` to retrieve the status of the scanners""" + status = 'connected' if any(iot_devices[d].device_type == "scanner" for d in iot_devices) else 'disconnected' + return {'status': status, 'messages': ''} + + @classmethod + def send_layouts_list(cls): + server = helpers.get_odoo_server_url() + if server: + urllib3.disable_warnings() + pm = urllib3.PoolManager(cert_reqs='CERT_NONE') + server = server + '/iot/keyboard_layouts' + try: + pm.request('POST', server, fields={'available_layouts': json.dumps(cls.available_layouts)}) + except Exception as e: + _logger.error('Could not reach configured server') + _logger.error('A error encountered : %s ' % e) + + @classmethod + def load_layouts_list(cls): + tree = etree.parse("/usr/share/X11/xkb/rules/base.xml", etree.XMLParser(ns_clean=True, recover=True)) + layouts = tree.xpath("//layout") + for layout in layouts: + layout_name = layout.xpath("./configItem/name")[0].text + layout_description = layout.xpath("./configItem/description")[0].text + KeyboardUSBDriver.available_layouts.append({ + 'name': layout_description, + 'layout': layout_name, + }) + for variant in layout.xpath("./variantList/variant"): + variant_name = variant.xpath("./configItem/name")[0].text + variant_description = variant.xpath("./configItem/description")[0].text + KeyboardUSBDriver.available_layouts.append({ + 'name': variant_description, + 'layout': layout_name, + 'variant': variant_name, + }) + + def _set_name(self): + try: + manufacturer = util.get_string(self.dev, self.dev.iManufacturer) + product = util.get_string(self.dev, self.dev.iProduct) + return ("%s - %s") % (manufacturer, product) + except ValueError as e: + _logger.warning(e) + return _('Unknown input device') + + def action(self, data): + if data.get('action', False) == 'update_layout': + layout = { + 'layout': data.get('layout'), + 'variant': data.get('variant'), + } + self._change_keyboard_layout(layout) + self.save_layout(layout) + elif data.get('action', False) == 'update_is_scanner': + is_scanner = {'is_scanner': data.get('is_scanner')} + self.save_is_scanner(is_scanner) + else: + self.data['value'] = '' + event_manager.device_changed(self) + + def run(self): + try: + for event in self.input_device.read_loop(): + if self._stopped.isSet(): + break + if event.type == evdev.ecodes.EV_KEY: + data = evdev.categorize(event) + + modifier_name = self._scancode_to_modifier.get(data.scancode) + if modifier_name: + if modifier_name in ('caps_lock', 'num_lock'): + if data.keystate == 1: + self._tracked_modifiers[modifier_name] = not self._tracked_modifiers[modifier_name] + else: + self._tracked_modifiers[modifier_name] = bool(data.keystate) # 1 for keydown, 0 for keyup + elif data.keystate == 1: + self.key_input(data.scancode) + + except Exception as err: + _logger.warning(err) + + def _change_keyboard_layout(self, new_layout): + """Change the layout of the current device to what is specified in + new_layout. + + Args: + new_layout (dict): A dict containing two keys: + - layout (str): The layout code + - variant (str): An optional key to represent the variant of the + selected layout + """ + if hasattr(self, 'keyboard_layout'): + KeyboardUSBDriver.keyboard_layout_groups.remove(self.keyboard_layout) + + if new_layout: + self.keyboard_layout = new_layout.get('layout') or 'us' + if new_layout.get('variant'): + self.keyboard_layout += "(%s)" % new_layout['variant'] + else: + self.keyboard_layout = 'us' + + KeyboardUSBDriver.keyboard_layout_groups.append(self.keyboard_layout) + subprocess.call(["setxkbmap", "-display", ":0.0", ",".join(KeyboardUSBDriver.keyboard_layout_groups)]) + + # Close then re-open display to refresh the mapping + xlib.XCloseDisplay(KeyboardUSBDriver.display) + KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8")) + + def save_layout(self, layout): + """Save the layout to a file on the box to read it when restarting it. + We need that in order to keep the selected layout after a reboot. + + Args: + new_layout (dict): A dict containing two keys: + - layout (str): The layout code + - variant (str): An optional key to represent the variant of the + selected layout + """ + file_path = Path.home() / 'odoo-keyboard-layouts.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + else: + data = {} + data[self.device_identifier] = layout + helpers.write_file('odoo-keyboard-layouts.conf', json.dumps(data)) + + def save_is_scanner(self, is_scanner): + """Save the type of device. + We need that in order to keep the selected type of device after a reboot. + """ + file_path = Path.home() / 'odoo-keyboard-is-scanner.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + else: + data = {} + data[self.device_identifier] = is_scanner + helpers.write_file('odoo-keyboard-is-scanner.conf', json.dumps(data)) + self._set_device_type('scanner') if is_scanner.get('is_scanner') else self._set_device_type() + + def load_layout(self): + """Read the layout from the saved filed and set it as current layout. + If no file or no layout is found we use 'us' by default. + """ + file_path = Path.home() / 'odoo-keyboard-layouts.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + layout = data.get(self.device_identifier, {'layout': 'us'}) + else: + layout = {'layout': 'us'} + self._change_keyboard_layout(layout) + + def _is_scanner(self): + """Read the device type from the saved filed and set it as current type. + If no file or no device type is found we try to detect it automatically. + """ + device_name = self.device_name.lower() + scanner_name = ['barcode', 'scanner', 'reader'] + is_scanner = any(x in device_name for x in scanner_name) or self.dev.interface_protocol == '0' + + file_path = Path.home() / 'odoo-keyboard-is-scanner.conf' + if file_path.exists(): + data = json.loads(file_path.read_text()) + is_scanner = data.get(self.device_identifier, {}).get('is_scanner', is_scanner) + return is_scanner + + def _keyboard_input(self, scancode): + """Deal with a keyboard input. Send the character corresponding to the + pressed key represented by its scancode to the connected Odoo instance. + + Args: + scancode (int): The scancode of the pressed key. + """ + self.data['value'] = self._scancode_to_char(scancode) + if self.data['value']: + event_manager.device_changed(self) + + def _barcode_scanner_input(self, scancode): + """Deal with a barcode scanner input. Add the new character scanned to + the current barcode or complete the barcode if "Return" is pressed. + When a barcode is completed, two tasks are performed: + - Send a device_changed update to the event manager to notify the + listeners that the value has changed (used in Enterprise). + - Add the barcode to the list barcodes that are being queried in + Community. + + Args: + scancode (int): The scancode of the pressed key. + """ + if scancode == 28: # Return + self.data['value'] = self._current_barcode + event_manager.device_changed(self) + self._barcodes.put((time.time(), self._current_barcode)) + self._current_barcode = '' + else: + self._current_barcode += self._scancode_to_char(scancode) + + def _set_device_type(self, device_type='keyboard'): + """Modify the device type between 'keyboard' and 'scanner' + + Args: + type (string): Type wanted to switch + """ + if device_type == 'scanner': + self.device_type = 'scanner' + self.key_input = self._barcode_scanner_input + self._barcodes = Queue() + self._current_barcode = '' + self.input_device.grab() + self.read_barcode_lock = Lock() + else: + self.device_type = 'keyboard' + self.key_input = self._keyboard_input + self.load_layout() + + def _scancode_to_char(self, scancode): + """Translate a received scancode to a character depending on the + selected keyboard layout and the current state of the keyboard's + modifiers. + + Args: + scancode (int): The scancode of the pressed key, to be translated to + a character + + Returns: + str: The translated scancode. + """ + # Scancode -> Keysym : Depends on the keyboard layout + group = KeyboardUSBDriver.keyboard_layout_groups.index(self.keyboard_layout) + modifiers = self._get_active_modifiers(scancode) + keysym = ctypes.c_int(xlib.XkbKeycodeToKeysym(KeyboardUSBDriver.display, scancode + 8, group, modifiers)) + + # Translate Keysym to a character + key_pressed = ctypes.create_string_buffer(5) + xlib.XkbTranslateKeySym(KeyboardUSBDriver.display, ctypes.byref(keysym), 0, ctypes.byref(key_pressed), 5, ctypes.byref(ctypes.c_int())) + if key_pressed.value: + return key_pressed.value.decode('utf-8') + return '' + + def _get_active_modifiers(self, scancode): + """Get the state of currently active modifiers. + + Args: + scancode (int): The scancode of the key being translated + + Returns: + int: The current state of the modifiers: + 0 -- Lowercase + 1 -- Highercase or (NumLock + key pressed on keypad) + 2 -- AltGr + 3 -- Highercase + AltGr + """ + modifiers = 0 + uppercase = (self._tracked_modifiers['right_shift'] or self._tracked_modifiers['left_shift']) ^ self._tracked_modifiers['caps_lock'] + if uppercase or (scancode in [71, 72, 73, 75, 76, 77, 79, 80, 81, 82, 83] and self._tracked_modifiers['num_lock']): + modifiers += 1 + + if self._tracked_modifiers['alt_gr']: + modifiers += 2 + + return modifiers + + def read_next_barcode(self): + """Get the value of the last barcode that was scanned but not sent yet + and not older than 5 seconds. This function is used in Community, when + we don't have access to the IoTLongpolling. + + Returns: + str: The next barcode to be read or an empty string. + """ + + # Previous query still running, stop it by sending a fake barcode + if self.read_barcode_lock.locked(): + self._barcodes.put((time.time(), "")) + + with self.read_barcode_lock: + try: + timestamp, barcode = self._barcodes.get(True, 55) + if timestamp > time.time() - 5: + return barcode + except Empty: + return '' + +proxy_drivers['scanner'] = KeyboardUSBDriver + + +class KeyboardUSBController(http.Controller): + @http.route('/hw_proxy/scanner', type='json', auth='none', cors='*') + def get_barcode(self): + scanners = [iot_devices[d] for d in iot_devices if iot_devices[d].device_type == "scanner"] + if scanners: + return scanners[0].read_next_barcode() + time.sleep(5) + return None |
