Added logging, added quiet flag

This commit is contained in:
Firq 2024-10-14 20:23:09 +02:00
parent db406adfdc
commit 87e886ea0c
Signed by: Firq
GPG key ID: 3ACC61C8CEC83C20
9 changed files with 84 additions and 19 deletions

View file

@ -1,7 +1,10 @@
import logging
from typing import Annotated, List, NotRequired, Tuple, TypedDict from typing import Annotated, List, NotRequired, Tuple, TypedDict
import requests import requests
from ..config import AtlasDefaults, Paths, ExpressionDefaults from ..config import AtlasDefaults, Paths, ExpressionDefaults, Logging
LOGGER = logging.getLogger(Logging.NAME)
class SpritesheetData(TypedDict): class SpritesheetData(TypedDict):
facesize: Tuple[int, int] facesize: Tuple[int, int]
@ -14,9 +17,11 @@ class ExtendData(TypedDict):
def fetch_config(chara_id: str) -> SpritesheetData: def fetch_config(chara_id: str) -> SpritesheetData:
url = f"https://api.atlasacademy.io/raw/JP/svtScript?charaId={chara_id}" url = f"https://api.atlasacademy.io/raw/JP/svtScript?charaId={chara_id}"
LOGGER.debug(f"Loading data for {url}")
response = requests.get(url, timeout=AtlasDefaults.TIMEOUT) response = requests.get(url, timeout=AtlasDefaults.TIMEOUT)
LOGGER.debug(f"{response.status_code} - {response.text}")
if not response.ok: if not response.ok:
raise ValueError(f"{response.status_code} - {response.text}") raise ValueError()
resp_data = response.json()[0] resp_data = response.json()[0]
extend_data: ExtendData = resp_data["extendData"] extend_data: ExtendData = resp_data["extendData"]
@ -33,6 +38,7 @@ def fetch_config(chara_id: str) -> SpritesheetData:
"position": position "position": position
} }
LOGGER.debug(returndata)
return returndata return returndata
def fetch_mstsvtjson(): def fetch_mstsvtjson():
@ -40,12 +46,14 @@ def fetch_mstsvtjson():
filelocation = Paths.IMAGES / "mstsvt.json" filelocation = Paths.IMAGES / "mstsvt.json"
if filelocation.exists(): if filelocation.exists():
print("Found cached asset for mstsvt.json") LOGGER.info("Found cached asset for mstsvt.json")
return return
LOGGER.debug(f"Loading data for {url}")
with open(filelocation, 'wb') as handle: with open(filelocation, 'wb') as handle:
response = requests.get(url, stream=True, timeout=AtlasDefaults.TIMEOUT) response = requests.get(url, stream=True, timeout=AtlasDefaults.TIMEOUT)
status = response.status_code status = response.status_code
LOGGER.debug(f"{response.status_code} - {response.text}")
if status != 200: if status != 200:
raise ValueError("Could not fetch mstsvnt.json from atlas - please check your network connection") raise ValueError("Could not fetch mstsvnt.json from atlas - please check your network connection")
for block in response.iter_content(1024): for block in response.iter_content(1024):
@ -72,23 +80,25 @@ def fetch_expression_sheets(basefolder: str, imageid: str):
postfix = f"f{idx}" postfix = f"f{idx}"
if filelocation.exists(): if filelocation.exists():
print(f"Found cached asset for {imageid}{postfix}.png") LOGGER.info(f"Found cached asset for {imageid}{postfix}.png")
idx += 1 idx += 1
continue continue
filename = f"{imageid}{postfix}.png" filename = f"{imageid}{postfix}.png"
atlasurl = f"{atlasurl_base}/{filename}" atlasurl = f"{atlasurl_base}/{filename}"
LOGGER.debug(f"Loading data for {atlasurl}")
with open(filelocation, 'wb') as handle: with open(filelocation, 'wb') as handle:
response = requests.get(atlasurl, stream=True, timeout=AtlasDefaults.TIMEOUT) response = requests.get(atlasurl, stream=True, timeout=AtlasDefaults.TIMEOUT)
status = response.status_code status = response.status_code
LOGGER.debug(f"{response.status_code} - {response.text}")
if status != 200: if status != 200:
continue continue
for block in response.iter_content(1024): for block in response.iter_content(1024):
if not block: if not block:
break break
handle.write(block) handle.write(block)
print(f"Finished downloading {filename}") LOGGER.info(f"Finished downloading {filename}")
idx += 1 idx += 1
p = savefolder / f"{idx}.png" p = savefolder / f"{idx}.png"
p.unlink(missing_ok=True) p.unlink(missing_ok=True)
@ -99,8 +109,11 @@ def fetch_expression_sheets(basefolder: str, imageid: str):
def fetch_data(servantid: int) -> List[str]: def fetch_data(servantid: int) -> List[str]:
atlasurl = f"https://api.atlasacademy.io/nice/{AtlasDefaults.REGION}/servant/{servantid}?lore=false&lang=en" atlasurl = f"https://api.atlasacademy.io/nice/{AtlasDefaults.REGION}/servant/{servantid}?lore=false&lang=en"
LOGGER.debug(f"Loading data for {atlasurl}")
response = requests.get(atlasurl, timeout=AtlasDefaults.TIMEOUT) response = requests.get(atlasurl, timeout=AtlasDefaults.TIMEOUT)
LOGGER.debug(f"{response.status_code}")
if not response.ok: if not response.ok:
LOGGER.debug(f"{response.status_code} - {response.text}")
raise ValueError(f"{response.status_code} - {response.text}") raise ValueError(f"{response.status_code} - {response.text}")
responsedata = response.json() responsedata = response.json()
@ -108,5 +121,6 @@ def fetch_data(servantid: int) -> List[str]:
charascripts: List[dict[str, str]] = responsedata["charaScripts"] charascripts: List[dict[str, str]] = responsedata["charaScripts"]
chara_ids: List[str] = [chara["id"] for chara in charascripts] chara_ids: List[str] = [chara["id"] for chara in charascripts]
print(f"{svtname} ({servantid}) - {len(chara_ids)} charaIds") LOGGER.debug(chara_ids)
LOGGER.info(f"{svtname} ({servantid}) - {len(chara_ids)} charaIds")
return chara_ids return chara_ids

