Files

972 lines
34 KiB
Python

#!/usr/bin/env python3
"""Fetch the latest complete NDX close and actively deliver a QQBot daily report."""
from __future__ import annotations
import csv
import base64
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, time as datetime_time, timedelta, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
# Directory that contains this script; relative paths from the config are
# resolved against it.
SCRIPT_DIR = Path(__file__).resolve().parent
CONFIG_FILE = SCRIPT_DIR.joinpath("config.yaml")

# Headers sent with every outgoing HTTP request unless a caller overrides them.
DEFAULT_HEADERS = {
    "Accept": "application/json,text/plain,*/*",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36",
}
class SilentExit(Exception):
    """Signal an expected early stop; the message is the reason written to the runtime log."""
class IncompleteMarketData(ValueError):
    """Upstream data parsed correctly but does not yet describe a finished regular session."""
@dataclass(frozen=True)
class MarketData:
    """Immutable snapshot of one index close and its move versus the prior close."""

    # Trade date string; the Nasdaq path produces ISO ``YYYY-MM-DD``.
    trade_date: str
    # Closing level of the index.
    close: Decimal
    # Point change versus the previous close.
    change: Decimal
    # Percentage change versus the previous close (already multiplied by 100).
    change_percent: Decimal
    # Previous session's close; the type allows None.
    previous_close: Decimal | None
    # Label of the data source, e.g. "nasdaq_quote" or "fred_csv".
    source: str
def main() -> None:
    """Run one report cycle: fetch the close, decide whether to send, deliver, persist.

    Config loading and logger setup run outside the guarded section, so a
    failure there propagates to the caller.  Everything afterwards ends either
    with a ``SilentExit`` (reason logged at INFO) or with an unexpected
    exception (logged with traceback); neither is re-raised.
    """
    config = load_config(CONFIG_FILE)
    logger = setup_logging(config)
    logger.info("脚本启动")
    logger.info("config 加载成功: %s", CONFIG_FILE)
    try:
        validate_qqbot_config(config, logger)
        ensure_target_openid(config, logger)
        market_data = fetch_market_data(config, logger)
        state = load_state(config, logger)

        report_config = config["report"]
        delivery_tz = get_timezone(
            report_config.get("timezone_delivery", "Asia/Shanghai"), logger
        )
        beijing_now = datetime.now(delivery_tz)
        beijing_date = beijing_now.date().isoformat()
        last_reported = state.get("lastNewDataReportedDate")
        last_close_reported = state.get("lastScheduledCloseReportedDate")
        logger.info(
            "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s, lastScheduledCloseReportedDate=%s",
            beijing_date,
            market_data.trade_date,
            last_reported,
            last_close_reported,
        )
        if not last_reported:
            # First run (or reset state): persist a baseline before deciding.
            initialize_state(config, state, market_data, beijing_now, logger)

        trade_tz = get_timezone(
            report_config.get("timezone_trade", "America/New_York"), logger
        )
        delivery_hour = int(report_config.get("delivery_hour", 8))
        scheduled_close_report_at = next_delivery_time_after_close(
            market_data.trade_date, trade_tz, delivery_tz, delivery_hour
        )
        # ISO date strings compare chronologically; a missing marker compares as "".
        is_unreported_close = market_data.trade_date > str(last_close_reported or "")
        need_new_data_report = (
            is_unreported_close and beijing_now >= scheduled_close_report_at
        )

        # "replay_on_weekdays" supplies the default when the newer key is absent.
        replay_default = report_config.get("replay_on_weekdays", True)
        replay_after_china_holiday = bool(
            report_config.get("replay_after_china_holiday", replay_default)
        )
        today_is_workday, yesterday_is_workday = get_china_workday_pair(
            config, beijing_now, logger
        )
        need_replay = should_replay_report(
            beijing_now,
            state.get("deliveries", {}),
            replay_after_china_holiday,
            today_is_workday,
            yesterday_is_workday,
            delivery_hour,
        )
        logger.info(
            "美股收盘定时报告=%s (计划时间=%s), 中国休息日后首工作日复述=%s",
            need_new_data_report,
            scheduled_close_report_at.isoformat(),
            need_replay,
        )
        if not (need_new_data_report or need_replay):
            raise SilentExit("不满足推送条件")

        report_text = build_report(market_data)
        logger.info("日报正文生成成功")
        qqbot_config = config["qqbot"]
        target_type = qqbot_config.get("target_type", "c2c")
        target_openid = qqbot_config.get("target_openid", "")
        delivery_target = f"qqbot:{target_type}:{target_openid}"
        try:
            send_qqbot_message(config, report_text, logger)
        except Exception as exc:
            logger.error("QQBot 投递失败: %s", summarize_error(exc))
            append_delivery_log(
                config, market_data, beijing_now, delivery_target, "failed", logger, exc
            )
            raise SilentExit("QQBot 投递失败")

        update_success_state(state, market_data, beijing_now, need_new_data_report)
        append_delivery_log(
            config, market_data, beijing_now, delivery_target, "sent", logger
        )
        save_state(config, state, logger)
        logger.info("QQBot 投递成功,state 更新成功")
        raise SilentExit("投递完成")
    except SilentExit as exc:
        logger.info("脚本静默结束原因: %s", exc)
    except Exception as exc:
        logger.exception("脚本静默结束原因: 未处理异常: %s", summarize_error(exc))
def load_config(path: Path) -> dict[str, Any]:
    """Read the YAML config at *path* and return its top-level mapping.

    PyYAML is used when importable; otherwise the text is handed to the
    built-in ``parse_simple_yaml`` fallback.

    Raises:
        ValueError: PyYAML parsed the file but the root is not a mapping.
    """
    raw_text = path.read_text(encoding="utf-8")
    try:
        import yaml  # type: ignore

        loaded = yaml.safe_load(raw_text)
        if isinstance(loaded, dict):
            return loaded
        raise ValueError("config.yaml root must be a mapping")
    except ModuleNotFoundError:
        return parse_simple_yaml(raw_text)
