# Mirrored from https://github.com/Kvan7/Exiled-Exchange-2.git
# (scraper page metadata and changelog removed; synced 2025-12-17)
import argparse
import copy
import json
import logging
import os
from typing import Literal, Optional

from rateLimiter import RateLimiter, set_log_level
from tqdm import tqdm

# Timestamped log lines for progress/diagnostics while scraping.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

logger = logging.getLogger(__name__)

# Trade-site rarity filters: uniques are searched by (name, base type),
# everything else by type name with rarity restricted to non-unique.
UNIQUE_FILTER = {"type_filters": {"filters": {"rarity": {"option": "unique"}}}}
NONUNIQUE_FILTER = {"type_filters": {"filters": {"rarity": {"option": "nonunique"}}}}
# Skeleton search payload; per-item fields are deep-copied in and filled
# by Item.search_payload().
BASE_PAYLOAD = {
    "query": {
        "status": {"option": "any"},
        "stats": [{"type": "and", "filters": []}],
    },
    "sort": {"price": "asc"},
}
# PoE2 trade API endpoints (Standard league).
SEARCH_URL = "https://www.pathofexile.com/api/trade2/search/Standard"
SEARCH_HEADERS = {"content-type": "application/json"}
FETCH_URL = "https://www.pathofexile.com/api/trade2/fetch/"
# Sentinel cached for items whose image lookup returned no results.
NOT_FOUND = "%NOT_FOUND%"
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse command-line options for the image fetcher.

    Args:
        argv: Optional explicit argument list (useful for tests);
            defaults to ``sys.argv[1:]`` when None.

    Returns:
        Namespace with ``mode`` ('all' | 'missing' | 'new') and ``debug``.
    """
    parser = argparse.ArgumentParser(description="Fetch images for items")
    parser.add_argument(
        "-m",
        "--mode",
        choices=["all", "missing", "new"],
        default="new",
        help="Mode to determine which items to process: 'all', 'missing', or 'new'",
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debugging mode"
    )
    return parser.parse_args(argv)
class Item:
    """One game item plus the trade-API query details needed to find it."""

    def __init__(
        self,
        name: str,
        namespace: Literal["UNIQUE", "ITEM", "GEM"],
        unique: Optional[dict[str, str]] = None,
    ):
        self.name = name
        self.namespace = namespace
        if namespace != "UNIQUE":
            # Non-uniques are searched purely by their type name.
            self.base = name
            return
        # Uniques must carry their base-type metadata.
        assert unique is not None
        assert unique.get("base") is not None
        self.base = unique.get("base")

    def __str__(self):
        return f"{self.name} ({self.namespace})"

    # repr and str intentionally share the same human-readable form.
    __repr__ = __str__

    def __eq__(self, other):
        return isinstance(other, Item) and (self.name, self.namespace) == (
            other.name,
            other.namespace,
        )

    def __hash__(self):
        return hash((self.name, self.namespace))

    def search_payload(self) -> dict:
        """Build the trade-site search payload for this item."""
        assert self.base is not None
        payload = copy.deepcopy(BASE_PAYLOAD)
        query = payload["query"]
        if self.namespace == "UNIQUE":
            # Uniques are pinned by both base type and unique name.
            query["filters"] = UNIQUE_FILTER
            query["type"] = self.base
            query["name"] = self.name
        else:
            query["filters"] = NONUNIQUE_FILTER
            query["type"] = self.name
        return payload

    def save_name(self) -> str:
        """Cache key of the form '<NAMESPACE>=<name>'."""
        return f"{self.namespace}={self.name}"
def get_script_dir() -> str:
    """Return the absolute directory containing this script (symlinks resolved)."""
    script_path = os.path.realpath(__file__)
    return os.path.dirname(script_path)
def get_cache() -> dict[str, str]:
    """Load the item-image cache stored next to this script.

    Creates an empty cache file on first run.

    Returns:
        Mapping of ``Item.save_name()`` keys to icon URLs (or NOT_FOUND).
    """
    cache_path = f"{get_script_dir()}/itemImageCache.json"
    try:
        # EAFP: read directly instead of an exists() pre-check, which is
        # race-prone and forced a second open in the original flow.
        with open(cache_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # First run: seed an empty cache file so later saves have a home.
        with open(cache_path, "w", encoding="utf-8") as f:
            json.dump({}, f)
        return {}
def save_cache(cache: dict[str, str], output_file: str = "itemImageCache.json"):
    """Serialize the image cache as JSON into the script's directory."""
    destination = f"{get_script_dir()}/{output_file}"
    with open(destination, "w", encoding="utf-8") as out:
        json.dump(cache, out)
