from flask import Response, g, redirect, render_template, request, session, url_for
from tinydb import where

from ttfrog import app, forms, schema

STATIC = ["static"]


def relative_uri(path: str = ""):
    """
    Return a URI relative to the VIEW_URI, with leading and trailing slashes removed.

    Uses ``path`` when it is non-empty, otherwise the current request's path. Only the
    first occurrence of VIEW_URI is removed. If nothing is left, "/" is returned.
    """
    return (path or request.path).replace(app.config.VIEW_URI, "", 1).strip("/") or "/"


def get_parent(table: str, uri: str):
    """
    Load the page one level above ``uri``; returns whatever get_page() returns (possibly None).

    The parent URI is everything before the final "/" in ``uri``. When that parent URI
    contains no "/" itself, it is looked up in the "Page" table instead of ``table``.
    """
    # str.rsplit() always yields at least one element, so indexing [0] cannot fail.
    # NOTE(review): for a URI without any "/", the "parent" URI is the URI itself --
    # confirm that this is the intended behaviour for top-level pages.
    parent_uri = uri.strip("/").rsplit("/", 1)[0]
    parent_table = table if "/" in parent_uri else "Page"
    return get_page(parent_uri, table=parent_table, create_okay=False)


def _resolve_pointers(pointers):
    """
    Expand "table::uid" pointer strings into the matching records, without recursing.
    """
    records = []
    for pointer in pointers:
        pointer_table, uid = pointer.split("::")
        records += app.db.table(pointer_table).search(where("uid") == uid, recurse=False)
    return records


def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
    """
    Get one page, including its members, but not recursively.

    The page is looked up by URI in ``table``; an empty ``path`` means the current
    request's relative URI. Returns None when ``table`` is not a database table, when
    the page is missing and ``create_okay`` is false, or when g.user may not read the
    page (or, for a missing page, its parent). When the page is missing and
    ``create_okay`` is true, a new, unsaved record of the schema class named by
    ``table`` is returned as a placeholder.
    """
    uri = path.strip("/") or relative_uri(request.path)
    app.web.logger.debug(f"Need a page in {table} for {uri = }")
    if table not in app.db.tables():
        return None

    page = app.db.table(table).get(where("uri") == uri, recurse=False)

    # Replace ACL pointers with the records they reference. The helper keeps the
    # pointer's table name separate from the ``table`` argument used below.
    if hasattr(page, "acl"):
        page.acl = _resolve_pointers(page.acl)

    if not page:
        if not create_okay:
            return None
        parent = get_parent(table, uri)
        if not g.user.can_read(parent, app.db):
            return None
        return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent)

    if not g.user.can_read(page, app.db):
        return None

    if hasattr(page, "members"):
        page.members = _resolve_pointers(page.members)

    return page


def rendered(page: schema.Record, template: str = "page.html"):
    """
    Render ``page`` with ``template``, or return a 404 response if ``page`` is falsy.
    """
    if not page:
        return Response("Page not found", status=404)
    return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs(), user=session['user'], g=g)


def get_static(path):
    """
    Placeholder that ignores ``path`` and always answers 200 "OK".
    """
    return Response("OK", status=200)


def breadcrumbs():
    """
    Return (uri, name) pairs for the parents leading from the VIEW_URI to the current request.
    """
    if app.config.VIEW_URI != "/":
        root = get_page()
        # get_page() returns None for missing or unreadable pages; skip the leading
        # crumb in that case rather than failing on ``root.name``.
        if root is not None:
            yield (app.config.VIEW_URI, root.name)

    uri = ""
    for name in relative_uri().split("/"):
        uri = "/".join([uri, name]).lstrip("/")
        yield (uri, name)


@app.web.route("/")
def index():
    """
    Render the page for the current request path; never offers to create it.
    """
    return rendered(get_page(create_okay=False))


def do_login(username: str, password: str) -> bool:
    """
    Update the session with the user record if the credentials are valid.

    Returns True on success. Returns False when either value is empty, when no user
    with that name exists, or when the credential check fails.
    """
    if not (username and password):
        return False

    # Look up the submitted name itself, not the literal string "username".
    user = app.db.User.get(where("name") == username)
    if not user:
        return False

    if not user.check_credentials(username, password):
        return False

    app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
    g.user = user
    session["user_id"] = g.user.doc_id
    session["user"] = dict(g.user.serialize())
    return True


@app.web.route("/login", methods=["GET", "POST"])
def login():
    """
    Show the login form (GET) or process a login attempt (POST).

    A successful POST redirects to the index; a failed one records a message in
    g.messages and shows the form again.
    """
    app.web.session_interface.regenerate(session)
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        if do_login(username, password):
            return redirect(url_for("index"))
        g.messages.append(f"Invalid login for {username}")
    return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html")


@app.web.route("/logout")
def logout():
    """
    Forget the logged-in user and redirect to the index.
    """
    session.pop("user_id", None)
    g.pop("user", None)
    # A Flask view must return a response; returning None is an error.
    return redirect(url_for("index"))


# NOTE(review): the URL variables were missing from these rules ("VIEW_URI//" and
# "VIEW_URI/") even though the views require ``table`` and ``path``. They are restored
# here as <table> and <path:path> -- confirm the intended converters.
@app.web.route(f"{app.config.VIEW_URI}/<table>/<path:path>", methods=["GET"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={"table": "Page"})
def view(table, path):
    """
    Render the page at the requested URI.

    A placeholder for a missing page is only offered when the parent page exists and
    has a doc_id. ``path`` is supplied by the route but not used; the URI is taken
    from the request.
    """
    uri = relative_uri()
    parent = get_parent(table, uri)
    app.web.logger.debug(f"Parent of {uri}: {parent}")
    can_create = bool(parent and parent.doc_id is not None)
    # Look the page up by its relative URI, matching edit() and the parent lookup.
    return rendered(get_page(uri, table=table, create_okay=can_create))


@app.web.route(f"{app.config.VIEW_URI}/<table>/<path:path>", methods=["POST"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={"table": "Page"})
def edit(table, path):
    """
    Save the submitted form as the page at the requested URI.

    Responds 403 when there is no parent page, when the page can neither be loaded nor
    created, or when the submitted uid does not match the existing page's uid.
    ``path`` is supplied by the route but not used; the URI is taken from the request.
    """
    uri = relative_uri()
    parent = get_parent(table, uri)
    if not parent:
        return Response("You cannot create a page at this location.", status=403)

    # get or create the document at this uri
    page = get_page(uri, table=table, create_okay=True)
    if page is None:
        # get_page() returns None for an unknown table or an unreadable page/parent.
        return Response("You cannot create a page at this location.", status=403)

    save_data = getattr(forms, table)(page, request.form).prepare()

    # editing existing document
    if page.doc_id:
        if page.uid != request.form["uid"]:
            return Response("Invalid UID.", status=403)
        return rendered(app.db.save(save_data))

    # saving a new document
    return rendered(app.add_member(parent, save_data))


@app.web.before_request
def before_request():
    """
    Load the session's user into g.user and refresh the session copy before each request.
    """
    g.messages = []
    # NOTE(review): sessions without a user_id fall back to doc_id 1 -- presumably the
    # anonymous/guest user; confirm. A missing user record would fail on serialize().
    user_id = session.get("user_id", 1)
    g.user = app.db.User.get(doc_id=user_id)
    session["user_id"] = user_id
    session["user"] = dict(g.user.serialize())


@app.web.after_request
def add_header(r):
    """
    Add headers to every response that tell clients not to cache it.
    """
    r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"
    r.headers["Pragma"] = "no-cache"
    r.headers["Expires"] = "0"
    return r