diff options
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | src/client.rs | 55 | ||||
-rw-r--r-- | src/main.rs | 138 | ||||
-rw-r--r-- | src/update.rs | 29 |
4 files changed, 161 insertions, 67 deletions
@@ -8,8 +8,10 @@ edition = "2018" [dependencies] "rust-tdlib" = { path = "rust-tdlib" } -# serde = { version = "1", features = ["derive"] } -json = "0.12" +"pert-types" = { path = "types" } +#serde = { version = "1", features = ["derive"] } +serde_json = "1" +crossbeam = "0.7" dotenv = "0.15" log = "0.4" env_logger = "0.7" diff --git a/src/client.rs b/src/client.rs index db46819..0e215f8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,15 +5,22 @@ use std::{ future::Future, pin::Pin, sync::{ - mpsc::{ Sender, Receiver, channel }, Arc, Mutex }, thread, }; +use crossbeam::channel::{ + self, + Sender, + Receiver, +}; use uuid::Uuid; use std::collections::HashMap; -use json::JsonValue; +use serde_json::Value as JsonValue; +use crate::update::{ UpdateRouter, Handler }; +use pert_types::types::Update; + #[derive(Debug)] @@ -48,37 +55,40 @@ enum JoinStreams { NewResponse(String), } -#[derive(Debug)] +#[derive(Clone)] pub struct Client { //waker_handle: std::thread::JoinHandle<()>, sender: Sender<JoinStreams>, } impl Client { - pub fn new(log_opt: Option<i32>) -> Self { + pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self { if let Some(log) = log_opt { Tdlib::set_log_verbosity_level(log).ok(); } - let (tx, rx) = channel(); + let (tx, rx) = channel::unbounded(); + let tx_for_tg = tx.clone(); let api = Arc::new(Tdlib::new()); let api_for_listener = api.clone(); let api_for_responder = api.clone(); - let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already"); + let sender_for_responder = tx.clone(); + let _run_handle = thread::spawn( - move || OneshotResponder::new(rx, api_for_responder).run(rt) + move || OneshotResponder::new(rx, api_for_responder).run( + updater, Self { sender: sender_for_responder } + ) ); let _tg_handle = thread::spawn( move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0) ); - Self { //waker_handle: _run_handle, - sender: tx + sender: tx, } } - pub fn send(&mut self, req: &JsonValue) -> RequestFuture { + pub fn send(&self, req: &JsonValue) -> RequestFuture { let request = RequestData { req: req.to_owned(), resp: None, @@ -119,9 +129,7 @@ impl OneshotResponder { } } - fn run(&mut self, rt: tokio::runtime::Handle) { - let mut updater = crate::update::UpdateRouter::new(rt); - updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } }); + fn run(&mut self, updater: UpdateRouter, client: Client) { loop { match self.rx.recv() { Ok(JoinStreams::NewRequest(fut)) => { @@ -135,21 +143,20 @@ impl OneshotResponder { }; let data = fut.data.clone(); let request: &mut JsonValue = &mut data.lock().unwrap().req; - if request.has_key("@extra") { + if !request["@extra"].is_null() { warn!("overwriting @extra in request"); } - request["@extra"] = JsonValue::from(id.to_hyphenated().to_string()); - self.api.send(request.dump().as_ref()); + request["@extra"] = id.to_hyphenated().to_string().into(); + self.api.send(request.to_string().as_ref()); self.wakers_map.insert(id, fut); - trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2)); + trace!("new req:\n{:#}", request); }, Ok(JoinStreams::NewResponse(resp)) => { - match json::parse(resp.as_ref()) { + match serde_json::from_str::<JsonValue>(resp.as_ref()) { Ok(val) => { - trace!("received update: {}", val); let typ = val["@type"].as_str().unwrap(); if typ.starts_with("update") { - updater.dispatch(val); + updater.dispatch(&client, val); } else { self.handle_response(val); } @@ -171,12 +178,14 @@ impl OneshotResponder { fn handle_response(&mut self, resp: JsonValue) { if let Some(id_str) = resp["@extra"].as_str() { if let Ok(id) = Uuid::parse_str(id_str) { - let fut_data = self.wakers_map + let fut_extracted = self.wakers_map .remove(&id) .unwrap() .data; - fut_data.lock().unwrap().resp = Some(resp); - fut_data.lock().unwrap() + + let mut fut_data = fut_extracted.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data .waker.as_ref() .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) }); } diff --git a/src/main.rs b/src/main.rs index 289ad02..0a6b256 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,21 @@ use std::env; use tokio; +use log::{ info, error }; mod client; //mod auth; mod update; //mod message; +/* +struct UpdateHandler; + +impl update::Handler for UpdateHandler { + fn handle(&self, _client: client::Client, req: serde_json::Value) -> futures::future::BoxFuture<'static, ()> { + Box::pin(async move { + info!("update: {:#}", req) + }) + } +}*/ #[tokio::main] async fn main() { @@ -14,41 +25,104 @@ async fn main() { .ok() .and_then(|var| var.parse().ok()); - let mut tg = client::Client::new(tg_log); + let mut updater = update::UpdateRouter::new( + tokio::runtime::Handle::try_current().expect("Must be in runtime") + ); - tokio::spawn(async move { - let _auth_state = tg.send( - &json::object!{ - "@type": "getAuthorizationState" - } - ).await; + let update_auth_state = |tg: client::Client, val: serde_json::Value| async move { let cache = env::current_dir().unwrap().join("cache"); - let resp = tg.send(&json::object!{ - "@type": "setTdlibParameters", - "parameters": { - "use_test_dc": false, - "api_id": env::var("API_ID").unwrap(), - "api_hash": env::var("API_HASH").unwrap(), - "device_model": "mbia", - "system_version": "Catalina", - "application_version": "0.1", - "system_language_code": "en", - "database_directory": cache.join("database").to_str().unwrap(), - "use_message_database": false, - "files_directory": cache.join("files").to_str().unwrap(), - "use_secret_chats": false, + info!("auth update: val: {:#}", val); + match val.pointer("/authorization_state/@type").unwrap() + .as_str().unwrap() { + "authorizationStateWaitTdlibParameters" => { + let resp = tg.send(&serde_json::json!({ + "@type": "setTdlibParameters", + "parameters": { + "use_test_dc": true, + "api_id": env::var("API_ID").unwrap(), + "api_hash": env::var("API_HASH").unwrap(), + "device_model": "mbia", + "system_version": "Catalina", + "application_version": "0.1", + "system_language_code": "en", + "database_directory": cache.join("database").to_str().unwrap(), + "use_message_database": false, + "files_directory": cache.join("files").to_str().unwrap(), + "use_secret_chats": false, + }, + })).await; + info!("settdlib: {}", resp); }, - }).await; - println!("resp: {}", resp); - }); - std::thread::sleep(std::time::Duration::new(1, 0)); -} + "authorizationStateWaitEncryptionKey" => { + let resp = tg.send(&serde_json::json!({ + "@type": "setDatabaseEncryptionKey", + "encryption_key": "sup3rs3cr3t", + })).await; + info!("setenckey: {}", resp); + }, + "authorizationStateWaitPhoneNumber" => { + let resp = tg.send(&serde_json::json!({ + "@type": "setAuthenticationPhoneNumber", + "phone_number": "+79859053875", + "settings": { + "allow_flash_call": false, + "is_current_phone_number": false, + "allow_sms_retriever_api": false, + } + })).await; + info!("setphone: {}", resp); + }, + "authorizationStateWaitRegistration" => { + let resp = tg.send(&serde_json::json!({ + "@type": "registerUser", + "first_name": "John", + "last_name": "Doe", + })).await; + info!("reg: {}", resp); + }, + "authorizationStateWaitCode" => { + let code = { + let mut s = String::new(); + std::io::stdin().read_line(&mut s).expect("Could not read line"); + s + }; + let resp = tg.send(&serde_json::json!({ + "@type": "checkAuthenticationCode", + "code": code, + })).await; + info!("checkcode: {}", resp); + }, + auth_state => { + error!("Unknown auth state update: {} / {:#}", auth_state, val); + } + }; + }; + updater.add_handler("updateAuthorizationState", update_auth_state); -/* -fn main() { - runtime::Runtime::new(|arc_msg, tx| { - tx.send(std::sync::Arc::new(runtime::Task{})).unwrap(); - }).run(); + let tg = client::Client::new(tg_log, updater); + std::thread::sleep(std::time::Duration::new(2, 0)); + let get_chats = tg.send(&serde_json::json!({ + "@type": "getChats", + "chat_list": { + "@type": "chatListMain" + }, + "offset_order": "9223372036854775807", + "offset_chat_id": "9223372036854775807", + "limit": 5 + })).await; + error!("send msg: {}", get_chats); + let chat = get_chats.pointer("/chat_ids/0").unwrap_or(&serde_json::json!(0)).as_u64().unwrap(); + let get_chat = tg.send(&serde_json::json!({ + "@type": "getChat", + "chat_id": chat, + })).await; + error!("get_chat: {:#}", get_chat); + let me = tg.send(&serde_json::json!({ + "@type": "getMe", + })).await; + error!("me: {:#}", me); + + + std::thread::sleep(std::time::Duration::new(200, 0)); } -*/ diff --git a/src/update.rs b/src/update.rs index a6884e3..b5b0ba7 100644 --- a/src/update.rs +++ b/src/update.rs @@ -1,17 +1,19 @@ use std::collections::HashMap; -use json::JsonValue; +use serde_json::Value as JsonValue; use std::future::Future; use futures::future::BoxFuture; +use log::{ warn, trace }; +use crate::client::Client; pub trait Handler: Send + Sync + 'static { - fn handle(&self, _: JsonValue) -> BoxFuture<'static, ()>; + fn handle(&self, _: Client, _: JsonValue) -> BoxFuture<'static, ()>; } impl<C, F> Handler for C -where C: Send + Sync + 'static + Fn(JsonValue) -> F, +where C: Send + Sync + 'static + Fn(Client, JsonValue) -> F, F: Future<Output = ()> + 'static + Send { - fn handle(&self, req: JsonValue) -> BoxFuture<'static, ()> { - Box::pin((*self)(req)) + fn handle(&self, client: Client, req: JsonValue) -> BoxFuture<'static, ()> { + Box::pin((*self)(client, req)) } } @@ -24,17 +26,24 @@ impl UpdateRouter { pub fn new(rt: tokio::runtime::Handle) -> Self { Self { router: HashMap::new(), - rt: rt + rt: rt, } } + pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) { self.router.insert(update_type.to_owned(), Box::new(handler)); } - pub fn dispatch(&self, update: JsonValue) { + pub fn dispatch(&self, client: &Client, update: JsonValue) { let update_type: &str = update["@type"].as_str().unwrap(); - self.router.get(update_type) - .and_then(|handler| { self.rt.spawn(handler.handle(update)); Some(()) }) - .or_else(|| { println!("handler not found"); Some(()) }); + match self.router.get(update_type) { + Some(handler) => { + self.rt.spawn(handler.handle(client.clone(), update)); + }, + None => { + warn!("no handler for {}", update_type); + trace!("request was: {}", update); + }, + } } } |