Threading Implementation for higher stability

This commit is contained in:
Neshura 2023-08-31 23:36:37 +02:00
parent 8b3e6a8380
commit 211b44978a
Signed by: Neshura
GPG key ID: B6983AAA6B9A7A6C
5 changed files with 244 additions and 159 deletions

12
Cargo.lock generated
View file

@ -301,6 +301,7 @@ dependencies = [
"serde_derive",
"serde_json",
"strum_macros",
"tokio",
"url",
]
@ -1283,9 +1284,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.10"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05"
[[package]]
name = "pin-utils"
@ -1880,11 +1881,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.29.1"
version = "1.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
@ -1893,7 +1893,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.4.9",
"socket2 0.5.3",
"tokio-macros",
"windows-sys",
]

View file

@ -15,4 +15,5 @@ serde = "1.0.164"
serde_derive = "1.0.164"
serde_json = "1.0.97"
strum_macros = "0.25.0"
tokio = "1.32.0"
url = "2.4.0"

6
deploy.sh Normal file
View file

@ -0,0 +1,6 @@
#!/bin/bash
## deploy to machine as automod.new
## stop automod service
## mv automod.new to automod
## restart automod service
## idea: websocket event?

View file

@ -1,11 +1,25 @@
use std::{fs::{self, OpenOptions}, path::Path, io::Write, thread::sleep, time, error::Error};
use std::{
error::Error,
fs::{self, OpenOptions},
io::Write,
path::Path,
thread::sleep,
time,
};
use lemmy_api_common::{sensitive::Sensitive, post::CreatePost, community::{ListCommunities, ListCommunitiesResponse}};
use lemmy_db_schema::{newtypes::{LanguageId, CommunityId, PostId}, ListingType};
use lemmy_api_common::{
community::{ListCommunities, ListCommunitiesResponse},
post::CreatePost,
sensitive::Sensitive,
};
use lemmy_db_schema::{
newtypes::{CommunityId, LanguageId, PostId},
ListingType,
};
use serde_derive::{Deserialize, Serialize};
use url::Url;
use crate::{CLIENT};
use crate::CLIENT;
macro_rules! pub_struct {
($name:ident {$($field:ident: $t:ty,)*}) => {
@ -40,7 +54,7 @@ impl Secrets {
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub(crate) struct LemmyLogin {
pub(crate) username: String,
password: String
password: String,
}
impl LemmyLogin {
@ -49,7 +63,7 @@ impl LemmyLogin {
}
pub(crate) fn get_password(&self) -> Sensitive<String> {
return Sensitive::new(self.password.clone())
return Sensitive::new(self.password.clone());
}
}
@ -97,58 +111,58 @@ impl Config {
self.reddit_config = config_parse.reddit_config;
}
pub(crate) fn check_feeds(&mut self, post_history: &mut Vec<PrevPost>
, community_ids: &CommunitiesVector, auth: &Sensitive<String>) -> Result<Vec<(CreatePost, (Option<usize>, usize, String))>, Box<dyn Error>> {
pub(crate) async fn check_feeds(
&mut self,
post_history: &mut Vec<PrevPost>,
community_ids: &CommunitiesVector,
auth: &Sensitive<String>,
) -> Result<Vec<(CreatePost, (Option<usize>, usize, String))>, Box<dyn Error>> {
let mut post_queue: Vec<(CreatePost, (Option<usize>, usize, String))> = vec![];
match self.feeds.iter().map(|feed| {
let mut i = 0;
while i < self.feeds.len() {
let feed = &self.feeds[i];
let res = CLIENT
.get(feed.feed_url.clone())
.send()?.text()?;
.send()
.await?
.text()
.await?;
let data: FeedData = serde_json::from_str(&res).unwrap();
let mut prev_post_idx: Option<usize> = None;
let mut do_post = true;
post_history
.iter()
.enumerate()
.for_each(|(idx, post)| {
if &post.last_post_url == &data.items[0].url {
do_post = false;
} else if &post.title == &data.title {
prev_post_idx = Some(idx);
}
});
post_history.iter().enumerate().for_each(|(idx, post)| {
if &post.last_post_url == &data.items[0].url {
do_post = false;
} else if &post.title == &data.title {
prev_post_idx = Some(idx);
}
});
if do_post {
let item = &data.items[0];
let new_post = CreatePost {
name: item.title.clone(),
community_id: community_ids.find(&feed.communities.chapter),
url: Some(Url::parse(&item.url).unwrap()),
body: Some(
"[Reddit](https://reddit.com/r/HonzukinoGekokujou)\n\n[Discord](https://discord.com/invite/fGefmzu)".into(),
),
honeypot: None,
nsfw: Some(false),
language_id: Some(LanguageId(37)), // TODO get this id once every few hours per API request, the ordering of IDs suggests that the EN Id might change in the future
auth: auth.clone(),
};
name: item.title.clone(),
community_id: community_ids.find(&feed.communities.chapter),
url: Some(Url::parse(&item.url).unwrap()),
body: Some(
"[Reddit](https://reddit.com/r/HonzukinoGekokujou)\n\n[Discord](https://discord.com/invite/fGefmzu)".into(),
),
honeypot: None,
nsfw: Some(false),
language_id: Some(LanguageId(37)), // TODO get this id once every few hours per API request, the ordering of IDs suggests that the EN Id might change in the future
auth: auth.clone(),
};
let prev_data = (
prev_post_idx,
feed.id,
data.title
);
let prev_data = (prev_post_idx, feed.id, data.title);
post_queue.push((new_post, prev_data));
}
sleep(time::Duration::from_millis(100)); // Should prevent dos-ing J-Novel servers
return Ok(());
}).collect() {
Ok(()) => {}
Err(e) => return Err(e)
i += 1;
}
return Ok(post_queue);
@ -179,7 +193,7 @@ pub(crate) enum LemmyCommunities {
aobprepub,
aoblightnovel,
aobmanga,
metadiscussions
metadiscussions,
}
pub_struct!(FeedRedditSettings {
@ -211,12 +225,10 @@ impl PrevPost {
Err(e) => panic!("ERROR: posts.json could not be parsed:\n\n{:#?}", e),
};
history = history_parse;
}
else {
} else {
history = [].to_vec()
}
}
else {
} else {
let _ = fs::File::create("posts.json");
history = [].to_vec()
}
@ -225,7 +237,12 @@ impl PrevPost {
}
pub(crate) fn save(data: &Vec<PrevPost>) {
let mut file = OpenOptions::new().read(true).write(true).create(true).open("posts.json").unwrap();
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open("posts.json")
.unwrap();
let json_data = serde_json::to_string_pretty(&data).unwrap();
@ -243,9 +260,7 @@ pub_struct!(FeedData {
items: Vec<FeedEntry>,
});
pub_struct!(FeedAuthor {
name: String,
});
pub_struct!(FeedAuthor { name: String, });
pub_struct!(FeedEntry {
id: String,
@ -263,11 +278,15 @@ pub_struct!(CommunitiesVector {
impl CommunitiesVector {
pub(crate) fn new() -> CommunitiesVector {
CommunitiesVector{ids: vec![]}
CommunitiesVector { ids: vec![] }
}
#[warn(unused_results)]
pub(crate) fn load(&mut self, auth: &Sensitive<String>, base: &String) -> Result<(), Box<dyn Error>> {
pub(crate) async fn load(
&mut self,
auth: &Sensitive<String>,
base: &String,
) -> Result<(), Box<dyn Error>> {
let params = ListCommunities {
auth: Some(auth.clone()),
type_: Some(ListingType::Local),
@ -277,7 +296,10 @@ impl CommunitiesVector {
let res = CLIENT
.get(base.clone() + "/api/v3/community/list")
.query(&params)
.send()?.text()?;
.send()
.await?
.text()
.await?;
let site_data: ListCommunitiesResponse = serde_json::from_str(&res).unwrap();
@ -301,6 +323,6 @@ impl CommunitiesVector {
ret_id = id.0;
}
});
return ret_id;
return ret_id;
}
}

View file

@ -1,21 +1,28 @@
use chrono::{Utc, NaiveDateTime};
use config::{Config, PrevPost, Secrets, CommunitiesVector, LemmyCommunities};
use chrono::{NaiveDateTime, Utc};
use config::{CommunitiesVector, Config, LemmyCommunities, PrevPost, Secrets};
use lemmy_api_common::{
lemmy_db_views::structs::PostView,
person::{Login, LoginResponse},
post::{CreatePost, GetPosts, GetPostsResponse, FeaturePost},
sensitive::Sensitive, lemmy_db_views::structs::PostView,
};
use lemmy_db_schema::{
ListingType, SortType, PostFeatureType, newtypes::CommunityId,
post::{CreatePost, FeaturePost, GetPosts, GetPostsResponse},
sensitive::Sensitive,
};
use lemmy_db_schema::{newtypes::CommunityId, ListingType, PostFeatureType, SortType};
use once_cell::sync::Lazy;
use reqwest::{blocking::Client, StatusCode};
use std::{thread::sleep, time, collections::HashMap, error::Error};
use reqwest::{Client, StatusCode};
use std::{
collections::HashMap,
error::Error,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep},
time::{self, Duration},
};
use tokio::sync::Mutex;
mod config;
pub static CLIENT: Lazy<Client> = Lazy::new(|| {
let client = Client::builder()
.timeout(time::Duration::from_secs(30))
@ -25,6 +32,7 @@ pub static CLIENT: Lazy<Client> = Lazy::new(|| {
client
});
#[derive(Clone)]
struct Bot {
secrets: Secrets,
config: Config,
@ -50,7 +58,7 @@ impl Bot {
///
/// * `return` : Returns true if token was succesfully retrieved, false otherwise
#[warn(unused_results)]
pub(crate) fn login(&mut self) -> Result<(), Box<dyn Error>> {
pub(crate) async fn login(&mut self) -> Result<Sensitive<String>, Box<dyn Error>> {
let login_params = Login {
username_or_email: self.secrets.lemmy.get_username(),
password: self.secrets.lemmy.get_password(),
@ -60,15 +68,14 @@ impl Bot {
let res = CLIENT
.post(self.config.instance.clone() + "/api/v3/user/login")
.json(&login_params)
.send()?;
.send().await?;
if res.status() == StatusCode::OK {
let data: &LoginResponse = &res.json().unwrap();
let data: &LoginResponse = &res.json().await.unwrap();
let jwt = data.jwt.clone().expect("JWT Token could not be acquired");
self.auth = jwt;
return Ok(());
self.auth = jwt.clone();
return Ok(jwt);
} else {
println!("Error Code: {:?}", res.status());
return Err(Box::new(res.error_for_status().unwrap_err()));
@ -80,26 +87,33 @@ impl Bot {
/// * `post_data` : Object of type [CreatePost] containing post info
/// * `return` : Returns true if Post was succesful, false otherwise
#[warn(unused_results)]
pub(crate) fn post(&mut self, post_data: CreatePost) -> Result<PostView, Box<dyn Error>> {
pub(crate) async fn post(&mut self, post_data: CreatePost) -> Result<PostView, Box<dyn Error>> {
let res = CLIENT
.post(self.config.instance.clone() + "/api/v3/post")
.json(&post_data)
.send()?;
.send().await?;
// TODO: process res to get info about if post was successfuly (mostly if jwt token was valid)
let ret: PostView = serde_json::from_str::<HashMap<&str, PostView>>(res.text()?.as_str()).unwrap().remove("post_view").unwrap();
let ret: PostView = serde_json::from_str::<HashMap<&str, PostView>>(res.text().await?.as_str())
.unwrap()
.remove("post_view")
.unwrap();
return Ok(ret);
}
#[warn(unused_results)]
pub(crate) fn pin_new(&mut self, old_post: &Option<usize>, new_post: &PostView) -> Result<(), Box<dyn Error>> {
pub(crate) async fn pin_new(
&mut self,
old_post: &Option<usize>,
new_post: &PostView,
) -> Result<(), Box<dyn Error>> {
match old_post {
Some(id) => {
let remove_community_pin = FeaturePost {
post_id: self.post_history[*id].post_id,
featured: false,
feature_type: PostFeatureType::Community,
auth: self.auth.clone()
auth: self.auth.clone(),
};
let _ = self.pin(remove_community_pin);
@ -119,8 +133,8 @@ impl Bot {
let post_list_json = CLIENT
.get(self.config.instance.clone() + "/api/v3/post/list")
.query(&get_params)
.send()?
.text()?;
.send().await?
.text().await?;
let post_list: GetPostsResponse = serde_json::from_str(post_list_json.as_str()).unwrap();
@ -138,12 +152,12 @@ impl Bot {
post_id: post_view.post.id,
featured: false,
feature_type: PostFeatureType::Local,
auth: self.auth.clone()
auth: self.auth.clone(),
};
match self.pin(remove_local_pin) {
Ok(_) => {},
Err(e) => println!("Error Unpinning Post: {:#?}", e)
match self.pin(remove_local_pin).await {
Ok(_) => {}
Err(e) => println!("Error Unpinning Post: {:#?}", e),
};
}
}
@ -164,56 +178,67 @@ impl Bot {
auth: self.auth.clone(),
};
match self.pin(pin_new_local) {
Ok(_) => {},
Err(e) => println!("Error Pinning Post: {:#?}", e)
};;
match self.pin(pin_new_local).await {
Ok(_) => {}
Err(e) => println!("Error Pinning Post: {:#?}", e),
};
return Ok(());
}
pub(crate) fn pin (&mut self, pin_data: FeaturePost) -> Result<bool, Box<dyn Error>> {
pub(crate) async fn pin(&mut self, pin_data: FeaturePost) -> Result<bool, Box<dyn Error>> {
let res = CLIENT
.post(self.config.instance.clone() + "/api/v3/post/feature")
.json(&pin_data)
.send()?;
.send().await?;
let ret: PostView = serde_json::from_str::<HashMap<&str, PostView>>(res.text()?.as_str()).unwrap().remove("post_view").unwrap();
let ret: PostView = serde_json::from_str::<HashMap<&str, PostView>>(res.text().await?.as_str())
.unwrap()
.remove("post_view")
.unwrap();
return Ok(ret.post.featured_local);
}
#[warn(unused_results)]
pub(crate) fn run_once(&mut self, prev_time: &mut NaiveDateTime) -> Result<(), Box<dyn Error>> {
pub(crate) async fn run_once(&mut self, prev_time: &mut NaiveDateTime) -> Result<(), Box<dyn Error>> {
println!("{:#<1$}", "", 30);
self.start_time = Utc::now().naive_local();
if self.start_time - *prev_time > chrono::Duration::seconds(6) { // Prod should use hours, add command line switch later and read duration from config
if self.start_time - *prev_time > chrono::Duration::seconds(6) {
// Prod should use hours, add command line switch later and read duration from config
println!("Reloading Config");
*prev_time = self.start_time;
self.config.load();
self.community_ids.load(&self.auth, &self.config.instance)?;
self.community_ids.load(&self.auth, &self.config.instance).await?;
println!("Done!");
}
// Start the polling process
// Get all feed URLs (use cache)
println!("Checking Feeds");
let post_queue: Vec<(CreatePost, (Option<usize>, usize, String))> = self.config.check_feeds(&mut self.post_history, &self.community_ids, &self.auth)?;
let post_queue: Vec<(CreatePost, (Option<usize>, usize, String))> = self
.config
.check_feeds(&mut self.post_history, &self.community_ids, &self.auth).await?;
println!("Done!");
let _ = post_queue.iter().map(|(post, (prev_idx, feed_id, feed_title))| -> Result<(), Box<dyn Error>> {
let mut i = 0;
while i < post_queue.len() {
let (post, (prev_idx, feed_id, feed_title)) = &post_queue[i];
println!("Posting: {}", post.name);
let post_data = self.post(post.clone())?;
let post_data = self.post(post.clone()).await?;
self.pin_new(prev_idx, &post_data)?;
self.pin_new(&prev_idx, &post_data).await?;
// Move current post to old post list
match prev_idx {
Some(idx) => {
self.post_history[*idx].title = feed_title.clone();
self.post_history[*idx].post_id = post_data.post.id;
self.post_history[*idx].last_post_url = post.url.clone().unwrap().to_string();
self.post_history[*idx].last_post_url =
post.url.clone().unwrap().to_string();
}
None => self.post_history.push(PrevPost {
id: feed_id.clone(),
@ -222,8 +247,9 @@ impl Bot {
last_post_url: post.url.clone().unwrap().to_string(),
}),
}
Ok(())
}).collect::<Vec<_>>();
i += 1;
}
PrevPost::save(&self.post_history);
@ -240,27 +266,16 @@ impl Bot {
sleep(time::Duration::from_secs(1));
}
match reqwest::blocking::get("https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=") {
Ok(_) => {},
Err(err) => println!("{}", err)
match reqwest::blocking::get(
"https://status.neshweb.net/api/push/7s1CjPPzrV?status=up&msg=OK&ping=",
) {
Ok(_) => {}
Err(err) => println!("{}", err),
};
}
pub(crate) fn print_info(&self) {
print!("\x1B[2J\x1B[1;1H");
println!("##[Ascendance of a Bookworm Bot]##");
println!("Instance: {}", &self.config.instance);
println!("Ran Last: {}", &self.start_time.format("%d/%m/%Y %H:%M:%S"));
println!("{:#<1$}", "", 30);
self.post_history.iter().for_each(|post| {
print!("{} ", post.title);
print!("{:<1$}: ", "", 60 - post.title.len());
println!("{}", post.last_post_url);
})
}
}
fn list_posts(auth: &Sensitive<String>, base: String) -> GetPostsResponse {
async fn list_posts(auth: &Sensitive<String>, base: String) -> GetPostsResponse {
let params = GetPosts {
type_: Some(ListingType::Local),
sort: Some(SortType::New),
@ -271,48 +286,89 @@ fn list_posts(auth: &Sensitive<String>, base: String) -> GetPostsResponse {
let res = CLIENT
.get(base + "/api/v3/post/list")
.query(&params)
.send()
.send().await
.unwrap()
.text()
.text().await
.unwrap();
return serde_json::from_str(&res).unwrap();
}
fn run_bot() {
async fn run_bot(bot: Arc<Mutex<Bot>>) {
// TODO this currently does not update the bot Mutex when run
// Get all needed auth tokens at the start
let mut old = Utc::now().naive_local();
let mut this = Bot::new();
match this.login() {
Ok(_) => {
let _ = this.community_ids.load(&this.auth, &this.config.instance);
// Enter a loop (not for debugging)
loop {
this.idle();
// 3 retries in case of connection issues
//let mut loop_breaker: u8 = 0; // DEBUG disabled for clearer crash finding
match this.run_once(&mut old) {
Ok(_) => {},
Err(e) => panic!("{:#?}", e)
};
/* while !this.run_once(&mut old).is_ok() && loop_breaker <= 3 {
println!("Unable to complete Bot cycle, retrying with fresh login credentials");
if this.login().is_ok() {
let _ = this.community_ids.load(&this.auth, &this.config.instance);
}
sleep(time::Duration::from_secs(10));
loop_breaker += 1;
}; */
this.print_info();
this.idle();
}
let mut this = bot.lock().await.clone();
match this.login().await {
Ok(_) => {
println!("Login successful");
},
Err(e) => {
println!("Unable to get initial login:\n {:#?}", e);
}
};
let _ = this.community_ids.load(&this.auth, &this.config.instance).await;
loop {
this.idle();
println!("Debug A"); // DEBUG
match this.run_once(&mut old).await {
Ok(_) => {}
Err(e) => panic!("Crashed due to Error: {:#?}", e),
};
*bot.lock().await = this.clone();
this.idle();
}
}
fn main() {
run_bot();
async fn print_info(shutdown: Arc<AtomicBool>, bot: Arc<Mutex<Bot>>) {
while !shutdown.load(Ordering::Relaxed) {
let bot: tokio::sync::MutexGuard<'_, Bot> = bot.lock().await;
thread::sleep(Duration::from_millis(500));
//print!("\x1B[2J\x1B[1;1H");
println!(
"##[Ascendance of a Bookworm Bot]## | Time: {}",
Utc::now().naive_local().format("%H:%M:%S")
);
println!("Instance: {}", bot.config.instance);
println!("Ran Last: {}", bot.start_time.format("%d/%m/%Y %H:%M:%S"));
println!("{:#<1$}", "", 30);
bot.post_history.iter().for_each(|post| {
print!("| -- |");
print!("{} ", post.title);
print!("{:<1$}| ", "", 60 - post.title.len());
println!("{}", post.last_post_url);
})
}
}
#[tokio::main]
async fn main() {
let shutdown = Arc::new(AtomicBool::new(false));
loop {
println!("Starting AoB Bot...");
let shutdown_clone = shutdown.clone();
let bot = Arc::new(Mutex::new(Bot::new()));
let bot_clone = bot.clone();
let bot_thread = tokio::spawn(async move { run_bot(bot).await });
let tui_thread = tokio::spawn(async move { print_info(shutdown_clone, bot_clone).await });
let _ = bot_thread.await;
tui_thread.abort();
println!("Bot crashed due to unknown Error, restarting thread after wait...");
sleep(Duration::from_secs(30));
}
}