def _strip_yaml_comment(line: str) -> str:
    """Return *line* without its trailing ``#`` comment.

    A ``#`` only starts a comment when it is outside quotes and is either the
    first character or preceded by whitespace, so URL fragments and quoted
    values containing ``#`` survive intact.
    """
    open_quote = ""
    for index, char in enumerate(line):
        at_token_start = index == 0 or line[index - 1].isspace()
        if open_quote:
            if char == open_quote:
                open_quote = ""
        elif char in "\"'" and at_token_start:
            open_quote = char
        elif char == "#" and at_token_start:
            return line[:index]
    return line


def parse_simple_yaml(text: str) -> dict[str, Any]:
    """Parse the small YAML subset used by config.yaml without PyYAML.

    Supported: top-level ``section:`` headers followed by indented
    ``key: value`` pairs (one nesting level), top-level ``key: value``
    scalars, and ``#`` comments.  Indented lines that precede any section
    header, or that contain no colon, are ignored.

    Args:
        text: Raw config file contents.

    Returns:
        Mapping of section name to its key/value dict, or of key to scalar for
        top-level scalars.
    """
    root: dict[str, Any] = {}
    current: dict[str, Any] | None = None
    for raw_line in text.splitlines():
        line = _strip_yaml_comment(raw_line).rstrip()
        if not line.strip():
            continue
        if not line.startswith(" "):
            key, _, inline_value = line.partition(":")
            key = key.strip()
            inline_value = inline_value.strip()
            if inline_value:
                # Top-level scalar: it opens no section, so indented lines
                # that follow it are not attached to anything.
                root[key] = parse_scalar(inline_value)
                current = None
            else:
                root[key] = {}
                current = root[key]
            continue
        if current is None or ":" not in line:
            continue
        key, value = line.strip().split(":", 1)
        current[key.strip()] = parse_scalar(value.strip())
    return root


def parse_scalar(value: str) -> Any:
    """Convert one YAML scalar token to bool, str or int.

    ``true``/``True`` and ``false``/``False`` become booleans, a token wrapped
    in matching quotes has the quotes removed, a token accepted by ``int()``
    becomes an int, and anything else is returned unchanged as a string.
    """
    if value in {"true", "True"}:
        return True
    if value in {"false", "False"}:
        return False
    if value in {'""', "''"}:
        return ""
    for quote in ('"', "'"):
        if value.startswith(quote) and value.endswith(quote):
            return value[1:-1]
    try:
        return int(value)
    except ValueError:
        return value
def setup_logging(config: dict[str, Any]) -> logging.Logger:
    """Configure the ``ndx_daily_report`` logger to write to the runtime log file.

    The log path comes from ``config["paths"]["runtime_log_file"]``; its parent
    directory is created when missing.  Handlers already attached to the logger
    are dropped so exactly one file handler is attached afterwards.
    """
    log_path = resolve_path(config["paths"]["runtime_log_file"])
    log_path.parent.mkdir(parents=True, exist_ok=True)

    run_logger = logging.getLogger("ndx_daily_report")
    run_logger.setLevel(logging.DEBUG)
    run_logger.handlers.clear()

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    line_format = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    file_handler.setFormatter(line_format)
    run_logger.addHandler(file_handler)
    return run_logger
def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None:
    """Log the (masked) QQBot settings and stop the run when they are unusable.

    Raises:
        SilentExit: AppID or AppSecret is empty or still the "用户填写"
            placeholder, or ``target_type`` is neither ``c2c`` nor ``group``.
    """
    qqbot = config.get("qqbot", {})
    appid, appsecret = get_qqbot_credentials(config)
    credential_source = get_qqbot_credential_source(config)
    target_type = str(qqbot.get("target_type", "c2c")).strip()
    logger.info(
        "QQBot 配置检查: source=%s, appid=%s, appsecret=%s, target_type=%s",
        credential_source,
        mask_secret(appid),
        mask_secret(appsecret),
        target_type,
    )
    required = (
        (appid, "QQBot AppID 未配置"),
        (appsecret, "QQBot AppSecret 未配置"),
    )
    for credential, reason in required:
        if not credential or credential == "用户填写":
            raise SilentExit(reason)
    if target_type not in {"c2c", "group"}:
        raise SilentExit("QQBot target_type 必须为 c2c 或 group")
def get_qqbot_credentials(config: dict[str, Any]) -> tuple[str, str]:
    """Return the ``(appid, appsecret)`` pair, both stripped of whitespace.

    Environment variables ``QQBOT_APPID`` / ``QQBOT_SECRET`` take precedence
    over the ``qqbot`` config section.  For the secret, ``appsecret`` is
    preferred and ``appkey`` is the fallback key.

    Missing or null values yield an empty string.  Without the ``or ""``
    guards a YAML null would be stringified to ``"None"`` and mistaken for a
    configured credential.
    """
    qqbot = config.get("qqbot") or {}
    appid = os.environ.get("QQBOT_APPID") or qqbot.get("appid") or ""
    appsecret = (
        os.environ.get("QQBOT_SECRET")
        or qqbot.get("appsecret")
        or qqbot.get("appkey")
        or ""
    )
    return str(appid).strip(), str(appsecret).strip()
def get_qqbot_credential_source(config: dict[str, Any]) -> str:
    """Name where the QQBot credentials come from, for logging purposes.

    Returns ``"environment"`` when either credential environment variable is
    set; otherwise the first populated config key decides, with plain
    ``"config"`` as the last resort.
    """
    from_environment = os.environ.get("QQBOT_APPID") or os.environ.get("QQBOT_SECRET")
    if from_environment:
        return "environment"
    qqbot = config.get("qqbot", {})
    labelled_keys = (
        ("appsecret", "config.appsecret"),
        ("appkey", "config.appkey(legacy)"),
    )
    for key, label in labelled_keys:
        if qqbot.get(key):
            return label
    return "config"
