diff --git a/.github/workflows/install.yaml b/.github/workflows/install.yaml index e879179..bba0df4 100644 --- a/.github/workflows/install.yaml +++ b/.github/workflows/install.yaml @@ -11,10 +11,10 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] include: - os: ubuntu-latest - python-version: "3.9" + python-version: "3.10" steps: - uses: actions/checkout@v6 diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 20d254b..bd4f387 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,6 +22,7 @@ jobs: - name: Check for pylint errors run: | - python -m pip install pylint setuptools + python -m pip install pylint setuptools ruff python setup.py build python -m pylint --verbose -E build/lib*/evdev + python -m ruff check build/lib*/evdev diff --git a/examples/udev-example.py b/examples/udev-example.py index 8e827f6..9290dfd 100755 --- a/examples/udev-example.py +++ b/examples/udev-example.py @@ -37,11 +37,11 @@ # should adapt this to your needs if "py-evdev-uinput" in name: if udev.action == "add": - print("Device added: %s" % udev) + print(f"Device added: {udev}") fds[dev.fd] = InputDevice(udev.device_node) break if udev.action == "remove": - print("Device removed: %s" % udev) + print(f"Device removed: {udev}") def helper(): global fds diff --git a/pyproject.toml b/pyproject.toml index d0b4f7c..57877c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ description = "Bindings to the Linux input handling subsystem" keywords = ["evdev", "input", "uinput"] readme = "README.md" license = "BSD-3-Clause" -requires-python = ">=3.9" +requires-python = ">=3.10" authors = [ { name="Georgi Valkov", email="georgi.t.valkov@gmail.com" }, ] @@ -32,6 +32,7 @@ classifiers = [ line-length = 120 [tool.ruff.lint] +select = ["UP", "TC"] ignore = ["E265", "E241", "F403", "F401", "E401", "E731"] [tool.bumpversion] diff --git a/setup.py b/setup.py index 1f6eaac..d902b67 100755 --- a/setup.py +++ b/setup.py @@ -9,9 +9,12 @@ from setuptools import setup, Extension, Command from setuptools.command import build_ext as _build_ext +from evdev import genecodes_c + curdir = Path(__file__).resolve().parent ecodes_c_path = curdir / "src/evdev/ecodes.c" +ecodes_pyi_path = curdir / "src/evdev/_ecodes.pyi" def create_ecodes(headers=None, reproducible=False): @@ -63,13 +66,14 @@ def create_ecodes(headers=None, reproducible=False): sys.stderr.write(textwrap.dedent(msg)) sys.exit(1) - print("writing %s (using %s)" % (ecodes_c_path, " ".join(headers))) - with ecodes_c_path.open("w") as fh: - cmd = [sys.executable, "src/evdev/genecodes_c.py"] - if reproducible: - cmd.append("--reproducible") - cmd.extend(["--ecodes", *headers]) - run(cmd, check=True, stdout=fh) + for path, arg in [(ecodes_c_path, "--ecodes"), (ecodes_pyi_path, "--stubs")]: + print("writing %s (using %s)" % (path, " ".join(headers))) + with path.open("w") as fh: + cmd = [sys.executable, "src/evdev/genecodes_c.py"] + if reproducible: + cmd.append("--reproducible") + cmd.extend([arg, *headers]) + run(cmd, check=True, stdout=fh) class build_ecodes(Command): @@ -96,8 +100,8 @@ def run(self): class build_ext(_build_ext.build_ext): def has_ecodes(self): - if ecodes_c_path.exists(): - print("ecodes.c already exists ... skipping build_ecodes") + if ecodes_c_path.exists() and ecodes_pyi_path.exists(): + print("ecodes.c and _ecodes.pyi already exist ... skipping build_ecodes") return False return True diff --git a/src/evdev/__init__.py b/src/evdev/__init__.py index bae0fec..042e79f 100644 --- a/src/evdev/__init__.py +++ b/src/evdev/__init__.py @@ -2,8 +2,11 @@ # Gather everything into a single, convenient namespace. # -------------------------------------------------------------------------- -# The superfluous "import name as name" syntax is here to satisfy mypy's attrs-defined rule. -# Alternatively all exported objects can be listed in __all__. +# The "import name as name" syntax is here to satisfy Python's type system +# import conventions: +# https://typing.python.org/en/latest/spec/distributing.html#import-conventions + +from __future__ import annotations from . import ( ecodes as ecodes, diff --git a/src/evdev/_input.pyi b/src/evdev/_input.pyi new file mode 100644 index 0000000..69ee30c --- /dev/null +++ b/src/evdev/_input.pyi @@ -0,0 +1,42 @@ +"""Python bindings to certain linux input subsystem functions""" + +def ioctl_devinfo(fd: int, /) -> tuple[int, int, int, int, str, str, str]: + """fetch input device info""" + +def ioctl_capabilities( + fd: int, / +) -> dict[int, list[int | tuple[int, tuple[int, int, int, int, int, int]]]]: + """fetch input device capabilities""" + +def ioctl_EVIOCGABS(fd: int, ev_code: int, /) -> tuple[int, int, int, int, int, int]: + """get input device absinfo""" + +def ioctl_EVIOCSABS( + fd: int, + ev_code: int, + absinfo: tuple[int, int, int, int, int, int], + /, +) -> None: + """set input device absinfo""" + +def ioctl_EVIOCGREP(fd: int, /) -> tuple[int, int]: ... +def ioctl_EVIOCSREP(fd: int, delay: int, period: int, /) -> int: ... +def ioctl_EVIOCGVERSION(fd: int, /) -> int: ... +def ioctl_EVIOCGRAB(fd: int, flag: int, /) -> None: ... +def ioctl_EVIOCGEFFECTS(fd: int, /) -> int: + """fetch the number of effects the device can keep in its memory.""" + +def ioctl_EVIOCG_bits(fd: int, evtype: int, /) -> list[int]: + """get state of KEY|LED|SND|SW""" + +def ioctl_EVIOCGPROP(fd: int, /) -> list[int]: + """get device properties""" + +def device_read(fd: int, /) -> tuple[int, int, int, int, int] | None: + """read an input event from a device""" + +def device_read_many(fd: int, /) -> tuple[tuple[int, int, int, int, int], ...]: + """read all available input events from a device""" + +def upload_effect(fd: int, effect_data: bytes, /) -> int: ... +def erase_effect(fd: int, ff_id: int, /) -> None: ... diff --git a/src/evdev/_uinput.pyi b/src/evdev/_uinput.pyi new file mode 100644 index 0000000..288b2f4 --- /dev/null +++ b/src/evdev/_uinput.pyi @@ -0,0 +1,42 @@ +"""Python bindings for parts of linux/uinput.c.""" + +from typing import Final + +maxnamelen: Final[int] + +def open(devnode: str, /) -> int: + """Open uinput device node.""" + +def setup( + fd: int, + name: str, + vendor: int, + product: int, + version: int, + bustype: int, + absinfo: list[list[int]], + max_effects: int, + /, +) -> None: + """Set an uinput device up.""" + +def create(fd: int, /) -> None: + """Create an uinput device.""" + +def close(fd: int, /) -> None: + """Destroy uinput device.""" + +def write(fd: int, type: int, code: int, value: int, /) -> None: + """Write event to uinput device.""" + +def enable(fd: int, type: int, code: int, /) -> None: + """Enable a type of event.""" + +def set_phys(fd: int, phys: str, /) -> None: + """Set physical path""" + +def get_sysname(fd: int, /) -> str: + """Obtain the sysname of the uinput device.""" + +def set_prop(fd: int, prop: int, /) -> None: + """Set device input property""" diff --git a/src/evdev/device.py b/src/evdev/device.py index a7f9b92..f07d2d9 100644 --- a/src/evdev/device.py +++ b/src/evdev/device.py @@ -1,13 +1,72 @@ +from __future__ import annotations + import contextlib import os -from typing import Dict, Generic, Iterator, List, Literal, NamedTuple, Tuple, TypeVar, Union, overload +from typing import TYPE_CHECKING, Any, Generic, Literal, NamedTuple, TypeVar, overload from . import _input, ecodes, util -try: - from .eventio_async import EvdevError, EventIO -except ImportError: - from .eventio import EvdevError, EventIO + +if TYPE_CHECKING: + from collections.abc import Generator + + from . import ff + from .eventio_async import EvdevError as EvdevError, EventIO + + _T = TypeVar('_T') + _KeyT = TypeVar('_KeyT') + _ValueT = TypeVar('_ValueT') + _AbsKeyT = TypeVar('_AbsKeyT') + _AbsValueT = TypeVar('_AbsValueT') + + class _Capabilities(dict['_KeyT | _AbsKeyT', '_ValueT | _AbsValueT']): + @overload + def __getitem__(self, key: _AbsKeyT, /) -> _AbsValueT: ... # pyright: ignore[reportNoOverloadImplementation] + @overload + def __getitem__(self, key: _KeyT, /) -> _ValueT: ... + @overload + def __getitem__(self, key: _KeyT | _AbsKeyT, /) -> _AbsValueT | _ValueT: ... + def __getitem__(self, key: Any, /) -> Any: ... + @overload # type: ignore[override] + def get(self, key: _AbsKeyT, default: None = None, /) -> _AbsValueT | None: ... # pyright: ignore[reportNoOverloadImplementation] + @overload + def get(self, key: _KeyT, default: None = None, /) -> _ValueT | None: ... + @overload + def get(self, key: _KeyT | _AbsKeyT, default: None = None, /) -> _AbsValueT | _ValueT | None: ... + @overload + def get(self, key: _AbsKeyT, default: _AbsValueT | _T, /) -> _AbsValueT | _T: ... + @overload + def get(self, key: _KeyT, default: _ValueT | _T, /) -> _ValueT | _T: ... + @overload + def get(self, key: _KeyT | _AbsKeyT, default: _AbsValueT | _ValueT | _T, /) -> _AbsValueT | _ValueT | _T: ... + def get(self, key: Any, default: Any = None, /) -> Any: ... # pyright: ignore[reportIncompatibleMethodOverride] + + class _AbsInfoCapabilities( + _Capabilities[ + ecodes._CapabilitiesKeys, + ecodes._CapabilitiesAbsKeys, + list[int], + list[tuple[int, 'AbsInfo']] + ] + ): ... + + class _VerboseAbsInfoCapabilities( + _Capabilities[ + ecodes._CapabilitiesVerboseKeys, + ecodes._CapabilitiesVerboseAbsKeys, + list[tuple[str, int]], + list[tuple[tuple[str, int], 'AbsInfo']], + ] + ): ... +else: + try: + from .eventio_async import EvdevError as EvdevError, EventIO + except ImportError: + from .eventio import EvdevError as EvdevError, EventIO + + _Capabilities = dict + _AbsInfoCapabilities = dict + _VerboseAbsInfoCapabilities = dict _AnyStr = TypeVar("_AnyStr", str, bytes) @@ -79,7 +138,7 @@ class KbdInfo(NamedTuple): repeat: int def __str__(self): - return "delay {}, repeat {}".format(self.delay, self.repeat) + return f"delay {self.delay}, repeat {self.repeat}" class DeviceInfo(NamedTuple): @@ -109,7 +168,7 @@ class InputDevice(EventIO, Generic[_AnyStr]): __slots__ = ("path", "fd", "info", "name", "phys", "uniq", "_rawcapabilities", "version", "ff_effects_count") - def __init__(self, dev: Union[_AnyStr, "os.PathLike[_AnyStr]"]): + def __init__(self, dev: _AnyStr | os.PathLike[_AnyStr]) -> None: """ Arguments --------- @@ -118,7 +177,7 @@ def __init__(self, dev: Union[_AnyStr, "os.PathLike[_AnyStr]"]): """ #: Path to input device. - self.path: _AnyStr = dev if not hasattr(dev, "__fspath__") else dev.__fspath__() + self.path: _AnyStr = dev if isinstance(dev, (str, bytes)) else dev.__fspath__() # Certain operations are possible only when the device is opened in read-write mode. try: @@ -160,8 +219,14 @@ def __del__(self) -> None: except (OSError, ImportError, AttributeError): pass - def _capabilities(self, absinfo: bool = True): - res = {} + @overload + def _capabilities(self, absinfo: Literal[True] = ...) -> _AbsInfoCapabilities: ... + @overload + def _capabilities(self, absinfo: Literal[False]) -> dict[int, list[int]]: ... + @overload + def _capabilities(self, absinfo: bool) -> _AbsInfoCapabilities | dict[int, list[int]]: ... + def _capabilities(self, absinfo: bool = True) -> _AbsInfoCapabilities | dict[int, list[int]]: + res: dict[Any, Any] = {} for etype, _ecodes in self._rawcapabilities.items(): for code in _ecodes: @@ -179,12 +244,36 @@ def _capabilities(self, absinfo: bool = True): return res @overload - def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]: - ... + def capabilities( + self, verbose: Literal[False] = ..., absinfo: Literal[True] = ... + ) -> _AbsInfoCapabilities: ... @overload - def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]: - ... - def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]: + def capabilities( + self, verbose: Literal[False], absinfo: Literal[False] + ) -> dict[int, list[int]]: ... + @overload + def capabilities( + self, verbose: Literal[True], absinfo: Literal[True] = ... + ) -> _VerboseAbsInfoCapabilities: ... + @overload + def capabilities( + self, verbose: Literal[True], absinfo: Literal[False] + ) -> dict[tuple[str, int], list[tuple[str, int]]]: ... + @overload + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> ( + _AbsInfoCapabilities + | dict[int, list[int]] + | _VerboseAbsInfoCapabilities + | dict[tuple[str, int], list[tuple[str, int]]] + ): ... + def capabilities( + self, verbose: bool = False, absinfo: bool = True + ) -> ( + _AbsInfoCapabilities + | dict[int, list[int]] + | _VerboseAbsInfoCapabilities + | dict[tuple[str, int], list[tuple[str, int]]] + ): """ Return the event types that this device supports as a mapping of supported event types to lists of handled event codes. @@ -229,7 +318,21 @@ def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dic else: return self._capabilities(absinfo) - def input_props(self, verbose: bool = False): + @overload + def input_props( + self, verbose: Literal[False] = ... + ) -> list[int]: ... + @overload + def input_props( + self, verbose: Literal[True] + ) -> list[tuple[str | tuple[str, ...], int]]: ... + @overload + def input_props( + self, verbose: bool + ) -> list[int] | list[tuple[str | tuple[str, ...], int]]: ... + def input_props( + self, verbose: bool = False + ) -> list[int] | list[tuple[str | tuple[str, ...], int]]: """ Get device properties and quirks. @@ -250,7 +353,13 @@ def input_props(self, verbose: bool = False): return props - def leds(self, verbose: bool = False): + @overload + def leds(self, verbose: Literal[False] = ...) -> list[int]: ... + @overload + def leds(self, verbose: Literal[True]) -> list[tuple[str | tuple[str, ...], int]]: ... + @overload + def leds(self, verbose: bool) -> list[int] | list[tuple[str | tuple[str, ...], int]]: ... + def leds(self, verbose: bool = False) -> list[int] | list[tuple[str | tuple[str, ...], int]]: """ Return currently set LED keys. @@ -281,7 +390,7 @@ def set_led(self, led_num: int, value: int) -> None: """ self.write(ecodes.EV_LED, led_num, value) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: """ Two devices are equal if their :data:`info` attributes are equal. """ @@ -295,7 +404,7 @@ def __repr__(self) -> str: msg = (self.__class__.__name__, self.path) return "{}({!r})".format(*msg) - def __fspath__(self): + def __fspath__(self) -> _AnyStr: return self.path def close(self) -> None: @@ -332,7 +441,7 @@ def ungrab(self) -> None: _input.ioctl_EVIOCGRAB(self.fd, 0) @contextlib.contextmanager - def grab_context(self) -> Iterator[None]: + def grab_context(self) -> Generator[None]: """ A context manager for the duration of which only the current process will be able to receive events from the device. @@ -341,7 +450,7 @@ def grab_context(self) -> Iterator[None]: yield self.ungrab() - def upload_effect(self, effect: "ff.Effect"): + def upload_effect(self, effect: ff.Effect) -> int: """ Upload a force feedback effect to a force feedback device. """ @@ -350,7 +459,7 @@ def upload_effect(self, effect: "ff.Effect"): ff_id = _input.upload_effect(self.fd, data) return ff_id - def erase_effect(self, ff_id) -> None: + def erase_effect(self, ff_id: int) -> None: """ Erase a force effect from a force feedback device. This also stops the effect. @@ -359,19 +468,33 @@ def erase_effect(self, ff_id) -> None: _input.erase_effect(self.fd, ff_id) @property - def repeat(self): + def repeat(self) -> KbdInfo: """ Get or set the keyboard repeat rate (in characters per minute) and delay (in milliseconds). """ - return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd)) + return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd)) # pylint: disable=not-an-iterable @repeat.setter - def repeat(self, value: Tuple[int, int]): - return _input.ioctl_EVIOCSREP(self.fd, *value) + def repeat(self, value: tuple[int, int]) -> None: + _input.ioctl_EVIOCSREP(self.fd, *value) - def active_keys(self, verbose: bool = False): + @overload + def active_keys( + self, verbose: Literal[False] = ... + ) -> list[int]: ... + @overload + def active_keys( + self, verbose: Literal[True] + ) -> list[tuple[str | tuple[str, ...], int]]: ... + @overload + def active_keys( + self, verbose: bool + ) -> list[int] | list[tuple[str | tuple[str, ...], int]]: ... + def active_keys( + self, verbose: bool = False + ) -> list[int] | list[tuple[str | tuple[str, ...], int]]: """ Return currently active keys. @@ -394,7 +517,7 @@ def active_keys(self, verbose: bool = False): return active_keys - def absinfo(self, axis_num: int): + def absinfo(self, axis_num: int) -> AbsInfo: """ Return current :class:`AbsInfo` for input device axis @@ -410,7 +533,16 @@ def absinfo(self, axis_num: int): """ return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num)) - def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None: + def set_absinfo( + self, + axis_num: int, + value: int | None = None, + min: int | None = None, + max: int | None = None, + fuzz: int | None = None, + flat: int | None = None, + resolution: int | None = None + ) -> None: """ Update :class:`AbsInfo` values. Only specified values will be overwritten. diff --git a/src/evdev/ecodes.py b/src/evdev/ecodes.py index fd4afc4..d4e9c86 100644 --- a/src/evdev/ecodes.py +++ b/src/evdev/ecodes.py @@ -2,4 +2,6 @@ # build time by genecodes_py.py (see build_ext in setup.py). # This stub exists to make development of evdev itself more convenient. +from __future__ import annotations + from .ecodes_runtime import * diff --git a/src/evdev/ecodes_runtime.py b/src/evdev/ecodes_runtime.py index 47f3b23..9ea2aa6 100644 --- a/src/evdev/ecodes_runtime.py +++ b/src/evdev/ecodes_runtime.py @@ -39,6 +39,8 @@ 'FF_PERIODIC' """ +from __future__ import annotations + from inspect import getmembers from . import _ecodes diff --git a/src/evdev/eventio.py b/src/evdev/eventio.py index bdb91a4..be954cf 100644 --- a/src/evdev/eventio.py +++ b/src/evdev/eventio.py @@ -1,11 +1,20 @@ +from __future__ import annotations + import fcntl import functools import os import select -from typing import Iterator, Union +from typing import TYPE_CHECKING, Concatenate, ParamSpec, TypeVar from . import _input, _uinput, ecodes -from .events import InputEvent +from .events import HasEvent, InputEvent, is_has_event + +if TYPE_CHECKING: + from functools import _Wrapped + from collections.abc import Callable, Iterator + +_P = ParamSpec("_P") +_R = TypeVar("_R") # -------------------------------------------------------------------------- @@ -28,7 +37,10 @@ class EventIO: beeps). """ - def fileno(self): + fd: int + path: str | bytes + + def fileno(self) -> int: """ Return the file descriptor to the open event device. This makes it possible to pass instances directly to :func:`select.select()` and @@ -43,10 +55,9 @@ def read_loop(self) -> Iterator[InputEvent]: while True: r, w, x = select.select([self.fd], [], []) - for event in self.read(): - yield event + yield from self.read() - def read_one(self) -> Union[InputEvent, None]: + def read_one(self) -> InputEvent | None: """ Read and return a single input event as an instance of :class:`InputEvent `. @@ -74,24 +85,24 @@ def read(self) -> Iterator[InputEvent]: yield InputEvent(*event) # pylint: disable=no-self-argument - def need_write(func): + def need_write(func: Callable[Concatenate[EventIO, _P], _R]) -> _Wrapped[Concatenate[EventIO, _P], _R, _P, _R]: """ Decorator that raises :class:`EvdevError` if there is no write access to the input device. """ @functools.wraps(func) - def wrapper(*args): - fd = args[0].fd + def wrapper(self: EventIO, /, *args: _P.args, **kwargs: _P.kwargs) -> _R: + fd = self.fd if fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_RDWR: # pylint: disable=not-callable - return func(*args) - msg = 'no write access to device "%s"' % args[0].path + return func(self, *args, **kwargs) + msg = f'no write access to device "{self.path}"' raise EvdevError(msg) return wrapper - def write_event(self, event): + def write_event(self, event: InputEvent | HasEvent) -> None: """ Inject an input event into the input subsystem. Events are queued until a synchronization event is received. @@ -109,13 +120,13 @@ def write_event(self, event): >>> ui.write_event(ev) """ - if hasattr(event, "event"): + if is_has_event(event): event = event.event self.write(event.type, event.code, event.value) @need_write - def write(self, etype: int, code: int, value: int): + def write(self, etype: int, code: int, value: int) -> None: """ Inject an input event into the input subsystem. Events are queued until a synchronization event is received. @@ -139,7 +150,7 @@ def write(self, etype: int, code: int, value: int): _uinput.write(self.fd, etype, code, value) - def syn(self): + def syn(self) -> None: """ Inject a ``SYN_REPORT`` event into the input subsystem. Events queued by :func:`write()` will be fired. If possible, events @@ -148,5 +159,5 @@ def syn(self): self.write(ecodes.EV_SYN, ecodes.SYN_REPORT, 0) - def close(self): + def close(self) -> None: pass diff --git a/src/evdev/eventio_async.py b/src/evdev/eventio_async.py index 4af1aab..622ddcc 100644 --- a/src/evdev/eventio_async.py +++ b/src/evdev/eventio_async.py @@ -1,22 +1,29 @@ +from __future__ import annotations + import asyncio import select import sys +from typing import TYPE_CHECKING from . import eventio -from .events import InputEvent # needed for compatibility -from .eventio import EvdevError +from .eventio import EvdevError as EvdevError + +if TYPE_CHECKING: + from collections.abc import Iterator + from typing_extensions import Self -if sys.version_info >= (3, 11): + from .events import InputEvent +elif sys.version_info >= (3, 11): from typing import Self else: from typing import Any as Self class ReadIterator: - def __init__(self, device): - self.current_batch = iter(()) + def __init__(self, device: EventIO) -> None: + self.current_batch: Iterator[InputEvent] = iter(()) self.device = device # Standard iterator protocol. @@ -35,8 +42,8 @@ def __next__(self) -> InputEvent: def __aiter__(self) -> Self: return self - def __anext__(self) -> "asyncio.Future[InputEvent]": - future = asyncio.Future() + def __anext__(self) -> asyncio.Future[InputEvent]: + future: asyncio.Future[InputEvent] = asyncio.Future() try: # Read from the previous batch of events. future.set_result(next(self.current_batch)) @@ -69,22 +76,22 @@ def _set_result(self, future, cb): except Exception as error: future.set_exception(error) - def async_read_one(self): + def async_read_one(self) -> asyncio.Future[InputEvent | None]: """ Asyncio coroutine to read and return a single input event as an instance of :class:`InputEvent `. """ - future = asyncio.Future() + future: asyncio.Future[InputEvent | None] = asyncio.Future() self._do_when_readable(lambda: self._set_result(future, self.read_one)) return future - def async_read(self): + def async_read(self) -> asyncio.Future[Iterator[InputEvent]]: """ Asyncio coroutine to read multiple input events from device. Return a generator object that yields :class:`InputEvent ` instances. """ - future = asyncio.Future() + future: asyncio.Future[Iterator[InputEvent]] = asyncio.Future() self._do_when_readable(lambda: self._set_result(future, self.read)) return future @@ -96,7 +103,7 @@ def async_read_loop(self) -> ReadIterator: """ return ReadIterator(self) - def close(self): + def close(self) -> None: try: loop = asyncio.get_event_loop() loop.remove_reader(self.fileno()) diff --git a/src/evdev/events.py b/src/evdev/events.py index 922bfe6..f5c26b8 100644 --- a/src/evdev/events.py +++ b/src/evdev/events.py @@ -34,20 +34,25 @@ KeyEvent(InputEvent(1337197425L, 477835L, 1, 28, 0L)) """ +from __future__ import annotations + # event type descriptions have been taken mot-a-mot from: # http://www.kernel.org/doc/Documentation/input/event-codes.txt # pylint: disable=no-name-in-module -from typing import Final +from typing import TYPE_CHECKING, Final, Protocol from .ecodes import ABS, EV_ABS, EV_KEY, EV_REL, EV_SYN, KEY, REL, SYN, keys +if TYPE_CHECKING: + from typing_extensions import TypeIs + class InputEvent: """A generic input event.""" __slots__ = "sec", "usec", "type", "code", "value" - def __init__(self, sec, usec, type, code, value): + def __init__(self, sec: int, usec: int, type: int, code: int, value: int) -> None: #: Time in seconds since epoch at which event occurred. self.sec: int = sec @@ -67,15 +72,23 @@ def timestamp(self) -> float: """Return event timestamp as a float.""" return self.sec + (self.usec / 1000000.0) - def __str__(self): + def __str__(self) -> str: msg = "event at {:f}, code {:02d}, type {:02d}, val {:02d}" return msg.format(self.timestamp(), self.code, self.type, self.value) - def __repr__(self): + def __repr__(self) -> str: msg = "{}({!r}, {!r}, {!r}, {!r}, {!r})" return msg.format(self.__class__.__name__, self.sec, self.usec, self.type, self.code, self.value) +class HasEvent(Protocol): + event: InputEvent + + +def is_has_event(obj: object) -> TypeIs[HasEvent]: + return hasattr(obj, "event") + + class KeyEvent: """An event generated by a keyboard, button or other key-like devices.""" @@ -85,7 +98,7 @@ class KeyEvent: __slots__ = "scancode", "keycode", "keystate", "event" - def __init__(self, event: InputEvent, allow_unknown: bool = False): + def __init__(self, event: InputEvent, allow_unknown: bool = False) -> None: """ The ``allow_unknown`` argument determines what to do in the event of an event code for which a key code cannot be found. If ``False`` a ``KeyError`` will be raised. @@ -105,14 +118,14 @@ def __init__(self, event: InputEvent, allow_unknown: bool = False): self.keycode = keys[event.code] except KeyError: if allow_unknown: - self.keycode = "0x{:02X}".format(event.code) + self.keycode = f"0x{event.code:02X}" else: raise #: Reference to an :class:`InputEvent` instance. self.event: InputEvent = event - def __str__(self): + def __str__(self) -> str: try: ks = ("up", "down", "hold")[self.keystate] except IndexError: @@ -121,8 +134,8 @@ def __str__(self): msg = "key event at {:f}, {} ({}), {}" return msg.format(self.event.timestamp(), self.scancode, self.keycode, ks) - def __repr__(self): - return "{}({!r})".format(self.__class__.__name__, self.event) + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.event!r})" class RelEvent: @@ -130,16 +143,16 @@ class RelEvent: __slots__ = "event" - def __init__(self, event: InputEvent): + def __init__(self, event: InputEvent) -> None: #: Reference to an :class:`InputEvent` instance. self.event: InputEvent = event - def __str__(self): + def __str__(self) -> str: msg = "relative axis event at {:f}, {}" return msg.format(self.event.timestamp(), REL[self.event.code]) - def __repr__(self): - return "{}({!r})".format(self.__class__.__name__, self.event) + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.event!r})" class AbsEvent: @@ -147,16 +160,16 @@ class AbsEvent: __slots__ = "event" - def __init__(self, event: InputEvent): + def __init__(self, event: InputEvent) -> None: #: Reference to an :class:`InputEvent` instance. self.event: InputEvent = event - def __str__(self): + def __str__(self) -> str: msg = "absolute axis event at {:f}, {}" return msg.format(self.event.timestamp(), ABS[self.event.code]) - def __repr__(self): - return "{}({!r})".format(self.__class__.__name__, self.event) + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.event!r})" class SynEvent: @@ -167,21 +180,21 @@ class SynEvent: __slots__ = "event" - def __init__(self, event: InputEvent): + def __init__(self, event: InputEvent) -> None: #: Reference to an :class:`InputEvent` instance. self.event: InputEvent = event - def __str__(self): + def __str__(self) -> str: msg = "synchronization event at {:f}, {}" return msg.format(self.event.timestamp(), SYN[self.event.code]) - def __repr__(self): - return "{}({!r})".format(self.__class__.__name__, self.event) + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.event!r})" #: A mapping of event types to :class:`InputEvent` sub-classes. Used #: by :func:`evdev.util.categorize()` -event_factory = { +event_factory: dict[int, type[KeyEvent | RelEvent | AbsEvent | SynEvent]] = { EV_KEY: KeyEvent, EV_REL: RelEvent, EV_ABS: AbsEvent, diff --git a/src/evdev/evtest.py b/src/evdev/evtest.py index 6ea3bb5..d4539d1 100644 --- a/src/evdev/evtest.py +++ b/src/evdev/evtest.py @@ -16,6 +16,8 @@ evtest /dev/input/event0 /dev/input/event1 """ +from __future__ import annotations + import atexit import optparse import re @@ -113,21 +115,21 @@ def print_capabilities(device): capabilities = device.capabilities(verbose=True) input_props = device.input_props(verbose=True) - print("Device name: {.name}".format(device)) - print("Device info: {.info}".format(device)) - print("Repeat settings: {}\n".format(device.repeat)) + print(f"Device name: {device.name}") + print(f"Device info: {device.info}") + print(f"Repeat settings: {device.repeat}\n") if ("EV_LED", ecodes.EV_LED) in capabilities: leds = ",".join(i[0] for i in device.leds(True)) - print("Active LEDs: %s" % leds) + print(f"Active LEDs: {leds}") active_keys = ",".join(k[0] for k in device.active_keys(True)) - print("Active keys: %s\n" % active_keys) + print(f"Active keys: {active_keys}\n") if input_props: print("Input properties:") for type, code in input_props: - print(" %s %s" % (type, code)) + print(f" {type} {code}") print() print("Device capabilities:") @@ -137,11 +139,11 @@ def print_capabilities(device): # code <- ('BTN_RIGHT', 273) or (['BTN_LEFT', 'BTN_MOUSE'], 272) if isinstance(code[1], AbsInfo): print(" Code {:<4} {}:".format(*code[0])) - print(" {}".format(code[1])) + print(f" {code[1]}") else: # Multiple names may resolve to one value. s = ", ".join(code[0]) if isinstance(code[0], list) else code[0] - print(" Code {:<4} {}".format(s, code[1])) + print(f" Code {s:<4} {code[1]}") print("") diff --git a/src/evdev/ff.py b/src/evdev/ff.py index 260c362..ba3c105 100644 --- a/src/evdev/ff.py +++ b/src/evdev/ff.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import ctypes from . import ecodes @@ -16,6 +18,9 @@ class Replay(ctypes.Structure): @delay: delay before effect should start playing """ + length: int + delay: int + _fields_ = [ ("length", _u16), ("delay", _u16), @@ -29,6 +34,9 @@ class Trigger(ctypes.Structure): @interval: controls how soon the effect can be re-triggered """ + button: int + interval: int + _fields_ = [ ("button", _u16), ("interval", _u16), @@ -49,6 +57,11 @@ class Envelope(ctypes.Structure): Valid range for the attack and fade levels is 0x0000 - 0x7fff """ + attack_length: int + attack_level: int + fade_length: int + fade_level: int + _fields_ = [ ("attack_length", _u16), ("attack_level", _u16), @@ -64,6 +77,9 @@ class Constant(ctypes.Structure): @envelope: envelope data """ + level: int + ff_envelope: Envelope + _fields_ = [ ("level", _s16), ("ff_envelope", Envelope), @@ -78,6 +94,10 @@ class Ramp(ctypes.Structure): @envelope: envelope data """ + start_level: int + end_level: int + ff_envelope: Envelope + _fields_ = [ ("start_level", _s16), ("end_level", _s16), @@ -96,6 +116,13 @@ class Condition(ctypes.Structure): @center: position of the dead zone """ + right_saturation: int + left_saturation: int + right_coeff: int + left_coeff: int + deadband: int + center: int + _fields_ = [ ("right_saturation", _u16), ("left_saturation", _u16), @@ -119,6 +146,15 @@ class Periodic(ctypes.Structure): @custom_data: buffer of samples (FF_CUSTOM only) """ + waveform: int + period: int + magnitude: int + offset: int + phase: int + envelope: Envelope + custom_len: int + custom_data: ctypes._Pointer[ctypes.c_int16] + _fields_ = [ ("waveform", _u16), ("period", _u16), @@ -141,6 +177,9 @@ class Rumble(ctypes.Structure): represents the magnitude of the vibration generated by the heavy one. """ + strong_magnitude: int + weak_magnitude: int + _fields_ = [ ("strong_magnitude", _u16), ("weak_magnitude", _u16), @@ -148,6 +187,12 @@ class Rumble(ctypes.Structure): class EffectType(ctypes.Union): + ff_constant_effect: Constant + ff_ramp_effect: Ramp + ff_periodic_effect: Periodic + ff_condition_effect: ctypes.Array[Condition] + ff_rumble_effect: Rumble + _fields_ = [ ("ff_constant_effect", Constant), ("ff_ramp_effect", Ramp), @@ -158,6 +203,13 @@ class EffectType(ctypes.Union): class Effect(ctypes.Structure): + type: int + id: int + direction: int + ff_trigger: Trigger + ff_replay: Replay + u: EffectType + _fields_ = [ ("type", _u16), ("id", _s16), @@ -169,6 +221,11 @@ class Effect(ctypes.Structure): class UInputUpload(ctypes.Structure): + request_id: int + retval: int + effect: Effect + old: Effect + _fields_ = [ ("request_id", _u32), ("retval", _s32), @@ -178,6 +235,10 @@ class UInputUpload(ctypes.Structure): class UInputErase(ctypes.Structure): + request_id: int + retval: int + effect_id: int + _fields_ = [ ("request_id", _u32), ("retval", _s32), diff --git a/src/evdev/genecodes_c.py b/src/evdev/genecodes_c.py index 15a6693..96de524 100644 --- a/src/evdev/genecodes_c.py +++ b/src/evdev/genecodes_c.py @@ -2,6 +2,8 @@ Generate a Python extension module with the constants defined in linux/input.h. """ +from __future__ import annotations + import getopt import os import re @@ -92,25 +94,7 @@ # pylint: skip-file -ecodes: dict[str, int] -keys: dict[int, str|list[str]] -bytype: dict[int, dict[int, str|list[str]]] - -KEY: dict[int, str|list[str]] -ABS: dict[int, str|list[str]] -REL: dict[int, str|list[str]] -SW: dict[int, str|list[str]] -MSC: dict[int, str|list[str]] -LED: dict[int, str|list[str]] -BTN: dict[int, str|list[str]] -REP: dict[int, str|list[str]] -SND: dict[int, str|list[str]] -ID: dict[int, str|list[str]] -EV: dict[int, str|list[str]] -BUS: dict[int, str|list[str]] -SYN: dict[int, str|list[str]] -FF_STATUS: dict[int, str|list[str]] -FF_INPUT_PROP: dict[int, str|list[str]] +from typing import Final %s """ @@ -120,7 +104,7 @@ def parse_headers(headers=headers): for header in headers: try: fh = open(header) - except (IOError, OSError): + except OSError: continue for line in fh: @@ -131,15 +115,15 @@ def parse_headers(headers=headers): all_macros = list(parse_headers()) if not all_macros: - print("no input macros found in: %s" % " ".join(headers), file=sys.stderr) + print(f"no input macros found in: {' '.join(headers)}", file=sys.stderr) sys.exit(1) # pylint: disable=possibly-used-before-assignment, used-before-assignment if ("--ecodes", "") in opts: - body = (" PyModule_AddIntMacro(m, %s);" % macro for macro in all_macros) + body = (f" PyModule_AddIntMacro(m, {macro});" for macro in all_macros) template = template_ecodes elif ("--stubs", "") in opts: - body = ("%s: int" % macro for macro in all_macros) + body = (f"{macro}: Final[int]" for macro in all_macros) template = template_stubs body = os.linesep.join(body) diff --git a/src/evdev/genecodes_py.py b/src/evdev/genecodes_py.py index f00020c..e1fab20 100644 --- a/src/evdev/genecodes_py.py +++ b/src/evdev/genecodes_py.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import sys from unittest import mock from pprint import PrettyPrinter @@ -15,7 +17,9 @@ print('"""') print() -print("from typing import Final, Dict, Tuple, Union") +print("from __future__ import annotations") +print() +print("from typing import Final, Literal, TypeAlias") print() for name, value in ecodes.ecodes.items(): @@ -23,26 +27,26 @@ print() entries = [ - ("ecodes", "Dict[str, int]", "#: Mapping of names to values."), - ("bytype", "Dict[int, Dict[int, Union[str, Tuple[str]]]]", "#: Mapping of event types to other value/name mappings."), - ("keys", "Dict[int, Union[str, Tuple[str]]]", "#: Keys are a combination of all BTN and KEY codes."), - ("KEY", "Dict[int, Union[str, Tuple[str]]]", None), - ("ABS", "Dict[int, Union[str, Tuple[str]]]", None), - ("REL", "Dict[int, Union[str, Tuple[str]]]", None), - ("SW", "Dict[int, Union[str, Tuple[str]]]", None), - ("MSC", "Dict[int, Union[str, Tuple[str]]]", None), - ("LED", "Dict[int, Union[str, Tuple[str]]]", None), - ("BTN", "Dict[int, Union[str, Tuple[str]]]", None), - ("REP", "Dict[int, Union[str, Tuple[str]]]", None), - ("SND", "Dict[int, Union[str, Tuple[str]]]", None), - ("ID", "Dict[int, Union[str, Tuple[str]]]", None), - ("EV", "Dict[int, Union[str, Tuple[str]]]", None), - ("BUS", "Dict[int, Union[str, Tuple[str]]]", None), - ("SYN", "Dict[int, Union[str, Tuple[str]]]", None), - ("FF", "Dict[int, Union[str, Tuple[str]]]", None), - ("UI_FF", "Dict[int, Union[str, Tuple[str]]]", None), - ("FF_STATUS", "Dict[int, Union[str, Tuple[str]]]", None), - ("INPUT_PROP", "Dict[int, Union[str, Tuple[str]]]", None) + ("ecodes", "dict[str, int]", "#: Mapping of names to values."), + ("bytype", "dict[int, dict[int, str | tuple[str, ...]]]", "#: Mapping of event types to other value/name mappings."), + ("keys", "dict[int, str | tuple[str, ...]]", "#: Keys are a combination of all BTN and KEY codes."), + ("KEY", "dict[int, str | tuple[str, ...]]", None), + ("ABS", "dict[int, str | tuple[str, ...]]", None), + ("REL", "dict[int, str | tuple[str, ...]]", None), + ("SW", "dict[int, str | tuple[str, ...]]", None), + ("MSC", "dict[int, str | tuple[str, ...]]", None), + ("LED", "dict[int, str | tuple[str, ...]]", None), + ("BTN", "dict[int, str | tuple[str, ...]]", None), + ("REP", "dict[int, str | tuple[str, ...]]", None), + ("SND", "dict[int, str | tuple[str, ...]]", None), + ("ID", "dict[int, str | tuple[str, ...]]", None), + ("EV", "dict[int, str]", None), + ("BUS", "dict[int, str | tuple[str, ...]]", None), + ("SYN", "dict[int, str | tuple[str, ...]]", None), + ("FF", "dict[int, str | tuple[str, ...]]", None), + ("UI_FF", "dict[int, str | tuple[str, ...]]", None), + ("FF_STATUS", "dict[int, str | tuple[str, ...]]", None), + ("INPUT_PROP", "dict[int, str | tuple[str, ...]]", None) ] for key, annotation, doc in entries: @@ -51,4 +55,21 @@ print(f"{key}: {annotation} = ", end="") pprint(getattr(ecodes, key)) - print() \ No newline at end of file + print() + +caps_pairs: list[tuple[int, str]] = sorted( + (code, name) for code, name in ecodes.EV.items() if name != "EV_ABS" and code < ecodes.EV_MAX +) + +print(f"_CapabilitiesKeys: TypeAlias = Literal[{', '.join(str(code) for code, _ in caps_pairs)}]") +print(f"_CapabilitiesAbsKeys: TypeAlias = Literal[{ecodes.EV_ABS}]") +print("_CapabilitiesVerboseKeys: TypeAlias = (") + +tuple_emitted = False +for code, name in caps_pairs: + prefix = "" if not tuple_emitted else "| " + print(f' {prefix}tuple[Literal["{name}"], Literal[{code}]]') + tuple_emitted = True + +print(")") +print(f'_CapabilitiesVerboseAbsKeys: TypeAlias = tuple[Literal["EV_ABS"], Literal[{ecodes.EV_ABS}]]') diff --git a/src/evdev/uinput.py b/src/evdev/uinput.py index 2c69c2b..b137d80 100644 --- a/src/evdev/uinput.py +++ b/src/evdev/uinput.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import ctypes import os import platform @@ -5,16 +7,37 @@ import stat import time from collections import defaultdict -from typing import Union, Tuple, Dict, Sequence, Optional +from typing import TYPE_CHECKING, Literal, overload from . import _uinput, ecodes, ff, util from .device import InputDevice, AbsInfo -from .events import InputEvent - -try: - from evdev.eventio_async import EventIO -except ImportError: - from evdev.eventio import EventIO +from .events import InputEvent as InputEvent + +if TYPE_CHECKING: + from _typeshed import StrOrBytesPath + from collections.abc import Iterable, Mapping, Sequence + from typing_extensions import Self, TypedDict, Unpack + + from .device import _AbsInfoCapabilities, _VerboseAbsInfoCapabilities + from .eventio_async import EventIO + + class _FromDeviceKwargs(TypedDict, total=False): + name: str + vendor: int + product: int + version: int + bustype: int + devnode: str + phys: str + input_props: Iterable[int] | None + max_effects: int +else: + try: + from evdev.eventio_async import EventIO + except ImportError: + from evdev.eventio import EventIO + + from typing import Any as Self class UInputError(Exception): @@ -42,10 +65,10 @@ class UInput(EventIO): @classmethod def from_device( cls, - *devices: Union[InputDevice, Union[str, bytes, os.PathLike]], - filtered_types: Tuple[int] = (ecodes.EV_SYN, ecodes.EV_FF), - **kwargs, - ): + *devices: InputDevice[str] | InputDevice[bytes] | StrOrBytesPath, + filtered_types: Iterable[int] = (ecodes.EV_SYN, ecodes.EV_FF), + **kwargs: Unpack[_FromDeviceKwargs], + ) -> Self: """ Create an UInput device with the capabilities of one or more input devices. @@ -55,20 +78,20 @@ def from_device( devices : InputDevice|str Varargs of InputDevice instances or paths to input devices. - filtered_types : Tuple[event type codes] + filtered_types : tuple[event type codes] Event types to exclude from the capabilities of the uinput device. **kwargs Keyword arguments to UInput constructor (i.e. name, vendor etc.). """ - device_instances = [] + device_instances: list[InputDevice[str] | InputDevice[bytes]] = [] for dev in devices: if not isinstance(dev, InputDevice): dev = InputDevice(str(dev)) device_instances.append(dev) - all_capabilities = defaultdict(set) + all_capabilities: dict[int, set[int | tuple[int, AbsInfo]]] = defaultdict(set) if "max_effects" not in kwargs: kwargs["max_effects"] = min([dev.ff_effects_count for dev in device_instances]) @@ -86,7 +109,7 @@ def from_device( def __init__( self, - events: Optional[Dict[int, Sequence[int]]] = None, + events: Mapping[int, Iterable[int | tuple[int, AbsInfo | Sequence[int]]]] | None = None, name: str = "py-evdev-uinput", vendor: int = 0x1, product: int = 0x1, @@ -94,12 +117,12 @@ def __init__( bustype: int = 0x3, devnode: str = "/dev/uinput", phys: str = "py-evdev-uinput", - input_props=None, + input_props: Iterable[int] | None = None, # CentOS 7 has sufficiently old headers that FF_MAX_EFFECTS is not defined there, # which causes the whole module to fail loading. Fallback on a hardcoded value of # FF_MAX_EFFECTS if it is not defined in the ecodes. - max_effects=ecodes.ecodes.get("FF_MAX_EFFECTS", 96), - ): + max_effects: int = ecodes.ecodes.get("FF_MAX_EFFECTS", 96), + ) -> None: """ Arguments --------- @@ -180,18 +203,41 @@ def __init__( #: An :class:`InputDevice ` instance #: for the fake input device. ``None`` if the device cannot be #: opened for reading and writing. - self.device: InputDevice = self._find_device(self.fd) + self.device: InputDevice[str] | None = self._find_device(self.fd) - def _prepare_events(self, events): + def _prepare_events( + self, + events: Mapping[ + int, + Iterable[ + int + | tuple[int, AbsInfo | Sequence[int]] + | list[int | AbsInfo | Sequence[int]] + ] + ], + ) -> tuple[ + list[list[int]], list[tuple[int, int]] + ]: """Prepare events for passing to _uinput.enable and _uinput.setup""" - absinfo, prepared_events = [], [] + absinfo: list[list[int]] = [] + prepared_events: list[tuple[int, int]] = [] for etype, codes in events.items(): for code in codes: # Handle max, min, fuzz, flat. + # NOTE: AbsInfo is wrong here: AbsInfo[0] is the latest reported value rather + # than an event code. The type has been changed to disallow passing an AbsInfo + # directly in the iterable object, but this instance check was kept for + # backwards compatibility. if isinstance(code, (tuple, list, AbsInfo)): # Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0). - f = [code[0]] - f.extend(code[1]) + + # list doesn't allow us to type each index individually, so the following + # helps the type checker out and is a no-op at runtime + if TYPE_CHECKING: + assert isinstance(code[0], int) + assert isinstance(code[1], Sequence) + + f = [code[0], *code[1]] # Ensure the tuple is always 6 ints long, since uinput.c:uinput_create # does little in the way of checking the length. f.extend([0] * (6 - len(code[1]))) @@ -200,19 +246,19 @@ def _prepare_events(self, events): prepared_events.append((etype, code)) return absinfo, prepared_events - def __enter__(self): + def __enter__(self) -> Self: return self - def __exit__(self, type, value, tb): + def __exit__(self, type: object, value: object, tb: object) -> None: if hasattr(self, "fd"): self.close() - def __repr__(self): + def __repr__(self) -> str: # TODO: v = (repr(getattr(self, i)) for i in ("name", "bustype", "vendor", "product", "version", "phys")) return "{}({})".format(self.__class__.__name__, ", ".join(v)) - def __str__(self): + def __str__(self) -> str: msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\nevent types: {}' evtypes = [i[0] for i in self.capabilities(True).keys()] @@ -222,7 +268,7 @@ def __str__(self): return msg - def close(self): + def close(self) -> None: # Close the associated InputDevice, if it was previously opened. if self.device is not None: self.device.close() @@ -232,14 +278,34 @@ def close(self): _uinput.close(self.fd) self.fd = -1 - def capabilities(self, verbose: bool = False, absinfo: bool = True): + @overload + def capabilities(self, verbose: Literal[False] = False, absinfo: Literal[True] = True) -> _AbsInfoCapabilities: ... + @overload + def capabilities(self, verbose: Literal[False], absinfo: Literal[False]) -> dict[int, list[int]]: ... + @overload + def capabilities(self, verbose: Literal[True], absinfo: Literal[True] = True) -> _VerboseAbsInfoCapabilities: ... + @overload + def capabilities(self, verbose: Literal[True], absinfo: Literal[False]) -> dict[tuple[str, int], list[tuple[str, int]]]: ... + @overload + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> ( + _AbsInfoCapabilities + | dict[int, list[int]] + | _VerboseAbsInfoCapabilities + | dict[tuple[str, int], list[tuple[str, int]]] + ): ... + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> ( + _AbsInfoCapabilities + | dict[int, list[int]] + | _VerboseAbsInfoCapabilities + | dict[tuple[str, int], list[tuple[str, int]]] + ): """See :func:`capabilities `.""" if self.device is None: raise UInputError("input device not opened - cannot read capabilities") return self.device.capabilities(verbose, absinfo) - def begin_upload(self, effect_id): + def begin_upload(self, effect_id: int) -> ff.UInputUpload: upload = ff.UInputUpload() upload.effect_id = effect_id @@ -249,12 +315,12 @@ def begin_upload(self, effect_id): return upload - def end_upload(self, upload): + def end_upload(self, upload: ff.UInputUpload) -> None: ret = self.dll._uinput_end_upload(self.fd, ctypes.byref(upload)) if ret: raise UInputError("Failed to end uinput upload: " + os.strerror(ret)) - def begin_erase(self, effect_id): + def begin_erase(self, effect_id: int) -> ff.UInputErase: erase = ff.UInputErase() erase.effect_id = effect_id @@ -263,12 +329,12 @@ def begin_erase(self, effect_id): raise UInputError("Failed to begin uinput erase: " + os.strerror(ret)) return erase - def end_erase(self, erase): + def end_erase(self, erase: ff.UInputErase) -> None: ret = self.dll._uinput_end_erase(self.fd, ctypes.byref(erase)) if ret: raise UInputError("Failed to end uinput erase: " + os.strerror(ret)) - def _verify(self): + def _verify(self) -> None: """ Verify that an uinput device exists and is readable and writable by the current process. @@ -288,7 +354,7 @@ def _verify(self): msg = "uinput device name must not be longer than {} characters" raise UInputError(msg.format(_uinput.maxnamelen)) - def _find_device(self, fd: int) -> InputDevice: + def _find_device(self, fd: int) -> InputDevice[str] | None: """ Tries to find the device node. Will delegate this task to one of several platform-specific functions. @@ -306,7 +372,7 @@ def _find_device(self, fd: int) -> InputDevice: # use the generic fallback method. return self._find_device_fallback() - def _find_device_linux(self, sysname: str) -> InputDevice: + def _find_device_linux(self, sysname: str) -> InputDevice[str]: """ Tries to find the device node when running on Linux. """ @@ -342,7 +408,7 @@ def _find_device_linux(self, sysname: str) -> InputDevice: # shall be the exception that this function raises. return InputDevice(device_path) - def _find_device_fallback(self) -> Union[InputDevice, None]: + def _find_device_fallback(self) -> InputDevice[str] | None: """ Tries to find the device node when UI_GET_SYSNAME is not available or we're running on a system sufficiently exotic that we do not know how @@ -356,7 +422,7 @@ def _find_device_fallback(self) -> Union[InputDevice, None]: # Strictly speaking, we cannot be certain that everything returned by list_devices() # ends at event[0-9]+: it might return something like "/dev/input/events_all". Find # the devices that have the expected structure and extract their device number. - path_number_pairs = [] + path_number_pairs: list[tuple[str, int]] = [] regex = re.compile("/dev/input/event([0-9]+)") for path in util.list_devices("/dev/input/"): regex_match = regex.fullmatch(path) diff --git a/src/evdev/util.py b/src/evdev/util.py index db89a22..7bd5c0b 100644 --- a/src/evdev/util.py +++ b/src/evdev/util.py @@ -1,22 +1,30 @@ +from __future__ import annotations + import collections import glob import os import re import stat -from typing import Union, List +from typing import TYPE_CHECKING, overload from . import ecodes from .events import InputEvent, event_factory, KeyEvent, RelEvent, AbsEvent, SynEvent +if TYPE_CHECKING: + from _typeshed import StrOrBytesPath + from collections.abc import Iterator, Mapping, Sequence + + from .device import AbsInfo + -def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]: +def list_devices(input_device_dir: str | os.PathLike[str] = "/dev/input") -> list[str]: """List readable character devices in ``input_device_dir``.""" - fns = glob.glob("{}/event*".format(input_device_dir)) + fns = glob.glob(f"{input_device_dir}/event*") return list(filter(is_device, fns)) -def is_device(fn: Union[str, bytes, os.PathLike]) -> bool: +def is_device(fn: StrOrBytesPath) -> bool: """Check if ``fn`` is a readable and writable character device.""" if not os.path.exists(fn): @@ -32,7 +40,7 @@ def is_device(fn: Union[str, bytes, os.PathLike]) -> bool: return True -def categorize(event: InputEvent) -> Union[InputEvent, KeyEvent, RelEvent, AbsEvent, SynEvent]: +def categorize(event: InputEvent) -> InputEvent | KeyEvent | RelEvent | AbsEvent | SynEvent: """ Categorize an event according to its type. @@ -47,7 +55,15 @@ def categorize(event: InputEvent) -> Union[InputEvent, KeyEvent, RelEvent, AbsEv return event -def resolve_ecodes_dict(typecodemap, unknown="?"): +def resolve_ecodes_dict( + typecodemap: Mapping[int, Sequence[int | tuple[int, AbsInfo]]], + unknown: str = "?", +) -> Iterator[ + tuple[ + tuple[str | tuple[str, ...], int], + list[tuple[str | tuple[str, ...], int] | tuple[tuple[str | tuple[str, ...], int], AbsInfo]], + ] +]: """ Resolve event codes and types to their verbose names. @@ -82,7 +98,33 @@ def resolve_ecodes_dict(typecodemap, unknown="?"): yield (type_name, etype), resolved -def resolve_ecodes(ecode_dict, ecode_list, unknown="?"): +@overload +def resolve_ecodes( # pyright: ignore[reportOverlappingOverload] + ecode_dict: Mapping[int, str | tuple[str, ...]], + ecode_list: Sequence[int], + unknown: str = ..., +) -> list[tuple[str | tuple[str, ...], int]]: ... +@overload +def resolve_ecodes( + ecode_dict: Mapping[int, str | tuple[str, ...]], + ecode_list: Sequence[tuple[int, AbsInfo]], + unknown: str = ..., +) -> list[tuple[tuple[str | tuple[str, ...], int], AbsInfo]]: ... +@overload +def resolve_ecodes( + ecode_dict: Mapping[int, str | tuple[str, ...]], + ecode_list: Sequence[int | tuple[int, AbsInfo]], + unknown: str = ..., +) -> list[tuple[str | tuple[str, ...], int] | tuple[tuple[str | tuple[str, ...], int], AbsInfo]]: ... +def resolve_ecodes( + ecode_dict: Mapping[int, str | tuple[str, ...]], + ecode_list: Sequence[int | tuple[int, AbsInfo]], + unknown: str = "?", +) -> ( + list[tuple[str | tuple[str, ...], int]] + | list[tuple[tuple[str | tuple[str, ...], int], AbsInfo]] + | list[tuple[str | tuple[str, ...], int] | tuple[tuple[str | tuple[str, ...], int], AbsInfo]] +): """ Resolve event codes and types to their verbose names. @@ -91,27 +133,29 @@ def resolve_ecodes(ecode_dict, ecode_list, unknown="?"): >>> resolve_ecodes(ecodes.BTN, [272, 273, 274]) [(['BTN_LEFT', 'BTN_MOUSE'], 272), ('BTN_RIGHT', 273), ('BTN_MIDDLE', 274)] """ - res = [] + res: list[ + tuple[str | tuple[str, ...], int] + | tuple[tuple[str | tuple[str, ...], int], AbsInfo] + ] = [] for ecode in ecode_list: # elements with AbsInfo(), eg { 3 : [(0, AbsInfo(...)), (1, AbsInfo(...))] } if isinstance(ecode, tuple): if ecode[0] in ecode_dict: - l = ((ecode_dict[ecode[0]], ecode[0]), ecode[1]) + res.append(((ecode_dict[ecode[0]], ecode[0]), ecode[1])) else: - l = ((unknown, ecode[0]), ecode[1]) + res.append(((unknown, ecode[0]), ecode[1])) # just ecodes, e.g: { 0 : [0, 1, 3], 1 : [30, 48] } else: if ecode in ecode_dict: - l = (ecode_dict[ecode], ecode) + res.append((ecode_dict[ecode], ecode)) else: - l = (unknown, ecode) - res.append(l) + res.append((unknown, ecode)) return res -def find_ecodes_by_regex(regex): +def find_ecodes_by_regex(regex: str | re.Pattern[str]) -> dict[int, list[int]]: """ Find ecodes matching a regex and return a mapping of event type to event codes. @@ -130,7 +174,7 @@ def find_ecodes_by_regex(regex): """ regex = re.compile(regex) # re.compile is idempotent - result = collections.defaultdict(list) + result: dict[int, list[int]] = collections.defaultdict(list) for type_code, codes in ecodes.bytype.items(): for code, names in codes.items():