use log::{ info, error, warn, trace }; use rust_tdlib::Tdlib; use std::{ task::{ Waker, Context, Poll }, future::Future, pin::Pin, sync::{ mpsc::{ Sender, Receiver, channel }, Arc, Mutex }, thread, }; use uuid::Uuid; use std::collections::HashMap; use json::JsonValue; #[derive(Debug)] pub struct RequestData { req: JsonValue, resp: Option, waker: Option, } #[derive(Debug, Clone)] pub struct RequestFuture { data: Arc> } impl Future for RequestFuture { type Output = JsonValue; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut data = self.data.lock().unwrap(); if let Some(resp) = &data.resp { Poll::Ready(resp.clone()) } else { data.waker = Some(cx.waker().clone()); Poll::Pending } } } #[derive(Debug)] enum JoinStreams { NewRequest(RequestFuture), NewResponse(String), } #[derive(Debug)] pub struct Client { //waker_handle: std::thread::JoinHandle<()>, sender: Sender, } impl Client { pub fn new(log_opt: Option) -> Self { if let Some(log) = log_opt { Tdlib::set_log_verbosity_level(log).ok(); } let (tx, rx) = channel(); let tx_for_tg = tx.clone(); let api = Arc::new(Tdlib::new()); let api_for_listener = api.clone(); let api_for_responder = api.clone(); let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already"); let _run_handle = thread::spawn( move || OneshotResponder::new(rx, api_for_responder).run(rt) ); let _tg_handle = thread::spawn( move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0) ); Self { //waker_handle: _run_handle, sender: tx } } pub fn send(&mut self, req: &JsonValue) -> RequestFuture { let request = RequestData { req: req.to_owned(), resp: None, waker: None }; let fut = RequestFuture { data: Arc::new(Mutex::new(request)) }; self.sender.send(JoinStreams::NewRequest(fut.clone())).unwrap(); fut } fn listen_tg(tx: Sender, api: Arc, timeout: f64) { loop { if let Some(msg) = api.receive(timeout) { tx.send(JoinStreams::NewResponse(msg)).unwrap(); } else { info!("receive timed out"); } } } } #[derive(Debug)] struct OneshotResponder { api: Arc, wakers_map: HashMap, rx: Receiver, } impl OneshotResponder { fn new(rx: Receiver, api: Arc) -> Self { Self { api: api, wakers_map: HashMap::new(), rx: rx } } fn run(&mut self, rt: tokio::runtime::Handle) { let mut updater = crate::update::UpdateRouter::new(rt); updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } }); loop { match self.rx.recv() { Ok(JoinStreams::NewRequest(fut)) => { let id = loop { let id = Uuid::new_v4(); if self.wakers_map.contains_key(&id) { continue; } else { break id; } }; let data = fut.data.clone(); let request: &mut JsonValue = &mut data.lock().unwrap().req; if request.has_key("@extra") { warn!("overwriting @extra in request"); } request["@extra"] = JsonValue::from(id.to_hyphenated().to_string()); self.api.send(request.dump().as_ref()); self.wakers_map.insert(id, fut); trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2)); }, Ok(JoinStreams::NewResponse(resp)) => { match json::parse(resp.as_ref()) { Ok(val) => { trace!("received update: {}", val); let typ = val["@type"].as_str().unwrap(); if typ.starts_with("update") { updater.dispatch(val); } else { self.handle_response(val); } }, Err(e) => { warn!("ignoring invalid response. err: {}, resp: {}", e, resp); } } }, Err(e) => { error!("stream closed: {}", e); error!("closing thread"); break; } } } } fn handle_response(&mut self, resp: JsonValue) { if let Some(id_str) = resp["@extra"].as_str() { if let Ok(id) = Uuid::parse_str(id_str) { let fut_data = self.wakers_map .remove(&id) .unwrap() .data; fut_data.lock().unwrap().resp = Some(resp); fut_data.lock().unwrap() .waker.as_ref() .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) }); } } else { warn!("update has invalid @extra: {}", resp); } } }