summaryrefslogtreecommitdiffstats
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..db46819
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,187 @@
+use log::{ info, error, warn, trace };
+use rust_tdlib::Tdlib;
+use std::{
+ task::{ Waker, Context, Poll },
+ future::Future,
+ pin::Pin,
+ sync::{
+ mpsc::{ Sender, Receiver, channel },
+ Arc,
+ Mutex
+ },
+ thread,
+};
+use uuid::Uuid;
+use std::collections::HashMap;
+use json::JsonValue;
+
+
+#[derive(Debug)]
+pub struct RequestData {
+ req: JsonValue,
+ resp: Option<JsonValue>,
+ waker: Option<Waker>,
+}
+
+#[derive(Debug, Clone)]
+pub struct RequestFuture {
+ data: Arc<Mutex<RequestData>>
+}
+
+impl Future for RequestFuture {
+ type Output = JsonValue;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let mut data = self.data.lock().unwrap();
+ if let Some(resp) = &data.resp {
+ Poll::Ready(resp.clone())
+ } else {
+ data.waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+}
+
+#[derive(Debug)]
+enum JoinStreams {
+ NewRequest(RequestFuture),
+ NewResponse(String),
+}
+
+#[derive(Debug)]
+pub struct Client {
+ //waker_handle: std::thread::JoinHandle<()>,
+ sender: Sender<JoinStreams>,
+}
+
+impl Client {
+ pub fn new(log_opt: Option<i32>) -> Self {
+ if let Some(log) = log_opt {
+ Tdlib::set_log_verbosity_level(log).ok();
+ }
+ let (tx, rx) = channel();
+ let tx_for_tg = tx.clone();
+ let api = Arc::new(Tdlib::new());
+ let api_for_listener = api.clone();
+ let api_for_responder = api.clone();
+ let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already");
+ let _run_handle = thread::spawn(
+ move || OneshotResponder::new(rx, api_for_responder).run(rt)
+ );
+ let _tg_handle = thread::spawn(
+ move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0)
+ );
+
+ Self {
+ //waker_handle: _run_handle,
+ sender: tx
+ }
+ }
+
+ pub fn send(&mut self, req: &JsonValue) -> RequestFuture {
+ let request = RequestData {
+ req: req.to_owned(),
+ resp: None,
+ waker: None
+ };
+ let fut = RequestFuture {
+ data: Arc::new(Mutex::new(request))
+ };
+
+ self.sender.send(JoinStreams::NewRequest(fut.clone())).unwrap();
+ fut
+ }
+
+ fn listen_tg(tx: Sender<JoinStreams>, api: Arc<Tdlib>, timeout: f64) {
+ loop {
+ if let Some(msg) = api.receive(timeout) {
+ tx.send(JoinStreams::NewResponse(msg)).unwrap();
+ } else {
+ info!("receive timed out");
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+struct OneshotResponder {
+ api: Arc<Tdlib>,
+ wakers_map: HashMap<Uuid, RequestFuture>,
+ rx: Receiver<JoinStreams>,
+}
+
+impl OneshotResponder {
+ fn new(rx: Receiver<JoinStreams>, api: Arc<Tdlib>) -> Self {
+ Self {
+ api: api,
+ wakers_map: HashMap::new(),
+ rx: rx
+ }
+ }
+
+ fn run(&mut self, rt: tokio::runtime::Handle) {
+ let mut updater = crate::update::UpdateRouter::new(rt);
+ updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } });
+ loop {
+ match self.rx.recv() {
+ Ok(JoinStreams::NewRequest(fut)) => {
+ let id = loop {
+ let id = Uuid::new_v4();
+ if self.wakers_map.contains_key(&id) {
+ continue;
+ } else {
+ break id;
+ }
+ };
+ let data = fut.data.clone();
+ let request: &mut JsonValue = &mut data.lock().unwrap().req;
+ if request.has_key("@extra") {
+ warn!("overwriting @extra in request");
+ }
+ request["@extra"] = JsonValue::from(id.to_hyphenated().to_string());
+ self.api.send(request.dump().as_ref());
+ self.wakers_map.insert(id, fut);
+ trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2));
+ },
+ Ok(JoinStreams::NewResponse(resp)) => {
+ match json::parse(resp.as_ref()) {
+ Ok(val) => {
+ trace!("received update: {}", val);
+ let typ = val["@type"].as_str().unwrap();
+ if typ.starts_with("update") {
+ updater.dispatch(val);
+ } else {
+ self.handle_response(val);
+ }
+ },
+ Err(e) => {
+ warn!("ignoring invalid response. err: {}, resp: {}", e, resp);
+ }
+ }
+ },
+ Err(e) => {
+ error!("stream closed: {}", e);
+ error!("closing thread");
+ break;
+ }
+ }
+ }
+ }
+
+ fn handle_response(&mut self, resp: JsonValue) {
+ if let Some(id_str) = resp["@extra"].as_str() {
+ if let Ok(id) = Uuid::parse_str(id_str) {
+ let fut_data = self.wakers_map
+ .remove(&id)
+ .unwrap()
+ .data;
+ fut_data.lock().unwrap().resp = Some(resp);
+ fut_data.lock().unwrap()
+ .waker.as_ref()
+ .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) });
+ }
+ } else {
+ warn!("update has invalid @extra: {}", resp);
+ }
+ }
+}