# marketmatrix_today_upload_anti_bot.py
# Purpose: scrape today's news (deduplicated) + upload to knowledge base + export Excel + [strong anti-bot hardening + I/O monitoring]

import os
import re
import json
import hashlib
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse
from contextlib import contextmanager

import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# ====== Configuration ======
KB_API_URL = "https://dcapi.homilychart.com/prod/deepchartapi/api/QwenKnowledge/add"
KB_TOKEN = "d20287d0bb0298c73e540da7e3e1d7e3"
KB_INDEX_ID = "30xe1fbox1"

BASE_URL = "http://marketmatrix.net"
LIST_URL = urljoin(BASE_URL, "/trading.htm")
OUTPUT_DIR = "today_news"
os.makedirs(OUTPUT_DIR, exist_ok=True)

OUTPUT_EXCEL = os.path.join(OUTPUT_DIR, f"today_{datetime.now().strftime('%Y%m%d')}.xlsx")
DUPLICATE_CACHE_FILE = os.path.join(OUTPUT_DIR, "today_history.json")

# Key setting: persistent browser profile directory
PERSISTENT_PROFILE_DIR = os.path.join(os.getcwd(), "hangye_config")
PROFILE_CLEAN_THRESHOLD_DAYS = 1  # auto-clean the profile once it is older than 1 day

MAX_PAGES = 30  # kept moderate to avoid anti-bot checks on deep pagination
print(f"📅 系统当前日期: {datetime.now().strftime('%Y-%m-%d')}")

# ====== Global driver singleton (core of the anti-bot design) ======
_driver_instance = None


def stealth_driver(driver):
    """Inject stealth JS to bypass common bot-detection checks."""
    try:
        # Remove the navigator.webdriver flag
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        # Mask the reported languages
        driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})")
        # Mask the plugin list
        driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]})")
        # Mask hardware concurrency (pretend to be an ordinary device)
        driver.execute_script("Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8})")
        # Mask device memory (GB)
        driver.execute_script("Object.defineProperty(navigator, 'deviceMemory', {get: () => 8})")
        print("🛡️ Stealth JS 注入成功")
    except Exception as e:
        print(f"⚠️ Stealth JS 注入失败: {e}")


def init_persistent_driver():
    """Initialize the persistent Chrome instance (only created on the first call)."""
    global _driver_instance
    if _driver_instance is not None:
        return _driver_instance

    print("🔧 初始化持久化浏览器(首次启动,需过反爬,请稍候...)")

    # Automatically clean up an expired profile
    _clean_old_profile()

    # Configure Chrome
    chrome_options = Options()
    chrome_options.add_argument(f"--user-data-dir={PERSISTENT_PROFILE_DIR}")
    chrome_options.add_argument("--profile-directory=Default")
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument("--disable-extensions")
    chrome_options.add_argument("--disable-plugins-discovery")
    chrome_options.add_argument("--disable-features=VizDisplayCompositor")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option('useAutomationExtension', False)
    # chrome_options.binary_location = "/usr/bin/chromium-browser"

    # Launch
    service = Service(ChromeDriverManager().install())
    try:
        _driver_instance = webdriver.Chrome(service=service, options=chrome_options)
        _driver_instance.set_page_load_timeout(30)
        _driver_instance.implicitly_wait(10)

        # Key step: visit the homepage to "warm up" the session and pass the anti-bot check
        print("🌐 正在访问首页以通过反爬验证...")
        _driver_instance.get(BASE_URL)
        time.sleep(2 + random.uniform(0.5, 1.5))

        # Inject stealth JS
        stealth_driver(_driver_instance)

        # Visit the list page once more to make sure the session is stable
        _driver_instance.get(LIST_URL)
        time.sleep(1.5 + random.uniform(0.5, 1.0))

        print("✅ 浏览器初始化完成,后续请求将复用上下文")
        return _driver_instance
    except Exception as e:
        if _driver_instance:
            _driver_instance.quit()
        _driver_instance = None
        raise RuntimeError(f"浏览器初始化失败: {e}")


def _clean_old_profile():
    """Clean up an expired profile directory (prevents stale state from piling up)."""
    if not os.path.exists(PERSISTENT_PROFILE_DIR):
        return
    try:
        profile_age = time.time() - os.path.getctime(PERSISTENT_PROFILE_DIR)
        if profile_age > PROFILE_CLEAN_THRESHOLD_DAYS * 86400:
            print(f"🧹 Profile 目录已超 {PROFILE_CLEAN_THRESHOLD_DAYS} 天,正在清理...")
            import shutil
            shutil.rmtree(PERSISTENT_PROFILE_DIR, ignore_errors=True)
            os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
            print("✅ 已重建干净 Profile")
    except Exception as e:
        print(f"⚠️ Profile 清理失败(继续使用现有): {e}")


def fetch_page(url, max_retries=2):
    """Fetch a page with retries (reuses the global driver)."""
    global _driver_instance
    for attempt in range(max_retries + 1):
        try:
            if _driver_instance is None:
                _driver_instance = init_persistent_driver()

            # Human-like pacing: small randomized delay after each navigation
            _driver_instance.get(url)
            time.sleep(0.8 + random.uniform(0.3, 0.7))

            # Check whether the request was blocked
            page_source = _driver_instance.page_source
            if "403 Forbidden" in page_source or "challenge-platform" in page_source.lower():
                raise Exception("被反爬拦截(403/Challenge)")

            return page_source

        except Exception as e:
            print(f" ⚠️ 尝试 {attempt + 1}/{max_retries + 1} 失败: {e}")
            if attempt < max_retries:
                time.sleep(3 + random.uniform(1, 2))
                # Restart the driver (last resort)
                if _driver_instance:
                    try:
                        _driver_instance.quit()
                    except Exception:
                        pass
                _driver_instance = None
            else:
                return None
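# Typical call pattern (mirrors main() below): the caller treats a None result as a failed
# fetch and simply skips that URL, e.g.
#     html = fetch_page(LIST_URL)
#     if html:
#         items = parse_news_list(html)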


# ====== I/O monitor (kept from the previous version) ======
class IOMonitor:
    def __init__(self):
        self.records = {}

    @contextmanager
    def io_timer(self, io_type: str, desc: str = ""):
        if io_type not in self.records:
            self.records[io_type] = []
        start = time.perf_counter()
        try:
            yield
        finally:
            duration = time.perf_counter() - start
            self.records[io_type].append((duration, desc))

    def summary(self):
        print("\n" + "=" * 60)
        print("📊 I/O 耗时总览(反爬优化版)")
        print("=" * 60)
        total_time = 0.0
        for io_type, records in sorted(self.records.items()):
            count = len(records)
            total = sum(t for t, _ in records)
            avg = total / count if count else 0
            total_time += total
            print(f"✅ {io_type:<15} | 调用 {count:2d} 次 | 总耗时 {total:6.2f}s | 平均 {avg:5.3f}s")
            if count > 3:
                slowest = sorted(records, key=lambda x: x[0], reverse=True)[:2]
                for i, (t, d) in enumerate(slowest, 1):
                    print(f"   └─ #{i} 慢: {t:5.2f}s → {d}")
        print("-" * 60)
        print(f"⏱️ I/O 总耗时: {total_time:6.2f}s")
        print("=" * 60)


monitor = IOMonitor()
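# Usage pattern (this is how the calls below use it):
#     with monitor.io_timer("network_read", "article: <title>"):
#         html = fetch_page(url)
# Every timed block is appended to monitor.records[io_type] and reported by summary().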


# ====== Helper functions (instrumented with the I/O monitor) ======
def load_history():
    with monitor.io_timer("file_read", f"load_history: {DUPLICATE_CACHE_FILE}"):
        if os.path.exists(DUPLICATE_CACHE_FILE):
            try:
                with open(DUPLICATE_CACHE_FILE, 'r', encoding='utf-8') as f:
                    return set(json.load(f))
            except Exception as e:
                print(f"⚠️ 历史缓存加载失败: {e}")
        return set()


def save_history(history_set):
    with monitor.io_timer("file_write", f"save_history: {DUPLICATE_CACHE_FILE}"):
        try:
            with open(DUPLICATE_CACHE_FILE, 'w', encoding='utf-8') as f:
                json.dump(list(history_set), f, ensure_ascii=False, indent=2)
            print(f"💾 缓存已更新:{len(history_set)} 条")
        except Exception as e:
            print(f"❌ 缓存保存失败: {e}")


def generate_fingerprint(title, pub_time):
    raw = f"{title.strip()}|{pub_time.strip()}"
    return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]
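# The fingerprint is the first 16 hex characters of SHA-1 over "title|pub_time", so the same
# headline re-published with a different timestamp is treated as a new item.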


def is_today(pub_time_str: str) -> bool:
    if not pub_time_str:
        return False
    try:
        m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', pub_time_str)
        if not m:
            return False
        year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
        curr_year = datetime.now().year
        if year > curr_year + 1 or year < curr_year - 5:
            year = curr_year
        pub_date = datetime(year, month, day).date()
        return pub_date == datetime.now().date()
    except Exception as e:
        print(f"⚠️ 日期解析失败: '{pub_time_str}' → {e}")
        return False
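# Years far outside the current year (more than 1 ahead or 5 behind) are assumed to be
# parsing noise and are clamped to the current year before the date comparison.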


def parse_news_list(html, base_url=BASE_URL):
    soup = BeautifulSoup(html, 'lxml')
    items = []
    for table in soup.find_all('table', width="800", border="0"):
        title_a = table.select_one('font[face="微软雅黑"][style*="font-size: 15pt"] a[href]')
        if not title_a:
            continue
        title = title_a.get_text(strip=True)
        link = urljoin(base_url, title_a['href'])
        parsed = urlparse(link)
        if not parsed.netloc.endswith("marketmatrix.net"):
            continue
        if "/topnews/" not in link:
            continue
        meta_fonts = table.select('font[size="2"]')
        meta_combined = " ".join(f.get_text(strip=True) for f in meta_fonts)
        time_match = ""
        m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})(?:[^\d]+(\d{1,2}:\d{2}))?', meta_combined)
        if m:
            date_part = f"{m.group(1)}.{int(m.group(2)):02d}.{int(m.group(3)):02d}"
            time_part = m.group(4) or "00:00"
            time_match = f"{date_part} {time_part}"
        else:
            m2 = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', meta_combined)
            if m2:
                time_match = f"{m2.group(1)}.{int(m2.group(2)):02d}.{int(m2.group(3)):02d}"
        category = ""
        for txt in meta_fonts:
            t = txt.get_text(strip=True).replace("主题:", "")
            if re.search(r'\d{4}|编辑|新闻源|要点', t):
                continue
            if 2 < len(t) < 30:
                category = t
                break
        items.append({
            "标题": title,
            "分类": category,
            "发布时间": time_match,
            "原文链接": link
        })
    return items
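# Each returned item is a dict with the keys "标题" (title), "分类" (category),
# "发布时间" (publish time, "YYYY.MM.DD HH:MM") and "原文链接" (article URL); these keys are
# reused as-is for the Excel columns in main().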


def extract_article_content(html):
    if not html:
        return "(访问失败)"
    soup = BeautifulSoup(html, 'lxml')
    editor_p = None
    for p in soup.find_all(['p', 'font']):
        txt = p.get_text()
        if "编辑:" in txt and "发布时间:" in txt:
            editor_p = p
            break
    if editor_p:
        container = editor_p
        for _ in range(3):
            if container.parent and container.parent.name == 'td':
                container = container.parent
                break
            container = container.parent
        if container and container.name == 'td':
            paras = []
            for p in container.find_all('p'):
                t = p.get_text(strip=True)
                if len(t) > 20 and not any(skip in t for skip in [
                    "编辑:", "发布时间:", "主题:", "新闻源:", "要点:", "开户", "保证金", "©"
                ]):
                    paras.append(t)
            if paras:
                return "\n".join(paras)
    fallback = [p.get_text(strip=True) for p in soup.find_all('p') if 30 <= len(p.get_text()) <= 500]
    return "\n".join(fallback[:15]) if fallback else "(提取失败)"


def sanitize_filename(name: str, max_len=80) -> str:
    return re.sub(r'[\\/:*?"<>|\r\n\t]', ' ', name).strip()[:max_len] or "untitled"


def upload_to_knowledge_base_from_content(filename: str, content: str) -> dict:
    print(f"📤 正在上传: {filename} (内存)")
    try:
        content_bytes = content.encode('utf-8')
        with monitor.io_timer("network_write", f"upload: {filename}"):
            files = {'file': (filename, content_bytes, 'text/markdown')}
            data = {'indexId': KB_INDEX_ID}
            headers = {'token': KB_TOKEN}
            response = requests.post(KB_API_URL, files=files, data=data, headers=headers, timeout=30)
        print(f"    ← HTTP {response.status_code}")
        try:
            res_json = response.json()
            code = res_json.get("code")
            msg = res_json.get("message") or res_json.get("error") or "未知错误"
            if code == 200 and res_json.get("fileId"):
                print(f"    ✅ 上传成功 → fileId: {res_json['fileId']}")
                return {"code": 200, "fileId": res_json["fileId"], "message": "OK"}
            else:
                print(f"    ⚠️ 业务失败 → code: {code}, msg: {msg}")
                return {"code": code or -1, "fileId": "", "message": msg}
        except Exception as json_e:
            print(f"    ❌ JSON 解析失败: {json_e}, 原始响应: {response.text[:200]}")
            return {"code": -2, "fileId": "", "message": f"非JSON响应: {response.text[:100]}"}
    except Exception as e:
        print(f"    ❌ 上传异常: {e}")
        return {"code": -1, "fileId": "", "message": str(e)}


# ====== Main pipeline ======
def get_all_list_pages():
    pages = [LIST_URL]
    current_url = LIST_URL
    visited = {LIST_URL}
    print("🔗 探测分页中(从首页开始)...")

    for i in range(1, MAX_PAGES):
        html = fetch_page(current_url)
        if not html:
            break

        soup = BeautifulSoup(html, 'lxml')
        more_link = soup.find('a', string=re.compile(r'查看更多', re.IGNORECASE))
        if not more_link:
            more_link = soup.find('a', href=re.compile(r'news-list-\d+\.htm', re.IGNORECASE))
        if not more_link or not more_link.get('href'):
            break

        next_href = more_link['href'].strip()
        next_url = urljoin(current_url, next_href)
        if not next_url.startswith(BASE_URL) or next_url in visited:
            break

        visited.add(next_url)
        pages.append(next_url)
        print(f"  ➕ {len(pages):2d}. {next_url}")
        current_url = next_url

    return pages
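# Pagination is discovered by following the "查看更多" (or news-list-N.htm) link from each
# page, stopping at MAX_PAGES, at off-site links, or when a URL repeats.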


def main():
    overall_start = time.perf_counter()
    print("▶ 启动高抗反爬抓取流程...")

    # 1. Fetch the list pages (reusing the driver)
    print("\n[阶段1] 获取新闻列表(复用浏览器上下文)")
    list_pages = get_all_list_pages()
    all_items = []

    for i, url in enumerate(list_pages, 1):
        print(f"[{i}/{len(list_pages)}] 解析 {urlparse(url).path or '/'}")
        html = fetch_page(url)
        if not html:
            continue
        base_for_links = BASE_URL if "/list/" not in url else urljoin(BASE_URL, "/list/")
        items = parse_news_list(html, base_url=base_for_links)
        all_items.extend(items)

    print(f"✅ 共提取 {len(all_items)} 条原始新闻")

    # 2. Keep only today's items & deduplicate
    print("\n[阶段2] 过滤今日 & 去重")
    history = load_history()
    new_items = []
    for item in all_items:
        if not is_today(item["发布时间"]):
            continue
        fp = generate_fingerprint(item["标题"], item["发布时间"])
        if fp in history:
            print(f"⏭️ 跳过重复: {item['标题'][:30]}...")
            continue
        new_items.append(item)
        history.add(fp)

    print(f"🆕 今日新增 {len(new_items)} 条新闻")

    # 3. Fetch article bodies + upload
    print("\n[阶段3] 抓取正文 & 上传(内存上传)")
    results = []
    for i, item in enumerate(new_items, 1):
        title = item["标题"]
        # Fingerprint of this item (also recorded when processing fails below)
        fp = generate_fingerprint(item["标题"], item["发布时间"])
        print(f"\n[{i}/{len(new_items)}] {title[:50]}...")

        try:
            with monitor.io_timer("network_read", f"article: {title[:20]}"):
                html = fetch_page(item["原文链接"])
            content = extract_article_content(html) if html else "(访问失败)"
            item["正文内容"] = content

            # Build the Markdown content (in memory)
            md_content = f"""# {title}

- 分类:{item['分类']}
- 发布时间:{item['发布时间']}
- 原文链接:{item['原文链接']}

---

{content}
"""

            # Also save to disk (optional, for auditing)
            safe_title = sanitize_filename(title)
            md_file = f"{i:02d}_{safe_title}.md"
            md_path = os.path.join(OUTPUT_DIR, md_file)
            with monitor.io_timer("file_write", f"save_md: {md_file}"):
                with open(md_path, "w", encoding="utf-8") as f:
                    f.write(md_content)
            print(f"    💾 已保存:{md_file}")

            # Upload (from memory)
            res = upload_to_knowledge_base_from_content(md_file, md_content)
            item.update({
                "知识库FileId": res.get("fileId", ""),
                "上传状态": "✅" if res.get("code") == 200 else "❌",
                "上传信息": res.get("message", "")[:100],
                "指纹": fp
            })
            results.append(item)

        except Exception as e:
            print(f"❌ 处理失败: {title[:30]} | {e}")
            item.update({
                "知识库FileId": "",
                "上传状态": "❌ 处理失败",
                "上传信息": str(e)[:100],
                "指纹": fp
            })
            results.append(item)
            continue

    # 4. Save cache & Excel
    print("\n[阶段4] 保存缓存 & Excel")
    save_history(history)
    if results:
        with monitor.io_timer("file_write", "save_excel"):
            df = pd.DataFrame(results)
            df.to_excel(OUTPUT_EXCEL, index=False, engine='openpyxl')
        print(f"\n🎉 完成!今日新增 {len(results)} 条,Excel: {OUTPUT_EXCEL}")
    else:
        print(f"\nℹ️ 今日暂无新新闻发布(已探测 {len(list_pages)} 页)")

    # Key point: do NOT quit the driver; keep the context for the next run
    print("📌 浏览器上下文已保留,下次运行将复用(加速)")

    # Print the I/O summary
    monitor.summary()
    total_elapsed = time.perf_counter() - overall_start
    print(f"\n🎯 总运行时间: {total_elapsed:.2f}s")


# ====== Graceful exit (keep the driver alive) ======
def cleanup():
    """Cleanup on process exit (does not quit the driver unless forced)."""
    global _driver_instance
    if _driver_instance:
        print("💡 提示:为加速下次运行,浏览器上下文已保留。")
        print("   如需彻底清理,请手动删除目录:", PERSISTENT_PROFILE_DIR)
        # Do not quit; keep the session state
        # _driver_instance.quit()


import atexit

atexit.register(cleanup)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n🛑 用户中断,正在退出...")
        cleanup()
    except Exception as e:
        print(f"\n💥 严重错误: {e}")
        cleanup()
        raise