2026/4/6 0:15:43

Real-Time A-Share Market Data Scraping with Async Concurrency and WebSocket: From Principles to High-Concurrency Practice

Zhang Xiaoming

Front-End Developer

1. Introduction: The Strategic Role of Real-Time Data Scraping in Quantitative Trading

In today's fast-moving fintech landscape, real-time stock data scraping has become a cornerstone of quantitative trading, risk management, and investment decision-making. Unlike historical data analysis, a real-time data stream captures changes in market microstructure, providing critical input for high-frequency trading and algorithmic strategies. This article explores how to build a high-concurrency, low-latency real-time stock data scraping system on a modern Python stack, covering everything from basic HTTP requests to advanced WebSocket connections.

2. Architecture Evolution: The Paradigm Shift from Synchronous to Asynchronous

2.1 Limitations of Traditional Synchronous Scraping

The classic requests + BeautifulSoup combination is easy to use, but it shows clear weaknesses in real-time scenarios:

  • Blocking I/O leads to poor concurrency

  • Persistent connections for streaming data are hard to maintain

  • Frequent polling easily triggers anti-scraping defenses
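
The gap between blocking and event-loop I/O is easy to demonstrate with simulated latency. The sketch below is illustrative only: `asyncio.sleep` stands in for a real quote API call, and the symbol list is arbitrary. Ten 0.2-second "requests" finish in roughly 0.2 seconds when scheduled concurrently, instead of the ~2 seconds a sequential loop would need.

```python
# Minimal sketch: concurrent "fetches" on one event loop.
# asyncio.sleep simulates network latency of a real quote API call.
import asyncio
import time

async def fake_fetch(symbol: str) -> str:
    await asyncio.sleep(0.2)          # simulated network round trip
    return f"{symbol}: ok"

async def fetch_all(symbols):
    # asyncio.gather schedules all coroutines concurrently on the event loop
    return await asyncio.gather(*(fake_fetch(s) for s in symbols))

symbols = [f"sh{600000 + i}" for i in range(10)]

start = time.perf_counter()
results = asyncio.run(fetch_all(symbols))
concurrent_elapsed = time.perf_counter() - start

# Ten 0.2 s waits overlap, so total wall time stays near 0.2 s, not 2 s
print(f"{len(results)} quotes in {concurrent_elapsed:.2f}s")
```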

2.2 Advantages of a Modern Asynchronous Stack

  • asyncio/aiohttp: event-loop-based asynchronous HTTP client

  • WebSocket: bidirectional, full-duplex protocol, well suited to real-time data streams

  • Playwright: next-generation browser automation, handles dynamically rendered pages

  • Apache Kafka/RabbitMQ: real-time stream processing and distribution

3. Environment Setup and Dependencies

```text
# requirements.txt

# Core async networking
aiohttp>=3.9.0
websockets>=12.0
httpx>=0.25.0

# Data parsing and processing
pandas>=2.0.0
numpy>=1.24.0
polars>=0.19.0          # high-performance DataFrame library

# Browser automation (for JS-rendered pages)
playwright>=1.40.0
selenium>=4.15.0

# Storage
sqlalchemy>=2.0.0
redis>=5.0.0
pymongo>=4.5.0

# Message queues and stream processing
kafka-python>=2.0.0
pika>=1.3.0

# Misc
websocket-client>=1.6.0
yfinance>=0.2.0         # Yahoo Finance fallback
```

Installation:

```bash
pip install -r requirements.txt
playwright install   # install browser binaries
```

4. Case Study 1: Multi-Source Concurrent Scraping over Async HTTP

```python
"""
Multi-source asynchronous real-time stock quote scraper.
Supports mainstream APIs: Sina Finance, East Money, Tencent Finance, etc.
"""
import asyncio
import aiohttp
import json
import time
import hashlib
import signal
import sys
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass

import redis
from sqlalchemy import create_engine, Column, String, Float, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker


@dataclass
class StockConfig:
    """Stock scraping configuration."""
    SYMBOLS: List[str] = None
    UPDATE_INTERVAL: float = 1.0  # update frequency (seconds)
    TIMEOUT: int = 10
    MAX_RETRIES: int = 3
    PROXY: Optional[str] = None

    def __post_init__(self):
        if self.SYMBOLS is None:
            self.SYMBOLS = [
                'sh000001',  # SSE Composite Index
                'sz399001',  # SZSE Component Index
                'sz399006',  # ChiNext Index
                'sh600519',  # Kweichow Moutai
                'sz000858',  # Wuliangye
                'sh601318',  # Ping An Insurance
            ]


class RedisCache:
    """Redis cache manager."""
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, port=port, db=db,
            decode_responses=True, socket_keepalive=True
        )

    async def set_quote(self, symbol: str, data: dict, expire: int = 60):
        """Cache quote data for a symbol."""
        # Redis hashes only store strings/numbers; serialize datetimes first
        data = {k: (v.isoformat() if isinstance(v, datetime) else v)
                for k, v in data.items()}
        key = f"stock:quote:{symbol}"
        self.redis_client.hset(key, mapping=data)
        self.redis_client.expire(key, expire)

    async def get_quote(self, symbol: str) -> Optional[dict]:
        """Read cached quote data."""
        key = f"stock:quote:{symbol}"
        data = self.redis_client.hgetall(key)
        return data if data else None


Base = declarative_base()


class StockQuote(Base):
    """ORM model for stock quotes."""
    __tablename__ = 'stock_quotes'
    id = Column(String(50), primary_key=True)
    symbol = Column(String(20), index=True)
    name = Column(String(50))
    current = Column(Float)
    change = Column(Float)
    change_percent = Column(Float)
    volume = Column(Float)
    amount = Column(Float)
    timestamp = Column(DateTime)
    created_at = Column(DateTime, default=datetime.now)

    @classmethod
    def generate_id(cls, symbol: str, timestamp: datetime) -> str:
        """Generate a unique primary key."""
        return hashlib.md5(f"{symbol}_{timestamp.timestamp()}".encode()).hexdigest()


class AsyncStockFetcher:
    """Asynchronous HTTP client for multiple quote sources."""
    def __init__(self, config: StockConfig):
        self.config = config
        self.session = None
        self.cache = RedisCache()
        self.engine = create_engine('sqlite:///stock_data.db')
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=100,            # max total connections
            limit_per_host=20,    # max connections per host
            ttl_dns_cache=300     # DNS cache TTL (seconds)
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=self.config.TIMEOUT)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _parse_sina_data(self, data: str) -> Optional[dict]:
        """Parse a Sina Finance quote line."""
        try:
            # Sina API format: var hq_str_sh600519="贵州茅台,1825.00,2.50,0.14,..."
            data = data.split('"')[1]
            parts = data.split(',')
            if len(parts) < 32:
                return None
            return {
                'name': parts[0],
                'open': float(parts[1]),
                'pre_close': float(parts[2]),
                'current': float(parts[3]),
                'high': float(parts[4]),
                'low': float(parts[5]),
                'volume': int(parts[8]),
                'amount': float(parts[9]),
                'bid1': float(parts[10]),
                'ask1': float(parts[20]),
                'timestamp': datetime.now()
            }
        except Exception as e:
            print(f"Error parsing Sina data: {e}")
            return None

    def _parse_eastmoney_data(self, json_data: dict) -> Optional[dict]:
        """Parse an East Money quote response (fields are flat under 'data')."""
        try:
            data = json_data.get('data') or {}
            return {
                'name': data.get('f14', ''),
                'current': data.get('f2', 0),
                'change': data.get('f4', 0),
                'change_percent': data.get('f3', 0),
                'volume': data.get('f5', 0),
                'amount': data.get('f6', 0),
                'high': data.get('f15', 0),
                'low': data.get('f16', 0),
                'open': data.get('f17', 0),
                'pre_close': data.get('f18', 0),
                'timestamp': datetime.now()
            }
        except Exception as e:
            print(f"Error parsing East Money data: {e}")
            return None

    async def fetch_sina_quote(self, symbol: str) -> Optional[dict]:
        """Fetch a real-time quote from Sina Finance."""
        url = f"http://hq.sinajs.cn/list={symbol}"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://finance.sina.com.cn/',
            'Accept-Encoding': 'gzip, deflate'
        }
        for attempt in range(self.config.MAX_RETRIES):
            try:
                async with self.session.get(url, headers=headers,
                                            proxy=self.config.PROXY) as response:
                    if response.status == 200:
                        text = await response.text()
                        data = self._parse_sina_data(text)
                        if data:
                            data['symbol'] = symbol
                            data['source'] = 'sina'
                            return data
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Sina request failed for {symbol} "
                      f"(attempt {attempt + 1}/{self.config.MAX_RETRIES}): {e}")
                await asyncio.sleep(1)
        return None

    async def fetch_eastmoney_quote(self, symbol: str) -> Optional[dict]:
        """Fetch a real-time quote from East Money."""
        # Convert symbol to East Money's secid format
        if symbol.startswith('sh'):
            exchange = '1'
        elif symbol.startswith('sz'):
            exchange = '0'
        else:
            return None
        secid = f"{exchange}.{symbol[2:]}"

        url = "https://push2.eastmoney.com/api/qt/stock/get"
        params = {
            'secid': secid,
            'ut': 'fa5fd1943c7b386f172d6893dbfba10b',
            'fields': 'f2,f3,f4,f5,f6,f14,f15,f16,f17,f18',
            'invt': '2',
            'fltt': '2'
        }
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://quote.eastmoney.com/',
            'Origin': 'https://quote.eastmoney.com'
        }
        for attempt in range(self.config.MAX_RETRIES):
            try:
                async with self.session.get(url, params=params, headers=headers,
                                            proxy=self.config.PROXY) as response:
                    if response.status == 200:
                        json_data = await response.json()
                        data = self._parse_eastmoney_data(json_data)
                        if data:
                            data['symbol'] = symbol
                            data['source'] = 'eastmoney'
                            return data
            except (aiohttp.ClientError, asyncio.TimeoutError,
                    json.JSONDecodeError) as e:
                print(f"East Money request failed for {symbol} "
                      f"(attempt {attempt + 1}/{self.config.MAX_RETRIES}): {e}")
                await asyncio.sleep(1)
        return None

    async def save_to_database(self, data: dict):
        """Persist a quote to the database."""
        try:
            session = self.Session()
            quote = StockQuote(
                id=StockQuote.generate_id(data['symbol'], data['timestamp']),
                symbol=data['symbol'],
                name=data.get('name', ''),
                current=data.get('current', 0),
                change=data.get('change', 0),
                change_percent=data.get('change_percent', 0),
                volume=data.get('volume', 0),
                amount=data.get('amount', 0),
                timestamp=data['timestamp']
            )
            session.add(quote)
            session.commit()
            session.close()
        except Exception as e:
            print(f"Database save error: {e}")

    async def fetch_all_quotes(self) -> Dict[str, dict]:
        """Fetch all symbols concurrently."""
        tasks = []
        for symbol in self.config.SYMBOLS:
            # Query both sources at once; merge afterwards, Sina preferred
            tasks.append(asyncio.create_task(self.fetch_sina_quote(symbol)))
            tasks.append(asyncio.create_task(self.fetch_eastmoney_quote(symbol)))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Tasks were appended in pairs, so result i belongs to symbol i // 2
        sina_results, eastmoney_results = {}, {}
        for i, result in enumerate(results):
            symbol = self.config.SYMBOLS[i // 2]
            if isinstance(result, Exception) or not result:
                continue
            if result['source'] == 'sina':
                sina_results[symbol] = result
            else:
                eastmoney_results[symbol] = result

        return {
            symbol: sina_results.get(symbol) or eastmoney_results.get(symbol)
            for symbol in self.config.SYMBOLS
        }

    async def continuous_fetch(self):
        """Fetch continuously at the configured interval."""
        print(f"Starting real-time scraping, "
              f"watching {len(self.config.SYMBOLS)} symbols...")
        while True:
            start_time = time.time()
            try:
                quotes = await self.fetch_all_quotes()

                save_tasks = []
                for symbol, data in quotes.items():
                    if data:
                        await self.cache.set_quote(symbol, data)
                        save_tasks.append(
                            asyncio.create_task(self.save_to_database(data))
                        )
                        print(f"[{data['timestamp'].strftime('%H:%M:%S')}] "
                              f"{symbol} {data.get('name', '')}: "
                              f"{data.get('current', 0):.2f} "
                              f"({data.get('change_percent', 0):+.2f}%)")
                await asyncio.gather(*save_tasks, return_exceptions=True)
            except Exception as e:
                print(f"Fetch loop error: {e}")

            # Throttle to the configured update interval
            elapsed = time.time() - start_time
            await asyncio.sleep(max(0, self.config.UPDATE_INTERVAL - elapsed))


def signal_handler(signum, frame):
    print("\nReceived exit signal, shutting down gracefully...")
    sys.exit(0)


async def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    config = StockConfig(
        SYMBOLS=['sh000001', 'sz399001', 'sh600519', 'sz000858'],
        UPDATE_INTERVAL=2.0,
        MAX_RETRIES=3
    )
    async with AsyncStockFetcher(config) as fetcher:
        await fetcher.continuous_fetch()


if __name__ == "__main__":
    asyncio.run(main())
```

5. Case Study 2: Real-Time Stream Processing over WebSocket

```python
"""
WebSocket-based real-time stock quote subscription system.
Connects to multiple exchange WebSocket APIs for low-latency data delivery.
"""
import asyncio
import json
import logging
import struct
from collections import deque
from datetime import datetime
from typing import Any, Callable, Dict, List

import numpy as np
import pandas as pd
import websockets

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class WebSocketClient:
    """Base WebSocket client with reconnect and resubscribe support."""
    def __init__(self, uri: str, reconnect_interval: int = 5):
        self.uri = uri
        self.reconnect_interval = reconnect_interval
        self.ws = None
        self.connected = False
        self.subscriptions = set()
        self.message_handlers = []
        self.ping_interval = 30
        self.last_pong = None

    async def connect(self):
        """Connect to the server, retrying until it succeeds."""
        while not self.connected:
            try:
                logger.info(f"Connecting to {self.uri}")
                self.ws = await websockets.connect(
                    self.uri,
                    ping_interval=self.ping_interval,
                    ping_timeout=self.ping_interval * 2,
                    close_timeout=10
                )
                self.connected = True
                logger.info(f"Connected to {self.uri}")
                if self.subscriptions:
                    await self._resubscribe()
            except Exception as e:
                logger.error(f"Connection failed: {e}")
                await asyncio.sleep(self.reconnect_interval)

    async def _resubscribe(self):
        """Resubscribe to all topics after a reconnect."""
        for subscription in self.subscriptions:
            await self.subscribe(subscription)

    async def subscribe(self, topic: str):
        """Subscribe to a topic (protocol-specific logic lives in subclasses)."""
        self.subscriptions.add(topic)

    async def unsubscribe(self, topic: str):
        """Unsubscribe from a topic (protocol-specific logic lives in subclasses)."""
        self.subscriptions.discard(topic)

    async def send_message(self, message: Any):
        """Send a message over the socket."""
        if self.connected and self.ws:
            try:
                if isinstance(message, dict):
                    message = json.dumps(message)
                await self.ws.send(message)
            except Exception as e:
                logger.error(f"Failed to send message: {e}")
                self.connected = False

    async def receive_messages(self):
        """Receive messages forever, reconnecting on failure."""
        while True:
            try:
                if not self.connected:
                    await self.connect()
                message = await self.ws.recv()
                await self._handle_message(message)
            except websockets.exceptions.ConnectionClosed as e:
                logger.error(f"Connection closed: {e}")
                self.connected = False
                await asyncio.sleep(self.reconnect_interval)
            except Exception as e:
                logger.error(f"Receive error: {e}")
                await asyncio.sleep(1)

    async def _handle_message(self, message: Any):
        """Dispatch an incoming message to all registered handlers."""
        for handler in self.message_handlers:
            try:
                await handler(message)
            except Exception as e:
                logger.error(f"Message handler error: {e}")

    def add_message_handler(self, handler: Callable):
        """Register a message handler."""
        self.message_handlers.append(handler)

    async def close(self):
        """Close the connection."""
        if self.ws:
            await self.ws.close()
        self.connected = False


class BinanceWebSocketClient(WebSocketClient):
    """Binance WebSocket client (crypto; used here to demo a live public feed)."""
    def __init__(self, symbols: List[str] = None):
        super().__init__("wss://stream.binance.com:9443/ws")
        self.symbols = symbols or ['btcusdt', 'ethusdt', 'bnbusdt']
        self.stream_names = []

    async def subscribe(self, symbol: str):
        """Subscribe to a Binance trade stream."""
        stream_name = f"{symbol}@trade"
        self.stream_names.append(stream_name)
        await self.send_message({
            "method": "SUBSCRIBE",
            "params": [stream_name],
            "id": 1
        })
        logger.info(f"Subscribed to {stream_name}")

    async def _resubscribe(self):
        """Resubscribe to all trade streams."""
        if self.stream_names:
            await self.send_message({
                "method": "SUBSCRIBE",
                "params": self.stream_names,
                "id": 1
            })

    @staticmethod
    def parse_trade_data(message: str) -> Dict:
        """Parse a trade event."""
        data = json.loads(message)
        if data.get('e') == 'trade':
            return {
                'exchange': 'binance',
                'symbol': data['s'],
                'price': float(data['p']),
                'quantity': float(data['q']),
                'trade_time': datetime.fromtimestamp(data['T'] / 1000),
                'is_buyer_maker': data['m'],
                'trade_id': data['t']
            }
        return None


class ShanghaiStockExchangeWebSocket(WebSocketClient):
    """Shanghai Stock Exchange WebSocket client (simulated).

    Note: a real SSE feed requires official API access; this class emits
    mock data for demonstration purposes.
    """
    def __init__(self):
        super().__init__("wss://api.example.com/sse")  # placeholder URI
        # Binary packet header layout (for a real binary feed)
        self.header_format = '!HHII'
        self.header_size = struct.calcsize(self.header_format)

    async def connect(self):
        """Simulated connection."""
        logger.info("Connecting to SSE WebSocket (simulated)")
        self.connected = True

    async def receive_messages(self):
        """Emit mock quotes instead of reading from a socket."""
        while True:
            if not self.connected:
                await asyncio.sleep(self.reconnect_interval)
                continue
            await asyncio.sleep(0.5)
            mock_data = self._generate_mock_data()
            await self._handle_message(json.dumps(mock_data))

    def _generate_mock_data(self) -> Dict:
        """Generate a mock quote."""
        symbols = ['SH600519', 'SH601318', 'SH600036']
        symbol = np.random.choice(symbols)
        return {
            'exchange': 'SSE',
            'symbol': symbol,
            'timestamp': datetime.now().isoformat(),
            'price': round(100 + np.random.randn() * 10, 2),
            'volume': int(np.random.randint(100, 10000)),
            'bid_price': round(99 + np.random.randn() * 10, 2),
            'ask_price': round(101 + np.random.randn() * 10, 2),
            'bid_volume': int(np.random.randint(1, 100)),
            'ask_volume': int(np.random.randint(1, 100))
        }

    @staticmethod
    def parse_market_data(message: str) -> Dict:
        """Parse market data."""
        try:
            return json.loads(message)
        except json.JSONDecodeError:
            return None


class WebSocketManager:
    """Manages multiple WebSocket clients and a shared data buffer."""
    def __init__(self):
        self.clients = []
        self.data_buffer = deque(maxlen=10000)
        self.running = False

    def add_client(self, client: WebSocketClient):
        """Register a client."""
        self.clients.append(client)

    async def start_all(self):
        """Start the receive loop for every client."""
        self.running = True
        tasks = []
        for client in self.clients:
            tasks.append(asyncio.create_task(client.receive_messages()))
            client.add_message_handler(self.handle_data)
        logger.info(f"Started {len(self.clients)} WebSocket connections")
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.info("WebSocket manager cancelled")
        finally:
            await self.stop_all()

    async def handle_data(self, message: str):
        """Buffer and log incoming data."""
        try:
            data = json.loads(message)
            data['received_at'] = datetime.now().isoformat()
            self.data_buffer.append(data)
            if 'symbol' in data and 'price' in data:
                logger.info(f"{data.get('exchange', 'Unknown')} "
                            f"{data['symbol']}: {data['price']}")
        except json.JSONDecodeError:
            # Possibly binary data or another format
            logger.debug(f"Non-JSON message received: {message[:100]}...")

    def get_recent_data(self, n: int = 100) -> List[Dict]:
        """Return the most recent n records."""
        return list(self.data_buffer)[-n:]

    def get_dataframe(self) -> pd.DataFrame:
        """Convert the buffer to a DataFrame."""
        return pd.DataFrame(self.data_buffer)

    async def stop_all(self):
        """Stop all clients."""
        self.running = False
        for client in self.clients:
            await client.close()
        logger.info("All WebSocket connections closed")


class DataProcessor:
    """Real-time processor maintaining simple per-symbol indicators."""
    def __init__(self, buffer_size: int = 1000):
        self.buffer = deque(maxlen=buffer_size)
        self.indicators = {}

    def add_data(self, data: Dict):
        """Add a record to the processor."""
        self.buffer.append(data)
        self._update_indicators(data)

    def _update_indicators(self, data: Dict):
        """Update technical indicators."""
        symbol = data.get('symbol')
        price = data.get('price')
        if not (symbol and price):
            return
        if symbol not in self.indicators:
            self.indicators[symbol] = {
                'prices': deque(maxlen=20),
                'volumes': deque(maxlen=20),
                'last_update': datetime.now()
            }
        symbol_data = self.indicators[symbol]
        symbol_data['prices'].append(price)
        symbol_data['volumes'].append(data.get('volume', 0))
        symbol_data['last_update'] = datetime.now()
        # Simple 10-period moving average
        if len(symbol_data['prices']) >= 10:
            prices = list(symbol_data['prices'])
            symbol_data['sma_10'] = sum(prices[-10:]) / 10

    def get_symbol_indicators(self, symbol: str) -> Dict:
        """Get indicators for one symbol."""
        return self.indicators.get(symbol, {})

    def detect_anomalies(self, data: Dict) -> List[str]:
        """Flag prices deviating more than 5% from the 10-period SMA."""
        anomalies = []
        symbol = data.get('symbol')
        price = data.get('price')
        if symbol in self.indicators:
            symbol_data = self.indicators[symbol]
            if 'sma_10' in symbol_data and price:
                sma_10 = symbol_data['sma_10']
                deviation = abs(price - sma_10) / sma_10
                if deviation > 0.05:  # 5% deviation
                    anomalies.append(f"Abnormal price move: {deviation:.2%}")
        return anomalies


async def main_websocket():
    """WebSocket entry point."""
    logger.info("Starting WebSocket real-time data system")
    manager = WebSocketManager()
    processor = DataProcessor()

    # 1. Binance client (crypto)
    manager.add_client(BinanceWebSocketClient(['btcusdt', 'ethusdt']))
    # 2. SSE client (simulated)
    manager.add_client(ShanghaiStockExchangeWebSocket())

    # Custom handler: feed the processor and check for anomalies
    async def custom_handler(message: str):
        try:
            data = json.loads(message)
            processor.add_data(data)
            anomalies = processor.detect_anomalies(data)
            if anomalies:
                logger.warning(f"Anomaly: {data.get('symbol')} - "
                               f"{', '.join(anomalies)}")
        except Exception as e:
            logger.error(f"Custom handler error: {e}")

    for client in manager.clients:
        client.add_message_handler(custom_handler)

    monitor_task = asyncio.create_task(monitor_system(manager, processor))
    try:
        await manager.start_all()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    finally:
        monitor_task.cancel()
        await manager.stop_all()


async def monitor_system(manager: WebSocketManager, processor: DataProcessor):
    """Periodically log system state and indicators."""
    while True:
        await asyncio.sleep(10)
        recent_data = manager.get_recent_data(5)
        logger.info(f"System status - buffer size: {len(manager.data_buffer)}")
        logger.info(f"Recent data: {recent_data}")
        for symbol in processor.indicators.keys():
            indicators = processor.get_symbol_indicators(symbol)
            if 'sma_10' in indicators:
                logger.info(f"{symbol} SMA(10): {indicators['sma_10']:.2f}")


if __name__ == "__main__":
    asyncio.run(main_websocket())
```

6. Advanced: Data Quality Monitoring and Anomaly Detection

```python
"""
Data quality monitoring and anomaly detection system.
Ensures the accuracy and completeness of scraped data.
"""
import asyncio
import logging
from datetime import datetime
from typing import Dict

import numpy as np


class DataQualityMonitor:
    """Tracks per-source quality metrics and flags statistical outliers."""
    def __init__(self):
        self.metrics = {}
        self.alerts = []

    def record_metric(self, data_source: str, metric_name: str, value: float):
        """Record one observation of a quality metric."""
        if data_source not in self.metrics:
            self.metrics[data_source] = {}
        if metric_name not in self.metrics[data_source]:
            self.metrics[data_source][metric_name] = {
                'values': [], 'timestamps': [], 'stats': {}
            }
        metric_data = self.metrics[data_source][metric_name]
        metric_data['values'].append(value)
        metric_data['timestamps'].append(datetime.now())
        # Keep only the most recent 1000 observations
        if len(metric_data['values']) > 1000:
            metric_data['values'].pop(0)
            metric_data['timestamps'].pop(0)
        self._update_stats(data_source, metric_name)

    def _update_stats(self, data_source: str, metric_name: str):
        """Update the running statistics."""
        values = self.metrics[data_source][metric_name]['values']
        if len(values) < 10:
            return
        self.metrics[data_source][metric_name]['stats'] = {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
            'median': np.median(values),
            'last_updated': datetime.now()
        }

    def check_anomaly(self, data_source: str, metric_name: str,
                      value: float) -> bool:
        """Flag values whose Z-score against the running stats exceeds 3."""
        stats_dict = (self.metrics.get(data_source, {})
                      .get(metric_name, {})
                      .get('stats', {}))
        if not stats_dict:
            return False
        mean = stats_dict.get('mean', 0)
        std = stats_dict.get('std', 0)
        if std == 0:
            return False
        z_score = abs(value - mean) / std
        if z_score > 3:
            alert_msg = (f"Source '{data_source}' metric '{metric_name}' anomaly: "
                         f"value={value:.4f}, mean={mean:.4f}, "
                         f"Z-score={z_score:.2f}")
            self.alerts.append({
                'timestamp': datetime.now(),
                'message': alert_msg,
                'severity': 'HIGH'
            })
            logging.warning(alert_msg)
            return True
        return False

    def get_quality_report(self) -> Dict:
        """Score each source by metric stability (coefficient of variation)."""
        report = {
            'timestamp': datetime.now(),
            'data_sources': {},
            'alerts': self.alerts[-10:],  # last 10 alerts
            'overall_score': 0
        }
        total_score = 0
        source_count = 0
        for source, metrics in self.metrics.items():
            source_score = 0
            metric_count = 0
            for metric_name, metric_data in metrics.items():
                stats_dict = metric_data.get('stats', {})
                if stats_dict:
                    mean = stats_dict.get('mean', 0)
                    std = stats_dict.get('std', 0)
                    if mean != 0:
                        cv = std / abs(mean)
                        # Lower coefficient of variation -> higher score
                        source_score += max(0, 100 * (1 - min(cv, 1)))
                        metric_count += 1
            if metric_count > 0:
                source_score /= metric_count
            report['data_sources'][source] = {
                'score': round(source_score, 2),
                'metric_count': metric_count
            }
            total_score += source_score
            source_count += 1
        if source_count > 0:
            report['overall_score'] = round(total_score / source_count, 2)
        return report


async def monitor_data_quality():
    """Usage example: feed simulated metrics and print periodic reports."""
    monitor = DataQualityMonitor()
    while True:
        for source in ['sina', 'eastmoney', 'tencent']:
            # Simulated response time (milliseconds)
            response_time = np.random.exponential(100)
            monitor.record_metric(source, 'response_time', response_time)
            monitor.check_anomaly(source, 'response_time', response_time)
            # Simulated data completeness (%)
            completeness = np.random.normal(98, 1)
            monitor.record_metric(source, 'completeness', completeness)
        # Generate a report every 10 seconds
        await asyncio.sleep(10)
        report = monitor.get_quality_report()
        print("\n=== Data Quality Report ===")
        print(f"Time: {report['timestamp']}")
        print(f"Overall score: {report['overall_score']}/100")
        for source, info in report['data_sources'].items():
            print(f"{source}: {info['score']}/100")
        if report['alerts']:
            print("\n=== Recent Alerts ===")
            for alert in report['alerts']:
                print(f"[{alert['severity']}] {alert['message']}")
```

7. Performance Optimization and Best Practices

7.1 Connection Pool Management

```python
import asyncio

from aiohttp import ClientSession, TCPConnector


class ConnectionPool:
    """Shared aiohttp session with bounded connection counts."""
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)
        self.session = None

    async def get_session(self) -> ClientSession:
        if not self.session:
            connector = TCPConnector(
                limit=100,                  # max total connections
                limit_per_host=20,          # max connections per host
                ttl_dns_cache=300,          # DNS cache TTL (seconds)
                enable_cleanup_closed=True  # clean up half-closed transports
            )
            self.session = ClientSession(connector=connector)
        return self.session
```

7.2 Memory Optimization

```python
from memory_profiler import profile


class MemoryOptimizedProcessor:
    def __init__(self):
        self.data_cache = {}
        self.max_cache_size = 10000

    def add_data_with_memory_check(self, data):
        """Evict the oldest entry before the cache grows past its cap."""
        if len(self.data_cache) >= self.max_cache_size:
            # dicts preserve insertion order, so the first key is the oldest
            oldest_key = next(iter(self.data_cache))
            del self.data_cache[oldest_key]
        self.data_cache[data['id']] = data

    @profile
    def process_large_dataset(self, dataset):
        """Profile memory while streaming through a large dataset."""
        # A generator keeps only one item in memory at a time
        for item in self._process_streaming(dataset):
            yield item

    def _process_streaming(self, dataset):
        """Process items lazily."""
        for data in dataset:
            processed = self._process_item(data)
            yield processed
            # Drop the reference promptly
            del data

    def _process_item(self, data):
        """Placeholder transform; replace with real processing logic."""
        return data
```

7.3 Error Recovery and Retry

```python
import asyncio

import aiohttp
import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential


class ResilientFetcher:
    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=tenacity.retry_if_exception_type(
            (aiohttp.ClientError, asyncio.TimeoutError)
        )
    )
    async def fetch_with_retry(self, url):
        """Retry with exponential backoff on transient network errors."""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
```

8. Deployment and Monitoring

8.1 Containerized Deployment with Docker

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Health check (urllib avoids depending on the requests package,
# which is not in requirements.txt)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"

# Run the application
CMD ["python", "main.py"]
```

8.2 Prometheus Monitoring Configuration

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'stock_crawler'
    static_configs:
      - targets: ['localhost:8000']   # host:port where the crawler exposes metrics
    metrics_path: '/metrics'
```
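For Prometheus to scrape the job above, the crawler must expose a `/metrics` endpoint in the text exposition format. In practice you would use the official `prometheus_client` library rather than formatting by hand; the hand-rolled formatter below is only a sketch of what the scraped payload looks like, and the metric names are illustrative assumptions, not part of the system above.

```python
# Sketch of the Prometheus text exposition format the scrape job consumes.
# Metric names are illustrative; use prometheus_client in production.
def render_metrics(metrics: dict) -> str:
    """Render {name: (help_text, type, value)} into Prometheus text format."""
    lines = []
    for name, (help_text, mtype, value) in metrics.items():
        lines.append(f"# HELP {name} {help_text}")
        lines.append(f"# TYPE {name} {mtype}")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"

sample = {
    "crawler_quotes_fetched_total": ("Quotes fetched since start", "counter", 1234),
    "crawler_fetch_latency_seconds": ("Last fetch latency", "gauge", 0.087),
}
print(render_metrics(sample))
```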

9. Conclusion

This article presented a complete approach to building a real-time stock data scraping system on a modern Python stack, covering:

  1. Asynchronous concurrent scraping: high-concurrency HTTP requests with asyncio/aiohttp

  2. Real-time WebSocket streams: persistent connections for receiving live data

  3. Multi-source fusion: combining several data sources for higher reliability

  4. Data quality monitoring: real-time detection of anomalies and quality issues

  5. Performance optimization: memory management, connection pooling, error recovery, and other best practices

  6. Production deployment: containerization, monitoring, and automated operations

Key technical highlights:

  • Asynchronous programming for better I/O efficiency

  • Long-lived WebSocket connections for lower latency

  • A structured data quality monitoring framework

  • Complete error handling and recovery mechanisms

  • Optimized memory usage and performance
