完善 NDX 日报调度与 QQBot 鉴权

This commit is contained in:
2026-06-06 13:36:51 +08:00
parent e2af2eff79
commit e72ec9ffa3
3 changed files with 453 additions and 35 deletions
+10 -2
View File
@@ -23,8 +23,10 @@ network:
qqbot:
appid: "用户填写"
appkey: "用户填写"
appsecret: "用户填写"
# 也可通过环境变量 QQBOT_APPID、QQBOT_SECRET 提供,环境变量优先。
token_api: https://bots.qq.com/app/getAppAccessToken
token_retry_count: 1
base_api: https://api.sgroup.qq.com
# c2c 表示私聊,group 表示群聊
@@ -42,4 +44,10 @@ qqbot:
report:
timezone_trade: America/New_York
timezone_delivery: Asia/Shanghai
replay_on_weekdays: true
# 美股收盘后,下一个北京时间 08:00 推送;中国休息日后的首个工作日复述最近收盘。
delivery_hour: 8
replay_after_china_holiday: true
# Haoshenqi0123/holiday v2:实际请求 /today 和 /yesterday。
# 获取失败时回退为周一至周五工作、周六日休息。
holiday_api_v2: https://api.haoshenqi.top/holiday
holiday_api_timeout_seconds: 5
+203 -31
View File
@@ -69,26 +69,56 @@ def main() -> None:
beijing_now = datetime.now(delivery_tz)
beijing_date = beijing_now.date().isoformat()
last_reported = state.get("lastNewDataReportedDate")
last_close_reported = state.get("lastScheduledCloseReportedDate")
logger.info(
"当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s",
"当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s, "
"lastScheduledCloseReportedDate=%s",
beijing_date,
market_data.trade_date,
last_reported,
last_close_reported,
)
if not last_reported:
initialize_state(config, state, market_data, beijing_now, logger)
raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state")
need_new_data_report = market_data.trade_date > str(last_reported)
replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True))
need_replay = (
market_data.trade_date == str(last_reported)
and replay_on_weekdays
and beijing_now.weekday() < 5
and beijing_date not in state.get("deliveries", {})
report_config = config["report"]
trade_tz = get_timezone(report_config.get("timezone_trade", "America/New_York"), logger)
scheduled_close_report_at = next_delivery_time_after_close(
market_data.trade_date,
trade_tz,
delivery_tz,
int(report_config.get("delivery_hour", 8)),
)
need_new_data_report = (
market_data.trade_date > str(last_close_reported or "")
and beijing_now >= scheduled_close_report_at
)
replay_after_china_holiday = bool(
report_config.get(
"replay_after_china_holiday",
report_config.get("replay_on_weekdays", True),
)
)
today_is_workday, yesterday_is_workday = get_china_workday_pair(
config,
beijing_now,
logger,
)
need_replay = should_replay_report(
beijing_now,
state.get("deliveries", {}),
replay_after_china_holiday,
today_is_workday,
yesterday_is_workday,
int(report_config.get("delivery_hour", 8)),
)
logger.info(
"美股收盘定时报告=%s (计划时间=%s), 中国休息日后首工作日复述=%s",
need_new_data_report,
scheduled_close_report_at.isoformat(),
need_replay,
)
logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay)
if not need_new_data_report and not need_replay:
raise SilentExit("不满足推送条件")
@@ -106,7 +136,7 @@ def main() -> None:
append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc)
raise SilentExit("QQBot 投递失败")
update_success_state(state, market_data, beijing_now)
update_success_state(state, market_data, beijing_now, need_new_data_report)
append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger)
save_state(config, state, logger)
logger.info("QQBot 投递成功,state 更新成功")
@@ -178,23 +208,46 @@ def setup_logging(config: dict[str, Any]) -> logging.Logger:
def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config.get("qqbot", {})
appid = str(qqbot.get("appid", "")).strip()
appkey = str(qqbot.get("appkey", "")).strip()
appid, appsecret = get_qqbot_credentials(config)
credential_source = get_qqbot_credential_source(config)
target_type = str(qqbot.get("target_type", "c2c")).strip()
logger.info(
"QQBot 配置检查: appid=%s, appkey=%s, target_type=%s",
"QQBot 配置检查: source=%s, appid=%s, appsecret=%s, target_type=%s",
credential_source,
mask_secret(appid),
mask_secret(appkey),
mask_secret(appsecret),
target_type,
)
if not appid or appid == "用户填写":
raise SilentExit("QQBot AppID 未配置")
if not appkey or appkey == "用户填写":
raise SilentExit("QQBot AppKey 未配置")
if not appsecret or appsecret == "用户填写":
raise SilentExit("QQBot AppSecret 未配置")
if target_type not in {"c2c", "group"}:
raise SilentExit("QQBot target_type 必须为 c2c 或 group")
def get_qqbot_credentials(config: dict[str, Any]) -> tuple[str, str]:
qqbot = config.get("qqbot", {})
appid = os.environ.get("QQBOT_APPID") or qqbot.get("appid", "")
appsecret = (
os.environ.get("QQBOT_SECRET")
or qqbot.get("appsecret")
or qqbot.get("appkey", "")
)
return str(appid).strip(), str(appsecret).strip()
def get_qqbot_credential_source(config: dict[str, Any]) -> str:
if os.environ.get("QQBOT_APPID") or os.environ.get("QQBOT_SECRET"):
return "environment"
qqbot = config.get("qqbot", {})
if qqbot.get("appsecret"):
return "config.appsecret"
if qqbot.get("appkey"):
return "config.appkey(legacy)"
return "config"
def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None:
qqbot = config["qqbot"]
target_openid = str(qqbot.get("target_openid", "")).strip()
@@ -361,6 +414,92 @@ def parse_trade_date(timestamp: str, timezone_name: str) -> str:
return parse_trade_datetime(timestamp, timezone_name).date().isoformat()
def should_replay_report(
delivery_now: datetime,
deliveries: dict[str, Any],
enabled: bool,
today_is_workday: bool,
yesterday_is_workday: bool,
delivery_hour: int,
) -> bool:
delivery_date = delivery_now.date()
return (
enabled
and delivery_now.hour >= delivery_hour
and today_is_workday
and not yesterday_is_workday
and delivery_date.isoformat() not in deliveries
)
def next_delivery_time_after_close(
trade_date: str,
trade_tz: ZoneInfo | timezone,
delivery_tz: ZoneInfo | timezone,
delivery_hour: int,
) -> datetime:
trade_day = datetime.strptime(trade_date, "%Y-%m-%d").date()
close_at = datetime.combine(trade_day, datetime_time(16, 0), tzinfo=trade_tz)
close_in_delivery_tz = close_at.astimezone(delivery_tz)
delivery_at = datetime.combine(
close_in_delivery_tz.date(),
datetime_time(delivery_hour, 0),
tzinfo=delivery_tz,
)
if delivery_at <= close_in_delivery_tz:
delivery_at += timedelta(days=1)
return delivery_at
def get_china_workday_pair(
config: dict[str, Any],
delivery_now: datetime,
logger: logging.Logger,
) -> tuple[bool, bool]:
report_config = config["report"]
base_url = str(
report_config.get("holiday_api_v2", "https://api.haoshenqi.top/holiday")
).rstrip("/")
timeout = int(report_config.get("holiday_api_timeout_seconds", 5))
try:
today = fetch_holiday_v2_status(config, f"{base_url}/today", timeout)
yesterday = fetch_holiday_v2_status(config, f"{base_url}/yesterday", timeout)
logger.info(
"中国工作日 API v2 获取成功: today=%s, yesterday=%s",
today,
yesterday,
)
return today, yesterday
except Exception as exc:
today = delivery_now.date()
yesterday = today - timedelta(days=1)
logger.warning(
"中国工作日 API v2 获取失败,回退到周一至周五规则: %s",
summarize_error(exc),
)
return today.weekday() < 5, yesterday.weekday() < 5
def fetch_holiday_v2_status(
config: dict[str, Any],
url: str,
timeout: int,
) -> bool:
text = http_text(
"GET",
url,
timeout=timeout,
headers={"Accept": "text/plain"},
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
).decode("utf-8-sig").strip()
if text == "工作":
return True
if text == "休息":
return False
raise ValueError(f"节假日 API v2 返回未知状态: {text[:50]!r}")
def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime:
cleaned = timestamp.replace("ET", "").strip()
tz = get_timezone(timezone_name)
@@ -392,9 +531,14 @@ def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime:
def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None:
tz = get_timezone(timezone_name)
now = datetime.now(tz)
close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz)
if market_status.lower() == "open":
raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据")
if trade_dt > now:
raise IncompleteMarketData(
f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 晚于当前交易时区时间 {now.isoformat()}"
)
if trade_dt < close_dt:
raise IncompleteMarketData(
f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET"
@@ -449,9 +593,16 @@ def initialize_state(
logger.info("state 文件初始化成功")
def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None:
def update_success_state(
state: dict[str, Any],
market_data: MarketData,
now: datetime,
scheduled_close_report: bool = False,
) -> None:
beijing_date = now.date().isoformat()
state["lastNewDataReportedDate"] = market_data.trade_date
if scheduled_close_report:
state["lastScheduledCloseReportedDate"] = market_data.trade_date
state["lastData"] = serialize_market_data(market_data, now)
state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date
@@ -506,20 +657,39 @@ def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Log
def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str:
qqbot = config["qqbot"]
url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken"))
body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])}
payload = http_json(
"POST",
url,
body=body,
timeout=15,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
appid, appsecret = get_qqbot_credentials(config)
logger.info(
"QQBot token 请求开始: source=%s, appid=%s",
get_qqbot_credential_source(config),
mask_secret(appid),
)
token = payload.get("access_token") or payload.get("accessToken")
if not token:
raise ValueError("QQBot token 响应缺少 access_token")
logger.info("QQBot access_token 获取成功")
return str(token)
body = {"appId": appid, "clientSecret": appsecret}
retry_count = max(0, int(qqbot.get("token_retry_count", 1)))
for attempt in range(retry_count + 1):
payload = http_json(
"POST",
url,
body=body,
timeout=15,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
token = payload.get("access_token") or payload.get("accessToken")
if token:
logger.info("QQBot access_token 获取成功")
return str(token)
code = payload.get("code", "unknown")
message = payload.get("message") or payload.get("msg") or "unknown error"
if attempt < retry_count:
logger.warning(
"QQBot token 获取失败,将重试: code=%s, message=%s",
code,
message,
)
time.sleep(1)
continue
raise ValueError(f"QQBot token 获取失败: code={code}, message={message}")
raise RuntimeError("QQBot token 获取流程异常结束")
def append_delivery_log(
@@ -654,6 +824,8 @@ def curl_request(
input_bytes = None
if body is not None:
input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8")
if not any(key.lower() == "content-type" for key in request_headers):
command.extend(["--header", "Content-Type: application/json"])
command.extend(["--data-binary", "@-"])
command.append(url)
+240 -2
View File
@@ -1,6 +1,7 @@
import logging
import subprocess
import unittest
from datetime import time
from datetime import date, datetime, time, timedelta, timezone
from unittest.mock import patch
import ndx_daily_report as report
@@ -22,7 +23,243 @@ class ParseTradeDatetimeTests(unittest.TestCase):
def test_date_only_timestamp_passes_closed_market_validation(self) -> None:
parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York")
report.validate_complete_regular_close(parsed, "Closed", "America/New_York")
with patch.object(report, "datetime", wraps=datetime) as mocked_datetime:
mocked_datetime.now.return_value = datetime(2026, 6, 5, 20, 0, tzinfo=parsed.tzinfo)
report.validate_complete_regular_close(parsed, "Closed", "America/New_York")
def test_date_only_timestamp_is_incomplete_before_market_close(self) -> None:
parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York")
with patch.object(report, "datetime", wraps=datetime) as mocked_datetime:
mocked_datetime.now.return_value = datetime(2026, 6, 5, 10, 0, tzinfo=parsed.tzinfo)
with self.assertRaises(report.IncompleteMarketData):
report.validate_complete_regular_close(parsed, "Closed", "America/New_York")
class ReplayReportTests(unittest.TestCase):
def setUp(self) -> None:
self.beijing_tz = timezone(timedelta(hours=8))
def test_china_rest_day_does_not_trigger_replay(self) -> None:
now = datetime(2026, 6, 19, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, False, True, 8)
self.assertFalse(needed)
def test_first_workday_after_holiday_triggers_replay(self) -> None:
now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertTrue(needed)
def test_makeup_workday_after_holiday_triggers_replay(self) -> None:
now = datetime(2026, 1, 4, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertTrue(needed)
def test_ordinary_workday_does_not_trigger_replay(self) -> None:
now = datetime(2026, 6, 23, 8, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, True, 8)
self.assertFalse(needed)
def test_replay_waits_until_delivery_hour(self) -> None:
now = datetime(2026, 6, 22, 7, 59, tzinfo=self.beijing_tz)
needed = report.should_replay_report(now, {}, True, True, False, 8)
self.assertFalse(needed)
def test_replay_is_not_repeated_after_delivery(self) -> None:
now = datetime(2026, 6, 22, 13, 0, tzinfo=self.beijing_tz)
needed = report.should_replay_report(
now,
{"2026-06-22": "2026-06-19"},
True,
True,
False,
8,
)
self.assertFalse(needed)
class DeliveryScheduleTests(unittest.TestCase):
def test_friday_close_is_scheduled_for_saturday_beijing_time(self) -> None:
trade_tz = report.get_timezone("America/New_York")
delivery_tz = report.get_timezone("Asia/Shanghai")
scheduled = report.next_delivery_time_after_close(
"2026-06-05",
trade_tz,
delivery_tz,
8,
)
self.assertEqual(scheduled.date(), date(2026, 6, 6))
self.assertEqual(scheduled.time(), time(8, 0))
def test_us_close_report_schedule_is_independent_of_china_holiday(self) -> None:
trade_tz = report.get_timezone("America/New_York")
delivery_tz = report.get_timezone("Asia/Shanghai")
scheduled = report.next_delivery_time_after_close(
"2026-06-18",
trade_tz,
delivery_tz,
8,
)
self.assertEqual(scheduled.date(), date(2026, 6, 19))
class HolidayApiTests(unittest.TestCase):
def setUp(self) -> None:
self.config = {
"report": {
"holiday_api_v2": "https://api.haoshenqi.top/holiday",
"holiday_api_timeout_seconds": 5,
},
"network": {"transport": "urllib", "use_environment_proxy": False},
}
self.logger = logging.getLogger("holiday-api-test")
self.beijing_tz = timezone(timedelta(hours=8))
@patch("ndx_daily_report.fetch_holiday_v2_status")
def test_v2_requests_today_and_yesterday(self, fetch_status) -> None:
fetch_status.side_effect = [True, False]
now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
result = report.get_china_workday_pair(self.config, now, self.logger)
self.assertEqual(result, (True, False))
self.assertEqual(
[call.args[1] for call in fetch_status.call_args_list],
[
"https://api.haoshenqi.top/holiday/today",
"https://api.haoshenqi.top/holiday/yesterday",
],
)
@patch("ndx_daily_report.fetch_holiday_v2_status", side_effect=RuntimeError("offline"))
def test_api_failure_falls_back_to_weekdays(self, _fetch_status) -> None:
monday = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz)
result = report.get_china_workday_pair(self.config, monday, self.logger)
self.assertEqual(result, (True, False))
@patch("ndx_daily_report.http_text", return_value="工作".encode())
def test_v2_work_response_is_workday(self, _http_text) -> None:
self.assertTrue(
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
)
@patch("ndx_daily_report.http_text", return_value="休息".encode())
def test_v2_rest_response_is_not_workday(self, _http_text) -> None:
self.assertFalse(
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
)
@patch("ndx_daily_report.http_text", return_value=b"unknown")
def test_unknown_v2_response_is_rejected(self, _http_text) -> None:
with self.assertRaises(ValueError):
report.fetch_holiday_v2_status(
self.config,
"https://api.haoshenqi.top/holiday/today",
5,
)
class QQBotTokenTests(unittest.TestCase):
def setUp(self) -> None:
self.config = {
"qqbot": {
"appid": "config-appid",
"appsecret": "config-secret",
"token_api": "https://bots.qq.com/app/getAppAccessToken",
},
"network": {"transport": "urllib", "use_environment_proxy": False},
}
self.logger = logging.getLogger("qqbot-token-test")
@patch.dict(
"os.environ",
{"QQBOT_APPID": "env-appid", "QQBOT_SECRET": "env-secret"},
clear=False,
)
def test_environment_credentials_override_config(self) -> None:
self.assertEqual(
report.get_qqbot_credentials(self.config),
("env-appid", "env-secret"),
)
self.assertEqual(
report.get_qqbot_credential_source(self.config),
"environment",
)
@patch("ndx_daily_report.http_json")
def test_token_request_uses_official_field_names(self, http_json) -> None:
http_json.return_value = {"access_token": "token", "expires_in": "7200"}
token = report.get_qqbot_token(self.config, self.logger)
self.assertEqual(token, "token")
self.assertEqual(
http_json.call_args.kwargs["body"],
{"appId": "config-appid", "clientSecret": "config-secret"},
)
@patch("ndx_daily_report.http_json")
def test_token_error_includes_qq_error_code_and_message(self, http_json) -> None:
http_json.return_value = {"code": 100007, "message": "appid invalid"}
self.config["qqbot"]["token_retry_count"] = 0
with self.assertRaisesRegex(
ValueError,
"code=100007, message=appid invalid",
):
report.get_qqbot_token(self.config, self.logger)
@patch("ndx_daily_report.time.sleep")
@patch("ndx_daily_report.http_json")
def test_token_request_retries_one_transient_error(self, http_json, sleep) -> None:
http_json.side_effect = [
{"code": 100007, "message": "appid invalid"},
{"access_token": "token", "expires_in": "7200"},
]
token = report.get_qqbot_token(self.config, self.logger)
self.assertEqual(token, "token")
self.assertEqual(http_json.call_count, 2)
sleep.assert_called_once_with(1)
def test_legacy_appkey_remains_supported(self) -> None:
config = {"qqbot": {"appid": "appid", "appkey": "legacy-secret"}}
self.assertEqual(
report.get_qqbot_credentials(config),
("appid", "legacy-secret"),
)
self.assertEqual(
report.get_qqbot_credential_source(config),
"config.appkey(legacy)",
)
class CurlRequestTests(unittest.TestCase):
@@ -48,6 +285,7 @@ class CurlRequestTests(unittest.TestCase):
command = run.call_args.args[0]
self.assertNotIn("--retry", command)
self.assertIn("Content-Type: application/json", command)
if __name__ == "__main__":