diff options
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..db46819 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,187 @@ +use log::{ info, error, warn, trace }; +use rust_tdlib::Tdlib; +use std::{ + task::{ Waker, Context, Poll }, + future::Future, + pin::Pin, + sync::{ + mpsc::{ Sender, Receiver, channel }, + Arc, + Mutex + }, + thread, +}; +use uuid::Uuid; +use std::collections::HashMap; +use json::JsonValue; + + +#[derive(Debug)] +pub struct RequestData { + req: JsonValue, + resp: Option<JsonValue>, + waker: Option<Waker>, +} + +#[derive(Debug, Clone)] +pub struct RequestFuture { + data: Arc<Mutex<RequestData>> +} + +impl Future for RequestFuture { + type Output = JsonValue; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let mut data = self.data.lock().unwrap(); + if let Some(resp) = &data.resp { + Poll::Ready(resp.clone()) + } else { + data.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +enum JoinStreams { + NewRequest(RequestFuture), + NewResponse(String), +} + +#[derive(Debug)] +pub struct Client { + //waker_handle: std::thread::JoinHandle<()>, + sender: Sender<JoinStreams>, +} + +impl Client { + pub fn new(log_opt: Option<i32>) -> Self { + if let Some(log) = log_opt { + Tdlib::set_log_verbosity_level(log).ok(); + } + let (tx, rx) = channel(); + let tx_for_tg = tx.clone(); + let api = Arc::new(Tdlib::new()); + let api_for_listener = api.clone(); + let api_for_responder = api.clone(); + let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already"); + let _run_handle = thread::spawn( + move || OneshotResponder::new(rx, api_for_responder).run(rt) + ); + let _tg_handle = thread::spawn( + move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0) + ); + + Self { + //waker_handle: _run_handle, + sender: tx + } + } + + pub fn send(&mut self, req: &JsonValue) -> RequestFuture { + let request = RequestData { + req: req.to_owned(), + resp: None, + waker: None + }; + let fut = RequestFuture { + data: Arc::new(Mutex::new(request)) + }; + + self.sender.send(JoinStreams::NewRequest(fut.clone())).unwrap(); + fut + } + + fn listen_tg(tx: Sender<JoinStreams>, api: Arc<Tdlib>, timeout: f64) { + loop { + if let Some(msg) = api.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + info!("receive timed out"); + } + } + } +} + +#[derive(Debug)] +struct OneshotResponder { + api: Arc<Tdlib>, + wakers_map: HashMap<Uuid, RequestFuture>, + rx: Receiver<JoinStreams>, +} + +impl OneshotResponder { + fn new(rx: Receiver<JoinStreams>, api: Arc<Tdlib>) -> Self { + Self { + api: api, + wakers_map: HashMap::new(), + rx: rx + } + } + + fn run(&mut self, rt: tokio::runtime::Handle) { + let mut updater = crate::update::UpdateRouter::new(rt); + updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } }); + loop { + match self.rx.recv() { + Ok(JoinStreams::NewRequest(fut)) => { + let id = loop { + let id = Uuid::new_v4(); + if self.wakers_map.contains_key(&id) { + continue; + } else { + break id; + } + }; + let data = fut.data.clone(); + let request: &mut JsonValue = &mut data.lock().unwrap().req; + if request.has_key("@extra") { + warn!("overwriting @extra in request"); + } + request["@extra"] = JsonValue::from(id.to_hyphenated().to_string()); + self.api.send(request.dump().as_ref()); + self.wakers_map.insert(id, fut); + trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2)); + }, + Ok(JoinStreams::NewResponse(resp)) => { + match json::parse(resp.as_ref()) { + Ok(val) => { + trace!("received update: {}", val); + let typ = val["@type"].as_str().unwrap(); + if typ.starts_with("update") { + updater.dispatch(val); + } else { + self.handle_response(val); + } + }, + Err(e) => { + warn!("ignoring invalid response. err: {}, resp: {}", e, resp); + } + } + }, + Err(e) => { + error!("stream closed: {}", e); + error!("closing thread"); + break; + } + } + } + } + + fn handle_response(&mut self, resp: JsonValue) { + if let Some(id_str) = resp["@extra"].as_str() { + if let Ok(id) = Uuid::parse_str(id_str) { + let fut_data = self.wakers_map + .remove(&id) + .unwrap() + .data; + fut_data.lock().unwrap().resp = Some(resp); + fut_data.lock().unwrap() + .waker.as_ref() + .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) }); + } + } else { + warn!("update has invalid @extra: {}", resp); + } + } +} |