From 211b44978a5b43f77516af7beb9f959fabfbc822 Mon Sep 17 00:00:00 2001 From: Neshura Date: Thu, 31 Aug 2023 23:36:37 +0200 Subject: [PATCH] Threading Implementation for higher stability --- Cargo.lock | 12 +-- Cargo.toml | 1 + deploy.sh | 6 ++ src/config/mod.rs | 132 ++++++++++++++---------- src/main.rs | 252 ++++++++++++++++++++++++++++------------------ 5 files changed, 244 insertions(+), 159 deletions(-) create mode 100644 deploy.sh diff --git a/Cargo.lock b/Cargo.lock index b4bdfa5..c201ab5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,6 +301,7 @@ dependencies = [ "serde_derive", "serde_json", "strum_macros", + "tokio", "url", ] @@ -1283,9 +1284,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -1880,11 +1881,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -1893,7 +1893,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2 0.5.3", "tokio-macros", "windows-sys", ] diff --git a/Cargo.toml b/Cargo.toml index 7f0756b..c11b1cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,5 @@ serde = "1.0.164" serde_derive = "1.0.164" serde_json = "1.0.97" strum_macros = "0.25.0" +tokio = "1.32.0" url = "2.4.0" diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..719bd26 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,6 @@ +#!/bin/bash +## deploy to machine as automod.new +## stop automod service +## mv automod.new to automod +## restart automod service +## idea: websocket event? \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs index 24994f2..a3e3f88 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,11 +1,25 @@ -use std::{fs::{self, OpenOptions}, path::Path, io::Write, thread::sleep, time, error::Error}; +use std::{ + error::Error, + fs::{self, OpenOptions}, + io::Write, + path::Path, + thread::sleep, + time, +}; -use lemmy_api_common::{sensitive::Sensitive, post::CreatePost, community::{ListCommunities, ListCommunitiesResponse}}; -use lemmy_db_schema::{newtypes::{LanguageId, CommunityId, PostId}, ListingType}; +use lemmy_api_common::{ + community::{ListCommunities, ListCommunitiesResponse}, + post::CreatePost, + sensitive::Sensitive, +}; +use lemmy_db_schema::{ + newtypes::{CommunityId, LanguageId, PostId}, + ListingType, +}; use serde_derive::{Deserialize, Serialize}; use url::Url; -use crate::{CLIENT}; +use crate::CLIENT; macro_rules! pub_struct { ($name:ident {$($field:ident: $t:ty,)*}) => { @@ -40,7 +54,7 @@ impl Secrets { #[derive(Serialize, Deserialize, Clone, PartialEq)] pub(crate) struct LemmyLogin { pub(crate) username: String, - password: String + password: String, } impl LemmyLogin { @@ -49,7 +63,7 @@ impl LemmyLogin { } pub(crate) fn get_password(&self) -> Sensitive { - return Sensitive::new(self.password.clone()) + return Sensitive::new(self.password.clone()); } } @@ -97,58 +111,58 @@ impl Config { self.reddit_config = config_parse.reddit_config; } - pub(crate) fn check_feeds(&mut self, post_history: &mut Vec - , community_ids: &CommunitiesVector, auth: &Sensitive) -> Result, usize, String))>, Box> { + pub(crate) async fn check_feeds( + &mut self, + post_history: &mut Vec, + community_ids: &CommunitiesVector, + auth: &Sensitive, + ) -> Result, usize, String))>, Box> { let mut post_queue: Vec<(CreatePost, (Option, usize, String))> = vec![]; - match self.feeds.iter().map(|feed| { + let mut i = 0; + while i < self.feeds.len() { + let feed = &self.feeds[i]; + let res = CLIENT .get(feed.feed_url.clone()) - .send()?.text()?; + .send() + .await? + .text() + .await?; let data: FeedData = serde_json::from_str(&res).unwrap(); let mut prev_post_idx: Option = None; let mut do_post = true; - post_history - .iter() - .enumerate() - .for_each(|(idx, post)| { - if &post.last_post_url == &data.items[0].url { - do_post = false; - } else if &post.title == &data.title { - prev_post_idx = Some(idx); - } - }); + post_history.iter().enumerate().for_each(|(idx, post)| { + if &post.last_post_url == &data.items[0].url { + do_post = false; + } else if &post.title == &data.title { + prev_post_idx = Some(idx); + } + }); if do_post { let item = &data.items[0]; let new_post = CreatePost { - name: item.title.clone(), - community_id: community_ids.find(&feed.communities.chapter), - url: Some(Url::parse(&item.url).unwrap()), - body: Some( - "[Reddit](https://reddit.com/r/HonzukinoGekokujou)\n\n[Discord](https://discord.com/invite/fGefmzu)".into(), - ), - honeypot: None, - nsfw: Some(false), - language_id: Some(LanguageId(37)), // TODO get this id once every few hours per API request, the ordering of IDs suggests that the EN Id might change in the future - auth: auth.clone(), - }; + name: item.title.clone(), + community_id: community_ids.find(&feed.communities.chapter), + url: Some(Url::parse(&item.url).unwrap()), + body: Some( + "[Reddit](https://reddit.com/r/HonzukinoGekokujou)\n\n[Discord](https://discord.com/invite/fGefmzu)".into(), + ), + honeypot: None, + nsfw: Some(false), + language_id: Some(LanguageId(37)), // TODO get this id once every few hours per API request, the ordering of IDs suggests that the EN Id might change in the future + auth: auth.clone(), + }; - let prev_data = ( - prev_post_idx, - feed.id, - data.title - ); + let prev_data = (prev_post_idx, feed.id, data.title); post_queue.push((new_post, prev_data)); } sleep(time::Duration::from_millis(100)); // Should prevent dos-ing J-Novel servers - return Ok(()); - }).collect() { - Ok(()) => {} - Err(e) => return Err(e) + i += 1; } return Ok(post_queue); @@ -179,7 +193,7 @@ pub(crate) enum LemmyCommunities { aobprepub, aoblightnovel, aobmanga, - metadiscussions + metadiscussions, } pub_struct!(FeedRedditSettings { @@ -198,7 +212,7 @@ pub_struct!(PrevPost { impl PrevPost { pub(crate) fn load() -> Vec { let history; - + if Path::new("posts.json").exists() { let file_contents = match fs::read_to_string("posts.json") { Ok(data) => data, @@ -211,12 +225,10 @@ impl PrevPost { Err(e) => panic!("ERROR: posts.json could not be parsed:\n\n{:#?}", e), }; history = history_parse; - } - else { + } else { history = [].to_vec() } - } - else { + } else { let _ = fs::File::create("posts.json"); history = [].to_vec() } @@ -225,7 +237,12 @@ impl PrevPost { } pub(crate) fn save(data: &Vec) { - let mut file = OpenOptions::new().read(true).write(true).create(true).open("posts.json").unwrap(); + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open("posts.json") + .unwrap(); let json_data = serde_json::to_string_pretty(&data).unwrap(); @@ -243,9 +260,7 @@ pub_struct!(FeedData { items: Vec, }); -pub_struct!(FeedAuthor { - name: String, -}); +pub_struct!(FeedAuthor { name: String, }); pub_struct!(FeedEntry { id: String, @@ -255,7 +270,7 @@ pub_struct!(FeedEntry { image: Option, date_published: String, }); - + // Bot Helper Structs pub_struct!(CommunitiesVector { ids: Vec<(CommunityId, String)>, @@ -263,11 +278,15 @@ pub_struct!(CommunitiesVector { impl CommunitiesVector { pub(crate) fn new() -> CommunitiesVector { - CommunitiesVector{ids: vec![]} + CommunitiesVector { ids: vec![] } } #[warn(unused_results)] - pub(crate) fn load(&mut self, auth: &Sensitive, base: &String) -> Result<(), Box> { + pub(crate) async fn load( + &mut self, + auth: &Sensitive, + base: &String, + ) -> Result<(), Box> { let params = ListCommunities { auth: Some(auth.clone()), type_: Some(ListingType::Local), @@ -277,7 +296,10 @@ impl CommunitiesVector { let res = CLIENT .get(base.clone() + "/api/v3/community/list") .query(¶ms) - .send()?.text()?; + .send() + .await? + .text() + .await?; let site_data: ListCommunitiesResponse = serde_json::from_str(&res).unwrap(); @@ -301,6 +323,6 @@ impl CommunitiesVector { ret_id = id.0; } }); - return ret_id; + return ret_id; } } diff --git a/src/main.rs b/src/main.rs index e80781e..ee5a745 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,28 @@ -use chrono::{Utc, NaiveDateTime}; -use config::{Config, PrevPost, Secrets, CommunitiesVector, LemmyCommunities}; +use chrono::{NaiveDateTime, Utc}; +use config::{CommunitiesVector, Config, LemmyCommunities, PrevPost, Secrets}; use lemmy_api_common::{ + lemmy_db_views::structs::PostView, person::{Login, LoginResponse}, - post::{CreatePost, GetPosts, GetPostsResponse, FeaturePost}, - sensitive::Sensitive, lemmy_db_views::structs::PostView, -}; -use lemmy_db_schema::{ - ListingType, SortType, PostFeatureType, newtypes::CommunityId, + post::{CreatePost, FeaturePost, GetPosts, GetPostsResponse}, + sensitive::Sensitive, }; +use lemmy_db_schema::{newtypes::CommunityId, ListingType, PostFeatureType, SortType}; use once_cell::sync::Lazy; -use reqwest::{blocking::Client, StatusCode}; -use std::{thread::sleep, time, collections::HashMap, error::Error}; +use reqwest::{Client, StatusCode}; +use std::{ + collections::HashMap, + error::Error, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, sleep}, + time::{self, Duration}, +}; +use tokio::sync::Mutex; mod config; - - pub static CLIENT: Lazy = Lazy::new(|| { let client = Client::builder() .timeout(time::Duration::from_secs(30)) @@ -25,6 +32,7 @@ pub static CLIENT: Lazy = Lazy::new(|| { client }); +#[derive(Clone)] struct Bot { secrets: Secrets, config: Config, @@ -47,10 +55,10 @@ impl Bot { } /// Get JWT Token - /// + /// /// * `return` : Returns true if token was succesfully retrieved, false otherwise #[warn(unused_results)] - pub(crate) fn login(&mut self) -> Result<(), Box> { + pub(crate) async fn login(&mut self) -> Result, Box> { let login_params = Login { username_or_email: self.secrets.lemmy.get_username(), password: self.secrets.lemmy.get_password(), @@ -60,15 +68,14 @@ impl Bot { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/user/login") .json(&login_params) - .send()?; - + .send().await?; if res.status() == StatusCode::OK { - let data: &LoginResponse = &res.json().unwrap(); + let data: &LoginResponse = &res.json().await.unwrap(); let jwt = data.jwt.clone().expect("JWT Token could not be acquired"); - self.auth = jwt; - return Ok(()); + self.auth = jwt.clone(); + return Ok(jwt); } else { println!("Error Code: {:?}", res.status()); return Err(Box::new(res.error_for_status().unwrap_err())); @@ -76,30 +83,37 @@ impl Bot { } /// Make Post to Lemmy Instance - /// + /// /// * `post_data` : Object of type [CreatePost] containing post info /// * `return` : Returns true if Post was succesful, false otherwise #[warn(unused_results)] - pub(crate) fn post(&mut self, post_data: CreatePost) -> Result> { + pub(crate) async fn post(&mut self, post_data: CreatePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post") .json(&post_data) - .send()?; + .send().await?; - // TODO: process res to get info about if post was successfuly (mostly if jwt token was valid) - let ret: PostView = serde_json::from_str::>(res.text()?.as_str()).unwrap().remove("post_view").unwrap(); + // TODO: process res to get info about if post was successfuly (mostly if jwt token was valid) + let ret: PostView = serde_json::from_str::>(res.text().await?.as_str()) + .unwrap() + .remove("post_view") + .unwrap(); return Ok(ret); } #[warn(unused_results)] - pub(crate) fn pin_new(&mut self, old_post: &Option, new_post: &PostView) -> Result<(), Box> { + pub(crate) async fn pin_new( + &mut self, + old_post: &Option, + new_post: &PostView, + ) -> Result<(), Box> { match old_post { Some(id) => { let remove_community_pin = FeaturePost { post_id: self.post_history[*id].post_id, featured: false, feature_type: PostFeatureType::Community, - auth: self.auth.clone() + auth: self.auth.clone(), }; let _ = self.pin(remove_community_pin); @@ -119,8 +133,8 @@ impl Bot { let post_list_json = CLIENT .get(self.config.instance.clone() + "/api/v3/post/list") .query(&get_params) - .send()? - .text()?; + .send().await? + .text().await?; let post_list: GetPostsResponse = serde_json::from_str(post_list_json.as_str()).unwrap(); @@ -138,12 +152,12 @@ impl Bot { post_id: post_view.post.id, featured: false, feature_type: PostFeatureType::Local, - auth: self.auth.clone() + auth: self.auth.clone(), }; - - match self.pin(remove_local_pin) { - Ok(_) => {}, - Err(e) => println!("Error Unpinning Post: {:#?}", e) + + match self.pin(remove_local_pin).await { + Ok(_) => {} + Err(e) => println!("Error Unpinning Post: {:#?}", e), }; } } @@ -163,57 +177,68 @@ impl Bot { feature_type: PostFeatureType::Local, auth: self.auth.clone(), }; - - match self.pin(pin_new_local) { - Ok(_) => {}, - Err(e) => println!("Error Pinning Post: {:#?}", e) - };; + + match self.pin(pin_new_local).await { + Ok(_) => {} + Err(e) => println!("Error Pinning Post: {:#?}", e), + }; return Ok(()); } - pub(crate) fn pin (&mut self, pin_data: FeaturePost) -> Result> { + pub(crate) async fn pin(&mut self, pin_data: FeaturePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post/feature") .json(&pin_data) - .send()?; + .send().await?; - let ret: PostView = serde_json::from_str::>(res.text()?.as_str()).unwrap().remove("post_view").unwrap(); + let ret: PostView = serde_json::from_str::>(res.text().await?.as_str()) + .unwrap() + .remove("post_view") + .unwrap(); return Ok(ret.post.featured_local); } #[warn(unused_results)] - pub(crate) fn run_once(&mut self, prev_time: &mut NaiveDateTime) -> Result<(), Box> { + pub(crate) async fn run_once(&mut self, prev_time: &mut NaiveDateTime) -> Result<(), Box> { println!("{:#<1$}", "", 30); self.start_time = Utc::now().naive_local(); - if self.start_time - *prev_time > chrono::Duration::seconds(6) { // Prod should use hours, add command line switch later and read duration from config + if self.start_time - *prev_time > chrono::Duration::seconds(6) { + // Prod should use hours, add command line switch later and read duration from config println!("Reloading Config"); *prev_time = self.start_time; self.config.load(); - self.community_ids.load(&self.auth, &self.config.instance)?; + self.community_ids.load(&self.auth, &self.config.instance).await?; println!("Done!"); } // Start the polling process // Get all feed URLs (use cache) println!("Checking Feeds"); - let post_queue: Vec<(CreatePost, (Option, usize, String))> = self.config.check_feeds(&mut self.post_history, &self.community_ids, &self.auth)?; + let post_queue: Vec<(CreatePost, (Option, usize, String))> = self + .config + .check_feeds(&mut self.post_history, &self.community_ids, &self.auth).await?; println!("Done!"); - let _ = post_queue.iter().map(|(post, (prev_idx, feed_id, feed_title))| -> Result<(), Box> { + + let mut i = 0; + while i < post_queue.len() { + let (post, (prev_idx, feed_id, feed_title)) = &post_queue[i]; + println!("Posting: {}", post.name); - let post_data = self.post(post.clone())?; - - self.pin_new(prev_idx, &post_data)?; - + let post_data = self.post(post.clone()).await?; + + self.pin_new(&prev_idx, &post_data).await?; + // Move current post to old post list match prev_idx { Some(idx) => { self.post_history[*idx].title = feed_title.clone(); self.post_history[*idx].post_id = post_data.post.id; - self.post_history[*idx].last_post_url = post.url.clone().unwrap().to_string(); + self.post_history[*idx].last_post_url = + post.url.clone().unwrap().to_string(); } None => self.post_history.push(PrevPost { id: feed_id.clone(), @@ -222,8 +247,9 @@ impl Bot { last_post_url: post.url.clone().unwrap().to_string(), }), } - Ok(()) - }).collect::>(); + + i += 1; + } PrevPost::save(&self.post_history); @@ -240,27 +266,16 @@ impl Bot { sleep(time::Duration::from_secs(1)); } - match reqwest::blocking::get("https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=") { - Ok(_) => {}, - Err(err) => println!("{}", err) + match reqwest::blocking::get( + "https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=", + ) { + Ok(_) => {} + Err(err) => println!("{}", err), }; - } - - pub(crate) fn print_info(&self) { - print!("\x1B[2J\x1B[1;1H"); - println!("##[Ascendance of a Bookworm Bot]##"); - println!("Instance: {}", &self.config.instance); - println!("Ran Last: {}", &self.start_time.format("%d/%m/%Y %H:%M:%S")); - println!("{:#<1$}", "", 30); - self.post_history.iter().for_each(|post| { - print!("{} ", post.title); - print!("{:<1$}: ", "", 60 - post.title.len()); - println!("{}", post.last_post_url); - }) } } -fn list_posts(auth: &Sensitive, base: String) -> GetPostsResponse { +async fn list_posts(auth: &Sensitive, base: String) -> GetPostsResponse { let params = GetPosts { type_: Some(ListingType::Local), sort: Some(SortType::New), @@ -271,48 +286,89 @@ fn list_posts(auth: &Sensitive, base: String) -> GetPostsResponse { let res = CLIENT .get(base + "/api/v3/post/list") .query(¶ms) - .send() + .send().await .unwrap() - .text() + .text().await .unwrap(); return serde_json::from_str(&res).unwrap(); } -fn run_bot() { +async fn run_bot(bot: Arc>) { + // TODO this currently does not update the bot Mutex when run // Get all needed auth tokens at the start let mut old = Utc::now().naive_local(); - let mut this = Bot::new(); - match this.login() { - Ok(_) => { - let _ = this.community_ids.load(&this.auth, &this.config.instance); - // Enter a loop (not for debugging) - loop { - this.idle(); - // 3 retries in case of connection issues - //let mut loop_breaker: u8 = 0; // DEBUG disabled for clearer crash finding - match this.run_once(&mut old) { - Ok(_) => {}, - Err(e) => panic!("{:#?}", e) - }; - /* while !this.run_once(&mut old).is_ok() && loop_breaker <= 3 { - println!("Unable to complete Bot cycle, retrying with fresh login credentials"); - if this.login().is_ok() { - let _ = this.community_ids.load(&this.auth, &this.config.instance); - } - sleep(time::Duration::from_secs(10)); - loop_breaker += 1; - }; */ - this.print_info(); - this.idle(); - } + let mut this = bot.lock().await.clone(); + match this.login().await { + Ok(_) => { + println!("Login successful"); }, Err(e) => { println!("Unable to get initial login:\n {:#?}", e); } + }; + let _ = this.community_ids.load(&this.auth, &this.config.instance).await; + + loop { + this.idle(); + println!("Debug A"); // DEBUG + match this.run_once(&mut old).await { + Ok(_) => {} + Err(e) => panic!("Crashed due to Error: {:#?}", e), + }; + *bot.lock().await = this.clone(); + + this.idle(); } } -fn main() { - run_bot(); + +async fn print_info(shutdown: Arc, bot: Arc>) { + while !shutdown.load(Ordering::Relaxed) { + let bot: tokio::sync::MutexGuard<'_, Bot> = bot.lock().await; + thread::sleep(Duration::from_millis(500)); + + //print!("\x1B[2J\x1B[1;1H"); + println!( + "##[Ascendance of a Bookworm Bot]## | Time: {}", + Utc::now().naive_local().format("%H:%M:%S") + ); + println!("Instance: {}", bot.config.instance); + println!("Ran Last: {}", bot.start_time.format("%d/%m/%Y %H:%M:%S")); + println!("{:#<1$}", "", 30); + bot.post_history.iter().for_each(|post| { + print!("| -- |"); + print!("{} ", post.title); + print!("{:<1$}| ", "", 60 - post.title.len()); + println!("{}", post.last_post_url); + }) + } +} + +#[tokio::main] +async fn main() { + let shutdown = Arc::new(AtomicBool::new(false)); + + loop { + println!("Starting AoB Bot..."); + + let shutdown_clone = shutdown.clone(); + + let bot = Arc::new(Mutex::new(Bot::new())); + let bot_clone = bot.clone(); + + let bot_thread = tokio::spawn(async move { run_bot(bot).await }); + + let tui_thread = tokio::spawn(async move { print_info(shutdown_clone, bot_clone).await }); + + let _ = bot_thread.await; + + tui_thread.abort(); + + println!("Bot crashed due to unknown Error, restarting thread after wait..."); + + sleep(Duration::from_secs(30)); + } + + }