summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorsyn <isaqtm@gmail.com>2020-05-19 20:54:45 +0300
committersyn <isaqtm@gmail.com>2020-05-19 20:54:45 +0300
commitc89b1acb6e5d5755dc79c1f5643c915624a4c4c3 (patch)
tree2b51d52df177db1b14eb6666236a131356e5452f /src
downloadpert-c89b1acb6e5d5755dc79c1f5643c915624a4c4c3.tar.gz
Some kind of working code
Diffstat (limited to 'src')
-rw-r--r--src/client.rs187
-rw-r--r--src/main.rs54
-rw-r--r--src/update.rs40
3 files changed, 281 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..db46819
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,187 @@
+use log::{ info, error, warn, trace };
+use rust_tdlib::Tdlib;
+use std::{
+ task::{ Waker, Context, Poll },
+ future::Future,
+ pin::Pin,
+ sync::{
+ mpsc::{ Sender, Receiver, channel },
+ Arc,
+ Mutex
+ },
+ thread,
+};
+use uuid::Uuid;
+use std::collections::HashMap;
+use json::JsonValue;
+
+
+#[derive(Debug)]
+pub struct RequestData {
+ req: JsonValue,
+ resp: Option<JsonValue>,
+ waker: Option<Waker>,
+}
+
+#[derive(Debug, Clone)]
+pub struct RequestFuture {
+ data: Arc<Mutex<RequestData>>
+}
+
+impl Future for RequestFuture {
+ type Output = JsonValue;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let mut data = self.data.lock().unwrap();
+ if let Some(resp) = &data.resp {
+ Poll::Ready(resp.clone())
+ } else {
+ data.waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+}
+
+#[derive(Debug)]
+enum JoinStreams {
+ NewRequest(RequestFuture),
+ NewResponse(String),
+}
+
+#[derive(Debug)]
+pub struct Client {
+ //waker_handle: std::thread::JoinHandle<()>,
+ sender: Sender<JoinStreams>,
+}
+
+impl Client {
+ pub fn new(log_opt: Option<i32>) -> Self {
+ if let Some(log) = log_opt {
+ Tdlib::set_log_verbosity_level(log).ok();
+ }
+ let (tx, rx) = channel();
+ let tx_for_tg = tx.clone();
+ let api = Arc::new(Tdlib::new());
+ let api_for_listener = api.clone();
+ let api_for_responder = api.clone();
+ let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already");
+ let _run_handle = thread::spawn(
+ move || OneshotResponder::new(rx, api_for_responder).run(rt)
+ );
+ let _tg_handle = thread::spawn(
+ move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0)
+ );
+
+ Self {
+ //waker_handle: _run_handle,
+ sender: tx
+ }
+ }
+
+ pub fn send(&mut self, req: &JsonValue) -> RequestFuture {
+ let request = RequestData {
+ req: req.to_owned(),
+ resp: None,
+ waker: None
+ };
+ let fut = RequestFuture {
+ data: Arc::new(Mutex::new(request))
+ };
+
+ self.sender.send(JoinStreams::NewRequest(fut.clone())).unwrap();
+ fut
+ }
+
+ fn listen_tg(tx: Sender<JoinStreams>, api: Arc<Tdlib>, timeout: f64) {
+ loop {
+ if let Some(msg) = api.receive(timeout) {
+ tx.send(JoinStreams::NewResponse(msg)).unwrap();
+ } else {
+ info!("receive timed out");
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+struct OneshotResponder {
+ api: Arc<Tdlib>,
+ wakers_map: HashMap<Uuid, RequestFuture>,
+ rx: Receiver<JoinStreams>,
+}
+
+impl OneshotResponder {
+ fn new(rx: Receiver<JoinStreams>, api: Arc<Tdlib>) -> Self {
+ Self {
+ api: api,
+ wakers_map: HashMap::new(),
+ rx: rx
+ }
+ }
+
+ fn run(&mut self, rt: tokio::runtime::Handle) {
+ let mut updater = crate::update::UpdateRouter::new(rt);
+ updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } });
+ loop {
+ match self.rx.recv() {
+ Ok(JoinStreams::NewRequest(fut)) => {
+ let id = loop {
+ let id = Uuid::new_v4();
+ if self.wakers_map.contains_key(&id) {
+ continue;
+ } else {
+ break id;
+ }
+ };
+ let data = fut.data.clone();
+ let request: &mut JsonValue = &mut data.lock().unwrap().req;
+ if request.has_key("@extra") {
+ warn!("overwriting @extra in request");
+ }
+ request["@extra"] = JsonValue::from(id.to_hyphenated().to_string());
+ self.api.send(request.dump().as_ref());
+ self.wakers_map.insert(id, fut);
+ trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2));
+ },
+ Ok(JoinStreams::NewResponse(resp)) => {
+ match json::parse(resp.as_ref()) {
+ Ok(val) => {
+ trace!("received update: {}", val);
+ let typ = val["@type"].as_str().unwrap();
+ if typ.starts_with("update") {
+ updater.dispatch(val);
+ } else {
+ self.handle_response(val);
+ }
+ },
+ Err(e) => {
+ warn!("ignoring invalid response. err: {}, resp: {}", e, resp);
+ }
+ }
+ },
+ Err(e) => {
+ error!("stream closed: {}", e);
+ error!("closing thread");
+ break;
+ }
+ }
+ }
+ }
+
+ fn handle_response(&mut self, resp: JsonValue) {
+ if let Some(id_str) = resp["@extra"].as_str() {
+ if let Ok(id) = Uuid::parse_str(id_str) {
+ let fut_data = self.wakers_map
+ .remove(&id)
+ .unwrap()
+ .data;
+ fut_data.lock().unwrap().resp = Some(resp);
+ fut_data.lock().unwrap()
+ .waker.as_ref()
+ .and_then(|waker: &Waker| { waker.clone().wake(); Some(()) });
+ }
+ } else {
+ warn!("update has invalid @extra: {}", resp);
+ }
+ }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..289ad02
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,54 @@
+use std::env;
+use tokio;
+
+mod client;
+//mod auth;
+mod update;
+//mod message;
+
+#[tokio::main]
+async fn main() {
+ dotenv::dotenv().ok();
+ env_logger::init();
+ let tg_log: Option<i32> = env::var("TG_LOG")
+ .ok()
+ .and_then(|var| var.parse().ok());
+
+ let mut tg = client::Client::new(tg_log);
+
+ tokio::spawn(async move {
+ let _auth_state = tg.send(
+ &json::object!{
+ "@type": "getAuthorizationState"
+ }
+ ).await;
+ let cache = env::current_dir().unwrap().join("cache");
+ let resp = tg.send(&json::object!{
+ "@type": "setTdlibParameters",
+ "parameters": {
+ "use_test_dc": false,
+ "api_id": env::var("API_ID").unwrap(),
+ "api_hash": env::var("API_HASH").unwrap(),
+ "device_model": "mbia",
+ "system_version": "Catalina",
+ "application_version": "0.1",
+ "system_language_code": "en",
+ "database_directory": cache.join("database").to_str().unwrap(),
+ "use_message_database": false,
+ "files_directory": cache.join("files").to_str().unwrap(),
+ "use_secret_chats": false,
+ },
+ }).await;
+ println!("resp: {}", resp);
+ });
+ std::thread::sleep(std::time::Duration::new(1, 0));
+}
+
+
+/*
+fn main() {
+ runtime::Runtime::new(|arc_msg, tx| {
+ tx.send(std::sync::Arc::new(runtime::Task{})).unwrap();
+ }).run();
+}
+*/
diff --git a/src/update.rs b/src/update.rs
new file mode 100644
index 0000000..a6884e3
--- /dev/null
+++ b/src/update.rs
@@ -0,0 +1,40 @@
+use std::collections::HashMap;
+use json::JsonValue;
+use std::future::Future;
+use futures::future::BoxFuture;
+
+pub trait Handler: Send + Sync + 'static {
+ fn handle(&self, _: JsonValue) -> BoxFuture<'static, ()>;
+}
+
+impl<C, F> Handler for C
+where C: Send + Sync + 'static + Fn(JsonValue) -> F,
+ F: Future<Output = ()> + 'static + Send {
+ fn handle(&self, req: JsonValue) -> BoxFuture<'static, ()> {
+ Box::pin((*self)(req))
+ }
+}
+
+pub struct UpdateRouter {
+ router: HashMap<String, Box<dyn Handler>>,
+ rt: tokio::runtime::Handle,
+}
+
+impl UpdateRouter {
+ pub fn new(rt: tokio::runtime::Handle) -> Self {
+ Self {
+ router: HashMap::new(),
+ rt: rt
+ }
+ }
+ pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) {
+ self.router.insert(update_type.to_owned(), Box::new(handler));
+ }
+
+ pub fn dispatch(&self, update: JsonValue) {
+ let update_type: &str = update["@type"].as_str().unwrap();
+ self.router.get(update_type)
+ .and_then(|handler| { self.rt.spawn(handler.handle(update)); Some(()) })
+ .or_else(|| { println!("handler not found"); Some(()) });
+ }
+}