summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml6
-rw-r--r--src/client.rs55
-rw-r--r--src/main.rs138
-rw-r--r--src/update.rs29
4 files changed, 161 insertions, 67 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 31bac94..216fe6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,8 +8,10 @@ edition = "2018"
[dependencies]
"rust-tdlib" = { path = "rust-tdlib" }
-# serde = { version = "1", features = ["derive"] }
-json = "0.12"
+"pert-types" = { path = "types" }
+#serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+crossbeam = "0.7"
dotenv = "0.15"
log = "0.4"
env_logger = "0.7"
diff --git a/src/client.rs b/src/client.rs
index db46819..0e215f8 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,15 +5,22 @@ use std::{
future::Future,
pin::Pin,
sync::{
- mpsc::{ Sender, Receiver, channel },
Arc,
Mutex
},
thread,
};
+use crossbeam::channel::{
+ self,
+ Sender,
+ Receiver,
+};
use uuid::Uuid;
use std::collections::HashMap;
-use json::JsonValue;
+use serde_json::Value as JsonValue;
+use crate::update::{ UpdateRouter, Handler };
+use pert_types::types::Update;
+
#[derive(Debug)]
@@ -48,37 +55,40 @@ enum JoinStreams {
NewResponse(String),
}
-#[derive(Debug)]
+#[derive(Clone)]
pub struct Client {
//waker_handle: std::thread::JoinHandle<()>,
sender: Sender<JoinStreams>,
}
impl Client {
- pub fn new(log_opt: Option<i32>) -> Self {
+ pub fn new(log_opt: Option<i32>, updater: UpdateRouter) -> Self {
if let Some(log) = log_opt {
Tdlib::set_log_verbosity_level(log).ok();
}
- let (tx, rx) = channel();
+ let (tx, rx) = channel::unbounded();
+
let tx_for_tg = tx.clone();
let api = Arc::new(Tdlib::new());
let api_for_listener = api.clone();
let api_for_responder = api.clone();
- let rt = tokio::runtime::Handle::try_current().expect("Must be in runtime already");
+ let sender_for_responder = tx.clone();
+
let _run_handle = thread::spawn(
- move || OneshotResponder::new(rx, api_for_responder).run(rt)
+ move || OneshotResponder::new(rx, api_for_responder).run(
+ updater, Self { sender: sender_for_responder }
+ )
);
let _tg_handle = thread::spawn(
move || Self::listen_tg(tx_for_tg, api_for_listener, 1.0)
);
-
Self {
//waker_handle: _run_handle,
- sender: tx
+ sender: tx,
}
}
- pub fn send(&mut self, req: &JsonValue) -> RequestFuture {
+ pub fn send(&self, req: &JsonValue) -> RequestFuture {
let request = RequestData {
req: req.to_owned(),
resp: None,
@@ -119,9 +129,7 @@ impl OneshotResponder {
}
}
- fn run(&mut self, rt: tokio::runtime::Handle) {
- let mut updater = crate::update::UpdateRouter::new(rt);
- updater.add_handler("updateOption", |val: JsonValue| { async move { info!("async update: {}", val) } });
+ fn run(&mut self, updater: UpdateRouter, client: Client) {
loop {
match self.rx.recv() {
Ok(JoinStreams::NewRequest(fut)) => {
@@ -135,21 +143,20 @@ impl OneshotResponder {
};
let data = fut.data.clone();
let request: &mut JsonValue = &mut data.lock().unwrap().req;
- if request.has_key("@extra") {
+ if !request["@extra"].is_null() {
warn!("overwriting @extra in request");
}
- request["@extra"] = JsonValue::from(id.to_hyphenated().to_string());
- self.api.send(request.dump().as_ref());
+ request["@extra"] = id.to_hyphenated().to_string().into();
+ self.api.send(request.to_string().as_ref());
self.wakers_map.insert(id, fut);
- trace!("new req:\n{}", json::stringify_pretty(request.clone(), 2));
+ trace!("new req:\n{:#}", request);
},
Ok(JoinStreams::NewResponse(resp)) => {
- match json::parse(resp.as_ref()) {
+ match serde_json::from_str::<JsonValue>(resp.as_ref()) {
Ok(val) => {
- trace!("received update: {}", val);
let typ = val["@type"].as_str().unwrap();
if typ.starts_with("update") {
- updater.dispatch(val);
+ updater.dispatch(&client, val);
} else {
self.handle_response(val);
}
@@ -171,12 +178,14 @@ impl OneshotResponder {
fn handle_response(&mut self, resp: JsonValue) {
if let Some(id_str) = resp["@extra"].as_str() {
if let Ok(id) = Uuid::parse_str(id_str) {
- let fut_data = self.wakers_map
+ let fut_extracted = self.wakers_map
.remove(&id)
.unwrap()
.data;
- fut_data.lock().unwrap().resp = Some(resp);
- fut_data.lock().unwrap()
+
+ let mut fut_data = fut_extracted.lock().unwrap();
+ fut_data.resp = Some(resp);
+ fut_data
.waker.as_ref()
.and_then(|waker: &Waker| { waker.clone().wake(); Some(()) });
}
diff --git a/src/main.rs b/src/main.rs
index 289ad02..0a6b256 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,21 @@
use std::env;
use tokio;
+use log::{ info, error };
mod client;
//mod auth;
mod update;
//mod message;
+/*
+struct UpdateHandler;
+
+impl update::Handler for UpdateHandler {
+ fn handle(&self, _client: client::Client, req: serde_json::Value) -> futures::future::BoxFuture<'static, ()> {
+ Box::pin(async move {
+ info!("update: {:#}", req)
+ })
+ }
+}*/
#[tokio::main]
async fn main() {
@@ -14,41 +25,104 @@ async fn main() {
.ok()
.and_then(|var| var.parse().ok());
- let mut tg = client::Client::new(tg_log);
+ let mut updater = update::UpdateRouter::new(
+ tokio::runtime::Handle::try_current().expect("Must be in runtime")
+ );
- tokio::spawn(async move {
- let _auth_state = tg.send(
- &json::object!{
- "@type": "getAuthorizationState"
- }
- ).await;
+ let update_auth_state = |tg: client::Client, val: serde_json::Value| async move {
let cache = env::current_dir().unwrap().join("cache");
- let resp = tg.send(&json::object!{
- "@type": "setTdlibParameters",
- "parameters": {
- "use_test_dc": false,
- "api_id": env::var("API_ID").unwrap(),
- "api_hash": env::var("API_HASH").unwrap(),
- "device_model": "mbia",
- "system_version": "Catalina",
- "application_version": "0.1",
- "system_language_code": "en",
- "database_directory": cache.join("database").to_str().unwrap(),
- "use_message_database": false,
- "files_directory": cache.join("files").to_str().unwrap(),
- "use_secret_chats": false,
+ info!("auth update: val: {:#}", val);
+ match val.pointer("/authorization_state/@type").unwrap()
+ .as_str().unwrap() {
+ "authorizationStateWaitTdlibParameters" => {
+ let resp = tg.send(&serde_json::json!({
+ "@type": "setTdlibParameters",
+ "parameters": {
+ "use_test_dc": true,
+ "api_id": env::var("API_ID").unwrap(),
+ "api_hash": env::var("API_HASH").unwrap(),
+ "device_model": "mbia",
+ "system_version": "Catalina",
+ "application_version": "0.1",
+ "system_language_code": "en",
+ "database_directory": cache.join("database").to_str().unwrap(),
+ "use_message_database": false,
+ "files_directory": cache.join("files").to_str().unwrap(),
+ "use_secret_chats": false,
+ },
+ })).await;
+ info!("settdlib: {}", resp);
},
- }).await;
- println!("resp: {}", resp);
- });
- std::thread::sleep(std::time::Duration::new(1, 0));
-}
+ "authorizationStateWaitEncryptionKey" => {
+ let resp = tg.send(&serde_json::json!({
+ "@type": "setDatabaseEncryptionKey",
+ "encryption_key": "sup3rs3cr3t",
+ })).await;
+ info!("setenckey: {}", resp);
+ },
+ "authorizationStateWaitPhoneNumber" => {
+ let resp = tg.send(&serde_json::json!({
+ "@type": "setAuthenticationPhoneNumber",
+ "phone_number": "+79859053875",
+ "settings": {
+ "allow_flash_call": false,
+ "is_current_phone_number": false,
+ "allow_sms_retriever_api": false,
+ }
+ })).await;
+ info!("setphone: {}", resp);
+ },
+ "authorizationStateWaitRegistration" => {
+ let resp = tg.send(&serde_json::json!({
+ "@type": "registerUser",
+ "first_name": "John",
+ "last_name": "Doe",
+ })).await;
+ info!("reg: {}", resp);
+ },
+ "authorizationStateWaitCode" => {
+ let code = {
+ let mut s = String::new();
+ std::io::stdin().read_line(&mut s).expect("Could not read line");
+ s
+ };
+ let resp = tg.send(&serde_json::json!({
+ "@type": "checkAuthenticationCode",
+ "code": code,
+ })).await;
+ info!("checkcode: {}", resp);
+ },
+ auth_state => {
+ error!("Unknown auth state update: {} / {:#}", auth_state, val);
+ }
+ };
+ };
+ updater.add_handler("updateAuthorizationState", update_auth_state);
-/*
-fn main() {
- runtime::Runtime::new(|arc_msg, tx| {
- tx.send(std::sync::Arc::new(runtime::Task{})).unwrap();
- }).run();
+ let tg = client::Client::new(tg_log, updater);
+ std::thread::sleep(std::time::Duration::new(2, 0));
+ let get_chats = tg.send(&serde_json::json!({
+ "@type": "getChats",
+ "chat_list": {
+ "@type": "chatListMain"
+ },
+ "offset_order": "9223372036854775807",
+ "offset_chat_id": "9223372036854775807",
+ "limit": 5
+ })).await;
+ error!("send msg: {}", get_chats);
+ let chat = get_chats.pointer("/chat_ids/0").unwrap_or(&serde_json::json!(0)).as_u64().unwrap();
+ let get_chat = tg.send(&serde_json::json!({
+ "@type": "getChat",
+ "chat_id": chat,
+ })).await;
+ error!("get_chat: {:#}", get_chat);
+ let me = tg.send(&serde_json::json!({
+ "@type": "getMe",
+ })).await;
+ error!("me: {:#}", me);
+
+
+ std::thread::sleep(std::time::Duration::new(200, 0));
}
-*/
diff --git a/src/update.rs b/src/update.rs
index a6884e3..b5b0ba7 100644
--- a/src/update.rs
+++ b/src/update.rs
@@ -1,17 +1,19 @@
use std::collections::HashMap;
-use json::JsonValue;
+use serde_json::Value as JsonValue;
use std::future::Future;
use futures::future::BoxFuture;
+use log::{ warn, trace };
+use crate::client::Client;
pub trait Handler: Send + Sync + 'static {
- fn handle(&self, _: JsonValue) -> BoxFuture<'static, ()>;
+ fn handle(&self, _: Client, _: JsonValue) -> BoxFuture<'static, ()>;
}
impl<C, F> Handler for C
-where C: Send + Sync + 'static + Fn(JsonValue) -> F,
+where C: Send + Sync + 'static + Fn(Client, JsonValue) -> F,
F: Future<Output = ()> + 'static + Send {
- fn handle(&self, req: JsonValue) -> BoxFuture<'static, ()> {
- Box::pin((*self)(req))
+ fn handle(&self, client: Client, req: JsonValue) -> BoxFuture<'static, ()> {
+ Box::pin((*self)(client, req))
}
}
@@ -24,17 +26,24 @@ impl UpdateRouter {
pub fn new(rt: tokio::runtime::Handle) -> Self {
Self {
router: HashMap::new(),
- rt: rt
+ rt: rt,
}
}
+
pub fn add_handler<H: Handler>(&mut self, update_type: &str, handler: H) {
self.router.insert(update_type.to_owned(), Box::new(handler));
}
- pub fn dispatch(&self, update: JsonValue) {
+ pub fn dispatch(&self, client: &Client, update: JsonValue) {
let update_type: &str = update["@type"].as_str().unwrap();
- self.router.get(update_type)
- .and_then(|handler| { self.rt.spawn(handler.handle(update)); Some(()) })
- .or_else(|| { println!("handler not found"); Some(()) });
+ match self.router.get(update_type) {
+ Some(handler) => {
+ self.rt.spawn(handler.handle(client.clone(), update));
+ },
+ None => {
+ warn!("no handler for {}", update_type);
+ trace!("request was: {}", update);
+ },
+ }
}
}