Introduction: The Price-Monitoring Revolution in the E-Commerce Era
In today's e-commerce landscape, product prices change frequently and quickly. For savvy shoppers, e-commerce professionals, data analysts, and market researchers, monitoring price changes on major platforms such as Amazon, Taobao, and JD.com in real time has become an essential skill. By building an intelligent price-tracking crawler, we can not only catch the best moment to buy, but also analyze market trends and keep an eye on competitors. This article walks through how to build an efficient, reliable cross-platform price-tracking system with a modern Python stack.
Technology Stack Overview
This project uses the following modern Python stack:
Playwright: Microsoft's browser-automation framework, with headless browser support
Asyncio: Python's built-in asynchronous I/O framework, used for highly concurrent crawling
BeautifulSoup4/Selectolax: fast HTML parsing libraries
Pydantic: data validation and settings management
SQLAlchemy + SQLite: a lightweight storage layer
FastAPI: an optional API backend for visualization
Proxy-pool support: for coping with anti-scraping measures
Machine-learning alerts: intelligent predictions based on historical prices
System Architecture
1. Project Layout
text
price-tracker/
├── crawlers/          # platform-specific crawler modules
│   ├── base.py        # base crawler class
│   ├── amazon.py      # Amazon crawler
│   ├── taobao.py      # Taobao crawler
│   └── jd.py          # JD.com crawler
├── models/            # data models
│   ├── product.py     # product model
│   └── price.py       # price-history model
├── database/          # database layer
├── config/            # configuration
├── utils/             # utility helpers
├── alerts/            # alerting system
└── main.py            # application entry point
2. Core Functional Modules
Multi-platform adapters: a unified interface supporting different e-commerce platforms (see the sketch after this list)
Intelligent request scheduling: adaptive request-rate control
Anti-bot countermeasures: automatic rotation of proxies, user agents, and browser fingerprints
Data-cleaning pipeline: normalizes data formats across platforms
Real-time monitoring and alerts: rule-based and machine-learning-based price anomaly detection
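To make the multi-platform adapter idea concrete, here is a minimal sketch of the unified interface that every platform module is expected to implement. The JDCrawler skeleton is illustrative only (jd.py appears in the project layout but is not covered in this article), and the JD URL pattern is an assumption rather than a tested parser.
python
# Conceptual sketch of the unified crawler interface. The concrete BaseCrawler
# used by the project appears in crawlers/base.py further down; the JDCrawler
# body here is a placeholder showing how a new platform would plug in.
import re
from abc import ABC, abstractmethod
from typing import Optional

from models.product import Product  # shared data model, defined later in this article


class AbstractCrawler(ABC):
    @abstractmethod
    def extract_product_id(self, url: str) -> Optional[str]:
        """Parse the platform-specific product ID out of a URL."""

    @abstractmethod
    async def get_product_info(self, url: str, session) -> Optional[Product]:
        """Fetch one product page and normalize it into the shared Product model."""


class JDCrawler(AbstractCrawler):
    def extract_product_id(self, url: str) -> Optional[str]:
        # JD item URLs typically look like https://item.jd.com/<id>.html
        match = re.search(r'item\.jd\.com/(\d+)\.html', url)
        return match.group(1) if match else None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        raise NotImplementedError("Fill in JD-specific selectors here")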
Full Code Implementation
1. Environment Setup and Dependencies
First create and activate a virtual environment, then install the required packages:
bash
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate        # Linux/Mac
# or: venv\Scripts\activate     # Windows

# Install core dependencies (asyncio ships with Python and does not need to be installed)
pip install playwright selectolax pydantic sqlalchemy aiohttp
pip install nest-asyncio pandas numpy
pip install fastapi uvicorn jinja2   # optional, for the web interface

# Install the Playwright browser
playwright install chromium
2. Configuration Management
python
# config/settings.py
from typing import List
from pydantic import BaseSettings  # on Pydantic v2, BaseSettings lives in the pydantic-settings package


class Settings(BaseSettings):
    # Database
    DATABASE_URL: str = "sqlite:///./price_tracker.db"

    # Crawler
    REQUEST_TIMEOUT: int = 30
    MAX_CONCURRENT_REQUESTS: int = 5
    DEFAULT_USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    # Proxy
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Platform-specific settings
    AMAZON_DOMAINS: dict = {
        'US': 'amazon.com',
        'UK': 'amazon.co.uk',
        'DE': 'amazon.de',
        'JP': 'amazon.co.jp'
    }

    # Price-monitoring thresholds
    PRICE_DROP_THRESHOLD: float = 0.15   # notify when the price drops by 15%
    CHECK_INTERVAL_HOURS: int = 6

    class Config:
        env_file = ".env"


settings = Settings()
3. Data Model Definitions
python
# models/product.py
from pydantic import BaseModel, HttpUrl
from typing import Optional
from datetime import datetime
from enum import Enum


class Platform(str, Enum):
    AMAZON = "amazon"
    TAOBAO = "taobao"
    JD = "jd"
    EBAY = "ebay"


class Product(BaseModel):
    id: Optional[int] = None
    platform: Platform
    product_id: str
    url: HttpUrl
    title: str
    description: Optional[str] = None
    current_price: Optional[float] = None
    original_price: Optional[float] = None
    currency: str = "USD"
    availability: bool = True
    image_url: Optional[str] = None
    category: Optional[str] = None
    last_updated: datetime = datetime.now()

    class Config:
        orm_mode = True


class PriceHistory(BaseModel):
    id: Optional[int] = None
    product_id: int
    price: float
    currency: str
    timestamp: datetime = datetime.now()
    availability: bool = True
    special_offer: Optional[str] = None

    class Config:
        orm_mode = True
4. Database Management
python
# database/database.py
from sqlalchemy import (create_engine, Column, Integer, String, Float,
                        Boolean, DateTime, Text, ForeignKey)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

from config.settings import settings

Base = declarative_base()


class ProductORM(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True, index=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), unique=True, nullable=False)
    url = Column(String(500), nullable=False)
    title = Column(String(500), nullable=False)
    description = Column(Text)
    current_price = Column(Float)
    original_price = Column(Float)
    currency = Column(String(10), default="USD")
    availability = Column(Boolean, default=True)
    image_url = Column(String(500))
    category = Column(String(200))
    last_updated = Column(DateTime, default=datetime.now)

    price_history = relationship("PriceHistoryORM", back_populates="product",
                                 cascade="all, delete-orphan")


class PriceHistoryORM(Base):
    __tablename__ = "price_history"

    id = Column(Integer, primary_key=True, index=True)
    product_id = Column(Integer, ForeignKey("products.id"))
    price = Column(Float, nullable=False)
    currency = Column(String(10), default="USD")
    timestamp = Column(DateTime, default=datetime.now)
    availability = Column(Boolean, default=True)
    special_offer = Column(String(200))

    product = relationship("ProductORM", back_populates="price_history")


# Database initialization
engine = create_engine(
    settings.DATABASE_URL,
    connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def init_db():
    Base.metadata.create_all(bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
5. Base Crawler Class
python
# crawlers/base.py
import asyncio
import logging
import random
from dataclasses import dataclass
from typing import Optional

import aiohttp
from selectolax.parser import HTMLParser

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class CrawlerConfig:
    user_agents: list = None
    proxy: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3
    delay_range: tuple = (1, 3)


class BaseCrawler:
    def __init__(self, config: CrawlerConfig = None):
        self.config = config or CrawlerConfig()
        if not self.config.user_agents:
            self.config.user_agents = [
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
            ]

    def get_random_user_agent(self) -> str:
        return random.choice(self.config.user_agents)

    async def fetch(self, url: str, session: aiohttp.ClientSession) -> Optional[str]:
        headers = {
            "User-Agent": self.get_random_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }

        for attempt in range(self.config.max_retries):
            try:
                async with session.get(
                    url,
                    headers=headers,
                    proxy=self.config.proxy,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout)
                ) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:  # Too Many Requests
                        wait_time = 2 ** attempt  # exponential backoff
                        logger.warning(f"Rate limited. Waiting {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Failed to fetch {url}: Status {response.status}")
                        return None
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    return None

            # Random delay so requests are not sent too frequently
            await asyncio.sleep(random.uniform(*self.config.delay_range))

        return None

    def parse_html(self, html: str) -> HTMLParser:
        return HTMLParser(html)
6. Amazon Crawler Implementation
python
# crawlers/amazon.py
import json
import logging
import re
from typing import Optional
from urllib.parse import urlparse, parse_qs

from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Product

logger = logging.getLogger(__name__)


class AmazonCrawler(BaseCrawler):
    def __init__(self, country: str = "US", config: CrawlerConfig = None):
        super().__init__(config)
        self.country = country.upper()
        self.base_domain = f"amazon.{self._get_domain_suffix()}"

    def _get_domain_suffix(self) -> str:
        domains = {
            "US": "com", "UK": "co.uk", "DE": "de", "JP": "co.jp",
            "FR": "fr", "IT": "it", "ES": "es", "CA": "ca", "AU": "com.au"
        }
        return domains.get(self.country, "com")

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the Amazon product ID (ASIN) from a URL."""
        patterns = [
            r'/dp/([A-Z0-9]{10})',
            r'/gp/product/([A-Z0-9]{10})',
            r'/product/([A-Z0-9]{10})',
            r'/dp/([A-Z0-9]{10})/'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)

        # Fall back to the query string
        parsed = urlparse(url)
        query_params = parse_qs(parsed.query)
        if 'asin' in query_params:
            return query_params['asin'][0]
        return None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        html = await self.fetch(url, session)
        if not html:
            return None

        tree = self.parse_html(html)

        # Extract the product information
        product_data = self._parse_product_page(tree)
        if product_data:
            return Product(
                platform="amazon",
                product_id=product_data["product_id"],
                url=url,
                title=product_data["title"],
                current_price=product_data["current_price"],
                original_price=product_data["original_price"],
                currency=product_data["currency"],
                availability=product_data["availability"],
                image_url=product_data.get("image_url"),
                description=product_data.get("description")
            )
        return None

    def _parse_product_page(self, tree) -> Optional[dict]:
        """Parse an Amazon product page."""
        try:
            # First try to pull structured data from JSON-LD
            script_tags = tree.css('script[type="application/ld+json"]')
            for script in script_tags:
                try:
                    data = json.loads(script.text())
                    if data.get("@type") == "Product":
                        product_info = {
                            "product_id": data.get("sku") or data.get("productID", ""),
                            "title": data.get("name", ""),
                            "description": data.get("description", ""),
                            "image_url": data.get("image", ""),
                            "currency": "USD",
                            "availability": False,
                            "current_price": None,
                            "original_price": None
                        }

                        # Extract the offer/price information
                        if "offers" in data:
                            offers = data["offers"]
                            if isinstance(offers, dict):
                                offers = [offers]
                            for offer in offers:
                                if offer.get("availability") == "https://schema.org/InStock":
                                    product_info["availability"] = True
                                price = offer.get("price")
                                if price:
                                    product_info["current_price"] = float(price)
                                    product_info["currency"] = offer.get("priceCurrency", "USD")

                        return product_info
                except json.JSONDecodeError:
                    continue

            # Fallback: parse the HTML elements directly
            title_elem = tree.css_first('#productTitle')
            title = title_elem.text(strip=True) if title_elem else ""

            # Price parsing
            price_selectors = [
                '.a-price .a-offscreen',
                '#price_inside_buybox',
                '#priceblock_ourprice',
                '#priceblock_dealprice',
                '.a-color-price'
            ]
            current_price = None
            for selector in price_selectors:
                price_elem = tree.css_first(selector)
                if price_elem:
                    price_text = price_elem.text(strip=True)
                    price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
                    if price_match:
                        current_price = float(price_match.group())
                        break

            # Extract the product ID
            product_id = None
            asin_elem = tree.css_first('input[name="asin"]')
            if asin_elem and asin_elem.attributes.get('value'):
                product_id = asin_elem.attributes['value']

            return {
                "product_id": product_id or "",
                "title": title,
                "current_price": current_price,
                "original_price": None,
                "currency": "USD",
                "availability": current_price is not None,
                "image_url": None,
                "description": ""
            }
        except Exception as e:
            logger.error(f"Error parsing Amazon page: {str(e)}")
            return None
7. Taobao Crawler Implementation
python
# crawlers/taobao.py
import logging
import re
from typing import Optional

from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Product

logger = logging.getLogger(__name__)


class TaobaoCrawler(BaseCrawler):
    def __init__(self, config: CrawlerConfig = None):
        super().__init__(config)
        self.base_url = "https://item.taobao.com"

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the Taobao item ID from a product URL."""
        # Match common Taobao item URL patterns
        patterns = [
            r'item\.taobao\.com/item\.htm\?id=(\d+)',
            r'taobao\.com/item\.htm\?id=(\d+)',
            r'/item/(\d+)\.html'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        # Taobao needs special handling: a headless browser is usually required
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=self.get_random_user_agent(),
                viewport={'width': 1920, 'height': 1080}
            )
            page = await context.new_page()

            try:
                await page.goto(url, timeout=30000)
                # Wait for the page to finish loading
                await page.wait_for_load_state('networkidle', timeout=10000)

                # Extract the product details
                title = await self._extract_title(page)
                price = await self._extract_price(page)
                product_id = self.extract_product_id(url)

                if title and product_id:
                    return Product(
                        platform="taobao",
                        product_id=product_id,
                        url=url,
                        title=title,
                        current_price=float(price) if price else None,
                        currency="CNY",
                        availability=price is not None
                    )
            except Exception as e:
                logger.error(f"Error crawling Taobao: {str(e)}")
            finally:
                await browser.close()
        return None

    async def _extract_title(self, page) -> Optional[str]:
        """Extract the product title."""
        try:
            # Try several selectors
            selectors = [
                '.tb-detail-hd h1',
                '.tb-main-title',
                '[data-title]',
                '.title-text'
            ]
            for selector in selectors:
                element = await page.query_selector(selector)
                if element:
                    title = await element.text_content()
                    if title and len(title.strip()) > 0:
                        return title.strip()
            return None
        except Exception as e:
            logger.error(f"Error extracting title: {str(e)}")
            return None

    async def _extract_price(self, page) -> Optional[str]:
        """Extract the product price."""
        try:
            # Price selectors
            selectors = [
                '.tb-rmb-num',
                '.tm-price',
                '.price',
                '[property="og:product:price"]'
            ]
            for selector in selectors:
                element = await page.query_selector(selector)
                if element:
                    price_text = await element.text_content()
                    if price_text:
                        # Pull out the numeric part
                        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
                        if match:
                            return match.group()
            return None
        except Exception as e:
            logger.error(f"Error extracting price: {str(e)}")
            return None
8. Asynchronous Crawl Scheduler
python
# crawlers/scheduler.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

import aiohttp

from crawlers.amazon import AmazonCrawler
from crawlers.taobao import TaobaoCrawler
from models.product import Product, Platform
from database.database import SessionLocal, ProductORM, PriceHistoryORM

logger = logging.getLogger(__name__)


@dataclass
class TrackingTask:
    url: str
    platform: Platform
    check_interval: int = 6  # check interval in hours
    last_checked: Optional[datetime] = None


class PriceTrackerScheduler:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.crawlers = {
            Platform.AMAZON: AmazonCrawler(),
            Platform.TAOBAO: TaobaoCrawler(),
        }
        self.tracking_tasks: List[TrackingTask] = []

    def add_tracking_task(self, url: str, platform: Platform, check_interval: int = 6):
        """Register a product URL for tracking."""
        task = TrackingTask(url=url, platform=platform, check_interval=check_interval)
        self.tracking_tasks.append(task)
        logger.info(f"Added tracking task: {url}")

    async def crawl_product(self, task: TrackingTask, session: aiohttp.ClientSession) -> Optional[Product]:
        """Crawl a single product."""
        async with self.semaphore:
            try:
                crawler = self.crawlers.get(task.platform)
                if not crawler:
                    logger.error(f"No crawler for platform: {task.platform}")
                    return None

                product = await crawler.get_product_info(task.url, session)
                if product:
                    logger.info(f"Successfully crawled {product.title}")
                return product
            except Exception as e:
                logger.error(f"Error crawling {task.url}: {str(e)}")
                return None

    async def process_batch(self, tasks: List[TrackingTask]):
        """Process a batch of tracking tasks concurrently."""
        async with aiohttp.ClientSession() as session:
            tasks_list = [self.crawl_product(task, session) for task in tasks]
            results = await asyncio.gather(*tasks_list, return_exceptions=True)

            successful_products = []
            for result in results:
                if isinstance(result, Product):
                    successful_products.append(result)
                elif isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {str(result)}")

            # Persist the results
            await self.save_to_database(successful_products)
            return successful_products

    async def save_to_database(self, products: List[Product]):
        """Save product data to the database."""
        db = SessionLocal()
        try:
            for product in products:
                # Check whether the product already exists
                db_product = db.query(ProductORM).filter(
                    ProductORM.platform == product.platform,
                    ProductORM.product_id == product.product_id
                ).first()

                if db_product:
                    # Update the existing product
                    price_changed = db_product.current_price != product.current_price
                    db_product.current_price = product.current_price
                    db_product.original_price = product.original_price or db_product.original_price
                    db_product.availability = product.availability
                    db_product.last_updated = datetime.now()

                    # If the price changed, append a history record
                    if price_changed and product.current_price:
                        price_history = PriceHistoryORM(
                            product_id=db_product.id,
                            price=product.current_price,
                            currency=product.currency,
                            availability=product.availability,
                            timestamp=datetime.now()
                        )
                        db.add(price_history)
                    logger.info(f"Updated product: {product.title}")
                else:
                    # Insert a new product
                    new_product = ProductORM(
                        platform=product.platform.value,
                        product_id=product.product_id,
                        url=str(product.url),
                        title=product.title,
                        description=product.description,
                        current_price=product.current_price,
                        original_price=product.original_price,
                        currency=product.currency,
                        availability=product.availability,
                        image_url=product.image_url,
                        category=product.category
                    )
                    db.add(new_product)
                    db.flush()  # obtain the new product's ID

                    # Record the initial price
                    if product.current_price:
                        price_history = PriceHistoryORM(
                            product_id=new_product.id,
                            price=product.current_price,
                            currency=product.currency,
                            availability=product.availability,
                            timestamp=datetime.now()
                        )
                        db.add(price_history)
                    logger.info(f"Added new product: {product.title}")

            db.commit()
        except Exception as e:
            db.rollback()
            logger.error(f"Error saving to database: {str(e)}")
        finally:
            db.close()

    async def run_continuous_monitoring(self, interval_hours: int = 6):
        """Run the monitoring loop indefinitely."""
        logger.info(f"Starting continuous monitoring with {interval_hours} hour interval")

        while True:
            try:
                # Select the tasks that are due for a check
                now = datetime.now()
                tasks_to_check = [
                    task for task in self.tracking_tasks
                    if task.last_checked is None
                    or (now - task.last_checked).total_seconds() >= task.check_interval * 3600
                ]

                if tasks_to_check:
                    logger.info(f"Checking {len(tasks_to_check)} products...")
                    results = await self.process_batch(tasks_to_check)

                    # Update the last-checked timestamps
                    for task in tasks_to_check:
                        task.last_checked = now
                    logger.info(f"Completed checking {len(results)} products")
                else:
                    logger.info("No products need checking at this time")

                # Wait for the next check cycle
                await asyncio.sleep(interval_hours * 3600)
            except KeyboardInterrupt:
                logger.info("Monitoring stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in monitoring loop: {str(e)}")
                await asyncio.sleep(300)  # wait five minutes after an error before retrying
9. Price Analysis and Alert System
python
# alerts/price_analyzer.py
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List

import numpy as np

from database.database import SessionLocal, PriceHistoryORM

logger = logging.getLogger(__name__)


@dataclass
class PriceAlert:
    product_id: int
    alert_type: str  # "price_drop", "price_increase", "availability_change"
    message: str
    threshold: float
    current_value: float
    previous_value: float
    timestamp: datetime


class PriceAnalyzer:
    def __init__(self, price_drop_threshold: float = 0.15):
        self.price_drop_threshold = price_drop_threshold

    def analyze_price_history(self, product_id: int, days_back: int = 30) -> List[PriceAlert]:
        """Analyze a product's recent price history and emit alerts."""
        alerts = []
        db = SessionLocal()
        try:
            # Fetch the price history within the look-back window
            cutoff_date = datetime.now() - timedelta(days=days_back)
            price_history = db.query(PriceHistoryORM).filter(
                PriceHistoryORM.product_id == product_id,
                PriceHistoryORM.timestamp >= cutoff_date
            ).order_by(PriceHistoryORM.timestamp.desc()).all()

            if len(price_history) < 2:
                return alerts

            # Compare the latest price against the previous one
            current_price = price_history[0].price
            previous_price = price_history[1].price

            if current_price and previous_price:
                # Percentage change
                price_change = (current_price - previous_price) / previous_price

                # Price drop
                if price_change <= -self.price_drop_threshold:
                    alerts.append(PriceAlert(
                        product_id=product_id,
                        alert_type="price_drop",
                        message=f"Price dropped {abs(price_change * 100):.1f}%",
                        threshold=self.price_drop_threshold,
                        current_value=current_price,
                        previous_value=previous_price,
                        timestamp=datetime.now()
                    ))

                # Price increase (optional)
                if price_change >= 0.10:  # up 10%
                    alerts.append(PriceAlert(
                        product_id=product_id,
                        alert_type="price_increase",
                        message=f"Price increased {price_change * 100:.1f}%",
                        threshold=0.10,
                        current_value=current_price,
                        previous_value=previous_price,
                        timestamp=datetime.now()
                    ))

            # Availability changes
            current_availability = price_history[0].availability
            previous_availability = price_history[1].availability
            if current_availability != previous_availability:
                status = "in stock" if current_availability else "out of stock"
                alerts.append(PriceAlert(
                    product_id=product_id,
                    alert_type="availability_change",
                    message=f"Availability changed: now {status}",
                    threshold=0,
                    current_value=current_availability,
                    previous_value=previous_availability,
                    timestamp=datetime.now()
                ))
        except Exception as e:
            logger.error(f"Error analyzing price history: {str(e)}")
        finally:
            db.close()

        return alerts

    def detect_price_patterns(self, prices: List[float]) -> Dict[str, Any]:
        """Detect simple statistical patterns in a price series."""
        if len(prices) < 5:
            return {}

        prices_array = np.array(prices)

        # Basic statistics
        stats = {
            "mean": float(np.mean(prices_array)),
            "std": float(np.std(prices_array)),
            "min": float(np.min(prices_array)),
            "max": float(np.max(prices_array)),
            "current": float(prices_array[-1]),
            "is_lowest": prices_array[-1] <= np.min(prices_array[:-1]),
            "trend": "unknown"
        }

        # Trend detection via simple linear regression
        if len(prices_array) >= 3:
            x = np.arange(len(prices_array))
            slope, intercept = np.polyfit(x, prices_array, 1)
            if slope < -0.01:
                stats["trend"] = "downward"
            elif slope > 0.01:
                stats["trend"] = "upward"
            else:
                stats["trend"] = "stable"

        return stats
10. Main Program and Web Interface
python
# main.py
import asyncio
import logging
from typing import List

import uvicorn
from fastapi import FastAPI, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from crawlers.scheduler import PriceTrackerScheduler
from models.product import Product, Platform
from database.database import get_db, init_db, ProductORM, PriceHistoryORM
from config.settings import settings

logger = logging.getLogger(__name__)

# Initialize the FastAPI application
app = FastAPI(title="Price Tracker API", version="1.0.0")
templates = Jinja2Templates(directory="templates")

# Global scheduler instance
scheduler = PriceTrackerScheduler(max_concurrent=settings.MAX_CONCURRENT_REQUESTS)


@app.on_event("startup")
async def startup_event():
    """Initialize the database when the application starts."""
    init_db()
    logger.info("Application started")


@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, db: Session = Depends(get_db)):
    """Price-tracking dashboard."""
    products = db.query(ProductORM).order_by(ProductORM.last_updated.desc()).limit(50).all()
    return templates.TemplateResponse("dashboard.html", {"request": request, "products": products})


@app.post("/api/track")
async def track_product(url: str, platform: Platform):
    """Start tracking a product."""
    scheduler.add_tracking_task(url, platform)
    return {"message": f"Started tracking {url}", "platform": platform}


@app.get("/api/products", response_model=List[Product])
async def get_products(db: Session = Depends(get_db)):
    """List all tracked products."""
    products = db.query(ProductORM).all()
    return products


@app.get("/api/price-history/{product_id}")
async def get_price_history(product_id: int, db: Session = Depends(get_db)):
    """Return a product's price history."""
    history = db.query(PriceHistoryORM).filter(
        PriceHistoryORM.product_id == product_id
    ).order_by(PriceHistoryORM.timestamp.desc()).limit(100).all()
    return [{
        "price": h.price,
        "timestamp": h.timestamp,
        "availability": h.availability
    } for h in history]


@app.post("/api/check-now")
async def check_now():
    """Run one check immediately."""
    tasks = scheduler.tracking_tasks.copy()
    if tasks:
        results = await scheduler.process_batch(tasks)
        return {"checked": len(results), "message": "Check completed"}
    return {"message": "No products to check"}


async def main():
    """Entry point."""
    # Example: register some initial tracking tasks
    scheduler.add_tracking_task(
        "https://www.amazon.com/dp/B08N5WRWNW",  # sample product
        Platform.AMAZON
    )
    scheduler.add_tracking_task(
        "https://item.taobao.com/item.htm?id=123456789",  # sample product
        Platform.TAOBAO
    )

    # Start the monitoring loop
    monitoring_task = asyncio.create_task(
        scheduler.run_continuous_monitoring(interval_hours=settings.CHECK_INTERVAL_HOURS)
    )

    # Start the web server
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)

    # Run monitoring and the web server concurrently
    await asyncio.gather(
        monitoring_task,
        server.serve()
    )


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    asyncio.run(main())
Advanced Feature Extensions
1. Machine-Learning Price Prediction
python
# alerts/predictor.py
from typing import List

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures


class PricePredictor:
    def __init__(self):
        self.model = LinearRegression()

    def predict_future_prices(self, historical_prices: List[float], days_ahead: int = 7) -> List[float]:
        """Predict future prices from the historical series."""
        if len(historical_prices) < 10:
            return []

        # Build the feature matrix
        X = np.arange(len(historical_prices)).reshape(-1, 1)
        y = np.array(historical_prices)

        # Add polynomial features
        poly = PolynomialFeatures(degree=2)
        X_poly = poly.fit_transform(X)

        # Fit the model
        self.model.fit(X_poly, y)

        # Predict the next days_ahead points
        future_X = np.arange(len(historical_prices), len(historical_prices) + days_ahead).reshape(-1, 1)
        future_X_poly = poly.transform(future_X)
        predictions = self.model.predict(future_X_poly)

        return predictions.tolist()
2. Proxy-Pool Integration
python
# utils/proxy_manager.py
import random
from typing import List, Optional

import aiohttp


class ProxyManager:
    def __init__(self, proxy_urls: List[str] = None):
        self.proxies = proxy_urls or []
        self.current_index = 0

    def get_random_proxy(self) -> Optional[str]:
        """Return a random proxy."""
        if not self.proxies:
            return None
        return random.choice(self.proxies)

    def get_next_proxy(self) -> Optional[str]:
        """Return proxies in round-robin order."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy

    async def validate_proxy(self, proxy_url: str) -> bool:
        """Check whether a proxy is usable."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://httpbin.org/ip",
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    return response.status == 200
        except Exception:
            return False

    async def refresh_proxies(self, api_url: str):
        """Refresh the proxy list from a proxy-provider API."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.json()
                        self.proxies = data.get("proxies", [])
        except Exception as e:
            print(f"Error refreshing proxies: {e}")
Deployment and Optimization Recommendations
1. Deployment Options
Run locally: fine for personal use; simply run main.py
Docker containers: easier to deploy and scale
Cloud servers: AWS, Google Cloud, or Alibaba Cloud
Serverless: AWS Lambda or Google Cloud Functions
2. Performance Optimization
Use a connection pool to manage database connections
Add a caching layer to avoid redundant requests (a minimal sketch follows this list)
Use a distributed crawler architecture for large-scale monitoring
Serve static assets from a CDN
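As a starting point for the caching item above, the sketch below keeps recently fetched pages in memory for a configurable TTL. It is only an in-process cache; a production setup would more likely use Redis or another shared store, and the hook into BaseCrawler.fetch shown in the trailing comment is an assumption about where you would wire it in.
python
# utils/cache.py (hypothetical module): a minimal in-memory TTL cache for fetched pages.
import time
from typing import Dict, Optional, Tuple


class TTLCache:
    def __init__(self, ttl_seconds: int = 1800):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, str]] = {}

    def get(self, url: str) -> Optional[str]:
        entry = self._store.get(url)
        if not entry:
            return None
        stored_at, html = entry
        if time.time() - stored_at > self.ttl:
            del self._store[url]  # expired entry
            return None
        return html

    def set(self, url: str, html: str) -> None:
        self._store[url] = (time.time(), html)


# Possible hook inside BaseCrawler.fetch (names as defined earlier in the article):
#     cached = self.cache.get(url)
#     if cached is not None:
#         return cached
#     html = ...            # the normal aiohttp request
#     if html:
#         self.cache.set(url, html)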
3. Dealing with Anti-Bot Measures
Randomize the interval between requests
Use residential proxy IPs
Simulate human browsing behavior (a Playwright sketch follows this list)
Rotate the User-Agent regularly
Handle CAPTCHAs (with OCR or a third-party solving service)
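For the "simulate human browsing" item, the sketch below adds random pauses, mouse movement, and incremental scrolling to a Playwright page before it is parsed. The delays and pixel ranges are illustrative defaults only, not values tuned against any particular site.
python
# A rough sketch of "human-like" browsing with Playwright: random pauses, mouse
# movement, and step-wise scrolling before the page content is extracted.
import asyncio
import random

from playwright.async_api import Page


async def browse_like_a_human(page: Page) -> None:
    # Short random pause after navigation, as a real reader would take
    await asyncio.sleep(random.uniform(1.0, 3.0))

    # Move the mouse to a few random positions inside the viewport
    for _ in range(random.randint(2, 5)):
        await page.mouse.move(random.randint(100, 1200), random.randint(100, 700))
        await asyncio.sleep(random.uniform(0.2, 0.8))

    # Scroll down in a few uneven steps instead of one jump
    for _ in range(random.randint(2, 4)):
        await page.mouse.wheel(0, random.randint(300, 900))
        await asyncio.sleep(random.uniform(0.5, 1.5))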
4. Data Visualization
Generate price-trend charts with Matplotlib or Plotly (a minimal example follows this list)
Build a real-time dashboard showing price changes
Send email or SMS notifications
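As a minimal example of the charting item, the function below plots a product's price history straight from the price_history table with Matplotlib. It assumes the ORM models defined earlier in this article; the output filename is arbitrary.
python
# A minimal Matplotlib price-trend chart built from the price_history table.
import matplotlib
matplotlib.use("Agg")  # allow rendering on headless servers
import matplotlib.pyplot as plt

from database.database import SessionLocal, PriceHistoryORM


def plot_price_trend(product_id: int, output_path: str = "price_trend.png") -> None:
    db = SessionLocal()
    try:
        history = (db.query(PriceHistoryORM)
                     .filter(PriceHistoryORM.product_id == product_id)
                     .order_by(PriceHistoryORM.timestamp.asc())
                     .all())
    finally:
        db.close()

    if not history:
        return  # nothing to plot yet

    timestamps = [h.timestamp for h in history]
    prices = [h.price for h in history]

    plt.figure(figsize=(10, 4))
    plt.plot(timestamps, prices, marker="o", linewidth=1)
    plt.title(f"Price history for product #{product_id}")
    plt.xlabel("Time")
    plt.ylabel("Price")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()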
Legal and Ethical Considerations
Respect robots.txt: honor each site's crawling policy (see the check after this list)
Throttle requests: avoid putting undue load on the target sites
Limit data use: keep the data for personal or research purposes only
Protect user privacy: do not collect personal information
Follow the terms of service of each e-commerce platform
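A simple way to honor robots.txt is to check each URL with the standard library before scheduling it, as sketched below. Keep in mind that robots.txt is only part of the picture; each platform's terms of service still apply regardless of what the file allows.
python
# Check a URL against the site's robots.txt before adding it as a tracking task.
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


def is_allowed_by_robots(url: str, user_agent: str = "*") -> bool:
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    parser = RobotFileParser()
    parser.set_url(robots_url)
    try:
        parser.read()  # network call; may fail on unreachable hosts
    except Exception:
        return False   # be conservative if robots.txt cannot be fetched
    return parser.can_fetch(user_agent, url)


# Example: skip a URL before scheduling it
# if not is_allowed_by_robots("https://www.amazon.com/dp/B08N5WRWNW"):
#     print("Crawling disallowed by robots.txt")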
Conclusion
This article has walked through the design and implementation of a complete cross-platform price-tracking system. By combining a modern Python stack, including Playwright, asynchronous programming, and SQLAlchemy, we built a robust, extensible price-monitoring solution. The system not only tracks prices on Amazon, Taobao, and other major e-commerce platforms in near real time, but also provides data analysis, price prediction, and intelligent alerting.