Compare commits

...

4 Commits

Author SHA1 Message Date
wangwei0518 e72ec9ffa3 完善 NDX 日报调度与 QQBot 鉴权 2026-06-06 13:36:51 +08:00
wangwei0518 e2af2eff79 修复 NDX 日期解析和网络重试 2026-06-06 13:04:47 +08:00
wangwei0518 792922e06f 修正 NDX 完整收盘判断和代理请求策略
新增 Nasdaq quote API 完整收盘校验:当 marketStatus 仍为 Open,或 lastTradeTimestamp 早于美东常规收盘时间 16:00 时,不再视为完整收盘数据,也不会提前推进 lastNewDataReportedDate。

盘中未完整数据现在会静默结束并写入 runtime log,不再 fallback 到 FRED,避免北京时间周五夜间误记录盘中数据,确保北京时间周六 08:00 能按新数据报告规则推送美东周五收盘。

新增 curl 网络后端和 transport:auto 策略:显式 HTTPS 代理或检测到环境代理时默认使用 curl,绕开 urllib 对代理 CONNECT 的兼容问题;示例配置仅保留代理占位,不提交真实代理账号密码。
2026-06-06 00:53:13 +08:00
wangwei0518 4a592cf457 优化日报脚本网络与路径配置
新增 network.use_environment_proxy 配置项,默认禁用 urllib 对 HTTP_PROXY/HTTPS_PROXY 等环境代理的自动读取,避免代理 CONNECT 连接被重置时影响 Nasdaq、FRED 和 QQBot 请求。

将示例配置中的 state、delivery log 和 runtime log 路径改为相对路径,由脚本按 NdxDailyReport 所在目录解析,方便项目迁移和定时任务部署。

