From e72ec9ffa3c09fe0838e4e2ef3542383a23791d4 Mon Sep 17 00:00:00 2001 From: wangwei0518 <1329996666@qq.com> Date: Sat, 6 Jun 2026 13:36:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=20NDX=20=E6=97=A5=E6=8A=A5?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E4=B8=8E=20QQBot=20=E9=89=B4=E6=9D=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NdxDailyReport/config.example.yaml | 12 +- NdxDailyReport/ndx_daily_report.py | 234 ++++++++++++++++++++--- NdxDailyReport/test_ndx_daily_report.py | 242 +++++++++++++++++++++++- 3 files changed, 453 insertions(+), 35 deletions(-) diff --git a/NdxDailyReport/config.example.yaml b/NdxDailyReport/config.example.yaml index b1d19b4..2228ad8 100644 --- a/NdxDailyReport/config.example.yaml +++ b/NdxDailyReport/config.example.yaml @@ -23,8 +23,10 @@ network: qqbot: appid: "用户填写" - appkey: "用户填写" + appsecret: "用户填写" + # 也可通过环境变量 QQBOT_APPID、QQBOT_SECRET 提供,环境变量优先。 token_api: https://bots.qq.com/app/getAppAccessToken + token_retry_count: 1 base_api: https://api.sgroup.qq.com # c2c 表示私聊,group 表示群聊 @@ -42,4 +44,10 @@ qqbot: report: timezone_trade: America/New_York timezone_delivery: Asia/Shanghai - replay_on_weekdays: true + # 美股收盘后,下一个北京时间 08:00 推送;中国休息日后的首个工作日复述最近收盘。 + delivery_hour: 8 + replay_after_china_holiday: true + # Haoshenqi0123/holiday v2:实际请求 /today 和 /yesterday。 + # 获取失败时回退为周一至周五工作、周六日休息。 + holiday_api_v2: https://api.haoshenqi.top/holiday + holiday_api_timeout_seconds: 5 diff --git a/NdxDailyReport/ndx_daily_report.py b/NdxDailyReport/ndx_daily_report.py index d11e9f3..6b0e963 100644 --- a/NdxDailyReport/ndx_daily_report.py +++ b/NdxDailyReport/ndx_daily_report.py @@ -69,26 +69,56 @@ def main() -> None: beijing_now = datetime.now(delivery_tz) beijing_date = beijing_now.date().isoformat() last_reported = state.get("lastNewDataReportedDate") + last_close_reported = state.get("lastScheduledCloseReportedDate") logger.info( - "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s", + "当前北京时间日期=%s, 交易日期=%s, lastNewDataReportedDate=%s, " + "lastScheduledCloseReportedDate=%s", beijing_date, market_data.trade_date, last_reported, + last_close_reported, ) if not last_reported: initialize_state(config, state, market_data, beijing_now, logger) - raise SilentExit("首次运行或 state 缺少 lastNewDataReportedDate,已初始化 state") - need_new_data_report = market_data.trade_date > str(last_reported) - replay_on_weekdays = bool(config["report"].get("replay_on_weekdays", True)) - need_replay = ( - market_data.trade_date == str(last_reported) - and replay_on_weekdays - and beijing_now.weekday() < 5 - and beijing_date not in state.get("deliveries", {}) + report_config = config["report"] + trade_tz = get_timezone(report_config.get("timezone_trade", "America/New_York"), logger) + scheduled_close_report_at = next_delivery_time_after_close( + market_data.trade_date, + trade_tz, + delivery_tz, + int(report_config.get("delivery_hour", 8)), + ) + need_new_data_report = ( + market_data.trade_date > str(last_close_reported or "") + and beijing_now >= scheduled_close_report_at + ) + replay_after_china_holiday = bool( + report_config.get( + "replay_after_china_holiday", + report_config.get("replay_on_weekdays", True), + ) + ) + today_is_workday, yesterday_is_workday = get_china_workday_pair( + config, + beijing_now, + logger, + ) + need_replay = should_replay_report( + beijing_now, + state.get("deliveries", {}), + replay_after_china_holiday, + today_is_workday, + yesterday_is_workday, + int(report_config.get("delivery_hour", 8)), + ) + logger.info( + "美股收盘定时报告=%s (计划时间=%s), 中国休息日后首工作日复述=%s", + need_new_data_report, + scheduled_close_report_at.isoformat(), + need_replay, ) - logger.info("A 新数据报告=%s, B 工作日复述报告=%s", need_new_data_report, need_replay) if not need_new_data_report and not need_replay: raise SilentExit("不满足推送条件") @@ -106,7 +136,7 @@ def main() -> None: append_delivery_log(config, market_data, beijing_now, delivery_target, "failed", logger, exc) raise SilentExit("QQBot 投递失败") - update_success_state(state, market_data, beijing_now) + update_success_state(state, market_data, beijing_now, need_new_data_report) append_delivery_log(config, market_data, beijing_now, delivery_target, "sent", logger) save_state(config, state, logger) logger.info("QQBot 投递成功,state 更新成功") @@ -178,23 +208,46 @@ def setup_logging(config: dict[str, Any]) -> logging.Logger: def validate_qqbot_config(config: dict[str, Any], logger: logging.Logger) -> None: qqbot = config.get("qqbot", {}) - appid = str(qqbot.get("appid", "")).strip() - appkey = str(qqbot.get("appkey", "")).strip() + appid, appsecret = get_qqbot_credentials(config) + credential_source = get_qqbot_credential_source(config) target_type = str(qqbot.get("target_type", "c2c")).strip() logger.info( - "QQBot 配置检查: appid=%s, appkey=%s, target_type=%s", + "QQBot 配置检查: source=%s, appid=%s, appsecret=%s, target_type=%s", + credential_source, mask_secret(appid), - mask_secret(appkey), + mask_secret(appsecret), target_type, ) if not appid or appid == "用户填写": raise SilentExit("QQBot AppID 未配置") - if not appkey or appkey == "用户填写": - raise SilentExit("QQBot AppKey 未配置") + if not appsecret or appsecret == "用户填写": + raise SilentExit("QQBot AppSecret 未配置") if target_type not in {"c2c", "group"}: raise SilentExit("QQBot target_type 必须为 c2c 或 group") +def get_qqbot_credentials(config: dict[str, Any]) -> tuple[str, str]: + qqbot = config.get("qqbot", {}) + appid = os.environ.get("QQBOT_APPID") or qqbot.get("appid", "") + appsecret = ( + os.environ.get("QQBOT_SECRET") + or qqbot.get("appsecret") + or qqbot.get("appkey", "") + ) + return str(appid).strip(), str(appsecret).strip() + + +def get_qqbot_credential_source(config: dict[str, Any]) -> str: + if os.environ.get("QQBOT_APPID") or os.environ.get("QQBOT_SECRET"): + return "environment" + qqbot = config.get("qqbot", {}) + if qqbot.get("appsecret"): + return "config.appsecret" + if qqbot.get("appkey"): + return "config.appkey(legacy)" + return "config" + + def ensure_target_openid(config: dict[str, Any], logger: logging.Logger) -> None: qqbot = config["qqbot"] target_openid = str(qqbot.get("target_openid", "")).strip() @@ -361,6 +414,92 @@ def parse_trade_date(timestamp: str, timezone_name: str) -> str: return parse_trade_datetime(timestamp, timezone_name).date().isoformat() +def should_replay_report( + delivery_now: datetime, + deliveries: dict[str, Any], + enabled: bool, + today_is_workday: bool, + yesterday_is_workday: bool, + delivery_hour: int, +) -> bool: + delivery_date = delivery_now.date() + return ( + enabled + and delivery_now.hour >= delivery_hour + and today_is_workday + and not yesterday_is_workday + and delivery_date.isoformat() not in deliveries + ) + + +def next_delivery_time_after_close( + trade_date: str, + trade_tz: ZoneInfo | timezone, + delivery_tz: ZoneInfo | timezone, + delivery_hour: int, +) -> datetime: + trade_day = datetime.strptime(trade_date, "%Y-%m-%d").date() + close_at = datetime.combine(trade_day, datetime_time(16, 0), tzinfo=trade_tz) + close_in_delivery_tz = close_at.astimezone(delivery_tz) + delivery_at = datetime.combine( + close_in_delivery_tz.date(), + datetime_time(delivery_hour, 0), + tzinfo=delivery_tz, + ) + if delivery_at <= close_in_delivery_tz: + delivery_at += timedelta(days=1) + return delivery_at + + +def get_china_workday_pair( + config: dict[str, Any], + delivery_now: datetime, + logger: logging.Logger, +) -> tuple[bool, bool]: + report_config = config["report"] + base_url = str( + report_config.get("holiday_api_v2", "https://api.haoshenqi.top/holiday") + ).rstrip("/") + timeout = int(report_config.get("holiday_api_timeout_seconds", 5)) + try: + today = fetch_holiday_v2_status(config, f"{base_url}/today", timeout) + yesterday = fetch_holiday_v2_status(config, f"{base_url}/yesterday", timeout) + logger.info( + "中国工作日 API v2 获取成功: today=%s, yesterday=%s", + today, + yesterday, + ) + return today, yesterday + except Exception as exc: + today = delivery_now.date() + yesterday = today - timedelta(days=1) + logger.warning( + "中国工作日 API v2 获取失败,回退到周一至周五规则: %s", + summarize_error(exc), + ) + return today.weekday() < 5, yesterday.weekday() < 5 + + +def fetch_holiday_v2_status( + config: dict[str, Any], + url: str, + timeout: int, +) -> bool: + text = http_text( + "GET", + url, + timeout=timeout, + headers={"Accept": "text/plain"}, + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), + ).decode("utf-8-sig").strip() + if text == "工作": + return True + if text == "休息": + return False + raise ValueError(f"节假日 API v2 返回未知状态: {text[:50]!r}") + + def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime: cleaned = timestamp.replace("ET", "").strip() tz = get_timezone(timezone_name) @@ -392,9 +531,14 @@ def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime: def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None: tz = get_timezone(timezone_name) + now = datetime.now(tz) close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz) if market_status.lower() == "open": raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据") + if trade_dt > now: + raise IncompleteMarketData( + f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 晚于当前交易时区时间 {now.isoformat()}" + ) if trade_dt < close_dt: raise IncompleteMarketData( f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET" @@ -449,9 +593,16 @@ def initialize_state( logger.info("state 文件初始化成功") -def update_success_state(state: dict[str, Any], market_data: MarketData, now: datetime) -> None: +def update_success_state( + state: dict[str, Any], + market_data: MarketData, + now: datetime, + scheduled_close_report: bool = False, +) -> None: beijing_date = now.date().isoformat() state["lastNewDataReportedDate"] = market_data.trade_date + if scheduled_close_report: + state["lastScheduledCloseReportedDate"] = market_data.trade_date state["lastData"] = serialize_market_data(market_data, now) state.setdefault("deliveries", {})[beijing_date] = market_data.trade_date @@ -506,20 +657,39 @@ def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Log def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str: qqbot = config["qqbot"] url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) - body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} - payload = http_json( - "POST", - url, - body=body, - timeout=15, - proxy_config=get_proxy_config(config), - proxy_authorization=proxy_authorization_header(config), + appid, appsecret = get_qqbot_credentials(config) + logger.info( + "QQBot token 请求开始: source=%s, appid=%s", + get_qqbot_credential_source(config), + mask_secret(appid), ) - token = payload.get("access_token") or payload.get("accessToken") - if not token: - raise ValueError("QQBot token 响应缺少 access_token") - logger.info("QQBot access_token 获取成功") - return str(token) + body = {"appId": appid, "clientSecret": appsecret} + retry_count = max(0, int(qqbot.get("token_retry_count", 1))) + for attempt in range(retry_count + 1): + payload = http_json( + "POST", + url, + body=body, + timeout=15, + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), + ) + token = payload.get("access_token") or payload.get("accessToken") + if token: + logger.info("QQBot access_token 获取成功") + return str(token) + code = payload.get("code", "unknown") + message = payload.get("message") or payload.get("msg") or "unknown error" + if attempt < retry_count: + logger.warning( + "QQBot token 获取失败,将重试: code=%s, message=%s", + code, + message, + ) + time.sleep(1) + continue + raise ValueError(f"QQBot token 获取失败: code={code}, message={message}") + raise RuntimeError("QQBot token 获取流程异常结束") def append_delivery_log( @@ -654,6 +824,8 @@ def curl_request( input_bytes = None if body is not None: input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8") + if not any(key.lower() == "content-type" for key in request_headers): + command.extend(["--header", "Content-Type: application/json"]) command.extend(["--data-binary", "@-"]) command.append(url) diff --git a/NdxDailyReport/test_ndx_daily_report.py b/NdxDailyReport/test_ndx_daily_report.py index 902279e..159c3b7 100644 --- a/NdxDailyReport/test_ndx_daily_report.py +++ b/NdxDailyReport/test_ndx_daily_report.py @@ -1,6 +1,7 @@ +import logging import subprocess import unittest -from datetime import time +from datetime import date, datetime, time, timedelta, timezone from unittest.mock import patch import ndx_daily_report as report @@ -22,7 +23,243 @@ class ParseTradeDatetimeTests(unittest.TestCase): def test_date_only_timestamp_passes_closed_market_validation(self) -> None: parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York") - report.validate_complete_regular_close(parsed, "Closed", "America/New_York") + with patch.object(report, "datetime", wraps=datetime) as mocked_datetime: + mocked_datetime.now.return_value = datetime(2026, 6, 5, 20, 0, tzinfo=parsed.tzinfo) + report.validate_complete_regular_close(parsed, "Closed", "America/New_York") + + def test_date_only_timestamp_is_incomplete_before_market_close(self) -> None: + parsed = report.parse_trade_datetime("Jun 5, 2026", "America/New_York") + + with patch.object(report, "datetime", wraps=datetime) as mocked_datetime: + mocked_datetime.now.return_value = datetime(2026, 6, 5, 10, 0, tzinfo=parsed.tzinfo) + with self.assertRaises(report.IncompleteMarketData): + report.validate_complete_regular_close(parsed, "Closed", "America/New_York") + + +class ReplayReportTests(unittest.TestCase): + def setUp(self) -> None: + self.beijing_tz = timezone(timedelta(hours=8)) + + def test_china_rest_day_does_not_trigger_replay(self) -> None: + now = datetime(2026, 6, 19, 8, 0, tzinfo=self.beijing_tz) + + needed = report.should_replay_report(now, {}, True, False, True, 8) + + self.assertFalse(needed) + + def test_first_workday_after_holiday_triggers_replay(self) -> None: + now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz) + + needed = report.should_replay_report(now, {}, True, True, False, 8) + + self.assertTrue(needed) + + def test_makeup_workday_after_holiday_triggers_replay(self) -> None: + now = datetime(2026, 1, 4, 8, 0, tzinfo=self.beijing_tz) + + needed = report.should_replay_report(now, {}, True, True, False, 8) + + self.assertTrue(needed) + + def test_ordinary_workday_does_not_trigger_replay(self) -> None: + now = datetime(2026, 6, 23, 8, 0, tzinfo=self.beijing_tz) + + needed = report.should_replay_report(now, {}, True, True, True, 8) + + self.assertFalse(needed) + + def test_replay_waits_until_delivery_hour(self) -> None: + now = datetime(2026, 6, 22, 7, 59, tzinfo=self.beijing_tz) + + needed = report.should_replay_report(now, {}, True, True, False, 8) + + self.assertFalse(needed) + + def test_replay_is_not_repeated_after_delivery(self) -> None: + now = datetime(2026, 6, 22, 13, 0, tzinfo=self.beijing_tz) + + needed = report.should_replay_report( + now, + {"2026-06-22": "2026-06-19"}, + True, + True, + False, + 8, + ) + + self.assertFalse(needed) + + +class DeliveryScheduleTests(unittest.TestCase): + def test_friday_close_is_scheduled_for_saturday_beijing_time(self) -> None: + trade_tz = report.get_timezone("America/New_York") + delivery_tz = report.get_timezone("Asia/Shanghai") + + scheduled = report.next_delivery_time_after_close( + "2026-06-05", + trade_tz, + delivery_tz, + 8, + ) + + self.assertEqual(scheduled.date(), date(2026, 6, 6)) + self.assertEqual(scheduled.time(), time(8, 0)) + + def test_us_close_report_schedule_is_independent_of_china_holiday(self) -> None: + trade_tz = report.get_timezone("America/New_York") + delivery_tz = report.get_timezone("Asia/Shanghai") + + scheduled = report.next_delivery_time_after_close( + "2026-06-18", + trade_tz, + delivery_tz, + 8, + ) + + self.assertEqual(scheduled.date(), date(2026, 6, 19)) + + +class HolidayApiTests(unittest.TestCase): + def setUp(self) -> None: + self.config = { + "report": { + "holiday_api_v2": "https://api.haoshenqi.top/holiday", + "holiday_api_timeout_seconds": 5, + }, + "network": {"transport": "urllib", "use_environment_proxy": False}, + } + self.logger = logging.getLogger("holiday-api-test") + self.beijing_tz = timezone(timedelta(hours=8)) + + @patch("ndx_daily_report.fetch_holiday_v2_status") + def test_v2_requests_today_and_yesterday(self, fetch_status) -> None: + fetch_status.side_effect = [True, False] + now = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz) + + result = report.get_china_workday_pair(self.config, now, self.logger) + + self.assertEqual(result, (True, False)) + self.assertEqual( + [call.args[1] for call in fetch_status.call_args_list], + [ + "https://api.haoshenqi.top/holiday/today", + "https://api.haoshenqi.top/holiday/yesterday", + ], + ) + + @patch("ndx_daily_report.fetch_holiday_v2_status", side_effect=RuntimeError("offline")) + def test_api_failure_falls_back_to_weekdays(self, _fetch_status) -> None: + monday = datetime(2026, 6, 22, 8, 0, tzinfo=self.beijing_tz) + + result = report.get_china_workday_pair(self.config, monday, self.logger) + + self.assertEqual(result, (True, False)) + + @patch("ndx_daily_report.http_text", return_value="工作".encode()) + def test_v2_work_response_is_workday(self, _http_text) -> None: + self.assertTrue( + report.fetch_holiday_v2_status( + self.config, + "https://api.haoshenqi.top/holiday/today", + 5, + ) + ) + + @patch("ndx_daily_report.http_text", return_value="休息".encode()) + def test_v2_rest_response_is_not_workday(self, _http_text) -> None: + self.assertFalse( + report.fetch_holiday_v2_status( + self.config, + "https://api.haoshenqi.top/holiday/today", + 5, + ) + ) + + @patch("ndx_daily_report.http_text", return_value=b"unknown") + def test_unknown_v2_response_is_rejected(self, _http_text) -> None: + with self.assertRaises(ValueError): + report.fetch_holiday_v2_status( + self.config, + "https://api.haoshenqi.top/holiday/today", + 5, + ) + + +class QQBotTokenTests(unittest.TestCase): + def setUp(self) -> None: + self.config = { + "qqbot": { + "appid": "config-appid", + "appsecret": "config-secret", + "token_api": "https://bots.qq.com/app/getAppAccessToken", + }, + "network": {"transport": "urllib", "use_environment_proxy": False}, + } + self.logger = logging.getLogger("qqbot-token-test") + + @patch.dict( + "os.environ", + {"QQBOT_APPID": "env-appid", "QQBOT_SECRET": "env-secret"}, + clear=False, + ) + def test_environment_credentials_override_config(self) -> None: + self.assertEqual( + report.get_qqbot_credentials(self.config), + ("env-appid", "env-secret"), + ) + self.assertEqual( + report.get_qqbot_credential_source(self.config), + "environment", + ) + + @patch("ndx_daily_report.http_json") + def test_token_request_uses_official_field_names(self, http_json) -> None: + http_json.return_value = {"access_token": "token", "expires_in": "7200"} + + token = report.get_qqbot_token(self.config, self.logger) + + self.assertEqual(token, "token") + self.assertEqual( + http_json.call_args.kwargs["body"], + {"appId": "config-appid", "clientSecret": "config-secret"}, + ) + + @patch("ndx_daily_report.http_json") + def test_token_error_includes_qq_error_code_and_message(self, http_json) -> None: + http_json.return_value = {"code": 100007, "message": "appid invalid"} + self.config["qqbot"]["token_retry_count"] = 0 + + with self.assertRaisesRegex( + ValueError, + "code=100007, message=appid invalid", + ): + report.get_qqbot_token(self.config, self.logger) + + @patch("ndx_daily_report.time.sleep") + @patch("ndx_daily_report.http_json") + def test_token_request_retries_one_transient_error(self, http_json, sleep) -> None: + http_json.side_effect = [ + {"code": 100007, "message": "appid invalid"}, + {"access_token": "token", "expires_in": "7200"}, + ] + + token = report.get_qqbot_token(self.config, self.logger) + + self.assertEqual(token, "token") + self.assertEqual(http_json.call_count, 2) + sleep.assert_called_once_with(1) + + def test_legacy_appkey_remains_supported(self) -> None: + config = {"qqbot": {"appid": "appid", "appkey": "legacy-secret"}} + + self.assertEqual( + report.get_qqbot_credentials(config), + ("appid", "legacy-secret"), + ) + self.assertEqual( + report.get_qqbot_credential_source(config), + "config.appkey(legacy)", + ) class CurlRequestTests(unittest.TestCase): @@ -48,6 +285,7 @@ class CurlRequestTests(unittest.TestCase): command = run.call_args.args[0] self.assertNotIn("--retry", command) + self.assertIn("Content-Type: application/json", command) if __name__ == "__main__":