//! Client consists of n + 1 threads: //! * one is communicating with tdlib api (api thread) //! * n threads, sending requests to the api thread through crossbeam channel. //! //! when Client::send is called: //! - creates //! * api thread receives Request: //! - adds `@extra` field to request //! - sends modified request to tdlib //! * sending thread returns future, //! that has a reference to the (not yet filled) response //! pub(crate) mod client_builder; pub mod commands; mod responder; pub mod types; use crate::error::Result; use crate::raw_ptr::TdPtr; use crate::update::Handler; use crossbeam::channel::{self, Sender}; use log::error; use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value as JsonValue; use std::{ future::Future, marker::PhantomData, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, }; #[derive(Debug)] pub struct Response { resp: Option>, waker: Option, } impl Response { pub fn new_empty() -> Self { Self { resp: None, waker: None, } } } type SafeResponse = Arc>; pub trait Request: Serialize { /// Tag request with type /// TDLib infers type from @type field of sent json, so `tag` should insert /// this field into the value. fn tag(&self) -> crate::error::Result; /// Convenience method to tag json made from self fn tag_json>(&self, type_: S) -> crate::error::Result { let mut self_json = serde_json::to_value(self)?; if self_json.get("@type").is_some() { return Err(crate::error::Error::HasTypeInJson); } self_json["@type"] = type_.as_ref().into(); Ok(self_json) } } impl Request for JsonValue { fn tag(&self) -> crate::error::Result { if !self["@type"].is_string() { return Err(crate::error::Error::HasNoTypeInJson); } Ok(self.clone()) } } #[derive(Debug, Clone)] pub struct ResponseFuture { response_type_holder: PhantomData, pub response: SafeResponse, // TODO: maybe it is possible to make this lockless } impl ResponseFuture { pub fn from_error(err: crate::error::Error) -> Self { Self { response_type_holder: PhantomData, response: Arc::new(Mutex::new(Response { resp: Some(Err(err)), waker: None, })), } } } impl Future for ResponseFuture { type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { let mut data = self.response.lock().unwrap(); if let Some(resp) = data.resp.take() { let result = resp.and_then(|inner| -> Result { Ok(serde_json::from_value(inner)?) }); Poll::Ready(result) } else { data.waker = Some(ctx.waker().clone()); Poll::Pending } } } #[derive(Debug)] pub(crate) enum JoinStreams { NewRequest((JsonValue, SafeResponse)), NewResponse(String), } #[derive(Clone, Debug)] pub struct Client { sender: Sender, } pub trait ClientLike { fn send(&self, req: Req) -> ResponseFuture; } impl Client { pub(crate) fn new(updater: H, timeout: f64) -> Self { let (tx, rx) = channel::unbounded(); let api = Arc::new(TdPtr::new()); let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); let _responder_handle = { let api = api.clone(); let tx = tx.clone(); thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) }; let _tg_handle = { let api = api.clone(); let tx = tx.clone(); thread::spawn(move || loop { if let Some(msg) = api.receive(timeout) { if tx.send(JoinStreams::NewResponse(msg)).is_err() { error!("channel is closed. stopping receiver"); break; } } }); }; Self { sender: tx } } /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. /// Effective return type (now `serde_json::Value`) may be changed in future pub fn send_forget(&self, req: Req) -> ResponseFuture { self.send(req) } } impl ClientLike for Client { fn send(&self, req: Req) -> ResponseFuture { let fut = ResponseFuture { response_type_holder: PhantomData, response: Arc::new(Mutex::new(Response::new_empty())), }; let maybe_sent = req .tag() .and_then(|tagged| serde_json::to_value(tagged).map_err(|err| err.into())) .and_then(|serialized| { self.sender .send(JoinStreams::NewRequest((serialized, fut.response.clone()))) .map_err(|err| err.into()) }); match maybe_sent { Ok(_) => fut, Err(err) => ResponseFuture::from_error(err), } } }