1#!/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3# -*- coding: utf-8 -*-
4#
5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
6# Copyright (c) 2017 Red Hat, Inc.
7
8import libevdev
9import os
10import pytest
11import shutil
12import subprocess
13import time
14
15import logging
16
17from .base_device import BaseDevice, EvdevMatch, SysfsFile
18from pathlib import Path
19from typing import Final, List, Tuple
20
# Module-level logger; used below to report the available input nodes when
# no evdev node could be found for a test device.
logger = logging.getLogger("hidtools.test.base")

# Lookup table from an application name (e.g. "Mouse", "Touch Screen") to the
# EvdevMatch criteria associated with that application.  Each EvdevMatch is
# built from up to four lists: event codes in `requires` / `excludes`, and
# input properties in `req_properties` / `excl_properties`.
# UHIDTestDevice stores this table on every instance as
# `self.application_matches`.
# NOTE(review): presumably the base device uses these criteria to pick the
# evdev node belonging to a given application -- confirm in base_device.
application_matches: Final = {
    # pyright: ignore
    # Accelerometers are identified by the accelerometer property alone;
    # every other entry below excludes that property.
    "Accelerometer": EvdevMatch(
        req_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ]
    ),
    "Game Pad": EvdevMatch(  # in systemd, this is a lot more complex, but that will do
        requires=[
            libevdev.EV_ABS.ABS_X,
            libevdev.EV_ABS.ABS_Y,
            libevdev.EV_ABS.ABS_RX,
            libevdev.EV_ABS.ABS_RY,
            libevdev.EV_KEY.BTN_START,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    # Same criteria as "Game Pad" minus the ABS_X/ABS_Y requirement.
    "Joystick": EvdevMatch(  # in systemd, this is a lot more complex, but that will do
        requires=[
            libevdev.EV_ABS.ABS_RX,
            libevdev.EV_ABS.ABS_RY,
            libevdev.EV_KEY.BTN_START,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    # Requires KEY_A and excludes the direct and pointer properties.
    "Key": EvdevMatch(
        requires=[
            libevdev.EV_KEY.KEY_A,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
            libevdev.INPUT_PROP_DIRECT,
            libevdev.INPUT_PROP_POINTER,
        ],
    ),
    "Mouse": EvdevMatch(
        requires=[
            libevdev.EV_REL.REL_X,
            libevdev.EV_REL.REL_Y,
            libevdev.EV_KEY.BTN_LEFT,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    # Requires BTN_0 and excludes the pen/touch/distance codes.
    "Pad": EvdevMatch(
        requires=[
            libevdev.EV_KEY.BTN_0,
        ],
        excludes=[
            libevdev.EV_KEY.BTN_TOOL_PEN,
            libevdev.EV_KEY.BTN_TOUCH,
            libevdev.EV_ABS.ABS_DISTANCE,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    # "Pen" and "Stylus" use identical criteria.
    "Pen": EvdevMatch(
        requires=[
            libevdev.EV_KEY.BTN_STYLUS,
            libevdev.EV_ABS.ABS_X,
            libevdev.EV_ABS.ABS_Y,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    "Stylus": EvdevMatch(
        requires=[
            libevdev.EV_KEY.BTN_STYLUS,
            libevdev.EV_ABS.ABS_X,
            libevdev.EV_ABS.ABS_Y,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    # "Touch Pad" and "Touch Screen" both exclude the pen/stylus codes; they
    # differ in the required button (BTN_LEFT vs BTN_TOUCH) and in the
    # required property (pointer vs direct).
    "Touch Pad": EvdevMatch(
        requires=[
            libevdev.EV_KEY.BTN_LEFT,
            libevdev.EV_ABS.ABS_X,
            libevdev.EV_ABS.ABS_Y,
        ],
        excludes=[libevdev.EV_KEY.BTN_TOOL_PEN, libevdev.EV_KEY.BTN_STYLUS],
        req_properties=[
            libevdev.INPUT_PROP_POINTER,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
    "Touch Screen": EvdevMatch(
        requires=[
            libevdev.EV_KEY.BTN_TOUCH,
            libevdev.EV_ABS.ABS_X,
            libevdev.EV_ABS.ABS_Y,
        ],
        excludes=[libevdev.EV_KEY.BTN_TOOL_PEN, libevdev.EV_KEY.BTN_STYLUS],
        req_properties=[
            libevdev.INPUT_PROP_DIRECT,
        ],
        excl_properties=[
            libevdev.INPUT_PROP_ACCELEROMETER,
        ],
    ),
}
135
136
class UHIDTestDevice(BaseDevice):
    """Test device whose name is forced to start with ``"uhid test "``.

    On top of what ``BaseDevice.__init__`` does, the constructor attaches the
    module-level ``application_matches`` table to the instance and rewrites
    ``self.name``:

    * ``name is None``: ``"uhid test <class name>"``;
    * ``name`` already starts with ``"uhid test "``: used unchanged;
    * anything else: ``"uhid test "`` followed by the name the base class
      stored in ``self.name``.
    """

    def __init__(self, name, application, rdesc_str=None, rdesc=None, input_info=None):
        super().__init__(name, application, rdesc_str, rdesc, input_info)
        self.application_matches = application_matches

        prefix = "uhid test "
        if name is None:
            prefixed_name = f"uhid test {self.__class__.__name__}"
        elif name.startswith(prefix):
            prefixed_name = name
        else:
            # NOTE(review): this reads self.name (as set by the base class)
            # rather than the `name` argument -- presumably both are equal
            # at this point; confirm against BaseDevice.
            prefixed_name = prefix + self.name
        self.name = prefixed_name
146
147
class BaseTestCase:
    """Container for the generic uhid test class.

    The tests themselves live in the nested ``TestUhid`` class.
    NOTE(review): presumably nested so that pytest does not collect the base
    class on its own -- confirm.
    """

    class TestUhid:
        """Base class for tests running against a uhid-backed HID device.

        Subclasses must override ``create_device()`` and may fill in
        ``kernel_modules`` and ``hid_bpfs``.  The autouse ``context`` fixture
        creates the device before each test and exposes it as ``self.uhdev``.
        """

        # Pre-built libevdev events.  They are not used by the methods of
        # this class; presumably they are meant for subclasses -- confirm.
        syn_event = libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT)  # type: ignore
        key_event = libevdev.InputEvent(libevdev.EV_KEY)  # type: ignore
        abs_event = libevdev.InputEvent(libevdev.EV_ABS)  # type: ignore
        rel_event = libevdev.InputEvent(libevdev.EV_REL)  # type: ignore
        msc_event = libevdev.InputEvent(libevdev.EV_MSC.MSC_SCAN)  # type: ignore

        # List of kernel modules to load before starting the test.
        # If any module is not available (not compiled), the test is skipped.
        # Each element is a tuple '(kernel driver name, kernel module)',
        # for example ("playstation", "hid-playstation")
        kernel_modules: List[Tuple[str, str]] = []

        # List of in-kernel HID-BPF object files to load
        # before starting the test.
        # Any existing pre-loaded HID-BPF module will be removed
        # before the ones in this list are manually loaded.
        # Each element is a tuple '(hid_bpf_object, rdesc_fixup_present)',
        # for example '("xppen-ArtistPro16Gen2.bpf.o", True)'
        # If 'rdesc_fixup_present' is True, the test needs to wait
        # for one unbind and rebind before it can be sure the kernel is
        # ready
        hid_bpfs: List[Tuple[str, bool]] = []

        def assertInputEventsIn(self, expected_events, effective_events):
            """Assert that all ``expected_events`` are in ``effective_events``.

            Every expected event consumes one matching entry, so an event
            expected twice must also be present twice.  The check works on a
            copy: the caller's ``effective_events`` is left untouched.

            Returns the list of effective events that were not matched.
            """
            remaining = effective_events.copy()
            for ev in expected_events:
                assert ev in remaining
                remaining.remove(ev)
            return remaining

        def assertInputEvents(self, expected_events, effective_events):
            """Assert that both lists hold the same events, in any order."""
            remaining = self.assertInputEventsIn(expected_events, effective_events)
            assert remaining == []

        @classmethod
        def debug_reports(cls, reports, uhdev=None, events=None):
            """Print ``reports`` as hex dumps, for debugging.

            When ``uhdev`` is given, each dump is followed by the decoding
            returned by ``uhdev.parsed_rdesc.format_report()``.  When
            ``events`` is given, it is printed after the reports.
            """
            data = [" ".join(f"{v:02x}" for v in r) for r in reports]

            if uhdev is not None:
                human_data = [
                    uhdev.parsed_rdesc.format_report(r, split_lines=True)
                    for r in reports
                ]
                try:
                    # continuation lines get extra padding equal to the
                    # position of the first '/' of the decoded report
                    human_data = [
                        f'\n\t       {" " * h.index("/")}'.join(h.split("\n"))
                        for h in human_data
                    ]
                except ValueError:
                    # '/' not found: not a numbered report
                    human_data = ["\n\t      ".join(h.split("\n")) for h in human_data]
                data = [f"{d}\n\t ====> {h}" for d, h in zip(data, human_data)]

            if len(data) == 1:
                print("sending 1 report:")
            else:
                print(f"sending {len(data)} reports:")
            for report in data:
                print("\t", report)

            if events is not None:
                print("events received:", events)

        def create_device(self):
            """Return the device under test; must be overridden.

            Raises:
                NotImplementedError: always, in this base implementation.
            """
            raise NotImplementedError("please reimplement me in subclasses")

        def _load_kernel_module(self, kernel_driver, kernel_module):
            """Run modprobe on ``kernel_module`` unless it is already there.

            The module counts as present when
            ``/sys/bus/hid/drivers/<kernel_driver>`` exists or, if
            ``kernel_driver`` is None, when ``/sys/module/<kernel_module>``
            (dashes replaced by underscores) exists.  The test is skipped
            when modprobe returns a non-zero exit code.
            """
            if kernel_driver is not None:
                sysfs_path = Path("/sys/bus/hid/drivers") / kernel_driver
            else:
                # special case for when testing all available modules:
                # we don't know beforehand the name of the module from modinfo
                sysfs_path = Path("/sys/module") / kernel_module.replace("-", "_")

            if sysfs_path.exists():
                return

            ret = subprocess.run(["/usr/sbin/modprobe", kernel_module])
            if ret.returncode != 0:
                pytest.skip(
                    f"module {kernel_module} could not be loaded, skipping the test"
                )

        @pytest.fixture()
        def load_kernel_module(self):
            """Fixture loading every entry of ``kernel_modules``."""
            for kernel_driver, kernel_module in self.kernel_modules:
                self._load_kernel_module(kernel_driver, kernel_module)
            yield

        def load_hid_bpfs(self):
            """Attach every program of ``hid_bpfs`` to ``self.uhdev``.

            Each object file is looked up in ``drivers/hid/bpf/progs``, five
            directories above the directory of this file, and loaded with
            ``udev-hid-bpf --verbose add``.  The test is skipped when
            ``udev-hid-bpf`` is not in ``$PATH`` and failed when a program
            cannot be inserted.  If any entry declares a report descriptor
            fixup, the test also fails unless
            ``self.uhdev.kernel_ready_count`` reaches 2 within 2 seconds.
            """
            script_dir = Path(os.path.realpath(__file__)).parent
            root_dir = (script_dir / "../../../../..").resolve()
            bpf_dir = root_dir / "drivers/hid/bpf/progs"

            udev_hid_bpf = shutil.which("udev-hid-bpf")
            if not udev_hid_bpf:
                pytest.skip("udev-hid-bpf not found in $PATH, skipping")

            wait = any(rdesc_fixup for _, rdesc_fixup in self.hid_bpfs)

            for hid_bpf, _ in self.hid_bpfs:
                # We need to start `udev-hid-bpf` in the background
                # and dispatch uhid events in case the kernel needs
                # to fetch features on the device
                process = subprocess.Popen(
                    [
                        # run the binary that was actually found above
                        udev_hid_bpf,
                        "--verbose",
                        "add",
                        str(self.uhdev.sys_path),
                        str(bpf_dir / hid_bpf),
                    ],
                )
                while process.poll() is None:
                    self.uhdev.dispatch(1)

                # poll() has returned, so returncode is set
                if process.returncode != 0:
                    pytest.fail(
                        f"Couldn't insert hid-bpf program '{hid_bpf}', marking the test as failed"
                    )

            if wait:
                # the HID-BPF program exports a rdesc fixup, so it needs to be
                # unbound by the kernel and then rebound.
                # Ensure we get the bound event exactly 2 times (one for the normal
                # uhid loading, and then the reload from HID-BPF)
                # The deadline uses the monotonic clock so that wall-clock
                # adjustments cannot shorten or extend the wait.
                deadline = time.monotonic() + 2
                while (
                    self.uhdev.kernel_ready_count < 2 and time.monotonic() < deadline
                ):
                    self.uhdev.dispatch(1)

                if self.uhdev.kernel_ready_count < 2:
                    pytest.fail(
                        "Couldn't insert hid-bpf programs, marking the test as failed"
                    )

        def unload_hid_bpfs(self):
            """Run ``udev-hid-bpf --verbose remove`` on ``self.uhdev``.

            The test is failed when the command exits with a non-zero code.
            """
            ret = subprocess.run(
                ["udev-hid-bpf", "--verbose", "remove", str(self.uhdev.sys_path)],
            )
            if ret.returncode != 0:
                pytest.fail(
                    "Couldn't unload hid-bpf programs, marking the test as failed"
                )

        @pytest.fixture()
        def new_uhdev(self, load_kernel_module):
            """Fixture returning ``create_device()``, after module loading."""
            return self.create_device()

        def assertName(self, uhdev):
            """Assert that the evdev node name contains ``uhdev.name``."""
            evdev = uhdev.get_evdev()
            assert uhdev.name in evdev.name

        @pytest.fixture(autouse=True)
        def context(self, new_uhdev, request):
            """Autouse fixture providing ``self.uhdev`` around every test.

            Set up, in order:

            * enter the shared ``HIDTestUdevRule`` context, then the device
              context, binding the device to ``self.uhdev``;
            * for each ``skip_if_uhdev`` marker, call its first argument with
              the device and skip with the second argument as message when
              the result is true;
            * create the kernel device and dispatch events until
              ``is_ready()`` is true or 5 seconds have elapsed;
            * load ``hid_bpfs``, if any;
            * log the input nodes when no evdev node is found.

            Tear down unloads the HID-BPF programs (if any were requested)
            and resets ``self.uhdev`` to None.  A ``PermissionError`` raised
            inside the fixture turns into a skip.
            """
            try:
                with HIDTestUdevRule.instance(), new_uhdev as self.uhdev:
                    for skip_cond in request.node.iter_markers("skip_if_uhdev"):
                        # extra marker arguments are accepted and ignored
                        test, message, *_ = skip_cond.args

                        if test(self.uhdev):
                            pytest.skip(message)

                    self.uhdev.create_kernel_device()
                    # monotonic deadline: immune to wall-clock jumps
                    deadline = time.monotonic() + 5
                    while not self.uhdev.is_ready() and time.monotonic() < deadline:
                        self.uhdev.dispatch(1)

                    if self.hid_bpfs:
                        self.load_hid_bpfs()

                    if self.uhdev.get_evdev() is None:
                        logger.warning(
                            f"available list of input nodes: (default application is '{self.uhdev.application}')"
                        )
                        logger.warning(self.uhdev.input_nodes)
                    yield
                    if self.hid_bpfs:
                        self.unload_hid_bpfs()
                    self.uhdev = None
            except PermissionError:
                pytest.skip("Insufficient permissions, run me as root")

        @pytest.fixture(autouse=True)
        def check_taint(self):
            """Autouse fixture asserting the test left the taint value as is.

            The integer in ``/proc/sys/kernel/tainted`` is read before the
            test and must be identical afterwards.
            """
            # we are abusing SysfsFile here, it's in /proc, but meh
            taint_file = SysfsFile("/proc/sys/kernel/tainted")
            taint = taint_file.int_value

            yield

            assert taint_file.int_value == taint

        def test_creation(self):
            """Make sure the device gets processed by the kernel and creates
            the expected application input node.

            If this fails, there is something wrong in the device report
            descriptors."""
            uhdev = self.uhdev
            assert uhdev is not None
            assert uhdev.get_evdev() is not None
            self.assertName(uhdev)
            assert len(uhdev.next_sync_events()) == 0
            assert uhdev.get_evdev() is not None
358
359
class HIDTestUdevRule:
    """
    A context-manager compatible class that sets up our udev rules file and
    deletes it on context exit.

    This class is tailored to our test setup: it only sets up the udev rule
    on the **second** context and it cleans it up again on the last context
    removed. This matches the expected pytest setup: we enter a context for
    the session once, then once for each test (the first of which will
    trigger the udev rule) and once the last test exited and the session
    exited, we clean up after ourselves.

    Use ``HIDTestUdevRule.instance()`` to get the shared instance.
    """

    # Shared instance handed out by instance(); created on first use.
    _instance = None

    def __init__(self):
        # number of contexts currently entered
        self.refs = 0
        # file object of the installed rules file (only its .name is used),
        # or None while no rules file is installed
        self.rulesfile = None

    def __enter__(self):
        """Enter a context; install the rule when it is the second one.

        Returns this object so that ``with ... as rule`` works.
        """
        self.refs += 1
        if self.refs == 2 and self.rulesfile is None:
            self.create_udev_rule()
            self.reload_udev_rules()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave a context; remove the rule when it was the last one.

        Exceptions raised inside the context are not suppressed.
        """
        self.refs -= 1
        if self.refs == 0 and self.rulesfile is not None:
            rules_path = self.rulesfile.name
            # Forget the file before removing it so that a later enter/exit
            # cycle creates a fresh rule instead of skipping the creation
            # and then removing a file that is already gone.
            self.rulesfile = None
            os.remove(rules_path)
            self.reload_udev_rules()

    def reload_udev_rules(self):
        """Run ``udevadm control --reload-rules`` and ``systemd-hwdb update``.

        The exit codes of both commands are ignored.
        """
        subprocess.run(["udevadm", "control", "--reload-rules"])
        subprocess.run(["systemd-hwdb", "update"])

    def create_udev_rule(self):
        """Write the rules file and remember it in ``self.rulesfile``.

        The file is created in ``/run/udev/rules.d`` (made if missing) with
        a unique ``91-uhid-test-device-REMOVEME-*.rules`` name.  For devices
        whose name matches ``*uhid test *``, the rules set
        ``LIBINPUT_IGNORE_DEVICE`` and ``HID_BPF_IGNORE_DEVICE`` to "1", and
        retag ``* System Multi Axis`` input nodes.
        """
        import tempfile

        os.makedirs("/run/udev/rules.d", exist_ok=True)
        # delete=False: the file must outlive this `with` block, it is
        # removed explicitly in __exit__()
        with tempfile.NamedTemporaryFile(
            prefix="91-uhid-test-device-REMOVEME-",
            suffix=".rules",
            mode="w+",
            dir="/run/udev/rules.d",
            delete=False,
        ) as f:
            f.write(
                """
KERNELS=="*input*", ATTRS{name}=="*uhid test *", ENV{LIBINPUT_IGNORE_DEVICE}="1"
KERNELS=="*hid*", ENV{HID_NAME}=="*uhid test *", ENV{HID_BPF_IGNORE_DEVICE}="1"
KERNELS=="*input*", ATTRS{name}=="*uhid test * System Multi Axis", ENV{ID_INPUT_TOUCHSCREEN}="", ENV{ID_INPUT_SYSTEM_MULTIAXIS}="1"
"""
            )
            self.rulesfile = f

    @classmethod
    def instance(cls):
        """Return the shared instance, creating it on the first call."""
        if cls._instance is None:
            cls._instance = HIDTestUdevRule()
        return cls._instance
419