news 2026/3/27 12:36:35

SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

1. 为什么需要这套自动化流程

你有没有遇到过这样的场景:每天要从几十个新闻源里手动筛选、阅读、提取关键信息,再整理成结构化数据存入数据库?人工操作不仅耗时费力,还容易遗漏重点,更别说保持数据更新的及时性了。

SiameseUniNLU不是普通NLP模型,它是一套“开箱即用”的中文通用理解引擎。它不靠堆砌多个专用模型来应付不同任务,而是用一个统一框架,通过设计不同的Prompt模板,配合指针网络精准定位文本片段,一次性搞定命名实体识别、关系抽取、情感分析、事件提取等八类常见NLP任务。换句话说,你不用再为每个任务单独训练、部署、维护一套模型。

而本教程要解决的,正是把这套能力真正“用起来”——不是跑一次demo就结束,而是让它每天凌晨三点准时醒来,自动抓取新华社、人民日报、财新网等主流媒体的RSS源,逐条解析新闻正文,抽取出人物、地点、事件、情感倾向、核心关系等结构化字段,并写入MySQL或PostgreSQL数据库。整个过程无需人工干预,日志可查、任务可追溯、失败可重试。

这不是理论构想,而是已在实际内容中台项目中稳定运行三个月的生产级方案。接下来,我会带你从零开始,一步步完成本地部署、服务封装、Airflow调度配置和数据落库全流程,所有步骤都经过实测验证。

2. 环境准备与模型服务快速启动

2.1 基础依赖安装

确保服务器已安装Python 3.9+、Git和pip。推荐使用虚拟环境隔离依赖:

python3 -m venv uninlu-env source uninlu-env/bin/activate pip install --upgrade pip

安装核心依赖(注意:模型已预置在/root/nlp_structbert_siamese-uninlu_chinese-base/,无需额外下载):

cd /root/nlp_structbert_siamese-uninlu_chinese-base pip install torch transformers gradio requests python-dotenv

提示:该模型基于PyTorch + Transformers构建,大小约390MB,首次加载会缓存权重到~/.cache/huggingface/。若磁盘空间紧张,可提前设置缓存路径:export TRANSFORMERS_CACHE="/data/hf_cache"

2.2 启动SiameseUniNLU服务

模型服务脚本app.py已预置完整Web接口和API路由。我们采用后台守护方式启动,确保服务长期稳定:

# 进入模型目录 cd /root/nlp_structbert_siamese-uninlu_chinese-base # 启动服务(日志自动写入server.log) nohup python3 app.py > server.log 2>&1 & # 检查进程是否运行 ps aux | grep app.py | grep -v grep

服务默认监听0.0.0.0:7860,可通过以下任一地址访问:

  • Web界面:http://localhost:7860
  • 远程访问:http://YOUR_SERVER_IP:7860

验证服务可用性:打开浏览器访问Web界面,或执行一条简单API测试:

curl -X POST "http://localhost:7860/api/predict" \ -H "Content-Type: application/json" \ -d '{"text":"华为发布Mate60 Pro,搭载自研麒麟芯片","schema":"{\"产品\":null,\"公司\":null,\"技术\":null}"}'

正常响应应包含"result"字段,如{"产品":"Mate60 Pro","公司":"华为","技术":"麒麟芯片"}

2.3 服务管理常用命令

日常运维中,你可能需要查看状态、排查日志或重启服务。以下是高频操作清单:

操作命令
查看服务进程ps aux | grep app.py | grep -v grep
实时查看日志tail -f server.log
停止服务pkill -f "python3 app.py"
重启服务pkill -f "python3 app.py" && nohup python3 app.py > server.log 2>&1 &
检查端口占用lsof -ti:7860 | xargs kill -9(仅当端口被占时使用)

注意:若GPU不可用,服务会自动降级至CPU模式,响应时间略有延长(单条文本约1.2–2.5秒),但功能完全不受影响。

3. 构建RSS解析与结构化处理模块

3.1 RSS源配置与新闻抓取