def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None:
    """Make sure ``config["qqbot"]["target_openid"]`` is set, discovering it if needed.

    Returns normally only when an openid is already configured.  Otherwise a
    local discovery is attempted and the run always ends with ``SilentExit``:
    either because nothing was found, or because this run only initialised the
    configuration.

    Raises:
        SilentExit: discovery failed, or discovery succeeded (the run stops
            after storing the value).
    """
    qqbot = config["qqbot"]
    # A null value (empty ``target_openid:`` under PyYAML) must count as
    # "not configured"; str(None) would otherwise yield the truthy "None".
    target_openid = str(qqbot.get("target_openid") or "").strip()
    logger.info("target_openid 是否存在: %s", bool(target_openid))
    if target_openid:
        return
    logger.info("target_openid 自动发现开始")
    discovered = discover_openid(config, logger)
    if not discovered:
        raise SilentExit(
            "target_openid 自动发现失败;请先给 QQBot 发一条私聊消息,或在目标群里 @ 机器人,然后重新运行脚本"
        )
    qqbot["target_openid"] = discovered
    if bool(qqbot.get("save_discovered_target", True)):
        save_target_openid(CONFIG_FILE, discovered)
        # Only claim the write-back when it actually happened.
        logger.info("已自动发现 QQBot target_openid,并写回 config.yaml")
    raise SilentExit("target_openid 自动发现成功,本次只完成配置初始化")
def discover_openid(config: dict[str, Any], logger: logging.Logger) -> str | None:
    """Search local log/cache files for a QQBot openid and return the first hit.

    The field searched for is ``group_openid`` when ``target_type`` is
    ``group`` and ``user_openid`` otherwise.  Candidate directories are
    rescanned every two seconds until ``auto_discover_timeout_seconds``
    (default 60) has elapsed; at least one scan is always performed.

    Returns:
        The matched openid, or ``None`` when the deadline passes without a match.
    """
    target_type = str(config["qqbot"].get("target_type", "c2c"))
    field = "group_openid" if target_type == "group" else "user_openid"
    patterns = [
        re.compile(rf'"{field}"\s*:\s*"([^"]+)"'),
        re.compile(rf"{field}\s*[:=]\s*([A-Za-z0-9_-]+)"),
    ]

    candidates: list[Path] = []
    hermes_home = os.environ.get("HERMES_HOME", "").strip()
    if hermes_home:
        # Only look under HERMES_HOME when it is set: Path("") / "logs" would
        # silently resolve to ./logs relative to the current directory.
        candidates.extend([Path(hermes_home) / "logs", Path(hermes_home) / "data"])
    candidates.extend([Path.home() / ".hermes", Path.home() / ".qqbot", SCRIPT_DIR])

    poll_interval = 2.0
    deadline = time.time() + int(config["qqbot"].get("auto_discover_timeout_seconds", 60))
    while True:
        for base in candidates:
            if not base.exists():
                continue
            for path in list_recent_text_files(base):
                try:
                    text = path.read_text(encoding="utf-8", errors="ignore")
                except OSError:
                    continue
                for pattern in patterns:
                    match = pattern.search(text)
                    if match:
                        logger.info("target_openid 自动发现成功: source=%s", path)
                        return match.group(1)
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
    logger.warning("target_openid 自动发现失败: 未找到本地事件缓存或日志字段 %s", field)
    return None
def list_recent_text_files(base: Path) -> list[Path]:
    """Return up to 50 text-like files under *base*, most recently modified first.

    Only regular files with a ``.log``, ``.json``, ``.jsonl`` or ``.txt``
    suffix (case-insensitive) are considered.  A file that cannot be inspected
    (for example because it vanished while the tree was being walked) is
    skipped; if walking the tree itself fails, an empty list is returned.
    """
    suffixes = {".log", ".json", ".jsonl", ".txt"}
    dated: list[tuple[float, Path]] = []
    try:
        for path in base.rglob("*"):
            if path.suffix.lower() not in suffixes:
                continue
            try:
                if path.is_file():
                    # Read the mtime once, here, so sorting cannot raise later.
                    dated.append((path.stat().st_mtime, path))
            except OSError:
                continue
    except OSError:
        return []
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated[:50]]
def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData:
    """Get the latest close from the Nasdaq quote API, falling back to FRED.

    Incomplete Nasdaq data stops the run; any other Nasdaq failure triggers
    the FRED CSV fallback.

    Raises:
        SilentExit: the Nasdaq API answered but the session is not complete yet.
    """
    try:
        quote = fetch_nasdaq_quote(config, logger)
    except IncompleteMarketData as exc:
        logger.warning("Nasdaq quote API 尚无完整收盘数据: %s", summarize_error(exc))
        raise SilentExit("尚无最近完整美股常规交易日收盘数据")
    except Exception as exc:
        logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc))
        return fetch_fred_csv(config, logger)
    return quote
def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> MarketData:
    """Fetch and validate the latest NDX close from the Nasdaq quote API.

    The reported point and percentage changes are recomputed from the previous
    close and must agree within 0.05 points and 0.01 percentage points.

    Raises:
        IncompleteMarketData: the quote does not describe a finished regular
            session (see ``validate_complete_regular_close``).
        ValueError: a required section or field is missing, null or
            unparsable, the previous close is zero, or the recomputed change
            disagrees with the reported one.
    """
    nasdaq_config = config["nasdaq"]
    url = str(nasdaq_config["quote_api"])
    timeout = int(nasdaq_config.get("request_timeout_seconds", 15))
    logger.info("Nasdaq quote API 请求开始")
    payload = http_json(
        "GET",
        url,
        timeout=timeout,
        proxy_config=get_proxy_config(config),
        proxy_authorization=proxy_authorization_header(config),
    )

    # Sections may be present but JSON null; report that explicitly instead of
    # failing later with an opaque AttributeError on None.
    data = payload.get("data")
    if not isinstance(data, dict):
        raise ValueError("Nasdaq quote API 响应缺少 data 字段")
    primary = data.get("primaryData")
    if not isinstance(primary, dict):
        raise ValueError("Nasdaq quote API 响应缺少 primaryData 字段")
    key_stats = data.get("keyStats")
    if not isinstance(key_stats, dict):
        key_stats = {}
    previous_close_entry = key_stats.get("previousclose")
    if not isinstance(previous_close_entry, dict):
        previous_close_entry = {}

    market_status = str(data.get("marketStatus") or "").strip()
    close = parse_decimal(primary.get("lastSalePrice"))
    change = parse_decimal(primary.get("netChange"))
    change_percent = parse_decimal(primary.get("percentageChange"))
    previous_close = parse_decimal(previous_close_entry.get("value"))
    timestamp = str(primary.get("lastTradeTimestamp") or "").strip()
    if not timestamp:
        raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp")

    trade_timezone = config["report"].get("timezone_trade", "America/New_York")
    trade_dt = parse_trade_datetime(timestamp, trade_timezone)
    validate_complete_regular_close(trade_dt, market_status, trade_timezone)
    trade_date = trade_dt.date().isoformat()
    logger.info(
        "NDX 字段解析结果: tradeDate=%s marketStatus=%s close=%s change=%s changePercent=%s previousClose=%s",
        trade_date,
        market_status,
        money(close),
        signed(change),
        signed(change_percent),
        money(previous_close),
    )

    if previous_close == 0:
        # Guard the division below with a readable error.
        raise ValueError("Nasdaq previous close 为 0,无法复算涨跌幅")
    calculated_change = close - previous_close
    calculated_percent = (calculated_change / previous_close) * Decimal("100")
    logger.info(
        "previous close 复算结果: change=%s changePercent=%s",
        signed(calculated_change),
        signed(calculated_percent),
    )
    if abs(calculated_change - change) > Decimal("0.05"):
        raise ValueError("Nasdaq 点数涨跌与 previous close 复算不一致")
    if abs(calculated_percent - change_percent) > Decimal("0.01"):
        raise ValueError("Nasdaq 涨跌幅与 previous close 复算不一致")
    logger.info("涨跌幅一致性校验结果: 通过")
    return MarketData(trade_date, close, change, change_percent, previous_close, "nasdaq_quote")
