diff options
-rwxr-xr-x | pyfingerd/__main__.py | 2 | ||||
-rw-r--r-- | pyfingerd/binds.py | 12 | ||||
-rwxr-xr-x | pyfingerd/cli.py | 2 | ||||
-rwxr-xr-x | pyfingerd/core.py | 361 | ||||
-rwxr-xr-x | pyfingerd/fiction.py | 162 | ||||
-rwxr-xr-x | pyfingerd/native.py | 4 | ||||
-rwxr-xr-x | pyfingerd/posix.py | 10 | ||||
-rw-r--r-- | pyfingerd/utils.py | 22 | ||||
-rw-r--r-- | tests/test_server.py | 4 |
9 files changed, 296 insertions, 283 deletions
diff --git a/pyfingerd/__main__.py b/pyfingerd/__main__.py index b1dcbcc..12bea46 100755 --- a/pyfingerd/__main__.py +++ b/pyfingerd/__main__.py @@ -7,8 +7,6 @@ from .cli import cli as _cli -__all__ = [] - if __name__ == '__main__': _cli() diff --git a/pyfingerd/binds.py b/pyfingerd/binds.py index dd81e58..df88bd0 100644 --- a/pyfingerd/binds.py +++ b/pyfingerd/binds.py @@ -29,11 +29,7 @@ class FingerTCPv4Bind(FingerBind): """IPv4 TCP Address.""" def __init__(self, address: str, port: int): - try: - self._addr = _socket.inet_pton(_socket.AF_INET, address) - except Exception: - self._addr = address - + self._addr = _socket.inet_pton(_socket.AF_INET, address) self._port = port @property @@ -50,11 +46,7 @@ class FingerTCPv6Bind(FingerBind): """IPv6 TCP Address.""" def __init__(self, address: str, port: int): - try: - self._addr = _socket.inet_pton(_socket.AF_INET6, address) - except Exception: - self._addr = address - + self._addr = _socket.inet_pton(_socket.AF_INET6, address) self._port = port @property diff --git a/pyfingerd/cli.py b/pyfingerd/cli.py index 2fdb7a6..d1074b2 100755 --- a/pyfingerd/cli.py +++ b/pyfingerd/cli.py @@ -14,7 +14,7 @@ from sys import stderr as _stderr import click as _click -import coloredlogs as _coloredlogs +import coloredlogs as _coloredlogs # type: ignore from . import __version__ as _version from .core import ( diff --git a/pyfingerd/core.py b/pyfingerd/core.py index 6d30db4..9611294 100755 --- a/pyfingerd/core.py +++ b/pyfingerd/core.py @@ -14,9 +14,9 @@ import copy as _copy import multiprocessing as _multip import signal as _signal import string as _string +import typing as _t from datetime import datetime as _dt, timedelta as _td, tzinfo as _tzinfo from errno import errorcode as _errorcode -from typing import Optional as _Optional, Sequence as _Sequence from croniter import croniter as _croniter @@ -55,10 +55,10 @@ def cron(spec: str): by checking if the attribute exists and starting a dedicated coroutine for running it periodically using the given specification. """ - spec = _croniter(spec) + cron_spec = _croniter(spec) def decorator(func): - func.__cron__ = spec + func.__cron__ = cron_spec return func return decorator @@ -78,13 +78,12 @@ class FingerSession: __slots__ = ('_start', '_line', '_host', '_idle') - def __init__(self, *_, time: _Optional[_dt] = None): - self._start = _dt.now() + def __init__(self, *_, time: _t.Optional[_dt] = None): self._idle = None self._line = None self._host = None - self.start = time + self.start = time or _dt.now() self._idle = self._start def __repr__(self): @@ -105,15 +104,17 @@ class FingerSession: return self._start @start.setter - def start(self, value): - value = value if isinstance(value, _dt) else _dt(value) + def start(self, value: _dt): + if not isinstance(value, _dt): + raise TypeError('Expected a datetime value') + if value.tzinfo is None: value = value.replace(tzinfo=_utc) self._start = value @property - def idle(self) -> _dt: + def idle(self) -> _t.Optional[_dt]: """Get the timestamp since which the user is idle on the session. Note that when set, if no timezone is present, @@ -124,8 +125,10 @@ class FingerSession: return self._idle @idle.setter - def idle(self, value): - value = value if isinstance(value, _dt) else _dt(value) + def idle(self, value: _dt): + if not isinstance(value, _dt): + raise TypeError('Expected a datetime value') + if value.tzinfo is None: value = value.replace(tzinfo=_utc) @@ -135,7 +138,7 @@ class FingerSession: self._idle = value @property - def line(self) -> _Optional[str]: + def line(self) -> _t.Optional[str]: """Get the line on which the user is.""" return self._line @@ -144,7 +147,7 @@ class FingerSession: self._line = None if value is None else str(value) @property - def host(self) -> _Optional[str]: + def host(self) -> _t.Optional[str]: """Get the host from which the user is connected.""" return self._host @@ -153,148 +156,6 @@ class FingerSession: self._host = None if value is None else str(value) -class FingerUser: - """Representation of a user on the system. - - Returned by subclasses of :py:class:`FingerInterface`, - and used by subclasses of :py:class:`FingerFormatter`. - - :param login: The login of the user. - :param name: The display name of the user. - :param home: The path to the home of the user. - :param shell: The path to the user's default shell. - """ - - __slots__ = ( - '_login', '_name', '_home', '_shell', '_office', - '_plan', '_last_login', '_sessions', - ) - - def __init__( - self, *_, - login: _Optional[str] = None, - name: _Optional[str] = None, - home: _Optional[str] = None, - shell: _Optional[str] = None, - ): - self._login = None - self._name = '' - self._home = None - self._shell = None - self._office = None - self._plan = None - self._last_login = None - self._sessions = _FingerSessionManager() - - self.login = login - self.name = name - self.home = home - self.shell = shell - - def __repr__(self): - p = ( - 'login', 'name', 'home', 'shell', 'office', - 'last_login', 'sessions', - ) - p = ( - f'{x}={getattr(self, x)!r}' - for x in p if getattr(self, x) is not None - ) - return f"{self._class__.__name__}({', '.join(p)})" - - @property - def login(self) -> _Optional[str]: - """Get the login name of the user, e.g. 'cake' or 'gaben'.""" - return self._login - - @login.setter - def login(self, value: _Optional[str]) -> None: - self._login = value - - @property - def name(self) -> _Optional[str]: - """Get the display name of the user, e.g. 'Jean Dupont'.""" - return self._name - - @name.setter - def name(self, value: _Optional[str]) -> None: - self._name = value - - @property - def last_login(self) -> _Optional[str]: - """Get the last login date for the user. - - Is None if not known. - """ - return self._last_login - - @last_login.setter - def last_login(self, value: _Optional[str]) -> None: - self._last_login = ( - None if value is None - else value if isinstance(value, _dt) - else _dt(value) - ) - - @property - def home(self) -> _Optional[str]: - """Get the path to the user's home on the given system. - - Is None if not known or defined. - """ - return self._home - - @home.setter - def home(self, value: _Optional[str]) -> None: - self._home = None if value is None else str(value) - - @property - def shell(self) -> _Optional[str]: - """Get the path to the user's shell on the given system. - - Is None if not known or defined. - """ - return self._shell - - @shell.setter - def shell(self, value: _Optional[str]) -> None: - self._shell = None if value is None else str(value) - - @property - def office(self) -> _Optional[str]: - """Get the display name of the user's office. - - Is None if not known or defined. - """ - return self._office - - @office.setter - def office(self, value: _Optional[str]) -> None: - self._office = None if value is None else str(value) - - @property - def plan(self) -> _Optional[str]: - """Get the plan of the user. - - Usually the content of the ``.plan`` file in the user's home - on real (and kind of obsolete) UNIX-like systems. - """ - return self._plan - - @plan.setter - def plan(self, value: _Optional[str]) -> None: - if value is None: - self._plan = None - else: - value = str(value) - self._plan = '\n'.join(value.splitlines()) - - @property - def sessions(self) -> _Sequence[FingerSession]: - """Get the current sessions array for the user, always defined.""" - return self._sessions - - class _FingerSessionManager: """Session manager.""" @@ -330,7 +191,7 @@ class _FingerSessionManager: else: self._sessions.pop(idx) - def __getitem__(self, key): + def __getitem__(self, key: _t.Optional[str]): if key is None: try: return self._sessions[0] @@ -417,6 +278,148 @@ class _FingerSessionManager: self._sessions.insert(0, session) + +class FingerUser: + """Representation of a user on the system. + + Returned by subclasses of :py:class:`FingerInterface`, + and used by subclasses of :py:class:`FingerFormatter`. + + :param login: The login of the user. + :param name: The display name of the user. + :param home: The path to the home of the user. + :param shell: The path to the user's default shell. + """ + + __slots__ = ( + '_login', '_name', '_home', '_shell', '_office', + '_plan', '_last_login', '_sessions', + ) + + def __init__( + self, *_, + login: _t.Optional[str] = None, + name: _t.Optional[str] = None, + home: _t.Optional[str] = None, + shell: _t.Optional[str] = None, + ): + self._login = None + self._name = None + self._home = None + self._shell = None + self._office = None + self._plan = None + self._last_login = None + self._sessions = _FingerSessionManager() + + self.login = login + self.name = name + self.home = home + self.shell = shell + + def __repr__(self): + p = ( + 'login', 'name', 'home', 'shell', 'office', + 'last_login', 'sessions', + ) + p = ( + f'{x}={getattr(self, x)!r}' + for x in p if getattr(self, x) is not None + ) + return f"{self._class__.__name__}({', '.join(p)})" + + @property + def login(self) -> _t.Optional[str]: + """Get the login name of the user, e.g. 'cake' or 'gaben'.""" + return self._login + + @login.setter + def login(self, value: _t.Optional[str]) -> None: + self._login = value + + @property + def name(self) -> _t.Optional[str]: + """Get the display name of the user, e.g. 'Jean Dupont'.""" + return self._name + + @name.setter + def name(self, value: _t.Optional[str]) -> None: + self._name = value + + @property + def last_login(self) -> _t.Optional[_dt]: + """Get the last login date for the user. + + Is None if not known. + """ + return self._last_login + + @last_login.setter + def last_login(self, value: _t.Optional[_dt]) -> None: + if value is not None and not isinstance(value, _dt): + raise TypeError('Expected None or a datetime value') + + self._last_login = value + + @property + def home(self) -> _t.Optional[str]: + """Get the path to the user's home on the given system. + + Is None if not known or defined. + """ + return self._home + + @home.setter + def home(self, value: _t.Optional[str]) -> None: + self._home = None if value is None else str(value) + + @property + def shell(self) -> _t.Optional[str]: + """Get the path to the user's shell on the given system. + + Is None if not known or defined. + """ + return self._shell + + @shell.setter + def shell(self, value: _t.Optional[str]) -> None: + self._shell = None if value is None else str(value) + + @property + def office(self) -> _t.Optional[str]: + """Get the display name of the user's office. + + Is None if not known or defined. + """ + return self._office + + @office.setter + def office(self, value: _t.Optional[str]) -> None: + self._office = None if value is None else str(value) + + @property + def plan(self) -> _t.Optional[str]: + """Get the plan of the user. + + Usually the content of the ``.plan`` file in the user's home + on real (and kind of obsolete) UNIX-like systems. + """ + return self._plan + + @plan.setter + def plan(self, value: _t.Optional[str]) -> None: + if value is None: + self._plan = None + else: + value = str(value) + self._plan = '\n'.join(value.splitlines()) + + @property + def sessions(self) -> _FingerSessionManager: + """Get the current sessions array for the user, always defined.""" + return self._sessions + + # --- # Formatter base class. # --- @@ -439,7 +442,7 @@ class FingerFormatter: :param tzinfo: Timezone used for formatting dates and times. """ - def __init__(self, tzinfo: _Optional[_tzinfo] = None): + def __init__(self, tzinfo: _t.Optional[_tzinfo] = None): if tzinfo is None: tzinfo = _dt.now().astimezone().tzinfo self._tzinfo = tzinfo @@ -539,7 +542,7 @@ class FingerFormatter: self, hostname: str, raw_query: str, - users: _Sequence[FingerUser], + users: _t.Sequence[FingerUser], ) -> str: """Return the formatted answer for a user list in the 'short' format. @@ -609,7 +612,7 @@ class FingerFormatter: self, hostname: str, raw_query: str, - users: _Sequence[FingerUser], + users: _t.Sequence[FingerUser], ) -> str: """Return the formatted answer for a user list in the 'long' format. @@ -626,13 +629,13 @@ class FingerFormatter: for user in users: res += ( - f'Login name: {user.login[:27]:<27} ' - + f'Name: {user.name if user.name else user.login}\r\n' - + f'Directory: {user.home[:28] if user.home else "":<28} ' + f'Login name: {user.login or "":<27.27} ' + + f'Name: {user.name if user.name else user.login or ""}\r\n' + + f'Directory: {user.home if user.home else "":<28.28} ' + f'Shell: {user.shell if user.shell else ""}\r\n' ) if user.office: - res += f"Office: {user.office if user.office else ''}\r\n" + res += f'Office: {user.office if user.office else ""}\r\n' if user.sessions: # List current sessions. @@ -701,7 +704,7 @@ class FingerInterface: def transmit_query( self, - query: _Optional[str], + query: _t.Optional[str], host: str, verbose: bool, ) -> str: @@ -723,9 +726,9 @@ class FingerInterface: def search_users( self, - query: _Optional[str], - active: _Optional[bool], - ) -> _Sequence[FingerUser]: + query: _t.Optional[str], + active: _t.Optional[bool], + ) -> _t.Sequence[FingerUser]: """Search for users on the current host using the given query. :param query: The user query, set to None in case of no @@ -820,6 +823,9 @@ class FingerServer: answers sent to clients. """ + _p: _t.Optional[_multip.Process] = None + """Process running the pyfingerd server.""" + def __init__( self, binds: str = 'localhost:79', @@ -862,9 +868,6 @@ class FingerServer: if not self._binds: raise _NoBindsError() - # Initialize multi-process related. - self._p = None - @property def hostname(self): """Get the hostname configured for this server.""" @@ -991,14 +994,14 @@ class FingerServer: if name == 'EADDRINUSE': _logger.error( f'Could not bind to [{host}]:{port}: ' - 'address already in use.', + + 'address already in use.', ) return elif name == 'EACCES': _logger.error( f'Could not bind to [{host}]:{port}: ' - f'port {port} privileged port and process ' - 'is unprivileged.', + + f'port {port} privileged port and process ' + + 'is unprivileged.', ) else: raise @@ -1086,16 +1089,18 @@ class FingerServer: if self._p is not None and self._p.is_alive(): return - self._p = _multip.Process(target=self._serve) - self._p.start() + p = _multip.Process(target=self._serve) + p.start() + self._p = p def stop(self) -> None: """Stop all underlying server processes and unbind all ports.""" - if self._p is None or not self._p.is_alive(): + p = self._p + if p is None or not p.is_alive(): return - self._p.kill() - self._p.join() + p.kill() + p.join() self._p = None def serve_forever(self) -> None: diff --git a/pyfingerd/fiction.py b/pyfingerd/fiction.py index db4aff2..9632bb0 100755 --- a/pyfingerd/fiction.py +++ b/pyfingerd/fiction.py @@ -12,12 +12,10 @@ import copy as _copy import logging as _logging import math as _math import os.path as _path +import typing as _t from collections import defaultdict as _defaultdict from datetime import datetime as _dt, timedelta as _td from enum import Enum as _Enum -from typing import ( - Optional as _Optional, Sequence as _Sequence, Union as _Union, -) from .core import ( FingerInterface as _FingerInterface, @@ -157,12 +155,12 @@ class FictionalFingerSession(_FingerSession): # return Unchanged, like for None and NoneType. -class _UnchangedType: +class UnchangedType: def __repr__(self): return 'Unchanged' -Unchanged = _UnchangedType() +Unchanged = UnchangedType() # --- # Action description. @@ -191,11 +189,11 @@ class FingerUserCreationAction(FingerAction): def __init__( self, login: str, - name: _Optional[str] = None, - home: _Optional[str] = None, - shell: _Optional[str] = None, - office: _Optional[str] = None, - plan: _Optional[str] = None, + name: _t.Optional[str] = None, + home: _t.Optional[str] = None, + shell: _t.Optional[str] = None, + office: _t.Optional[str] = None, + plan: _t.Optional[str] = None, ): super().__init__() self._login = login @@ -218,27 +216,27 @@ class FingerUserCreationAction(FingerAction): return self._login @property - def name(self) -> _Optional[str]: + def name(self) -> _t.Optional[str]: """Get the initial value for :py:attr:`FictionalFingerUser.name`.""" return self._name @property - def home(self) -> _Optional[str]: + def home(self) -> _t.Optional[str]: """Get the initial value for :py:attr:`FictionalFingerUser.home`.""" return self._home @property - def shell(self) -> _Optional[str]: + def shell(self) -> _t.Optional[str]: """Get the initial value for :py:attr:`FictionalFingerUser.shell`.""" return self._shell @property - def office(self) -> _Optional[str]: + def office(self) -> _t.Optional[str]: """Get the initial value for :py:attr:`FictionalFingerUser.office`.""" return self._office @property - def plan(self) -> _Optional[str]: + def plan(self) -> _t.Optional[str]: """Get the initial value for :py:attr:`FictionalFingerUser.plan`.""" return self._plan @@ -262,11 +260,11 @@ class FingerUserEditionAction(FingerAction): def __init__( self, login: str, - name: _Optional[_Union[str, _UnchangedType]] = Unchanged, - home: _Optional[_Union[str, _UnchangedType]] = Unchanged, - shell: _Optional[_Union[str, _UnchangedType]] = Unchanged, - office: _Optional[_Union[str, _UnchangedType]] = Unchanged, - plan: _Optional[_Union[str, _UnchangedType]] = Unchanged, + name: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged, + home: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged, + shell: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged, + office: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged, + plan: _t.Optional[_t.Union[str, UnchangedType]] = Unchanged, ): super().__init__() self._login = login @@ -289,7 +287,7 @@ class FingerUserEditionAction(FingerAction): return self._login @property - def name(self) -> _Optional[_Union[str, _UnchangedType]]: + def name(self) -> _t.Optional[_t.Union[str, UnchangedType]]: """Get the new value for :py:attr:`FictionalFingerUser.name`. Is :py:data:`Unchanged` if the property is unchanged. @@ -297,7 +295,7 @@ class FingerUserEditionAction(FingerAction): return self._name @property - def home(self) -> _Optional[_Union[str, _UnchangedType]]: + def home(self) -> _t.Optional[_t.Union[str, UnchangedType]]: """Get the new value for :py:attr:`FictionalFingerUser.home`. Is :py:data:`Unchanged` if the property is unchanged. @@ -305,7 +303,7 @@ class FingerUserEditionAction(FingerAction): return self._home @property - def shell(self) -> _Optional[_Union[str, _UnchangedType]]: + def shell(self) -> _t.Optional[_t.Union[str, UnchangedType]]: """Get the new value for :py:attr:`FictionalFingerUser.shell`. Is :py:data:`Unchanged` if the property is unchanged. @@ -313,7 +311,7 @@ class FingerUserEditionAction(FingerAction): return self._shell @property - def office(self) -> _Optional[_Union[str, _UnchangedType]]: + def office(self) -> _t.Optional[_t.Union[str, UnchangedType]]: """Get the new value for :py:attr:`FictionalFingerUser.office`. Is :py:data:`Unchanged` if the property is unchanged. @@ -321,7 +319,7 @@ class FingerUserEditionAction(FingerAction): return self._office @property - def plan(self) -> _Optional[_Union[str, _UnchangedType]]: + def plan(self) -> _t.Optional[_t.Union[str, UnchangedType]]: """Get the new value for :py:attr:`FictionalFingerUser.plan`. Is :py:data:`Unchanged` if the property is unchanged. @@ -361,9 +359,9 @@ class FingerUserLoginAction(FingerAction): def __init__( self, login: str, - session_name: _Optional[str] = None, - line: _Optional[str] = None, - host: _Optional[str] = None, + session_name: _t.Optional[str] = None, + line: _t.Optional[str] = None, + host: _t.Optional[str] = None, ): super().__init__() self._login = login @@ -384,17 +382,17 @@ class FingerUserLoginAction(FingerAction): return self._login @property - def session_name(self) -> _Optional[str]: + def session_name(self) -> _t.Optional[str]: """Get the name of the session to create.""" return self._session @property - def line(self) -> _Optional[str]: + def line(self) -> _t.Optional[str]: """Get the name of the line from which the user has logged in.""" return self._line @property - def host(self) -> _Optional[str]: + def host(self) -> _t.Optional[str]: """Get the name of the host from which the user has logged in.""" return self._host @@ -412,8 +410,8 @@ class FingerUserSessionChangeAction(FingerAction): def __init__( self, login: str, - session_name: _Optional[str] = None, - idle: _Union[bool, _UnchangedType] = Unchanged, + session_name: _t.Optional[str] = None, + idle: _t.Union[bool, UnchangedType] = Unchanged, ): super().__init__() self._login = login @@ -433,12 +431,12 @@ class FingerUserSessionChangeAction(FingerAction): return self._login @property - def session_name(self) -> _Optional[str]: + def session_name(self) -> _t.Optional[str]: """Get the name of the session to edit.""" return self._session @property - def idle(self) -> _Union[bool, _UnchangedType]: + def idle(self) -> _t.Union[bool, UnchangedType]: """Get the new value for :py:attr:`FictionalFingerSession.is_idle`. Is :py:data:`Unchanged` if the property is unchanged. @@ -456,7 +454,7 @@ class FingerUserLogoutAction(FingerAction): def __init__( self, login: str, - session_name: _Optional[str] = None, + session_name: _t.Optional[str] = None, ): super().__init__() self._login = login @@ -475,7 +473,7 @@ class FingerUserLogoutAction(FingerAction): return self._login @property - def session_name(self) -> _Optional[str]: + def session_name(self) -> _t.Optional[str]: """Get the name of the session to delete.""" return self._session @@ -516,9 +514,9 @@ class FingerFictionInterface(_FingerInterface): def search_users( self, - query: _Optional[str], - active: _Optional[bool], - ) -> _Sequence[FictionalFingerUser]: + query: _t.Optional[str], + active: _t.Optional[bool], + ) -> _t.Sequence[FictionalFingerUser]: """Look for users according to a check.""" return ([ _copy.deepcopy(user) for user in self._users.values() @@ -539,7 +537,7 @@ class FingerFictionInterface(_FingerInterface): self._users = {} self._lasttime = None - def apply(self, action, time: _Optional[_dt] = None): + def apply(self, action, time: _t.Optional[_dt] = None): """Apply an action to the scene. By default, the time of the action is the current time. @@ -576,15 +574,15 @@ class FingerFictionInterface(_FingerInterface): ) user = self._users[action.login] - if action.name is not Unchanged: + if not isinstance(action.name, UnchangedType): user.name = action.name - if action.shell is not Unchanged: + if not isinstance(action.shell, UnchangedType): user.shell = action.shell - if action.home is not Unchanged: + if not isinstance(action.home, UnchangedType): user.home = action.home - if action.office is not Unchanged: + if not isinstance(action.office, UnchangedType): user.office = action.office - if action.plan is not Unchanged: + if not isinstance(action.plan, UnchangedType): user.plan = action.plan elif isinstance(action, FingerUserDeletionAction): # Delete user with login `action.login`. @@ -639,8 +637,8 @@ class FingerFictionInterface(_FingerInterface): del user.sessions[action.session_name] except (KeyError, IndexError): raise ValueError( - f'Got no session {action.name!r} ' - f'for user {action.login!r}', + f'Got no session {action.session_name!r} ' + + f'for user {action.login!r}', ) from None elif isinstance(action, FingerUserSessionChangeAction): # Make user with login `action.login` idle. @@ -658,8 +656,8 @@ class FingerFictionInterface(_FingerInterface): session = user.sessions[action.session_name] except (KeyError, IndexError): raise ValueError( - f'Got no session {action.name!r} ' - f'for user {action.login!r}', + f'Got no session {action.session_name!r} ' + + f'for user {action.login!r}', ) from None since = time @@ -729,15 +727,17 @@ class FingerScenario: value = self.EndingType(value) except ValueError: try: - if isinstance(value, str): - value = value.casefold() - value = { - None: None, - 'interrupt': self.EndingType.FREEZE, - 'freeze': self.EndingType.FREEZE, - 'stop': self.EndingType.STOP, - 'repeat': self.EndingType.REPEAT, - }[value] + if value is None: + pass + elif isinstance(value, str): + value = { + 'interrupt': self.EndingType.FREEZE, + 'freeze': self.EndingType.FREEZE, + 'stop': self.EndingType.STOP, + 'repeat': self.EndingType.REPEAT, + }[value.casefold()] + else: + raise KeyError(value) except KeyError: raise TypeError( f'Invalid value for ending type: {value!r}', @@ -746,7 +746,7 @@ class FingerScenario: self._end_type = value @property - def duration(self) -> _Optional[_td]: + def duration(self) -> _t.Optional[_td]: """Offset of the ending. When the offset is reached, any object following @@ -756,8 +756,12 @@ class FingerScenario: return self._end_time @duration.setter - def duration(self, value: _Optional[_Union[_td, str]]) -> None: - self._end_time = _make_delta(value, allow_none=True) + def duration(self, value: _t.Optional[_t.Union[_td, str]]) -> None: + if value is None: + self._end_time = None + return + + self._end_time = _make_delta(value) def verify(self) -> None: """Verify that the current scenario is valid. @@ -783,8 +787,13 @@ class FingerScenario: raise ValueError('Ending type has not been provided') # Check if the events are coherent. - users = _defaultdict(lambda: False) - sessions = _defaultdict(lambda: _defaultdict(lambda: 0)) + # TODO: Had to make the key an optional str instead of an str here + # since the action.login field might be optional? + # Not sure how safe this is tho. + users: _t.Dict[_t.Optional[str], bool] = _defaultdict(lambda: False) + sessions: _t.Dict[_t.Optional[str], _t.Dict[_t.Optional[str], int]] = ( + _defaultdict(lambda: _defaultdict(lambda: 0)) + ) for time, action, i in self._actions: try: @@ -805,14 +814,14 @@ class FingerScenario: # The user we're trying to edit doesn't exist. raise ValueError( 'Trying to edit user ' - f"{action.login!r} while it doesn't exist", + + f"{action.login!r} while it doesn't exist", ) elif isinstance(action, FingerUserDeletionAction): if action.login not in users: # The user we're trying to delete doesn't exist. raise ValueError( 'Trying to delete user ' - f"{action.login!r} while it doesn't exist", + + f"{action.login!r} while it doesn't exist", ) del users[action.login] @@ -825,7 +834,7 @@ class FingerScenario: # The user we're trying to log in as doesn't exist. raise ValueError( 'Trying to log in as user ' - f"{action.login!r} which doesn't exist", + + f"{action.login!r} which doesn't exist", ) sessions[action.login][action.session_name] += 1 @@ -924,7 +933,10 @@ class FingerScenario: end_time = time continue - elif typ == 'create': + + action: FingerAction + plan: _t.Union[UnchangedType, None, str] + if typ == 'create': # User creation. plan = None if 'plan' in data: @@ -1029,9 +1041,9 @@ class FingerScenario: def get( self, - to: _Optional[_td] = None, - since: _Optional[_td] = None, - ) -> _Sequence[FingerAction]: + to: _t.Optional[_td] = None, + since: _t.Optional[_td] = None, + ) -> _t.Iterator[_t.Tuple[_dt, FingerAction]]: """Return a sequence of actions in order from the scenario. :param to: Maximum timedelta for the actions to gather. @@ -1053,7 +1065,7 @@ class FingerScenario: continue yield time, action - def add(self, action: FingerAction, time: _Union[_td, str]): + def add(self, action: FingerAction, time: _t.Union[_td, str]): """Add an action at the given time to the registered actions.""" time = _make_delta(time) @@ -1081,7 +1093,7 @@ class FingerScenarioInterface(FingerFictionInterface): def __init__( self, scenario: FingerScenario, - start: _Optional[_dt] = None, + start: _t.Optional[_dt] = None, ): if start is None: start = _dt.now() @@ -1094,7 +1106,7 @@ class FingerScenarioInterface(FingerFictionInterface): if not isinstance(scenario, FingerScenario): raise TypeError( 'Scenario should be a FingerScenario, ' - f'is {scenario.__class__.__name__}.', + + f'is {scenario.__class__.__name__}.', ) scenario.verify() @@ -1169,7 +1181,7 @@ class FingerScenarioInterface(FingerFictionInterface): if actions: _logger.debug( f'Applying {len(actions)} ' - f'action{("", "s")[len(actions) >= 2]}:', + + f'action{("", "s")[len(actions) >= 2]}:', ) for time, action in actions: _logger.debug(f' at {_format_delta(time)}: {action!r}') diff --git a/pyfingerd/native.py b/pyfingerd/native.py index dfa03e4..70592ff 100755 --- a/pyfingerd/native.py +++ b/pyfingerd/native.py @@ -5,10 +5,14 @@ # ***************************************************************************** """Defining the native interface.""" +import typing as _t + from .core import FingerInterface as _FingerInterface __all__ = ['FingerNativeInterface'] +FingerNativeInterface: _t.Type[_FingerInterface] + class _FingerNoNativeFoundInterface(_FingerInterface): """Placeholder that doesn't initiate.""" diff --git a/pyfingerd/posix.py b/pyfingerd/posix.py index 98ccb2a..88629b5 100755 --- a/pyfingerd/posix.py +++ b/pyfingerd/posix.py @@ -9,16 +9,16 @@ This will be done using the `pyutmpx` Python module. """ import pwd as _pwd +import typing as _t from copy import copy as _copy from datetime import datetime as _dt from multiprocessing import Lock as _Lock from os import stat as _stat from os.path import exists as _exists, join as _joinpaths -from typing import Optional as _Optional, Sequence as _Sequence -from pytz import utc as _utc +from pytz import utc as _utc # type: ignore -import pyutmpx as _pyutmpx +import pyutmpx as _pyutmpx # type: ignore from .core import ( FingerInterface as _FingerInterface, @@ -43,8 +43,8 @@ class FingerPOSIXInterface(_FingerInterface): def search_users( self, - query: _Optional[str], - active: _Optional[bool], + query: _t.Optional[str], + active: _t.Optional[bool], ) -> _Sequence[_FingerUser]: """Look for users on POSIX-compliant systems. diff --git a/pyfingerd/utils.py b/pyfingerd/utils.py index c8f364a..5a00ec0 100644 --- a/pyfingerd/utils.py +++ b/pyfingerd/utils.py @@ -7,8 +7,8 @@ import logging as _logging import re as _re +import typing as _t from datetime import timedelta as _td -from typing import Optional as _Optional, Union as _Union __all__ = [ 'access_logger', 'error_logger', 'format_delta', 'logger', 'parse_delta', @@ -22,12 +22,16 @@ access_logger = _logging.getLogger('pyfingerd.access') error_logger = _logging.getLogger('pyfingerd.error') -def parse_delta(raw: str) -> _td: +def parse_delta(raw: str) -> _t.Optional[_td]: """Parse a delta string as found in the configuration files.""" try: delta = _td() - sign, elements, _ = __delta0_re.fullmatch(raw).groups() + m = __delta0_re.fullmatch(raw) + if m is None: + raise Exception + + sign, elements, _ = m.groups() sign = (1, -1)[len(sign)] for res in __delta1_re.finditer(elements): num, typ = res.groups() @@ -48,7 +52,9 @@ def parse_delta(raw: str) -> _td: return delta except Exception: - return None + pass + + return None def format_delta(td: _td) -> str: @@ -86,14 +92,10 @@ def format_delta(td: _td) -> str: return d -def make_delta( - value: _Optional[_Union[str, int, float, _td]], - allow_none: bool = False, -) -> _td: +def make_delta(value: _t.Optional[_t.Union[str, int, float, _td]]) -> _td: """Make a delta from a raw value.""" if value is None: - if not allow_none: - raise ValueError('must not be None') + raise ValueError('must not be None') if isinstance(value, _td): return value diff --git a/tests/test_server.py b/tests/test_server.py index ddcb8b5..a9e03ba 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -60,8 +60,8 @@ class TestFingerConnection: for user in users: result += ( f'{user.login}|{user.name}|{user.home}|' - f'{user.shell}|{user.office or ""}|' - f'{len(user.sessions)}\n' + + f'{user.shell}|{user.office or ""}|' + + f'{len(user.sessions)}\n' ) for session in user.sessions: result += ( |