def get_items() -> list[dict[str, str]]:
    """Read every item record from the English ndjson dump (one JSON per line)."""
    items_path = f"{get_script_dir()}/en/items.ndjson"
    with open(items_path, "r", encoding="utf-8") as source:
        return [json.loads(line) for line in source]
def convert_item(item: dict[str, str]) -> Item:
    """Wrap one raw ndjson record in an Item instance."""
    name = item.get("name")
    namespace = item.get("namespace")
    unique = item.get("unique")
    return Item(name, namespace, unique)
def convert_items(items: list[dict[str, str]]) -> list[Item]:
    """Convert all raw records into Item objects."""
    return list(map(convert_item, items))
def get_fetch_id(item: Item, net: RateLimiter) -> str | None:
    """Search the trade site for *item* and return the first result id.

    Retries up to 3 times while rate limited (HTTP 429); any other
    non-200 status raises. Returns None when the search matches nothing.
    """
    payload = item.search_payload()
    for _ in range(3):
        response = net.post(SEARCH_URL, payload=payload, headers=SEARCH_HEADERS)
        status = response.status_code
        if status == 200:
            results = response.json().get("result")
            if results:
                return results[0]
            logger.debug(f"No results for {item.name}")
            return None
        if status != 429:
            logger.error(f"Unexpected status code: {status}")
            raise Exception(f"Unexpected status code: {status}\n {response.text}")
        # status == 429: loop again; the limiter paces the retry.
    raise Exception(f"Retry limit exceeded for {item.name}")
def fetch_listing(fetch_id: str, net: RateLimiter) -> dict | None:
    """Fetch the full listing for a search-result id.

    Retries up to 3 times on HTTP 429; other non-200 statuses raise.
    Returns None when the fetch yields no listing.
    """
    for _ in range(3):
        response = net.get(FETCH_URL + fetch_id)
        status = response.status_code
        if status == 200:
            listings = response.json().get("result")
            return listings[0] if listings else None
        if status != 429:
            logger.error(f"Unexpected status code: {status}")
            raise Exception(f"Unexpected status code: {status}\n {response.text}")
        # status == 429: loop again; the limiter paces the retry.
    raise Exception(f"Retry limit exceeded for {fetch_id}")
def parse_listing_for_url(listing: dict) -> str | None:
|
|
item = listing.get("item")
|
|
if item is None:
|
|
return None
|
|
icon_url = item.get("icon")
|
|
if icon_url is None:
|
|
return None
|
|
if not icon_url.startswith(
|
|
"https://web.poecdn.com/gen/image"
|
|
) or not icon_url.endswith(".png"):
|
|
raise Exception(f"Unexpected icon url: {icon_url}")
|
|
return icon_url
|
|
|
|
|
|
def get_image_url(item: Item, net: RateLimiter) -> str | None:
    """Resolve *item* to an icon URL via search -> fetch -> parse.

    Logs a warning and returns None if any stage comes up empty.
    """
    fetch_id = get_fetch_id(item, net)
    if fetch_id is not None:
        listing = fetch_listing(fetch_id, net)
        if listing is not None:
            icon_url = parse_listing_for_url(listing)
            if icon_url is not None:
                return icon_url
            logger.warning(f"Icon missing for {item.name} | {fetch_id}")
        else:
            logger.warning(f"Listing missing for {item.name} | {fetch_id}")
    else:
        logger.warning(f"No items found for {item.name}")
    return None
def main(mode: Literal["all", "missing", "new"] = "new", debug=False):
    """Scrape icon URLs for items and persist them to the cache file.

    ``mode`` selects which items to process:
      * "all"     -- every item in the dump
      * "missing" -- items previously cached as NOT_FOUND
      * "new"     -- items with no cache entry yet (default)
    """
    net = RateLimiter(debug=debug)
    items = convert_items(get_items())

    cache = get_cache()
    # backup old cache before mutating it
    save_cache(cache, output_file="itemImageCache.old.json")

    def needs_lookup(it: Item) -> bool:
        # Selection predicate for the chosen mode.
        key = it.save_name()
        if mode == "all":
            return True
        if mode == "missing":
            return key in cache and cache[key] == NOT_FOUND
        return key not in cache  # mode == "new"

    pending = [it for it in items if needs_lookup(it)]

    for it in tqdm(pending):
        key = it.save_name()
        url = get_image_url(it, net)
        # Record NOT_FOUND so failed lookups are not retried in "new" mode.
        value = url if url is not None else NOT_FOUND
        logger.info(f"{key}\t{value}")
        cache[key] = value

    save_cache(cache)
if __name__ == "__main__":
    # Quiet the rate limiter's own logging; only surface its errors.
    set_log_level(logging.ERROR)
    args = parse_args()
    main(mode=args.mode, debug=args.debug)