diff options
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 55 |
1 files changed, 32 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs index db46819..0e215f8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,15 +5,22 @@ use std::{ future::Future, pin::Pin, sync::{ - mpsc::{ Sender, Receiver, channel }, Arc, Mutex }, thread, }; +use crossbeam::channel::{ + self, + Sender, + Receiver, +}; use uuid::Uuid; use std::collections::HashMap; -use json::JsonValue; +use serde_json::Value as JsonValue; +use crate::update::{ UpdateRouter, Handler }; +use pert_types::types::Update; + #[derive(Debug)] @@ -48,37 +55,40 @@ enum JoinStreams { NewResponse(String), } -#[derive(Debug)] +#[derive(Clone)] pub struct Client { //waker_handle: std::thread::JoinHandle<()>, sender: Sender<JoinStreams>, } impl Client { - pub fn new(log_opt: Option<i32>) -> Self { + pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self { if let Some(log) = log_opt { Tdlib::set_log_verbosity_level(log).ok(); } - let (tx, rx) = channel(); + let (tx, rx) = channel::unbounded(); + let tx_for_tg = tx.clone(); let api = Arc::new(Tdlib::new()); let api_for_listener = api.clone(); let api_for_responder = api.clone(); - let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already"); + let sender_for_responder = tx.clone(); + let _run_handle = thread::spawn( - move || OneshotResponder::new(rx, api_for_responder).run(rt) + move || OneshotResponder::new(rx, api_for_responder).run( + updater, Self { sender: sender_for_responder } + ) ); let _tg_handle = thread::spawn( move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0) ); - Self { //waker_handle: _run_handle, - sender: tx + sender: tx, } } - pub fn send(&mut self, req: &JsonValue) -> RequestFuture { + pub fn send(&self, req: &JsonValue) -> RequestFuture { let request = RequestData { req: req.to_owned(), resp: None, @@ -119,9 +129,7 @@ impl OneshotResponder { } } - fn run(&mut self, rt: tokio::runtime::Handle) { - let mut updater = crate::update::UpdateRouter::new(rt); - updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } }); + fn run(&mut self, updater: UpdateRouter, client: Client) { loop { match self.rx.recv() { Ok(JoinStreams::NewRequest(fut)) => { @@ -135,21 +143,20 @@ impl OneshotResponder { }; let data = fut.data.clone(); let request: &mut JsonValue = &mut data.lock().unwrap().req; - if request.has_key("@extra") { + if !request["@extra"].is_null() { warn!("overwriting @extra in request"); } - request["@extra"] = JsonValue::from(id.to_hyphenated().to_string()); - self.api.send(request.dump().as_ref()); + request["@extra"] = id.to_hyphenated().to_string().into(); + self.api.send(request.to_string().as_ref()); self.wakers_map.insert(id, fut); - trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2)); + trace!("new req:\n{:#}", request); }, Ok(JoinStreams::NewResponse(resp)) => { - match json::parse(resp.as_ref()) { + match serde_json::from_str::<JsonValue>(resp.as_ref()) { Ok(val) => { - trace!("received update: {}", val); let typ = val["@type"].as_str().unwrap(); if typ.starts_with("update") { - updater.dispatch(val); + updater.dispatch(&client, val); } else { self.handle_response(val); } @@ -171,12 +178,14 @@ impl OneshotResponder { fn handle_response(&mut self, resp: JsonValue) { if let Some(id_str) = resp["@extra"].as_str() { if let Ok(id) = Uuid::parse_str(id_str) { - let fut_data = self.wakers_map + let fut_extracted = self.wakers_map .remove(&id) .unwrap() .data; - fut_data.lock().unwrap().resp = Some(resp); - fut_data.lock().unwrap() + + let mut fut_data = fut_extracted.lock().unwrap(); + fut_data.resp = Some(resp); + fut_data .waker.as_ref() .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) }); } |