View file

@ -1,3 +1,4 @@
import logging
import pathlib import pathlib
from typing import List, Optional from typing import List, Optional
from collections import Counter from collections import Counter
@ -5,9 +6,11 @@ from collections import Counter
from PIL import Image from PIL import Image
from tqdm.contrib import itertools as tqdm_itertools from tqdm.contrib import itertools as tqdm_itertools
from ..config import Paths from ..config import Paths, Logging
from .atlas import SpritesheetData, fetch_data, fetch_expression_sheets, fetch_config from .atlas import SpritesheetData, fetch_data, fetch_expression_sheets, fetch_config
LOGGER = logging.getLogger(Logging.NAME)
def compose(input_id: int, filters: Optional[List[str]] = None): def compose(input_id: int, filters: Optional[List[str]] = None):
Paths.IMAGES.mkdir(exist_ok=True) Paths.IMAGES.mkdir(exist_ok=True)
Paths.OUTPUT.mkdir(exist_ok=True) Paths.OUTPUT.mkdir(exist_ok=True)
@ -16,7 +19,7 @@ def compose(input_id: int, filters: Optional[List[str]] = None):
chara_ids = fetch_data(input_id) chara_ids = fetch_data(input_id)
savefolder = Paths.OUTPUT / str(input_id) savefolder = Paths.OUTPUT / str(input_id)
else: else:
print(f"Processing manually uploaded charaId {input_id}") LOGGER.info(f"Processing manually uploaded charaId {input_id}")
savefolder = Paths.OUTPUT / "manual" savefolder = Paths.OUTPUT / "manual"
chara_ids = [str(input_id)] chara_ids = [str(input_id)]
@ -25,21 +28,26 @@ def compose(input_id: int, filters: Optional[List[str]] = None):
if filters is not None: if filters is not None:
chara_ids = [ v for v in chara_ids if v in filters ] chara_ids = [ v for v in chara_ids if v in filters ]
LOGGER.debug(chara_ids)
for char_id in chara_ids: for char_id in chara_ids:
expfolder = fetch_expression_sheets(savefolder.stem, char_id) expfolder = fetch_expression_sheets(savefolder.stem, char_id)
config = fetch_config(char_id) config = fetch_config(char_id)
process_sprite(expfolder, config, savefolder) process_sprite(expfolder, config, savefolder)
print(f"Files have been saved at: {savefolder.absolute()}") LOGGER.info(f"Files have been saved at: {savefolder.absolute()}")
def calculate_counts(width: int, height: int, facesize: tuple[int, int]): def calculate_counts(width: int, height: int, facesize: tuple[int, int]):
return height // facesize[1], width // facesize[0] rowcount, colcount = height // facesize[1], width // facesize[0]
LOGGER.debug(f"{height} | {facesize[1]} --> {rowcount}")
LOGGER.debug(f"{width} | {facesize[0]} --> {colcount}")
return rowcount, colcount
def gen_main_sprite(folder: pathlib.Path): def gen_main_sprite(folder: pathlib.Path):
image = Image.open(folder / "0.png") image = Image.open(folder / "0.png")
width, height = image.size width, height = image.size
LOGGER.debug(f"Main sprite ({folder}): {width}:{height}")
return image.crop((0, 0, width, height - 256)) return image.crop((0, 0, width, height - 256))
def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, outputfolder: pathlib.Path): def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, outputfolder: pathlib.Path):
@ -47,6 +55,7 @@ def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, out
image_idx = save_sprite(main_sprite, outputfolder, f"{images_folder.stem}") image_idx = save_sprite(main_sprite, outputfolder, f"{images_folder.stem}")
for i in images_folder.iterdir(): for i in images_folder.iterdir():
LOGGER.debug(f"Idx: {image_idx}")
initial_row = 0 initial_row = 0
expressions = Image.open(i) expressions = Image.open(i)
@ -62,6 +71,7 @@ def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, out
img = generate_sprite(main_sprite, expressions, x, y, configdata) img = generate_sprite(main_sprite, expressions, x, y, configdata)
if img is not None: if img is not None:
image_idx = save_sprite(img, outputfolder, f"{images_folder.stem}", image_idx) image_idx = save_sprite(img, outputfolder, f"{images_folder.stem}", image_idx)
LOGGER.debug(f"{x}/{y} - {'Invalid' if img is None else 'Valid'} image")
def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int, col: int, configdata: SpritesheetData) -> Image.Image | None: def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int, col: int, configdata: SpritesheetData) -> Image.Image | None:
@ -72,9 +82,11 @@ def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int
(col + 1) * facesize[0], (col + 1) * facesize[0],
(row + 1) * facesize[1] (row + 1) * facesize[1]
) )
LOGGER.debug(roi)
expression = expressions.crop(roi) expression = expressions.crop(roi)
if is_empty(expression): if is_empty(expression):
LOGGER.debug("Image empty")
return None return None
mask = Image.new("RGBA", (facesize[0], facesize[1]), (255,255,255,255)) mask = Image.new("RGBA", (facesize[0], facesize[1]), (255,255,255,255))
@ -92,11 +104,11 @@ def save_sprite(image: Image.Image, outputfolder: pathlib.Path, name: str, idx:
with open(outfile, 'wb') as file: with open(outfile, 'wb') as file:
image.save(file) image.save(file)
idx += 1 return idx + 1
return idx
def is_empty(img: Image.Image): def is_empty(img: Image.Image):
data = Counter(img.crop((96, 96, 160, 160)).convert('LA').getdata()) data = Counter(img.crop((96, 96, 160, 160)).convert('LA').getdata())
LOGGER.debug(f"Counts: {len(data)}")
if len(data) < 10: if len(data) < 10:
return True return True
return False return False

View file

@ -1,13 +1,16 @@
import argparse import argparse
import logging
import pathlib import pathlib
import sys import sys
from typing import List from typing import List
from .. import __version__ from .. import __version__
from ..backend import compose from ..backend import compose
from ..config import Paths, AtlasDefaults from ..config import Paths, AtlasDefaults, Logging
from ..utils.filesystem import rmdir from ..utils.filesystem import rmdir
from ..utils.disables import disable_tqdm
LOGGER = logging.getLogger(Logging.NAME)
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
class Arguments(argparse.Namespace): class Arguments(argparse.Namespace):
@ -19,6 +22,7 @@ class Arguments(argparse.Namespace):
cacheclear: bool cacheclear: bool
filter: List[str] filter: List[str]
timeout: int timeout: int
quiet: bool
def parse_arguments(): def parse_arguments():
""" """
@ -36,6 +40,7 @@ def parse_arguments():
parser.add_argument("--filter", action="extend", nargs="+", dest="filter", help='Specify one or more spritesheet ids that will be fetched') parser.add_argument("--filter", action="extend", nargs="+", dest="filter", help='Specify one or more spritesheet ids that will be fetched')
parser.add_argument("--timeout", action="store", type=int, default=None, dest="timeout", help="Set the timeout for all requests towards AtlasAcademy, default is 10s") parser.add_argument("--timeout", action="store", type=int, default=None, dest="timeout", help="Set the timeout for all requests towards AtlasAcademy, default is 10s")
parser.add_argument("--clear-cache", action="store_true", default=False, dest="cacheclear", help="Clear cached assets before downloading files") parser.add_argument("--clear-cache", action="store_true", default=False, dest="cacheclear", help="Clear cached assets before downloading files")
parser.add_argument("--quiet", "-q", action="store_true", default=False, dest="quiet", help="Disable logging output")
args = Arguments() args = Arguments()
@ -56,7 +61,11 @@ def run_cli():
if args.timeout and args.timeout >= 0: if args.timeout and args.timeout >= 0:
AtlasDefaults.TIMEOUT = args.timeout AtlasDefaults.TIMEOUT = args.timeout
welcome() if args.quiet:
disable_tqdm()
LOGGER.disabled = True
else:
welcome()
input_id = args.id input_id = args.id
if not input_id: if not input_id:
@ -67,7 +76,7 @@ def run_cli():
if t <= 0: if t <= 0:
raise ValueError raise ValueError
except ValueError: except ValueError:
print("Servant ID has to be a valid integer above 0") LOGGER.error("Servant ID has to be a valid integer above 0")
sys.exit(1) sys.exit(1)
input_id = int(input_id) input_id = int(input_id)
@ -78,8 +87,8 @@ def run_cli():
cachepath = Paths.IMAGES / "manual" / str(input_id) cachepath = Paths.IMAGES / "manual" / str(input_id)
if cachepath.exists(): if cachepath.exists():
rmdir(cachepath) rmdir(cachepath)
print("Successfully cleared cached assets") LOGGER.info("Successfully cleared cached assets")
else: else:
print("No cache to clear was found, continuing") LOGGER.info("No cache to clear was found, continuing")
compose(input_id, args.filter) compose(input_id, args.filter)

View file

@ -1 +1 @@
from .config import AtlasDefaults, ExpressionDefaults, Paths from .config import AtlasDefaults, ExpressionDefaults, Logging, Paths

View file

@ -1,6 +1,11 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import os
import pathlib import pathlib
class Logging:
_level = os.environ.get("AIC_STDOUT_LEVEL", "info")
LEVEL = int(_level) if _level.isdigit() else _level.upper()
NAME = "atlasimagecomposer"
class Paths: class Paths:
_root = pathlib.Path(__file__).parents[1] _root = pathlib.Path(__file__).parents[1]

View file

@ -0,0 +1,2 @@
from .logger import init_logger
LOGGER = init_logger()

View file

@ -0,0 +1,4 @@
def disable_tqdm():
from tqdm import tqdm
from functools import partialmethod
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)

View file

@ -0,0 +1,18 @@
import logging
import sys
from ..config import Logging
from .disables import disable_tqdm
def init_logger():
if Logging.LEVEL == "DEBUG":
disable_tqdm()
logger = logging.getLogger(Logging.NAME)
logger.setLevel(Logging.LEVEL)
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('[%(levelname)s] [%(name)s] %(asctime)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

View file

@ -1,6 +1,6 @@
[project] [project]
name = "atlasimagecomposer" name = "atlasimagecomposer"
version = "0.1.0-c.3" version = "0.1.0-c.4"
requires-python = ">= 3.10" requires-python = ">= 3.10"
authors = [{name = "Firq", email = "firelp42@gmail.com"}] authors = [{name = "Firq", email = "firelp42@gmail.com"}]
maintainers = [{name = "Firq", email = "firelp42@gmail.com"}] maintainers = [{name = "Firq", email = "firelp42@gmail.com"}]
@ -45,6 +45,7 @@ disable = [
"missing-module-docstring", "missing-module-docstring",
"missing-function-docstring", "missing-function-docstring",
"missing-class-docstring", "missing-class-docstring",
"logging-fstring-interpolation",
] ]
[tool.mypy] [tool.mypy]