aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpyfingerd/__main__.py2
-rw-r--r--pyfingerd/binds.py12
-rwxr-xr-xpyfingerd/cli.py2
-rwxr-xr-xpyfingerd/core.py361
-rwxr-xr-xpyfingerd/fiction.py162
-rwxr-xr-xpyfingerd/native.py4
-rwxr-xr-xpyfingerd/posix.py10
-rw-r--r--pyfingerd/utils.py22
-rw-r--r--tests/test_server.py4
9 files changed, 296 insertions, 283 deletions
diff --git a/pyfingerd/__main__.py b/pyfingerd/__main__.py
index b1dcbcc..12bea46 100755
--- a/pyfingerd/__main__.py
+++ b/pyfingerd/__main__.py
@@ -7,8 +7,6 @@
from .cli import cli as _cli
-__all__ = []
-
if __name__ == '__main__':
_cli()
diff --git a/pyfingerd/binds.py b/pyfingerd/binds.py
index dd81e58..df88bd0 100644
--- a/pyfingerd/binds.py
+++ b/pyfingerd/binds.py
@@ -29,11 +29,7 @@ class FingerTCPv4Bind(FingerBind):
"""IPv4 TCP Address."""
def __init__(self, address: str, port: int):
- try:
- self._addr = _socket.inet_pton(_socket.AF_INET, address)
- except Exception:
- self._addr = address
-
+ self._addr = _socket.inet_pton(_socket.AF_INET, address)
self._port = port
@property
@@ -50,11 +46,7 @@ class FingerTCPv6Bind(FingerBind):
"""IPv6 TCP Address."""
def __init__(self, address: str, port: int):
- try:
- self._addr = _socket.inet_pton(_socket.AF_INET6, address)
- except Exception:
- self._addr = address
-
+ self._addr = _socket.inet_pton(_socket.AF_INET6, address)
self._port = port
@property
diff --git a/pyfingerd/cli.py b/pyfingerd/cli.py
index 2fdb7a6..d1074b2 100755
--- a/pyfingerd/cli.py
+++ b/pyfingerd/cli.py
@@ -14,7 +14,7 @@ from sys import stderr as _stderr
import click as _click
-import coloredlogs as _coloredlogs
+import coloredlogs as _coloredlogs # type: ignore
from . import __version__ as _version
from .core import (
diff --git a/pyfingerd/core.py b/pyfingerd/core.py
index 6d30db4..9611294 100755
--- a/pyfingerd/core.py
+++ b/pyfingerd/core.py
@@ -14,9 +14,9 @@ import copy as _copy
import multiprocessing as _multip
import signal as _signal
import string as _string
+import typing as _t
from datetime import datetime as _dt, timedelta as _td, tzinfo as _tzinfo
from errno import errorcode as _errorcode
-from typing import Optional as _Optional, Sequence as _Sequence
from croniter import croniter as _croniter
@@ -55,10 +55,10 @@ def cron(spec: str):
by checking if the attribute exists and starting a dedicated
coroutine for running it periodically using the given specification.
"""
- spec = _croniter(spec)
+ cron_spec = _croniter(spec)
def decorator(func):
- func.__cron__ = spec
+ func.__cron__ = cron_spec
return func
return decorator
@@ -78,13 +78,12 @@ class FingerSession:
__slots__ = ('_start', '_line', '_host', '_idle')
- def __init__(self, *_, time: _Optional[_dt] = None):
- self._start = _dt.now()
+ def __init__(self, *_, time: _t.Optional[_dt] = None):
self._idle = None
self._line = None
self._host = None
- self.start = time
+ self.start = time or _dt.now()
self._idle = self._start
def __repr__(self):
@@ -105,15 +104,17 @@ class FingerSession:
return self._start
@start.setter
- def start(self, value):
- value = value if isinstance(value, _dt) else _dt(value)
+ def start(self, value: _dt):
+ if not isinstance(value, _dt):
+ raise TypeError('Expected a datetime value')
+
if value.tzinfo is None:
value = value.replace(tzinfo=_utc)
self._start = value
@property
- def idle(self) -> _dt:
+ def idle(self) -> _t.Optional[_dt]:
"""Get the timestamp since which the user is idle on the session.
Note that when set, if no timezone is present,
@@ -124,8 +125,10 @@ class FingerSession:
return self._idle
@idle.setter
- def idle(self, value):
- value = value if isinstance(value, _dt) else _dt(value)
+ def idle(self, value: _dt):
+ if not isinstance(value, _dt):
+ raise TypeError('Expected a datetime value')
+
if value.tzinfo is None:
value = value.replace(tzinfo=_utc)
@@ -135,7 +138,7 @@ class FingerSession:
self._idle = value
@property
- def line(self) -> _Optional[str]:
+ def line(self) -> _t.Optional[str]:
"""Get the line on which the user is."""
return self._line
@@ -144,7 +147,7 @@ class FingerSession:
self._line = None if value is None else str(value)
@property
- def host(self) -> _Optional[str]:
+ def host(self) -> _t.Optional[str]:
"""Get the host from which the user is connected."""
return self._host
@@ -153,148 +156,6 @@ class FingerSession:
self._host = None if value is None else str(value)
-class FingerUser:
- """Representation of a user on the system.
-
- Returned by subclasses of :py:class:`FingerInterface`,
- and used by subclasses of :py:class:`FingerFormatter`.
-
- :param login: The login of the user.
- :param name: The display name of the user.
- :param home: The path to the home of the user.
- :param shell: The path to the user's default shell.
- """
-
- __slots__ = (
- '_login', '_name', '_home', '_shell', '_office',
- '_plan', '_last_login', '_sessions',
- )
-
- def __init__(
- self, *_,
- login: _Optional[str] = None,
- name: _Optional[str] = None,
- home: _Optional[str] = None,
- shell: _Optional[str] = None,
- ):
- self._login = None
- self._name = ''
- self._home = None
- self._shell = None
- self._office = None
- self._plan = None
- self._last_login = None
- self._sessions = _FingerSessionManager()
-
- self.login = login
- self.name = name
- self.home = home
- self.shell = shell
-
- def __repr__(self):
- p = (
- 'login', 'name', 'home', 'shell', 'office',
- 'last_login', 'sessions',
- )
- p = (
- f'{x}={getattr(self, x)!r}'
- for x in p if getattr(self, x) is not None
- )
- return f"{self._class__.__name__}({', '.join(p)})"
-
- @property
- def login(self) -> _Optional[str]:
- """Get the login name of the user, e.g. 'cake' or 'gaben'."""
- return self._login
-
- @login.setter
- def login(self, value: _Optional[str]) -> None:
- self._login = value
-
- @property
- def name(self) -> _Optional[str]:
- """Get the display name of the user, e.g. 'Jean Dupont'."""
- return self._name
-
- @name.setter
- def name(self, value: _Optional[str]) -> None:
- self._name = value
-
- @property
- def last_login(self) -> _Optional[str]:
- """Get the last login date for the user.
-
- Is None if not known.
- """
- return self._last_login
-
- @last_login.setter
- def last_login(self, value: _Optional[str]) -> None:
- self._last_login = (
- None if value is None
- else value if isinstance(value, _dt)
- else _dt(value)
- )
-
- @property
- def home(self) -> _Optional[str]:
- """Get the path to the user's home on the given system.
-
- Is None if not known or defined.
- """
- return self._home
-
- @home.setter
- def home(self, value: _Optional[str]) -> None:
- self._home = None if value is None else str(value)
-
- @property
- def shell(self) -> _Optional[str]:
- """Get the path to the user's shell on the given system.
-
- Is None if not known or defined.
- """
- return self._shell
-
- @shell.setter
- def shell(self, value: _Optional[str]) -> None:
- self._shell = None if value is None else str(value)
-
- @property
- def office(self) -> _Optional[str]:
- """Get the display name of the user's office.
-
- Is None if not known or defined.
- """
- return self._office
-
- @office.setter
- def office(self, value: _Optional[str]) -> None:
- self._office = None if value is None else str(value)
-
- @property
- def plan(self) -> _Optional[str]:
- """Get the plan of the user.
-
- Usually the content of the ``.plan`` file in the user's home
- on real (and kind of obsolete) UNIX-like systems.
- """
- return self._plan
-
- @plan.setter
- def plan(self, value: _Optional[str]) -> None:
- if value is None:
- self._plan = None
- else:
- value = str(value)
- self._plan = '\n'.join(value.splitlines())
-
- @property
- def sessions(self) -> _Sequence[FingerSession]:
- """Get the current sessions array for the user, always defined."""
- return self._sessions
-
-
class _FingerSessionManager:
"""Session manager."""
@@ -330,7 +191,7 @@ class _FingerSessionManager:
else:
self._sessions.pop(idx)
- def __getitem__(self, key):
+ def __getitem__(self, key: _t.Optional[str]):
if key is None:
try:
return self._sessions[0]
@@ -417,6 +278,148 @@ class _FingerSessionManager:
self._sessions.insert(0, session)
+
+class FingerUser:
+ """Representation of a user on the system.
+
+ Returned by subclasses of :py:class:`FingerInterface`,
+ and used by subclasses of :py:class:`FingerFormatter`.
+
+ :param login: The login of the user.
+ :param name: The display name of the user.
+ :param home: The path to the home of the user.
+ :param shell: The path to the user's default shell.
+ """
+
+ __slots__ = (
+ '_login', '_name', '_home', '_shell', '_office',
+ '_plan', '_last_login', '_sessions',
+ )
+
+ def __init__(
+ self, *_,
+ login: _t.Optional[str] = None,
+ name: _t.Optional[str] = None,
+ home: _t.Optional[str] = None,
+ shell: _t.Optional[str] = None,
+ ):
+ self._login = None
+ self._name = None
+ self._home = None
+ self._shell = None
+ self._office = None
+ self._plan = None
+ self._last_login = None
+ self._sessions = _FingerSessionManager()
+
+ self.login = login
+ self.name = name
+ self.home = home
+ self.shell = shell
+
+ def __repr__(self):
+ p = (
+ 'login', 'name', 'home', 'shell', 'office',
+ 'last_login', 'sessions',
+ )
+ p = (
+ f'{x}={getattr(self, x)!r}'
+ for x in p if getattr(self, x) is not None
+ )
+ return f"{self._class__.__name__}({', '.join(p)})"
+
+ @property
+ def login(self) -> _t.Optional[str]:
+ """Get the login name of the user, e.g. 'cake' or 'gaben'."""
+ return self._login
+
+ @login.setter
+ def login(self, value: _t.Optional[str]) -> None:
+ self._login = value
+
+ @property
+ def name(self) -> _t.Optional[str]:
+ """Get the display name of the user, e.g. 'Jean Dupont'."""
+ return self._name
+
+ @name.setter
+ def name(self, value: _t.Optional[str]) -> None:
+ self._name = value
+
+ @property
+ def last_login(self) -> _t.Optional[_dt]:
+ """Get the last login date for the user.
+
+ Is None if not known.
+ """
+ return self._last_login
+
+ @last_login.setter
+ def last_login(self, value: _t.Optional[_dt]) -> None:
+ if value is not None and not isinstance(value, _dt):
+ raise TypeError('Expected None or a datetime value')
+
+ self._last_login = value
+
+ @property
+ def home(self) -> _t.Optional[str]:
+ """Get the path to the user's home on the given system.
+
+ Is None if not known or defined.
+ """
+ return self._home
+
+ @home.setter
+ def home(self, value: _t.Optional[str]) -> None:
+ self._home = None if value is None else str(value)
+
+ @property
+ def shell(self) -> _t.Optional[str]:
+ """Get the path to the user's shell on the given system.
+
+ Is None if not known or defined.
+ """
+ return self._shell
+
+ @shell.setter
+ def shell(self, value: _t.Optional[str]) -> None:
+ self._shell = None if value is None else str(value)
+
+ @property
+ def office(self) -> _t.Optional[str]:
+ """Get the display name of the user's office.
+
+ Is None if not known or defined.
+ """
+ return self._office
+
+ @office.setter
+ def office(self, value: _t.Optional[str]) -> None:
+ self._office = None if value is None else str(value)
+
+ @property
+ def plan(self) -> _t.Optional[str]:
+ """Get the plan of the user.
+
+ Usually the content of the ``.plan`` file in the user's home
+ on real (and kind of obsolete) UNIX-like systems.
+ """
+ return self._plan
+
+ @plan.setter
+ def plan(self, value: _t.Optional[str]) -> None:
+ if value is None:
+ self._plan = None
+ else:
+ value = str(value)
+ self._plan = '\n'.join(value.splitlines())
+
+ @property
+ def sessions(self) -> _FingerSessionManager:
+ """Get the current sessions array for the user, always defined."""
+ return self._sessions
+
+
# ---
# Formatter base class.
# ---
@@ -439,7 +442,7 @@ class FingerFormatter:
:param tzinfo: Timezone used for formatting dates and times.
"""
- def __init__(self, tzinfo: _Optional[_tzinfo] = None):
+ def __init__(self, tzinfo: _t.Optional[_tzinfo] = None):
if tzinfo is None:
tzinfo = _dt.now().astimezone().tzinfo
self._tzinfo = tzinfo
@@ -539,7 +542,7 @@ class FingerFormatter:
self,
hostname: str,
raw_query: str,
- users: _Sequence[FingerUser],
+ users: _t.Sequence[FingerUser],
) -> str:
"""Return the formatted answer for a user list in the 'short' format.
@@ -609,7 +612,7 @@ class FingerFormatter:
self,
hostname: str,
raw_query: str,
- users: _Sequence[FingerUser],
+ users: _t.Sequence[FingerUser],
) -> str:
"""Return the formatted answer for a user list in the 'long' format.
@@ -626,13 +629,13 @@ class FingerFormatter:
for user in users:
res += (
- f'Login name: {user.login[:27]:<27} '
- + f'Name: {user.name if user.name else user.login}\r\n'
- + f'Directory: {user.home[:28] if user.home else "":<28} '
+ f'Login name: {user.login or "":<27.27} '
+ + f'Name: {user.name if user.name else user.login or ""}\r\n'
+ + f'Directory: {user.home if user.home else "":<28.28} '
+ f'Shell: {user.shell if user.shell else ""}\r\n'
)
if user.office:
- res += f"Office: {user.office if user.office else ''}\r\n"
+ res += f'Office: {user.office if user.office else ""}\r\n'
if user.sessions:
# List current sessions.
@@ -701,7 +704,7 @@ class FingerInterface:
def transmit_query(
self,
- query: _Optional[str],
+ query: _t.Optional[str],
host: str,
verbose: bool,
) -> str:
@@ -723,9 +726,9 @@ class FingerInterface:
def search_users(
self,
- query: _Optional[str],
- active: _Optional[bool],
- ) -> _Sequence[FingerUser]:
+ query: _t.Optional[str],
+ active: _t.Optional[bool],
+ ) -> _t.Sequence[FingerUser]:
"""Search for users on the current host using the given query.
:param query: The user query, set to None in case of no
@@ -820,6 +823,9 @@ class FingerServer:
answers sent to clients.
"""
+ _p: _t.Optional[_multip.Process] = None
+ """Process running the pyfingerd server."""
+
def __init__(
self,
binds: str = 'localhost:79',
@@ -862,9 +868,6 @@ class FingerServer:
if not self._binds:
raise _NoBindsError()
- # Initialize multi-process related.
- self._p = None
-
@property
def hostname(self):
"""Get the hostname configured for this server."""
@@ -991,14 +994,14 @@ class FingerServer:
if name == 'EADDRINUSE':
_logger.error(
f'Could not bind to [{host}]:{port}: '
- 'address already in use.',
+ + 'address already in use.',
)
return
elif name == 'EACCES':
_logger.error(
f'Could not bind to [{host}]:{port}: '
- f'port {port} privileged port and process '
- 'is unprivileged.',
+ + f'port {port} privileged port and process '
+ + 'is unprivileged.',
)
else:
raise
@@ -1086,16 +1089,18 @@ class FingerServer:
if self._p is not None and self._p.is_alive():
return
- self._p = _multip.Process(target=self._serve)
- self._p.start()
+ p = _multip.Process(target=self._serve)
+ p.start()
+ self._p = p
def stop(self) -> None:
"""Stop all underlying server processes and unbind all ports."""
- if self._p is None or not self._p.is_alive():
+ p = self._p
+ if p is None or not p.is_alive():
return
- self._p.kill()
- self._p.join()
+ p.kill()
+ p.join()
self._p = None
def serve_forever(self) -> None:
diff --git a/pyfingerd/fiction.py b/pyfingerd/fiction.py
index db4aff2..9632bb0 100755
--- a/pyfingerd/fiction.py
+++ b/pyfingerd/fiction.py
@@ -12,12 +12,10 @@ import copy as _copy
import logging as _logging
import math as _math
import os.path as _path
+import typing as _t
from collections import defaultdict as _defaultdict
from datetime import datetime as _dt, timedelta as _td
from enum import Enum as _Enum
-from typing import (
- Optional as _Optional, Sequence as _Sequence, Union as _Union,
-)
from .core import (
FingerInterface as _FingerInterface,
@@ -157,12 +155,12 @@ class FictionalFingerSession(_FingerSession):
# return Unchanged, like for None and NoneType.
-class _UnchangedType:
+class UnchangedType:
def __repr__(self):
return 'Unchanged'
-Unchanged = _UnchangedType()
+Unchanged = UnchangedType()
# ---
# Action description.
@@ -191,11 +189,11 @@ class FingerUserCreationAction(FingerAction):
def __init__(
self,
login: str,
- name: _Optional[str] = None,
- home: _Optional[str] = None,
- shell: _Optional[str] = None,
- office: _Optional[str] = None,
- plan: _Optional[str] = None,
+ name: _t.Optional[str] = None,
+ home: _t.Optional[str] = None,
+ shell: _t.Optional[str] = None,
+ office: _t.Optional[str] = None,
+ plan: _t.Optional[str] = None,
):
super().__init__()
self._login = login
@@ -218,27 +216,27 @@ class FingerUserCreationAction(FingerAction):
return self._login
@property
- def name(self) -> _Optional[str]:
+ def name(self) -> _t.Optional[str]:
"""Get the initial value for :py:attr:`FictionalFingerUser.name`."""
return self._name
@property
- def home(self) -> _Optional[str]:
+ def home(self) -> _t.Optional[str]:
"""Get the initial value for :py:attr:`FictionalFingerUser.home`."""
return self._home
@property
- def shell(self) -> _Optional[str]:
+ def shell(self) -> _t.Optional[str]:
"""Get the initial value for :py:attr:`FictionalFingerUser.shell`."""
return self._shell
@property
- def office(self) -> _Optional[str]:
+ def office(self) -> _t.Optional[str]:
"""Get the initial value for :py:attr:`FictionalFingerUser.office`."""
return self._office
@property
- def plan(self) -> _Optional[str]:
+ def plan(self) -> _t.Optional[str]:
"""Get the initial value for :py:attr:`FictionalFingerUser.plan`."""
return self._plan
@@ -262,11 +260,11 @@ class FingerUserEditionAction(FingerAction):
def __init__(
self,
login: str,
- name: _Optional[_Union[str, _UnchangedType]] = Unchanged,
- home: _Optional[_Union[str, _UnchangedType]] = Unchanged,
- shell: _Optional[_Union[str, _UnchangedType]] = Unchanged,
- office: _Optional[_Union[str, _UnchangedType]] = Unchanged,
- plan: _Optional[_Union[str, _UnchangedType]] = Unchanged,
+ name: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged,
+ home: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged,
+ shell: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged,
+ office: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged,
+ plan: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged,
):
super().__init__()
self._login = login
@@ -289,7 +287,7 @@ class FingerUserEditionAction(FingerAction):
return self._login
@property
- def name(self) -> _Optional[_Union[str, _UnchangedType]]:
+ def name(self) -> _t.Optional[_t.Union[str, UnchangedType]]:
"""Get the new value for :py:attr:`FictionalFingerUser.name`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -297,7 +295,7 @@ class FingerUserEditionAction(FingerAction):
return self._name
@property
- def home(self) -> _Optional[_Union[str, _UnchangedType]]:
+ def home(self) -> _t.Optional[_t.Union[str, UnchangedType]]:
"""Get the new value for :py:attr:`FictionalFingerUser.home`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -305,7 +303,7 @@ class FingerUserEditionAction(FingerAction):
return self._home
@property
- def shell(self) -> _Optional[_Union[str, _UnchangedType]]:
+ def shell(self) -> _t.Optional[_t.Union[str, UnchangedType]]:
"""Get the new value for :py:attr:`FictionalFingerUser.shell`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -313,7 +311,7 @@ class FingerUserEditionAction(FingerAction):
return self._shell
@property
- def office(self) -> _Optional[_Union[str, _UnchangedType]]:
+ def office(self) -> _t.Optional[_t.Union[str, UnchangedType]]:
"""Get the new value for :py:attr:`FictionalFingerUser.office`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -321,7 +319,7 @@ class FingerUserEditionAction(FingerAction):
return self._office
@property
- def plan(self) -> _Optional[_Union[str, _UnchangedType]]:
+ def plan(self) -> _t.Optional[_t.Union[str, UnchangedType]]:
"""Get the new value for :py:attr:`FictionalFingerUser.plan`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -361,9 +359,9 @@ class FingerUserLoginAction(FingerAction):
def __init__(
self,
login: str,
- session_name: _Optional[str] = None,
- line: _Optional[str] = None,
- host: _Optional[str] = None,
+ session_name: _t.Optional[str] = None,
+ line: _t.Optional[str] = None,
+ host: _t.Optional[str] = None,
):
super().__init__()
self._login = login
@@ -384,17 +382,17 @@ class FingerUserLoginAction(FingerAction):
return self._login
@property
- def session_name(self) -> _Optional[str]:
+ def session_name(self) -> _t.Optional[str]:
"""Get the name of the session to create."""
return self._session
@property
- def line(self) -> _Optional[str]:
+ def line(self) -> _t.Optional[str]:
"""Get the name of the line from which the user has logged in."""
return self._line
@property
- def host(self) -> _Optional[str]:
+ def host(self) -> _t.Optional[str]:
"""Get the name of the host from which the user has logged in."""
return self._host
@@ -412,8 +410,8 @@ class FingerUserSessionChangeAction(FingerAction):
def __init__(
self,
login: str,
- session_name: _Optional[str] = None,
- idle: _Union[bool, _UnchangedType] = Unchanged,
+ session_name: _t.Optional[str] = None,
+ idle: _t.Union[bool, UnchangedType] = Unchanged,
):
super().__init__()
self._login = login
@@ -433,12 +431,12 @@ class FingerUserSessionChangeAction(FingerAction):
return self._login
@property
- def session_name(self) -> _Optional[str]:
+ def session_name(self) -> _t.Optional[str]:
"""Get the name of the session to edit."""
return self._session
@property
- def idle(self) -> _Union[bool, _UnchangedType]:
+ def idle(self) -> _t.Union[bool, UnchangedType]:
"""Get the new value for :py:attr:`FictionalFingerSession.is_idle`.
Is :py:data:`Unchanged` if the property is unchanged.
@@ -456,7 +454,7 @@ class FingerUserLogoutAction(FingerAction):
def __init__(
self,
login: str,
- session_name: _Optional[str] = None,
+ session_name: _t.Optional[str] = None,
):
super().__init__()
self._login = login
@@ -475,7 +473,7 @@ class FingerUserLogoutAction(FingerAction):
return self._login
@property
- def session_name(self) -> _Optional[str]:
+ def session_name(self) -> _t.Optional[str]:
"""Get the name of the session to delete."""
return self._session
@@ -516,9 +514,9 @@ class FingerFictionInterface(_FingerInterface):
def search_users(
self,
- query: _Optional[str],
- active: _Optional[bool],
- ) -> _Sequence[FictionalFingerUser]:
+ query: _t.Optional[str],
+ active: _t.Optional[bool],
+ ) -> _t.Sequence[FictionalFingerUser]:
"""Look for users according to a check."""
return ([
_copy.deepcopy(user) for user in self._users.values()
@@ -539,7 +537,7 @@ class FingerFictionInterface(_FingerInterface):
self._users = {}
self._lasttime = None
- def apply(self, action, time: _Optional[_dt] = None):
+ def apply(self, action, time: _t.Optional[_dt] = None):
"""Apply an action to the scene.
By default, the time of the action is the current time.
@@ -576,15 +574,15 @@ class FingerFictionInterface(_FingerInterface):
)
user = self._users[action.login]
- if action.name is not Unchanged:
+ if not isinstance(action.name, UnchangedType):
user.name = action.name
- if action.shell is not Unchanged:
+ if not isinstance(action.shell, UnchangedType):
user.shell = action.shell
- if action.home is not Unchanged:
+ if not isinstance(action.home, UnchangedType):
user.home = action.home
- if action.office is not Unchanged:
+ if not isinstance(action.office, UnchangedType):
user.office = action.office
- if action.plan is not Unchanged:
+ if not isinstance(action.plan, UnchangedType):
user.plan = action.plan
elif isinstance(action, FingerUserDeletionAction):
# Delete user with login `action.login`.
@@ -639,8 +637,8 @@ class FingerFictionInterface(_FingerInterface):
del user.sessions[action.session_name]
except (KeyError, IndexError):
raise ValueError(
- f'Got no session {action.name!r} '
- f'for user {action.login!r}',
+ f'Got no session {action.session_name!r} '
+ + f'for user {action.login!r}',
) from None
elif isinstance(action, FingerUserSessionChangeAction):
# Make user with login `action.login` idle.
@@ -658,8 +656,8 @@ class FingerFictionInterface(_FingerInterface):
session = user.sessions[action.session_name]
except (KeyError, IndexError):
raise ValueError(
- f'Got no session {action.name!r} '
- f'for user {action.login!r}',
+ f'Got no session {action.session_name!r} '
+ + f'for user {action.login!r}',
) from None
since = time
@@ -729,15 +727,17 @@ class FingerScenario:
value = self.EndingType(value)
except ValueError:
try:
- if isinstance(value, str):
- value = value.casefold()
- value = {
- None: None,
- 'interrupt': self.EndingType.FREEZE,
- 'freeze': self.EndingType.FREEZE,
- 'stop': self.EndingType.STOP,
- 'repeat': self.EndingType.REPEAT,
- }[value]
+ if value is None:
+ pass
+ elif isinstance(value, str):
+ value = {
+ 'interrupt': self.EndingType.FREEZE,
+ 'freeze': self.EndingType.FREEZE,
+ 'stop': self.EndingType.STOP,
+ 'repeat': self.EndingType.REPEAT,
+ }[value.casefold()]
+ else:
+ raise KeyError(value)
except KeyError:
raise TypeError(
f'Invalid value for ending type: {value!r}',
@@ -746,7 +746,7 @@ class FingerScenario:
self._end_type = value
@property
- def duration(self) -> _Optional[_td]:
+ def duration(self) -> _t.Optional[_td]:
"""Offset of the ending.
When the offset is reached, any object following
@@ -756,8 +756,12 @@ class FingerScenario:
return self._end_time
@duration.setter
- def duration(self, value: _Optional[_Union[_td, str]]) -> None:
- self._end_time = _make_delta(value, allow_none=True)
+ def duration(self, value: _t.Optional[_t.Union[_td, str]]) -> None:
+ if value is None:
+ self._end_time = None
+ return
+
+ self._end_time = _make_delta(value)
def verify(self) -> None:
"""Verify that the current scenario is valid.
@@ -783,8 +787,13 @@ class FingerScenario:
raise ValueError('Ending type has not been provided')
# Check if the events are coherent.
- users = _defaultdict(lambda: False)
- sessions = _defaultdict(lambda: _defaultdict(lambda: 0))
+ # TODO: Had to make the key an optional str instead of an str here
+ # since the action.login field might be optional?
+ # Not sure how safe this is tho.
+ users: _t.Dict[_t.Optional[str], bool] = _defaultdict(lambda: False)
+ sessions: _t.Dict[_t.Optional[str], _t.Dict[_t.Optional[str], int]] = (
+ _defaultdict(lambda: _defaultdict(lambda: 0))
+ )
for time, action, i in self._actions:
try:
@@ -805,14 +814,14 @@ class FingerScenario:
# The user we're trying to edit doesn't exist.
raise ValueError(
'Trying to edit user '
- f"{action.login!r} while it doesn't exist",
+ + f"{action.login!r} while it doesn't exist",
)
elif isinstance(action, FingerUserDeletionAction):
if action.login not in users:
# The user we're trying to delete doesn't exist.
raise ValueError(
'Trying to delete user '
- f"{action.login!r} while it doesn't exist",
+ + f"{action.login!r} while it doesn't exist",
)
del users[action.login]
@@ -825,7 +834,7 @@ class FingerScenario:
# The user we're trying to log in as doesn't exist.
raise ValueError(
'Trying to log in as user '
- f"{action.login!r} which doesn't exist",
+ + f"{action.login!r} which doesn't exist",
)
sessions[action.login][action.session_name] += 1
@@ -924,7 +933,10 @@ class FingerScenario:
end_time = time
continue
- elif typ == 'create':
+
+ action: FingerAction
+ plan: _t.Union[UnchangedType, None, str]
+ if typ == 'create':
# User creation.
plan = None
if 'plan' in data:
@@ -1029,9 +1041,9 @@ class FingerScenario:
def get(
self,
- to: _Optional[_td] = None,
- since: _Optional[_td] = None,
- ) -> _Sequence[FingerAction]:
+ to: _t.Optional[_td] = None,
+ since: _t.Optional[_td] = None,
+ ) -> _t.Iterator[_t.Tuple[_dt, FingerAction]]:
"""Return a sequence of actions in order from the scenario.
:param to: Maximum timedelta for the actions to gather.
@@ -1053,7 +1065,7 @@ class FingerScenario:
continue
yield time, action
- def add(self, action: FingerAction, time: _Union[_td, str]):
+ def add(self, action: FingerAction, time: _t.Union[_td, str]):
"""Add an action at the given time to the registered actions."""
time = _make_delta(time)
@@ -1081,7 +1093,7 @@ class FingerScenarioInterface(FingerFictionInterface):
def __init__(
self,
scenario: FingerScenario,
- start: _Optional[_dt] = None,
+ start: _t.Optional[_dt] = None,
):
if start is None:
start = _dt.now()
@@ -1094,7 +1106,7 @@ class FingerScenarioInterface(FingerFictionInterface):
if not isinstance(scenario, FingerScenario):
raise TypeError(
'Scenario should be a FingerScenario, '
- f'is {scenario.__class__.__name__}.',
+ + f'is {scenario.__class__.__name__}.',
)
scenario.verify()
@@ -1169,7 +1181,7 @@ class FingerScenarioInterface(FingerFictionInterface):
if actions:
_logger.debug(
f'Applying {len(actions)} '
- f'action{("", "s")[len(actions) >= 2]}:',
+ + f'action{("", "s")[len(actions) >= 2]}:',
)
for time, action in actions:
_logger.debug(f' at {_format_delta(time)}: {action!r}')
diff --git a/pyfingerd/native.py b/pyfingerd/native.py
index dfa03e4..70592ff 100755
--- a/pyfingerd/native.py
+++ b/pyfingerd/native.py
@@ -5,10 +5,14 @@
# *****************************************************************************
"""Defining the native interface."""
+import typing as _t
+
from .core import FingerInterface as _FingerInterface
__all__ = ['FingerNativeInterface']
+FingerNativeInterface: _t.Type[_FingerInterface]
+
class _FingerNoNativeFoundInterface(_FingerInterface):
"""Placeholder that doesn't initiate."""
diff --git a/pyfingerd/posix.py b/pyfingerd/posix.py
index 98ccb2a..88629b5 100755
--- a/pyfingerd/posix.py
+++ b/pyfingerd/posix.py
@@ -9,16 +9,16 @@ This will be done using the `pyutmpx` Python module.
"""
import pwd as _pwd
+import typing as _t
from copy import copy as _copy
from datetime import datetime as _dt
from multiprocessing import Lock as _Lock
from os import stat as _stat
from os.path import exists as _exists, join as _joinpaths
-from typing import Optional as _Optional, Sequence as _Sequence
-from pytz import utc as _utc
+from pytz import utc as _utc # type: ignore
-import pyutmpx as _pyutmpx
+import pyutmpx as _pyutmpx # type: ignore
from .core import (
FingerInterface as _FingerInterface,
@@ -43,8 +43,8 @@ class FingerPOSIXInterface(_FingerInterface):
def search_users(
self,
- query: _Optional[str],
- active: _Optional[bool],
+ query: _t.Optional[str],
+ active: _t.Optional[bool],
) -> _Sequence[_FingerUser]:
"""Look for users on POSIX-compliant systems.
diff --git a/pyfingerd/utils.py b/pyfingerd/utils.py
index c8f364a..5a00ec0 100644
--- a/pyfingerd/utils.py
+++ b/pyfingerd/utils.py
@@ -7,8 +7,8 @@
import logging as _logging
import re as _re
+import typing as _t
from datetime import timedelta as _td
-from typing import Optional as _Optional, Union as _Union
__all__ = [
'access_logger', 'error_logger', 'format_delta', 'logger', 'parse_delta',
@@ -22,12 +22,16 @@ access_logger = _logging.getLogger('pyfingerd.access')
error_logger = _logging.getLogger('pyfingerd.error')
-def parse_delta(raw: str) -> _td:
+def parse_delta(raw: str) -> _t.Optional[_td]:
"""Parse a delta string as found in the configuration files."""
try:
delta = _td()
- sign, elements, _ = __delta0_re.fullmatch(raw).groups()
+ m = __delta0_re.fullmatch(raw)
+ if m is None:
+ raise Exception
+
+ sign, elements, _ = m.groups()
sign = (1, -1)[len(sign)]
for res in __delta1_re.finditer(elements):
num, typ = res.groups()
@@ -48,7 +52,9 @@ def parse_delta(raw: str) -> _td:
return delta
except Exception:
- return None
+ pass
+
+ return None
def format_delta(td: _td) -> str:
@@ -86,14 +92,10 @@ def format_delta(td: _td) -> str:
return d
-def make_delta(
- value: _Optional[_Union[str, int, float, _td]],
- allow_none: bool = False,
-) -> _td:
+def make_delta(value: _t.Optional[_t.Union[str, int, float, _td]]) -> _td:
"""Make a delta from a raw value."""
if value is None:
- if not allow_none:
- raise ValueError('must not be None')
+ raise ValueError('must not be None')
if isinstance(value, _td):
return value
diff --git a/tests/test_server.py b/tests/test_server.py
index ddcb8b5..a9e03ba 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -60,8 +60,8 @@ class TestFingerConnection:
for user in users:
result += (
f'{user.login}|{user.name}|{user.home}|'
- f'{user.shell}|{user.office or ""}|'
- f'{len(user.sessions)}\n'
+ + f'{user.shell}|{user.office or ""}|'
+ + f'{len(user.sessions)}\n'
)
for session in user.sessions:
result += (