def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData:
    """Build the latest close from the FRED ``NASDAQ100`` CSV series.

    The last two rows with a usable value are taken as the latest and the
    previous close; change and percentage change are derived from them.

    Raises:
        ValueError: fewer than two usable rows, a zero previous close, a
            missing date column, or an unparsable value.
    """
    url = str(config["nasdaq"]["fred_csv"])
    timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
    logger.info("FRED CSV 请求开始")
    text = http_text(
        "GET",
        url,
        timeout=timeout,
        headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]},
        proxy_config=get_proxy_config(config),
        proxy_authorization=proxy_authorization_header(config),
    ).decode("utf-8-sig")

    # Rows with a missing column (None), an empty value, or the "."
    # placeholder carry no usable observation.
    rows = [
        row
        for row in csv.DictReader(text.splitlines())
        if (row.get("NASDAQ100") or "").strip() not in {"", "."}
    ]
    if len(rows) < 2:
        raise ValueError("FRED CSV 有效数据不足")
    previous, latest = rows[-2], rows[-1]
    close = parse_decimal(latest["NASDAQ100"])
    previous_close = parse_decimal(previous["NASDAQ100"])
    if previous_close == 0:
        raise ValueError("FRED previous close 为 0,无法计算涨跌幅")
    change = close - previous_close
    change_percent = (change / previous_close) * Decimal("100")

    # Accept both the current "observation_date" header and the legacy "DATE".
    trade_date = (latest.get("observation_date") or latest.get("DATE") or "").strip()
    if not trade_date:
        raise ValueError("FRED CSV 缺少日期列")
    logger.info(
        "FRED 数据解析结果: tradeDate=%s close=%s change=%s changePercent=%s",
        trade_date,
        money(close),
        signed(change),
        signed(change_percent),
    )
    return MarketData(trade_date, close, change, change_percent, previous_close, "fred_csv")
def parse_trade_date(timestamp: str, timezone_name: str) -> str:
    """Return the ISO calendar date of *timestamp* as parsed by ``parse_trade_datetime``."""
    trade_dt = parse_trade_datetime(timestamp, timezone_name)
    return trade_dt.date().isoformat()
def should_replay_report(
    delivery_now: datetime,
    deliveries: dict[str, Any],
    enabled: bool,
    today_is_workday: bool,
    yesterday_is_workday: bool,
    delivery_hour: int,
) -> bool:
    """Decide whether the latest report should be repeated today.

    A replay is due when the feature is enabled, the delivery hour has been
    reached, today is a workday that follows a non-workday, and nothing has
    been delivered yet under today's date.
    """
    if not enabled:
        return False
    if delivery_now.hour < delivery_hour:
        return False
    first_workday_after_break = today_is_workday and not yesterday_is_workday
    if not first_workday_after_break:
        return False
    today_key = delivery_now.date().isoformat()
    return today_key not in deliveries
def next_delivery_time_after_close(
    trade_date: str,
    trade_tz: ZoneInfo | timezone,
    delivery_tz: ZoneInfo | timezone,
    delivery_hour: int,
) -> datetime:
    """Return the first ``delivery_hour:00`` (delivery zone) strictly after the close.

    The close is taken as 16:00 on *trade_date* (``YYYY-MM-DD``) in the trade
    zone.  If the delivery slot on the close's delivery-zone date is not later
    than the close itself, the slot on the following day is returned.
    """
    session_day = datetime.strptime(trade_date, "%Y-%m-%d").date()
    closing_bell = datetime.combine(
        session_day, datetime_time(16, 0), tzinfo=trade_tz
    ).astimezone(delivery_tz)
    same_day_slot = datetime.combine(
        closing_bell.date(), datetime_time(delivery_hour, 0), tzinfo=delivery_tz
    )
    if same_day_slot > closing_bell:
        return same_day_slot
    return same_day_slot + timedelta(days=1)
def get_china_workday_pair(
    config: dict[str, Any],
    delivery_now: datetime,
    logger: logging.Logger,
) -> tuple[bool, bool]:
    """Return ``(today_is_workday, yesterday_is_workday)`` for the China calendar.

    The holiday API's ``/today`` and ``/yesterday`` endpoints are queried
    first.  On any failure the answer falls back to a plain Monday-to-Friday
    rule evaluated on ``delivery_now``'s date.
    """
    report_config = config["report"]
    configured_url = report_config.get("holiday_api_v2", "https://api.haoshenqi.top/holiday")
    base_url = str(configured_url).rstrip("/")
    timeout = int(report_config.get("holiday_api_timeout_seconds", 5))
    try:
        today_status = fetch_holiday_v2_status(config, f"{base_url}/today", timeout)
        yesterday_status = fetch_holiday_v2_status(config, f"{base_url}/yesterday", timeout)
        logger.info(
            "中国工作日 API v2 获取成功: today=%s, yesterday=%s",
            today_status,
            yesterday_status,
        )
        return today_status, yesterday_status
    except Exception as exc:
        current_day = delivery_now.date()
        previous_day = current_day - timedelta(days=1)
        logger.warning(
            "中国工作日 API v2 获取失败,回退到周一至周五规则: %s",
            summarize_error(exc),
        )
        # weekday(): Monday is 0, so values below 5 are Monday..Friday.
        return current_day.weekday() < 5, previous_day.weekday() < 5
