diff options
author | Thomas Touhey <thomas@touhey.fr> | 2021-12-19 01:10:37 +0100 |
---|---|---|
committer | Thomas Touhey <thomas@touhey.fr> | 2021-12-25 15:38:48 +0100 |
commit | f837de9e6972c91b01ec5a63e3cb53c78c6c2787 (patch) | |
tree | 53f1ec0c303d2df89168f09f11ee1777cf152cef | |
parent | ddcc406b64a3284005cecd34dfb93976fa168447 (diff) |
Used flake8, released version 0.3.0.3
-rwxr-xr-x | Makefile | 23 | ||||
-rw-r--r-- | Pipfile | 16 | ||||
-rw-r--r-- | Pipfile.lock | 116 | ||||
-rw-r--r-- | docs/conf.py | 49 | ||||
-rwxr-xr-x | fingerd/__init__.py | 52 | ||||
-rwxr-xr-x | fingerd/__main__.py | 9 | ||||
-rwxr-xr-x[-rw-r--r--] | fingerd/cli.py | 112 | ||||
-rwxr-xr-x | fingerd/core.py | 2351 | ||||
-rwxr-xr-x | fingerd/errors.py | 62 | ||||
-rwxr-xr-x | fingerd/fiction.py | 2101 | ||||
-rwxr-xr-x[-rw-r--r--] | fingerd/native.py | 23 | ||||
-rwxr-xr-x[-rw-r--r--] | fingerd/posix.py | 282 | ||||
-rwxr-xr-x[-rw-r--r--] | fingerd/version.py | 6 | ||||
-rw-r--r-- | requirements.txt | 6 | ||||
-rwxr-xr-x | scripts/fingerd | 2 | ||||
-rw-r--r-- | setup.cfg | 16 | ||||
-rwxr-xr-x | setup.py | 21 | ||||
-rwxr-xr-x | tests/__init__.py | 4 | ||||
-rw-r--r-- | tests/test_core.py | 18 | ||||
-rw-r--r-- | tests/test_fingerd.py | 85 |
20 files changed, 2789 insertions, 2565 deletions
@@ -1,23 +1,16 @@ #!/usr/bin/make -f - PE := pipenv run - ST := $(PE) ./setup.py - DNAME := dist/$(shell PIPENV_DONT_LOAD_ENV=1 $(ST) --name)-$(shell PIPENV_DONT_LOAD_ENV=1 $(ST) --version).tar.gz + DNAME := dist/$(shell ./setup.py --name)-$(shell ./setup.py --version).tar.gz test tests: - @$(PE) pytest -q --capture=fd - -prepare: - @pipenv install --dev -update: - @pipenv update --dev + @pytest -q --capture=fd docs: - @$(ST) build_sphinx + @./setup.py build_sphinx checkdocs: - @$(ST) checkdocs + @./setup.py checkdocs run: - @pipenv run python -m fingerd -b 'localhost:3999' + @python -m fingerd -b 'localhost:3999' redirect-localhost-ports: iptables -t nat -A OUTPUT -p tcp -s 127.0.0.1 -d 127.0.0.1 \ --dport 79 -j DNAT --to 127.0.0.1:3999 @@ -26,14 +19,14 @@ redirect-localhost-ports: dist: $(DNAME) $(DNAME): - $(ST) sdist + ./setup.py sdist upload: $(DNAME) @twine upload $(DNAME) install: - $(ST) install + ./setup.py install install-user: - $(ST) install --user + ./setup.py install --user .PHONY: test tests prepare update docs checkdocs .PHONY: run redirect-localhost-ports diff --git a/Pipfile b/Pipfile deleted file mode 100644 index e123388..0000000 --- a/Pipfile +++ /dev/null @@ -1,16 +0,0 @@ -[[source]] -url = 'https://pypi.python.org/simple' -verify_ssl = true -name = 'pypi' - -[requires] -python_version = '3.9' - -[packages] -toml = '*' -click = '*' -pytz = '*' -pyutmpx = '*' - -[dev-packages] -pytest = '*' diff --git a/Pipfile.lock b/Pipfile.lock deleted file mode 100644 index 63f0a11..0000000 --- a/Pipfile.lock +++ /dev/null @@ -1,116 +0,0 @@ -{ - "_meta": { - "hash": { - "sha256": "1286d0a3089864737e05764e507206586ba18e19887463c65938cdf0e23ba23d" - }, - "pipfile-spec": 6, - "requires": { - "python_version": "3.9" - }, - "sources": [ - { - "name": "pypi", - "url": "https://pypi.python.org/simple", - "verify_ssl": true - } - ] - }, - "default": { - "click": { - "hashes": [ - "sha256:8c04c11192119b1ef78ea049e0a6f0463e4c48ef00a30160c704337586f3ad7a", - "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6" - ], - "index": "pypi", - "version": "==8.0.1" - }, - "pytz": { - "hashes": [ - "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da", - "sha256:eb10ce3e7736052ed3623d49975ce333bcd712c7bb19a58b9e2089d4057d0798" - ], - "index": "pypi", - "version": "==2021.1" - }, - "pyutmpx": { - "hashes": [ - "sha256:9469aa51b1fd921db583ffa9a746944f089c12ce5718800474b878f331f0a30a" - ], - "index": "pypi", - "version": "==0.3.1" - }, - "toml": { - "hashes": [ - "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", - "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f" - ], - "index": "pypi", - "version": "==0.10.2" - } - }, - "develop": { - "attrs": { - "hashes": [ - "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1", - "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb" - ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", - "version": "==21.2.0" - }, - "iniconfig": { - "hashes": [ - "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3", - "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32" - ], - "version": "==1.1.1" - }, - "packaging": { - "hashes": [ - "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7", - "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14" - ], - "markers": "python_version >= '3.6'", - "version": "==21.0" - }, - "pluggy": { - "hashes": [ - "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159", - "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3" - ], - "markers": "python_version >= '3.6'", - "version": "==1.0.0" - }, - "py": { - "hashes": [ - "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3", - "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a" - ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==1.10.0" - }, - "pyparsing": { - "hashes": [ - "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", - "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b" - ], - "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==2.4.7" - }, - "pytest": { - "hashes": [ - "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89", - "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134" - ], - "index": "pypi", - "version": "==6.2.5" - }, - "toml": { - "hashes": [ - "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", - "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f" - ], - "index": "pypi", - "version": "==0.10.2" - } - } -} diff --git a/docs/conf.py b/docs/conf.py index 9f30bd2..dd7bc44 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# flake8: noqa # # Configuration file for the Sphinx documentation builder. # @@ -6,7 +7,8 @@ # full list see the documentation: # http://www.sphinx-doc.org/en/master/config -import os, sys +import os +import sys # -- Path setup -------------------------------------------------------------- @@ -24,13 +26,15 @@ author = 'Thomas Touhey' # The full version, including alpha/beta/rc tags + def _get_release(): - from os.path import dirname, join - from pkg_resources import find_distributions as find_dist + from os.path import dirname, join + from pkg_resources import find_distributions as find_dist + + module_path = join(dirname(__file__), '..') + dist = next(find_dist(module_path, True)) + return dist.version - module_path = join(dirname(__file__), '..') - dist = next(find_dist(module_path, True)) - return dist.version release = _get_release() @@ -44,9 +48,9 @@ release = _get_release() # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx_autodoc_typehints', - 'sphinx.ext.todo' + 'sphinx.ext.autodoc', + 'sphinx_autodoc_typehints', + 'sphinx.ext.todo', ] # Add any paths that contain templates here, relative to this directory. @@ -71,7 +75,9 @@ language = None # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path . -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', '.goutput*', '*.kate-swp'] +exclude_patterns = [ + '_build', 'Thumbs.db', '.DS_Store', '.goutput*', '*.kate-swp', +] # The name of the Pygments (syntax highlighting) style to use. pygments_style = 'sphinx' @@ -86,8 +92,7 @@ html_favicon = "favicon.png" # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -# -#html_theme = 'alabaster' + html_theme = 'sphinx_rtd_theme' # Theme options are theme-specific and customize the look and feel of a theme @@ -142,8 +147,10 @@ latex_elements = { # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'fingerd.tex', 'fingerd Documentation', - 'Thomas Touhey', 'manual'), + ( + master_doc, 'fingerd.tex', 'fingerd Documentation', + 'Thomas Touhey', 'manual' + ), ] @@ -152,8 +159,10 @@ latex_documents = [ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'fingerd', 'fingerd Documentation', - [author], 1) + ( + master_doc, 'fingerd', 'fingerd Documentation', + [author], 1 + ), ] @@ -163,7 +172,9 @@ man_pages = [ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'fingerd', 'fingerd Documentation', - author, 'fingerd', 'A modern finger (RFC 1288) server.', - 'Miscellaneous'), + ( + master_doc, 'fingerd', 'fingerd Documentation', + author, 'fingerd', 'A modern finger (RFC 1288) server.', + 'Miscellaneous' + ), ] diff --git a/fingerd/__init__.py b/fingerd/__init__.py index 2c4c4e3..e560976 100755 --- a/fingerd/__init__.py +++ b/fingerd/__init__.py @@ -1,25 +1,45 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** -""" finger is both a protocol and a utility to get the information and - status from a user on a distant machine. It was standardized in RFC 742 - in 1977, then in RFC 1288 in 1991, and has been abandoned by most - people since. +# ***************************************************************************** +""" Pure Python finger protocol implementation. - This Python module is a finger server implementation that allows you - to give out real information as well as fictional information. """ + finger is both a protocol and a utility to get the information and + status from a user on a distant machine. It was standardized in RFC 742 + in 1977, then in RFC 1288 in 1991, and has been abandoned by most + people since. -import os.path as _path - -from os import environ as _environ -from sys import stderr as _stderr + This Python module is a finger server implementation that allows you + to give out real information as well as fictional information. """ +from .core import ( + FingerFormatter, FingerInterface, FingerLogger, + FingerServer, FingerSession, FingerUser, +) +from .errors import ( + BindError, ConfigurationError, HostnameError, + InvalidBindError, NoBindsError, +) +from .fiction import ( + FingerAction, FingerFictionInterface, FingerScenario, + FingerScenarioInterface, FingerUserCreationAction, + FingerUserDeletionAction, FingerUserEditionAction, + FingerUserLoginAction, FingerUserLogoutAction, + FingerUserSessionChangeAction, +) +from .native import FingerNativeInterface from .version import version -from .errors import * -from .core import * -from .fiction import * -from .native import * + +__all__ = [ + 'BindError', 'ConfigurationError', 'FingerAction', + 'FingerFictionInterface', 'FingerFormatter', 'FingerInterface', + 'FingerLogger', 'FingerNativeInterface', 'FingerScenario', + 'FingerScenarioInterface', 'FingerServer', 'FingerSession', + 'FingerUser', 'FingerUserCreationAction', 'FingerUserDeletionAction', + 'FingerUserEditionAction', 'FingerUserLoginAction', + 'FingerUserLogoutAction', 'FingerUserSessionChangeAction', + 'HostnameError', 'InvalidBindError', 'NoBindsError', 'version', +] # End of file. diff --git a/fingerd/__main__.py b/fingerd/__main__.py index 5e39ae9..b01e5b2 100755 --- a/fingerd/__main__.py +++ b/fingerd/__main__.py @@ -1,16 +1,15 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** -""" Main script of the module. - Runs the server depending on the configuration content. """ +# ***************************************************************************** +""" Main script of the module. """ from .cli import cli as _cli __all__ = [] if __name__ == '__main__': - _cli() + _cli() # End of file. diff --git a/fingerd/cli.py b/fingerd/cli.py index 755764f..db1400d 100644..100755 --- a/fingerd/cli.py +++ b/fingerd/cli.py @@ -1,58 +1,80 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ fingerd CLI interface. """ +from platform import ( + python_implementation as _python_impl, + python_version as _python_version, +) +from sys import stderr as _stderr + import click as _click -from platform import (python_implementation as _python_impl, - python_version as _python_version) +from . core import ( + FingerInterface as _FingerInterface, + FingerServer as _FingerServer, +) +from .fiction import ( + FingerScenario as _FingerScenario, + FingerScenarioInterface as _FingerScenarioInterface, +) +from .native import FingerNativeInterface as _FingerNativeInterface from .version import version as _version -from . import (FingerNativeInterface as _FingerNativeInterface, - FingerScenario as _FingerScenario, - FingerScenarioInterface as _FingerScenarioInterface, - FingerInterface as _FingerInterface, - FingerServer as _FingerServer) __all__ = ['cli'] -@_click.command(context_settings = {'help_option_names': ['-h', '--help']}) -@_click.version_option(version = _version, - message = f"fingerd version {_version}, " - f"running on {_python_impl()} {_python_version()}") -@_click.option('-b', '--binds', default = 'localhost:79', - envvar = ('BIND', 'BINDS')) -@_click.option('-H', '--hostname', default = 'localhost', - envvar = ('FINGER_HOST',)) -@_click.option('-t', '--type', default = 'native', - envvar = ('FINGER_TYPE',)) -@_click.option('-s', '--scenario', default = 'actions.toml', - envvar = ('FINGER_ACTIONS',)) -def cli(binds, hostname, type, scenario): - """ fingerd is a modern finger (RFC 1288) server. - Find out more at <https://fingerd.touhey.pro/>. """ - - hostname = hostname.upper() - type = type.casefold() - - if type == 'native': - iface = _FingerNativeInterface() - elif type in ('actions', 'scenario'): - fic = _FingerScenario() - fic.load(scenario) - - iface = _FingerScenarioInterface(fic) - else: - if type != 'dummy': - print("warning: unknown interface type, falling back on dummy", - file = _stderr) - - iface = _FingerInterface() - - server = _FingerServer(binds = binds, - hostname = hostname, interface = iface) - server.serve_forever() + +@_click.command(context_settings={'help_option_names': ['-h', '--help']}) +@_click.version_option( + version=_version, + message=( + f'fingerd version {_version}, ' + f'running on {_python_impl()} {_python_version()}' + ), +) +@_click.option( + '-b', '--binds', default='localhost:79', + envvar=('BIND', 'BINDS'), +) +@_click.option( + '-H', '--hostname', default='localhost', + envvar=('FINGER_HOST',), +) +@_click.option( + '-t', '--type', 'type_', default='native', + envvar=('FINGER_TYPE',), +) +@_click.option( + '-s', '--scenario', default='actions.toml', + envvar=('FINGER_ACTIONS',), +) +def cli(binds, hostname, type_, scenario): + """ fingerd is a modern finger (RFC 1288) server. + Find out more at <https://fingerd.touhey.pro/>. """ + + hostname = hostname.upper() + type_ = type_.casefold() + + if type_ == 'native': + iface = _FingerNativeInterface() + elif type_ in ('actions', 'scenario'): + fic = _FingerScenario() + fic.load(scenario) + + iface = _FingerScenarioInterface(fic) + elif type_ != 'dummy': + print( + 'warning: unknown interface type, falling back on dummy', + file=_stderr, + ) + else: + iface = _FingerInterface() + + server = _FingerServer( + binds=binds, hostname=hostname, interface=iface) + server.serve_forever() # End of file. diff --git a/fingerd/core.py b/fingerd/core.py index ea828ea..8bd6d2d 100755 --- a/fingerd/core.py +++ b/fingerd/core.py @@ -1,1333 +1,1444 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ The main server class for the server, with the related utilities that - will not be presented to the user, is defined in this file. """ + will not be presented to the user, is defined in this file. """ -import string as _string -import socket as _socket -import signal as _signal -import sys as _sys +import asyncio as _asyncio import copy as _copy import multiprocessing as _multip -import asyncio as _asyncio +import signal as _signal +import socket as _socket +import string as _string +import sys as _sys -from enum import Enum as _Enum -from io import TextIOWrapper as _TextIOWrapper, StringIO as _StringIO +from collections.abc import Sequence as _Sequence from datetime import datetime as _dt, timedelta as _td, tzinfo as _tzinfo +from enum import Enum as _Enum from typing import Optional as _Optional -from collections.abc import Sequence as _Sequence +from croniter import croniter as _croniter from pytz import utc as _utc from .errors import ( - NoBindsError as _NoBindsError, - InvalidBindError as _InvalidBindError, - HostnameError as _HostnameError) + HostnameError as _HostnameError, + InvalidBindError as _InvalidBindError, + NoBindsError as _NoBindsError, +) __all__ = [ - 'FingerServer', - 'FingerInterface', 'FingerFormatter', 'FingerLogger', - 'FingerUser', 'FingerSession' + 'FingerFormatter', 'FingerInterface', 'FingerLogger', + 'FingerServer', 'FingerSession', 'FingerUser', 'cron', ] # --- -# Basic representations. +# Decorators. # --- -class FingerSession: - """ Representation of an active session for a given user on the system. - - :param time: The start time of the given session; - by default, the current datetime. """ - - __slots__ = ('_start', '_line', '_host', '_idle') - - def __init__(self, *_, time: _Optional[_dt] = None): - self._start = _dt.now() - self._idle = None - self._line = None - self._host = None - - self.start = time - self._idle = self._start - - def __repr__(self): - p = ('start', 'idle', 'line', 'orig') - p = (f"{x} = {repr(getattr(self, x))}" for x in p \ - if getattr(self, x) is not None) - return f"{self._class__.__name__}({', '.join(p)})" - @property - def start(self) -> _dt: - """ The timestamp at which the session has started. +def cron(spec: str): + """ Decorator for asking the finger server to call the current + function at the given time. """ - Note that when set, if no timezone is present, - the datetime is considered UTC. """ + spec = _croniter(spec) - return self._start + def decorator(func): + func.__cron__ = spec + return func - @start.setter - def start(self, value): - value = value if isinstance(value, _dt) else _dt(value) - if value.tzinfo is None: - value = value.replace(tzinfo = _utc) + return decorator - self._start = value - @property - def idle(self) -> _dt: - """ The timestamp since which the user is idle on the session. - - Note that when set, if no timezone is present, - the datetime is considered UTC; also, if the provided - datetime is before the session start timestamp, it is - set to it. """ - - return self._idle - - @idle.setter - def idle(self, value): - value = value if isinstance(value, _dt) else _dt(value) - if value.tzinfo is None: - value = value.replace(tzinfo = _utc) - - if value < self._start: - value = self._start - - self._idle = value - - @property - def line(self) -> _Optional[str]: - """ The line on which the user is. """ - - return self._line - - @line.setter - def line(self, value): - self._line = None if value is None else str(value) - - @property - def host(self) -> _Optional[str]: - """ The host from which the user is connected. """ - - return self._host +# --- +# Basic representations. +# --- - @host.setter - def host(self, value): - self._host = None if value is None else str(value) +class FingerSession: + """ Representation of an active session for a given user on the system. -class FingerUser: - """ Representation of a user on the system, returned by subclasses of - :py:class:`FingerInterface` and used by subclasses of - :py:class:`FingerFormatter`. - - :param login: The login of the user. - :param name: The display name of the user. - :param home: The path to the home of the user. - :param shell: The path to the user's default shell. """ - - __slots__ = ('_login', '_name', '_home', '_shell', '_office', - '_plan', '_last_login', '_sessions') - - def __init__(self, *_, login: _Optional[str] = None, - name: _Optional[str] = None, home: _Optional[str] = None, - shell: _Optional[str] = None): - self._login = None - self._name = '' - self._home = None - self._shell = None - self._office = None - self._plan = None - self._last_login = None - self._sessions = _FingerSessionManager() + :param time: The start time of the given session; + by default, the current datetime. """ - self.login = login - self.name = name - self.home = home - self.shell = shell + __slots__ = ('_start', '_line', '_host', '_idle') - def __repr__(self): - p = ('login', 'name', 'home', 'shell', 'office', - 'last_login', 'sessions') - p = (f"{x} = {repr(getattr(self, x))}" for x in p \ - if getattr(self, x) is not None) - return f"{self._class__.__name__}({', '.join(p)})" + def __init__(self, *_, time: _Optional[_dt] = None): + self._start = _dt.now() + self._idle = None + self._line = None + self._host = None - @property - def login(self) -> _Optional[str]: - """ The login name of the user, e.g. 'cake' or 'gaben'. """ + self.start = time + self._idle = self._start - return self._login + def __repr__(self): + p = ('start', 'idle', 'line', 'orig') + p = ( + f'{x} = {getattr(self, x)!r}' + for x in p if getattr(self, x) is not None) + return f"{self._class__.__name__}({', '.join(p)})" - @login.setter - def login(self, value: _Optional[str]) -> None: - self._login = value + @property + def start(self) -> _dt: + """ The timestamp at which the session has started. - @property - def name(self) -> _Optional[str]: - """ The display name of the user, e.g. 'Jean Dupont'. """ + Note that when set, if no timezone is present, + the datetime is considered UTC. """ - return self._name + return self._start - @name.setter - def name(self, value: _Optional[str]) -> None: - self._name = value + @start.setter + def start(self, value): + value = value if isinstance(value, _dt) else _dt(value) + if value.tzinfo is None: + value = value.replace(tzinfo=_utc) - @property - def last_login(self) -> _Optional[str]: - """ The last login date for the user, None if not known. """ + self._start = value - return self._last_login + @property + def idle(self) -> _dt: + """ The timestamp since which the user is idle on the session. - @last_login.setter - def last_login(self, value: _Optional[str]) -> None: - self._last_login = None if value is None else \ - value if isinstance(value, _dt) else _dt(value) + Note that when set, if no timezone is present, + the datetime is considered UTC; also, if the provided + datetime is before the session start timestamp, it is + set to it. """ - @property - def home(self) -> _Optional[str]: - """ The path to the user's home on the given system, None if - not known or defined. """ + return self._idle - return self._home + @idle.setter + def idle(self, value): + value = value if isinstance(value, _dt) else _dt(value) + if value.tzinfo is None: + value = value.replace(tzinfo=_utc) - @home.setter - def home(self, value: _Optional[str]) -> None: - self._home = None if value is None else str(value) + if value < self._start: + value = self._start - @property - def shell(self) -> _Optional[str]: - """ The path to the user's shell on the given system, None if - not known or defined. """ + self._idle = value - return self._shell + @property + def line(self) -> _Optional[str]: + """ The line on which the user is. """ - @shell.setter - def shell(self, value: _Optional[str]) -> None: - self._shell = None if value is None else str(value) + return self._line - @property - def office(self) -> _Optional[str]: - """ The display name of the user's office, None if not known - or defined. """ + @line.setter + def line(self, value): + self._line = None if value is None else str(value) - return self._office + @property + def host(self) -> _Optional[str]: + """ The host from which the user is connected. """ - @office.setter - def office(self, value: _Optional[str]) -> None: - self._office = None if value is None else str(value) + return self._host - @property - def plan(self) -> _Optional[str]: - """ The plan of the user, usually the content of the ``.plan`` - file in the user's home on real (and kind of obsolete) UNIX-like - systems. """ + @host.setter + def host(self, value): + self._host = None if value is None else str(value) - return self._plan - @plan.setter - def plan(self, value: _Optional[str]) -> None: - if value is None: - self._plan = None - else: - value = str(value) - self._plan = '\n'.join(value.splitlines()) +class FingerUser: + """ Representation of a user on the system, returned by subclasses of + :py:class:`FingerInterface` and used by subclasses of + :py:class:`FingerFormatter`. + + :param login: The login of the user. + :param name: The display name of the user. + :param home: The path to the home of the user. + :param shell: The path to the user's default shell. """ + + __slots__ = ( + '_login', '_name', '_home', '_shell', '_office', + '_plan', '_last_login', '_sessions', + ) + + def __init__( + self, *_, + login: _Optional[str] = None, + name: _Optional[str] = None, + home: _Optional[str] = None, + shell: _Optional[str] = None, + ): + self._login = None + self._name = '' + self._home = None + self._shell = None + self._office = None + self._plan = None + self._last_login = None + self._sessions = _FingerSessionManager() + + self.login = login + self.name = name + self.home = home + self.shell = shell + + def __repr__(self): + p = ( + 'login', 'name', 'home', 'shell', 'office', + 'last_login', 'sessions') + p = ( + f'{x} = {getattr(self, x)!r}' + for x in p if getattr(self, x) is not None) + return f"{self._class__.__name__}({', '.join(p)})" + + @property + def login(self) -> _Optional[str]: + """ The login name of the user, e.g. 'cake' or 'gaben'. """ + + return self._login + + @login.setter + def login(self, value: _Optional[str]) -> None: + self._login = value + + @property + def name(self) -> _Optional[str]: + """ The display name of the user, e.g. 'Jean Dupont'. """ + + return self._name + + @name.setter + def name(self, value: _Optional[str]) -> None: + self._name = value + + @property + def last_login(self) -> _Optional[str]: + """ The last login date for the user, None if not known. """ + + return self._last_login + + @last_login.setter + def last_login(self, value: _Optional[str]) -> None: + self._last_login = None if value is None else \ + value if isinstance(value, _dt) else _dt(value) + + @property + def home(self) -> _Optional[str]: + """ The path to the user's home on the given system, None if + not known or defined. """ + + return self._home + + @home.setter + def home(self, value: _Optional[str]) -> None: + self._home = None if value is None else str(value) + + @property + def shell(self) -> _Optional[str]: + """ The path to the user's shell on the given system, None if + not known or defined. """ + + return self._shell + + @shell.setter + def shell(self, value: _Optional[str]) -> None: + self._shell = None if value is None else str(value) + + @property + def office(self) -> _Optional[str]: + """ The display name of the user's office, None if not known + or defined. """ + + return self._office + + @office.setter + def office(self, value: _Optional[str]) -> None: + self._office = None if value is None else str(value) + + @property + def plan(self) -> _Optional[str]: + """ The plan of the user, usually the content of the ``.plan`` + file in the user's home on real (and kind of obsolete) UNIX-like + systems. """ + + return self._plan + + @plan.setter + def plan(self, value: _Optional[str]) -> None: + if value is None: + self._plan = None + else: + value = str(value) + self._plan = '\n'.join(value.splitlines()) - @property - def sessions(self) -> _Sequence[FingerSession]: - """ The current sessions array for the user, always defined. """ + @property + def sessions(self) -> _Sequence[FingerSession]: + """ The current sessions array for the user, always defined. """ - return self._sessions + return self._sessions class _FingerSessionManager: - """ Session manager. """ - - __slots__ = ('_sessions') - - def __init__(self): - self._sessions = [] - - def __repr__(self): - return repr(self._sessions) - - def __bool__(self): - return bool(self._sessions) - - def __iter__(self): - return iter(self._sessions[::-1]) - - def __len__(self): - return len(self._sessions) - - def __delitem__(self, key): - if key is None: - self._sessions.pop(0) - else: - for i in [i for i, x in enumerate(self._sessions) \ - if key == x.name][::-1]: - self._sessions.pop(i) - - def __getitem__(self, key): - if key is None: - try: - return self._sessions[0] - except IndexError: - raise KeyError("could not get latest session") from None - - if type(key) is int: - try: - return self._sessions[key] - except IndexError: - msg = f"could not get session #{repr(key)}" - raise IndexError(msg) from None - - try: - return next(x for x in self._sessions if key == x.name) - except StopIteration: - raise KeyError(f"could not get session {repr(key)}") from None - - def __setitem__(self, key, value): - if not isinstance(value, FingerSession): - raise TypeError("can only add sessions into a session manager") - value = _copy.deepcopy(value) - - if key is None: - # Check if except the first session, the key of the session, - # if any, does not override another key. - - if value.name is not None: - try: - next(i for i, x in self._sessions[1:] \ - if value.name == x.name) - except StopIteration: - pass - else: - msg = "value.name overrides another session's key" - raise ValueError(msg) from None - - try: - self._sessions[0] = value - return - except IndexError: - raise KeyError("could not set latest session") from None - - if type(key) is int: - # Check if except the key-th session, the key of the session, - # if any, does not override another key. - - if value.name is not None: - try: - next(i for i, x in self._sessions \ - if i != key and value.name == x.name) - except StopIteration: - pass - else: - msg = "value.name overrides another session's key" - raise ValueError(msg) from None - - try: - self._sessions[key] = value - return - except IndexError: - msg = f"could not set session #{repr(key)}" - raise IndexError(msg) from None - - value.name = key - - try: - i = next(i for i, x in enumerate(self._sessions) \ - if key == x.name) - except StopIteration: - raise KeyError(f"could not set session {repr(key)}") from None - - self._sessions[i] = value - - def add(self, session): - """ Add a session. """ - - if not isinstance(session, FingerSession): - raise TypeError("can only insert sessions into a session " - "manager") - - self._sessions.insert(0, session) + """ Session manager. """ + + __slots__ = ('_sessions',) + + def __init__(self): + self._sessions = [] + + def __repr__(self): + return repr(self._sessions) + + def __bool__(self): + return bool(self._sessions) + + def __iter__(self): + return iter(self._sessions[::-1]) + + def __len__(self): + return len(self._sessions) + + def __delitem__(self, key): + if key is None: + self._sessions.pop(0) + else: + for i in ([ + i for i, x in enumerate(self._sessions) + if key == x.name + ][::-1]): + self._sessions.pop(i) + + def __getitem__(self, key): + if key is None: + try: + return self._sessions[0] + except IndexError: + raise KeyError('could not get latest session') from None + + if type(key) is int: + try: + return self._sessions[key] + except IndexError: + msg = f'could not get session #{key!r}' + raise IndexError(msg) from None + + try: + return next(x for x in self._sessions if key == x.name) + except StopIteration: + raise KeyError(f'could not get session {key!r}') from None + + def __setitem__(self, key, value): + if not isinstance(value, FingerSession): + raise TypeError('can only add sessions into a session manager') + value = _copy.deepcopy(value) + + if key is None: + # Check if except the first session, the key of the session, + # if any, does not override another key. + + if value.name is not None: + try: + next( + i for i, x in self._sessions[1:] + if value.name == x.name) + except StopIteration: + pass + else: + msg = "value.name overrides another session's key" + raise ValueError(msg) from None + + try: + self._sessions[0] = value + return + except IndexError: + raise KeyError('could not set latest session') from None + + if type(key) is int: + # Check if except the key-th session, the key of the session, + # if any, does not override another key. + + if value.name is not None: + try: + next( + i for i, x in self._sessions + if i != key and value.name == x.name) + except StopIteration: + pass + else: + msg = "value.name overrides another session's key" + raise ValueError(msg) from None + + try: + self._sessions[key] = value + return + except IndexError: + msg = f'could not set session #{key!r}' + raise IndexError(msg) from None + + value.name = key + + try: + i = next( + i for i, x in enumerate(self._sessions) + if key == x.name) + except StopIteration: + raise KeyError(f'could not set session {key!r}') from None + + self._sessions[i] = value + + def add(self, session): + """ Add a session. """ + + if not isinstance(session, FingerSession): + raise TypeError( + 'can only insert sessions into a session manager', + ) + + self._sessions.insert(0, session) # --- # Formatter base class. # --- -class FingerFormatter: - """ Formatter for :py:class:`FingerServer`. - Provides text-formatted (as strings limited to ASCII) - answers for given queries with given results as objects. - This class must be subclassed by other formatters. - Only methods not starting with an underscore are called by - instances of :py:class:`FingerServer`; others are utilities - called by these. +class FingerFormatter: + """ Formatter for :py:class:`FingerServer`. + Provides text-formatted (as strings limited to ASCII) + answers for given queries with given results as objects. - Unless methods are overridden to have a different behaviour, - this formatter aims at RFC 1288 compliance. + This class must be subclassed by other formatters. + Only methods not starting with an underscore are called by + instances of :py:class:`FingerServer`; others are utilities + called by these. - :param tzinfo: Timezone used for formatting dates and - times. """ + Unless methods are overridden to have a different behaviour, + this formatter aims at RFC 1288 compliance. - def __init__(self, tzinfo: _Optional[_tzinfo] = None): - if tzinfo is None: - tzinfo = _dt.now().astimezone().tzinfo - self._tzinfo = tzinfo + :param tzinfo: Timezone used for formatting dates and + times. """ - def __repr__(self): - return f"{self.__class__.__name__}()" + def __init__(self, tzinfo: _Optional[_tzinfo] = None): + if tzinfo is None: + tzinfo = _dt.now().astimezone().tzinfo + self._tzinfo = tzinfo - # --- - # Internal formatting utilities. - # --- + def __repr__(self): + return f'{self.__class__.__name__}()' - def _format_idle(self, idle: _td) -> str: - """ Format an idle time delta. """ + # --- + # Internal formatting utilities. + # --- - def _iter_idle(idle): - days = int(idle.days) - hours = int(idle.seconds / 3600) - mins = int(idle.seconds % 3600 / 60) - secs = int(idle.seconds % 60) + def _format_idle(self, idle: _td) -> str: + """ Format an idle time delta. """ - if days: - yield f"{days} day{('', 's')[days > 1]}" - if hours: - yield f"{hours} hour{('', 's')[hours > 1]}" - if mins: - yield f"{mins} minute{('', 's')[mins > 1]}" - if secs: - yield f"{secs} second{('', 's')[secs > 1]}" + def _iter_idle(idle): + days = int(idle.days) + hours = int(idle.seconds / 3600) + mins = int(idle.seconds % 3600 / 60) + secs = int(idle.seconds % 60) + + if days: + yield f'{days} day{("", "s")[days > 1]}' + if hours: + yield f'{hours} hour{("", "s")[hours > 1]}' + if mins: + yield f'{mins} minute{("", "s")[mins > 1]}' + if secs: + yield f'{secs} second{("", "s")[secs > 1]}' - return f"{' '.join(_iter_idle(idle))} idle" + return f'{" ".join(_iter_idle(idle))} idle' - def _format_time(self, d: _td) -> str: - """ Format a date and time. """ + def _format_time(self, d: _td) -> str: + """ Format a date and time. """ - if d < _td(): - return "" + if d < _td(): + return '' - days = int(d.days) - hours = int(d.seconds / 3600) - mins = int(d.seconds % 3600 / 60) + days = int(d.days) + hours = int(d.seconds / 3600) + mins = int(d.seconds % 3600 / 60) - if days: - return f"{days}d" - elif hours or mins: - return f"{hours:02}:{mins:02}" + if days: + return f'{days}d' + elif hours or mins: + return f'{hours:02}:{mins:02}' - return "" - - def _format_when(self, d: _dt) -> str: - """ Format a date and time for 'when'. """ + return '' + + def _format_when(self, d: _dt) -> str: + """ Format a date and time for 'when'. """ - return d.astimezone(self._tzinfo).strftime("%a %H:%M") - - def _format_header(self, hostname: str, raw_query: str) -> str: - """ Returns the header of the formatted answer for every request, - except when an error has occurred in the user's query. + return d.astimezone(self._tzinfo).strftime('%a %H:%M') - :param hostname: The hostname configured for the server. - :param raw_query: The raw query given by the user. - :return: The header of the formatted answer as text. """ - - if raw_query: - raw_query = ' ' + raw_query - - return f"Site: {hostname}\r\n" f"Command line:{raw_query}\r\n" "\r\n" - - def _format_footer(self) -> str: - """ Returns the footer of the formatted answer for every request, - except when an error has occurred in the user's query. - - :return: The footer of the formatted answer as text. """ - - return "" - - # --- - # Used formatting functions. - # --- - - def format_query_error(self, hostname: str, raw_query: str) -> str: - """ Returns the formatted answer for when an error has occurred - in the user's query. - - :param hostname: The hostname configured for the server. - :param raw_query: The raw query given by the user. - :return: The formatted answer as text. """ - - return "Site: {}\r\n" \ - "You have made a mistake in your query!\r\n".format(hostname) - - def format_short(self, hostname: str, raw_query: str, - users: _Sequence[FingerUser]) -> str: - """ Returns the formatted answer for a user list in the 'short' - format. - - :param hostname: The hostname configured for the server. - :param raw_query: The raw query given by the user. - :param users: The user list. - :return: The formatted answer as text. """ - - if not users: - return "No user list available.\r\n" - - now = _dt.now().astimezone() - - lst = [] - for user in users: - if not user.sessions: - lst.append((user, None)) - for session in user.sessions: - lst.append((user, session)) - - _login = lambda u, s: u.login - _name = lambda u, s: u.name - _line = lambda u, s: s.line if s and s.line else '' - _idle = lambda u, s: self._format_time(now - s.idle) if s else '' - _logt = lambda u, s: self._format_when(s.start) if s else '' - _offic = lambda u, s: (f'({s.host})' if s.host - else u.office if u.office else '') - - columns = ( - ('Login',) + tuple(_login(u, s) for u, s in lst), - ('Name',) + tuple( _name(u, s) for u, s in lst), - ('TTY',) + tuple( _line(u, s) for u, s in lst), - ('Idle',) + tuple( _idle(u, s) for u, s in lst), - ('When',) + tuple( _logt(u, s) for u, s in lst), - ('Office',) + tuple(_offic(u, s) for u, s in lst)) - - sizes = tuple(max(map(len, c)) + 1 for i, c in enumerate(columns)) - align = ('<', '<', '<', '^', '^', '<') - - lines = [] - for line in range(len(columns[0])): - lines.append(' '.join(\ - f"{columns[i][line][:sizes[i]]:{align[i]}{sizes[i]}}" \ - for i in range(len(columns)))) - - return ( - self._format_header(hostname, raw_query) + - '\r\n'.join(lines) + '\r\n' + - self._format_footer()) - - def format_long(self, hostname: str, raw_query: str, - users: _Sequence[FingerUser]) -> str: - """ Returns the formatted answer for a user list in the 'long' - format. - - :param hostname: The hostname configured for the server. - :param raw_query: The raw query given by the user. - :param users: The user list. - :return: The formatted answer as text. """ - - if not users: - return "No user list available.\r\n" - - now = _dt.now().astimezone() - res = '' - - for user in users: - res += ("" - f"Login name: {user.login[:27]:<27} " - f"Name: {user.name if user.name else user.login}\r\n" - f"Directory: {user.home[:28] if user.home else '':<28} " - f"Shell: {user.shell if user.shell else ''}\r\n") - if user.office: - res += f"Office: {user.office if user.office else ''}\r\n" - - if user.sessions: - # List current sessions. - - for se in user.sessions: - since = (se.start.astimezone(self._tzinfo) - .strftime("%a %b %e %R")) - tz = self._tzinfo - res += f"On since {since} ({tz})" - if se.line is not None: - res += f" on {se.line}" - if se.host is not None: - res += f" from {se.host}" - res += "\r\n" - - idle = now - se.idle - if idle >= _td(seconds = 4): - res += f" {self._format_idle(idle)}\r\n" - elif user.last_login is not None: - # Show last login. - - date = (user.last_login.astimezone(self._tzinfo) - .strftime("%a %b %e %R")) - tz = self._tzinfo - res += f"Last login {date} ({tz}) on console\r\n" - else: - res += "Never logged in.\r\n" - - if user.plan is None: - res += "No plan.\r\n" - else: - res += "Plan:\r\n" - res += "\r\n".join(user.plan.splitlines()) - res += "\r\n" - - res += "\r\n" - - return ( - self._format_header(hostname, raw_query) + - res + - self._format_footer()) + def _format_header(self, hostname: str, raw_query: str) -> str: + """ Returns the header of the formatted answer for every request, + except when an error has occurred in the user's query. + + :param hostname: The hostname configured for the server. + :param raw_query: The raw query given by the user. + :return: The header of the formatted answer as text. """ + + if raw_query: + raw_query = ' ' + raw_query + + return ( + f'Site: {hostname}\r\n' + f'Command line:{raw_query}\r\n' + '\r\n' + ) + + def _format_footer(self) -> str: + """ Returns the footer of the formatted answer for every request, + except when an error has occurred in the user's query. + + :return: The footer of the formatted answer as text. """ + + return '' + + # --- + # Used formatting functions. + # --- + + def format_query_error(self, hostname: str, raw_query: str) -> str: + """ Returns the formatted answer for when an error has occurred + in the user's query. + + :param hostname: The hostname configured for the server. + :param raw_query: The raw query given by the user. + :return: The formatted answer as text. """ + + return ( + f'Site: {hostname}\r\n' + 'You have made a mistake in your query!\r\n' + ) + + def format_short( + self, + hostname: str, + raw_query: str, + users: _Sequence[FingerUser], + ) -> str: + """ Returns the formatted answer for a user list in the 'short' + format. + + :param hostname: The hostname configured for the server. + :param raw_query: The raw query given by the user. + :param users: The user list. + :return: The formatted answer as text. """ + + if not users: + return 'No user list available.\r\n' + + now = _dt.now().astimezone() + + lst = [] + for user in users: + if not user.sessions: + lst.append((user, None)) + for session in user.sessions: + lst.append((user, session)) + + def _login(u, s): + return u.login + + def _name(u, s): + return u.name + + def _line(u, s): + return s.line if s and s.line else '' + + def _idle(u, s): + return self._format_time(now - s.idle) if s else '' + + def _logt(u, s): + return self._format_when(s.start) if s else '' + + def _offic(u, s): + return ( + f'({s.host})' if s.host + else u.office if u.office else '') + + columns = ( + ('Login',) + tuple(_login(u, s) for u, s in lst), + ('Name',) + tuple(_name(u, s) for u, s in lst), + ('TTY',) + tuple(_line(u, s) for u, s in lst), + ('Idle',) + tuple(_idle(u, s) for u, s in lst), + ('When',) + tuple(_logt(u, s) for u, s in lst), + ('Office',) + tuple(_offic(u, s) for u, s in lst)) + + sizes = tuple(max(map(len, c)) + 1 for i, c in enumerate(columns)) + align = ('<', '<', '<', '^', '^', '<') + + lines = [] + for line in range(len(columns[0])): + lines.append(' '.join( + f'{columns[i][line][:sizes[i]]:{align[i]}{sizes[i]}}' + for i in range(len(columns)) + )) + + return ( + self._format_header(hostname, raw_query) + + '\r\n'.join(lines) + '\r\n' + + self._format_footer()) + + def format_long( + self, + hostname: str, + raw_query: str, + users: _Sequence[FingerUser], + ) -> str: + """ Returns the formatted answer for a user list in the 'long' + format. + + :param hostname: The hostname configured for the server. + :param raw_query: The raw query given by the user. + :param users: The user list. + :return: The formatted answer as text. """ + + if not users: + return 'No user list available.\r\n' + + now = _dt.now().astimezone() + res = '' + + for user in users: + res += ( + f'Login name: {user.login[:27]:<27} ' + f'Name: {user.name if user.name else user.login}\r\n' + f'Directory: {user.home[:28] if user.home else "":<28} ' + f'Shell: {user.shell if user.shell else ""}\r\n' + ) + if user.office: + res += f"Office: {user.office if user.office else ''}\r\n" + + if user.sessions: + # List current sessions. + + for se in user.sessions: + since = ( + se.start.astimezone(self._tzinfo) + .strftime('%a %b %e %R') + ) + tz = self._tzinfo + res += f'On since {since} ({tz})' + if se.line is not None: + res += f' on {se.line}' + if se.host is not None: + res += f' from {se.host}' + res += '\r\n' + + idle = now - se.idle + if idle >= _td(seconds=4): + res += f' {self._format_idle(idle)}\r\n' + elif user.last_login is not None: + # Show last login. + + date = ( + user.last_login.astimezone(self._tzinfo) + .strftime('%a %b %e %R')) + tz = self._tzinfo + res += f'Last login {date} ({tz}) on console\r\n' + else: + res += 'Never logged in.\r\n' + + if user.plan is None: + res += 'No plan.\r\n' + else: + res += 'Plan:\r\n' + res += '\r\n'.join(user.plan.splitlines()) + res += '\r\n' + + res += '\r\n' + + return ( + self._format_header(hostname, raw_query) + + res + + self._format_footer()) # --- # Interface (dummy) base class. # --- -class FingerInterface: - """ Data source for :py:class:`FingerServer`. - Provides users and answers for the various queries received - from the clients by the server. - - This class must be subclassed by other interfaces. - Only methods not starting with an underscore are called by - instances of :py:class:`FingerServer`; others are utilities - called by these. - - By default, it behaves like a dummy interface. """ - def __repr__(self): - return f"{self.__class__.__name__}()" - - def transmit_query(self, query: _Optional[str], host: str, - verbose: bool) -> str: - """ Transmit a user query to a foreign host, and return - the answer formatted by it. - - If used directly (not overridden by subclasses), this - method will refuse to transmit finger queries. - - :param query: The user query, set to None in case of - no query provided by the client. - :param host: The distant host to which to transmit the - query. - :param verbose: Whether the verbose flag (``/W``, long format) - has been passed by the current client - or not. - :return: The answer formatted by the distant server. """ - - return "This server won't transmit finger queries.\r\n" - - def search_users(self, query: _Optional[str], - active: _Optional[bool]) -> _Sequence[FingerUser]: - """ Search for users on the current host using the given query. - - :param query: The user query, set to None in case of no - query provided by the client. - :param active: Whether to get active users (True), - inactive users (False), or all users (None). - :return: The list of users found using the query provided - by the client. """ - - return [] - - def regular(seconds: float = 1): - """ Decorator for asking the finger server to call the current - function every second. """ - - try: - seconds = float(seconds) - assert 0 < seconds - except (TypeError, ValueError, AssertionError): - raise ValueError("invalid period") from None - - def decorator(func): - func.__period__ = seconds - return func - - return decorator +class FingerInterface: + """ Data source for :py:class:`FingerServer`. + Provides users and answers for the various queries received + from the clients by the server. + + This class must be subclassed by other interfaces. + Only methods not starting with an underscore are called by + instances of :py:class:`FingerServer`; others are utilities + called by these. + + By default, it behaves like a dummy interface. """ + + def __repr__(self): + return f'{self.__class__.__name__}()' + + def transmit_query( + self, + query: _Optional[str], + host: str, + verbose: bool, + ) -> str: + """ Transmit a user query to a foreign host, and return + the answer formatted by it. + + If used directly (not overridden by subclasses), this + method will refuse to transmit finger queries. + + :param query: The user query, set to None in case of + no query provided by the client. + :param host: The distant host to which to transmit the + query. + :param verbose: Whether the verbose flag (``/W``, long format) + has been passed by the current client + or not. + :return: The answer formatted by the distant server. """ + + return "This server won't transmit finger queries.\r\n" + + def search_users( + self, + query: _Optional[str], + active: _Optional[bool], + ) -> _Sequence[FingerUser]: + """ Search for users on the current host using the given query. + + :param query: The user query, set to None in case of no + query provided by the client. + :param active: Whether to get active users (True), + inactive users (False), or all users (None). + :return: The list of users found using the query provided + by the client. """ + + return [] # --- # Server logger base class. # --- + class FingerLogger: - """ Logger class for :py:class:`FingerServer`. - Provides methods for allowing custom behaviours on server and - request events. + """ Logger class for :py:class:`FingerServer`. + Provides methods for allowing custom behaviours on server and + request events. - This class must be subclassed by other loggers. - Only methods not starting with an underscore are called by - instances of :py:class:`FingerServer`; others are utilities called - by these. + This class must be subclassed by other loggers. + Only methods not starting with an underscore are called by + instances of :py:class:`FingerServer`; others are utilities called + by these. - Unless methods are overridden to have a different behaviour, - this logger emits messages on the given text stream, by default - stderr (standard error output). """ + Unless methods are overridden to have a different behaviour, + this logger emits messages on the given text stream, by default + stderr (standard error output). """ - def __init__(self, stream = _sys.stderr): - self._stream = stream - self._lock = _multip.Lock() + def __init__(self, stream=_sys.stderr): + self._stream = stream + self._lock = _multip.Lock() - def __repr__(self): - return f"{self.__class__.__name__}()" + def __repr__(self): + return f'{self.__class__.__name__}()' - # --- - # Internal methods. - # --- + # --- + # Internal methods. + # --- - def _write(self, msg: str) -> None: - """ Write the formatted string on the given stream. """ + def _write(self, msg: str) -> None: + """ Write the formatted string on the given stream. """ - self._lock.acquire() - print('\r' + msg, file = self._stream) - self._lock.release() + self._lock.acquire() + print('\r' + msg, file=self._stream) + self._lock.release() - def _write_s(self, src: str, msg: str) -> None: - """ Write the formatted string and add the source address of - the host to the given stream using - :py:meth:`FingerLogger._write`. """ + def _write_s(self, src: str, msg: str) -> None: + """ Write the formatted string and add the source address of + the host to the given stream using + :py:meth:`FingerLogger._write`. """ - self._write(f'[{src}] {msg}') + self._write(f'[{src}] {msg}') - # --- - # Public methods. - # --- + # --- + # Public methods. + # --- - def start(self, host: str, port: str) -> None: - """ This method is called when the server starts listening on - the given port of the given host. + def start(self, host: str, port: str) -> None: + """ This method is called when the server starts listening on + the given port of the given host. - :param host: The host on which the server has started - listening (usually an IPv4 or IPv6 address). - :param port: The TCP port on which the server has started - listening. """ + :param host: The host on which the server has started + listening (usually an IPv4 or IPv6 address). + :param port: The TCP port on which the server has started + listening. """ - self._write(f"Starting fingerd on [{host}]:{port}.") + self._write(f'Starting fingerd on [{host}]:{port}.') - def stop(self, host: str, port: str) -> None: - """ This method is called when the server stops listening on - the given port of the given host. + def stop(self, host: str, port: str) -> None: + """ This method is called when the server stops listening on + the given port of the given host. - :param host: The host on which the server has stopped - listening (usually an IPv4 or IPv6 address). - :param port: The TCP port on which the server has stopped - listening. """ + :param host: The host on which the server has stopped + listening (usually an IPv4 or IPv6 address). + :param port: The TCP port on which the server has stopped + listening. """ - self._write(f"Stopping fingerd on [{host}]:{port}.") + self._write(f'Stopping fingerd on [{host}]:{port}.') - def no_query(self, source: str) -> None: - """ This method is called when a connection has been opened with - the server by a client, but immediately closed without - emitting a query. + def no_query(self, source: str) -> None: + """ This method is called when a connection has been opened with + the server by a client, but immediately closed without + emitting a query. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). """ - self._write_s(source, "no query. (possible scan)") + self._write_s(source, 'no query. (possible scan)') - def bad_query(self, source: str) -> None: - """ This method is called when a client has emitted a bad request. + def bad_query(self, source: str) -> None: + """ This method is called when a client has emitted a bad request. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). """ - self._write_s(source, "bad request.") + self._write_s(source, 'bad request.') - def could_not_answer(self, source: str) -> None: - """ This method is called when a client has closed the connection - before or while the server was answering. + def could_not_answer(self, source: str) -> None: + """ This method is called when a client has closed the connection + before or while the server was answering. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). """ - self._write_s(source, "could not write the answer.") + self._write_s(source, 'could not write the answer.') - def transmit_list(self, source: str, host: str): - """ This method is called when a client has requested the server - to transmit a finger request to another finger server, without - a user query. + def transmit_list(self, source: str, host: str): + """ This method is called when a client has requested the server + to transmit a finger request to another finger server, without + a user query. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). - :param host: The host to which the user wants the request - to be transmitted. """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). + :param host: The host to which the user wants the request + to be transmitted. """ - self._write_s(source, f"transmit user list query to `{host}`.") + self._write_s(source, f'transmit user list query to `{host}`.') - def transmit(self, source: str, username: str, host: str) -> None: - """ This method is called when a client has requested the server - to transmit a finger request to another finger server, with - a user query. + def transmit(self, source: str, username: str, host: str) -> None: + """ This method is called when a client has requested the server + to transmit a finger request to another finger server, with + a user query. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). - :param username: The query the user wants transmitted to - the provided host. - :param host: The host to which the user wants the request - to be transmitted. """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). + :param username: The query the user wants transmitted to + the provided host. + :param host: The host to which the user wants the request + to be transmitted. """ - self._write_s(source, f"transmit user query for `{username}` " - f"to `{host}`.") + self._write_s( + source, + f'transmit user query for `{username}` to `{host}`.', + ) - def search_users(self, source: str, username: str) -> None: - """ This method is called when a client has requested a user list - to the current host, with a query provided. + def search_users(self, source: str, username: str) -> None: + """ This method is called when a client has requested a user list + to the current host, with a query provided. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). - :param username: The provided user query. """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). + :param username: The provided user query. """ - self._write_s(source, f"look for user `{username}`.") + self._write_s(source, f'look for user `{username}`.') - def list(self, source: str) -> None: - """ This method is called when a client has requested a user list - to the current host, without any provided queries. + def list_users(self, source: str) -> None: + """ This method is called when a client has requested a user list + to the current host, without any provided queries. - :param source: The source address from which the client - has started the connection (usually an - IPv4 or IPv6 address). """ + :param source: The source address from which the client + has started the connection (usually an + IPv4 or IPv6 address). """ - self._write_s(source, "list connected users.") + self._write_s(source, 'list connected users.') # --- # Bind-related configuration. # --- + class _BindAddress: - """ Bind address for fingerd. """ + """ Bind address for fingerd. """ - class Type(_Enum): - """ Bind address type. """ + class Type(_Enum): + """ Bind address type. """ - """ TCP on IPv4 bind. """ - TCP_IPv4 = 1 + """ TCP on IPv4 bind. """ + TCP_IPv4 = 1 - """ TCP on IPv6 bind. """ - TCP_IPv6 = 2 + """ TCP on IPv6 bind. """ + TCP_IPv6 = 2 - def __init__(self, family): - self._family = self.Type(family) + def __init__(self, family): + self._family = self.Type(family) - def __repr__(self): - return f"{self._class__.__name__}(family = {self._family})" + def __repr__(self): + return f'{self._class__.__name__}(family = {self._family!r})' - @property - def family(self): - """ Family as one of the `BindAddress.Type` enumeration values. """ + @property + def family(self): + """ Family as one of the `BindAddress.Type` enumeration values. """ - return self._family + return self._family class _TCP4Address(_BindAddress): - """ IPv4 TCP Address. """ + """ IPv4 TCP Address. """ - def __init__(self, address, port): - super().__init__(_BindAddress.Type.TCP_IPv4) + def __init__(self, address, port): + super().__init__(_BindAddress.Type.TCP_IPv4) - try: - self._addr = _socket.inet_pton(_socket.AF_INET, address) - except: - self._addr = address + try: + self._addr = _socket.inet_pton(_socket.AF_INET, address) + except Exception: + self._addr = address - self._port = port + self._port = port - @property - def runserver_params(self): - """ Return the data as `_runserver` parameters. """ + @property + def runserver_params(self): + """ Return the data as `_runserver` parameters. """ + + return ( + _socket.AF_INET, + _socket.inet_ntop(_socket.AF_INET, self._addr), + self._port) - return (_socket.AF_INET, _socket.inet_ntop(_socket.AF_INET, - self._addr), self._port) class _TCP6Address(_BindAddress): - """ IPv6 TCP Address. """ + """ IPv6 TCP Address. """ - def __init__(self, address, port): - super().__init__(_BindAddress.Type.TCP_IPv6) + def __init__(self, address, port): + super().__init__(_BindAddress.Type.TCP_IPv6) - try: - self._addr = _socket.inet_pton(_socket.AF_INET6, address) - except: - self._addr = address + try: + self._addr = _socket.inet_pton(_socket.AF_INET6, address) + except Exception: + self._addr = address - self._port = port + self._port = port - @property - def runserver_params(self): - """ Return the data as `_runserver` parameters. """ + @property + def runserver_params(self): + """ Return the data as `_runserver` parameters. """ - return (_socket.AF_INET6, _socket.inet_ntop(_socket.AF_INET6, - self._addr), self._port) + return ( + _socket.AF_INET6, + _socket.inet_ntop(_socket.AF_INET6, self._addr), + self._port, + ) class _BindsDecoder: - """ Binds decoder for fingerd. - - Takes a raw string and the protocol name, either 'finger' (the base - protocol managed by the class) or 'fingerd-control' (the protocol - used for controlling the live fingerd interface). """ - - def __init__(self, raw, proto = 'finger'): - proto = proto.casefold() - if proto not in ('finger',): - raise ValueError(f"unsupported protocol {proto}") - - self._binds = set() - - for x in map(lambda x: x.strip(), raw.split(',')): - addr = x - - # Try to find a scheme. - - scheme, *rest = x.split(':/') - if not rest: - # No scheme found, let's just guess the scheme based on - # the situation. + """ Binds decoder for fingerd. - x = scheme - scheme = {'finger': 'tcp'}[proto] - else: - # just don't add the ':' of ':/' again - x = '/' + ':/'.join(rest) + Takes a raw string and the protocol name, either 'finger' (the base + protocol managed by the class) or 'fingerd-control' (the protocol + used for controlling the live fingerd interface). """ - if (proto == 'finger' and scheme != 'tcp') \ - or scheme not in ('tcp',): - raise _InvalidBindError(addr, "unsupported scheme " - f"{repr(scheme)} for protocol {repr(proto)}") + def __init__(self, raw, proto='finger'): + proto = proto.casefold() + if proto not in ('finger',): + raise ValueError(f'unsupported protocol {proto}') - # Decode the address data. + self._binds = set() - if scheme == "tcp": - self._binds.update(self._decode_tcp_host(x)) + for x in map(lambda x: x.strip(), raw.split(',')): + addr = x + + # Try to find a scheme. - self._binds = tuple(self._binds) - - def __iter__(self): - return iter(self._binds) + scheme, *rest = x.split(':/') + if not rest: + # No scheme found, let's just guess the scheme based on + # the situation. - def __repr__(self): - return f"{self._class__.__name__}(binds = {self._binds})" + x = scheme + scheme = {'finger': 'tcp'}[proto] + else: + # just don't add the ':' of ':/' again + x = '/' + ':/'.join(rest) + + if ( + (proto == 'finger' and scheme != 'tcp') + or scheme not in ('tcp',) + ): + raise _InvalidBindError( + addr, + f'Unsupported scheme {scheme!r} for protocol {proto!r}', + ) + + # Decode the address data. + + if scheme == 'tcp': + self._binds.update(self._decode_tcp_host(x)) + + self._binds = tuple(self._binds) + + def __iter__(self): + return iter(self._binds) + + def __repr__(self): + return f'{self._class__.__name__}(binds = {self._binds})' + + def _decode_tcp_host(self, x): + """ Decode suitable hosts for a TCP bind. """ + + addrs = () + addr = x + + # TODO: manage the '*' case. + + # Get the host part first, we'll decode it later. + + if x[0] == '[': + # The host part is an IPv6, look for the closing ']' and + # decode it later. + + to = x.find(']') + if to < 0: + raise _InvalidBindError( + addr, "Expected closing ']'") + + host = x[1:to] + x = x[to + 1:] + + is_ipv6 = True + else: + # The host part is either an IPv4 or a host name, look for + # the ':' and decode it later. + + host, *x = x.split(':') + x = ':' + ':'.join(x) + + is_ipv6 = False + + # Decode the port part. + + if x == '': + port = 79 + elif x[0] == ':': + try: + port = int(x[1:]) + except ValueError: + try: + if x[1:] != '': + raise AssertionError('Expected a port number') + port = _socket.getservbyname(x[1:]) + except Exception: + raise _InvalidBindError( + addr, + 'Expected a valid port number or name ' + f'(got {x[1:]!r})', + ) from None + else: + raise _InvalidBindError( + addr, 'Garbage found after the host', + ) + + # Decode the host part and get the addresses. + + addrs = () + if is_ipv6: + # Decode the IPv6 address (validate it using `_socket.inet_pton`). + + ip6 = host + _socket.inet_pton(_socket.AF_INET6, host) + addrs += (_TCP6Address(ip6, port),) + else: + # Decode the host (try IPv4, otherwise, resolve domain). + + try: + ip = host.split('.') + if len(ip) < 2 or len(ip) > 4: + raise AssertionError('2 <= len(ip) <= 4') + + ip = list(map(int, ip)) + if not all(lambda x: 0 <= x < 256, ip): + raise AssertionError('non-8-bit component') + + if len(ip) == 2: + ip = [ip[0], 0, 0, ip[1]] + elif len(ip) == 3: + ip = [ip[0], 0, ip[1], ip[2]] + + addrs += (_TCP4Address(ip, port),) + except Exception: + entries = _socket.getaddrinfo( + host, port, + proto=_socket.IPPROTO_TCP, + type=_socket.SOCK_STREAM) + + for ent in entries: + if ( + ent[0] not in (_socket.AF_INET, _socket.AF_INET6) + or ent[1] not in (_socket.SOCK_STREAM,) + ): + continue + + if ent[0] == _socket.AF_INET: + ip = ent[4][0] + _socket.inet_pton(_socket.AF_INET, ent[4][0]) + addrs += (_TCP4Address(ip, port),) + else: + ip6 = ent[4][0] + _socket.inet_pton(_socket.AF_INET6, ent[4][0]) + addrs += (_TCP6Address(ip6, port),) + + return addrs - def _decode_tcp_host(self, x): - """ Decode suitable hosts for a TCP bind. """ - - addrs = () - addr = x - - # TODO: manage the '*' case. - - # Get the host part first, we'll decode it later. - - if x[0] == '[': - # The host part is an IPv6, look for the closing ']' and - # decode it later. - - to = x.find(']') - if to < 0: - raise _InvalidBindAddressError(addr, "expected " \ - "closing ']'") - - host = x[1:to] - x = x[to + 1:] - - is_ipv6 = True - else: - # The host part is either an IPv4 or a host name, look for - # the ':' and decode it later. - - host, *x = x.split(':') - x = ':' + ':'.join(x) - - is_ipv6 = False - - # Decode the port part. - - if x == '': - port = 79 - elif x[0] == ':': - try: - port = int(x[1:]) - except: - try: - assert x[1:] != '' - port = _socket.getservbyname(x[1:]) - except: - raise _InvalidBindError(addr, - "expected a valid port number or name " \ - f"(got {repr(x[1:])})") from None - else: - raise _InvalidBindAddressError(addr, - "garbage found after the host") - - # Decode the host part and get the addresses. - - addrs = () - if is_ipv6: - # Decode the IPv6 address (validate it using `_socket.inet_pton`). - - ip6 = host - _socket.inet_pton(_socket.AF_INET6, host) - addrs += (_TCP6Address(ip6, port),) - else: - # Decode the host (try IPv4, otherwise, resolve domain). - - try: - ip = host.split('.') - assert 2 <= len(ip) <= 4 - - ip = list(map(int, ip)) - assert all(lambda x: 0 <= x < 256, ip) - - if len(ip) == 2: - ip = [ip[0], 0, 0, ip[1]] - elif len(ip) == 3: - ip = [ip[0], 0, ip[1], ip[2]] - - addrs += (_TCP4Address(ip, port),) - except: - entries = _socket.getaddrinfo(host, port, - proto = _socket.IPPROTO_TCP, - type = _socket.SOCK_STREAM) - - for ent in entries: - if ent[0] not in (_socket.AF_INET, _socket.AF_INET6) \ - or ent[1] not in (_socket.SOCK_STREAM,): - continue - - if ent[0] == _socket.AF_INET: - ip = ent[4][0] - _socket.inet_pton(_socket.AF_INET, ent[4][0]) - addrs += (_TCP4Address(ip, port),) - else: - ip6 = ent[4][0] - _socket.inet_pton(_socket.AF_INET6, ent[4][0]) - addrs += (_TCP6Address(ip6, port),) - - return addrs # --- # Finger/TCP server implementation. # --- + class _FingerQuery: - """ A finger query. Requests information about connected or specific - users on a remote server. + """ A finger query. Requests information about connected or specific + users on a remote server. - There are three types of requests recognized by RFC 1288: + There are three types of requests recognized by RFC 1288: - * {C} is a request for a list of all online users. - * {Q1} is a request for a local user. - * {Q2} is a request for a distant user (with hostname). + * {C} is a request for a list of all online users. + * {Q1} is a request for a local user. + * {Q2} is a request for a distant user (with hostname). - /W means the RUIP (program answering the query) should be more - verbose (this token can be ignored). """ + /W means the RUIP (program answering the query) should be more + verbose (this token can be ignored). """ - # "By default, this program SHOULD filter any unprintable data, - # leaving only printable 7-bit characters (ASCII 32 through - # ASCII 126), tabs (ASCII 9) and CRLFs." + # "By default, this program SHOULD filter any unprintable data, + # leaving only printable 7-bit characters (ASCII 32 through + # ASCII 126), tabs (ASCII 9) and CRLFs." - allowed_chars = ("\t !\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" - + _string.ascii_letters + _string.digits) + allowed_chars = ( + "\t !\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" + + _string.ascii_letters + _string.digits) - def __init__(self, raw): - """ Initialize the query object by decoding the data. """ + def __init__(self, raw): + """ Initialize the query object by decoding the data. """ - # Get a character string out of the query. + # Get a character string out of the query. - raw = raw.decode('ascii', errors = 'ignore') - raw = ''.join(c for c in raw if c in self.allowed_chars) - self.line = raw + raw = raw.decode('ascii', errors='ignore') + raw = ''.join(c for c in raw if c in self.allowed_chars) + self.line = raw - # Get elements. + # Get elements. - self.host = None - self.username = None - self.verbose = False - for element in raw.split(): - if element[0] == '/': - if 'W' in element[1:]: - self.verbose = True - continue - elif self.username is not None: - raise Exception - self.username = element + self.host = None + self.username = None + self.verbose = False + for element in raw.split(): + if element[0] == '/': + if 'W' in element[1:]: + self.verbose = True + continue + elif self.username is not None: + raise Exception + self.username = element - if self.username is not None and '@' in self.username: - self.host, *self.username = self.username.split('@')[::-1] - self.username = '@'.join(self.username[::-1]) + if self.username is not None and '@' in self.username: + self.host, *self.username = self.username.split('@')[::-1] + self.username = '@'.join(self.username[::-1]) class FingerServer: - """ The main finger server class. + """ The main finger server class. + + :param binds: The hosts and ports on which the server should + listen to and answer finger requests. + :param hostname: The hostname to be included in answers sent + to clients. + :param interface: The interface to use for querying + users and sessions. + :param formatter: The formatter to use for formatting + answers sent to clients. + :param logger: The logger to use for logging server and + request events. """ + + def __init__( + self, + binds: str = 'localhost:79', + hostname: str = 'LOCALHOST', + interface: FingerInterface = FingerInterface(), + formatter: FingerFormatter = FingerFormatter(), + logger: FingerLogger = FingerLogger(), + ): + # Check the host name, which should be simple LDH, i.e. + # Letters, Digits, Hyphens. + + try: + hostname = hostname.upper() + if not all( + c in _string.ascii_letters + _string.digits + '.-' + for c in hostname + ): + raise AssertionError('Non-LDH hostname') + except Exception: + raise _HostnameError(hostname) + + # Check the interface and formatter classes. + + if not isinstance(interface, FingerInterface): + raise TypeError( + 'Please base your interface ' + 'on the base class provided by the fingerd module', + ) + + if not isinstance(formatter, FingerFormatter): + raise TypeError( + 'Please base your formatter ' + 'on the base class provided by the fingerd module', + ) + + if not isinstance(logger, FingerLogger): + raise TypeError( + 'Please base your logger ' + 'on the base class provided by the fingerd module', + ) + + # Keep the parameters. + + self._host = hostname + self._logger = logger + self._interface = interface + self._formatter = formatter + + # Check the binds. + + self._binds = [b for b in _BindsDecoder(binds)] + if not self._binds: + raise _NoBindsError() + + # Initialize multi-process related. + + self._p = None + + @property + def hostname(self): + """ The hostname configured for this server. """ + + return self._host + + @property + def logger(self): + """ The logger configured for this server. """ + + return self._logger + + @property + def interface(self): + """ The interface configured for this server. """ + + return self._interface + + @property + def formatter(self): + """ The formatter configured for this server. """ + + return self._formatter + + def _serve(self): + """ Serving process. """ + + async def handle_connection(inp, outp): + """ Handle a new incoming connection on one of the + FingerServer bindings. """ + + src, *_ = inp._transport.get_extra_info('peername') + + # Gather the request line. + + try: + line = await inp.readline() + except ConnectionResetError: + self.logger.no_query(src) + outp.close() + return + + # Decode the request. + + ans = '' + + try: + query = _FingerQuery(line) + except Exception: + self.logger.bad_query(src) + ans = self.formatter.format_query_error( + self.hostname, line) + else: + if query.host is not None: + if query.username: + self.logger.transmit( + src, query.username, query.host) + else: + self.logger.transmit_list(src, query.host) + + ans = self.interface.transmit_query( + query.host, query.username, query.verbose) + else: + if query.username: + self.logger.search_users(src, query.username) + users = self.interface.search_users( + query.username, None) + else: + self.logger.list_users(src) + users = self.interface.search_users(None, True) - :param binds: The hosts and ports on which the server should - listen to and answer finger requests. - :param hostname: The hostname to be included in answers sent - to clients. - :param interface: The interface to use for querying - users and sessions. - :param formatter: The formatter to use for formatting - answers sent to clients. - :param logger: The logger to use for logging server and - request events. """ + if query.username or query.verbose: + ans = self.formatter.format_long( + self.hostname, query.line, users) + else: + ans = self.formatter.format_short( + self.hostname, query.line, users) - def __init__(self, binds: str = 'localhost:79', - hostname: str = 'LOCALHOST', - interface: FingerInterface = FingerInterface(), - formatter: FingerFormatter = FingerFormatter(), - logger: FingerLogger = FingerLogger()): - # Check the host name. + # Write the output. - try: - hostname = hostname.upper() - assert all(c in _string.ascii_letters + _string.digits + '.-' - for c in hostname) - except: - raise _HostnameError(hostname) - - # Check the interface and formatter classes. - - if not isinstance(interface, FingerInterface): - raise TypeError("please base your interface " - "on the base class provided by the fingerd module") - - if not isinstance(formatter, FingerFormatter): - raise TypeError("please base your formatter " - "on the base class provided by the fingerd module") - - if not isinstance(logger, FingerLogger): - raise TypeError("please base your logger " - "on the base class provided by the fingerd module") - - # Keep the parameters. - - self._host = hostname - self._logger = logger - self._interface = interface - self._formatter = formatter - - # Check the binds. - - self._binds = [b for b in _BindsDecoder(binds)] - if not self._binds: - raise _NoBindsError() + try: + ans = '\r\n'.join(ans.splitlines()) + '\r\n' + outp.write(ans.encode('ascii', errors='ignore')) + except ConnectionResetError: + self.logger.could_not_answer(src) - # Initialize multi-process related. - - self._p = None - - @property - def hostname(self): - """ The hostname configured for this server. """ - - return self._host - - @property - def logger(self): - """ The logger configured for this server. """ - - return self._logger - - @property - def interface(self): - """ The interface configured for this server. """ - - return self._interface - - @property - def formatter(self): - """ The formatter configured for this server. """ - - return self._formatter - - def _serve(self): - """ Serving process. """ - - async def handle_connection(inp, outp): - """ Handle a new incoming connection on one of the - FingerServer bindings. """ - - src, *_ = inp._transport.get_extra_info('peername') - - # Gather the request line. - - try: - line = await inp.readline() - except ConnectionResetError: - self.logger.no_query(src) - outp.close() - return - - # Decode the request. - - ans = "" - - try: - query = _FingerQuery(line) - except: - self.logger.bad_query(src) - ans = self.formatter.format_query_error( - self.hostname, line) - else: - if query.host is not None: - if query.username: - self.logger.transmit(src, - query.username, query.host) - else: - self.logger.transmit_list(src, query.host) + outp.close() - ans = self.interface.transmit_query(query.host, - query.username, query.verbose) - else: - if query.username: - self.logger.search_users(src, query.username) - users = self.interface.search_users(query.username, - None) - else: - self.logger.list(src) - users = self.interface.search_users(None, True) + async def start_server(bind): + """ Start a given server. """ - if query.username or query.verbose: - ans = self.formatter.format_long(self.hostname, - query.line, users) - else: - ans = self.formatter.format_short(self.hostname, - query.line, users) + family, host, port = bind.runserver_params - # Write the output. + self.logger.start(host, port) - try: - ans = '\r\n'.join(ans.splitlines()) + '\r\n' - outp.write(ans.encode('ascii', errors = 'ignore')) - except ConnectionResetError: - self.logger.could_not_answer(src) + try: + server = await _asyncio.start_server( + handle_connection, + host=host, port=port, family=family, + reuse_address=True) + await server.serve_forever() + except _asyncio.CancelledError: + pass + finally: + self.logger.stop(host, port) - outp.close() + async def cron_call(func, spec): + """ Call a function periodically using a croniter + specification. """ + + spec.set_current(_dt.now()) + + while True: + try: + func() + except SystemExit: + return + + while True: + seconds = ( + spec.get_next(_dt) - _dt.now() + ).total_seconds() + if seconds >= 0: + break + + await _asyncio.sleep(seconds) # TODO + + async def start_servers(): + """ Start the servers. """ + + def get_coroutines(): + """ Tasks iterator. """ + + for bind in self._binds: + yield start_server(bind) + + for key in dir(self.interface): + member = getattr(self.interface, key) + if not callable(member): + continue + + try: + spec = member.__cron__ + except AttributeError: + continue + + if not isinstance(spec, _croniter): + continue + + yield cron_call(member, spec) + + tasks = [_asyncio.create_task(co) for co in get_coroutines()] + await _asyncio.wait( + tasks, return_when=_asyncio.FIRST_COMPLETED) + + exc = None + for task in tasks: + if exc is None: + exc = task.exception() - async def start_server(bind): - """ Start a given server. """ + task.cancel() + + if exc is not None: + raise exc - family, host, port = bind.runserver_params + try: + _asyncio.run(start_servers()) + except KeyboardInterrupt: + pass - self.logger.start(host, port) + def start(self) -> None: + """ Bind all ports for the given hosts and start the underlying + servers in separate processes. """ - try: - server = await _asyncio.start_server( - handle_connection, - host = host, port = port, family = family, - reuse_address = True) - await server.serve_forever() - except _asyncio.CancelledError: - pass - finally: - self.logger.stop(host, port) + if self._p is not None and self._p.is_alive(): + return - async def regular_call(bound_method, seconds): - while True: - try: - bound_method() - except SystemExit: - return - - await _asyncio.sleep(seconds) - - async def start_servers(): - """ Start the servers. """ - - def get_coroutines(): - """ Tasks iterator. """ - - for bind in self._binds: - yield start_server(bind) - - for key in dir(self.interface): - member = getattr(self.interface, key) - if not callable(member): - continue - - try: - period = float(member.__period__) - assert 0 < period - except (AttributeError, TypeError, ValueError, - AssertionError): - continue - - yield regular_call(member, period) - - tasks = [_asyncio.create_task(co) for co in get_coroutines()] - await _asyncio.wait(tasks, - return_when = _asyncio.FIRST_COMPLETED) - - exc = None - for task in tasks: - if exc is None: - exc = task.exception() - - task.cancel() - - if exc is not None: - raise exc - - try: - _asyncio.run(start_servers()) - except KeyboardInterrupt: - pass - - def start(self) -> None: - """ Bind all ports for the given hosts and start the underlying - servers in separate processes. """ - - if self._p is not None and self._p.is_alive(): - return - - self._p = _multip.Process(target = self._serve) - self._p.start() + self._p = _multip.Process(target=self._serve) + self._p.start() - def stop(self) -> None: - """ Unbind all ports for the given hosts and stop the underlying - server processes. """ + def stop(self) -> None: + """ Unbind all ports for the given hosts and stop the underlying + server processes. """ - if self._p is None or not self._p.is_alive(): - return + if self._p is None or not self._p.is_alive(): + return - self._p.join() - self._p = None + self._p.kill() + self._p.join() + self._p = None - def serve_forever(self) -> None: - """ Shortcut for synchronously starting all servers using - :py:meth:`FingerServer.start`, waiting for an interrupt - signal, and stopping all servers using - :py:meth:`FingerServer.stop`. """ + def serve_forever(self) -> None: + """ Shortcut for synchronously starting all servers using + :py:meth:`FingerServer.start`, waiting for an interrupt + signal, and stopping all servers using + :py:meth:`FingerServer.stop`. """ - if self._p is not None: - self.start() + if self._p is not None: + self.start() - try: - while True: - _signal.pause() - except KeyboardInterrupt: - pass + try: + while True: + _signal.pause() + except KeyboardInterrupt: + pass - self.stop() - else: - # If the server hasn't been started on another process - # using ``.start()``, we can just start is on this process. + self.stop() + else: + # If the server hasn't been started on another process + # using ``.start()``, we can just start is on this process. - self._serve() + self._serve() - def shutdown(self): - """ Shutdown the server, alias to `.stop()`. """ + def shutdown(self): + """ Shutdown the server, alias to `.stop()`. """ - self.stop() + self.stop() # End of file. diff --git a/fingerd/errors.py b/fingerd/errors.py index 98110e8..6987bdb 100755 --- a/fingerd/errors.py +++ b/fingerd/errors.py @@ -1,52 +1,58 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ This file defines the exceptions used throughout the module. """ __all__ = [ - 'ConfigurationError', - 'BindError', 'NoBindsError', 'InvalidBindError', - 'HostnameError' + 'BindError', 'ConfigurationError', 'HostnameError', + 'InvalidBindError', 'NoBindsError', ] # --- # Configuration-related errors. # --- + class ConfigurationError(Exception): - """ Generic exception for when an invalid configuration - exception occurs. """ + """ Generic exception for when an invalid configuration + exception occurs. """ - pass + pass -class BindError(ConfigurationError): - """ Exception raised when an error has occurred with the provided - binds. """ - def __init__(self, msg): - super().__init__("an error has occurred with the provided binds: " - + msg) +class HostnameError(ConfigurationError): + """ Exception raised when a host name is invalid. """ -class NoBindsError(BindError): - """ Exception raised when no binds were provided. """ + def __init__(self, hostname): + super().__init__('invalid host name {}.'.format(repr(hostname))) - def __init__(self): - super().__init__("no valid bind") -class InvalidBindError(BindError): - """ Exception raised when one of the provided binds came out - erroneous. """ +class BindError(ConfigurationError): + """ Exception raised when an error has occurred with the provided + binds. """ - def __init__(self, bind, msg = None): - super().__init__("one of the provided bind ({}) ".format(repr(bind)) - + "was invalid{}".format(': ' + msg if msg is not None else '')) + def __init__(self, msg): + super().__init__( + f'an error has occurred with the provided binds: {msg}') -class HostnameError(ConfigurationError): - """ Exception raised when a host name is invalid. """ - def __init__(self, hostname): - super().__init__("invalid host name {}.".format(repr(hostname))) +class NoBindsError(BindError): + """ Exception raised when no binds were provided. """ + + def __init__(self): + super().__init__('no valid bind') + + +class InvalidBindError(BindError): + """ Exception raised when one of the provided binds came out + erroneous. """ + + def __init__(self, bind, msg=None): + super().__init__( + f'one of the provided bind ({bind!r}) ' + f'was invalid{": " + msg if msg else ""}', + ) # End of file. diff --git a/fingerd/fiction.py b/fingerd/fiction.py index 2e68403..8ddb2ec 100755 --- a/fingerd/fiction.py +++ b/fingerd/fiction.py @@ -1,33 +1,36 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ Definitions for the finger server fiction interface. - This file contains everything to decode and use the actions file. """ + This file contains everything to decode and use the actions file. """ +import copy as _copy +import math as _math import os.path as _path -import copy as _copy, re as _re, math as _math -import itertools as _itertools +import re as _re -from enum import Enum as _Enum -from datetime import datetime as _dt, timedelta as _td -from typing import Optional as _Optional, Union as _Union, Tuple as _Tuple from collections import defaultdict as _defaultdict from collections.abc import Sequence as _Sequence +from datetime import datetime as _dt, timedelta as _td +from enum import Enum as _Enum +from typing import Optional as _Optional, Union as _Union from .core import ( - FingerInterface as _FingerInterface, - FingerUser as _FingerUser, - FingerSession as _FingerSession) + FingerInterface as _FingerInterface, + FingerSession as _FingerSession, + FingerUser as _FingerUser, + cron as _cron, +) __all__ = [ - 'FingerAction', 'FingerUserCreationAction', 'FingerUserEditionAction', - 'FingerUserDeletionAction', 'FingerUserLoginAction', - 'FingerUserSessionChangeAction', 'FingerUserLogoutAction', - - 'FingerFictionInterface', - 'FingerScenario', 'FingerScenarioInterface'] + 'FingerAction', 'FingerFictionInterface', 'FingerScenario', + 'FingerScenarioInterface', 'FingerUserCreationAction', + 'FingerUserDeletionAction', 'FingerUserEditionAction', + 'FingerUserLoginAction', 'FingerUserLogoutAction', + 'FingerUserSessionChangeAction', +] _toml = None @@ -35,172 +38,186 @@ _toml = None # Users and sessions for the fictions. # --- + class _FictionalUser(_FingerUser): - """ Representation of a user on the fictional system, - compatible with :py:class:`fingerd.core.FingerUser` - and implementing existing and/or new properties more suited for - fiction. + """ Representation of a user on the fictional system, + compatible with :py:class:`fingerd.core.FingerUser` + and implementing existing and/or new properties more suited for + fiction. - For now, there are no modifications from the base class. """ + For now, there are no modifications from the base class. """ + + pass - pass class _FictionalSession(_FingerSession): - """ Representation of an active session for a given user - on the fictional system, compatible with - :py:class:`fingerd.core.FingerSession`. + """ Representation of an active session for a given user + on the fictional system, compatible with + :py:class:`fingerd.core.FingerSession`. + + The two main modifications from the base class are the following: + + * Each session has a name, which identifies it. It can be used + to designate it, allowing outside code to edit and delete the + session specifically from an outside point of view. + * Since the actions only allow for setting the user idle or not, + when the user is active, the idle timestamp is simulated to make + it seem like the user makes signs of life at an irregular but + reasonable rate, like when the user often stops typing to think + or do a task outside of the computer. """ - The two main modifications from the base class are the following: + __slots__ = ('_name', '_is_idle', '_idle_last') - * Each session has a name, which identifies it. It can be used - to designate it, allowing outside code to edit and delete the - session specifically from an outside point of view. - * Since the actions only allow for setting the user idle or not, - when the user is active, the idle timestamp is simulated to make - it seem like the user makes signs of life at an irregular but - reasonable rate, like when the user stops typing to think - or do a task outside of the computer often. """ + def __init__(self, *args, name=None, is_idle=False, **kwargs): + super().__init__(*args, **kwargs) - def __init__(self, *args, name = None, is_idle = False, **kwargs): - super().__init__(*args, **kwargs) + self._name = None if name is None else str(name) + self._is_idle = is_idle + self._idle_last = self.start - self._name = None if name is None else str(name) - self._is_idle = is_idle - self._idle_last = self.start + def __repr__(self): + p = ('name', 'start', 'line', 'host', 'idle_since', 'active_since') + p = ( + f'{x} = {getattr(self, x)!r}' + for x in p if getattr(self, x) is not None + ) + return f"{self.__class__.__name__}({', '.join(p)})" - def __repr__(self): - p = ('name', 'start', 'line', 'host', 'idle_since', 'active_since') - p = (f"{x} = {repr(getattr(self, x))}" for x in p \ - if getattr(self, x) is not None) - return f"{self.__class__.__name__}({', '.join(p)})" + @property + def name(self): + """ Session name (for identifying). """ - @property - def name(self): - """ Session name (for identifying). """ + return self._name - return self._name + @name.setter + def name(self, value): + self._name = None if value is None else str(value) - @name.setter - def name(self, value): - self._name = None if value is None else str(value) + @property + def idle(self): + """ Idle time (simulated). """ - @property - def idle(self): - """ Idle time (simulated). """ + if self._is_idle: + return self._idle_last - if self._is_idle: - return self._idle_last + now = _dt.now().astimezone() - now = _dt.now().astimezone() + # Generate a number of seconds and return it. - # Generate a number of seconds and return it. + def s(x): + return _math.sin(x * (_math.pi / 2)) - x = (now - self._idle_last).seconds - s = lambda x: _math.sin(x * (_math.pi / 2)) - randsecs = int(abs(s(x) + s(x / 4))) + x = (now - self._idle_last).seconds + randsecs = int(abs(s(x) + s(x / 4))) - return now - _td(seconds = randsecs) + return now - _td(seconds=randsecs) - @idle.setter - def idle(self, value): - # Does nothing as we manage this time. + @idle.setter + def idle(self, value): + # Does nothing as we manage this time. - pass + pass - @property - def idle_since(self): - """ Idle since the given time. """ + @property + def idle_since(self): + """ Idle since the given time. """ - if not self._is_idle: - return None - return self._idle_last + if not self._is_idle: + return None + return self._idle_last - @idle_since.setter - def idle_since(self, value): - self._is_idle = True - self._idle_last = value + @idle_since.setter + def idle_since(self, value): + self._is_idle = True + self._idle_last = value - @property - def active_since(self): - """ Active since the given time. """ + @property + def active_since(self): + """ Active since the given time. """ - if self._is_idle: - return None - return self._idle_last + if self._is_idle: + return None + return self._idle_last + + @active_since.setter + def active_since(self, value): + self._is_idle = False + self._idle_last = value - @active_since.setter - def active_since(self, value): - self._is_idle = False - self._idle_last = value # --- # Parse a delta string. # --- -__delta0_re = _re.compile(r"(-?)(([0-9]+[a-z]+)+)") -__delta1_re = _re.compile(r"([0-9]+)([a-z]+)") +__delta0_re = _re.compile(r'(-?)(([0-9]+[a-z]+)+)') +__delta1_re = _re.compile(r'([0-9]+)([a-z]+)') + def _parse_delta(raw): - """ Parse a delta string as found in the configuration files. """ - - try: - delta = _td() - - sign, elements, _ = __delta0_re.fullmatch(raw).groups() - sign = (1, -1)[len(sign)] - for res in __delta1_re.finditer(elements): - num, typ = res.groups() - num = int(num) - - if typ == 'w': - delta += _td(weeks = sign * num) - elif typ in 'jd': - delta += _td(days = sign * num) - elif typ == 'h': - delta += _td(hours = sign * num) - elif typ == 'm': - delta += _td(minutes = sign * num) - elif typ == 's': - delta += _td(seconds = sign * num) - else: - raise Exception - - return delta - except Exception as e: - return None + """ Parse a delta string as found in the configuration files. """ + + try: + delta = _td() + + sign, elements, _ = __delta0_re.fullmatch(raw).groups() + sign = (1, -1)[len(sign)] + for res in __delta1_re.finditer(elements): + num, typ = res.groups() + num = int(num) + + if typ == 'w': + delta += _td(weeks=sign * num) + elif typ in 'jd': + delta += _td(days=sign * num) + elif typ == 'h': + delta += _td(hours=sign * num) + elif typ == 'm': + delta += _td(minutes=sign * num) + elif typ == 's': + delta += _td(seconds=sign * num) + else: + raise Exception + + return delta + except Exception: + return None + def _format_delta(td): - """ Create a delta string. """ + """ Create a delta string. """ - sls = zip((_td(days = 7), _td(days = 1), _td(seconds = 3600), - _td(seconds = 60)), 'wdhm') + sls = zip( + (_td(days=7), _td(days=1), _td(seconds=3600), _td(seconds=60)), + 'wdhm', + ) - if td >= _td(): - d = '' + if td >= _td(): + d = '' - for span, letter in sls: - n = td // span - if n: - d += f"{n}{letter}" - td -= span * n + for span, letter in sls: + n = td // span + if n: + d += f'{n}{letter}' + td -= span * n - s = td.seconds - if not d or s: - d += f"{s}s" - else: - d = '-' + s = td.seconds + if not d or s: + d += f'{s}s' + else: + d = '-' - for span, letter in sls: - n = -td // span - if n: - d += f"{n}{letter}" - td += span * n + for span, letter in sls: + n = -td // span + if n: + d += f'{n}{letter}' + td += span * n - s = (-td).seconds - if s: - d += f"{s}s" + s = (-td).seconds + if s: + d += f'{s}s' + + return d - return d # --- # Unchanged global. @@ -209,9 +226,11 @@ def _format_delta(td): # TODO: Make that ``UnchangedType()`` (or ``type(Unchanged)()``) always # return Unchanged, like for None and NoneType. + class _UnchangedType: - def __repr__(self): - return "Unchanged" + def __repr__(self): + return 'Unchanged' + Unchanged = _UnchangedType() @@ -219,945 +238,1019 @@ Unchanged = _UnchangedType() # Action description. # --- + class FingerAction: - """ Base class for actions in a fiction. """ + """ Base class for actions in a fiction. """ + + pass - pass class FingerUserCreationAction(FingerAction): - """ A user has been created. + """ A user has been created. + + :param login: The login of the user that is being created. + :param name: The initial value for :py:attr:`_FictionalUser.name`. + :param home: The initial value for :py:attr:`_FictionalUser.home`. + :param shell: The initial value for :py:attr:`_FictionalUser.shell`. + :param office: The initial value for :py:attr:`_FictionalUser.office`. + :param plan: The initial value for :py:attr:`_FictionalUser.plan`. """ - :param login: The login of the user that is being created. - :param name: The initial value for :py:attr:`_FictionalUser.name`. - :param home: The initial value for :py:attr:`_FictionalUser.home`. - :param shell: The initial value for :py:attr:`_FictionalUser.shell`. - :param office: The initial value for :py:attr:`_FictionalUser.office`. - :param plan: The initial value for :py:attr:`_FictionalUser.plan`. """ + def __init__( + self, + login: str, + name: _Union[str, None] = None, + home: _Union[str, None] = None, + shell: _Union[str, None] = None, + office: _Union[str, None] = None, + plan: _Union[str, None] = None, + ): + super().__init__() + self._login = login + self._name = name + self._home = home + self._shell = shell + self._office = office + self._plan = plan - def __init__(self, login: str, - name: _Union[str, None] = None, - home: _Union[str, None] = None, - shell: _Union[str, None] = None, - office: _Union[str, None] = None, - plan: _Union[str, None] = None): - super().__init__() - self._login = login - self._name = name - self._home = home - self._shell = shell - self._office = office - self._plan = plan + def __repr__(self): + p = ( + f'{x} = {getattr(self, x)!r}' + for x in ('login', 'name', 'home', 'shell', 'office', 'plan')) + return f"{self.__class__.__name__}({', '.join(p)})" - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login', - 'name', 'home', 'shell', 'office', 'plan')) - return f"{self.__class__.__name__}({', '.join(p)})" + @property + def login(self) -> str: + """ The login of the user that is being created. """ - @property - def login(self) -> str: - """ The login of the user that is being created. """ + return self._login - return self._login + @property + def name(self) -> _Union[str, None]: + """ The initial value for :py:attr:`_FictionalUser.name`. """ - @property - def name(self) -> _Union[str, None]: - """ The initial value for :py:attr:`_FictionalUser.name`. """ + return self._name - return self._name + @property + def home(self) -> _Union[str, None]: + """ The initial value for :py:attr:`_FictionalUser.home`. """ - @property - def home(self) -> _Union[str, None]: - """ The initial value for :py:attr:`_FictionalUser.home`. """ + return self._home - return self._home + @property + def shell(self) -> _Union[str, None]: + """ The initial value for :py:attr:`_FictionalUser.shell`. """ - @property - def shell(self) -> _Union[str, None]: - """ The initial value for :py:attr:`_FictionalUser.shell`. """ + return self._shell - return self._shell + @property + def office(self) -> _Union[str, None]: + """ The initial value for :py:attr:`_FictionalUser.office`. """ - @property - def office(self) -> _Union[str, None]: - """ The initial value for :py:attr:`_FictionalUser.office`. """ + return self._office - return self._office + @property + def plan(self) -> _Union[str, None]: + """ The initial value for :py:attr:`_FictionalUser.plan`. """ - @property - def plan(self) -> _Union[str, None]: - """ The initial value for :py:attr:`_FictionalUser.plan`. """ + return self._plan - return self._plan class FingerUserEditionAction(FingerAction): - """ A user has been edited. - - :param login: The login of the user that is being edited. - :param name: The new value for :py:attr:`_FictionalUser.name`; - :py:data:`Unchanged` if the property is unchanged. - :param home: The new value for :py:attr:`_FictionalUser.home`; - :py:data:`Unchanged` if the property is unchanged. - :param shell: The new value for :py:attr:`_FictionalUser.shell`; - :py:data:`Unchanged` if the property is unchanged. - :param office: The new value for :py:attr:`_FictionalUser.office`; - :py:data:`Unchanged` if the property is unchanged. - :param plan: The new value for :py:attr:`_FictionalUser.plan`; - :py:data:`Unchanged` if the property is unchanged. """ - - def __init__(self, login: str, - name: _Union[str, None, _UnchangedType] = Unchanged, - home: _Union[str, None, _UnchangedType] = Unchanged, - shell: _Union[str, None, _UnchangedType] = Unchanged, - office: _Union[str, None, _UnchangedType] = Unchanged, - plan: _Union[str, None, _UnchangedType] = Unchanged): - super().__init__() - self._login = login - self._name = name - self._home = home - self._shell = shell - self._office = office - self._plan = plan - - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login', - 'name', 'home', 'shell', 'office', 'plan')) - return f"{self.__class__.__name__}({', '.join(p)})" - - @property - def login(self) -> str: - """ The login of the user that is being edited. """ - - return self._login - - @property - def name(self) -> _Union[str, None, _UnchangedType]: - """ The new value for :py:attr:`_FictionalUser.name`; - :py:data:`Unchanged` if the property is unchanged. """ - - return self._name - - @property - def home(self) -> _Union[str, None, _UnchangedType]: - """ The new value for :py:attr:`_FictionalUser.home`; - :py:data:`Unchanged` if the property is unchanged. """ - - return self._home - - @property - def shell(self) -> _Union[str, None, _UnchangedType]: - """ The new value for :py:attr:`_FictionalUser.shell`; - :py:data:`Unchanged` if the property is unchanged. """ - - return self._shell - - @property - def office(self) -> _Union[str, None, _UnchangedType]: - """ The new value for :py:attr:`_FictionalUser.office`; - :py:data:`Unchanged` if the property is unchanged. """ - - return self._office - - @property - def plan(self) -> _Union[str, None, _UnchangedType]: - """ The new value for :py:attr:`_FictionalUser.plan`; - :py:data:`Unchanged` if the property is unchanged. """ - - return self._plan + """ A user has been edited. + + :param login: The login of the user that is being edited. + :param name: The new value for :py:attr:`_FictionalUser.name`; + :py:data:`Unchanged` if the property is unchanged. + :param home: The new value for :py:attr:`_FictionalUser.home`; + :py:data:`Unchanged` if the property is unchanged. + :param shell: The new value for :py:attr:`_FictionalUser.shell`; + :py:data:`Unchanged` if the property is unchanged. + :param office: The new value for :py:attr:`_FictionalUser.office`; + :py:data:`Unchanged` if the property is unchanged. + :param plan: The new value for :py:attr:`_FictionalUser.plan`; + :py:data:`Unchanged` if the property is unchanged. """ + + def __init__( + self, + login: str, + name: _Union[str, None, _UnchangedType] = Unchanged, + home: _Union[str, None, _UnchangedType] = Unchanged, + shell: _Union[str, None, _UnchangedType] = Unchanged, + office: _Union[str, None, _UnchangedType] = Unchanged, + plan: _Union[str, None, _UnchangedType] = Unchanged, + ): + super().__init__() + self._login = login + self._name = name + self._home = home + self._shell = shell + self._office = office + self._plan = plan + + def __repr__(self): + p = ( + f'{x} = {getattr(self, x)!r}' + for x in ('login', 'name', 'home', 'shell', 'office', 'plan')) + return f"{self.__class__.__name__}({', '.join(p)})" + + @property + def login(self) -> str: + """ The login of the user that is being edited. """ + + return self._login + + @property + def name(self) -> _Union[str, None, _UnchangedType]: + """ The new value for :py:attr:`_FictionalUser.name`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._name + + @property + def home(self) -> _Union[str, None, _UnchangedType]: + """ The new value for :py:attr:`_FictionalUser.home`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._home + + @property + def shell(self) -> _Union[str, None, _UnchangedType]: + """ The new value for :py:attr:`_FictionalUser.shell`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._shell + + @property + def office(self) -> _Union[str, None, _UnchangedType]: + """ The new value for :py:attr:`_FictionalUser.office`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._office + + @property + def plan(self) -> _Union[str, None, _UnchangedType]: + """ The new value for :py:attr:`_FictionalUser.plan`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._plan + class FingerUserDeletionAction(FingerAction): - """ A user has been deleted. + """ A user has been deleted. + + :param login: The login of the user to delete. """ - :param login: The login of the user to delete. """ + def __init__(self, login: str): + super().__init__() + self._login = login - def __init__(self, login: str): - super().__init__() - self._login = login + def __repr__(self): + p = (f'{x} = {getattr(self, x)!r}' for x in ('login')) + return f"{self.__class__.__name__}({', '.join(p)})" - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login')) - return f"{self.__class__.__name__}({', '.join(p)})" + @property + def login(self) -> str: + """ The user's login. """ - @property - def login(self) -> str: - """ The user's login. """ + return self._login - return self._login class FingerUserLoginAction(FingerAction): - """ A user has logged in. + """ A user has logged in. + + :param login: The login of the user to edit. + :param session_name: The name of the session to create. + :param line: The new value for :py:attr:`_FictionalSession.line`. + :param host: The new value for :py:attr:`_FictionalSession.host`. """ - :param login: The login of the user to edit. - :param session_name: The name of the session to create. - :param line: The new value for :py:attr:`_FictionalSession.line`. - :param host: The new value for :py:attr:`_FictionalSession.host`. """ + def __init__( + self, + login: str, + session_name: _Union[str, None] = None, + line: _Union[str, None] = None, + host: _Union[str, None] = None, + ): + super().__init__() + self._login = login + self._session = session_name + self._line = line + self._host = host - def __init__(self, login: str, - session_name: _Union[str, None] = None, - line: _Union[str, None] = None, - host: _Union[str, None] = None): - super().__init__() - self._login = login - self._session = session_name - self._line = line - self._host = host + def __repr__(self): + p = ( + f'{x} = {getattr(self, x)!r}' + for x in ('login', 'session_name')) + return f"{self.__class__.__name__}({', '.join(p)})" - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login', - 'session_name')) - return f"{self.__class__.__name__}({', '.join(p)})" + @property + def login(self) -> str: + """ The login of the user to edit """ - @property - def login(self) -> str: - """ The login of the user to edit """ + return self._login - return self._login + @property + def session_name(self) -> _Union[str, None]: + """ The name of the session to create. """ - @property - def session_name(self) -> _Union[str, None]: - """ The name of the session to create. """ + return self._session - return self._session + @property + def line(self) -> _Union[str, None]: + """ The name of the line from which the user has logged in. """ - @property - def line(self) -> _Union[str, None]: - """ The name of the line from which the user has logged in. """ + return self._line - return self._line + @property + def host(self) -> _Union[str, None]: + """ The name of the host from which the user has logged in. """ - @property - def host(self) -> _Union[str, None]: - """ The name of the host from which the user has logged in. """ + return self._host - return self._host class FingerUserSessionChangeAction(FingerAction): - """ A user session has undergone modifications. + """ A user session has undergone modifications. - :param login: The login of the user to edit. - :param session_name: The name of the session to edit. - :param is_idle: The new value for - :py:attr:`_FictionalSession.is_idle`; - :py:data:`Unchanged` if the property is unchanged. """ + :param login: The login of the user to edit. + :param session_name: The name of the session to edit. + :param is_idle: The new value for + :py:attr:`_FictionalSession.is_idle`; + :py:data:`Unchanged` if the property is unchanged. """ - def __init__(self, login: str, - session_name: _Union[str, None] = None, - idle: _Union[bool, _UnchangedType] = Unchanged): - super().__init__() - self._login = login - self._session = session_name - self._idle = bool(idle) + def __init__( + self, + login: str, + session_name: _Optional[str] = None, + idle: _Union[bool, _UnchangedType] = Unchanged, + ): + super().__init__() + self._login = login + self._session = session_name + self._idle = bool(idle) - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login', - 'session_name', 'idle')) - return f"{self.__class__.__name__}({', '.join(p)})" + def __repr__(self): + p = ( + f'{x} = {getattr(self, x)!r}' + for x in ('login', 'session_name', 'idle')) + return f"{self.__class__.__name__}({', '.join(p)})" - @property - def login(self) -> str: - """ The login of the user to edit. """ + @property + def login(self) -> str: + """ The login of the user to edit. """ - return self._login + return self._login - @property - def session_name(self) -> _Union[str, None]: - """ The name of the session to edit. """ + @property + def session_name(self) -> _Union[str, None]: + """ The name of the session to edit. """ - return self._session + return self._session - @property - def idle(self) -> _Union[bool, _UnchangedType]: - """ The new value for :py:attr:`_FictionalSession.is_idle`; - :py:data:`Unchanged` if the property is unchanged. """ + @property + def idle(self) -> _Union[bool, _UnchangedType]: + """ The new value for :py:attr:`_FictionalSession.is_idle`; + :py:data:`Unchanged` if the property is unchanged. """ + + return self._idle - return self._idle class FingerUserLogoutAction(FingerAction): - """ A user has logged out. + """ A user has logged out. + + :param login: The login of the user to edit. + :param session_name: The name of the session to delete. """ - :param login: The login of the user to edit. - :param session_name: The name of the session to delete. """ + def __init__( + self, + login: str, + session_name: _Optional[str] = None, + ): + super().__init__() + self._login = login + self._session = session_name - def __init__(self, login: str, - session_name: _Union[str, None] = None): - super().__init__() - self._login = login - self._session = session_name + def __repr__(self): + p = ( + f'{x} = {getattr(self, x)!r}' + for x in ('login', 'session_name')) + return f'{self.__class__.__name__}({", ".join(p)})' - def __repr__(self): - p = (f"{x} = {repr(getattr(self, x))}" for x in ('login', - 'session_name')) - return f"{self.__class__.__name__}({', '.join(p)})" + @property + def login(self) -> str: + """ The login of the user to edit. """ - @property - def login(self) -> str: - """ The login of the user to edit. """ + return self._login - return self._login + @property + def session_name(self) -> _Union[str, None]: + """ The name of the session to delete. """ - @property - def session_name(self) -> _Union[str, None]: - """ The name of the session to delete. """ + return self._session - return self._session # --- # Base fiction interface. # --- + class FingerFictionInterface(_FingerInterface): - """ Base finger fiction interface for managing a scene and updating - it using sequences of actions and given times. - - The basic state for this class is to have no users; it is possible - at any point in time to - - This class should be subclassed for interfaces specialized in various - sources for the data; for example, while - :py:class:`FingerScenarioInterface` is specialized in using a - static sequence of actions, another class could read events from - a live source. """ - - def __init__(self, start: _Optional[_dt] = None): - if start is None: - start = _dt.now().astimezone() - - super().__init__() - - # Initialize the object properties. - # - `users`: the users. - # - `lasttime`: the last datetime, in order to raise an - # exception if not applied in order. - - self._users = {} - self._lasttime = None - - # --- - # Expected methods from an interface. - # --- - - def search_users(self, query: _Optional[str], - active: _Optional[bool]) -> _Sequence[_FictionalUser]: - """ Look for users according to a check. """ - - return ([_copy.deepcopy(user) for user in self._users.values() - if not (query is not None and query not in user.login) - and not (active is not None and active != bool(user.sessions))]) - - # --- - # Elements proper to the fiction interface. - # --- - - def reset(self): - """ Reset the interface (revert all actions). """ - - self._users = {} - self._lasttime = None - - def apply(self, action, time: _Optional[_dt] = None): - """ Apply an action. """ - - if time is None: - time = _dt.now().astimezone() - - if self._lasttime is not None and self._lasttime > time: - raise ValueError("operations weren't applied in order!") - - self._lasttime = time - - if isinstance(action, FingerUserCreationAction): - # Create user `action.user`. - - if action.login is None: - raise ValueError("missing login") - if action.login in self._users: - raise ValueError("already got a user with that login") - - user = _FictionalUser(login = action.login, name = action.name) - user.shell = action.shell - user.home = action.home - user.office = action.office - user.plan = action.plan - - self._users[user.login] = user - elif isinstance(action, FingerUserEditionAction): - # Edit user `action.user` with the given modifications. - - if action.login is None: - raise ValueError("missing login") - if not action.login in self._users: - raise ValueError("got no user with login " - f"{repr(action.login)}") - - user = self._users[action.login] - if action.name is not Unchanged: - user.name = action.name - if action.shell is not Unchanged: - user.shell = action.shell - if action.home is not Unchanged: - user.home = action.home - if action.office is not Unchanged: - user.office = action.office - if action.plan is not Unchanged: - user.plan = action.plan - elif isinstance(action, FingerUserDeletionAction): - # Delete user with login `action.login`. - - if action.login is None: - raise ValueError("missing login") - if not action.login in self._users: - raise ValueError("got no user with login " - f"{repr(action.login)}") - - del self._users[action.login] - elif isinstance(action, FingerUserLoginAction): - # Login as user `action.login` with session `action.session_name`. - - session = _FictionalSession( - time = time, - name = action.session_name) - session.line = action.line - session.host = action.host - - if action.login is None: - raise ValueError("missing login") - - try: - user = self._users[action.login] - except KeyError: - raise ValueError("got no user with login " - f"{repr(action.login)}") from None - - # We don't check if the session exists or not; multiple - # sessions can have the same name, we just act on the last - # inserted one that still exists and has that name. - - user.sessions.add(session) - if user.last_login is None or user.last_login < session.start: - user.last_login = session.start - elif isinstance(action, FingerUserLogoutAction): - # Logout as user `action.login` from session `action.session_name`. - - if action.login is None: - raise ValueError("missing login") - - try: - user = self._users[action.login] - except KeyError: - raise ValueError("got no user with login " \ - f"{repr(action.login)}") from None - - try: - del user.sessions[action.session_name] - except (KeyError, IndexError): - raise ValueError(f"got no session {repr(action.name)} " - f"for user {repr(action.login)}") from None - elif isinstance(action, FingerUserSessionChangeAction): - # Make user with login `action.login` idle. - - if action.login is None: - raise ValueError("missing login") - - try: - user = self._users[action.login] - except KeyError: - raise ValueError("got no user with login " \ - f"{repr(action.login)}") from None - - try: - session = user.sessions[action.session_name] - except (KeyError, IndexError): - raise ValueError(f"got no session {repr(action.name)} " - f"for user {repr(action.login)}") from None - - since = time - if action.idle is Unchanged: - pass - elif action.idle: - session.idle_since = since - else: - session.active_since = since + """ Base finger fiction interface for managing a scene and updating + it using sequences of actions and given times. + + The basic state for this class is to have no users; it is possible + at any point in time to (TODO) + + This class should be subclassed for interfaces specialized in various + sources for the data; for example, while + :py:class:`FingerScenarioInterface` is specialized in using a + static sequence of actions, another class could read events from + a live source. """ + + def __init__(self, start: _Optional[_dt] = None): + if start is None: + start = _dt.now().astimezone() + + super().__init__() + + # Initialize the object properties. + # - `users`: the users. + # - `lasttime`: the last datetime, in order to raise an + # exception if not applied in order. + + self._users = {} + self._lasttime = None + + # --- + # Expected methods from an interface. + # --- + + def search_users( + self, + query: _Optional[str], + active: _Optional[bool], + ) -> _Sequence[_FictionalUser]: + """ Look for users according to a check. """ + + return ([ + _copy.deepcopy(user) for user in self._users.values() + if not (query is not None and query not in user.login) + and not (active is not None and active != bool(user.sessions))]) + + # --- + # Elements proper to the fiction interface. + # --- + + def reset(self): + """ Reset the interface (revert all actions). """ + + self._users = {} + self._lasttime = None + + def apply(self, action, time: _Optional[_dt] = None): + """ Apply an action. """ + + if time is None: + time = _dt.now().astimezone() + + if self._lasttime is not None and self._lasttime > time: + raise ValueError("operations weren't applied in order!") + + self._lasttime = time + + if isinstance(action, FingerUserCreationAction): + # Create user `action.user`. + + if action.login is None: + raise ValueError('missing login') + if action.login in self._users: + raise ValueError('already got a user with that login') + + user = _FictionalUser(login=action.login, name=action.name) + user.shell = action.shell + user.home = action.home + user.office = action.office + user.plan = action.plan + + self._users[user.login] = user + elif isinstance(action, FingerUserEditionAction): + # Edit user `action.user` with the given modifications. + + if action.login is None: + raise ValueError('missing login') + if action.login not in self._users: + raise ValueError( + f'got no user with login {action.login!r}', + ) + + user = self._users[action.login] + if action.name is not Unchanged: + user.name = action.name + if action.shell is not Unchanged: + user.shell = action.shell + if action.home is not Unchanged: + user.home = action.home + if action.office is not Unchanged: + user.office = action.office + if action.plan is not Unchanged: + user.plan = action.plan + elif isinstance(action, FingerUserDeletionAction): + # Delete user with login `action.login`. + + if action.login is None: + raise ValueError('missing login') + if action.login not in self._users: + raise ValueError( + f'got no user with login {action.login!r}', + ) + + del self._users[action.login] + elif isinstance(action, FingerUserLoginAction): + # Login as user `action.login` with session `action.session_name`. + + session = _FictionalSession( + time=time, + name=action.session_name, + ) + + session.line = action.line + session.host = action.host + + if action.login is None: + raise ValueError('missing login') + + try: + user = self._users[action.login] + except KeyError: + raise ValueError( + f'got no user with login {action.login!r}', + ) from None + + # We don't check if the session exists or not; multiple + # sessions can have the same name, we just act on the last + # inserted one that still exists and has that name. + + user.sessions.add(session) + if user.last_login is None or user.last_login < session.start: + user.last_login = session.start + elif isinstance(action, FingerUserLogoutAction): + # Logout as user `action.login` from session `action.session_name`. + + if action.login is None: + raise ValueError('missing login') + + try: + user = self._users[action.login] + except KeyError: + raise ValueError( + f'got no user with login {action.login!r}', + ) from None + + try: + del user.sessions[action.session_name] + except (KeyError, IndexError): + raise ValueError( + f'got no session {action.name!r} ' + f'for user {action.login!r}') from None + elif isinstance(action, FingerUserSessionChangeAction): + # Make user with login `action.login` idle. + + if action.login is None: + raise ValueError('missing login') + + try: + user = self._users[action.login] + except KeyError: + raise ValueError( + 'got no user with login ' + f'{action.login!r}', + ) from None + + try: + session = user.sessions[action.session_name] + except (KeyError, IndexError): + raise ValueError( + f'got no session {action.name!r} ' + f'for user {action.login!r}') from None + + since = time + if action.idle is Unchanged: + pass + elif action.idle: + session.idle_since = since + else: + session.active_since = since + # --- # Scenario definition and related interface definition. # --- -class FingerScenario: - """ Scenario for fingerd, consisting of actions (as instances of subclasses - of :py:class:`FingerAction`) located at a given timedelta, with - a given ending type and time. - - A scenario always uses timedeltas and not datetimes, since it can - start at any arbitrary point in time and some scenarios are even - on repeat. """ - - class EndingType(_Enum): - """ Ending type. - - .. data:: FREEZE - - Freeze the end state forever. - - .. data:: STOP - - Stop the server as soon as the scenario has reached - the ending time. - - .. data:: REPEAT - - Repeat the scenario from the beginning while - starting again from the initial state. """ - - FREEZE = 0 - STOP = 1 - REPEAT = 2 - - def __init__(self): - # Initialize the properties. - self._end_type = None - self._end_time = None - self._actions = [] - - def verify(self) -> None: - """ Verify that the current fiction is valid. - Raises ValueError in case it is not. """ - - # Check if the ending is well defined. - - if self._end_time is None: - raise ValueError("ending time (duration) has not been provided") - if not isinstance(self._end_type, self.EndingType): - raise ValueError("ending type has not been provided") - - # Check if the events are coherent. - - users = _defaultdict(lambda: False) - sessions = _defaultdict(lambda: _defaultdict(lambda: 0)) - - for i, (time, action, _) in enumerate(self._actions): - try: - if time >= self._end_time: - # Action will be ignored. - - pass - elif isinstance(action, FingerUserCreationAction): - if users[action.login]: - # The user we're trying to create already exists. - - raise ValueError("trying to create user " - f"{repr(action.login)} which already exists") - - users[action.login] = True - elif isinstance(action, FingerUserEditionAction): - if not users[action.login]: - # The user we're trying to edit doesn't exist. - - raise ValueError("trying to edit user " - f"{repr(action.login)} while it doesn't exist") - elif isinstance(action, FingerUserDeletionAction): - if not users[action.login]: - # The user we're trying to delete doesn't exist. +class FingerScenario: + """ Scenario for fingerd, consisting of actions (as instances of subclasses + of :py:class:`FingerAction`) located at a given timedelta, with + a given ending type and time. - raise ValueError("trying to delete user " - f"{repr(action.login)} while it doesn't exist") + A scenario always uses timedeltas and not datetimes, since it can + start at any arbitrary point in time and some scenarios are even + on repeat. """ + + class EndingType(_Enum): + """ Ending type. - del users[action.login] - del sessions[action.login] - elif isinstance(action, FingerUserLoginAction): - if not users[action.login]: - # The user we're trying to log in as doesn't exist. - - raise ValueError("trying to log in as user " - f"{repr(action.login)} which doesn't exist") - - sessions[action.login][action.session_name] += 1 - elif isinstance(action, FingerUserSessionChangeAction): - if sessions[action.login][action.session_name] <= 0: - # The user doesn't exist (anymore?) or the session - # does not exist. - - if users[action.login]: - raise ValueError("trying to change non existing " - f"session of user {repr(action.login)}") - else: - raise ValueError("trying to change session of " - f"non-existing user {repr(action.login)}") - elif isinstance(action, FingerUserLogoutAction): - if sessions[action.login][action.session_name] <= 0: - # The user doesn't exist (anymore?) or the session - # does not exist. - - if users[action.login]: - raise ValueError("trying to delete non existing " - f"session of user {repr(action.login)}") - else: - raise ValueError("trying to delete session of " - f"non-existing user {repr(action.login)}") - - sessions[action.login][action.session_name] -= 1 - except ValueError as e: - raise ValueError(f"at action #{i} at {_format_delta(time)}: " - f"{str(e)}") from None - - def load(self, path: str) -> None: - """ Decodes the content of a scenario in TOML format and, if - successful, stores the result in the current object for - later usage. - - :param path: Path of the TOML file to load. """ - - global _toml - - actions = [] - end_type = None - end_time = None - - # Load the required modules. - - if _toml is None: - try: - import toml - except ModuleNotFoundError: - raise ModuleNotFoundError("'toml' module " - "required") from None - - _toml = toml - del toml - - # Read the document and translate all of the timestamps. - - document = _toml.load(path) - i = 0 - - for key in document.keys(): - time = _parse_delta(key) - if time is None: - raise ValueError(f"found invalid time: {repr(key)}") - - if not isinstance(document[key], list): - raise ValueError(f"time {repr(key)} is not an array, " - f"you have probably written [{key}] instead of " - f"[[{key}]]") - - for j, data in enumerate(document[key]): - try: - typ = data['type'] - - if typ in ('interrupt', 'freeze', 'stop', 'repeat'): - # Set the ending type and time. - - if end_time is None or end_time > time: - end_type = { - 'interrupt': self.EndingType.FREEZE, - 'freeze': self.EndingType.FREEZE, - 'stop': self.EndingType.STOP, - 'repeat': self.EndingType.REPEAT}[typ] - end_time = time - - continue - elif typ == 'create': - # User creation. - - plan = None - if 'plan' in data: - pp = _path.join(_path.dirname(path), data['plan']) - plan = open(pp).read() - - action = FingerUserCreationAction( - login = data['login'], - name = data.get('name'), - shell = data.get('shell'), - home = data.get('home'), - office = data.get('office'), - plan = plan) - elif typ == 'update': - # User update. - - plan = Unchanged - if 'plan' in data: - if data['plan'] is False: - plan = None - else: - pp = _path.join(_path.dirname(path), - data['plan']) - plan = open(pp).read() - - g = (lambda k: None if k in data - and data[k] is False - else data.get(k, Unchanged)) - - action = FingerUserEditionAction( - login = data['login'], - name = g('name'), - shell = g('shell'), - home = g('home'), - office = g('office'), - plan = plan) - elif typ == 'delete': - # User deletion. - - action = FingerUserDeletionAction( - login = data['login']) - elif typ == 'login': - # User login. - - action = FingerUserLoginAction( - login = data['login'], - session_name = data.get('session'), - line = data.get('line'), - host = data.get('host')) - elif typ == 'logout': - # User logout. - - action = FingerUserLogoutAction( - login = data['login'], - session_name = data.get('session')) - elif typ in ('idle', 'active'): - # Idle change status. - - action = FingerUserSessionChangeAction( - login = data['login'], - session_name = data.get('session'), - idle = (typ == 'idle')) - else: - raise ValueError(f"invalid action type {repr(typ)}") - - actions.append((time, action, i)) - i += 1 - except Exception as e: - raise ValueError(f"at action #{j + 1} at " - f"{_format_delta(time)}: {str(e)}") from None - - # Sort and check the actions. - - actions.sort(key = lambda x: (x[0], x[2])) - - if end_type is None: - # If no ending was given in the script file, we ought to - # interrupt 10 seconds after the last action. - - end_type = self.EndingType.FREEZE - end_time = actions[-1][0] + _td(seconds = 10) - else: - # Otherwise, we are removing actions that are after the ending. - - actions = [a for a in actions if a[0] <= end_time] - - # FIXME: check that incompatible actions, such as double creation - # for a user, doesn't occur. - - self._end_type = end_type - self._end_time = end_time - self._actions = actions - - def get(self, to: _Optional[_td] = None, - since: _Optional[_td] = None) -> _Sequence[FingerAction]: - """ Returns a sequence of actions in order from the scenario. - - :param to: Maximum timedelta for the actions to gather. - :param since: Minimum timedelta for the actions to gather. - :return: The sequence of actions that occur and respect - the given constraints. """ - - if since is not None and to is not None and since > to: - raise ValueError(f"`since` ({since}) should be " \ - f"before `to` ({to}).") - - for time, action, _ in self._actions: - if since is not None and since >= time: - continue - if to is not None and time > to: - continue - if self._end_time is not None and time >= self._end_time: - continue - yield time, action - - def add(self, action: FingerAction, time: _td): - """ Add an action in the registered actions. """ - - try: - index = max(x[2] for x in self._actions) - except ValueError: - index = 0 - - self._actions.append((time, action, index + 1)) - self._actions.sort(key = lambda x: (x[0], x[2])) - - @property - def type(self) -> EndingType: - """ Type of action flow, either 'interrupt', 'stop' or 'repeat'. """ - - return self._end_type - - @type.setter - def type(self, value: EndingType) -> None: - if value is None: - self._end_type = None - return - - try: - value = self.EndingType(value) - except ValueError: - try: - if isinstance(value, str): - value = value.casefold() - value = { - None: None, - 'interrupt': self.EndingType.FREEZE, - 'freeze': self.EndingType.FREEZE, - 'stop': self.EndingType.STOP, - 'repeat': self.EndingType.REPEAT}[value] - except KeyError: - raise TypeError("invalid value for ending type: " - f"{repr(value)}") - - self._end_type = value - - @property - def duration(self) -> _td: - """ Maximum offset. """ - - return self._end_time - - @duration.setter - def duration(self, value: _Optional[_td]) -> None: - if isinstance(value, _td): - self._end_time = value - return - if value is None: - self._end_time = None - return - - try: - self._end_time = _td(seconds = value) - except TypeError: - raise TypeError(f"Invalid end time value: {repr(value)}") + .. py:data:: FREEZE + + Freeze the end state forever. + + .. py:data:: STOP + + Stop the server as soon as the scenario has reached + the ending time. + + .. py:data:: REPEAT + + Repeat the scenario from the beginning while + starting again from the initial state. """ + + FREEZE = 0 + STOP = 1 + REPEAT = 2 + + def __init__(self): + # Initialize the properties. + + self._end_type = None + self._end_time = None + self._actions = [] + + def verify(self) -> None: + """ Verify that the current fiction is valid. + Raises ValueError in case it is not. """ + + # Check if the ending is well defined. + + if self._end_time is None: + raise ValueError('ending time (duration) has not been provided') + if not isinstance(self._end_type, self.EndingType): + raise ValueError('ending type has not been provided') + + # Check if the events are coherent. + + users = _defaultdict(lambda: False) + sessions = _defaultdict(lambda: _defaultdict(lambda: 0)) + + for i, (time, action, _) in enumerate(self._actions): + try: + if time >= self._end_time: + # Action will be ignored. + + pass + elif isinstance(action, FingerUserCreationAction): + if users[action.login]: + # The user we're trying to create already exists. + + raise ValueError( + 'trying to create user ' + f'{action.login!r} which already exists') + + users[action.login] = True + elif isinstance(action, FingerUserEditionAction): + if not users[action.login]: + # The user we're trying to edit doesn't exist. + + raise ValueError( + 'trying to edit user ' + f"{action.login!r} while it doesn't exist") + elif isinstance(action, FingerUserDeletionAction): + if not users[action.login]: + # The user we're trying to delete doesn't exist. + + raise ValueError( + 'trying to delete user ' + f"{action.login!r} while it doesn't exist") + + del users[action.login] + del sessions[action.login] + elif isinstance(action, FingerUserLoginAction): + if not users[action.login]: + # The user we're trying to log in as doesn't exist. + + raise ValueError( + 'trying to log in as user ' + f"{action.login!r} which doesn't exist") + + sessions[action.login][action.session_name] += 1 + elif isinstance(action, FingerUserSessionChangeAction): + if sessions[action.login][action.session_name] <= 0: + # The user doesn't exist (anymore?) or the session + # does not exist. + + if users[action.login]: + raise ValueError( + 'trying to change non existing ' + f'session of user {action.login!r}') + else: + raise ValueError( + 'trying to change session of ' + f'non-existing user {action.login!r}') + elif isinstance(action, FingerUserLogoutAction): + if sessions[action.login][action.session_name] <= 0: + # The user doesn't exist (anymore?) or the session + # does not exist. + + if users[action.login]: + raise ValueError( + 'trying to delete non existing ' + f'session of user {action.login!r}') + else: + raise ValueError( + 'trying to delete session of ' + f'non-existing user {action.login!r}') + + sessions[action.login][action.session_name] -= 1 + except ValueError as e: + raise ValueError( + f'at action #{i} at {_format_delta(time)}: ' + f'{e!s}', + ) from None + + def load(self, path: str) -> None: + """ Decodes the content of a scenario in TOML format and, if + successful, stores the result in the current object for + later usage. + + :param path: Path of the TOML file to load. """ + + global _toml + + actions = [] + end_type = None + end_time = None + + # Load the required modules. + + if _toml is None: + try: + import toml + except ModuleNotFoundError: + raise ModuleNotFoundError( + "'toml' module required") from None + + _toml = toml + del toml + + # Read the document and translate all of the timestamps. + + document = _toml.load(path) + i = 0 + + for key in document.keys(): + time = _parse_delta(key) + if time is None: + raise ValueError(f'found invalid time: {key!r}') + + if not isinstance(document[key], list): + raise ValueError( + f'time {key!r} is not an array, ' + f'you have probably written [{key}] instead of ' + f'[[{key}]]', + ) + + for j, data in enumerate(document[key]): + try: + typ = data['type'] + + if typ in ('interrupt', 'freeze', 'stop', 'repeat'): + # Set the ending type and time. + + if end_time is None or end_time > time: + end_type = { + 'interrupt': self.EndingType.FREEZE, + 'freeze': self.EndingType.FREEZE, + 'stop': self.EndingType.STOP, + 'repeat': self.EndingType.REPEAT}[typ] + end_time = time + + continue + elif typ == 'create': + # User creation. + + plan = None + if 'plan' in data: + pp = _path.join(_path.dirname(path), data['plan']) + plan = open(pp).read() + + action = FingerUserCreationAction( + login=data['login'], + name=data.get('name'), + shell=data.get('shell'), # NOQA + home=data.get('home'), + office=data.get('office'), + plan=plan) + elif typ == 'update': + # User update. + + plan = Unchanged + if 'plan' in data: + if data['plan'] is False: + plan = None + else: + pp = _path.join( + _path.dirname(path), data['plan']) + plan = open(pp).read() + + def g(k): + if k in data and data[k] is False: + return None + return data.get(k, Unchanged) + + action = FingerUserEditionAction( + login=data['login'], + name=g('name'), + shell=g('shell'), # NOQA + home=g('home'), + office=g('office'), + plan=plan, + ) + elif typ == 'delete': + # User deletion. + + action = FingerUserDeletionAction( + login=data['login']) + elif typ == 'login': + # User login. + + action = FingerUserLoginAction( + login=data['login'], + session_name=data.get('session'), + line=data.get('line'), + host=data.get('host')) + elif typ == 'logout': + # User logout. + + action = FingerUserLogoutAction( + login=data['login'], + session_name=data.get('session')) + elif typ in ('idle', 'active'): + # Idle change status. + + action = FingerUserSessionChangeAction( + login=data['login'], + session_name=data.get('session'), + idle=(typ == 'idle'), + ) + else: + raise ValueError(f'invalid action type {typ!r}') + + actions.append((time, action, i)) + i += 1 + except Exception as e: + raise ValueError( + f'at action #{j + 1} at ' + f'{_format_delta(time)}: {e!s}') from None + + # Sort and check the actions. + + actions.sort(key=lambda x: (x[0], x[2])) + + if end_type is None: + # If no ending was given in the script file, we ought to + # interrupt 10 seconds after the last action. + + end_type = self.EndingType.FREEZE + end_time = actions[-1][0] + _td(seconds=10) + else: + # Otherwise, we are removing actions that are after the ending. + + actions = [a for a in actions if a[0] <= end_time] + + # FIXME: check that incompatible actions, such as double creation + # for a user, doesn't occur. + + self._end_type = end_type + self._end_time = end_time + self._actions = actions + + def get( + self, + to: _Optional[_td] = None, + since: _Optional[_td] = None, + ) -> _Sequence[FingerAction]: + """ Returns a sequence of actions in order from the scenario. + + :param to: Maximum timedelta for the actions to gather. + :param since: Minimum timedelta for the actions to gather. + :return: The sequence of actions that occur and respect + the given constraints. """ + + if since is not None and to is not None and since > to: + raise ValueError( + f'`since` ({since}) should be before `to` ({to}).') + + for time, action, _ in self._actions: + if since is not None and since >= time: + continue + if to is not None and time > to: + continue + if self._end_time is not None and time >= self._end_time: + continue + yield time, action + + def add(self, action: FingerAction, time: _td): + """ Add an action in the registered actions. """ + + try: + index = max(x[2] for x in self._actions) + except ValueError: + index = 0 + + self._actions.append((time, action, index + 1)) + self._actions.sort(key=lambda x: (x[0], x[2])) + + @property + def ending_type(self) -> EndingType: + """ Type of action flow, either 'interrupt', 'stop' or 'repeat'. """ + + return self._end_type + + @ending_type.setter + def ending_type(self, value: EndingType) -> None: + if value is None: + self._end_type = None + return + + try: + value = self.EndingType(value) + except ValueError: + try: + if isinstance(value, str): + value = value.casefold() + value = { + None: None, + 'interrupt': self.EndingType.FREEZE, + 'freeze': self.EndingType.FREEZE, + 'stop': self.EndingType.STOP, + 'repeat': self.EndingType.REPEAT}[value] + except KeyError: + raise TypeError( + f'invalid value for ending type: {value!r}') + + self._end_type = value + + @property + def duration(self) -> _td: + """ Maximum offset. """ + + return self._end_time + + @duration.setter + def duration(self, value: _Optional[_td]) -> None: + if isinstance(value, _td): + self._end_time = value + return + if value is None: + self._end_time = None + return + + try: + self._end_time = _td(seconds=value) + except TypeError: + raise TypeError(f'Invalid end time value: {value!r}') class FingerScenarioInterface(FingerFictionInterface): - """ Fiction interface, to follow actions written in a scenario. - - Subclasses :py:class:`FingerFictionInterface` and adds - a regular update method for updating the state according - to the given scenario. - - :param scenario: The scenario to follow using the given interface. - :param start: The start time at which to start; by default, the - current time is used. """ - - def __init__(self, scenario: FingerScenario, - start: _Optional[_dt] = None): - """ Initialize the interface. """ - - if start is None: - start = _dt.now().astimezone() - - super().__init__(start) - - # Initialize the object properties. - - if isinstance(scenario, FingerScenario): - scenario.verify() - scenario = _copy.copy(scenario) - elif scenario is not None: - raise TypeError("scenario should be a FingerScenario or None, " - f"is {scenario.__class__.__name__}.") - - # Initialize the object properties. - # - `scenario`: the script to follow. - # - `laststart`: the last registered start. - # - `lastdelta`: the last registered delta. - - self._scenario = scenario - self._start = start - self._laststart = None - self._lastdelta = None - - @_FingerInterface.regular(seconds = 2) - def update(self): - """ Update the state according to the scenario regularly. """ - - now = _dt.now().astimezone() - - # First, get the start and delta. - - start = self._start - - if (self._scenario is not None and - now > start + self._scenario.duration): - if self._scenario.type == FingerScenario.EndingType.STOP: - exit() - elif self._scenario.type == FingerScenario.EndingType.FREEZE: - delta = self._scenario.duration - else: - # We're on 'repeat', so we are going to have a slightly - # different start because we want the start of the new - # iteration. - # - # >>> from datetime import (datetime as dt - # timedelta as td) - # >>> a = dt(2000, 1, 1) - # >>> b = a + td(seconds = 27) - # >>> (b - a) % td(seconds = 10) - # datetime.timedelta(seconds=7) - # >>> b - (b - a) % td(seconds = 10) - # datetime.datetime(2000, 1, 1, 0, 0, 20) - # - # Let's see. - - start = now - (now - start) % self._scenario.duration - - # We're within the duration of the fiction, so we just use the - # offset from the start. - - delta = now - start - - # If last start was not the same or last start isn't initialized, - # let's start again from a blank slate. - # - # Previously we would have kept the events, but that wasn't good. - # Take the following examples: - # - # <first action> at +5s - # <second action> at +10s - # repeat at 15s - # - # Let's say our start is 01/01/2000 at 00:00:00. - # First request comes in at 00:00:07. - # Start is 00:00:00, delta is +7s. - # We apply <first action> at 00:00:05, and yield the state. - # Second request comes in at 00:00:26. - # Start is 00:00:15, delta is +11s. - # - # In this case, if we just compared deltas, the second one - # is further so that means we haven't started again and could - # just apply the second action at 00:00:25, which means both - # actions would not be separated by 20s instead of 5s; not - # the behaviour we want! - # - # So we look at if the start is the same or not. The question could - # also be if we need to still check the deltas, since time - # necessarily goes forward; but the local time on the system - # could be set backwards during the period, throwing everything off! - # - # The conclusion to all of this is, we need to check both the start - # and the delta. - - if (self._laststart is None or self._lastdelta is None or - self._laststart != start or self._lastdelta > delta): - self.reset() - self._lastdelta = None - - # Then, we apply the actions up to the current time. - - if self._scenario is not None: - for time, action in self._scenario.get(to = delta, - since = self._lastdelta): - self.apply(action, start + time) - - # Finally, we can keep track of where we were. - - self._laststart = start - self._lastdelta = delta + """ Fiction interface, to follow actions written in a scenario. + + Subclasses :py:class:`FingerFictionInterface` and adds + a regular update method for updating the state according + to the given scenario. + + :param scenario: The scenario to follow using the given interface. + :param start: The start time at which to start; by default, the + current time is used. """ + + def __init__( + self, + scenario: FingerScenario, + start: _Optional[_dt] = None, + ): + """ Initialize the interface. """ + + if start is None: + start = _dt.now().astimezone() + + super().__init__(start) + + # Initialize the object properties. + + if isinstance(scenario, FingerScenario): + scenario.verify() + scenario = _copy.copy(scenario) + elif scenario is not None: + raise TypeError( + 'scenario should be a FingerScenario or None, ' + f'is {scenario.__class__.__name__}.') + + # Initialize the object properties. + # - `scenario`: the script to follow. + # - `laststart`: the last registered start. + # - `lastdelta`: the last registered delta. + + self._scenario = scenario + self._start = start + self._laststart = None + self._lastdelta = None + + @_cron('* * * * * *') + def update(self): + """ Update the state according to the scenario every second. """ + + now = _dt.now().astimezone() + + # First, get the start and delta. + + start = self._start + + if ( + self._scenario is not None + and now > start + self._scenario.duration + ): + ending_type = self._scenario.ending_type + if ending_type == FingerScenario.EndingType.STOP: + exit() + elif ending_type == FingerScenario.EndingType.FREEZE: + delta = self._scenario.duration + else: + # We're on 'repeat', so we are going to have a slightly + # different start because we want the start of the new + # iteration. + # + # >>> from datetime import (datetime as dt + # timedelta as td) + # >>> a = dt(2000, 1, 1) + # >>> b = a + td(seconds = 27) + # >>> (b - a) % td(seconds = 10) + # datetime.timedelta(seconds=7) + # >>> b - (b - a) % td(seconds = 10) + # datetime.datetime(2000, 1, 1, 0, 0, 20) + # + # Let's see. + + start = now - (now - start) % self._scenario.duration + + # We're within the duration of the fiction, so we just use the + # offset from the start. + + delta = now - start + + # If last start was not the same or last start isn't initialized, + # let's start again from a blank slate. + # + # Previously we would have kept the events, but that wasn't good. + # Take the following examples: + # + # <first action> at +5s + # <second action> at +10s + # repeat at 15s + # + # Let's say our start is 01/01/2000 at 00:00:00. + # First request comes in at 00:00:07. + # Start is 00:00:00, delta is +7s. + # We apply <first action> at 00:00:05, and yield the state. + # Second request comes in at 00:00:26. + # Start is 00:00:15, delta is +11s. + # + # In this case, if we just compared deltas, the second one + # is further so that means we haven't started again and could + # just apply the second action at 00:00:25, which means both + # actions would not be separated by 20s instead of 5s; not + # the behaviour we want! + # + # So we look at if the start is the same or not. The question could + # also be if we need to still check the deltas, since time + # necessarily goes forward; but the local time on the system + # could be set backwards during the period, throwing everything off! + # + # The conclusion to all of this is, we need to check both the start + # and the delta. + + if ( + self._laststart is None or self._lastdelta is None + or self._laststart != start or self._lastdelta > delta + ): + self.reset() + self._lastdelta = None + + # Then, we apply the actions up to the current time. + + if self._scenario is not None: + for time, action in self._scenario.get( + to=delta, since=self._lastdelta, + ): + self.apply(action, start + time) + + # Finally, we can keep track of where we were. + + self._laststart = start + self._lastdelta = delta # End of file. diff --git a/fingerd/native.py b/fingerd/native.py index e1b0b6b..11d981c 100644..100755 --- a/fingerd/native.py +++ b/fingerd/native.py @@ -1,26 +1,27 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ Defining the native interface. """ from .core import FingerInterface as _FingerInterface __all__ = ['FingerNativeInterface'] + class _FingerNoNativeFoundInterface(_FingerInterface): - """ Placeholder that doesn't initiate. """ + """ Placeholder that doesn't initiate. """ - def __init__(self, *args, **kwargs): - raise NotImplementedError("Could not find a suitable " \ - "native interface.") + def __init__(self, *args, **kwargs): + raise NotImplementedError( + 'Could not find a suitable native interface.', + ) -from .posix import FingerPOSIXInterface as FingerNativeInterface -#try: -# from .posix import FingerPOSIXInterface as FingerNativeInterface -#except (ImportError, ModuleNotFoundError): -# FingerNativeInterface = _FingerNoNativeFoundInterface +try: + from .posix import FingerPOSIXInterface as FingerNativeInterface # NOQA +except (ImportError, ModuleNotFoundError): + FingerNativeInterface = _FingerNoNativeFoundInterface # End of file. diff --git a/fingerd/posix.py b/fingerd/posix.py index 8cb6ace..c626be2 100644..100755 --- a/fingerd/posix.py +++ b/fingerd/posix.py @@ -1,155 +1,165 @@ #!/usr/bin/env python3 -#****************************************************************************** +# ***************************************************************************** # Copyright (C) 2017-2018 Thomas "Cakeisalie5" Touhey <thomas@touhey.fr> # This file is part of the fingerd Python 3.x module, which is MIT-licensed. -#****************************************************************************** +# ***************************************************************************** """ Make use of the utmp/x file to read the user data. - This will be done using the `pyutmpx` Python module. """ + This will be done using the `pyutmpx` Python module. """ -import posix as _posix, pwd as _pwd +import pwd as _pwd -from os import stat as _stat -from os.path import exists as _exists, join as _joinpaths +from collections.abc import Sequence as _Sequence from copy import copy as _copy from datetime import datetime as _dt from multiprocessing import Lock as _Lock +from os import stat as _stat +from os.path import exists as _exists, join as _joinpaths from typing import Optional as _Optional -from collections.abc import Sequence as _Sequence +from pytz import utc as _utc import pyutmpx as _pyutmpx -from pytz import utc as _utc +from .core import ( + FingerInterface as _FingerInterface, + FingerSession as _FingerSession, + FingerUser as _FingerUser, +) -from .core import (FingerInterface as _FingerInterface, - FingerUser as _FingerUser, FingerSession as _FingerSession) +__all__ = ['FingerPOSIXInterface'] -__all__ = ["FingerPOSIXInterface"] class FingerPOSIXInterface(_FingerInterface): - """ Native finger interface for POSIX-compliant OSes we know, - using utmp and wtmp files. """ - - def __init__(self): - self._data = [] - self._lastrefreshtime = None - self._lock = _Lock() - - def search_users(self, query: _Optional[str], - active: _Optional[bool]) -> _Sequence[_FingerUser]: - """ Look for users given a check. """ - - self._lock.acquire() - - # Refresh the user list if required. - - if (self._lastrefreshtime is None or - abs((self._lastrefreshtime - _dt.now()).total_seconds()) >= 1): - # The method for gathering users and sessions in POSIX systems - # is the following: - # - # 1. Get the users in /etc/passwd, and check which ones not - # to make appear through the presence of ``.nofinger`` - # in their home directory. - # 2. Get the sessions in utmpx, and make them correspond to - # the related user. - # 3. For each session, get the idle time by gathering the - # mtime of the device. - - users = {} - usernames_by_id = {} - - for pw in _pwd.getpwall(): - usernames_by_id[pw.pw_uid] = pw.pw_name - - if _exists(_joinpaths(pw.pw_dir, '.nofinger')): - continue - - gecos = pw.pw_gecos.split(',') - user = _FingerUser( - login = pw.pw_name, - name = gecos[0]) - user.shell = pw.pw_shell - user.home = pw.pw_dir - - if len(gecos) >= 2: - user.office = gecos[1] - try: - with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan: - user.plan = plan.read() - except (FileNotFoundError, PermissionError): - pass - - users[user.login] = user - - try: - lastlog = _pyutmpx.lastlog - except AttributeError: - lastlog = None - - if lastlog is not None: - for lle in lastlog: - try: - login = usernames_by_id[lle.uid] - except KeyError: - continue - - try: - user = users[login] - except KeyError: - continue - - time = lle.time.replace(tzinfo = _utc) - - user.last_login = time - - try: - utmp = _pyutmpx.utmp - except AttributeError: - utmp = None - - if utmp is not None: - for ue in utmp: - if ue.type != _pyutmpx.USER_PROCESS: - continue - try: - user = users[ue.user] - except KeyError: - continue - - session = _FingerSession( - time = ue.time.replace(tzinfo = _utc)) - if ue.line: - session.line = ue.line - if ue.host: - session.host = ue.host - - session.idle = _dt.now() - if ue.line and not ue.line.startswith(':'): - dev_path = (("", "/dev/")[ue.line[0] != '/'] - + ue.line) - atime = _dt.fromtimestamp(_stat(dev_path).st_atime, - _utc) - session.idle = atime - - # Add the session to the user. - - user.sessions.add(session) - if (user.last_login is None or - user.last_login < session.start): - user.last_login = session.start - - # We're done refreshing! - - self._data = list(users.values()) - self._lastrefreshtime = _dt.now() - - # Get the results. - - results = ([_copy(user) for user in self._data - if not (query is not None and query not in user.login) - and not (active is not None and active != bool(user.sessions))]) - self._lock.release() - - return results + """ Native finger interface for POSIX-compliant OSes we know, + using utmp and wtmp files. """ + + def __init__(self): + self._data = [] + self._lastrefreshtime = None + self._lock = _Lock() + + def search_users( + self, + query: _Optional[str], + active: _Optional[bool], + ) -> _Sequence[_FingerUser]: + """ Look for users given a check. """ + + self._lock.acquire() + + # Refresh the user list if required. + + if ( + self._lastrefreshtime is None + or abs((self._lastrefreshtime - _dt.now()).total_seconds()) >= 1 + ): + # The method for gathering users and sessions in POSIX systems + # is the following: + # + # 1. Get the users in /etc/passwd, and check which ones not + # to make appear through the presence of ``.nofinger`` + # in their home directory. + # 2. Get the sessions in utmpx, and make them correspond to + # the related user. + # 3. For each session, get the idle time by gathering the + # mtime of the device. + + users = {} + usernames_by_id = {} + + for pw in _pwd.getpwall(): + usernames_by_id[pw.pw_uid] = pw.pw_name + + if _exists(_joinpaths(pw.pw_dir, '.nofinger')): + continue + + gecos = pw.pw_gecos.split(',') + user = _FingerUser( + login=pw.pw_name, + name=gecos[0]) + user.shell = pw.pw_shell + user.home = pw.pw_dir + + if len(gecos) >= 2: + user.office = gecos[1] + try: + with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan: + user.plan = plan.read() + except (FileNotFoundError, PermissionError): + pass + + users[user.login] = user + + try: + lastlog = _pyutmpx.lastlog + except AttributeError: + lastlog = None + + if lastlog is not None: + for lle in lastlog: + try: + login = usernames_by_id[lle.uid] + except KeyError: + continue + + try: + user = users[login] + except KeyError: + continue + + user.last_login = lle.time.replace(tzinfo=_utc) + + try: + utmp = _pyutmpx.utmp + except AttributeError: + utmp = None + + if utmp is not None: + for ue in utmp: + if ue.type != _pyutmpx.USER_PROCESS: + continue + try: + user = users[ue.user] + except KeyError: + continue + + session = _FingerSession( + time=ue.time.replace(tzinfo=_utc)) + if ue.line: + session.line = ue.line + if ue.host: + session.host = ue.host + + session.idle = _dt.now() + if ue.line and not ue.line.startswith(':'): + dev_path = ( + ('', '/dev/')[ue.line[0] != '/'] + + ue.line) + atime = _dt.fromtimestamp( + _stat(dev_path).st_atime, _utc) + session.idle = atime + + # Add the session to the user. + + user.sessions.add(session) + if ( + user.last_login is None + or user.last_login < session.start + ): + user.last_login = session.start + + # We're done refreshing! + + self._data = list(users.values()) + self._lastrefreshtime = _dt.now() + + # Get the results. + + results = ([ + _copy(user) for user in self._data + if not (query is not None and query not in user.login) + and not (active is not None and active != bool(user.sessions))]) + self._lock.release() + + return results # End of file. diff --git a/fingerd/version.py b/fingerd/version.py index c368fc7..7dd1530 100644..100755 --- a/fingerd/version.py +++ b/fingerd/version.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ fingerd version definition. """ -version = "0.2" +version = '0.3' # End of file. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9194da0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +click +croniter +python-dateutil +pytz +pyutmpx +toml diff --git a/scripts/fingerd b/scripts/fingerd index 28877d2..0e5ce52 100755 --- a/scripts/fingerd +++ b/scripts/fingerd @@ -10,6 +10,6 @@ from fingerd.cli import cli as _cli __all__ = [] if __name__ == '__main__': - _cli() + _cli() # End of file. @@ -23,14 +23,20 @@ include_package_data = True packages = fingerd scripts = scripts/fingerd -install_requires = - toml - click - pytz - pyutmpx [options.package_data] * = *.txt, *.rst [wheel] universal = True + +[flake8] +per-file-ignores = + tests/*:S101 +rst-roles = + py:class + py:attr + py:data + py:meth +rst-directives = + py:data @@ -1,19 +1,30 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2018-2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ Setup script for the fingerd Python package. """ +import os.path as _path from setuptools import setup as _setup kwargs = {} +# Add requirements using the requirements.txt file. + +requirements = set() +with open(_path.join(_path.dirname(__file__), 'requirements.txt'), 'r') as f: + requirements.update(f.read().splitlines()) + +kwargs['install_requires'] = sorted(filter(lambda x: x, requirements)) + +# Use Sphinx for building docs. + try: - from sphinx.setup_command import BuildDoc as _BuildDoc - kwargs['cmdclass'] = {'build_sphinx': _BuildDoc} + from sphinx.setup_command import BuildDoc as _BuildDoc + kwargs['cmdclass'] = {'build_sphinx': _BuildDoc} except ImportError: - pass + pass # Actually, most of the project's data is read from the `setup.cfg` file. diff --git a/tests/__init__.py b/tests/__init__.py index 1e6ccc3..c5a4391 100755 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -#************************************************************************** +# ***************************************************************************** # Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr> # This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** +# ***************************************************************************** """ Unit tests for the `fingerd` Python module. """ # This file is only there to indicate that the folder is a module. diff --git a/tests/test_core.py b/tests/test_core.py deleted file mode 100644 index dfe6f4a..0000000 --- a/tests/test_core.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 -#************************************************************************** -# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr> -# This file is part of the fingerd project, which is MIT-licensed. -#************************************************************************** -""" Tests for the fingerd core. """ - -from fingerd.core import * -from sys import stderr - -def test_logger(capfd): - l = FingerLogger(stderr) - l.start('127.0.0.1', '79') - - _, err = capfd.readouterr() - assert err == "Starting fingerd on [127.0.0.1]:79.\n" - -# End of file. diff --git a/tests/test_fingerd.py b/tests/test_fingerd.py new file mode 100644 index 0000000..69f3669 --- /dev/null +++ b/tests/test_fingerd.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +# ***************************************************************************** +# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr> +# This file is part of the fingerd project, which is MIT-licensed. +# ***************************************************************************** +""" Tests for the fingerd server. """ + +import socket + +from datetime import timedelta +from time import sleep + +from fingerd.core import FingerServer +from fingerd.fiction import ( + FingerScenario, FingerScenarioInterface, + FingerUserCreationAction, FingerUserLoginAction, FingerUserLogoutAction, +) + +import pytest + + +class TestFingerConnection: + @pytest.fixture + def fingerserver(self): + scenario = FingerScenario() + scenario.ending_type = 'freeze' + scenario.duration = timedelta(seconds=5) + scenario.add( + FingerUserCreationAction( + login='john', + name='John Doe', + home='/home/john', + shell='/bin/bash', # NOQA + office='84.6', + ), + timedelta(seconds=-5)) + scenario.add( + FingerUserLoginAction( + login='john', + line='tty1', + ), + timedelta(seconds=0)) + scenario.add( + FingerUserLogoutAction( + login='john', + ), + timedelta(seconds=1)) + + server = FingerServer( + 'localhost:3099', + hostname='example.org', + interface=FingerScenarioInterface(scenario), + ) + server.start() + + sleep(.1) + yield + + server.stop() + + def _send_command(self, command): + conn = socket.create_connection(('localhost', 3099)) + conn.send(command) + return conn.recv(1024) + + # --- + # Tests. + # --- + + def test_no_user_list(self, fingerserver): + result = self._send_command(b'user\r\n') + assert result == b'No user list available.\r\n' + + def test_existing_user_list(self, fingerserver): + result = self._send_command(b'\r\n') + + assert result != b'' + assert result != b'No user list available.\r\n' + + sleep(2) + result = self._send_command(b'\r\n') + + assert result == b'No user list available.\r\n' + +# End of file. |