use std::{collections::HashMap, sync::Arc, task::Waker}; use super::Client; use crate::{raw_ptr::TdPtr, Handler}; use super::{JoinStreams, SafeResponse}; use crossbeam::channel::{Receiver, Sender}; use log::{error, trace, warn}; use serde_json::Value as JsonValue; /// Oneshot means it forgets any information about particular request /// when receives response: /// it once stores waker of the future; /// when response arrives, it stores response and wakes the waker once, /// dropping waker. #[derive(Debug)] pub(crate) struct OneshotResponder { api: Arc, wakers_map: HashMap, rx: Receiver, /// sequential id to be used as a unique identifier of request /// used to match request with response next_id: u64, updater: H, client: Client, rt: tokio::runtime::Handle, } #[derive(Debug)] enum TgResponseType { Update, Error, Response, } impl TgResponseType { fn from_type<'a>(type_: &'a str) -> Self { if type_.starts_with("update") { Self::Update } else if type_ == "error" { Self::Error } else { Self::Response } } } impl OneshotResponder { pub(crate) fn new( rx: Receiver, api: Arc, updater: H, tx: Sender, rt: tokio::runtime::Handle, ) -> Self { Self { api, wakers_map: HashMap::new(), rx, next_id: 0, updater, client: Client { sender: tx }, rt, } } pub(crate) fn run(&mut self) { loop { match self.rx.recv() { Ok(JoinStreams::NewRequest((mut request, fut_ref))) => { let id = self.next_id; self.next_id += 1; if !request["@extra"].is_null() { warn!("overwriting @extra in request"); } request["@extra"] = id.into(); self.api.send(request.to_string().as_ref()).ok(); self.wakers_map.insert(id, fut_ref); trace!("new req:\n{:#}", request); } Ok(JoinStreams::NewResponse(resp)) => { match serde_json::from_str::(&resp) { Ok(val) => { use crate::value_ext::ValueExt; let type_ = val.get_type().map(TgResponseType::from_type); match type_ { Ok(TgResponseType::Update) => { self.rt .spawn(self.updater.handle_json(self.client.clone(), val)); } Ok(TgResponseType::Response) => { self.handle_response(val); } Ok(TgResponseType::Error) => { self.handle_error(val); } Err(e) => { error!("response has invalid @type: {}. Be aware, that this could lock execution flow", e); } } } Err(e) => { warn!("ignoring invalid response. err: {}, resp: {}", e, resp); } } } Err(e) => { error!("stream closed: {}", e); error!("closing responder thread"); // this will return from function and effectively end thread break; } } } } fn handle_error(&mut self, resp: JsonValue) { if let Some(id) = resp["@extra"].as_u64() { if let Some(fut) = self.wakers_map.remove(&id) { let mut fut_data = fut.lock().unwrap(); fut_data.resp = Some(Ok(resp)); fut_data.waker.as_ref().map(Waker::wake_by_ref); } else { warn!( "response received, but request was not issued by any future: {}", resp ); } } else { warn!("response has invalid @extra: {}", resp); } } fn handle_response(&mut self, resp: JsonValue) { if let Some(id) = resp["@extra"].as_u64() { if let Some(fut) = self.wakers_map.remove(&id) { let mut fut_data = fut.lock().unwrap(); fut_data.resp = Some(Ok(resp)); fut_data.waker.as_ref().map(Waker::wake_by_ref); } else { warn!( "response received, but request was not issued by any future: {}", resp ); } } else { warn!("response has invalid @extra: {}", resp); } } }