def fetch_holiday_v2_status(
    config: dict[str, Any],
    url: str,
    timeout: int,
) -> bool:
    """Query one holiday API endpoint and map its plain-text answer to a bool.

    Returns ``True`` for "工作" and ``False`` for "休息".

    Raises:
        ValueError: the response body is any other text.
    """
    raw = http_text(
        "GET",
        url,
        timeout=timeout,
        headers={"Accept": "text/plain"},
        proxy_config=get_proxy_config(config),
        proxy_authorization=proxy_authorization_header(config),
    )
    status = raw.decode("utf-8-sig").strip()
    known_statuses = {"工作": True, "休息": False}
    if status in known_statuses:
        return known_statuses[status]
    raise ValueError(f"节假日 API v2 返回未知状态: {status[:50]!r}")
def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime:
    """Parse a quote timestamp into an aware datetime in the trade timezone.

    Every "ET" substring is removed first.  Date-plus-time layouts are tried
    before date-only layouts; a date-only value is treated as 16:00 on that
    day.  As a last resort the text is parsed as ISO 8601, attaching the trade
    zone to naive results and converting aware ones into it.

    Raises:
        ValueError: no supported layout matches.
    """
    cleaned = timestamp.replace("ET", "").strip()
    tz = get_timezone(timezone_name)

    timed_layouts = ("%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S")
    for layout in timed_layouts:
        try:
            parsed = datetime.strptime(cleaned, layout)
        except ValueError:
            continue
        return parsed.replace(tzinfo=tz)

    date_only_layouts = ("%b %d, %Y", "%m/%d/%Y", "%Y-%m-%d")
    for layout in date_only_layouts:
        try:
            parsed = datetime.strptime(cleaned, layout)
        except ValueError:
            continue
        # Nasdaq occasionally returns only the date for an index close.
        return datetime.combine(parsed.date(), datetime_time(16, 0), tzinfo=tz)

    try:
        iso_parsed = datetime.fromisoformat(cleaned)
    except ValueError as exc:
        raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc
    if iso_parsed.tzinfo is None:
        return iso_parsed.replace(tzinfo=tz)
    return iso_parsed.astimezone(tz)
def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None:
    """Reject quotes that do not represent a finished regular session.

    Raises:
        IncompleteMarketData: the market status is "open" (any letter case),
            the trade timestamp lies in the future of the trade timezone, or
            it is earlier than 16:00 on its own date.
    """
    tz = get_timezone(timezone_name)
    current_time = datetime.now(tz)
    regular_close = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz)

    still_open = market_status.lower() == "open"
    if still_open:
        raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据")
    if trade_dt > current_time:
        raise IncompleteMarketData(
            f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 晚于当前交易时区时间 {current_time.isoformat()}"
        )
    if trade_dt < regular_close:
        raise IncompleteMarketData(
            f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET"
        )
def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone:
    """Resolve *timezone_name*, using a fixed UTC offset when tz data is missing.

    Only "Asia/Shanghai" (+8) and "America/New_York" (-5) have a fixed-offset
    fallback; for any other unknown name the ``ZoneInfoNotFoundError`` is
    re-raised.  A warning is logged when the fallback is used and a logger was
    supplied.
    """
    try:
        return ZoneInfo(timezone_name)
    except ZoneInfoNotFoundError:
        # NDX trade date is parsed from Nasdaq's ET timestamp. A fixed EST
        # fallback is enough to preserve the date without requiring tzdata.
        fixed_offset_hours = {"Asia/Shanghai": 8, "America/New_York": -5}.get(timezone_name)
        if fixed_offset_hours is None:
            raise
        if logger:
            logger.warning("系统缺少 tzdata,使用固定时区兜底: %s", timezone_name)
        return timezone(timedelta(hours=fixed_offset_hours), timezone_name)
def load_state(config: dict[str, Any], logger: logging.Logger) -> dict[str, Any]:
    """Load the persisted state dict, or a fresh one when it is unusable.

    A missing, empty, unreadable or non-object state file yields
    ``{"deliveries": {}}``.  A loaded state always has a ``deliveries`` key.
    """
    state_path = resolve_path(config["paths"]["state_file"])
    missing_or_empty = not state_path.exists() or state_path.stat().st_size == 0
    if missing_or_empty:
        logger.info("state 文件不存在或为空: %s", state_path)
        return {"deliveries": {}}
    try:
        loaded = json.loads(state_path.read_text(encoding="utf-8"))
        if not isinstance(loaded, dict):
            raise ValueError("state root is not object")
        loaded.setdefault("deliveries", {})
        logger.info("state 文件加载成功: %s", state_path)
        return loaded
    except Exception as exc:
        logger.warning("state 文件加载失败,将修复为初始状态: %s", summarize_error(exc))
        return {"deliveries": {}}
def initialize_state(
    config: dict[str, Any],
    state: dict[str, Any],
    market_data: MarketData,
    now: datetime,
    logger: logging.Logger,
) -> None:
    """Seed *state* with the current market data, clear deliveries, and save it.

    *state* is modified in place: the last reported trade date and the data
    snapshot are set and the ``deliveries`` map is replaced with an empty one.
    """
    state["lastNewDataReportedDate"] = market_data.trade_date
    snapshot = serialize_market_data(market_data, now)
    state["lastData"] = snapshot
    state["deliveries"] = {}
    save_state(config, state, logger)
    logger.info("state 文件初始化成功")