保留真实 config.yaml、state 和投递日志为本地文件,不纳入提交。
2026-06-06 00:26:04 +08:00
3 changed files with 746 additions and 48 deletions
+26 -5
View File
@@ -1,17 +1,32 @@
paths: paths:
state_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-state.json state_file: ndx-daily-report-state.json
delivery_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-delivery-log.jsonl delivery_log_file: ndx-daily-report-delivery-log.jsonl
runtime_log_file: D:/project/Python-tools/NdxDailyReport/ndx-daily-report-runtime.log runtime_log_file: ndx-daily-report-runtime.log
nasdaq: nasdaq:
quote_api: https://api.nasdaq.com/api/quote/NDX/info?assetclass=index quote_api: https://api.nasdaq.com/api/quote/NDX/info?assetclass=index
fred_csv: https://fred.stlouisfed.org/graph/fredgraph.csv?id=NASDAQ100 fred_csv: https://fred.stlouisfed.org/graph/fredgraph.csv?id=NASDAQ100
request_timeout_seconds: 15 request_timeout_seconds: 15
network:
# true 表示允许检测 HTTP_PROXY/HTTPS_PROXY 或系统代理。
# transport: auto 检测到环境代理时会默认改用 curl,避免 urllib 自己处理 CONNECT。
use_environment_proxy: true
# 可选。直连超时时可显式指定代理;真实账号密码不要提交到仓库。
proxy_url: ""
# auto 会在 https 代理或环境代理场景下使用 curl。
transport: auto
# curl 经代理访问时默认使用 HTTP/1.1,并对瞬时网络错误重试 2 次。
curl_http1_1: true
curl_retry_count: 2
curl_retry_delay_seconds: 1
qqbot: qqbot:
appid: "用户填写" appid: "用户填写"
appkey: "用户填写" appsecret: "用户填写"
# 也可通过环境变量 QQBOT_APPID、QQBOT_SECRET 提供,环境变量优先。
token_api: https://bots.qq.com/app/getAppAccessToken token_api: https://bots.qq.com/app/getAppAccessToken
token_retry_count: 1
base_api: https://api.sgroup.qq.com base_api: https://api.sgroup.qq.com
# c2c 表示私聊,group 表示群聊 # c2c 表示私聊,group 表示群聊
@@ -29,4 +44,10 @@ qqbot:
report: report:
timezone_trade: America/New_York timezone_trade: America/New_York
timezone_delivery: Asia/Shanghai timezone_delivery: Asia/Shanghai
replay_on_weekdays: true # 美股收盘后,下一个北京时间 08:00 推送;中国休息日后的首个工作日复述最近收盘。
delivery_hour: 8
replay_after_china_holiday: true
# Haoshenqi0123/holiday v2:实际请求 /today 和 /yesterday。
# 获取失败时回退为周一至周五工作、周六日休息。
holiday_api_v2: https://api.haoshenqi.top/holiday
holiday_api_timeout_seconds: 5
+428 -43
View File
@@ -4,16 +4,20 @@
from __future__ import annotations from __future__ import annotations
import csv import csv
import base64
import json import json
import logging import logging
import os import os
import re import re
import shutil
import subprocess
import sys import sys
import time import time
import urllib.error import urllib.error
import urllib.parse
import urllib.request import urllib.request
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, time as datetime_time, timedelta, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -35,6 +39,10 @@ class SilentExit(Exception):
"""Expected silent stop after writing runtime log.""" """Expected silent stop after writing runtime log."""
class IncompleteMarketData(ValueError):
"""The upstream data is valid but not a complete regular close yet."""
@dataclass(frozen=True) @dataclass(frozen=True)
class MarketData: class MarketData:
trade_date: str trade_date: str
@@ -61,26 +69,56 @@ def main() -> None:
beijing_now = datetime.now(delivery_tz) beijing_now = datetime.now(delivery_tz)
beijing_date = beijing_now.date().isoformat() beijing_date = beijing_now.date().isoformat()
last_reported = state.get("lastNewDataReportedDate") last_reported = state.get("lastNewDataReportedDate")
last_close_reported = state.get("lastScheduledCloseReportedDate")
logger.info( logger.info(
"当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s", "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s, "
"lastScheduledCloseReportedDate=%s",
beijing_date, beijing_date,
market_data.trade_date, market_data.trade_date,
last_reported, last_reported,
last_close_reported,
) )
if not last_reported: if not last_reported:
initialize_state(config, state, market_data, beijing_now, logger) initialize_state(config, state, market_data, beijing_now, logger)
raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state")
need_new_data_report = market_data.trade_date > str(last_reported) report_config = config["report"]
replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True)) trade_tz = get_timezone(report_config.get("timezone_trade", "America/New_York"), logger)
need_replay = ( scheduled_close_report_at = next_delivery_time_after_close(
market_data.trade_date == str(last_reported) market_data.trade_date,
and replay_on_weekdays trade_tz,
and beijing_now.weekday() < 5 delivery_tz,
and beijing_date not in state.get("deliveries", {}) int(report_config.get("delivery_hour", 8)),
)
need_new_data_report = (
market_data.trade_date > str(last_close_reported or "")
and beijing_now >= scheduled_close_report_at
)
replay_after_china_holiday = bool(
report_config.get(
"replay_after_china_holiday",
report_config.get("replay_on_weekdays", True),
)
)
today_is_workday, yesterday_is_workday = get_china_workday_pair(
config,
beijing_now,
logger,
)
need_replay = should_replay_report(
beijing_now,
state.get("deliveries", {}),
replay_after_china_holiday,
today_is_workday,
yesterday_is_workday,
int(report_config.get("delivery_hour", 8)),
)
logger.info(
"美股收盘定时报告=%s (计划时间=%s), 中国休息日后首工作日复述=%s",
need_new_data_report,
scheduled_close_report_at.isoformat(),
need_replay,
) )
logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay)
if not need_new_data_report and not need_replay: if not need_new_data_report and not need_replay:
raise SilentExit("不满足推送条件") raise SilentExit("不满足推送条件")
@@ -98,7 +136,7 @@ def main() -> None:
append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc) append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc)
raise SilentExit("QQBot 投递失败") raise SilentExit("QQBot 投递失败")
update_success_state(state, market_data, beijing_now) update_success_state(state, market_data, beijing_now, need_new_data_report)
append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger) append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger)
save_state(config, state, logger) save_state(config, state, logger)
logger.info("QQBot 投递成功,state 更新成功") logger.info("QQBot 投递成功,state 更新成功")
@@ -170,23 +208,46 @@ def setup_logging(config: dict[str, Any]) -> logging.Logger:
def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None: def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config.get("qqbot", {}) qqbot = config.get("qqbot", {})
appid = str(qqbot.get("appid", "")).strip() appid, appsecret = get_qqbot_credentials(config)
appkey = str(qqbot.get("appkey", "")).strip() credential_source = get_qqbot_credential_source(config)
target_type = str(qqbot.get("target_type", "c2c")).strip() target_type = str(qqbot.get("target_type", "c2c")).strip()
logger.info( logger.info(
"QQBot 配置检查: appid=%s, appkey=%s, target_type=%s", "QQBot 配置检查: source=%s, appid=%s, appsecret=%s, target_type=%s",
credential_source,
mask_secret(appid), mask_secret(appid),
mask_secret(appkey), mask_secret(appsecret),
target_type, target_type,
) )
if not appid or appid == "用户填写": if not appid or appid == "用户填写":
raise SilentExit("QQBot AppID 未配置") raise SilentExit("QQBot AppID 未配置")
if not appkey or appkey == "用户填写": if not appsecret or appsecret == "用户填写":
raise SilentExit("QQBot AppKey 未配置") raise SilentExit("QQBot AppSecret 未配置")
if target_type not in {"c2c", "group"}: if target_type not in {"c2c", "group"}:
raise SilentExit("QQBot target_type 必须为 c2c 或 group") raise SilentExit("QQBot target_type 必须为 c2c 或 group")
def get_qqbot_credentials(config: dict[str, Any]) -> tuple[str, str]:
qqbot = config.get("qqbot", {})
appid = os.environ.get("QQBOT_APPID") or qqbot.get("appid", "")
appsecret = (
os.environ.get("QQBOT_SECRET")
or qqbot.get("appsecret")
or qqbot.get("appkey", "")
)
return str(appid).strip(), str(appsecret).strip()
def get_qqbot_credential_source(config: dict[str, Any]) -> str:
if os.environ.get("QQBOT_APPID") or os.environ.get("QQBOT_SECRET"):
return "environment"
qqbot = config.get("qqbot", {})
if qqbot.get("appsecret"):
return "config.appsecret"
if qqbot.get("appkey"):
return "config.appkey(legacy)"
return "config"
def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None: def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config["qqbot"] qqbot = config["qqbot"]
target_openid = str(qqbot.get("target_openid", "")).strip() target_openid = str(qqbot.get("target_openid", "")).strip()
@@ -255,6 +316,9 @@ def list_recent_text_files(base: Path) -> list[Path]:
def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData: def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData:
try: try:
return fetch_nasdaq_quote(config, logger) return fetch_nasdaq_quote(config, logger)
except IncompleteMarketData as exc:
logger.warning("Nasdaq quote API 尚无完整收盘数据: %s", summarize_error(exc))
raise SilentExit("尚无最近完整美股常规交易日收盘数据")
except Exception as exc: except Exception as exc:
logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc)) logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc))
return fetch_fred_csv(config, logger) return fetch_fred_csv(config, logger)
@@ -264,10 +328,17 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market
url = str(config["nasdaq"]["quote_api"]) url = str(config["nasdaq"]["quote_api"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("Nasdaq quote API 请求开始") logger.info("Nasdaq quote API 请求开始")
payload = http_json("GET", url, timeout=timeout) payload = http_json(
"GET",
url,
timeout=timeout,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
data = payload.get("data", {}) data = payload.get("data", {})
primary = data.get("primaryData", {}) primary = data.get("primaryData", {})
key_stats = data.get("keyStats", {}) key_stats = data.get("keyStats", {})
market_status = str(data.get("marketStatus", "")).strip()
close = parse_decimal(primary.get("lastSalePrice")) close = parse_decimal(primary.get("lastSalePrice"))
change = parse_decimal(primary.get("netChange")) change = parse_decimal(primary.get("netChange"))
@@ -277,10 +348,14 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market
if not timestamp: if not timestamp:
raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp") raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp")
trade_date = parse_trade_date(timestamp, config["report"].get("timezone_trade", "America/New_York")) trade_timezone = config["report"].get("timezone_trade", "America/New_York")
trade_dt = parse_trade_datetime(timestamp, trade_timezone)
validate_complete_regular_close(trade_dt, market_status, trade_timezone)
trade_date = trade_dt.date().isoformat()
logger.info( logger.info(
"NDX 字段解析结果: tradeDate=%s close=%s change=%s changePercent=%s previousClose=%s", "NDX 字段解析结果: tradeDate=%s marketStatus=%s close=%s change=%s changePercent=%s previousClose=%s",
trade_date, trade_date,
market_status,
money(close), money(close),
signed(change), signed(change),
signed(change_percent), signed(change_percent),
@@ -307,9 +382,14 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData
url = str(config["nasdaq"]["fred_csv"]) url = str(config["nasdaq"]["fred_csv"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("FRED CSV 请求开始") logger.info("FRED CSV 请求开始")
request = urllib.request.Request(url, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}) text = http_text(
with urllib.request.urlopen(request, timeout=timeout) as response: "GET",
text = response.read().decode("utf-8-sig") url,
timeout=timeout,
headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]},
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
).decode("utf-8-sig")
rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}] rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}]
if len(rows) < 2: if len(rows) < 2:
raise ValueError("FRED CSV 有效数据不足") raise ValueError("FRED CSV 有效数据不足")
@@ -331,21 +411,140 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData
def parse_trade_date(timestamp: str, timezone_name: str) -> str: def parse_trade_date(timestamp: str, timezone_name: str) -> str:
return parse_trade_datetime(timestamp, timezone_name).date().isoformat()
def should_replay_report(
delivery_now: datetime,
deliveries: dict[str, Any],
enabled: bool,
today_is_workday: bool,
yesterday_is_workday: bool,
delivery_hour: int,
) -> bool:
delivery_date = delivery_now.date()
return (
enabled
and delivery_now.hour >= delivery_hour
and today_is_workday
and not yesterday_is_workday
and delivery_date.isoformat() not in deliveries
)
def next_delivery_time_after_close(
trade_date: str,
trade_tz: ZoneInfo | timezone,
delivery_tz: ZoneInfo | timezone,
delivery_hour: int,
) -> datetime:
trade_day = datetime.strptime(trade_date, "%Y-%m-%d").date()
close_at = datetime.combine(trade_day, datetime_time(16, 0), tzinfo=trade_tz)
close_in_delivery_tz = close_at.astimezone(delivery_tz)
delivery_at = datetime.combine(
close_in_delivery_tz.date(),
datetime_time(delivery_hour, 0),
tzinfo=delivery_tz,
)
if delivery_at <= close_in_delivery_tz:
delivery_at += timedelta(days=1)
return delivery_at
def get_china_workday_pair(
config: dict[str, Any],
delivery_now: datetime,
logger: logging.Logger,
) -> tuple[bool, bool]:
report_config = config["report"]
base_url = str(
report_config.get("holiday_api_v2", "https://api.haoshenqi.top/holiday")
).rstrip("/")
timeout = int(report_config.get("holiday_api_timeout_seconds", 5))
try:
today = fetch_holiday_v2_status(config, f"{base_url}/today", timeout)
yesterday = fetch_holiday_v2_status(config, f"{base_url}/yesterday", timeout)
logger.info(
"中国工作日 API v2 获取成功: today=%s, yesterday=%s",
today,
yesterday,
)
return today, yesterday
except Exception as exc:
today = delivery_now.date()
yesterday = today - timedelta(days=1)
logger.warning(
"中国工作日 API v2 获取失败,回退到周一至周五规则: %s",
summarize_error(exc),
)
return today.weekday() < 5, yesterday.weekday() < 5
def fetch_holiday_v2_status(
config: dict[str, Any],
url: str,
timeout: int,
) -> bool:
text = http_text(
"GET",
url,
timeout=timeout,
headers={"Accept": "text/plain"},
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
).decode("utf-8-sig").strip()
if text == "工作":
return True
if text == "休息":
return False
raise ValueError(f"节假日 API v2 返回未知状态: {text[:50]!r}")
def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime:
cleaned = timestamp.replace("ET", "").strip() cleaned = timestamp.replace("ET", "").strip()
formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"]
tz = get_timezone(timezone_name) tz = get_timezone(timezone_name)
for fmt in formats: formats = [
("%b %d, %Y %I:%M %p", False),
("%m/%d/%Y %I:%M %p", False),
("%Y-%m-%d %H:%M:%S", False),
("%b %d, %Y", True),
("%m/%d/%Y", True),
("%Y-%m-%d", True),
]
for fmt, date_only in formats:
try: try:
dt = datetime.strptime(cleaned, fmt) dt = datetime.strptime(cleaned, fmt)
return dt.replace(tzinfo=tz).date().isoformat() if date_only:
# Nasdaq occasionally returns only the date for an index close.
dt = datetime.combine(dt.date(), datetime_time(16, 0))
return dt.replace(tzinfo=tz)
except ValueError: except ValueError:
continue continue
try: try:
return datetime.fromisoformat(cleaned).astimezone(tz).date().isoformat() dt = datetime.fromisoformat(cleaned)
if dt.tzinfo is None:
return dt.replace(tzinfo=tz)
return dt.astimezone(tz)
except ValueError as exc: except ValueError as exc:
raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc
def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None:
tz = get_timezone(timezone_name)
now = datetime.now(tz)
close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz)
if market_status.lower() == "open":
raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据")
if trade_dt > now:
raise IncompleteMarketData(
f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 晚于当前交易时区时间 {now.isoformat()}"
)
if trade_dt < close_dt:
raise IncompleteMarketData(
f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET"
)
def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone: def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone:
try: try:
return ZoneInfo(timezone_name) return ZoneInfo(timezone_name)
@@ -394,9 +593,16 @@ def initialize_state(
logger.info("state 文件初始化成功") logger.info("state 文件初始化成功")
def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None: def update_success_state(
state: dict[str, Any],
market_data: MarketData,
now: datetime,
scheduled_close_report: bool = False,
) -> None:
beijing_date = now.date().isoformat() beijing_date = now.date().isoformat()
state["lastNewDataReportedDate"] = market_data.trade_date state["lastNewDataReportedDate"] = market_data.trade_date
if scheduled_close_report:
state["lastScheduledCloseReportedDate"] = market_data.trade_date
state["lastData"] = serialize_market_data(market_data, now) state["lastData"] = serialize_market_data(market_data, now)
state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date
@@ -436,20 +642,54 @@ def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Log
url = f"{base_api}/v2/users/{target_openid}/messages" url = f"{base_api}/v2/users/{target_openid}/messages"
logger.info("QQBot 投递开始: target_type=%s", target_type) logger.info("QQBot 投递开始: target_type=%s", target_type)
body = {"content": content, "msg_type": 0} body = {"content": content, "msg_type": 0}
http_json("POST", url, body=body, timeout=15, headers={"Authorization": f"QQBot {token}"}) http_json(
"POST",
url,
body=body,
timeout=15,
headers={"Authorization": f"QQBot {token}"},
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
logger.info("QQBot 投递成功") logger.info("QQBot 投递成功")
def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str: def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str:
qqbot = config["qqbot"] qqbot = config["qqbot"]
url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken"))
body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} appid, appsecret = get_qqbot_credentials(config)
payload = http_json("POST", url, body=body, timeout=15) logger.info(
token = payload.get("access_token") or payload.get("accessToken") "QQBot token 请求开始: source=%s, appid=%s",
if not token: get_qqbot_credential_source(config),
raise ValueError("QQBot token 响应缺少 access_token") mask_secret(appid),
logger.info("QQBot access_token 获取成功") )
return str(token) body = {"appId": appid, "clientSecret": appsecret}
retry_count = max(0, int(qqbot.get("token_retry_count", 1)))
for attempt in range(retry_count + 1):
payload = http_json(
"POST",
url,
body=body,
timeout=15,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
token = payload.get("access_token") or payload.get("accessToken")
if token:
logger.info("QQBot access_token 获取成功")
return str(token)
code = payload.get("code", "unknown")
message = payload.get("message") or payload.get("msg") or "unknown error"
if attempt < retry_count:
logger.warning(
"QQBot token 获取失败,将重试: code=%s, message=%s",
code,
message,
)
time.sleep(1)
continue
raise ValueError(f"QQBot token 获取失败: code={code}, message={message}")
raise RuntimeError("QQBot token 获取流程异常结束")
def append_delivery_log( def append_delivery_log(
@@ -486,25 +726,170 @@ def http_json(
body: dict[str, Any] | None = None, body: dict[str, Any] | None = None,
timeout: int = 15, timeout: int = 15,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
proxy_config: dict[str, Any] | None = None,
proxy_authorization: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
text = http_text(
method,
url,
body=body,
timeout=timeout,
headers=headers,
proxy_config=proxy_config,
proxy_authorization=proxy_authorization,
).decode("utf-8")
parsed = json.loads(text)
if not isinstance(parsed, dict):
raise ValueError("JSON response root is not object")
return parsed
def http_text(
method: str,
url: str,
body: dict[str, Any] | None = None,
timeout: int = 15,
headers: dict[str, str] | None = None,
proxy_config: dict[str, Any] | None = None,
proxy_authorization: str | None = None,
) -> bytes:
proxy_config = proxy_config or {}
if should_use_curl(proxy_config):
return curl_request(method, url, body, timeout, headers, proxy_config)
request_headers = dict(DEFAULT_HEADERS) request_headers = dict(DEFAULT_HEADERS)
if headers: if headers:
request_headers.update(headers) request_headers.update(headers)
if proxy_authorization:
request_headers["Proxy-Authorization"] = proxy_authorization
data = None data = None
if body is not None: if body is not None:
data = json.dumps(body, ensure_ascii=False).encode("utf-8") data = json.dumps(body, ensure_ascii=False).encode("utf-8")
request_headers["Content-Type"] = "application/json" request_headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, headers=request_headers, method=method) request = urllib.request.Request(url, data=data, headers=request_headers, method=method)
opener = build_url_opener(proxy_config or {})
try: try:
with urllib.request.urlopen(request, timeout=timeout) as response: with opener.open(request, timeout=timeout) as response:
text = response.read().decode("utf-8") return response.read()
except urllib.error.HTTPError as exc: except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:300] detail = exc.read().decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
parsed = json.loads(text)
if not isinstance(parsed, dict):
raise ValueError("JSON response root is not object") def curl_request(
return parsed method: str,
url: str,
body: dict[str, Any] | None,
timeout: int,
headers: dict[str, str] | None,
proxy_config: dict[str, Any],
) -> bytes:
curl = shutil.which("curl.exe") or shutil.which("curl")
if not curl:
raise RuntimeError("network.transport 需要 curl,但当前环境未找到 curl")
command = [
curl,
"--silent",
"--show-error",
"--fail-with-body",
"--max-time",
str(timeout),
"--request",
method,
]
if bool(proxy_config.get("curl_http1_1", True)):
# Some HTTPS proxies intermittently fail while tunnelling HTTP/2.
command.append("--http1.1")
retry_count = max(0, int(proxy_config.get("curl_retry_count", 2)))
if retry_count and method.upper() in {"GET", "HEAD"}:
command.extend(
[
"--retry",
str(retry_count),
"--retry-delay",
str(max(0, int(proxy_config.get("curl_retry_delay_seconds", 1)))),
"--retry-all-errors",
]
)
proxy_url = str(proxy_config.get("proxy_url", "")).strip()
if proxy_url:
command.extend(["--proxy", proxy_url])
request_headers = dict(DEFAULT_HEADERS)
if headers:
request_headers.update(headers)
for key, value in request_headers.items():
command.extend(["--header", f"{key}: {value}"])
input_bytes = None
if body is not None:
input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8")
if not any(key.lower() == "content-type" for key in request_headers):
command.extend(["--header", "Content-Type: application/json"])
command.extend(["--data-binary", "@-"])
command.append(url)
completed = subprocess.run(
command,
input=input_bytes,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if completed.returncode != 0:
error = completed.stderr.decode("utf-8", errors="replace").strip()
if not error:
error = completed.stdout.decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"curl request failed: {error[:300]}")
return completed.stdout
def should_use_curl(proxy_config: dict[str, Any]) -> bool:
transport = str(proxy_config.get("transport", "auto")).strip().lower()
proxy_url = str(proxy_config.get("proxy_url", "")).strip().lower()
if transport == "curl":
return True
if transport == "urllib":
return False
if proxy_url.startswith("https://"):
return True
return bool(proxy_config.get("use_environment_proxy", True)) and has_environment_proxy()
def has_environment_proxy() -> bool:
proxies = urllib.request.getproxies()
return any(key.lower() in {"http", "https", "all"} and value for key, value in proxies.items())
def build_url_opener(proxy_config: dict[str, Any]) -> urllib.request.OpenerDirector:
proxy_url = str(proxy_config.get("proxy_url", "")).strip()
if proxy_url:
return urllib.request.build_opener(
urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
)
if bool(proxy_config.get("use_environment_proxy", False)):
return urllib.request.build_opener()
return urllib.request.build_opener(urllib.request.ProxyHandler({}))
def get_proxy_config(config: dict[str, Any]) -> dict[str, Any]:
network = config.get("network", {})
if not isinstance(network, dict):
return {}
return network
def proxy_authorization_header(config: dict[str, Any]) -> str | None:
proxy_url = str(get_proxy_config(config).get("proxy_url", "")).strip()
if not proxy_url:
return None
parsed = urllib.parse.urlsplit(proxy_url)
if not parsed.username:
return None
username = urllib.parse.unquote(parsed.username)
password = urllib.parse.unquote(parsed.password or "")
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
return f"Basic {token}"
def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]:
+292
View File
@@ -0,0 +1,292 @@
import logging
import subprocess
import unittest
from datetime import date, datetime, time, timedelta, timezone
from unittest.mock import patch
import ndx_daily_report as report
class ParseTradeDatetimeTests(unittest.TestCase):
def test_nasdaq_date_only_timestamp_represents_market_close(self) -> None:
parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York")
self.assertEqual(parsed.date().isoformat(), "2026-06-05")
self.assertEqual(parsed.time(), time(16, 0))
def test_naive_iso_timestamp_uses_trade_timezone(self) -> None:
parsed = report.parse_trade_datetime("2026-06-05T16:01:00", "America/New_York")
self.assertEqual(parsed.hour, 16)
self.assertIsNotNone(parsed.tzinfo)
def test_date_only_timestamp_passes_closed_market_validation(self) -> None:
parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York")
with patch.object(report, "datetime", wraps=datetime) as mocked_datetime:
mocked_datetime.now.return_value = datetime(2026, 6, 5, 20, 0, tzinfo=parsed.tzinfo)
report.validate_complete_regular_close(parsed, "Closed", "America/New_York")
def test_date_only_timestamp_is_incomplete_before_market_close(self) -> None:
parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York")
with patch.object(report, "datetime", wraps=datetime) as mocked_datetime:
mocked_datetime.now.return_value = datetime(2026, 6, 5, 10, 0, tzinfo=parsed.tzinfo)
with self.assertRaises(report.IncompleteMarketData):
report.validate_complete_regular_close(parsed, "Closed", "America/New_York")
class ReplayReportTests(unittest.TestCase):
def setUp(self) -> None:
self.beijing_tz = timezone(timedelta(hours=8))
def test_china_rest_day_does_not_trigger_replay(self) -> None:
now = datetime(2026, 6, 19, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, False, True, 8)
self.assertFalse(needed)
def test_first_workday_after_holiday_triggers_replay(self) -> None:
now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertTrue(needed)
def test_makeup_workday_after_holiday_triggers_replay(self) -> None:
now = datetime(2026, 1, 4, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertTrue(needed)
def test_ordinary_workday_does_not_trigger_replay(self) -> None:
now = datetime(2026, 6, 23, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, True, 8)
self.assertFalse(needed)
def test_replay_waits_until_delivery_hour(self) -> None:
now = datetime(2026, 6, 22, 7, 59, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertFalse(needed)
def test_replay_is_not_repeated_after_delivery(self) -> None:
now = datetime(2026, 6, 22, 13, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(
now,
{"2026-06-22": "2026-06-19"},
True,
True,
False,
8,
)
self.assertFalse(needed)
class DeliveryScheduleTests(unittest.TestCase):
def test_friday_close_is_scheduled_for_saturday_beijing_time(self) -> None:
trade_tz = report.get_timezone("America/New_York")
delivery_tz = report.get_timezone("Asia/Shanghai")
scheduled = report.next_delivery_time_after_close(
"2026-06-05",
trade_tz,
delivery_tz,
8,
)
self.assertEqual(scheduled.date(), date(2026, 6, 6))
self.assertEqual(scheduled.time(), time(8, 0))
def test_us_close_report_schedule_is_independent_of_china_holiday(self) -> None:
trade_tz = report.get_timezone("America/New_York")
delivery_tz = report.get_timezone("Asia/Shanghai")
scheduled = report.next_delivery_time_after_close(
"2026-06-18",
trade_tz,
delivery_tz,
8,
)
self.assertEqual(scheduled.date(), date(2026, 6, 19))
class HolidayApiTests(unittest.TestCase):
def setUp(self) -> None:
self.config = {
"report": {
"holiday_api_v2": "https://api.haoshenqi.top/holiday",
"holiday_api_timeout_seconds": 5,
},
"network": {"transport": "urllib", "use_environment_proxy": False},
}
self.logger = logging.getLogger("holiday-api-test")
self.beijing_tz = timezone(timedelta(hours=8))
@patch("ndx_daily_report.fetch_holiday_v2_status")
def test_v2_requests_today_and_yesterday(self, fetch_status) -> None:
fetch_status.side_effect = [True, False]
now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
result = report.get_china_workday_pair(self.config, now, self.logger)
self.assertEqual(result, (True, False))
self.assertEqual(
[call.args[1] for call in fetch_status.call_args_list],
[
"https://api.haoshenqi.top/holiday/today",
"https://api.haoshenqi.top/holiday/yesterday",
],
)
@patch("ndx_daily_report.fetch_holiday_v2_status", side_effect=RuntimeError("offline"))
def test_api_failure_falls_back_to_weekdays(self, _fetch_status) -> None:
monday = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
result = report.get_china_workday_pair(self.config, monday, self.logger)
self.assertEqual(result, (True, False))
@patch("ndx_daily_report.http_text", return_value="工作".encode())
def test_v2_work_response_is_workday(self, _http_text) -> None:
self.assertTrue(
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
)
@patch("ndx_daily_report.http_text", return_value="休息".encode())
def test_v2_rest_response_is_not_workday(self, _http_text) -> None:
self.assertFalse(
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
)
@patch("ndx_daily_report.http_text", return_value=b"unknown")
def test_unknown_v2_response_is_rejected(self, _http_text) -> None:
with self.assertRaises(ValueError):
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
class QQBotTokenTests(unittest.TestCase):
def setUp(self) -> None:
self.config = {
"qqbot": {
"appid": "config-appid",
"appsecret": "config-secret",
"token_api": "https://bots.qq.com/app/getAppAccessToken",
},
"network": {"transport": "urllib", "use_environment_proxy": False},
}
self.logger = logging.getLogger("qqbot-token-test")
@patch.dict(
"os.environ",
{"QQBOT_APPID": "env-appid", "QQBOT_SECRET": "env-secret"},
clear=False,
)
def test_environment_credentials_override_config(self) -> None:
self.assertEqual(
report.get_qqbot_credentials(self.config),
("env-appid", "env-secret"),
)
self.assertEqual(
report.get_qqbot_credential_source(self.config),
"environment",
)
@patch("ndx_daily_report.http_json")
def test_token_request_uses_official_field_names(self, http_json) -> None:
http_json.return_value = {"access_token": "token", "expires_in": "7200"}
token = report.get_qqbot_token(self.config, self.logger)
self.assertEqual(token, "token")
self.assertEqual(
http_json.call_args.kwargs["body"],
{"appId": "config-appid", "clientSecret": "config-secret"},
)
@patch("ndx_daily_report.http_json")
def test_token_error_includes_qq_error_code_and_message(self, http_json) -> None:
http_json.return_value = {"code": 100007, "message": "appid invalid"}
self.config["qqbot"]["token_retry_count"] = 0
with self.assertRaisesRegex(
ValueError,
"code=100007, message=appid invalid",
):
report.get_qqbot_token(self.config, self.logger)
@patch("ndx_daily_report.time.sleep")
@patch("ndx_daily_report.http_json")
def test_token_request_retries_one_transient_error(self, http_json, sleep) -> None:
http_json.side_effect = [
{"code": 100007, "message": "appid invalid"},
{"access_token": "token", "expires_in": "7200"},
]
token = report.get_qqbot_token(self.config, self.logger)
self.assertEqual(token, "token")
self.assertEqual(http_json.call_count, 2)
sleep.assert_called_once_with(1)
def test_legacy_appkey_remains_supported(self) -> None:
config = {"qqbot": {"appid": "appid", "appkey": "legacy-secret"}}
self.assertEqual(
report.get_qqbot_credentials(config),
("appid", "legacy-secret"),
)
self.assertEqual(
report.get_qqbot_credential_source(config),
"config.appkey(legacy)",
)
class CurlRequestTests(unittest.TestCase):
@patch("ndx_daily_report.subprocess.run")
@patch("ndx_daily_report.shutil.which", return_value="/usr/bin/curl")
def test_curl_uses_http1_and_retries_by_default(self, _which, run) -> None:
run.return_value = subprocess.CompletedProcess([], 0, stdout=b"ok", stderr=b"")
result = report.curl_request("GET", "https://example.com", None, 15, None, {})
self.assertEqual(result, b"ok")
command = run.call_args.args[0]
self.assertIn("--http1.1", command)
self.assertEqual(command[command.index("--retry") + 1], "2")
self.assertIn("--retry-all-errors", command)
@patch("ndx_daily_report.subprocess.run")
@patch("ndx_daily_report.shutil.which", return_value="/usr/bin/curl")
def test_curl_does_not_retry_post_requests(self, _which, run) -> None:
run.return_value = subprocess.CompletedProcess([], 0, stdout=b"{}", stderr=b"")
report.curl_request("POST", "https://example.com", {"value": 1}, 15, None, {})
command = run.call_args.args[0]
self.assertNotIn("--retry", command)
self.assertIn("Content-Type: application/json", command)
if __name__ == "__main__":
unittest.main()