From 0e2863ed475dca859a4a500eb448df5f286d8fce Mon Sep 17 00:00:00 2001 From: wangwei0518 <1329996666@qq.com> Date: Fri, 5 Jun 2026 23:29:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20NDX=20=E7=BE=8E=E8=82=A1?= =?UTF-8?q?=E6=94=B6=E7=9B=98=E6=97=A5=E6=8A=A5=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现 Nasdaq-100 Index 收盘日报的定时脚本,支持从 Nasdaq quote API 获取并校验最新完整交易日数据,在失败时回退到 FRED CSV 数据源。 新增 QQBot 主动投递流程,包含 access token 获取、c2c/group 消息发送、target_openid 自动发现、本地运行日志、投递 JSONL 记录和 state 去重逻辑。 提供脱敏的 config.example.yaml 作为配置模板,真实 config.yaml、运行状态和投递记录保留在本地,不纳入本次提交。 --- NdxDailyReport/config.example.yaml | 32 ++ NdxDailyReport/ndx_daily_report.py | 586 +++++++++++++++++++++++++++++ 2 files changed, 618 insertions(+) create mode 100644 NdxDailyReport/config.example.yaml create mode 100644 NdxDailyReport/ndx_daily_report.py diff --git a/NdxDailyReport/config.example.yaml b/NdxDailyReport/config.example.yaml new file mode 100644 index 0000000..1ce113f --- /dev/null +++ b/NdxDailyReport/config.example.yaml @@ -0,0 +1,32 @@ +paths: + state_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-state.json + delivery_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-delivery-log.jsonl + runtime_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-runtime.log + +nasdaq: + quote_api: https://api.nasdaq.com/api/quote/NDX/info?assetclass=index + fred_csv: https://fred.stlouisfed.org/graph/fredgraph.csv?id=NASDAQ100 + request_timeout_seconds: 15 + +qqbot: + appid: "用户填写" + appkey: "用户填写" + token_api: https://bots.qq.com/app/getAppAccessToken + base_api: https://api.sgroup.qq.com + + # c2c 表示私聊,group 表示群聊 + target_type: "c2c" + + # 可选。为空时脚本会尝试从本地 QQBot/Hermes 日志中自动发现。 + target_openid: "" + + # 自动发现 openid 时的等待时间 + auto_discover_timeout_seconds: 60 + + # 自动发现成功后是否写回 config.yaml + save_discovered_target: true + +report: + timezone_trade: America/New_York + timezone_delivery: Asia/Shanghai + replay_on_weekdays: true diff --git a/NdxDailyReport/ndx_daily_report.py b/NdxDailyReport/ndx_daily_report.py new file mode 100644 index 0000000..85efb43 --- /dev/null +++ b/NdxDailyReport/ndx_daily_report.py @@ -0,0 +1,586 @@ +#!/usr/bin/env python3 +"""Fetch the latest complete NDX close and actively deliver a QQBot daily report.""" + +from __future__ import annotations + +import csv +import json +import logging +import os +import re +import sys +import time +import urllib.error +import urllib.request +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from decimal import Decimal, InvalidOperation, ROUND_HALF_UP +from pathlib import Path +from typing import Any +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + + +SCRIPT_DIR = Path(__file__).resolve().parent +CONFIG_FILE = SCRIPT_DIR / "config.yaml" +DEFAULT_HEADERS = { + "Accept": "application/json,text/plain,*/*", + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36" + ), +} + + +class SilentExit(Exception): + """Expected silent stop after writing runtime log.""" + + +@dataclass(frozen=True) +class MarketData: + trade_date: str + close: Decimal + change: Decimal + change_percent: Decimal + previous_close: Decimal | None + source: str + + +def main() -> None: + config = load_config(CONFIG_FILE) + logger = setup_logging(config) + logger.info("脚本启动") + logger.info("config 加载成功: %s", CONFIG_FILE) + + try: + validate_qqbot_config(config, logger) + ensure_target_openid(config, logger) + market_data = fetch_market_data(config, logger) + state = load_state(config, logger) + + delivery_tz = get_timezone(config["report"].get("timezone_delivery", "Asia/Shanghai"), logger) + beijing_now = datetime.now(delivery_tz) + beijing_date = beijing_now.date().isoformat() + last_reported = state.get("lastNewDataReportedDate") + logger.info( + "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s", + beijing_date, + market_data.trade_date, + last_reported, + ) + + if not last_reported: + initialize_state(config, state, market_data, beijing_now, logger) + raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state") + + need_new_data_report = market_data.trade_date > str(last_reported) + replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True)) + need_replay = ( + market_data.trade_date == str(last_reported) + and replay_on_weekdays + and beijing_now.weekday() < 5 + and beijing_date not in state.get("deliveries", {}) + ) + logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay) + + if not need_new_data_report and not need_replay: + raise SilentExit("不满足推送条件") + + report_text = build_report(market_data) + logger.info("日报正文生成成功") + + target_type = config["qqbot"].get("target_type", "c2c") + target_openid = config["qqbot"].get("target_openid", "") + delivery_target = f"qqbot:{target_type}:{target_openid}" + try: + send_qqbot_message(config, report_text, logger) + except Exception as exc: + logger.error("QQBot 投递失败: %s", summarize_error(exc)) + append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc) + raise SilentExit("QQBot 投递失败") + + update_success_state(state, market_data, beijing_now) + append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger) + save_state(config, state, logger) + logger.info("QQBot 投递成功,state 更新成功") + raise SilentExit("投递完成") + except SilentExit as exc: + logger.info("脚本静默结束原因: %s", exc) + except Exception as exc: + logger.exception("脚本静默结束原因: 未处理异常: %s", summarize_error(exc)) + + +def load_config(path: Path) -> dict[str, Any]: + text = path.read_text(encoding="utf-8") + try: + import yaml # type: ignore + + data = yaml.safe_load(text) + if not isinstance(data, dict): + raise ValueError("config.yaml root must be a mapping") + return data + except ModuleNotFoundError: + return parse_simple_yaml(text) + + +def parse_simple_yaml(text: str) -> dict[str, Any]: + root: dict[str, Any] = {} + current: dict[str, Any] | None = None + for raw_line in text.splitlines(): + line = raw_line.split("#", 1)[0].rstrip() + if not line.strip(): + continue + if not line.startswith(" "): + key = line.rstrip(":").strip() + root[key] = {} + current = root[key] + continue + if current is None or ":" not in line: + continue + key, value = line.strip().split(":", 1) + current[key.strip()] = parse_scalar(value.strip()) + return root + + +def parse_scalar(value: str) -> Any: + if value in {"true", "True"}: + return True + if value in {"false", "False"}: + return False + if value in {'""', "''"}: + return "" + if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): + return value[1:-1] + try: + return int(value) + except ValueError: + return value + + +def setup_logging(config: dict[str, Any]) -> logging.Logger: + log_file = resolve_path(config["paths"]["runtime_log_file"]) + log_file.parent.mkdir(parents=True, exist_ok=True) + logger = logging.getLogger("ndx_daily_report") + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + handler = logging.FileHandler(log_file, encoding="utf-8") + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s")) + logger.addHandler(handler) + return logger + + +def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None: + qqbot = config.get("qqbot", {}) + appid = str(qqbot.get("appid", "")).strip() + appkey = str(qqbot.get("appkey", "")).strip() + target_type = str(qqbot.get("target_type", "c2c")).strip() + logger.info( + "QQBot 配置检查: appid=%s, appkey=%s, target_type=%s", + mask_secret(appid), + mask_secret(appkey), + target_type, + ) + if not appid or appid == "用户填写": + raise SilentExit("QQBot AppID 未配置") + if not appkey or appkey == "用户填写": + raise SilentExit("QQBot AppKey 未配置") + if target_type not in {"c2c", "group"}: + raise SilentExit("QQBot target_type 必须为 c2c 或 group") + + +def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None: + qqbot = config["qqbot"] + target_openid = str(qqbot.get("target_openid", "")).strip() + logger.info("target_openid 是否存在: %s", bool(target_openid)) + if target_openid: + return + + logger.info("target_openid 自动发现开始") + discovered = discover_openid(config, logger) + if not discovered: + raise SilentExit( + "target_openid 自动发现失败;请先给 QQBot 发一条私聊消息,或在目标群里 @ 机器人,然后重新运行脚本" + ) + + qqbot["target_openid"] = discovered + if bool(qqbot.get("save_discovered_target", True)): + save_target_openid(CONFIG_FILE, discovered) + logger.info("已自动发现 QQBot target_openid,并写回 config.yaml") + raise SilentExit("target_openid 自动发现成功,本次只完成配置初始化") + + +def discover_openid(config: dict[str, Any], logger: logging.Logger) -> str | None: + target_type = str(config["qqbot"].get("target_type", "c2c")) + field = "group_openid" if target_type == "group" else "user_openid" + patterns = [ + re.compile(rf'"{field}"\s*:\s*"([^"]+)"'), + re.compile(rf"{field}\s*[:=]\s*([A-Za-z0-9_-]+)"), + ] + candidates = [ + Path(os.environ.get("HERMES_HOME", "")) / "logs", + Path(os.environ.get("HERMES_HOME", "")) / "data", + Path.home() / ".hermes", + Path.home() / ".qqbot", + SCRIPT_DIR, + ] + deadline = time.time() + int(config["qqbot"].get("auto_discover_timeout_seconds", 60)) + while time.time() <= deadline: + for base in candidates: + if not base or not base.exists(): + continue + for path in list_recent_text_files(base): + try: + text = path.read_text(encoding="utf-8", errors="ignore") + except OSError: + continue + for pattern in patterns: + match = pattern.search(text) + if match: + logger.info("target_openid 自动发现成功: source=%s", path) + return match.group(1) + time.sleep(2) + logger.warning("target_openid 自动发现失败: 未找到本地事件缓存或日志字段 %s", field) + return None + + +def list_recent_text_files(base: Path) -> list[Path]: + suffixes = {".log", ".json", ".jsonl", ".txt"} + try: + paths = [p for p in base.rglob("*") if p.is_file() and p.suffix.lower() in suffixes] + except OSError: + return [] + paths.sort(key=lambda p: p.stat().st_mtime, reverse=True) + return paths[:50] + + +def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData: + try: + return fetch_nasdaq_quote(config, logger) + except Exception as exc: + logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc)) + return fetch_fred_csv(config, logger) + + +def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> MarketData: + url = str(config["nasdaq"]["quote_api"]) + timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) + logger.info("Nasdaq quote API 请求开始") + payload = http_json("GET", url, timeout=timeout) + data = payload.get("data", {}) + primary = data.get("primaryData", {}) + key_stats = data.get("keyStats", {}) + + close = parse_decimal(primary.get("lastSalePrice")) + change = parse_decimal(primary.get("netChange")) + change_percent = parse_decimal(primary.get("percentageChange")) + previous_close = parse_decimal(key_stats.get("previousclose", {}).get("value")) + timestamp = str(primary.get("lastTradeTimestamp", "")).strip() + if not timestamp: + raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp") + + trade_date = parse_trade_date(timestamp, config["report"].get("timezone_trade", "America/New_York")) + logger.info( + "NDX 字段解析结果: tradeDate=%s close=%s change=%s changePercent=%s previousClose=%s", + trade_date, + money(close), + signed(change), + signed(change_percent), + money(previous_close), + ) + + calculated_change = close - previous_close + calculated_percent = (calculated_change / previous_close) * Decimal("100") + logger.info( + "previous close 复算结果: change=%s changePercent=%s", + signed(calculated_change), + signed(calculated_percent), + ) + if abs(calculated_change - change) > Decimal("0.05"): + raise ValueError("Nasdaq 点数涨跌与 previous close 复算不一致") + if abs(calculated_percent - change_percent) > Decimal("0.01"): + raise ValueError("Nasdaq 涨跌幅与 previous close 复算不一致") + logger.info("涨跌幅一致性校验结果: 通过") + + return MarketData(trade_date, close, change, change_percent, previous_close, "nasdaq_quote") + + +def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData: + url = str(config["nasdaq"]["fred_csv"]) + timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) + logger.info("FRED CSV 请求开始") + request = urllib.request.Request(url, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}) + with urllib.request.urlopen(request, timeout=timeout) as response: + text = response.read().decode("utf-8-sig") + rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}] + if len(rows) < 2: + raise ValueError("FRED CSV 有效数据不足") + previous = rows[-2] + latest = rows[-1] + close = parse_decimal(latest["NASDAQ100"]) + previous_close = parse_decimal(previous["NASDAQ100"]) + change = close - previous_close + change_percent = (change / previous_close) * Decimal("100") + trade_date = latest["observation_date"] + logger.info( + "FRED 数据解析结果: tradeDate=%s close=%s change=%s changePercent=%s", + trade_date, + money(close), + signed(change), + signed(change_percent), + ) + return MarketData(trade_date, close, change, change_percent, previous_close, "fred_csv") + + +def parse_trade_date(timestamp: str, timezone_name: str) -> str: + cleaned = timestamp.replace("ET", "").strip() + formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"] + tz = get_timezone(timezone_name) + for fmt in formats: + try: + dt = datetime.strptime(cleaned, fmt) + return dt.replace(tzinfo=tz).date().isoformat() + except ValueError: + continue + try: + return datetime.fromisoformat(cleaned).astimezone(tz).date().isoformat() + except ValueError as exc: + raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc + + +def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone: + try: + return ZoneInfo(timezone_name) + except ZoneInfoNotFoundError: + fallback_offsets = { + "Asia/Shanghai": 8, + # NDX trade date is parsed from Nasdaq's ET timestamp. A fixed EST + # fallback is enough to preserve the date without requiring tzdata. + "America/New_York": -5, + } + if timezone_name not in fallback_offsets: + raise + if logger: + logger.warning("系统缺少 tzdata,使用固定时区兜底: %s", timezone_name) + return timezone(timedelta(hours=fallback_offsets[timezone_name]), timezone_name) + + +def load_state(config: dict[str, Any], logger: logging.Logger) -> dict[str, Any]: + state_file = resolve_path(config["paths"]["state_file"]) + if not state_file.exists() or state_file.stat().st_size == 0: + logger.info("state 文件不存在或为空: %s", state_file) + return {"deliveries": {}} + try: + data = json.loads(state_file.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError("state root is not object") + data.setdefault("deliveries", {}) + logger.info("state 文件加载成功: %s", state_file) + return data + except Exception as exc: + logger.warning("state 文件加载失败,将修复为初始状态: %s", summarize_error(exc)) + return {"deliveries": {}} + + +def initialize_state( + config: dict[str, Any], + state: dict[str, Any], + market_data: MarketData, + now: datetime, + logger: logging.Logger, +) -> None: + state["lastNewDataReportedDate"] = market_data.trade_date + state["lastData"] = serialize_market_data(market_data, now) + state["deliveries"] = {} + save_state(config, state, logger) + logger.info("state 文件初始化成功") + + +def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None: + beijing_date = now.date().isoformat() + state["lastNewDataReportedDate"] = market_data.trade_date + state["lastData"] = serialize_market_data(market_data, now) + state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date + + +def save_state(config: dict[str, Any], state: dict[str, Any], logger: logging.Logger) -> None: + state_file = resolve_path(config["paths"]["state_file"]) + state_file.parent.mkdir(parents=True, exist_ok=True) + state_file.write_text(json.dumps(state, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + logger.info("state 保存成功: %s", state_file) + + +def build_report(data: MarketData) -> str: + if data.change > 0: + direction = "上涨" + elif data.change < 0: + direction = "下跌" + else: + direction = "基本持平" + return ( + f"{data.trade_date}(美国东部时间)Nasdaq-100 Index 收盘\n\n" + f"- 收盘:{money(data.close)}\n" + f"- 涨跌:{signed(data.change)} 点\n" + f"- 涨跌幅:{signed(data.change_percent)}%\n\n" + f"一句简短总结:相比前一交易日,Nasdaq-100 今日{direction}。" + ) + + +def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Logger) -> None: + qqbot = config["qqbot"] + token = get_qqbot_token(config, logger) + target_type = str(qqbot.get("target_type", "c2c")) + target_openid = str(qqbot.get("target_openid", "")).strip() + base_api = str(qqbot.get("base_api", "https://api.sgroup.qq.com")).rstrip("/") + if target_type == "group": + url = f"{base_api}/v2/groups/{target_openid}/messages" + else: + url = f"{base_api}/v2/users/{target_openid}/messages" + logger.info("QQBot 投递开始: target_type=%s", target_type) + body = {"content": content, "msg_type": 0} + http_json("POST", url, body=body, timeout=15, headers={"Authorization": f"QQBot {token}"}) + logger.info("QQBot 投递成功") + + +def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str: + qqbot = config["qqbot"] + url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) + body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} + payload = http_json("POST", url, body=body, timeout=15) + token = payload.get("access_token") or payload.get("accessToken") + if not token: + raise ValueError("QQBot token 响应缺少 access_token") + logger.info("QQBot access_token 获取成功") + return str(token) + + +def append_delivery_log( + config: dict[str, Any], + market_data: MarketData, + now: datetime, + delivery_target: str, + status: str, + logger: logging.Logger, + error: Exception | None = None, +) -> None: + log_file = resolve_path(config["paths"]["delivery_log_file"]) + log_file.parent.mkdir(parents=True, exist_ok=True) + record = { + "beijingDate": now.date().isoformat(), + "tradeDate": market_data.trade_date, + "generatedAt": now.isoformat(), + "close": money(market_data.close), + "change": signed(market_data.change), + "changePercent": signed(market_data.change_percent), + "deliveryTarget": delivery_target, + "status": status, + } + if error is not None: + record["error"] = summarize_error(error) + with log_file.open("a", encoding="utf-8") as fh: + fh.write(json.dumps(record, ensure_ascii=False) + "\n") + logger.info("delivery log 追加成功: status=%s", status) + + +def http_json( + method: str, + url: str, + body: dict[str, Any] | None = None, + timeout: int = 15, + headers: dict[str, str] | None = None, +) -> dict[str, Any]: + request_headers = dict(DEFAULT_HEADERS) + if headers: + request_headers.update(headers) + data = None + if body is not None: + data = json.dumps(body, ensure_ascii=False).encode("utf-8") + request_headers["Content-Type"] = "application/json" + request = urllib.request.Request(url, data=data, headers=request_headers, method=method) + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + text = response.read().decode("utf-8") + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace")[:300] + raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc + parsed = json.loads(text) + if not isinstance(parsed, dict): + raise ValueError("JSON response root is not object") + return parsed + + +def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: + return { + "tradeDate": data.trade_date, + "close": money(data.close), + "change": signed(data.change), + "changePercent": signed(data.change_percent), + "updatedAt": now.isoformat(), + "source": data.source, + } + + +def save_target_openid(path: Path, openid: str) -> None: + lines = path.read_text(encoding="utf-8").splitlines() + in_qqbot = False + updated = False + new_lines: list[str] = [] + for line in lines: + if line.startswith("qqbot:"): + in_qqbot = True + elif line and not line.startswith(" ") and not line.startswith("#"): + in_qqbot = False + if in_qqbot and re.match(r"\s*target_openid\s*:", line): + indent = line[: len(line) - len(line.lstrip())] + new_lines.append(f'{indent}target_openid: "{openid}"') + updated = True + else: + new_lines.append(line) + if not updated: + new_lines.append(f' target_openid: "{openid}"') + path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") + + +def resolve_path(value: str) -> Path: + path = Path(str(value)) + if path.is_absolute(): + return path + return (SCRIPT_DIR / path).resolve() + + +def parse_decimal(value: Any) -> Decimal: + if value is None: + raise ValueError("缺少数值字段") + cleaned = str(value).replace("$", "").replace(",", "").replace("%", "").replace("+", "").strip() + try: + return Decimal(cleaned) + except InvalidOperation as exc: + raise ValueError(f"无法解析数值: {value}") from exc + + +def money(value: Decimal) -> str: + return f"{value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP):,}" + + +def signed(value: Decimal) -> str: + rounded = value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) + prefix = "+" if rounded > 0 else "" + return f"{prefix}{rounded:,}" + + +def mask_secret(value: str) -> str: + if not value: + return "" + if len(value) <= 8: + return "****" + return f"{value[:4]}****{value[-4:]}" + + +def summarize_error(exc: Exception) -> str: + text = str(exc).replace("\n", " ").strip() + return text[:300] if text else exc.__class__.__name__ + + +if __name__ == "__main__": + try: + main() + except Exception: + sys.exit(0)