#!/usr/bin/env python3 """Fetch the latest complete NDX close and actively deliver a QQBot daily report.""" from __future__ import annotations import csv import base64 import json import logging import os import re import shutil import subprocess import sys import time import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from datetime import datetime, time as datetime_time, timedelta, timezone from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from pathlib import Path from typing import Any from zoneinfo import ZoneInfo, ZoneInfoNotFoundError SCRIPT_DIR = Path(__file__).resolve().parent CONFIG_FILE = SCRIPT_DIR / "config.yaml" DEFAULT_HEADERS = { "Accept": "application/json,text/plain,*/*", "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36" ), } class SilentExit(Exception): """Expected silent stop after writing runtime log.""" class IncompleteMarketData(ValueError): """The upstream data is valid but not a complete regular close yet.""" @dataclass(frozen=True) class MarketData: trade_date: str close: Decimal change: Decimal change_percent: Decimal previous_close: Decimal | None source: str def main() -> None: config = load_config(CONFIG_FILE) logger = setup_logging(config) logger.info("脚本启动") logger.info("config 加载成功: %s", CONFIG_FILE) try: validate_qqbot_config(config, logger) ensure_target_openid(config, logger) market_data = fetch_market_data(config, logger) state = load_state(config, logger) delivery_tz = get_timezone(config["report"].get("timezone_delivery", "Asia/Shanghai"), logger) beijing_now = datetime.now(delivery_tz) beijing_date = beijing_now.date().isoformat() last_reported = state.get("lastNewDataReportedDate") logger.info( "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s", beijing_date, market_data.trade_date, last_reported, ) if not last_reported: initialize_state(config, state, market_data, beijing_now, logger) raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state") need_new_data_report = market_data.trade_date > str(last_reported) replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True)) need_replay = ( market_data.trade_date == str(last_reported) and replay_on_weekdays and beijing_now.weekday() < 5 and beijing_date not in state.get("deliveries", {}) ) logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay) if not need_new_data_report and not need_replay: raise SilentExit("不满足推送条件") report_text = build_report(market_data) logger.info("日报正文生成成功") target_type = config["qqbot"].get("target_type", "c2c") target_openid = config["qqbot"].get("target_openid", "") delivery_target = f"qqbot:{target_type}:{target_openid}" try: send_qqbot_message(config, report_text, logger) except Exception as exc: logger.error("QQBot 投递失败: %s", summarize_error(exc)) append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc) raise SilentExit("QQBot 投递失败") update_success_state(state, market_data, beijing_now) append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger) save_state(config, state, logger) logger.info("QQBot 投递成功,state 更新成功") raise SilentExit("投递完成") except SilentExit as exc: logger.info("脚本静默结束原因: %s", exc) except Exception as exc: logger.exception("脚本静默结束原因: 未处理异常: %s", summarize_error(exc)) def load_config(path: Path) -> dict[str, Any]: text = path.read_text(encoding="utf-8") try: import yaml # type: ignore data = yaml.safe_load(text) if not isinstance(data, dict): raise ValueError("config.yaml root must be a mapping") return data except ModuleNotFoundError: return parse_simple_yaml(text) def parse_simple_yaml(text: str) -> dict[str, Any]: root: dict[str, Any] = {} current: dict[str, Any] | None = None for raw_line in text.splitlines(): line = raw_line.split("#", 1)[0].rstrip() if not line.strip(): continue if not line.startswith(" "): key = line.rstrip(":").strip() root[key] = {} current = root[key] continue if current is None or ":" not in line: continue key, value = line.strip().split(":", 1) current[key.strip()] = parse_scalar(value.strip()) return root def parse_scalar(value: str) -> Any: if value in {"true", "True"}: return True if value in {"false", "False"}: return False if value in {'""', "''"}: return "" if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): return value[1:-1] try: return int(value) except ValueError: return value def setup_logging(config: dict[str, Any]) -> logging.Logger: log_file = resolve_path(config["paths"]["runtime_log_file"]) log_file.parent.mkdir(parents=True, exist_ok=True) logger = logging.getLogger("ndx_daily_report") logger.setLevel(logging.DEBUG) logger.handlers.clear() handler = logging.FileHandler(log_file, encoding="utf-8") handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s")) logger.addHandler(handler) return logger def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None: qqbot = config.get("qqbot", {}) appid = str(qqbot.get("appid", "")).strip() appkey = str(qqbot.get("appkey", "")).strip() target_type = str(qqbot.get("target_type", "c2c")).strip() logger.info( "QQBot 配置检查: appid=%s, appkey=%s, target_type=%s", mask_secret(appid), mask_secret(appkey), target_type, ) if not appid or appid == "用户填写": raise SilentExit("QQBot AppID 未配置") if not appkey or appkey == "用户填写": raise SilentExit("QQBot AppKey 未配置") if target_type not in {"c2c", "group"}: raise SilentExit("QQBot target_type 必须为 c2c 或 group") def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None: qqbot = config["qqbot"] target_openid = str(qqbot.get("target_openid", "")).strip() logger.info("target_openid 是否存在: %s", bool(target_openid)) if target_openid: return logger.info("target_openid 自动发现开始") discovered = discover_openid(config, logger) if not discovered: raise SilentExit( "target_openid 自动发现失败;请先给 QQBot 发一条私聊消息,或在目标群里 @ 机器人,然后重新运行脚本" ) qqbot["target_openid"] = discovered if bool(qqbot.get("save_discovered_target", True)): save_target_openid(CONFIG_FILE, discovered) logger.info("已自动发现 QQBot target_openid,并写回 config.yaml") raise SilentExit("target_openid 自动发现成功,本次只完成配置初始化") def discover_openid(config: dict[str, Any], logger: logging.Logger) -> str | None: target_type = str(config["qqbot"].get("target_type", "c2c")) field = "group_openid" if target_type == "group" else "user_openid" patterns = [ re.compile(rf'"{field}"\s*:\s*"([^"]+)"'), re.compile(rf"{field}\s*[:=]\s*([A-Za-z0-9_-]+)"), ] candidates = [ Path(os.environ.get("HERMES_HOME", "")) / "logs", Path(os.environ.get("HERMES_HOME", "")) / "data", Path.home() / ".hermes", Path.home() / ".qqbot", SCRIPT_DIR, ] deadline = time.time() + int(config["qqbot"].get("auto_discover_timeout_seconds", 60)) while time.time() <= deadline: for base in candidates: if not base or not base.exists(): continue for path in list_recent_text_files(base): try: text = path.read_text(encoding="utf-8", errors="ignore") except OSError: continue for pattern in patterns: match = pattern.search(text) if match: logger.info("target_openid 自动发现成功: source=%s", path) return match.group(1) time.sleep(2) logger.warning("target_openid 自动发现失败: 未找到本地事件缓存或日志字段 %s", field) return None def list_recent_text_files(base: Path) -> list[Path]: suffixes = {".log", ".json", ".jsonl", ".txt"} try: paths = [p for p in base.rglob("*") if p.is_file() and p.suffix.lower() in suffixes] except OSError: return [] paths.sort(key=lambda p: p.stat().st_mtime, reverse=True) return paths[:50] def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData: try: return fetch_nasdaq_quote(config, logger) except IncompleteMarketData as exc: logger.warning("Nasdaq quote API 尚无完整收盘数据: %s", summarize_error(exc)) raise SilentExit("尚无最近完整美股常规交易日收盘数据") except Exception as exc: logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc)) return fetch_fred_csv(config, logger) def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> MarketData: url = str(config["nasdaq"]["quote_api"]) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) logger.info("Nasdaq quote API 请求开始") payload = http_json( "GET", url, timeout=timeout, proxy_config=get_proxy_config(config), proxy_authorization=proxy_authorization_header(config), ) data = payload.get("data", {}) primary = data.get("primaryData", {}) key_stats = data.get("keyStats", {}) market_status = str(data.get("marketStatus", "")).strip() close = parse_decimal(primary.get("lastSalePrice")) change = parse_decimal(primary.get("netChange")) change_percent = parse_decimal(primary.get("percentageChange")) previous_close = parse_decimal(key_stats.get("previousclose", {}).get("value")) timestamp = str(primary.get("lastTradeTimestamp", "")).strip() if not timestamp: raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp") trade_timezone = config["report"].get("timezone_trade", "America/New_York") trade_dt = parse_trade_datetime(timestamp, trade_timezone) validate_complete_regular_close(trade_dt, market_status, trade_timezone) trade_date = trade_dt.date().isoformat() logger.info( "NDX 字段解析结果: tradeDate=%s marketStatus=%s close=%s change=%s changePercent=%s previousClose=%s", trade_date, market_status, money(close), signed(change), signed(change_percent), money(previous_close), ) calculated_change = close - previous_close calculated_percent = (calculated_change / previous_close) * Decimal("100") logger.info( "previous close 复算结果: change=%s changePercent=%s", signed(calculated_change), signed(calculated_percent), ) if abs(calculated_change - change) > Decimal("0.05"): raise ValueError("Nasdaq 点数涨跌与 previous close 复算不一致") if abs(calculated_percent - change_percent) > Decimal("0.01"): raise ValueError("Nasdaq 涨跌幅与 previous close 复算不一致") logger.info("涨跌幅一致性校验结果: 通过") return MarketData(trade_date, close, change, change_percent, previous_close, "nasdaq_quote") def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData: url = str(config["nasdaq"]["fred_csv"]) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) logger.info("FRED CSV 请求开始") text = http_text( "GET", url, timeout=timeout, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}, proxy_config=get_proxy_config(config), proxy_authorization=proxy_authorization_header(config), ).decode("utf-8-sig") rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}] if len(rows) < 2: raise ValueError("FRED CSV 有效数据不足") previous = rows[-2] latest = rows[-1] close = parse_decimal(latest["NASDAQ100"]) previous_close = parse_decimal(previous["NASDAQ100"]) change = close - previous_close change_percent = (change / previous_close) * Decimal("100") trade_date = latest["observation_date"] logger.info( "FRED 数据解析结果: tradeDate=%s close=%s change=%s changePercent=%s", trade_date, money(close), signed(change), signed(change_percent), ) return MarketData(trade_date, close, change, change_percent, previous_close, "fred_csv") def parse_trade_date(timestamp: str, timezone_name: str) -> str: return parse_trade_datetime(timestamp, timezone_name).date().isoformat() def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime: cleaned = timestamp.replace("ET", "").strip() tz = get_timezone(timezone_name) formats = [ ("%b %d, %Y %I:%M %p", False), ("%m/%d/%Y %I:%M %p", False), ("%Y-%m-%d %H:%M:%S", False), ("%b %d, %Y", True), ("%m/%d/%Y", True), ("%Y-%m-%d", True), ] for fmt, date_only in formats: try: dt = datetime.strptime(cleaned, fmt) if date_only: # Nasdaq occasionally returns only the date for an index close. dt = datetime.combine(dt.date(), datetime_time(16, 0)) return dt.replace(tzinfo=tz) except ValueError: continue try: dt = datetime.fromisoformat(cleaned) if dt.tzinfo is None: return dt.replace(tzinfo=tz) return dt.astimezone(tz) except ValueError as exc: raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None: tz = get_timezone(timezone_name) close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz) if market_status.lower() == "open": raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据") if trade_dt < close_dt: raise IncompleteMarketData( f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET" ) def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone: try: return ZoneInfo(timezone_name) except ZoneInfoNotFoundError: fallback_offsets = { "Asia/Shanghai": 8, # NDX trade date is parsed from Nasdaq's ET timestamp. A fixed EST # fallback is enough to preserve the date without requiring tzdata. "America/New_York": -5, } if timezone_name not in fallback_offsets: raise if logger: logger.warning("系统缺少 tzdata,使用固定时区兜底: %s", timezone_name) return timezone(timedelta(hours=fallback_offsets[timezone_name]), timezone_name) def load_state(config: dict[str, Any], logger: logging.Logger) -> dict[str, Any]: state_file = resolve_path(config["paths"]["state_file"]) if not state_file.exists() or state_file.stat().st_size == 0: logger.info("state 文件不存在或为空: %s", state_file) return {"deliveries": {}} try: data = json.loads(state_file.read_text(encoding="utf-8")) if not isinstance(data, dict): raise ValueError("state root is not object") data.setdefault("deliveries", {}) logger.info("state 文件加载成功: %s", state_file) return data except Exception as exc: logger.warning("state 文件加载失败,将修复为初始状态: %s", summarize_error(exc)) return {"deliveries": {}} def initialize_state( config: dict[str, Any], state: dict[str, Any], market_data: MarketData, now: datetime, logger: logging.Logger, ) -> None: state["lastNewDataReportedDate"] = market_data.trade_date state["lastData"] = serialize_market_data(market_data, now) state["deliveries"] = {} save_state(config, state, logger) logger.info("state 文件初始化成功") def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None: beijing_date = now.date().isoformat() state["lastNewDataReportedDate"] = market_data.trade_date state["lastData"] = serialize_market_data(market_data, now) state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date def save_state(config: dict[str, Any], state: dict[str, Any], logger: logging.Logger) -> None: state_file = resolve_path(config["paths"]["state_file"]) state_file.parent.mkdir(parents=True, exist_ok=True) state_file.write_text(json.dumps(state, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") logger.info("state 保存成功: %s", state_file) def build_report(data: MarketData) -> str: if data.change > 0: direction = "上涨" elif data.change < 0: direction = "下跌" else: direction = "基本持平" return ( f"{data.trade_date}(美国东部时间)Nasdaq-100 Index 收盘\n\n" f"- 收盘:{money(data.close)}\n" f"- 涨跌:{signed(data.change)} 点\n" f"- 涨跌幅:{signed(data.change_percent)}%\n\n" f"一句简短总结:相比前一交易日,Nasdaq-100 今日{direction}。" ) def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Logger) -> None: qqbot = config["qqbot"] token = get_qqbot_token(config, logger) target_type = str(qqbot.get("target_type", "c2c")) target_openid = str(qqbot.get("target_openid", "")).strip() base_api = str(qqbot.get("base_api", "https://api.sgroup.qq.com")).rstrip("/") if target_type == "group": url = f"{base_api}/v2/groups/{target_openid}/messages" else: url = f"{base_api}/v2/users/{target_openid}/messages" logger.info("QQBot 投递开始: target_type=%s", target_type) body = {"content": content, "msg_type": 0} http_json( "POST", url, body=body, timeout=15, headers={"Authorization": f"QQBot {token}"}, proxy_config=get_proxy_config(config), proxy_authorization=proxy_authorization_header(config), ) logger.info("QQBot 投递成功") def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str: qqbot = config["qqbot"] url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} payload = http_json( "POST", url, body=body, timeout=15, proxy_config=get_proxy_config(config), proxy_authorization=proxy_authorization_header(config), ) token = payload.get("access_token") or payload.get("accessToken") if not token: raise ValueError("QQBot token 响应缺少 access_token") logger.info("QQBot access_token 获取成功") return str(token) def append_delivery_log( config: dict[str, Any], market_data: MarketData, now: datetime, delivery_target: str, status: str, logger: logging.Logger, error: Exception | None = None, ) -> None: log_file = resolve_path(config["paths"]["delivery_log_file"]) log_file.parent.mkdir(parents=True, exist_ok=True) record = { "beijingDate": now.date().isoformat(), "tradeDate": market_data.trade_date, "generatedAt": now.isoformat(), "close": money(market_data.close), "change": signed(market_data.change), "changePercent": signed(market_data.change_percent), "deliveryTarget": delivery_target, "status": status, } if error is not None: record["error"] = summarize_error(error) with log_file.open("a", encoding="utf-8") as fh: fh.write(json.dumps(record, ensure_ascii=False) + "\n") logger.info("delivery log 追加成功: status=%s", status) def http_json( method: str, url: str, body: dict[str, Any] | None = None, timeout: int = 15, headers: dict[str, str] | None = None, proxy_config: dict[str, Any] | None = None, proxy_authorization: str | None = None, ) -> dict[str, Any]: text = http_text( method, url, body=body, timeout=timeout, headers=headers, proxy_config=proxy_config, proxy_authorization=proxy_authorization, ).decode("utf-8") parsed = json.loads(text) if not isinstance(parsed, dict): raise ValueError("JSON response root is not object") return parsed def http_text( method: str, url: str, body: dict[str, Any] | None = None, timeout: int = 15, headers: dict[str, str] | None = None, proxy_config: dict[str, Any] | None = None, proxy_authorization: str | None = None, ) -> bytes: proxy_config = proxy_config or {} if should_use_curl(proxy_config): return curl_request(method, url, body, timeout, headers, proxy_config) request_headers = dict(DEFAULT_HEADERS) if headers: request_headers.update(headers) if proxy_authorization: request_headers["Proxy-Authorization"] = proxy_authorization data = None if body is not None: data = json.dumps(body, ensure_ascii=False).encode("utf-8") request_headers["Content-Type"] = "application/json" request = urllib.request.Request(url, data=data, headers=request_headers, method=method) opener = build_url_opener(proxy_config or {}) try: with opener.open(request, timeout=timeout) as response: return response.read() except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace")[:300] raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc def curl_request( method: str, url: str, body: dict[str, Any] | None, timeout: int, headers: dict[str, str] | None, proxy_config: dict[str, Any], ) -> bytes: curl = shutil.which("curl.exe") or shutil.which("curl") if not curl: raise RuntimeError("network.transport 需要 curl,但当前环境未找到 curl") command = [ curl, "--silent", "--show-error", "--fail-with-body", "--max-time", str(timeout), "--request", method, ] if bool(proxy_config.get("curl_http1_1", True)): # Some HTTPS proxies intermittently fail while tunnelling HTTP/2. command.append("--http1.1") retry_count = max(0, int(proxy_config.get("curl_retry_count", 2))) if retry_count and method.upper() in {"GET", "HEAD"}: command.extend( [ "--retry", str(retry_count), "--retry-delay", str(max(0, int(proxy_config.get("curl_retry_delay_seconds", 1)))), "--retry-all-errors", ] ) proxy_url = str(proxy_config.get("proxy_url", "")).strip() if proxy_url: command.extend(["--proxy", proxy_url]) request_headers = dict(DEFAULT_HEADERS) if headers: request_headers.update(headers) for key, value in request_headers.items(): command.extend(["--header", f"{key}: {value}"]) input_bytes = None if body is not None: input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8") command.extend(["--data-binary", "@-"]) command.append(url) completed = subprocess.run( command, input=input_bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if completed.returncode != 0: error = completed.stderr.decode("utf-8", errors="replace").strip() if not error: error = completed.stdout.decode("utf-8", errors="replace")[:300] raise RuntimeError(f"curl request failed: {error[:300]}") return completed.stdout def should_use_curl(proxy_config: dict[str, Any]) -> bool: transport = str(proxy_config.get("transport", "auto")).strip().lower() proxy_url = str(proxy_config.get("proxy_url", "")).strip().lower() if transport == "curl": return True if transport == "urllib": return False if proxy_url.startswith("https://"): return True return bool(proxy_config.get("use_environment_proxy", True)) and has_environment_proxy() def has_environment_proxy() -> bool: proxies = urllib.request.getproxies() return any(key.lower() in {"http", "https", "all"} and value for key, value in proxies.items()) def build_url_opener(proxy_config: dict[str, Any]) -> urllib.request.OpenerDirector: proxy_url = str(proxy_config.get("proxy_url", "")).strip() if proxy_url: return urllib.request.build_opener( urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url}) ) if bool(proxy_config.get("use_environment_proxy", False)): return urllib.request.build_opener() return urllib.request.build_opener(urllib.request.ProxyHandler({})) def get_proxy_config(config: dict[str, Any]) -> dict[str, Any]: network = config.get("network", {}) if not isinstance(network, dict): return {} return network def proxy_authorization_header(config: dict[str, Any]) -> str | None: proxy_url = str(get_proxy_config(config).get("proxy_url", "")).strip() if not proxy_url: return None parsed = urllib.parse.urlsplit(proxy_url) if not parsed.username: return None username = urllib.parse.unquote(parsed.username) password = urllib.parse.unquote(parsed.password or "") token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii") return f"Basic {token}" def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: return { "tradeDate": data.trade_date, "close": money(data.close), "change": signed(data.change), "changePercent": signed(data.change_percent), "updatedAt": now.isoformat(), "source": data.source, } def save_target_openid(path: Path, openid: str) -> None: lines = path.read_text(encoding="utf-8").splitlines() in_qqbot = False updated = False new_lines: list[str] = [] for line in lines: if line.startswith("qqbot:"): in_qqbot = True elif line and not line.startswith(" ") and not line.startswith("#"): in_qqbot = False if in_qqbot and re.match(r"\s*target_openid\s*:", line): indent = line[: len(line) - len(line.lstrip())] new_lines.append(f'{indent}target_openid: "{openid}"') updated = True else: new_lines.append(line) if not updated: new_lines.append(f' target_openid: "{openid}"') path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") def resolve_path(value: str) -> Path: path = Path(str(value)) if path.is_absolute(): return path return (SCRIPT_DIR / path).resolve() def parse_decimal(value: Any) -> Decimal: if value is None: raise ValueError("缺少数值字段") cleaned = str(value).replace("$", "").replace(",", "").replace("%", "").replace("+", "").strip() try: return Decimal(cleaned) except InvalidOperation as exc: raise ValueError(f"无法解析数值: {value}") from exc def money(value: Decimal) -> str: return f"{value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP):,}" def signed(value: Decimal) -> str: rounded = value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) prefix = "+" if rounded > 0 else "" return f"{prefix}{rounded:,}" def mask_secret(value: str) -> str: if not value: return "" if len(value) <= 8: return "****" return f"{value[:4]}****{value[-4:]}" def summarize_error(exc: Exception) -> str: text = str(exc).replace("\n", " ").strip() return text[:300] if text else exc.__class__.__name__ if __name__ == "__main__": try: main() except Exception: sys.exit(0)