Introduction: Challenges and Opportunities in Job Posting Aggregation
In today's digital recruiting landscape, job seekers often have to search across multiple job boards, while those sites' anti-crawling defenses grow increasingly sophisticated and leave traditional scraping techniques struggling to keep up. This article presents a modern job posting aggregation crawler built on Python async coroutines, an intelligent proxy pool, and machine-learning-assisted deduplication, aiming for efficient, stable, and intelligent data collection.
Technology Stack Overview
Async framework: asyncio + aiohttp + aiomysql
Anti-anti-crawling: dynamic proxy pool + request fingerprint simulation + browser behavior simulation
Intelligent parsing: Playwright automation + XPath/CSS selectors + regular expressions
Data storage: MySQL 8.0 + Redis + asynchronous database operations
Deduplication: SimHash algorithm + Bloom filter + text similarity computation
Monitoring & deployment: Prometheus + Grafana + Docker containerization
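Based on the imports used throughout this article, the project's dependency list would look roughly like the requirements.txt sketched below (package names only, versions left to the reader; the comments are mine, not from the original project):

```text
# requirements.txt (sketch derived from the imports in this article)
aiohttp
aiomysql
redis            # provides redis.asyncio
motor            # optional MongoDB support
playwright
beautifulsoup4
backoff
numpy
scikit-learn
prometheus-client
```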
System Architecture Design
```python
"""Intelligent job posting aggregation crawler: system architecture and data model"""
import asyncio
import aiohttp
import aiomysql
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib
import json


@dataclass
class JobPosition:
    """Job posting data model"""
    id: str
    title: str
    company: str
    salary: str
    location: str
    experience: str
    education: str
    tags: List[str]
    description: str
    source: str
    url: str
    publish_time: datetime
    crawl_time: datetime
    hash_value: str = None

    def __post_init__(self):
        """Generate a content hash for deduplication"""
        content = f"{self.title}{self.company}{self.description}"
        self.hash_value = self.generate_simhash(content)

    @staticmethod
    def generate_simhash(content: str, bits: int = 64) -> str:
        """Generate a document fingerprint with the SimHash algorithm"""
        import numpy as np

        # Tokenize and hash (simplified: whitespace split)
        words = content.split()
        vector = np.zeros(bits)
        for word in words:
            # Hash each word, truncated to `bits` bits so the index stays in range
            word_hash = bin(
                int(hashlib.md5(word.encode()).hexdigest(), 16) % (1 << bits)
            )[2:].zfill(bits)
            # Weighted accumulation
            for i, bit in enumerate(word_hash):
                vector[i] += 1 if bit == '1' else -1

        # Assemble the SimHash fingerprint
        simhash = ''.join(['1' if v > 0 else '0' for v in vector])
        return simhash
```
Core Crawler Implementation
1. Asynchronous Crawler Engine
```python
import asyncio
import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from contextlib import asynccontextmanager
from typing import Dict, Optional
import hashlib
import random
import logging
from urllib.parse import urlparse, urljoin
import backoff

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncSpiderEngine:
    """Asynchronous crawler engine"""

    def __init__(self, max_concurrent: int = 10, request_timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = ClientTimeout(total=request_timeout)
        self.session = None
        self.proxy_pool = ProxyPool()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    @asynccontextmanager
    async def create_session(self):
        """Create an aiohttp session"""
        connector = TCPConnector(limit=self.max_concurrent, ssl=False)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self._get_headers()
        ) as session:
            self.session = session
            yield session

    def _get_headers(self) -> Dict:
        """Generate randomized request headers"""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def fetch(self, url: str, use_proxy: bool = True) -> Optional[str]:
        """Fetch page content asynchronously"""
        async with self.semaphore:
            try:
                proxy = await self.proxy_pool.get_proxy() if use_proxy else None
                async with self.session.get(
                    url,
                    proxy=proxy,
                    headers=self._get_headers(),
                    cookies=self._get_cookies()
                ) as response:
                    if response.status == 200:
                        content = await response.text()
                        # Reward the proxy on success
                        if proxy:
                            await self.proxy_pool.update_score(proxy, True)
                        return content
                    else:
                        logger.warning(f"Request failed: {url}, status code: {response.status}")
                        if proxy:
                            await self.proxy_pool.update_score(proxy, False)
                        return None
            except Exception as e:
                logger.error(f"Request error {url}: {e}")
                return None

    def _get_cookies(self) -> Dict:
        """Generate simulated cookies"""
        return {
            'session_id': hashlib.md5(str(random.random()).encode()).hexdigest(),
            'user_token': hashlib.md5(str(random.random()).encode()).hexdigest()[:16]
        }
```
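A minimal usage sketch of the engine; the URL is illustrative and `use_proxy=False` keeps the smoke test independent of the proxy pool:

```python
async def demo_fetch():
    engine = AsyncSpiderEngine(max_concurrent=5)
    async with engine.create_session():
        # Direct request for a quick smoke test; enable use_proxy in production
        html = await engine.fetch("https://example.com/jobs?query=python", use_proxy=False)
        print(len(html) if html else "fetch failed")


# asyncio.run(demo_fetch())
```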
2. Intelligent Proxy Pool Implementation
```python
import asyncio
import aiohttp
from typing import List, Dict, Optional
import random


class ProxyPool:
    """Intelligent proxy pool manager"""

    def __init__(self):
        self.proxies = []
        self.proxy_scores = {}
        self.lock = asyncio.Lock()
        self.proxy_sources = [
            'http://www.proxy-list.org/',
            'https://free-proxy-list.net/',
            'http://www.gatherproxy.com/'
        ]

    async def initialize(self):
        """Initialize the proxy pool"""
        await self.refresh_proxies()
        # Start the periodic refresh task
        asyncio.create_task(self._scheduled_refresh())

    async def refresh_proxies(self):
        """Refresh the proxy list"""
        async with self.lock:
            new_proxies = []
            for source in self.proxy_sources:
                proxies = await self._fetch_proxies_from_source(source)
                new_proxies.extend(proxies)

            # Verify proxy availability
            valid_proxies = await self._validate_proxies(new_proxies)
            self.proxies = valid_proxies

            # Initialize scores
            for proxy in valid_proxies:
                self.proxy_scores[proxy] = 100

    async def get_proxy(self) -> Optional[str]:
        """Get a high-quality proxy"""
        if not self.proxies:
            await self.refresh_proxies()

        # Score-weighted random selection
        weighted_proxies = []
        for proxy in self.proxies:
            weight = self.proxy_scores.get(proxy, 50)
            weighted_proxies.extend([proxy] * weight)

        return random.choice(weighted_proxies) if weighted_proxies else None

    async def update_score(self, proxy: str, success: bool):
        """Update a proxy's score"""
        current_score = self.proxy_scores.get(proxy, 50)
        if success:
            new_score = min(current_score + 10, 200)
        else:
            new_score = max(current_score - 30, 0)

        if new_score <= 0:
            self.proxies.remove(proxy)
            self.proxy_scores.pop(proxy, None)
            return

        self.proxy_scores[proxy] = new_score

    async def _scheduled_refresh(self):
        """Refresh proxies on a schedule"""
        while True:
            await asyncio.sleep(3600)  # refresh once per hour
            await self.refresh_proxies()
```
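`refresh_proxies` relies on two helpers not shown above: `_fetch_proxies_from_source` (site-specific scraping of the proxy lists) and `_validate_proxies`. A minimal sketch of the latter, assuming proxies are plain `http://host:port` strings and using httpbin.org as an illustrative test endpoint; attach it to the `ProxyPool` class:

```python
# Sketch of ProxyPool._validate_proxies; attach it to the class above.
async def _validate_proxies(self, proxies: List[str],
                            test_url: str = 'http://httpbin.org/ip') -> List[str]:
    """Keep only proxies that answer a small test request within 10 seconds."""
    async def check(proxy: str) -> Optional[str]:
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(test_url, proxy=proxy) as resp:
                    return proxy if resp.status == 200 else None
        except Exception:
            return None

    # Probe all candidates concurrently and drop the ones that failed
    results = await asyncio.gather(*(check(p) for p in proxies))
    return [p for p in results if p]
```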
3. Playwright Dynamic Rendering Support
```python
import asyncio
import random
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup


class DynamicPageRenderer:
    """Handle pages that require JavaScript rendering"""

    def __init__(self):
        self.browser = None
        self.context = None

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        self.context = await self.browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            viewport={'width': 1920, 'height': 1080}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.context.close()
        await self.browser.close()
        await self.playwright.stop()

    async def render_page(self, url: str, wait_for_selector: str = None) -> str:
        """Render a dynamic page"""
        page = await self.context.new_page()

        # Simulate human behavior
        await self._simulate_human_behavior(page)

        try:
            await page.goto(url, wait_until='networkidle')

            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector, timeout=10000)

            # Random scrolling
            await self._random_scroll(page)

            # Grab the rendered content
            content = await page.content()
            return content
        finally:
            await page.close()

    async def _simulate_human_behavior(self, page):
        """Simulate human browsing behavior"""
        # Random delay
        await asyncio.sleep(random.uniform(1, 3))

        # Random mouse movement
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )

    async def _random_scroll(self, page):
        """Scroll the page randomly"""
        for _ in range(random.randint(2, 5)):
            scroll_amount = random.randint(300, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_amount})")
            await asyncio.sleep(random.uniform(0.5, 1.5))
```
4. Job Site Parsers
```python
class JobSiteParser:
    """Base class for job site parsers"""

    def __init__(self):
        self.spider = AsyncSpiderEngine()
        self.renderer = DynamicPageRenderer()

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a job listing page"""
        raise NotImplementedError

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a job detail page"""
        raise NotImplementedError

    def extract_salary_range(self, salary_text: str) -> Dict:
        """Extract the salary range"""
        import re
        pattern = r'(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?'
        match = re.search(pattern, salary_text)
        if match:
            min_salary = float(match.group(1)) * 1000
            max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary
            return {
                'min': min_salary,
                'max': max_salary,
                'text': salary_text
            }
        return {'min': 0, 'max': 0, 'text': salary_text}


class BossZhiPinParser(JobSiteParser):
    """Boss Zhipin parser"""

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a Boss Zhipin listing page"""
        content = await self.spider.fetch(url)

        if not content:
            # Fall back to dynamic rendering
            async with self.renderer:
                content = await self.renderer.render_page(
                    url,
                    wait_for_selector='.job-list-box'
                )

        soup = BeautifulSoup(content, 'html.parser')
        jobs = []

        for item in soup.select('.job-card-wrapper'):
            try:
                job = {
                    'title': item.select_one('.job-title').text.strip(),
                    'company': item.select_one('.company-name').text.strip(),
                    'salary': item.select_one('.salary').text.strip(),
                    'location': item.select_one('.job-area').text.strip(),
                    'experience': item.select_one('.tag-list').text.strip(),
                    'link': item.select_one('a')['href'],
                    'source': 'boss_zhipin'
                }
                jobs.append(job)
            except Exception as e:
                logger.error(f"Failed to parse job item: {e}")

        return jobs

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a Boss Zhipin detail page"""
        full_url = f"https://www.zhipin.com{url}" if not url.startswith('http') else url

        async with self.renderer:
            content = await self.renderer.render_page(
                full_url,
                wait_for_selector='.job-detail'
            )

        soup = BeautifulSoup(content, 'html.parser')

        try:
            title = soup.select_one('.job-title').text.strip()
            company = soup.select_one('.company-name').text.strip()
            salary = soup.select_one('.salary').text.strip()

            # Extract additional fields
            info_items = soup.select('.job-detail-section-item')
            location = info_items[0].text.strip() if len(info_items) > 0 else ''
            experience = info_items[1].text.strip() if len(info_items) > 1 else ''

            description = soup.select_one('.job-sec-text').text.strip()

            return JobPosition(
                id=hashlib.md5(full_url.encode()).hexdigest(),
                title=title,
                company=company,
                salary=salary,
                location=location,
                experience=experience,
                education='',  # extract as needed
                tags=[],       # extract as needed
                description=description,
                source='boss_zhipin',
                url=full_url,
                publish_time=datetime.now(),
                crawl_time=datetime.now()
            )
        except Exception as e:
            logger.error(f"Failed to parse detail page {full_url}: {e}")
            return None


class LagouParser(JobSiteParser):
    """Lagou parser"""
    # Implement parsing logic similar to the Boss Zhipin parser
    pass


class ZhilianParser(JobSiteParser):
    """Zhaopin (Zhilian) parser"""
    # Implement parsing logic similar to the Boss Zhipin parser
    pass
```
5. Asynchronous Data Storage
```python
import aiomysql
from motor.motor_asyncio import AsyncIOMotorClient
from redis import asyncio as aioredis
import json


class AsyncDataStorage:
    """Asynchronous data storage manager"""

    def __init__(self, mysql_config: Dict, redis_config: Dict, mongo_config: Dict = None):
        self.mysql_config = mysql_config
        self.redis_config = redis_config
        self.mongo_config = mongo_config
        self.pool = None
        self.redis = None
        self.mongo = None

    async def initialize(self):
        """Initialize database connections"""
        # Create the MySQL connection pool
        self.pool = await aiomysql.create_pool(**self.mysql_config)

        # Create the Redis connection
        self.redis = await aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get('password'),
            db=self.redis_config.get('db', 0)
        )

        # Create the MongoDB connection (optional)
        if self.mongo_config:
            self.mongo = AsyncIOMotorClient(self.mongo_config['uri'])

    async def save_job(self, job: JobPosition):
        """Persist a job posting"""
        # 1. Deduplicate with the Bloom filter
        if await self.is_duplicate(job.hash_value):
            logger.info(f"Duplicate job detected: {job.title}")
            return False

        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = """
                    INSERT INTO jobs (
                        id, title, company, salary, location, experience,
                        education, description, source, url,
                        publish_time, crawl_time, hash_value
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE crawl_time = VALUES(crawl_time)
                """
                await cursor.execute(sql, (
                    job.id, job.title, job.company, job.salary,
                    job.location, job.experience, job.education,
                    job.description, job.source, job.url,
                    job.publish_time, job.crawl_time, job.hash_value
                ))
                await conn.commit()

        # 2. Add to the Bloom filter
        await self.redis.setbit('job_bloom_filter', int(job.hash_value[:8], 16) % (2**20), 1)

        # 3. Cache in Redis
        cache_key = f"job:{job.id}"
        await self.redis.setex(
            cache_key,
            3600 * 24,  # cache for 24 hours
            json.dumps(job.__dict__, default=str)
        )

        return True

    async def is_duplicate(self, hash_value: str) -> bool:
        """Check for duplicates with the Bloom filter"""
        # Simplified Bloom filter using a single hash position
        position = int(hash_value[:8], 16) % (2**20)
        result = await self.redis.getbit('job_bloom_filter', position)
        return bool(result)

    async def close(self):
        """Close database connections"""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
        if self.redis:
            await self.redis.close()
```
6. Main Scheduler
```python
class JobSpiderScheduler:
    """Crawler scheduler"""

    def __init__(self):
        self.parsers = {
            'boss_zhipin': BossZhiPinParser(),
            'lagou': LagouParser(),
            'zhilian': ZhilianParser()
        }
        self.storage = AsyncDataStorage(
            mysql_config={
                'host': 'localhost',
                'port': 3306,
                'user': 'root',
                'password': 'password',
                'db': 'job_spider',
                'minsize': 1,
                'maxsize': 10
            },
            redis_config={
                'host': 'localhost',
                'port': 6379,
                'password': '',
                'db': 0
            }
        )
        self.task_queue = asyncio.Queue()
        self.results = []

    async def run(self):
        """Run the crawler"""
        logger.info("Starting the job posting aggregation crawler...")

        # Initialize storage
        await self.storage.initialize()

        # Define crawl tasks
        tasks = [
            ('boss_zhipin', 'https://www.zhipin.com/web/geek/job?query=python&city=101010100'),
            ('lagou', 'https://www.lagou.com/jobs/list_python?city=北京'),
            ('zhilian', 'https://sou.zhaopin.com/?jl=北京&kw=python')
        ]

        # Start consumer tasks
        consumer_tasks = [
            asyncio.create_task(self._consumer())
            for _ in range(5)  # 5 concurrent consumers
        ]

        # Enqueue the tasks
        for task in tasks:
            await self.task_queue.put(task)

        # Wait for all tasks to finish
        await self.task_queue.join()

        # Cancel the consumer tasks
        for consumer in consumer_tasks:
            consumer.cancel()

        # Close storage connections
        await self.storage.close()

        logger.info(f"Crawl finished, collected {len(self.results)} job postings")

    async def _consumer(self):
        """Consumer: process crawl tasks"""
        while True:
            try:
                source, url = await self.task_queue.get()
                parser = self.parsers.get(source)

                if parser:
                    # Parse the listing page
                    jobs = await parser.parse_job_list(url)

                    for job_info in jobs[:10]:  # cap how many jobs per site
                        # Parse the detail page
                        job = await parser.parse_job_detail(job_info['link'])
                        if job:
                            # Save to the database
                            success = await self.storage.save_job(job)
                            if success:
                                self.results.append(job)
                                logger.info(f"Saved job: {job.title}")

                        # Polite delay
                        await asyncio.sleep(random.uniform(1, 3))

                self.task_queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Failed to process task: {e}")
                self.task_queue.task_done()


async def main():
    """Main entry point"""
    scheduler = JobSpiderScheduler()
    try:
        await scheduler.run()
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down gracefully...")
    except Exception as e:
        logger.error(f"Crawler error: {e}")
    finally:
        logger.info("Crawler stopped")


if __name__ == "__main__":
    # Create the database table (run once)
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS jobs (
        id VARCHAR(64) PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        company VARCHAR(255) NOT NULL,
        salary VARCHAR(100),
        location VARCHAR(100),
        experience VARCHAR(100),
        education VARCHAR(100),
        description TEXT,
        source VARCHAR(50),
        url VARCHAR(500),
        publish_time DATETIME,
        crawl_time DATETIME,
        hash_value VARCHAR(128),
        INDEX idx_title (title(100)),
        INDEX idx_company (company(100)),
        INDEX idx_source (source),
        INDEX idx_crawl_time (crawl_time)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """

    # Run the crawler
    asyncio.run(main())
```
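The `create_table_sql` statement above is defined but never executed in the snippet. A minimal one-off setup sketch that runs it through aiomysql, assuming the same connection parameters the scheduler uses:

```python
async def create_schema(create_table_sql: str):
    """One-off helper: create the jobs table before the first crawl."""
    conn = await aiomysql.connect(
        host='localhost', port=3306,
        user='root', password='password', db='job_spider'
    )
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(create_table_sql)
        await conn.commit()
    finally:
        conn.close()


# asyncio.run(create_schema(create_table_sql))
```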
Advanced Feature Extensions
1. Machine Learning Deduplication Optimization
```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np


class SmartDeduplicator:
    """Similarity-based deduplicator"""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=5000)
        self.job_vectors = []  # TF-IDF vectors of previously seen descriptions
        self.job_ids = []

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Compute the cosine similarity between two texts"""
        vectors = self.vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0]
        return similarity

    def is_similar_job(self, new_job: JobPosition, threshold: float = 0.8) -> bool:
        """Decide whether a new posting duplicates one already seen.

        Assumes the vectorizer has been fitted on the existing corpus and
        job_vectors/job_ids have been populated accordingly.
        """
        for job_id, vector in zip(self.job_ids, self.job_vectors):
            similarity = cosine_similarity(
                self.vectorizer.transform([new_job.description]),
                vector
            )[0][0]
            if similarity > threshold:
                return True
        return False
```
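As written, `is_similar_job` assumes the vectorizer is already fitted and the vector lists are populated. One way to wire that up, sketched here (not part of the original class), is to fit on the accumulated descriptions and cache their vectors:

```python
def fit_corpus(dedup: SmartDeduplicator, jobs: List[JobPosition]) -> None:
    """Fit the TF-IDF vectorizer on known postings and cache their vectors."""
    descriptions = [job.description for job in jobs]
    matrix = dedup.vectorizer.fit_transform(descriptions)
    dedup.job_ids = [job.id for job in jobs]
    # Keep one sparse row per job so is_similar_job can compare against it
    dedup.job_vectors = [matrix[i] for i in range(matrix.shape[0])]
```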
2. Anti-Crawling Strategy Monitoring
```python
class AntiAntiSpiderMonitor:
    """Monitor for anti-crawling countermeasures"""

    def __init__(self):
        self.request_count = 0
        self.blocked_count = 0
        self.success_count = 0

    async def monitor_request(self, url: str, success: bool):
        """Record the outcome of a request"""
        self.request_count += 1
        if success:
            self.success_count += 1
        else:
            self.blocked_count += 1

        # Compute the success rate
        success_rate = self.success_count / max(self.request_count, 1)

        # Trigger an alert if the success rate drops too low
        if success_rate < 0.3:
            logger.warning(f"Crawler is being blocked frequently, success rate: {success_rate:.2%}")
            await self.adjust_strategy()

    async def adjust_strategy(self):
        """Adjust the crawling strategy"""
        logger.info("Adjusting strategy: increase delays, rotate proxies, rotate User-Agents")
        # Implement the strategy adjustment logic here
```
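The original leaves `adjust_strategy` as a stub. A minimal sketch of one possible adjustment, assuming the monitor is handed a reference to the crawler engine (a field the original class does not have):

```python
class ThrottlingMonitor(AntiAntiSpiderMonitor):
    """Illustrative extension: back off and refresh proxies when blocked."""

    def __init__(self, engine: AsyncSpiderEngine, base_delay: float = 1.0):
        super().__init__()
        self.engine = engine
        self.base_delay = base_delay

    async def adjust_strategy(self):
        # Double the polite delay, capped at 30 seconds
        self.base_delay = min(self.base_delay * 2, 30.0)
        logger.info(f"Raising request delay to {self.base_delay:.1f}s and refreshing proxies")
        # Drop low-quality proxies by forcing a refresh of the pool
        await self.engine.proxy_pool.refresh_proxies()
        # Give the target site time to cool down before resuming
        await asyncio.sleep(self.base_delay)
```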
Deployment and Monitoring
Docker Deployment Configuration
```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser (with its system dependencies)
RUN playwright install --with-deps chromium

# Copy the source code
COPY . .

# Run the crawler
CMD ["python", "main.py"]
```
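The crawler expects MySQL and Redis to be reachable. A minimal docker-compose sketch; the service names and credentials below are illustrative, not taken from the original article:

```yaml
# docker-compose.yml (illustrative)
services:
  spider:
    build: .
    depends_on: [mysql, redis]
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: job_spider
  redis:
    image: redis:7
```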
Prometheus Monitoring Configuration
```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'job_spider'
    static_configs:
      - targets: ['localhost:9091']
```
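Prometheus needs the crawler to expose metrics on the scraped port (9091 above). A minimal exporter sketch using the prometheus_client library; the metric names are illustrative:

```python
from prometheus_client import Counter, start_http_server

# Illustrative metric names; adapt to the dashboards built in Grafana
JOBS_SAVED = Counter('spider_jobs_saved_total', 'Job postings saved to MySQL')
REQUESTS_FAILED = Counter('spider_requests_failed_total', 'HTTP requests that failed or were blocked')


def start_metrics_server(port: int = 9091) -> None:
    """Expose metrics so the prometheus.yml target above can scrape them."""
    start_http_server(port)

# Call start_metrics_server() at crawler startup, then
# JOBS_SAVED.inc() / REQUESTS_FAILED.inc() at the relevant points.
```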