diff options
author | syn <isaqtm@gmail.com> | 2021-01-08 01:21:48 +0300 |
---|---|---|
committer | syn <isaqtm@gmail.com> | 2021-01-08 01:21:48 +0300 |
commit | 7e41b162fb1a41d026814c7e4586aa6f6a627fbb (patch) | |
tree | beedf614593ceb3555659aa98382c8950a22bae8 | |
download | tdlib-rs-7e41b162fb1a41d026814c7e4586aa6f6a627fbb.tar.gz |
Initial commit
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | Cargo.toml | 31 | ||||
-rw-r--r-- | src/client/client_builder.rs | 74 | ||||
-rw-r--r-- | src/client/mod.rs | 165 | ||||
-rw-r--r-- | src/client/responder.rs | 152 | ||||
-rw-r--r-- | src/error.rs | 64 | ||||
-rw-r--r-- | src/lib.rs | 13 | ||||
-rw-r--r-- | src/raw_ptr.rs | 150 | ||||
-rw-r--r-- | src/update.rs | 18 | ||||
-rw-r--r-- | src/value_ext.rs | 23 |
10 files changed, 692 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..443f87c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "tdlib-rs" +version = "0.1.0" +authors = ["syn <isaqtm@gmail.com>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libc = "0.2" +tdlib-sys = { version = "1.7.0", path = "../tdlib-sys" } +thiserror = "1.0" + +# TODO: the only thing we need from futures-rs is BoxFuture. +# maybe we shold define it ourselves and drop futures dependency +futures = "0.3" + +tokio = { version = "0.3", features = ["rt"] } + +# TODO: get rid of crossbeam and use std if possible +crossbeam = "0.8" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +# TODO: make logging a feature +log = "0.4" + +[dev-dependencies] +tokio = { version = "0.3", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/src/client/client_builder.rs b/src/client/client_builder.rs new file mode 100644 index 0000000..eaad59f --- /dev/null +++ b/src/client/client_builder.rs @@ -0,0 +1,74 @@ +use crate::raw_ptr::{LogLevel, TdPtr}; +use crate::update::Handler; + +/// Build client with some options. +/// +/// This builder will execute functions as soon as builder functions will be called +/// +/// e.g. if you call [`ClientBuilder::log_level`], it will immediately call +/// underlying API to change logging in TDLib, as it will be done globally +/// disregarding actual TDLib client being used, if any +#[derive(Debug)] +pub struct ClientBuilder<H: Handler> { + // need Option here to transfer ownership to client, when building + handler: Option<H>, + timeout: f64, +} + +impl<H: Handler> ClientBuilder<H> { + /// Create new `ClientBuilder` with update handler + pub fn new(handler: H) -> Self { + Self { + handler: Some(handler), + timeout: 10.0, + } + } + + /// Set up internal TDLib logging level. + /// + /// See also [`LogLevel`] doc for explanation of logging levels. + /// By default, logging level will be [`LogLevel::Verbose`] + pub fn log_level<'a>(&'a mut self, level: LogLevel) -> &'a mut Self { + TdPtr::set_log_verbosity_level(level); + self + } + + /// Set up internal TDLib logging file size. + /// Sets the maximum size of the file to where the internal TDLib log is written + /// before the file will be auto-rotated. Unused if log is not written to a file. + /// Defaults to 10 MB. + /// Should be positive. + pub fn log_max_size(&mut self, size: i64) -> &mut Self { + TdPtr::set_log_max_file_size(size); + self + } + + /// Sets the path to the file where the internal TDLib log will be written. + /// By default TDLib writes logs to stderr or an OS specific log. + /// Use this method to write the log to a file instead. + pub fn log_file<S: AsRef<str>>(&mut self, file: S) -> &mut Self { + TdPtr::set_log_file_path(file).unwrap(); + self + } + + /// Reset file of internal TDLib logging. + /// TDLib will revert to default logging behaviour (log to stderr) + pub fn log_reset_file(&mut self) -> &mut Self { + TdPtr::reset_log_file_path().unwrap(); + self + } + + /// Set timeout for receiver. + /// Usually, you do not want to customize this + pub fn receive_timeout(&mut self, timeout: f64) -> &mut Self { + self.timeout = timeout; + self + } + + /// Build client + pub fn build(&mut self) -> crate::client::Client { + // this unwrap is safe, because handler is always Some in `new` + // and cannot be mutated during building + crate::Client::new(self.handler.take().unwrap(), self.timeout) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..d2589fa --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,165 @@ +//! Client consists of n + 1 threads: +//! * one is communicating with tdlib api (api thread) +//! * n threads, sending requests to the api thread through crossbeam channel. +//! +//! when Client::send is called: +//! - creates +//! * api thread receives Request: +//! - adds `@extra` field to request +//! - sends modified request to tdlib +//! * sending thread returns future, +//! that has a reference to the (not yet filled) response +//! + +pub(crate) mod client_builder; +mod responder; + +use crate::raw_ptr::TdPtr; +use crate::update::Handler; +use crossbeam::channel::{self, Sender}; +use log::debug; +use serde::{de::DeserializeOwned, ser::Serialize}; +use serde_json::Value as JsonValue; +use std::{ + future::Future, + marker::PhantomData, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, +}; + +#[derive(Debug, Clone)] +pub struct Response { + resp: Option<JsonValue>, + waker: Option<Waker>, +} + +impl Response { + pub fn new_empty() -> Self { + Self { + resp: None, + waker: None, + } + } +} + +type SafeResponse = Arc<Mutex<Response>>; + +pub trait Request: Serialize { + /// Tag request with type + /// TDLib infers type from @type field of sent json, so `tag` should insert + /// this field into the value. + fn tag(&self) -> crate::error::Result<JsonValue>; + + /// Convenience method to tag json made from self + fn tag_json<S: AsRef<str>>(&self, type_: S) -> crate::error::Result<JsonValue> { + let mut self_json = serde_json::to_value(self)?; + if self_json.get("@type").is_some() { + return Err(crate::error::Error::HasTypeInJson); + } + self_json["@type"] = type_.as_ref().into(); + Ok(self_json) + } +} + +impl Request for JsonValue { + fn tag(&self) -> crate::error::Result<JsonValue> { + if !self["@type"].is_string() { + return Err(crate::error::Error::HasNoTypeInJson); + } + Ok(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct ResponseFuture<R: DeserializeOwned> { + response_type_holder: PhantomData<R>, + pub response: SafeResponse, // TODO: maybe it is possible to make this lockless +} + +impl<R: DeserializeOwned> Future for ResponseFuture<R> { + type Output = Result<R, serde_json::error::Error>; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { + let mut data = self.response.lock().unwrap(); + if let Some(resp) = data.resp.clone() { + /* ^^^^^^^^ + * TODO: at this point _this thread_ is the only owner of response, + * so it is safe to get `resp` without cloning + */ + Poll::Ready(serde_json::from_value(resp.clone())) + } else { + data.waker = Some(ctx.waker().clone()); + Poll::Pending + } + } +} + +#[derive(Debug)] +pub(crate) enum JoinStreams { + NewRequest((JsonValue, SafeResponse)), + NewResponse(String), +} + +#[derive(Clone, Debug)] +pub struct Client { + sender: Sender<JoinStreams>, +} + +impl Client { + pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self { + let (tx, rx) = channel::unbounded(); + + let api = Arc::new(TdPtr::new()); + let rt = tokio::runtime::Handle::try_current().expect("must be in runtime"); + + let _responder_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || responder::OneshotResponder::new(rx, api, updater, tx, rt).run()) + }; + let _tg_handle = { + let api = api.clone(); + let tx = tx.clone(); + thread::spawn(move || Self::tdlib_receive_loop(tx, api, timeout)); + }; + Self { sender: tx } + } + + pub fn send<Req: Request, Resp: DeserializeOwned>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<Resp>> { + let fut = ResponseFuture { + response_type_holder: PhantomData, + response: Arc::new(Mutex::new(Response::new_empty())), + }; + + self.sender.send(JoinStreams::NewRequest(( + serde_json::to_value(req.tag()?)?, + fut.response.clone(), + )))?; + Ok(fut) + } + + /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. + /// Effective return type (now `serde_json::Value`) may be changed in future + pub fn send_forget<Req: Request>( + &self, + req: Req, + ) -> crate::error::Result<ResponseFuture<JsonValue>> { + self.send(req) + } + + /// Receiving side of client + fn tdlib_receive_loop(tx: Sender<JoinStreams>, tdlib: Arc<TdPtr>, timeout: f64) { + loop { + if let Some(msg) = tdlib.receive(timeout) { + tx.send(JoinStreams::NewResponse(msg)).unwrap(); + } else { + debug!("receive timed out"); + } + } + } +} diff --git a/src/client/responder.rs b/src/client/responder.rs new file mode 100644 index 0000000..87cacb8 --- /dev/null +++ b/src/client/responder.rs @@ -0,0 +1,152 @@ +use std::{collections::HashMap, sync::Arc, task::Waker}; + +use super::Client; +use crate::{raw_ptr::TdPtr, Handler}; + +use super::{JoinStreams, SafeResponse}; +use crossbeam::channel::{Receiver, Sender}; +use log::{error, trace, warn}; +use serde_json::Value as JsonValue; + +/// Oneshot means it forgets any information about particular request +/// when receives response: +/// it once stores waker of the future; +/// when response arrives, it stores response and wakes the waker once, +/// dropping waker. +#[derive(Debug)] +pub(crate) struct OneshotResponder<H: Handler> { + api: Arc<TdPtr>, + wakers_map: HashMap<u64, SafeResponse>, + rx: Receiver<JoinStreams>, + + /// sequential id to be used as a unique identifier of request + /// used to match request with response + next_id: u64, + + updater: H, + client: Client, + rt: tokio::runtime::Handle, +} + +#[derive(Debug)] +enum TgResponseType { + Update, + Error, + Response, +} + +impl TgResponseType { + fn from_type<'a>(type_: &'a str) -> Self { + if type_.starts_with("update") { + Self::Update + } else if type_ == "error" { + Self::Error + } else { + Self::Response + } + } +} + +impl<H: Handler> OneshotResponder<H> { + pub(crate) fn new( + rx: Receiver<JoinStreams>, + api: Arc<TdPtr>, + updater: H, + tx: Sender<JoinStreams>, + rt: tokio::runtime::Handle, + ) -> Self { + Self { + api, + wakers_map: HashMap::new(), + rx, + next_id: 0, + updater, + client: Client { sender: tx }, + rt, + } + } + + pub(crate) fn run(&mut self) { + loop { + match self.rx.recv() { + Ok(JoinStreams::NewRequest((mut request, fut_ref))) => { + let id = self.next_id; + self.next_id += 1; + if !request["@extra"].is_null() { + warn!("overwriting @extra in request"); + } + request["@extra"] = id.into(); + self.api.send(request.to_string().as_ref()).ok(); + self.wakers_map.insert(id, fut_ref); + trace!("new req:\n{:#}", request); + } + Ok(JoinStreams::NewResponse(resp)) => { + match serde_json::from_str::<JsonValue>(&resp) { + Ok(val) => { + use crate::value_ext::ValueExt; + let type_ = val.get_type().map(TgResponseType::from_type); + match type_ { + Ok(TgResponseType::Update) => { + self.rt + .spawn(self.updater.handle_json(self.client.clone(), val)); + } + Ok(TgResponseType::Response) => { + self.handle_response(val); + } + Ok(TgResponseType::Error) => { + self.handle_error(val); + } + Err(e) => { + error!("response has invalid @type: {}. Be aware, that this could lock execution flow", e); + } + } + } + Err(e) => { + warn!("ignoring invalid response. err: {}, resp: {}", e, resp); + } + } + } + Err(e) => { + error!("stream closed: {}", e); + error!("closing responder thread"); + // this will return from function and effectively end thread + break; + } + } + } + } + + fn handle_error(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } + + fn handle_response(&mut self, resp: JsonValue) { + if let Some(id) = resp["@extra"].as_u64() { + if let Some(fut) = self.wakers_map.remove(&id) { + let mut fut_data = fut.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data.waker.as_ref().map(Waker::wake_by_ref); + } else { + warn!( + "response received, but request was not issued by any future: {}", + resp + ); + } + } else { + warn!("response has invalid @extra: {}", resp); + } + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..f7796b8 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,64 @@ +#[derive(Debug)] +pub enum TdlibSysError { + LogLevelOutOfBounds(u16), + Nul(usize), + UtfDecode, + CannotSetLogFile, +} + +impl std::error::Error for TdlibSysError {} + +impl std::convert::From<std::ffi::NulError> for TdlibSysError { + fn from(nul_err: std::ffi::NulError) -> Self { + Self::Nul(nul_err.nul_position()) + } +} + +impl std::fmt::Display for TdlibSysError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use TdlibSysError::*; + match self { + LogLevelOutOfBounds(lvl) => { + write!(f, "Log level should be less than 1024. Found {}", lvl) + } + Nul(pos) => write!(f, "String contains nul byte at pos: {}", pos), + UtfDecode => write!(f, "String is not utf8 encoded"), + CannotSetLogFile => write!(f, "Could not set log file"), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum TgError { + /// Something went wrong, when (de)serializing shit + #[error("Something went wrong, when (de)serializing shit: {}", .0)] + Serde(#[from] serde_json::Error), + + /// tdlib-sys error + #[error("tdlib-sys error: {}", .0)] + TdSys(#[from] TdlibSysError), + + #[error("@type must not be in your structure used as Request")] + HasTypeInJson, + + #[error("@type must be string in JsonValue used as Request")] + HasNoTypeInJson, + + #[error("json is invalid: {}", .0)] + InvalidJson(String), + + #[error("core channel has been closed unexpectedly")] + ChannelClosed, + + #[error("telegram replied with error code: {code} and message: {msg}")] + TelegramError { code: i32, msg: String }, +} + +impl<T> std::convert::From<crossbeam::channel::SendError<T>> for TgError { + fn from(_: crossbeam::channel::SendError<T>) -> Self { + Self::ChannelClosed + } +} + +pub type Error = TgError; +pub type Result<T> = std::result::Result<T, Error>; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..6e9f6e3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,13 @@ +pub mod client; +pub mod error; +pub mod raw_ptr; +pub mod update; +mod value_ext; + +pub use client::client_builder::ClientBuilder; +pub use client::Client; +pub use error::Error; +pub use error::TdlibSysError; +pub use raw_ptr::LogLevel; +pub use update::Handler; +pub use value_ext::ValueExt; diff --git a/src/raw_ptr.rs b/src/raw_ptr.rs new file mode 100644 index 0000000..5e2bdde --- /dev/null +++ b/src/raw_ptr.rs @@ -0,0 +1,150 @@ +use crate::error::TdlibSysError; +use libc::c_char; +use std::ffi::CStr; +use std::ffi::CString; + +#[derive(Debug)] +pub struct TdPtr(*mut tdlib_sys::TdLibRawPtr); + +// Send + Sync is guaranteed by TDLib +unsafe impl Send for TdPtr {} +unsafe impl Sync for TdPtr {} + +/// TDLib uses 6 fixed logging levels +/// +/// You can also set even more logging through `More` variant, +/// providing desired logging level +#[derive(Debug, PartialEq, Eq)] +pub enum LogLevel { + Fatal, + Error, + Warn, + Info, + Debug, + Verbose, + /// This sets more verbose logging. + /// + /// Must be between 6 and 1024 inclusively + More(u16), +} + +impl std::default::Default for LogLevel { + fn default() -> Self { + Self::Verbose // 5 is default value from tdlib's docs + } +} + +impl TdPtr { + /// Sets the verbosity level of the internal logging of TDLib. + /// By default the TDLib uses a log verbosity level of 5 (meaning [`LogLevel::Verbose`]). + pub fn set_log_verbosity_level(level: LogLevel) -> Result<(), TdlibSysError> { + let numeric_level: i32 = match level { + LogLevel::Fatal => 0, + LogLevel::Error => 1, + LogLevel::Warn => 2, + LogLevel::Info => 3, + LogLevel::Debug => 4, + LogLevel::Verbose => 5, + LogLevel::More(n) => { + if n > 1024 { + return Err(TdlibSysError::LogLevelOutOfBounds(n)); + } else { + n as i32 + } + } + }; + unsafe { tdlib_sys::td_set_log_verbosity_level(numeric_level) }; + Ok(()) + } + + /// Sets the maximum size of the file to where the internal TDLib log is written + /// before the file will be auto-rotated. + /// + /// Unused if log is not written to a file. Defaults to 10 MB. + pub fn set_log_max_file_size(size: i64) { + unsafe { tdlib_sys::td_set_log_max_file_size(size) }; + } + + /// Sets the path to the file where the internal TDLib log will be written. + /// + /// By default TDLib writes logs to stderr or an OS specific log. + /// + /// Use this method to write the log to a file instead. + pub fn set_log_file_path<S: AsRef<str>>(path: S) -> Result<(), TdlibSysError> { + let cpath = CString::new(path.as_ref())?; + if unsafe { tdlib_sys::td_set_log_file_path(cpath.as_ptr()) } == 0 { + Err(TdlibSysError::CannotSetLogFile) + } else { + Ok(()) + } + } + + /// Resets log file. + /// + /// This function reverts writing to a log file and restores + /// default behaviour for logging. At the time of writing, this means + /// logging to stderr + pub fn reset_log_file_path() -> Result<(), TdlibSysError> { + if unsafe { tdlib_sys::td_set_log_file_path(std::ptr::null()) } == 0 { + Err(TdlibSysError::CannotSetLogFile) + } else { + Ok(()) + } + } + + /// Creates a new instance of TDLib. + pub fn new() -> Self { + Self(unsafe { tdlib_sys::td_json_client_create() }) + } + + /// Sends request to the TDLib client. + /// + /// May be called from any thread. + pub fn send(&self, request: &str) -> Result<(), TdlibSysError> { + let cstring = CString::new(request)?; + unsafe { tdlib_sys::td_json_client_send(self.0, cstring.as_ptr()) }; + Ok(()) + } + + /// Receives incoming updates and request responses from the TDLib client. + /// + /// May be called from any thread, but must not be called simultaneously + /// from two different threads. + pub fn execute(&self, request: &str) -> Result<Option<String>, TdlibSysError> { + let repr_c = CString::new(request)?.as_ptr(); + Ok(Self::string_from_mut_char(unsafe { + tdlib_sys::td_json_client_execute(self.0, repr_c) + })) + } + + /// Receives incoming updates and request responses from the TDLib client. + /// + /// May be called from any thread, but shouldn't be called simultaneously + /// from two different threads. + pub fn receive(&self, timeout: f64) -> Option<String> { + Self::string_from_mut_char(unsafe { tdlib_sys::td_json_client_receive(self.0, timeout) }) + } + + /// Convert `*mut c_char` into String. + /// + /// Returns None if `ptr` is null + fn string_from_mut_char(ptr: *mut c_char) -> Option<String> { + let cstr = unsafe { CStr::from_ptr(ptr.as_ref()?) }; + Some(cstr.to_string_lossy().into_owned()) + } +} + +impl Drop for TdPtr { + fn drop(&mut self) { + unsafe { + tdlib_sys::td_json_client_destroy(self.0); + } + } +} + +mod tests { + #[test] + fn test_create() { + let _td = super::TdPtr::new(); + } +} diff --git a/src/update.rs b/src/update.rs new file mode 100644 index 0000000..b2471da --- /dev/null +++ b/src/update.rs @@ -0,0 +1,18 @@ +use crate::client::Client; +use futures::future::BoxFuture; +use serde_json::value::Value; +use std::future::Future; + +pub trait Handler: Send + Sync + 'static { + fn handle_json(&self, _: Client, _: Value) -> BoxFuture<'static, ()>; +} + +impl<C, F> Handler for C +where + C: Send + Sync + 'static + Fn(Client, Value) -> F, + F: Future<Output = ()> + 'static + Send, +{ + fn handle_json(&self, client: Client, req: Value) -> BoxFuture<'static, ()> { + Box::pin((*self)(client, req)) + } +} diff --git a/src/value_ext.rs b/src/value_ext.rs new file mode 100644 index 0000000..71b4175 --- /dev/null +++ b/src/value_ext.rs @@ -0,0 +1,23 @@ +use crate::error::{Error, Result}; +use serde_json::Value; + +/// Some extensions for serde_json::Value to be used as TDLib object +pub trait ValueExt { + /// get @type from json + fn get_type<'a>(&'a self) -> Result<&'a str>; + + /// get @extra from json + fn get_extra<'a>(&'a self) -> Result<&'a str>; +} + +impl ValueExt for Value { + fn get_type<'a>(&'a self) -> Result<&'a str> { + self["@type"].as_str().ok_or(Error::HasNoTypeInJson) + } + + fn get_extra<'a>(&'a self) -> Result<&'a str> { + self["@extra"] + .as_str() + .ok_or_else(|| Error::InvalidJson("no @extra in json".into())) + } +} |