//! Client consists of n + 1 threads: //! * one is communicating with tdlib api (api thread) //! * n threads, sending requests to the api thread through crossbeam channel. //! //! when Client::send is called: //! - creates //! * api thread receives Request: //! - adds `@extra` field to request //! - sends modified request to tdlib //! * sending thread returns future, //! that has a reference to the (not yet filled) response //! pub(crate) mod client_builder; mod responder; use crate::raw_ptr::TdPtr; use crate::update::Handler; use crossbeam::channel::{self, Sender}; use log::{debug, error}; use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value as JsonValue; use std::{ future::Future, marker::PhantomData, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, }; #[derive(Debug, Clone)] pub struct Response { resp: Option, waker: Option, } impl Response { pub fn new_empty() -> Self { Self { resp: None, waker: None, } } } type SafeResponse = Arc>; pub trait Request: Serialize { /// Tag request with type /// TDLib infers type from @type field of sent json, so `tag` should insert /// this field into the value. fn tag(&self) -> crate::error::Result; /// Convenience method to tag json made from self fn tag_json>(&self, type_: S) -> crate::error::Result { let mut self_json = serde_json::to_value(self)?; if self_json.get("@type").is_some() { return Err(crate::error::Error::HasTypeInJson); } self_json["@type"] = type_.as_ref().into(); Ok(self_json) } } impl Request for JsonValue { fn tag(&self) -> crate::error::Result { if !self["@type"].is_string() { return Err(crate::error::Error::HasNoTypeInJson); } Ok(self.clone()) } } #[derive(Debug, Clone)] pub struct ResponseFuture { response_type_holder: PhantomData, pub response: SafeResponse, // TODO: maybe it is possible to make this lockless } impl Future for ResponseFuture { type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { let mut data = self.response.lock().unwrap(); if let Some(resp) = data.resp.clone() { /* ^^^^^^^^ * TODO: at this point _this thread_ is the only owner of response, * so it is safe to get `resp` without cloning */ Poll::Ready(serde_json::from_value(resp.clone())) } else { data.waker = Some(ctx.waker().clone()); Poll::Pending } } } #[derive(Debug)] pub(crate) enum JoinStreams { NewRequest((JsonValue, SafeResponse)), NewResponse(String), } #[derive(Clone, Debug)] pub struct Client { sender: Sender, } impl Client { pub(crate) fn new(updater: H, timeout: f64) -> Self { let (tx, rx) = channel::unbounded(); let api = Arc::new(TdPtr::new()); let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); let _responder_handle = { let api = api.clone(); let tx = tx.clone(); thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) }; let _tg_handle = { let api = api.clone(); let tx = tx.clone(); thread::spawn(move || loop { if let Some(msg) = api.receive(timeout) { if tx.send(JoinStreams::NewResponse(msg)).is_err() { error!("channel is closed. stopping receiver"); break; } } }); }; Self { sender: tx } } pub fn send( &self, req: Req, ) -> crate::error::Result> { let fut = ResponseFuture { response_type_holder: PhantomData, response: Arc::new(Mutex::new(Response::new_empty())), }; self.sender.send(JoinStreams::NewRequest(( serde_json::to_value(req.tag()?)?, fut.response.clone(), )))?; Ok(fut) } /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. /// Effective return type (now `serde_json::Value`) may be changed in future pub fn send_forget( &self, req: Req, ) -> crate::error::Result> { self.send(req) } }