diff options
Diffstat (limited to 'src/client/responder.rs')
-rw-r--r-- | src/client/responder.rs | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/src/client/responder.rs b/src/client/responder.rs new file mode 100644 index 0000000..87cacb8 --- /dev/null +++ b/src/client/responder.rs @@ -0,0 +1,152 @@ +use std::{collections::HashMap, sync::Arc, task::Waker}; + +use super::Client; +use crate::{raw_ptr::TdPtr, Handler}; + +use super::{JoinStreams, SafeResponse}; +use crossbeam::channel::{Receiver, Sender}; +use log::{error, trace, warn}; +use serde_json::Value as JsonValue; + +/// Oneshot means it forgets any information about particular request +/// when receives response: +/// it once stores waker of the future; +/// when response arrives, it stores response and wakes the waker once, +/// dropping waker. +#[derive(Debug)] +pub(crate) struct OneshotResponder<H: Handler> { + api: Arc<TdPtr>, + wakers_map: HashMap<u64, SafeResponse>, + rx: Receiver<JoinStreams>, + + /// sequential id to be used as a unique identifier of request + /// used to match request with response + next_id: u64, + + updater: H, + client: Client, + rt: tokio::runtime::Handle, +} + +#[derive(Debug)] +enum TgResponseType { + Update, + Error, + Response, +} + +impl TgResponseType { + fn from_type<'a>(type_: &'a str) -> Self { + if type_.starts_with("update") { + Self::Update + } else if type_ == "error" { + Self::Error + } else { + Self::Response + } + } +} + +impl<H: Handler> OneshotResponder<H> { + pub(crate) fn new( + rx: Receiver<JoinStreams>, + api: Arc<TdPtr>, + updater: H, + tx: Sender<JoinStreams>, + rt: tokio::runtime::Handle, + ) -> Self { + Self { + api, + wakers_map: HashMap::new(), + rx, + next_id: 0, + updater, + client: Client { sender: tx }, + rt, + } + } + + pub(crate) fn run(&mut self) { + loop { + match self.rx.recv() { + Ok(JoinStreams::NewRequest((mut request, fut_ref))) => { + let id = self.next_id; + self.next_id += 1; + if !request["@extra"].is_null() { + warn!("overwriting @extra in request"); + } + request["@extra"] = id.into(); + self.api.send(request.to_string().as_ref()).ok(); + self.wakers_map.insert(id, fut_ref); + trace!("new req:\n{:#}", request); + } + Ok(JoinStreams::NewResponse(resp)) => { + match serde_json::from_str::<JsonValue>(&resp) { + Ok(val) => { + use crate::value_ext::ValueExt; + let type_ = val.get_type().map(TgResponseType::from_type); + match type_ { + Ok(TgResponseType::Update) => { + self.rt + .spawn(self.updater.handle_json(self.client.clone(), val)); + } + Ok(TgResponseType::Response) => { + self.handle_response(val); + } + Ok(TgResponseType::Error) => { + self.handle_error(val); + } + Err(e) => { + error!("response has invalid @type: {}. Be aware, that this could lock execution flow", e); + } + } + } + Err(e) => { + warn!("ignoring invalid response. err: {}, resp: {}", e, resp); + } + } + } + Err(e) => { + error!("stream closed: {}", e); + error!("closing responder thread"); + // this will return from function and effectively end thread + break; + } + } + } + } + + fn handle_error(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } + + fn handle_response(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } +} |