from __future__ import annotations from sanic import Sanic from sanic.response import json as response_json, file from config import APP_DIR, debug as is_debug, sanic_host, sanic_port from aioredis import create_redis_pool from dataclasses import dataclass, fields, asdict import json from math import floor app = Sanic(__name__) if is_debug: # in release environment, these will be served by nginx # but in debug, we serve this shit directly. # app.static("/", APP_DIR + "/../front/index.html") app.static("/", APP_DIR + "/../front/dist/") def now_ms() -> int: from datetime import datetime return floor(datetime.utcnow().timestamp() * 1000) @dataclass class Todo: desc: str expires: int # utc milliseconds created: int # utc milliseconds done: int # utc milliseconds of finishing, or -1 id: Optional[int] = None def marshal(self) -> str: self_dict = dict() for field in fields(self): self_dict.update({field.name: getattr(self, field.name)}) return json.dumps(self_dict) @classmethod def unmarshal_safe(cls, json_obj) -> Todo: if isinstance(json_obj, str): json_obj = json.loads(json_obj) self_dict = dict(created=now_ms()) for field in fields(cls): if field.name in ["created"] or "Optional" in field.type: continue # field.type is str, so there is no way to check field's type but string-comparing # types representation if type(json_obj[field.name]) == field.type: raise TypeError(f"Todo.{field.name} must be instance of {field.type}") self_dict.update({field.name: json_obj[field.name]}) cls.verify_json(self_dict) return cls(**self_dict) @staticmethod def verify_json(self_dict): if len(self_dict["desc"]) > 4096: raise ValueError("desc cannot be more than 4K chars") @classmethod async def from_redis(cls, redis, key): json_str = await redis.get(key) todo = cls.unmarshal_safe(json_str.decode("utf-8")) todo.id = int(key.decode("utf-8").split(":")[-1]) return todo @app.listener("before_server_start") async def init_redis(app, loop): app.db = await create_redis_pool("redis://localhost") @app.listener("after_server_stop") async def close_redis(app, loop): from asyncio import sleep app.db.close() await app.db.wait_closed() @app.post("/api/new-todo") async def new_todo(req): todo = Todo.unmarshal_safe(req.json) id = await req.app.db.incr("upnet_todo_id_seq") await req.app.db.set(f"upnet:todo:{id}", todo.marshal()) return response_json(dict(status="ok", id=id)) @app.post("/api/delete-todo") async def delete_todo(req): id = req.json["id"] await req.app.db.delete(f"upnet:todo:{id}") return response_json(dict(status="ok", was=id)) @app.get("/api/todos") async def get_todos(req): all_todos_keys = await req.app.db.keys("upnet:todo:*") # TODO: await all together todos = [ asdict(await Todo.from_redis(req.app.db, todo_id)) for todo_id in all_todos_keys ] return response_json({"todos": todos}) if __name__ == "__main__": app.go_fast(host=sanic_host, port=sanic_port, debug=is_debug)