修正 NDX 完整收盘判断和代理请求策略

新增 Nasdaq quote API 完整收盘校验:当 marketStatus 仍为 Open,或 lastTradeTimestamp 早于美东常规收盘时间 16:00 时,不再视为完整收盘数据,也不会提前推进 lastNewDataReportedDate。

盘中未完整数据现在会静默结束并写入 runtime log,不再 fallback 到 FRED,避免北京时间周五夜间误记录盘中数据,确保北京时间周六 08:00 能按新数据报告规则推送美东周五收盘。

新增 curl 网络后端和 transport:auto 策略:显式 HTTPS 代理或检测到环境代理时默认使用 curl,绕开 urllib 对代理 CONNECT 的兼容问题;示例配置仅保留代理占位,不提交真实代理账号密码。
This commit is contained in:
2026-06-06 00:53:13 +08:00
parent 4a592cf457
commit 792922e06f
2 changed files with 205 additions and 35 deletions
+7 -3
View File
@@ -9,9 +9,13 @@ nasdaq:
request_timeout_seconds: 15 request_timeout_seconds: 15
network: network:
# false 表示 urllib 不读取 HTTP_PROXY/HTTPS_PROXY 等环境代理。 # true 表示允许检测 HTTP_PROXY/HTTPS_PROXY 或系统代理。
# 某些代理会重置 CONNECT 连接,定时脚本默认直连更稳定 # transport: auto 检测到环境代理时会默认改用 curl,避免 urllib 自己处理 CONNECT
use_environment_proxy: false use_environment_proxy: true
# 可选。直连超时时可显式指定代理;真实账号密码不要提交到仓库。
proxy_url: ""
# auto 会在 https 代理或环境代理场景下使用 curl。
transport: auto
qqbot: qqbot:
appid: "用户填写" appid: "用户填写"
+198 -32
View File
@@ -4,16 +4,20 @@
from __future__ import annotations from __future__ import annotations
import csv import csv
import base64
import json import json
import logging import logging
import os import os
import re import re
import shutil
import subprocess
import sys import sys
import time import time
import urllib.error import urllib.error
import urllib.parse
import urllib.request import urllib.request
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, time as datetime_time, timedelta, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -35,6 +39,10 @@ class SilentExit(Exception):
"""Expected silent stop after writing runtime log.""" """Expected silent stop after writing runtime log."""
class IncompleteMarketData(ValueError):
"""The upstream data is valid but not a complete regular close yet."""
@dataclass(frozen=True) @dataclass(frozen=True)
class MarketData: class MarketData:
trade_date: str trade_date: str
@@ -255,6 +263,9 @@ def list_recent_text_files(base: Path) -> list[Path]:
def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData: def fetch_market_data(config: dict[str, Any], logger: logging.Logger) -> MarketData:
try: try:
return fetch_nasdaq_quote(config, logger) return fetch_nasdaq_quote(config, logger)
except IncompleteMarketData as exc:
logger.warning("Nasdaq quote API 尚无完整收盘数据: %s", summarize_error(exc))
raise SilentExit("尚无最近完整美股常规交易日收盘数据")
except Exception as exc: except Exception as exc:
logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc)) logger.warning("fallback 到 FRED 的原因: %s", summarize_error(exc))
return fetch_fred_csv(config, logger) return fetch_fred_csv(config, logger)
@@ -264,10 +275,17 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market
url = str(config["nasdaq"]["quote_api"]) url = str(config["nasdaq"]["quote_api"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("Nasdaq quote API 请求开始") logger.info("Nasdaq quote API 请求开始")
payload = http_json("GET", url, timeout=timeout, use_environment_proxy=use_environment_proxy(config)) payload = http_json(
"GET",
url,
timeout=timeout,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
data = payload.get("data", {}) data = payload.get("data", {})
primary = data.get("primaryData", {}) primary = data.get("primaryData", {})
key_stats = data.get("keyStats", {}) key_stats = data.get("keyStats", {})
market_status = str(data.get("marketStatus", "")).strip()
close = parse_decimal(primary.get("lastSalePrice")) close = parse_decimal(primary.get("lastSalePrice"))
change = parse_decimal(primary.get("netChange")) change = parse_decimal(primary.get("netChange"))
@@ -277,10 +295,14 @@ def fetch_nasdaq_quote(config: dict[str, Any], logger: logging.Logger) -> Market
if not timestamp: if not timestamp:
raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp") raise ValueError("Nasdaq quote API 缺少 lastTradeTimestamp")
trade_date = parse_trade_date(timestamp, config["report"].get("timezone_trade", "America/New_York")) trade_timezone = config["report"].get("timezone_trade", "America/New_York")
trade_dt = parse_trade_datetime(timestamp, trade_timezone)
validate_complete_regular_close(trade_dt, market_status, trade_timezone)
trade_date = trade_dt.date().isoformat()
logger.info( logger.info(
"NDX 字段解析结果: tradeDate=%s close=%s change=%s changePercent=%s previousClose=%s", "NDX 字段解析结果: tradeDate=%s marketStatus=%s close=%s change=%s changePercent=%s previousClose=%s",
trade_date, trade_date,
market_status,
money(close), money(close),
signed(change), signed(change),
signed(change_percent), signed(change_percent),
@@ -307,10 +329,14 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData
url = str(config["nasdaq"]["fred_csv"]) url = str(config["nasdaq"]["fred_csv"])
timeout = int(config["nasdaq"].get("request_timeout_seconds", 15)) timeout = int(config["nasdaq"].get("request_timeout_seconds", 15))
logger.info("FRED CSV 请求开始") logger.info("FRED CSV 请求开始")
request = urllib.request.Request(url, headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]}) text = http_text(
opener = build_url_opener(use_environment_proxy(config)) "GET",
with opener.open(request, timeout=timeout) as response: url,
text = response.read().decode("utf-8-sig") timeout=timeout,
headers={"User-Agent": DEFAULT_HEADERS["User-Agent"]},
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
).decode("utf-8-sig")
rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}] rows = [row for row in csv.DictReader(text.splitlines()) if row.get("NASDAQ100") not in {"", "."}]
if len(rows) < 2: if len(rows) < 2:
raise ValueError("FRED CSV 有效数据不足") raise ValueError("FRED CSV 有效数据不足")
@@ -332,21 +358,36 @@ def fetch_fred_csv(config: dict[str, Any], logger: logging.Logger) -> MarketData
def parse_trade_date(timestamp: str, timezone_name: str) -> str: def parse_trade_date(timestamp: str, timezone_name: str) -> str:
return parse_trade_datetime(timestamp, timezone_name).date().isoformat()
def parse_trade_datetime(timestamp: str, timezone_name: str) -> datetime:
cleaned = timestamp.replace("ET", "").strip() cleaned = timestamp.replace("ET", "").strip()
formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"] formats = ["%b %d, %Y %I:%M %p", "%m/%d/%Y %I:%M %p", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"]
tz = get_timezone(timezone_name) tz = get_timezone(timezone_name)
for fmt in formats: for fmt in formats:
try: try:
dt = datetime.strptime(cleaned, fmt) dt = datetime.strptime(cleaned, fmt)
return dt.replace(tzinfo=tz).date().isoformat() return dt.replace(tzinfo=tz)
except ValueError: except ValueError:
continue continue
try: try:
return datetime.fromisoformat(cleaned).astimezone(tz).date().isoformat() return datetime.fromisoformat(cleaned).astimezone(tz)
except ValueError as exc: except ValueError as exc:
raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc raise ValueError(f"无法解析 lastTradeTimestamp: {timestamp}") from exc
def validate_complete_regular_close(trade_dt: datetime, market_status: str, timezone_name: str) -> None:
tz = get_timezone(timezone_name)
close_dt = datetime.combine(trade_dt.date(), datetime_time(16, 0), tzinfo=tz)
if market_status.lower() == "open":
raise IncompleteMarketData("Nasdaq quote API 仍显示 marketStatus=Open,不视为完整收盘数据")
if trade_dt < close_dt:
raise IncompleteMarketData(
f"Nasdaq quote API lastTradeTimestamp={trade_dt.isoformat()} 早于常规收盘时间 16:00 ET"
)
def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone: def get_timezone(timezone_name: str, logger: logging.Logger | None = None) -> ZoneInfo | timezone:
try: try:
return ZoneInfo(timezone_name) return ZoneInfo(timezone_name)
@@ -443,7 +484,8 @@ def send_qqbot_message(config: dict[str, Any], content: str, logger: logging.Log
body=body, body=body,
timeout=15, timeout=15,
headers={"Authorization": f"QQBot {token}"}, headers={"Authorization": f"QQBot {token}"},
use_environment_proxy=use_environment_proxy(config), proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
) )
logger.info("QQBot 投递成功") logger.info("QQBot 投递成功")
@@ -452,7 +494,14 @@ def get_qqbot_token(config: dict[str, Any], logger: logging.Logger) -> str:
qqbot = config["qqbot"] qqbot = config["qqbot"]
url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken")) url = str(qqbot.get("token_api", "https://bots.qq.com/app/getAppAccessToken"))
body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])} body = {"appId": str(qqbot["appid"]), "clientSecret": str(qqbot["appkey"])}
payload = http_json("POST", url, body=body, timeout=15, use_environment_proxy=use_environment_proxy(config)) payload = http_json(
"POST",
url,
body=body,
timeout=15,
proxy_config=get_proxy_config(config),
proxy_authorization=proxy_authorization_header(config),
)
token = payload.get("access_token") or payload.get("accessToken") token = payload.get("access_token") or payload.get("accessToken")
if not token: if not token:
raise ValueError("QQBot token 响应缺少 access_token") raise ValueError("QQBot token 响应缺少 access_token")
@@ -494,37 +543,154 @@ def http_json(
body: dict[str, Any] | None = None, body: dict[str, Any] | None = None,
timeout: int = 15, timeout: int = 15,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
use_environment_proxy: bool = False, proxy_config: dict[str, Any] | None = None,
proxy_authorization: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
request_headers = dict(DEFAULT_HEADERS) text = http_text(
if headers: method,
request_headers.update(headers) url,
data = None body=body,
if body is not None: timeout=timeout,
data = json.dumps(body, ensure_ascii=False).encode("utf-8") headers=headers,
request_headers["Content-Type"] = "application/json" proxy_config=proxy_config,
request = urllib.request.Request(url, data=data, headers=request_headers, method=method) proxy_authorization=proxy_authorization,
opener = build_url_opener(use_environment_proxy) ).decode("utf-8")
try:
with opener.open(request, timeout=timeout) as response:
text = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
parsed = json.loads(text) parsed = json.loads(text)
if not isinstance(parsed, dict): if not isinstance(parsed, dict):
raise ValueError("JSON response root is not object") raise ValueError("JSON response root is not object")
return parsed return parsed
def build_url_opener(use_proxy: bool) -> urllib.request.OpenerDirector: def http_text(
if use_proxy: method: str,
url: str,
body: dict[str, Any] | None = None,
timeout: int = 15,
headers: dict[str, str] | None = None,
proxy_config: dict[str, Any] | None = None,
proxy_authorization: str | None = None,
) -> bytes:
proxy_config = proxy_config or {}
if should_use_curl(proxy_config):
return curl_request(method, url, body, timeout, headers, proxy_config)
request_headers = dict(DEFAULT_HEADERS)
if headers:
request_headers.update(headers)
if proxy_authorization:
request_headers["Proxy-Authorization"] = proxy_authorization
data = None
if body is not None:
data = json.dumps(body, ensure_ascii=False).encode("utf-8")
request_headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, headers=request_headers, method=method)
opener = build_url_opener(proxy_config or {})
try:
with opener.open(request, timeout=timeout) as response:
return response.read()
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
def curl_request(
method: str,
url: str,
body: dict[str, Any] | None,
timeout: int,
headers: dict[str, str] | None,
proxy_config: dict[str, Any],
) -> bytes:
curl = shutil.which("curl.exe") or shutil.which("curl")
if not curl:
raise RuntimeError("network.transport 需要 curl,但当前环境未找到 curl")
command = [
curl,
"--silent",
"--show-error",
"--fail-with-body",
"--max-time",
str(timeout),
"--request",
method,
]
proxy_url = str(proxy_config.get("proxy_url", "")).strip()
if proxy_url:
command.extend(["--proxy", proxy_url])
request_headers = dict(DEFAULT_HEADERS)
if headers:
request_headers.update(headers)
for key, value in request_headers.items():
command.extend(["--header", f"{key}: {value}"])
input_bytes = None
if body is not None:
input_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8")
command.extend(["--data-binary", "@-"])
command.append(url)
completed = subprocess.run(
command,
input=input_bytes,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if completed.returncode != 0:
error = completed.stderr.decode("utf-8", errors="replace").strip()
if not error:
error = completed.stdout.decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"curl request failed: {error[:300]}")
return completed.stdout
def should_use_curl(proxy_config: dict[str, Any]) -> bool:
transport = str(proxy_config.get("transport", "auto")).strip().lower()
proxy_url = str(proxy_config.get("proxy_url", "")).strip().lower()
if transport == "curl":
return True
if transport == "urllib":
return False
if proxy_url.startswith("https://"):
return True
return bool(proxy_config.get("use_environment_proxy", True)) and has_environment_proxy()
def has_environment_proxy() -> bool:
proxies = urllib.request.getproxies()
return any(key.lower() in {"http", "https", "all"} and value for key, value in proxies.items())
def build_url_opener(proxy_config: dict[str, Any]) -> urllib.request.OpenerDirector:
proxy_url = str(proxy_config.get("proxy_url", "")).strip()
if proxy_url:
return urllib.request.build_opener(
urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
)
if bool(proxy_config.get("use_environment_proxy", False)):
return urllib.request.build_opener() return urllib.request.build_opener()
return urllib.request.build_opener(urllib.request.ProxyHandler({})) return urllib.request.build_opener(urllib.request.ProxyHandler({}))
def use_environment_proxy(config: dict[str, Any]) -> bool: def get_proxy_config(config: dict[str, Any]) -> dict[str, Any]:
return bool(config.get("network", {}).get("use_environment_proxy", False)) network = config.get("network", {})
if not isinstance(network, dict):
return {}
return network
def proxy_authorization_header(config: dict[str, Any]) -> str | None:
proxy_url = str(get_proxy_config(config).get("proxy_url", "")).strip()
if not proxy_url:
return None
parsed = urllib.parse.urlsplit(proxy_url)
if not parsed.username:
return None
username = urllib.parse.unquote(parsed.username)
password = urllib.parse.unquote(parsed.password or "")
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
return f"Basic {token}"
def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: def serialize_market_data(data: MarketData, now: datetime) -> dict[str, str]: