2026/4/15 18:00

Python Web Scraping in Practice: Building a Taobao Product Price Monitoring System with Async Techniques and AI-Assisted Parsing

Zhang Xiaoming (张小明), front-end engineer

Abstract

This article walks through building an efficient, stable price-monitoring system for Taobao products. It combines modern asynchronous Python crawling, anti-bot countermeasures, and an intelligent parsing scheme into a complete price-tracking solution. It is aimed at developers with some Python experience and covers techniques from basic to advanced.

1. Project Overview and Technology Choices

A Taobao price-monitoring system has to solve the following core problems:

  • Fetch product page HTML efficiently

  • Parse price data that is loaded dynamically

  • Cope with sophisticated anti-bot mechanisms

  • Run scheduled checks and persist the data

  • Handle failures and raise alerts

Technology stack:

  • Async framework: aiohttp + asyncio (high-performance async requests)

  • Parsing engine: Playwright + BeautifulSoup4 (dynamic page support)

  • Anti-bot measures: proxy pool + header rotation + behavior simulation

  • Data storage: SQLite + CSV (lightweight option)

  • Task scheduling: APScheduler (timed-job management)

  • Alerting: SMTP email notifications + logging

2. Environment Setup and Dependencies

```bash
# Create a virtual environment
python -m venv taobao_monitor
source taobao_monitor/bin/activate   # Linux/Mac
# taobao_monitor\Scripts\activate    # Windows

# Install core dependencies
pip install aiohttp==3.8.4
pip install beautifulsoup4==4.12.2
pip install playwright==1.37.0
pip install apscheduler==3.10.4
pip install pandas==2.0.3
pip install loguru==0.7.2
pip install fake-useragent==1.4.0

# Install the Playwright browser driver
playwright install chromium
```

3. Core Crawler Architecture

The system uses a modular design with the following components:

```text
taobao-price-monitor
├── crawler/              # crawler core
│   ├── async_client.py   # async HTTP client
│   ├── parser.py         # page parser
│   └── anti_spider.py    # anti-bot countermeasures
├── database/             # data storage
│   ├── models.py         # data models
│   └── storage.py        # storage interface
├── scheduler/            # task scheduling
│   └── monitor.py        # monitoring scheduler
├── utils/                # utilities
│   ├── logger.py         # logging setup
│   └── notification.py   # notifications
└── config.py             # configuration
```

4. Intelligent Parsing Strategies for Taobao Pages

Taobao product pages are rendered dynamically, and price data is usually loaded via JavaScript. We therefore use multiple parsing strategies:

```python
# Strategy 1: direct API request       (fastest)
# Strategy 2: Playwright rendering     (most reliable)
# Strategy 3: HTML feature extraction  (fallback)
```
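The three strategies work best as an ordered fallback chain: try the cheapest one first and escalate only when it fails. A minimal sketch of that chain (the `api`, `render`, and `html` stubs below are hypothetical stand-ins for the real strategies, not part of the final implementation):

```python
from typing import Callable, List, Optional

def parse_with_fallback(url: str,
                        strategies: List[Callable[[str], Optional[dict]]]) -> Optional[dict]:
    """Try each parsing strategy in order; return the first non-None result."""
    for strategy in strategies:
        try:
            result = strategy(url)
            if result is not None:
                return result
        except Exception:
            continue  # this strategy failed, fall through to the next one
    return None

# Hypothetical stand-ins for: API request, Playwright render, HTML extraction
api = lambda url: None                # strategy 1: blocked, returns nothing
render = lambda url: {"price": 99.0}  # strategy 2: succeeds
html = lambda url: {"price": 98.0}    # strategy 3: never reached

print(parse_with_fallback("https://item.taobao.com/item.htm?id=1", [api, render, html]))
# → {'price': 99.0}
```

The order of the list is the whole policy: reordering the strategies changes which one gets the first shot without touching the chain itself.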

5. Async Requests and Performance Tuning

We use aiohttp for concurrent asynchronous requests, combined with a connection pool and request throttling, to stay stable under high concurrency.
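The throttling idea can be sketched with an `asyncio.Semaphore` that caps how many requests are in flight at once (a minimal sketch: the network call is stubbed out with `asyncio.sleep`; in the real system the body would be an aiohttp GET):

```python
import asyncio

async def fetch(sem: asyncio.Semaphore, url: str) -> str:
    """Fetch one URL while holding a semaphore slot (network I/O stubbed out)."""
    async with sem:
        await asyncio.sleep(0.01)  # stands in for an aiohttp request
        return f"html of {url}"

async def crawl(urls: list, max_concurrent: int = 5) -> list:
    """Fetch all URLs concurrently, never more than max_concurrent at once."""
    sem = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(*(fetch(sem, u) for u in urls))

pages = asyncio.run(crawl([f"https://example.com/{i}" for i in range(10)]))
print(len(pages))  # → 10
```

All ten coroutines are created immediately, but at most five run their bodies at any moment; the rest queue on the semaphore.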

6. Data Storage and Alerting

Historical prices are stored in SQLite, with CSV export supported. When a price change exceeds the configured threshold, an email notification is sent automatically.
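The alert condition is a relative change against the previous recorded price. Isolated as a sketch (the function name here is chosen for illustration; the full implementation below embeds the same check in the monitor):

```python
def price_change_exceeds(previous: float, current: float,
                         threshold: float = 0.1) -> bool:
    """True when the relative price change crosses the alert threshold."""
    if previous <= 0:
        return False  # no valid baseline to compare against
    return abs(current - previous) / previous >= threshold

print(price_change_exceeds(100.0, 89.0))  # → True  (11% drop)
print(price_change_exceeds(100.0, 95.0))  # → False (5% change)
```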

7. Complete Implementation

Below is the full implementation of the Taobao price-monitoring system:

```python
"""
Taobao Product Price Monitoring System v2.0
Async crawler + intelligent parsing + anti-bot countermeasures
Author: a Python crawler specialist
Created: January 2024
"""
import asyncio
import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional
from urllib.parse import urlparse, parse_qs

import aiohttp
import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from loguru import logger
from playwright.async_api import async_playwright


# ============ Configuration ============
class Config:
    """System configuration"""
    # Database
    DB_PATH = "taobao_prices.db"

    # Requests
    REQUEST_TIMEOUT = 30
    MAX_CONCURRENT = 5          # maximum concurrent requests
    REQUEST_DELAY = 1           # delay between requests (seconds)

    # Parsing
    USE_PLAYWRIGHT = True       # whether to use Playwright
    PLAYWRIGHT_TIMEOUT = 30000  # Playwright timeout (milliseconds)

    # Monitoring
    CHECK_INTERVAL = 3600         # check interval (seconds)
    PRICE_CHANGE_THRESHOLD = 0.1  # price-change alert threshold (10%)

    # Notifications
    EMAIL_ENABLED = False
    SMTP_SERVER = "smtp.example.com"
    SMTP_PORT = 587

    # Proxies
    PROXY_POOL = []  # proxy pool, format: http://user:pass@ip:port


# ============ Data model ============
@dataclass
class Product:
    """Product information"""
    id: str
    url: str
    title: str
    current_price: float
    original_price: float
    discount: float
    sales: int
    shop_name: str
    timestamp: datetime


# ============ Async HTTP client ============
class AsyncHttpClient:
    """Async HTTP client with proxy and header rotation"""

    def __init__(self):
        self.session = None
        self.ua = UserAgent()
        self.request_count = 0

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=Config.MAX_CONCURRENT,
            ssl=False
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers=self._get_headers()
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _get_headers(self) -> Dict:
        """Generate randomized request headers"""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        }

    async def get(self, url: str, **kwargs) -> Optional[str]:
        """Send a GET request"""
        try:
            await asyncio.sleep(Config.REQUEST_DELAY)

            # Proxy rotation
            if Config.PROXY_POOL:
                proxy = Config.PROXY_POOL[self.request_count % len(Config.PROXY_POOL)]
                kwargs['proxy'] = proxy

            async with self.session.get(
                url,
                timeout=Config.REQUEST_TIMEOUT,
                headers=self._get_headers(),
                **kwargs
            ) as response:
                self.request_count += 1

                if response.status == 200:
                    content = await response.text()
                    logger.info(f"Fetched page: {url}")
                    return content
                elif response.status == 403:
                    logger.warning(f"Access denied: {url}")
                else:
                    logger.error(f"HTTP error {response.status}: {url}")

        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
        except Exception as e:
            logger.error(f"Request failed {url}: {str(e)}")

        return None


# ============ Intelligent parser ============
class TaobaoParser:
    """Intelligent parser for Taobao pages"""

    @staticmethod
    def extract_product_id(url: str) -> Optional[str]:
        """Extract the product ID from a URL"""
        try:
            parsed = urlparse(url)
            params = parse_qs(parsed.query)

            # Several possible ID parameter names
            for key in ['id', 'item_id', 'itemid']:
                if key in params:
                    return params[key][0]

            # Fall back to the URL path
            path_parts = parsed.path.split('/')
            for part in path_parts:
                if part.isdigit() and len(part) > 8:
                    return part
        except Exception as e:
            logger.error(f"Failed to extract product ID: {str(e)}")
        return None

    @staticmethod
    def parse_html(html: str, url: str) -> Optional[Product]:
        """Parse an HTML page into product information"""
        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Method 1: try the JSON-LD data
            json_ld = soup.find('script', type='application/ld+json')
            if json_ld:
                try:
                    data = json.loads(json_ld.string)
                    if '@type' in data and data['@type'] == 'Product':
                        product_id = TaobaoParser.extract_product_id(url)
                        return Product(
                            id=product_id or '',
                            url=url,
                            title=data.get('name', ''),
                            current_price=float(data.get('offers', {}).get('price', 0)),
                            original_price=0,
                            discount=0,
                            sales=0,
                            shop_name=data.get('brand', {}).get('name', ''),
                            timestamp=datetime.now()
                        )
                except json.JSONDecodeError:
                    pass

            # Method 2: page meta tags
            meta_title = soup.find('meta', property='og:title')
            meta_price = soup.find('meta', property='og:product:price')

            if meta_title and meta_price:
                product_id = TaobaoParser.extract_product_id(url)
                title = meta_title.get('content', '')
                price = float(meta_price.get('content', 0))

                return Product(
                    id=product_id or '',
                    url=url,
                    title=title[:100],  # cap the title length
                    current_price=price,
                    original_price=price,
                    discount=0,
                    sales=0,
                    shop_name='',
                    timestamp=datetime.now()
                )

            # Method 3: specific page elements
            # Adjust the selectors to match the actual page structure
            price_element = soup.select_one('.tb-rmb-num, .price')
            title_element = soup.select_one('.tb-detail-hd h1, .title')

            if price_element and title_element:
                try:
                    price_text = price_element.text.strip().replace('¥', '').replace('￥', '')
                    price = float(price_text)
                    product_id = TaobaoParser.extract_product_id(url)

                    return Product(
                        id=product_id or '',
                        url=url,
                        title=title_element.text.strip()[:100],
                        current_price=price,
                        original_price=price,
                        discount=0,
                        sales=0,
                        shop_name='',
                        timestamp=datetime.now()
                    )
                except ValueError:
                    pass

        except Exception as e:
            logger.error(f"Failed to parse HTML: {str(e)}")

        return None

    @staticmethod
    async def parse_with_playwright(url: str) -> Optional[Product]:
        """Parse a dynamically rendered page with Playwright"""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(
                    headless=True,
                    args=['--disable-blink-features=AutomationControlled']
                )
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                )
                page = await context.new_page()

                # Block images and CSS to speed up loading
                await page.route("**/*.{png,jpg,jpeg,webp,gif,css}",
                                 lambda route: route.abort())

                await page.goto(url, timeout=Config.PLAYWRIGHT_TIMEOUT)

                # Wait for dynamic content to load
                await page.wait_for_load_state('networkidle')

                # Probe several price selectors
                selectors = [
                    '.tb-rmb-num',
                    '.price',
                    '[data-price]',
                    '.tm-price'
                ]

                for selector in selectors:
                    elements = await page.query_selector_all(selector)
                    if elements:
                        break

                # Extract the data with in-page JavaScript
                product_data = await page.evaluate("""
                    () => {
                        const result = {};

                        // Price
                        const priceEl = document.querySelector('.tb-rmb-num') ||
                                        document.querySelector('.price') ||
                                        document.querySelector('[data-price]');
                        if (priceEl) {
                            result.price = parseFloat(priceEl.textContent.replace(/[^0-9.]/g, ''));
                        }

                        // Title
                        const titleEl = document.querySelector('.tb-detail-hd h1') ||
                                        document.querySelector('.title') ||
                                        document.querySelector('h1');
                        if (titleEl) {
                            result.title = titleEl.textContent.trim();
                        }

                        // Sales count
                        const salesEl = document.querySelector('.tb-sell-counter') ||
                                        document.querySelector('.sell-count');
                        if (salesEl) {
                            result.sales = parseInt(salesEl.textContent.replace(/[^0-9]/g, ''));
                        }

                        return result;
                    }
                """)

                await browser.close()

                if 'price' in product_data:
                    product_id = TaobaoParser.extract_product_id(url)
                    return Product(
                        id=product_id or '',
                        url=url,
                        title=product_data.get('title', '')[:100],
                        current_price=float(product_data.get('price', 0)),
                        original_price=float(product_data.get('price', 0)),
                        discount=0,
                        sales=product_data.get('sales', 0),
                        shop_name='',
                        timestamp=datetime.now()
                    )

        except Exception as e:
            logger.error(f"Playwright parsing failed: {str(e)}")

        return None


# ============ Data storage ============
class PriceDatabase:
    """Price database manager"""

    def __init__(self, db_path: str = Config.DB_PATH):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Initialize the database tables"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Product table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                title TEXT,
                shop_name TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Price history table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id TEXT NOT NULL,
                price REAL NOT NULL,
                discount REAL,
                sales INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')

        # Indexes
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_product_id ON price_history (product_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON price_history (timestamp)')

        conn.commit()
        conn.close()

    def save_product(self, product: Product):
        """Persist product information"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            # Insert or update the product record
            cursor.execute('''
                INSERT OR REPLACE INTO products (id, url, title, shop_name)
                VALUES (?, ?, ?, ?)
            ''', (product.id, product.url, product.title, product.shop_name))

            # Append a price record
            cursor.execute('''
                INSERT INTO price_history (product_id, price, discount, sales)
                VALUES (?, ?, ?, ?)
            ''', (product.id, product.current_price, product.discount, product.sales))

            conn.commit()
            logger.info(f"Saved product data: {product.title}")
        except Exception as e:
            logger.error(f"Failed to save data: {str(e)}")
            conn.rollback()
        finally:
            conn.close()

    def get_price_history(self, product_id: str, days: int = 30) -> pd.DataFrame:
        """Fetch a product's price history"""
        conn = sqlite3.connect(self.db_path)
        query = '''
            SELECT price, discount, sales, timestamp
            FROM price_history
            WHERE product_id = ?
              AND timestamp >= datetime('now', ?)
            ORDER BY timestamp DESC
        '''
        df = pd.read_sql_query(
            query, conn,
            params=(product_id, f'-{days} days')
        )
        conn.close()
        return df


# ============ Monitoring scheduler ============
class PriceMonitor:
    """Price-monitoring scheduler"""

    def __init__(self):
        self.db = PriceDatabase()
        self.scheduler = AsyncIOScheduler()
        self.products = []  # monitored product URLs

    def add_product(self, url: str):
        """Add a product to the watch list"""
        self.products.append(url)
        logger.info(f"Now monitoring: {url}")

    async def check_product(self, url: str):
        """Check the price of a single product"""
        logger.info(f"Checking product: {url}")

        product = None

        if Config.USE_PLAYWRIGHT:
            product = await TaobaoParser.parse_with_playwright(url)

        if not product:
            async with AsyncHttpClient() as client:
                html = await client.get(url)
                if html:
                    product = TaobaoParser.parse_html(html, url)

        if product:
            self.db.save_product(product)
            await self._check_price_alert(product)
        else:
            logger.warning(f"Could not fetch product info: {url}")

    async def _check_price_alert(self, product: Product):
        """Check for a price change and raise an alert"""
        history = self.db.get_price_history(product.id, days=1)

        if len(history) > 1:
            previous_price = history.iloc[1]['price']
            current_price = product.current_price

            price_change = abs(current_price - previous_price) / previous_price

            if price_change >= Config.PRICE_CHANGE_THRESHOLD:
                message = (
                    f"Price alert!\n"
                    f"Product: {product.title}\n"
                    f"Previous: ¥{previous_price:.2f}\n"
                    f"Current: ¥{current_price:.2f}\n"
                    f"Change: {price_change*100:.1f}%\n"
                    f"Link: {product.url}"
                )
                logger.warning(message)
                # Hook in email or WeChat notifications here

    async def check_all_products(self):
        """Check every monitored product"""
        logger.info("Checking all product prices")

        semaphore = asyncio.Semaphore(Config.MAX_CONCURRENT)

        async def limited_check(url):
            async with semaphore:
                await self.check_product(url)

        tasks = [limited_check(url) for url in self.products]
        await asyncio.gather(*tasks, return_exceptions=True)

        logger.info("All products checked")

    def start(self):
        """Start the scheduled monitoring"""
        self.scheduler.add_job(
            self.check_all_products,
            IntervalTrigger(seconds=Config.CHECK_INTERVAL),
            id='price_monitor'
        )
        self.scheduler.start()
        logger.info("Price monitoring started")

    def stop(self):
        """Stop monitoring"""
        self.scheduler.shutdown()
        logger.info("Price monitoring stopped")


# ============ Main program ============
async def main():
    """Entry point"""
    # Configure logging
    logger.add(
        "logs/taobao_monitor_{time:YYYY-MM-DD}.log",
        rotation="1 day",
        retention="30 days",
        level="INFO"
    )

    # Create the monitor
    monitor = PriceMonitor()

    # Add products to monitor (examples)
    sample_products = [
        "https://item.taobao.com/item.htm?id=1234567890",
        "https://detail.tmall.com/item.htm?id=2345678901",
    ]

    for url in sample_products:
        monitor.add_product(url)

    # Run one check immediately
    await monitor.check_all_products()

    # Start the scheduled checks
    monitor.start()

    # Keep the program alive
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        monitor.stop()
        logger.info("Exited")


# ============ Helper tools ============
def export_to_csv(product_id: str, days: int = 30):
    """Export price history to CSV"""
    db = PriceDatabase()
    df = db.get_price_history(product_id, days)

    if not df.empty:
        filename = f"price_history_{product_id}_{datetime.now().strftime('%Y%m%d')}.csv"
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        logger.info(f"Data exported to: {filename}")
    else:
        logger.warning("No history found")


def analyze_price_trend(product_id: str):
    """Analyze the price trend"""
    db = PriceDatabase()
    df = db.get_price_history(product_id, days=90)

    if len(df) > 1:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df.set_index('timestamp', inplace=True)

        # Summary statistics
        stats = {
            'High': df['price'].max(),
            'Low': df['price'].min(),
            'Mean': df['price'].mean(),
            'Current': df['price'].iloc[0],
            'Volatility': df['price'].std() / df['price'].mean()
        }

        print("Price report:")
        for key, value in stats.items():
            print(f"{key}: {value:.2f}")

        # Buying-time suggestion
        if df['price'].iloc[0] <= df['price'].quantile(0.3):
            print("Suggestion: price is near its low; a good time to buy")
        elif df['price'].iloc[0] >= df['price'].quantile(0.7):
            print("Suggestion: price is near its high; consider waiting")
        else:
            print("Suggestion: price is in its normal range")


if __name__ == "__main__":
    # Run the monitoring system
    asyncio.run(main())

    # Usage examples
    # export_to_csv("1234567890", 30)
    # analyze_price_trend("1234567890")
```

8. Deployment and Maintenance

Deployment options

  1. Local: suitable for personal use, simple to set up

  2. Server: containerize with Docker

  3. Cloud functions: serverless, runs on demand
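For option 2, a Dockerfile could look roughly like the sketch below. It assumes the project layout from section 3, a requirements.txt pinning the packages from section 2, and an entry module named `scheduler.monitor`; all of those names are assumptions, and the Playwright base-image tag should be matched to the installed Playwright version.

```dockerfile
# Sketch only: base image tag and entry point are assumptions.
# The official Playwright Python image ships Chromium and its system deps.
FROM mcr.microsoft.com/playwright/python:v1.37.0-jammy

WORKDIR /app

# Install pinned Python dependencies first to keep the layer cacheable
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project code
COPY . .

# Run the monitoring scheduler (hypothetical entry module)
CMD ["python", "-m", "scheduler.monitor"]
```

Using the Playwright base image avoids having to run `playwright install chromium` inside the container yourself.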

Maintenance tips

  1. Regular updates: refresh the parsing rules quarterly

  2. Log monitoring: review error logs daily

  3. Proxy upkeep: rotate the proxy IP pool regularly

  4. Data backups: back up the database weekly

Caveats

  1. Respect Taobao's robots.txt

  2. Throttle requests to avoid getting banned

  3. For personal study and research only

  4. Commercial use requires authorization