def update_success_state(
    state: dict[str, Any],
    market_data: MarketData,
    now: datetime,
    scheduled_close_report: bool = False,
) -> None:
    """Record a successful delivery in *state* (in place; nothing is saved here).

    The trade date is stored as the last reported date, and additionally as
    the last scheduled-close date when *scheduled_close_report* is true.  The
    data snapshot is refreshed and the delivery is filed under ``now``'s date.
    """
    trade_date = market_data.trade_date
    state["lastNewDataReportedDate"] = trade_date
    if scheduled_close_report:
        state["lastScheduledCloseReportedDate"] = trade_date
    state["lastData"] = serialize_market_data(market_data, now)
    deliveries = state.setdefault("deliveries", {})
    deliveries[now.date().isoformat()] = trade_date
def save_state(config: dict[str, Any], state: dict[str, Any], logger: logging.Logger) -> None:
    """Persist *state* as pretty-printed JSON at the configured state path.

    The JSON is written to a sibling ``<name>.tmp`` file and then moved over
    the real file with ``os.replace``, so an interrupted run cannot leave a
    truncated state file behind.

    Raises:
        OSError: the temporary file could not be written or moved; the
            temporary file is removed before re-raising.
    """
    state_file = resolve_path(config["paths"]["state_file"])
    state_file.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, ensure_ascii=False, indent=2) + "\n"
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    try:
        tmp_file.write_text(payload, encoding="utf-8")
        os.replace(tmp_file, state_file)
    except OSError:
        try:
            tmp_file.unlink()
        except OSError:
            pass  # best-effort cleanup; the original error is what matters
        raise
    logger.info("state 保存成功: %s", state_file)
def build_report(data: MarketData) -> str:
    """Render the plain-text daily report for one close.

    The summary word is chosen from the sign of ``data.change``: up, down, or
    roughly flat when the change is exactly zero.
    """
    direction = "上涨" if data.change > 0 else "下跌" if data.change < 0 else "基本持平"
    lines = [
        f"{data.trade_date}(美国东部时间)Nasdaq-100 Index 收盘",
        "",
        f"- 收盘:{money(data.close)}",
        f"- 涨跌:{signed(data.change)}",
        f"- 涨跌幅:{signed(data.change_percent)}%",
        "",
        f"一句简短总结:相比前一交易日,Nasdaq-100 今日{direction}。",
    ]
    return "\n".join(lines)
def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Logger) -> None:
    """POST *content* as a text message (``msg_type`` 0) to the configured target.

    An access token is fetched first.  The message goes to the
    ``/v2/groups/<openid>/messages`` endpoint when ``target_type`` is
    ``group`` and to ``/v2/users/<openid>/messages`` otherwise.  Errors raised
    by the HTTP layer propagate to the caller.
    """
    qqbot = config["qqbot"]
    token = get_qqbot_token(config, logger)
    target_type = str(qqbot.get("target_type", "c2c"))
    target_openid = str(qqbot.get("target_openid", "")).strip()
    base_api = str(qqbot.get("base_api", "https://api.sgroup.qq.com")).rstrip("/")
    collection = "groups" if target_type == "group" else "users"
    url = f"{base_api}/v2/{collection}/{target_openid}/messages"
    logger.info("QQBot 投递开始: target_type=%s", target_type)
    http_json(
        "POST",
        url,
        body={"content": content, "msg_type": 0},
        timeout=15,
        headers={"Authorization": f"QQBot {token}"},
        proxy_config=get_proxy_config(config),
        proxy_authorization=proxy_authorization_header(config),
    )
    logger.info("QQBot 投递成功")
def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str:
    """Request a QQBot access token, retrying when the response carries none.

    The request is repeated up to ``token_retry_count`` extra times (default
    1), one second apart, but only when a response arrives without a token;
    errors raised by the HTTP layer are not retried here.

    Raises:
        ValueError: the final attempt still returned no token.
    """
    qqbot = config["qqbot"]
    url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken"))
    appid, appsecret = get_qqbot_credentials(config)
    logger.info(
        "QQBot token 请求开始: source=%s, appid=%s",
        get_qqbot_credential_source(config),
        mask_secret(appid),
    )
    body = {"appId": appid, "clientSecret": appsecret}
    retry_count = max(0, int(qqbot.get("token_retry_count", 1)))
    total_attempts = retry_count + 1
    for attempt_number in range(1, total_attempts + 1):
        payload = http_json(
            "POST",
            url,
            body=body,
            timeout=15,
            proxy_config=get_proxy_config(config),
            proxy_authorization=proxy_authorization_header(config),
        )
        token = payload.get("access_token") or payload.get("accessToken")
        if token:
            logger.info("QQBot access_token 获取成功")
            return str(token)
        code = payload.get("code", "unknown")
        message = payload.get("message") or payload.get("msg") or "unknown error"
        if attempt_number == total_attempts:
            raise ValueError(f"QQBot token 获取失败: code={code}, message={message}")
        logger.warning(
            "QQBot token 获取失败,将重试: code=%s, message=%s",
            code,
            message,
        )
        time.sleep(1)
    # Defensive: the loop above always returns or raises.
    raise RuntimeError("QQBot token 获取流程异常结束")
def append_delivery_log(
    config: dict[str, Any],
    market_data: MarketData,
    now: datetime,
    delivery_target: str,
    status: str,
    logger: logging.Logger,
    error: Exception | None = None,
) -> None:
    """Append one JSON line describing a delivery attempt to the delivery log.

    The record holds the delivery date, trade date, formatted figures, target
    and status; an ``error`` summary is added when *error* is given.
    """
    log_path = resolve_path(config["paths"]["delivery_log_file"])
    log_path.parent.mkdir(parents=True, exist_ok=True)
    entry = {
        "beijingDate": now.date().isoformat(),
        "tradeDate": market_data.trade_date,
        "generatedAt": now.isoformat(),
        "close": money(market_data.close),
        "change": signed(market_data.change),
        "changePercent": signed(market_data.change_percent),
        "deliveryTarget": delivery_target,
        "status": status,
    }
    if error is not None:
        entry["error"] = summarize_error(error)
    serialized = json.dumps(entry, ensure_ascii=False)
    with log_path.open("a", encoding="utf-8") as handle:
        handle.write(serialized + "\n")
    logger.info("delivery log 追加成功: status=%s", status)
