aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Touhey <thomas@touhey.fr>2021-09-05 02:05:16 +0200
committerThomas Touhey <thomas@touhey.fr>2021-09-05 02:05:16 +0200
commite7465642e39cfce7fcd6ed36e28dbdda14c0a4dc (patch)
treee72cab78f6de85b11c1bc282248dfbf60d13ee6f
parenta423ac0f832d5a4541e005630daee8679e07301f (diff)
Fixed the fiction scenario checking
-rw-r--r--docs/api/fiction.rst2
-rwxr-xr-xfingerd/fiction.py326
2 files changed, 252 insertions, 76 deletions
diff --git a/docs/api/fiction.rst b/docs/api/fiction.rst
index b362021..1f215ed 100644
--- a/docs/api/fiction.rst
+++ b/docs/api/fiction.rst
@@ -52,7 +52,7 @@ Classes for playing scenarios
-----------------------------
.. autoclass:: FingerScenario
- :members: ready, load, get
+ :members: valid, load, get
.. autoclass:: FingerScenarioInterface
diff --git a/fingerd/fiction.py b/fingerd/fiction.py
index a2e09be..2a2a9c9 100755
--- a/fingerd/fiction.py
+++ b/fingerd/fiction.py
@@ -10,8 +10,10 @@ import os.path as _path
import copy as _copy, re as _re, math as _math
import itertools as _itertools
+from enum import Enum as _Enum
from datetime import datetime as _dt, timedelta as _td
from typing import Optional as _Optional, Union as _Union, Tuple as _Tuple
+from collections import defaultdict as _defaultdict
from collections.abc import Sequence as _Sequence
from .core import (
@@ -165,6 +167,39 @@ def _parse_delta(raw):
except Exception as e:
return None
+def _format_delta(td):
+ """ Create a delta string. """
+
+ sls = zip((_td(days = 7), _td(days = 1), _td(seconds = 3600),
+ _td(seconds = 60)), 'wdhm')
+
+ if td >= _td():
+ d = ''
+
+ for span, letter in sls:
+ n = td // span
+ if n:
+ d += f"{n}{letter}"
+ td -= span * n
+
+ s = td.seconds
+ if not d or s:
+ d += f"{s}s"
+ else:
+ d = '-'
+
+ for span, letter in sls:
+ n = -td // span
+ if n:
+ d += f"{n}{letter}"
+ td += span * n
+
+ s = (-td).seconds
+ if s:
+ d += f"{s}s"
+
+ return d
+
# ---
# Unchanged global.
# ---
@@ -354,11 +389,10 @@ class FingerUserLoginAction(FingerAction):
:param line: The new value for :py:attr:`_FictionalSession.line`.
:param host: The new value for :py:attr:`_FictionalSession.host`. """
- def __init__(self,
- login: str,
- session_name: str,
- line: _Union[str, None],
- host: _Union[str, None]):
+ def __init__(self, login: str,
+ session_name: _Union[str, None] = None,
+ line: _Union[str, None] = None,
+ host: _Union[str, None] = None):
super().__init__()
self._login = login
self._session = session_name
@@ -377,7 +411,7 @@ class FingerUserLoginAction(FingerAction):
return self._login
@property
- def session_name(self) -> str:
+ def session_name(self) -> _Union[str, None]:
""" The name of the session to create. """
return self._session
@@ -403,8 +437,9 @@ class FingerUserSessionChangeAction(FingerAction):
:py:attr:`_FictionalSession.is_idle`;
:py:data:`Unchanged` if the property is unchanged. """
- def __init__(self, login: str, session_name: str,
- idle: _Union[bool, _UnchangedType]):
+ def __init__(self, login: str,
+ session_name: _Union[str, None] = None,
+ idle: _Union[bool, _UnchangedType] = Unchanged):
super().__init__()
self._login = login
self._session = session_name
@@ -422,7 +457,7 @@ class FingerUserSessionChangeAction(FingerAction):
return self._login
@property
- def session_name(self) -> str:
+ def session_name(self) -> _Union[str, None]:
""" The name of the session to edit. """
return self._session
@@ -440,7 +475,8 @@ class FingerUserLogoutAction(FingerAction):
:param login: The login of the user to edit.
:param session_name: The name of the session to delete. """
- def __init__(self, login: str, session_name: str):
+ def __init__(self, login: str,
+ session_name: _Union[str, None] = None):
super().__init__()
self._login = login
self._session = session_name
@@ -457,7 +493,7 @@ class FingerUserLogoutAction(FingerAction):
return self._login
@property
- def session_name(self) -> str:
+ def session_name(self) -> _Union[str, None]:
""" The name of the session to delete. """
return self._session
@@ -654,44 +690,116 @@ class FingerScenario:
A scenario always uses timedeltas and not datetimes, since it can
start at any arbitrary point in time and some scenarios are even
- on repeat.
+ on repeat. """
- Actions are always read from a TOML file, there is no API for adding
- actions manually to a scenario. """
+ class EndingType(_Enum):
+ """ Ending type.
- def __init__(self, path: _Optional[str] = None):
- global _toml
+ .. data:: FREEZE
- # Load the required modules.
+ Freeze the end state forever.
- if _toml is None:
- try:
- import toml
- except ModuleNotFoundError:
- raise ModuleNotFoundError("'toml' module "
- "required") from None
+ .. data:: STOP
- _toml = toml
- del toml
+ Stop the server as soon as the scenario has reached
+ the ending time.
- # Initialize the properties.
+ .. data:: REPEAT
+
+ Repeat the scenario from the beginning while
+ starting again from the initial state. """
- self._type = 'interrupt'
- self._start = _td()
- self._duration = None
+ FREEZE = 0
+ STOP = 1
+ REPEAT = 2
+
+ def __init__(self):
+ # Initialize the properties.
self._end_type = None
self._end_time = None
self._actions = []
- if isinstance(path, str):
- self.load(path)
+ def verify(self) -> None:
+ """ Verify that the current fiction is valid.
+ Raises ValueError in case it is not. """
+
+ # Check if the ending is well defined.
+
+ if self._end_time is None:
+ raise ValueError("ending time (duration) has not been provided")
+ if not isinstance(self._end_type, self.EndingType):
+ raise ValueError("ending type has not been provided")
- def ready(self) -> bool:
- """ Returns whether the fiction has successfully been loaded
- or not. """
+ # Check if the events are coherent.
- return self._end_time is not None
+ users = _defaultdict(lambda: False)
+ sessions = _defaultdict(lambda: _defaultdict(lambda: 0))
+
+ for i, (time, action, _) in enumerate(self._actions):
+ try:
+ if time >= self._end_time:
+ # Action will be ignored.
+
+ pass
+ elif isinstance(action, FingerUserCreationAction):
+ if users[action.login]:
+ # The user we're trying to create already exists.
+
+ raise ValueError("trying to create user "
+ f"{repr(action.login)} which already exists")
+
+ users[action.login] = True
+ elif isinstance(action, FingerUserEditionAction):
+ if not users[action.login]:
+ # The user we're trying to edit doesn't exist.
+
+ raise ValueError("trying to edit user "
+ f"{repr(action.login)} while it doesn't exist")
+ elif isinstance(action, FingerUserDeletionAction):
+ if not users[action.login]:
+ # The user we're trying to delete doesn't exist.
+
+ raise ValueError("trying to delete user "
+ f"{repr(action.login)} while it doesn't exist")
+
+ del users[action.login]
+ del sessions[action.login]
+ elif isinstance(action, FingerUserLoginAction):
+ if not users[action.login]:
+ # The user we're trying to log in as doesn't exist.
+
+ raise ValueError("trying to log in as user "
+ f"{repr(action.login)} which doesn't exist")
+
+ sessions[action.login][action.session_name] += 1
+ elif isinstance(action, FingerUserSessionChangeAction):
+ if sessions[action.login][action.session_name] <= 0:
+ # The user doesn't exist (anymore?) or the session
+ # does not exist.
+
+ if users[action.login]:
+ raise ValueError("trying to change non existing "
+ f"session of user {repr(action.login)}")
+ else:
+ raise ValueError("trying to change session of "
+ f"non-existing user {repr(action.login)}")
+ elif isinstance(action, FingerUserLogoutAction):
+ if sessions[action.login][action.session_name] <= 0:
+ # The user doesn't exist (anymore?) or the session
+ # does not exist.
+
+ if users[action.login]:
+ raise ValueError("trying to delete non existing "
+ f"session of user {repr(action.login)}")
+ else:
+ raise ValueError("trying to delete session of "
+ f"non-existing user {repr(action.login)}")
+
+ sessions[action.login][action.session_name] -= 1
+ except ValueError as e:
+ raise ValueError(f"at action #{i} at {_format_delta(time)}: "
+ f"{str(e)}") from None
def load(self, path: str) -> None:
""" Decodes the content of a scenario in TOML format and, if
@@ -700,13 +808,29 @@ class FingerScenario:
:param path: Path of the TOML file to load. """
+ global _toml
+
actions = []
end_type = None
end_time = None
+ # Load the required modules.
+
+ if _toml is None:
+ try:
+ import toml
+ except ModuleNotFoundError:
+ raise ModuleNotFoundError("'toml' module "
+ "required") from None
+
+ _toml = toml
+ del toml
+
# Read the document and translate all of the timestamps.
document = _toml.load(path)
+ i = 0
+
for key in document.keys():
time = _parse_delta(key)
if time is None:
@@ -715,11 +839,15 @@ class FingerScenario:
for data in document[key]:
typ = data['type']
- if typ in ('interrupt', 'freeze' 'stop', 'repeat'):
+ if typ in ('interrupt', 'freeze' 'stop', 'repeat'):
# Set the ending type and time.
if end_time is None or end_time > time:
- end_type = {'freeze': 'interrupt'}.get(typ, typ)
+ end_type = {
+ 'interrupt': self.EndingType.FREEZE,
+ 'freeze': self.EndingType.FREEZE,
+ 'stop': self.EndingType.STOP,
+ 'repeat': self.EndingType.REPEAT}[typ]
end_time = time
continue
elif typ == 'create':
@@ -788,17 +916,18 @@ class FingerScenario:
else:
raise Exception("invalid action type")
- actions.append((time, action))
+ actions.append((time, action, i))
+ i += 1
# Sort and check the actions.
- actions.sort(key = lambda x: x[0])
+ actions.sort(key = lambda x: (x[0], x[2]))
if end_type is None:
# If no ending was given in the script file, we ought to
# interrupt 10 seconds after the last action.
- end_type = 'interrupt'
+ end_type = self.EndingType.FREEZE
end_time = actions[-1][0] + _td(seconds = 10)
else:
# Otherwise, we are removing actions that are after the ending.
@@ -808,9 +937,8 @@ class FingerScenario:
# FIXME: check that incompatible actions, such as double creation
# for a user, doesn't occur.
- self._type = end_type
- self._start = actions[0][0] if actions else _td()
- self._duration = end_time
+ self._end_type = end_type
+ self._end_time = end_time
self._actions = actions
def get(self, to: _Optional[_td] = None,
@@ -826,24 +954,75 @@ class FingerScenario:
raise ValueError(f"`since` ({since}) should be " \
f"before `to` ({to}).")
- for time, action in self._actions:
+ for time, action, _ in self._actions:
if since is not None and since >= time:
continue
if to is not None and time > to:
continue
+ if self._end_time is not None and time >= self._end_time:
+ continue
yield time, action
+ def add(self, action: FingerAction, time: _td):
+ """ Add an action in the registered actions. """
+
+ try:
+ index = max(x[2] for x in self._actions)
+ except ValueError:
+ index = 0
+
+ self._actions.append((time, action, index + 1))
+ self._actions.sort(key = lambda x: (x[0], x[2]))
+
@property
- def type(self):
+ def type(self) -> EndingType:
""" Type of action flow, either 'interrupt', 'stop' or 'repeat'. """
- return self._type
+ return self._end_type
+
+ @type.setter
+ def type(self, value: EndingType) -> None:
+ if value is None:
+ self._end_type = None
+ return
+
+ try:
+ value = self.EndingType(value)
+ except ValueError:
+ try:
+ if isinstance(value, str):
+ value = value.casefold()
+ value = {
+ None: None,
+ 'interrupt': self.EndingType.FREEZE,
+ 'freeze': self.EndingType.FREEZE,
+ 'stop': self.EndingType.STOP,
+ 'repeat': self.EndingType.REPEAT}[value]
+ except KeyError:
+ raise TypeError("invalid value for ending type: "
+ f"{repr(value)}")
+
+ self._end_type = value
@property
- def duration(self):
+ def duration(self) -> _td:
""" Maximum offset. """
- return self._duration
+ return self._end_time
+
+ @duration.setter
+ def duration(self, value: _Optional[_td]) -> None:
+ if isinstance(value, _td):
+ self._end_time = value
+ return
+ if value is None:
+ self._end_time = None
+ return
+
+ try:
+ self._end_time = _td(seconds = value)
+ except TypeError:
+ raise TypeError(f"Invalid end time value: {repr(value)}")
class FingerScenarioInterface(FingerFictionInterface):
@@ -869,7 +1048,8 @@ class FingerScenarioInterface(FingerFictionInterface):
# Initialize the object properties.
if isinstance(scenario, FingerScenario):
- self._as = _copy.copy(scenario)
+ scenario.verify()
+ scenario = _copy.copy(scenario)
elif scenario is not None:
raise TypeError("scenario should be a FingerScenario or None, "
f"is {scenario.__class__.__name__}.")
@@ -893,32 +1073,28 @@ class FingerScenarioInterface(FingerFictionInterface):
now = _dt.now()
start = self._start
- if self._scenario is not None:
- # Either 'stop' or 'interrupt'.
- # These cases only are distinguishable after the
- # end of the fiction.
-
- if now > start + self._scenario.duration:
- if self._scenario.type == 'stop':
- exit(0)
- elif self._scenario.type == 'interrupt':
- return self._scenario.duration
-
- # We're on 'repeat', so we are going to have a slightly
- # different start because we want the start of the new
- # iteration.
- #
- # >>> from datetime import datetime as dt, timedelta as td
- # >>> a = dt(2000, 1, 1)
- # >>> b = a + td(seconds = 27)
- # >>> (b - a) % td(seconds = 10)
- # datetime.timedelta(seconds=7)
- # >>> b - (b - a) % td(seconds = 10)
- # datetime.datetime(2000, 1, 1, 0, 0, 20)
- #
- # Let's see.
-
- start = now - (now - start) % self._scenario.duration
+ if (self._scenario is not None and
+ now > start + self._scenario.duration):
+ if self._scenario.type == FingerScenario.EndingType.STOP:
+ exit(0)
+ elif self._scenario.type == FingerScenario.EndingType.FREEZE:
+ return start, self._scenario.duration
+
+ # We're on 'repeat', so we are going to have a slightly
+ # different start because we want the start of the new
+ # iteration.
+ #
+ # >>> from datetime import datetime as dt, timedelta as td
+ # >>> a = dt(2000, 1, 1)
+ # >>> b = a + td(seconds = 27)
+ # >>> (b - a) % td(seconds = 10)
+ # datetime.timedelta(seconds=7)
+ # >>> b - (b - a) % td(seconds = 10)
+ # datetime.datetime(2000, 1, 1, 0, 0, 20)
+ #
+ # Let's see.
+
+ start = now - (now - start) % self._scenario.duration
# We're within the duration of the fiction, so we just return the
# offset from the start.