news 2026/5/30 18:28:00

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

在大规模数据处理场景中,YARN集群往往同时运行着数百个应用实例。当某个ETL任务因代码缺陷陷入死循环,或是某个临时查询占用了生产环境关键资源时,传统的手动操作方式显得力不从心。本文将通过实战案例,展示如何构建一个智能化的YARN应用管控系统,实现三个核心能力:实时状态巡检精准条件筛选批量操作执行

1. 构建YARN应用管理的基础工具链

1.1 环境准备与API接入

YARN ResourceManager的REST API是我们所有自动化操作的基础入口点。首先需要确保操作主机能够访问ResourceManager服务,并安装必要的命令行工具:

# 验证网络连通性 ping resourcemanager.example.com telnet resourcemanager.example.com 8088 # 安装基础工具包 sudo yum install -y curl jq python3-pip # CentOS/RHEL sudo apt-get install -y curl jq python3-pip # Ubuntu/Debian pip install requests pandas

提示:生产环境建议配置Kerberos认证,可通过kinit命令预先获取票据

1.2 应用状态查询的原型实现

通过组合curljq工具,可以快速构建应用列表查询功能:

#!/bin/bash RM_HOST="resourcemanager.example.com" API_URL="http://$RM_HOST:8088/ws/v1/cluster/apps" # 获取所有RUNNING状态的应用 curl -s "$API_URL" | jq '.apps.app[] | select(.state == "RUNNING")'

对应的Python实现版本更具扩展性:

import requests def fetch_yarn_apps(state='RUNNING'): api_url = "http://resourcemanager.example.com:8088/ws/v1/cluster/apps" params = {'states': state} if state else None response = requests.get(api_url, params=params) response.raise_for_status() return response.json().get('apps', {}).get('app', []) if __name__ == '__main__': running_apps = fetch_yarn_apps() print(f"Found {len(running_apps)} running applications")

2. 高级筛选策略的实现

2.1 多维度过滤条件设计

在实际运维中,我们需要根据多种条件组合筛选目标应用。下表列出了常见的过滤维度及其实现方式:

筛选维度数据类型示例值实现方法
用户字符串etl_user.user == "etl_user"
队列字符串production.queue == "production"
运行时长整数3600 (秒).elapsedTime > 3600000
资源占用浮点数50.0 (vcore占比).allocatedVCores > 50
应用类型枚举SPARK.applicationType == "SPARK"

2.2 动态过滤器的Python实现

通过封装过滤逻辑,可以构建灵活的筛选系统:

from datetime import datetime, timedelta class AppFilter: @staticmethod def by_user(apps, username): return [app for app in apps if app['user'] == username] @staticmethod def by_runtime(apps, hours): threshold = hours * 3600 * 1000 # 转为毫秒 return [app for app in apps if app['elapsedTime'] > threshold] @staticmethod def by_resource(apps, min_vcores, min_memory): return [ app for app in apps if app['allocatedVCores'] >= min_vcores and app['allocatedMB'] >= min_memory * 1024 ] # 使用示例 apps = fetch_yarn_apps() long_running = AppFilter.by_runtime(apps, hours=2) overloaded = AppFilter.by_resource(apps, min_vcores=8, min_memory=32)

3. 安全终止机制的实现

3.1 批量终止的防护措施

直接执行批量终止操作存在风险,需要建立防护机制:

  1. 二次确认机制:显示将被终止的应用详情,要求人工确认
  2. 模拟执行模式:仅输出将要执行的操作而不实际调用API
  3. 操作白名单:限制可操作的应用类型和用户范围
  4. 操作日志记录:详细记录每次终止操作的元数据

3.2 带防护的终止脚本实现