def http_json(
    method: str,
    url: str,
    body: dict[str, Any] | None = None,
    timeout: int = 15,
    headers: dict[str, str] | None = None,
    proxy_config: dict[str, Any] | None = None,
    proxy_authorization: str | None = None,
) -> dict[str, Any]:
    """Perform an HTTP request via ``http_text`` and decode a JSON object response.

    Raises:
        ValueError: the body is not valid JSON, or its root is not an object.
    """
    raw = http_text(
        method,
        url,
        body=body,
        timeout=timeout,
        headers=headers,
        proxy_config=proxy_config,
        proxy_authorization=proxy_authorization,
    )
    document = json.loads(raw.decode("utf-8"))
    if isinstance(document, dict):
        return document
    raise ValueError("JSON response root is not object")
def http_text(
    method: str,
    url: str,
    body: dict[str, Any] | None = None,
    timeout: int = 15,
    headers: dict[str, str] | None = None,
    proxy_config: dict[str, Any] | None = None,
    proxy_authorization: str | None = None,
) -> bytes:
    """Perform an HTTP request and return the raw response body as bytes.

    The request is delegated to curl when ``should_use_curl`` says so;
    otherwise it is sent with urllib.  A *body* is JSON-encoded and sent with
    a JSON content type.  *proxy_authorization* is only applied on the urllib
    path.

    Raises:
        RuntimeError: the server answered with an HTTP error status (urllib
            path); the message carries the status and up to 300 characters of
            the body.
    """
    proxy_config = proxy_config or {}
    if should_use_curl(proxy_config):
        return curl_request(method, url, body, timeout, headers, proxy_config)

    outgoing_headers = dict(DEFAULT_HEADERS)
    if headers:
        outgoing_headers.update(headers)
    if proxy_authorization:
        outgoing_headers["Proxy-Authorization"] = proxy_authorization

    encoded_body = None
    if body is not None:
        encoded_body = json.dumps(body, ensure_ascii=False).encode("utf-8")
        outgoing_headers["Content-Type"] = "application/json"

    request = urllib.request.Request(
        url, data=encoded_body, headers=outgoing_headers, method=method
    )
    opener = build_url_opener(proxy_config)
    try:
        with opener.open(request, timeout=timeout) as response:
            return response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")[:300]
        raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
def curl_request(
    method: str,
    url: str,
    body: dict[str, Any] | None,
    timeout: int,
    headers: dict[str, str] | None,
    proxy_config: dict[str, Any],
) -> bytes:
    """Perform an HTTP request by running the ``curl`` executable.

    Retries are only enabled for GET and HEAD.  A *body* is JSON-encoded and
    streamed to curl on stdin.

    Returns:
        curl's stdout, i.e. the response body.

    Raises:
        RuntimeError: curl is not installed, or it exited with a non-zero
            status (message limited to 300 characters).
    """
    curl_binary = shutil.which("curl.exe") or shutil.which("curl")
    if not curl_binary:
        raise RuntimeError("network.transport 需要 curl,但当前环境未找到 curl")

    argv = [
        curl_binary,
        "--silent",
        "--show-error",
        "--fail-with-body",
        "--max-time",
        str(timeout),
        "--request",
        method,
    ]
    if bool(proxy_config.get("curl_http1_1", True)):
        # Some HTTPS proxies intermittently fail while tunnelling HTTP/2.
        argv.append("--http1.1")

    retries = max(0, int(proxy_config.get("curl_retry_count", 2)))
    if retries and method.upper() in {"GET", "HEAD"}:
        retry_delay = max(0, int(proxy_config.get("curl_retry_delay_seconds", 1)))
        argv += [
            "--retry",
            str(retries),
            "--retry-delay",
            str(retry_delay),
            "--retry-all-errors",
        ]

    proxy_url = str(proxy_config.get("proxy_url", "")).strip()
    if proxy_url:
        argv += ["--proxy", proxy_url]

    merged_headers = {**DEFAULT_HEADERS, **(headers or {})}
    for name, value in merged_headers.items():
        argv += ["--header", f"{name}: {value}"]

    stdin_payload = None
    if body is not None:
        stdin_payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
        has_content_type = any(name.lower() == "content-type" for name in merged_headers)
        if not has_content_type:
            argv += ["--header", "Content-Type: application/json"]
        # "@-" tells curl to read the request body from stdin.
        argv += ["--data-binary", "@-"]
    argv.append(url)

    result = subprocess.run(
        argv,
        input=stdin_payload,
        capture_output=True,
        check=False,
    )
    if result.returncode == 0:
        return result.stdout
    error = result.stderr.decode("utf-8", errors="replace").strip()
    if not error:
        error = result.stdout.decode("utf-8", errors="replace")[:300]
    raise RuntimeError(f"curl request failed: {error[:300]}")
def should_use_curl(proxy_config: dict[str, Any]) -> bool:
    """Decide whether requests go through curl instead of urllib.

    An explicit ``transport`` of ``curl`` or ``urllib`` wins.  Otherwise curl
    is chosen for an ``https://`` proxy URL, or when environment proxies are
    allowed (``use_environment_proxy``, default true here) and one is set.
    """
    transport = str(proxy_config.get("transport", "auto")).strip().lower()
    if transport in {"curl", "urllib"}:
        return transport == "curl"
    proxy_url = str(proxy_config.get("proxy_url", "")).strip().lower()
    if proxy_url.startswith("https://"):
        return True
    environment_allowed = bool(proxy_config.get("use_environment_proxy", True))
    return environment_allowed and has_environment_proxy()
def has_environment_proxy() -> bool:
    """Report whether ``urllib.request.getproxies()`` lists an http, https or all proxy."""
    relevant_schemes = {"http", "https", "all"}
    for scheme, proxy in urllib.request.getproxies().items():
        if proxy and scheme.lower() in relevant_schemes:
            return True
    return False
def build_url_opener(proxy_config: dict[str, Any]) -> urllib.request.OpenerDirector:
    """Build the urllib opener matching the proxy settings.

    A configured ``proxy_url`` is used for both http and https.  Without one,
    urllib's default (environment-aware) opener is returned only when
    ``use_environment_proxy`` is true; it defaults to false here, in which
    case proxies are disabled explicitly.
    """
    # NOTE(review): should_use_curl defaults use_environment_proxy to True
    # while this function defaults it to False - confirm that is intended.
    proxy_url = str(proxy_config.get("proxy_url", "")).strip()
    if proxy_url:
        explicit = urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
        return urllib.request.build_opener(explicit)
    if not bool(proxy_config.get("use_environment_proxy", False)):
        no_proxy = urllib.request.ProxyHandler({})
        return urllib.request.build_opener(no_proxy)
    return urllib.request.build_opener()
