summaryrefslogtreecommitdiffstats
path: root/src/client/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r--src/client/mod.rs165
1 files changed, 165 insertions, 0 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
new file mode 100644
index 0000000..d2589fa
--- /dev/null
+++ b/src/client/mod.rs
@@ -0,0 +1,165 @@
+//! Client consists of n + 1 threads:
+//! * one is communicating with tdlib api (api thread)
+//! * n threads, sending requests to the api thread through crossbeam channel.
+//!
+//! when Client::send is called:
+//! - creates
+//! * api thread receives Request:
+//! - adds `@extra` field to request
+//! - sends modified request to tdlib
+//! * sending thread returns future,
+//! that has a reference to the (not yet filled) response
+//!
+
+pub(crate) mod client_builder;
+mod responder;
+
+use crate::raw_ptr::TdPtr;
+use crate::update::Handler;
+use crossbeam::channel::{self, Sender};
+use log::debug;
+use serde::{de::DeserializeOwned, ser::Serialize};
+use serde_json::Value as JsonValue;
+use std::{
+ future::Future,
+ marker::PhantomData,
+ pin::Pin,
+ sync::{Arc, Mutex},
+ task::{Context, Poll, Waker},
+ thread,
+};
+
+#[derive(Debug, Clone)]
+pub struct Response {
+ resp: Option<JsonValue>,
+ waker: Option<Waker>,
+}
+
+impl Response {
+ pub fn new_empty() -> Self {
+ Self {
+ resp: None,
+ waker: None,
+ }
+ }
+}
+
+type SafeResponse = Arc<Mutex<Response>>;
+
+pub trait Request: Serialize {
+ /// Tag request with type
+ /// TDLib infers type from @type field of sent json, so `tag` should insert
+ /// this field into the value.
+ fn tag(&self) -> crate::error::Result<JsonValue>;
+
+ /// Convenience method to tag json made from self
+ fn tag_json<S: AsRef<str>>(&self, type_: S) -> crate::error::Result<JsonValue> {
+ let mut self_json = serde_json::to_value(self)?;
+ if self_json.get("@type").is_some() {
+ return Err(crate::error::Error::HasTypeInJson);
+ }
+ self_json["@type"] = type_.as_ref().into();
+ Ok(self_json)
+ }
+}
+
+impl Request for JsonValue {
+ fn tag(&self) -> crate::error::Result<JsonValue> {
+ if !self["@type"].is_string() {
+ return Err(crate::error::Error::HasNoTypeInJson);
+ }
+ Ok(self.clone())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct ResponseFuture<R: DeserializeOwned> {
+ response_type_holder: PhantomData<R>,
+ pub response: SafeResponse, // TODO: maybe it is possible to make this lockless
+}
+
+impl<R: DeserializeOwned> Future for ResponseFuture<R> {
+ type Output = Result<R, serde_json::error::Error>;
+
+ fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
+ let mut data = self.response.lock().unwrap();
+ if let Some(resp) = data.resp.clone() {
+ /* ^^^^^^^^
+ * TODO: at this point _this thread_ is the only owner of response,
+ * so it is safe to get `resp` without cloning
+ */
+ Poll::Ready(serde_json::from_value(resp.clone()))
+ } else {
+ data.waker = Some(ctx.waker().clone());
+ Poll::Pending
+ }
+ }
+}
+
+#[derive(Debug)]
+pub(crate) enum JoinStreams {
+ NewRequest((JsonValue, SafeResponse)),
+ NewResponse(String),
+}
+
+#[derive(Clone, Debug)]
+pub struct Client {
+ sender: Sender<JoinStreams>,
+}
+
+impl Client {
+ pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self {
+ let (tx, rx) = channel::unbounded();
+
+ let api = Arc::new(TdPtr::new());
+ let rt = tokio::runtime::Handle::try_current().expect("must be in runtime");
+
+ let _responder_handle = {
+ let api = api.clone();
+ let tx = tx.clone();
+ thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run())
+ };
+ let _tg_handle = {
+ let api = api.clone();
+ let tx = tx.clone();
+ thread::spawn(move || Self::tdlib_receive_loop(tx, api, timeout));
+ };
+ Self { sender: tx }
+ }
+
+ pub fn send<Req: Request, Resp: DeserializeOwned>(
+ &self,
+ req: Req,
+ ) -> crate::error::Result<ResponseFuture<Resp>> {
+ let fut = ResponseFuture {
+ response_type_holder: PhantomData,
+ response: Arc::new(Mutex::new(Response::new_empty())),
+ };
+
+ self.sender.send(JoinStreams::NewRequest((
+ serde_json::to_value(req.tag()?)?,
+ fut.response.clone(),
+ )))?;
+ Ok(fut)
+ }
+
+ /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used.
+ /// Effective return type (now `serde_json::Value`) may be changed in future
+ pub fn send_forget<Req: Request>(
+ &self,
+ req: Req,
+ ) -> crate::error::Result<ResponseFuture<JsonValue>> {
+ self.send(req)
+ }
+
+ /// Receiving side of client
+ fn tdlib_receive_loop(tx: Sender<JoinStreams>, tdlib: Arc<TdPtr>, timeout: f64) {
+ loop {
+ if let Some(msg) = tdlib.receive(timeout) {
+ tx.send(JoinStreams::NewResponse(msg)).unwrap();
+ } else {
+ debug!("receive timed out");
+ }
+ }
+ }
+}