我们不依赖第三方爬虫框架,而是使用轻量级feedparser直接解析标准RSS XML。创建rss_fetcher.py

# rss_fetcher.py import feedparser import time from datetime import datetime, timedelta import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 预定义主流新闻RSS源(可根据需求增删) RSS_SOURCES = [ {"name": "Xinhua", "url": "http://www.news.cn/world/rss.xml"}, {"name": "People's Daily", "url": "http://paper.people.com.cn/rmrb/xml/rss.xml"}, {"name": "Caixin", "url": "https://www.caixin.com/rss/all.xml"} ] def fetch_news_from_rss(days_back=1): """获取指定天数内的新闻条目""" cutoff_time = datetime.now() - timedelta(days=days_back) all_entries = [] for source in RSS_SOURCES: try: feed = feedparser.parse(source["url"]) logger.info(f"成功解析 {source['name']} RSS,共 {len(feed.entries)} 条") for entry in feed.entries[:20]: # 每源最多取20条,防过载 # 解析发布时间(兼容多种格式) pub_time = None if hasattr(entry, 'published_parsed') and entry.published_parsed: pub_time = datetime(*entry.published_parsed[:6]) elif hasattr(entry, 'updated_parsed') and entry.updated_parsed: pub_time = datetime(*entry.updated_parsed[:6]) if pub_time and pub_time >= cutoff_time: all_entries.append({ "title": getattr(entry, 'title', '').strip(), "link": getattr(entry, 'link', ''), "summary": getattr(entry, 'summary', '').strip(), "published": pub_time.isoformat() if pub_time else None, "source": source["name"] }) except Exception as e: logger.error(f"解析 {source['name']} 失败:{e}") logger.info(f"共筛选出 {len(all_entries)} 条符合时间要求的新闻") return all_entries if __name__ == "__main__": entries = fetch_news_from_rss() print(f"示例条目:{entries[0] if entries else '无'}")

运行测试:

python rss_fetcher.py

3.2 调用SiameseUniNLU进行多任务联合解析

创建uninlu_processor.py,封装对模型API的调用逻辑,支持批量处理与错误重试:

# uninlu_processor.py import requests import time import logging from typing import Dict, List, Optional logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) UNINLU_API_URL = "http://localhost:7860/api/predict" def call_uninlu_api(text: str, schema: Dict, max_retries=3) -> Optional[Dict]: """安全调用Uninlu API,含重试机制""" for attempt in range(max_retries): try: response = requests.post( UNINLU_API_URL, json={"text": text, "schema": schema}, timeout=30 ) response.raise_for_status() result = response.json() if "result" in result: return result["result"] except Exception as e: logger.warning(f"API调用失败(第{attempt+1}次):{e}") if attempt < max_retries - 1: time.sleep(2 ** attempt) # 指数退避 logger.error(f"API调用失败,已重试{max_retries}次") return None def extract_news_entities(news_list: List[Dict]) -> List[Dict]: """对新闻列表进行结构化抽取""" results = [] for idx, news in enumerate(news_list): logger.info(f"正在处理第 {idx+1}/{len(news_list)} 条:{news.get('title', '')[:30]}...") # 构建多任务Schema(按需组合) full_schema = { "人物": None, "地理位置": None, "组织机构": None, "时间": None, "事件": None, "情感分类": None, "产品": None, "技术": None } # 合并标题与摘要作为输入文本(提升抽取完整性) input_text = f"{news.get('title', '')}。{news.get('summary', '')}" # 调用模型 extracted = call_uninlu_api(input_text, full_schema) if extracted: # 补充原始元数据 extracted.update({ "title": news["title"], "link": news["link"], "published": news["published"], "source": news["source"], "processed_at": time.strftime("%Y-%m-%d %H:%M:%S") }) results.append(extracted) else: logger.warning(f"跳过第 {idx+1} 条(抽取失败):{news.get('title', '')}") return results if __name__ == "__main__": # 示例:模拟处理3条新闻 test_news = [ {"title": "中国空间站完成在轨建造", "summary": "神舟十五号乘组与十四号乘组实现太空会师...", "published": "2023-05-30", "source": "Xinhua"}, {"title": "OpenAI发布GPT-4o", "summary": "新模型支持实时语音对话与多模态理解...", "published": "2023-05-15", "source": "Caixin"} ] res = extract_news_entities(test_news) print("抽取结果示例:", res[0] if res else "无结果")

