summaryrefslogtreecommitdiffstats
path: root/src/client/responder.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/responder.rs')
-rw-r--r--src/client/responder.rs152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/client/responder.rs b/src/client/responder.rs
new file mode 100644
index 0000000..87cacb8
--- /dev/null
+++ b/src/client/responder.rs
@@ -0,0 +1,152 @@
+use std::{collections::HashMap, sync::Arc, task::Waker};
+
+use super::Client;
+use crate::{raw_ptr::TdPtr, Handler};
+
+use super::{JoinStreams, SafeResponse};
+use crossbeam::channel::{Receiver, Sender};
+use log::{error, trace, warn};
+use serde_json::Value as JsonValue;
+
+/// Oneshot means it forgets any information about particular request
+/// when receives response:
+/// it once stores waker of the future;
+/// when response arrives, it stores response and wakes the waker once,
+/// dropping waker.
+#[derive(Debug)]
+pub(crate) struct OneshotResponder<H: Handler> {
+ api: Arc<TdPtr>,
+ wakers_map: HashMap<u64, SafeResponse>,
+ rx: Receiver<JoinStreams>,
+
+ /// sequential id to be used as a unique identifier of request
+ /// used to match request with response
+ next_id: u64,
+
+ updater: H,
+ client: Client,
+ rt: tokio::runtime::Handle,
+}
+
+#[derive(Debug)]
+enum TgResponseType {
+ Update,
+ Error,
+ Response,
+}
+
+impl TgResponseType {
+ fn from_type<'a>(type_: &'a str) -> Self {
+ if type_.starts_with("update") {
+ Self::Update
+ } else if type_ == "error" {
+ Self::Error
+ } else {
+ Self::Response
+ }
+ }
+}
+
+impl<H: Handler> OneshotResponder<H> {
+ pub(crate) fn new(
+ rx: Receiver<JoinStreams>,
+ api: Arc<TdPtr>,
+ updater: H,
+ tx: Sender<JoinStreams>,
+ rt: tokio::runtime::Handle,
+ ) -> Self {
+ Self {
+ api,
+ wakers_map: HashMap::new(),
+ rx,
+ next_id: 0,
+ updater,
+ client: Client { sender: tx },
+ rt,
+ }
+ }
+
+ pub(crate) fn run(&mut self) {
+ loop {
+ match self.rx.recv() {
+ Ok(JoinStreams::NewRequest((mut request, fut_ref))) => {
+ let id = self.next_id;
+ self.next_id += 1;
+ if !request["@extra"].is_null() {
+ warn!("overwriting @extra in request");
+ }
+ request["@extra"] = id.into();
+ self.api.send(request.to_string().as_ref()).ok();
+ self.wakers_map.insert(id, fut_ref);
+ trace!("new req:\n{:#}", request);
+ }
+ Ok(JoinStreams::NewResponse(resp)) => {
+ match serde_json::from_str::<JsonValue>(&resp) {
+ Ok(val) => {
+ use crate::value_ext::ValueExt;
+ let type_ = val.get_type().map(TgResponseType::from_type);
+ match type_ {
+ Ok(TgResponseType::Update) => {
+ self.rt
+ .spawn(self.updater.handle_json(self.client.clone(), val));
+ }
+ Ok(TgResponseType::Response) => {
+ self.handle_response(val);
+ }
+ Ok(TgResponseType::Error) => {
+ self.handle_error(val);
+ }
+ Err(e) => {
+ error!("response has invalid @type: {}. Be aware, that this could lock execution flow", e);
+ }
+ }
+ }
+ Err(e) => {
+ warn!("ignoring invalid response. err: {}, resp: {}", e, resp);
+ }
+ }
+ }
+ Err(e) => {
+ error!("stream closed: {}", e);
+ error!("closing responder thread");
+ // this will return from function and effectively end thread
+ break;
+ }
+ }
+ }
+ }
+
+ fn handle_error(&mut self, resp: JsonValue) {
+ if let Some(id) = resp["@extra"].as_u64() {
+ if let Some(fut) = self.wakers_map.remove(&id) {
+ let mut fut_data = fut.lock().unwrap();
+ fut_data.resp = Some(resp);
+ fut_data.waker.as_ref().map(Waker::wake_by_ref);
+ } else {
+ warn!(
+ "response received, but request was not issued by any future: {}",
+ resp
+ );
+ }
+ } else {
+ warn!("response has invalid @extra: {}", resp);
+ }
+ }
+
+ fn handle_response(&mut self, resp: JsonValue) {
+ if let Some(id) = resp["@extra"].as_u64() {
+ if let Some(fut) = self.wakers_map.remove(&id) {
+ let mut fut_data = fut.lock().unwrap();
+ fut_data.resp = Some(resp);
+ fut_data.waker.as_ref().map(Waker::wake_by_ref);
+ } else {
+ warn!(
+ "response received, but request was not issued by any future: {}",
+ resp
+ );
+ }
+ } else {
+ warn!("response has invalid @extra: {}", resp);
+ }
+ }
+}