use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use config::{CommunitiesVector, Config, LemmyCommunities, PrevPost, Secrets}; use lemmy_api_common::{ lemmy_db_views::structs::PostView, person::{Login, LoginResponse}, post::{CreatePost, FeaturePost, GetPosts, GetPostsResponse}, sensitive::Sensitive, }; use lemmy_db_schema::newtypes::PostId; use lemmy_db_schema::{newtypes::CommunityId, ListingType, PostFeatureType, SortType}; use once_cell::sync::Lazy; use reqwest::{Client, StatusCode}; use std::{ collections::HashMap, error::Error, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, vec, }; use std::str::FromStr; use std::time::UNIX_EPOCH; use tokio::sync::Mutex; use tokio::time::sleep; mod config; mod feeds; pub static CLIENT: Lazy = Lazy::new(|| { let client = Client::builder() .timeout(Duration::seconds(30).to_std().unwrap()) .connect_timeout(Duration::seconds(30).to_std().unwrap()) .build() .expect("build client"); client }); struct PostQueueMetadata { id: usize, series: String, part: Option, volume: Option, } #[derive(Clone)] struct Bot { secrets: Secrets, config: Config, post_history: Vec, community_ids: CommunitiesVector, auth: Sensitive, login_error: bool, start_time: DateTime, message_queue: Vec, error_queue: Vec, } impl Bot { pub(crate) fn new() -> Bot { Bot { secrets: Secrets::init(), config: Config::init(), post_history: PrevPost::load(), community_ids: CommunitiesVector::new(), auth: Sensitive::new("".to_string()), login_error: true, start_time: Utc::now(), message_queue: vec![], error_queue: vec![], } } /// Get JWT Token /// /// * `return` : Returns true if token was successfully retrieved, false otherwise #[warn(unused_results)] pub(crate) async fn login(&mut self) -> Result, Box> { let login_params = Login { username_or_email: self.secrets.lemmy.get_username(), password: self.secrets.lemmy.get_password(), totp_2fa_token: None, }; let res = CLIENT .post(self.config.instance.clone() + "/api/v3/user/login") .json(&login_params) .send() .await?; return if res.status() == StatusCode::OK { let data: &LoginResponse = &res.json().await.unwrap(); let jwt = match data.jwt.clone() { Some(data) => data, None => { self.error_queue.push(format!("Error: Missing JWT Token")); return Err(Box::try_from(format!("Error: Missing JWT Token")).unwrap()); } }; self.auth = jwt.clone(); self.login_error = false; Ok(jwt) } else { self.error_queue .push(format!("Error Code: {:?}", res.status())); Err(Box::new(res.error_for_status().unwrap_err())) }; } /// Make Post to Lemmy Instance /// /// * `post_data` : Object of type [CreatePost] containing post info /// * `return` : Returns true if Post was successful, false otherwise #[warn(unused_results)] pub(crate) async fn post(&mut self, post_data: &CreatePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post") .json(post_data) .send() .await?; // TODO: process res to get info about if post was successfully (mostly if jwt token was valid) let ret: PostView = serde_json::from_str::>(res.text().await?.as_str())? .remove("post_view") .unwrap(); return Ok(ret); } #[warn(unused_results)] pub(crate) async fn pin_post(&mut self, new_post_id: &PostId, new_post_community: &CommunityId) -> Result<(bool, bool), Box> { let mut local_pins = true; let mut community_pins = true; // Unpin Old Posts // Get Local Posts & Unpin The Other Post let mut meta_community: CommunityId = CommunityId(15); self.community_ids.ids.iter().for_each(|(id, name)| { if name == &LemmyCommunities::metadiscussions.to_string() { meta_community = id.clone(); } }); let get_params = GetPosts { auth: Some(self.auth.clone()), ..Default::default() }; local_pins = self .unpin_old_posts(get_params, PostFeatureType::Local, &meta_community) .await?; // Get Community Posts & Unpin The Other Post let get_params = GetPosts { auth: Some(self.auth.clone()), community_id: Some(new_post_community.clone()), ..Default::default() }; community_pins = self .unpin_old_posts(get_params, PostFeatureType::Community, &meta_community) .await?; // Pin New Post let pin_new_community = FeaturePost { post_id: new_post_id.clone(), featured: true, feature_type: PostFeatureType::Community, auth: self.auth.clone(), }; match self.pin(pin_new_community).await { Ok(_) => {} Err(e) => { self.message_queue .push(format!("Error Unpinning Post: {:#?}", e)); community_pins = false; } }; let pin_new_local = FeaturePost { post_id: new_post_id.clone(), featured: true, feature_type: PostFeatureType::Local, auth: self.auth.clone(), }; match self.pin(pin_new_local).await { Ok(_) => {} Err(e) => { self.message_queue .push(format!("Error Unpinning Post: {:#?}", e)); local_pins = false; } }; return Ok((community_pins, local_pins)); } #[warn(unused_results)] pub(crate) async fn unpin_old_posts( &mut self, get_params: GetPosts, pin_scope: PostFeatureType, meta_community: &CommunityId, ) -> Result> { let post_list_json = CLIENT .get(self.config.instance.clone() + "/api/v3/post/list") .query(&get_params) .send() .await? .text() .await?; let post_list: GetPostsResponse = serde_json::from_str(post_list_json.as_str()).unwrap(); for post_view in post_list.posts { if &post_view.community.id != meta_community && post_view.post.featured_local { let remove_local_pin = FeaturePost { post_id: post_view.post.id, featured: false, feature_type: pin_scope, auth: self.auth.clone(), }; match self.pin(remove_local_pin).await { Ok(_) => {} Err(e) => { self.message_queue .push(format!("Error Unpinning Post: {:#?}", e)); return Err(Box::from(format!("{}", e))); } }; } } Ok(true) } pub(crate) async fn pin(&mut self, pin_data: FeaturePost) -> Result> { let res = CLIENT .post(self.config.instance.clone() + "/api/v3/post/feature") .json(&pin_data) .send() .await?; let ret: PostView = serde_json::from_str::>(res.text().await?.as_str())? .remove("post_view") .unwrap(); return Ok(ret.post.featured_local); } pub(crate) async fn idle(&mut self) { let mut sleep_duration = Duration::seconds(30); if Utc::now() - self.start_time > sleep_duration { sleep_duration = Duration::seconds(60); } match reqwest::get("https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=").await { Ok(_) => {} Err(err) => self.error_queue.push(format!("{}", err)), }; while Utc::now() - self.start_time < sleep_duration { sleep(Duration::milliseconds(100).to_std().unwrap()).await; } } } async fn list_posts(auth: &Sensitive, base: String) -> GetPostsResponse { let params = GetPosts { type_: Some(ListingType::Local), sort: Some(SortType::New), auth: Some(auth.clone()), ..Default::default() }; let res = CLIENT .get(base + "/api/v3/post/list") .query(¶ms) .send() .await .unwrap() .text() .await .unwrap(); return serde_json::from_str(&res).unwrap(); } async fn run_bot(bot: Arc>) { let mut this = bot.lock().await.clone(); let now = Utc::now; let mut config_reload_duration = this.config.config_reload.unwrap_or(360); let mut config_reload_time = now() - Duration::minutes(config_reload_duration as i64 + 1); // Setting this to be in the future by default prevents unneeded code duplication loop { *bot.lock().await = this.clone(); this.start_time = now(); this.idle().await; // After configured time passed reload config if now() - config_reload_time >= Duration::minutes(config_reload_duration as i64) { this.config.load(); this.secrets.load(); config_reload_duration = this.config.config_reload.unwrap_or(360); let _ = this .community_ids .load(&this.auth, &this.config.instance) .await; this.message_queue .push(format!("Config Reloaded at {}", now().naive_local())); config_reload_time = now(); } // Check if Login token is valid, if it is not get a new one while this.login_error { let _ = this.login().await; } // Perform Run // Start the polling process // Get all feed URLs (use cache) let queue_data = match this .config .check_feeds(&this.post_history, &this.community_ids, &this.auth) .await { Ok(data) => data, Err(e) => { this.error_queue.push(format!("{}", e)); continue; } }; this.message_queue .push(format!("Checked Feeds at {}", now().naive_local())); for queued_post in queue_data { let (post, post_metadata) = queued_post; this.message_queue.push(format!("Posting: {}", post.name)); // Perform Post and Pins let post_data = match this.post(&post).await { Ok(data) => data, Err(e) => { this.error_queue.push(format!("{}", e)); continue; } }; let _ = this .pin_post(&post_data.post.id, &post_data.community.id) .await; // Update Post History let mut index_exists = false; if this.post_history.len() > post_metadata.id { index_exists = true; } if let Some(part_slug) = post_metadata.part { match index_exists { true => { this.post_history[post_metadata.id].last_part_slug = Some(part_slug); this.post_history[post_metadata.id].last_part_time = Some(now().to_string()); } false => { let new_history = PrevPost { id: post_metadata.id, last_volume_slug: None, last_volume_time: None, last_part_slug: Some(part_slug), last_part_time: Some(now().to_string()), }; this.post_history.push(new_history); } } } if let Some(volume_slug) = post_metadata.volume { match index_exists { true => { this.post_history[post_metadata.id].last_volume_slug = Some(volume_slug); this.post_history[post_metadata.id].last_volume_time = Some(now().to_string()); } false => { let new_history = PrevPost { id: post_metadata.id, last_volume_slug: Some(volume_slug), last_volume_time: Some(now().to_string()), last_part_slug: None, last_part_time: None, }; this.post_history.push(new_history); } }; } } PrevPost::save(&this.post_history); // Fix Queue Lengths and Update Mutex Data while this.message_queue.len() > 5 { this.message_queue.remove(0); } while this.error_queue.len() > 10 { this.error_queue.remove(0); } this.idle().await; } } async fn print_info(shutdown: Arc, bot: Arc>) { while !shutdown.load(Ordering::Relaxed) { let snapshot: tokio::sync::MutexGuard<'_, Bot> = bot.lock().await; sleep(Duration::milliseconds(200).to_std().unwrap()).await; print!("\x1B[2J\x1B[1;1H"); println!( "##[Ascendance of a Bookworm Bot]## | Time: {}", Utc::now().naive_local().format("%H:%M:%S") ); println!("Instance: {}", snapshot.config.instance); println!( "Ran Last: {}", snapshot .start_time .naive_local() .format("%d/%m/%Y %H:%M:%S") ); println!("{:#<1$}", "", 175); snapshot.post_history.iter().for_each(|post| { if post.last_part_time.is_some() && post.last_volume_time.is_some() { let part_time = post.last_part_time.clone().unwrap(); let volume_time = post.last_volume_time.clone().unwrap(); let parsed_part_time = DateTime::::from_str(&part_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); let parsed_volume_time = DateTime::::from_str(&volume_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); let formatted_time; if parsed_part_time > parsed_volume_time { formatted_time = parsed_part_time; } else { formatted_time = parsed_volume_time; } print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); } else if post.last_part_time.is_some() { let part_time = post.last_part_time.clone().unwrap(); let formatted_time: NaiveDateTime = DateTime::::from_str(&part_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); } else if post.last_volume_time.is_some() { let volume_time = post.last_volume_time.clone().unwrap(); let formatted_time = DateTime::::from_str(&volume_time).unwrap_or(DateTime::::from(UNIX_EPOCH)).naive_local(); print!("| {} |", formatted_time.format("%d/%m/%Y %H:%M:%S")); } else { print!("| {:<1$} |", "", 19); } print!("{:<1$}", "", 2 - post.id.to_string().len()); print!("{}| ", post.id); print!("{}", post.last_part_slug.clone().unwrap_or("N/A".to_string())); print!("{:<1$}| ", "", 75 - post.last_part_slug.clone().unwrap_or("N/A".to_string()).len()); print!("{}", post.last_volume_slug.clone().unwrap_or("N/A".to_string())); println!("{:<1$}| ", "", 70 - post.last_volume_slug.clone().unwrap_or("N/A".to_string()).len()); }); println!("{:#<1$}", "", 175); for error in snapshot.error_queue.iter() { println!("{}", error); } println!("{:#<1$}", "", 175); for message in snapshot.message_queue.iter() { println!("{}", message); } } } #[tokio::main] async fn main() { let shutdown = Arc::new(AtomicBool::new(false)); loop { println!("Starting AoB Bot..."); let shutdown_clone = shutdown.clone(); let bot = Arc::new(Mutex::new(Bot::new())); let bot_clone = bot.clone(); let bot_thread = tokio::spawn(async move { run_bot(bot).await }); let tui_thread = tokio::spawn(async move { print_info(shutdown_clone, bot_clone).await }); let _ = bot_thread.await; tui_thread.abort(); println!("Bot crashed due to unknown Error, restarting thread after wait..."); sleep(Duration::seconds(10).to_std().unwrap()).await; } }