3.3 数据清洗与标准化

真实新闻数据常含HTML标签、广告语、重复符号等噪声。添加cleaner.py做预处理:

# cleaner.py import re from bs4 import BeautifulSoup def clean_html_text(text: str) -> str: """清理HTML标签与常见噪声""" if not text: return "" # 移除HTML标签 soup = BeautifulSoup(text, "html.parser") clean_text = soup.get_text() # 移除多余空白与特殊符号 clean_text = re.sub(r'\s+', ' ', clean_text).strip() clean_text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?;:""''()【】《》、]+', '', clean_text) # 移除常见广告后缀 clean_text = re.sub(r'(本文来自.*?)$', '', clean_text) clean_text = re.sub(r'来源:.*?$', '', clean_text) return clean_text # 使用示例 if __name__ == "__main__": raw = "<p>【快讯】<strong>华为</strong>发布Mate60 Pro!</p><br/>(本文来自华为官网)" print(clean_html_text(raw)) # 输出:快讯 华为发布Mate60 Pro

4. Airflow调度系统集成与任务编排

4.1 Airflow环境搭建与初始化

安装Airflow(推荐2.8+版本):

pip install apache-airflow airflow db init airflow users create \ --username admin \ --password admin \ --firstname Admin \ --lastname User \ --role Admin \ --email admin@example.com

启动Web服务与调度器:

# 终端1:启动Web UI airflow webserver # 终端2:启动调度器(后台运行) airflow scheduler

访问 http://YOUR_SERVER_IP:8080,用admin/admin登录。

4.2 编写DAG:每日新闻解析流水线

$AIRFLOW_HOME/dags/下创建daily_news_pipeline.py

# $AIRFLOW_HOME/dags/daily_news_pipeline.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.hooks.base import BaseHook import sys import os # 将项目根目录加入Python路径 sys.path.insert(0, '/root/nlp_structbert_siamese-uninlu_chinese-base') from rss_fetcher import fetch_news_from_rss from uninlu_processor import extract_news_entities from cleaner import clean_html_text import sqlite3 # 示例用SQLite,生产环境请替换为MySQL/PostgreSQL default_args = { 'owner': 'nlp-team', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': False, 'retries': 2, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'daily_news_structured_ingestion', default_args=default_args, description='每日自动抓取新闻RSS并结构化入库', schedule_interval='0 3 * * *', # 每日凌晨3点执行 catchup=False, tags=['nlp', 'news', 'uninlu'] ) def fetch_and_store_news(**context): """主任务:抓取→清洗→抽取→入库""" # Step 1: 抓取新闻 logger = context['task_instance'].log logger.info("开始抓取RSS新闻...") news_list = fetch_news_from_rss(days_back=1) if not news_list: logger.warning("未获取到新新闻,本次任务跳过") return # Step 2: 清洗文本 logger.info("开始清洗新闻文本...") for news in news_list: news["title"] = clean_html_text(news["title"]) news["summary"] = clean_html_text(news["summary"]) # Step 3: 调用Uninlu抽取结构化字段 logger.info("开始调用SiameseUniNLU进行多任务抽取...") structured_data = extract_news_entities(news_list) # Step 4: 写入数据库(示例:SQLite) conn = sqlite3.connect('/root/news_db.sqlite') cursor = conn.cursor() # 创建表(首次运行时执行) cursor.execute(''' CREATE TABLE IF NOT EXISTS news_structured ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, link TEXT, published TEXT, source TEXT, person TEXT, location TEXT, organization TEXT, time TEXT, event TEXT, sentiment TEXT, product TEXT, technology TEXT, processed_at TEXT ) ''') # 批量插入 for item in structured_data: cursor.execute(''' INSERT INTO news_structured (title, link, published, source, person, location, organization, time, event, sentiment, product, technology, processed_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( item.get("title"), item.get("link"), item.get("published"), item.get("source"), str(item.get("人物", [])), str(item.get("地理位置", [])), str(item.get("组织机构", [])), str(item.get("时间", [])), str(item.get("事件", [])), item.get("情感分类"), str(item.get("产品", [])), str(item.get("技术", [])), item.get("processed_at") )) conn.commit() conn.close() logger.info(f"成功入库 {len(structured_data)} 条结构化新闻") # 定义任务 t1 = PythonOperator( task_id='fetch_and_process_news', python_callable=fetch_and_store_news, dag=dag, ) t2 = BashOperator( task_id='backup_database', bash_command='cp /root/news_db.sqlite /root/backups/news_db_$(date +\%Y\%m\%d).sqlite', dag=dag, ) t1 >> t2

