vtt/src/ttfrog/app.py

235 lines
8.3 KiB
Python

import io
import sys
from pathlib import Path
from types import SimpleNamespace
from dotenv import dotenv_values
from flask import Flask
from flask_session import Session
from grung.db import GrungDB
from tinydb import where
from tinydb.storages import MemoryStorage
from ttfrog import schema
from ttfrog.exceptions import (
ApplicationNotInitializedError,
MalformedRequestError,
RecordNotFoundError,
UnauthorizedError,
)
class ApplicationContext:
"""
The global context for the application, this class provides access to the Flask app instance, the GrungDB instance,
and the loaded configuration.
To prevent multiple contexts from being created, the class is instantiated at import time and replaces the module in
the symbol table. The first time it is imported, callers should call both .load_config() and .initialize(); this is
typically done at program start.
After being intialized, callers can import ttfrog.app and interact with the ApplicationContext instance directly:
>>> from ttfrog import app
>>> print(app.config.NAME)
ttfrog
"""
CONFIG_DEFAULTS = """
# ttfrog Defaults
NAME=ttfrog
LOG_LEVEL=INFO
SECRET_KEY=fnord
IN_MEMORY_DB=
DATA_ROOT=~/.dnd/ttfrog/
ADMIN_USERNAME=admin
ADMIN_EMAIL=admin@telisar
THEME=default
VIEW_URI=/
API_URI=/_/v1/
"""
def __init__(self):
self.config: SimpleNamespace = None
self.web: Flask = None
self.db: GrungDB = None
self._initialized = False
def load_config(self, defaults: Path | None = Path("~/.dnd/ttfrog/defaults"), **overrides) -> None:
"""
Load the user configuration from the following in sources, in order:
1. ApplicationContext.CONFIG_DEFAULTS
2. The user's configuration defaults file, if any
3. Overrides specified by the caller, if any
Once the configuration is loaded, the path attribute is also configured.
"""
config_file = defaults.expanduser() if defaults else None
self.config = SimpleNamespace(
**{
**dotenv_values(stream=io.StringIO(ApplicationContext.CONFIG_DEFAULTS)),
**(dotenv_values(config_file) if config_file else {}),
**overrides,
}
)
data_root = Path(self.config.DATA_ROOT).expanduser()
self.path = SimpleNamespace(
config=config_file,
data_root=data_root,
database=data_root / f"{self.config.NAME}.json",
sessions=data_root / "session_cache",
)
def initialize(self, db: GrungDB = None, force: bool = False) -> None:
"""
Instantiate both the database and the flask application.
"""
if force or not self._initialized:
if db:
self.db = db
elif self.config.IN_MEMORY_DB:
self.db = GrungDB.with_schema(schema, path=None, storage=MemoryStorage)
else:
self.db = GrungDB.with_schema(
schema, path=self.path.database, sort_keys=True, indent=4, separators=(",", ": ")
)
self.theme = Path(__file__).parent / "themes" / self.config.THEME
self.web = Flask(self.config.NAME, template_folder=self.theme, static_folder=self.theme / "static")
self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
self.web.config["DEBUG"] = True
self.web.config["SESSION_TYPE"] = "filesystem"
self.web.config["SESSION_REFRESH_EACH_REQUEST"] = True
self.web.config["SESSION_FILE_DIR"] = self.path.sessions
Session(self.web)
self.log = self.web.logger
self._initialized = True
def check_state(self) -> None:
if not self._initialized:
raise ApplicationNotInitializedError("This action requires the application to be initialized.")
def authenticate(self, username: str, password: str) -> schema.User:
"""
Returns the User record matching the given username and password
"""
if not (username and password):
self.log.debug("Need both username and password to login")
return None
user = self.db.User.get(where("name") == username)
if not user:
self.log.debug(f"No user matching {username}")
return None
if not user.check_credentials(username, password):
self.log.debug(f"Invalid credentials for {username}")
return None
return user
def authorize(self, user, record, requested):
return user.has_permission(record, requested)
def _get_or_create_page_by_uri(self, user, table, uri):
"""
Get a page by URI. If it doesn't exist, create a new one if and only if the user has permission
to write on its parent.
"""
uri = uri.replace(" ", "").strip("/")
if uri.startswith(self.config.VIEW_URI):
uri = uri.replace(self.config.VIEW_URI, "", 1)
parent_uri = None
search_uri = uri
page_name = uri
if "/" in uri:
(parent_uri, page_name) = uri.rsplit("/", 1)
if parent_uri == 'Page':
parent_uri = None
search_uri = page_name
else:
search_uri = uri
# self.log.debug(f"Searching for page in {table = } with {search_uri = }; its parent is {parent_uri=}")
# self.log.debug("\n".join([f"{p.doc_id}: {p.uri}" for p in table.all()]))
page = table.get(where("uri") == search_uri, recurse=False)
if not page:
# load the parent to check for write permissions
# self.log.debug(f"Page at {search_uri} does not exist, looking for parent at {parent_uri=}")
parent_table = table if parent_uri and "/" in parent_uri else self.db.Page
parent = None
try:
# self.log.debug(f"Loading parent with {parent_uri}")
parent = self.get_page(user, parent_table.name, uri=parent_uri)
except Exception as e:
self.log.debug(f"Error loading parent: {e}")
if not parent:
raise MalformedRequestError(f"Page with uri '{search_uri}' does not exist and neither does its parent.")
if not self.authorize(user, parent, schema.Permissions.WRITE):
raise UnauthorizedError(f"User {user.doc_id} does not have permission to create under {parent_uri}.")
obj = getattr(schema, table.name)
page = obj(name=page_name, body=obj.default.format(name=page_name), parent=parent)
# self.log.debug(f"Returning {page.doc_id}: {page.uri}")
return page
def get_page(self, user, table_name, doc_id=None, uri=None):
"""
Get a page by doc_id or by URI, if and only if the user is allowed to read it. A new Record
instance will be returned if the requested page does not exist but the user has permission
to create it.
"""
if not user.doc_id:
self.log.error(f"Invalid user: {user}")
raise MalformedRequestError("User does not exist.")
try:
table = self.db.table(table_name)
except RuntimeError:
table = self.db.Page
self.log.error(f"Invalid table_name: {table_name}, will use Page")
# raise MalformedRequestError(f"{table_name} table does not exist.")
if doc_id:
page = table.get(doc_id=doc_id)
if not page:
raise RecordNotFoundError(f"No record with {doc_id=} was found.")
page = self._get_or_create_page_by_uri(user, table, uri)
if not self.authorize(user, page, schema.Permissions.READ):
self.log.error(f"No permission for {user.name} on {page}")
raise UnauthorizedError(f"User {user.doc_id} does not have permission to read {table_name} {page.doc_id}.")
# resolve the pointers to subpages so we can render things like nav elements.
if hasattr(page, "members"):
subpages = []
for pointer in page.members:
table, pkey, pval = pointer.split("::")
subpages += self.db.table(table).search(where(pkey) == pval, recurse=False)
page.members = subpages
return page
sys.modules[__name__] = ApplicationContext()