
Cross-Platform Product Price Tracking with Python: Building an Intelligent Price-Comparison Crawler System


张小明

Front-end Development Engineer


Introduction: The Price-Monitoring Revolution of the E-commerce Era

In today's e-commerce landscape, product prices fluctuate frequently and rapidly. For savvy consumers, e-commerce professionals, data analysts, and market researchers, monitoring price changes on major platforms such as Amazon, Taobao, and JD.com in real time has become an essential skill. By building an intelligent price-tracking crawler system, we can not only catch the best moment to buy but also analyze market trends and keep an eye on competitors. This article takes a deep dive into how to use a modern Python stack to build an efficient, stable, cross-platform product price tracking system.

Technology Stack Overview

This project uses the following modern Python technology stack:

  • Playwright: Microsoft's modern browser-automation tool, with headless browser support

  • Asyncio: Python's native asynchronous I/O framework, used for highly concurrent crawling

  • BeautifulSoup4/Selectolax: efficient HTML parsing libraries

  • Pydantic: data validation and settings management

  • SQLAlchemy + SQLite: a lightweight data-storage solution

  • FastAPI: an optional API backend for visualization

  • Proxy pool support: for dealing with anti-crawling mechanisms

  • Machine-learning alerts: intelligent predictions based on historical prices

System Architecture Design

1. Project Structure

text

price-tracker/
├── crawlers/          # Platform-specific crawler modules
│   ├── base.py        # Base crawler class
│   ├── amazon.py      # Amazon crawler
│   ├── taobao.py      # Taobao crawler
│   └── jd.py          # JD.com crawler
├── models/            # Data models
│   ├── product.py     # Product model
│   └── price.py       # Price record model
├── database/          # Database operations
├── config/            # Configuration files
├── utils/             # Utility functions
├── alerts/            # Alert system
└── main.py            # Main program entry point

2. Core Functional Modules

  • Multi-platform adapters: a unified interface that supports different e-commerce platforms

  • Intelligent request scheduling: adaptive control of request frequency

  • Anti-crawling countermeasures: automatic rotation of proxies, user agents, and browser fingerprints

  • Data-cleaning pipeline: standardizes the data formats of different platforms (a minimal sketch follows this list)

  • Real-time monitoring and alerts: rule-based and machine-learning-based price anomaly detection
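
To make the data-cleaning idea concrete, here is a minimal sketch of a normalization helper that turns platform-specific price strings into a float plus an ISO currency code. The module name, function name, and symbol-to-currency mapping are illustrative assumptions, not part of the project structure above.

python

# utils/normalize.py (hypothetical helper, shown only as an illustration)
import re
from typing import Optional, Tuple

# Assumed mapping from currency symbols to ISO codes
CURRENCY_SYMBOLS = {"$": "USD", "¥": "CNY", "£": "GBP", "€": "EUR"}

def normalize_price(raw: str, default_currency: str = "USD") -> Tuple[Optional[float], str]:
    """Turn a raw price string like '¥1,299.00' or '$19.99' into (amount, currency)."""
    if not raw:
        return None, default_currency
    currency = default_currency
    for symbol, code in CURRENCY_SYMBOLS.items():
        if symbol in raw:
            currency = code
            break
    match = re.search(r'[\d,]+\.?\d*', raw)
    if not match:
        return None, currency
    return float(match.group().replace(",", "")), currency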

Complete Code Implementation

1. Environment Setup and Dependency Installation

First create and activate a virtual environment, then install the required dependencies:

bash

# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate        # Linux/Mac
# or: venv\Scripts\activate     # Windows

# Install core dependencies (asyncio ships with Python and does not need to be installed)
pip install playwright selectolax pydantic sqlalchemy aiohttp
pip install nest-asyncio pandas numpy scikit-learn
pip install fastapi uvicorn jinja2   # optional, for the web interface

# Install the Playwright browser
playwright install chromium

2. Configuration Management Module

python

# config/settings.py
from pydantic import BaseSettings  # With Pydantic v2, use: from pydantic_settings import BaseSettings
from typing import List

class Settings(BaseSettings):
    # Database configuration
    DATABASE_URL: str = "sqlite:///./price_tracker.db"

    # Crawler configuration
    REQUEST_TIMEOUT: int = 30
    MAX_CONCURRENT_REQUESTS: int = 5
    DEFAULT_USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    # Proxy configuration
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Platform-specific configuration
    AMAZON_DOMAINS: dict = {
        'US': 'amazon.com',
        'UK': 'amazon.co.uk',
        'DE': 'amazon.de',
        'JP': 'amazon.co.jp'
    }

    # Price-monitoring thresholds
    PRICE_DROP_THRESHOLD: float = 0.15   # Trigger a notification on a 15% price drop
    CHECK_INTERVAL_HOURS: int = 6

    class Config:
        env_file = ".env"

settings = Settings()

3. Data Model Definitions

python

# models/product.py
from pydantic import BaseModel, HttpUrl, Field
from typing import Optional
from datetime import datetime
from enum import Enum

class Platform(str, Enum):
    AMAZON = "amazon"
    TAOBAO = "taobao"
    JD = "jd"
    EBAY = "ebay"

class Product(BaseModel):
    id: Optional[int] = None
    platform: Platform
    product_id: str
    url: HttpUrl
    title: str
    description: Optional[str] = None
    current_price: Optional[float] = None
    original_price: Optional[float] = None
    currency: str = "USD"
    availability: bool = True
    image_url: Optional[str] = None
    category: Optional[str] = None
    # default_factory ensures the timestamp is evaluated per instance, not once at import time
    last_updated: datetime = Field(default_factory=datetime.now)

    class Config:
        orm_mode = True  # In Pydantic v2, use from_attributes = True

class PriceHistory(BaseModel):
    id: Optional[int] = None
    product_id: int
    price: float
    currency: str
    timestamp: datetime = Field(default_factory=datetime.now)
    availability: bool = True
    special_offer: Optional[str] = None

    class Config:
        orm_mode = True

4. Database Management

python

# database/database.py
from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

from config.settings import settings

Base = declarative_base()

class ProductORM(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True, index=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), unique=True, nullable=False)
    url = Column(String(500), nullable=False)
    title = Column(String(500), nullable=False)
    description = Column(Text)
    current_price = Column(Float)
    original_price = Column(Float)
    currency = Column(String(10), default="USD")
    availability = Column(Boolean, default=True)
    image_url = Column(String(500))
    category = Column(String(200))
    last_updated = Column(DateTime, default=datetime.now)

    price_history = relationship("PriceHistoryORM", back_populates="product", cascade="all, delete-orphan")

class PriceHistoryORM(Base):
    __tablename__ = "price_history"

    id = Column(Integer, primary_key=True, index=True)
    product_id = Column(Integer, ForeignKey("products.id"))
    price = Column(Float, nullable=False)
    currency = Column(String(10), default="USD")
    timestamp = Column(DateTime, default=datetime.now)
    availability = Column(Boolean, default=True)
    special_offer = Column(String(200))

    product = relationship("ProductORM", back_populates="price_history")

# Database initialization
engine = create_engine(
    settings.DATABASE_URL,
    connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def init_db():
    Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

5. Base Crawler Class

python

# crawlers/base.py
import asyncio
import aiohttp
from typing import Optional
from selectolax.parser import HTMLParser
import random
from dataclasses import dataclass
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class CrawlerConfig:
    user_agents: list = None
    proxy: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3
    delay_range: tuple = (1, 3)

class BaseCrawler:
    def __init__(self, config: CrawlerConfig = None):
        self.config = config or CrawlerConfig()
        if not self.config.user_agents:
            self.config.user_agents = [
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
            ]

    def get_random_user_agent(self) -> str:
        return random.choice(self.config.user_agents)

    async def fetch(self, url: str, session: aiohttp.ClientSession) -> Optional[str]:
        headers = {
            "User-Agent": self.get_random_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }
        for attempt in range(self.config.max_retries):
            try:
                async with session.get(
                    url,
                    headers=headers,
                    proxy=self.config.proxy,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout)
                ) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:  # Too Many Requests
                        wait_time = 2 ** attempt  # Exponential backoff
                        logger.warning(f"Rate limited. Waiting {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Failed to fetch {url}: Status {response.status}")
                        return None
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    return None
            # Random delay to avoid sending requests too frequently
            await asyncio.sleep(random.uniform(*self.config.delay_range))
        return None

    def parse_html(self, html: str) -> HTMLParser:
        return HTMLParser(html)

6. Amazon Crawler Implementation

python

# crawlers/amazon.py
from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Product
from typing import Optional
import re
from urllib.parse import urlparse, parse_qs
import json
import logging

logger = logging.getLogger(__name__)

class AmazonCrawler(BaseCrawler):
    def __init__(self, country: str = "US", config: CrawlerConfig = None):
        super().__init__(config)
        self.country = country.upper()
        self.base_domain = f"amazon.{self._get_domain_suffix()}"

    def _get_domain_suffix(self) -> str:
        domains = {
            "US": "com", "UK": "co.uk", "DE": "de", "JP": "co.jp",
            "FR": "fr", "IT": "it", "ES": "es", "CA": "ca", "AU": "com.au"
        }
        return domains.get(self.country, "com")

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the Amazon product ID (ASIN) from a URL."""
        patterns = [
            r'/dp/([A-Z0-9]{10})',
            r'/gp/product/([A-Z0-9]{10})',
            r'/product/([A-Z0-9]{10})',
            r'/dp/([A-Z0-9]{10})/'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        # Fall back to the query parameters
        parsed = urlparse(url)
        query_params = parse_qs(parsed.query)
        if 'asin' in query_params:
            return query_params['asin'][0]
        return None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        html = await self.fetch(url, session)
        if not html:
            return None
        tree = self.parse_html(html)
        # Extract the product information
        product_data = self._parse_product_page(tree)
        if product_data:
            return Product(
                platform="amazon",
                product_id=product_data["product_id"],
                url=url,
                title=product_data["title"],
                current_price=product_data["current_price"],
                original_price=product_data["original_price"],
                currency=product_data["currency"],
                availability=product_data["availability"],
                image_url=product_data.get("image_url"),
                description=product_data.get("description")
            )
        return None

    def _parse_product_page(self, tree) -> Optional[dict]:
        """Parse an Amazon product page."""
        try:
            # First, try to extract structured data from JSON-LD
            script_tags = tree.css('script[type="application/ld+json"]')
            for script in script_tags:
                try:
                    data = json.loads(script.text())
                    if data.get("@type") == "Product":
                        product_info = {
                            "product_id": data.get("sku") or data.get("productID", ""),
                            "title": data.get("name", ""),
                            "description": data.get("description", ""),
                            "image_url": data.get("image", ""),
                            "currency": "USD",
                            "availability": False,
                            "current_price": None,
                            "original_price": None
                        }
                        # Extract price information from the offers
                        if "offers" in data:
                            offers = data["offers"]
                            if isinstance(offers, dict):
                                offers = [offers]
                            for offer in offers:
                                if offer.get("availability") == "https://schema.org/InStock":
                                    product_info["availability"] = True
                                price = offer.get("price")
                                if price:
                                    product_info["current_price"] = float(price)
                                    product_info["currency"] = offer.get("priceCurrency", "USD")
                        return product_info
                except json.JSONDecodeError:
                    continue

            # Fallback: parse the HTML elements directly
            title_elem = tree.css_first('#productTitle')
            title = title_elem.text(strip=True) if title_elem else ""

            # Price parsing
            price_selectors = [
                '.a-price .a-offscreen',
                '#price_inside_buybox',
                '#priceblock_ourprice',
                '#priceblock_dealprice',
                '.a-color-price'
            ]
            current_price = None
            for selector in price_selectors:
                price_elem = tree.css_first(selector)
                if price_elem:
                    price_text = price_elem.text(strip=True)
                    price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
                    if price_match:
                        current_price = float(price_match.group())
                        break

            # Extract the product ID
            product_id = None
            asin_elem = tree.css_first('input[name="asin"]')
            if asin_elem and asin_elem.attributes.get('value'):
                product_id = asin_elem.attributes['value']

            return {
                "product_id": product_id or "",
                "title": title,
                "current_price": current_price,
                "original_price": None,
                "currency": "USD",
                "availability": current_price is not None,
                "image_url": None,
                "description": ""
            }
        except Exception as e:
            logger.error(f"Error parsing Amazon page: {str(e)}")
            return None

7. Taobao Crawler Implementation

python

# crawlers/taobao.py
from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Product
from typing import Optional
import re
import logging

logger = logging.getLogger(__name__)

class TaobaoCrawler(BaseCrawler):
    def __init__(self, config: CrawlerConfig = None):
        super().__init__(config)
        self.base_url = "https://item.taobao.com"

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the Taobao product ID from a URL."""
        # Match common Taobao product URL patterns
        patterns = [
            r'item\.taobao\.com/item\.htm\?id=(\d+)',
            r'taobao\.com/item\.htm\?id=(\d+)',
            r'/item/(\d+)\.html'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        # Taobao needs special handling and typically requires a headless browser
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=self.get_random_user_agent(),
                viewport={'width': 1920, 'height': 1080}
            )
            page = await context.new_page()
            try:
                await page.goto(url, timeout=30000)
                # Wait for the page to finish loading
                await page.wait_for_load_state('networkidle', timeout=10000)

                # Extract the product information
                title = await self._extract_title(page)
                price = await self._extract_price(page)
                product_id = self.extract_product_id(url)

                if title and product_id:
                    return Product(
                        platform="taobao",
                        product_id=product_id,
                        url=url,
                        title=title,
                        current_price=float(price) if price else None,
                        currency="CNY",
                        availability=price is not None
                    )
            except Exception as e:
                logger.error(f"Error crawling Taobao: {str(e)}")
            finally:
                await browser.close()
        return None

    async def _extract_title(self, page) -> Optional[str]:
        """Extract the product title."""
        try:
            # Try several possible selectors
            selectors = [
                '.tb-detail-hd h1',
                '.tb-main-title',
                '[data-title]',
                '.title-text'
            ]
            for selector in selectors:
                element = await page.query_selector(selector)
                if element:
                    title = await element.text_content()
                    if title and len(title.strip()) > 0:
                        return title.strip()
            return None
        except Exception as e:
            logger.error(f"Error extracting title: {str(e)}")
            return None

    async def _extract_price(self, page) -> Optional[str]:
        """Extract the product price."""
        try:
            # Candidate price selectors
            selectors = [
                '.tb-rmb-num',
                '.tm-price',
                '.price',
                '[property="og:product:price"]'
            ]
            for selector in selectors:
                element = await page.query_selector(selector)
                if element:
                    price_text = await element.text_content()
                    if price_text:
                        # Pull out the numeric part
                        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
                        if match:
                            return match.group()
            return None
        except Exception as e:
            logger.error(f"Error extracting price: {str(e)}")
            return None

8. Asynchronous Crawler Scheduler

python

# crawlers/scheduler.py
import asyncio
from typing import List, Optional
import aiohttp
from datetime import datetime
import logging
from dataclasses import dataclass

from crawlers.amazon import AmazonCrawler
from crawlers.taobao import TaobaoCrawler
from models.product import Product, Platform
from database.database import SessionLocal, ProductORM, PriceHistoryORM

logger = logging.getLogger(__name__)

@dataclass
class TrackingTask:
    url: str
    platform: Platform
    check_interval: int = 6  # Check interval in hours
    last_checked: Optional[datetime] = None

class PriceTrackerScheduler:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.crawlers = {
            Platform.AMAZON: AmazonCrawler(),
            Platform.TAOBAO: TaobaoCrawler(),
        }
        self.tracking_tasks: List[TrackingTask] = []

    def add_tracking_task(self, url: str, platform: Platform, check_interval: int = 6):
        """Register a product for tracking."""
        task = TrackingTask(url=url, platform=platform, check_interval=check_interval)
        self.tracking_tasks.append(task)
        logger.info(f"Added tracking task: {url}")

    async def crawl_product(self, task: TrackingTask, session: aiohttp.ClientSession) -> Optional[Product]:
        """Crawl a single product."""
        async with self.semaphore:
            try:
                crawler = self.crawlers.get(task.platform)
                if not crawler:
                    logger.error(f"No crawler for platform: {task.platform}")
                    return None
                product = await crawler.get_product_info(task.url, session)
                if product:
                    logger.info(f"Successfully crawled {product.title}")
                return product
            except Exception as e:
                logger.error(f"Error crawling {task.url}: {str(e)}")
                return None

    async def process_batch(self, tasks: List[TrackingTask]):
        """Process a batch of tracking tasks."""
        async with aiohttp.ClientSession() as session:
            tasks_list = [self.crawl_product(task, session) for task in tasks]
            results = await asyncio.gather(*tasks_list, return_exceptions=True)

            successful_products = []
            for result in results:
                if isinstance(result, Product):
                    successful_products.append(result)
                elif isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {str(result)}")

            # Persist the results
            await self.save_to_database(successful_products)
            return successful_products

    async def save_to_database(self, products: List[Product]):
        """Save product information to the database."""
        db = SessionLocal()
        try:
            for product in products:
                # Check whether the product already exists
                db_product = db.query(ProductORM).filter(
                    ProductORM.platform == product.platform,
                    ProductORM.product_id == product.product_id
                ).first()

                if db_product:
                    # Update the existing product
                    price_changed = db_product.current_price != product.current_price
                    db_product.current_price = product.current_price
                    db_product.original_price = product.original_price or db_product.original_price
                    db_product.availability = product.availability
                    db_product.last_updated = datetime.now()

                    # If the price changed, add a history record
                    if price_changed and product.current_price:
                        price_history = PriceHistoryORM(
                            product_id=db_product.id,
                            price=product.current_price,
                            currency=product.currency,
                            availability=product.availability,
                            timestamp=datetime.now()
                        )
                        db.add(price_history)
                    logger.info(f"Updated product: {product.title}")
                else:
                    # Add a new product
                    new_product = ProductORM(
                        platform=product.platform.value,
                        product_id=product.product_id,
                        url=str(product.url),
                        title=product.title,
                        description=product.description,
                        current_price=product.current_price,
                        original_price=product.original_price,
                        currency=product.currency,
                        availability=product.availability,
                        image_url=product.image_url,
                        category=product.category
                    )
                    db.add(new_product)
                    db.flush()  # Flush so the new product gets an ID

                    # Add the initial price record
                    if product.current_price:
                        price_history = PriceHistoryORM(
                            product_id=new_product.id,
                            price=product.current_price,
                            currency=product.currency,
                            availability=product.availability,
                            timestamp=datetime.now()
                        )
                        db.add(price_history)
                    logger.info(f"Added new product: {product.title}")
            db.commit()
        except Exception as e:
            db.rollback()
            logger.error(f"Error saving to database: {str(e)}")
        finally:
            db.close()

    async def run_continuous_monitoring(self, interval_hours: int = 6):
        """Run the monitoring loop continuously."""
        logger.info(f"Starting continuous monitoring with {interval_hours} hour interval")
        while True:
            try:
                # Select the tasks that are due for a check
                now = datetime.now()
                tasks_to_check = [
                    task for task in self.tracking_tasks
                    if task.last_checked is None
                    or (now - task.last_checked).total_seconds() >= task.check_interval * 3600
                ]

                if tasks_to_check:
                    logger.info(f"Checking {len(tasks_to_check)} products...")
                    results = await self.process_batch(tasks_to_check)

                    # Update the last-checked timestamps
                    for task in tasks_to_check:
                        task.last_checked = now
                    logger.info(f"Completed checking {len(results)} products")
                else:
                    logger.info("No products need checking at this time")

                # Wait for the next check cycle
                await asyncio.sleep(interval_hours * 3600)
            except KeyboardInterrupt:
                logger.info("Monitoring stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in monitoring loop: {str(e)}")
                await asyncio.sleep(300)  # Wait 5 minutes after an error before retrying

9. Price Analysis and Alert System

python

# alerts/price_analyzer.py
import numpy as np
from typing import List, Dict, Any
from datetime import datetime, timedelta
import logging
from dataclasses import dataclass

from database.database import SessionLocal, PriceHistoryORM

logger = logging.getLogger(__name__)

@dataclass
class PriceAlert:
    product_id: int
    alert_type: str  # "price_drop", "price_increase", "availability_change"
    message: str
    threshold: float
    current_value: float
    previous_value: float
    timestamp: datetime

class PriceAnalyzer:
    def __init__(self, price_drop_threshold: float = 0.15):
        self.price_drop_threshold = price_drop_threshold

    def analyze_price_history(self, product_id: int, days_back: int = 30) -> List[PriceAlert]:
        """Analyze a product's price history."""
        alerts = []
        db = SessionLocal()
        try:
            # Fetch the price history
            cutoff_date = datetime.now() - timedelta(days=days_back)
            price_history = db.query(PriceHistoryORM).filter(
                PriceHistoryORM.product_id == product_id,
                PriceHistoryORM.timestamp >= cutoff_date
            ).order_by(PriceHistoryORM.timestamp.desc()).all()

            if len(price_history) < 2:
                return alerts

            # Current price and the previous price
            current_price = price_history[0].price
            previous_price = price_history[1].price

            if current_price and previous_price:
                # Percentage price change
                price_change = (current_price - previous_price) / previous_price

                # Check for a price drop
                if price_change <= -self.price_drop_threshold:
                    alerts.append(PriceAlert(
                        product_id=product_id,
                        alert_type="price_drop",
                        message=f"Price dropped by {abs(price_change*100):.1f}%",
                        threshold=self.price_drop_threshold,
                        current_value=current_price,
                        previous_value=previous_price,
                        timestamp=datetime.now()
                    ))

                # Check for a price increase (optional)
                if price_change >= 0.10:  # Increase of 10% or more
                    alerts.append(PriceAlert(
                        product_id=product_id,
                        alert_type="price_increase",
                        message=f"Price increased by {price_change*100:.1f}%",
                        threshold=0.10,
                        current_value=current_price,
                        previous_value=previous_price,
                        timestamp=datetime.now()
                    ))

            # Check for availability changes
            current_availability = price_history[0].availability
            previous_availability = price_history[1].availability
            if current_availability != previous_availability:
                status = "in stock" if current_availability else "out of stock"
                alerts.append(PriceAlert(
                    product_id=product_id,
                    alert_type="availability_change",
                    message=f"Availability changed: the product is now {status}",
                    threshold=0,
                    current_value=current_availability,
                    previous_value=previous_availability,
                    timestamp=datetime.now()
                ))
        except Exception as e:
            logger.error(f"Error analyzing price history: {str(e)}")
        finally:
            db.close()
        return alerts

    def detect_price_patterns(self, prices: List[float]) -> Dict[str, Any]:
        """Detect simple patterns in a price series."""
        if len(prices) < 5:
            return {}

        prices_array = np.array(prices)

        # Basic statistics
        stats = {
            "mean": float(np.mean(prices_array)),
            "std": float(np.std(prices_array)),
            "min": float(np.min(prices_array)),
            "max": float(np.max(prices_array)),
            "current": float(prices_array[-1]),
            "is_lowest": prices_array[-1] <= np.min(prices_array[:-1]),
            "trend": "unknown"
        }

        # Detect the trend with a simple linear fit
        if len(prices_array) >= 3:
            x = np.arange(len(prices_array))
            slope, intercept = np.polyfit(x, prices_array, 1)
            if slope < -0.01:
                stats["trend"] = "downward"
            elif slope > 0.01:
                stats["trend"] = "upward"
            else:
                stats["trend"] = "stable"
        return stats

10. Main Program and Web Interface

python

# main.py
import asyncio
import uvicorn
from fastapi import FastAPI, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from typing import List
import logging

from crawlers.scheduler import PriceTrackerScheduler
from models.product import Product, Platform
from database.database import get_db, init_db, ProductORM, PriceHistoryORM
from config.settings import settings

logger = logging.getLogger(__name__)

# Initialize the FastAPI application
app = FastAPI(title="Price Tracker API", version="1.0.0")
templates = Jinja2Templates(directory="templates")

# Global scheduler instance
scheduler = PriceTrackerScheduler(max_concurrent=settings.MAX_CONCURRENT_REQUESTS)

@app.on_event("startup")
async def startup_event():
    """Initialize the database when the application starts."""
    init_db()
    logger.info("Application started")

@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, db: Session = Depends(get_db)):
    """Price-tracking dashboard."""
    products = db.query(ProductORM).order_by(ProductORM.last_updated.desc()).limit(50).all()
    return templates.TemplateResponse("dashboard.html", {"request": request, "products": products})

@app.post("/api/track")
async def track_product(url: str, platform: Platform):
    """Start tracking a product."""
    scheduler.add_tracking_task(url, platform)
    return {"message": f"Started tracking {url}", "platform": platform}

@app.get("/api/products", response_model=List[Product])
async def get_products(db: Session = Depends(get_db)):
    """List all tracked products."""
    products = db.query(ProductORM).all()
    return products

@app.get("/api/price-history/{product_id}")
async def get_price_history(product_id: int, db: Session = Depends(get_db)):
    """Get a product's price history."""
    history = db.query(PriceHistoryORM).filter(
        PriceHistoryORM.product_id == product_id
    ).order_by(PriceHistoryORM.timestamp.desc()).limit(100).all()
    return [{
        "price": h.price,
        "timestamp": h.timestamp,
        "availability": h.availability
    } for h in history]

@app.post("/api/check-now")
async def check_now():
    """Run a check immediately."""
    tasks = scheduler.tracking_tasks.copy()
    if tasks:
        results = await scheduler.process_batch(tasks)
        return {"checked": len(results), "message": "Check completed"}
    return {"message": "No products to check"}

async def main():
    """Entry point."""
    # Example: add some initial tracking tasks
    scheduler.add_tracking_task(
        "https://www.amazon.com/dp/B08N5WRWNW",  # Example product
        Platform.AMAZON
    )
    scheduler.add_tracking_task(
        "https://item.taobao.com/item.htm?id=123456789",  # Example product
        Platform.TAOBAO
    )

    # Start the monitoring loop
    monitoring_task = asyncio.create_task(
        scheduler.run_continuous_monitoring(interval_hours=settings.CHECK_INTERVAL_HOURS)
    )

    # Start the web server
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)

    # Run the monitor and the web server together
    await asyncio.gather(
        monitoring_task,
        server.serve()
    )

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    asyncio.run(main())

Advanced Feature Extensions

1. Machine-Learning Price Prediction

python

# alerts/predictor.py
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
import numpy as np
from typing import List

class PricePredictor:
    def __init__(self):
        self.model = LinearRegression()

    def predict_future_prices(self, historical_prices: List[float], days_ahead: int = 7) -> List[float]:
        """Predict future prices from a historical price series."""
        if len(historical_prices) < 10:
            return []

        # Build the feature matrix
        X = np.arange(len(historical_prices)).reshape(-1, 1)
        y = np.array(historical_prices)

        # Add polynomial features
        poly = PolynomialFeatures(degree=2)
        X_poly = poly.fit_transform(X)

        # Train the model
        self.model.fit(X_poly, y)

        # Predict the future prices
        future_X = np.arange(len(historical_prices), len(historical_prices) + days_ahead).reshape(-1, 1)
        future_X_poly = poly.transform(future_X)
        predictions = self.model.predict(future_X_poly)

        return predictions.tolist()
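
A quick usage sketch of the predictor above; the price series here is synthetic and only illustrates the call pattern (the predictor requires at least 10 data points):

python

# Example usage of PricePredictor with made-up historical prices
from alerts.predictor import PricePredictor

historical_prices = [99.9, 98.5, 97.0, 96.5, 95.0, 94.8, 93.2, 92.0, 91.5, 90.0, 89.5, 88.7]
predictor = PricePredictor()
forecast = predictor.predict_future_prices(historical_prices, days_ahead=7)
print("Predicted prices for the next 7 days:", [round(p, 2) for p in forecast])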

2. Proxy Pool Integration

python

# utils/proxy_manager.py
import aiohttp
from typing import List, Optional
import random

class ProxyManager:
    def __init__(self, proxy_urls: List[str] = None):
        self.proxies = proxy_urls or []
        self.current_index = 0

    def get_random_proxy(self) -> Optional[str]:
        """Return a random proxy."""
        if not self.proxies:
            return None
        return random.choice(self.proxies)

    def get_next_proxy(self) -> Optional[str]:
        """Return proxies in round-robin order."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy

    async def validate_proxy(self, proxy_url: str) -> bool:
        """Check whether a proxy is usable."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://httpbin.org/ip",
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    return response.status == 200
        except Exception:
            return False

    async def refresh_proxies(self, api_url: str):
        """Refresh the proxy list from a proxy API."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.json()
                        self.proxies = data.get("proxies", [])
        except Exception as e:
            print(f"Error refreshing proxies: {e}")

Deployment and Optimization Recommendations

1. Deployment Options

  • Local execution: suitable for personal use; simply run main.py

  • Docker containerization: easy to deploy and scale (a sample Dockerfile sketch follows this list)

  • Cloud server deployment: on AWS, Google Cloud, or Alibaba Cloud

  • Serverless architecture: using AWS Lambda or Google Cloud Functions
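
For the Docker option, a minimal Dockerfile sketch is shown below. It assumes a requirements.txt listing the dependencies installed earlier; treat it as an illustration rather than a tested production image.

dockerfile

# Dockerfile (illustrative sketch; assumes a requirements.txt with the dependencies listed above)
FROM python:3.11-slim

WORKDIR /app

# Install Python dependencies first so the layer can be cached
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt \
    && playwright install --with-deps chromium

# Copy the application code
COPY . .

EXPOSE 8000
CMD ["python", "main.py"]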

2. Performance Optimization

  • Use a connection pool to manage database connections

  • Implement a caching mechanism to reduce duplicate requests (see the sketch after this list)

  • Use a distributed crawler architecture for large-scale monitoring

  • Use a CDN to store static assets
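
As one way to realize the caching idea, here is a minimal sketch of an in-memory TTL cache for fetched pages; the module name, class name, and 30-minute TTL are assumptions for illustration and would be wired into BaseCrawler.fetch by the caller.

python

# utils/cache.py (illustrative sketch of a simple TTL cache for fetched HTML)
import time
from typing import Optional, Dict, Tuple

class PageCache:
    """Keep recently fetched HTML in memory so repeated checks within the TTL skip the network."""

    def __init__(self, ttl_seconds: int = 1800):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, str]] = {}

    def get(self, url: str) -> Optional[str]:
        entry = self._store.get(url)
        if not entry:
            return None
        fetched_at, html = entry
        if time.time() - fetched_at > self.ttl:
            # Entry expired; drop it and report a miss
            del self._store[url]
            return None
        return html

    def set(self, url: str, html: str) -> None:
        self._store[url] = (time.time(), html)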

3. Countering Anti-Crawling Measures

  • Randomize request intervals

  • Use residential proxy IPs

  • Simulate human browsing behavior (see the sketch after this list)

  • Rotate the User-Agent periodically

  • Handle CAPTCHAs (with OCR or a third-party service)
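
To give one concrete example of simulating human behavior, the sketch below adds random scrolling, pauses, and a mouse movement to a Playwright page before scraping; the specific delays and scroll distances are arbitrary illustrative values, not tuned settings.

python

# Illustrative sketch: human-like scrolling and pauses on a Playwright page
import asyncio
import random
from playwright.async_api import Page

async def browse_like_a_human(page: Page) -> None:
    """Scroll the page in small random steps with random pauses before extracting data."""
    for _ in range(random.randint(3, 6)):
        # Scroll down by a random amount
        await page.mouse.wheel(0, random.randint(300, 800))
        # Pause for a random, human-looking interval
        await asyncio.sleep(random.uniform(0.8, 2.5))
    # Occasionally move the mouse to a random position
    await page.mouse.move(random.randint(100, 800), random.randint(100, 600))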

4. Data Visualization

  • Generate price-trend charts with Matplotlib or Plotly (a minimal plotting sketch follows this list)

  • Build a real-time dashboard showing price changes

  • Send email or SMS notifications
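
Below is a minimal sketch of the price-trend chart, assuming Matplotlib is installed (it is not in the dependency list above) and reusing the ORM models defined earlier; the function name and output path are illustrative.

python

# Illustrative sketch: plot a product's price history with Matplotlib
import matplotlib.pyplot as plt
from database.database import SessionLocal, PriceHistoryORM

def plot_price_history(product_id: int, output_path: str = "price_trend.png") -> None:
    db = SessionLocal()
    try:
        history = (db.query(PriceHistoryORM)
                     .filter(PriceHistoryORM.product_id == product_id)
                     .order_by(PriceHistoryORM.timestamp)
                     .all())
    finally:
        db.close()
    if not history:
        return
    timestamps = [h.timestamp for h in history]
    prices = [h.price for h in history]
    plt.figure(figsize=(10, 4))
    plt.plot(timestamps, prices, marker="o")
    plt.title(f"Price history for product {product_id}")
    plt.xlabel("Time")
    plt.ylabel("Price")
    plt.tight_layout()
    plt.savefig(output_path)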

Legal and Ethical Considerations

  1. Respect robots.txt: honor each site's crawling policy

  2. Limit request frequency: avoid putting excessive load on the target sites

  3. Restrict data usage: use collected data for personal or research purposes only

  4. Protect user privacy: do not collect personal information

  5. Comply with terms of service: follow each e-commerce platform's terms of use

Conclusion

This article walked through the design and implementation of a complete cross-platform product price tracking system. By combining a modern Python stack, including Playwright, asynchronous programming, and SQLAlchemy, we built a powerful, extensible price-monitoring solution. The system tracks product prices on mainstream platforms such as Amazon and Taobao in near real time, and it also provides data analysis, price prediction, and intelligent alerting.
