# marketmatrix_today_upload_anti_bot.py
# Purpose: scrape today's news (deduplicated) + upload to the knowledge base + export Excel + [hardened anti-bot handling + I/O monitoring]

import os
import re
import json
import hashlib
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse
from contextlib import contextmanager

import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# ====== Configuration ======
KB_API_URL = "https://dcapi.homilychart.com/prod/deepchartapi/api/QwenKnowledge/add"
KB_TOKEN = "d20287d0bb0298c73e540da7e3e1d7e3"
KB_INDEX_ID = "30xe1fbox1"

BASE_URL = "http://marketmatrix.net"
LIST_URL = urljoin(BASE_URL, "/news.htm")
OUTPUT_DIR = "today_news"
os.makedirs(OUTPUT_DIR, exist_ok=True)

OUTPUT_EXCEL = os.path.join(OUTPUT_DIR, f"today_{datetime.now().strftime('%Y%m%d')}.xlsx")
DUPLICATE_CACHE_FILE = os.path.join(OUTPUT_DIR, "today_history.json")

# Key: persistent browser profile directory
PERSISTENT_PROFILE_DIR = os.path.join(os.getcwd(), "bailian_config")
PROFILE_CLEAN_THRESHOLD_DAYS = 1  # profiles older than 1 day are cleaned automatically

MAX_PAGES = 30  # kept moderately low to avoid triggering risk controls on deep pages

print(f"📅 Current system date: {datetime.now().strftime('%Y-%m-%d')}")

# ====== Global driver singleton (✅ core of the anti-bot strategy) ======
_driver_instance = None

def stealth_driver(driver):
    """Inject stealth JS to bypass common bot detection."""
    try:
        # Remove the webdriver flag
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        # Mask the language fingerprint
        driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})")
        # Mask the plugin list
        driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]})")
        # Mask hardware concurrency (pretend to be an ordinary device)
        driver.execute_script("Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8})")
        # Mask device memory (GB)
        driver.execute_script("Object.defineProperty(navigator, 'deviceMemory', {get: () => 8})")
        print("🛡️ Stealth JS injected")
    except Exception as e:
        print(f"⚠️ Stealth JS injection failed: {e}")
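
# Note: execute_script only patches the document that is currently loaded, so the
# overrides above are lost on the next navigation. A possible hardening step (a sketch,
# not part of the original flow) is to register the same override through the Chrome
# DevTools Protocol so it runs before any page script on every navigation:
#
#     driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
#         "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
#     })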

def init_persistent_driver():
    """Initialize the persistent Chrome instance (created only on the first call)."""
    global _driver_instance
    if _driver_instance is not None:
        return _driver_instance

    print("🔧 Initializing persistent browser (first launch must pass the anti-bot check, please wait...)")

    # Auto-clean an expired profile
    _clean_old_profile()

    # Configure Chrome
    chrome_options = Options()
    chrome_options.add_argument(f"--user-data-dir={PERSISTENT_PROFILE_DIR}")
    chrome_options.add_argument("--profile-directory=Default")
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument("--disable-extensions")
    chrome_options.add_argument("--disable-plugins-discovery")
    chrome_options.add_argument("--disable-features=VizDisplayCompositor")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option('useAutomationExtension', False)
    # chrome_options.binary_location = "/usr/bin/chromium-browser"

    # Launch
    service = Service(ChromeDriverManager().install())
    try:
        _driver_instance = webdriver.Chrome(service=service, options=chrome_options)
        _driver_instance.set_page_load_timeout(30)
        _driver_instance.implicitly_wait(10)

        # Key step: visit the home page to "warm up" and pass the anti-bot check
        print("🌐 Visiting the home page to pass anti-bot verification...")
        _driver_instance.get(BASE_URL)
        time.sleep(2 + random.uniform(0.5, 1.5))

        # Inject stealth JS
        stealth_driver(_driver_instance)

        # Visit the list page once more so the session state settles
        _driver_instance.get(LIST_URL)
        time.sleep(1.5 + random.uniform(0.5, 1.0))

        print("✅ Browser initialized; subsequent requests reuse this context")
        return _driver_instance
    except Exception as e:
        if _driver_instance:
            _driver_instance.quit()
        _driver_instance = None
        raise RuntimeError(f"Browser initialization failed: {e}")

def _clean_old_profile():
    """Clean up an expired profile (prevents accumulation)."""
    if not os.path.exists(PERSISTENT_PROFILE_DIR):
        return
    try:
        # Note: getctime is creation time on Windows, but last metadata change on Unix
        profile_age = time.time() - os.path.getctime(PERSISTENT_PROFILE_DIR)
        if profile_age > PROFILE_CLEAN_THRESHOLD_DAYS * 86400:
            print(f"🧹 Profile directory is older than {PROFILE_CLEAN_THRESHOLD_DAYS} day(s), cleaning...")
            import shutil
            shutil.rmtree(PERSISTENT_PROFILE_DIR, ignore_errors=True)
            os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
            print("✅ Rebuilt a clean profile")
    except Exception as e:
        print(f"⚠️ Profile cleanup failed (keeping the existing one): {e}")

def fetch_page(url, max_retries=2):
    """Fetch a page with retries (✅ reuses the global driver)."""
    global _driver_instance
    for attempt in range(max_retries + 1):
        try:
            if _driver_instance is None:
                _driver_instance = init_persistent_driver()

            # Human-like pacing: short random delay after loading the page
            _driver_instance.get(url)
            time.sleep(0.8 + random.uniform(0.3, 0.7))

            # Check whether we were blocked
            page_source = _driver_instance.page_source
            if "403 Forbidden" in page_source or "challenge-platform" in page_source.lower():
                raise Exception("Blocked by anti-bot (403/Challenge)")

            return page_source

        except Exception as e:
            print(f"  ⚠️ Attempt {attempt + 1}/{max_retries + 1} failed: {e}")
            if attempt < max_retries:
                time.sleep(3 + random.uniform(1, 2))
                # Restart the driver (last resort)
                if _driver_instance:
                    try:
                        _driver_instance.quit()
                    except Exception:
                        pass
                _driver_instance = None
            else:
                return None

# ====== I/O monitor (carried over from the previous version) ======
class IOMonitor:
    def __init__(self):
        self.records = {}

    @contextmanager
    def io_timer(self, io_type: str, desc: str = ""):
        if io_type not in self.records:
            self.records[io_type] = []
        start = time.perf_counter()
        try:
            yield
        finally:
            duration = time.perf_counter() - start
            self.records[io_type].append((duration, desc))

    def summary(self):
        print("\n" + "=" * 60)
        print("📊 I/O time overview (anti-bot optimized build)")
        print("=" * 60)
        total_time = 0.0
        for io_type, records in sorted(self.records.items()):
            count = len(records)
            total = sum(t for t, _ in records)
            avg = total / count if count else 0
            total_time += total
            print(f"✅ {io_type:<15} | calls {count:2d} | total {total:6.2f}s | avg {avg:5.3f}s")
            if count > 3:
                slowest = sorted(records, key=lambda x: x[0], reverse=True)[:2]
                for i, (t, d) in enumerate(slowest, 1):
                    print(f"   └─ #{i} slowest: {t:5.2f}s → {d}")
        print("-" * 60)
        print(f"⏱️ Total I/O time: {total_time:6.2f}s")
        print("=" * 60)

monitor = IOMonitor()
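
# Usage sketch (illustrative): io_timer is a context manager, so call sites wrap each
# I/O operation and the elapsed time is recorded under the given category, e.g.:
#
#     with monitor.io_timer("network_read", "news list page"):
#         html = fetch_page(LIST_URL)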

# ====== Utility functions (with I/O monitoring) ======
def load_history():
    with monitor.io_timer("file_read", f"load_history: {DUPLICATE_CACHE_FILE}"):
        if os.path.exists(DUPLICATE_CACHE_FILE):
            try:
                with open(DUPLICATE_CACHE_FILE, 'r', encoding='utf-8') as f:
                    return set(json.load(f))
            except Exception as e:
                print(f"⚠️ Failed to load the history cache: {e}")
        return set()

def save_history(history_set):
    with monitor.io_timer("file_write", f"save_history: {DUPLICATE_CACHE_FILE}"):
        try:
            with open(DUPLICATE_CACHE_FILE, 'w', encoding='utf-8') as f:
                json.dump(list(history_set), f, ensure_ascii=False, indent=2)
            print(f"💾 Cache updated: {len(history_set)} entries")
        except Exception as e:
            print(f"❌ Failed to save the cache: {e}")

def generate_fingerprint(title, pub_time):
    raw = f"{title.strip()}|{pub_time.strip()}"
    return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]

def is_today(pub_time_str: str) -> bool:
    if not pub_time_str:
        return False
    try:
        m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', pub_time_str)
        if not m:
            return False
        year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
        curr_year = datetime.now().year
        if year > curr_year + 1 or year < curr_year - 5:
            year = curr_year
        pub_date = datetime(year, month, day).date()
        return pub_date == datetime.now().date()
    except Exception as e:
        print(f"⚠️ Date parsing failed: '{pub_time_str}' → {e}")
        return False

def parse_news_list(html, base_url=BASE_URL):
    soup = BeautifulSoup(html, 'lxml')
    items = []
    for table in soup.find_all('table', width="800", border="0"):
        title_a = table.select_one('font[face="微软雅黑"][style*="font-size: 15pt"] a[href]')
        if not title_a:
            continue
        title = title_a.get_text(strip=True)
        link = urljoin(base_url, title_a['href'])
        parsed = urlparse(link)
        if not parsed.netloc.endswith("marketmatrix.net"):
            continue
        if "/topnews/" not in link:
            continue
        meta_fonts = table.select('font[size="2"]')
        meta_combined = " ".join(f.get_text(strip=True) for f in meta_fonts)
        time_match = ""
        m = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})(?:[^\d]+(\d{1,2}:\d{2}))?', meta_combined)
        if m:
            date_part = f"{m.group(1)}.{int(m.group(2)):02d}.{int(m.group(3)):02d}"
            time_part = m.group(4) or "00:00"
            time_match = f"{date_part} {time_part}"
        else:
            m2 = re.search(r'(\d{4})[^\d]+(\d{1,2})[^\d]+(\d{1,2})', meta_combined)
            if m2:
                time_match = f"{m2.group(1)}.{int(m2.group(2)):02d}.{int(m2.group(3)):02d}"
        category = ""
        for txt in meta_fonts:
            t = txt.get_text(strip=True).replace("主题:", "")
            if re.search(r'\d{4}|编辑|新闻源|要点', t):
                continue
            if 2 < len(t) < 30:
                category = t
                break
        items.append({
            "标题": title,
            "分类": category,
            "发布时间": time_match,
            "原文链接": link
        })
    return items

def extract_article_content(html):
    if not html:
        return "(访问失败)"
    soup = BeautifulSoup(html, 'lxml')
    editor_p = None
    for p in soup.find_all(['p', 'font']):
        txt = p.get_text()
        if "编辑:" in txt and "发布时间:" in txt:
            editor_p = p
            break
    if editor_p:
        container = editor_p
        for _ in range(3):
            if container.parent and container.parent.name == 'td':
                container = container.parent
                break
            container = container.parent
        if container and container.name == 'td':
            paras = []
            for p in container.find_all('p'):
                t = p.get_text(strip=True)
                if len(t) > 20 and not any(skip in t for skip in [
                    "编辑:", "发布时间:", "主题:", "新闻源:", "要点:",
                    "开户", "保证金", "©"
                ]):
                    paras.append(t)
            if paras:
                return "\n".join(paras)
    fallback = [p.get_text(strip=True) for p in soup.find_all('p') if 30 <= len(p.get_text()) <= 500]
    return "\n".join(fallback[:15]) if fallback else "(提取失败)"

def sanitize_filename(name: str, max_len=80) -> str:
    return re.sub(r'[\\/:*?"<>|\r\n\t]', ' ', name).strip()[:max_len] or "untitled"
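
# Worked example: sanitize_filename('A/B:C') maps '/' and ':' to spaces and strips the
# ends, giving 'A B C'; a name made entirely of invalid characters falls back to "untitled".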

def upload_to_knowledge_base_from_content(filename: str, content: str) -> dict:
    print(f"📤 Uploading: {filename} (from memory)")
    try:
        content_bytes = content.encode('utf-8')
        with monitor.io_timer("network_write", f"upload: {filename}"):
            files = {'file': (filename, content_bytes, 'text/markdown')}
            data = {'indexId': KB_INDEX_ID}
            headers = {'token': KB_TOKEN}
            response = requests.post(KB_API_URL, files=files, data=data, headers=headers, timeout=30)
        print(f"  ← HTTP {response.status_code}")
        try:
            res_json = response.json()
            code = res_json.get("code")
            msg = res_json.get("message") or res_json.get("error") or "unknown error"
            if code == 200 and res_json.get("fileId"):
                print(f"  ✅ Upload succeeded → fileId: {res_json['fileId']}")
                return {"code": 200, "fileId": res_json["fileId"], "message": "OK"}
            else:
                print(f"  ⚠️ Business-level failure → code: {code}, msg: {msg}")
                return {"code": code or -1, "fileId": "", "message": msg}
        except Exception as json_e:
            print(f"  ❌ JSON parsing failed: {json_e}, raw response: {response.text[:200]}")
            return {"code": -2, "fileId": "", "message": f"Non-JSON response: {response.text[:100]}"}
    except Exception as e:
        print(f"  ❌ Upload error: {e}")
        return {"code": -1, "fileId": "", "message": str(e)}

# ====== Main workflow ======
def get_all_list_pages():
    pages = [LIST_URL]
    current_url = LIST_URL
    visited = {LIST_URL}
    print("🔗 Probing pagination (starting from the front page)...")

    for i in range(1, MAX_PAGES):
        html = fetch_page(current_url)
        if not html:
            break

        soup = BeautifulSoup(html, 'lxml')
        more_link = soup.find('a', string=re.compile(r'查看更多', re.IGNORECASE))
        if not more_link:
            more_link = soup.find('a', href=re.compile(r'news-list-\d+\.htm', re.IGNORECASE))
        if not more_link or not more_link.get('href'):
            break

        next_href = more_link['href'].strip()
        next_url = urljoin(current_url, next_href)
        if not next_url.startswith(BASE_URL) or next_url in visited:
            break

        visited.add(next_url)
        pages.append(next_url)
        print(f"  ➕ {len(pages):2d}. {next_url}")
        current_url = next_url

    return pages

def main():
    overall_start = time.perf_counter()
    print("▶ Starting the hardened anti-bot scraping workflow...")

    # 1. Fetch the list pages (reusing the driver)
    print("\n[Stage 1] Fetching the news list (reusing the browser context)")
    list_pages = get_all_list_pages()
    all_items = []

    for i, url in enumerate(list_pages, 1):
        print(f"[{i}/{len(list_pages)}] Parsing {urlparse(url).path or '/'}")
        html = fetch_page(url)
        if not html:
            continue
        base_for_links = BASE_URL if "/list/" not in url else urljoin(BASE_URL, "/list/")
        items = parse_news_list(html, base_url=base_for_links)
        all_items.extend(items)

    print(f"✅ Extracted {len(all_items)} raw news items in total")

    # 2. Keep only today's items & deduplicate
    print("\n[Stage 2] Filtering to today & deduplicating")
    history = load_history()
    new_items = []
    for item in all_items:
        if not is_today(item["发布时间"]):
            continue
        fp = generate_fingerprint(item["标题"], item["发布时间"])
        if fp in history:
            print(f"⏭️ Skipping duplicate: {item['标题'][:30]}...")
            continue
        item["指纹"] = fp  # keep the fingerprint on the item so stage 3 can report it
        new_items.append(item)
        history.add(fp)

    print(f"🆕 {len(new_items)} new news items today")

    # 3. Fetch article bodies + upload
    print("\n[Stage 3] Fetching article bodies & uploading (in-memory upload)")
    results = []
    for i, item in enumerate(new_items, 1):
        title = item["标题"]
        print(f"\n[{i}/{len(new_items)}] {title[:50]}...")

        try:
            with monitor.io_timer("network_read", f"article: {title[:20]}"):
                html = fetch_page(item["原文链接"])
            content = extract_article_content(html) if html else "(访问失败)"
            item["正文内容"] = content

            # Build the Markdown content (in memory)
            md_content = f"""# {title}

- 分类:{item['分类']}
- 发布时间:{item['发布时间']}
- 原文链接:{item['原文链接']}

---

{content}"""

            # Save to disk (optional, kept for auditing)
            safe_title = sanitize_filename(title)
            md_file = f"{i:02d}_{safe_title}.md"
            md_path = os.path.join(OUTPUT_DIR, md_file)
            with monitor.io_timer("file_write", f"save_md: {md_file}"):
                with open(md_path, "w", encoding="utf-8") as f:
                    f.write(md_content)
            print(f"  💾 Saved: {md_file}")

            # Upload (from memory); the fingerprint is already attached to the item from stage 2
            res = upload_to_knowledge_base_from_content(md_file, md_content)
            item.update({
                "知识库FileId": res.get("fileId", ""),
                "上传状态": "✅" if res.get("code") == 200 else "❌",
                "上传信息": res.get("message", "")[:100]
            })
            results.append(item)

        except Exception as e:
            print(f"❌ Processing failed: {title[:30]} | {e}")
            item.update({
                "知识库FileId": "",
                "上传状态": "❌ 处理失败",
                "上传信息": str(e)[:100]
            })
            results.append(item)
            continue

    # 4. Save & wrap up
    print("\n[Stage 4] Saving cache & Excel")
    save_history(history)
    if results:
        with monitor.io_timer("file_write", "save_excel"):
            df = pd.DataFrame(results)
            df.to_excel(OUTPUT_EXCEL, index=False, engine='openpyxl')
        print(f"\n🎉 Done! {len(results)} new items today, Excel: {OUTPUT_EXCEL}")
    else:
        print(f"\nℹ️ No new news published today (probed {len(list_pages)} pages)")

    # Key: do NOT quit the driver here; keep the context for the next run
    print("📌 Browser context preserved; the next run will reuse it (faster startup)")

    # Print the I/O summary
    monitor.summary()
    total_elapsed = time.perf_counter() - overall_start
    print(f"\n🎯 Total runtime: {total_elapsed:.2f}s")

# ====== Graceful shutdown (keep the driver) ======
def cleanup():
    """Clean up on process exit (the driver is not quit unless forced)."""
    global _driver_instance
    if _driver_instance:
        print("💡 Note: the browser context is kept so the next run starts faster.")
        print("   To clean it up completely, delete this directory manually:", PERSISTENT_PROFILE_DIR)
        # Intentionally not quitting; the session state is preserved
        # _driver_instance.quit()

import atexit
atexit.register(cleanup)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n🛑 Interrupted by user, exiting...")
        cleanup()
    except Exception as e:
        print(f"\n💥 Fatal error: {e}")
        cleanup()
        raise
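
# Runtime dependencies inferred from the imports above (package names are an assumption,
# verify against your environment):
#   pip install pandas requests beautifulsoup4 lxml selenium webdriver-manager openpyxl
# Run with:
#   python marketmatrix_today_upload_anti_bot.py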