summaryrefslogtreecommitdiffstats
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs55
1 files changed, 32 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs
index db46819..0e215f8 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,15 +5,22 @@ use std::{
future::Future,
pin::Pin,
sync::{
- mpsc::{ Sender, Receiver, channel },
Arc,
Mutex
},
thread,
};
+use crossbeam::channel::{
+ self,
+ Sender,
+ Receiver,
+};
use uuid::Uuid;
use std::collections::HashMap;
-use json::JsonValue;
+use serde_json::Value as JsonValue;
+use crate::update::{ UpdateRouter, Handler };
+use pert_types::types::Update;
+
#[derive(Debug)]
@@ -48,37 +55,40 @@ enum JoinStreams {
NewResponse(String),
}
-#[derive(Debug)]
+#[derive(Clone)]
pub struct Client {
//waker_handle: std::thread::JoinHandle<()>,
sender: Sender<JoinStreams>,
}
impl Client {
- pub fn new(log_opt: Option<i32>) -> Self {
+ pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self {
if let Some(log) = log_opt {
Tdlib::set_log_verbosity_level(log).ok();
}
- let (tx, rx) = channel();
+ let (tx, rx) = channel::unbounded();
+
let tx_for_tg = tx.clone();
let api = Arc::new(Tdlib::new());
let api_for_listener = api.clone();
let api_for_responder = api.clone();
- let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already");
+ let sender_for_responder = tx.clone();
+
let _run_handle = thread::spawn(
- move || OneshotResponder::new(rx, api_for_responder).run(rt)
+ move || OneshotResponder::new(rx, api_for_responder).run(
+ updater, Self { sender: sender_for_responder }
+ )
);
let _tg_handle = thread::spawn(
move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0)
);
-
Self {
//waker_handle: _run_handle,
- sender: tx
+ sender: tx,
}
}
- pub fn send(&mut self, req: &JsonValue) -> RequestFuture {
+ pub fn send(&self, req: &JsonValue) -> RequestFuture {
let request = RequestData {
req: req.to_owned(),
resp: None,
@@ -119,9 +129,7 @@ impl OneshotResponder {
}
}
- fn run(&mut self, rt: tokio::runtime::Handle) {
- let mut updater = crate::update::UpdateRouter::new(rt);
- updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } });
+ fn run(&mut self, updater: UpdateRouter, client: Client) {
loop {
match self.rx.recv() {
Ok(JoinStreams::NewRequest(fut)) => {
@@ -135,21 +143,20 @@ impl OneshotResponder {
};
let data = fut.data.clone();
let request: &mut JsonValue = &mut data.lock().unwrap().req;
- if request.has_key("@extra") {
+ if !request["@extra"].is_null() {
warn!("overwriting @extra in request");
}
- request["@extra"] = JsonValue::from(id.to_hyphenated().to_string());
- self.api.send(request.dump().as_ref());
+ request["@extra"] = id.to_hyphenated().to_string().into();
+ self.api.send(request.to_string().as_ref());
self.wakers_map.insert(id, fut);
- trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2));
+ trace!("new req:\n{:#}", request);
},
Ok(JoinStreams::NewResponse(resp)) => {
- match json::parse(resp.as_ref()) {
+ match serde_json::from_str::<JsonValue>(resp.as_ref()) {
Ok(val) => {
- trace!("received update: {}", val);
let typ = val["@type"].as_str().unwrap();
if typ.starts_with("update") {
- updater.dispatch(val);
+ updater.dispatch(&client, val);
} else {
self.handle_response(val);
}
@@ -171,12 +178,14 @@ impl OneshotResponder {
fn handle_response(&mut self, resp: JsonValue) {
if let Some(id_str) = resp["@extra"].as_str() {
if let Ok(id) = Uuid::parse_str(id_str) {
- let fut_data = self.wakers_map
+ let fut_extracted = self.wakers_map
.remove(&id)
.unwrap()
.data;
- fut_data.lock().unwrap().resp = Some(resp);
- fut_data.lock().unwrap()
+
+ let mut fut_data = fut_extracted.lock().unwrap();
+ fut_data.resp = Some(resp);
+ fut_data
.waker.as_ref()
.and_then(|waker: &Waker| { waker.clone().wake(); Some(()) });
}