From 114c7bc2f3ac8c5366521c04933cd915890468b7 Mon Sep 17 00:00:00 2001 From: huangqizhen <15552608129@163.com> Date: Tue, 30 Dec 2025 11:00:59 +0800 Subject: [PATCH] =?UTF-8?q?12.30=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 19 +++ bailian.py | 525 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ hangye.py | 525 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ zixun.py | 525 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1594 insertions(+) create mode 100644 Dockerfile create mode 100644 bailian.py create mode 100644 hangye.py create mode 100644 zixun.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4df878f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.14-slim + +WORKDIR /app + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# 如果不需要编译 C 扩展,可以跳过 gcc 安装 +# 只复制 requirements.txt 并安装 Python 依赖 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY ./app . + +RUN useradd -m -r appuser && chown -R appuser /app +USER appuser + +EXPOSE 8000 +CMD ["python", "main.py"] \ No newline at end of file diff --git a/bailian.py b/bailian.py new file mode 100644 index 0000000..b13785e --- /dev/null +++ b/bailian.py @@ -0,0 +1,525 @@ +# marketmatrix_today_upload_anti_bot.py +# 功能:抓今日新闻(去重)+ 上传知识库 + 输出 Excel + 【高抗反爬 + I/O 监控】 + +import os +import re +import json +import hashlib +import time +import random +from datetime import datetime, timedelta +from urllib.parse import urljoin, urlparse +from contextlib import contextmanager + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from webdriver_manager.chrome import ChromeDriverManager + +# ====== 配置区 ====== +KB_API_URL = "https://dcapi.homilychart.com/prod/deepchartapi/api/QwenKnowledge/add" +KB_TOKEN = "d20287d0bb0298c73e540da7e3e1d7e3" +KB_INDEX_ID = "30xe1fbox1" + +BASE_URL = "http://marketmatrix.net" +LIST_URL = urljoin(BASE_URL, "/news.htm") +OUTPUT_DIR = "today_news" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +OUTPUT_EXCEL = os.path.join(OUTPUT_DIR, f"today_{datetime.now().strftime('%Y%m%d')}.xlsx") +DUPLICATE_CACHE_FILE = os.path.join(OUTPUT_DIR, "today_history.json") + +# 关键:持久化浏览器配置目录 +PERSISTENT_PROFILE_DIR = os.path.join(os.getcwd(), "bailian_config") +PROFILE_CLEAN_THRESHOLD_DAYS = 1 # 1天以上自动清理 + +MAX_PAGES = 30 # 适当降低,防深层页风控 +print(f"📅 系统当前日期: {datetime.now().strftime('%Y-%m-%d')}") + +# ====== 全局 driver 单例(✅ 反爬核心)====== +_driver_instance = None + + +def stealth_driver(driver): + """注入 Stealth JS,绕过常见 Bot 检测""" + try: + # 移除 webdriver 标志 + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + # 隐藏语言特征 + driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})") + # 隐藏插件列表 + driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]})") + # 隐藏硬件并发数(伪装普通设备) + driver.execute_script("Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8})") + # 隐藏设备内存(GB) + driver.execute_script("Object.defineProperty(navigator, 'deviceMemory', {get: () => 8})") + print("🛡️ Stealth JS 注入成功") + except Exception as e: + print(f"⚠️ Stealth JS 注入失败: {e}") + + +def init_persistent_driver(): + """初始化持久化 Chrome 实例(仅首次调用)""" + global _driver_instance 
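+    # NOTE: stealth_driver() above uses execute_script, so its navigator
+    # overrides only take effect after a page has finished loading. Selenium 4
+    # on Chromium can register the same overrides before any page script runs
+    # via the CDP bridge; a minimal sketch (an assumption, not wired in here):
+    #
+    #     driver.execute_cdp_cmd(
+    #         "Page.addScriptToEvaluateOnNewDocument",
+    #         {"source": "Object.defineProperty(navigator, 'webdriver',"
+    #                    " {get: () => undefined});"},
+    #     )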
+ if _driver_instance is not None: + return _driver_instance + + print("🔧 初始化持久化浏览器(首次启动,需过反爬,请稍候...)") + + # 自动清理过期 Profile + _clean_old_profile() + + # 配置 Chrome + chrome_options = Options() + chrome_options.add_argument(f"--user-data-dir={PERSISTENT_PROFILE_DIR}") + chrome_options.add_argument("--profile-directory=Default") + chrome_options.add_argument("--headless=new") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--disable-blink-features=AutomationControlled") + chrome_options.add_argument("--disable-extensions") + chrome_options.add_argument("--disable-plugins-discovery") + chrome_options.add_argument("--disable-features=VizDisplayCompositor") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_options.add_experimental_option('useAutomationExtension', False) + # chrome_options.binary_location = "/usr/bin/chromium-browser" + # 启动 + service = Service(ChromeDriverManager().install()) + try: + _driver_instance = webdriver.Chrome(service=service, options=chrome_options) + _driver_instance.set_page_load_timeout(30) + _driver_instance.implicitly_wait(10) + + # 关键:访问首页“暖机”,触发反爬验证 + print("🌐 正在访问首页以通过反爬验证...") + _driver_instance.get(BASE_URL) + time.sleep(2 + random.uniform(0.5, 1.5)) + + # 注入 Stealth + stealth_driver(_driver_instance) + + # 再访问一次列表页,确保状态稳定 + _driver_instance.get(LIST_URL) + time.sleep(1.5 + random.uniform(0.5, 1.0)) + + print("✅ 浏览器初始化完成,后续请求将复用上下文") + return _driver_instance + except Exception as e: + if _driver_instance: + _driver_instance.quit() + _driver_instance = None + raise RuntimeError(f"浏览器初始化失败: {e}") + + +def _clean_old_profile(): + """清理过期 Profile(防积累)""" + if not os.path.exists(PERSISTENT_PROFILE_DIR): + return + try: + profile_age = time.time() - os.path.getctime(PERSISTENT_PROFILE_DIR) + if profile_age > PROFILE_CLEAN_THRESHOLD_DAYS * 86400: + print(f"🧹 Profile 目录已超 {PROFILE_CLEAN_THRESHOLD_DAYS} 天,正在清理...") + import shutil + shutil.rmtree(PERSISTENT_PROFILE_DIR, ignore_errors=True) + os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) + print("✅ 已重建干净 Profile") + except Exception as e: + print(f"⚠️ Profile 清理失败(继续使用现有): {e}") + + +def fetch_page(url, max_retries=2): + """带重试的页面获取(✅ 复用全局 driver)""" + global _driver_instance + for attempt in range(max_retries + 1): + try: + if _driver_instance is None: + _driver_instance = init_persistent_driver() + + # 拟人化:随机滚动 + 延迟 + _driver_instance.get(url) + time.sleep(0.8 + random.uniform(0.3, 0.7)) + + # 检查是否被拦截 + page_source = _driver_instance.page_source + if "403 Forbidden" in page_source or "challenge-platform" in page_source.lower(): + raise Exception("被反爬拦截(403/Challenge)") + + return page_source + + except Exception as e: + print(f" ⚠️ 尝试 {attempt + 1}/{max_retries + 1} 失败: {e}") + if attempt < max_retries: + time.sleep(3 + random.uniform(1, 2)) + # 重启 driver(极端情况) + if _driver_instance: + try: + _driver_instance.quit() + except: + pass + _driver_instance = None + else: + return None + + +# ====== I/O 监控器(保留)====== +class IOMonitor: + def __init__(self): + self.records = {} + + @contextmanager + def io_timer(self, io_type: str, desc: str = ""): + if io_type not in self.records: + self.records[io_type] = [] + start = time.perf_counter() + try: + yield + finally: + duration = time.perf_counter() - start + self.records[io_type].append((duration, desc)) + + def summary(self): + print("\n" + "=" * 60) + print("📊 I/O 耗时总览(反爬优化版)") + print("=" * 
60) + total_time = 0.0 + for io_type, records in sorted(self.records.items()): + count = len(records) + total = sum(t for t, _ in records) + avg = total / count if count else 0 + total_time += total + print(f"✅ {io_type:<15} | 调用 {count:2d} 次 | 总耗时 {total:6.2f}s | 平均 {avg:5.3f}s") + if count > 3: + slowest = sorted(records, key=lambda x: x[0], reverse=True)[:2] + for i, (t, d) in enumerate(slowest, 1): + print(f" └─ #{i} 慢: {t:5.2f}s → {d}") + print("-" * 60) + print(f"⏱️ I/O 总耗时: {total_time:6.2f}s") + print("=" * 60) + + +monitor = IOMonitor() + + +# ====== 工具函数(加监控)====== +def load_history(): + with monitor.io_timer("file_read", f"load_history: {DUPLICATE_CACHE_FILE}"): + if os.path.exists(DUPLICATE_CACHE_FILE): + try: + with open(DUPLICATE_CACHE_FILE, 'r', encoding='utf-8') as f: + return set(json.load(f)) + except Exception as e: + print(f"⚠️ 历史缓存加载失败: {e}") + return set() + + +def save_history(history_set): + with monitor.io_timer("file_write", f"save_history: {DUPLICATE_CACHE_FILE}"): + try: + with open(DUPLICATE_CACHE_FILE, 'w', encoding='utf-8') as f: + json.dump(list(history_set), f, ensure_ascii=False, indent=2) + print(f"💾 缓存已更新:{len(history_set)} 条") + except Exception as e: + print(f"❌ 缓存保存失败: {e}") + + +def generate_fingerprint(title, pub_time): + raw = f"{title.strip()}|{pub_time.strip()}" + return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16] + + +def is_today(pub_time_str: str) -> bool: + if not pub_time_str: + return False + try: + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', pub_time_str) + if not m: + return False + year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3)) + curr_year = datetime.now().year + if year > curr_year + 1 or year < curr_year - 5: + year = curr_year + pub_date = datetime(year, month, day).date() + return pub_date == datetime.now().date() + except Exception as e: + print(f"⚠️ 日期解析失败: '{pub_time_str}' → {e}") + return False + + +def parse_news_list(html, base_url=BASE_URL): + soup = BeautifulSoup(html, 'lxml') + items = [] + for table in soup.find_all('table', width="800", border="0"): + title_a = table.select_one('font[face="微软雅黑"][style*="font-size: 15pt"] a[href]') + if not title_a: + continue + title = title_a.get_text(strip=True) + link = urljoin(base_url, title_a['href']) + parsed = urlparse(link) + if not parsed.netloc.endswith("marketmatrix.net"): + continue + if "/topnews/" not in link: + continue + meta_fonts = table.select('font[size="2"]') + meta_combined = " ".join(f.get_text(strip=True) for f in meta_fonts) + time_match = "" + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})(?:[^\d]+(\d{1,2}:\d{2}))?', meta_combined) + if m: + date_part = f"{m.group(1)}.{int(m.group(2)):02d}.{int(m.group(3)):02d}" + time_part = m.group(4) or "00:00" + time_match = f"{date_part} {time_part}" + else: + m2 = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', meta_combined) + if m2: + time_match = f"{m2.group(1)}.{int(m2.group(2)):02d}.{int(m2.group(3)):02d}" + category = "" + for txt in meta_fonts: + t = txt.get_text(strip=True).replace("主题:", "") + if re.search(r'\d{4}|编辑|新闻源|要点', t): + continue + if 2 < len(t) < 30: + category = t + break + items.append({ + "标题": title, + "分类": category, + "发布时间": time_match, + "原文链接": link + }) + return items + + +def extract_article_content(html): + if not html: + return "(访问失败)" + soup = BeautifulSoup(html, 'lxml') + editor_p = None + for p in soup.find_all(['p', 'font']): + txt = p.get_text() + if "编辑:" in txt and "发布时间:" in txt: + editor_p = p + break + if editor_p: + container 
= editor_p + for _ in range(3): + if container.parent and container.parent.name == 'td': + container = container.parent + break + container = container.parent + if container and container.name == 'td': + paras = [] + for p in container.find_all('p'): + t = p.get_text(strip=True) + if len(t) > 20 and not any(skip in t for skip in [ + "编辑:", "发布时间:", "主题:", "新闻源:", "要点:", "开户", "保证金", "©" + ]): + paras.append(t) + if paras: + return "\n".join(paras) + fallback = [p.get_text(strip=True) for p in soup.find_all('p') if 30 <= len(p.get_text()) <= 500] + return "\n".join(fallback[:15]) if fallback else "(提取失败)" + + +def sanitize_filename(name: str, max_len=80) -> str: + return re.sub(r'[\\/:*?"<>|\r\n\t]', ' ', name).strip()[:max_len] or "untitled" + + +def upload_to_knowledge_base_from_content(filename: str, content: str) -> dict: + print(f"📤 正在上传: {filename} (内存)") + try: + content_bytes = content.encode('utf-8') + with monitor.io_timer("network_write", f"upload: {filename}"): + files = {'file': (filename, content_bytes, 'text/markdown')} + data = {'indexId': KB_INDEX_ID} + headers = {'token': KB_TOKEN} + response = requests.post(KB_API_URL, files=files, data=data, headers=headers, timeout=30) + print(f" ← HTTP {response.status_code}") + try: + res_json = response.json() + code = res_json.get("code") + msg = res_json.get("message") or res_json.get("error") or "未知错误" + if code == 200 and res_json.get("fileId"): + print(f" ✅ 上传成功 → fileId: {res_json['fileId']}") + return {"code": 200, "fileId": res_json["fileId"], "message": "OK"} + else: + print(f" ⚠️ 业务失败 → code: {code}, msg: {msg}") + return {"code": code or -1, "fileId": "", "message": msg} + except Exception as json_e: + print(f" ❌ JSON 解析失败: {json_e}, 原始响应: {response.text[:200]}") + return {"code": -2, "fileId": "", "message": f"非JSON响应: {response.text[:100]}"} + except Exception as e: + print(f" ❌ 上传异常: {e}") + return {"code": -1, "fileId": "", "message": str(e)} + + +# ====== 主流程 ====== +def get_all_list_pages(): + pages = [LIST_URL] + current_url = LIST_URL + visited = {LIST_URL} + print("🔗 探测分页中(从首页开始)...") + + for i in range(1, MAX_PAGES): + html = fetch_page(current_url) + if not html: + break + + soup = BeautifulSoup(html, 'lxml') + more_link = soup.find('a', string=re.compile(r'查看更多', re.IGNORECASE)) + if not more_link: + more_link = soup.find('a', href=re.compile(r'news-list-\d+\.htm', re.IGNORECASE)) + if not more_link or not more_link.get('href'): + break + + next_href = more_link['href'].strip() + next_url = urljoin(current_url, next_href) + if not next_url.startswith(BASE_URL) or next_url in visited: + break + + visited.add(next_url) + pages.append(next_url) + print(f" ➕ {len(pages):2d}. {next_url}") + current_url = next_url + + return pages + + +def main(): + overall_start = time.perf_counter() + print("▶ 启动高抗反爬抓取流程...") + + # 1. 获取列表页(复用 driver) + print("\n[阶段1] 获取新闻列表(复用浏览器上下文)") + list_pages = get_all_list_pages() + all_items = [] + + for i, url in enumerate(list_pages, 1): + print(f"[{i}/{len(list_pages)}] 解析 {urlparse(url).path or '/'}") + html = fetch_page(url) + if not html: + continue + base_for_links = BASE_URL if "/list/" not in url else urljoin(BASE_URL, "/list/") + items = parse_news_list(html, base_url=base_for_links) + all_items.extend(items) + + print(f"✅ 共提取 {len(all_items)} 条原始新闻") + + # 2. 
过滤今日 & 去重 + print("\n[阶段2] 过滤今日 & 去重") + history = load_history() + new_items = [] + for item in all_items: + if not is_today(item["发布时间"]): + continue + fp = generate_fingerprint(item["标题"], item["发布时间"]) + if fp in history: + print(f"⏭️ 跳过重复: {item['标题'][:30]}...") + continue + new_items.append(item) + history.add(fp) + + print(f"🆕 今日新增 {len(new_items)} 条新闻") + + # 3. 抓取正文 + 上传 + print("\n[阶段3] 抓取正文 & 上传(内存上传)") + results = [] + for i, item in enumerate(new_items, 1): + title = item["标题"] + print(f"\n[{i}/{len(new_items)}] {title[:50]}...") + + try: + with monitor.io_timer("network_read", f"article: {title[:20]}"): + html = fetch_page(item["原文链接"]) + content = extract_article_content(html) if html else "(访问失败)" + item["正文内容"] = content + + # 构建 Markdown 内容(内存中) + md_content = f"""# {title} + +- 分类:{item['分类']} +- 发布时间:{item['发布时间']} +- 原文链接:{item['原文链接']} + +--- + +{content} +""" + + # 保存到磁盘(可选,用于审计) + safe_title = sanitize_filename(title) + md_file = f"{i:02d}_{safe_title}.md" + md_path = os.path.join(OUTPUT_DIR, md_file) + with monitor.io_timer("file_write", f"save_md: {md_file}"): + with open(md_path, "w", encoding="utf-8") as f: + f.write(md_content) + print(f" 💾 已保存:{md_file}") + + # 上传(内存) + res = upload_to_knowledge_base_from_content(md_file, md_content) + item.update({ + "知识库FileId": res.get("fileId", ""), + "上传状态": "✅" if res.get("code") == 200 else "❌", + "上传信息": res.get("message", "")[:100], + "指纹": fp + }) + results.append(item) + + except Exception as e: + print(f"❌ 处理失败: {title[:30]} | {e}") + item.update({ + "知识库FileId": "", + "上传状态": "❌ 处理失败", + "上传信息": str(e)[:100], + "指纹": fp + }) + results.append(item) + continue + + # 4. 保存 & 退出 + print("\n[阶段4] 保存缓存 & Excel") + save_history(history) + if results: + with monitor.io_timer("file_write", "save_excel"): + df = pd.DataFrame(results) + df.to_excel(OUTPUT_EXCEL, index=False, engine='openpyxl') + print(f"\n🎉 完成!今日新增 {len(results)} 条,Excel: {OUTPUT_EXCEL}") + else: + print(f"\nℹ️ 今日暂无新新闻发布(已探测 {len(list_pages)} 页)") + + # 关键:不 quit driver!保留上下文供下次使用 + print("📌 浏览器上下文已保留,下次运行将复用(加速)") + + # 输出 I/O 总结 + monitor.summary() + total_elapsed = time.perf_counter() - overall_start + print(f"\n🎯 总运行时间: {total_elapsed:.2f}s") + + +# ====== 优雅退出(保留 driver)====== +def cleanup(): + """进程退出时清理(不 quit driver,除非强制)""" + global _driver_instance + if _driver_instance: + print("💡 提示:为加速下次运行,浏览器上下文已保留。") + print(" 如需彻底清理,请手动删除目录:", PERSISTENT_PROFILE_DIR) + # 不 quit,保留状态 + # _driver_instance.quit() + + +import atexit + +atexit.register(cleanup) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n🛑 用户中断,正在退出...") + cleanup() + except Exception as e: + print(f"\n💥 严重错误: {e}") + cleanup() + raise \ No newline at end of file diff --git a/hangye.py b/hangye.py new file mode 100644 index 0000000..e3e0b51 --- /dev/null +++ b/hangye.py @@ -0,0 +1,525 @@ +# marketmatrix_today_upload_anti_bot.py +# 功能:抓今日新闻(去重)+ 上传知识库 + 输出 Excel + 【高抗反爬 + I/O 监控】 + +import os +import re +import json +import hashlib +import time +import random +from datetime import datetime, timedelta +from urllib.parse import urljoin, urlparse +from contextlib import contextmanager + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from webdriver_manager.chrome import ChromeDriverManager + +# ====== 配置区 ====== +KB_API_URL = "https://dcapi.homilychart.com/prod/deepchartapi/api/QwenKnowledge/add" 
+KB_TOKEN = "d20287d0bb0298c73e540da7e3e1d7e3" +KB_INDEX_ID = "30xe1fbox1" + +BASE_URL = "http://marketmatrix.net" +LIST_URL = urljoin(BASE_URL, "/trading.htm") +OUTPUT_DIR = "today_news" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +OUTPUT_EXCEL = os.path.join(OUTPUT_DIR, f"today_{datetime.now().strftime('%Y%m%d')}.xlsx") +DUPLICATE_CACHE_FILE = os.path.join(OUTPUT_DIR, "today_history.json") + +# 关键:持久化浏览器配置目录 +PERSISTENT_PROFILE_DIR = os.path.join(os.getcwd(), "hangye_config") +PROFILE_CLEAN_THRESHOLD_DAYS = 1 # 1天以上自动清理 + +MAX_PAGES = 30 # 适当降低,防深层页风控 +print(f"📅 系统当前日期: {datetime.now().strftime('%Y-%m-%d')}") + +# ====== 全局 driver 单例(✅ 反爬核心)====== +_driver_instance = None + + +def stealth_driver(driver): + """注入 Stealth JS,绕过常见 Bot 检测""" + try: + # 移除 webdriver 标志 + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + # 隐藏语言特征 + driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})") + # 隐藏插件列表 + driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]})") + # 隐藏硬件并发数(伪装普通设备) + driver.execute_script("Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8})") + # 隐藏设备内存(GB) + driver.execute_script("Object.defineProperty(navigator, 'deviceMemory', {get: () => 8})") + print("🛡️ Stealth JS 注入成功") + except Exception as e: + print(f"⚠️ Stealth JS 注入失败: {e}") + + +def init_persistent_driver(): + """初始化持久化 Chrome 实例(仅首次调用)""" + global _driver_instance + if _driver_instance is not None: + return _driver_instance + + print("🔧 初始化持久化浏览器(首次启动,需过反爬,请稍候...)") + + # 自动清理过期 Profile + _clean_old_profile() + + # 配置 Chrome + chrome_options = Options() + chrome_options.add_argument(f"--user-data-dir={PERSISTENT_PROFILE_DIR}") + chrome_options.add_argument("--profile-directory=Default") + chrome_options.add_argument("--headless=new") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--disable-blink-features=AutomationControlled") + chrome_options.add_argument("--disable-extensions") + chrome_options.add_argument("--disable-plugins-discovery") + chrome_options.add_argument("--disable-features=VizDisplayCompositor") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_options.add_experimental_option('useAutomationExtension', False) + # chrome_options.binary_location = "/usr/bin/chromium-browser" + # 启动 + service = Service(ChromeDriverManager().install()) + try: + _driver_instance = webdriver.Chrome(service=service, options=chrome_options) + _driver_instance.set_page_load_timeout(30) + _driver_instance.implicitly_wait(10) + + # 关键:访问首页“暖机”,触发反爬验证 + print("🌐 正在访问首页以通过反爬验证...") + _driver_instance.get(BASE_URL) + time.sleep(2 + random.uniform(0.5, 1.5)) + + # 注入 Stealth + stealth_driver(_driver_instance) + + # 再访问一次列表页,确保状态稳定 + _driver_instance.get(LIST_URL) + time.sleep(1.5 + random.uniform(0.5, 1.0)) + + print("✅ 浏览器初始化完成,后续请求将复用上下文") + return _driver_instance + except Exception as e: + if _driver_instance: + _driver_instance.quit() + _driver_instance = None + raise RuntimeError(f"浏览器初始化失败: {e}") + + +def _clean_old_profile(): + """清理过期 Profile(防积累)""" + if not os.path.exists(PERSISTENT_PROFILE_DIR): + return + try: + profile_age = time.time() - os.path.getctime(PERSISTENT_PROFILE_DIR) + if profile_age > PROFILE_CLEAN_THRESHOLD_DAYS * 86400: + print(f"🧹 Profile 目录已超 {PROFILE_CLEAN_THRESHOLD_DAYS} 天,正在清理...") + 
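+            # NOTE: os.path.getctime is creation time on Windows but last
+            # metadata-change time on Linux, so this age check can silently
+            # reset whenever the directory's metadata changes. A portable
+            # sketch would stamp a marker file on creation and compare
+            # os.path.getmtime(marker_path) instead (marker_path being a
+            # hypothetical timestamp file, not part of this patch).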
import shutil + shutil.rmtree(PERSISTENT_PROFILE_DIR, ignore_errors=True) + os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) + print("✅ 已重建干净 Profile") + except Exception as e: + print(f"⚠️ Profile 清理失败(继续使用现有): {e}") + + +def fetch_page(url, max_retries=2): + """带重试的页面获取(✅ 复用全局 driver)""" + global _driver_instance + for attempt in range(max_retries + 1): + try: + if _driver_instance is None: + _driver_instance = init_persistent_driver() + + # 拟人化:随机滚动 + 延迟 + _driver_instance.get(url) + time.sleep(0.8 + random.uniform(0.3, 0.7)) + + # 检查是否被拦截 + page_source = _driver_instance.page_source + if "403 Forbidden" in page_source or "challenge-platform" in page_source.lower(): + raise Exception("被反爬拦截(403/Challenge)") + + return page_source + + except Exception as e: + print(f" ⚠️ 尝试 {attempt + 1}/{max_retries + 1} 失败: {e}") + if attempt < max_retries: + time.sleep(3 + random.uniform(1, 2)) + # 重启 driver(极端情况) + if _driver_instance: + try: + _driver_instance.quit() + except: + pass + _driver_instance = None + else: + return None + + +# ====== I/O 监控器(保留)====== +class IOMonitor: + def __init__(self): + self.records = {} + + @contextmanager + def io_timer(self, io_type: str, desc: str = ""): + if io_type not in self.records: + self.records[io_type] = [] + start = time.perf_counter() + try: + yield + finally: + duration = time.perf_counter() - start + self.records[io_type].append((duration, desc)) + + def summary(self): + print("\n" + "=" * 60) + print("📊 I/O 耗时总览(反爬优化版)") + print("=" * 60) + total_time = 0.0 + for io_type, records in sorted(self.records.items()): + count = len(records) + total = sum(t for t, _ in records) + avg = total / count if count else 0 + total_time += total + print(f"✅ {io_type:<15} | 调用 {count:2d} 次 | 总耗时 {total:6.2f}s | 平均 {avg:5.3f}s") + if count > 3: + slowest = sorted(records, key=lambda x: x[0], reverse=True)[:2] + for i, (t, d) in enumerate(slowest, 1): + print(f" └─ #{i} 慢: {t:5.2f}s → {d}") + print("-" * 60) + print(f"⏱️ I/O 总耗时: {total_time:6.2f}s") + print("=" * 60) + + +monitor = IOMonitor() + + +# ====== 工具函数(加监控)====== +def load_history(): + with monitor.io_timer("file_read", f"load_history: {DUPLICATE_CACHE_FILE}"): + if os.path.exists(DUPLICATE_CACHE_FILE): + try: + with open(DUPLICATE_CACHE_FILE, 'r', encoding='utf-8') as f: + return set(json.load(f)) + except Exception as e: + print(f"⚠️ 历史缓存加载失败: {e}") + return set() + + +def save_history(history_set): + with monitor.io_timer("file_write", f"save_history: {DUPLICATE_CACHE_FILE}"): + try: + with open(DUPLICATE_CACHE_FILE, 'w', encoding='utf-8') as f: + json.dump(list(history_set), f, ensure_ascii=False, indent=2) + print(f"💾 缓存已更新:{len(history_set)} 条") + except Exception as e: + print(f"❌ 缓存保存失败: {e}") + + +def generate_fingerprint(title, pub_time): + raw = f"{title.strip()}|{pub_time.strip()}" + return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16] + + +def is_today(pub_time_str: str) -> bool: + if not pub_time_str: + return False + try: + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', pub_time_str) + if not m: + return False + year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3)) + curr_year = datetime.now().year + if year > curr_year + 1 or year < curr_year - 5: + year = curr_year + pub_date = datetime(year, month, day).date() + return pub_date == datetime.now().date() + except Exception as e: + print(f"⚠️ 日期解析失败: '{pub_time_str}' → {e}") + return False + + +def parse_news_list(html, base_url=BASE_URL): + soup = BeautifulSoup(html, 'lxml') + items = [] + for table in 
soup.find_all('table', width="800", border="0"): + title_a = table.select_one('font[face="微软雅黑"][style*="font-size: 15pt"] a[href]') + if not title_a: + continue + title = title_a.get_text(strip=True) + link = urljoin(base_url, title_a['href']) + parsed = urlparse(link) + if not parsed.netloc.endswith("marketmatrix.net"): + continue + if "/topnews/" not in link: + continue + meta_fonts = table.select('font[size="2"]') + meta_combined = " ".join(f.get_text(strip=True) for f in meta_fonts) + time_match = "" + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})(?:[^\d]+(\d{1,2}:\d{2}))?', meta_combined) + if m: + date_part = f"{m.group(1)}.{int(m.group(2)):02d}.{int(m.group(3)):02d}" + time_part = m.group(4) or "00:00" + time_match = f"{date_part} {time_part}" + else: + m2 = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', meta_combined) + if m2: + time_match = f"{m2.group(1)}.{int(m2.group(2)):02d}.{int(m2.group(3)):02d}" + category = "" + for txt in meta_fonts: + t = txt.get_text(strip=True).replace("主题:", "") + if re.search(r'\d{4}|编辑|新闻源|要点', t): + continue + if 2 < len(t) < 30: + category = t + break + items.append({ + "标题": title, + "分类": category, + "发布时间": time_match, + "原文链接": link + }) + return items + + +def extract_article_content(html): + if not html: + return "(访问失败)" + soup = BeautifulSoup(html, 'lxml') + editor_p = None + for p in soup.find_all(['p', 'font']): + txt = p.get_text() + if "编辑:" in txt and "发布时间:" in txt: + editor_p = p + break + if editor_p: + container = editor_p + for _ in range(3): + if container.parent and container.parent.name == 'td': + container = container.parent + break + container = container.parent + if container and container.name == 'td': + paras = [] + for p in container.find_all('p'): + t = p.get_text(strip=True) + if len(t) > 20 and not any(skip in t for skip in [ + "编辑:", "发布时间:", "主题:", "新闻源:", "要点:", "开户", "保证金", "©" + ]): + paras.append(t) + if paras: + return "\n".join(paras) + fallback = [p.get_text(strip=True) for p in soup.find_all('p') if 30 <= len(p.get_text()) <= 500] + return "\n".join(fallback[:15]) if fallback else "(提取失败)" + + +def sanitize_filename(name: str, max_len=80) -> str: + return re.sub(r'[\\/:*?"<>|\r\n\t]', ' ', name).strip()[:max_len] or "untitled" + + +def upload_to_knowledge_base_from_content(filename: str, content: str) -> dict: + print(f"📤 正在上传: {filename} (内存)") + try: + content_bytes = content.encode('utf-8') + with monitor.io_timer("network_write", f"upload: {filename}"): + files = {'file': (filename, content_bytes, 'text/markdown')} + data = {'indexId': KB_INDEX_ID} + headers = {'token': KB_TOKEN} + response = requests.post(KB_API_URL, files=files, data=data, headers=headers, timeout=30) + print(f" ← HTTP {response.status_code}") + try: + res_json = response.json() + code = res_json.get("code") + msg = res_json.get("message") or res_json.get("error") or "未知错误" + if code == 200 and res_json.get("fileId"): + print(f" ✅ 上传成功 → fileId: {res_json['fileId']}") + return {"code": 200, "fileId": res_json["fileId"], "message": "OK"} + else: + print(f" ⚠️ 业务失败 → code: {code}, msg: {msg}") + return {"code": code or -1, "fileId": "", "message": msg} + except Exception as json_e: + print(f" ❌ JSON 解析失败: {json_e}, 原始响应: {response.text[:200]}") + return {"code": -2, "fileId": "", "message": f"非JSON响应: {response.text[:100]}"} + except Exception as e: + print(f" ❌ 上传异常: {e}") + return {"code": -1, "fileId": "", "message": str(e)} + + +# ====== 主流程 ====== +def get_all_list_pages(): + pages = [LIST_URL] + current_url = LIST_URL 
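+    # NOTE: every page discovered by this probe is fetched a second time in
+    # main(). Returning {url: html} pairs instead of bare URLs (a sketch, not
+    # implemented here) would halve the Selenium round-trips per run.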
+ visited = {LIST_URL} + print("🔗 探测分页中(从首页开始)...") + + for i in range(1, MAX_PAGES): + html = fetch_page(current_url) + if not html: + break + + soup = BeautifulSoup(html, 'lxml') + more_link = soup.find('a', string=re.compile(r'查看更多', re.IGNORECASE)) + if not more_link: + more_link = soup.find('a', href=re.compile(r'news-list-\d+\.htm', re.IGNORECASE)) + if not more_link or not more_link.get('href'): + break + + next_href = more_link['href'].strip() + next_url = urljoin(current_url, next_href) + if not next_url.startswith(BASE_URL) or next_url in visited: + break + + visited.add(next_url) + pages.append(next_url) + print(f" ➕ {len(pages):2d}. {next_url}") + current_url = next_url + + return pages + + +def main(): + overall_start = time.perf_counter() + print("▶ 启动高抗反爬抓取流程...") + + # 1. 获取列表页(复用 driver) + print("\n[阶段1] 获取新闻列表(复用浏览器上下文)") + list_pages = get_all_list_pages() + all_items = [] + + for i, url in enumerate(list_pages, 1): + print(f"[{i}/{len(list_pages)}] 解析 {urlparse(url).path or '/'}") + html = fetch_page(url) + if not html: + continue + base_for_links = BASE_URL if "/list/" not in url else urljoin(BASE_URL, "/list/") + items = parse_news_list(html, base_url=base_for_links) + all_items.extend(items) + + print(f"✅ 共提取 {len(all_items)} 条原始新闻") + + # 2. 过滤今日 & 去重 + print("\n[阶段2] 过滤今日 & 去重") + history = load_history() + new_items = [] + for item in all_items: + if not is_today(item["发布时间"]): + continue + fp = generate_fingerprint(item["标题"], item["发布时间"]) + if fp in history: + print(f"⏭️ 跳过重复: {item['标题'][:30]}...") + continue + new_items.append(item) + history.add(fp) + + print(f"🆕 今日新增 {len(new_items)} 条新闻") + + # 3. 抓取正文 + 上传 + print("\n[阶段3] 抓取正文 & 上传(内存上传)") + results = [] + for i, item in enumerate(new_items, 1): + title = item["标题"] + print(f"\n[{i}/{len(new_items)}] {title[:50]}...") + + try: + with monitor.io_timer("network_read", f"article: {title[:20]}"): + html = fetch_page(item["原文链接"]) + content = extract_article_content(html) if html else "(访问失败)" + item["正文内容"] = content + + # 构建 Markdown 内容(内存中) + md_content = f"""# {title} + +- 分类:{item['分类']} +- 发布时间:{item['发布时间']} +- 原文链接:{item['原文链接']} + +--- + +{content} +""" + + # 保存到磁盘(可选,用于审计) + safe_title = sanitize_filename(title) + md_file = f"{i:02d}_{safe_title}.md" + md_path = os.path.join(OUTPUT_DIR, md_file) + with monitor.io_timer("file_write", f"save_md: {md_file}"): + with open(md_path, "w", encoding="utf-8") as f: + f.write(md_content) + print(f" 💾 已保存:{md_file}") + + # 上传(内存) + res = upload_to_knowledge_base_from_content(md_file, md_content) + item.update({ + "知识库FileId": res.get("fileId", ""), + "上传状态": "✅" if res.get("code") == 200 else "❌", + "上传信息": res.get("message", "")[:100], + "指纹": fp + }) + results.append(item) + + except Exception as e: + print(f"❌ 处理失败: {title[:30]} | {e}") + item.update({ + "知识库FileId": "", + "上传状态": "❌ 处理失败", + "上传信息": str(e)[:100], + "指纹": fp + }) + results.append(item) + continue + + # 4. 
保存 & 退出 + print("\n[阶段4] 保存缓存 & Excel") + save_history(history) + if results: + with monitor.io_timer("file_write", "save_excel"): + df = pd.DataFrame(results) + df.to_excel(OUTPUT_EXCEL, index=False, engine='openpyxl') + print(f"\n🎉 完成!今日新增 {len(results)} 条,Excel: {OUTPUT_EXCEL}") + else: + print(f"\nℹ️ 今日暂无新新闻发布(已探测 {len(list_pages)} 页)") + + # 关键:不 quit driver!保留上下文供下次使用 + print("📌 浏览器上下文已保留,下次运行将复用(加速)") + + # 输出 I/O 总结 + monitor.summary() + total_elapsed = time.perf_counter() - overall_start + print(f"\n🎯 总运行时间: {total_elapsed:.2f}s") + + +# ====== 优雅退出(保留 driver)====== +def cleanup(): + """进程退出时清理(不 quit driver,除非强制)""" + global _driver_instance + if _driver_instance: + print("💡 提示:为加速下次运行,浏览器上下文已保留。") + print(" 如需彻底清理,请手动删除目录:", PERSISTENT_PROFILE_DIR) + # 不 quit,保留状态 + # _driver_instance.quit() + + +import atexit + +atexit.register(cleanup) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n🛑 用户中断,正在退出...") + cleanup() + except Exception as e: + print(f"\n💥 严重错误: {e}") + cleanup() + raise \ No newline at end of file diff --git a/zixun.py b/zixun.py new file mode 100644 index 0000000..fc1524f --- /dev/null +++ b/zixun.py @@ -0,0 +1,525 @@ +# marketmatrix_today_upload_anti_bot.py +# 功能:抓今日新闻(去重)+ 上传知识库 + 输出 Excel + 【高抗反爬 + I/O 监控】 + +import os +import re +import json +import hashlib +import time +import random +from datetime import datetime, timedelta +from urllib.parse import urljoin, urlparse +from contextlib import contextmanager + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from webdriver_manager.chrome import ChromeDriverManager + +# ====== 配置区 ====== +KB_API_URL = "https://dcapi.homilychart.com/prod/deepchartapi/api/QwenKnowledge/add" +KB_TOKEN = "d20287d0bb0298c73e540da7e3e1d7e3" +KB_INDEX_ID = "30xe1fbox1" + +BASE_URL = "http://marketmatrix.net" +LIST_URL = urljoin(BASE_URL, "/macro.htm") +OUTPUT_DIR = "today_news" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +OUTPUT_EXCEL = os.path.join(OUTPUT_DIR, f"today_{datetime.now().strftime('%Y%m%d')}.xlsx") +DUPLICATE_CACHE_FILE = os.path.join(OUTPUT_DIR, "today_history.json") + +# 关键:持久化浏览器配置目录 +PERSISTENT_PROFILE_DIR = os.path.join(os.getcwd(), "zixun_config") +PROFILE_CLEAN_THRESHOLD_DAYS = 1 # 1天以上自动清理 + +MAX_PAGES = 30 # 适当降低,防深层页风控 +print(f"📅 系统当前日期: {datetime.now().strftime('%Y-%m-%d')}") + +# ====== 全局 driver 单例(✅ 反爬核心)====== +_driver_instance = None + + +def stealth_driver(driver): + """注入 Stealth JS,绕过常见 Bot 检测""" + try: + # 移除 webdriver 标志 + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + # 隐藏语言特征 + driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})") + # 隐藏插件列表 + driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]})") + # 隐藏硬件并发数(伪装普通设备) + driver.execute_script("Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8})") + # 隐藏设备内存(GB) + driver.execute_script("Object.defineProperty(navigator, 'deviceMemory', {get: () => 8})") + print("🛡️ Stealth JS 注入成功") + except Exception as e: + print(f"⚠️ Stealth JS 注入失败: {e}") + + +def init_persistent_driver(): + """初始化持久化 Chrome 实例(仅首次调用)""" + global _driver_instance + if _driver_instance is not None: + return _driver_instance + + print("🔧 初始化持久化浏览器(首次启动,需过反爬,请稍候...)") + + # 自动清理过期 Profile + _clean_old_profile() + + # 配置 Chrome + 
chrome_options = Options() + chrome_options.add_argument(f"--user-data-dir={PERSISTENT_PROFILE_DIR}") + chrome_options.add_argument("--profile-directory=Default") + chrome_options.add_argument("--headless=new") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--disable-blink-features=AutomationControlled") + chrome_options.add_argument("--disable-extensions") + chrome_options.add_argument("--disable-plugins-discovery") + chrome_options.add_argument("--disable-features=VizDisplayCompositor") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_options.add_experimental_option('useAutomationExtension', False) + # chrome_options.binary_location = "/usr/bin/chromium-browser" + # 启动 + service = Service(ChromeDriverManager().install()) + try: + _driver_instance = webdriver.Chrome(service=service, options=chrome_options) + _driver_instance.set_page_load_timeout(30) + _driver_instance.implicitly_wait(10) + + # 关键:访问首页“暖机”,触发反爬验证 + print("🌐 正在访问首页以通过反爬验证...") + _driver_instance.get(BASE_URL) + time.sleep(2 + random.uniform(0.5, 1.5)) + + # 注入 Stealth + stealth_driver(_driver_instance) + + # 再访问一次列表页,确保状态稳定 + _driver_instance.get(LIST_URL) + time.sleep(1.5 + random.uniform(0.5, 1.0)) + + print("✅ 浏览器初始化完成,后续请求将复用上下文") + return _driver_instance + except Exception as e: + if _driver_instance: + _driver_instance.quit() + _driver_instance = None + raise RuntimeError(f"浏览器初始化失败: {e}") + + +def _clean_old_profile(): + """清理过期 Profile(防积累)""" + if not os.path.exists(PERSISTENT_PROFILE_DIR): + return + try: + profile_age = time.time() - os.path.getctime(PERSISTENT_PROFILE_DIR) + if profile_age > PROFILE_CLEAN_THRESHOLD_DAYS * 86400: + print(f"🧹 Profile 目录已超 {PROFILE_CLEAN_THRESHOLD_DAYS} 天,正在清理...") + import shutil + shutil.rmtree(PERSISTENT_PROFILE_DIR, ignore_errors=True) + os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) + print("✅ 已重建干净 Profile") + except Exception as e: + print(f"⚠️ Profile 清理失败(继续使用现有): {e}") + + +def fetch_page(url, max_retries=2): + """带重试的页面获取(✅ 复用全局 driver)""" + global _driver_instance + for attempt in range(max_retries + 1): + try: + if _driver_instance is None: + _driver_instance = init_persistent_driver() + + # 拟人化:随机滚动 + 延迟 + _driver_instance.get(url) + time.sleep(0.8 + random.uniform(0.3, 0.7)) + + # 检查是否被拦截 + page_source = _driver_instance.page_source + if "403 Forbidden" in page_source or "challenge-platform" in page_source.lower(): + raise Exception("被反爬拦截(403/Challenge)") + + return page_source + + except Exception as e: + print(f" ⚠️ 尝试 {attempt + 1}/{max_retries + 1} 失败: {e}") + if attempt < max_retries: + time.sleep(3 + random.uniform(1, 2)) + # 重启 driver(极端情况) + if _driver_instance: + try: + _driver_instance.quit() + except: + pass + _driver_instance = None + else: + return None + + +# ====== I/O 监控器(保留)====== +class IOMonitor: + def __init__(self): + self.records = {} + + @contextmanager + def io_timer(self, io_type: str, desc: str = ""): + if io_type not in self.records: + self.records[io_type] = [] + start = time.perf_counter() + try: + yield + finally: + duration = time.perf_counter() - start + self.records[io_type].append((duration, desc)) + + def summary(self): + print("\n" + "=" * 60) + print("📊 I/O 耗时总览(反爬优化版)") + print("=" * 60) + total_time = 0.0 + for io_type, records in sorted(self.records.items()): + count = len(records) + total = sum(t for t, _ in records) + avg = total / count if 
count else 0 + total_time += total + print(f"✅ {io_type:<15} | 调用 {count:2d} 次 | 总耗时 {total:6.2f}s | 平均 {avg:5.3f}s") + if count > 3: + slowest = sorted(records, key=lambda x: x[0], reverse=True)[:2] + for i, (t, d) in enumerate(slowest, 1): + print(f" └─ #{i} 慢: {t:5.2f}s → {d}") + print("-" * 60) + print(f"⏱️ I/O 总耗时: {total_time:6.2f}s") + print("=" * 60) + + +monitor = IOMonitor() + + +# ====== 工具函数(加监控)====== +def load_history(): + with monitor.io_timer("file_read", f"load_history: {DUPLICATE_CACHE_FILE}"): + if os.path.exists(DUPLICATE_CACHE_FILE): + try: + with open(DUPLICATE_CACHE_FILE, 'r', encoding='utf-8') as f: + return set(json.load(f)) + except Exception as e: + print(f"⚠️ 历史缓存加载失败: {e}") + return set() + + +def save_history(history_set): + with monitor.io_timer("file_write", f"save_history: {DUPLICATE_CACHE_FILE}"): + try: + with open(DUPLICATE_CACHE_FILE, 'w', encoding='utf-8') as f: + json.dump(list(history_set), f, ensure_ascii=False, indent=2) + print(f"💾 缓存已更新:{len(history_set)} 条") + except Exception as e: + print(f"❌ 缓存保存失败: {e}") + + +def generate_fingerprint(title, pub_time): + raw = f"{title.strip()}|{pub_time.strip()}" + return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16] + + +def is_today(pub_time_str: str) -> bool: + if not pub_time_str: + return False + try: + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', pub_time_str) + if not m: + return False + year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3)) + curr_year = datetime.now().year + if year > curr_year + 1 or year < curr_year - 5: + year = curr_year + pub_date = datetime(year, month, day).date() + return pub_date == datetime.now().date() + except Exception as e: + print(f"⚠️ 日期解析失败: '{pub_time_str}' → {e}") + return False + + +def parse_news_list(html, base_url=BASE_URL): + soup = BeautifulSoup(html, 'lxml') + items = [] + for table in soup.find_all('table', width="800", border="0"): + title_a = table.select_one('font[face="微软雅黑"][style*="font-size: 15pt"] a[href]') + if not title_a: + continue + title = title_a.get_text(strip=True) + link = urljoin(base_url, title_a['href']) + parsed = urlparse(link) + if not parsed.netloc.endswith("marketmatrix.net"): + continue + if "/topnews/" not in link: + continue + meta_fonts = table.select('font[size="2"]') + meta_combined = " ".join(f.get_text(strip=True) for f in meta_fonts) + time_match = "" + m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})(?:[^\d]+(\d{1,2}:\d{2}))?', meta_combined) + if m: + date_part = f"{m.group(1)}.{int(m.group(2)):02d}.{int(m.group(3)):02d}" + time_part = m.group(4) or "00:00" + time_match = f"{date_part} {time_part}" + else: + m2 = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', meta_combined) + if m2: + time_match = f"{m2.group(1)}.{int(m2.group(2)):02d}.{int(m2.group(3)):02d}" + category = "" + for txt in meta_fonts: + t = txt.get_text(strip=True).replace("主题:", "") + if re.search(r'\d{4}|编辑|新闻源|要点', t): + continue + if 2 < len(t) < 30: + category = t + break + items.append({ + "标题": title, + "分类": category, + "发布时间": time_match, + "原文链接": link + }) + return items + + +def extract_article_content(html): + if not html: + return "(访问失败)" + soup = BeautifulSoup(html, 'lxml') + editor_p = None + for p in soup.find_all(['p', 'font']): + txt = p.get_text() + if "编辑:" in txt and "发布时间:" in txt: + editor_p = p + break + if editor_p: + container = editor_p + for _ in range(3): + if container.parent and container.parent.name == 'td': + container = container.parent + break + container = container.parent + if 
container and container.name == 'td': + paras = [] + for p in container.find_all('p'): + t = p.get_text(strip=True) + if len(t) > 20 and not any(skip in t for skip in [ + "编辑:", "发布时间:", "主题:", "新闻源:", "要点:", "开户", "保证金", "©" + ]): + paras.append(t) + if paras: + return "\n".join(paras) + fallback = [p.get_text(strip=True) for p in soup.find_all('p') if 30 <= len(p.get_text()) <= 500] + return "\n".join(fallback[:15]) if fallback else "(提取失败)" + + +def sanitize_filename(name: str, max_len=80) -> str: + return re.sub(r'[\\/:*?"<>|\r\n\t]', ' ', name).strip()[:max_len] or "untitled" + + +def upload_to_knowledge_base_from_content(filename: str, content: str) -> dict: + print(f"📤 正在上传: {filename} (内存)") + try: + content_bytes = content.encode('utf-8') + with monitor.io_timer("network_write", f"upload: {filename}"): + files = {'file': (filename, content_bytes, 'text/markdown')} + data = {'indexId': KB_INDEX_ID} + headers = {'token': KB_TOKEN} + response = requests.post(KB_API_URL, files=files, data=data, headers=headers, timeout=30) + print(f" ← HTTP {response.status_code}") + try: + res_json = response.json() + code = res_json.get("code") + msg = res_json.get("message") or res_json.get("error") or "未知错误" + if code == 200 and res_json.get("fileId"): + print(f" ✅ 上传成功 → fileId: {res_json['fileId']}") + return {"code": 200, "fileId": res_json["fileId"], "message": "OK"} + else: + print(f" ⚠️ 业务失败 → code: {code}, msg: {msg}") + return {"code": code or -1, "fileId": "", "message": msg} + except Exception as json_e: + print(f" ❌ JSON 解析失败: {json_e}, 原始响应: {response.text[:200]}") + return {"code": -2, "fileId": "", "message": f"非JSON响应: {response.text[:100]}"} + except Exception as e: + print(f" ❌ 上传异常: {e}") + return {"code": -1, "fileId": "", "message": str(e)} + + +# ====== 主流程 ====== +def get_all_list_pages(): + pages = [LIST_URL] + current_url = LIST_URL + visited = {LIST_URL} + print("🔗 探测分页中(从首页开始)...") + + for i in range(1, MAX_PAGES): + html = fetch_page(current_url) + if not html: + break + + soup = BeautifulSoup(html, 'lxml') + more_link = soup.find('a', string=re.compile(r'查看更多', re.IGNORECASE)) + if not more_link: + more_link = soup.find('a', href=re.compile(r'news-list-\d+\.htm', re.IGNORECASE)) + if not more_link or not more_link.get('href'): + break + + next_href = more_link['href'].strip() + next_url = urljoin(current_url, next_href) + if not next_url.startswith(BASE_URL) or next_url in visited: + break + + visited.add(next_url) + pages.append(next_url) + print(f" ➕ {len(pages):2d}. {next_url}") + current_url = next_url + + return pages + + +def main(): + overall_start = time.perf_counter() + print("▶ 启动高抗反爬抓取流程...") + + # 1. 获取列表页(复用 driver) + print("\n[阶段1] 获取新闻列表(复用浏览器上下文)") + list_pages = get_all_list_pages() + all_items = [] + + for i, url in enumerate(list_pages, 1): + print(f"[{i}/{len(list_pages)}] 解析 {urlparse(url).path or '/'}") + html = fetch_page(url) + if not html: + continue + base_for_links = BASE_URL if "/list/" not in url else urljoin(BASE_URL, "/list/") + items = parse_news_list(html, base_url=base_for_links) + all_items.extend(items) + + print(f"✅ 共提取 {len(all_items)} 条原始新闻") + + # 2. 过滤今日 & 去重 + print("\n[阶段2] 过滤今日 & 去重") + history = load_history() + new_items = [] + for item in all_items: + if not is_today(item["发布时间"]): + continue + fp = generate_fingerprint(item["标题"], item["发布时间"]) + if fp in history: + print(f"⏭️ 跳过重复: {item['标题'][:30]}...") + continue + new_items.append(item) + history.add(fp) + + print(f"🆕 今日新增 {len(new_items)} 条新闻") + + # 3. 
抓取正文 + 上传 + print("\n[阶段3] 抓取正文 & 上传(内存上传)") + results = [] + for i, item in enumerate(new_items, 1): + title = item["标题"] + print(f"\n[{i}/{len(new_items)}] {title[:50]}...") + + try: + with monitor.io_timer("network_read", f"article: {title[:20]}"): + html = fetch_page(item["原文链接"]) + content = extract_article_content(html) if html else "(访问失败)" + item["正文内容"] = content + + # 构建 Markdown 内容(内存中) + md_content = f"""# {title} + +- 分类:{item['分类']} +- 发布时间:{item['发布时间']} +- 原文链接:{item['原文链接']} + +--- + +{content} +""" + + # 保存到磁盘(可选,用于审计) + safe_title = sanitize_filename(title) + md_file = f"{i:02d}_{safe_title}.md" + md_path = os.path.join(OUTPUT_DIR, md_file) + with monitor.io_timer("file_write", f"save_md: {md_file}"): + with open(md_path, "w", encoding="utf-8") as f: + f.write(md_content) + print(f" 💾 已保存:{md_file}") + + # 上传(内存) + res = upload_to_knowledge_base_from_content(md_file, md_content) + item.update({ + "知识库FileId": res.get("fileId", ""), + "上传状态": "✅" if res.get("code") == 200 else "❌", + "上传信息": res.get("message", "")[:100], + "指纹": fp + }) + results.append(item) + + except Exception as e: + print(f"❌ 处理失败: {title[:30]} | {e}") + item.update({ + "知识库FileId": "", + "上传状态": "❌ 处理失败", + "上传信息": str(e)[:100], + "指纹": fp + }) + results.append(item) + continue + + # 4. 保存 & 退出 + print("\n[阶段4] 保存缓存 & Excel") + save_history(history) + if results: + with monitor.io_timer("file_write", "save_excel"): + df = pd.DataFrame(results) + df.to_excel(OUTPUT_EXCEL, index=False, engine='openpyxl') + print(f"\n🎉 完成!今日新增 {len(results)} 条,Excel: {OUTPUT_EXCEL}") + else: + print(f"\nℹ️ 今日暂无新新闻发布(已探测 {len(list_pages)} 页)") + + # 关键:不 quit driver!保留上下文供下次使用 + print("📌 浏览器上下文已保留,下次运行将复用(加速)") + + # 输出 I/O 总结 + monitor.summary() + total_elapsed = time.perf_counter() - overall_start + print(f"\n🎯 总运行时间: {total_elapsed:.2f}s") + + +# ====== 优雅退出(保留 driver)====== +def cleanup(): + """进程退出时清理(不 quit driver,除非强制)""" + global _driver_instance + if _driver_instance: + print("💡 提示:为加速下次运行,浏览器上下文已保留。") + print(" 如需彻底清理,请手动删除目录:", PERSISTENT_PROFILE_DIR) + # 不 quit,保留状态 + # _driver_instance.quit() + + +import atexit + +atexit.register(cleanup) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n🛑 用户中断,正在退出...") + cleanup() + except Exception as e: + print(f"\n💥 严重错误: {e}") + cleanup() + raise \ No newline at end of file
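
The Dockerfile above installs from a requirements.txt (and copies an ./app
directory) that this patch does not include. Judging only from the imports in
the three scripts, a minimal requirements.txt would need roughly the
following, with versions left unpinned as an assumption:

    pandas
    requests
    beautifulsoup4
    lxml
    selenium
    webdriver-manager
    openpyxl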