diff --git a/NdxDailyReport/config.example.yaml b/NdxDailyReport/config.example.yaml index 551e63c..308a6ce 100644 --- a/NdxDailyReport/config.example.yaml +++ b/NdxDailyReport/config.example.yaml @@ -9,9 +9,13 @@ nasdaq: request_timeout_seconds: 15 network: - # false 表示 urllib 不读取 HTTP_PROXY/HTTPS_PROXY 等环境代理。 - # 某些代理会重置 CONNECT 连接,定时脚本默认直连更稳定。 - use_environment_proxy: false + # true 表示允许检测 HTTP_PROXY/HTTPS_PROXY 或系统代理。 + # transport: auto 检测到环境代理时会默认改用 curl,避免 urllib 自己处理 CONNECT。 + use_environment_proxy: true + # 可选。直连超时时可显式指定代理;真实账号密码不要提交到仓库。 + proxy_url: "" + # auto 会在 https 代理或环境代理场景下使用 curl。 + transport: auto qqbot: appid: "用户填写" diff --git a/NdxDailyReport/ndx_daily_report.py b/NdxDailyReport/ndx_daily_report.py index f1b58f3..1283bb7 100644 --- a/NdxDailyReport/ndx_daily_report.py +++ b/NdxDailyReport/ndx_daily_report.py @@ -4,16 +4,20 @@ from __future__ import annotations import csv +import base64 import json import logging import os import re +import shutil +import subprocess import sys import time import urllib.error +import urllib.parse import urllib.request from dataclasses import dataclass -from datetime import datetime, timedelta, timezone +from datetime import datetime, time as datetime_time, timedelta, timezone from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from pathlib import Path from typing import Any @@ -35,6 +39,10 @@ class SilentExit(Exception): """Expected silent stop after writing runtime log.""" +class IncompleteMarketData(ValueError): + """The upstream data is valid but not a complete regular close yet.""" + + @dataclass(frozen=True) class MarketData: trade_date: str @@ -255,6 +263,9 @@ def list_recent_text_files(base: Path) -> list[Path]: def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData: try: return fetch_nasdaq_quote(config, logger) + except IncompleteMarketData as exc: + logger.warning("Nasdaq quote API 尚无完整收盘数据: %s", summarize_error(exc)) + raise SilentExit("尚无最近完整美股常规交易日收盘数据") except Exception as exc: logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc)) return fetch_fred_csv(config, logger) @@ -264,10 +275,17 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market url = str(config["nasdaq"]["quote_api"]) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) logger.info("Nasdaq quote API 请求开始") - payload = http_json("GET", url, timeout=timeout, use_environment_proxy=use_environment_proxy(config)) + payload = http_json( + "GET", + url, + timeout=timeout, + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), + ) data = payload.get("data", {}) primary = data.get("primaryData", {}) key_stats = data.get("keyStats", {}) + market_status = str(data.get("marketStatus", "")).strip() close = parse_decimal(primary.get("lastSalePrice")) change = parse_decimal(primary.get("netChange")) @@ -277,10 +295,14 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market if not timestamp: raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp") - trade_date = parse_trade_date(timestamp, config["report"].get("timezone_trade", "America/New_York")) + trade_timezone = config["report"].get("timezone_trade", "America/New_York") + trade_dt = parse_trade_datetime(timestamp, trade_timezone) + validate_complete_regular_close(trade_dt, market_status, trade_timezone) + trade_date = trade_dt.date().isoformat() logger.info( - "NDX 字段解析结果: tradeDate=%s close=%s change=%s changePercent=%s previousClose=%s", + "NDX 字段解析结果: tradeDate=%s marketStatus=%s close=%s change=%s changePercent=%s previousClose=%s", trade_date, + market_status, money(close), signed(change), signed(change_percent), @@ -307,10 +329,14 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData url = str(config["nasdaq"]["fred_csv"]) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) logger.info("FRED CSV 请求开始") - request = urllib.request.Request(url, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}) - opener = build_url_opener(use_environment_proxy(config)) - with opener.open(request, timeout=timeout) as response: - text = response.read().decode("utf-8-sig") + text = http_text( + "GET", + url, + timeout=timeout, + headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}, + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), + ).decode("utf-8-sig") rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}] if len(rows) < 2: raise ValueError("FRED CSV 有效数据不足") @@ -332,21 +358,36 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData def parse_trade_date(timestamp: str, timezone_name: str) -> str: + return parse_trade_datetime(timestamp, timezone_name).date().isoformat() + + +def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime: cleaned = timestamp.replace("ET", "").strip() formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"] tz = get_timezone(timezone_name) for fmt in formats: try: dt = datetime.strptime(cleaned, fmt) - return dt.replace(tzinfo=tz).date().isoformat() + return dt.replace(tzinfo=tz) except ValueError: continue try: - return datetime.fromisoformat(cleaned).astimezone(tz).date().isoformat() + return datetime.fromisoformat(cleaned).astimezone(tz) except ValueError as exc: raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc +def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None: + tz = get_timezone(timezone_name) + close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz) + if market_status.lower() == "open": + raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据") + if trade_dt < close_dt: + raise IncompleteMarketData( + f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET" + ) + + def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone: try: return ZoneInfo(timezone_name) @@ -443,7 +484,8 @@ def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Log body=body, timeout=15, headers={"Authorization": f"QQBot {token}"}, - use_environment_proxy=use_environment_proxy(config), + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), ) logger.info("QQBot 投递成功") @@ -452,7 +494,14 @@ def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str: qqbot = config["qqbot"] url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} - payload = http_json("POST", url, body=body, timeout=15, use_environment_proxy=use_environment_proxy(config)) + payload = http_json( + "POST", + url, + body=body, + timeout=15, + proxy_config=get_proxy_config(config), + proxy_authorization=proxy_authorization_header(config), + ) token = payload.get("access_token") or payload.get("accessToken") if not token: raise ValueError("QQBot token 响应缺少 access_token") @@ -494,37 +543,154 @@ def http_json( body: dict[str, Any] | None = None, timeout: int = 15, headers: dict[str, str] | None = None, - use_environment_proxy: bool = False, + proxy_config: dict[str, Any] | None = None, + proxy_authorization: str | None = None, ) -> dict[str, Any]: - request_headers = dict(DEFAULT_HEADERS) - if headers: - request_headers.update(headers) - data = None - if body is not None: - data = json.dumps(body, ensure_ascii=False).encode("utf-8") - request_headers["Content-Type"] = "application/json" - request = urllib.request.Request(url, data=data, headers=request_headers, method=method) - opener = build_url_opener(use_environment_proxy) - try: - with opener.open(request, timeout=timeout) as response: - text = response.read().decode("utf-8") - except urllib.error.HTTPError as exc: - detail = exc.read().decode("utf-8", errors="replace")[:300] - raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc + text = http_text( + method, + url, + body=body, + timeout=timeout, + headers=headers, + proxy_config=proxy_config, + proxy_authorization=proxy_authorization, + ).decode("utf-8") parsed = json.loads(text) if not isinstance(parsed, dict): raise ValueError("JSON response root is not object") return parsed -def build_url_opener(use_proxy: bool) -> urllib.request.OpenerDirector: - if use_proxy: +def http_text( + method: str, + url: str, + body: dict[str, Any] | None = None, + timeout: int = 15, + headers: dict[str, str] | None = None, + proxy_config: dict[str, Any] | None = None, + proxy_authorization: str | None = None, +) -> bytes: + proxy_config = proxy_config or {} + if should_use_curl(proxy_config): + return curl_request(method, url, body, timeout, headers, proxy_config) + request_headers = dict(DEFAULT_HEADERS) + if headers: + request_headers.update(headers) + if proxy_authorization: + request_headers["Proxy-Authorization"] = proxy_authorization + data = None + if body is not None: + data = json.dumps(body, ensure_ascii=False).encode("utf-8") + request_headers["Content-Type"] = "application/json" + request = urllib.request.Request(url, data=data, headers=request_headers, method=method) + opener = build_url_opener(proxy_config or {}) + try: + with opener.open(request, timeout=timeout) as response: + return response.read() + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace")[:300] + raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc + + +def curl_request( + method: str, + url: str, + body: dict[str, Any] | None, + timeout: int, + headers: dict[str, str] | None, + proxy_config: dict[str, Any], +) -> bytes: + curl = shutil.which("curl.exe") or shutil.which("curl") + if not curl: + raise RuntimeError("network.transport 需要 curl,但当前环境未找到 curl") + + command = [ + curl, + "--silent", + "--show-error", + "--fail-with-body", + "--max-time", + str(timeout), + "--request", + method, + ] + proxy_url = str(proxy_config.get("proxy_url", "")).strip() + if proxy_url: + command.extend(["--proxy", proxy_url]) + + request_headers = dict(DEFAULT_HEADERS) + if headers: + request_headers.update(headers) + for key, value in request_headers.items(): + command.extend(["--header", f"{key}: {value}"]) + + input_bytes = None + if body is not None: + input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8") + command.extend(["--data-binary", "@-"]) + + command.append(url) + completed = subprocess.run( + command, + input=input_bytes, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=False, + ) + if completed.returncode != 0: + error = completed.stderr.decode("utf-8", errors="replace").strip() + if not error: + error = completed.stdout.decode("utf-8", errors="replace")[:300] + raise RuntimeError(f"curl request failed: {error[:300]}") + return completed.stdout + + +def should_use_curl(proxy_config: dict[str, Any]) -> bool: + transport = str(proxy_config.get("transport", "auto")).strip().lower() + proxy_url = str(proxy_config.get("proxy_url", "")).strip().lower() + if transport == "curl": + return True + if transport == "urllib": + return False + if proxy_url.startswith("https://"): + return True + return bool(proxy_config.get("use_environment_proxy", True)) and has_environment_proxy() + + +def has_environment_proxy() -> bool: + proxies = urllib.request.getproxies() + return any(key.lower() in {"http", "https", "all"} and value for key, value in proxies.items()) + + +def build_url_opener(proxy_config: dict[str, Any]) -> urllib.request.OpenerDirector: + proxy_url = str(proxy_config.get("proxy_url", "")).strip() + if proxy_url: + return urllib.request.build_opener( + urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url}) + ) + if bool(proxy_config.get("use_environment_proxy", False)): return urllib.request.build_opener() return urllib.request.build_opener(urllib.request.ProxyHandler({})) -def use_environment_proxy(config: dict[str, Any]) -> bool: - return bool(config.get("network", {}).get("use_environment_proxy", False)) +def get_proxy_config(config: dict[str, Any]) -> dict[str, Any]: + network = config.get("network", {}) + if not isinstance(network, dict): + return {} + return network + + +def proxy_authorization_header(config: dict[str, Any]) -> str | None: + proxy_url = str(get_proxy_config(config).get("proxy_url", "")).strip() + if not proxy_url: + return None + parsed = urllib.parse.urlsplit(proxy_url) + if not parsed.username: + return None + username = urllib.parse.unquote(parsed.username) + password = urllib.parse.unquote(parsed.password or "") + token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii") + return f"Basic {token}" def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: