diff options
author | syn <isaqtm@gmail.com> | 2021-01-08 01:21:48 +0300 |
---|---|---|
committer | syn <isaqtm@gmail.com> | 2021-01-08 01:21:48 +0300 |
commit | 7e41b162fb1a41d026814c7e4586aa6f6a627fbb (patch) | |
tree | beedf614593ceb3555659aa98382c8950a22bae8 /src/client | |
download | tdlib-rs-7e41b162fb1a41d026814c7e4586aa6f6a627fbb.tar.gz |
Initial commit
Diffstat (limited to 'src/client')
-rw-r--r-- | src/client/client_builder.rs | 74 | ||||
-rw-r--r-- | src/client/mod.rs | 165 | ||||
-rw-r--r-- | src/client/responder.rs | 152 |
3 files changed, 391 insertions, 0 deletions
diff --git a/src/client/client_builder.rs b/src/client/client_builder.rs new file mode 100644 index 0000000..eaad59f --- /dev/null +++ b/src/client/client_builder.rs @@ -0,0 +1,74 @@ +use crate::raw_ptr::{LogLevel, TdPtr}; +use crate::update::Handler; + +/// Build client with some options. +/// +/// This builder will execute functions as soon as builder functions will be called +/// +/// e.g. if you call [`ClientBuilder::log_level`], it will immediately call +/// underlying API to change logging in TDLib, as it will be done globally +/// disregarding actual TDLib client being used, if any +#[derive(Debug)] +pub struct ClientBuilder<H: Handler> { + // need Option here to transfer ownership to client, when building + handler: Option<H>, + timeout: f64, +} + +impl<H: Handler> ClientBuilder<H> { + /// Create new `ClientBuilder` with update handler + pub fn new(handler: H) -> Self { + Self { + handler: Some(handler), + timeout: 10.0, + } + } + + /// Set up internal TDLib logging level. + /// + /// See also [`LogLevel`] doc for explanation of logging levels. + /// By default, logging level will be [`LogLevel::Verbose`] + pub fn log_level<'a>(&'a mut self, level: LogLevel) -> &'a mut Self { + TdPtr::set_log_verbosity_level(level); + self + } + + /// Set up internal TDLib logging file size. + /// Sets the maximum size of the file to where the internal TDLib log is written + /// before the file will be auto-rotated. Unused if log is not written to a file. + /// Defaults to 10 MB. + /// Should be positive. + pub fn log_max_size(&mut self, size: i64) -> &mut Self { + TdPtr::set_log_max_file_size(size); + self + } + + /// Sets the path to the file where the internal TDLib log will be written. + /// By default TDLib writes logs to stderr or an OS specific log. + /// Use this method to write the log to a file instead. + pub fn log_file<S: AsRef<str>>(&mut self, file: S) -> &mut Self { + TdPtr::set_log_file_path(file).unwrap(); + self + } + + /// Reset file of internal TDLib logging. + /// TDLib will revert to default logging behaviour (log to stderr) + pub fn log_reset_file(&mut self) -> &mut Self { + TdPtr::reset_log_file_path().unwrap(); + self + } + + /// Set timeout for receiver. + /// Usually, you do not want to customize this + pub fn receive_timeout(&mut self, timeout: f64) -> &mut Self { + self.timeout = timeout; + self + } + + /// Build client + pub fn build(&mut self) -> crate::client::Client { + // this unwrap is safe, because handler is always Some in `new` + // and cannot be mutated during building + crate::Client::new(self.handler.take().unwrap(), self.timeout) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..d2589fa --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,165 @@ +//! Client consists of n + 1 threads: +//! * one is communicating with tdlib api (api thread) +//! * n threads, sending requests to the api thread through crossbeam channel. +//! +//! when Client::send is called: +//! - creates +//! * api thread receives Request: +//! - adds `@extra` field to request +//! - sends modified request to tdlib +//! * sending thread returns future, +//! that has a reference to the (not yet filled) response +//! + +pub(crate) mod client_builder; +mod responder; + +use crate::raw_ptr::TdPtr; +use crate::update::Handler; +use crossbeam::channel::{self, Sender}; +use log::debug; +use serde::{de::DeserializeOwned, ser::Serialize}; +use serde_json::Value as JsonValue; +use std::{ + future::Future, + marker::PhantomData, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, +}; + +#[derive(Debug, Clone)] +pub struct Response { + resp: Option<JsonValue>, + waker: Option<Waker>, +} + +impl Response { + pub fn new_empty() -> Self { + Self { + resp: None, + waker: None, + } + } +} + +type SafeResponse = Arc<Mutex<Response>>; + +pub trait Request: Serialize { + /// Tag request with type + /// TDLib infers type from @type field of sent json, so `tag` should insert + /// this field into the value. + fn tag(&self) -> crate::error::Result<JsonValue>; + + /// Convenience method to tag json made from self + fn tag_json<S: AsRef<str>>(&self, type_: S) -> crate::error::Result<JsonValue> { + let mut self_json = serde_json::to_value(self)?; + if self_json.get("@type").is_some() { + return Err(crate::error::Error::HasTypeInJson); + } + self_json["@type"] = type_.as_ref().into(); + Ok(self_json) + } +} + +impl Request for JsonValue { + fn tag(&self) -> crate::error::Result<JsonValue> { + if !self["@type"].is_string() { + return Err(crate::error::Error::HasNoTypeInJson); + } + Ok(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct ResponseFuture<R: DeserializeOwned> { + response_type_holder: PhantomData<R>, + pub response: SafeResponse, // TODO: maybe it is possible to make this lockless +} + +impl<R: DeserializeOwned> Future for ResponseFuture<R> { + type Output = Result<R, serde_json::error::Error>; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { + let mut data = self.response.lock().unwrap(); + if let Some(resp) = data.resp.clone() { + /* ^^^^^^^^ + * TODO: at this point _this thread_ is the only owner of response, + * so it is safe to get `resp` without cloning + */ + Poll::Ready(serde_json::from_value(resp.clone())) + } else { + data.waker = Some(ctx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +pub(crate) enum JoinStreams { + NewRequest((JsonValue, SafeResponse)), + NewResponse(String), +} + +#[derive(Clone, Debug)] +pub struct Client { + sender: Sender<JoinStreams>, +} + +impl Client { + pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self { + let (tx, rx) = channel::unbounded(); + + let api = Arc::new(TdPtr::new()); + let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); + + let _responder_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) + }; + let _tg_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || Self::tdlib_receive_loop(tx, api, timeout)); + }; + Self { sender: tx } + } + + pub fn send<Req: Request, Resp: DeserializeOwned>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<Resp>> { + let fut = ResponseFuture { + response_type_holder: PhantomData, + response: Arc::new(Mutex::new(Response::new_empty())), + }; + + self.sender.send(JoinStreams::NewRequest(( + serde_json::to_value(req.tag()?)?, + fut.response.clone(), + )))?; + Ok(fut) + } + + /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. + /// Effective return type (now `serde_json::Value`) may be changed in future + pub fn send_forget<Req: Request>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<JsonValue>> { + self.send(req) + } + + /// Receiving side of client + fn tdlib_receive_loop(tx: Sender<JoinStreams>, tdlib: Arc<TdPtr>, timeout: f64) { + loop { + if let Some(msg) = tdlib.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + debug!("receive timed out"); + } + } + } +} diff --git a/src/client/responder.rs b/src/client/responder.rs new file mode 100644 index 0000000..87cacb8 --- /dev/null +++ b/src/client/responder.rs @@ -0,0 +1,152 @@ +use std::{collections::HashMap, sync::Arc, task::Waker}; + +use super::Client; +use crate::{raw_ptr::TdPtr, Handler}; + +use super::{JoinStreams, SafeResponse}; +use crossbeam::channel::{Receiver, Sender}; +use log::{error, trace, warn}; +use serde_json::Value as JsonValue; + +/// Oneshot means it forgets any information about particular request +/// when receives response: +/// it once stores waker of the future; +/// when response arrives, it stores response and wakes the waker once, +/// dropping waker. +#[derive(Debug)] +pub(crate) struct OneshotResponder<H: Handler> { + api: Arc<TdPtr>, + wakers_map: HashMap<u64, SafeResponse>, + rx: Receiver<JoinStreams>, + + /// sequential id to be used as a unique identifier of request + /// used to match request with response + next_id: u64, + + updater: H, + client: Client, + rt: tokio::runtime::Handle, +} + +#[derive(Debug)] +enum TgResponseType { + Update, + Error, + Response, +} + +impl TgResponseType { + fn from_type<'a>(type_: &'a str) -> Self { + if type_.starts_with("update") { + Self::Update + } else if type_ == "error" { + Self::Error + } else { + Self::Response + } + } +} + +impl<H: Handler> OneshotResponder<H> { + pub(crate) fn new( + rx: Receiver<JoinStreams>, + api: Arc<TdPtr>, + updater: H, + tx: Sender<JoinStreams>, + rt: tokio::runtime::Handle, + ) -> Self { + Self { + api, + wakers_map: HashMap::new(), + rx, + next_id: 0, + updater, + client: Client { sender: tx }, + rt, + } + } + + pub(crate) fn run(&mut self) { + loop { + match self.rx.recv() { + Ok(JoinStreams::NewRequest((mut request, fut_ref))) => { + let id = self.next_id; + self.next_id += 1; + if !request["@extra"].is_null() { + warn!("overwriting @extra in request"); + } + request["@extra"] = id.into(); + self.api.send(request.to_string().as_ref()).ok(); + self.wakers_map.insert(id, fut_ref); + trace!("new req:\n{:#}", request); + } + Ok(JoinStreams::NewResponse(resp)) => { + match serde_json::from_str::<JsonValue>(&resp) { + Ok(val) => { + use crate::value_ext::ValueExt; + let type_ = val.get_type().map(TgResponseType::from_type); + match type_ { + Ok(TgResponseType::Update) => { + self.rt + .spawn(self.updater.handle_json(self.client.clone(), val)); + } + Ok(TgResponseType::Response) => { + self.handle_response(val); + } + Ok(TgResponseType::Error) => { + self.handle_error(val); + } + Err(e) => { + error!("response has invalid @type: {}. Be aware, that this could lock execution flow", e); + } + } + } + Err(e) => { + warn!("ignoring invalid response. err: {}, resp: {}", e, resp); + } + } + } + Err(e) => { + error!("stream closed: {}", e); + error!("closing responder thread"); + // this will return from function and effectively end thread + break; + } + } + } + } + + fn handle_error(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } + + fn handle_response(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } +} |