Skip to content

API Reference

Auto-generated Python API documentation for the Turva backend.

Configuration

config

ConfigurationError

Bases: Exception

Raised when a configuration error occurs

Source code in src/config.py
5
6
class ConfigurationError(Exception):
    """Raised when a configuration error occurs"""

options: show_root_heading: true show_source: true

Models

User Model

models.user

User

Bases: Model, DateFieldsMixins, BaseUser

Source code in models/user.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class User(ormar.Model, DateFieldsMixins, BaseUser):
    ormar_config = ormar_config.copy(tablename="tbl_user")  # type: ignore

    id: UUID = ormar.UUID(
        primary_key=True,
        unique=True,
        nullable=False,
    )
    first_name: str = ormar.String(max_length=50)
    last_name: str = ormar.String(max_length=50)
    password: str = ormar.Text(nullable=False)
    email_address: str = ormar.String(max_length=100, unique=True)
    is_verified: bool = ormar.Boolean(default=False)
    verification_token: str | None = ormar.String(max_length=100, nullable=True)
    verification_token_created_at: datetime | None = ormar.DateTime(nullable=True, timezone=True)
    is_active: bool = ormar.Boolean(default=True)
    organisation: str | None = ormar.String(max_length=100, nullable=True)
    job_role: str | None = ormar.String(max_length=100, nullable=True)
    is_cso: bool = ormar.Boolean(default=False)

    @classmethod
    async def generate_password_hash(cls, password: str):
        ph = PasswordHasher()
        return ph.hash(password)

    async def check_password(self, password: str):
        ph = PasswordHasher()
        try:
            return ph.verify(self.password, password)
        except VerifyMismatchError:
            return False

    async def get_verification_token(self) -> str:
        """
        Returns the user's email verification token.

        If the user is already verified, an error is raised.
        If the token has expired, a new token is generated, saved and returned.

        Returns:
            str: The email verification token.
        """

        if self.is_verified:
            raise ValueError("User is already verified.")

        token_lifetime = timedelta(hours=8)

        # If the token is not older than 10 minutes, raise an error
        if self.verification_token_created_at is not None and datetime.now(
            tz=UTC
        ) - self.verification_token_created_at < timedelta(minutes=10):
            raise ValueError("Verification token cannot be recycled yet.")

        if (
            self.verification_token is None or self.verification_token_created_at is None
        ) or datetime.now(tz=UTC) - self.verification_token_created_at > token_lifetime:
            # Token has expired or was never created, generate a new one
            self.verification_token = secrets.token_urlsafe(48)
            self.verification_token_created_at = datetime.now(tz=UTC)

            await self.update(
                verification_token=self.verification_token,
                verification_token_created_at=self.verification_token_created_at,
            )

        return self.verification_token

    async def has_verification_token_expired(self) -> bool:
        """
        Checks if the user's email verification token has expired.

        Returns:
            bool: True if the token has expired, False otherwise.
        """
        token_lifetime = timedelta(hours=8)
        return (
            datetime.now(tz=UTC) - self.verification_token_created_at.replace(tzinfo=UTC)
            > token_lifetime
            if self.verification_token_created_at
            else True
        )

get_verification_token() async

Returns the user's email verification token.

If the user is already verified, an error is raised. If the token has expired, a new token is generated, saved and returned.

Returns:

Name Type Description
str str

The email verification token.

Source code in models/user.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def get_verification_token(self) -> str:
    """
    Returns the user's email verification token.

    If the user is already verified, an error is raised.
    If the token has expired, a new token is generated, saved and returned.

    Returns:
        str: The email verification token.
    """

    if self.is_verified:
        raise ValueError("User is already verified.")

    token_lifetime = timedelta(hours=8)

    # If the token is not older than 10 minutes, raise an error
    if self.verification_token_created_at is not None and datetime.now(
        tz=UTC
    ) - self.verification_token_created_at < timedelta(minutes=10):
        raise ValueError("Verification token cannot be recycled yet.")

    if (
        self.verification_token is None or self.verification_token_created_at is None
    ) or datetime.now(tz=UTC) - self.verification_token_created_at > token_lifetime:
        # Token has expired or was never created, generate a new one
        self.verification_token = secrets.token_urlsafe(48)
        self.verification_token_created_at = datetime.now(tz=UTC)

        await self.update(
            verification_token=self.verification_token,
            verification_token_created_at=self.verification_token_created_at,
        )

    return self.verification_token

has_verification_token_expired() async

Checks if the user's email verification token has expired.

Returns:

Name Type Description
bool bool

True if the token has expired, False otherwise.

Source code in models/user.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def has_verification_token_expired(self) -> bool:
    """
    Checks if the user's email verification token has expired.

    Returns:
        bool: True if the token has expired, False otherwise.
    """
    token_lifetime = timedelta(hours=8)
    return (
        datetime.now(tz=UTC) - self.verification_token_created_at.replace(tzinfo=UTC)
        > token_lifetime
        if self.verification_token_created_at
        else True
    )

options: show_root_heading: true members: - User

Session Model

models.session

options: show_root_heading: true members: - Session

Authentication

authentication.middleware

TurvaAuthenticationBackend

Bases: AuthenticationBackend

This middleware does behind the scenes stuff to maintain the user's session.

It checks that the user's session is valid and that the user is active. If the session is invalid or the user is not active, the user's session is deleted, logging them out.

This also handles updating the user's session expiry on each request.

Source code in authentication/middleware.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class TurvaAuthenticationBackend(AuthenticationBackend):
    """
    This middleware does behind the scenes stuff to maintain
    the user's session.

    It checks that the user's session is valid and that the user
    is active. If the session is invalid or the user is not active,
    the user's session is deleted, logging them out.

    This also handles updating the user's session expiry on each
    request.
    """

    async def authenticate(self, conn: HTTPConnection) -> tuple[AuthCredentials, BaseUser] | None:
        # We could also check if the user's valid in LDAP,
        # but that would hit the LDAP server on every
        # request, which is not ideal, but possible.
        # Need to investigate.

        # Get the session token from the signed session cookie (managed by SessionMiddleware)
        session_token = conn.session.get("session_token")
        if not session_token:
            return None

        # Look up the session in the database
        session = await Session.objects.select_related("user").get_or_none(token=session_token)

        if session is None:
            # The session does not exist
            return None

        # If the user is not active, remove their session
        # Or if the user's session has expired
        if session.user.is_active is False or session.is_expired():
            await session.delete()
            return None

        # Update the session expiry
        await session.update(
            expires_at=datetime.datetime.now(tz=datetime.UTC)
            + datetime.timedelta(seconds=Config.Application.session_cookie_lifetime)
        )

        scopes = [Scope.AUTHENTICATED.value]
        if session.user.is_verified:
            scopes.append(Scope.VERIFIED.value)

        # Return the user + their scopes
        return AuthCredentials(scopes), session.user

options: show_root_heading: true

Endpoints

endpoints

In this module, there are a series of other folders. Each of those folders contain files that are used to create the endpoints for the API.

This script will import all of these files and create the endpoints for the API, using the name of each folder as the endpoint prefix (//).

It will also support nesting folders and creating the route accordingly.

For example, if you have the following folder structure: ``` endpoints ├── auth │ ├── login.py │ ├── register.py | ├── test | ├── dothing.py

You will have the following endpoints: /auth/login /auth/register /auth/test/dothing

options: show_root_heading: true