From a0d414a091c57a7e670ac283ebb68cf680c2a9d7 Mon Sep 17 00:00:00 2001 From: Neshura Date: Mon, 18 Sep 2023 23:01:22 +0200 Subject: [PATCH] Bot Rewrite for jnovel labs --- src/config/mod.rs | 8 - src/main.rs | 505 +++++++++++++++++++++++++++++----------------- 2 files changed, 320 insertions(+), 193 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 1a864c9..cb45e52 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -158,7 +158,6 @@ impl Config { .await?; let data: FeedSeriesData = serde_json::from_str(&res).unwrap(); - println!("{:#?}", data); } return Ok(()); } @@ -210,10 +209,6 @@ impl Config { // Get First Volume that has valid Release Data if now >= published { if Some(volume.slug.clone()) != history_data.last_volume_slug { - println!( - "Should Post for {} Volume {}", - feed.series_slug, volume.slug - ); if let Some(volume_community) = &feed.communities.volume { let mut post_url = Url::parse(&(volume_url_base!() + &feed.series_slug))?; post_url.set_fragment(Some(&("volume-".to_string() + &volume.number.to_string()))); @@ -235,7 +230,6 @@ impl Config { language_id: Some(LanguageId(37)), // TODO get this id once every few hours per API request, the ordering of IDs suggests that the EN Id might change in the future auth: auth.clone(), }; - println!("{:?}", new_post.url); post_queue.push(( new_post, PostQueueMetadata { @@ -269,8 +263,6 @@ impl Config { data.parts.reverse(); for part in data.parts { if Some(part.slug.clone()) != history_data.last_part_slug { - println!("Should Post for {} Part {}", feed.series_slug, part.slug); - let new_post = CreatePost { name: part.title.clone(), community_id: community_ids.find(&chapter_community), diff --git a/src/main.rs b/src/main.rs index 832bacb..f18e999 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use chrono::{NaiveDateTime, Utc}; +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use config::{CommunitiesVector, Config, LemmyCommunities, PrevPost, Secrets}; use lemmy_api_common::{ lemmy_db_views::structs::PostView, @@ -6,6 +6,7 @@ use lemmy_api_common::{ post::{CreatePost, FeaturePost, GetPosts, GetPostsResponse}, sensitive::Sensitive, }; +use lemmy_db_schema::newtypes::PostId; use lemmy_db_schema::{newtypes::CommunityId, ListingType, PostFeatureType, SortType}; use once_cell::sync::Lazy; use reqwest::{Client, StatusCode}; @@ -16,23 +17,32 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - thread::{self}, - time::{self, Duration}, vec, + vec, }; +use std::str::FromStr; +use std::time::UNIX_EPOCH; use tokio::sync::Mutex; use tokio::time::sleep; mod config; +mod feeds; pub static CLIENT: Lazy = Lazy::new(|| { let client = Client::builder() - .timeout(time::Duration::from_secs(30)) - .connect_timeout(time::Duration::from_secs(30)) + .timeout(Duration::seconds(30).to_std().unwrap()) + .connect_timeout(Duration::seconds(30).to_std().unwrap()) .build() .expect("build client"); client }); +struct PostQueueMetadata { + id: usize, + series: String, + part: Option, + volume: Option, +} + #[derive(Clone)] struct Bot { secrets: Secrets, @@ -40,8 +50,10 @@ struct Bot { post_history: Vec, community_ids: CommunitiesVector, auth: Sensitive, - start_time: NaiveDateTime, + login_error: bool, + start_time: DateTime, message_queue: Vec, + error_queue: Vec, } impl Bot { @@ -52,14 +64,16 @@ impl Bot { post_history: PrevPost::load(), community_ids: CommunitiesVector::new(), auth: Sensitive::new("".to_string()), - start_time: Utc::now().naive_local(), + login_error: true, + start_time: Utc::now(), message_queue: vec![], + error_queue: vec![], } } /// Get JWT Token /// - /// * `return` : Returns true if token was succesfully retrieved, false otherwise + /// * `return` : Returns true if token was successfully retrieved, false otherwise #[warn(unused_results)] pub(crate) async fn login(&mut self) -> Result, Box> { let login_params = Login { @@ -71,111 +85,101 @@ impl Bot { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/user/login") .json(&login_params) - .send().await?; + .send() + .await?; - if res.status() == StatusCode::OK { + return if res.status() == StatusCode::OK { let data: &LoginResponse = &res.json().await.unwrap(); - let jwt = data.jwt.clone().expect("JWT Token could not be acquired"); + let jwt = match data.jwt.clone() { + Some(data) => data, + None => { + self.error_queue.push(format!("Error: Missing JWT Token")); + return Err(Box::try_from(format!("Error: Missing JWT Token")).unwrap()); + } + }; self.auth = jwt.clone(); - return Ok(jwt); + self.login_error = false; + Ok(jwt) } else { - self.message_queue.push(format!("Error Code: {:?}", res.status())); - return Err(Box::new(res.error_for_status().unwrap_err())); - } + self.error_queue + .push(format!("Error Code: {:?}", res.status())); + Err(Box::new(res.error_for_status().unwrap_err())) + }; } /// Make Post to Lemmy Instance /// /// * `post_data` : Object of type [CreatePost] containing post info - /// * `return` : Returns true if Post was succesful, false otherwise + /// * `return` : Returns true if Post was successful, false otherwise #[warn(unused_results)] - pub(crate) async fn post(&mut self, post_data: CreatePost) -> Result> { + pub(crate) async fn post(&mut self, post_data: &CreatePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post") - .json(&post_data) - .send().await?; + .json(post_data) + .send() + .await?; - // TODO: process res to get info about if post was successfuly (mostly if jwt token was valid) - let ret: PostView = serde_json::from_str::>(res.text().await?.as_str()) - .unwrap() + // TODO: process res to get info about if post was successfully (mostly if jwt token was valid) + let ret: PostView = serde_json::from_str::>(res.text().await?.as_str())? .remove("post_view") .unwrap(); return Ok(ret); } #[warn(unused_results)] - pub(crate) async fn pin_new( - &mut self, - old_post: &Option, - new_post: &PostView, - ) -> Result<(), Box> { - match old_post { - Some(id) => { - let remove_community_pin = FeaturePost { - post_id: self.post_history[*id].post_id, - featured: false, - feature_type: PostFeatureType::Community, - auth: self.auth.clone(), - }; - - let _ = self.pin(remove_community_pin); - } - None => { - self.message_queue.push(format!("Unable to unpin old post, please do so manually")); - } - } - - // Unpin the other chapter post on local - // Get all local pinned posts first - // Filter out any post made in the meta community (Community ID) - let get_params = GetPosts { - auth: Some(self.auth.clone()), - ..Default::default() - }; - let post_list_json = CLIENT - .get(self.config.instance.clone() + "/api/v3/post/list") - .query(&get_params) - .send().await? - .text().await?; - - let post_list: GetPostsResponse = serde_json::from_str(post_list_json.as_str()).unwrap(); + pub(crate) async fn pin_post(&mut self, new_post_id: &PostId, new_post_community: &CommunityId) -> Result<(bool, bool), Box> { + let mut local_pins = true; + let mut community_pins = true; + // Unpin Old Posts + // Get Local Posts & Unpin The Other Post let mut meta_community: CommunityId = CommunityId(15); - self.community_ids.ids.iter().for_each(|(id, name)| { if name == &LemmyCommunities::metadiscussions.to_string() { meta_community = id.clone(); } }); - for post_view in post_list.posts { - if post_view.community.id != meta_community && post_view.post.featured_local { - let remove_local_pin = FeaturePost { - post_id: post_view.post.id, - featured: false, - feature_type: PostFeatureType::Local, - auth: self.auth.clone(), - }; + let get_params = GetPosts { + auth: Some(self.auth.clone()), + ..Default::default() + }; - match self.pin(remove_local_pin).await { - Ok(_) => {} - Err(e) => self.message_queue.push(format!("Error Unpinning Post: {:#?}", e)), - }; - } - } + local_pins = self + .unpin_old_posts(get_params, PostFeatureType::Local, &meta_community) + .await?; + // Get Community Posts & Unpin The Other Post + let get_params = GetPosts { + auth: Some(self.auth.clone()), + community_id: Some(new_post_community.clone()), + ..Default::default() + }; + + community_pins = self + .unpin_old_posts(get_params, PostFeatureType::Community, &meta_community) + .await?; + + // Pin New Post let pin_new_community = FeaturePost { - post_id: new_post.post.id, + post_id: new_post_id.clone(), featured: true, feature_type: PostFeatureType::Community, auth: self.auth.clone(), }; - let _ = self.pin(pin_new_community); + match self.pin(pin_new_community).await { + Ok(_) => {} + Err(e) => { + self.message_queue + .push(format!("Error Unpinning Post: {:#?}", e)); + community_pins = false; + } + }; let pin_new_local = FeaturePost { - post_id: new_post.post.id, + post_id: new_post_id.clone(), featured: true, feature_type: PostFeatureType::Local, auth: self.auth.clone(), @@ -183,98 +187,83 @@ impl Bot { match self.pin(pin_new_local).await { Ok(_) => {} - Err(e) => self.message_queue.push(format!("Error Pinning Post: {:#?}", e)), + Err(e) => { + self.message_queue + .push(format!("Error Unpinning Post: {:#?}", e)); + local_pins = false; + } }; - return Ok(()); + return Ok((community_pins, local_pins)); + } + + #[warn(unused_results)] + pub(crate) async fn unpin_old_posts( + &mut self, + get_params: GetPosts, + pin_scope: PostFeatureType, + meta_community: &CommunityId, + ) -> Result> { + let post_list_json = CLIENT + .get(self.config.instance.clone() + "/api/v3/post/list") + .query(&get_params) + .send() + .await? + .text() + .await?; + + let post_list: GetPostsResponse = serde_json::from_str(post_list_json.as_str()).unwrap(); + + for post_view in post_list.posts { + if &post_view.community.id != meta_community && post_view.post.featured_local { + let remove_local_pin = FeaturePost { + post_id: post_view.post.id, + featured: false, + feature_type: pin_scope, + auth: self.auth.clone(), + }; + + match self.pin(remove_local_pin).await { + Ok(_) => {} + Err(e) => { + self.message_queue + .push(format!("Error Unpinning Post: {:#?}", e)); + return Err(Box::from(format!("{}", e))); + } + }; + } + } + + Ok(true) } pub(crate) async fn pin(&mut self, pin_data: FeaturePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post/feature") .json(&pin_data) - .send().await?; + .send() + .await?; - let ret: PostView = serde_json::from_str::>(res.text().await?.as_str()) - .unwrap() + let ret: PostView = serde_json::from_str::>(res.text().await?.as_str())? .remove("post_view") .unwrap(); return Ok(ret.post.featured_local); } - #[warn(unused_results)] - pub(crate) async fn run_once(&mut self, prev_time: &mut NaiveDateTime) -> Result<(), Box> { - self.message_queue.push(format!("{:#<1$}", "", 30)); - self.start_time = Utc::now().naive_local(); - - if self.start_time - *prev_time > chrono::Duration::seconds(6) { - // Prod should use hours, add command line switch later and read duration from config - self.message_queue.push(format!("Reloading Config")); - *prev_time = self.start_time; - self.config.load(); - self.community_ids.load(&self.auth, &self.config.instance).await?; - self.message_queue.push(format!("Done!")); - } - - // Start the polling process - // Get all feed URLs (use cache) - self.message_queue.push(format!("Checking Feeds")); - let post_queue: Vec<(CreatePost, (Option, usize, String))> = self - .config - .check_feeds(&mut self.post_history, &self.community_ids, &self.auth).await?; - self.message_queue.push(format!("Done!")); - - - let mut i = 0; - while i < post_queue.len() { - let (post, (prev_idx, feed_id, feed_title)) = &post_queue[i]; - - self.message_queue.push(format!("Posting: {}", post.name)); - - let post_data = self.post(post.clone()).await?; - - self.pin_new(&prev_idx, &post_data).await?; - - // Move current post to old post list - match prev_idx { - Some(idx) => { - self.post_history[*idx].title = feed_title.clone(); - self.post_history[*idx].post_id = post_data.post.id; - self.post_history[*idx].last_post_url = - post.url.clone().unwrap().to_string(); - } - None => self.post_history.push(PrevPost { - id: feed_id.clone(), - post_id: post_data.post.id, - title: feed_title.clone(), - last_post_url: post.url.clone().unwrap().to_string(), - }), - } - - i += 1; - } - - PrevPost::save(&self.post_history); - - return Ok(()); - } - pub(crate) async fn idle(&mut self) { - let mut sleep_duration = chrono::Duration::seconds(30); - if Utc::now().naive_local() - self.start_time > sleep_duration { - sleep_duration = chrono::Duration::seconds(60); + let mut sleep_duration = Duration::seconds(30); + if Utc::now() - self.start_time > sleep_duration { + sleep_duration = Duration::seconds(60); } - while Utc::now().naive_local() - self.start_time < sleep_duration { - sleep(time::Duration::from_millis(100)).await; - } - - match reqwest::get( - "https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=", - ).await { + match reqwest::get("https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=").await { Ok(_) => {} - Err(err) => self.message_queue.push(format!("{}", err)), + Err(err) => self.error_queue.push(format!("{}", err)), }; + + while Utc::now() - self.start_time < sleep_duration { + sleep(Duration::milliseconds(100).to_std().unwrap()).await; + } } } @@ -289,65 +278,213 @@ async fn list_posts(auth: &Sensitive, base: String) -> GetPostsResponse let res = CLIENT .get(base + "/api/v3/post/list") .query(¶ms) - .send().await + .send() + .await .unwrap() - .text().await + .text() + .await .unwrap(); return serde_json::from_str(&res).unwrap(); } async fn run_bot(bot: Arc>) { - // TODO this currently does not update the bot Mutex when run - // Get all needed auth tokens at the start - let mut old = Utc::now().naive_local(); - let mut this = bot.lock().await.clone(); - match this.login().await { - Ok(_) => { - println!("Login successful"); - }, - Err(e) => { - println!("Unable to get initial login:\n {:#?}", e); - } - }; - let _ = this.community_ids.load(&this.auth, &this.config.instance).await; + let now = Utc::now; + + let mut config_reload_duration = this.config.config_reload.unwrap_or(360); + let mut config_reload_time = now() - Duration::minutes(config_reload_duration as i64 + 1); // Setting this to be in the future by default prevents unneeded code duplication loop { - this.idle().await; - this.message_queue = vec![]; - match this.run_once(&mut old).await { - Ok(_) => {} - Err(e) => panic!("Crashed due to Error: {:#?}", e), - }; *bot.lock().await = this.clone(); + this.start_time = now(); + + this.idle().await; + + // After configured time passed reload config + if now() - config_reload_time >= Duration::minutes(config_reload_duration as i64) { + this.config.load(); + this.secrets.load(); + + config_reload_duration = this.config.config_reload.unwrap_or(360); + + let _ = this + .community_ids + .load(&this.auth, &this.config.instance) + .await; + + this.message_queue + .push(format!("Config Reloaded at {}", now().naive_local())); + config_reload_time = now(); + } + // Check if Login token is valid, if it is not get a new one + while this.login_error { + let _ = this.login().await; + } + + // Perform Run + // Start the polling process + // Get all feed URLs (use cache) + let queue_data = match this + .config + .check_feeds(&this.post_history, &this.community_ids, &this.auth) + .await + { + Ok(data) => data, + Err(e) => { + this.error_queue.push(format!("{}", e)); + continue; + } + }; + + this.message_queue + .push(format!("Checked Feeds at {}", now().naive_local())); + + for queued_post in queue_data { + let (post, post_metadata) = queued_post; + this.message_queue.push(format!("Posting: {}", post.name)); + + // Perform Post and Pins + let post_data = match this.post(&post).await { + Ok(data) => data, + Err(e) => { + this.error_queue.push(format!("{}", e)); + continue; + } + }; + let _ = this + .pin_post(&post_data.post.id, &post_data.community.id) + .await; + + // Update Post History + let mut index_exists = false; + if this.post_history.len() > post_metadata.id { + index_exists = true; + } + + if let Some(part_slug) = post_metadata.part { + match index_exists { + true => { + this.post_history[post_metadata.id].last_part_slug = Some(part_slug); + this.post_history[post_metadata.id].last_part_time = Some(now().to_string()); + } + false => { + let new_history = PrevPost { + id: post_metadata.id, + last_volume_slug: None, + last_volume_time: None, + last_part_slug: Some(part_slug), + last_part_time: Some(now().to_string()), + }; + this.post_history.push(new_history); + } + } + } + + if let Some(volume_slug) = post_metadata.volume { + match index_exists { + true => { + this.post_history[post_metadata.id].last_volume_slug = Some(volume_slug); + this.post_history[post_metadata.id].last_volume_time = Some(now().to_string()); + } + false => { + let new_history = PrevPost { + id: post_metadata.id, + last_volume_slug: Some(volume_slug), + last_volume_time: Some(now().to_string()), + last_part_slug: None, + last_part_time: None, + }; + this.post_history.push(new_history); + } + }; + } + } + + PrevPost::save(&this.post_history); + + // Fix Queue Lengths and Update Mutex Data + while this.message_queue.len() > 5 { + this.message_queue.remove(0); + } + while this.error_queue.len() > 10 { + this.error_queue.remove(0); + } + this.idle().await; } } async fn print_info(shutdown: Arc, bot: Arc>) { while !shutdown.load(Ordering::Relaxed) { - let bot: tokio::sync::MutexGuard<'_, Bot> = bot.lock().await; - sleep(Duration::from_millis(500)).await; + let snapshot: tokio::sync::MutexGuard<'_, Bot> = bot.lock().await; + sleep(Duration::milliseconds(200).to_std().unwrap()).await; print!("\x1B[2J\x1B[1;1H"); println!( "##[Ascendance of a Bookworm Bot]## | Time: {}", Utc::now().naive_local().format("%H:%M:%S") ); - println!("Instance: {}", bot.config.instance); - println!("Ran Last: {}", bot.start_time.format("%d/%m/%Y %H:%M:%S")); - println!("{:#<1$}", "", 30); - bot.post_history.iter().for_each(|post| { - print!("| -- |"); - print!("{} ", post.title); - print!("{:<1$}| ", "", 60 - post.title.len()); - println!("{}", post.last_post_url); + println!("Instance: {}", snapshot.config.instance); + println!( + "Ran Last: {}", + snapshot + .start_time + .naive_local() + .format("%d/%m/%Y %H:%M:%S") + ); + println!("{:#<1$}", "", 175); + snapshot.post_history.iter().for_each(|post| { + if post.last_part_time.is_some() && post.last_volume_time.is_some() { + let part_time = post.last_part_time.clone().unwrap(); + let volume_time = post.last_volume_time.clone().unwrap(); + + let parsed_part_time = DateTime::::from_str(&part_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); + + let parsed_volume_time = DateTime::::from_str(&volume_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); + + let formatted_time; + if parsed_part_time > parsed_volume_time { + formatted_time = parsed_part_time; + } + else { + formatted_time = parsed_volume_time; + } + + print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); + } + else if post.last_part_time.is_some() { + let part_time = post.last_part_time.clone().unwrap(); + let formatted_time: NaiveDateTime = DateTime::::from_str(&part_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); + + print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); + } + else if post.last_volume_time.is_some() { + let volume_time = post.last_volume_time.clone().unwrap(); + let formatted_time = DateTime::::from_str(&volume_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); + + print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); + } + else { + print!("| {:<1$} |", "", 19); + } + + print!("{:<1$}", "", 2 - post.id.to_string().len()); + print!("{}| ", post.id); + print!("{}", post.last_part_slug.clone().unwrap_or("N/A".to_string())); + print!("{:<1$}| ", "", 75 - post.last_part_slug.clone().unwrap_or("N/A".to_string()).len()); + print!("{}", post.last_volume_slug.clone().unwrap_or("N/A".to_string())); + println!("{:<1$}| ", "", 70 - post.last_volume_slug.clone().unwrap_or("N/A".to_string()).len()); }); - bot.message_queue.iter().for_each(|message| { + println!("{:#<1$}", "", 175); + for error in snapshot.error_queue.iter() { + println!("{}", error); + } + println!("{:#<1$}", "", 175); + for message in snapshot.message_queue.iter() { println!("{}", message); - }) + } } } @@ -357,7 +494,7 @@ async fn main() { loop { println!("Starting AoB Bot..."); - + let shutdown_clone = shutdown.clone(); let bot = Arc::new(Mutex::new(Bot::new())); @@ -373,8 +510,6 @@ async fn main() { println!("Bot crashed due to unknown Error, restarting thread after wait..."); - sleep(Duration::from_secs(30)).await; + sleep(Duration::seconds(10).to_std().unwrap()).await; } - - }