From 7e41b162fb1a41d026814c7e4586aa6f6a627fbb Mon Sep 17 00:00:00 2001 From: syn Date: Fri, 8 Jan 2021 01:21:48 +0300 Subject: Initial commit --- src/client/mod.rs | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 src/client/mod.rs (limited to 'src/client/mod.rs') diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..d2589fa --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,165 @@ +//! Client consists of n + 1 threads: +//! * one is communicating with tdlib api (api thread) +//! * n threads, sending requests to the api thread through crossbeam channel. +//! +//! when Client::send is called: +//! - creates +//! * api thread receives Request: +//! - adds `@extra` field to request +//! - sends modified request to tdlib +//! * sending thread returns future, +//! that has a reference to the (not yet filled) response +//! + +pub(crate) mod client_builder; +mod responder; + +use crate::raw_ptr::TdPtr; +use crate::update::Handler; +use crossbeam::channel::{self, Sender}; +use log::debug; +use serde::{de::DeserializeOwned, ser::Serialize}; +use serde_json::Value as JsonValue; +use std::{ + future::Future, + marker::PhantomData, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, +}; + +#[derive(Debug, Clone)] +pub struct Response { + resp: Option, + waker: Option, +} + +impl Response { + pub fn new_empty() -> Self { + Self { + resp: None, + waker: None, + } + } +} + +type SafeResponse = Arc>; + +pub trait Request: Serialize { + /// Tag request with type + /// TDLib infers type from @type field of sent json, so `tag` should insert + /// this field into the value. + fn tag(&self) -> crate::error::Result; + + /// Convenience method to tag json made from self + fn tag_json>(&self, type_: S) -> crate::error::Result { + let mut self_json = serde_json::to_value(self)?; + if self_json.get("@type").is_some() { + return Err(crate::error::Error::HasTypeInJson); + } + self_json["@type"] = type_.as_ref().into(); + Ok(self_json) + } +} + +impl Request for JsonValue { + fn tag(&self) -> crate::error::Result { + if !self["@type"].is_string() { + return Err(crate::error::Error::HasNoTypeInJson); + } + Ok(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct ResponseFuture { + response_type_holder: PhantomData, + pub response: SafeResponse, // TODO: maybe it is possible to make this lockless +} + +impl Future for ResponseFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + let mut data = self.response.lock().unwrap(); + if let Some(resp) = data.resp.clone() { + /* ^^^^^^^^ + * TODO: at this point _this thread_ is the only owner of response, + * so it is safe to get `resp` without cloning + */ + Poll::Ready(serde_json::from_value(resp.clone())) + } else { + data.waker = Some(ctx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +pub(crate) enum JoinStreams { + NewRequest((JsonValue, SafeResponse)), + NewResponse(String), +} + +#[derive(Clone, Debug)] +pub struct Client { + sender: Sender, +} + +impl Client { + pub(crate) fn new(updater: H, timeout: f64) -> Self { + let (tx, rx) = channel::unbounded(); + + let api = Arc::new(TdPtr::new()); + let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); + + let _responder_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) + }; + let _tg_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || Self::tdlib_receive_loop(tx, api, timeout)); + }; + Self { sender: tx } + } + + pub fn send( + &self, + req: Req, + ) -> crate::error::Result> { + let fut = ResponseFuture { + response_type_holder: PhantomData, + response: Arc::new(Mutex::new(Response::new_empty())), + }; + + self.sender.send(JoinStreams::NewRequest(( + serde_json::to_value(req.tag()?)?, + fut.response.clone(), + )))?; + Ok(fut) + } + + /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. + /// Effective return type (now `serde_json::Value`) may be changed in future + pub fn send_forget( + &self, + req: Req, + ) -> crate::error::Result> { + self.send(req) + } + + /// Receiving side of client + fn tdlib_receive_loop(tx: Sender, tdlib: Arc, timeout: f64) { + loop { + if let Some(msg) = tdlib.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + debug!("receive timed out"); + } + } + } +} -- cgit v1.2.1-18-gbd029