aboutsummaryrefslogtreecommitdiff
path: root/tests/test_server.py
blob: ddcb8b5051f717ed9818927828a20e3b3913c0f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
# *****************************************************************************
# Copyright (C) 2021 Thomas Touhey <thomas@touhey.fr>
# This file is part of the pyfingerd project, which is MIT-licensed.
# *****************************************************************************
"""Tests for the pyfingerd server."""

import socket
from datetime import timedelta
from time import sleep

from pyfingerd.core import *  # NOQA
from pyfingerd.fiction import *  # NOQA

import pytest


class TestFingerConnection:
    """Test basic finger connections."""

    @pytest.fixture(autouse=True)
    def fingerserver(self):
        """Start a finger server.

        A fixture starting a finger server on ``localhost:3099``
        and stopping it after the test.
        """
        scenario = FingerScenario()
        scenario.ending_type = 'freeze'
        scenario.duration = timedelta(seconds=5)
        scenario.add(
            FingerUserCreationAction(
                login='john',
                name='John Doe',
                home='/home/john',
                shell='/bin/bash',  # NOQA
                office='84.6',
            ),
            timedelta(seconds=-5),
        )
        scenario.add(
            FingerUserLoginAction(
                login='john',
                line='tty1',
            ),
            timedelta(seconds=0),
        )
        scenario.add(
            FingerUserLogoutAction(
                login='john',
            ),
            timedelta(seconds=1),
        )

        class TestFormatter(FingerFormatter):
            """Test formatter, uncomplicated to test."""

            def _format_users(self, users):
                result = f'{len(users)}\n'
                for user in users:
                    result += (
                        f'{user.login}|{user.name}|{user.home}|'
                        f'{user.shell}|{user.office or ""}|'
                        f'{len(user.sessions)}\n'
                    )
                    for session in user.sessions:
                        result += (
                            f'{session.line or ""}|{session.host or ""}\n'
                        )

                return result

            def format_query_error(self, hostname, raw_query):
                return f'{hostname}\n{raw_query}\nerror\n'

            def format_short(self, hostname, raw_query, users):
                return (
                    f'{hostname}\n{raw_query}\nshort\n'
                    + self._format_users(users)
                )

            def format_long(self, hostname, raw_query, users):
                return (
                    f'{hostname}\n{raw_query}\nlong\n'
                    + self._format_users(users)
                )

        server = FingerServer(
            'localhost:3099',
            hostname='example.org',
            interface=FingerScenarioInterface(scenario),
            formatter=TestFormatter(),
        )
        server.start()

        sleep(.1)
        yield

        server.stop()

    def _send_command(self, command):
        conn = socket.create_connection(('localhost', 3099), 1)
        conn.send(command.encode('ascii') + b'\r\n')

        result = tuple(
            conn.recv(1024).decode('ascii')
            .rstrip('\r\n').split('\r\n')
        )

        assert result[:2] == ('EXAMPLE.ORG', command)
        return result[2:]

    # ---
    # Tests.
    # ---

    def test_no_user_list(self):
        """Test if an unknown user returns an empty result."""
        assert self._send_command('user') == ('long', '0')

    def test_existing_user_list(self):
        """Test the user list before and after the cron is executed."""
        assert self._send_command('') == (
            'short', '1',
            'john|John Doe|/home/john|/bin/bash|84.6|1',
            'tty1|',
        )

        sleep(2)

        assert self._send_command('') == ('short', '0')

# End of file.