import logging from typing import List, Dict logging.basicConfig( filename='yarn_operations.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class YarnTerminator: def __init__(self, dry_run=False): self.dry_run = dry_run def safe_kill(self, app_ids: List[str], reason: str = ''): results = [] for app_id in app_ids: if self.dry_run: logging.info(f"DRY RUN: Would kill {app_id}") results.append({'id': app_id, 'status': 'simulated'}) continue try: url = f"http://resourcemanager.example.com:8088/ws/v1/cluster/apps/{app_id}/state" payload = {'state': 'KILLED'} headers = {'Content-Type': 'application/json'} response = requests.put(url, json=payload, headers=headers) if response.status_code == 200: logging.info(f"Killed {app_id} - Reason: {reason}") results.append({'id': app_id, 'status': 'success'}) else: logging.error(f"Failed to kill {app_id}: {response.text}") results.append({'id': app_id, 'status': 'failed'}) except Exception as e: logging.exception(f"Error killing {app_id}") results.append({'id': app_id, 'status': 'error'}) return results # 使用示例 terminator = YarnTerminator(dry_run=True) apps_to_kill = [app['id'] for app in overloaded] terminator.safe_kill(apps_to_kill, reason='Resource overcommit')

4. 系统集成与自动化调度

4.1 与监控系统对接

将YARN管理脚本集成到Prometheus监控系统中,可以通过Textfile Collector暴露指标:

#!/bin/bash OUTPUT_FILE="/var/lib/node_exporter/textfile_collector/yarn_metrics.prom" # 获取运行超时的应用数量 TIMEOUT_APPS=$(python3 yarn_manager.py --filter runtime=2h --count-only) cat <<EOF > "$OUTPUT_FILE" # HELP yarn_timeout_apps Number of applications running over 2 hours # TYPE yarn_timeout_apps gauge yarn_timeout_apps $TIMEOUT_APPS EOF

4.2 定时巡检的Crontab配置

设置定期执行的巡检任务:

# 每30分钟检查一次长时间运行的应用 */30 * * * * /usr/local/bin/yarn_manager.py --filter runtime=4h --notify # 每天凌晨清理测试环境残留应用 0 1 * * * /usr/local/bin/yarn_manager.py --queue test --older-than 24h --kill

4.3 完整的CLI工具封装

通过argparse模块创建功能完善的命令行工具:

import argparse def main(): parser = argparse.ArgumentParser(description='YARN Application Manager') parser.add_argument('--user', help='Filter by user') parser.add_argument('--queue', help='Filter by queue') parser.add_argument('--runtime', help='Filter by runtime (e.g. 2h, 30m)') parser.add_argument('--kill', action='store_true', help='Terminate matched apps') parser.add_argument('--dry-run', action='store_true', help='Simulate operations') parser.add_argument('--notify', action='store_true', help='Send alert notifications') args = parser.parse_args() # 应用过滤逻辑 filters = {} if args.user: filters['user'] = args.user if args.queue: filters['queue'] = args.queue if args.runtime: filters['runtime'] = parse_runtime(args.runtime) apps = fetch_yarn_apps() matched = apply_filters(apps, **filters) if args.kill: terminator = YarnTerminator(dry_run=args.dry_run) results = terminator.safe_kill([app['id'] for app in matched]) if args.notify and matched: send_notification(matched) def parse_runtime(time_str): # 实现时间字符串解析逻辑 pass

在实际项目中,这套系统将运维人员从重复的手动操作中解放出来。通过组合不同的过滤条件,可以精确控制操作范围,比如仅终止测试环境中运行超过8小时的Spark作业,或是清理特定用户提交的异常任务。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/30 18:19:58

基于ESP32与LVGL的嵌入式GUI开发:圣诞雪花球交互项目全解析

1. 项目概述与核心思路最近在捣鼓一个节日氛围小玩意儿&#xff0c;用Seeed Studio的XIAO ESP32S3和那块圆形的显示屏&#xff0c;做了一个可以交互的圣诞雪花球。这玩意儿摆在桌上&#xff0c;不仅有动态飘落的雪花&#xff0c;还能轻触屏幕切换不同的圣诞背景图&#xff0c;算…

作者头像 李华
网站建设 2026/5/30 18:17:58

3大核心模块深度解析:d2s-editor如何重塑你的暗黑2游戏体验

3大核心模块深度解析&#xff1a;d2s-editor如何重塑你的暗黑2游戏体验 【免费下载链接】d2s-editor 项目地址: https://gitcode.com/gh_mirrors/d2/d2s-editor 你是否曾在暗黑破坏神2中花费数百小时打造完美角色&#xff0c;却因一次意外丢失存档&#xff1f;或者想要…

作者头像 李华
网站建设 2026/5/30 18:15:27

利用树莓派打造低成本Wi-Fi信号扩展器:从硬件选型到网络配置全攻略

1. 项目概述与核心思路拆解 家里某个角落Wi-Fi信号总是断断续续&#xff0c;或者办公室的会议室成了网络死角&#xff0c;这种体验确实让人头疼。直接购买市售的Wi-Fi信号扩展器&#xff08;也叫中继器&#xff09;是一个选择&#xff0c;但如果你手边恰好有一台闲置的树莓派&a…

作者头像 李华
网站建设 2026/5/30 18:13:34

2026年SEO现状:精分时代的AI博弈

如果你在2025年告诉我&#xff0c;到了2026年&#xff0c;我最大的SEO问题不再是关键词密度或者外链建设&#xff0c;而是“要不要给机器人写一份说明书”&#xff0c;我大概会觉得你疯了。但我现在真的在查一个叫LLMs.txt的文件格式说明&#xff0c;这玩意儿去年还不存在。 好…

作者头像 李华
网站建设 2026/5/30 18:13:06

Kazumi WebDAV同步功能终极指南:3步实现跨设备番剧数据互通

Kazumi WebDAV同步功能终极指南&#xff1a;3步实现跨设备番剧数据互通 【免费下载链接】Kazumi 基于自定义规则的番剧采集APP&#xff0c;支持流媒体在线观看&#xff0c;支持弹幕&#xff0c;支持实时超分辨率。 项目地址: https://gitcode.com/gh_mirrors/ka/Kazumi …

作者头像 李华