diff options
-rw-r--r-- | src/client/mod.rs | 69 | ||||
-rw-r--r-- | src/client/responder.rs | 4 |
2 files changed, 46 insertions, 27 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 445c1d8..fb27561 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod client_builder; mod responder; +use crate::error::Result; use crate::raw_ptr::TdPtr; use crate::update::Handler; use crossbeam::channel::{self, Sender}; @@ -29,9 +30,9 @@ use std::{ thread, }; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Response { - resp: Option<JsonValue>, + resp: Option<Result<JsonValue>>, waker: Option<Waker>, } @@ -78,17 +79,26 @@ pub struct ResponseFuture<R: DeserializeOwned> { pub response: SafeResponse, // TODO: maybe it is possible to make this lockless } +impl<R: DeserializeOwned> ResponseFuture<R> { + pub fn from_error(err: crate::error::Error) -> Self { + Self { + response_type_holder: PhantomData, + response: Arc::new(Mutex::new(Response { + resp: Some(Err(err)), + waker: None, + })), + } + } +} + impl<R: DeserializeOwned> Future for ResponseFuture<R> { - type Output = Result<R, serde_json::error::Error>; + type Output = Result<R>; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { let mut data = self.response.lock().unwrap(); - if let Some(resp) = data.resp.clone() { - /* ^^^^^^^^ - * TODO: at this point _this thread_ is the only owner of response, - * so it is safe to get `resp` without cloning - */ - Poll::Ready(serde_json::from_value(resp.clone())) + if let Some(resp) = data.resp.take() { + let result = resp.and_then(|inner| -> Result<R> { Ok(serde_json::from_value(inner)?) }); + Poll::Ready(result) } else { data.waker = Some(ctx.waker().clone()); Poll::Pending @@ -107,6 +117,10 @@ pub struct Client { sender: Sender<JoinStreams>, } +pub trait ClientLike { + fn send<Req: Request, Resp: DeserializeOwned>(&self, req: Req) -> ResponseFuture<Resp>; +} + impl Client { pub(crate) fn new<H: Handler>(updater: H, timeout: f64) -> Self { let (tx, rx) = channel::unbounded(); @@ -134,28 +148,33 @@ impl Client { Self { sender: tx } } - pub fn send<Req: Request, Resp: DeserializeOwned>( - &self, - req: Req, - ) -> crate::error::Result<ResponseFuture<Resp>> { + /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. + /// Effective return type (now `serde_json::Value`) may be changed in future + pub fn send_forget<Req: Request>(&self, req: Req) -> ResponseFuture<JsonValue> { + self.send(req) + } +} + +impl ClientLike for Client { + fn send<Req: Request, Resp: DeserializeOwned>(&self, req: Req) -> ResponseFuture<Resp> { let fut = ResponseFuture { response_type_holder: PhantomData, response: Arc::new(Mutex::new(Response::new_empty())), }; + let maybe_sent = req + .tag() + .and_then(|tagged| serde_json::to_value(tagged).map_err(|err| err.into())) + .and_then(|serialized| { self.sender.send(JoinStreams::NewRequest(( - serde_json::to_value(req.tag()?)?, - fut.response.clone(), - )))?; - Ok(fut) - } + serialized, + fut.response.clone() + ))).map_err(|err| err.into()) + }); - /// [`send`] specification to return JsonValue. Intended to be used, when return value is not used. - /// Effective return type (now `serde_json::Value`) may be changed in future - pub fn send_forget<Req: Request>( - &self, - req: Req, - ) -> crate::error::Result<ResponseFuture<JsonValue>> { - self.send(req) + match maybe_sent { + Ok(_) => fut, + Err(err) => ResponseFuture::from_error(err) + } } } diff --git a/src/client/responder.rs b/src/client/responder.rs index 87cacb8..6f5dd53 100644 --- a/src/client/responder.rs +++ b/src/client/responder.rs @@ -120,7 +120,7 @@ impl<H: Handler> OneshotResponder<H> { if let Some(id) = resp["@extra"].as_u64() { if let Some(fut) = self.wakers_map.remove(&id) { let mut fut_data = fut.lock().unwrap(); - fut_data.resp = Some(resp); + fut_data.resp = Some(Ok(resp)); fut_data.waker.as_ref().map(Waker::wake_by_ref); } else { warn!( @@ -137,7 +137,7 @@ impl<H: Handler> OneshotResponder<H> { if let Some(id) = resp["@extra"].as_u64() { if let Some(fut) = self.wakers_map.remove(&id) { let mut fut_data = fut.lock().unwrap(); - fut_data.resp = Some(resp); + fut_data.resp = Some(Ok(resp)); fut_data.waker.as_ref().map(Waker::wake_by_ref); } else { warn!( |