diff options
Diffstat (limited to 'pyfingerd/fiction.py')
-rwxr-xr-x | pyfingerd/fiction.py | 540 |
1 files changed, 243 insertions, 297 deletions
diff --git a/pyfingerd/fiction.py b/pyfingerd/fiction.py index 1dc61b8..db4aff2 100755 --- a/pyfingerd/fiction.py +++ b/pyfingerd/fiction.py @@ -3,16 +3,15 @@ # Copyright (C) 2017-2022 Thomas Touhey <thomas@touhey.fr> # This file is part of the pyfingerd project, which is MIT-licensed. # ***************************************************************************** -""" Definitions for the finger server fiction interface. +"""Definitions for the finger server fiction interface. - This file contains everything to decode and use the actions file. +This file contains everything to decode and use the actions file. """ import copy as _copy import logging as _logging import math as _math import os.path as _path - from collections import defaultdict as _defaultdict from datetime import datetime as _dt, timedelta as _td from enum import Enum as _Enum @@ -50,29 +49,29 @@ _toml = None class FictionalFingerUser(_FingerUser): - """ Representation of a user on the fictional system. + """Representation of a user on the fictional system. - Behaves like a :py:class:`pyfingerd.core.FingerUser`. - For now, there are no modifications from the base class. + Behaves like a :py:class:`pyfingerd.core.FingerUser`. + For now, there are no modifications from the base class. """ pass class FictionalFingerSession(_FingerSession): - """ Representation of an active session for a given user. - - Behaves like a :py:class:`pyfingerd.core.FingerSession`. - The two main modifications from the base class are the following: - - * Each session has a name, which identifies it. It can be used - to designate it, allowing outside code to edit and delete the - session specifically from an outside point of view. - * Since the actions only allow for setting the user idle or not, - when the user is active, the idle timestamp is simulated to make - it seem like the user makes signs of life at an irregular but - reasonable rate, like when the user often stops typing to think - or do a task outside of the computer. + """Representation of an active session for a given user. + + Behaves like a :py:class:`pyfingerd.core.FingerSession`. + The two main modifications from the base class are the following: + + * Each session has a name, which identifies it. It can be used + to designate it, allowing outside code to edit and delete the + session specifically from an outside point of view. + * Since the actions only allow for setting the user idle or not, + when the user is active, the idle timestamp is simulated to make + it seem like the user makes signs of life at an irregular but + reasonable rate, like when the user often stops typing to think + or do a task outside of the computer. """ __slots__ = ('_name', '_is_idle', '_idle_last') @@ -94,8 +93,7 @@ class FictionalFingerSession(_FingerSession): @property def name(self): - """ Session name. """ - + """Session name.""" return self._name @name.setter @@ -104,15 +102,13 @@ class FictionalFingerSession(_FingerSession): @property def idle(self): - """ Idle time (simulated). """ - + """Idle time (simulated).""" if self._is_idle: return self._idle_last now = _dt.now().astimezone() # Generate a number of seconds and return it. - def s(x): return _math.sin(x * (_math.pi / 2)) @@ -124,15 +120,14 @@ class FictionalFingerSession(_FingerSession): @idle.setter def idle(self, value): # Does nothing as we manage this time. - pass @property def idle_since(self): - """ Idle since the given time. """ - + """Idle since the given time.""" if not self._is_idle: - return None + return + return self._idle_last @idle_since.setter @@ -142,10 +137,10 @@ class FictionalFingerSession(_FingerSession): @property def active_since(self): - """ Active since the given time. """ - + """Active since the given time.""" if self._is_idle: - return None + return + return self._idle_last @active_since.setter @@ -175,22 +170,22 @@ Unchanged = _UnchangedType() class FingerAction: - """ Base class for actions in a fiction. """ + """Base class for actions in a fiction.""" pass class FingerUserCreationAction(FingerAction): - """ A user has been created. - - :param login: The login of the user that is being created. - :param name: The initial value for :py:attr:`FictionalFingerUser.name`. - :param home: The initial value for :py:attr:`FictionalFingerUser.home`. - :param shell: The initial value for - :py:attr:`FictionalFingerUser.shell`. - :param office: The initial value for - :py:attr:`FictionalFingerUser.office`. - :param plan: The initial value for :py:attr:`FictionalFingerUser.plan`. + """A user has been created. + + :param login: The login of the user that is being created. + :param name: The initial value for :py:attr:`FictionalFingerUser.name`. + :param home: The initial value for :py:attr:`FictionalFingerUser.home`. + :param shell: The initial value for + :py:attr:`FictionalFingerUser.shell`. + :param office: The initial value for + :py:attr:`FictionalFingerUser.office`. + :param plan: The initial value for :py:attr:`FictionalFingerUser.plan`. """ def __init__( @@ -213,60 +208,55 @@ class FingerUserCreationAction(FingerAction): def __repr__(self): p = ( f'{x}={getattr(self, x)!r}' - for x in ('login', 'name', 'home', 'shell', 'office', 'plan')) + for x in ('login', 'name', 'home', 'shell', 'office', 'plan') + ) return f"{self.__class__.__name__}({', '.join(p)})" @property def login(self) -> str: - """ The login of the user that is being created. """ - + """Get the login of the user that is being created.""" return self._login @property def name(self) -> _Optional[str]: - """ The initial value for :py:attr:`FictionalFingerUser.name`. """ - + """Get the initial value for :py:attr:`FictionalFingerUser.name`.""" return self._name @property def home(self) -> _Optional[str]: - """ The initial value for :py:attr:`FictionalFingerUser.home`. """ - + """Get the initial value for :py:attr:`FictionalFingerUser.home`.""" return self._home @property def shell(self) -> _Optional[str]: - """ The initial value for :py:attr:`FictionalFingerUser.shell`. """ - + """Get the initial value for :py:attr:`FictionalFingerUser.shell`.""" return self._shell @property def office(self) -> _Optional[str]: - """ The initial value for :py:attr:`FictionalFingerUser.office`. """ - + """Get the initial value for :py:attr:`FictionalFingerUser.office`.""" return self._office @property def plan(self) -> _Optional[str]: - """ The initial value for :py:attr:`FictionalFingerUser.plan`. """ - + """Get the initial value for :py:attr:`FictionalFingerUser.plan`.""" return self._plan class FingerUserEditionAction(FingerAction): - """ A user has been edited. - - :param login: The login of the user that is being edited. - :param name: The new value for :py:attr:`FictionalFingerUser.name`; - :py:data:`Unchanged` if the property is unchanged. - :param home: The new value for :py:attr:`FictionalFingerUser.home`; - :py:data:`Unchanged` if the property is unchanged. - :param shell: The new value for :py:attr:`FictionalFingerUser.shell`; - :py:data:`Unchanged` if the property is unchanged. - :param office: The new value for :py:attr:`FictionalFingerUser.office`; - :py:data:`Unchanged` if the property is unchanged. - :param plan: The new value for :py:attr:`FictionalFingerUser.plan`; - :py:data:`Unchanged` if the property is unchanged. + """A user has been edited. + + :param login: The login of the user that is being edited. + :param name: The new value for :py:attr:`FictionalFingerUser.name`; + :py:data:`Unchanged` if the property is unchanged. + :param home: The new value for :py:attr:`FictionalFingerUser.home`; + :py:data:`Unchanged` if the property is unchanged. + :param shell: The new value for :py:attr:`FictionalFingerUser.shell`; + :py:data:`Unchanged` if the property is unchanged. + :param office: The new value for :py:attr:`FictionalFingerUser.office`; + :py:data:`Unchanged` if the property is unchanged. + :param plan: The new value for :py:attr:`FictionalFingerUser.plan`; + :py:data:`Unchanged` if the property is unchanged. """ def __init__( @@ -289,65 +279,60 @@ class FingerUserEditionAction(FingerAction): def __repr__(self): p = ( f'{x}={getattr(self, x)!r}' - for x in ('login', 'name', 'home', 'shell', 'office', 'plan')) + for x in ('login', 'name', 'home', 'shell', 'office', 'plan') + ) return f"{self.__class__.__name__}({', '.join(p)})" @property def login(self) -> str: - """ The login of the user that is being edited. """ - + """Get the login of the user that is being edited.""" return self._login @property def name(self) -> _Optional[_Union[str, _UnchangedType]]: - """ The new value for :py:attr:`FictionalFingerUser.name`. + """Get the new value for :py:attr:`FictionalFingerUser.name`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._name @property def home(self) -> _Optional[_Union[str, _UnchangedType]]: - """ The new value for :py:attr:`FictionalFingerUser.home`. + """Get the new value for :py:attr:`FictionalFingerUser.home`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._home @property def shell(self) -> _Optional[_Union[str, _UnchangedType]]: - """ The new value for :py:attr:`FictionalFingerUser.shell`. + """Get the new value for :py:attr:`FictionalFingerUser.shell`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._shell @property def office(self) -> _Optional[_Union[str, _UnchangedType]]: - """ The new value for :py:attr:`FictionalFingerUser.office`. + """Get the new value for :py:attr:`FictionalFingerUser.office`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._office @property def plan(self) -> _Optional[_Union[str, _UnchangedType]]: - """ The new value for :py:attr:`FictionalFingerUser.plan`. + """Get the new value for :py:attr:`FictionalFingerUser.plan`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._plan class FingerUserDeletionAction(FingerAction): - """ A user has been deleted. + """A user has been deleted. - :param login: The login of the user to delete. + :param login: The login of the user to delete. """ def __init__(self, login: str): @@ -360,18 +345,17 @@ class FingerUserDeletionAction(FingerAction): @property def login(self) -> str: - """ The user's login. """ - + """Get the user's login.""" return self._login class FingerUserLoginAction(FingerAction): - """ A user has logged in. + """A user has logged in. - :param login: The login of the user to edit. - :param session_name: The name of the session to create. - :param line: The new value for :py:attr:`FictionalFingerSession.line`. - :param host: The new value for :py:attr:`FictionalFingerSession.host`. + :param login: The login of the user to edit. + :param session_name: The name of the session to create. + :param line: The new value for :py:attr:`FictionalFingerSession.line`. + :param host: The new value for :py:attr:`FictionalFingerSession.host`. """ def __init__( @@ -396,37 +380,33 @@ class FingerUserLoginAction(FingerAction): @property def login(self) -> str: - """ The login of the user to edit. """ - + """Get the login of the user to edit.""" return self._login @property def session_name(self) -> _Optional[str]: - """ The name of the session to create. """ - + """Get the name of the session to create.""" return self._session @property def line(self) -> _Optional[str]: - """ The name of the line from which the user has logged in. """ - + """Get the name of the line from which the user has logged in.""" return self._line @property def host(self) -> _Optional[str]: - """ The name of the host from which the user has logged in. """ - + """Get the name of the host from which the user has logged in.""" return self._host class FingerUserSessionChangeAction(FingerAction): - """ A user session has undergone modifications. + """A user session has undergone modifications. - :param login: The login of the user to edit. - :param session_name: The name of the session to edit. - :param is_idle: The new value for - :py:attr:`FictionalFingerSession.is_idle`; - :py:data:`Unchanged` if the property is unchanged. + :param login: The login of the user to edit. + :param session_name: The name of the session to edit. + :param is_idle: The new value for + :py:attr:`FictionalFingerSession.is_idle`; + :py:data:`Unchanged` if the property is unchanged. """ def __init__( @@ -449,31 +429,28 @@ class FingerUserSessionChangeAction(FingerAction): @property def login(self) -> str: - """ The login of the user to edit. """ - + """Get the login of the user to edit.""" return self._login @property def session_name(self) -> _Optional[str]: - """ The name of the session to edit. """ - + """Get the name of the session to edit.""" return self._session @property def idle(self) -> _Union[bool, _UnchangedType]: - """ The new value for :py:attr:`FictionalFingerSession.is_idle`. + """Get the new value for :py:attr:`FictionalFingerSession.is_idle`. - Is :py:data:`Unchanged` if the property is unchanged. + Is :py:data:`Unchanged` if the property is unchanged. """ - return self._idle class FingerUserLogoutAction(FingerAction): - """ A user has logged out. + """A user has logged out. - :param login: The login of the user to edit. - :param session_name: The name of the session to delete. + :param login: The login of the user to edit. + :param session_name: The name of the session to delete. """ def __init__( @@ -494,14 +471,12 @@ class FingerUserLogoutAction(FingerAction): @property def login(self) -> str: - """ The login of the user to edit. """ - + """Get the login of the user to edit.""" return self._login @property def session_name(self) -> _Optional[str]: - """ The name of the session to delete. """ - + """Get the name of the session to delete.""" return self._session @@ -511,18 +486,18 @@ class FingerUserLogoutAction(FingerAction): class FingerFictionInterface(_FingerInterface): - """ Base finger fiction interface for managing a scene. - - The basic state for this class is to have no users; it is possible - at any point in time to apply actions that will add, remove or - modify users and sessions, using - :py:meth:`FingerFictionInterface.apply`. - - This class should be subclassed for interfaces specialized in various - sources for the data; for example, while - :py:class:`FingerScenarioInterface` is specialized in using a - static sequence of actions, another class could read events from - a live source. + """Base finger fiction interface for managing a scene. + + The basic state for this class is to have no users; it is possible + at any point in time to apply actions that will add, remove or + modify users and sessions, using + :py:meth:`FingerFictionInterface.apply`. + + This class should be subclassed for interfaces specialized in various + sources for the data; for example, while + :py:class:`FingerScenarioInterface` is specialized in using a + static sequence of actions, another class could read events from + a live source. """ def __init__(self): @@ -532,7 +507,6 @@ class FingerFictionInterface(_FingerInterface): # - `users`: the users. # - `lasttime`: the last datetime, in order to raise an # exception if not applied in order. - self._users = {} self._lasttime = None @@ -545,48 +519,45 @@ class FingerFictionInterface(_FingerInterface): query: _Optional[str], active: _Optional[bool], ) -> _Sequence[FictionalFingerUser]: - """ Look for users according to a check. """ - + """Look for users according to a check.""" return ([ _copy.deepcopy(user) for user in self._users.values() if not (query is not None and query not in user.login) - and not (active is not None and active != bool(user.sessions))]) + and not (active is not None and active != bool(user.sessions)) + ]) # --- # Elements proper to the fiction interface. # --- def reset(self): - """ Reset the interface, i.e. revert all actions. + """Reset the interface, i.e. revert all actions. - This method makes the interface return to the original - state with no users and sessions. + This method makes the interface return to the original + state with no users and sessions. """ - self._users = {} self._lasttime = None def apply(self, action, time: _Optional[_dt] = None): - """ Apply an action to the scene. + """Apply an action to the scene. - By default, the time of the action is the current time. + By default, the time of the action is the current time. """ - if time is None: time = _dt.now().astimezone() if self._lasttime is not None and self._lasttime > time: - raise ValueError("operations weren't applied in order!") + raise ValueError("Operations weren't applied in order!") self._lasttime = time if isinstance(action, FingerUserCreationAction): # Create user `action.user`. - if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') if action.login in self._users: - raise ValueError('already got a user with that login') + raise ValueError('Already got a user with that login') user = FictionalFingerUser(login=action.login, name=action.name) user.shell = action.shell @@ -597,12 +568,11 @@ class FingerFictionInterface(_FingerInterface): self._users[user.login] = user elif isinstance(action, FingerUserEditionAction): # Edit user `action.user` with the given modifications. - if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') if action.login not in self._users: raise ValueError( - f'got no user with login {action.login!r}', + f'Got no user with login {action.login!r}', ) user = self._users[action.login] @@ -618,18 +588,16 @@ class FingerFictionInterface(_FingerInterface): user.plan = action.plan elif isinstance(action, FingerUserDeletionAction): # Delete user with login `action.login`. - if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') if action.login not in self._users: raise ValueError( - f'got no user with login {action.login!r}', + f'Got no user with login {action.login!r}', ) del self._users[action.login] elif isinstance(action, FingerUserLoginAction): # Login as user `action.login` with session `action.session_name`. - session = FictionalFingerSession( time=time, name=action.session_name, @@ -639,61 +607,60 @@ class FingerFictionInterface(_FingerInterface): session.host = action.host if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') try: user = self._users[action.login] except KeyError: raise ValueError( - f'got no user with login {action.login!r}', + f'Got no user with login {action.login!r}', ) from None # We don't check if the session exists or not; multiple # sessions can have the same name, we just act on the last # inserted one that still exists and has that name. - user.sessions.add(session) if user.last_login is None or user.last_login < session.start: user.last_login = session.start elif isinstance(action, FingerUserLogoutAction): - # Logout as user `action.login` from session `action.session_name`. - + # Logout as user `action.login` from + # session `action.session_name`. if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') try: user = self._users[action.login] except KeyError: raise ValueError( - f'got no user with login {action.login!r}', + f'Got no user with login {action.login!r}', ) from None try: del user.sessions[action.session_name] except (KeyError, IndexError): raise ValueError( - f'got no session {action.name!r} ' - f'for user {action.login!r}') from None + f'Got no session {action.name!r} ' + f'for user {action.login!r}', + ) from None elif isinstance(action, FingerUserSessionChangeAction): # Make user with login `action.login` idle. - if action.login is None: - raise ValueError('missing login') + raise ValueError('Missing login') try: user = self._users[action.login] except KeyError: raise ValueError( - 'got no user with login ' - f'{action.login!r}', + f'Got no user with login {action.login!r}', ) from None try: session = user.sessions[action.session_name] except (KeyError, IndexError): raise ValueError( - f'got no session {action.name!r} ' - f'for user {action.login!r}') from None + f'Got no session {action.name!r} ' + f'for user {action.login!r}', + ) from None since = time if action.idle is Unchanged: @@ -710,32 +677,32 @@ class FingerFictionInterface(_FingerInterface): class FingerScenario: - """ Scenario representation for the fictional interface. + """Scenario representation for the fictional interface. - Consists of actions (as instances of subclasses of - :py:class:`FingerAction`) located at a given timedelta, with - a given ending type and time. + Consists of actions (as instances of subclasses of + :py:class:`FingerAction`) located at a given timedelta, with + a given ending type and time. - A scenario always uses timedeltas and not datetimes, since it can - start at any arbitrary point in time and some scenarios are even - on repeat. + A scenario always uses timedeltas and not datetimes, since it can + start at any arbitrary point in time and some scenarios are even + on repeat. """ class EndingType(_Enum): - """ Ending type, i.e. what happens when the scenario comes to an end. + """Ending type, i.e. what happens when the scenario comes to an end. - .. py:data:: FREEZE + .. py:data:: FREEZE - Freeze the end state forever. + Freeze the end state forever. - .. py:data:: STOP + .. py:data:: STOP - Stop the server as soon as the scenario has reached an end. + Stop the server as soon as the scenario has reached an end. - .. py:data:: REPEAT + .. py:data:: REPEAT - Repeat the scenario from the beginning while - starting again from the initial state. + Repeat the scenario from the beginning while + starting again from the initial state. """ FREEZE = 0 @@ -743,16 +710,13 @@ class FingerScenario: REPEAT = 2 def __init__(self): - # Initialize the properties. - self._end_type = None self._end_time = None self._actions = [] @property def ending_type(self) -> EndingType: - """ Ending type of the scenario, as an :py:data:`EndingType`. """ - + """Get the ending type of the scenario.""" return self._end_type @ending_type.setter @@ -772,22 +736,23 @@ class FingerScenario: 'interrupt': self.EndingType.FREEZE, 'freeze': self.EndingType.FREEZE, 'stop': self.EndingType.STOP, - 'repeat': self.EndingType.REPEAT}[value] + 'repeat': self.EndingType.REPEAT, + }[value] except KeyError: raise TypeError( - f'invalid value for ending type: {value!r}') + f'Invalid value for ending type: {value!r}', + ) self._end_type = value @property def duration(self) -> _Optional[_td]: - """ Offset of the ending. + """Offset of the ending. - When the offset is reached, any object following - the scenario should act out the ending type defined - in :py:attr:`ending_type`. + When the offset is reached, any object following + the scenario should act out the ending type defined + in :py:attr:`ending_type`. """ - return self._end_time @duration.setter @@ -795,32 +760,29 @@ class FingerScenario: self._end_time = _make_delta(value, allow_none=True) def verify(self) -> None: - """ Verify that the current scenario is valid. + """Verify that the current scenario is valid. - This function does the following checks on the scenario: + This function does the following checks on the scenario: - * The ending type and time (duration) are well defined. - * Any user edition or deletion event happens when the - related user exists. - * Any session creation, edition or deletion happens on - a user who exists at that point in time. - * Any session edition or deletion happens when the - related session exists. + * The ending type and time (duration) are well defined. + * Any user edition or deletion event happens when the + related user exists. + * Any session creation, edition or deletion happens on + a user who exists at that point in time. + * Any session edition or deletion happens when the + related session exists. - Any action defined after the ending time is ignored. + Any action defined after the ending time is ignored. - :raise ValueError: if the current scenario is invalid. + :raise ValueError: whether the current scenario is invalid. """ - # Check if the ending is well defined. - if self._end_time is None: - raise ValueError('ending time (duration) has not been provided') + raise ValueError('Ending time (duration) has not been provided') if not isinstance(self._end_type, self.EndingType): - raise ValueError('ending type has not been provided') + raise ValueError('Ending type has not been provided') # Check if the events are coherent. - users = _defaultdict(lambda: False) sessions = _defaultdict(lambda: _defaultdict(lambda: 0)) @@ -828,31 +790,30 @@ class FingerScenario: try: if time >= self._end_time: # Action will be ignored. - pass elif isinstance(action, FingerUserCreationAction): if users[action.login]: # The user we're trying to create already exists. - raise ValueError( - 'trying to create user ' - f'{action.login!r} which already exists') + 'Trying to create user ' + + f'{action.login!r} which already exists', + ) users[action.login] = True elif isinstance(action, FingerUserEditionAction): if not users[action.login]: # The user we're trying to edit doesn't exist. - raise ValueError( - 'trying to edit user ' - f"{action.login!r} while it doesn't exist") + 'Trying to edit user ' + f"{action.login!r} while it doesn't exist", + ) elif isinstance(action, FingerUserDeletionAction): if action.login not in users: # The user we're trying to delete doesn't exist. - raise ValueError( - 'trying to delete user ' - f"{action.login!r} while it doesn't exist") + 'Trying to delete user ' + f"{action.login!r} while it doesn't exist", + ) del users[action.login] try: @@ -862,56 +823,58 @@ class FingerScenario: elif isinstance(action, FingerUserLoginAction): if action.login not in users: # The user we're trying to log in as doesn't exist. - raise ValueError( - 'trying to log in as user ' - f"{action.login!r} which doesn't exist") + 'Trying to log in as user ' + f"{action.login!r} which doesn't exist", + ) sessions[action.login][action.session_name] += 1 elif isinstance(action, FingerUserSessionChangeAction): if sessions[action.login][action.session_name] <= 0: # The user doesn't exist (anymore?) or the session # does not exist. - if users[action.login]: raise ValueError( - 'trying to change non existing ' - f'session of user {action.login!r}') + 'Trying to change non existing ' + + f'session of user {action.login!r}', + ) else: raise ValueError( - 'trying to change session of ' - f'non-existing user {action.login!r}') + 'Trying to change session of ' + + f'non-existing user {action.login!r}', + ) elif isinstance(action, FingerUserLogoutAction): if sessions[action.login][action.session_name] <= 0: # The user doesn't exist (anymore?) or the session # does not exist. - if users[action.login]: raise ValueError( - 'trying to delete non existing ' - f'session of user {action.login!r}') + 'Trying to delete non existing ' + + f'session of user {action.login!r}', + ) else: raise ValueError( - 'trying to delete session of ' - f'non-existing user {action.login!r}') + 'Trying to delete session of ' + + f'non-existing user {action.login!r}', + ) sessions[action.login][action.session_name] -= 1 except ValueError as e: + msg = str(e) + msg = msg[0].lower() + msg[1:] raise ValueError( - f'at action #{i} at {_format_delta(time)}: ' - f'{e!s}', + f'At action #{i} at {_format_delta(time)}: {msg}', ) from None @classmethod def load(cls, path: str): - """ Load a scenario from a TOML file. + """Load a scenario from a TOML file. - Decodes the content of a scenario in TOML format and, if - successful, returns the result as an instance of FingerScenario. + Decodes the content of a scenario in TOML format and, if + successful, returns the result as an instance of FingerScenario. - :param path: Path of the TOML file to load. + :param path: Path of the TOML file to load. """ - global _toml actions = [] @@ -921,7 +884,6 @@ class FingerScenario: _logger.debug(f'Loading scenario from {path!r}.') # Load the required modules. - if _toml is None: try: import toml @@ -933,29 +895,26 @@ class FingerScenario: del toml # Read the document and translate all of the timestamps. - document = _toml.load(path) i = 0 for key in document.keys(): time = _parse_delta(key) if time is None: - raise ValueError(f'found invalid time: {key!r}') + raise ValueError(f'Found invalid time: {key!r}') if not isinstance(document[key], list): raise ValueError( - f'time {key!r} is not an array, ' - f'you have probably written [{key}] instead of ' - f'[[{key}]]', + f'Time {key!r} is not an array, ' + + f'you have probably written [{key}] instead of ' + + f'[[{key}]]', ) for j, data in enumerate(document[key]): try: typ = data['type'] - if typ in ('interrupt', 'freeze', 'stop', 'repeat'): # Set the ending type and time. - if end_time is None or end_time > time: end_type = { 'interrupt': cls.EndingType.FREEZE, @@ -967,7 +926,6 @@ class FingerScenario: continue elif typ == 'create': # User creation. - plan = None if 'plan' in data: pp = _path.join(_path.dirname(path), data['plan']) @@ -979,17 +937,19 @@ class FingerScenario: shell=data.get('shell'), # NOQA home=data.get('home'), office=data.get('office'), - plan=plan) + plan=plan, + ) elif typ == 'update': # User update. - plan = Unchanged if 'plan' in data: if data['plan'] is False: plan = None else: pp = _path.join( - _path.dirname(path), data['plan']) + _path.dirname(path), + data['plan'], + ) plan = open(pp).read() def g(k): @@ -1007,45 +967,43 @@ class FingerScenario: ) elif typ == 'delete': # User deletion. - action = FingerUserDeletionAction( - login=data['login']) + login=data['login'], + ) elif typ == 'login': # User login. - action = FingerUserLoginAction( login=data['login'], session_name=data.get('session'), line=data.get('line'), - host=data.get('host')) + host=data.get('host'), + ) elif typ == 'logout': # User logout. - action = FingerUserLogoutAction( login=data['login'], session_name=data.get('session'), ) elif typ in ('idle', 'active'): # Idle change status. - action = FingerUserSessionChangeAction( login=data['login'], session_name=data.get('session'), idle=(typ == 'idle'), ) else: - raise ValueError(f'invalid action type {typ!r}') + raise ValueError(f'Invalid action type {typ!r}') actions.append((time, action, i)) i += 1 except Exception as e: + msg = str(e) + msg = msg[0].lower() + msg[1:] raise ValueError( - f'at action #{j + 1} at ' - f'{_format_delta(time)}: {e!s}', + f'At action #{j + 1} at {_format_delta(time)}: {msg}', ) from None # Sort and check the actions. - _logger.debug( f'Loaded {len(actions)} action{("", "s")[len(actions) >= 2]}.', ) @@ -1053,7 +1011,6 @@ class FingerScenario: if end_type is None: # If no ending was given in the script file, we ought to # interrupt 10 seconds after the last action. - try: last_time = max(actions, key=lambda x: (x[0], x[2]))[0] except ValueError: @@ -1075,17 +1032,17 @@ class FingerScenario: to: _Optional[_td] = None, since: _Optional[_td] = None, ) -> _Sequence[FingerAction]: - """ Return a sequence of actions in order from the scenario. + """Return a sequence of actions in order from the scenario. - :param to: Maximum timedelta for the actions to gather. - :param since: Minimum timedelta for the actions to gather. - :return: The sequence of actions that occur and respect - the given constraints. + :param to: Maximum timedelta for the actions to gather. + :param since: Minimum timedelta for the actions to gather. + :return: The sequence of actions that occur and respect + the given constraints. """ - if since is not None and to is not None and since > to: raise ValueError( - f'`since` ({since}) should be before `to` ({to}).') + f'`since` ({since}) should be before `to` ({to}).', + ) for time, action, _ in self._actions: if since is not None and since >= time: @@ -1097,8 +1054,7 @@ class FingerScenario: yield time, action def add(self, action: FingerAction, time: _Union[_td, str]): - """ Add an action at the given time to the registered actions. """ - + """Add an action at the given time to the registered actions.""" time = _make_delta(time) try: @@ -1111,15 +1067,15 @@ class FingerScenario: class FingerScenarioInterface(FingerFictionInterface): - """ Fiction interface, to follow actions written in a scenario. + """Fiction interface, to follow actions written in a scenario. - Subclasses :py:class:`FingerFictionInterface` and adds - a regular update method for updating the state according - to the given scenario. + Subclasses :py:class:`FingerFictionInterface` and adds + a regular update method for updating the state according + to the given scenario. - :param scenario: The scenario to follow using the given interface. - :param start: The start time at which the scenario is supposed to - have started; by default, the current time is used. + :param scenario: The scenario to follow using the given interface. + :param start: The start time at which the scenario is supposed to + have started; by default, the current time is used. """ def __init__( @@ -1127,8 +1083,6 @@ class FingerScenarioInterface(FingerFictionInterface): scenario: FingerScenario, start: _Optional[_dt] = None, ): - """ Initialize the interface. """ - if start is None: start = _dt.now() if start.tzinfo is None: @@ -1137,11 +1091,11 @@ class FingerScenarioInterface(FingerFictionInterface): super().__init__() # Initialize the object properties. - if not isinstance(scenario, FingerScenario): raise TypeError( - 'scenario should be a FingerScenario, ' - f'is {scenario.__class__.__name__}.') + 'Scenario should be a FingerScenario, ' + f'is {scenario.__class__.__name__}.', + ) scenario.verify() scenario = _copy.copy(scenario) @@ -1150,7 +1104,6 @@ class FingerScenarioInterface(FingerFictionInterface): # - `scenario`: the script to follow. # - `laststart`: the last registered start. # - `lastdelta`: the last registered delta. - self._scenario = scenario self._start = start self._laststart = None @@ -1158,14 +1111,12 @@ class FingerScenarioInterface(FingerFictionInterface): @_cron('* * * * * *') def update(self): - """ Update the state according to the scenario every second. """ - + """Update the state according to the scenario every second.""" now = _dt.now().astimezone() start = self._laststart or self._start # Check if we have gone back in time, e.g. if the system time # has changed, and just start again. - if self._lastdelta is not None and now < start + self._lastdelta: _logger.debug('We seem to have gone back in time!') _logger.debug("Let's start again from a clean slate.") @@ -1176,7 +1127,6 @@ class FingerScenarioInterface(FingerFictionInterface): self.reset() # Check if we have reached an ending. - if now > start + self._scenario.duration: ending_type = self._scenario.ending_type if ending_type == FingerScenario.EndingType.STOP: @@ -1199,7 +1149,6 @@ class FingerScenarioInterface(FingerFictionInterface): # datetime.datetime(2000, 1, 1, 0, 0, 20) # # Let's see. - start = now - (now - start) % self._scenario.duration self.reset() @@ -1210,11 +1159,9 @@ class FingerScenarioInterface(FingerFictionInterface): # We're within the duration of the fiction, so we just use the # offset from the start. - delta = now - start # Then, we apply the actions up to the current time. - actions = self._scenario.get(to=delta, since=self._lastdelta) if _logger.getEffectiveLevel() <= _logging.DEBUG: @@ -1231,7 +1178,6 @@ class FingerScenarioInterface(FingerFictionInterface): self.apply(action, start + time) # Finally, we can keep track of where we were. - self._laststart = start self._lastdelta = delta |