aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpyfingerd/__init__.py16
-rwxr-xr-xpyfingerd/__main__.py2
-rw-r--r--pyfingerd/binds.py69
-rwxr-xr-xpyfingerd/cli.py30
-rwxr-xr-xpyfingerd/core.py468
-rwxr-xr-xpyfingerd/errors.py25
-rwxr-xr-xpyfingerd/fiction.py540
-rwxr-xr-xpyfingerd/native.py4
-rwxr-xr-xpyfingerd/posix.py238
-rw-r--r--pyfingerd/utils.py12
-rwxr-xr-xpyfingerd/version.py10
-rwxr-xr-xscripts/pyfingerd2
-rw-r--r--setup.cfg9
-rwxr-xr-xsetup.py10
-rwxr-xr-xtests/__init__.py2
-rw-r--r--tests/test_binds.py52
-rw-r--r--tests/test_scenarios.py8
-rw-r--r--tests/test_server.py34
18 files changed, 714 insertions, 817 deletions
diff --git a/pyfingerd/__init__.py b/pyfingerd/__init__.py
index b6dcfec..7cc405f 100755
--- a/pyfingerd/__init__.py
+++ b/pyfingerd/__init__.py
@@ -3,17 +3,17 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Pure Python finger protocol implementation.
+"""Pure Python finger protocol implementation.
- finger is both a protocol and a utility to get the information and
- status from a user on a distant machine. It was standardized in RFC 742
- in 1977, then in RFC 1288 in 1991, and has been abandoned by most
- people since.
+finger is both a protocol and a utility to get the information and
+status from a user on a distant machine. It was standardized in RFC 742
+in 1977, then in RFC 1288 in 1991, and has been abandoned by most
+people since.
- This Python module is a finger server implementation that allows you
- to give out real information as well as fictional information.
+This Python module is a finger server implementation that allows you
+to give out real information as well as fictional information.
"""
-# Empty module; please import from submodules directly.
+__version__ = '0.4.3'
# End of file.
diff --git a/pyfingerd/__main__.py b/pyfingerd/__main__.py
index a9f8150..b1dcbcc 100755
--- a/pyfingerd/__main__.py
+++ b/pyfingerd/__main__.py
@@ -3,7 +3,7 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Main script of the module. """
+"""Main script of the module."""
from .cli import cli as _cli
diff --git a/pyfingerd/binds.py b/pyfingerd/binds.py
index 4d22c93..dd81e58 100644
--- a/pyfingerd/binds.py
+++ b/pyfingerd/binds.py
@@ -3,11 +3,10 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Binds decoder for the finger server. """
+"""Binds decoder for the finger server."""
import socket as _socket
-
-from typing import Sequence as _Sequence
+import typing as _t
from .errors import InvalidBindError as _InvalidBindError
@@ -18,19 +17,18 @@ __all__ = [
class FingerBind:
- """ Bind address for pyfingerd. """
+ """Bind address for pyfingerd."""
@property
- def runserver_params(self):
- """ Return the data as ``_runserver`` arguments. """
-
- raise NotImplementedError
+ def runserver_params(self) -> _t.Tuple[_socket.AddressFamily, str, int]:
+ """Return the data as ``start_server`` arguments."""
+ raise NotImplementedError()
class FingerTCPv4Bind(FingerBind):
- """ IPv4 TCP Address. """
+ """IPv4 TCP Address."""
- def __init__(self, address, port):
+ def __init__(self, address: str, port: int):
try:
self._addr = _socket.inet_pton(_socket.AF_INET, address)
except Exception:
@@ -39,9 +37,8 @@ class FingerTCPv4Bind(FingerBind):
self._port = port
@property
- def runserver_params(self):
- """ Return the data as `_runserver` parameters. """
-
+ def runserver_params(self) -> _t.Tuple[_socket.AddressFamily, str, int]:
+ """Return the data as ``start_server`` parameters."""
return (
_socket.AF_INET,
_socket.inet_ntop(_socket.AF_INET, self._addr),
@@ -50,9 +47,9 @@ class FingerTCPv4Bind(FingerBind):
class FingerTCPv6Bind(FingerBind):
- """ IPv6 TCP Address. """
+ """IPv6 TCP Address."""
- def __init__(self, address, port):
+ def __init__(self, address: str, port: int):
try:
self._addr = _socket.inet_pton(_socket.AF_INET6, address)
except Exception:
@@ -61,9 +58,8 @@ class FingerTCPv6Bind(FingerBind):
self._port = port
@property
- def runserver_params(self):
- """ Return the data as `_runserver` parameters. """
-
+ def runserver_params(self) -> _t.Tuple[_socket.AddressFamily, str, int]:
+ """Return the data as `start_server` parameters."""
return (
_socket.AF_INET6,
_socket.inet_ntop(_socket.AF_INET6, self._addr),
@@ -72,7 +68,7 @@ class FingerTCPv6Bind(FingerBind):
class FingerBindsDecoder:
- """ Binds decoder for pyfingerd. """
+ """Binds decoder for pyfingerd."""
def __init__(self, proto: str = 'finger'):
proto = proto.casefold()
@@ -81,9 +77,8 @@ class FingerBindsDecoder:
self._proto = proto
- def decode(self, raw: str) -> _Sequence[FingerBind]:
- """ Get binds for the server, using a given string. """
-
+ def decode(self, raw: str) -> _t.Sequence[FingerBind]:
+ """Get binds for the server, using a given string."""
binds = set()
for addr in map(lambda x: x.strip(), raw.split(',')):
@@ -91,16 +86,14 @@ class FingerBindsDecoder:
continue
# Try to find a scheme.
-
scheme, *rest = addr.split(':/')
if not rest:
# No scheme found, let's just guess the scheme based on
# the situation.
-
raw = scheme
scheme = {'finger': 'tcp'}[self._proto]
else:
- # just don't add the ':' of ':/' again
+ # Just don't add the ':' of ':/' again.
raw = '/' + ':/'.join(rest)
if (
@@ -110,11 +103,10 @@ class FingerBindsDecoder:
raise _InvalidBindError(
addr,
f'Unsupported scheme {scheme!r} for '
- f'protocol {self._proto!r}',
+ + f'protocol {self._proto!r}',
)
# Decode the address data.
-
if scheme == 'tcp':
binds.update(self._decode_tcp_host(raw))
@@ -124,8 +116,7 @@ class FingerBindsDecoder:
return f'{self._class__.__name__}()'
def _decode_tcp_host(self, x):
- """ Decode suitable hosts for a TCP bind. """
-
+ """Decode suitable hosts for a TCP bind."""
addrs = ()
addr = x
@@ -133,15 +124,12 @@ class FingerBindsDecoder:
# TODO: decode hosts without the default host.
# Get the host part first, we'll decode it later.
-
if x[0] == '[':
# The host part is an IPv6, look for the closing ']' and
# decode it later.
-
to = x.find(']')
if to < 0:
- raise _InvalidBindError(
- addr, "Expected closing ']'")
+ raise _InvalidBindError(addr, "Expected closing ']'")
host = x[1:to]
x = x[to + 1:]
@@ -150,14 +138,12 @@ class FingerBindsDecoder:
else:
# The host part is either an IPv4 or a host name, look for
# the ':' and decode it later.
-
host, *x = x.split(':')
x = ':' + ':'.join(x)
is_ipv6 = False
# Decode the port part.
-
if x in ('', ':'):
port = 79
elif x[0] == ':':
@@ -167,30 +153,26 @@ class FingerBindsDecoder:
try:
if x[1:] != '':
raise AssertionError('Expected a port number')
+
port = _socket.getservbyname(x[1:])
except Exception:
raise _InvalidBindError(
addr,
'Expected a valid port number or name '
- f'(got {x[1:]!r})',
+ + f'(got {x[1:]!r})',
) from None
else:
- raise _InvalidBindError(
- addr, 'Garbage found after the host',
- )
+ raise _InvalidBindError(addr, 'Garbage found after the host')
# Decode the host part and get the addresses.
-
addrs = ()
if is_ipv6:
# Decode the IPv6 address (validate it using `_socket.inet_pton`).
-
ip6 = host
_socket.inet_pton(_socket.AF_INET6, host)
addrs += (FingerTCPv6Bind(ip6, port),)
else:
# Decode the host (try IPv4, otherwise, resolve domain).
-
try:
ip = host.split('.')
if len(ip) < 2 or len(ip) > 4:
@@ -210,7 +192,8 @@ class FingerBindsDecoder:
entries = _socket.getaddrinfo(
host, port,
proto=_socket.IPPROTO_TCP,
- type=_socket.SOCK_STREAM)
+ type=_socket.SOCK_STREAM,
+ )
for ent in entries:
if (
diff --git a/pyfingerd/cli.py b/pyfingerd/cli.py
index 409b376..2fdb7a6 100755
--- a/pyfingerd/cli.py
+++ b/pyfingerd/cli.py
@@ -3,7 +3,7 @@
# Copyright (C) 2021-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" pyfingerd CLI interface. """
+"""pyfingerd CLI interface."""
from datetime import datetime as _datetime
from platform import (
@@ -13,9 +13,11 @@ from platform import (
from sys import stderr as _stderr
import click as _click
+
import coloredlogs as _coloredlogs
-from . core import (
+from . import __version__ as _version
+from .core import (
FingerInterface as _FingerInterface,
FingerServer as _FingerServer,
)
@@ -25,21 +27,16 @@ from .fiction import (
)
from .native import FingerNativeInterface as _FingerNativeInterface
from .utils import logger as _logger
-from .version import version as _version
__all__ = ['cli']
-@_click.command(
- context_settings={
- 'help_option_names': ['-h', '--help'],
- },
-)
+@_click.command(context_settings={'help_option_names': ['-h', '--help']})
@_click.version_option(
version=_version,
message=(
f'pyfingerd version {_version}, '
- f'running on {_python_impl()} {_python_version()}'
+ + f'running on {_python_impl()} {_python_version()}'
),
)
@_click.option(
@@ -73,7 +70,7 @@ __all__ = ['cli']
envvar=('FINGER_START',),
help=(
'Date and time at which the scenario starts or has started '
- "as an ISO date, if the selected type is 'scenario'."
+ + "as an ISO date, if the selected type is 'scenario'."
),
)
@_click.option(
@@ -82,21 +79,16 @@ __all__ = ['cli']
help='Log level for the displayed messages.',
)
def cli(binds, hostname, type_, scenario, scenario_start, log_level):
- """ Start a finger (RFC 1288) server.
+ """Start a finger (RFC 1288) server.
- Find out more at <https://pyfingerd.touhey.pro/>.
+ Find out more at <https://pyfingerd.touhey.pro/>.
"""
-
- # Set the log level.
-
_coloredlogs.install(
fmt='\r%(asctime)s.%(msecs)03d %(levelname)s %(message)s',
datefmt='%d/%m/%Y %H:%M:%S',
level=log_level.upper(),
)
- # Do everything.
-
hostname = hostname.upper()
type_ = type_.casefold()
@@ -122,9 +114,7 @@ def cli(binds, hostname, type_, scenario, scenario_start, log_level):
else:
iface = _FingerInterface()
- server = _FingerServer(
- binds=binds, hostname=hostname, interface=iface,
- )
+ server = _FingerServer(binds=binds, hostname=hostname, interface=iface)
server.serve_forever()
# End of file.
diff --git a/pyfingerd/core.py b/pyfingerd/core.py
index 0cd995d..6d30db4 100755
--- a/pyfingerd/core.py
+++ b/pyfingerd/core.py
@@ -3,10 +3,10 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Main classes for the finger server, interfaces and formatters.
+"""Main classes for the finger server, interfaces and formatters.
- These classes, which behave as base returning default data,
- are bundled with base definitions for users and sessions.
+These classes, which behave as base returning default data,
+are bundled with base definitions for users and sessions.
"""
import asyncio as _asyncio
@@ -14,12 +14,12 @@ import copy as _copy
import multiprocessing as _multip
import signal as _signal
import string as _string
-
from datetime import datetime as _dt, timedelta as _td, tzinfo as _tzinfo
from errno import errorcode as _errorcode
from typing import Optional as _Optional, Sequence as _Sequence
from croniter import croniter as _croniter
+
from pytz import utc as _utc
from .binds import FingerBindsDecoder as _FingerBindsDecoder
@@ -45,17 +45,16 @@ __all__ = [
def cron(spec: str):
- """ Add a cron specification to the callable.
+ """Add a cron specification to the callable.
- This decorator adds the ``__cron__`` member on the callable,
- as a ``croniter`` instance using the given specification.
+ This decorator adds the ``__cron__`` member on the callable,
+ as a ``croniter`` instance using the given specification.
- This makes the callable identifiable by the finger server when
- starting a server with an interface with such a callable,
- by checking if the attribute exists and starting a dedicated
- coroutine for running it periodically using the given specification.
+ This makes the callable identifiable by the finger server when
+ starting a server with an interface with such a callable,
+ by checking if the attribute exists and starting a dedicated
+ coroutine for running it periodically using the given specification.
"""
-
spec = _croniter(spec)
def decorator(func):
@@ -71,10 +70,10 @@ def cron(spec: str):
class FingerSession:
- """ Representation of an active session for a given user on the system.
+ """Representation of an active session for a given user on the system.
- :param time: The start time of the given session;
- by default, the current datetime.
+ :param time: The start time of the given session;
+ by default, the current datetime.
"""
__slots__ = ('_start', '_line', '_host', '_idle')
@@ -92,17 +91,17 @@ class FingerSession:
p = ('start', 'idle', 'line', 'orig')
p = (
f'{x}={getattr(self, x)!r}'
- for x in p if getattr(self, x) is not None)
+ for x in p if getattr(self, x) is not None
+ )
return f"{self._class__.__name__}({', '.join(p)})"
@property
def start(self) -> _dt:
- """ The timestamp at which the session has started.
+ """Get the timestamp at which the session has started.
- Note that when set, if no timezone is present,
- the datetime is considered UTC.
+ Note that when set, if no timezone is present,
+ the datetime is considered UTC.
"""
-
return self._start
@start.setter
@@ -115,14 +114,13 @@ class FingerSession:
@property
def idle(self) -> _dt:
- """ The timestamp since which the user is idle on the session.
+ """Get the timestamp since which the user is idle on the session.
- Note that when set, if no timezone is present,
- the datetime is considered UTC; also, if the provided
- datetime is before the session start timestamp, it is
- set to it.
+ Note that when set, if no timezone is present,
+ the datetime is considered UTC; also, if the provided
+ datetime is before the session start timestamp, it is
+ set to it.
"""
-
return self._idle
@idle.setter
@@ -138,8 +136,7 @@ class FingerSession:
@property
def line(self) -> _Optional[str]:
- """ The line on which the user is. """
-
+ """Get the line on which the user is."""
return self._line
@line.setter
@@ -148,8 +145,7 @@ class FingerSession:
@property
def host(self) -> _Optional[str]:
- """ The host from which the user is connected. """
-
+ """Get the host from which the user is connected."""
return self._host
@host.setter
@@ -158,15 +154,15 @@ class FingerSession:
class FingerUser:
- """ Representation of a user on the system.
+ """Representation of a user on the system.
- Returned by subclasses of :py:class:`FingerInterface`,
- and used by subclasses of :py:class:`FingerFormatter`.
+ Returned by subclasses of :py:class:`FingerInterface`,
+ and used by subclasses of :py:class:`FingerFormatter`.
- :param login: The login of the user.
- :param name: The display name of the user.
- :param home: The path to the home of the user.
- :param shell: The path to the user's default shell.
+ :param login: The login of the user.
+ :param name: The display name of the user.
+ :param home: The path to the home of the user.
+ :param shell: The path to the user's default shell.
"""
__slots__ = (
@@ -198,16 +194,17 @@ class FingerUser:
def __repr__(self):
p = (
'login', 'name', 'home', 'shell', 'office',
- 'last_login', 'sessions')
+ 'last_login', 'sessions',
+ )
p = (
f'{x}={getattr(self, x)!r}'
- for x in p if getattr(self, x) is not None)
+ for x in p if getattr(self, x) is not None
+ )
return f"{self._class__.__name__}({', '.join(p)})"
@property
def login(self) -> _Optional[str]:
- """ The login name of the user, e.g. 'cake' or 'gaben'. """
-
+ """Get the login name of the user, e.g. 'cake' or 'gaben'."""
return self._login
@login.setter
@@ -216,8 +213,7 @@ class FingerUser:
@property
def name(self) -> _Optional[str]:
- """ The display name of the user, e.g. 'Jean Dupont'. """
-
+ """Get the display name of the user, e.g. 'Jean Dupont'."""
return self._name
@name.setter
@@ -226,25 +222,26 @@ class FingerUser:
@property
def last_login(self) -> _Optional[str]:
- """ The last login date for the user.
+ """Get the last login date for the user.
- Is None if not known.
+ Is None if not known.
"""
-
return self._last_login
@last_login.setter
def last_login(self, value: _Optional[str]) -> None:
- self._last_login = None if value is None else \
- value if isinstance(value, _dt) else _dt(value)
+ self._last_login = (
+ None if value is None
+ else value if isinstance(value, _dt)
+ else _dt(value)
+ )
@property
def home(self) -> _Optional[str]:
- """ The path to the user's home on the given system.
+ """Get the path to the user's home on the given system.
- Is None if not known or defined.
+ Is None if not known or defined.
"""
-
return self._home
@home.setter
@@ -253,11 +250,10 @@ class FingerUser:
@property
def shell(self) -> _Optional[str]:
- """ The path to the user's shell on the given system.
+ """Get the path to the user's shell on the given system.
- Is None if not known or defined.
+ Is None if not known or defined.
"""
-
return self._shell
@shell.setter
@@ -266,11 +262,10 @@ class FingerUser:
@property
def office(self) -> _Optional[str]:
- """ The display name of the user's office.
+ """Get the display name of the user's office.
- Is None if not known or defined.
+ Is None if not known or defined.
"""
-
return self._office
@office.setter
@@ -279,12 +274,11 @@ class FingerUser:
@property
def plan(self) -> _Optional[str]:
- """ The plan of the user.
+ """Get the plan of the user.
- Usually the content of the ``.plan`` file in the user's home
- on real (and kind of obsolete) UNIX-like systems.
+ Usually the content of the ``.plan`` file in the user's home
+ on real (and kind of obsolete) UNIX-like systems.
"""
-
return self._plan
@plan.setter
@@ -297,13 +291,12 @@ class FingerUser:
@property
def sessions(self) -> _Sequence[FingerSession]:
- """ The current sessions array for the user, always defined. """
-
+ """Get the current sessions array for the user, always defined."""
return self._sessions
class _FingerSessionManager:
- """ Session manager. """
+ """Session manager."""
__slots__ = ('_sessions',)
@@ -325,97 +318,101 @@ class _FingerSessionManager:
def __delitem__(self, key):
if key is None:
self._sessions.pop(0)
+ return
+
+ try:
+ idx = next([
+ i for i, x in enumerate(self._sessions[::-1])
+ if x.name == key
+ ])
+ except StopIteration:
+ return
else:
- for i in ([
- i for i, x in enumerate(self._sessions)
- if key == x.name
- ][::-1]):
- self._sessions.pop(i)
+ self._sessions.pop(idx)
def __getitem__(self, key):
if key is None:
try:
return self._sessions[0]
except IndexError:
- raise KeyError('could not get latest session') from None
+ raise KeyError('Could not get latest session') from None
if type(key) is int:
try:
return self._sessions[key]
except IndexError:
- msg = f'could not get session #{key!r}'
- raise IndexError(msg) from None
+ raise IndexError(f'Could not get session #{key!r}') from None
try:
- return next(x for x in self._sessions if key == x.name)
+ return next(x for x in self._sessions if x.name == key)
except StopIteration:
- raise KeyError(f'could not get session {key!r}') from None
+ raise KeyError(f'Could not get session {key!r}') from None
def __setitem__(self, key, value):
if not isinstance(value, FingerSession):
- raise TypeError('can only add sessions into a session manager')
+ raise TypeError('Can only add sessions into a session manager')
+
value = _copy.deepcopy(value)
if key is None:
# Check if except the first session, the key of the session,
# if any, does not override another key.
-
if value.name is not None:
try:
next(
i for i, x in self._sessions[1:]
- if value.name == x.name)
+ if x.name == value.name
+ )
except StopIteration:
pass
else:
- msg = "value.name overrides another session's key"
+ msg = "`value.name` overrides another session's key"
raise ValueError(msg) from None
try:
self._sessions[0] = value
return
except IndexError:
- raise KeyError('could not set latest session') from None
+ raise KeyError('Could not set latest session') from None
if type(key) is int:
# Check if except the key-th session, the key of the session,
# if any, does not override another key.
-
if value.name is not None:
try:
next(
i for i, x in self._sessions
- if i != key and value.name == x.name)
+ if i != key and x.name == value.name
+ )
except StopIteration:
pass
else:
- msg = "value.name overrides another session's key"
+ msg = "`value.name` overrides another session's key"
raise ValueError(msg) from None
try:
self._sessions[key] = value
return
except IndexError:
- msg = f'could not set session #{key!r}'
- raise IndexError(msg) from None
+ raise IndexError(f'Could not set session #{key!r}') from None
value.name = key
try:
i = next(
i for i, x in enumerate(self._sessions)
- if key == x.name)
+ if x.name == key
+ )
except StopIteration:
- raise KeyError(f'could not set session {key!r}') from None
+ raise KeyError(f'Could not set session {key!r}') from None
self._sessions[i] = value
def add(self, session):
- """ Add a session. """
-
+ """Add a session."""
if not isinstance(session, FingerSession):
raise TypeError(
- 'can only insert sessions into a session manager',
+ 'Can only insert sessions into a session manager',
)
self._sessions.insert(0, session)
@@ -426,20 +423,20 @@ class _FingerSessionManager:
class FingerFormatter:
- """ Formatter for :py:class:`FingerServer`.
+ """Formatter for :py:class:`FingerServer`.
- Provides text-formatted (as strings limited to ASCII)
- answers for given queries with given results as objects.
+ Provides text-formatted (as strings limited to ASCII)
+ answers for given queries with given results as objects.
- This class must be subclassed by other formatters.
- Only methods not starting with an underscore are called by
- instances of :py:class:`FingerServer`; others are utilities
- called by these.
+ This class must be subclassed by other formatters.
+ Only methods not starting with an underscore are called by
+ instances of :py:class:`FingerServer`; others are utilities
+ called by these.
- Unless methods are overridden to have a different behaviour,
- this formatter aims at RFC 1288 compliance.
+ Unless methods are overridden to have a different behaviour,
+ this formatter aims at RFC 1288 compliance.
- :param tzinfo: Timezone used for formatting dates and times.
+ :param tzinfo: Timezone used for formatting dates and times.
"""
def __init__(self, tzinfo: _Optional[_tzinfo] = None):
@@ -455,8 +452,7 @@ class FingerFormatter:
# ---
def _format_idle(self, idle: _td) -> str:
- """ Format an idle time delta. """
-
+ """Format an idle time delta."""
def _iter_idle(idle):
days = int(idle.days)
hours = int(idle.seconds / 3600)
@@ -475,8 +471,7 @@ class FingerFormatter:
return f'{" ".join(_iter_idle(idle))} idle'
def _format_time(self, d: _td) -> str:
- """ Format a date and time. """
-
+ """Format a date and time."""
if d < _td():
return ''
@@ -492,39 +487,36 @@ class FingerFormatter:
return ''
def _format_when(self, d: _dt) -> str:
- """ Format a date and time for 'when'. """
-
+ """Format a date and time for 'when'."""
return d.astimezone(self._tzinfo).strftime('%a %H:%M')
def _format_header(self, hostname: str, raw_query: str) -> str:
- """ Return the header of the formatted answer.
+ """Return the header of the formatted answer.
- This header is used for every request,
- except when an error has occurred in the user's query.
+ This header is used for every request,
+ except when an error has occurred in the user's query.
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :return: The header of the formatted answer as text.
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :return: The header of the formatted answer as text.
"""
-
if raw_query:
raw_query = ' ' + raw_query
return (
f'Site: {hostname}\r\n'
- f'Command line:{raw_query}\r\n'
- '\r\n'
+ + f'Command line:{raw_query}\r\n'
+ + '\r\n'
)
def _format_footer(self) -> str:
- """ Return the footer of the formatted answer.
+ """Return the footer of the formatted answer.
- This footer is used for every request,
- except when an error has occurred in the user's query.
+ This footer is used for every request,
+ except when an error has occurred in the user's query.
- :return: The footer of the formatted answer as text.
+ :return: The footer of the formatted answer as text.
"""
-
return ''
# ---
@@ -532,16 +524,15 @@ class FingerFormatter:
# ---
def format_query_error(self, hostname: str, raw_query: str) -> str:
- """ Return the formatted answr for when an error has occurred.
+ """Return the formatted answr for when an error has occurred.
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :return: The formatted answer as text.
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :return: The formatted answer as text.
"""
-
return (
f'Site: {hostname}\r\n'
- 'You have made a mistake in your query!\r\n'
+ + 'You have made a mistake in your query!\r\n'
)
def format_short(
@@ -550,14 +541,13 @@ class FingerFormatter:
raw_query: str,
users: _Sequence[FingerUser],
) -> str:
- """ Return the formatted answer for a user list in the 'short' format.
+ """Return the formatted answer for a user list in the 'short' format.
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :param users: The user list.
- :return: The formatted answer as text.
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :param users: The user list.
+ :return: The formatted answer as text.
"""
-
if not users:
return 'No user list available.\r\n'
@@ -596,7 +586,8 @@ class FingerFormatter:
('TTY',) + tuple(_line(u, s) for u, s in lst),
('Idle',) + tuple(_idle(u, s) for u, s in lst),
('When',) + tuple(_logt(u, s) for u, s in lst),
- ('Office',) + tuple(_offic(u, s) for u, s in lst))
+ ('Office',) + tuple(_offic(u, s) for u, s in lst),
+ )
sizes = tuple(max(map(len, c)) + 1 for i, c in enumerate(columns))
align = ('<', '<', '<', '^', '^', '<')
@@ -620,14 +611,13 @@ class FingerFormatter:
raw_query: str,
users: _Sequence[FingerUser],
) -> str:
- """ Return the formatted answer for a user list in the 'long' format.
+ """Return the formatted answer for a user list in the 'long' format.
- :param hostname: The hostname configured for the server.
- :param raw_query: The raw query given by the user.
- :param users: The user list.
- :return: The formatted answer as text.
+ :param hostname: The hostname configured for the server.
+ :param raw_query: The raw query given by the user.
+ :param users: The user list.
+ :return: The formatted answer as text.
"""
-
if not users:
return 'No user list available.\r\n'
@@ -637,16 +627,15 @@ class FingerFormatter:
for user in users:
res += (
f'Login name: {user.login[:27]:<27} '
- f'Name: {user.name if user.name else user.login}\r\n'
- f'Directory: {user.home[:28] if user.home else "":<28} '
- f'Shell: {user.shell if user.shell else ""}\r\n'
+ + f'Name: {user.name if user.name else user.login}\r\n'
+ + f'Directory: {user.home[:28] if user.home else "":<28} '
+ + f'Shell: {user.shell if user.shell else ""}\r\n'
)
if user.office:
res += f"Office: {user.office if user.office else ''}\r\n"
if user.sessions:
# List current sessions.
-
for se in user.sessions:
since = (
se.start.astimezone(self._tzinfo)
@@ -665,10 +654,10 @@ class FingerFormatter:
res += f' {self._format_idle(idle)}\r\n'
elif user.last_login is not None:
# Show last login.
-
date = (
user.last_login.astimezone(self._tzinfo)
- .strftime('%a %b %e %R'))
+ .strftime('%a %b %e %R')
+ )
tz = self._tzinfo
res += f'Last login {date} ({tz}) on console\r\n'
else:
@@ -694,17 +683,17 @@ class FingerFormatter:
class FingerInterface:
- """ Data source for :py:class:`FingerServer`.
+ """Data source for :py:class:`FingerServer`.
- Provides users and answers for the various queries received
- from the clients by the server.
+ Provides users and answers for the various queries received
+ from the clients by the server.
- This class must be subclassed by other interfaces.
- Only methods not starting with an underscore are called by
- instances of :py:class:`FingerServer`; others are utilities
- called by these.
+ This class must be subclassed by other interfaces.
+ Only methods not starting with an underscore are called by
+ instances of :py:class:`FingerServer`; others are utilities
+ called by these.
- By default, it behaves like a dummy interface.
+ By default, it behaves like a dummy interface.
"""
def __repr__(self):
@@ -716,23 +705,20 @@ class FingerInterface:
host: str,
verbose: bool,
) -> str:
- """ Transmit a user query to a foreign host.
+ """Transmit a user query to a foreign host.
- This function returns the answer formatted by it.
+ This function returns the answer formatted by it.
- If used directly (not overridden by subclasses), this
- method will refuse to transmit finger queries.
+ If used directly (not overridden by subclasses), this
+ method will refuse to transmit finger queries.
- :param query: The user query, set to None in case of
- no query provided by the client.
- :param host: The distant host to which to transmit the
- query.
- :param verbose: Whether the verbose flag (``/W``, long format)
- has been passed by the current client
- or not.
- :return: The answer formatted by the distant server.
+ :param query: The user query, set to None in case of
+ no query provided by the client.
+ :param host: The distant host to which to transmit the query.
+ :param verbose: Whether the verbose flag (``/W``, long format)
+ has been passed by the current client or not.
+ :return: The answer formatted by the distant server.
"""
-
return "This server won't transmit finger queries.\r\n"
def search_users(
@@ -740,16 +726,15 @@ class FingerInterface:
query: _Optional[str],
active: _Optional[bool],
) -> _Sequence[FingerUser]:
- """ Search for users on the current host using the given query.
-
- :param query: The user query, set to None in case of no
- query provided by the client.
- :param active: Whether to get active users (True),
- inactive users (False), or all users (None).
- :return: The list of users found using the query provided
- by the client.
+ """Search for users on the current host using the given query.
+
+ :param query: The user query, set to None in case of no
+ query provided by the client.
+ :param active: Whether to get active users (True),
+ inactive users (False), or all users (None).
+ :return: The list of users found using the query provided
+ by the client.
"""
-
return []
@@ -759,19 +744,19 @@ class FingerInterface:
class _FingerQuery:
- """ A finger query.
+ """A finger query.
- Requests information about connected or specific users on a
- remote server.
+ Requests information about connected or specific users on a
+ remote server.
- There are three types of requests recognized by RFC 1288:
+ There are three types of requests recognized by RFC 1288:
- * {C} is a request for a list of all online users.
- * {Q1} is a request for a local user.
- * {Q2} is a request for a distant user (with hostname).
+ * {C} is a request for a list of all online users.
+ * {Q1} is a request for a local user.
+ * {Q2} is a request for a distant user (with hostname).
- /W means the RUIP (program answering the query) should be more
- verbose (this token can be ignored).
+ /W means the RUIP (program answering the query) should be more
+ verbose (this token can be ignored).
"""
__slots__ = ('line', 'host', 'username', 'verbose')
@@ -779,22 +764,19 @@ class _FingerQuery:
# "By default, this program SHOULD filter any unprintable data,
# leaving only printable 7-bit characters (ASCII 32 through
# ASCII 126), tabs (ASCII 9) and CRLFs."
-
allowed_chars = (
"\t !\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
- + _string.ascii_letters + _string.digits)
+ + _string.ascii_letters + _string.digits
+ )
def __init__(self, raw):
- """ Initialize the query object by decoding the data. """
-
+ """Initialize the query object by decoding the data."""
# Get a character string out of the query.
-
raw = raw.decode('ascii', errors='ignore')
raw = ''.join(c for c in raw if c in self.allowed_chars)
self.line = raw
# Get elements.
-
self.host = None
self.username = None
self.verbose = False
@@ -802,19 +784,22 @@ class _FingerQuery:
if element[0] == '/':
if not element[1:]:
raise _MalformedQueryError(
- raw, "missing feature flags after '/'",
+ raw, "Missing feature flags after '/'",
)
+
for letter in element[1:]:
if letter == 'W':
self.verbose = True
else:
raise _MalformedQueryError(
raw,
- f'unknown feature flag {letter!r}',
+ f'Unknown feature flag {letter!r}',
)
+
continue
elif self.username is not None:
raise _MalformedQueryError(raw, 'multiple query arguments')
+
self.username = element
if self.username is not None and '@' in self.username:
@@ -823,16 +808,16 @@ class _FingerQuery:
class FingerServer:
- """ The main finger server class.
-
- :param binds: The hosts and ports on which the server should
- listen to and answer finger requests.
- :param hostname: The hostname to be included in answers sent
- to clients.
- :param interface: The interface to use for querying
- users and sessions.
- :param formatter: The formatter to use for formatting
- answers sent to clients.
+ """The main finger server class.
+
+ :param binds: The hosts and ports on which the server should
+ listen to and answer finger requests.
+ :param hostname: The hostname to be included in answers sent
+ to clients.
+ :param interface: The interface to use for querying
+ users and sessions.
+ :param formatter: The formatter to use for formatting
+ answers sent to clients.
"""
def __init__(
@@ -844,7 +829,6 @@ class FingerServer:
):
# Check the host name, which should be simple LDH, i.e.
# Letters, Digits, Hyphens.
-
try:
hostname = hostname.upper()
if not all(
@@ -856,63 +840,53 @@ class FingerServer:
raise _HostnameError(hostname)
# Check the interface and formatter classes.
-
if not isinstance(interface, FingerInterface):
raise TypeError(
'Please base your interface '
- 'on the base class provided by the pyfingerd module',
+ + 'on the base class provided by the pyfingerd module',
)
if not isinstance(formatter, FingerFormatter):
raise TypeError(
'Please base your formatter '
- 'on the base class provided by the pyfingerd module',
+ + 'on the base class provided by the pyfingerd module',
)
# Keep the parameters.
-
self._host = hostname
self._interface = interface
self._formatter = formatter
# Check the binds.
-
self._binds = [b for b in _FingerBindsDecoder().decode(binds)]
if not self._binds:
raise _NoBindsError()
# Initialize multi-process related.
-
self._p = None
@property
def hostname(self):
- """ The hostname configured for this server. """
-
+ """Get the hostname configured for this server."""
return self._host
@property
def interface(self):
- """ The interface configured for this server. """
-
+ """Get the interface configured for this server."""
return self._interface
@property
def formatter(self):
- """ The formatter configured for this server. """
-
+ """Get the formatter configured for this server."""
return self._formatter
def _serve(self):
- """ Start servers and serve on the current process. """
-
+ """Start servers and serve on the current process."""
async def handle_finger_connection(inp, outp):
- """ Handle a connection. """
-
+ """Handle a connection."""
src, *_ = inp._transport.get_extra_info('peername')
# Gather the request line.
-
try:
line = await inp.readline()
except ConnectionResetError:
@@ -923,7 +897,6 @@ class FingerServer:
return
# Decode the request.
-
ans = ''
try:
@@ -940,20 +913,22 @@ class FingerServer:
if query.username:
_access_logger.info(
f'{src} requested transmitting user query for '
- f'{query.username!r} at {query.host!r}.',
+ + f'{query.username!r} at {query.host!r}.',
)
else:
_access_logger.info(
f'{src} requested transmitting user query '
- f'to {query.host!r}.',
+ + f'to {query.host!r}.',
)
ans = self.interface.transmit_query(
- query.host, query.username, query.verbose)
+ query.host, query.username, query.verbose,
+ )
else:
if query.username:
users = self.interface.search_users(
- query.username, None)
+ query.username, None,
+ )
_access_logger.info(
f'{src} requested user {query.username!r}: found '
+ (
@@ -975,19 +950,19 @@ class FingerServer:
if query.username or query.verbose:
ans = self.formatter.format_long(
- self.hostname, query.line, users)
+ self.hostname, query.line, users,
+ )
else:
ans = self.formatter.format_short(
- self.hostname, query.line, users)
+ self.hostname, query.line, users,
+ )
# Write the output.
-
ans = '\r\n'.join(ans.splitlines()) + '\r\n'
outp.write(ans.encode('ascii', errors='ignore'))
async def handle_connection(inp, outp):
- """ Handle a new incoming connection. """
-
+ """Handle a new incoming connection."""
try:
await handle_finger_connection(inp, outp)
except Exception:
@@ -998,8 +973,7 @@ class FingerServer:
await outp.wait_closed()
async def start_server(bind):
- """ Start a given server. """
-
+ """Start a given server."""
family, host, port = bind.runserver_params
try:
@@ -1043,8 +1017,7 @@ class FingerServer:
)
async def cron_call(func, spec):
- """ Call a function periodically using a cron specification. """
-
+ """Call a function periodically using a cron specification."""
spec.set_current(_dt.now())
while True:
@@ -1063,11 +1036,9 @@ class FingerServer:
await _asyncio.sleep(seconds)
async def start_servers():
- """ Start the servers. """
-
+ """Start the servers."""
def get_coroutines():
- """ Tasks iterator. """
-
+ """Tasks iterator."""
for bind in self._binds:
yield start_server(bind)
@@ -1088,10 +1059,10 @@ class FingerServer:
tasks = [_asyncio.create_task(co) for co in get_coroutines()]
await _asyncio.wait(
- tasks, return_when=_asyncio.FIRST_COMPLETED)
+ tasks, return_when=_asyncio.FIRST_COMPLETED,
+ )
# If any task has set an exception, we try to catch it.
-
exc = None
for task in tasks:
if exc is None:
@@ -1111,8 +1082,7 @@ class FingerServer:
pass
def start(self) -> None:
- """ Start all underlying server processes and bind all ports. """
-
+ """Start all underlying server processes and bind all ports."""
if self._p is not None and self._p.is_alive():
return
@@ -1120,8 +1090,7 @@ class FingerServer:
self._p.start()
def stop(self) -> None:
- """ Stop all underlying server processes and unbind all ports. """
-
+ """Stop all underlying server processes and unbind all ports."""
if self._p is None or not self._p.is_alive():
return
@@ -1130,13 +1099,12 @@ class FingerServer:
self._p = None
def serve_forever(self) -> None:
- """ Start all servers and serve in a synchronous fashion.
+ """Start all servers and serve in a synchronous fashion.
- It starts all servers :py:meth:`FingerServer.start`, waits for
- an interrupt signal, and stops all servers using
- :py:meth:`FingerServer.stop`.
+ It starts all servers :py:meth:`FingerServer.start`, waits for
+ an interrupt signal, and stops all servers using
+ :py:meth:`FingerServer.stop`.
"""
-
if self._p is not None:
self.start()
@@ -1150,12 +1118,10 @@ class FingerServer:
else:
# If the server hasn't been started on another process
# using ``.start()``, we can just start is on this process.
-
self._serve()
def shutdown(self):
- """ Shutdown the server, alias to `.stop()`. """
-
+ """Shutdown the server, alias to `.stop()`."""
self.stop()
# End of file.
diff --git a/pyfingerd/errors.py b/pyfingerd/errors.py
index 6810df5..b11228d 100755
--- a/pyfingerd/errors.py
+++ b/pyfingerd/errors.py
@@ -3,7 +3,7 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" This file defines the exceptions used throughout the module. """
+"""This file defines the exceptions used throughout the module."""
__all__ = [
'BindError', 'ConfigurationError', 'HostnameError',
@@ -16,45 +16,46 @@ __all__ = [
class ConfigurationError(Exception):
- """ Raised when an invalid configuration option is set. """
+ """Raised when an invalid configuration option is set."""
pass
class HostnameError(ConfigurationError):
- """ Raised when a host name is invalid. """
+ """Raised when a host name is invalid."""
def __init__(self, hostname):
- super().__init__('invalid host name {}.'.format(repr(hostname)))
+ super().__init__(f'Invalid host name {hostname!r}.')
class BindError(ConfigurationError):
- """ Raised when an error has occurred with the provided binds. """
+ """Raised when an error has occurred with the provided binds."""
def __init__(self, msg):
super().__init__(
- f'an error has occurred with the provided binds: {msg}')
+ f'An error has occurred with the provided binds: {msg}',
+ )
class NoBindsError(BindError):
- """ Raised when no binds were provided. """
+ """Raised when no binds were provided."""
def __init__(self):
- super().__init__('no valid bind')
+ super().__init__('No valid bind')
class InvalidBindError(BindError):
- """ Raised when one of the provided binds came out erroneous. """
+ """Raised when one of the provided binds came out erroneous."""
def __init__(self, bind, msg=None):
super().__init__(
- f'one of the provided bind ({bind!r}) '
- f'was invalid{": " + msg if msg else ""}',
+ f'One of the provided bind ({bind!r}) '
+ + f'was invalid{": " + msg if msg else ""}',
)
class MalformedQueryError(Exception):
- """ Raised when a malformed query is received. """
+ """Raised when a malformed query is received."""
def __init__(self, query, msg=None):
self.query = query
diff --git a/pyfingerd/fiction.py b/pyfingerd/fiction.py
index 1dc61b8..db4aff2 100755
--- a/pyfingerd/fiction.py
+++ b/pyfingerd/fiction.py
@@ -3,16 +3,15 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Definitions for the finger server fiction interface.
+"""Definitions for the finger server fiction interface.
- This file contains everything to decode and use the actions file.
+This file contains everything to decode and use the actions file.
"""
import copy as _copy
import logging as _logging
import math as _math
import os.path as _path
-
from collections import defaultdict as _defaultdict
from datetime import datetime as _dt, timedelta as _td
from enum import Enum as _Enum
@@ -50,29 +49,29 @@ _toml = None
class FictionalFingerUser(_FingerUser):
- """ Representation of a user on the fictional system.
+ """Representation of a user on the fictional system.
- Behaves like a :py:class:`pyfingerd.core.FingerUser`.
- For now, there are no modifications from the base class.
+ Behaves like a :py:class:`pyfingerd.core.FingerUser`.
+ For now, there are no modifications from the base class.
"""
pass
class FictionalFingerSession(_FingerSession):
- """ Representation of an active session for a given user.
-
- Behaves like a :py:class:`pyfingerd.core.FingerSession`.
- The two main modifications from the base class are the following:
-
- * Each session has a name, which identifies it. It can be used
- to designate it, allowing outside code to edit and delete the
- session specifically from an outside point of view.
- * Since the actions only allow for setting the user idle or not,
- when the user is active, the idle timestamp is simulated to make
- it seem like the user makes signs of life at an irregular but
- reasonable rate, like when the user often stops typing to think
- or do a task outside of the computer.
+ """Representation of an active session for a given user.
+
+ Behaves like a :py:class:`pyfingerd.core.FingerSession`.
+ The two main modifications from the base class are the following:
+
+ * Each session has a name, which identifies it. It can be used
+ to designate it, allowing outside code to edit and delete the
+ session specifically from an outside point of view.
+ * Since the actions only allow for setting the user idle or not,
+ when the user is active, the idle timestamp is simulated to make
+ it seem like the user makes signs of life at an irregular but
+ reasonable rate, like when the user often stops typing to think
+ or do a task outside of the computer.
"""
__slots__ = ('_name', '_is_idle', '_idle_last')
@@ -94,8 +93,7 @@ class FictionalFingerSession(_FingerSession):
@property
def name(self):
- """ Session name. """
-
+ """Session name."""
return self._name
@name.setter
@@ -104,15 +102,13 @@ class FictionalFingerSession(_FingerSession):
@property
def idle(self):
- """ Idle time (simulated). """
-
+ """Idle time (simulated)."""
if self._is_idle:
return self._idle_last
now = _dt.now().astimezone()
# Generate a number of seconds and return it.
-
def s(x):
return _math.sin(x * (_math.pi / 2))
@@ -124,15 +120,14 @@ class FictionalFingerSession(_FingerSession):
@idle.setter
def idle(self, value):
# Does nothing as we manage this time.
-
pass
@property
def idle_since(self):
- """ Idle since the given time. """
-
+ """Idle since the given time."""
if not self._is_idle:
- return None
+ return
+
return self._idle_last
@idle_since.setter
@@ -142,10 +137,10 @@ class FictionalFingerSession(_FingerSession):
@property
def active_since(self):
- """ Active since the given time. """
-
+ """Active since the given time."""
if self._is_idle:
- return None
+ return
+
return self._idle_last
@active_since.setter
@@ -175,22 +170,22 @@ Unchanged = _UnchangedType()
class FingerAction:
- """ Base class for actions in a fiction. """
+ """Base class for actions in a fiction."""
pass
class FingerUserCreationAction(FingerAction):
- """ A user has been created.
-
- :param login: The login of the user that is being created.
- :param name: The initial value for :py:attr:`FictionalFingerUser.name`.
- :param home: The initial value for :py:attr:`FictionalFingerUser.home`.
- :param shell: The initial value for
- :py:attr:`FictionalFingerUser.shell`.
- :param office: The initial value for
- :py:attr:`FictionalFingerUser.office`.
- :param plan: The initial value for :py:attr:`FictionalFingerUser.plan`.
+ """A user has been created.
+
+ :param login: The login of the user that is being created.
+ :param name: The initial value for :py:attr:`FictionalFingerUser.name`.
+ :param home: The initial value for :py:attr:`FictionalFingerUser.home`.
+ :param shell: The initial value for
+ :py:attr:`FictionalFingerUser.shell`.
+ :param office: The initial value for
+ :py:attr:`FictionalFingerUser.office`.
+ :param plan: The initial value for :py:attr:`FictionalFingerUser.plan`.
"""
def __init__(
@@ -213,60 +208,55 @@ class FingerUserCreationAction(FingerAction):
def __repr__(self):
p = (
f'{x}={getattr(self, x)!r}'
- for x in ('login', 'name', 'home', 'shell', 'office', 'plan'))
+ for x in ('login', 'name', 'home', 'shell', 'office', 'plan')
+ )
return f"{self.__class__.__name__}({', '.join(p)})"
@property
def login(self) -> str:
- """ The login of the user that is being created. """
-
+ """Get the login of the user that is being created."""
return self._login
@property
def name(self) -> _Optional[str]:
- """ The initial value for :py:attr:`FictionalFingerUser.name`. """
-
+ """Get the initial value for :py:attr:`FictionalFingerUser.name`."""
return self._name
@property
def home(self) -> _Optional[str]:
- """ The initial value for :py:attr:`FictionalFingerUser.home`. """
-
+ """Get the initial value for :py:attr:`FictionalFingerUser.home`."""
return self._home
@property
def shell(self) -> _Optional[str]:
- """ The initial value for :py:attr:`FictionalFingerUser.shell`. """
-
+ """Get the initial value for :py:attr:`FictionalFingerUser.shell`."""
return self._shell
@property
def office(self) -> _Optional[str]:
- """ The initial value for :py:attr:`FictionalFingerUser.office`. """
-
+ """Get the initial value for :py:attr:`FictionalFingerUser.office`."""
return self._office
@property
def plan(self) -> _Optional[str]:
- """ The initial value for :py:attr:`FictionalFingerUser.plan`. """
-
+ """Get the initial value for :py:attr:`FictionalFingerUser.plan`."""
return self._plan
class FingerUserEditionAction(FingerAction):
- """ A user has been edited.
-
- :param login: The login of the user that is being edited.
- :param name: The new value for :py:attr:`FictionalFingerUser.name`;
- :py:data:`Unchanged` if the property is unchanged.
- :param home: The new value for :py:attr:`FictionalFingerUser.home`;
- :py:data:`Unchanged` if the property is unchanged.
- :param shell: The new value for :py:attr:`FictionalFingerUser.shell`;
- :py:data:`Unchanged` if the property is unchanged.
- :param office: The new value for :py:attr:`FictionalFingerUser.office`;
- :py:data:`Unchanged` if the property is unchanged.
- :param plan: The new value for :py:attr:`FictionalFingerUser.plan`;
- :py:data:`Unchanged` if the property is unchanged.
+ """A user has been edited.
+
+ :param login: The login of the user that is being edited.
+ :param name: The new value for :py:attr:`FictionalFingerUser.name`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param home: The new value for :py:attr:`FictionalFingerUser.home`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param shell: The new value for :py:attr:`FictionalFingerUser.shell`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param office: The new value for :py:attr:`FictionalFingerUser.office`;
+ :py:data:`Unchanged` if the property is unchanged.
+ :param plan: The new value for :py:attr:`FictionalFingerUser.plan`;
+ :py:data:`Unchanged` if the property is unchanged.
"""
def __init__(
@@ -289,65 +279,60 @@ class FingerUserEditionAction(FingerAction):
def __repr__(self):
p = (
f'{x}={getattr(self, x)!r}'
- for x in ('login', 'name', 'home', 'shell', 'office', 'plan'))
+ for x in ('login', 'name', 'home', 'shell', 'office', 'plan')
+ )
return f"{self.__class__.__name__}({', '.join(p)})"
@property
def login(self) -> str:
- """ The login of the user that is being edited. """
-
+ """Get the login of the user that is being edited."""
return self._login
@property
def name(self) -> _Optional[_Union[str, _UnchangedType]]:
- """ The new value for :py:attr:`FictionalFingerUser.name`.
+ """Get the new value for :py:attr:`FictionalFingerUser.name`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._name
@property
def home(self) -> _Optional[_Union[str, _UnchangedType]]:
- """ The new value for :py:attr:`FictionalFingerUser.home`.
+ """Get the new value for :py:attr:`FictionalFingerUser.home`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._home
@property
def shell(self) -> _Optional[_Union[str, _UnchangedType]]:
- """ The new value for :py:attr:`FictionalFingerUser.shell`.
+ """Get the new value for :py:attr:`FictionalFingerUser.shell`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._shell
@property
def office(self) -> _Optional[_Union[str, _UnchangedType]]:
- """ The new value for :py:attr:`FictionalFingerUser.office`.
+ """Get the new value for :py:attr:`FictionalFingerUser.office`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._office
@property
def plan(self) -> _Optional[_Union[str, _UnchangedType]]:
- """ The new value for :py:attr:`FictionalFingerUser.plan`.
+ """Get the new value for :py:attr:`FictionalFingerUser.plan`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._plan
class FingerUserDeletionAction(FingerAction):
- """ A user has been deleted.
+ """A user has been deleted.
- :param login: The login of the user to delete.
+ :param login: The login of the user to delete.
"""
def __init__(self, login: str):
@@ -360,18 +345,17 @@ class FingerUserDeletionAction(FingerAction):
@property
def login(self) -> str:
- """ The user's login. """
-
+ """Get the user's login."""
return self._login
class FingerUserLoginAction(FingerAction):
- """ A user has logged in.
+ """A user has logged in.
- :param login: The login of the user to edit.
- :param session_name: The name of the session to create.
- :param line: The new value for :py:attr:`FictionalFingerSession.line`.
- :param host: The new value for :py:attr:`FictionalFingerSession.host`.
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to create.
+ :param line: The new value for :py:attr:`FictionalFingerSession.line`.
+ :param host: The new value for :py:attr:`FictionalFingerSession.host`.
"""
def __init__(
@@ -396,37 +380,33 @@ class FingerUserLoginAction(FingerAction):
@property
def login(self) -> str:
- """ The login of the user to edit. """
-
+ """Get the login of the user to edit."""
return self._login
@property
def session_name(self) -> _Optional[str]:
- """ The name of the session to create. """
-
+ """Get the name of the session to create."""
return self._session
@property
def line(self) -> _Optional[str]:
- """ The name of the line from which the user has logged in. """
-
+ """Get the name of the line from which the user has logged in."""
return self._line
@property
def host(self) -> _Optional[str]:
- """ The name of the host from which the user has logged in. """
-
+ """Get the name of the host from which the user has logged in."""
return self._host
class FingerUserSessionChangeAction(FingerAction):
- """ A user session has undergone modifications.
+ """A user session has undergone modifications.
- :param login: The login of the user to edit.
- :param session_name: The name of the session to edit.
- :param is_idle: The new value for
- :py:attr:`FictionalFingerSession.is_idle`;
- :py:data:`Unchanged` if the property is unchanged.
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to edit.
+ :param is_idle: The new value for
+ :py:attr:`FictionalFingerSession.is_idle`;
+ :py:data:`Unchanged` if the property is unchanged.
"""
def __init__(
@@ -449,31 +429,28 @@ class FingerUserSessionChangeAction(FingerAction):
@property
def login(self) -> str:
- """ The login of the user to edit. """
-
+ """Get the login of the user to edit."""
return self._login
@property
def session_name(self) -> _Optional[str]:
- """ The name of the session to edit. """
-
+ """Get the name of the session to edit."""
return self._session
@property
def idle(self) -> _Union[bool, _UnchangedType]:
- """ The new value for :py:attr:`FictionalFingerSession.is_idle`.
+ """Get the new value for :py:attr:`FictionalFingerSession.is_idle`.
- Is :py:data:`Unchanged` if the property is unchanged.
+ Is :py:data:`Unchanged` if the property is unchanged.
"""
-
return self._idle
class FingerUserLogoutAction(FingerAction):
- """ A user has logged out.
+ """A user has logged out.
- :param login: The login of the user to edit.
- :param session_name: The name of the session to delete.
+ :param login: The login of the user to edit.
+ :param session_name: The name of the session to delete.
"""
def __init__(
@@ -494,14 +471,12 @@ class FingerUserLogoutAction(FingerAction):
@property
def login(self) -> str:
- """ The login of the user to edit. """
-
+ """Get the login of the user to edit."""
return self._login
@property
def session_name(self) -> _Optional[str]:
- """ The name of the session to delete. """
-
+ """Get the name of the session to delete."""
return self._session
@@ -511,18 +486,18 @@ class FingerUserLogoutAction(FingerAction):
class FingerFictionInterface(_FingerInterface):
- """ Base finger fiction interface for managing a scene.
-
- The basic state for this class is to have no users; it is possible
- at any point in time to apply actions that will add, remove or
- modify users and sessions, using
- :py:meth:`FingerFictionInterface.apply`.
-
- This class should be subclassed for interfaces specialized in various
- sources for the data; for example, while
- :py:class:`FingerScenarioInterface` is specialized in using a
- static sequence of actions, another class could read events from
- a live source.
+ """Base finger fiction interface for managing a scene.
+
+ The basic state for this class is to have no users; it is possible
+ at any point in time to apply actions that will add, remove or
+ modify users and sessions, using
+ :py:meth:`FingerFictionInterface.apply`.
+
+ This class should be subclassed for interfaces specialized in various
+ sources for the data; for example, while
+ :py:class:`FingerScenarioInterface` is specialized in using a
+ static sequence of actions, another class could read events from
+ a live source.
"""
def __init__(self):
@@ -532,7 +507,6 @@ class FingerFictionInterface(_FingerInterface):
# - `users`: the users.
# - `lasttime`: the last datetime, in order to raise an
# exception if not applied in order.
-
self._users = {}
self._lasttime = None
@@ -545,48 +519,45 @@ class FingerFictionInterface(_FingerInterface):
query: _Optional[str],
active: _Optional[bool],
) -> _Sequence[FictionalFingerUser]:
- """ Look for users according to a check. """
-
+ """Look for users according to a check."""
return ([
_copy.deepcopy(user) for user in self._users.values()
if not (query is not None and query not in user.login)
- and not (active is not None and active != bool(user.sessions))])
+ and not (active is not None and active != bool(user.sessions))
+ ])
# ---
# Elements proper to the fiction interface.
# ---
def reset(self):
- """ Reset the interface, i.e. revert all actions.
+ """Reset the interface, i.e. revert all actions.
- This method makes the interface return to the original
- state with no users and sessions.
+ This method makes the interface return to the original
+ state with no users and sessions.
"""
-
self._users = {}
self._lasttime = None
def apply(self, action, time: _Optional[_dt] = None):
- """ Apply an action to the scene.
+ """Apply an action to the scene.
- By default, the time of the action is the current time.
+ By default, the time of the action is the current time.
"""
-
if time is None:
time = _dt.now().astimezone()
if self._lasttime is not None and self._lasttime > time:
- raise ValueError("operations weren't applied in order!")
+ raise ValueError("Operations weren't applied in order!")
self._lasttime = time
if isinstance(action, FingerUserCreationAction):
# Create user `action.user`.
-
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
if action.login in self._users:
- raise ValueError('already got a user with that login')
+ raise ValueError('Already got a user with that login')
user = FictionalFingerUser(login=action.login, name=action.name)
user.shell = action.shell
@@ -597,12 +568,11 @@ class FingerFictionInterface(_FingerInterface):
self._users[user.login] = user
elif isinstance(action, FingerUserEditionAction):
# Edit user `action.user` with the given modifications.
-
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
if action.login not in self._users:
raise ValueError(
- f'got no user with login {action.login!r}',
+ f'Got no user with login {action.login!r}',
)
user = self._users[action.login]
@@ -618,18 +588,16 @@ class FingerFictionInterface(_FingerInterface):
user.plan = action.plan
elif isinstance(action, FingerUserDeletionAction):
# Delete user with login `action.login`.
-
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
if action.login not in self._users:
raise ValueError(
- f'got no user with login {action.login!r}',
+ f'Got no user with login {action.login!r}',
)
del self._users[action.login]
elif isinstance(action, FingerUserLoginAction):
# Login as user `action.login` with session `action.session_name`.
-
session = FictionalFingerSession(
time=time,
name=action.session_name,
@@ -639,61 +607,60 @@ class FingerFictionInterface(_FingerInterface):
session.host = action.host
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
try:
user = self._users[action.login]
except KeyError:
raise ValueError(
- f'got no user with login {action.login!r}',
+ f'Got no user with login {action.login!r}',
) from None
# We don't check if the session exists or not; multiple
# sessions can have the same name, we just act on the last
# inserted one that still exists and has that name.
-
user.sessions.add(session)
if user.last_login is None or user.last_login < session.start:
user.last_login = session.start
elif isinstance(action, FingerUserLogoutAction):
- # Logout as user `action.login` from session `action.session_name`.
-
+ # Logout as user `action.login` from
+ # session `action.session_name`.
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
try:
user = self._users[action.login]
except KeyError:
raise ValueError(
- f'got no user with login {action.login!r}',
+ f'Got no user with login {action.login!r}',
) from None
try:
del user.sessions[action.session_name]
except (KeyError, IndexError):
raise ValueError(
- f'got no session {action.name!r} '
- f'for user {action.login!r}') from None
+ f'Got no session {action.name!r} '
+ f'for user {action.login!r}',
+ ) from None
elif isinstance(action, FingerUserSessionChangeAction):
# Make user with login `action.login` idle.
-
if action.login is None:
- raise ValueError('missing login')
+ raise ValueError('Missing login')
try:
user = self._users[action.login]
except KeyError:
raise ValueError(
- 'got no user with login '
- f'{action.login!r}',
+ f'Got no user with login {action.login!r}',
) from None
try:
session = user.sessions[action.session_name]
except (KeyError, IndexError):
raise ValueError(
- f'got no session {action.name!r} '
- f'for user {action.login!r}') from None
+ f'Got no session {action.name!r} '
+ f'for user {action.login!r}',
+ ) from None
since = time
if action.idle is Unchanged:
@@ -710,32 +677,32 @@ class FingerFictionInterface(_FingerInterface):
class FingerScenario:
- """ Scenario representation for the fictional interface.
+ """Scenario representation for the fictional interface.
- Consists of actions (as instances of subclasses of
- :py:class:`FingerAction`) located at a given timedelta, with
- a given ending type and time.
+ Consists of actions (as instances of subclasses of
+ :py:class:`FingerAction`) located at a given timedelta, with
+ a given ending type and time.
- A scenario always uses timedeltas and not datetimes, since it can
- start at any arbitrary point in time and some scenarios are even
- on repeat.
+ A scenario always uses timedeltas and not datetimes, since it can
+ start at any arbitrary point in time and some scenarios are even
+ on repeat.
"""
class EndingType(_Enum):
- """ Ending type, i.e. what happens when the scenario comes to an end.
+ """Ending type, i.e. what happens when the scenario comes to an end.
- .. py:data:: FREEZE
+ .. py:data:: FREEZE
- Freeze the end state forever.
+ Freeze the end state forever.
- .. py:data:: STOP
+ .. py:data:: STOP
- Stop the server as soon as the scenario has reached an end.
+ Stop the server as soon as the scenario has reached an end.
- .. py:data:: REPEAT
+ .. py:data:: REPEAT
- Repeat the scenario from the beginning while
- starting again from the initial state.
+ Repeat the scenario from the beginning while
+ starting again from the initial state.
"""
FREEZE = 0
@@ -743,16 +710,13 @@ class FingerScenario:
REPEAT = 2
def __init__(self):
- # Initialize the properties.
-
self._end_type = None
self._end_time = None
self._actions = []
@property
def ending_type(self) -> EndingType:
- """ Ending type of the scenario, as an :py:data:`EndingType`. """
-
+ """Get the ending type of the scenario."""
return self._end_type
@ending_type.setter
@@ -772,22 +736,23 @@ class FingerScenario:
'interrupt': self.EndingType.FREEZE,
'freeze': self.EndingType.FREEZE,
'stop': self.EndingType.STOP,
- 'repeat': self.EndingType.REPEAT}[value]
+ 'repeat': self.EndingType.REPEAT,
+ }[value]
except KeyError:
raise TypeError(
- f'invalid value for ending type: {value!r}')
+ f'Invalid value for ending type: {value!r}',
+ )
self._end_type = value
@property
def duration(self) -> _Optional[_td]:
- """ Offset of the ending.
+ """Offset of the ending.
- When the offset is reached, any object following
- the scenario should act out the ending type defined
- in :py:attr:`ending_type`.
+ When the offset is reached, any object following
+ the scenario should act out the ending type defined
+ in :py:attr:`ending_type`.
"""
-
return self._end_time
@duration.setter
@@ -795,32 +760,29 @@ class FingerScenario:
self._end_time = _make_delta(value, allow_none=True)
def verify(self) -> None:
- """ Verify that the current scenario is valid.
+ """Verify that the current scenario is valid.
- This function does the following checks on the scenario:
+ This function does the following checks on the scenario:
- * The ending type and time (duration) are well defined.
- * Any user edition or deletion event happens when the
- related user exists.
- * Any session creation, edition or deletion happens on
- a user who exists at that point in time.
- * Any session edition or deletion happens when the
- related session exists.
+ * The ending type and time (duration) are well defined.
+ * Any user edition or deletion event happens when the
+ related user exists.
+ * Any session creation, edition or deletion happens on
+ a user who exists at that point in time.
+ * Any session edition or deletion happens when the
+ related session exists.
- Any action defined after the ending time is ignored.
+ Any action defined after the ending time is ignored.
- :raise ValueError: if the current scenario is invalid.
+ :raise ValueError: whether the current scenario is invalid.
"""
-
# Check if the ending is well defined.
-
if self._end_time is None:
- raise ValueError('ending time (duration) has not been provided')
+ raise ValueError('Ending time (duration) has not been provided')
if not isinstance(self._end_type, self.EndingType):
- raise ValueError('ending type has not been provided')
+ raise ValueError('Ending type has not been provided')
# Check if the events are coherent.
-
users = _defaultdict(lambda: False)
sessions = _defaultdict(lambda: _defaultdict(lambda: 0))
@@ -828,31 +790,30 @@ class FingerScenario:
try:
if time >= self._end_time:
# Action will be ignored.
-
pass
elif isinstance(action, FingerUserCreationAction):
if users[action.login]:
# The user we're trying to create already exists.
-
raise ValueError(
- 'trying to create user '
- f'{action.login!r} which already exists')
+ 'Trying to create user '
+ + f'{action.login!r} which already exists',
+ )
users[action.login] = True
elif isinstance(action, FingerUserEditionAction):
if not users[action.login]:
# The user we're trying to edit doesn't exist.
-
raise ValueError(
- 'trying to edit user '
- f"{action.login!r} while it doesn't exist")
+ 'Trying to edit user '
+ f"{action.login!r} while it doesn't exist",
+ )
elif isinstance(action, FingerUserDeletionAction):
if action.login not in users:
# The user we're trying to delete doesn't exist.
-
raise ValueError(
- 'trying to delete user '
- f"{action.login!r} while it doesn't exist")
+ 'Trying to delete user '
+ f"{action.login!r} while it doesn't exist",
+ )
del users[action.login]
try:
@@ -862,56 +823,58 @@ class FingerScenario:
elif isinstance(action, FingerUserLoginAction):
if action.login not in users:
# The user we're trying to log in as doesn't exist.
-
raise ValueError(
- 'trying to log in as user '
- f"{action.login!r} which doesn't exist")
+ 'Trying to log in as user '
+ f"{action.login!r} which doesn't exist",
+ )
sessions[action.login][action.session_name] += 1
elif isinstance(action, FingerUserSessionChangeAction):
if sessions[action.login][action.session_name] <= 0:
# The user doesn't exist (anymore?) or the session
# does not exist.
-
if users[action.login]:
raise ValueError(
- 'trying to change non existing '
- f'session of user {action.login!r}')
+ 'Trying to change non existing '
+ + f'session of user {action.login!r}',
+ )
else:
raise ValueError(
- 'trying to change session of '
- f'non-existing user {action.login!r}')
+ 'Trying to change session of '
+ + f'non-existing user {action.login!r}',
+ )
elif isinstance(action, FingerUserLogoutAction):
if sessions[action.login][action.session_name] <= 0:
# The user doesn't exist (anymore?) or the session
# does not exist.
-
if users[action.login]:
raise ValueError(
- 'trying to delete non existing '
- f'session of user {action.login!r}')
+ 'Trying to delete non existing '
+ + f'session of user {action.login!r}',
+ )
else:
raise ValueError(
- 'trying to delete session of '
- f'non-existing user {action.login!r}')
+ 'Trying to delete session of '
+ + f'non-existing user {action.login!r}',
+ )
sessions[action.login][action.session_name] -= 1
except ValueError as e:
+ msg = str(e)
+ msg = msg[0].lower() + msg[1:]
raise ValueError(
- f'at action #{i} at {_format_delta(time)}: '
- f'{e!s}',
+ f'At action #{i} at {_format_delta(time)}: {msg}',
) from None
@classmethod
def load(cls, path: str):
- """ Load a scenario from a TOML file.
+ """Load a scenario from a TOML file.
- Decodes the content of a scenario in TOML format and, if
- successful, returns the result as an instance of FingerScenario.
+ Decodes the content of a scenario in TOML format and, if
+ successful, returns the result as an instance of FingerScenario.
- :param path: Path of the TOML file to load.
+ :param path: Path of the TOML file to load.
"""
-
global _toml
actions = []
@@ -921,7 +884,6 @@ class FingerScenario:
_logger.debug(f'Loading scenario from {path!r}.')
# Load the required modules.
-
if _toml is None:
try:
import toml
@@ -933,29 +895,26 @@ class FingerScenario:
del toml
# Read the document and translate all of the timestamps.
-
document = _toml.load(path)
i = 0
for key in document.keys():
time = _parse_delta(key)
if time is None:
- raise ValueError(f'found invalid time: {key!r}')
+ raise ValueError(f'Found invalid time: {key!r}')
if not isinstance(document[key], list):
raise ValueError(
- f'time {key!r} is not an array, '
- f'you have probably written [{key}] instead of '
- f'[[{key}]]',
+ f'Time {key!r} is not an array, '
+ + f'you have probably written [{key}] instead of '
+ + f'[[{key}]]',
)
for j, data in enumerate(document[key]):
try:
typ = data['type']
-
if typ in ('interrupt', 'freeze', 'stop', 'repeat'):
# Set the ending type and time.
-
if end_time is None or end_time > time:
end_type = {
'interrupt': cls.EndingType.FREEZE,
@@ -967,7 +926,6 @@ class FingerScenario:
continue
elif typ == 'create':
# User creation.
-
plan = None
if 'plan' in data:
pp = _path.join(_path.dirname(path), data['plan'])
@@ -979,17 +937,19 @@ class FingerScenario:
shell=data.get('shell'), # NOQA
home=data.get('home'),
office=data.get('office'),
- plan=plan)
+ plan=plan,
+ )
elif typ == 'update':
# User update.
-
plan = Unchanged
if 'plan' in data:
if data['plan'] is False:
plan = None
else:
pp = _path.join(
- _path.dirname(path), data['plan'])
+ _path.dirname(path),
+ data['plan'],
+ )
plan = open(pp).read()
def g(k):
@@ -1007,45 +967,43 @@ class FingerScenario:
)
elif typ == 'delete':
# User deletion.
-
action = FingerUserDeletionAction(
- login=data['login'])
+ login=data['login'],
+ )
elif typ == 'login':
# User login.
-
action = FingerUserLoginAction(
login=data['login'],
session_name=data.get('session'),
line=data.get('line'),
- host=data.get('host'))
+ host=data.get('host'),
+ )
elif typ == 'logout':
# User logout.
-
action = FingerUserLogoutAction(
login=data['login'],
session_name=data.get('session'),
)
elif typ in ('idle', 'active'):
# Idle change status.
-
action = FingerUserSessionChangeAction(
login=data['login'],
session_name=data.get('session'),
idle=(typ == 'idle'),
)
else:
- raise ValueError(f'invalid action type {typ!r}')
+ raise ValueError(f'Invalid action type {typ!r}')
actions.append((time, action, i))
i += 1
except Exception as e:
+ msg = str(e)
+ msg = msg[0].lower() + msg[1:]
raise ValueError(
- f'at action #{j + 1} at '
- f'{_format_delta(time)}: {e!s}',
+ f'At action #{j + 1} at {_format_delta(time)}: {msg}',
) from None
# Sort and check the actions.
-
_logger.debug(
f'Loaded {len(actions)} action{("", "s")[len(actions) >= 2]}.',
)
@@ -1053,7 +1011,6 @@ class FingerScenario:
if end_type is None:
# If no ending was given in the script file, we ought to
# interrupt 10 seconds after the last action.
-
try:
last_time = max(actions, key=lambda x: (x[0], x[2]))[0]
except ValueError:
@@ -1075,17 +1032,17 @@ class FingerScenario:
to: _Optional[_td] = None,
since: _Optional[_td] = None,
) -> _Sequence[FingerAction]:
- """ Return a sequence of actions in order from the scenario.
+ """Return a sequence of actions in order from the scenario.
- :param to: Maximum timedelta for the actions to gather.
- :param since: Minimum timedelta for the actions to gather.
- :return: The sequence of actions that occur and respect
- the given constraints.
+ :param to: Maximum timedelta for the actions to gather.
+ :param since: Minimum timedelta for the actions to gather.
+ :return: The sequence of actions that occur and respect
+ the given constraints.
"""
-
if since is not None and to is not None and since > to:
raise ValueError(
- f'`since` ({since}) should be before `to` ({to}).')
+ f'`since` ({since}) should be before `to` ({to}).',
+ )
for time, action, _ in self._actions:
if since is not None and since >= time:
@@ -1097,8 +1054,7 @@ class FingerScenario:
yield time, action
def add(self, action: FingerAction, time: _Union[_td, str]):
- """ Add an action at the given time to the registered actions. """
-
+ """Add an action at the given time to the registered actions."""
time = _make_delta(time)
try:
@@ -1111,15 +1067,15 @@ class FingerScenario:
class FingerScenarioInterface(FingerFictionInterface):
- """ Fiction interface, to follow actions written in a scenario.
+ """Fiction interface, to follow actions written in a scenario.
- Subclasses :py:class:`FingerFictionInterface` and adds
- a regular update method for updating the state according
- to the given scenario.
+ Subclasses :py:class:`FingerFictionInterface` and adds
+ a regular update method for updating the state according
+ to the given scenario.
- :param scenario: The scenario to follow using the given interface.
- :param start: The start time at which the scenario is supposed to
- have started; by default, the current time is used.
+ :param scenario: The scenario to follow using the given interface.
+ :param start: The start time at which the scenario is supposed to
+ have started; by default, the current time is used.
"""
def __init__(
@@ -1127,8 +1083,6 @@ class FingerScenarioInterface(FingerFictionInterface):
scenario: FingerScenario,
start: _Optional[_dt] = None,
):
- """ Initialize the interface. """
-
if start is None:
start = _dt.now()
if start.tzinfo is None:
@@ -1137,11 +1091,11 @@ class FingerScenarioInterface(FingerFictionInterface):
super().__init__()
# Initialize the object properties.
-
if not isinstance(scenario, FingerScenario):
raise TypeError(
- 'scenario should be a FingerScenario, '
- f'is {scenario.__class__.__name__}.')
+ 'Scenario should be a FingerScenario, '
+ f'is {scenario.__class__.__name__}.',
+ )
scenario.verify()
scenario = _copy.copy(scenario)
@@ -1150,7 +1104,6 @@ class FingerScenarioInterface(FingerFictionInterface):
# - `scenario`: the script to follow.
# - `laststart`: the last registered start.
# - `lastdelta`: the last registered delta.
-
self._scenario = scenario
self._start = start
self._laststart = None
@@ -1158,14 +1111,12 @@ class FingerScenarioInterface(FingerFictionInterface):
@_cron('* * * * * *')
def update(self):
- """ Update the state according to the scenario every second. """
-
+ """Update the state according to the scenario every second."""
now = _dt.now().astimezone()
start = self._laststart or self._start
# Check if we have gone back in time, e.g. if the system time
# has changed, and just start again.
-
if self._lastdelta is not None and now < start + self._lastdelta:
_logger.debug('We seem to have gone back in time!')
_logger.debug("Let's start again from a clean slate.")
@@ -1176,7 +1127,6 @@ class FingerScenarioInterface(FingerFictionInterface):
self.reset()
# Check if we have reached an ending.
-
if now > start + self._scenario.duration:
ending_type = self._scenario.ending_type
if ending_type == FingerScenario.EndingType.STOP:
@@ -1199,7 +1149,6 @@ class FingerScenarioInterface(FingerFictionInterface):
# datetime.datetime(2000, 1, 1, 0, 0, 20)
#
# Let's see.
-
start = now - (now - start) % self._scenario.duration
self.reset()
@@ -1210,11 +1159,9 @@ class FingerScenarioInterface(FingerFictionInterface):
# We're within the duration of the fiction, so we just use the
# offset from the start.
-
delta = now - start
# Then, we apply the actions up to the current time.
-
actions = self._scenario.get(to=delta, since=self._lastdelta)
if _logger.getEffectiveLevel() <= _logging.DEBUG:
@@ -1231,7 +1178,6 @@ class FingerScenarioInterface(FingerFictionInterface):
self.apply(action, start + time)
# Finally, we can keep track of where we were.
-
self._laststart = start
self._lastdelta = delta
diff --git a/pyfingerd/native.py b/pyfingerd/native.py
index 8fbdd34..dfa03e4 100755
--- a/pyfingerd/native.py
+++ b/pyfingerd/native.py
@@ -3,7 +3,7 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Defining the native interface. """
+"""Defining the native interface."""
from .core import FingerInterface as _FingerInterface
@@ -11,7 +11,7 @@ __all__ = ['FingerNativeInterface']
class _FingerNoNativeFoundInterface(_FingerInterface):
- """ Placeholder that doesn't initiate. """
+ """Placeholder that doesn't initiate."""
def __init__(self, *args, **kwargs):
raise NotImplementedError(
diff --git a/pyfingerd/posix.py b/pyfingerd/posix.py
index 265fa06..98ccb2a 100755
--- a/pyfingerd/posix.py
+++ b/pyfingerd/posix.py
@@ -3,13 +3,12 @@
# Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd Python 3.x module, which is MIT-licensed.
# *****************************************************************************
-""" Make use of the utmp/x file to read the user data.
+"""Make use of the utmp/x file to read the user data.
- This will be done using the `pyutmpx` Python module.
+This will be done using the `pyutmpx` Python module.
"""
import pwd as _pwd
-
from copy import copy as _copy
from datetime import datetime as _dt
from multiprocessing import Lock as _Lock
@@ -18,6 +17,7 @@ from os.path import exists as _exists, join as _joinpaths
from typing import Optional as _Optional, Sequence as _Sequence
from pytz import utc as _utc
+
import pyutmpx as _pyutmpx
from .core import (
@@ -30,10 +30,10 @@ __all__ = ['FingerPOSIXInterface']
class FingerPOSIXInterface(_FingerInterface):
- """ Finger interface for POSIX-compliant systems.
+ """Finger interface for POSIX-compliant systems.
- Only accessible on such systems; imports from other systems
- will result in an ``ImportError``.
+ Only accessible on such systems; imports from other systems
+ will result in an ``ImportError``.
"""
def __init__(self):
@@ -46,129 +46,131 @@ class FingerPOSIXInterface(_FingerInterface):
query: _Optional[str],
active: _Optional[bool],
) -> _Sequence[_FingerUser]:
- """ Look for users on POSIX-compliant systems.
-
- The method for gathering users and sessions on such systems
- is the following:
-
- 1. Get the users in ``/etc/passwd``, and check which ones not
- to make appear through the presence of ``.nofinger``
- in their home directory.
- 2. Get the last login times for all users to display them
- by default, through the lastlog database if available.
- 3. Get the sessions in the utmp / utmpx database, and make
- them correspond to the related user.
- 4. For each session, get the idle time by gathering the
- mtime of the device.
+ """Look for users on POSIX-compliant systems.
+
+ The method for gathering users and sessions on such systems
+ is the following:
+
+ 1. Get the users in ``/etc/passwd``, and check which ones not
+ to make appear through the presence of ``.nofinger``
+ in their home directory.
+ 2. Get the last login times for all users to display them
+ by default, through the lastlog database if available.
+ 3. Get the sessions in the utmp / utmpx database, and make
+ them correspond to the related user.
+ 4. For each session, get the idle time by gathering the
+ mtime of the device.
"""
-
self._lock.acquire()
- # Refresh the user list if required.
-
- if (
- self._lastrefreshtime is None
- or abs((self._lastrefreshtime - _dt.now()).total_seconds()) >= 1
- ):
- users = {}
- usernames_by_id = {}
+ try:
+ # Refresh the user list if required.
+ if (
+ self._lastrefreshtime is None
+ or abs(
+ (self._lastrefreshtime - _dt.now()).total_seconds(),
+ ) >= 1
+ ):
+ users = {}
+ usernames_by_id = {}
+
+ for pw in _pwd.getpwall():
+ usernames_by_id[pw.pw_uid] = pw.pw_name
+
+ if _exists(_joinpaths(pw.pw_dir, '.nofinger')):
+ continue
- for pw in _pwd.getpwall():
- usernames_by_id[pw.pw_uid] = pw.pw_name
+ gecos = pw.pw_gecos.split(',')
+ user = _FingerUser(
+ login=pw.pw_name,
+ name=gecos[0])
+ user.shell = pw.pw_shell
+ user.home = pw.pw_dir
- if _exists(_joinpaths(pw.pw_dir, '.nofinger')):
- continue
+ if len(gecos) >= 2:
+ user.office = gecos[1]
+ try:
+ with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan:
+ user.plan = plan.read()
+ except (FileNotFoundError, PermissionError):
+ pass
- gecos = pw.pw_gecos.split(',')
- user = _FingerUser(
- login=pw.pw_name,
- name=gecos[0])
- user.shell = pw.pw_shell
- user.home = pw.pw_dir
+ users[user.login] = user
- if len(gecos) >= 2:
- user.office = gecos[1]
try:
- with open(_joinpaths(pw.pw_dir, '.plan'), 'r') as plan:
- user.plan = plan.read()
- except (FileNotFoundError, PermissionError):
- pass
-
- users[user.login] = user
-
- try:
- lastlog = _pyutmpx.lastlog
- except AttributeError:
- lastlog = None
-
- if lastlog is not None:
- for lle in lastlog:
- try:
- login = usernames_by_id[lle.uid]
- except KeyError:
- continue
+ lastlog = _pyutmpx.lastlog
+ except AttributeError:
+ lastlog = None
- try:
- user = users[login]
- except KeyError:
- continue
+ if lastlog is not None:
+ for lle in lastlog:
+ try:
+ login = usernames_by_id[lle.uid]
+ except KeyError:
+ continue
- user.last_login = lle.time.replace(tzinfo=_utc)
+ try:
+ user = users[login]
+ except KeyError:
+ continue
- try:
- utmp = _pyutmpx.utmp
- except AttributeError:
- utmp = None
+ user.last_login = lle.time.replace(tzinfo=_utc)
- if utmp is not None:
- for ue in utmp:
- if ue.type != _pyutmpx.USER_PROCESS:
- continue
- try:
- user = users[ue.user]
- except KeyError:
- continue
-
- session = _FingerSession(
- time=ue.time.replace(tzinfo=_utc))
- if ue.line:
- session.line = ue.line
- if ue.host:
- session.host = ue.host
-
- session.idle = _dt.now()
- if ue.line and not ue.line.startswith(':'):
- dev_path = (
- ('', '/dev/')[ue.line[0] != '/']
- + ue.line)
- atime = _dt.fromtimestamp(
- _stat(dev_path).st_atime, _utc)
- session.idle = atime
-
- # Add the session to the user.
-
- user.sessions.add(session)
- if (
- user.last_login is None
- or user.last_login < session.start
- ):
- user.last_login = session.start
-
- # We're done refreshing!
-
- self._data = list(users.values())
- self._lastrefreshtime = _dt.now()
-
- # Get the results.
-
- results = ([
- _copy(user) for user in self._data if (
- not (query is not None and query not in user.login)
- and not (active is not None and active != bool(user.sessions))
- )
- ])
- self._lock.release()
-
- return results
+ try:
+ utmp = _pyutmpx.utmp
+ except AttributeError:
+ utmp = None
+
+ if utmp is not None:
+ for ue in utmp:
+ if ue.type != _pyutmpx.USER_PROCESS:
+ continue
+
+ try:
+ user = users[ue.user]
+ except KeyError:
+ continue
+
+ session = _FingerSession(
+ time=ue.time.replace(tzinfo=_utc),
+ )
+ if ue.line:
+ session.line = ue.line
+ if ue.host:
+ session.host = ue.host
+
+ session.idle = _dt.now()
+ if ue.line and not ue.line.startswith(':'):
+ dev_path = (
+ ('', '/dev/')[ue.line[0] != '/']
+ + ue.line
+ )
+ atime = _dt.fromtimestamp(
+ _stat(dev_path).st_atime,
+ _utc,
+ )
+ session.idle = atime
+
+ # Add the session to the user.
+ user.sessions.add(session)
+ if (
+ user.last_login is None
+ or user.last_login < session.start
+ ):
+ user.last_login = session.start
+
+ # We're done refreshing!
+ self._data = list(users.values())
+ self._lastrefreshtime = _dt.now()
+
+ # Get the results.
+ return ([
+ _copy(user) for user in self._data if (
+ (query is None or query in user.login)
+ and (active is None or active == bool(user.sessions))
+ )
+ ])
+ finally:
+ self._lock.release()
# End of file.
diff --git a/pyfingerd/utils.py b/pyfingerd/utils.py
index 99d8546..c8f364a 100644
--- a/pyfingerd/utils.py
+++ b/pyfingerd/utils.py
@@ -3,11 +3,10 @@
# Copyright (C) 2021-2022 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd Python 3.x module, which is MIT-licensed.
# *****************************************************************************
-""" Utilities for the pyfingerd module. """
+"""Utilities for the pyfingerd module."""
import logging as _logging
import re as _re
-
from datetime import timedelta as _td
from typing import Optional as _Optional, Union as _Union
@@ -24,8 +23,7 @@ error_logger = _logging.getLogger('pyfingerd.error')
def parse_delta(raw: str) -> _td:
- """ Parse a delta string as found in the configuration files. """
-
+ """Parse a delta string as found in the configuration files."""
try:
delta = _td()
@@ -54,8 +52,7 @@ def parse_delta(raw: str) -> _td:
def format_delta(td: _td) -> str:
- """ Create a delta string. """
-
+ """Create a delta string."""
sls = zip(
(_td(days=7), _td(days=1), _td(seconds=3600), _td(seconds=60)),
'wdhm',
@@ -93,8 +90,7 @@ def make_delta(
value: _Optional[_Union[str, int, float, _td]],
allow_none: bool = False,
) -> _td:
- """ Make a delta from a raw value. """
-
+ """Make a delta from a raw value."""
if value is None:
if not allow_none:
raise ValueError('must not be None')
diff --git a/pyfingerd/version.py b/pyfingerd/version.py
deleted file mode 100755
index 461d46f..0000000
--- a/pyfingerd/version.py
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env python3
-# *****************************************************************************
-# Copyright (C) 2021-2022 Thomas Touhey <thomas@touhey.fr>
-# This file is part of the pyfingerd project, which is MIT-licensed.
-# *****************************************************************************
-""" pyfingerd version definition. """
-
-version = '0.4.3'
-
-# End of file.
diff --git a/scripts/pyfingerd b/scripts/pyfingerd
index f8c6df8..902c29c 100755
--- a/scripts/pyfingerd
+++ b/scripts/pyfingerd
@@ -3,7 +3,7 @@
# Copyright (C) 2017-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
#**************************************************************************
-""" Run pyfingerd without calling the `pyfingerd` module directly. """
+"""Run pyfingerd without calling the `pyfingerd` module directly."""
from pyfingerd.cli import cli as _cli
diff --git a/setup.cfg b/setup.cfg
index fd2ce16..69add7e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = pyfingerd
-version = attr: pyfingerd.version.version
+version = attr: pyfingerd.__version__
url = https://pyfingerd.touhey.pro/
author = Thomas Touhey
author_email = thomas@touhey.fr
@@ -30,10 +30,13 @@ scripts =
[wheel]
universal = True
+[isort]
+known_first_party = pyfingerd
+
[flake8]
-ignore = D105,D107,D202,D208,D210,D401,F405,W503
+ignore = D105,D107,W503
per-file-ignores =
- tests/*:S101,D102
+ tests/*:F405,S101,D102
rst-roles =
py:class
py:attr
diff --git a/setup.py b/setup.py
index 3061d54..1a7dc05 100755
--- a/setup.py
+++ b/setup.py
@@ -3,15 +3,15 @@
# Copyright (C) 2018-2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Setup script for the pyfingerd Python package. """
+"""Setup script for the pyfingerd Python package."""
import os.path as _path
+
from setuptools import setup as _setup
kwargs = {}
# Add requirements using the requirements.txt file.
-
requirements = set()
with open(_path.join(_path.dirname(__file__), 'requirements.txt'), 'r') as f:
requirements.update(f.read().splitlines())
@@ -19,7 +19,6 @@ with open(_path.join(_path.dirname(__file__), 'requirements.txt'), 'r') as f:
kwargs['install_requires'] = sorted(filter(lambda x: x, requirements))
# Use Sphinx for building docs.
-
try:
from sphinx.setup_command import BuildDoc as _BuildDoc
kwargs['cmdclass'] = {'build_sphinx': _BuildDoc}
@@ -27,11 +26,8 @@ except ImportError:
pass
# Actually, most of the project's data is read from the `setup.cfg` file.
-
kwargs['setup_requires'] = ['flake8']
-_setup(
- **kwargs,
-)
+_setup(**kwargs)
# End of file.
diff --git a/tests/__init__.py b/tests/__init__.py
index a744c3e..4c6871d 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -3,7 +3,7 @@
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Unit tests for the `pyfingerd` Python module. """
+"""Unit tests for the `pyfingerd` Python module."""
# This file is only there to indicate that the folder is a module.
# It doesn't actually contain code.
diff --git a/tests/test_binds.py b/tests/test_binds.py
index e1fac43..713ef08 100644
--- a/tests/test_binds.py
+++ b/tests/test_binds.py
@@ -3,16 +3,19 @@
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Tests for the pyfingerd server. """
+"""Tests for the pyfingerd server."""
import socket
-from pyfingerd.binds import FingerBindsDecoder, FingerTCPv4Bind
+from pyfingerd.binds import (
+ FingerBindsDecoder, FingerTCPv4Bind, FingerTCPv6Bind,
+)
+
import pytest
class TestFingerDecoder:
- """ Test binds. """
+ """Test binds."""
@pytest.fixture
def decoder(self):
@@ -22,16 +25,41 @@ class TestFingerDecoder:
assert decoder.decode('') == ()
@pytest.mark.parametrize('raw,cls,params', (
- ('127.0.0.1:79', FingerTCPv4Bind, (
- socket.AF_INET,
- '127.0.0.1',
- 79,
- )),
- ('127.0.2.3', FingerTCPv4Bind, (
- socket.AF_INET,
+ (
+ '127.0.0.1:79',
+ FingerTCPv4Bind,
+ (socket.AF_INET, '127.0.0.1', 79),
+ ),
+ (
'127.0.2.3',
- 79,
- )),
+ FingerTCPv4Bind,
+ (socket.AF_INET, '127.0.2.3', 79),
+ ),
+ (
+ '1.2:1234',
+ FingerTCPv4Bind,
+ (socket.AF_INET, '1.0.0.2', 1234),
+ ),
+ (
+ '1.257:1235',
+ FingerTCPv4Bind,
+ (socket.AF_INET, '1.0.1.1', 1235),
+ ),
+ (
+ '[::1]',
+ FingerTCPv6Bind,
+ (socket.AF_INET6, '::1', 79),
+ ),
+ (
+ '[1::2:3]:8081',
+ FingerTCPv6Bind,
+ (socket.AF_INET6, '1::2:3', 8081),
+ ),
+ (
+ '[1:2::3.4.5.6]:8246',
+ FingerTCPv6Bind,
+ (socket.AF_INET6, '1:2::304:506', 8246),
+ ),
))
def test_binds(self, decoder, raw, cls, params):
binds = decoder.decode(raw)
diff --git a/tests/test_scenarios.py b/tests/test_scenarios.py
index 1cd9db3..4314959 100644
--- a/tests/test_scenarios.py
+++ b/tests/test_scenarios.py
@@ -3,7 +3,7 @@
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Tests for the pyfingerd server. """
+"""Tests for the pyfingerd server."""
from pyfingerd.fiction import (
FingerScenario, FingerUserCreationAction, FingerUserDeletionAction,
@@ -14,18 +14,18 @@ import pytest
class TestScenarios:
- """ Test scenarios. """
+ """Test scenarios."""
def test_no_ending_type(self):
scenario = FingerScenario()
scenario.duration = '1m'
- with pytest.raises(ValueError, match=r'ending type'):
+ with pytest.raises(ValueError, match=r'Ending type'):
scenario.verify()
def test_no_duration(self):
scenario = FingerScenario()
scenario.ending_type = FingerScenario.EndingType.FREEZE
- with pytest.raises(ValueError, match=r'ending time'):
+ with pytest.raises(ValueError, match=r'Ending time'):
scenario.verify()
def test_edit_without_create_user(self):
diff --git a/tests/test_server.py b/tests/test_server.py
index de944f8..ddcb8b5 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -3,10 +3,9 @@
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
-""" Tests for the pyfingerd server. """
+"""Tests for the pyfingerd server."""
import socket
-
from datetime import timedelta
from time import sleep
@@ -17,16 +16,15 @@ import pytest
class TestFingerConnection:
- """ Test basic finger connections. """
+ """Test basic finger connections."""
@pytest.fixture(autouse=True)
def fingerserver(self):
- """ Start a finger server.
+ """Start a finger server.
- A fixture starting a finger server on ``localhost:3099``
- and stopping it after the test.
+ A fixture starting a finger server on ``localhost:3099``
+ and stopping it after the test.
"""
-
scenario = FingerScenario()
scenario.ending_type = 'freeze'
scenario.duration = timedelta(seconds=5)
@@ -38,21 +36,24 @@ class TestFingerConnection:
shell='/bin/bash', # NOQA
office='84.6',
),
- timedelta(seconds=-5))
+ timedelta(seconds=-5),
+ )
scenario.add(
FingerUserLoginAction(
login='john',
line='tty1',
),
- timedelta(seconds=0))
+ timedelta(seconds=0),
+ )
scenario.add(
FingerUserLogoutAction(
login='john',
),
- timedelta(seconds=1))
+ timedelta(seconds=1),
+ )
class TestFormatter(FingerFormatter):
- """ Test formatter, uncomplicated to test. """
+ """Test formatter, uncomplicated to test."""
def _format_users(self, users):
result = f'{len(users)}\n'
@@ -106,10 +107,7 @@ class TestFingerConnection:
.rstrip('\r\n').split('\r\n')
)
- assert result[:2] == (
- 'EXAMPLE.ORG',
- command,
- )
+ assert result[:2] == ('EXAMPLE.ORG', command)
return result[2:]
# ---
@@ -117,13 +115,11 @@ class TestFingerConnection:
# ---
def test_no_user_list(self):
- """ Test if an unknown user returns an empty result. """
-
+ """Test if an unknown user returns an empty result."""
assert self._send_command('user') == ('long', '0')
def test_existing_user_list(self):
- """ Test the user list before and after the cron is executed. """
-
+ """Test the user list before and after the cron is executed."""
assert self._send_command('') == (
'short', '1',
'john|John Doe|/home/john|/bin/bash|84.6|1',