summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorsyn <isaqtm@gmail.com>2021-01-08 23:50:55 +0300
committersyn <isaqtm@gmail.com>2021-01-08 23:50:55 +0300
commit84e9ac248dadcbbe9cf21a2fe3020005f96b4ec0 (patch)
treea3b831906bd79d2d69787eaa84c2ee9e809d0c2c /src
parent966c1590748a307f80a0f38e7a0b3d5f4f7b34a4 (diff)
downloadtdlib-rs-84e9ac248dadcbbe9cf21a2fe3020005f96b4ec0.tar.gz
Make send infallible by returning errors in future
Diffstat (limited to 'src')
-rw-r--r--src/client/mod.rs69
-rw-r--r--src/client/responder.rs4
2 files changed, 46 insertions, 27 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 445c1d8..fb27561 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -14,6 +14,7 @@
pub(crate) mod client_builder;
mod responder;
+use crate::error::Result;
use crate::raw_ptr::TdPtr;
use crate::update::Handler;
use crossbeam::channel::{self, Sender};
@@ -29,9 +30,9 @@ use std::{
thread,
};
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct Response {
- resp: Option<JsonValue>,
+ resp: Option<Result<JsonValue>>,
waker: Option<Waker>,
}
@@ -78,17 +79,26 @@ pub struct ResponseFuture<R: DeserializeOwned> {
pub response: SafeResponse, // TODO: maybe it is possible to make this lockless
}
+impl<R: DeserializeOwned> ResponseFuture<R> {
+ pub fn from_error(err: crate::error::Error) -> Self {
+ Self {
+ response_type_holder: PhantomData,
+ response: Arc::new(Mutex::new(Response {
+ resp: Some(Err(err)),
+ waker: None,
+ })),
+ }
+ }
+}
+
impl<R: DeserializeOwned> Future for ResponseFuture<R> {
- type Output = Result<R, serde_json::error::Error>;
+ type Output = Result<R>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let mut data = self.response.lock().unwrap();
- if let Some(resp) = data.resp.clone() {
- /* ^^^^^^^^
- * TODO: at this point _this thread_ is the only owner of response,
- * so it is safe to get `resp` without cloning
- */
- Poll::Ready(serde_json::from_value(resp.clone()))
+ if let Some(resp) = data.resp.take() {
+ let result = resp.and_then(|inner| -> Result<R> { Ok(serde_json::from_value(inner)?) });
+ Poll::Ready(result)
} else {
data.waker = Some(ctx.waker().clone());
Poll::Pending
@@ -107,6 +117,10 @@ pub struct Client {
sender: Sender<JoinStreams>,
}
+pub trait ClientLike {
+ fn send<Req: Request, Resp: DeserializeOwned>(&self, req: Req) -> ResponseFuture<Resp>;
+}
+
impl Client {
pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self {
let (tx, rx) = channel::unbounded();
@@ -134,28 +148,33 @@ impl Client {
Self { sender: tx }
}
- pub fn send<Req: Request, Resp: DeserializeOwned>(
- &self,
- req: Req,
- ) -> crate::error::Result<ResponseFuture<Resp>> {
+ /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used.
+ /// Effective return type (now `serde_json::Value`) may be changed in future
+ pub fn send_forget<Req: Request>(&self, req: Req) -> ResponseFuture<JsonValue> {
+ self.send(req)
+ }
+}
+
+impl ClientLike for Client {
+ fn send<Req: Request, Resp: DeserializeOwned>(&self, req: Req) -> ResponseFuture<Resp> {
let fut = ResponseFuture {
response_type_holder: PhantomData,
response: Arc::new(Mutex::new(Response::new_empty())),
};
+ let maybe_sent = req
+ .tag()
+ .and_then(|tagged| serde_json::to_value(tagged).map_err(|err| err.into()))
+ .and_then(|serialized| {
self.sender.send(JoinStreams::NewRequest((
- serde_json::to_value(req.tag()?)?,
- fut.response.clone(),
- )))?;
- Ok(fut)
- }
+ serialized,
+ fut.response.clone()
+ ))).map_err(|err| err.into())
+ });
- /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used.
- /// Effective return type (now `serde_json::Value`) may be changed in future
- pub fn send_forget<Req: Request>(
- &self,
- req: Req,
- ) -> crate::error::Result<ResponseFuture<JsonValue>> {
- self.send(req)
+ match maybe_sent {
+ Ok(_) => fut,
+ Err(err) => ResponseFuture::from_error(err)
+ }
}
}
diff --git a/src/client/responder.rs b/src/client/responder.rs
index 87cacb8..6f5dd53 100644
--- a/src/client/responder.rs
+++ b/src/client/responder.rs
@@ -120,7 +120,7 @@ impl<H: Handler> OneshotResponder<H> {
if let Some(id) = resp["@extra"].as_u64() {
if let Some(fut) = self.wakers_map.remove(&id) {
let mut fut_data = fut.lock().unwrap();
- fut_data.resp = Some(resp);
+ fut_data.resp = Some(Ok(resp));
fut_data.waker.as_ref().map(Waker::wake_by_ref);
} else {
warn!(
@@ -137,7 +137,7 @@ impl<H: Handler> OneshotResponder<H> {
if let Some(id) = resp["@extra"].as_u64() {
if let Some(fut) = self.wakers_map.remove(&id) {
let mut fut_data = fut.lock().unwrap();
- fut_data.resp = Some(resp);
+ fut_data.resp = Some(Ok(resp));
fut_data.waker.as_ref().map(Waker::wake_by_ref);
} else {
warn!(