def get_proxy_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return the ``network`` section of *config*, or ``{}`` if it is absent or not a mapping."""
    network = config.get("network", {})
    return network if isinstance(network, dict) else {}
def proxy_authorization_header(config: dict[str, Any]) -> str | None:
    """Build a ``Basic`` Proxy-Authorization value from credentials in ``proxy_url``.

    Returns ``None`` when no proxy URL is configured or it carries no user
    name.  User name and password are percent-decoded before encoding; a
    missing password is treated as empty.
    """
    proxy_url = str(get_proxy_config(config).get("proxy_url", "")).strip()
    if not proxy_url:
        return None
    parts = urllib.parse.urlsplit(proxy_url)
    if not parts.username:
        return None
    username = urllib.parse.unquote(parts.username)
    password = urllib.parse.unquote(parts.password or "")
    credentials = f"{username}:{password}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return f"Basic {encoded}"
def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]:
    """Turn *data* into the string-only snapshot stored under ``lastData`` in the state."""
    snapshot: dict[str, str] = {}
    snapshot["tradeDate"] = data.trade_date
    snapshot["close"] = money(data.close)
    snapshot["change"] = signed(data.change)
    snapshot["changePercent"] = signed(data.change_percent)
    snapshot["updatedAt"] = now.isoformat()
    snapshot["source"] = data.source
    return snapshot
def save_target_openid(path: Path, openid: str) -> None:
    """Write *openid* into the ``target_openid`` key of the ``qqbot`` section.

    An existing ``target_openid`` line inside the ``qqbot`` section is
    replaced, keeping its indentation.  When the section has no such key, a
    new line is inserted directly below the ``qqbot:`` header, indented like
    the section's first entry (two spaces when the section is empty).  When
    the file has no ``qqbot`` section at all, one is appended.  The remaining
    lines are written back as they were read.
    """
    entry = f'target_openid: "{openid}"'
    lines = path.read_text(encoding="utf-8").splitlines()
    in_qqbot = False
    updated = False
    header_index: int | None = None
    new_lines: list[str] = []
    for line in lines:
        if line.startswith("qqbot:"):
            in_qqbot = True
            header_index = len(new_lines)
        elif line and not line.startswith(" ") and not line.startswith("#"):
            # Any other top-level key ends the qqbot section.
            in_qqbot = False
        if in_qqbot and re.match(r"\s*target_openid\s*:", line):
            indent = line[: len(line) - len(line.lstrip())]
            new_lines.append(f"{indent}{entry}")
            updated = True
        else:
            new_lines.append(line)

    if not updated:
        if header_index is None:
            new_lines.extend(["qqbot:", f"  {entry}"])
        else:
            # Reuse the indentation of the section's first real entry so the
            # new key sits at the same nesting level.
            indent = "  "
            for following in new_lines[header_index + 1:]:
                stripped = following.strip()
                if not stripped or stripped.startswith("#"):
                    continue
                if following.startswith(" "):
                    indent = following[: len(following) - len(following.lstrip())]
                break
            new_lines.insert(header_index + 1, f"{indent}{entry}")
    path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
def resolve_path(value: str) -> Path:
    """Return *value* as a path; relative values are resolved against the script directory."""
    candidate = Path(str(value))
    return candidate if candidate.is_absolute() else (SCRIPT_DIR / candidate).resolve()
def parse_decimal(value: Any) -> Decimal:
    """Parse a numeric field into a finite ``Decimal``.

    ``$``, ``,``, ``%`` and ``+`` characters and surrounding whitespace are
    removed before parsing.

    Raises:
        ValueError: *value* is ``None``, is not a number, or is NaN/Infinity.
            Non-finite values are rejected because they cannot be formatted or
            compared meaningfully by the callers.
    """
    if value is None:
        raise ValueError("缺少数值字段")
    cleaned = str(value)
    for noise in ("$", ",", "%", "+"):
        cleaned = cleaned.replace(noise, "")
    cleaned = cleaned.strip()
    try:
        number = Decimal(cleaned)
    except InvalidOperation as exc:
        raise ValueError(f"无法解析数值: {value}") from exc
    if not number.is_finite():
        raise ValueError(f"无法解析数值: {value}")
    return number
def money(value: Decimal) -> str:
    """Format *value* with two decimals (half-up rounding) and thousands separators."""
    cents = Decimal("0.01")
    rounded = value.quantize(cents, rounding=ROUND_HALF_UP)
    return format(rounded, ",")
def signed(value: Decimal) -> str:
    """Format *value* with two decimals, thousands separators and an explicit ``+``.

    Positive results get a leading ``+``; negative results keep their ``-``.
    A value that rounds to zero is rendered as ``0.00`` regardless of its
    original sign, so tiny negative moves do not appear as ``-0.00``.
    """
    rounded = value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    if rounded.is_zero():
        # Drop the sign of a negative zero such as Decimal("-0.00").
        rounded = abs(rounded)
    prefix = "+" if rounded > 0 else ""
    return f"{prefix}{rounded:,}"
def mask_secret(value: str) -> str:
    """Mask *value* for logging.

    An empty value stays empty, values of up to eight characters are fully
    hidden, and longer ones keep only their first and last four characters.
    """
    if not value:
        return ""
    hidden = "****"
    if len(value) > 8:
        return f"{value[:4]}{hidden}{value[-4:]}"
    return hidden
def summarize_error(exc: Exception) -> str:
    """Return a single-line description of *exc*, at most 300 characters long.

    Newlines in the message become spaces; an exception without a message is
    described by its class name.
    """
    message = str(exc).replace("\n", " ").strip()
    if not message:
        return exc.__class__.__name__
    return message[:300]
if __name__ == "__main__":
    # Script entry point.  Any exception that escapes main() - for example
    # from config loading or logger setup, which run outside main's own
    # try block - is swallowed here and the process exits with status 0.
    try:
        main()
    except Exception:
        sys.exit(0)