Docs / Example

Python (SVK)

Browser-rendered source from docs/examples/notabot_validator.py. This is the actual example file, served through the docs route so you can inspect it without leaving the app.

Source file

notabot_validator.py

Python
import hmac
import hashlib
import base64
import json
import os
import time
import secrets
from typing import Optional


def build_canonical_request(
    method: str,
    path: str,
    body: str,
    site_key: str,
    timestamp: str,
    nonce: str,
) -> str:
    """Assemble the newline-delimited string that is signed for a request.

    The result has exactly six lines, in this fixed order: the upper-cased
    HTTP method, the path, the hex SHA-256 digest of the UTF-8 encoded body,
    the site key, the timestamp and the nonce.

    Args:
        method: HTTP method; it is upper-cased before being emitted.
        path: Request path, emitted unchanged.
        body: Request body text; only its SHA-256 digest is emitted.
        site_key: Site key, emitted unchanged.
        timestamp: Timestamp string, emitted unchanged.
        nonce: Nonce string, emitted unchanged.

    Returns:
        The six fields joined with "\\n" (no trailing newline).
    """
    hasher = hashlib.sha256()
    hasher.update(body.encode("utf-8"))

    # Field order is part of the signature contract; do not reorder.
    fields = (
        method.upper(),
        path,
        hasher.hexdigest(),
        site_key,
        timestamp,
        nonce,
    )
    return "\n".join(fields)


def sign_request(canonical_request: str, signing_secret: str) -> str:
    """Compute the versioned HMAC-SHA256 signature of a canonical request.

    Args:
        canonical_request: The text to sign; encoded as UTF-8.
        signing_secret: The HMAC key; encoded as UTF-8.

    Returns:
        "v1=" followed by the URL-safe base64 encoding of the raw MAC with
        any trailing "=" padding removed.
    """
    key_bytes = signing_secret.encode("utf-8")
    message_bytes = canonical_request.encode("utf-8")
    raw_mac = hmac.new(key_bytes, msg=message_bytes, digestmod=hashlib.sha256).digest()

    encoded = base64.urlsafe_b64encode(raw_mac).decode("utf-8")
    return f"v1={encoded.rstrip('=')}"


class NotabotValidator:
    """Client for the Notabot proof-token validation endpoint.

    Each call to ``validate_token`` sends one signed POST request and turns
    the reply into a result dict. Every failure handled here is reported as a
    denial (``"allow": False``); nothing in this class fails open.

    Result dicts always carry the same four keys: ``allow``, ``request_id``,
    ``correlation_id`` and ``error``.
    """

    # Path signed into the canonical request.
    # NOTE(review): this is fixed independently of ``validate_url``; confirm
    # the two are expected to agree for every deployment.
    _VALIDATE_PATH = "/api/v1/verification/validate"

    def __init__(
        self,
        site_key: str,
        signing_secret: str,
        validate_url: str,
        timeout_seconds: float = 5.0,
    ):
        """Store the credentials and endpoint used for validation calls.

        Args:
            site_key: Sent in the ``x-tpac-site-key`` header and included in
                the signed canonical request.
            signing_secret: Key used to sign each request.
            validate_url: URL the validation POST is sent to.
            timeout_seconds: Timeout passed to the HTTP client.
        """
        self.site_key = site_key
        self.signing_secret = signing_secret
        self.validate_url = validate_url
        self.timeout_seconds = timeout_seconds

    @staticmethod
    def _deny(
        error: str,
        request_id: Optional[str] = None,
        correlation_id: Optional[str] = None,
    ) -> dict:
        """Build a denial result with the same four keys as an allow result."""
        return {
            "allow": False,
            "request_id": request_id,
            "correlation_id": correlation_id,
            "error": error,
        }

    @classmethod
    def _decision_from_payload(cls, data: object) -> dict:
        """Translate the decoded body of a 2xx response into a result dict."""
        # A JSON array/string/number has no fields to read; treat it as a
        # malformed reply instead of crashing on ``.get``.
        if not isinstance(data, dict):
            return cls._deny("invalid_response")

        request_id = data.get("request_id")
        correlation_id = data.get("correlation_id")

        # Both conditions are required; ``success`` must be the JSON boolean
        # true, not merely a truthy value.
        if data.get("success") is True and data.get("decision") == "allow":
            return {
                "allow": True,
                "request_id": request_id,
                "correlation_id": correlation_id,
                "error": None,
            }

        # ``or`` also covers an explicit null/empty reason, so a denial never
        # carries ``error: None``.
        return cls._deny(
            data.get("reason") or "verification_failed",
            request_id,
            correlation_id,
        )

    def validate_token(
        self,
        proof_token: str,
        action: str,
        origin: str,
    ) -> dict:
        """
        Validates a Notabot proof token before a protected action.

        v2 contract: sends only proof_token, action, and origin.
        Do NOT include session_id or challenge_id — the server rejects those fields with 400.

        Args:
            proof_token: The proof token to validate.
            action: Name of the protected action being attempted.
            origin: Origin sent alongside the token.

        Returns:
            A dict with keys ``allow`` (bool), ``request_id``,
            ``correlation_id`` and ``error``. ``error`` is ``None`` only when
            ``allow`` is ``True``. Error values produced locally are
            ``validation_unavailable``, ``validation_timeout``,
            ``http_<status>`` and ``invalid_response``; otherwise the
            server-supplied ``reason`` (or ``verification_failed``) is used.
        """
        # httpx is an optional dependency; check for it before doing any
        # signing work so a missing client is a cheap, clean denial.
        try:
            import httpx
        except ImportError:
            return self._deny("validation_unavailable")

        # Compact separators: the exact bytes sent are the bytes that get
        # hashed into the signature, so the body is serialized once here.
        body = json.dumps(
            {
                "proof_token": proof_token,
                "action": action,
                "origin": origin,
            },
            separators=(",", ":"),
        )

        timestamp = str(int(time.time()))
        # 16 random bytes, URL-safe base64 without padding.
        nonce = secrets.token_urlsafe(16)

        canonical = build_canonical_request(
            "POST",
            self._VALIDATE_PATH,
            body,
            self.site_key,
            timestamp,
            nonce,
        )
        signature = sign_request(canonical, self.signing_secret)

        try:
            response = httpx.post(
                self.validate_url,
                content=body,
                headers={
                    "Content-Type": "application/json",
                    "x-tpac-site-key": self.site_key,
                    "x-tpac-timestamp": timestamp,
                    "x-tpac-nonce": nonce,
                    "x-tpac-signature": signature,
                },
                timeout=self.timeout_seconds,
            )
        except httpx.TimeoutException:
            return self._deny("validation_timeout")
        except httpx.RequestError:
            # Covers network errors as well as protocol/proxy/redirect
            # failures, so no transport problem escapes as an exception.
            return self._deny("validation_unavailable")

        if not 200 <= response.status_code < 300:
            return self._deny(f"http_{response.status_code}")

        try:
            data = response.json()
        except ValueError:
            # A 2xx reply whose body is not valid JSON is still a denial.
            return self._deny("invalid_response")

        return self._decision_from_payload(data)