summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsyn <isaqtm@gmail.com>2020-05-25 19:19:26 +0300
committersyn <isaqtm@gmail.com>2020-05-25 19:19:26 +0300
commitb6efc9b39a367da829e84af581387733600c08d7 (patch)
treef1c7d77b536c706a56eafc73a4b8fa6daf3b0938
parentca66ae302745a0e3285fd260a528e17b21065743 (diff)
downloadpert-b6efc9b39a367da829e84af581387733600c08d7.tar.gz
[move to typing] typed updates
-rw-r--r--src/client.rs16
-rw-r--r--src/main.rs105
-rw-r--r--src/update.rs41
3 files changed, 20 insertions, 142 deletions
diff --git a/src/client.rs b/src/client.rs
index 0e215f8..b105311 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -18,7 +18,7 @@ use crossbeam::channel::{
use uuid::Uuid;
use std::collections::HashMap;
use serde_json::Value as JsonValue;
-use crate::update::{ UpdateRouter, Handler };
+use crate::update::Handler;
use pert_types::types::Update;
@@ -62,7 +62,7 @@ pub struct Client {
}
impl Client {
- pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self {
+ pub fn new<H: Handler>(log_opt: Option<i32>, updater: H) -> Self {
if let Some(log) = log_opt {
Tdlib::set_log_verbosity_level(log).ok();
}
@@ -73,10 +73,11 @@ impl Client {
let api_for_listener = api.clone();
let api_for_responder = api.clone();
let sender_for_responder = tx.clone();
+ let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime");
let _run_handle = thread::spawn(
move || OneshotResponder::new(rx, api_for_responder).run(
- updater, Self { sender: sender_for_responder }
+ updater, Self { sender: sender_for_responder }, rt
)
);
let _tg_handle = thread::spawn(
@@ -129,7 +130,7 @@ impl OneshotResponder {
}
}
- fn run(&mut self, updater: UpdateRouter, client: Client) {
+ fn run<H: Handler>(&mut self, updater: H, client: Client, rt: tokio::runtime::Handle) {
loop {
match self.rx.recv() {
Ok(JoinStreams::NewRequest(fut)) => {
@@ -156,7 +157,12 @@ impl OneshotResponder {
Ok(val) => {
let typ = val["@type"].as_str().unwrap();
if typ.starts_with("update") {
- updater.dispatch(&client, val);
+ match serde_json::from_value(val.clone()) {
+ Ok(upd) => { rt.spawn(updater.handle(client.clone(), upd)); },
+ Err(err) => {
+ error!("Could not deser update: {}, was: {}", err, val);
+ }
+ };
} else {
self.handle_response(val);
}
diff --git a/src/main.rs b/src/main.rs
index 0a6b256..69f5ecc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,16 +6,16 @@ mod client;
//mod auth;
mod update;
//mod message;
-/*
+
struct UpdateHandler;
impl update::Handler for UpdateHandler {
- fn handle(&self, _client: client::Client, req: serde_json::Value) -> futures::future::BoxFuture<'static, ()> {
+ fn handle(&self, _client: client::Client, req: pert_types::types::Update) -> futures::future::BoxFuture<'static, ()> {
Box::pin(async move {
- info!("update: {:#}", req)
+ info!("update: {:#?}", req);
})
}
-}*/
+}
#[tokio::main]
async fn main() {
@@ -25,103 +25,8 @@ async fn main() {
.ok()
.and_then(|var| var.parse().ok());
- let mut updater = update::UpdateRouter::new(
- tokio::runtime::Handle::try_current().expect("Must be in runtime")
- );
-
- let update_auth_state = |tg: client::Client, val: serde_json::Value| async move {
- let cache = env::current_dir().unwrap().join("cache");
- info!("auth update: val: {:#}", val);
- match val.pointer("/authorization_state/@type").unwrap()
- .as_str().unwrap() {
- "authorizationStateWaitTdlibParameters" => {
- let resp = tg.send(&serde_json::json!({
- "@type": "setTdlibParameters",
- "parameters": {
- "use_test_dc": true,
- "api_id": env::var("API_ID").unwrap(),
- "api_hash": env::var("API_HASH").unwrap(),
- "device_model": "mbia",
- "system_version": "Catalina",
- "application_version": "0.1",
- "system_language_code": "en",
- "database_directory": cache.join("database").to_str().unwrap(),
- "use_message_database": false,
- "files_directory": cache.join("files").to_str().unwrap(),
- "use_secret_chats": false,
- },
- })).await;
- info!("settdlib: {}", resp);
- },
- "authorizationStateWaitEncryptionKey" => {
- let resp = tg.send(&serde_json::json!({
- "@type": "setDatabaseEncryptionKey",
- "encryption_key": "sup3rs3cr3t",
- })).await;
- info!("setenckey: {}", resp);
- },
- "authorizationStateWaitPhoneNumber" => {
- let resp = tg.send(&serde_json::json!({
- "@type": "setAuthenticationPhoneNumber",
- "phone_number": "+79859053875",
- "settings": {
- "allow_flash_call": false,
- "is_current_phone_number": false,
- "allow_sms_retriever_api": false,
- }
- })).await;
- info!("setphone: {}", resp);
- },
- "authorizationStateWaitRegistration" => {
- let resp = tg.send(&serde_json::json!({
- "@type": "registerUser",
- "first_name": "John",
- "last_name": "Doe",
- })).await;
- info!("reg: {}", resp);
- },
- "authorizationStateWaitCode" => {
- let code = {
- let mut s = String::new();
- std::io::stdin().read_line(&mut s).expect("Could not read line");
- s
- };
- let resp = tg.send(&serde_json::json!({
- "@type": "checkAuthenticationCode",
- "code": code,
- })).await;
- info!("checkcode: {}", resp);
- },
- auth_state => {
- error!("Unknown auth state update: {} / {:#}", auth_state, val);
- }
- };
- };
-
- updater.add_handler("updateAuthorizationState", update_auth_state);
+ let tg = client::Client::new(tg_log, UpdateHandler{});
- let tg = client::Client::new(tg_log, updater);
- std::thread::sleep(std::time::Duration::new(2, 0));
- let get_chats = tg.send(&serde_json::json!({
- "@type": "getChats",
- "chat_list": {
- "@type": "chatListMain"
- },
- "offset_order": "9223372036854775807",
- "offset_chat_id": "9223372036854775807",
- "limit": 5
- })).await;
- error!("send msg: {}", get_chats);
- let chat = get_chats.pointer("/chat_ids/0").unwrap_or(&serde_json::json!(0)).as_u64().unwrap();
- let get_chat = tg.send(&serde_json::json!({
- "@type": "getChat",
- "chat_id": chat,
- })).await;
- error!("get_chat: {:#}", get_chat);
- let me = tg.send(&serde_json::json!({
- "@type": "getMe",
- })).await;
- error!("me: {:#}", me);
std::thread::sleep(std::time::Duration::new(200, 0));
diff --git a/src/update.rs b/src/update.rs
index b5b0ba7..7f5e28d 100644
--- a/src/update.rs
+++ b/src/update.rs
@@ -1,49 +1,16 @@
-use std::collections::HashMap;
-use serde_json::Value as JsonValue;
use std::future::Future;
use futures::future::BoxFuture;
-use log::{ warn, trace };
use crate::client::Client;
+use pert_types::types::Update;
pub trait Handler: Send + Sync + 'static {
- fn handle(&self, _: Client, _: JsonValue) -> BoxFuture<'static, ()>;
+ fn handle(&self, _: Client, _: Update) -> BoxFuture<'static, ()>;
}
impl<C, F> Handler for C
-where C: Send + Sync + 'static + Fn(Client, JsonValue) -> F,
+where C: Send + Sync + 'static + Fn(Client, Update) -> F,
F: Future<Output = ()> + 'static + Send {
- fn handle(&self, client: Client, req: JsonValue) -> BoxFuture<'static, ()> {
+ fn handle(&self, client: Client, req: Update) -> BoxFuture<'static, ()> {
Box::pin((*self)(client, req))
}
}
-
-pub struct UpdateRouter {
- router: HashMap<String, Box<dyn Handler>>,
- rt: tokio::runtime::Handle,
-}
-
-impl UpdateRouter {
- pub fn new(rt: tokio::runtime::Handle) -> Self {
- Self {
- router: HashMap::new(),
- rt: rt,
- }
- }
-
- pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) {
- self.router.insert(update_type.to_owned(), Box::new(handler));
- }
-
- pub fn dispatch(&self, client: &Client, update: JsonValue) {
- let update_type: &str = update["@type"].as_str().unwrap();
- match self.router.get(update_type) {
- Some(handler) => {
- self.rt.spawn(handler.handle(client.clone(), update));
- },
- None => {
- warn!("no handler for {}", update_type);
- trace!("request was: {}", update);
- },
- }
- }
-}