diff options
author | Thomas Touhey <thomas@touhey.fr> | 2021-09-05 02:05:16 +0200 |
---|---|---|
committer | Thomas Touhey <thomas@touhey.fr> | 2021-09-05 02:05:16 +0200 |
commit | e7465642e39cfce7fcd6ed36e28dbdda14c0a4dc (patch) | |
tree | e72cab78f6de85b11c1bc282248dfbf60d13ee6f | |
parent | a423ac0f832d5a4541e005630daee8679e07301f (diff) |
Fixed the fiction scenario checking
-rw-r--r-- | docs/api/fiction.rst | 2 | ||||
-rwxr-xr-x | fingerd/fiction.py | 326 |
2 files changed, 252 insertions, 76 deletions
diff --git a/docs/api/fiction.rst b/docs/api/fiction.rst index b362021..1f215ed 100644 --- a/docs/api/fiction.rst +++ b/docs/api/fiction.rst @@ -52,7 +52,7 @@ Classes for playing scenarios ----------------------------- .. autoclass:: FingerScenario - :members: ready, load, get + :members: valid, load, get .. autoclass:: FingerScenarioInterface diff --git a/fingerd/fiction.py b/fingerd/fiction.py index a2e09be..2a2a9c9 100755 --- a/fingerd/fiction.py +++ b/fingerd/fiction.py @@ -10,8 +10,10 @@ import os.path as _path import copy as _copy, re as _re, math as _math import itertools as _itertools +from enum import Enum as _Enum from datetime import datetime as _dt, timedelta as _td from typing import Optional as _Optional, Union as _Union, Tuple as _Tuple +from collections import defaultdict as _defaultdict from collections.abc import Sequence as _Sequence from .core import ( @@ -165,6 +167,39 @@ def _parse_delta(raw): except Exception as e: return None +def _format_delta(td): + """ Create a delta string. """ + + sls = zip((_td(days = 7), _td(days = 1), _td(seconds = 3600), + _td(seconds = 60)), 'wdhm') + + if td >= _td(): + d = '' + + for span, letter in sls: + n = td // span + if n: + d += f"{n}{letter}" + td -= span * n + + s = td.seconds + if not d or s: + d += f"{s}s" + else: + d = '-' + + for span, letter in sls: + n = -td // span + if n: + d += f"{n}{letter}" + td += span * n + + s = (-td).seconds + if s: + d += f"{s}s" + + return d + # --- # Unchanged global. # --- @@ -354,11 +389,10 @@ class FingerUserLoginAction(FingerAction): :param line: The new value for :py:attr:`_FictionalSession.line`. :param host: The new value for :py:attr:`_FictionalSession.host`. """ - def __init__(self, - login: str, - session_name: str, - line: _Union[str, None], - host: _Union[str, None]): + def __init__(self, login: str, + session_name: _Union[str, None] = None, + line: _Union[str, None] = None, + host: _Union[str, None] = None): super().__init__() self._login = login self._session = session_name @@ -377,7 +411,7 @@ class FingerUserLoginAction(FingerAction): return self._login @property - def session_name(self) -> str: + def session_name(self) -> _Union[str, None]: """ The name of the session to create. """ return self._session @@ -403,8 +437,9 @@ class FingerUserSessionChangeAction(FingerAction): :py:attr:`_FictionalSession.is_idle`; :py:data:`Unchanged` if the property is unchanged. """ - def __init__(self, login: str, session_name: str, - idle: _Union[bool, _UnchangedType]): + def __init__(self, login: str, + session_name: _Union[str, None] = None, + idle: _Union[bool, _UnchangedType] = Unchanged): super().__init__() self._login = login self._session = session_name @@ -422,7 +457,7 @@ class FingerUserSessionChangeAction(FingerAction): return self._login @property - def session_name(self) -> str: + def session_name(self) -> _Union[str, None]: """ The name of the session to edit. """ return self._session @@ -440,7 +475,8 @@ class FingerUserLogoutAction(FingerAction): :param login: The login of the user to edit. :param session_name: The name of the session to delete. """ - def __init__(self, login: str, session_name: str): + def __init__(self, login: str, + session_name: _Union[str, None] = None): super().__init__() self._login = login self._session = session_name @@ -457,7 +493,7 @@ class FingerUserLogoutAction(FingerAction): return self._login @property - def session_name(self) -> str: + def session_name(self) -> _Union[str, None]: """ The name of the session to delete. """ return self._session @@ -654,44 +690,116 @@ class FingerScenario: A scenario always uses timedeltas and not datetimes, since it can start at any arbitrary point in time and some scenarios are even - on repeat. + on repeat. """ - Actions are always read from a TOML file, there is no API for adding - actions manually to a scenario. """ + class EndingType(_Enum): + """ Ending type. - def __init__(self, path: _Optional[str] = None): - global _toml + .. data:: FREEZE - # Load the required modules. + Freeze the end state forever. - if _toml is None: - try: - import toml - except ModuleNotFoundError: - raise ModuleNotFoundError("'toml' module " - "required") from None + .. data:: STOP - _toml = toml - del toml + Stop the server as soon as the scenario has reached + the ending time. - # Initialize the properties. + .. data:: REPEAT + + Repeat the scenario from the beginning while + starting again from the initial state. """ - self._type = 'interrupt' - self._start = _td() - self._duration = None + FREEZE = 0 + STOP = 1 + REPEAT = 2 + + def __init__(self): + # Initialize the properties. self._end_type = None self._end_time = None self._actions = [] - if isinstance(path, str): - self.load(path) + def verify(self) -> None: + """ Verify that the current fiction is valid. + Raises ValueError in case it is not. """ + + # Check if the ending is well defined. + + if self._end_time is None: + raise ValueError("ending time (duration) has not been provided") + if not isinstance(self._end_type, self.EndingType): + raise ValueError("ending type has not been provided") - def ready(self) -> bool: - """ Returns whether the fiction has successfully been loaded - or not. """ + # Check if the events are coherent. - return self._end_time is not None + users = _defaultdict(lambda: False) + sessions = _defaultdict(lambda: _defaultdict(lambda: 0)) + + for i, (time, action, _) in enumerate(self._actions): + try: + if time >= self._end_time: + # Action will be ignored. + + pass + elif isinstance(action, FingerUserCreationAction): + if users[action.login]: + # The user we're trying to create already exists. + + raise ValueError("trying to create user " + f"{repr(action.login)} which already exists") + + users[action.login] = True + elif isinstance(action, FingerUserEditionAction): + if not users[action.login]: + # The user we're trying to edit doesn't exist. + + raise ValueError("trying to edit user " + f"{repr(action.login)} while it doesn't exist") + elif isinstance(action, FingerUserDeletionAction): + if not users[action.login]: + # The user we're trying to delete doesn't exist. + + raise ValueError("trying to delete user " + f"{repr(action.login)} while it doesn't exist") + + del users[action.login] + del sessions[action.login] + elif isinstance(action, FingerUserLoginAction): + if not users[action.login]: + # The user we're trying to log in as doesn't exist. + + raise ValueError("trying to log in as user " + f"{repr(action.login)} which doesn't exist") + + sessions[action.login][action.session_name] += 1 + elif isinstance(action, FingerUserSessionChangeAction): + if sessions[action.login][action.session_name] <= 0: + # The user doesn't exist (anymore?) or the session + # does not exist. + + if users[action.login]: + raise ValueError("trying to change non existing " + f"session of user {repr(action.login)}") + else: + raise ValueError("trying to change session of " + f"non-existing user {repr(action.login)}") + elif isinstance(action, FingerUserLogoutAction): + if sessions[action.login][action.session_name] <= 0: + # The user doesn't exist (anymore?) or the session + # does not exist. + + if users[action.login]: + raise ValueError("trying to delete non existing " + f"session of user {repr(action.login)}") + else: + raise ValueError("trying to delete session of " + f"non-existing user {repr(action.login)}") + + sessions[action.login][action.session_name] -= 1 + except ValueError as e: + raise ValueError(f"at action #{i} at {_format_delta(time)}: " + f"{str(e)}") from None def load(self, path: str) -> None: """ Decodes the content of a scenario in TOML format and, if @@ -700,13 +808,29 @@ class FingerScenario: :param path: Path of the TOML file to load. """ + global _toml + actions = [] end_type = None end_time = None + # Load the required modules. + + if _toml is None: + try: + import toml + except ModuleNotFoundError: + raise ModuleNotFoundError("'toml' module " + "required") from None + + _toml = toml + del toml + # Read the document and translate all of the timestamps. document = _toml.load(path) + i = 0 + for key in document.keys(): time = _parse_delta(key) if time is None: @@ -715,11 +839,15 @@ class FingerScenario: for data in document[key]: typ = data['type'] - if typ in ('interrupt', 'freeze' 'stop', 'repeat'): + if typ in ('interrupt', 'freeze' 'stop', 'repeat'): # Set the ending type and time. if end_time is None or end_time > time: - end_type = {'freeze': 'interrupt'}.get(typ, typ) + end_type = { + 'interrupt': self.EndingType.FREEZE, + 'freeze': self.EndingType.FREEZE, + 'stop': self.EndingType.STOP, + 'repeat': self.EndingType.REPEAT}[typ] end_time = time continue elif typ == 'create': @@ -788,17 +916,18 @@ class FingerScenario: else: raise Exception("invalid action type") - actions.append((time, action)) + actions.append((time, action, i)) + i += 1 # Sort and check the actions. - actions.sort(key = lambda x: x[0]) + actions.sort(key = lambda x: (x[0], x[2])) if end_type is None: # If no ending was given in the script file, we ought to # interrupt 10 seconds after the last action. - end_type = 'interrupt' + end_type = self.EndingType.FREEZE end_time = actions[-1][0] + _td(seconds = 10) else: # Otherwise, we are removing actions that are after the ending. @@ -808,9 +937,8 @@ class FingerScenario: # FIXME: check that incompatible actions, such as double creation # for a user, doesn't occur. - self._type = end_type - self._start = actions[0][0] if actions else _td() - self._duration = end_time + self._end_type = end_type + self._end_time = end_time self._actions = actions def get(self, to: _Optional[_td] = None, @@ -826,24 +954,75 @@ class FingerScenario: raise ValueError(f"`since` ({since}) should be " \ f"before `to` ({to}).") - for time, action in self._actions: + for time, action, _ in self._actions: if since is not None and since >= time: continue if to is not None and time > to: continue + if self._end_time is not None and time >= self._end_time: + continue yield time, action + def add(self, action: FingerAction, time: _td): + """ Add an action in the registered actions. """ + + try: + index = max(x[2] for x in self._actions) + except ValueError: + index = 0 + + self._actions.append((time, action, index + 1)) + self._actions.sort(key = lambda x: (x[0], x[2])) + @property - def type(self): + def type(self) -> EndingType: """ Type of action flow, either 'interrupt', 'stop' or 'repeat'. """ - return self._type + return self._end_type + + @type.setter + def type(self, value: EndingType) -> None: + if value is None: + self._end_type = None + return + + try: + value = self.EndingType(value) + except ValueError: + try: + if isinstance(value, str): + value = value.casefold() + value = { + None: None, + 'interrupt': self.EndingType.FREEZE, + 'freeze': self.EndingType.FREEZE, + 'stop': self.EndingType.STOP, + 'repeat': self.EndingType.REPEAT}[value] + except KeyError: + raise TypeError("invalid value for ending type: " + f"{repr(value)}") + + self._end_type = value @property - def duration(self): + def duration(self) -> _td: """ Maximum offset. """ - return self._duration + return self._end_time + + @duration.setter + def duration(self, value: _Optional[_td]) -> None: + if isinstance(value, _td): + self._end_time = value + return + if value is None: + self._end_time = None + return + + try: + self._end_time = _td(seconds = value) + except TypeError: + raise TypeError(f"Invalid end time value: {repr(value)}") class FingerScenarioInterface(FingerFictionInterface): @@ -869,7 +1048,8 @@ class FingerScenarioInterface(FingerFictionInterface): # Initialize the object properties. if isinstance(scenario, FingerScenario): - self._as = _copy.copy(scenario) + scenario.verify() + scenario = _copy.copy(scenario) elif scenario is not None: raise TypeError("scenario should be a FingerScenario or None, " f"is {scenario.__class__.__name__}.") @@ -893,32 +1073,28 @@ class FingerScenarioInterface(FingerFictionInterface): now = _dt.now() start = self._start - if self._scenario is not None: - # Either 'stop' or 'interrupt'. - # These cases only are distinguishable after the - # end of the fiction. - - if now > start + self._scenario.duration: - if self._scenario.type == 'stop': - exit(0) - elif self._scenario.type == 'interrupt': - return self._scenario.duration - - # We're on 'repeat', so we are going to have a slightly - # different start because we want the start of the new - # iteration. - # - # >>> from datetime import datetime as dt, timedelta as td - # >>> a = dt(2000, 1, 1) - # >>> b = a + td(seconds = 27) - # >>> (b - a) % td(seconds = 10) - # datetime.timedelta(seconds=7) - # >>> b - (b - a) % td(seconds = 10) - # datetime.datetime(2000, 1, 1, 0, 0, 20) - # - # Let's see. - - start = now - (now - start) % self._scenario.duration + if (self._scenario is not None and + now > start + self._scenario.duration): + if self._scenario.type == FingerScenario.EndingType.STOP: + exit(0) + elif self._scenario.type == FingerScenario.EndingType.FREEZE: + return start, self._scenario.duration + + # We're on 'repeat', so we are going to have a slightly + # different start because we want the start of the new + # iteration. + # + # >>> from datetime import datetime as dt, timedelta as td + # >>> a = dt(2000, 1, 1) + # >>> b = a + td(seconds = 27) + # >>> (b - a) % td(seconds = 10) + # datetime.timedelta(seconds=7) + # >>> b - (b - a) % td(seconds = 10) + # datetime.datetime(2000, 1, 1, 0, 0, 20) + # + # Let's see. + + start = now - (now - start) % self._scenario.duration # We're within the duration of the fiction, so we just return the # offset from the start. |