diff options
author | syn <isaqtm@gmail.com> | 2020-05-19 20:54:45 +0300 |
---|---|---|
committer | syn <isaqtm@gmail.com> | 2020-05-19 20:54:45 +0300 |
commit | c89b1acb6e5d5755dc79c1f5643c915624a4c4c3 (patch) | |
tree | 2b51d52df177db1b14eb6666236a131356e5452f /src | |
download | pert-c89b1acb6e5d5755dc79c1f5643c915624a4c4c3.tar.gz |
Some kind of working code
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 187 | ||||
-rw-r--r-- | src/main.rs | 54 | ||||
-rw-r--r-- | src/update.rs | 40 |
3 files changed, 281 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..db46819 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,187 @@ +use log::{ info, error, warn, trace }; +use rust_tdlib::Tdlib; +use std::{ + task::{ Waker, Context, Poll }, + future::Future, + pin::Pin, + sync::{ + mpsc::{ Sender, Receiver, channel }, + Arc, + Mutex + }, + thread, +}; +use uuid::Uuid; +use std::collections::HashMap; +use json::JsonValue; + + +#[derive(Debug)] +pub struct RequestData { + req: JsonValue, + resp: Option<JsonValue>, + waker: Option<Waker>, +} + +#[derive(Debug, Clone)] +pub struct RequestFuture { + data: Arc<Mutex<RequestData>> +} + +impl Future for RequestFuture { + type Output = JsonValue; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let mut data = self.data.lock().unwrap(); + if let Some(resp) = &data.resp { + Poll::Ready(resp.clone()) + } else { + data.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +enum JoinStreams { + NewRequest(RequestFuture), + NewResponse(String), +} + +#[derive(Debug)] +pub struct Client { + //waker_handle: std::thread::JoinHandle<()>, + sender: Sender<JoinStreams>, +} + +impl Client { + pub fn new(log_opt: Option<i32>) -> Self { + if let Some(log) = log_opt { + Tdlib::set_log_verbosity_level(log).ok(); + } + let (tx, rx) = channel(); + let tx_for_tg = tx.clone(); + let api = Arc::new(Tdlib::new()); + let api_for_listener = api.clone(); + let api_for_responder = api.clone(); + let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already"); + let _run_handle = thread::spawn( + move || OneshotResponder::new(rx, api_for_responder).run(rt) + ); + let _tg_handle = thread::spawn( + move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0) + ); + + Self { + //waker_handle: _run_handle, + sender: tx + } + } + + pub fn send(&mut self, req: &JsonValue) -> RequestFuture { + let request = RequestData { + req: req.to_owned(), + resp: None, + waker: None + }; + let fut = RequestFuture { + data: Arc::new(Mutex::new(request)) + }; + + self.sender.send(JoinStreams::NewRequest(fut.clone())).unwrap(); + fut + } + + fn listen_tg(tx: Sender<JoinStreams>, api: Arc<Tdlib>, timeout: f64) { + loop { + if let Some(msg) = api.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + info!("receive timed out"); + } + } + } +} + +#[derive(Debug)] +struct OneshotResponder { + api: Arc<Tdlib>, + wakers_map: HashMap<Uuid, RequestFuture>, + rx: Receiver<JoinStreams>, +} + +impl OneshotResponder { + fn new(rx: Receiver<JoinStreams>, api: Arc<Tdlib>) -> Self { + Self { + api: api, + wakers_map: HashMap::new(), + rx: rx + } + } + + fn run(&mut self, rt: tokio::runtime::Handle) { + let mut updater = crate::update::UpdateRouter::new(rt); + updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } }); + loop { + match self.rx.recv() { + Ok(JoinStreams::NewRequest(fut)) => { + let id = loop { + let id = Uuid::new_v4(); + if self.wakers_map.contains_key(&id) { + continue; + } else { + break id; + } + }; + let data = fut.data.clone(); + let request: &mut JsonValue = &mut data.lock().unwrap().req; + if request.has_key("@extra") { + warn!("overwriting @extra in request"); + } + request["@extra"] = JsonValue::from(id.to_hyphenated().to_string()); + self.api.send(request.dump().as_ref()); + self.wakers_map.insert(id, fut); + trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2)); + }, + Ok(JoinStreams::NewResponse(resp)) => { + match json::parse(resp.as_ref()) { + Ok(val) => { + trace!("received update: {}", val); + let typ = val["@type"].as_str().unwrap(); + if typ.starts_with("update") { + updater.dispatch(val); + } else { + self.handle_response(val); + } + }, + Err(e) => { + warn!("ignoring invalid response. err: {}, resp: {}", e, resp); + } + } + }, + Err(e) => { + error!("stream closed: {}", e); + error!("closing thread"); + break; + } + } + } + } + + fn handle_response(&mut self, resp: JsonValue) { + if let Some(id_str) = resp["@extra"].as_str() { + if let Ok(id) = Uuid::parse_str(id_str) { + let fut_data = self.wakers_map + .remove(&id) + .unwrap() + .data; + fut_data.lock().unwrap().resp = Some(resp); + fut_data.lock().unwrap() + .waker.as_ref() + .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) }); + } + } else { + warn!("update has invalid @extra: {}", resp); + } + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..289ad02 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,54 @@ +use std::env; +use tokio; + +mod client; +//mod auth; +mod update; +//mod message; + +#[tokio::main] +async fn main() { + dotenv::dotenv().ok(); + env_logger::init(); + let tg_log: Option<i32> = env::var("TG_LOG") + .ok() + .and_then(|var| var.parse().ok()); + + let mut tg = client::Client::new(tg_log); + + tokio::spawn(async move { + let _auth_state = tg.send( + &json::object!{ + "@type": "getAuthorizationState" + } + ).await; + let cache = env::current_dir().unwrap().join("cache"); + let resp = tg.send(&json::object!{ + "@type": "setTdlibParameters", + "parameters": { + "use_test_dc": false, + "api_id": env::var("API_ID").unwrap(), + "api_hash": env::var("API_HASH").unwrap(), + "device_model": "mbia", + "system_version": "Catalina", + "application_version": "0.1", + "system_language_code": "en", + "database_directory": cache.join("database").to_str().unwrap(), + "use_message_database": false, + "files_directory": cache.join("files").to_str().unwrap(), + "use_secret_chats": false, + }, + }).await; + println!("resp: {}", resp); + }); + std::thread::sleep(std::time::Duration::new(1, 0)); +} + + +/* +fn main() { + runtime::Runtime::new(|arc_msg, tx| { + tx.send(std::sync::Arc::new(runtime::Task{})).unwrap(); + }).run(); +} +*/ diff --git a/src/update.rs b/src/update.rs new file mode 100644 index 0000000..a6884e3 --- /dev/null +++ b/src/update.rs @@ -0,0 +1,40 @@ +use std::collections::HashMap; +use json::JsonValue; +use std::future::Future; +use futures::future::BoxFuture; + +pub trait Handler: Send + Sync + 'static { + fn handle(&self, _: JsonValue) -> BoxFuture<'static, ()>; +} + +impl<C, F> Handler for C +where C: Send + Sync + 'static + Fn(JsonValue) -> F, + F: Future<Output = ()> + 'static + Send { + fn handle(&self, req: JsonValue) -> BoxFuture<'static, ()> { + Box::pin((*self)(req)) + } +} + +pub struct UpdateRouter { + router: HashMap<String, Box<dyn Handler>>, + rt: tokio::runtime::Handle, +} + +impl UpdateRouter { + pub fn new(rt: tokio::runtime::Handle) -> Self { + Self { + router: HashMap::new(), + rt: rt + } + } + pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) { + self.router.insert(update_type.to_owned(), Box::new(handler)); + } + + pub fn dispatch(&self, update: JsonValue) { + let update_type: &str = update["@type"].as_str().unwrap(); + self.router.get(update_type) + .and_then(|handler| { self.rt.spawn(handler.handle(update)); Some(()) }) + .or_else(|| { println!("handler not found"); Some(()) }); + } +} |