Added logging, added quiet flag
All checks were successful
All checks were successful
This commit is contained in:
parent
db406adfdc
commit
3fde0f4f08
9 changed files with 84 additions and 19 deletions
|
@ -1,7 +1,10 @@
|
||||||
|
import logging
|
||||||
from typing import Annotated, List, NotRequired, Tuple, TypedDict
|
from typing import Annotated, List, NotRequired, Tuple, TypedDict
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from ..config import AtlasDefaults, Paths, ExpressionDefaults
|
from ..config import AtlasDefaults, Paths, ExpressionDefaults, Logging
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(Logging.NAME)
|
||||||
|
|
||||||
class SpritesheetData(TypedDict):
|
class SpritesheetData(TypedDict):
|
||||||
facesize: Tuple[int, int]
|
facesize: Tuple[int, int]
|
||||||
|
@ -14,9 +17,11 @@ class ExtendData(TypedDict):
|
||||||
def fetch_config(chara_id: str) -> SpritesheetData:
|
def fetch_config(chara_id: str) -> SpritesheetData:
|
||||||
url = f"https://api.atlasacademy.io/raw/JP/svtScript?charaId={chara_id}"
|
url = f"https://api.atlasacademy.io/raw/JP/svtScript?charaId={chara_id}"
|
||||||
|
|
||||||
|
LOGGER.debug(f"Loading data for {url}")
|
||||||
response = requests.get(url, timeout=AtlasDefaults.TIMEOUT)
|
response = requests.get(url, timeout=AtlasDefaults.TIMEOUT)
|
||||||
|
LOGGER.debug(f"{response.status_code} - {response.text}")
|
||||||
if not response.ok:
|
if not response.ok:
|
||||||
raise ValueError(f"{response.status_code} - {response.text}")
|
raise ValueError()
|
||||||
|
|
||||||
resp_data = response.json()[0]
|
resp_data = response.json()[0]
|
||||||
extend_data: ExtendData = resp_data["extendData"]
|
extend_data: ExtendData = resp_data["extendData"]
|
||||||
|
@ -33,6 +38,7 @@ def fetch_config(chara_id: str) -> SpritesheetData:
|
||||||
"position": position
|
"position": position
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOGGER.debug(returndata)
|
||||||
return returndata
|
return returndata
|
||||||
|
|
||||||
def fetch_mstsvtjson():
|
def fetch_mstsvtjson():
|
||||||
|
@ -40,12 +46,14 @@ def fetch_mstsvtjson():
|
||||||
filelocation = Paths.IMAGES / "mstsvt.json"
|
filelocation = Paths.IMAGES / "mstsvt.json"
|
||||||
|
|
||||||
if filelocation.exists():
|
if filelocation.exists():
|
||||||
print("Found cached asset for mstsvt.json")
|
LOGGER.info("Found cached asset for mstsvt.json")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
LOGGER.debug(f"Loading data for {url}")
|
||||||
with open(filelocation, 'wb') as handle:
|
with open(filelocation, 'wb') as handle:
|
||||||
response = requests.get(url, stream=True, timeout=AtlasDefaults.TIMEOUT)
|
response = requests.get(url, stream=True, timeout=AtlasDefaults.TIMEOUT)
|
||||||
status = response.status_code
|
status = response.status_code
|
||||||
|
LOGGER.debug(f"{response.status_code} - {response.text}")
|
||||||
if status != 200:
|
if status != 200:
|
||||||
raise ValueError("Could not fetch mstsvnt.json from atlas - please check your network connection")
|
raise ValueError("Could not fetch mstsvnt.json from atlas - please check your network connection")
|
||||||
for block in response.iter_content(1024):
|
for block in response.iter_content(1024):
|
||||||
|
@ -72,23 +80,25 @@ def fetch_expression_sheets(basefolder: str, imageid: str):
|
||||||
postfix = f"f{idx}"
|
postfix = f"f{idx}"
|
||||||
|
|
||||||
if filelocation.exists():
|
if filelocation.exists():
|
||||||
print(f"Found cached asset for {imageid}{postfix}.png")
|
LOGGER.info(f"Found cached asset for {imageid}{postfix}.png")
|
||||||
idx += 1
|
idx += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
filename = f"{imageid}{postfix}.png"
|
filename = f"{imageid}{postfix}.png"
|
||||||
atlasurl = f"{atlasurl_base}/{filename}"
|
atlasurl = f"{atlasurl_base}/{filename}"
|
||||||
|
|
||||||
|
LOGGER.debug(f"Loading data for {atlasurl}")
|
||||||
with open(filelocation, 'wb') as handle:
|
with open(filelocation, 'wb') as handle:
|
||||||
response = requests.get(atlasurl, stream=True, timeout=AtlasDefaults.TIMEOUT)
|
response = requests.get(atlasurl, stream=True, timeout=AtlasDefaults.TIMEOUT)
|
||||||
status = response.status_code
|
status = response.status_code
|
||||||
|
LOGGER.debug(f"{response.status_code} - {response.text}")
|
||||||
if status != 200:
|
if status != 200:
|
||||||
continue
|
continue
|
||||||
for block in response.iter_content(1024):
|
for block in response.iter_content(1024):
|
||||||
if not block:
|
if not block:
|
||||||
break
|
break
|
||||||
handle.write(block)
|
handle.write(block)
|
||||||
print(f"Finished downloading {filename}")
|
LOGGER.info(f"Finished downloading {filename}")
|
||||||
idx += 1
|
idx += 1
|
||||||
p = savefolder / f"{idx}.png"
|
p = savefolder / f"{idx}.png"
|
||||||
p.unlink(missing_ok=True)
|
p.unlink(missing_ok=True)
|
||||||
|
@ -99,8 +109,11 @@ def fetch_expression_sheets(basefolder: str, imageid: str):
|
||||||
def fetch_data(servantid: int) -> List[str]:
|
def fetch_data(servantid: int) -> List[str]:
|
||||||
atlasurl = f"https://api.atlasacademy.io/nice/{AtlasDefaults.REGION}/servant/{servantid}?lore=false&lang=en"
|
atlasurl = f"https://api.atlasacademy.io/nice/{AtlasDefaults.REGION}/servant/{servantid}?lore=false&lang=en"
|
||||||
|
|
||||||
|
LOGGER.debug(f"Loading data for {atlasurl}")
|
||||||
response = requests.get(atlasurl, timeout=AtlasDefaults.TIMEOUT)
|
response = requests.get(atlasurl, timeout=AtlasDefaults.TIMEOUT)
|
||||||
|
LOGGER.debug(f"{response.status_code}")
|
||||||
if not response.ok:
|
if not response.ok:
|
||||||
|
LOGGER.debug(f"{response.status_code} - {response.text}")
|
||||||
raise ValueError(f"{response.status_code} - {response.text}")
|
raise ValueError(f"{response.status_code} - {response.text}")
|
||||||
|
|
||||||
responsedata = response.json()
|
responsedata = response.json()
|
||||||
|
@ -108,5 +121,6 @@ def fetch_data(servantid: int) -> List[str]:
|
||||||
charascripts: List[dict[str, str]] = responsedata["charaScripts"]
|
charascripts: List[dict[str, str]] = responsedata["charaScripts"]
|
||||||
chara_ids: List[str] = [chara["id"] for chara in charascripts]
|
chara_ids: List[str] = [chara["id"] for chara in charascripts]
|
||||||
|
|
||||||
print(f"{svtname} ({servantid}) - {len(chara_ids)} charaIds")
|
LOGGER.debug(chara_ids)
|
||||||
|
LOGGER.info(f"{svtname} ({servantid}) - {len(chara_ids)} charaIds")
|
||||||
return chara_ids
|
return chara_ids
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
|
@ -5,9 +6,11 @@ from collections import Counter
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from tqdm.contrib import itertools as tqdm_itertools
|
from tqdm.contrib import itertools as tqdm_itertools
|
||||||
|
|
||||||
from ..config import Paths
|
from ..config import Paths, Logging
|
||||||
from .atlas import SpritesheetData, fetch_data, fetch_expression_sheets, fetch_config
|
from .atlas import SpritesheetData, fetch_data, fetch_expression_sheets, fetch_config
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(Logging.NAME)
|
||||||
|
|
||||||
def compose(input_id: int, filters: Optional[List[str]] = None):
|
def compose(input_id: int, filters: Optional[List[str]] = None):
|
||||||
Paths.IMAGES.mkdir(exist_ok=True)
|
Paths.IMAGES.mkdir(exist_ok=True)
|
||||||
Paths.OUTPUT.mkdir(exist_ok=True)
|
Paths.OUTPUT.mkdir(exist_ok=True)
|
||||||
|
@ -16,7 +19,7 @@ def compose(input_id: int, filters: Optional[List[str]] = None):
|
||||||
chara_ids = fetch_data(input_id)
|
chara_ids = fetch_data(input_id)
|
||||||
savefolder = Paths.OUTPUT / str(input_id)
|
savefolder = Paths.OUTPUT / str(input_id)
|
||||||
else:
|
else:
|
||||||
print(f"Processing manually uploaded charaId {input_id}")
|
LOGGER.info(f"Processing manually uploaded charaId {input_id}")
|
||||||
savefolder = Paths.OUTPUT / "manual"
|
savefolder = Paths.OUTPUT / "manual"
|
||||||
chara_ids = [str(input_id)]
|
chara_ids = [str(input_id)]
|
||||||
|
|
||||||
|
@ -25,21 +28,26 @@ def compose(input_id: int, filters: Optional[List[str]] = None):
|
||||||
|
|
||||||
if filters is not None:
|
if filters is not None:
|
||||||
chara_ids = [ v for v in chara_ids if v in filters ]
|
chara_ids = [ v for v in chara_ids if v in filters ]
|
||||||
|
LOGGER.debug(chara_ids)
|
||||||
|
|
||||||
for char_id in chara_ids:
|
for char_id in chara_ids:
|
||||||
expfolder = fetch_expression_sheets(savefolder.stem, char_id)
|
expfolder = fetch_expression_sheets(savefolder.stem, char_id)
|
||||||
config = fetch_config(char_id)
|
config = fetch_config(char_id)
|
||||||
process_sprite(expfolder, config, savefolder)
|
process_sprite(expfolder, config, savefolder)
|
||||||
|
|
||||||
print(f"Files have been saved at: {savefolder.absolute()}")
|
LOGGER.info(f"Files have been saved at: {savefolder.absolute()}")
|
||||||
|
|
||||||
|
|
||||||
def calculate_counts(width: int, height: int, facesize: tuple[int, int]):
|
def calculate_counts(width: int, height: int, facesize: tuple[int, int]):
|
||||||
return height // facesize[1], width // facesize[0]
|
rowcount, colcount = height // facesize[1], width // facesize[0]
|
||||||
|
LOGGER.debug(f"{height} | {facesize[1]} --> {rowcount}")
|
||||||
|
LOGGER.debug(f"{width} | {facesize[0]} --> {colcount}")
|
||||||
|
return rowcount, colcount
|
||||||
|
|
||||||
def gen_main_sprite(folder: pathlib.Path):
|
def gen_main_sprite(folder: pathlib.Path):
|
||||||
image = Image.open(folder / "0.png")
|
image = Image.open(folder / "0.png")
|
||||||
width, height = image.size
|
width, height = image.size
|
||||||
|
LOGGER.debug(f"Main sprite ({folder}): {width}:{height}")
|
||||||
return image.crop((0, 0, width, height - 256))
|
return image.crop((0, 0, width, height - 256))
|
||||||
|
|
||||||
def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, outputfolder: pathlib.Path):
|
def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, outputfolder: pathlib.Path):
|
||||||
|
@ -47,6 +55,7 @@ def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, out
|
||||||
image_idx = save_sprite(main_sprite, outputfolder, f"{images_folder.stem}")
|
image_idx = save_sprite(main_sprite, outputfolder, f"{images_folder.stem}")
|
||||||
|
|
||||||
for i in images_folder.iterdir():
|
for i in images_folder.iterdir():
|
||||||
|
LOGGER.debug(f"Idx: {image_idx}")
|
||||||
initial_row = 0
|
initial_row = 0
|
||||||
expressions = Image.open(i)
|
expressions = Image.open(i)
|
||||||
|
|
||||||
|
@ -62,6 +71,7 @@ def process_sprite(images_folder: pathlib.Path, configdata: SpritesheetData, out
|
||||||
img = generate_sprite(main_sprite, expressions, x, y, configdata)
|
img = generate_sprite(main_sprite, expressions, x, y, configdata)
|
||||||
if img is not None:
|
if img is not None:
|
||||||
image_idx = save_sprite(img, outputfolder, f"{images_folder.stem}", image_idx)
|
image_idx = save_sprite(img, outputfolder, f"{images_folder.stem}", image_idx)
|
||||||
|
LOGGER.debug(f"{x}/{y} - {'Invalid' if img is None else 'Valid'} image")
|
||||||
|
|
||||||
|
|
||||||
def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int, col: int, configdata: SpritesheetData) -> Image.Image | None:
|
def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int, col: int, configdata: SpritesheetData) -> Image.Image | None:
|
||||||
|
@ -72,9 +82,11 @@ def generate_sprite(main_sprite: Image.Image, expressions: Image.Image, row: int
|
||||||
(col + 1) * facesize[0],
|
(col + 1) * facesize[0],
|
||||||
(row + 1) * facesize[1]
|
(row + 1) * facesize[1]
|
||||||
)
|
)
|
||||||
|
LOGGER.debug(roi)
|
||||||
expression = expressions.crop(roi)
|
expression = expressions.crop(roi)
|
||||||
|
|
||||||
if is_empty(expression):
|
if is_empty(expression):
|
||||||
|
LOGGER.debug("Image empty")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
mask = Image.new("RGBA", (facesize[0], facesize[1]), (255,255,255,255))
|
mask = Image.new("RGBA", (facesize[0], facesize[1]), (255,255,255,255))
|
||||||
|
@ -92,11 +104,11 @@ def save_sprite(image: Image.Image, outputfolder: pathlib.Path, name: str, idx:
|
||||||
with open(outfile, 'wb') as file:
|
with open(outfile, 'wb') as file:
|
||||||
image.save(file)
|
image.save(file)
|
||||||
|
|
||||||
idx += 1
|
return idx + 1
|
||||||
return idx
|
|
||||||
|
|
||||||
def is_empty(img: Image.Image):
|
def is_empty(img: Image.Image):
|
||||||
data = Counter(img.crop((96, 96, 160, 160)).convert('LA').getdata())
|
data = Counter(img.crop((96, 96, 160, 160)).convert('LA').getdata())
|
||||||
|
LOGGER.debug(f"Counts: {len(data)}")
|
||||||
if len(data) < 10:
|
if len(data) < 10:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -1,13 +1,16 @@
|
||||||
import argparse
|
import argparse
|
||||||
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from .. import __version__
|
from .. import __version__
|
||||||
from ..backend import compose
|
from ..backend import compose
|
||||||
from ..config import Paths, AtlasDefaults
|
from ..config import Paths, AtlasDefaults, Logging
|
||||||
from ..utils.filesystem import rmdir
|
from ..utils.filesystem import rmdir
|
||||||
|
from ..utils.disables import disable_tqdm
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(Logging.NAME)
|
||||||
|
|
||||||
# pylint: disable=too-few-public-methods
|
# pylint: disable=too-few-public-methods
|
||||||
class Arguments(argparse.Namespace):
|
class Arguments(argparse.Namespace):
|
||||||
|
@ -19,6 +22,7 @@ class Arguments(argparse.Namespace):
|
||||||
cacheclear: bool
|
cacheclear: bool
|
||||||
filter: List[str]
|
filter: List[str]
|
||||||
timeout: int
|
timeout: int
|
||||||
|
quiet: bool
|
||||||
|
|
||||||
def parse_arguments():
|
def parse_arguments():
|
||||||
"""
|
"""
|
||||||
|
@ -36,6 +40,7 @@ def parse_arguments():
|
||||||
parser.add_argument("--filter", action="extend", nargs="+", dest="filter", help='Specify one or more spritesheet ids that will be fetched')
|
parser.add_argument("--filter", action="extend", nargs="+", dest="filter", help='Specify one or more spritesheet ids that will be fetched')
|
||||||
parser.add_argument("--timeout", action="store", type=int, default=None, dest="timeout", help="Set the timeout for all requests towards AtlasAcademy, default is 10s")
|
parser.add_argument("--timeout", action="store", type=int, default=None, dest="timeout", help="Set the timeout for all requests towards AtlasAcademy, default is 10s")
|
||||||
parser.add_argument("--clear-cache", action="store_true", default=False, dest="cacheclear", help="Clear cached assets before downloading files")
|
parser.add_argument("--clear-cache", action="store_true", default=False, dest="cacheclear", help="Clear cached assets before downloading files")
|
||||||
|
parser.add_argument("--quiet", "-q", action="store_true", default=False, dest="quiet", help="Disable logging output")
|
||||||
|
|
||||||
|
|
||||||
args = Arguments()
|
args = Arguments()
|
||||||
|
@ -56,7 +61,11 @@ def run_cli():
|
||||||
if args.timeout and args.timeout >= 0:
|
if args.timeout and args.timeout >= 0:
|
||||||
AtlasDefaults.TIMEOUT = args.timeout
|
AtlasDefaults.TIMEOUT = args.timeout
|
||||||
|
|
||||||
welcome()
|
if args.quiet:
|
||||||
|
disable_tqdm()
|
||||||
|
LOGGER.disabled = True
|
||||||
|
else:
|
||||||
|
welcome()
|
||||||
|
|
||||||
input_id = args.id
|
input_id = args.id
|
||||||
if not input_id:
|
if not input_id:
|
||||||
|
@ -67,7 +76,7 @@ def run_cli():
|
||||||
if t <= 0:
|
if t <= 0:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except ValueError:
|
except ValueError:
|
||||||
print("Servant ID has to be a valid integer above 0")
|
LOGGER.error("Servant ID has to be a valid integer above 0")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
input_id = int(input_id)
|
input_id = int(input_id)
|
||||||
|
@ -78,8 +87,8 @@ def run_cli():
|
||||||
cachepath = Paths.IMAGES / "manual" / str(input_id)
|
cachepath = Paths.IMAGES / "manual" / str(input_id)
|
||||||
if cachepath.exists():
|
if cachepath.exists():
|
||||||
rmdir(cachepath)
|
rmdir(cachepath)
|
||||||
print("Successfully cleared cached assets")
|
LOGGER.info("Successfully cleared cached assets")
|
||||||
else:
|
else:
|
||||||
print("No cache to clear was found, continuing")
|
LOGGER.info("No cache to clear was found, continuing")
|
||||||
|
|
||||||
compose(input_id, args.filter)
|
compose(input_id, args.filter)
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
from .config import AtlasDefaults, ExpressionDefaults, Paths
|
from .config import AtlasDefaults, ExpressionDefaults, Logging, Paths
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
# pylint: disable=too-few-public-methods
|
# pylint: disable=too-few-public-methods
|
||||||
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
|
class Logging:
|
||||||
|
_level = os.environ.get("AIC_STDOUT_LEVEL", "info")
|
||||||
|
LEVEL = int(_level) if _level.isdigit() else _level.upper()
|
||||||
|
NAME = "atlasimagecomposer"
|
||||||
|
|
||||||
class Paths:
|
class Paths:
|
||||||
_root = pathlib.Path(__file__).parents[1]
|
_root = pathlib.Path(__file__).parents[1]
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
from .logger import init_logger
|
||||||
|
LOGGER = init_logger()
|
4
atlasimagecomposer/utils/disables.py
Normal file
4
atlasimagecomposer/utils/disables.py
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
def disable_tqdm():
|
||||||
|
from tqdm import tqdm
|
||||||
|
from functools import partialmethod
|
||||||
|
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)
|
18
atlasimagecomposer/utils/logger.py
Normal file
18
atlasimagecomposer/utils/logger.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
from ..config import Logging
|
||||||
|
from .disables import disable_tqdm
|
||||||
|
|
||||||
|
def init_logger():
|
||||||
|
if Logging.LEVEL == "DEBUG":
|
||||||
|
disable_tqdm()
|
||||||
|
|
||||||
|
logger = logging.getLogger(Logging.NAME)
|
||||||
|
logger.setLevel(Logging.LEVEL)
|
||||||
|
handler = logging.StreamHandler(stream=sys.stdout)
|
||||||
|
formatter = logging.Formatter('[%(levelname)s] [%(name)s] %(asctime)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
|
||||||
|
return logger
|
|
@ -1,6 +1,6 @@
|
||||||
[project]
|
[project]
|
||||||
name = "atlasimagecomposer"
|
name = "atlasimagecomposer"
|
||||||
version = "0.1.0-c.3"
|
version = "0.1.0-c.4"
|
||||||
requires-python = ">= 3.10"
|
requires-python = ">= 3.10"
|
||||||
authors = [{name = "Firq", email = "firelp42@gmail.com"}]
|
authors = [{name = "Firq", email = "firelp42@gmail.com"}]
|
||||||
maintainers = [{name = "Firq", email = "firelp42@gmail.com"}]
|
maintainers = [{name = "Firq", email = "firelp42@gmail.com"}]
|
||||||
|
@ -45,6 +45,7 @@ disable = [
|
||||||
"missing-module-docstring",
|
"missing-module-docstring",
|
||||||
"missing-function-docstring",
|
"missing-function-docstring",
|
||||||
"missing-class-docstring",
|
"missing-class-docstring",
|
||||||
|
"logging-fstring-interpolation",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
|
|
Loading…
Reference in a new issue