1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import json
from dataclasses import MISSING, asdict, dataclass, fields
from math import floor
from typing import Optional

from aioredis import create_redis_pool
from sanic import Sanic
from sanic.response import json as response_json, file

from config import APP_DIR, debug as is_debug
# The Sanic application that every route and listener in this module hangs off.
app = Sanic(__name__)

if is_debug:
    # Release deployments let nginx serve the front-end bundle; in debug mode
    # this process serves the built files itself.
    front_dist = APP_DIR + "/../front/dist/"
    app.static("/", front_dist)
def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    from datetime import datetime, timezone

    # Use an aware datetime: calling .timestamp() on the naive value returned
    # by utcnow() interprets it as *local* time, which skews the result by the
    # host's UTC offset on any machine not running in UTC.
    return floor(datetime.now(timezone.utc).timestamp() * 1000)
@dataclass
class Todo:
    """A single to-do item, serialized as JSON for storage and transport."""

    desc: str
    expires: int  # utc milliseconds
    created: int  # utc milliseconds
    done: int  # utc milliseconds of finishing, or -1
    id: Optional[int] = None

    def marshal(self) -> str:
        """Serialize every dataclass field of this todo into a JSON string."""
        return json.dumps({f.name: getattr(self, f.name) for f in fields(self)})

    @classmethod
    def unmarshal_safe(cls, json_obj) -> "Todo":
        """Build a validated ``Todo`` from a JSON string or an already-parsed mapping.

        ``created`` is always stamped with the current time and fields that
        have a default (``id``) are ignored, so neither can be dictated by
        the payload.

        Raises:
            KeyError: a required field is missing from the payload.
            TypeError: a field value does not have its annotated type.
            ValueError: ``desc`` is longer than 4096 characters.
        """
        if isinstance(json_obj, str):
            json_obj = json.loads(json_obj)
        return cls._build(json_obj, created=now_ms())

    @staticmethod
    def _expected_type(annotation):
        """Map a field annotation to a concrete class, or ``None`` if unknown."""
        if isinstance(annotation, type):
            return annotation
        if isinstance(annotation, str):
            # Annotations are plain strings when postponed evaluation
            # (``from __future__ import annotations``) is active.
            return {"str": str, "int": int}.get(annotation)
        return None

    @classmethod
    def _build(cls, payload, created: int) -> "Todo":
        """Validate ``payload`` and construct a ``Todo`` with the given ``created``."""
        values = dict(created=created)
        for field in fields(cls):
            # ``created`` is supplied by the caller; fields with a default
            # are optional and never read from the payload.
            if field.name == "created" or field.default is not MISSING:
                continue
            value = payload[field.name]
            expected = cls._expected_type(field.type)
            # Exact type match on purpose: a bool must not pass for an int.
            if expected is not None and type(value) is not expected:
                raise TypeError(
                    f"Todo.{field.name} must be instance of {expected.__name__}"
                )
            values[field.name] = value
        cls.verify_json(values)
        return cls(**values)

    @staticmethod
    def verify_json(self_dict):
        """Raise ``ValueError`` if ``self_dict["desc"]`` exceeds 4096 characters."""
        if len(self_dict["desc"]) > 4096:
            raise ValueError("desc cannot be more than 4K chars")

    @classmethod
    async def from_redis(cls, redis, key):
        """Load the todo stored under ``key`` and set its ``id`` from the key suffix.

        ``key`` is expected to be bytes ending in ``:<integer id>``.
        """
        raw = await redis.get(key)
        stored = json.loads(raw.decode("utf-8"))
        # Keep the creation time that was persisted; re-stamping it on every
        # read would make each todo look freshly created.
        created = stored.get("created")
        if created is None:
            created = now_ms()
        todo = cls._build(stored, created=created)
        todo.id = int(key.decode("utf-8").split(":")[-1])
        return todo
@app.listener("before_server_start")
async def init_redis(app, loop):
    """Open the Redis pool before the server starts and keep it on ``app.db``."""
    pool = await create_redis_pool("redis://localhost")
    app.db = pool
@app.listener("after_server_stop")
async def close_redis(app, loop):
    """Close the Redis pool on ``app.db`` after the server stops and wait for it."""
    app.db.close()
    await app.db.wait_closed()
@app.post("/new-todo")
async def new_todo(req):
    """Create a todo from the JSON request body and store it in Redis.

    The id comes from incrementing the ``upnet_todo_id_seq`` counter; the
    response reports that id together with an ``ok`` status.
    """
    todo = Todo.unmarshal_safe(req.json)
    db = req.app.db
    todo_id = await db.incr("upnet_todo_id_seq")
    await db.set(f"upnet:todo:{todo_id}", todo.marshal())
    return response_json({"status": "ok", "id": todo_id})
@app.post("/delete-todo")
async def delete_todo(req):
    """Delete the todo whose ``id`` is given in the JSON request body.

    Responds with an ``ok`` status and echoes the requested id as ``was``.
    """
    todo_id = req.json["id"]
    key = f"upnet:todo:{todo_id}"
    await req.app.db.delete(key)
    return response_json({"status": "ok", "was": todo_id})
@app.get("/todos")
async def get_todos(req):
    """Return every stored todo as ``{"todos": [...]}``.

    Keys matching ``upnet:todo:*`` are listed first, then all todos are
    fetched concurrently instead of one awaited round trip per key.
    """
    from asyncio import gather

    db = req.app.db
    all_todos_keys = await db.keys("upnet:todo:*")
    # gather() returns results in the order the awaitables were passed in,
    # so the response order still follows the key listing.
    loaded = await gather(*(Todo.from_redis(db, key) for key in all_todos_keys))
    return response_json({"todos": [asdict(todo) for todo in loaded]})
if __name__ == "__main__":
    # Listen on every interface, port 8000; debug mode follows the config flag.
    run_options = dict(host="0.0.0.0", port=8000, debug=is_debug)
    app.go_fast(**run_options)
|