news 2026/2/25 9:54:27

Building an Intelligent Job-Posting Aggregation Spider in Python: Async Coroutines and Machine-Learning Deduplication in Practice


张小明

Front-end Development Engineer


Introduction: Challenges and Opportunities in Job-Posting Aggregation

In today's digital hiring landscape, job seekers constantly switch between multiple recruitment platforms, while those platforms' anti-bot defenses grow ever more sophisticated and defeat traditional scrapers. This article walks through a modern job-posting aggregation spider built on Python async coroutines, a smart proxy pool, and machine-learning deduplication, aiming for efficient, stable, and intelligent data collection.

Technology Stack Overview

  • Async framework: asyncio + aiohttp + aiomysql

  • Anti-anti-crawling: dynamic proxy pool + request-fingerprint spoofing + browser-behavior simulation

  • Smart parsing: Playwright automation + XPath/CSS selectors + regular expressions

  • Data storage: MySQL 8.0 + Redis + async database operations

  • Deduplication: SimHash + Bloom filter + text-similarity computation

  • Monitoring & deployment: Prometheus + Grafana + Docker containers

System Architecture

python

"""Architecture of the intelligent job-posting aggregation spider."""
import asyncio
import aiohttp
import aiomysql
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib
import json


@dataclass
class JobPosition:
    """Job posting data model."""
    id: str
    title: str
    company: str
    salary: str
    location: str
    experience: str
    education: str
    tags: List[str]
    description: str
    source: str
    url: str
    publish_time: datetime
    crawl_time: datetime
    hash_value: Optional[str] = None

    def __post_init__(self):
        """Generate a content hash for deduplication."""
        content = f"{self.title}{self.company}{self.description}"
        self.hash_value = self.generate_simhash(content)

    @staticmethod
    def generate_simhash(content: str, bits: int = 64) -> str:
        """Generate a SimHash document fingerprint."""
        import numpy as np

        # Tokenize and hash (simplified: whitespace split)
        words = content.split()
        vector = np.zeros(bits)
        for word in words:
            # Hash each token; MD5 yields 128 bits, so keep only the
            # first `bits` bits to stay inside the weight vector
            word_hash = bin(int(hashlib.md5(word.encode()).hexdigest(), 16))[2:].zfill(bits)
            # Weighted accumulation per bit position
            for i, bit in enumerate(word_hash[:bits]):
                vector[i] += 1 if bit == '1' else -1

        # Collapse the weight vector into the final SimHash bit string
        return ''.join('1' if v > 0 else '0' for v in vector)
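SimHash fingerprints are compared by Hamming distance rather than exact equality. The helpers below are not part of the original code; they sketch how two fingerprints produced by `generate_simhash` could be compared (the 3-bit threshold is a common rule of thumb for 64-bit SimHash, not a value from the article):

```python
def hamming_distance(hash1: str, hash2: str) -> int:
    """Count the bit positions where two equal-length SimHash strings differ."""
    return sum(b1 != b2 for b1, b2 in zip(hash1, hash2))


def is_near_duplicate(hash1: str, hash2: str, max_distance: int = 3) -> bool:
    """Treat documents as near-duplicates when few fingerprint bits differ."""
    return hamming_distance(hash1, hash2) <= max_distance


print(hamming_distance('10110100', '10010110'))   # → 2
print(is_near_duplicate('10110100', '10010110'))  # → True
```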

Core Spider Implementation

1. Asynchronous Spider Engine

python

import asyncio
import hashlib
import logging
import random
from contextlib import asynccontextmanager
from typing import Dict, Optional

import aiohttp
import backoff
from aiohttp import ClientTimeout, TCPConnector

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncSpiderEngine:
    """Asynchronous spider engine."""

    def __init__(self, max_concurrent: int = 10, request_timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = ClientTimeout(total=request_timeout)
        self.session = None
        self.proxy_pool = ProxyPool()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]

    @asynccontextmanager
    async def create_session(self):
        """Create an aiohttp session."""
        connector = TCPConnector(limit=self.max_concurrent, ssl=False)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self._get_headers(),
        ) as session:
            self.session = session
            yield session

    def _get_headers(self) -> Dict:
        """Build randomized request headers."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30,
    )
    async def fetch(self, url: str, use_proxy: bool = True) -> Optional[str]:
        """Fetch a page asynchronously."""
        async with self.semaphore:
            try:
                proxy = await self.proxy_pool.get_proxy() if use_proxy else None
                async with self.session.get(
                    url,
                    proxy=proxy,
                    headers=self._get_headers(),
                    cookies=self._get_cookies(),
                ) as response:
                    if response.status == 200:
                        content = await response.text()
                        # Reward the proxy on success
                        if proxy:
                            await self.proxy_pool.update_score(proxy, True)
                        return content
                    logger.warning(f"Request failed: {url}, status: {response.status}")
                    if proxy:
                        await self.proxy_pool.update_score(proxy, False)
                    return None
            except Exception as e:
                logger.error(f"Request error {url}: {e}")
                return None

    def _get_cookies(self) -> Dict:
        """Generate mock cookies."""
        return {
            'session_id': hashlib.md5(str(random.random()).encode()).hexdigest(),
            'user_token': hashlib.md5(str(random.random()).encode()).hexdigest()[:16],
        }

2. Smart Proxy Pool

python

import asyncio
import random
from typing import Dict, List, Optional

import aiohttp


class ProxyPool:
    """Smart proxy-pool manager."""

    def __init__(self):
        self.proxies = []
        self.proxy_scores = {}
        self.lock = asyncio.Lock()
        self.proxy_sources = [
            'http://www.proxy-list.org/',
            'https://free-proxy-list.net/',
            'http://www.gatherproxy.com/',
        ]

    async def initialize(self):
        """Populate the pool and start the periodic refresh task."""
        await self.refresh_proxies()
        asyncio.create_task(self._scheduled_refresh())

    async def refresh_proxies(self):
        """Refresh the proxy list from all sources."""
        async with self.lock:
            new_proxies = []
            for source in self.proxy_sources:
                proxies = await self._fetch_proxies_from_source(source)
                new_proxies.extend(proxies)
            # Keep only proxies that pass a liveness check
            valid_proxies = await self._validate_proxies(new_proxies)
            self.proxies = valid_proxies
            # Every fresh proxy starts with a baseline score
            for proxy in valid_proxies:
                self.proxy_scores[proxy] = 100

    async def get_proxy(self) -> Optional[str]:
        """Pick a proxy, weighted by its current score."""
        if not self.proxies:
            await self.refresh_proxies()
        weighted_proxies = []
        for proxy in self.proxies:
            weight = self.proxy_scores.get(proxy, 50)
            weighted_proxies.extend([proxy] * weight)
        return random.choice(weighted_proxies) if weighted_proxies else None

    async def update_score(self, proxy: str, success: bool):
        """Adjust a proxy's score after a request."""
        current_score = self.proxy_scores.get(proxy, 50)
        if success:
            new_score = min(current_score + 10, 200)
        else:
            new_score = max(current_score - 30, 0)
        if new_score <= 0:
            # Evict dead proxies entirely
            self.proxies.remove(proxy)
            self.proxy_scores.pop(proxy, None)
            return
        self.proxy_scores[proxy] = new_score

    async def _scheduled_refresh(self):
        """Refresh the pool once per hour."""
        while True:
            await asyncio.sleep(3600)
            await self.refresh_proxies()
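`refresh_proxies` relies on `_fetch_proxies_from_source` and `_validate_proxies`, which the article leaves undefined. Below is a hedged sketch of the validation step only; the test URL and timeout are assumptions, not values from the original:

```python
import asyncio
from typing import List

import aiohttp


async def validate_proxies(proxies: List[str],
                           test_url: str = 'https://httpbin.org/ip',
                           timeout_s: float = 5.0) -> List[str]:
    """Return only the proxies that can fetch a test URL within the timeout."""

    async def check(proxy: str) -> bool:
        try:
            timeout = aiohttp.ClientTimeout(total=timeout_s)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(test_url, proxy=proxy) as resp:
                    return resp.status == 200
        except Exception:
            return False

    # Probe all candidates concurrently rather than one by one
    results = await asyncio.gather(*(check(p) for p in proxies))
    return [p for p, ok in zip(proxies, results) if ok]
```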

3. Dynamic Rendering with Playwright

python

import asyncio
import random

from playwright.async_api import async_playwright
from bs4 import BeautifulSoup


class DynamicPageRenderer:
    """Handles pages that require JavaScript rendering."""

    def __init__(self):
        self.browser = None
        self.context = None

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled'],
        )
        self.context = await self.browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            viewport={'width': 1920, 'height': 1080},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.context.close()
        await self.browser.close()
        await self.playwright.stop()

    async def render_page(self, url: str, wait_for_selector: str = None) -> str:
        """Render a dynamic page and return its HTML."""
        page = await self.context.new_page()
        # Mimic human behaviour before navigating
        await self._simulate_human_behavior(page)
        try:
            await page.goto(url, wait_until='networkidle')
            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector, timeout=10000)
            # Scroll around a little
            await self._random_scroll(page)
            # Grab the fully rendered DOM
            return await page.content()
        finally:
            await page.close()

    async def _simulate_human_behavior(self, page):
        """Mimic human browsing behaviour."""
        # Random pause
        await asyncio.sleep(random.uniform(1, 3))
        # Random mouse movement
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500),
        )

    async def _random_scroll(self, page):
        """Scroll the page in random increments."""
        for _ in range(random.randint(2, 5)):
            scroll_amount = random.randint(300, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_amount})")
            await asyncio.sleep(random.uniform(0.5, 1.5))

4. Job-Site Parsers

python

import hashlib
import re
from datetime import datetime
from typing import Dict, List, Optional

from bs4 import BeautifulSoup


class JobSiteParser:
    """Base class for job-site parsers."""

    def __init__(self):
        self.spider = AsyncSpiderEngine()
        self.renderer = DynamicPageRenderer()

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a job-listing page."""
        raise NotImplementedError

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a job-detail page."""
        raise NotImplementedError

    def extract_salary_range(self, salary_text: str) -> Dict:
        """Extract a salary range such as '15k-25k'."""
        pattern = r'(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?'
        match = re.search(pattern, salary_text)
        if match:
            min_salary = float(match.group(1)) * 1000
            max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary
            return {'min': min_salary, 'max': max_salary, 'text': salary_text}
        return {'min': 0, 'max': 0, 'text': salary_text}


class BossZhiPinParser(JobSiteParser):
    """Parser for Boss Zhipin (zhipin.com)."""

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a Boss Zhipin listing page."""
        content = await self.spider.fetch(url)
        if not content:
            # Fall back to dynamic rendering
            async with self.renderer:
                content = await self.renderer.render_page(
                    url, wait_for_selector='.job-list-box'
                )
        soup = BeautifulSoup(content, 'html.parser')
        jobs = []
        for item in soup.select('.job-card-wrapper'):
            try:
                job = {
                    'title': item.select_one('.job-title').text.strip(),
                    'company': item.select_one('.company-name').text.strip(),
                    'salary': item.select_one('.salary').text.strip(),
                    'location': item.select_one('.job-area').text.strip(),
                    'experience': item.select_one('.tag-list').text.strip(),
                    'link': item.select_one('a')['href'],
                    'source': 'boss_zhipin',
                }
                jobs.append(job)
            except Exception as e:
                logger.error(f"Failed to parse job item: {e}")
        return jobs

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a Boss Zhipin detail page."""
        full_url = f"https://www.zhipin.com{url}" if not url.startswith('http') else url
        async with self.renderer:
            content = await self.renderer.render_page(
                full_url, wait_for_selector='.job-detail'
            )
        soup = BeautifulSoup(content, 'html.parser')
        try:
            title = soup.select_one('.job-title').text.strip()
            company = soup.select_one('.company-name').text.strip()
            salary = soup.select_one('.salary').text.strip()
            # Extract secondary fields
            info_items = soup.select('.job-detail-section-item')
            location = info_items[0].text.strip() if len(info_items) > 0 else ''
            experience = info_items[1].text.strip() if len(info_items) > 1 else ''
            description = soup.select_one('.job-sec-text').text.strip()
            return JobPosition(
                id=hashlib.md5(full_url.encode()).hexdigest(),
                title=title,
                company=company,
                salary=salary,
                location=location,
                experience=experience,
                education='',   # extract if needed
                tags=[],        # extract if needed
                description=description,
                source='boss_zhipin',
                url=full_url,
                publish_time=datetime.now(),
                crawl_time=datetime.now(),
            )
        except Exception as e:
            logger.error(f"Failed to parse detail page {full_url}: {e}")
            return None


class LagouParser(JobSiteParser):
    """Parser for Lagou -- analogous to the Boss Zhipin parser."""
    pass


class ZhilianParser(JobSiteParser):
    """Parser for Zhilian -- analogous to the Boss Zhipin parser."""
    pass
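`extract_salary_range` can be exercised in isolation. The standalone copy below reproduces the method body so the regex's behavior is visible; the input strings are made-up examples:

```python
import re


def extract_salary_range(salary_text: str) -> dict:
    """Standalone copy of JobSiteParser.extract_salary_range for illustration."""
    pattern = r'(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?'
    match = re.search(pattern, salary_text)
    if match:
        min_salary = float(match.group(1)) * 1000
        max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary
        return {'min': min_salary, 'max': max_salary, 'text': salary_text}
    return {'min': 0, 'max': 0, 'text': salary_text}


print(extract_salary_range('15k-25k'))  # → {'min': 15000.0, 'max': 25000.0, 'text': '15k-25k'}
print(extract_salary_range('20K'))      # → {'min': 20000.0, 'max': 20000.0, 'text': '20K'}
```

Note that the pattern always multiplies by 1000, so inputs already expressed in full yuan (e.g. "15000-25000") would be scaled incorrectly; a production parser would need to detect the unit first.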

5. Asynchronous Data Storage

python

import json
from typing import Dict

import aiomysql
from motor.motor_asyncio import AsyncIOMotorClient
from redis import asyncio as aioredis


class AsyncDataStorage:
    """Asynchronous storage manager."""

    def __init__(self, mysql_config: Dict, redis_config: Dict, mongo_config: Dict = None):
        self.mysql_config = mysql_config
        self.redis_config = redis_config
        self.mongo_config = mongo_config
        self.pool = None
        self.redis = None
        self.mongo = None

    async def initialize(self):
        """Open database connections."""
        # MySQL connection pool
        self.pool = await aiomysql.create_pool(**self.mysql_config)
        # Redis connection
        self.redis = await aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get('password'),
            db=self.redis_config.get('db', 0),
        )
        # MongoDB connection (optional)
        if self.mongo_config:
            self.mongo = AsyncIOMotorClient(self.mongo_config['uri'])

    async def save_job(self, job: JobPosition):
        """Persist a job posting."""
        # 1. Bloom-filter deduplication
        if await self.is_duplicate(job.hash_value):
            logger.info(f"Duplicate job detected: {job.title}")
            return False

        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = """
                    INSERT INTO jobs (
                        id, title, company, salary, location, experience,
                        education, description, source, url,
                        publish_time, crawl_time, hash_value
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE crawl_time = VALUES(crawl_time)
                """
                await cursor.execute(sql, (
                    job.id, job.title, job.company, job.salary,
                    job.location, job.experience, job.education,
                    job.description, job.source, job.url,
                    job.publish_time, job.crawl_time, job.hash_value,
                ))
                await conn.commit()

        # 2. Mark the fingerprint in the Bloom filter
        await self.redis.setbit('job_bloom_filter',
                                int(job.hash_value[:8], 16) % (2**20), 1)

        # 3. Cache in Redis for 24 hours
        cache_key = f"job:{job.id}"
        await self.redis.setex(
            cache_key,
            3600 * 24,
            json.dumps(job.__dict__, default=str),
        )
        return True

    async def is_duplicate(self, hash_value: str) -> bool:
        """Check the (simplified, single-bit) Bloom filter."""
        position = int(hash_value[:8], 16) % (2**20)
        result = await self.redis.getbit('job_bloom_filter', position)
        return bool(result)

    async def close(self):
        """Close database connections."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
        if self.redis:
            await self.redis.close()
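The Redis `setbit`/`getbit` scheme above sets a single bit per document, which yields far more false positives than a real Bloom filter with k independent hash functions. A self-contained, in-memory sketch of the proper construction (illustrative only; the Redis-backed variant could be extended the same way):

```python
import hashlib


class SimpleBloomFilter:
    """In-memory Bloom filter with k hash functions (not the Redis-backed one)."""

    def __init__(self, size: int = 2**20, num_hashes: int = 5):
        self.size = size
        self.num_hashes = num_hashes
        self.bits = bytearray(size // 8 + 1)

    def _positions(self, value: str):
        # Derive k positions by salting the value with the hash index
        for i in range(self.num_hashes):
            digest = hashlib.md5(f"{i}:{value}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, value: str):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, value: str) -> bool:
        # Present only if every one of the k bits is set
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(value))


bf = SimpleBloomFilter()
bf.add("job-123")
print("job-123" in bf)   # → True
```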

6. Main Scheduler

python

import asyncio
import random


class JobSpiderScheduler:
    """Spider scheduler."""

    def __init__(self):
        self.parsers = {
            'boss_zhipin': BossZhiPinParser(),
            'lagou': LagouParser(),
            'zhilian': ZhilianParser(),
        }
        self.storage = AsyncDataStorage(
            mysql_config={
                'host': 'localhost', 'port': 3306,
                'user': 'root', 'password': 'password',
                'db': 'job_spider', 'minsize': 1, 'maxsize': 10,
            },
            redis_config={
                'host': 'localhost', 'port': 6379,
                'password': '', 'db': 0,
            },
        )
        self.task_queue = asyncio.Queue()
        self.results = []

    async def run(self):
        """Run the spider."""
        logger.info("Starting the job-posting aggregation spider...")
        await self.storage.initialize()

        # Crawl targets
        tasks = [
            ('boss_zhipin', 'https://www.zhipin.com/web/geek/job?query=python&city=101010100'),
            ('lagou', 'https://www.lagou.com/jobs/list_python?city=北京'),
            ('zhilian', 'https://sou.zhaopin.com/?jl=北京&kw=python'),
        ]

        # Start 5 concurrent consumer tasks
        consumer_tasks = [
            asyncio.create_task(self._consumer()) for _ in range(5)
        ]

        # Enqueue the crawl tasks
        for task in tasks:
            await self.task_queue.put(task)

        # Wait until every queued task has been processed
        await self.task_queue.join()

        # Cancel the consumers
        for consumer in consumer_tasks:
            consumer.cancel()

        await self.storage.close()
        logger.info(f"Done; collected {len(self.results)} job postings")

    async def _consumer(self):
        """Consumer: process crawl tasks from the queue."""
        while True:
            try:
                source, url = await self.task_queue.get()
                parser = self.parsers.get(source)
                if parser:
                    # Parse the listing page
                    jobs = await parser.parse_job_list(url)
                    for job_info in jobs[:10]:   # cap per-site volume
                        # Parse the detail page
                        job = await parser.parse_job_detail(job_info['link'])
                        if job:
                            success = await self.storage.save_job(job)
                            if success:
                                self.results.append(job)
                                logger.info(f"Saved job: {job.title}")
                        # Polite delay between requests
                        await asyncio.sleep(random.uniform(1, 3))
                self.task_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Task failed: {e}")
                self.task_queue.task_done()


async def main():
    """Entry point."""
    scheduler = JobSpiderScheduler()
    try:
        await scheduler.run()
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down gracefully...")
    except Exception as e:
        logger.error(f"Spider failed: {e}")
    finally:
        logger.info("Spider finished")


if __name__ == "__main__":
    # Table schema -- run once against MySQL before the first crawl
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS jobs (
        id VARCHAR(64) PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        company VARCHAR(255) NOT NULL,
        salary VARCHAR(100),
        location VARCHAR(100),
        experience VARCHAR(100),
        education VARCHAR(100),
        description TEXT,
        source VARCHAR(50),
        url VARCHAR(500),
        publish_time DATETIME,
        crawl_time DATETIME,
        hash_value VARCHAR(128),
        INDEX idx_title (title(100)),
        INDEX idx_company (company(100)),
        INDEX idx_source (source),
        INDEX idx_crawl_time (crawl_time)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """

    asyncio.run(main())
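The scheduler's producer/consumer shape -- a shared `asyncio.Queue`, several consumer tasks, `join()` then `cancel()` -- can be reduced to a self-contained toy, where doubling a number stands in for parse-and-save:

```python
import asyncio


async def consumer(queue: asyncio.Queue, results: list) -> None:
    """Pull items until cancelled; doubling stands in for parse-and-save."""
    while True:
        item = await queue.get()
        results.append(item * 2)
        queue.task_done()


async def run_pipeline() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    # Start three concurrent consumers
    workers = [asyncio.create_task(consumer(queue, results)) for _ in range(3)]
    for i in range(5):
        await queue.put(i)
    # join() returns once every item has been marked task_done()
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


print(sorted(asyncio.run(run_pipeline())))  # → [0, 2, 4, 6, 8]
```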

Advanced Extensions

1. Machine-Learning Deduplication

python

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np


class SmartDeduplicator:
    """ML-based deduplicator."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=5000)
        self.job_descriptions = []
        self.job_ids = []

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Cosine similarity between two texts (pairwise fit)."""
        vectors = self.vectorizer.fit_transform([text1, text2])
        return float(cosine_similarity(vectors[0:1], vectors[1:2])[0][0])

    def add_job(self, job: JobPosition):
        """Register a job's description for future comparisons."""
        self.job_ids.append(job.id)
        self.job_descriptions.append(job.description)

    def is_similar_job(self, new_job: JobPosition, threshold: float = 0.8) -> bool:
        """Check whether a job is a near-duplicate of any registered job."""
        if not self.job_descriptions:
            return False
        # Fit on the known corpus plus the candidate, then compare
        matrix = self.vectorizer.fit_transform(
            self.job_descriptions + [new_job.description]
        )
        similarities = cosine_similarity(matrix[-1], matrix[:-1])[0]
        return bool(np.any(similarities > threshold))
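The pairwise similarity the deduplicator relies on can be checked on toy strings. The helper below mirrors `SmartDeduplicator.calculate_similarity`; the sample job descriptions are invented:

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def tfidf_cosine(text1: str, text2: str) -> float:
    """Pairwise TF-IDF cosine similarity, as used by SmartDeduplicator."""
    vectors = TfidfVectorizer().fit_transform([text1, text2])
    return float(cosine_similarity(vectors[0:1], vectors[1:2])[0][0])


a = "Python backend engineer, asyncio and MySQL"
b = "Backend engineer with Python, asyncio, MySQL experience"
print(tfidf_cosine(a, a) > 0.99)  # → True (identical texts)
print(tfidf_cosine(a, b) > 0.5)   # → True (heavy term overlap)
```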

2. Anti-Blocking Monitoring

python

class AntiAntiSpiderMonitor:
    """Monitors how often the spider is being blocked."""

    def __init__(self):
        self.request_count = 0
        self.blocked_count = 0
        self.success_count = 0

    async def monitor_request(self, url: str, success: bool):
        """Record the outcome of one request."""
        self.request_count += 1
        if success:
            self.success_count += 1
        else:
            self.blocked_count += 1

        # Success rate over all requests so far
        success_rate = self.success_count / max(self.request_count, 1)

        # Alert and adapt when the success rate drops too low
        if success_rate < 0.3:
            logger.warning(f"Spider frequently blocked; success rate: {success_rate:.2%}")
            await self.adjust_strategy()

    async def adjust_strategy(self):
        """Adapt the crawl strategy."""
        logger.info("Adjusting strategy: longer delays, rotate proxies and User-Agents")
        # Strategy-adjustment logic goes here

Deployment and Monitoring

Docker Deployment

dockerfile

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser (with system dependencies, needed on slim images)
RUN playwright install --with-deps chromium

# Copy the application code
COPY . .

# Run the spider
CMD ["python", "main.py"]

Prometheus Monitoring Configuration

yaml

# prometheus.yml
scrape_configs:
  - job_name: 'job_spider'
    static_configs:
      - targets: ['localhost:9091']
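On the spider side, metrics can be exposed with the `prometheus_client` library; the article never shows the exporter, so the metric names below are illustrative assumptions. The server listens on the same port 9091 that the scrape config targets:

```python
from prometheus_client import Counter, Gauge, start_http_server

# Metric names here are illustrative, not from the original article
REQUESTS_TOTAL = Counter('spider_requests_total', 'Total HTTP requests issued')
REQUESTS_BLOCKED = Counter('spider_requests_blocked_total', 'Requests blocked by anti-bot measures')
JOBS_SAVED = Counter('spider_jobs_saved_total', 'Job postings persisted to MySQL')
PROXY_POOL_SIZE = Gauge('spider_proxy_pool_size', 'Currently usable proxies')


def start_metrics_server(port: int = 9091):
    """Expose /metrics on the port that prometheus.yml scrapes."""
    start_http_server(port)

# Inside the crawl loop one would call, e.g.:
#   REQUESTS_TOTAL.inc()
#   JOBS_SAVED.inc()
#   PROXY_POOL_SIZE.set(len(pool.proxies))
```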