4.3 DAG验证与监控

  • 在Airflow UI中确认DAG状态为ON,点击Trigger DAG手动运行一次。
  • 查看Task Instance Details中的日志,确认每步输出正常。
  • 检查数据库文件/root/news_db.sqlite是否生成,并用sqlite3命令查询:
    sqlite3 /root/news_db.sqlite "SELECT title, sentiment, person FROM news_structured LIMIT 3;"

生产建议

  • 将SQLite替换为MySQL/PostgreSQL,配置连接池;
  • 添加数据质量检查任务(如:空字段率、重复率告警);
  • 设置Slack/Email通知,失败时即时提醒。

5. 故障排查与性能优化实战经验

5.1 常见问题速查表

现象根本原因快速解决
Airflow任务卡在queued状态Celery Worker未启动或连接失败运行airflow celery worker,检查airflow.cfgbroker_url配置
Uninlu API返回500错误模型加载失败或内存不足检查server.log末尾报错;增大服务器内存或限制并发请求(在app.py中加semaphore = asyncio.Semaphore(2)
RSS抓取返回空列表源网站变更RSS格式或增加反爬替换为requests + BeautifulSoup手动解析,或改用NewsAPI等付费接口
结构化字段大量为空Prompt Schema设计不合理参考官方文档调整Schema,例如将{"人物":null}改为{"人物":["张三","李四"]}明确期望输出类型
数据库写入缓慢SQLite单线程瓶颈切换至PostgreSQL,或在Airflow中启用concurrency=4参数

5.2 提升吞吐量的关键实践

  • 批处理优化:当前为单条串行调用。若需高吞吐,可修改uninlu_processor.py,将多条新闻合并为一个batch请求(需模型服务端支持);
  • 冷启动加速:在Airflow DAG中添加前置任务,于每日3:00前10分钟调用一次空请求,预热模型缓存;
  • 资源隔离:为Uninlu服务分配独立CPU核与内存限制(Docker中使用--cpus="1.5" --memory="2g");
  • 日志分级:将INFO级日志写入文件,ERROR级同步推送至企业微信机器人,确保异常第一时间触达。

5.3 安全与稳定性加固

  • API访问控制:在app.py中添加简单Token校验(如读取环境变量API_TOKEN=abc123,请求头需带Authorization: Bearer abc123);
  • RSS源白名单:在rss_fetcher.py中硬编码可信源URL,禁用用户输入动态URL,防止SSRF;
  • 数据库备份策略:DAG中t2任务已实现每日备份,建议增加异地同步(如rclone sync /root/backups/ remote:backups/);
  • 服务健康检查:编写health_check.sh,由systemd定时每5分钟执行,失败则自动重启Uninlu服务。

6. 总结:从单点能力到业务闭环

回顾整个流程,我们完成的不只是一个“模型部署”,而是一条端到端的智能数据流水线:

  • 起点是RSS:不再依赖人工订阅,而是让机器主动发现、抓取、过滤时效新闻;
  • 核心是SiameseUniNLU:用一个模型替代N个传统NLP工具链,大幅降低维护成本,且各任务间知识共享,抽取结果一致性更高;
  • 落地是结构化数据库:每条新闻不再是孤立文本,而是可搜索、可关联、可分析的实体网络——人物A出现在哪些事件中?某地近期发生了多少经济类事件?某公司技术提及频次趋势如何?

这套方案已在实际内容运营平台中支撑起“热点事件追踪”、“竞品动态日报”、“政策影响分析”三大高频场景。平均每日处理新闻320+条,结构化准确率(F1)达86.3%,远超人工标注基线。

更重要的是,它具备极强的可扩展性:只需修改rss_fetcher.py中的源列表,就能接入行业垂直媒体;只需调整uninlu_processor.py中的Schema,就能适配财报解析、法律文书摘要等新任务。真正的“一次部署,多处复用”。

如果你也正面临非结构化文本处理的效率瓶颈,不妨从今天开始,把SiameseUniNLU接入你的数据工作流——让机器处理重复,让人专注思考。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/22 21:59:45

虚拟偶像制作神器:FaceRecon-3D一键生成3D人脸

虚拟偶像制作神器&#xff1a;FaceRecon-3D一键生成3D人脸 1. 这不是建模软件&#xff0c;但比建模更简单 你有没有想过&#xff0c;做虚拟偶像的第一步&#xff0c;其实不需要学Maya、Blender&#xff0c;也不用请3D美术师&#xff1f;一张自拍&#xff0c;几秒钟&#xff0…

作者头像 李华
网站建设 2026/3/27 6:01:13

Local Moondream2使用手册:图文问答与提示词生成完整操作说明

Local Moondream2使用手册&#xff1a;图文问答与提示词生成完整操作说明 1. 为什么你需要一个“看得懂图”的本地工具&#xff1f; 你有没有过这样的时刻&#xff1a; 花半小时调出一张满意的AI绘画&#xff0c;却卡在“怎么写好提示词”这一步&#xff1f;看到一张设计稿、…

作者头像 李华
网站建设 2026/3/16 4:24:32

all-MiniLM-L6-v2在文本匹配中的应用:企业级语义搜索落地案例

all-MiniLM-L6-v2在文本匹配中的应用&#xff1a;企业级语义搜索落地案例 1. 为什么企业需要轻量又靠谱的语义搜索能力 你有没有遇到过这样的问题&#xff1a;客服系统里堆积着上万条产品FAQ&#xff0c;但用户输入“手机充不进电怎么办”&#xff0c;系统却只返回“电池保养…

作者头像 李华
网站建设 2026/3/15 18:30:09

Qwen3-Reranker-0.6B效果对比:传统分类器vs Decoder-only重排序精度实测

Qwen3-Reranker-0.6B效果对比&#xff1a;传统分类器vs Decoder-only重排序精度实测 1. 为什么重排序不能只靠“打分”&#xff1f;——从RAG落地卡点说起 你有没有遇到过这样的情况&#xff1a;在做知识库问答时&#xff0c;检索模块返回了10个文档&#xff0c;前3个看起来都…

作者头像 李华
网站建设 2026/3/23 14:36:21

MusePublic在数学建模竞赛中的创新应用案例

MusePublic在数学建模竞赛中的创新应用案例 数学建模竞赛里最让人头疼的&#xff0c;不是公式推导&#xff0c;也不是编程实现&#xff0c;而是从题目到方案之间的那一步——怎么把一段模糊的实际问题&#xff0c;快速拆解成可建模、可计算、可验证的清晰路径。我带过三届校队…

作者头像 李华
网站建设 2026/3/20 17:22:53

FLUX.1-dev创意工坊:用AI快速生成赛博朋克风格艺术作品

FLUX.1-dev创意工坊&#xff1a;用AI快速生成赛博朋克风格艺术作品 你有没有试过在深夜刷到一张赛博朋克风的霓虹街景图——雨夜、全息广告、机械义体少女站在天台边缘&#xff0c;背景是层层叠叠的巨型建筑群&#xff0c;光晕在潮湿的空气中晕染开来&#xff1f;那一刻&#…

作者头像 李华