aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xMakefile23
-rw-r--r--Pipfile16
-rw-r--r--Pipfile.lock116
-rw-r--r--docs/conf.py49
-rwxr-xr-xfingerd/__init__.py52
-rwxr-xr-xfingerd/__main__.py9
-rwxr-xr-x[-rw-r--r--]fingerd/cli.py112
-rwxr-xr-xfingerd/core.py2351
-rwxr-xr-xfingerd/errors.py62
-rwxr-xr-xfingerd/fiction.py2101
-rwxr-xr-x[-rw-r--r--]fingerd/native.py23
-rwxr-xr-x[-rw-r--r--]fingerd/posix.py282
-rwxr-xr-x[-rw-r--r--]fingerd/version.py6
-rw-r--r--requirements.txt6
-rwxr-xr-xscripts/fingerd2
-rw-r--r--setup.cfg16
-rwxr-xr-xsetup.py21
-rwxr-xr-xtests/__init__.py4
-rw-r--r--tests/test_core.py18
-rw-r--r--tests/test_fingerd.py85
20 files changed, 2789 insertions, 2565 deletions
diff --git a/Makefile b/Makefile
index 3b27d1c..a8c4b2a 100755
--- a/Makefile
+++ b/Makefile
@@ -1,23 +1,16 @@
#!/usr/bin/make -f
- PE := pipenv run
- ST := $(PE) ./setup.py
- DNAME := dist/$(shell PIPENV_DONT_LOAD_ENV=1 $(ST) --name)-$(shell PIPENV_DONT_LOAD_ENV=1 $(ST) --version).tar.gz
+ DNAME := dist/$(shell ./setup.py --name)-$(shell ./setup.py --version).tar.gz
test tests:
- @$(PE) pytest -q --capture=fd
-
-prepare:
- @pipenv install --dev
-update:
- @pipenv update --dev
+ @pytest -q --capture=fd
docs:
- @$(ST) build_sphinx
+ @./setup.py build_sphinx
checkdocs:
- @$(ST) checkdocs
+ @./setup.py checkdocs
run:
- @pipenv run python -m fingerd -b 'localhost:3999'
+ @python -m fingerd -b 'localhost:3999'
redirect-localhost-ports:
iptables -t nat -A OUTPUT -p tcp -s 127.0.0.1 -d 127.0.0.1 \
--dport 79 -j DNAT --to 127.0.0.1:3999
@@ -26,14 +19,14 @@ redirect-localhost-ports:
dist: $(DNAME)
$(DNAME):
- $(ST) sdist
+ ./setup.py sdist
upload: $(DNAME)
@twine upload $(DNAME)
install:
- $(ST) install
+ ./setup.py install
install-user:
- $(ST) install --user
+ ./setup.py install --user
.PHONY: test tests prepare update docs checkdocs
.PHONY: run redirect-localhost-ports
diff --git a/Pipfile b/Pipfile
deleted file mode 100644
index e123388..0000000
--- a/Pipfile
+++ /dev/null
@@ -1,16 +0,0 @@
-[[source]]
-url = 'https://pypi.python.org/simple'
-verify_ssl = true
-name = 'pypi'
-
-[requires]
-python_version = '3.9'
-
-[packages]
-toml = '*'
-click = '*'
-pytz = '*'
-pyutmpx = '*'
-
-[dev-packages]
-pytest = '*'
diff --git a/Pipfile.lock b/Pipfile.lock
deleted file mode 100644
index 63f0a11..0000000
--- a/Pipfile.lock
+++ /dev/null
@@ -1,116 +0,0 @@
-{
- "_meta": {
- "hash": {
- "sha256": "1286d0a3089864737e05764e507206586ba18e19887463c65938cdf0e23ba23d"
- },
- "pipfile-spec": 6,
- "requires": {
- "python_version": "3.9"
- },
- "sources": [
- {
- "name": "pypi",
- "url": "https://pypi.python.org/simple",
- "verify_ssl": true
- }
- ]
- },
- "default": {
- "click": {
- "hashes": [
- "sha256:8c04c11192119b1ef78ea049e0a6f0463e4c48ef00a30160c704337586f3ad7a",
- "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6"
- ],
- "index": "pypi",
- "version": "==8.0.1"
- },
- "pytz": {
- "hashes": [
- "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da",
- "sha256:eb10ce3e7736052ed3623d49975ce333bcd712c7bb19a58b9e2089d4057d0798"
- ],
- "index": "pypi",
- "version": "==2021.1"
- },
- "pyutmpx": {
- "hashes": [
- "sha256:9469aa51b1fd921db583ffa9a746944f089c12ce5718800474b878f331f0a30a"
- ],
- "index": "pypi",
- "version": "==0.3.1"
- },
- "toml": {
- "hashes": [
- "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
- "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
- ],
- "index": "pypi",
- "version": "==0.10.2"
- }
- },
- "develop": {
- "attrs": {
- "hashes": [
- "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1",
- "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb"
- ],
- "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
- "version": "==21.2.0"
- },
- "iniconfig": {
- "hashes": [
- "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3",
- "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"
- ],
- "version": "==1.1.1"
- },
- "packaging": {
- "hashes": [
- "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7",
- "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14"
- ],
- "markers": "python_version >= '3.6'",
- "version": "==21.0"
- },
- "pluggy": {
- "hashes": [
- "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159",
- "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"
- ],
- "markers": "python_version >= '3.6'",
- "version": "==1.0.0"
- },
- "py": {
- "hashes": [
- "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3",
- "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"
- ],
- "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
- "version": "==1.10.0"
- },
- "pyparsing": {
- "hashes": [
- "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1",
- "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b"
- ],
- "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
- "version": "==2.4.7"
- },
- "pytest": {
- "hashes": [
- "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89",
- "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"
- ],
- "index": "pypi",
- "version": "==6.2.5"
- },
- "toml": {
- "hashes": [
- "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
- "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
- ],
- "index": "pypi",
- "version": "==0.10.2"
- }
- }
-}
diff --git a/docs/conf.py b/docs/conf.py
index 9f30bd2..dd7bc44 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+# flake8: noqa
#
# Configuration file for the Sphinx documentation builder.
#
@@ -6,7 +7,8 @@
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
-import os, sys
+import os
+import sys
# -- Path setup --------------------------------------------------------------
@@ -24,13 +26,15 @@ author = 'Thomas Touhey'
# The full version, including alpha/beta/rc tags
+
def _get_release():
- from os.path import dirname, join
- from pkg_resources import find_distributions as find_dist
+ from os.path import dirname, join
+ from pkg_resources import find_distributions as find_dist
+
+ module_path = join(dirname(__file__), '..')
+ dist = next(find_dist(module_path, True))
+ return dist.version
- module_path = join(dirname(__file__), '..')
- dist = next(find_dist(module_path, True))
- return dist.version
release = _get_release()
@@ -44,9 +48,9 @@ release = _get_release()
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx_autodoc_typehints',
- 'sphinx.ext.todo'
+ 'sphinx.ext.autodoc',
+ 'sphinx_autodoc_typehints',
+ 'sphinx.ext.todo',
]
# Add any paths that contain templates here, relative to this directory.
@@ -71,7 +75,9 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', '.goutput*', '*.kate-swp']
+exclude_patterns = [
+ '_build', 'Thumbs.db', '.DS_Store', '.goutput*', '*.kate-swp',
+]
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
@@ -86,8 +92,7 @@ html_favicon = "favicon.png"
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
-#
-#html_theme = 'alabaster'
+
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
@@ -142,8 +147,10 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
- (master_doc, 'fingerd.tex', 'fingerd Documentation',
- 'Thomas Touhey', 'manual'),
+ (
+ master_doc, 'fingerd.tex', 'fingerd Documentation',
+ 'Thomas Touhey', 'manual'
+ ),
]
@@ -152,8 +159,10 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
- (master_doc, 'fingerd', 'fingerd Documentation',
- [author], 1)
+ (
+ master_doc, 'fingerd', 'fingerd Documentation',
+ [author], 1
+ ),
]
@@ -163,7 +172,9 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
- (master_doc, 'fingerd', 'fingerd Documentation',
- author, 'fingerd', 'A modern finger (RFC 1288) server.',
- 'Miscellaneous'),
+ (
+ master_doc, 'fingerd', 'fingerd Documentation',
+ author, 'fingerd', 'A modern finger (RFC 1288) server.',
+ 'Miscellaneous'
+ ),
]
diff --git a/fingerd/__init__.py b/fingerd/__init__.py
index 2c4c4e3..e560976 100755
--- a/fingerd/__init__.py
+++ b/fingerd/__init__.py
@@ -1,25 +1,45 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
-""" finger is both a protocol and a utility to get the information and
- status from a user on a distant machine. It was standardized in RFC 742
- in 1977, then in RFC 1288 in 1991, and has been abandoned by most
- people since.
+# *****************************************************************************
+""" Pure Python finger protocol implementation.
- This Python module is a finger server implementation that allows you
- to give out real information as well as fictional information. """
+ finger is both a protocol and a utility to get the information and
+ status from a user on a distant machine. It was standardized in RFC 742
+ in 1977, then in RFC 1288 in 1991, and has been abandoned by most
+ people since.
-import os.path as _path
-
-from os import environ as _environ
-from sys import stderr as _stderr
+ This Python module is a finger server implementation that allows you
+ to give out real information as well as fictional information. """
+from .core import (
+ FingerFormatter, FingerInterface, FingerLogger,
+ FingerServer, FingerSession, FingerUser,
+)
+from .errors import (
+ BindError, ConfigurationError, HostnameError,
+ InvalidBindError, NoBindsError,
+)
+from .fiction import (
+ FingerAction, FingerFictionInterface, FingerScenario,
+ FingerScenarioInterface, FingerUserCreationAction,
+ FingerUserDeletionAction, FingerUserEditionAction,
+ FingerUserLoginAction, FingerUserLogoutAction,
+ FingerUserSessionChangeAction,
+)
+from .native import FingerNativeInterface
from .version import version
-from .errors import *
-from .core import *
-from .fiction import *
-from .native import *
+
+__all__ = [
+ 'BindError', 'ConfigurationError', 'FingerAction',
+ 'FingerFictionInterface', 'FingerFormatter', 'FingerInterface',
+ 'FingerLogger', 'FingerNativeInterface', 'FingerScenario',
+ 'FingerScenarioInterface', 'FingerServer', 'FingerSession',
+ 'FingerUser', 'FingerUserCreationAction', 'FingerUserDeletionAction',
+ 'FingerUserEditionAction', 'FingerUserLoginAction',
+ 'FingerUserLogoutAction', 'FingerUserSessionChangeAction',
+ 'HostnameError', 'InvalidBindError', 'NoBindsError', 'version',
+]
# End of file.
diff --git a/fingerd/__main__.py b/fingerd/__main__.py
index 5e39ae9..b01e5b2 100755
--- a/fingerd/__main__.py
+++ b/fingerd/__main__.py
@@ -1,16 +1,15 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
-""" Main script of the module.
- Runs the server depending on the configuration content. """
+# *****************************************************************************
+""" Main script of the module. """
from .cli import cli as _cli
__all__ = []
if __name__ == '__main__':
- _cli()
+ _cli()
# End of file.
diff --git a/fingerd/cli.py b/fingerd/cli.py
index 755764f..db1400d 100644..100755
--- a/fingerd/cli.py
+++ b/fingerd/cli.py
@@ -1,58 +1,80 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" fingerd CLI interface. """
+from platform import (
+ python_implementation as _python_impl,
+ python_version as _python_version,
+)
+from sys import stderr as _stderr
+
import click as _click
-from platform import (python_implementation as _python_impl,
- python_version as _python_version)
+from . core import (
+ FingerInterface as _FingerInterface,
+ FingerServer as _FingerServer,
+)
+from .fiction import (
+ FingerScenario as _FingerScenario,
+ FingerScenarioInterface as _FingerScenarioInterface,
+)
+from .native import FingerNativeInterface as _FingerNativeInterface
from .version import version as _version
-from . import (FingerNativeInterface as _FingerNativeInterface,
- FingerScenario as _FingerScenario,
- FingerScenarioInterface as _FingerScenarioInterface,
- FingerInterface as _FingerInterface,
- FingerServer as _FingerServer)
__all__ = ['cli']
-@_click.command(context_settings = {'help_option_names': ['-h', '--help']})
-@_click.version_option(version = _version,
- message = f"fingerd version {_version}, "
- f"running on {_python_impl()} {_python_version()}")
-@_click.option('-b', '--binds', default = 'localhost:79',
- envvar = ('BIND', 'BINDS'))
-@_click.option('-H', '--hostname', default = 'localhost',
- envvar = ('FINGER_HOST',))
-@_click.option('-t', '--type', default = 'native',
- envvar = ('FINGER_TYPE',))
-@_click.option('-s', '--scenario', default = 'actions.toml',
- envvar = ('FINGER_ACTIONS',))
-def cli(binds, hostname, type, scenario):
- """ fingerd is a modern finger (RFC 1288) server.
- Find out more at <https://fingerd.touhey.pro/>. """
-
- hostname = hostname.upper()
- type = type.casefold()
-
- if type == 'native':
- iface = _FingerNativeInterface()
- elif type in ('actions', 'scenario'):
- fic = _FingerScenario()
- fic.load(scenario)
-
- iface = _FingerScenarioInterface(fic)
- else:
- if type != 'dummy':
- print("warning: unknown interface type, falling back on dummy",
- file = _stderr)
-
- iface = _FingerInterface()
-
- server = _FingerServer(binds = binds,
- hostname = hostname, interface = iface)
- server.serve_forever()
+
+@_click.command(context_settings={'help_option_names': ['-h', '--help']})
+@_click.version_option(
+ version=_version,
+ message=(
+ f'fingerd version {_version}, '
+ f'running on {_python_impl()} {_python_version()}'
+ ),
+)
+@_click.option(
+ '-b', '--binds', default='localhost:79',
+ envvar=('BIND', 'BINDS'),
+)
+@_click.option(
+ '-H', '--hostname', default='localhost',
+ envvar=('FINGER_HOST',),
+)
+@_click.option(
+ '-t', '--type', 'type_', default='native',
+ envvar=('FINGER_TYPE',),
+)
+@_click.option(
+ '-s', '--scenario', default='actions.toml',
+ envvar=('FINGER_ACTIONS',),
+)
+def cli(binds, hostname, type_, scenario):
+ """ fingerd is a modern finger (RFC 1288) server.
+ Find out more at <https://fingerd.touhey.pro/>. """
+
+ hostname = hostname.upper()
+ type_ = type_.casefold()
+
+ if type_ == 'native':
+ iface = _FingerNativeInterface()
+ elif type_ in ('actions', 'scenario'):
+ fic = _FingerScenario()
+ fic.load(scenario)
+
+ iface = _FingerScenarioInterface(fic)
+ elif type_ != 'dummy':
+ print(
+ 'warning: unknown interface type, falling back on dummy',
+ file=_stderr,
+ )
+ else:
+ iface = _FingerInterface()
+
+ server = _FingerServer(
+ binds=binds, hostname=hostname, interface=iface)
+ server.serve_forever()
# End of file.
diff --git a/fingerd/core.py b/fingerd/core.py
index ea828ea..8bd6d2d 100755
--- a/fingerd/core.py
+++ b/fingerd/core.py
@@ -1,1333 +1,1444 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" The main server class for the server, with the related utilities that
- will not be presented to the user, is defined in this file. """
+ will not be presented to the user, is defined in this file. """
-import string as _string
-import socket as _socket
-import signal as _signal
-import sys as _sys
+import asyncio as _asyncio
import copy as _copy
import multiprocessing as _multip
-import asyncio as _asyncio
+import signal as _signal
+import socket as _socket
+import string as _string
+import sys as _sys
-from enum import Enum as _Enum
-from io import TextIOWrapper as _TextIOWrapper, StringIO as _StringIO
+from collections.abc import Sequence as _Sequence
from datetime import datetime as _dt, timedelta as _td, tzinfo as _tzinfo
+from enum import Enum as _Enum
from typing import Optional as _Optional
-from collections.abc import Sequence as _Sequence
+from croniter import croniter as _croniter
from pytz import utc as _utc
from .errors import (
- NoBindsError as _NoBindsError,
- InvalidBindError as _InvalidBindError,
- HostnameError as _HostnameError)
+ HostnameError as _HostnameError,
+ InvalidBindError as _InvalidBindError,
+ NoBindsError as _NoBindsError,
+)
__all__ = [
- 'FingerServer',
- 'FingerInterface', 'FingerFormatter', 'FingerLogger',
- 'FingerUser', 'FingerSession'
+ 'FingerFormatter', 'FingerInterface', 'FingerLogger',
+ 'FingerServer', 'FingerSession', 'FingerUser', 'cron',
]
# ---
-# Basic representations.
+# Decorators.
# ---
-class FingerSession:
- """ Representation of an active session for a given user on the system.
-
- :param time: The start time of the given session;
- by default, the current datetime. """
-
- __slots__ = ('_start', '_line', '_host', '_idle')
-
- def __init__(self, *_, time: _Optional[_dt] = None):
- self._start = _dt.now()
- self._idle = None
- self._line = None
- self._host = None
-
- self.start = time
- self._idle = self._start
-
- def __repr__(self):
- p = ('start', 'idle', 'line', 'orig')
- p = (f"{x} = {repr(getattr(self, x))}" for x in p \
- if getattr(self, x) is not None)
- return f"{self._class__.__name__}({', '.join(p)})"
- @property
- def start(self) -> _dt:
- """ The timestamp at which the session has started.
+def cron(spec: str):
+ """ Decorator for asking the finger server to call the current
+ function at the given time. """
- Note that when set, if no timezone is present,
- the datetime is considered UTC. """
+ spec = _croniter(spec)
- return self._start
+ def decorator(func):
+ func.__cron__ = spec
+ return func
- @start.setter
- def start(self, value):
- value = value if isinstance(value, _dt) else _dt(value)
- if value.tzinfo is None:
- value = value.replace(tzinfo = _utc)
+ return decorator
- self._start = value
- @property
- def idle(self) -> _dt:
- """ The timestamp since which the user is idle on the session.
-
- Note that when set, if no timezone is present,
- the datetime is considered UTC; also, if the provided
- datetime is before the session start timestamp, it is
- set to it. """
-
- return self._idle
-
- @idle.setter
- def idle(self, value):
- value = value if isinstance(value, _dt) else _dt(value)
- if value.tzinfo is None:
- value = value.replace(tzinfo = _utc)
-
- if value < self._start:
- value = self._start
-
- self._idle = value
-
- @property
- def line(self) -> _Optional[str]:
- """ The line on which the user is. """
-
- return self._line
-
- @line.setter
- def line(self, value):
- self._line = None if value is None else str(value)
-
- @property
- def host(self) -> _Optional[str]:
- """ The host from which the user is connected. """
-
- return self._host
+# ---
+# Basic representations.
+# ---
- @host.setter
- def host(self, value):
- self._host = None if value is None else str(value)
+class FingerSession:
+ """ Representation of an active session for a given user on the system.
-class FingerUser:
- """ Representation of a user on the system, returned by subclasses of
- :py:class:`FingerInterface` and used by subclasses of
- :py:class:`FingerFormatter`.
-
- :param login: The login of the user.
- :param name: The display name of the user.
- :param home: The path to the home of the user.
- :param shell: The path to the user's default shell. """
-
- __slots__ = ('_login', '_name', '_home', '_shell', '_office',
- '_plan', '_last_login', '_sessions')
-
- def __init__(self, *_, login: _Optional[str] = None,
- name: _Optional[str] = None, home: _Optional[str] = None,
- shell: _Optional[str] = None):
- self._login = None
- self._name = ''
- self._home = None
- self._shell = None
- self._office = None
- self._plan = None
- self._last_login = None
- self._sessions = _FingerSessionManager()
+ :param time: The start time of the given session;
+ by default, the current datetime. """
- self.login = login
- self.name = name
- self.home = home
- self.shell = shell
+ __slots__ = ('_start', '_line', '_host', '_idle')
- def __repr__(self):
- p = ('login', 'name', 'home', 'shell', 'office',
- 'last_login', 'sessions')
- p = (f"{x} = {repr(getattr(self, x))}" for x in p \
- if getattr(self, x) is not None)
- return f"{self._class__.__name__}({', '.join(p)})"
+ def __init__(self, *_, time: _Optional[_dt] = None):
+ self._start = _dt.now()
+ self._idle = None
+ self._line = None
+ self._host = None
- @property
- def login(self) -> _Optional[str]:
- """ The login name of the user, e.g. 'cake' or 'gaben'. """
+ self.start = time
+ self._idle = self._start
- return self._login
+ def __repr__(self):
+ p = ('start', 'idle', 'line', 'orig')
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in p if getattr(self, x) is not None)
+ return f"{self._class__.__name__}({', '.join(p)})"
- @login.setter
- def login(self, value: _Optional[str]) -> None:
- self._login = value
+ @property
+ def start(self) -> _dt:
+ """ The timestamp at which the session has started.
- @property
- def name(self) -> _Optional[str]:
- """ The display name of the user, e.g. 'Jean Dupont'. """
+ Note that when set, if no timezone is present,
+ the datetime is considered UTC. """
- return self._name
+ return self._start
- @name.setter
- def name(self, value: _Optional[str]) -> None:
- self._name = value
+ @start.setter
+ def start(self, value):
+ value = value if isinstance(value, _dt) else _dt(value)
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=_utc)
- @property
- def last_login(self) -> _Optional[str]:
- """ The last login date for the user, None if not known. """
+ self._start = value
- return self._last_login
+ @property
+ def idle(self) -> _dt:
+ """ The timestamp since which the user is idle on the session.
- @last_login.setter
- def last_login(self, value: _Optional[str]) -> None:
- self._last_login = None if value is None else \
- value if isinstance(value, _dt) else _dt(value)
+ Note that when set, if no timezone is present,
+ the datetime is considered UTC; also, if the provided
+ datetime is before the session start timestamp, it is
+ set to it. """
- @property
- def home(self) -> _Optional[str]:
- """ The path to the user's home on the given system, None if
- not known or defined. """
+ return self._idle
- return self._home
+ @idle.setter
+ def idle(self, value):
+ value = value if isinstance(value, _dt) else _dt(value)
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=_utc)
- @home.setter
- def home(self, value: _Optional[str]) -> None:
- self._home = None if value is None else str(value)
+ if value < self._start:
+ value = self._start
- @property
- def shell(self) -> _Optional[str]:
- """ The path to the user's shell on the given system, None if
- not known or defined. """
+ self._idle = value
- return self._shell
+ @property
+ def line(self) -> _Optional[str]:
+ """ The line on which the user is. """
- @shell.setter
- def shell(self, value: _Optional[str]) -> None:
- self._shell = None if value is None else str(value)
+ return self._line
- @property
- def office(self) -> _Optional[str]:
- """ The display name of the user's office, None if not known
- or defined. """
+ @line.setter
+ def line(self, value):
+ self._line = None if value is None else str(value)
- return self._office
+ @property
+ def host(self) -> _Optional[str]:
+ """ The host from which the user is connected. """
- @office.setter
- def office(self, value: _Optional[str]) -> None:
- self._office = None if value is None else str(value)
+ return self._host
- @property
- def plan(self) -> _Optional[str]:
- """ The plan of the user, usually the content of the ``.plan``
- file in the user's home on real (and kind of obsolete) UNIX-like
- systems. """
+ @host.setter
+ def host(self, value):
+ self._host = None if value is None else str(value)
- return self._plan
- @plan.setter
- def plan(self, value: _Optional[str]) -> None:
- if value is None:
- self._plan = None
- else:
- value = str(value)
- self._plan = '\n'.join(value.splitlines())
+class FingerUser:
+ """ Representation of a user on the system, returned by subclasses of
+ :py:class:`FingerInterface` and used by subclasses of
+ :py:class:`FingerFormatter`.
+
+ :param login: The login of the user.
+ :param name: The display name of the user.
+ :param home: The path to the home of the user.
+ :param shell: The path to the user's default shell. """
+
+ __slots__ = (
+ '_login', '_name', '_home', '_shell', '_office',
+ '_plan', '_last_login', '_sessions',
+ )
+
+ def __init__(
+ self, *_,
+ login: _Optional[str] = None,
+ name: _Optional[str] = None,
+ home: _Optional[str] = None,
+ shell: _Optional[str] = None,
+ ):
+ self._login = None
+ self._name = ''
+ self._home = None
+ self._shell = None
+ self._office = None
+ self._plan = None
+ self._last_login = None
+ self._sessions = _FingerSessionManager()
+
+ self.login = login
+ self.name = name
+ self.home = home
+ self.shell = shell
+
+ def __repr__(self):
+ p = (
+ 'login', 'name', 'home', 'shell', 'office',
+ 'last_login', 'sessions')
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in p if getattr(self, x) is not None)
+ return f"{self._class__.__name__}({', '.join(p)})"
+
+ @property
+ def login(self) -> _Optional[str]:
+ """ The login name of the user, e.g. 'cake' or 'gaben'. """
+
+ return self._login
+
+ @login.setter
+ def login(self, value: _Optional[str]) -> None:
+ self._login = value
+
+ @property
+ def name(self) -> _Optional[str]:
+ """ The display name of the user, e.g. 'Jean Dupont'. """
+
+ return self._name
+
+ @name.setter
+ def name(self, value: _Optional[str]) -> None:
+ self._name = value
+
+ @property
+ def last_login(self) -> _Optional[str]:
+ """ The last login date for the user, None if not known. """
+
+ return self._last_login
+
+ @last_login.setter
+ def last_login(self, value: _Optional[str]) -> None:
+ self._last_login = None if value is None else \
+ value if isinstance(value, _dt) else _dt(value)
+
+ @property
+ def home(self) -> _Optional[str]:
+ """ The path to the user's home on the given system, None if
+ not known or defined. """
+
+ return self._home
+
+ @home.setter
+ def home(self, value: _Optional[str]) -> None:
+ self._home = None if value is None else str(value)
+
+ @property
+ def shell(self) -> _Optional[str]:
+ """ The path to the user's shell on the given system, None if
+ not known or defined. """
+
+ return self._shell
+
+ @shell.setter
+ def shell(self, value: _Optional[str]) -> None:
+ self._shell = None if value is None else str(value)
+
+ @property
+ def office(self) -> _Optional[str]:
+ """ The display name of the user's office, None if not known
+ or defined. """
+
+ return self._office
+
+ @office.setter
+ def office(self, value: _Optional[str]) -> None:
+ self._office = None if value is None else str(value)
+
+ @property
+ def plan(self) -> _Optional[str]:
+ """ The plan of the user, usually the content of the ``.plan``
+ file in the user's home on real (and kind of obsolete) UNIX-like
+ systems. """
+
+ return self._plan
+
+ @plan.setter
+ def plan(self, value: _Optional[str]) -> None:
+ if value is None:
+ self._plan = None
+ else:
+ value = str(value)
+ self._plan = '\n'.join(value.splitlines())
- @property
- def sessions(self) -> _Sequence[FingerSession]:
- """ The current sessions array for the user, always defined. """
+ @property
+ def sessions(self) -> _Sequence[FingerSession]:
+ """ The current sessions array for the user, always defined. """
- return self._sessions
+ return self._sessions
class _FingerSessionManager:
- """ Session manager. """
-
- __slots__ = ('_sessions')
-
- def __init__(self):
- self._sessions = []
-
- def __repr__(self):
- return repr(self._sessions)
-
- def __bool__(self):
- return bool(self._sessions)
-
- def __iter__(self):
- return iter(self._sessions[::-1])
-
- def __len__(self):
- return len(self._sessions)
-
- def __delitem__(self, key):
- if key is None:
- self._sessions.pop(0)
- else:
- for i in [i for i, x in enumerate(self._sessions) \
- if key == x.name][::-1]:
- self._sessions.pop(i)
-
- def __getitem__(self, key):
- if key is None:
- try:
- return self._sessions[0]
- except IndexError:
- raise KeyError("could not get latest session") from None
-
- if type(key) is int:
- try:
- return self._sessions[key]
- except IndexError:
- msg = f"could not get session #{repr(key)}"
- raise IndexError(msg) from None
-
- try:
- return next(x for x in self._sessions if key == x.name)
- except StopIteration:
- raise KeyError(f"could not get session {repr(key)}") from None
-
- def __setitem__(self, key, value):
- if not isinstance(value, FingerSession):
- raise TypeError("can only add sessions into a session manager")
- value = _copy.deepcopy(value)
-
- if key is None:
- # Check if except the first session, the key of the session,
- # if any, does not override another key.
-
- if value.name is not None:
- try:
- next(i for i, x in self._sessions[1:] \
- if value.name == x.name)
- except StopIteration:
- pass
- else:
- msg = "value.name overrides another session's key"
- raise ValueError(msg) from None
-
- try:
- self._sessions[0] = value
- return
- except IndexError:
- raise KeyError("could not set latest session") from None
-
- if type(key) is int:
- # Check if except the key-th session, the key of the session,
- # if any, does not override another key.
-
- if value.name is not None:
- try:
- next(i for i, x in self._sessions \
- if i != key and value.name == x.name)
- except StopIteration:
- pass
- else:
- msg = "value.name overrides another session's key"
- raise ValueError(msg) from None
-
- try:
- self._sessions[key] = value
- return
- except IndexError:
- msg = f"could not set session #{repr(key)}"
- raise IndexError(msg) from None
-
- value.name = key
-
- try:
- i = next(i for i, x in enumerate(self._sessions) \
- if key == x.name)
- except StopIteration:
- raise KeyError(f"could not set session {repr(key)}") from None
-
- self._sessions[i] = value
-
- def add(self, session):
- """ Add a session. """
-
- if not isinstance(session, FingerSession):
- raise TypeError("can only insert sessions into a session "
- "manager")
-
- self._sessions.insert(0, session)
+ """ Session manager. """
+
+ __slots__ = ('_sessions',)
+
+ def __init__(self):
+ self._sessions = []
+
+ def __repr__(self):
+ return repr(self._sessions)
+
+ def __bool__(self):
+ return bool(self._sessions)
+
+ def __iter__(self):
+ return iter(self._sessions[::-1])
+
+ def __len__(self):
+ return len(self._sessions)
+
+ def __delitem__(self, key):
+ if key is None:
+ self._sessions.pop(0)
+ else:
+ for i in ([
+ i for i, x in enumerate(self._sessions)
+ if key == x.name
+ ][::-1]):
+ self._sessions.pop(i)
+
+ def __getitem__(self, key):
+ if key is None:
+ try:
+ return self._sessions[0]
+ except IndexError:
+ raise KeyError('could not get latest session') from None
+
+ if type(key) is int:
+ try:
+ return self._sessions[key]
+ except IndexError:
+ msg = f'could not get session #{key!r}'
+ raise IndexError(msg) from None
+
+ try:
+ return next(x for x in self._sessions if key == x.name)
+ except StopIteration:
+ raise KeyError(f'could not get session {key!r}') from None
+
+ def __setitem__(self, key, value):
+ if not isinstance(value, FingerSession):
+ raise TypeError('can only add sessions into a session manager')
+ value = _copy.deepcopy(value)
+
+ if key is None:
+ # Check if except the first session, the key of the session,
+ # if any, does not override another key.
+
+ if value.name is not None:
+ try:
+ next(
+ i for i, x in self._sessions[1:]
+ if value.name == x.name)
+ except StopIteration:
+ pass
+ else:
+ msg = "value.name overrides another session's key"
+ raise ValueError(msg) from None
+
+ try:
+ self._sessions[0] = value
+ return
+ except IndexError:
+ raise KeyError('could not set latest session') from None
+
+ if type(key) is int:
+ # Check if except the key-th session, the key of the session,
+ # if any, does not override another key.
+
+ if value.name is not None:
+ try:
+ next(
+ i for i, x in self._sessions
+ if i != key and value.name == x.name)
+ except StopIteration:
+ pass
+ else:
+ msg = "value.name overrides another session's key"
+ raise ValueError(msg) from None
+
+ try:
+ self._sessions[key] = value
+ return
+ except IndexError:
+ msg = f'could not set session #{key!r}'
+ raise IndexError(msg) from None
+
+ value.name = key
+
+ try:
+ i = next(
+ i for i, x in enumerate(self._sessions)
+ if key == x.name)
+ except StopIteration:
+ raise KeyError(f'could not set session {key!r}') from None
+
+ self._sessions[i] = value
+
+ def add(self, session):
+ """ Add a session. """
+
+ if not isinstance(session, FingerSession):
+ raise TypeError(
+ 'can only insert sessions into a session manager',
+ )
+
+ self._sessions.insert(0, session)
# ---
# Formatter base class.
# ---
-class FingerFormatter:
- """ Formatter for :py:class:`FingerServer`.
- Provides text-formatted (as strings limited to ASCII)
- answers for given queries with given results as objects.
- This class must be subclassed by other formatters.
- Only methods not starting with an underscore are called by
- instances of :py:class:`FingerServer`; others are utilities
- called by these.
+class FingerFormatter:
+ """ Formatter for :py:class:`FingerServer`.
+ Provides text-formatted (as strings limited to ASCII)
+ answers for given queries with given results as objects.
- Unless methods are overridden to have a different behaviour,
- this formatter aims at RFC 1288 compliance.
+ This class must be subclassed by other formatters.
+ Only methods not starting with an underscore are called by
+ instances of :py:class:`FingerServer`; others are utilities
+ called by these.
- :param tzinfo: Timezone used for formatting dates and
- times. """
+ Unless methods are overridden to have a different behaviour,
+ this formatter aims at RFC 1288 compliance.
- def __init__(self, tzinfo: _Optional[_tzinfo] = None):
- if tzinfo is None:
- tzinfo = _dt.now().astimezone().tzinfo
- self._tzinfo = tzinfo
+ :param tzinfo: Timezone used for formatting dates and
+ times. """
- def __repr__(self):
- return f"{self.__class__.__name__}()"
+ def __init__(self, tzinfo: _Optional[_tzinfo] = None):
+ if tzinfo is None:
+ tzinfo = _dt.now().astimezone().tzinfo
+ self._tzinfo = tzinfo
- # ---
- # Internal formatting utilities.
- # ---
+ def __repr__(self):
+ return f'{self.__class__.__name__}()'
- def _format_idle(self, idle: _td) -> str:
- """ Format an idle time delta. """
+ # ---
+ # Internal formatting utilities.
+ # ---
- def _iter_idle(idle):
- days = int(idle.days)
- hours = int(idle.seconds / 3600)
- mins = int(idle.seconds % 3600 / 60)
- secs = int(idle.seconds % 60)
+ def _format_idle(self, idle: _td) -> str:
+ """ Format an idle time delta. """
- if days:
- yield f"{days} day{('', 's')[days > 1]}"
- if hours:
- yield f"{hours} hour{('', 's')[hours > 1]}"
- if mins:
- yield f"{mins} minute{('', 's')[mins > 1]}"
- if secs:
- yield f"{secs} second{('', 's')[secs > 1]}"
+ def _iter_idle(idle):
+ days = int(idle.days)
+ hours = int(idle.seconds / 3600)
+ mins = int(idle.seconds % 3600 / 60)
+ secs = int(idle.seconds % 60)
+
+ if days:
+ yield f'{days} day{("", "s")[days > 1]}'
+ if hours:
+ yield f'{hours} hour{("", "s")[hours > 1]}'
+ if mins:
+ yield f'{mins} minute{("", "s")[mins > 1]}'
+ if secs:
+ yield f'{secs} second{("", "s")[secs > 1]}'
- return f"{' '.join(_iter_idle(idle))} idle"
+ return f'{" ".join(_iter_idle(idle))} idle'
- def _format_time(self, d: _td) -> str:
- """ Format a date and time. """
+ def _format_time(self, d: _td) -> str:
+ """ Format a date and time. """
- if d < _td():
- return ""
+ if d < _td():
+ return ''
- days = int(d.days)
- hours = int(d.seconds / 3600)
- mins = int(d.seconds % 3600 / 60)
+ days = int(d.days)
+ hours = int(d.seconds / 3600)
+ mins = int(d.seconds % 3600 / 60)
- if days:
- return f"{days}d"
- elif hours or mins:
- return f"{hours:02}:{mins:02}"
+ if days:
+ return f'{days}d'
+ elif hours or mins:
+ return f'{hours:02}:{mins:02}'
- return ""
-
- def _format_when(self, d: _dt) -> str:
- """ Format a date and time for 'when'. """
+ return ''
+
+ def _format_when(self, d: _dt) -> str:
+ """ Format a date and time for 'when'. """
- return d.astimezone(self._tzinfo).strftime("%a %H:%M")
-
- def _format_header(self, hostname: str, raw_query: str) -> str:
- """ Returns the header of the formatted answer for every request,
- except when an error has occurred in the user's query.
+ return d.astimezone(self._tzinfo).strftime('%a %H:%M')
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :return: The header of the formatted answer as text. """
-
- if raw_query:
- raw_query = ' ' + raw_query
-
- return f"Site: {hostname}\r\n" f"Command line:{raw_query}\r\n" "\r\n"
-
- def _format_footer(self) -> str:
- """ Returns the footer of the formatted answer for every request,
- except when an error has occurred in the user's query.
-
- :return: The footer of the formatted answer as text. """
-
- return ""
-
- # ---
- # Used formatting functions.
- # ---
-
- def format_query_error(self, hostname: str, raw_query: str) -> str:
- """ Returns the formatted answer for when an error has occurred
- in the user's query.
-
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :return: The formatted answer as text. """
-
- return "Site: {}\r\n" \
- "You have made a mistake in your query!\r\n".format(hostname)
-
- def format_short(self, hostname: str, raw_query: str,
- users: _Sequence[FingerUser]) -> str:
- """ Returns the formatted answer for a user list in the 'short'
- format.
-
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :param users: The user list.
- :return: The formatted answer as text. """
-
- if not users:
- return "No user list available.\r\n"
-
- now = _dt.now().astimezone()
-
- lst = []
- for user in users:
- if not user.sessions:
- lst.append((user, None))
- for session in user.sessions:
- lst.append((user, session))
-
- _login = lambda u, s: u.login
- _name = lambda u, s: u.name
- _line = lambda u, s: s.line if s and s.line else ''
- _idle = lambda u, s: self._format_time(now - s.idle) if s else ''
- _logt = lambda u, s: self._format_when(s.start) if s else ''
- _offic = lambda u, s: (f'({s.host})' if s.host
- else u.office if u.office else '')
-
- columns = (
- ('Login',) + tuple(_login(u, s) for u, s in lst),
- ('Name',) + tuple( _name(u, s) for u, s in lst),
- ('TTY',) + tuple( _line(u, s) for u, s in lst),
- ('Idle',) + tuple( _idle(u, s) for u, s in lst),
- ('When',) + tuple( _logt(u, s) for u, s in lst),
- ('Office',) + tuple(_offic(u, s) for u, s in lst))
-
- sizes = tuple(max(map(len, c)) + 1 for i, c in enumerate(columns))
- align = ('<', '<', '<', '^', '^', '<')
-
- lines = []
- for line in range(len(columns[0])):
- lines.append(' '.join(\
- f"{columns[i][line][:sizes[i]]:{align[i]}{sizes[i]}}" \
- for i in range(len(columns))))
-
- return (
- self._format_header(hostname, raw_query) +
- '\r\n'.join(lines) + '\r\n' +
- self._format_footer())
-
- def format_long(self, hostname: str, raw_query: str,
- users: _Sequence[FingerUser]) -> str:
- """ Returns the formatted answer for a user list in the 'long'
- format.
-
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :param users: The user list.
- :return: The formatted answer as text. """
-
- if not users:
- return "No user list available.\r\n"
-
- now = _dt.now().astimezone()
- res = ''
-
- for user in users:
- res += (""
- f"Login name: {user.login[:27]:<27} "
- f"Name: {user.name if user.name else user.login}\r\n"
- f"Directory: {user.home[:28] if user.home else '':<28} "
- f"Shell: {user.shell if user.shell else ''}\r\n")
- if user.office:
- res += f"Office: {user.office if user.office else ''}\r\n"
-
- if user.sessions:
- # List current sessions.
-
- for se in user.sessions:
- since = (se.start.astimezone(self._tzinfo)
- .strftime("%a %b %e %R"))
- tz = self._tzinfo
- res += f"On since {since} ({tz})"
- if se.line is not None:
- res += f" on {se.line}"
- if se.host is not None:
- res += f" from {se.host}"
- res += "\r\n"
-
- idle = now - se.idle
- if idle >= _td(seconds = 4):
- res += f" {self._format_idle(idle)}\r\n"
- elif user.last_login is not None:
- # Show last login.
-
- date = (user.last_login.astimezone(self._tzinfo)
- .strftime("%a %b %e %R"))
- tz = self._tzinfo
- res += f"Last login {date} ({tz}) on console\r\n"
- else:
- res += "Never logged in.\r\n"
-
- if user.plan is None:
- res += "No plan.\r\n"
- else:
- res += "Plan:\r\n"
- res += "\r\n".join(user.plan.splitlines())
- res += "\r\n"
-
- res += "\r\n"
-
- return (
- self._format_header(hostname, raw_query) +
- res +
- self._format_footer())
+ def _format_header(self, hostname: str, raw_query: str) -> str:
+ """ Returns the header of the formatted answer for every request,
+ except when an error has occurred in the user's query.
+
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :return: The header of the formatted answer as text. """
+
+ if raw_query:
+ raw_query = ' ' + raw_query
+
+ return (
+ f'Site: {hostname}\r\n'
+ f'Command line:{raw_query}\r\n'
+ '\r\n'
+ )
+
+ def _format_footer(self) -> str:
+ """ Returns the footer of the formatted answer for every request,
+ except when an error has occurred in the user's query.
+
+ :return: The footer of the formatted answer as text. """
+
+ return ''
+
+ # ---
+ # Used formatting functions.
+ # ---
+
+ def format_query_error(self, hostname: str, raw_query: str) -> str:
+ """ Returns the formatted answer for when an error has occurred
+ in the user's query.
+
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :return: The formatted answer as text. """
+
+ return (
+ f'Site: {hostname}\r\n'
+ 'You have made a mistake in your query!\r\n'
+ )
+
+ def format_short(
+ self,
+ hostname: str,
+ raw_query: str,
+ users: _Sequence[FingerUser],
+ ) -> str:
+ """ Returns the formatted answer for a user list in the 'short'
+ format.
+
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :param users: The user list.
+ :return: The formatted answer as text. """
+
+ if not users:
+ return 'No user list available.\r\n'
+
+ now = _dt.now().astimezone()
+
+ lst = []
+ for user in users:
+ if not user.sessions:
+ lst.append((user, None))
+ for session in user.sessions:
+ lst.append((user, session))
+
+ def _login(u, s):
+ return u.login
+
+ def _name(u, s):
+ return u.name
+
+ def _line(u, s):
+ return s.line if s and s.line else ''
+
+ def _idle(u, s):
+ return self._format_time(now - s.idle) if s else ''
+
+ def _logt(u, s):
+ return self._format_when(s.start) if s else ''
+
+ def _offic(u, s):
+ return (
+ f'({s.host})' if s.host
+ else u.office if u.office else '')
+
+ columns = (
+ ('Login',) + tuple(_login(u, s) for u, s in lst),
+ ('Name',) + tuple(_name(u, s) for u, s in lst),
+ ('TTY',) + tuple(_line(u, s) for u, s in lst),
+ ('Idle',) + tuple(_idle(u, s) for u, s in lst),
+ ('When',) + tuple(_logt(u, s) for u, s in lst),
+ ('Office',) + tuple(_offic(u, s) for u, s in lst))
+
+ sizes = tuple(max(map(len, c)) + 1 for i, c in enumerate(columns))
+ align = ('<', '<', '<', '^', '^', '<')
+
+ lines = []
+ for line in range(len(columns[0])):
+ lines.append(' '.join(
+ f'{columns[i][line][:sizes[i]]:{align[i]}{sizes[i]}}'
+ for i in range(len(columns))
+ ))
+
+ return (
+ self._format_header(hostname, raw_query) +
+ '\r\n'.join(lines) + '\r\n' +
+ self._format_footer())
+
+ def format_long(
+ self,
+ hostname: str,
+ raw_query: str,
+ users: _Sequence[FingerUser],
+ ) -> str:
+ """ Returns the formatted answer for a user list in the 'long'
+ format.
+
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :param users: The user list.
+ :return: The formatted answer as text. """
+
+ if not users:
+ return 'No user list available.\r\n'
+
+ now = _dt.now().astimezone()
+ res = ''
+
+ for user in users:
+ res += (
+ f'Login name: {user.login[:27]:<27} '
+ f'Name: {user.name if user.name else user.login}\r\n'
+ f'Directory: {user.home[:28] if user.home else "":<28} '
+ f'Shell: {user.shell if user.shell else ""}\r\n'
+ )
+ if user.office:
+ res += f"Office: {user.office if user.office else ''}\r\n"
+
+ if user.sessions:
+ # List current sessions.
+
+ for se in user.sessions:
+ since = (
+ se.start.astimezone(self._tzinfo)
+ .strftime('%a %b %e %R')
+ )
+ tz = self._tzinfo
+ res += f'On since {since} ({tz})'
+ if se.line is not None:
+ res += f' on {se.line}'
+ if se.host is not None:
+ res += f' from {se.host}'
+ res += '\r\n'
+
+ idle = now - se.idle
+ if idle >= _td(seconds=4):
+ res += f' {self._format_idle(idle)}\r\n'
+ elif user.last_login is not None:
+ # Show last login.
+
+ date = (
+ user.last_login.astimezone(self._tzinfo)
+ .strftime('%a %b %e %R'))
+ tz = self._tzinfo
+ res += f'Last login {date} ({tz}) on console\r\n'
+ else:
+ res += 'Never logged in.\r\n'
+
+ if user.plan is None:
+ res += 'No plan.\r\n'
+ else:
+ res += 'Plan:\r\n'
+ res += '\r\n'.join(user.plan.splitlines())
+ res += '\r\n'
+
+ res += '\r\n'
+
+ return (
+ self._format_header(hostname, raw_query) +
+ res +
+ self._format_footer())
# ---
# Interface (dummy) base class.
# ---
-class FingerInterface:
- """ Data source for :py:class:`FingerServer`.
- Provides users and answers for the various queries received
- from the clients by the server.
-
- This class must be subclassed by other interfaces.
- Only methods not starting with an underscore are called by
- instances of :py:class:`FingerServer`; others are utilities
- called by these.
-
- By default, it behaves like a dummy interface. """
- def __repr__(self):
- return f"{self.__class__.__name__}()"
-
- def transmit_query(self, query: _Optional[str], host: str,
- verbose: bool) -> str:
- """ Transmit a user query to a foreign host, and return
- the answer formatted by it.
-
- If used directly (not overridden by subclasses), this
- method will refuse to transmit finger queries.
-
- :param query: The user query, set to None in case of
- no query provided by the client.
- :param host: The distant host to which to transmit the
- query.
- :param verbose: Whether the verbose flag (``/W``, long format)
- has been passed by the current client
- or not.
- :return: The answer formatted by the distant server. """
-
- return "This server won't transmit finger queries.\r\n"
-
- def search_users(self, query: _Optional[str],
- active: _Optional[bool]) -> _Sequence[FingerUser]:
- """ Search for users on the current host using the given query.
-
- :param query: The user query, set to None in case of no
- query provided by the client.
- :param active: Whether to get active users (True),
- inactive users (False), or all users (None).
- :return: The list of users found using the query provided
- by the client. """
-
- return []
-
- def regular(seconds: float = 1):
- """ Decorator for asking the finger server to call the current
- function every second. """
-
- try:
- seconds = float(seconds)
- assert 0 < seconds
- except (TypeError, ValueError, AssertionError):
- raise ValueError("invalid period") from None
-
- def decorator(func):
- func.__period__ = seconds
- return func
-
- return decorator
+class FingerInterface:
+ """ Data source for :py:class:`FingerServer`.
+ Provides users and answers for the various queries received
+ from the clients by the server.
+
+ This class must be subclassed by other interfaces.
+ Only methods not starting with an underscore are called by
+ instances of :py:class:`FingerServer`; others are utilities
+ called by these.
+
+ By default, it behaves like a dummy interface. """
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}()'
+
+ def transmit_query(
+ self,
+ query: _Optional[str],
+ host: str,
+ verbose: bool,
+ ) -> str:
+ """ Transmit a user query to a foreign host, and return
+ the answer formatted by it.
+
+ If used directly (not overridden by subclasses), this
+ method will refuse to transmit finger queries.
+
+ :param query: The user query, set to None in case of
+ no query provided by the client.
+ :param host: The distant host to which to transmit the
+ query.
+ :param verbose: Whether the verbose flag (``/W``, long format)
+ has been passed by the current client
+ or not.
+ :return: The answer formatted by the distant server. """
+
+ return "This server won't transmit finger queries.\r\n"
+
+ def search_users(
+ self,
+ query: _Optional[str],
+ active: _Optional[bool],
+ ) -> _Sequence[FingerUser]:
+ """ Search for users on the current host using the given query.
+
+ :param query: The user query, set to None in case of no
+ query provided by the client.
+ :param active: Whether to get active users (True),
+ inactive users (False), or all users (None).
+ :return: The list of users found using the query provided
+ by the client. """
+
+ return []
# ---
# Server logger base class.
# ---
+
class FingerLogger:
- """ Logger class for :py:class:`FingerServer`.
- Provides methods for allowing custom behaviours on server and
- request events.
+ """ Logger class for :py:class:`FingerServer`.
+ Provides methods for allowing custom behaviours on server and
+ request events.
- This class must be subclassed by other loggers.
- Only methods not starting with an underscore are called by
- instances of :py:class:`FingerServer`; others are utilities called
- by these.
+ This class must be subclassed by other loggers.
+ Only methods not starting with an underscore are called by
+ instances of :py:class:`FingerServer`; others are utilities called
+ by these.
- Unless methods are overridden to have a different behaviour,
- this logger emits messages on the given text stream, by default
- stderr (standard error output). """
+ Unless methods are overridden to have a different behaviour,
+ this logger emits messages on the given text stream, by default
+ stderr (standard error output). """
- def __init__(self, stream = _sys.stderr):
- self._stream = stream
- self._lock = _multip.Lock()
+ def __init__(self, stream=_sys.stderr):
+ self._stream = stream
+ self._lock = _multip.Lock()
- def __repr__(self):
- return f"{self.__class__.__name__}()"
+ def __repr__(self):
+ return f'{self.__class__.__name__}()'
- # ---
- # Internal methods.
- # ---
+ # ---
+ # Internal methods.
+ # ---
- def _write(self, msg: str) -> None:
- """ Write the formatted string on the given stream. """
+ def _write(self, msg: str) -> None:
+ """ Write the formatted string on the given stream. """
- self._lock.acquire()
- print('\r' + msg, file = self._stream)
- self._lock.release()
+ self._lock.acquire()
+ print('\r' + msg, file=self._stream)
+ self._lock.release()
- def _write_s(self, src: str, msg: str) -> None:
- """ Write the formatted string and add the source address of
- the host to the given stream using
- :py:meth:`FingerLogger._write`. """
+ def _write_s(self, src: str, msg: str) -> None:
+ """ Write the formatted string and add the source address of
+ the host to the given stream using
+ :py:meth:`FingerLogger._write`. """
- self._write(f'[{src}] {msg}')
+ self._write(f'[{src}] {msg}')
- # ---
- # Public methods.
- # ---
+ # ---
+ # Public methods.
+ # ---
- def start(self, host: str, port: str) -> None:
- """ This method is called when the server starts listening on
- the given port of the given host.
+ def start(self, host: str, port: str) -> None:
+ """ This method is called when the server starts listening on
+ the given port of the given host.
- :param host: The host on which the server has started
- listening (usually an IPv4 or IPv6 address).
- :param port: The TCP port on which the server has started
- listening. """
+ :param host: The host on which the server has started
+ listening (usually an IPv4 or IPv6 address).
+ :param port: The TCP port on which the server has started
+ listening. """
- self._write(f"Starting fingerd on [{host}]:{port}.")
+ self._write(f'Starting fingerd on [{host}]:{port}.')
- def stop(self, host: str, port: str) -> None:
- """ This method is called when the server stops listening on
- the given port of the given host.
+ def stop(self, host: str, port: str) -> None:
+ """ This method is called when the server stops listening on
+ the given port of the given host.
- :param host: The host on which the server has stopped
- listening (usually an IPv4 or IPv6 address).
- :param port: The TCP port on which the server has stopped
- listening. """
+ :param host: The host on which the server has stopped
+ listening (usually an IPv4 or IPv6 address).
+ :param port: The TCP port on which the server has stopped
+ listening. """
- self._write(f"Stopping fingerd on [{host}]:{port}.")
+ self._write(f'Stopping fingerd on [{host}]:{port}.')
- def no_query(self, source: str) -> None:
- """ This method is called when a connection has been opened with
- the server by a client, but immediately closed without
- emitting a query.
+ def no_query(self, source: str) -> None:
+ """ This method is called when a connection has been opened with
+ the server by a client, but immediately closed without
+ emitting a query.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address). """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address). """
- self._write_s(source, "no query. (possible scan)")
+ self._write_s(source, 'no query. (possible scan)')
- def bad_query(self, source: str) -> None:
- """ This method is called when a client has emitted a bad request.
+ def bad_query(self, source: str) -> None:
+ """ This method is called when a client has emitted a bad request.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address). """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address). """
- self._write_s(source, "bad request.")
+ self._write_s(source, 'bad request.')
- def could_not_answer(self, source: str) -> None:
- """ This method is called when a client has closed the connection
- before or while the server was answering.
+ def could_not_answer(self, source: str) -> None:
+ """ This method is called when a client has closed the connection
+ before or while the server was answering.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address). """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address). """
- self._write_s(source, "could not write the answer.")
+ self._write_s(source, 'could not write the answer.')
- def transmit_list(self, source: str, host: str):
- """ This method is called when a client has requested the server
- to transmit a finger request to another finger server, without
- a user query.
+ def transmit_list(self, source: str, host: str):
+ """ This method is called when a client has requested the server
+ to transmit a finger request to another finger server, without
+ a user query.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address).
- :param host: The host to which the user wants the request
- to be transmitted. """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address).
+ :param host: The host to which the user wants the request
+ to be transmitted. """
- self._write_s(source, f"transmit user list query to `{host}`.")
+ self._write_s(source, f'transmit user list query to `{host}`.')
- def transmit(self, source: str, username: str, host: str) -> None:
- """ This method is called when a client has requested the server
- to transmit a finger request to another finger server, with
- a user query.
+ def transmit(self, source: str, username: str, host: str) -> None:
+ """ This method is called when a client has requested the server
+ to transmit a finger request to another finger server, with
+ a user query.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address).
- :param username: The query the user wants transmitted to
- the provided host.
- :param host: The host to which the user wants the request
- to be transmitted. """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address).
+ :param username: The query the user wants transmitted to
+ the provided host.
+ :param host: The host to which the user wants the request
+ to be transmitted. """
- self._write_s(source, f"transmit user query for `{username}` "
- f"to `{host}`.")
+ self._write_s(
+ source,
+ f'transmit user query for `{username}` to `{host}`.',
+ )
- def search_users(self, source: str, username: str) -> None:
- """ This method is called when a client has requested a user list
- to the current host, with a query provided.
+ def search_users(self, source: str, username: str) -> None:
+ """ This method is called when a client has requested a user list
+ to the current host, with a query provided.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address).
- :param username: The provided user query. """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address).
+ :param username: The provided user query. """
- self._write_s(source, f"look for user `{username}`.")
+ self._write_s(source, f'look for user `{username}`.')
- def list(self, source: str) -> None:
- """ This method is called when a client has requested a user list
- to the current host, without any provided queries.
+ def list_users(self, source: str) -> None:
+ """ This method is called when a client has requested a user list
+ to the current host, without any provided queries.
- :param source: The source address from which the client
- has started the connection (usually an
- IPv4 or IPv6 address). """
+ :param source: The source address from which the client
+ has started the connection (usually an
+ IPv4 or IPv6 address). """
- self._write_s(source, "list connected users.")
+ self._write_s(source, 'list connected users.')
# ---
# Bind-related configuration.
# ---
+
class _BindAddress:
- """ Bind address for fingerd. """
+ """ Bind address for fingerd. """
- class Type(_Enum):
- """ Bind address type. """
+ class Type(_Enum):
+ """ Bind address type. """
- """ TCP on IPv4 bind. """
- TCP_IPv4 = 1
+ """ TCP on IPv4 bind. """
+ TCP_IPv4 = 1
- """ TCP on IPv6 bind. """
- TCP_IPv6 = 2
+ """ TCP on IPv6 bind. """
+ TCP_IPv6 = 2
- def __init__(self, family):
- self._family = self.Type(family)
+ def __init__(self, family):
+ self._family = self.Type(family)
- def __repr__(self):
- return f"{self._class__.__name__}(family = {self._family})"
+ def __repr__(self):
+ return f'{self._class__.__name__}(family = {self._family!r})'
- @property
- def family(self):
- """ Family as one of the `BindAddress.Type` enumeration values. """
+ @property
+ def family(self):
+ """ Family as one of the `BindAddress.Type` enumeration values. """
- return self._family
+ return self._family
class _TCP4Address(_BindAddress):
- """ IPv4 TCP Address. """
+ """ IPv4 TCP Address. """
- def __init__(self, address, port):
- super().__init__(_BindAddress.Type.TCP_IPv4)
+ def __init__(self, address, port):
+ super().__init__(_BindAddress.Type.TCP_IPv4)
- try:
- self._addr = _socket.inet_pton(_socket.AF_INET, address)
- except:
- self._addr = address
+ try:
+ self._addr = _socket.inet_pton(_socket.AF_INET, address)
+ except Exception:
+ self._addr = address
- self._port = port
+ self._port = port
- @property
- def runserver_params(self):
- """ Return the data as `_runserver` parameters. """
+ @property
+ def runserver_params(self):
+ """ Return the data as `_runserver` parameters. """
+
+ return (
+ _socket.AF_INET,
+ _socket.inet_ntop(_socket.AF_INET, self._addr),
+ self._port)
- return (_socket.AF_INET, _socket.inet_ntop(_socket.AF_INET,
- self._addr), self._port)
class _TCP6Address(_BindAddress):
- """ IPv6 TCP Address. """
+ """ IPv6 TCP Address. """
- def __init__(self, address, port):
- super().__init__(_BindAddress.Type.TCP_IPv6)
+ def __init__(self, address, port):
+ super().__init__(_BindAddress.Type.TCP_IPv6)
- try:
- self._addr = _socket.inet_pton(_socket.AF_INET6, address)
- except:
- self._addr = address
+ try:
+ self._addr = _socket.inet_pton(_socket.AF_INET6, address)
+ except Exception:
+ self._addr = address
- self._port = port
+ self._port = port
- @property
- def runserver_params(self):
- """ Return the data as `_runserver` parameters. """
+ @property
+ def runserver_params(self):
+ """ Return the data as `_runserver` parameters. """
- return (_socket.AF_INET6, _socket.inet_ntop(_socket.AF_INET6,
- self._addr), self._port)
+ return (
+ _socket.AF_INET6,
+ _socket.inet_ntop(_socket.AF_INET6, self._addr),
+ self._port,
+ )
class _BindsDecoder:
- """ Binds decoder for fingerd.
-
- Takes a raw string and the protocol name, either 'finger' (the base
- protocol managed by the class) or 'fingerd-control' (the protocol
- used for controlling the live fingerd interface). """
-
- def __init__(self, raw, proto = 'finger'):
- proto = proto.casefold()
- if proto not in ('finger',):
- raise ValueError(f"unsupported protocol {proto}")
-
- self._binds = set()
-
- for x in map(lambda x: x.strip(), raw.split(',')):
- addr = x
-
- # Try to find a scheme.
-
- scheme, *rest = x.split(':/')
- if not rest:
- # No scheme found, let's just guess the scheme based on
- # the situation.
+ """ Binds decoder for fingerd.
- x = scheme
- scheme = {'finger': 'tcp'}[proto]
- else:
- # just don't add the ':' of ':/' again
- x = '/' + ':/'.join(rest)
+ Takes a raw string and the protocol name, either 'finger' (the base
+ protocol managed by the class) or 'fingerd-control' (the protocol
+ used for controlling the live fingerd interface). """
- if (proto == 'finger' and scheme != 'tcp') \
- or scheme not in ('tcp',):
- raise _InvalidBindError(addr, "unsupported scheme "
- f"{repr(scheme)} for protocol {repr(proto)}")
+ def __init__(self, raw, proto='finger'):
+ proto = proto.casefold()
+ if proto not in ('finger',):
+ raise ValueError(f'unsupported protocol {proto}')
- # Decode the address data.
+ self._binds = set()
- if scheme == "tcp":
- self._binds.update(self._decode_tcp_host(x))
+ for x in map(lambda x: x.strip(), raw.split(',')):
+ addr = x
+
+ # Try to find a scheme.
- self._binds = tuple(self._binds)
-
- def __iter__(self):
- return iter(self._binds)
+ scheme, *rest = x.split(':/')
+ if not rest:
+ # No scheme found, let's just guess the scheme based on
+ # the situation.
- def __repr__(self):
- return f"{self._class__.__name__}(binds = {self._binds})"
+ x = scheme
+ scheme = {'finger': 'tcp'}[proto]
+ else:
+ # just don't add the ':' of ':/' again
+ x = '/' + ':/'.join(rest)
+
+ if (
+ (proto == 'finger' and scheme != 'tcp')
+ or scheme not in ('tcp',)
+ ):
+ raise _InvalidBindError(
+ addr,
+ f'Unsupported scheme {scheme!r} for protocol {proto!r}',
+ )
+
+ # Decode the address data.
+
+ if scheme == 'tcp':
+ self._binds.update(self._decode_tcp_host(x))
+
+ self._binds = tuple(self._binds)
+
+ def __iter__(self):
+ return iter(self._binds)
+
+ def __repr__(self):
+ return f'{self._class__.__name__}(binds = {self._binds})'
+
+ def _decode_tcp_host(self, x):
+ """ Decode suitable hosts for a TCP bind. """
+
+ addrs = ()
+ addr = x
+
+ # TODO: manage the '*' case.
+
+ # Get the host part first, we'll decode it later.
+
+ if x[0] == '[':
+ # The host part is an IPv6, look for the closing ']' and
+ # decode it later.
+
+ to = x.find(']')
+ if to < 0:
+ raise _InvalidBindError(
+ addr, "Expected closing ']'")
+
+ host = x[1:to]
+ x = x[to + 1:]
+
+ is_ipv6 = True
+ else:
+ # The host part is either an IPv4 or a host name, look for
+ # the ':' and decode it later.
+
+ host, *x = x.split(':')
+ x = ':' + ':'.join(x)
+
+ is_ipv6 = False
+
+ # Decode the port part.
+
+ if x == '':
+ port = 79
+ elif x[0] == ':':
+ try:
+ port = int(x[1:])
+ except ValueError:
+ try:
+ if x[1:] != '':
+ raise AssertionError('Expected a port number')
+ port = _socket.getservbyname(x[1:])
+ except Exception:
+ raise _InvalidBindError(
+ addr,
+ 'Expected a valid port number or name '
+ f'(got {x[1:]!r})',
+ ) from None
+ else:
+ raise _InvalidBindError(
+ addr, 'Garbage found after the host',
+ )
+
+ # Decode the host part and get the addresses.
+
+ addrs = ()
+ if is_ipv6:
+ # Decode the IPv6 address (validate it using `_socket.inet_pton`).
+
+ ip6 = host
+ _socket.inet_pton(_socket.AF_INET6, host)
+ addrs += (_TCP6Address(ip6, port),)
+ else:
+ # Decode the host (try IPv4, otherwise, resolve domain).
+
+ try:
+ ip = host.split('.')
+ if len(ip) < 2 or len(ip) > 4:
+ raise AssertionError('2 <= len(ip) <= 4')
+
+ ip = list(map(int, ip))
+ if not all(lambda x: 0 <= x < 256, ip):
+ raise AssertionError('non-8-bit component')
+
+ if len(ip) == 2:
+ ip = [ip[0], 0, 0, ip[1]]
+ elif len(ip) == 3:
+ ip = [ip[0], 0, ip[1], ip[2]]
+
+ addrs += (_TCP4Address(ip, port),)
+ except Exception:
+ entries = _socket.getaddrinfo(
+ host, port,
+ proto=_socket.IPPROTO_TCP,
+ type=_socket.SOCK_STREAM)
+
+ for ent in entries:
+ if (
+ ent[0] not in (_socket.AF_INET, _socket.AF_INET6)
+ or ent[1] not in (_socket.SOCK_STREAM,)
+ ):
+ continue
+
+ if ent[0] == _socket.AF_INET:
+ ip = ent[4][0]
+ _socket.inet_pton(_socket.AF_INET, ent[4][0])
+ addrs += (_TCP4Address(ip, port),)
+ else:
+ ip6 = ent[4][0]
+ _socket.inet_pton(_socket.AF_INET6, ent[4][0])
+ addrs += (_TCP6Address(ip6, port),)
+
+ return addrs
- def _decode_tcp_host(self, x):
- """ Decode suitable hosts for a TCP bind. """
-
- addrs = ()
- addr = x
-
- # TODO: manage the '*' case.
-
- # Get the host part first, we'll decode it later.
-
- if x[0] == '[':
- # The host part is an IPv6, look for the closing ']' and
- # decode it later.
-
- to = x.find(']')
- if to < 0:
- raise _InvalidBindAddressError(addr, "expected " \
- "closing ']'")
-
- host = x[1:to]
- x = x[to + 1:]
-
- is_ipv6 = True
- else:
- # The host part is either an IPv4 or a host name, look for
- # the ':' and decode it later.
-
- host, *x = x.split(':')
- x = ':' + ':'.join(x)
-
- is_ipv6 = False
-
- # Decode the port part.
-
- if x == '':
- port = 79
- elif x[0] == ':':
- try:
- port = int(x[1:])
- except:
- try:
- assert x[1:] != ''
- port = _socket.getservbyname(x[1:])
- except:
- raise _InvalidBindError(addr,
- "expected a valid port number or name " \
- f"(got {repr(x[1:])})") from None
- else:
- raise _InvalidBindAddressError(addr,
- "garbage found after the host")
-
- # Decode the host part and get the addresses.
-
- addrs = ()
- if is_ipv6:
- # Decode the IPv6 address (validate it using `_socket.inet_pton`).
-
- ip6 = host
- _socket.inet_pton(_socket.AF_INET6, host)
- addrs += (_TCP6Address(ip6, port),)
- else:
- # Decode the host (try IPv4, otherwise, resolve domain).
-
- try:
- ip = host.split('.')
- assert 2 <= len(ip) <= 4
-
- ip = list(map(int, ip))
- assert all(lambda x: 0 <= x < 256, ip)
-
- if len(ip) == 2:
- ip = [ip[0], 0, 0, ip[1]]
- elif len(ip) == 3:
- ip = [ip[0], 0, ip[1], ip[2]]
-
- addrs += (_TCP4Address(ip, port),)
- except:
- entries = _socket.getaddrinfo(host, port,
- proto = _socket.IPPROTO_TCP,
- type = _socket.SOCK_STREAM)
-
- for ent in entries:
- if ent[0] not in (_socket.AF_INET, _socket.AF_INET6) \
- or ent[1] not in (_socket.SOCK_STREAM,):
- continue
-
- if ent[0] == _socket.AF_INET:
- ip = ent[4][0]
- _socket.inet_pton(_socket.AF_INET, ent[4][0])
- addrs += (_TCP4Address(ip, port),)
- else:
- ip6 = ent[4][0]
- _socket.inet_pton(_socket.AF_INET6, ent[4][0])
- addrs += (_TCP6Address(ip6, port),)
-
- return addrs
# ---
# Finger/TCP server implementation.
# ---
+
class _FingerQuery:
- """ A finger query. Requests information about connected or specific
- users on a remote server.
+ """ A finger query. Requests information about connected or specific
+ users on a remote server.
- There are three types of requests recognized by RFC 1288:
+ There are three types of requests recognized by RFC 1288:
- * {C} is a request for a list of all online users.
- * {Q1} is a request for a local user.
- * {Q2} is a request for a distant user (with hostname).
+ * {C} is a request for a list of all online users.
+ * {Q1} is a request for a local user.
+ * {Q2} is a request for a distant user (with hostname).
- /W means the RUIP (program answering the query) should be more
- verbose (this token can be ignored). """
+ /W means the RUIP (program answering the query) should be more
+ verbose (this token can be ignored). """
- # "By default, this program SHOULD filter any unprintable data,
- # leaving only printable 7-bit characters (ASCII 32 through
- # ASCII 126), tabs (ASCII 9) and CRLFs."
+ # "By default, this program SHOULD filter any unprintable data,
+ # leaving only printable 7-bit characters (ASCII 32 through
+ # ASCII 126), tabs (ASCII 9) and CRLFs."
- allowed_chars = ("\t !\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
- + _string.ascii_letters + _string.digits)
+ allowed_chars = (
+ "\t !\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
+ + _string.ascii_letters + _string.digits)
- def __init__(self, raw):
- """ Initialize the query object by decoding the data. """
+ def __init__(self, raw):
+ """ Initialize the query object by decoding the data. """
- # Get a character string out of the query.
+ # Get a character string out of the query.
- raw = raw.decode('ascii', errors = 'ignore')
- raw = ''.join(c for c in raw if c in self.allowed_chars)
- self.line = raw
+ raw = raw.decode('ascii', errors='ignore')
+ raw = ''.join(c for c in raw if c in self.allowed_chars)
+ self.line = raw
- # Get elements.
+ # Get elements.
- self.host = None
- self.username = None
- self.verbose = False
- for element in raw.split():
- if element[0] == '/':
- if 'W' in element[1:]:
- self.verbose = True
- continue
- elif self.username is not None:
- raise Exception
- self.username = element
+ self.host = None
+ self.username = None
+ self.verbose = False
+ for element in raw.split():
+ if element[0] == '/':
+ if 'W' in element[1:]:
+ self.verbose = True
+ continue
+ elif self.username is not None:
+ raise Exception
+ self.username = element
- if self.username is not None and '@' in self.username:
- self.host, *self.username = self.username.split('@')[::-1]
- self.username = '@'.join(self.username[::-1])
+ if self.username is not None and '@' in self.username:
+ self.host, *self.username = self.username.split('@')[::-1]
+ self.username = '@'.join(self.username[::-1])
class FingerServer:
- """ The main finger server class.
+ """ The main finger server class.
+
+ :param binds: The hosts and ports on which the server should
+ listen to and answer finger requests.
+ :param hostname: The hostname to be included in answers sent
+ to clients.
+ :param interface: The interface to use for querying
+ users and sessions.
+ :param formatter: The formatter to use for formatting
+ answers sent to clients.
+ :param logger: The logger to use for logging server and
+ request events. """
+
+ def __init__(
+ self,
+ binds: str = 'localhost:79',
+ hostname: str = 'LOCALHOST',
+ interface: FingerInterface = FingerInterface(),
+ formatter: FingerFormatter = FingerFormatter(),
+ logger: FingerLogger = FingerLogger(),
+ ):
+ # Check the host name, which should be simple LDH, i.e.
+ # Letters, Digits, Hyphens.
+
+ try:
+ hostname = hostname.upper()
+ if not all(
+ c in _string.ascii_letters + _string.digits + '.-'
+ for c in hostname
+ ):
+ raise AssertionError('Non-LDH hostname')
+ except Exception:
+ raise _HostnameError(hostname)
+
+ # Check the interface and formatter classes.
+
+ if not isinstance(interface, FingerInterface):
+ raise TypeError(
+ 'Please base your interface '
+ 'on the base class provided by the fingerd module',
+ )
+
+ if not isinstance(formatter, FingerFormatter):
+ raise TypeError(
+ 'Please base your formatter '
+ 'on the base class provided by the fingerd module',
+ )
+
+ if not isinstance(logger, FingerLogger):
+ raise TypeError(
+ 'Please base your logger '
+ 'on the base class provided by the fingerd module',
+ )
+
+ # Keep the parameters.
+
+ self._host = hostname
+ self._logger = logger
+ self._interface = interface
+ self._formatter = formatter
+
+ # Check the binds.
+
+ self._binds = [b for b in _BindsDecoder(binds)]
+ if not self._binds:
+ raise _NoBindsError()
+
+ # Initialize multi-process related.
+
+ self._p = None
+
+ @property
+ def hostname(self):
+ """ The hostname configured for this server. """
+
+ return self._host
+
+ @property
+ def logger(self):
+ """ The logger configured for this server. """
+
+ return self._logger
+
+ @property
+ def interface(self):
+ """ The interface configured for this server. """
+
+ return self._interface
+
+ @property
+ def formatter(self):
+ """ The formatter configured for this server. """
+
+ return self._formatter
+
+ def _serve(self):
+ """ Serving process. """
+
+ async def handle_connection(inp, outp):
+ """ Handle a new incoming connection on one of the
+ FingerServer bindings. """
+
+ src, *_ = inp._transport.get_extra_info('peername')
+
+ # Gather the request line.
+
+ try:
+ line = await inp.readline()
+ except ConnectionResetError:
+ self.logger.no_query(src)
+ outp.close()
+ return
+
+ # Decode the request.
+
+ ans = ''
+
+ try:
+ query = _FingerQuery(line)
+ except Exception:
+ self.logger.bad_query(src)
+ ans = self.formatter.format_query_error(
+ self.hostname, line)
+ else:
+ if query.host is not None:
+ if query.username:
+ self.logger.transmit(
+ src, query.username, query.host)
+ else:
+ self.logger.transmit_list(src, query.host)
+
+ ans = self.interface.transmit_query(
+ query.host, query.username, query.verbose)
+ else:
+ if query.username:
+ self.logger.search_users(src, query.username)
+ users = self.interface.search_users(
+ query.username, None)
+ else:
+ self.logger.list_users(src)
+ users = self.interface.search_users(None, True)
- :param binds: The hosts and ports on which the server should
- listen to and answer finger requests.
- :param hostname: The hostname to be included in answers sent
- to clients.
- :param interface: The interface to use for querying
- users and sessions.
- :param formatter: The formatter to use for formatting
- answers sent to clients.
- :param logger: The logger to use for logging server and
- request events. """
+ if query.username or query.verbose:
+ ans = self.formatter.format_long(
+ self.hostname, query.line, users)
+ else:
+ ans = self.formatter.format_short(
+ self.hostname, query.line, users)
- def __init__(self, binds: str = 'localhost:79',
- hostname: str = 'LOCALHOST',
- interface: FingerInterface = FingerInterface(),
- formatter: FingerFormatter = FingerFormatter(),
- logger: FingerLogger = FingerLogger()):
- # Check the host name.
+ # Write the output.
- try:
- hostname = hostname.upper()
- assert all(c in _string.ascii_letters + _string.digits + '.-'
- for c in hostname)
- except:
- raise _HostnameError(hostname)
-
- # Check the interface and formatter classes.
-
- if not isinstance(interface, FingerInterface):
- raise TypeError("please base your interface "
- "on the base class provided by the fingerd module")
-
- if not isinstance(formatter, FingerFormatter):
- raise TypeError("please base your formatter "
- "on the base class provided by the fingerd module")
-
- if not isinstance(logger, FingerLogger):
- raise TypeError("please base your logger "
- "on the base class provided by the fingerd module")
-
- # Keep the parameters.
-
- self._host = hostname
- self._logger = logger
- self._interface = interface
- self._formatter = formatter
-
- # Check the binds.
-
- self._binds = [b for b in _BindsDecoder(binds)]
- if not self._binds:
- raise _NoBindsError()
+ try:
+ ans = '\r\n'.join(ans.splitlines()) + '\r\n'
+ outp.write(ans.encode('ascii', errors='ignore'))
+ except ConnectionResetError:
+ self.logger.could_not_answer(src)
- # Initialize multi-process related.
-
- self._p = None
-
- @property
- def hostname(self):
- """ The hostname configured for this server. """
-
- return self._host
-
- @property
- def logger(self):
- """ The logger configured for this server. """
-
- return self._logger
-
- @property
- def interface(self):
- """ The interface configured for this server. """
-
- return self._interface
-
- @property
- def formatter(self):
- """ The formatter configured for this server. """
-
- return self._formatter
-
- def _serve(self):
- """ Serving process. """
-
- async def handle_connection(inp, outp):
- """ Handle a new incoming connection on one of the
- FingerServer bindings. """
-
- src, *_ = inp._transport.get_extra_info('peername')
-
- # Gather the request line.
-
- try:
- line = await inp.readline()
- except ConnectionResetError:
- self.logger.no_query(src)
- outp.close()
- return
-
- # Decode the request.
-
- ans = ""
-
- try:
- query = _FingerQuery(line)
- except:
- self.logger.bad_query(src)
- ans = self.formatter.format_query_error(
- self.hostname, line)
- else:
- if query.host is not None:
- if query.username:
- self.logger.transmit(src,
- query.username, query.host)
- else:
- self.logger.transmit_list(src, query.host)
+ outp.close()
- ans = self.interface.transmit_query(query.host,
- query.username, query.verbose)
- else:
- if query.username:
- self.logger.search_users(src, query.username)
- users = self.interface.search_users(query.username,
- None)
- else:
- self.logger.list(src)
- users = self.interface.search_users(None, True)
+ async def start_server(bind):
+ """ Start a given server. """
- if query.username or query.verbose:
- ans = self.formatter.format_long(self.hostname,
- query.line, users)
- else:
- ans = self.formatter.format_short(self.hostname,
- query.line, users)
+ family, host, port = bind.runserver_params
- # Write the output.
+ self.logger.start(host, port)
- try:
- ans = '\r\n'.join(ans.splitlines()) + '\r\n'
- outp.write(ans.encode('ascii', errors = 'ignore'))
- except ConnectionResetError:
- self.logger.could_not_answer(src)
+ try:
+ server = await _asyncio.start_server(
+ handle_connection,
+ host=host, port=port, family=family,
+ reuse_address=True)
+ await server.serve_forever()
+ except _asyncio.CancelledError:
+ pass
+ finally:
+ self.logger.stop(host, port)
- outp.close()
+ async def cron_call(func, spec):
+ """ Call a function periodically using a croniter
+ specification. """
+
+ spec.set_current(_dt.now())
+
+ while True:
+ try:
+ func()
+ except SystemExit:
+ return
+
+ while True:
+ seconds = (
+ spec.get_next(_dt) - _dt.now()
+ ).total_seconds()
+ if seconds >= 0:
+ break
+
+ await _asyncio.sleep(seconds) # TODO
+
+ async def start_servers():
+ """ Start the servers. """
+
+ def get_coroutines():
+ """ Tasks iterator. """
+
+ for bind in self._binds:
+ yield start_server(bind)
+
+ for key in dir(self.interface):
+ member = getattr(self.interface, key)
+ if not callable(member):
+ continue
+
+ try:
+ spec = member.__cron__
+ except AttributeError:
+ continue
+
+ if not isinstance(spec, _croniter):
+ continue
+
+ yield cron_call(member, spec)
+
+ tasks = [_asyncio.create_task(co) for co in get_coroutines()]
+ await _asyncio.wait(
+ tasks, return_when=_asyncio.FIRST_COMPLETED)
+
+ exc = None
+ for task in tasks:
+ if exc is None:
+ exc = task.exception()
- async def start_server(bind):
- """ Start a given server. """
+ task.cancel()
+
+ if exc is not None:
+ raise exc
- family, host, port = bind.runserver_params
+ try:
+ _asyncio.run(start_servers())
+ except KeyboardInterrupt:
+ pass
- self.logger.start(host, port)
+ def start(self) -> None:
+ """ Bind all ports for the given hosts and start the underlying
+ servers in separate processes. """
- try:
- server = await _asyncio.start_server(
- handle_connection,
- host = host, port = port, family = family,
- reuse_address = True)
- await server.serve_forever()
- except _asyncio.CancelledError:
- pass
- finally:
- self.logger.stop(host, port)
+ if self._p is not None and self._p.is_alive():
+ return
- async def regular_call(bound_method, seconds):
- while True:
- try:
- bound_method()
- except SystemExit:
- return
-
- await _asyncio.sleep(seconds)
-
- async def start_servers():
- """ Start the servers. """
-
- def get_coroutines():
- """ Tasks iterator. """
-
- for bind in self._binds:
- yield start_server(bind)
-
- for key in dir(self.interface):
- member = getattr(self.interface, key)
- if not callable(member):
- continue
-
- try:
- period = float(member.__period__)
- assert 0 < period
- except (AttributeError, TypeError, ValueError,
- AssertionError):
- continue
-
- yield regular_call(member, period)
-
- tasks = [_asyncio.create_task(co) for co in get_coroutines()]
- await _asyncio.wait(tasks,
- return_when = _asyncio.FIRST_COMPLETED)
-
- exc = None
- for task in tasks:
- if exc is None:
- exc = task.exception()
-
- task.cancel()
-
- if exc is not None:
- raise exc
-
- try:
- _asyncio.run(start_servers())
- except KeyboardInterrupt:
- pass
-
- def start(self) -> None:
- """ Bind all ports for the given hosts and start the underlying
- servers in separate processes. """
-
- if self._p is not None and self._p.is_alive():
- return
-
- self._p = _multip.Process(target = self._serve)
- self._p.start()
+ self._p = _multip.Process(target=self._serve)
+ self._p.start()
- def stop(self) -> None:
- """ Unbind all ports for the given hosts and stop the underlying
- server processes. """
+ def stop(self) -> None:
+ """ Unbind all ports for the given hosts and stop the underlying
+ server processes. """
- if self._p is None or not self._p.is_alive():
- return
+ if self._p is None or not self._p.is_alive():
+ return
- self._p.join()
- self._p = None
+ self._p.kill()
+ self._p.join()
+ self._p = None
- def serve_forever(self) -> None:
- """ Shortcut for synchronously starting all servers using
- :py:meth:`FingerServer.start`, waiting for an interrupt
- signal, and stopping all servers using
- :py:meth:`FingerServer.stop`. """
+ def serve_forever(self) -> None:
+ """ Shortcut for synchronously starting all servers using
+ :py:meth:`FingerServer.start`, waiting for an interrupt
+ signal, and stopping all servers using
+ :py:meth:`FingerServer.stop`. """
- if self._p is not None:
- self.start()
+ if self._p is not None:
+ self.start()
- try:
- while True:
- _signal.pause()
- except KeyboardInterrupt:
- pass
+ try:
+ while True:
+ _signal.pause()
+ except KeyboardInterrupt:
+ pass
- self.stop()
- else:
- # If the server hasn't been started on another process
- # using ``.start()``, we can just start is on this process.
+ self.stop()
+ else:
+ # If the server hasn't been started on another process
+ # using ``.start()``, we can just start is on this process.
- self._serve()
+ self._serve()
- def shutdown(self):
- """ Shutdown the server, alias to `.stop()`. """
+ def shutdown(self):
+ """ Shutdown the server, alias to `.stop()`. """
- self.stop()
+ self.stop()
# End of file.
diff --git a/fingerd/errors.py b/fingerd/errors.py
index 98110e8..6987bdb 100755
--- a/fingerd/errors.py
+++ b/fingerd/errors.py
@@ -1,52 +1,58 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" This file defines the exceptions used throughout the module. """
__all__ = [
- 'ConfigurationError',
- 'BindError', 'NoBindsError', 'InvalidBindError',
- 'HostnameError'
+ 'BindError', 'ConfigurationError', 'HostnameError',
+ 'InvalidBindError', 'NoBindsError',
]
# ---
# Configuration-related errors.
# ---
+
class ConfigurationError(Exception):
- """ Generic exception for when an invalid configuration
- exception occurs. """
+ """ Generic exception for when an invalid configuration
+ exception occurs. """
- pass
+ pass
-class BindError(ConfigurationError):
- """ Exception raised when an error has occurred with the provided
- binds. """
- def __init__(self, msg):
- super().__init__("an error has occurred with the provided binds: "
- + msg)
+class HostnameError(ConfigurationError):
+ """ Exception raised when a host name is invalid. """
-class NoBindsError(BindError):
- """ Exception raised when no binds were provided. """
+ def __init__(self, hostname):
+ super().__init__('invalid host name {}.'.format(repr(hostname)))
- def __init__(self):
- super().__init__("no valid bind")
-class InvalidBindError(BindError):
- """ Exception raised when one of the provided binds came out
- erroneous. """
+class BindError(ConfigurationError):
+ """ Exception raised when an error has occurred with the provided
+ binds. """
- def __init__(self, bind, msg = None):
- super().__init__("one of the provided bind ({}) ".format(repr(bind))
- + "was invalid{}".format(': ' + msg if msg is not None else ''))
+ def __init__(self, msg):
+ super().__init__(
+ f'an error has occurred with the provided binds: {msg}')
-class HostnameError(ConfigurationError):
- """ Exception raised when a host name is invalid. """
- def __init__(self, hostname):
- super().__init__("invalid host name {}.".format(repr(hostname)))
+class NoBindsError(BindError):
+ """ Exception raised when no binds were provided. """
+
+ def __init__(self):
+ super().__init__('no valid bind')
+
+
+class InvalidBindError(BindError):
+ """ Exception raised when one of the provided binds came out
+ erroneous. """
+
+ def __init__(self, bind, msg=None):
+ super().__init__(
+ f'one of the provided bind ({bind!r}) '
+ f'was invalid{": " + msg if msg else ""}',
+ )
# End of file.
diff --git a/fingerd/fiction.py b/fingerd/fiction.py
index 2e68403..8ddb2ec 100755
--- a/fingerd/fiction.py
+++ b/fingerd/fiction.py
@@ -1,33 +1,36 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" Definitions for the finger server fiction interface.
- This file contains everything to decode and use the actions file. """
+ This file contains everything to decode and use the actions file. """
+import copy as _copy
+import math as _math
import os.path as _path
-import copy as _copy, re as _re, math as _math
-import itertools as _itertools
+import re as _re
-from enum import Enum as _Enum
-from datetime import datetime as _dt, timedelta as _td
-from typing import Optional as _Optional, Union as _Union, Tuple as _Tuple
from collections import defaultdict as _defaultdict
from collections.abc import Sequence as _Sequence
+from datetime import datetime as _dt, timedelta as _td
+from enum import Enum as _Enum
+from typing import Optional as _Optional, Union as _Union
from .core import (
- FingerInterface as _FingerInterface,
- FingerUser as _FingerUser,
- FingerSession as _FingerSession)
+ FingerInterface as _FingerInterface,
+ FingerSession as _FingerSession,
+ FingerUser as _FingerUser,
+ cron as _cron,
+)
__all__ = [
- 'FingerAction', 'FingerUserCreationAction', 'FingerUserEditionAction',
- 'FingerUserDeletionAction', 'FingerUserLoginAction',
- 'FingerUserSessionChangeAction', 'FingerUserLogoutAction',
-
- 'FingerFictionInterface',
- 'FingerScenario', 'FingerScenarioInterface']
+ 'FingerAction', 'FingerFictionInterface', 'FingerScenario',
+ 'FingerScenarioInterface', 'FingerUserCreationAction',
+ 'FingerUserDeletionAction', 'FingerUserEditionAction',
+ 'FingerUserLoginAction', 'FingerUserLogoutAction',
+ 'FingerUserSessionChangeAction',
+]
_toml = None
@@ -35,172 +38,186 @@ _toml = None
# Users and sessions for the fictions.
# ---
+
class _FictionalUser(_FingerUser):
- """ Representation of a user on the fictional system,
- compatible with :py:class:`fingerd.core.FingerUser`
- and implementing existing and/or new properties more suited for
- fiction.
+ """ Representation of a user on the fictional system,
+ compatible with :py:class:`fingerd.core.FingerUser`
+ and implementing existing and/or new properties more suited for
+ fiction.
- For now, there are no modifications from the base class. """
+ For now, there are no modifications from the base class. """
+
+ pass
- pass
class _FictionalSession(_FingerSession):
- """ Representation of an active session for a given user
- on the fictional system, compatible with
- :py:class:`fingerd.core.FingerSession`.
+ """ Representation of an active session for a given user
+ on the fictional system, compatible with
+ :py:class:`fingerd.core.FingerSession`.
+
+ The two main modifications from the base class are the following:
+
+ * Each session has a name, which identifies it. It can be used
+ to designate it, allowing outside code to edit and delete the
+ session specifically from an outside point of view.
+ * Since the actions only allow for setting the user idle or not,
+ when the user is active, the idle timestamp is simulated to make
+ it seem like the user makes signs of life at an irregular but
+ reasonable rate, like when the user often stops typing to think
+ or do a task outside of the computer. """
- The two main modifications from the base class are the following:
+ __slots__ = ('_name', '_is_idle', '_idle_last')
- * Each session has a name, which identifies it. It can be used
- to designate it, allowing outside code to edit and delete the
- session specifically from an outside point of view.
- * Since the actions only allow for setting the user idle or not,
- when the user is active, the idle timestamp is simulated to make
- it seem like the user makes signs of life at an irregular but
- reasonable rate, like when the user stops typing to think
- or do a task outside of the computer often. """
+ def __init__(self, *args, name=None, is_idle=False, **kwargs):
+ super().__init__(*args, **kwargs)
- def __init__(self, *args, name = None, is_idle = False, **kwargs):
- super().__init__(*args, **kwargs)
+ self._name = None if name is None else str(name)
+ self._is_idle = is_idle
+ self._idle_last = self.start
- self._name = None if name is None else str(name)
- self._is_idle = is_idle
- self._idle_last = self.start
+ def __repr__(self):
+ p = ('name', 'start', 'line', 'host', 'idle_since', 'active_since')
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in p if getattr(self, x) is not None
+ )
+ return f"{self.__class__.__name__}({', '.join(p)})"
- def __repr__(self):
- p = ('name', 'start', 'line', 'host', 'idle_since', 'active_since')
- p = (f"{x} = {repr(getattr(self, x))}" for x in p \
- if getattr(self, x) is not None)
- return f"{self.__class__.__name__}({', '.join(p)})"
+ @property
+ def name(self):
+ """ Session name (for identifying). """
- @property
- def name(self):
- """ Session name (for identifying). """
+ return self._name
- return self._name
+ @name.setter
+ def name(self, value):
+ self._name = None if value is None else str(value)
- @name.setter
- def name(self, value):
- self._name = None if value is None else str(value)
+ @property
+ def idle(self):
+ """ Idle time (simulated). """
- @property
- def idle(self):
- """ Idle time (simulated). """
+ if self._is_idle:
+ return self._idle_last
- if self._is_idle:
- return self._idle_last
+ now = _dt.now().astimezone()
- now = _dt.now().astimezone()
+ # Generate a number of seconds and return it.
- # Generate a number of seconds and return it.
+ def s(x):
+ return _math.sin(x * (_math.pi / 2))
- x = (now - self._idle_last).seconds
- s = lambda x: _math.sin(x * (_math.pi / 2))
- randsecs = int(abs(s(x) + s(x / 4)))
+ x = (now - self._idle_last).seconds
+ randsecs = int(abs(s(x) + s(x / 4)))
- return now - _td(seconds = randsecs)
+ return now - _td(seconds=randsecs)
- @idle.setter
- def idle(self, value):
- # Does nothing as we manage this time.
+ @idle.setter
+ def idle(self, value):
+ # Does nothing as we manage this time.
- pass
+ pass
- @property
- def idle_since(self):
- """ Idle since the given time. """
+ @property
+ def idle_since(self):
+ """ Idle since the given time. """
- if not self._is_idle:
- return None
- return self._idle_last
+ if not self._is_idle:
+ return None
+ return self._idle_last
- @idle_since.setter
- def idle_since(self, value):
- self._is_idle = True
- self._idle_last = value
+ @idle_since.setter
+ def idle_since(self, value):
+ self._is_idle = True
+ self._idle_last = value
- @property
- def active_since(self):
- """ Active since the given time. """
+ @property
+ def active_since(self):
+ """ Active since the given time. """
- if self._is_idle:
- return None
- return self._idle_last
+ if self._is_idle:
+ return None
+ return self._idle_last
+
+ @active_since.setter
+ def active_since(self, value):
+ self._is_idle = False
+ self._idle_last = value
- @active_since.setter
- def active_since(self, value):
- self._is_idle = False
- self._idle_last = value
# ---
# Parse a delta string.
# ---
-__delta0_re = _re.compile(r"(-?)(([0-9]+[a-z]+)+)")
-__delta1_re = _re.compile(r"([0-9]+)([a-z]+)")
+__delta0_re = _re.compile(r'(-?)(([0-9]+[a-z]+)+)')
+__delta1_re = _re.compile(r'([0-9]+)([a-z]+)')
+
def _parse_delta(raw):
- """ Parse a delta string as found in the configuration files. """
-
- try:
- delta = _td()
-
- sign, elements, _ = __delta0_re.fullmatch(raw).groups()
- sign = (1, -1)[len(sign)]
- for res in __delta1_re.finditer(elements):
- num, typ = res.groups()
- num = int(num)
-
- if typ == 'w':
- delta += _td(weeks = sign * num)
- elif typ in 'jd':
- delta += _td(days = sign * num)
- elif typ == 'h':
- delta += _td(hours = sign * num)
- elif typ == 'm':
- delta += _td(minutes = sign * num)
- elif typ == 's':
- delta += _td(seconds = sign * num)
- else:
- raise Exception
-
- return delta
- except Exception as e:
- return None
+ """ Parse a delta string as found in the configuration files. """
+
+ try:
+ delta = _td()
+
+ sign, elements, _ = __delta0_re.fullmatch(raw).groups()
+ sign = (1, -1)[len(sign)]
+ for res in __delta1_re.finditer(elements):
+ num, typ = res.groups()
+ num = int(num)
+
+ if typ == 'w':
+ delta += _td(weeks=sign * num)
+ elif typ in 'jd':
+ delta += _td(days=sign * num)
+ elif typ == 'h':
+ delta += _td(hours=sign * num)
+ elif typ == 'm':
+ delta += _td(minutes=sign * num)
+ elif typ == 's':
+ delta += _td(seconds=sign * num)
+ else:
+ raise Exception
+
+ return delta
+ except Exception:
+ return None
+
def _format_delta(td):
- """ Create a delta string. """
+ """ Create a delta string. """
- sls = zip((_td(days = 7), _td(days = 1), _td(seconds = 3600),
- _td(seconds = 60)), 'wdhm')
+ sls = zip(
+ (_td(days=7), _td(days=1), _td(seconds=3600), _td(seconds=60)),
+ 'wdhm',
+ )
- if td >= _td():
- d = ''
+ if td >= _td():
+ d = ''
- for span, letter in sls:
- n = td // span
- if n:
- d += f"{n}{letter}"
- td -= span * n
+ for span, letter in sls:
+ n = td // span
+ if n:
+ d += f'{n}{letter}'
+ td -= span * n
- s = td.seconds
- if not d or s:
- d += f"{s}s"
- else:
- d = '-'
+ s = td.seconds
+ if not d or s:
+ d += f'{s}s'
+ else:
+ d = '-'
- for span, letter in sls:
- n = -td // span
- if n:
- d += f"{n}{letter}"
- td += span * n
+ for span, letter in sls:
+ n = -td // span
+ if n:
+ d += f'{n}{letter}'
+ td += span * n
- s = (-td).seconds
- if s:
- d += f"{s}s"
+ s = (-td).seconds
+ if s:
+ d += f'{s}s'
+
+ return d
- return d
# ---
# Unchanged global.
@@ -209,9 +226,11 @@ def _format_delta(td):
# TODO: Make that ``UnchangedType()`` (or ``type(Unchanged)()``) always
# return Unchanged, like for None and NoneType.
+
class _UnchangedType:
- def __repr__(self):
- return "Unchanged"
+ def __repr__(self):
+ return 'Unchanged'
+
Unchanged = _UnchangedType()
@@ -219,945 +238,1019 @@ Unchanged = _UnchangedType()
# Action description.
# ---
+
class FingerAction:
- """ Base class for actions in a fiction. """
+ """ Base class for actions in a fiction. """
+
+ pass
- pass
class FingerUserCreationAction(FingerAction):
- """ A user has been created.
+ """ A user has been created.
+
+ :param login: The login of the user that is being created.
+ :param name: The initial value for :py:attr:`_FictionalUser.name`.
+ :param home: The initial value for :py:attr:`_FictionalUser.home`.
+ :param shell: The initial value for :py:attr:`_FictionalUser.shell`.
+ :param office: The initial value for :py:attr:`_FictionalUser.office`.
+ :param plan: The initial value for :py:attr:`_FictionalUser.plan`. """
- :param login: The login of the user that is being created.
- :param name: The initial value for :py:attr:`_FictionalUser.name`.
- :param home: The initial value for :py:attr:`_FictionalUser.home`.
- :param shell: The initial value for :py:attr:`_FictionalUser.shell`.
- :param office: The initial value for :py:attr:`_FictionalUser.office`.
- :param plan: The initial value for :py:attr:`_FictionalUser.plan`. """
+ def __init__(
+ self,
+ login: str,
+ name: _Union[str, None] = None,
+ home: _Union[str, None] = None,
+ shell: _Union[str, None] = None,
+ office: _Union[str, None] = None,
+ plan: _Union[str, None] = None,
+ ):
+ super().__init__()
+ self._login = login
+ self._name = name
+ self._home = home
+ self._shell = shell
+ self._office = office
+ self._plan = plan
- def __init__(self, login: str,
- name: _Union[str, None] = None,
- home: _Union[str, None] = None,
- shell: _Union[str, None] = None,
- office: _Union[str, None] = None,
- plan: _Union[str, None] = None):
- super().__init__()
- self._login = login
- self._name = name
- self._home = home
- self._shell = shell
- self._office = office
- self._plan = plan
+ def __repr__(self):
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in ('login', 'name', 'home', 'shell', 'office', 'plan'))
+ return f"{self.__class__.__name__}({', '.join(p)})"
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login',
- 'name', 'home', 'shell', 'office', 'plan'))
- return f"{self.__class__.__name__}({', '.join(p)})"
+ @property
+ def login(self) -> str:
+ """ The login of the user that is being created. """
- @property
- def login(self) -> str:
- """ The login of the user that is being created. """
+ return self._login
- return self._login
+ @property
+ def name(self) -> _Union[str, None]:
+ """ The initial value for :py:attr:`_FictionalUser.name`. """
- @property
- def name(self) -> _Union[str, None]:
- """ The initial value for :py:attr:`_FictionalUser.name`. """
+ return self._name
- return self._name
+ @property
+ def home(self) -> _Union[str, None]:
+ """ The initial value for :py:attr:`_FictionalUser.home`. """
- @property
- def home(self) -> _Union[str, None]:
- """ The initial value for :py:attr:`_FictionalUser.home`. """
+ return self._home
- return self._home
+ @property
+ def shell(self) -> _Union[str, None]:
+ """ The initial value for :py:attr:`_FictionalUser.shell`. """
- @property
- def shell(self) -> _Union[str, None]:
- """ The initial value for :py:attr:`_FictionalUser.shell`. """
+ return self._shell
- return self._shell
+ @property
+ def office(self) -> _Union[str, None]:
+ """ The initial value for :py:attr:`_FictionalUser.office`. """
- @property
- def office(self) -> _Union[str, None]:
- """ The initial value for :py:attr:`_FictionalUser.office`. """
+ return self._office
- return self._office
+ @property
+ def plan(self) -> _Union[str, None]:
+ """ The initial value for :py:attr:`_FictionalUser.plan`. """
- @property
- def plan(self) -> _Union[str, None]:
- """ The initial value for :py:attr:`_FictionalUser.plan`. """
+ return self._plan
- return self._plan
class FingerUserEditionAction(FingerAction):
- """ A user has been edited.
-
- :param login: The login of the user that is being edited.
- :param name: The new value for :py:attr:`_FictionalUser.name`;
- :py:data:`Unchanged` if the property is unchanged.
- :param home: The new value for :py:attr:`_FictionalUser.home`;
- :py:data:`Unchanged` if the property is unchanged.
- :param shell: The new value for :py:attr:`_FictionalUser.shell`;
- :py:data:`Unchanged` if the property is unchanged.
- :param office: The new value for :py:attr:`_FictionalUser.office`;
- :py:data:`Unchanged` if the property is unchanged.
- :param plan: The new value for :py:attr:`_FictionalUser.plan`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- def __init__(self, login: str,
- name: _Union[str, None, _UnchangedType] = Unchanged,
- home: _Union[str, None, _UnchangedType] = Unchanged,
- shell: _Union[str, None, _UnchangedType] = Unchanged,
- office: _Union[str, None, _UnchangedType] = Unchanged,
- plan: _Union[str, None, _UnchangedType] = Unchanged):
- super().__init__()
- self._login = login
- self._name = name
- self._home = home
- self._shell = shell
- self._office = office
- self._plan = plan
-
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login',
- 'name', 'home', 'shell', 'office', 'plan'))
- return f"{self.__class__.__name__}({', '.join(p)})"
-
- @property
- def login(self) -> str:
- """ The login of the user that is being edited. """
-
- return self._login
-
- @property
- def name(self) -> _Union[str, None, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalUser.name`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- return self._name
-
- @property
- def home(self) -> _Union[str, None, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalUser.home`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- return self._home
-
- @property
- def shell(self) -> _Union[str, None, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalUser.shell`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- return self._shell
-
- @property
- def office(self) -> _Union[str, None, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalUser.office`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- return self._office
-
- @property
- def plan(self) -> _Union[str, None, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalUser.plan`;
- :py:data:`Unchanged` if the property is unchanged. """
-
- return self._plan
+ """ A user has been edited.
+
+ :param login: The login of the user that is being edited.
+ :param name: The new value for :py:attr:`_FictionalUser.name`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param home: The new value for :py:attr:`_FictionalUser.home`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param shell: The new value for :py:attr:`_FictionalUser.shell`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param office: The new value for :py:attr:`_FictionalUser.office`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param plan: The new value for :py:attr:`_FictionalUser.plan`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ def __init__(
+ self,
+ login: str,
+ name: _Union[str, None, _UnchangedType] = Unchanged,
+ home: _Union[str, None, _UnchangedType] = Unchanged,
+ shell: _Union[str, None, _UnchangedType] = Unchanged,
+ office: _Union[str, None, _UnchangedType] = Unchanged,
+ plan: _Union[str, None, _UnchangedType] = Unchanged,
+ ):
+ super().__init__()
+ self._login = login
+ self._name = name
+ self._home = home
+ self._shell = shell
+ self._office = office
+ self._plan = plan
+
+ def __repr__(self):
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in ('login', 'name', 'home', 'shell', 'office', 'plan'))
+ return f"{self.__class__.__name__}({', '.join(p)})"
+
+ @property
+ def login(self) -> str:
+ """ The login of the user that is being edited. """
+
+ return self._login
+
+ @property
+ def name(self) -> _Union[str, None, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalUser.name`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._name
+
+ @property
+ def home(self) -> _Union[str, None, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalUser.home`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._home
+
+ @property
+ def shell(self) -> _Union[str, None, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalUser.shell`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._shell
+
+ @property
+ def office(self) -> _Union[str, None, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalUser.office`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._office
+
+ @property
+ def plan(self) -> _Union[str, None, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalUser.plan`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._plan
+
class FingerUserDeletionAction(FingerAction):
- """ A user has been deleted.
+ """ A user has been deleted.
+
+ :param login: The login of the user to delete. """
- :param login: The login of the user to delete. """
+ def __init__(self, login: str):
+ super().__init__()
+ self._login = login
- def __init__(self, login: str):
- super().__init__()
- self._login = login
+ def __repr__(self):
+ p = (f'{x} = {getattr(self, x)!r}' for x in ('login'))
+ return f"{self.__class__.__name__}({', '.join(p)})"
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login'))
- return f"{self.__class__.__name__}({', '.join(p)})"
+ @property
+ def login(self) -> str:
+ """ The user's login. """
- @property
- def login(self) -> str:
- """ The user's login. """
+ return self._login
- return self._login
class FingerUserLoginAction(FingerAction):
- """ A user has logged in.
+ """ A user has logged in.
+
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to create.
+ :param line: The new value for :py:attr:`_FictionalSession.line`.
+ :param host: The new value for :py:attr:`_FictionalSession.host`. """
- :param login: The login of the user to edit.
- :param session_name: The name of the session to create.
- :param line: The new value for :py:attr:`_FictionalSession.line`.
- :param host: The new value for :py:attr:`_FictionalSession.host`. """
+ def __init__(
+ self,
+ login: str,
+ session_name: _Union[str, None] = None,
+ line: _Union[str, None] = None,
+ host: _Union[str, None] = None,
+ ):
+ super().__init__()
+ self._login = login
+ self._session = session_name
+ self._line = line
+ self._host = host
- def __init__(self, login: str,
- session_name: _Union[str, None] = None,
- line: _Union[str, None] = None,
- host: _Union[str, None] = None):
- super().__init__()
- self._login = login
- self._session = session_name
- self._line = line
- self._host = host
+ def __repr__(self):
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in ('login', 'session_name'))
+ return f"{self.__class__.__name__}({', '.join(p)})"
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login',
- 'session_name'))
- return f"{self.__class__.__name__}({', '.join(p)})"
+ @property
+ def login(self) -> str:
+ """ The login of the user to edit """
- @property
- def login(self) -> str:
- """ The login of the user to edit """
+ return self._login
- return self._login
+ @property
+ def session_name(self) -> _Union[str, None]:
+ """ The name of the session to create. """
- @property
- def session_name(self) -> _Union[str, None]:
- """ The name of the session to create. """
+ return self._session
- return self._session
+ @property
+ def line(self) -> _Union[str, None]:
+ """ The name of the line from which the user has logged in. """
- @property
- def line(self) -> _Union[str, None]:
- """ The name of the line from which the user has logged in. """
+ return self._line
- return self._line
+ @property
+ def host(self) -> _Union[str, None]:
+ """ The name of the host from which the user has logged in. """
- @property
- def host(self) -> _Union[str, None]:
- """ The name of the host from which the user has logged in. """
+ return self._host
- return self._host
class FingerUserSessionChangeAction(FingerAction):
- """ A user session has undergone modifications.
+ """ A user session has undergone modifications.
- :param login: The login of the user to edit.
- :param session_name: The name of the session to edit.
- :param is_idle: The new value for
- :py:attr:`_FictionalSession.is_idle`;
- :py:data:`Unchanged` if the property is unchanged. """
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to edit.
+ :param is_idle: The new value for
+ :py:attr:`_FictionalSession.is_idle`;
+ :py:data:`Unchanged` if the property is unchanged. """
- def __init__(self, login: str,
- session_name: _Union[str, None] = None,
- idle: _Union[bool, _UnchangedType] = Unchanged):
- super().__init__()
- self._login = login
- self._session = session_name
- self._idle = bool(idle)
+ def __init__(
+ self,
+ login: str,
+ session_name: _Optional[str] = None,
+ idle: _Union[bool, _UnchangedType] = Unchanged,
+ ):
+ super().__init__()
+ self._login = login
+ self._session = session_name
+ self._idle = bool(idle)
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login',
- 'session_name', 'idle'))
- return f"{self.__class__.__name__}({', '.join(p)})"
+ def __repr__(self):
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in ('login', 'session_name', 'idle'))
+ return f"{self.__class__.__name__}({', '.join(p)})"
- @property
- def login(self) -> str:
- """ The login of the user to edit. """
+ @property
+ def login(self) -> str:
+ """ The login of the user to edit. """
- return self._login
+ return self._login
- @property
- def session_name(self) -> _Union[str, None]:
- """ The name of the session to edit. """
+ @property
+ def session_name(self) -> _Union[str, None]:
+ """ The name of the session to edit. """
- return self._session
+ return self._session
- @property
- def idle(self) -> _Union[bool, _UnchangedType]:
- """ The new value for :py:attr:`_FictionalSession.is_idle`;
- :py:data:`Unchanged` if the property is unchanged. """
+ @property
+ def idle(self) -> _Union[bool, _UnchangedType]:
+ """ The new value for :py:attr:`_FictionalSession.is_idle`;
+ :py:data:`Unchanged` if the property is unchanged. """
+
+ return self._idle
- return self._idle
class FingerUserLogoutAction(FingerAction):
- """ A user has logged out.
+ """ A user has logged out.
+
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to delete. """
- :param login: The login of the user to edit.
- :param session_name: The name of the session to delete. """
+ def __init__(
+ self,
+ login: str,
+ session_name: _Optional[str] = None,
+ ):
+ super().__init__()
+ self._login = login
+ self._session = session_name
- def __init__(self, login: str,
- session_name: _Union[str, None] = None):
- super().__init__()
- self._login = login
- self._session = session_name
+ def __repr__(self):
+ p = (
+ f'{x} = {getattr(self, x)!r}'
+ for x in ('login', 'session_name'))
+ return f'{self.__class__.__name__}({", ".join(p)})'
- def __repr__(self):
- p = (f"{x} = {repr(getattr(self, x))}" for x in ('login',
- 'session_name'))
- return f"{self.__class__.__name__}({', '.join(p)})"
+ @property
+ def login(self) -> str:
+ """ The login of the user to edit. """
- @property
- def login(self) -> str:
- """ The login of the user to edit. """
+ return self._login
- return self._login
+ @property
+ def session_name(self) -> _Union[str, None]:
+ """ The name of the session to delete. """
- @property
- def session_name(self) -> _Union[str, None]:
- """ The name of the session to delete. """
+ return self._session
- return self._session
# ---
# Base fiction interface.
# ---
+
class FingerFictionInterface(_FingerInterface):
- """ Base finger fiction interface for managing a scene and updating
- it using sequences of actions and given times.
-
- The basic state for this class is to have no users; it is possible
- at any point in time to
-
- This class should be subclassed for interfaces specialized in various
- sources for the data; for example, while
- :py:class:`FingerScenarioInterface` is specialized in using a
- static sequence of actions, another class could read events from
- a live source. """
-
- def __init__(self, start: _Optional[_dt] = None):
- if start is None:
- start = _dt.now().astimezone()
-
- super().__init__()
-
- # Initialize the object properties.
- # - `users`: the users.
- # - `lasttime`: the last datetime, in order to raise an
- # exception if not applied in order.
-
- self._users = {}
- self._lasttime = None
-
- # ---
- # Expected methods from an interface.
- # ---
-
- def search_users(self, query: _Optional[str],
- active: _Optional[bool]) -> _Sequence[_FictionalUser]:
- """ Look for users according to a check. """
-
- return ([_copy.deepcopy(user) for user in self._users.values()
- if not (query is not None and query not in user.login)
- and not (active is not None and active != bool(user.sessions))])
-
- # ---
- # Elements proper to the fiction interface.
- # ---
-
- def reset(self):
- """ Reset the interface (revert all actions). """
-
- self._users = {}
- self._lasttime = None
-
- def apply(self, action, time: _Optional[_dt] = None):
- """ Apply an action. """
-
- if time is None:
- time = _dt.now().astimezone()
-
- if self._lasttime is not None and self._lasttime > time:
- raise ValueError("operations weren't applied in order!")
-
- self._lasttime = time
-
- if isinstance(action, FingerUserCreationAction):
- # Create user `action.user`.
-
- if action.login is None:
- raise ValueError("missing login")
- if action.login in self._users:
- raise ValueError("already got a user with that login")
-
- user = _FictionalUser(login = action.login, name = action.name)
- user.shell = action.shell
- user.home = action.home
- user.office = action.office
- user.plan = action.plan
-
- self._users[user.login] = user
- elif isinstance(action, FingerUserEditionAction):
- # Edit user `action.user` with the given modifications.
-
- if action.login is None:
- raise ValueError("missing login")
- if not action.login in self._users:
- raise ValueError("got no user with login "
- f"{repr(action.login)}")
-
- user = self._users[action.login]
- if action.name is not Unchanged:
- user.name = action.name
- if action.shell is not Unchanged:
- user.shell = action.shell
- if action.home is not Unchanged:
- user.home = action.home
- if action.office is not Unchanged:
- user.office = action.office
- if action.plan is not Unchanged:
- user.plan = action.plan
- elif isinstance(action, FingerUserDeletionAction):
- # Delete user with login `action.login`.
-
- if action.login is None:
- raise ValueError("missing login")
- if not action.login in self._users:
- raise ValueError("got no user with login "
- f"{repr(action.login)}")
-
- del self._users[action.login]
- elif isinstance(action, FingerUserLoginAction):
- # Login as user `action.login` with session `action.session_name`.
-
- session = _FictionalSession(
- time = time,
- name = action.session_name)
- session.line = action.line
- session.host = action.host
-
- if action.login is None:
- raise ValueError("missing login")
-
- try:
- user = self._users[action.login]
- except KeyError:
- raise ValueError("got no user with login "
- f"{repr(action.login)}") from None
-
- # We don't check if the session exists or not; multiple
- # sessions can have the same name, we just act on the last
- # inserted one that still exists and has that name.
-
- user.sessions.add(session)
- if user.last_login is None or user.last_login < session.start:
- user.last_login = session.start
- elif isinstance(action, FingerUserLogoutAction):
- # Logout as user `action.login` from session `action.session_name`.
-
- if action.login is None:
- raise ValueError("missing login")
-
- try:
- user = self._users[action.login]
- except KeyError:
- raise ValueError("got no user with login " \
- f"{repr(action.login)}") from None
-
- try:
- del user.sessions[action.session_name]
- except (KeyError, IndexError):
- raise ValueError(f"got no session {repr(action.name)} "
- f"for user {repr(action.login)}") from None
- elif isinstance(action, FingerUserSessionChangeAction):
- # Make user with login `action.login` idle.
-
- if action.login is None:
- raise ValueError("missing login")
-
- try:
- user = self._users[action.login]
- except KeyError:
- raise ValueError("got no user with login " \
- f"{repr(action.login)}") from None
-
- try:
- session = user.sessions[action.session_name]
- except (KeyError, IndexError):
- raise ValueError(f"got no session {repr(action.name)} "
- f"for user {repr(action.login)}") from None
-
- since = time
- if action.idle is Unchanged:
- pass
- elif action.idle:
- session.idle_since = since
- else:
- session.active_since = since
+ """ Base finger fiction interface for managing a scene and updating
+ it using sequences of actions and given times.
+
+ The basic state for this class is to have no users; it is possible
+ at any point in time to (TODO)
+
+ This class should be subclassed for interfaces specialized in various
+ sources for the data; for example, while
+ :py:class:`FingerScenarioInterface` is specialized in using a
+ static sequence of actions, another class could read events from
+ a live source. """
+
+ def __init__(self, start: _Optional[_dt] = None):
+ if start is None:
+ start = _dt.now().astimezone()
+
+ super().__init__()
+
+ # Initialize the object properties.
+ # - `users`: the users.
+ # - `lasttime`: the last datetime, in order to raise an
+ # exception if not applied in order.
+
+ self._users = {}
+ self._lasttime = None
+
+ # ---
+ # Expected methods from an interface.
+ # ---
+
+ def search_users(
+ self,
+ query: _Optional[str],
+ active: _Optional[bool],
+ ) -> _Sequence[_FictionalUser]:
+ """ Look for users according to a check. """
+
+ return ([
+ _copy.deepcopy(user) for user in self._users.values()
+ if not (query is not None and query not in user.login)
+ and not (active is not None and active != bool(user.sessions))])
+
+ # ---
+ # Elements proper to the fiction interface.
+ # ---
+
+ def reset(self):
+ """ Reset the interface (revert all actions). """
+
+ self._users = {}
+ self._lasttime = None
+
+ def apply(self, action, time: _Optional[_dt] = None):
+ """ Apply an action. """
+
+ if time is None:
+ time = _dt.now().astimezone()
+
+ if self._lasttime is not None and self._lasttime > time:
+ raise ValueError("operations weren't applied in order!")
+
+ self._lasttime = time
+
+ if isinstance(action, FingerUserCreationAction):
+ # Create user `action.user`.
+
+ if action.login is None:
+ raise ValueError('missing login')
+ if action.login in self._users:
+ raise ValueError('already got a user with that login')
+
+ user = _FictionalUser(login=action.login, name=action.name)
+ user.shell = action.shell
+ user.home = action.home
+ user.office = action.office
+ user.plan = action.plan
+
+ self._users[user.login] = user
+ elif isinstance(action, FingerUserEditionAction):
+ # Edit user `action.user` with the given modifications.
+
+ if action.login is None:
+ raise ValueError('missing login')
+ if action.login not in self._users:
+ raise ValueError(
+ f'got no user with login {action.login!r}',
+ )
+
+ user = self._users[action.login]
+ if action.name is not Unchanged:
+ user.name = action.name
+ if action.shell is not Unchanged:
+ user.shell = action.shell
+ if action.home is not Unchanged:
+ user.home = action.home
+ if action.office is not Unchanged:
+ user.office = action.office
+ if action.plan is not Unchanged:
+ user.plan = action.plan
+ elif isinstance(action, FingerUserDeletionAction):
+ # Delete user with login `action.login`.
+
+ if action.login is None:
+ raise ValueError('missing login')
+ if action.login not in self._users:
+ raise ValueError(
+ f'got no user with login {action.login!r}',
+ )
+
+ del self._users[action.login]
+ elif isinstance(action, FingerUserLoginAction):
+ # Login as user `action.login` with session `action.session_name`.
+
+ session = _FictionalSession(
+ time=time,
+ name=action.session_name,
+ )
+
+ session.line = action.line
+ session.host = action.host
+
+ if action.login is None:
+ raise ValueError('missing login')
+
+ try:
+ user = self._users[action.login]
+ except KeyError:
+ raise ValueError(
+ f'got no user with login {action.login!r}',
+ ) from None
+
+ # We don't check if the session exists or not; multiple
+ # sessions can have the same name, we just act on the last
+ # inserted one that still exists and has that name.
+
+ user.sessions.add(session)
+ if user.last_login is None or user.last_login < session.start:
+ user.last_login = session.start
+ elif isinstance(action, FingerUserLogoutAction):
+ # Logout as user `action.login` from session `action.session_name`.
+
+ if action.login is None:
+ raise ValueError('missing login')
+
+ try:
+ user = self._users[action.login]
+ except KeyError:
+ raise ValueError(
+ f'got no user with login {action.login!r}',
+ ) from None
+
+ try:
+ del user.sessions[action.session_name]
+ except (KeyError, IndexError):
+ raise ValueError(
+ f'got no session {action.name!r} '
+ f'for user {action.login!r}') from None
+ elif isinstance(action, FingerUserSessionChangeAction):
+ # Make user with login `action.login` idle.
+
+ if action.login is None:
+ raise ValueError('missing login')
+
+ try:
+ user = self._users[action.login]
+ except KeyError:
+ raise ValueError(
+ 'got no user with login '
+ f'{action.login!r}',
+ ) from None
+
+ try:
+ session = user.sessions[action.session_name]
+ except (KeyError, IndexError):
+ raise ValueError(
+ f'got no session {action.name!r} '
+ f'for user {action.login!r}') from None
+
+ since = time
+ if action.idle is Unchanged:
+ pass
+ elif action.idle:
+ session.idle_since = since
+ else:
+ session.active_since = since
+
# ---
# Scenario definition and related interface definition.
# ---
-class FingerScenario:
- """ Scenario for fingerd, consisting of actions (as instances of subclasses
- of :py:class:`FingerAction`) located at a given timedelta, with
- a given ending type and time.
-
- A scenario always uses timedeltas and not datetimes, since it can
- start at any arbitrary point in time and some scenarios are even
- on repeat. """
-
- class EndingType(_Enum):
- """ Ending type.
-
- .. data:: FREEZE
-
- Freeze the end state forever.
-
- .. data:: STOP
-
- Stop the server as soon as the scenario has reached
- the ending time.
-
- .. data:: REPEAT
-
- Repeat the scenario from the beginning while
- starting again from the initial state. """
-
- FREEZE = 0
- STOP = 1
- REPEAT = 2
-
- def __init__(self):
- # Initialize the properties.
- self._end_type = None
- self._end_time = None
- self._actions = []
-
- def verify(self) -> None:
- """ Verify that the current fiction is valid.
- Raises ValueError in case it is not. """
-
- # Check if the ending is well defined.
-
- if self._end_time is None:
- raise ValueError("ending time (duration) has not been provided")
- if not isinstance(self._end_type, self.EndingType):
- raise ValueError("ending type has not been provided")
-
- # Check if the events are coherent.
-
- users = _defaultdict(lambda: False)
- sessions = _defaultdict(lambda: _defaultdict(lambda: 0))
-
- for i, (time, action, _) in enumerate(self._actions):
- try:
- if time >= self._end_time:
- # Action will be ignored.
-
- pass
- elif isinstance(action, FingerUserCreationAction):
- if users[action.login]:
- # The user we're trying to create already exists.
-
- raise ValueError("trying to create user "
- f"{repr(action.login)} which already exists")
-
- users[action.login] = True
- elif isinstance(action, FingerUserEditionAction):
- if not users[action.login]:
- # The user we're trying to edit doesn't exist.
-
- raise ValueError("trying to edit user "
- f"{repr(action.login)} while it doesn't exist")
- elif isinstance(action, FingerUserDeletionAction):
- if not users[action.login]:
- # The user we're trying to delete doesn't exist.
+class FingerScenario:
+ """ Scenario for fingerd, consisting of actions (as instances of subclasses
+ of :py:class:`FingerAction`) located at a given timedelta, with
+ a given ending type and time.
- raise ValueError("trying to delete user "
- f"{repr(action.login)} while it doesn't exist")
+ A scenario always uses timedeltas and not datetimes, since it can
+ start at any arbitrary point in time and some scenarios are even
+ on repeat. """
+
+ class EndingType(_Enum):
+ """ Ending type.
- del users[action.login]
- del sessions[action.login]
- elif isinstance(action, FingerUserLoginAction):
- if not users[action.login]:
- # The user we're trying to log in as doesn't exist.
-
- raise ValueError("trying to log in as user "
- f"{repr(action.login)} which doesn't exist")
-
- sessions[action.login][action.session_name] += 1
- elif isinstance(action, FingerUserSessionChangeAction):
- if sessions[action.login][action.session_name] <= 0:
- # The user doesn't exist (anymore?) or the session
- # does not exist.
-
- if users[action.login]:
- raise ValueError("trying to change non existing "
- f"session of user {repr(action.login)}")
- else:
- raise ValueError("trying to change session of "
- f"non-existing user {repr(action.login)}")
- elif isinstance(action, FingerUserLogoutAction):
- if sessions[action.login][action.session_name] <= 0:
- # The user doesn't exist (anymore?) or the session
- # does not exist.
-
- if users[action.login]:
- raise ValueError("trying to delete non existing "
- f"session of user {repr(action.login)}")
- else:
- raise ValueError("trying to delete session of "
- f"non-existing user {repr(action.login)}")
-
- sessions[action.login][action.session_name] -= 1
- except ValueError as e:
- raise ValueError(f"at action #{i} at {_format_delta(time)}: "
- f"{str(e)}") from None
-
- def load(self, path: str) -> None:
- """ Decodes the content of a scenario in TOML format and, if
- successful, stores the result in the current object for
- later usage.
-
- :param path: Path of the TOML file to load. """
-
- global _toml
-
- actions = []
- end_type = None
- end_time = None
-
- # Load the required modules.
-
- if _toml is None:
- try:
- import toml
- except ModuleNotFoundError:
- raise ModuleNotFoundError("'toml' module "
- "required") from None
-
- _toml = toml
- del toml
-
- # Read the document and translate all of the timestamps.
-
- document = _toml.load(path)
- i = 0
-
- for key in document.keys():
- time = _parse_delta(key)
- if time is None:
- raise ValueError(f"found invalid time: {repr(key)}")
-
- if not isinstance(document[key], list):
- raise ValueError(f"time {repr(key)} is not an array, "
- f"you have probably written [{key}] instead of "
- f"[[{key}]]")
-
- for j, data in enumerate(document[key]):
- try:
- typ = data['type']
-
- if typ in ('interrupt', 'freeze', 'stop', 'repeat'):
- # Set the ending type and time.
-
- if end_time is None or end_time > time:
- end_type = {
- 'interrupt': self.EndingType.FREEZE,
- 'freeze': self.EndingType.FREEZE,
- 'stop': self.EndingType.STOP,
- 'repeat': self.EndingType.REPEAT}[typ]
- end_time = time
-
- continue
- elif typ == 'create':
- # User creation.
-
- plan = None
- if 'plan' in data:
- pp = _path.join(_path.dirname(path), data['plan'])
- plan = open(pp).read()
-
- action = FingerUserCreationAction(
- login = data['login'],
- name = data.get('name'),
- shell = data.get('shell'),
- home = data.get('home'),
- office = data.get('office'),
- plan = plan)
- elif typ == 'update':
- # User update.
-
- plan = Unchanged
- if 'plan' in data:
- if data['plan'] is False:
- plan = None
- else:
- pp = _path.join(_path.dirname(path),
- data['plan'])
- plan = open(pp).read()
-
- g = (lambda k: None if k in data
- and data[k] is False
- else data.get(k, Unchanged))
-
- action = FingerUserEditionAction(
- login = data['login'],
- name = g('name'),
- shell = g('shell'),
- home = g('home'),
- office = g('office'),
- plan = plan)
- elif typ == 'delete':
- # User deletion.
-
- action = FingerUserDeletionAction(
- login = data['login'])
- elif typ == 'login':
- # User login.
-
- action = FingerUserLoginAction(
- login = data['login'],
- session_name = data.get('session'),
- line = data.get('line'),
- host = data.get('host'))
- elif typ == 'logout':
- # User logout.
-
- action = FingerUserLogoutAction(
- login = data['login'],
- session_name = data.get('session'))
- elif typ in ('idle', 'active'):
- # Idle change status.
-
- action = FingerUserSessionChangeAction(
- login = data['login'],
- session_name = data.get('session'),
- idle = (typ == 'idle'))
- else:
- raise ValueError(f"invalid action type {repr(typ)}")
-
- actions.append((time, action, i))
- i += 1
- except Exception as e:
- raise ValueError(f"at action #{j + 1} at "
- f"{_format_delta(time)}: {str(e)}") from None
-
- # Sort and check the actions.
-
- actions.sort(key = lambda x: (x[0], x[2]))
-
- if end_type is None:
- # If no ending was given in the script file, we ought to
- # interrupt 10 seconds after the last action.
-
- end_type = self.EndingType.FREEZE
- end_time = actions[-1][0] + _td(seconds = 10)
- else:
- # Otherwise, we are removing actions that are after the ending.
-
- actions = [a for a in actions if a[0] <= end_time]
-
- # FIXME: check that incompatible actions, such as double creation
- # for a user, doesn't occur.
-
- self._end_type = end_type
- self._end_time = end_time
- self._actions = actions
-
- def get(self, to: _Optional[_td] = None,
- since: _Optional[_td] = None) -> _Sequence[FingerAction]:
- """ Returns a sequence of actions in order from the scenario.
-
- :param to: Maximum timedelta for the actions to gather.
- :param since: Minimum timedelta for the actions to gather.
- :return: The sequence of actions that occur and respect
- the given constraints. """
-
- if since is not None and to is not None and since > to:
- raise ValueError(f"`since` ({since}) should be " \
- f"before `to` ({to}).")
-
- for time, action, _ in self._actions:
- if since is not None and since >= time:
- continue
- if to is not None and time > to:
- continue
- if self._end_time is not None and time >= self._end_time:
- continue
- yield time, action
-
- def add(self, action: FingerAction, time: _td):
- """ Add an action in the registered actions. """
-
- try:
- index = max(x[2] for x in self._actions)
- except ValueError:
- index = 0
-
- self._actions.append((time, action, index + 1))
- self._actions.sort(key = lambda x: (x[0], x[2]))
-
- @property
- def type(self) -> EndingType:
- """ Type of action flow, either 'interrupt', 'stop' or 'repeat'. """
-
- return self._end_type
-
- @type.setter
- def type(self, value: EndingType) -> None:
- if value is None:
- self._end_type = None
- return
-
- try:
- value = self.EndingType(value)
- except ValueError:
- try:
- if isinstance(value, str):
- value = value.casefold()
- value = {
- None: None,
- 'interrupt': self.EndingType.FREEZE,
- 'freeze': self.EndingType.FREEZE,
- 'stop': self.EndingType.STOP,
- 'repeat': self.EndingType.REPEAT}[value]
- except KeyError:
- raise TypeError("invalid value for ending type: "
- f"{repr(value)}")
-
- self._end_type = value
-
- @property
- def duration(self) -> _td:
- """ Maximum offset. """
-
- return self._end_time
-
- @duration.setter
- def duration(self, value: _Optional[_td]) -> None:
- if isinstance(value, _td):
- self._end_time = value
- return
- if value is None:
- self._end_time = None
- return
-
- try:
- self._end_time = _td(seconds = value)
- except TypeError:
- raise TypeError(f"Invalid end time value: {repr(value)}")
+ .. py:data:: FREEZE
+
+ Freeze the end state forever.
+
+ .. py:data:: STOP
+
+ Stop the server as soon as the scenario has reached
+ the ending time.
+
+ .. py:data:: REPEAT
+
+ Repeat the scenario from the beginning while
+ starting again from the initial state. """
+
+ FREEZE = 0
+ STOP = 1
+ REPEAT = 2
+
+ def __init__(self):
+ # Initialize the properties.
+
+ self._end_type = None
+ self._end_time = None
+ self._actions = []
+
+ def verify(self) -> None:
+ """ Verify that the current fiction is valid.
+ Raises ValueError in case it is not. """
+
+ # Check if the ending is well defined.
+
+ if self._end_time is None:
+ raise ValueError('ending time (duration) has not been provided')
+ if not isinstance(self._end_type, self.EndingType):
+ raise ValueError('ending type has not been provided')
+
+ # Check if the events are coherent.
+
+ users = _defaultdict(lambda: False)
+ sessions = _defaultdict(lambda: _defaultdict(lambda: 0))
+
+ for i, (time, action, _) in enumerate(self._actions):
+ try:
+ if time >= self._end_time:
+ # Action will be ignored.
+
+ pass
+ elif isinstance(action, FingerUserCreationAction):
+ if users[action.login]:
+ # The user we're trying to create already exists.
+
+ raise ValueError(
+ 'trying to create user '
+ f'{action.login!r} which already exists')
+
+ users[action.login] = True
+ elif isinstance(action, FingerUserEditionAction):
+ if not users[action.login]:
+ # The user we're trying to edit doesn't exist.
+
+ raise ValueError(
+ 'trying to edit user '
+ f"{action.login!r} while it doesn't exist")
+ elif isinstance(action, FingerUserDeletionAction):
+ if not users[action.login]:
+ # The user we're trying to delete doesn't exist.
+
+ raise ValueError(
+ 'trying to delete user '
+ f"{action.login!r} while it doesn't exist")
+
+ del users[action.login]
+ del sessions[action.login]
+ elif isinstance(action, FingerUserLoginAction):
+ if not users[action.login]:
+ # The user we're trying to log in as doesn't exist.
+
+ raise ValueError(
+ 'trying to log in as user '
+ f"{action.login!r} which doesn't exist")
+
+ sessions[action.login][action.session_name] += 1
+ elif isinstance(action, FingerUserSessionChangeAction):
+ if sessions[action.login][action.session_name] <= 0:
+ # The user doesn't exist (anymore?) or the session
+ # does not exist.
+
+ if users[action.login]:
+ raise ValueError(
+ 'trying to change non existing '
+ f'session of user {action.login!r}')
+ else:
+ raise ValueError(
+ 'trying to change session of '
+ f'non-existing user {action.login!r}')
+ elif isinstance(action, FingerUserLogoutAction):
+ if sessions[action.login][action.session_name] <= 0:
+ # The user doesn't exist (anymore?) or the session
+ # does not exist.
+
+ if users[action.login]:
+ raise ValueError(
+ 'trying to delete non existing '
+ f'session of user {action.login!r}')
+ else:
+ raise ValueError(
+ 'trying to delete session of '
+ f'non-existing user {action.login!r}')
+
+ sessions[action.login][action.session_name] -= 1
+ except ValueError as e:
+ raise ValueError(
+ f'at action #{i} at {_format_delta(time)}: '
+ f'{e!s}',
+ ) from None
+
+ def load(self, path: str) -> None:
+ """ Decodes the content of a scenario in TOML format and, if
+ successful, stores the result in the current object for
+ later usage.
+
+ :param path: Path of the TOML file to load. """
+
+ global _toml
+
+ actions = []
+ end_type = None
+ end_time = None
+
+ # Load the required modules.
+
+ if _toml is None:
+ try:
+ import toml
+ except ModuleNotFoundError:
+ raise ModuleNotFoundError(
+ "'toml' module required") from None
+
+ _toml = toml
+ del toml
+
+ # Read the document and translate all of the timestamps.
+
+ document = _toml.load(path)
+ i = 0
+
+ for key in document.keys():
+ time = _parse_delta(key)
+ if time is None:
+ raise ValueError(f'found invalid time: {key!r}')
+
+ if not isinstance(document[key], list):
+ raise ValueError(
+ f'time {key!r} is not an array, '
+ f'you have probably written [{key}] instead of '
+ f'[[{key}]]',
+ )
+
+ for j, data in enumerate(document[key]):
+ try:
+ typ = data['type']
+
+ if typ in ('interrupt', 'freeze', 'stop', 'repeat'):
+ # Set the ending type and time.
+
+ if end_time is None or end_time > time:
+ end_type = {
+ 'interrupt': self.EndingType.FREEZE,
+ 'freeze': self.EndingType.FREEZE,
+ 'stop': self.EndingType.STOP,
+ 'repeat': self.EndingType.REPEAT}[typ]
+ end_time = time
+
+ continue
+ elif typ == 'create':
+ # User creation.
+
+ plan = None
+ if 'plan' in data:
+ pp = _path.join(_path.dirname(path), data['plan'])
+ plan = open(pp).read()
+
+ action = FingerUserCreationAction(
+ login=data['login'],
+ name=data.get('name'),
+ shell=data.get('shell'), # NOQA
+ home=data.get('home'),
+ office=data.get('office'),
+ plan=plan)
+ elif typ == 'update':
+ # User update.
+
+ plan = Unchanged
+ if 'plan' in data:
+ if data['plan'] is False:
+ plan = None
+ else:
+ pp = _path.join(
+ _path.dirname(path), data['plan'])
+ plan = open(pp).read()
+
+ def g(k):
+ if k in data and data[k] is False:
+ return None
+ return data.get(k, Unchanged)
+
+ action = FingerUserEditionAction(
+ login=data['login'],
+ name=g('name'),
+ shell=g('shell'), # NOQA
+ home=g('home'),
+ office=g('office'),
+ plan=plan,
+ )
+ elif typ == 'delete':
+ # User deletion.
+
+ action = FingerUserDeletionAction(
+ login=data['login'])
+ elif typ == 'login':
+ # User login.
+
+ action = FingerUserLoginAction(
+ login=data['login'],
+ session_name=data.get('session'),
+ line=data.get('line'),
+ host=data.get('host'))
+ elif typ == 'logout':
+ # User logout.
+
+ action = FingerUserLogoutAction(
+ login=data['login'],
+ session_name=data.get('session'))
+ elif typ in ('idle', 'active'):
+ # Idle change status.
+
+ action = FingerUserSessionChangeAction(
+ login=data['login'],
+ session_name=data.get('session'),
+ idle=(typ == 'idle'),
+ )
+ else:
+ raise ValueError(f'invalid action type {typ!r}')
+
+ actions.append((time, action, i))
+ i += 1
+ except Exception as e:
+ raise ValueError(
+ f'at action #{j + 1} at '
+ f'{_format_delta(time)}: {e!s}') from None
+
+ # Sort and check the actions.
+
+ actions.sort(key=lambda x: (x[0], x[2]))
+
+ if end_type is None:
+ # If no ending was given in the script file, we ought to
+ # interrupt 10 seconds after the last action.
+
+ end_type = self.EndingType.FREEZE
+ end_time = actions[-1][0] + _td(seconds=10)
+ else:
+ # Otherwise, we are removing actions that are after the ending.
+
+ actions = [a for a in actions if a[0] <= end_time]
+
+ # FIXME: check that incompatible actions, such as double creation
+ # for a user, doesn't occur.
+
+ self._end_type = end_type
+ self._end_time = end_time
+ self._actions = actions
+
+ def get(
+ self,
+ to: _Optional[_td] = None,
+ since: _Optional[_td] = None,
+ ) -> _Sequence[FingerAction]:
+ """ Returns a sequence of actions in order from the scenario.
+
+ :param to: Maximum timedelta for the actions to gather.
+ :param since: Minimum timedelta for the actions to gather.
+ :return: The sequence of actions that occur and respect
+ the given constraints. """
+
+ if since is not None and to is not None and since > to:
+ raise ValueError(
+ f'`since` ({since}) should be before `to` ({to}).')
+
+ for time, action, _ in self._actions:
+ if since is not None and since >= time:
+ continue
+ if to is not None and time > to:
+ continue
+ if self._end_time is not None and time >= self._end_time:
+ continue
+ yield time, action
+
+ def add(self, action: FingerAction, time: _td):
+ """ Add an action in the registered actions. """
+
+ try:
+ index = max(x[2] for x in self._actions)
+ except ValueError:
+ index = 0
+
+ self._actions.append((time, action, index + 1))
+ self._actions.sort(key=lambda x: (x[0], x[2]))
+
+ @property
+ def ending_type(self) -> EndingType:
+ """ Type of action flow, either 'interrupt', 'stop' or 'repeat'. """
+
+ return self._end_type
+
+ @ending_type.setter
+ def ending_type(self, value: EndingType) -> None:
+ if value is None:
+ self._end_type = None
+ return
+
+ try:
+ value = self.EndingType(value)
+ except ValueError:
+ try:
+ if isinstance(value, str):
+ value = value.casefold()
+ value = {
+ None: None,
+ 'interrupt': self.EndingType.FREEZE,
+ 'freeze': self.EndingType.FREEZE,
+ 'stop': self.EndingType.STOP,
+ 'repeat': self.EndingType.REPEAT}[value]
+ except KeyError:
+ raise TypeError(
+ f'invalid value for ending type: {value!r}')
+
+ self._end_type = value
+
+ @property
+ def duration(self) -> _td:
+ """ Maximum offset. """
+
+ return self._end_time
+
+ @duration.setter
+ def duration(self, value: _Optional[_td]) -> None:
+ if isinstance(value, _td):
+ self._end_time = value
+ return
+ if value is None:
+ self._end_time = None
+ return
+
+ try:
+ self._end_time = _td(seconds=value)
+ except TypeError:
+ raise TypeError(f'Invalid end time value: {value!r}')
class FingerScenarioInterface(FingerFictionInterface):
- """ Fiction interface, to follow actions written in a scenario.
-
- Subclasses :py:class:`FingerFictionInterface` and adds
- a regular update method for updating the state according
- to the given scenario.
-
- :param scenario: The scenario to follow using the given interface.
- :param start: The start time at which to start; by default, the
- current time is used. """
-
- def __init__(self, scenario: FingerScenario,
- start: _Optional[_dt] = None):
- """ Initialize the interface. """
-
- if start is None:
- start = _dt.now().astimezone()
-
- super().__init__(start)
-
- # Initialize the object properties.
-
- if isinstance(scenario, FingerScenario):
- scenario.verify()
- scenario = _copy.copy(scenario)
- elif scenario is not None:
- raise TypeError("scenario should be a FingerScenario or None, "
- f"is {scenario.__class__.__name__}.")
-
- # Initialize the object properties.
- # - `scenario`: the script to follow.
- # - `laststart`: the last registered start.
- # - `lastdelta`: the last registered delta.
-
- self._scenario = scenario
- self._start = start
- self._laststart = None
- self._lastdelta = None
-
- @_FingerInterface.regular(seconds = 2)
- def update(self):
- """ Update the state according to the scenario regularly. """
-
- now = _dt.now().astimezone()
-
- # First, get the start and delta.
-
- start = self._start
-
- if (self._scenario is not None and
- now > start + self._scenario.duration):
- if self._scenario.type == FingerScenario.EndingType.STOP:
- exit()
- elif self._scenario.type == FingerScenario.EndingType.FREEZE:
- delta = self._scenario.duration
- else:
- # We're on 'repeat', so we are going to have a slightly
- # different start because we want the start of the new
- # iteration.
- #
- # >>> from datetime import (datetime as dt
- # timedelta as td)
- # >>> a = dt(2000, 1, 1)
- # >>> b = a + td(seconds = 27)
- # >>> (b - a) % td(seconds = 10)
- # datetime.timedelta(seconds=7)
- # >>> b - (b - a) % td(seconds = 10)
- # datetime.datetime(2000, 1, 1, 0, 0, 20)
- #
- # Let's see.
-
- start = now - (now - start) % self._scenario.duration
-
- # We're within the duration of the fiction, so we just use the
- # offset from the start.
-
- delta = now - start
-
- # If last start was not the same or last start isn't initialized,
- # let's start again from a blank slate.
- #
- # Previously we would have kept the events, but that wasn't good.
- # Take the following examples:
- #
- # <first action> at +5s
- # <second action> at +10s
- # repeat at 15s
- #
- # Let's say our start is 01/01/2000 at 00:00:00.
- # First request comes in at 00:00:07.
- # Start is 00:00:00, delta is +7s.
- # We apply <first action> at 00:00:05, and yield the state.
- # Second request comes in at 00:00:26.
- # Start is 00:00:15, delta is +11s.
- #
- # In this case, if we just compared deltas, the second one
- # is further so that means we haven't started again and could
- # just apply the second action at 00:00:25, which means both
- # actions would not be separated by 20s instead of 5s; not
- # the behaviour we want!
- #
- # So we look at if the start is the same or not. The question could
- # also be if we need to still check the deltas, since time
- # necessarily goes forward; but the local time on the system
- # could be set backwards during the period, throwing everything off!
- #
- # The conclusion to all of this is, we need to check both the start
- # and the delta.
-
- if (self._laststart is None or self._lastdelta is None or
- self._laststart != start or self._lastdelta > delta):
- self.reset()
- self._lastdelta = None
-
- # Then, we apply the actions up to the current time.
-
- if self._scenario is not None:
- for time, action in self._scenario.get(to = delta,
- since = self._lastdelta):
- self.apply(action, start + time)
-
- # Finally, we can keep track of where we were.
-
- self._laststart = start
- self._lastdelta = delta
+ """ Fiction interface, to follow actions written in a scenario.
+
+ Subclasses :py:class:`FingerFictionInterface` and adds
+ a regular update method for updating the state according
+ to the given scenario.
+
+ :param scenario: The scenario to follow using the given interface.
+ :param start: The start time at which to start; by default, the
+ current time is used. """
+
+ def __init__(
+ self,
+ scenario: FingerScenario,
+ start: _Optional[_dt] = None,
+ ):
+ """ Initialize the interface. """
+
+ if start is None:
+ start = _dt.now().astimezone()
+
+ super().__init__(start)
+
+ # Initialize the object properties.
+
+ if isinstance(scenario, FingerScenario):
+ scenario.verify()
+ scenario = _copy.copy(scenario)
+ elif scenario is not None:
+ raise TypeError(
+ 'scenario should be a FingerScenario or None, '
+ f'is {scenario.__class__.__name__}.')
+
+ # Initialize the object properties.
+ # - `scenario`: the script to follow.
+ # - `laststart`: the last registered start.
+ # - `lastdelta`: the last registered delta.
+
+ self._scenario = scenario
+ self._start = start
+ self._laststart = None
+ self._lastdelta = None
+
+ @_cron('* * * * * *')
+ def update(self):
+ """ Update the state according to the scenario every second. """
+
+ now = _dt.now().astimezone()
+
+ # First, get the start and delta.
+
+ start = self._start
+
+ if (
+ self._scenario is not None
+ and now > start + self._scenario.duration
+ ):
+ ending_type = self._scenario.ending_type
+ if ending_type == FingerScenario.EndingType.STOP:
+ exit()
+ elif ending_type == FingerScenario.EndingType.FREEZE:
+ delta = self._scenario.duration
+ else:
+ # We're on 'repeat', so we are going to have a slightly
+ # different start because we want the start of the new
+ # iteration.
+ #
+ # >>> from datetime import (datetime as dt
+ # timedelta as td)
+ # >>> a = dt(2000, 1, 1)
+ # >>> b = a + td(seconds = 27)
+ # >>> (b - a) % td(seconds = 10)
+ # datetime.timedelta(seconds=7)
+ # >>> b - (b - a) % td(seconds = 10)
+ # datetime.datetime(2000, 1, 1, 0, 0, 20)
+ #
+ # Let's see.
+
+ start = now - (now - start) % self._scenario.duration
+
+ # We're within the duration of the fiction, so we just use the
+ # offset from the start.
+
+ delta = now - start
+
+ # If last start was not the same or last start isn't initialized,
+ # let's start again from a blank slate.
+ #
+ # Previously we would have kept the events, but that wasn't good.
+ # Take the following examples:
+ #
+ # <first action> at +5s
+ # <second action> at +10s
+ # repeat at 15s
+ #
+ # Let's say our start is 01/01/2000 at 00:00:00.
+ # First request comes in at 00:00:07.
+ # Start is 00:00:00, delta is +7s.
+ # We apply <first action> at 00:00:05, and yield the state.
+ # Second request comes in at 00:00:26.
+ # Start is 00:00:15, delta is +11s.
+ #
+ # In this case, if we just compared deltas, the second one
+ # is further so that means we haven't started again and could
+ # just apply the second action at 00:00:25, which means both
+ # actions would not be separated by 20s instead of 5s; not
+ # the behaviour we want!
+ #
+ # So we look at if the start is the same or not. The question could
+ # also be if we need to still check the deltas, since time
+ # necessarily goes forward; but the local time on the system
+ # could be set backwards during the period, throwing everything off!
+ #
+ # The conclusion to all of this is, we need to check both the start
+ # and the delta.
+
+ if (
+ self._laststart is None or self._lastdelta is None
+ or self._laststart != start or self._lastdelta > delta
+ ):
+ self.reset()
+ self._lastdelta = None
+
+ # Then, we apply the actions up to the current time.
+
+ if self._scenario is not None:
+ for time, action in self._scenario.get(
+ to=delta, since=self._lastdelta,
+ ):
+ self.apply(action, start + time)
+
+ # Finally, we can keep track of where we were.
+
+ self._laststart = start
+ self._lastdelta = delta
# End of file.
diff --git a/fingerd/native.py b/fingerd/native.py
index e1b0b6b..11d981c 100644..100755
--- a/fingerd/native.py
+++ b/fingerd/native.py
@@ -1,26 +1,27 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" Defining the native interface. """
from .core import FingerInterface as _FingerInterface
__all__ = ['FingerNativeInterface']
+
class _FingerNoNativeFoundInterface(_FingerInterface):
- """ Placeholder that doesn't initiate. """
+ """ Placeholder that doesn't initiate. """
- def __init__(self, *args, **kwargs):
- raise NotImplementedError("Could not find a suitable " \
- "native interface.")
+ def __init__(self, *args, **kwargs):
+ raise NotImplementedError(
+ 'Could not find a suitable native interface.',
+ )
-from .posix import FingerPOSIXInterface as FingerNativeInterface
-#try:
-# from .posix import FingerPOSIXInterface as FingerNativeInterface
-#except (ImportError, ModuleNotFoundError):
-# FingerNativeInterface = _FingerNoNativeFoundInterface
+try:
+ from .posix import FingerPOSIXInterface as FingerNativeInterface # NOQA
+except (ImportError, ModuleNotFoundError):
+ FingerNativeInterface = _FingerNoNativeFoundInterface
# End of file.
diff --git a/fingerd/posix.py b/fingerd/posix.py
index 8cb6ace..c626be2 100644..100755
--- a/fingerd/posix.py
+++ b/fingerd/posix.py
@@ -1,155 +1,165 @@
#!/usr/bin/env python3
-#******************************************************************************
+# *****************************************************************************
# Copyright (C) 2017-2018 Thomas "Cakeisalie5" Touhey <thomas@touhey.fr>
# This file is part of the fingerd Python 3.x module, which is MIT-licensed.
-#******************************************************************************
+# *****************************************************************************
""" Make use of the utmp/x file to read the user data.
- This will be done using the `pyutmpx` Python module. """
+ This will be done using the `pyutmpx` Python module. """
-import posix as _posix, pwd as _pwd
+import pwd as _pwd
-from os import stat as _stat
-from os.path import exists as _exists, join as _joinpaths
+from collections.abc import Sequence as _Sequence
from copy import copy as _copy
from datetime import datetime as _dt
from multiprocessing import Lock as _Lock
+from os import stat as _stat
+from os.path import exists as _exists, join as _joinpaths
from typing import Optional as _Optional
-from collections.abc import Sequence as _Sequence
+from pytz import utc as _utc
import pyutmpx as _pyutmpx
-from pytz import utc as _utc
+from .core import (
+ FingerInterface as _FingerInterface,
+ FingerSession as _FingerSession,
+ FingerUser as _FingerUser,
+)
-from .core import (FingerInterface as _FingerInterface,
- FingerUser as _FingerUser, FingerSession as _FingerSession)
+__all__ = ['FingerPOSIXInterface']
-__all__ = ["FingerPOSIXInterface"]
class FingerPOSIXInterface(_FingerInterface):
- """ Native finger interface for POSIX-compliant OSes we know,
- using utmp and wtmp files. """
-
- def __init__(self):
- self._data = []
- self._lastrefreshtime = None
- self._lock = _Lock()
-
- def search_users(self, query: _Optional[str],
- active: _Optional[bool]) -> _Sequence[_FingerUser]:
- """ Look for users given a check. """
-
- self._lock.acquire()
-
- # Refresh the user list if required.
-
- if (self._lastrefreshtime is None or
- abs((self._lastrefreshtime - _dt.now()).total_seconds()) >= 1):
- # The method for gathering users and sessions in POSIX systems
- # is the following:
- #
- # 1. Get the users in /etc/passwd, and check which ones not
- # to make appear through the presence of ``.nofinger``
- # in their home directory.
- # 2. Get the sessions in utmpx, and make them correspond to
- # the related user.
- # 3. For each session, get the idle time by gathering the
- # mtime of the device.
-
- users = {}
- usernames_by_id = {}
-
- for pw in _pwd.getpwall():
- usernames_by_id[pw.pw_uid] = pw.pw_name
-
- if _exists(_joinpaths(pw.pw_dir, '.nofinger')):
- continue
-
- gecos = pw.pw_gecos.split(',')
- user = _FingerUser(
- login = pw.pw_name,
- name = gecos[0])
- user.shell = pw.pw_shell
- user.home = pw.pw_dir
-
- if len(gecos) >= 2:
- user.office = gecos[1]
- try:
- with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan:
- user.plan = plan.read()
- except (FileNotFoundError, PermissionError):
- pass
-
- users[user.login] = user
-
- try:
- lastlog = _pyutmpx.lastlog
- except AttributeError:
- lastlog = None
-
- if lastlog is not None:
- for lle in lastlog:
- try:
- login = usernames_by_id[lle.uid]
- except KeyError:
- continue
-
- try:
- user = users[login]
- except KeyError:
- continue
-
- time = lle.time.replace(tzinfo = _utc)
-
- user.last_login = time
-
- try:
- utmp = _pyutmpx.utmp
- except AttributeError:
- utmp = None
-
- if utmp is not None:
- for ue in utmp:
- if ue.type != _pyutmpx.USER_PROCESS:
- continue
- try:
- user = users[ue.user]
- except KeyError:
- continue
-
- session = _FingerSession(
- time = ue.time.replace(tzinfo = _utc))
- if ue.line:
- session.line = ue.line
- if ue.host:
- session.host = ue.host
-
- session.idle = _dt.now()
- if ue.line and not ue.line.startswith(':'):
- dev_path = (("", "/dev/")[ue.line[0] != '/']
- + ue.line)
- atime = _dt.fromtimestamp(_stat(dev_path).st_atime,
- _utc)
- session.idle = atime
-
- # Add the session to the user.
-
- user.sessions.add(session)
- if (user.last_login is None or
- user.last_login < session.start):
- user.last_login = session.start
-
- # We're done refreshing!
-
- self._data = list(users.values())
- self._lastrefreshtime = _dt.now()
-
- # Get the results.
-
- results = ([_copy(user) for user in self._data
- if not (query is not None and query not in user.login)
- and not (active is not None and active != bool(user.sessions))])
- self._lock.release()
-
- return results
+ """ Native finger interface for POSIX-compliant OSes we know,
+ using utmp and wtmp files. """
+
+ def __init__(self):
+ self._data = []
+ self._lastrefreshtime = None
+ self._lock = _Lock()
+
+ def search_users(
+ self,
+ query: _Optional[str],
+ active: _Optional[bool],
+ ) -> _Sequence[_FingerUser]:
+ """ Look for users given a check. """
+
+ self._lock.acquire()
+
+ # Refresh the user list if required.
+
+ if (
+ self._lastrefreshtime is None
+ or abs((self._lastrefreshtime - _dt.now()).total_seconds()) >= 1
+ ):
+ # The method for gathering users and sessions in POSIX systems
+ # is the following:
+ #
+ # 1. Get the users in /etc/passwd, and check which ones not
+ # to make appear through the presence of ``.nofinger``
+ # in their home directory.
+ # 2. Get the sessions in utmpx, and make them correspond to
+ # the related user.
+ # 3. For each session, get the idle time by gathering the
+ # mtime of the device.
+
+ users = {}
+ usernames_by_id = {}
+
+ for pw in _pwd.getpwall():
+ usernames_by_id[pw.pw_uid] = pw.pw_name
+
+ if _exists(_joinpaths(pw.pw_dir, '.nofinger')):
+ continue
+
+ gecos = pw.pw_gecos.split(',')
+ user = _FingerUser(
+ login=pw.pw_name,
+ name=gecos[0])
+ user.shell = pw.pw_shell
+ user.home = pw.pw_dir
+
+ if len(gecos) >= 2:
+ user.office = gecos[1]
+ try:
+ with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan:
+ user.plan = plan.read()
+ except (FileNotFoundError, PermissionError):
+ pass
+
+ users[user.login] = user
+
+ try:
+ lastlog = _pyutmpx.lastlog
+ except AttributeError:
+ lastlog = None
+
+ if lastlog is not None:
+ for lle in lastlog:
+ try:
+ login = usernames_by_id[lle.uid]
+ except KeyError:
+ continue
+
+ try:
+ user = users[login]
+ except KeyError:
+ continue
+
+ user.last_login = lle.time.replace(tzinfo=_utc)
+
+ try:
+ utmp = _pyutmpx.utmp
+ except AttributeError:
+ utmp = None
+
+ if utmp is not None:
+ for ue in utmp:
+ if ue.type != _pyutmpx.USER_PROCESS:
+ continue
+ try:
+ user = users[ue.user]
+ except KeyError:
+ continue
+
+ session = _FingerSession(
+ time=ue.time.replace(tzinfo=_utc))
+ if ue.line:
+ session.line = ue.line
+ if ue.host:
+ session.host = ue.host
+
+ session.idle = _dt.now()
+ if ue.line and not ue.line.startswith(':'):
+ dev_path = (
+ ('', '/dev/')[ue.line[0] != '/']
+ + ue.line)
+ atime = _dt.fromtimestamp(
+ _stat(dev_path).st_atime, _utc)
+ session.idle = atime
+
+ # Add the session to the user.
+
+ user.sessions.add(session)
+ if (
+ user.last_login is None
+ or user.last_login < session.start
+ ):
+ user.last_login = session.start
+
+ # We're done refreshing!
+
+ self._data = list(users.values())
+ self._lastrefreshtime = _dt.now()
+
+ # Get the results.
+
+ results = ([
+ _copy(user) for user in self._data
+ if not (query is not None and query not in user.login)
+ and not (active is not None and active != bool(user.sessions))])
+ self._lock.release()
+
+ return results
# End of file.
diff --git a/fingerd/version.py b/fingerd/version.py
index c368fc7..7dd1530 100644..100755
--- a/fingerd/version.py
+++ b/fingerd/version.py
@@ -1,10 +1,10 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" fingerd version definition. """
-version = "0.2"
+version = '0.3'
# End of file.
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..9194da0
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+click
+croniter
+python-dateutil
+pytz
+pyutmpx
+toml
diff --git a/scripts/fingerd b/scripts/fingerd
index 28877d2..0e5ce52 100755
--- a/scripts/fingerd
+++ b/scripts/fingerd
@@ -10,6 +10,6 @@ from fingerd.cli import cli as _cli
__all__ = []
if __name__ == '__main__':
- _cli()
+ _cli()
# End of file.
diff --git a/setup.cfg b/setup.cfg
index 28b0393..7676253 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -23,14 +23,20 @@ include_package_data = True
packages = fingerd
scripts =
scripts/fingerd
-install_requires =
- toml
- click
- pytz
- pyutmpx
[options.package_data]
* = *.txt, *.rst
[wheel]
universal = True
+
+[flake8]
+per-file-ignores =
+ tests/*:S101
+rst-roles =
+ py:class
+ py:attr
+ py:data
+ py:meth
+rst-directives =
+ py:data
diff --git a/setup.py b/setup.py
index 5b09a0e..5f19ce4 100755
--- a/setup.py
+++ b/setup.py
@@ -1,19 +1,30 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2018-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" Setup script for the fingerd Python package. """
+import os.path as _path
from setuptools import setup as _setup
kwargs = {}
+# Add requirements using the requirements.txt file.
+
+requirements = set()
+with open(_path.join(_path.dirname(__file__), 'requirements.txt'), 'r') as f:
+ requirements.update(f.read().splitlines())
+
+kwargs['install_requires'] = sorted(filter(lambda x: x, requirements))
+
+# Use Sphinx for building docs.
+
try:
- from sphinx.setup_command import BuildDoc as _BuildDoc
- kwargs['cmdclass'] = {'build_sphinx': _BuildDoc}
+ from sphinx.setup_command import BuildDoc as _BuildDoc
+ kwargs['cmdclass'] = {'build_sphinx': _BuildDoc}
except ImportError:
- pass
+ pass
# Actually, most of the project's data is read from the `setup.cfg` file.
diff --git a/tests/__init__.py b/tests/__init__.py
index 1e6ccc3..c5a4391 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,8 +1,8 @@
#!/usr/bin/env python3
-#**************************************************************************
+# *****************************************************************************
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
+# *****************************************************************************
""" Unit tests for the `fingerd` Python module. """
# This file is only there to indicate that the folder is a module.
diff --git a/tests/test_core.py b/tests/test_core.py
deleted file mode 100644
index dfe6f4a..0000000
--- a/tests/test_core.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python3
-#**************************************************************************
-# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
-# This file is part of the fingerd project, which is MIT-licensed.
-#**************************************************************************
-""" Tests for the fingerd core. """
-
-from fingerd.core import *
-from sys import stderr
-
-def test_logger(capfd):
- l = FingerLogger(stderr)
- l.start('127.0.0.1', '79')
-
- _, err = capfd.readouterr()
- assert err == "Starting fingerd on [127.0.0.1]:79.\n"
-
-# End of file.
diff --git a/tests/test_fingerd.py b/tests/test_fingerd.py
new file mode 100644
index 0000000..69f3669
--- /dev/null
+++ b/tests/test_fingerd.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+# *****************************************************************************
+# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
+# This file is part of the fingerd project, which is MIT-licensed.
+# *****************************************************************************
+""" Tests for the fingerd server. """
+
+import socket
+
+from datetime import timedelta
+from time import sleep
+
+from fingerd.core import FingerServer
+from fingerd.fiction import (
+ FingerScenario, FingerScenarioInterface,
+ FingerUserCreationAction, FingerUserLoginAction, FingerUserLogoutAction,
+)
+
+import pytest
+
+
+class TestFingerConnection:
+ @pytest.fixture
+ def fingerserver(self):
+ scenario = FingerScenario()
+ scenario.ending_type = 'freeze'
+ scenario.duration = timedelta(seconds=5)
+ scenario.add(
+ FingerUserCreationAction(
+ login='john',
+ name='John Doe',
+ home='/home/john',
+ shell='/bin/bash', # NOQA
+ office='84.6',
+ ),
+ timedelta(seconds=-5))
+ scenario.add(
+ FingerUserLoginAction(
+ login='john',
+ line='tty1',
+ ),
+ timedelta(seconds=0))
+ scenario.add(
+ FingerUserLogoutAction(
+ login='john',
+ ),
+ timedelta(seconds=1))
+
+ server = FingerServer(
+ 'localhost:3099',
+ hostname='example.org',
+ interface=FingerScenarioInterface(scenario),
+ )
+ server.start()
+
+ sleep(.1)
+ yield
+
+ server.stop()
+
+ def _send_command(self, command):
+ conn = socket.create_connection(('localhost', 3099))
+ conn.send(command)
+ return conn.recv(1024)
+
+ # ---
+ # Tests.
+ # ---
+
+ def test_no_user_list(self, fingerserver):
+ result = self._send_command(b'user\r\n')
+ assert result == b'No user list available.\r\n'
+
+ def test_existing_user_list(self, fingerserver):
+ result = self._send_command(b'\r\n')
+
+ assert result != b''
+ assert result != b'No user list available.\r\n'
+
+ sleep(2)
+ result = self._send_command(b'\r\n')
+
+ assert result == b'No user list available.\r\n'
+
+# End of file.