新增 NDX 美股收盘日报脚本

实现 Nasdaq-100 Index 收盘日报的定时脚本,支持从 Nasdaq quote API 获取并校验最新完整交易日数据,在失败时回退到 FRED CSV 数据源。

新增 QQBot 主动投递流程,包含 access token 获取、c2c/group 消息发送、target_openid 自动发现、本地运行日志、投递 JSONL 记录和 state 去重逻辑。

提供脱敏的 config.example.yaml 作为配置模板,真实 config.yaml、运行状态和投递记录保留在本地,不纳入本次提交。
This commit is contained in:
2026-06-05 23:29:38 +08:00
parent e8a2c0d620
commit 0e2863ed47
2 changed files with 618 additions and 0 deletions
+32
View File
@@ -0,0 +1,32 @@
paths:
state_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-state.json
delivery_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-delivery-log.jsonl
runtime_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-runtime.log
nasdaq:
quote_api: https://api.nasdaq.com/api/quote/NDX/info?assetclass=index
fred_csv: https://fred.stlouisfed.org/graph/fredgraph.csv?id=NASDAQ100
request_timeout_seconds: 15
qqbot:
appid: "用户填写"
appkey: "用户填写"
token_api: https://bots.qq.com/app/getAppAccessToken
base_api: https://api.sgroup.qq.com
# c2c 表示私聊,group 表示群聊
target_type: "c2c"
# 可选。为空时脚本会尝试从本地 QQBot/Hermes 日志中自动发现。
target_openid: ""
# 自动发现 openid 时的等待时间
auto_discover_timeout_seconds: 60
# 自动发现成功后是否写回 config.yaml
save_discovered_target: true
report:
timezone_trade: America/New_York
timezone_delivery: Asia/Shanghai
replay_on_weekdays: true
+586
View File
@@ -0,0 +1,586 @@
#!/usr/bin/env python3
"""Fetch the latest complete NDX close and actively deliver a QQBot daily report."""
from __future__ import annotations
import csv
import json
import logging
import os
import re
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
SCRIPT_DIR = Path(__file__).resolve().parent
CONFIG_FILE = SCRIPT_DIR / "config.yaml"
DEFAULT_HEADERS = {
"Accept": "application/json,text/plain,*/*",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36"
),
}
class SilentExit(Exception):
"""Expected silent stop after writing runtime log."""
@dataclass(frozen=True)
class MarketData:
trade_date: str
close: Decimal
change: Decimal
change_percent: Decimal
previous_close: Decimal | None
source: str
def main() -> None:
config = load_config(CONFIG_FILE)
logger = setup_logging(config)
logger.info("脚本启动")
logger.info("config 加载成功: %s", CONFIG_FILE)
try:
validate_qqbot_config(config, logger)
ensure_target_openid(config, logger)
market_data = fetch_market_data(config, logger)
state = load_state(config, logger)
delivery_tz = get_timezone(config["report"].get("timezone_delivery", "Asia/Shanghai"), logger)
beijing_now = datetime.now(delivery_tz)
beijing_date = beijing_now.date().isoformat()
last_reported = state.get("lastNewDataReportedDate")
logger.info(
"当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s",
beijing_date,
market_data.trade_date,
last_reported,
)
if not last_reported:
initialize_state(config, state, market_data, beijing_now, logger)
raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state")
need_new_data_report = market_data.trade_date > str(last_reported)
replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True))
need_replay = (
market_data.trade_date == str(last_reported)
and replay_on_weekdays
and beijing_now.weekday() < 5
and beijing_date not in state.get("deliveries", {})
)
logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay)
if not need_new_data_report and not need_replay:
raise SilentExit("不满足推送条件")
report_text = build_report(market_data)
logger.info("日报正文生成成功")
target_type = config["qqbot"].get("target_type", "c2c")
target_openid = config["qqbot"].get("target_openid", "")
delivery_target = f"qqbot:{target_type}:{target_openid}"
try:
send_qqbot_message(config, report_text, logger)
except Exception as exc:
logger.error("QQBot 投递失败: %s", summarize_error(exc))
append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc)
raise SilentExit("QQBot 投递失败")
update_success_state(state, market_data, beijing_now)
append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger)
save_state(config, state, logger)
logger.info("QQBot 投递成功,state 更新成功")
raise SilentExit("投递完成")
except SilentExit as exc:
logger.info("脚本静默结束原因: %s", exc)
except Exception as exc:
logger.exception("脚本静默结束原因: 未处理异常: %s", summarize_error(exc))
def load_config(path: Path) -> dict[str, Any]:
text = path.read_text(encoding="utf-8")
try:
import yaml # type: ignore
data = yaml.safe_load(text)
if not isinstance(data, dict):
raise ValueError("config.yaml root must be a mapping")
return data
except ModuleNotFoundError:
return parse_simple_yaml(text)
def parse_simple_yaml(text: str) -> dict[str, Any]:
root: dict[str, Any] = {}
current: dict[str, Any] | None = None
for raw_line in text.splitlines():
line = raw_line.split("#", 1)[0].rstrip()
if not line.strip():
continue
if not line.startswith(" "):
key = line.rstrip(":").strip()
root[key] = {}
current = root[key]
continue
if current is None or ":" not in line:
continue
key, value = line.strip().split(":", 1)
current[key.strip()] = parse_scalar(value.strip())
return root
def parse_scalar(value: str) -> Any:
if value in {"true", "True"}:
return True
if value in {"false", "False"}:
return False
if value in {'""', "''"}:
return ""
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")):
return value[1:-1]
try:
return int(value)
except ValueError:
return value
def setup_logging(config: dict[str, Any]) -> logging.Logger:
log_file = resolve_path(config["paths"]["runtime_log_file"])
log_file.parent.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("ndx_daily_report")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
handler = logging.FileHandler(log_file, encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(handler)
return logger
def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config.get("qqbot", {})
appid = str(qqbot.get("appid", "")).strip()
appkey = str(qqbot.get("appkey", "")).strip()
target_type = str(qqbot.get("target_type", "c2c")).strip()
logger.info(
"QQBot 配置检查: appid=%s, appkey=%s, target_type=%s",
mask_secret(appid),
mask_secret(appkey),
target_type,
)
if not appid or appid == "用户填写":
raise SilentExit("QQBot AppID 未配置")
if not appkey or appkey == "用户填写":
raise SilentExit("QQBot AppKey 未配置")
if target_type not in {"c2c", "group"}:
raise SilentExit("QQBot target_type 必须为 c2c 或 group")
def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config["qqbot"]
target_openid = str(qqbot.get("target_openid", "")).strip()
logger.info("target_openid 是否存在: %s", bool(target_openid))
if target_openid:
return
logger.info("target_openid 自动发现开始")
discovered = discover_openid(config, logger)
if not discovered:
raise SilentExit(
"target_openid 自动发现失败;请先给 QQBot 发一条私聊消息,或在目标群里 @ 机器人,然后重新运行脚本"
)
qqbot["target_openid"] = discovered
if bool(qqbot.get("save_discovered_target", True)):
save_target_openid(CONFIG_FILE, discovered)
logger.info("已自动发现 QQBot target_openid,并写回 config.yaml")
raise SilentExit("target_openid 自动发现成功,本次只完成配置初始化")
def discover_openid(config: dict[str, Any], logger: logging.Logger) -> str | None:
target_type = str(config["qqbot"].get("target_type", "c2c"))
field = "group_openid" if target_type == "group" else "user_openid"
patterns = [
re.compile(rf'"{field}"\s*:\s*"([^"]+)"'),
re.compile(rf"{field}\s*[:=]\s*([A-Za-z0-9_-]+)"),
]
candidates = [
Path(os.environ.get("HERMES_HOME", "")) / "logs",
Path(os.environ.get("HERMES_HOME", "")) / "data",
Path.home() / ".hermes",
Path.home() / ".qqbot",
SCRIPT_DIR,
]
deadline = time.time() + int(config["qqbot"].get("auto_discover_timeout_seconds", 60))
while time.time() <= deadline:
for base in candidates:
if not base or not base.exists():
continue
for path in list_recent_text_files(base):
try:
text = path.read_text(encoding="utf-8", errors="ignore")
except OSError:
continue
for pattern in patterns:
match = pattern.search(text)
if match:
logger.info("target_openid 自动发现成功: source=%s", path)
return match.group(1)
time.sleep(2)
logger.warning("target_openid 自动发现失败: 未找到本地事件缓存或日志字段 %s", field)
return None
def list_recent_text_files(base: Path) -> list[Path]:
suffixes = {".log", ".json", ".jsonl", ".txt"}
try:
paths = [p for p in base.rglob("*") if p.is_file() and p.suffix.lower() in suffixes]
except OSError:
return []
paths.sort(key=lambda p: p.stat().st_mtime, reverse=True)
return paths[:50]
def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData:
try:
return fetch_nasdaq_quote(config, logger)
except Exception as exc:
logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc))
return fetch_fred_csv(config, logger)
def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> MarketData:
url = str(config["nasdaq"]["quote_api"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("Nasdaq quote API 请求开始")
payload = http_json("GET", url, timeout=timeout)
data = payload.get("data", {})
primary = data.get("primaryData", {})
key_stats = data.get("keyStats", {})
close = parse_decimal(primary.get("lastSalePrice"))
change = parse_decimal(primary.get("netChange"))
change_percent = parse_decimal(primary.get("percentageChange"))
previous_close = parse_decimal(key_stats.get("previousclose", {}).get("value"))
timestamp = str(primary.get("lastTradeTimestamp", "")).strip()
if not timestamp:
raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp")
trade_date = parse_trade_date(timestamp, config["report"].get("timezone_trade", "America/New_York"))
logger.info(
"NDX 字段解析结果: tradeDate=%s close=%s change=%s changePercent=%s previousClose=%s",
trade_date,
money(close),
signed(change),
signed(change_percent),
money(previous_close),
)
calculated_change = close - previous_close
calculated_percent = (calculated_change / previous_close) * Decimal("100")
logger.info(
"previous close 复算结果: change=%s changePercent=%s",
signed(calculated_change),
signed(calculated_percent),
)
if abs(calculated_change - change) > Decimal("0.05"):
raise ValueError("Nasdaq 点数涨跌与 previous close 复算不一致")
if abs(calculated_percent - change_percent) > Decimal("0.01"):
raise ValueError("Nasdaq 涨跌幅与 previous close 复算不一致")
logger.info("涨跌幅一致性校验结果: 通过")
return MarketData(trade_date, close, change, change_percent, previous_close, "nasdaq_quote")
def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData:
url = str(config["nasdaq"]["fred_csv"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("FRED CSV 请求开始")
request = urllib.request.Request(url, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]})
with urllib.request.urlopen(request, timeout=timeout) as response:
text = response.read().decode("utf-8-sig")
rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}]
if len(rows) < 2:
raise ValueError("FRED CSV 有效数据不足")
previous = rows[-2]
latest = rows[-1]
close = parse_decimal(latest["NASDAQ100"])
previous_close = parse_decimal(previous["NASDAQ100"])
change = close - previous_close
change_percent = (change / previous_close) * Decimal("100")
trade_date = latest["observation_date"]
logger.info(
"FRED 数据解析结果: tradeDate=%s close=%s change=%s changePercent=%s",
trade_date,
money(close),
signed(change),
signed(change_percent),
)
return MarketData(trade_date, close, change, change_percent, previous_close, "fred_csv")
def parse_trade_date(timestamp: str, timezone_name: str) -> str:
cleaned = timestamp.replace("ET", "").strip()
formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"]
tz = get_timezone(timezone_name)
for fmt in formats:
try:
dt = datetime.strptime(cleaned, fmt)
return dt.replace(tzinfo=tz).date().isoformat()
except ValueError:
continue
try:
return datetime.fromisoformat(cleaned).astimezone(tz).date().isoformat()
except ValueError as exc:
raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc
def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone:
try:
return ZoneInfo(timezone_name)
except ZoneInfoNotFoundError:
fallback_offsets = {
"Asia/Shanghai": 8,
# NDX trade date is parsed from Nasdaq's ET timestamp. A fixed EST
# fallback is enough to preserve the date without requiring tzdata.
"America/New_York": -5,
}
if timezone_name not in fallback_offsets:
raise
if logger:
logger.warning("系统缺少 tzdata,使用固定时区兜底: %s", timezone_name)
return timezone(timedelta(hours=fallback_offsets[timezone_name]), timezone_name)
def load_state(config: dict[str, Any], logger: logging.Logger) -> dict[str, Any]:
state_file = resolve_path(config["paths"]["state_file"])
if not state_file.exists() or state_file.stat().st_size == 0:
logger.info("state 文件不存在或为空: %s", state_file)
return {"deliveries": {}}
try:
data = json.loads(state_file.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("state root is not object")
data.setdefault("deliveries", {})
logger.info("state 文件加载成功: %s", state_file)
return data
except Exception as exc:
logger.warning("state 文件加载失败,将修复为初始状态: %s", summarize_error(exc))
return {"deliveries": {}}
def initialize_state(
config: dict[str, Any],
state: dict[str, Any],
market_data: MarketData,
now: datetime,
logger: logging.Logger,
) -> None:
state["lastNewDataReportedDate"] = market_data.trade_date
state["lastData"] = serialize_market_data(market_data, now)
state["deliveries"] = {}
save_state(config, state, logger)
logger.info("state 文件初始化成功")
def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None:
beijing_date = now.date().isoformat()
state["lastNewDataReportedDate"] = market_data.trade_date
state["lastData"] = serialize_market_data(market_data, now)
state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date
def save_state(config: dict[str, Any], state: dict[str, Any], logger: logging.Logger) -> None:
state_file = resolve_path(config["paths"]["state_file"])
state_file.parent.mkdir(parents=True, exist_ok=True)
state_file.write_text(json.dumps(state, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
logger.info("state 保存成功: %s", state_file)
def build_report(data: MarketData) -> str:
if data.change > 0:
direction = "上涨"
elif data.change < 0:
direction = "下跌"
else:
direction = "基本持平"
return (
f"{data.trade_date}(美国东部时间)Nasdaq-100 Index 收盘\n\n"
f"- 收盘:{money(data.close)}\n"
f"- 涨跌:{signed(data.change)}\n"
f"- 涨跌幅:{signed(data.change_percent)}%\n\n"
f"一句简短总结:相比前一交易日,Nasdaq-100 今日{direction}"
)
def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Logger) -> None:
qqbot = config["qqbot"]
token = get_qqbot_token(config, logger)
target_type = str(qqbot.get("target_type", "c2c"))
target_openid = str(qqbot.get("target_openid", "")).strip()
base_api = str(qqbot.get("base_api", "https://api.sgroup.qq.com")).rstrip("/")
if target_type == "group":
url = f"{base_api}/v2/groups/{target_openid}/messages"
else:
url = f"{base_api}/v2/users/{target_openid}/messages"
logger.info("QQBot 投递开始: target_type=%s", target_type)
body = {"content": content, "msg_type": 0}
http_json("POST", url, body=body, timeout=15, headers={"Authorization": f"QQBot {token}"})
logger.info("QQBot 投递成功")
def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str:
qqbot = config["qqbot"]
url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken"))
body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])}
payload = http_json("POST", url, body=body, timeout=15)
token = payload.get("access_token") or payload.get("accessToken")
if not token:
raise ValueError("QQBot token 响应缺少 access_token")
logger.info("QQBot access_token 获取成功")
return str(token)
def append_delivery_log(
config: dict[str, Any],
market_data: MarketData,
now: datetime,
delivery_target: str,
status: str,
logger: logging.Logger,
error: Exception | None = None,
) -> None:
log_file = resolve_path(config["paths"]["delivery_log_file"])
log_file.parent.mkdir(parents=True, exist_ok=True)
record = {
"beijingDate": now.date().isoformat(),
"tradeDate": market_data.trade_date,
"generatedAt": now.isoformat(),
"close": money(market_data.close),
"change": signed(market_data.change),
"changePercent": signed(market_data.change_percent),
"deliveryTarget": delivery_target,
"status": status,
}
if error is not None:
record["error"] = summarize_error(error)
with log_file.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=False) + "\n")
logger.info("delivery log 追加成功: status=%s", status)
def http_json(
method: str,
url: str,
body: dict[str, Any] | None = None,
timeout: int = 15,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
request_headers = dict(DEFAULT_HEADERS)
if headers:
request_headers.update(headers)
data = None
if body is not None:
data = json.dumps(body, ensure_ascii=False).encode("utf-8")
request_headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, headers=request_headers, method=method)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
text = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
parsed = json.loads(text)
if not isinstance(parsed, dict):
raise ValueError("JSON response root is not object")
return parsed
def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]:
return {
"tradeDate": data.trade_date,
"close": money(data.close),
"change": signed(data.change),
"changePercent": signed(data.change_percent),
"updatedAt": now.isoformat(),
"source": data.source,
}
def save_target_openid(path: Path, openid: str) -> None:
lines = path.read_text(encoding="utf-8").splitlines()
in_qqbot = False
updated = False
new_lines: list[str] = []
for line in lines:
if line.startswith("qqbot:"):
in_qqbot = True
elif line and not line.startswith(" ") and not line.startswith("#"):
in_qqbot = False
if in_qqbot and re.match(r"\s*target_openid\s*:", line):
indent = line[: len(line) - len(line.lstrip())]
new_lines.append(f'{indent}target_openid: "{openid}"')
updated = True
else:
new_lines.append(line)
if not updated:
new_lines.append(f' target_openid: "{openid}"')
path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
def resolve_path(value: str) -> Path:
path = Path(str(value))
if path.is_absolute():
return path
return (SCRIPT_DIR / path).resolve()
def parse_decimal(value: Any) -> Decimal:
if value is None:
raise ValueError("缺少数值字段")
cleaned = str(value).replace("$", "").replace(",", "").replace("%", "").replace("+", "").strip()
try:
return Decimal(cleaned)
except InvalidOperation as exc:
raise ValueError(f"无法解析数值: {value}") from exc
def money(value: Decimal) -> str:
return f"{value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP):,}"
def signed(value: Decimal) -> str:
rounded = value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
prefix = "+" if rounded > 0 else ""
return f"{prefix}{rounded:,}"
def mask_secret(value: str) -> str:
if not value:
return ""
if len(value) <= 8:
return "****"
return f"{value[:4]}****{value[-4:]}"
def summarize_error(exc: Exception) -> str:
text = str(exc).replace("\n", " ").strip()
return text[:300] if text else exc.__class__.__name__
if __name__ == "__main__":
try:
main()
except Exception:
sys.exit(0)