diff options
author | syn <isaqtm@gmail.com> | 2020-05-25 19:19:26 +0300 |
---|---|---|
committer | syn <isaqtm@gmail.com> | 2020-05-25 19:19:26 +0300 |
commit | b6efc9b39a367da829e84af581387733600c08d7 (patch) | |
tree | f1c7d77b536c706a56eafc73a4b8fa6daf3b0938 /src | |
parent | ca66ae302745a0e3285fd260a528e17b21065743 (diff) | |
download | pert-b6efc9b39a367da829e84af581387733600c08d7.tar.gz |
[move to typing] typed updates
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 16 | ||||
-rw-r--r-- | src/main.rs | 105 | ||||
-rw-r--r-- | src/update.rs | 41 |
3 files changed, 20 insertions, 142 deletions
diff --git a/src/client.rs b/src/client.rs index 0e215f8..b105311 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,7 +18,7 @@ use crossbeam::channel::{ use uuid::Uuid; use std::collections::HashMap; use serde_json::Value as JsonValue; -use crate::update::{ UpdateRouter, Handler }; +use crate::update::Handler; use pert_types::types::Update; @@ -62,7 +62,7 @@ pub struct Client { } impl Client { - pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self { + pub fn new<H: Handler>(log_opt: Option<i32>, updater: H) -> Self { if let Some(log) = log_opt { Tdlib::set_log_verbosity_level(log).ok(); } @@ -73,10 +73,11 @@ impl Client { let api_for_listener = api.clone(); let api_for_responder = api.clone(); let sender_for_responder = tx.clone(); + let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime"); let _run_handle = thread::spawn( move || OneshotResponder::new(rx, api_for_responder).run( - updater, Self { sender: sender_for_responder } + updater, Self { sender: sender_for_responder }, rt ) ); let _tg_handle = thread::spawn( @@ -129,7 +130,7 @@ impl OneshotResponder { } } - fn run(&mut self, updater: UpdateRouter, client: Client) { + fn run<H: Handler>(&mut self, updater: H, client: Client, rt: tokio::runtime::Handle) { loop { match self.rx.recv() { Ok(JoinStreams::NewRequest(fut)) => { @@ -156,7 +157,12 @@ impl OneshotResponder { Ok(val) => { let typ = val["@type"].as_str().unwrap(); if typ.starts_with("update") { - updater.dispatch(&client, val); + match serde_json::from_value(val.clone()) { + Ok(upd) => { rt.spawn(updater.handle(client.clone(), upd)); }, + Err(err) => { + error!("Could not deser update: {}, was: {}", err, val); + } + }; } else { self.handle_response(val); } diff --git a/src/main.rs b/src/main.rs index 0a6b256..69f5ecc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,16 +6,16 @@ mod client; //mod auth; mod update; //mod message; -/* + struct UpdateHandler; impl update::Handler for UpdateHandler { - fn handle(&self, _client: client::Client, req: serde_json::Value) -> futures::future::BoxFuture<'static, ()> { + fn handle(&self, _client: client::Client, req: pert_types::types::Update) -> futures::future::BoxFuture<'static, ()> { Box::pin(async move { - info!("update: {:#}", req) + info!("update: {:#?}", req); }) } -}*/ +} #[tokio::main] async fn main() { @@ -25,103 +25,8 @@ async fn main() { .ok() .and_then(|var| var.parse().ok()); - let mut updater = update::UpdateRouter::new( - tokio::runtime::Handle::try_current().expect("Must be in runtime") - ); - - let update_auth_state = |tg: client::Client, val: serde_json::Value| async move { - let cache = env::current_dir().unwrap().join("cache"); - info!("auth update: val: {:#}", val); - match val.pointer("/authorization_state/@type").unwrap() - .as_str().unwrap() { - "authorizationStateWaitTdlibParameters" => { - let resp = tg.send(&serde_json::json!({ - "@type": "setTdlibParameters", - "parameters": { - "use_test_dc": true, - "api_id": env::var("API_ID").unwrap(), - "api_hash": env::var("API_HASH").unwrap(), - "device_model": "mbia", - "system_version": "Catalina", - "application_version": "0.1", - "system_language_code": "en", - "database_directory": cache.join("database").to_str().unwrap(), - "use_message_database": false, - "files_directory": cache.join("files").to_str().unwrap(), - "use_secret_chats": false, - }, - })).await; - info!("settdlib: {}", resp); - }, - "authorizationStateWaitEncryptionKey" => { - let resp = tg.send(&serde_json::json!({ - "@type": "setDatabaseEncryptionKey", - "encryption_key": "sup3rs3cr3t", - })).await; - info!("setenckey: {}", resp); - }, - "authorizationStateWaitPhoneNumber" => { - let resp = tg.send(&serde_json::json!({ - "@type": "setAuthenticationPhoneNumber", - "phone_number": "+79859053875", - "settings": { - "allow_flash_call": false, - "is_current_phone_number": false, - "allow_sms_retriever_api": false, - } - })).await; - info!("setphone: {}", resp); - }, - "authorizationStateWaitRegistration" => { - let resp = tg.send(&serde_json::json!({ - "@type": "registerUser", - "first_name": "John", - "last_name": "Doe", - })).await; - info!("reg: {}", resp); - }, - "authorizationStateWaitCode" => { - let code = { - let mut s = String::new(); - std::io::stdin().read_line(&mut s).expect("Could not read line"); - s - }; - let resp = tg.send(&serde_json::json!({ - "@type": "checkAuthenticationCode", - "code": code, - })).await; - info!("checkcode: {}", resp); - }, - auth_state => { - error!("Unknown auth state update: {} / {:#}", auth_state, val); - } - }; - }; - - updater.add_handler("updateAuthorizationState", update_auth_state); + let tg = client::Client::new(tg_log, UpdateHandler{}); - let tg = client::Client::new(tg_log, updater); - std::thread::sleep(std::time::Duration::new(2, 0)); - let get_chats = tg.send(&serde_json::json!({ - "@type": "getChats", - "chat_list": { - "@type": "chatListMain" - }, - "offset_order": "9223372036854775807", - "offset_chat_id": "9223372036854775807", - "limit": 5 - })).await; - error!("send msg: {}", get_chats); - let chat = get_chats.pointer("/chat_ids/0").unwrap_or(&serde_json::json!(0)).as_u64().unwrap(); - let get_chat = tg.send(&serde_json::json!({ - "@type": "getChat", - "chat_id": chat, - })).await; - error!("get_chat: {:#}", get_chat); - let me = tg.send(&serde_json::json!({ - "@type": "getMe", - })).await; - error!("me: {:#}", me); std::thread::sleep(std::time::Duration::new(200, 0)); diff --git a/src/update.rs b/src/update.rs index b5b0ba7..7f5e28d 100644 --- a/src/update.rs +++ b/src/update.rs @@ -1,49 +1,16 @@ -use std::collections::HashMap; -use serde_json::Value as JsonValue; use std::future::Future; use futures::future::BoxFuture; -use log::{ warn, trace }; use crate::client::Client; +use pert_types::types::Update; pub trait Handler: Send + Sync + 'static { - fn handle(&self, _: Client, _: JsonValue) -> BoxFuture<'static, ()>; + fn handle(&self, _: Client, _: Update) -> BoxFuture<'static, ()>; } impl<C, F> Handler for C -where C: Send + Sync + 'static + Fn(Client, JsonValue) -> F, +where C: Send + Sync + 'static + Fn(Client, Update) -> F, F: Future<Output = ()> + 'static + Send { - fn handle(&self, client: Client, req: JsonValue) -> BoxFuture<'static, ()> { + fn handle(&self, client: Client, req: Update) -> BoxFuture<'static, ()> { Box::pin((*self)(client, req)) } } - -pub struct UpdateRouter { - router: HashMap<String, Box<dyn Handler>>, - rt: tokio::runtime::Handle, -} - -impl UpdateRouter { - pub fn new(rt: tokio::runtime::Handle) -> Self { - Self { - router: HashMap::new(), - rt: rt, - } - } - - pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) { - self.router.insert(update_type.to_owned(), Box::new(handler)); - } - - pub fn dispatch(&self, client: &Client, update: JsonValue) { - let update_type: &str = update["@type"].as_str().unwrap(); - match self.router.get(update_type) { - Some(handler) => { - self.rt.spawn(handler.handle(client.clone(), update)); - }, - None => { - warn!("no handler for {}", update_type); - trace!("request was: {}", update); - }, - } - } -} |