diff options
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r-- | src/client/mod.rs | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..d2589fa --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,165 @@ +//! Client consists of n + 1 threads: +//! * one is communicating with tdlib api (api thread) +//! * n threads, sending requests to the api thread through crossbeam channel. +//! +//! when Client::send is called: +//! - creates +//! * api thread receives Request: +//! - adds `@extra` field to request +//! - sends modified request to tdlib +//! * sending thread returns future, +//! that has a reference to the (not yet filled) response +//! + +pub(crate) mod client_builder; +mod responder; + +use crate::raw_ptr::TdPtr; +use crate::update::Handler; +use crossbeam::channel::{self, Sender}; +use log::debug; +use serde::{de::DeserializeOwned, ser::Serialize}; +use serde_json::Value as JsonValue; +use std::{ + future::Future, + marker::PhantomData, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, +}; + +#[derive(Debug, Clone)] +pub struct Response { + resp: Option<JsonValue>, + waker: Option<Waker>, +} + +impl Response { + pub fn new_empty() -> Self { + Self { + resp: None, + waker: None, + } + } +} + +type SafeResponse = Arc<Mutex<Response>>; + +pub trait Request: Serialize { + /// Tag request with type + /// TDLib infers type from @type field of sent json, so `tag` should insert + /// this field into the value. + fn tag(&self) -> crate::error::Result<JsonValue>; + + /// Convenience method to tag json made from self + fn tag_json<S: AsRef<str>>(&self, type_: S) -> crate::error::Result<JsonValue> { + let mut self_json = serde_json::to_value(self)?; + if self_json.get("@type").is_some() { + return Err(crate::error::Error::HasTypeInJson); + } + self_json["@type"] = type_.as_ref().into(); + Ok(self_json) + } +} + +impl Request for JsonValue { + fn tag(&self) -> crate::error::Result<JsonValue> { + if !self["@type"].is_string() { + return Err(crate::error::Error::HasNoTypeInJson); + } + Ok(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct ResponseFuture<R: DeserializeOwned> { + response_type_holder: PhantomData<R>, + pub response: SafeResponse, // TODO: maybe it is possible to make this lockless +} + +impl<R: DeserializeOwned> Future for ResponseFuture<R> { + type Output = Result<R, serde_json::error::Error>; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { + let mut data = self.response.lock().unwrap(); + if let Some(resp) = data.resp.clone() { + /* ^^^^^^^^ + * TODO: at this point _this thread_ is the only owner of response, + * so it is safe to get `resp` without cloning + */ + Poll::Ready(serde_json::from_value(resp.clone())) + } else { + data.waker = Some(ctx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +pub(crate) enum JoinStreams { + NewRequest((JsonValue, SafeResponse)), + NewResponse(String), +} + +#[derive(Clone, Debug)] +pub struct Client { + sender: Sender<JoinStreams>, +} + +impl Client { + pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self { + let (tx, rx) = channel::unbounded(); + + let api = Arc::new(TdPtr::new()); + let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); + + let _responder_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) + }; + let _tg_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || Self::tdlib_receive_loop(tx, api, timeout)); + }; + Self { sender: tx } + } + + pub fn send<Req: Request, Resp: DeserializeOwned>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<Resp>> { + let fut = ResponseFuture { + response_type_holder: PhantomData, + response: Arc::new(Mutex::new(Response::new_empty())), + }; + + self.sender.send(JoinStreams::NewRequest(( + serde_json::to_value(req.tag()?)?, + fut.response.clone(), + )))?; + Ok(fut) + } + + /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. + /// Effective return type (now `serde_json::Value`) may be changed in future + pub fn send_forget<Req: Request>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<JsonValue>> { + self.send(req) + } + + /// Receiving side of client + fn tdlib_receive_loop(tx: Sender<JoinStreams>, tdlib: Arc<TdPtr>, timeout: f64) { + loop { + if let Some(msg) = tdlib.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + debug!("receive timed out"); + } + } + } +} |