从零开始学 Python:自动化 / 运维开发实战(核心库 + 3 大实战场景)
在运维工作中,重复的服务器巡检、批量部署、日志分析等任务不仅耗时耗力,还容易出现人为失误。而 Python 凭借丰富的运维核心库,能轻松实现这些任务的自动化,大幅提升运维效率、降低操作风险。
本文将从运维开发的核心库入手,详细讲解paramiko(SSH 远程操作)、fabric(批量运维)、ansible(自动化配置)、psutil(系统监控)的使用,再通过 3 个落地实战场景(服务器批量巡检、自动化部署、日志分析工具),帮你从零基础掌握 Python 自动化 / 运维开发,实现运维工作的 “提质增效”。
一、运维开发核心库:开箱即用的运维工具
Python 运维生态成熟,以下核心库覆盖了远程操作、批量管理、系统监控等核心场景,是运维自动化的基石。
1.1 paramiko:SSH 远程操作核心库
paramiko是 Python 实现的 SSH2 协议库,支持 SSH 远程登录、执行命令、上传下载文件,无需依赖系统 SSH 客户端,是实现单台 / 多台服务器远程操作的基础。
前置条件:安装 paramiko
pip install paramiko核心用法
场景 1:SSH 远程执行命令
import paramiko def ssh_execute_command(host, port, username, password, command): """ SSH远程登录并执行命令 :param host: 服务器IP :param port: SSH端口 :param username: 登录用户名 :param password: 登录密码 :param command: 要执行的系统命令 :return: 命令执行结果 """ # 1. 创建SSH客户端对象 ssh_client = paramiko.SSHClient() # 2. 自动添加未知主机密钥(生产环境建议手动配置) ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # 3. 连接服务器 ssh_client.connect( hostname=host, port=port, username=username, password=password, timeout=10 ) # 4. 执行命令(stdin: 输入, stdout: 输出, stderr: 错误) stdin, stdout, stderr = ssh_client.exec_command(command) # 5. 获取执行结果 stdout_result = stdout.read().decode("utf-8", errors="ignore") stderr_result = stderr.read().decode("utf-8", errors="ignore") return { "status": "success", "stdout": stdout_result, "stderr": stderr_result } except Exception as e: return { "status": "failed", "error": str(e) } finally: # 6. 关闭连接 ssh_client.close() # 调用示例 result = ssh_execute_command( host="192.168.1.100", port=22, username="root", password="your_server_password", command="df -h # 查看磁盘使用情况" ) # 打印结果 if result["status"] == "success": print("命令执行成功,输出结果:") print(result["stdout"]) if result["stderr"]: print("命令执行警告/错误:") print(result["stderr"]) else: print(f"命令执行失败:{result['error']}")场景 2:SFTP 上传 / 下载文件
import paramiko def sftp_upload_file(host, port, username, password, local_file, remote_file): """ SFTP上传本地文件到远程服务器 """ transport = paramiko.Transport((host, port)) try: # 连接SFTP transport.connect(username=username, password=password) sftp = paramiko.SFTPClient.from_transport(transport) # 上传文件 sftp.put(local_file, remote_file) print(f"文件 {local_file} 已成功上传到 {remote_file}") return True except Exception as e: print(f"文件上传失败:{str(e)}") return False finally: transport.close() def sftp_download_file(host, port, username, password, remote_file, local_file): """ SFTP从远程服务器下载文件到本地 """ transport = paramiko.Transport((host, port)) try: transport.connect(username=username, password=password) sftp = paramiko.SFTPClient.from_transport(transport) # 下载文件 sftp.get(remote_file, local_file) print(f"文件 {remote_file} 已成功下载到 {local_file}") return True except Exception as e: print(f"文件下载失败:{str(e)}") return False finally: transport.close() # 调用示例 sftp_upload_file( host="192.168.1.100", port=22, username="root", password="your_server_password", local_file="local_test.txt", remote_file="/root/remote_test.txt" ) sftp_download_file( host="192.168.1.100", port=22, username="root", password="your_server_password", remote_file="/root/remote_test.txt", local_file="downloaded_test.txt" )1.2 fabric:批量运维自动化工具
fabric是基于paramiko封装的高级批量运维库,支持批量执行命令、批量上传下载文件、任务编排,语法更简洁,是批量管理服务器的首选工具(当前主流版本为Fabric 3,兼容 Python3)。
前置条件:安装 fabric
pip install fabric3核心用法:批量执行运维任务
from fabric import Connection from invoke import Responder # 1. 定义服务器列表(可从配置文件读取) servers = [ {"host": "192.168.1.100", "port": 22, "user": "root", "password": "your_server_password"}, {"host": "192.168.1.101", "port": 22, "user": "root", "password": "your_server_password"} ] # 2. 定义批量执行命令的函数 def batch_execute_command(servers, command): """ 批量在多台服务器上执行命令 """ for server in servers: print(f"========== 开始处理服务器 {server['host']} ==========") try: # 创建连接 conn = Connection( host=server["host"], port=server["port"], user=server["user"], connect_kwargs={"password": server["password"]} ) # 执行命令(支持sudo提权,若需要) sudo_responder = Responder( pattern=r"\[sudo\] password for .*:", response=f"{server['password']}\n" ) # 执行普通命令 result = conn.run(command, hide=False, warn=True) if result.ok: print(f"服务器 {server['host']} 命令执行成功,输出:\n{result.stdout}") else: print(f"服务器 {server['host']} 命令执行失败,错误:\n{result.stderr}") except Exception as e: print(f"服务器 {server['host']} 连接/执行失败:{str(e)}") finally: print(f"========== 结束处理服务器 {server['host']} ==========\n") # 3. 定义批量上传文件的函数 def batch_upload_file(servers, local_file, remote_file): """ 批量上传文件到多台服务器 """ for server in servers: print(f"========== 开始上传文件到服务器 {server['host']} ==========") try: conn = Connection( host=server["host"], port=server["port"], user=server["user"], connect_kwargs={"password": server["password"]} ) # 上传文件 conn.put(local_file, remote_file) print(f"文件已成功上传到服务器 {server['host']} 的 {remote_file}") except Exception as e: print(f"服务器 {server['host']} 文件上传失败:{str(e)}") finally: print(f"========== 结束上传服务器 {server['host']} ==========\n") # 4. 调用批量任务 if __name__ == "__main__": # 批量执行查看内存命令 batch_execute_command(servers, "free -h") # 批量上传配置文件 batch_upload_file(servers, "app.conf", "/etc/app.conf")1.3 Ansible:自动化配置与运维(Python 脚本集成)
Ansible 是一款强大的 IT 自动化工具,基于 SSH 实现无代理批量运维,支持配置管理、应用部署、任务编排,其核心功能可通过 Python 脚本调用,实现更灵活的自动化流程。
前置条件:安装 Ansible
# Ubuntu/Debian apt install ansible # CentOS/RHEL yum install ansible # 或通过pip安装 pip install ansible核心用法:Python 脚本调用 Ansible
import ansible.runner import ansible.playbook import ansible.inventory # 1. 定义Ansible配置 inventory = ansible.inventory.Inventory(["192.168.1.100", "192.168.1.101"]) private_key_file = "/root/.ssh/id_rsa" # 免密登录密钥(推荐) remote_user = "root" # 2. 执行单个命令(runner方式) def ansible_execute_command(hosts, command): """ 用Ansible批量执行命令 """ runner = ansible.runner.Runner( pattern='*', module_name='command', module_args=command, inventory=inventory, remote_user=remote_user, private_key_file=private_key_file ) # 执行并获取结果 result = runner.run() if result: for host, host_result in result["contacted"].items(): print(f"========== 服务器 {host} ==========") if "stdout" in host_result: print(f"执行成功:\n{host_result['stdout']}") else: print(f"执行失败:\n{host_result['msg']}") # 处理未连接的服务器 for host in result["dark"]: print(f"服务器 {host} 无法连接") # 3. 执行Ansible Playbook(更复杂的任务编排) def ansible_run_playbook(playbook_path): """ 用Ansible执行Playbook """ pb = ansible.playbook.PlayBook( playbook=playbook_path, inventory=inventory, remote_user=remote_user, private_key_file=private_key_file ) # 执行Playbook result = pb.run() print(f"Playbook执行完成,结果:{result}") # 4. 调用示例 if __name__ == "__main__": # 批量执行磁盘查看命令 ansible_execute_command(["192.168.1.100", "192.168.1.101"], "df -h") # 执行Playbook(需提前编写deploy.yml) # ansible_run_playbook("deploy.yml")补充:简易 Ansible Playbook 示例(deploy.yml)
- hosts: all remote_user: root tasks: - name: 安装nginx yum: name: nginx state: present - name: 启动nginx并设置开机自启 service: name: nginx state: started enabled: yes1.4 psutil:系统监控与信息采集
psutil是 Python 跨平台系统监控库,支持获取 CPU、内存、磁盘、网络、进程等系统信息,无需调用系统命令,返回结构化数据,是服务器巡检、监控工具开发的核心库。
前置条件:安装 psutil
pip install psutil核心用法:获取系统核心信息
import psutil import datetime def get_system_info(): """ 采集服务器核心系统信息 :return: 系统信息字典 """ system_info = {} # 1. 基本系统信息 system_info["boot_time"] = datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime("%Y-%m-%d %H:%M:%S") # 2. CPU信息 cpu_count = psutil.cpu_count(logical=True) # 逻辑CPU数 cpu_percent = psutil.cpu_percent(interval=1, percpu=True) # 每个CPU使用率(间隔1秒) system_info["cpu"] = { "logical_count": cpu_count, "total_percent": psutil.cpu_percent(interval=0), "per_cpu_percent": cpu_percent } # 3. 内存信息 mem = psutil.virtual_memory() system_info["memory"] = { "total_gb": round(mem.total / (1024**3), 2), # 总内存(GB) "used_gb": round(mem.used / (1024**3), 2), # 已使用内存(GB) "free_gb": round(mem.free / (1024**3), 2), # 空闲内存(GB) "used_percent": mem.percent # 内存使用率 } # 4. 磁盘信息(仅获取挂载的本地磁盘) disk_info = {} for partition in psutil.disk_partitions(all=False): if partition.fstype: disk_usage = psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] = { "total_gb": round(disk_usage.total / (1024**3), 2), "used_gb": round(disk_usage.used / (1024**3), 2), "free_gb": round(disk_usage.free / (1024**3), 2), "used_percent": disk_usage.percent } system_info["disk"] = disk_info # 5. 网络信息(获取总收发流量) net_io = psutil.net_io_counters() system_info["network"] = { "sent_gb": round(net_io.bytes_sent / (1024**3), 2), "recv_gb": round(net_io.bytes_recv / (1024**3), 2) } return system_info # 调用示例:打印系统信息 if __name__ == "__main__": sys_info = get_system_info() print("========== 服务器系统信息 ==========") print(f"开机时间:{sys_info['boot_time']}") print(f"CPU使用率:{sys_info['cpu']['total_percent']}%") print(f"内存使用率:{sys_info['memory']['used_percent']}%") print(f"磁盘使用率(/):{sys_info['disk']['/']['used_percent']}%")二、运维实战场景 1:服务器批量巡检脚本
场景需求
批量巡检多台服务器的 CPU、内存、磁盘使用率,设置阈值告警(如 CPU 使用率 > 80%、内存 > 90%、磁盘 > 95%),将巡检结果输出到日志文件并生成简易报告。
完整实现代码
import psutil import datetime import logging from fabric import Connection # 1. 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("server_inspection.log", encoding="utf-8"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 2. 配置项 SERVERS = [ {"host": "192.168.1.100", "port": 22, "user": "root", "password": "your_server_password"}, {"host": "192.168.1.101", "port": 22, "user": "root", "password": "your_server_password"} ] # 告警阈值 THRESHOLDS = { "cpu": 80, "memory": 90, "disk": 95 } # 巡检报告保存路径 REPORT_PATH = "server_inspection_report.txt" # 3. 本地巡检函数(若脚本在目标服务器运行) def local_inspection(): """本地服务器巡检""" try: sys_info = get_system_info() return {"status": "success", "data": sys_info} except Exception as e: logger.error(f"本地巡检失败:{str(e)}") return {"status": "failed", "error": str(e)} # 4. 远程巡检函数(通过fabric远程调用psutil,需目标服务器已安装psutil) def remote_inspection(server): """远程服务器巡检""" logger.info(f"开始巡检远程服务器:{server['host']}") try: # 连接远程服务器 conn = Connection( host=server["host"], port=server["port"], user=server["user"], connect_kwargs={"password": server["password"]} ) # 远程执行Python脚本(采集系统信息) remote_script = """ import psutil import datetime def get_system_info(): system_info = {} system_info["boot_time"] = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S") system_info["cpu"] = {"total_percent": psutil.cpu_percent(interval=1)} mem = psutil.virtual_memory() system_info["memory"] = {"used_percent": mem.percent} disk_info = {} for p in psutil.disk_partitions(all=False): if p.fstype: du = psutil.disk_usage(p.mountpoint) disk_info[p.mountpoint] = {"used_percent": du.percent} system_info["disk"] = disk_info return system_info print(get_system_info()) """ # 执行远程脚本并获取结果 result = conn.run(f"python3 -c '{remote_script}'", hide=True, warn=True) if not result.ok: logger.error(f"服务器 {server['host']} 执行脚本失败:{result.stderr}") return {"status": "failed", "host": server["host"], "error": result.stderr} # 解析结果(简化处理,实际可使用json序列化) import ast sys_info = ast.literal_eval(result.stdout) logger.info(f"服务器 {server['host']} 巡检成功") return {"status": "success", "host": server["host"], "data": sys_info} except Exception as e: logger.error(f"服务器 {server['host']} 巡检异常:{str(e)}") return {"status": "failed", "host": server["host"], "error": str(e)} # 5. 生成巡检报告 def generate_report(inspection_results): """生成巡检报告""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") report = [f"# 服务器批量巡检报告 {now}", "="*50, ""] for result in inspection_results: if result["status"] == "success": host = result["host"] data = result["data"] report.append(f"## 服务器:{host}") report.append(f" 开机时间:{data['boot_time']}") report.append(f" CPU使用率:{data['cpu']['total_percent']}% {'[告警]' if data['cpu']['total_percent'] > THRESHOLDS['cpu'] else ''}") report.append(f" 内存使用率:{data['memory']['used_percent']}% {'[告警]' if data['memory']['used_percent'] > THRESHOLDS['memory'] else ''}") report.append(" 磁盘使用率:") for mount_point, disk_data in data["disk"].items(): alarm = "[告警]" if disk_data["used_percent"] > THRESHOLDS["disk"] else "" report.append(f" {mount_point}:{disk_data['used_percent']}% {alarm}") else: report.append(f"## 服务器:{result['host']} [巡检失败]") report.append(f" 错误信息:{result['error']}") report.append("") # 写入报告文件 with open(REPORT_PATH, "w", encoding="utf-8") as f: f.write("\n".join(report)) logger.info(f"巡检报告已生成,保存路径:{REPORT_PATH}") # 6. 核心:系统信息采集(与前文一致,简化版) def get_system_info(): system_info = {} system_info["boot_time"] = datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime("%Y-%m-%d %H:%M:%S") system_info["cpu"] = {"total_percent": psutil.cpu_percent(interval=1)} mem = psutil.virtual_memory() system_info["memory"] = {"used_percent": mem.percent} disk_info = {} for partition in psutil.disk_partitions(all=False): if partition.fstype: disk_usage = psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] = {"used_percent": disk_usage.percent} system_info["disk"] = disk_info return system_info # 7. 主函数:批量巡检 def main(): logger.info("========== 开始服务器批量巡检 ==========") inspection_results = [] # 遍历所有服务器进行巡检 for server in SERVERS: result = remote_inspection(server) inspection_results.append(result) # 生成巡检报告 generate_report(inspection_results) logger.info("========== 服务器批量巡检结束 ==========") if __name__ == "__main__": main()使用说明
- 修改
SERVERS配置中的服务器 IP、账号、密码; - 目标服务器需安装 Python3 和
psutil(可通过批量命令提前安装:pip3 install psutil); - 运行脚本后,生成
server_inspection.log(巡检日志)和server_inspection_report.txt(巡检报告); - 超过阈值的指标会标注
[告警],可后续扩展邮件 / 钉钉告警功能。
三、运维实战场景 2:自动化部署脚本
场景需求
实现 Python/Web 项目的自动化部署,核心流程:「从 Git 拉取最新代码」→「安装 / 更新依赖」→「项目打包(可选)」→「停止旧服务」→「启动新服务」→「验证服务可用性」。
完整实现代码
import logging from fabric import Connection from datetime import datetime # 1. 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("deploy_automation.log", encoding="utf-8"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 2. 部署配置 DEPLOY_CONFIG = { "server": { "host": "192.168.1.100", "port": 22, "user": "root", "password": "your_server_password" }, "project": { "name": "my_python_project", "git_repo": "https://github.com/your-name/your-project.git", "deploy_path": "/opt/projects", "venv_path": "/opt/venv", # 虚拟环境路径 "requirements_file": "requirements.txt", "service_name": "my_project.service" # systemd服务名 } } # 3. 定义单个部署步骤 def git_pull_code(conn, project_config): """从Git拉取最新代码""" project_path = f"{project_config['deploy_path']}/{project_config['name']}" logger.info(f"开始拉取代码,项目路径:{project_path}") # 检查项目目录是否存在,不存在则克隆 result = conn.run(f"test -d {project_path}", warn=True) if result.failed: logger.info("项目目录不存在,开始克隆仓库") conn.run(f"git clone {project_config['git_repo']} {project_path}", hide=False) else: logger.info("项目目录已存在,拉取最新代码") conn.run(f"cd {project_path} && git pull", hide=False) logger.info("代码拉取完成") def install_dependencies(conn, project_config): """安装/更新项目依赖""" project_path = f"{project_config['deploy_path']}/{project_config['name']}" venv_pip = f"{project_config['venv_path']}/bin/pip" requirements_path = f"{project_path}/{project_config['requirements_file']}" logger.info("开始安装/更新项目依赖") result = conn.run( f"{venv_pip} install -r {requirements_path} --upgrade", hide=False, warn=True ) if result.ok: logger.info("依赖安装/更新完成") else: logger.error(f"依赖安装失败:{result.stderr}") raise Exception("依赖安装步骤失败") def stop_old_service(conn, project_config): """停止旧服务""" logger.info("开始停止旧服务") result = conn.run( f"systemctl stop {project_config['service_name']}", warn=True, hide=False ) if result.ok or "Unit * does not exist" in result.stderr: logger.info("旧服务已停止(或服务未存在)") else: logger.error(f"停止旧服务失败:{result.stderr}") raise Exception("停止旧服务步骤失败") def start_new_service(conn, project_config): """启动新服务""" logger.info("开始启动新服务") # 重新加载systemd配置(若服务文件有修改) conn.run("systemctl daemon-reload", hide=False) # 启动服务并设置开机自启 result = conn.run( f"systemctl start {project_config['service_name']} && systemctl enable {project_config['service_name']}", hide=False, warn=True ) if result.ok: logger.info("新服务启动成功并设置开机自启") else: logger.error(f"启动新服务失败:{result.stderr}") raise Exception("启动新服务步骤失败") def verify_service(conn, project_config): """验证服务可用性(示例:检查服务状态+访问接口)""" logger.info("开始验证服务可用性") # 1. 检查systemd服务状态 service_result = conn.run( f"systemctl is-active {project_config['service_name']}", hide=True, warn=True ) if service_result.stdout.strip() != "active": logger.error(f"服务未正常运行,状态:{service_result.stdout}") raise Exception("服务验证失败:服务未激活") # 2. 可选:访问项目接口验证(示例:curl访问本地端口) api_result = conn.run( f"curl -s -w '%{{http_code}}' http://127.0.0.1:8000/health -o /dev/null", hide=True, warn=True ) if api_result.stdout.strip() == "200": logger.info("服务接口验证成功,返回200 OK") else: logger.warning(f"服务接口验证返回非200,状态码:{api_result.stdout}") # 4. 主部署流程 def automated_deploy(): """自动化部署主流程""" logger.info("========== 开始项目自动化部署 ==========") server_config = DEPLOY_CONFIG["server"] project_config = DEPLOY_CONFIG["project"] # 创建服务器连接 try: conn = Connection( host=server_config["host"], port=server_config["port"], user=server_config["user"], connect_kwargs={"password": server_config["password"]} ) # 执行部署步骤(按顺序编排) git_pull_code(conn, project_config) install_dependencies(conn, project_config) stop_old_service(conn, project_config) start_new_service(conn, project_config) verify_service(conn, project_config) logger.info("========== 项目自动化部署全部完成 ==========") return True except Exception as e: logger.error(f"========== 自动化部署失败:{str(e)} ==========") return False if __name__ == "__main__": automated_deploy()补充说明
- 项目需使用
systemd管理服务(创建my_project.service放在/etc/systemd/system/); - 目标服务器需安装 Git、Python3、虚拟环境(可提前批量配置);
- 若为 Java 项目,可修改步骤为「打包(mvn package)」→「替换 JAR 包」→「重启服务」;
- 可扩展回滚功能:部署前备份旧代码 / 旧包,部署失败时恢复。
示例 systemd 服务文件(my_project.service)
[Unit] Description=My Python Project Service After=network.target [Service] User=root WorkingDirectory=/opt/projects/my_python_project ExecStart=/opt/venv/bin/python3 app.py Restart=on-failure RestartSec=5 [Install] WantedBy=multi-user.target四、运维实战场景 3:日志分析工具
场景需求
开发一款通用日志分析工具,支持:「按关键词过滤日志」→「统计指定报错信息出现次数」→「按时间范围筛选日志」→「生成分析报告」,适用于 Nginx、Python、Java 等各类日志文件。
完整实现代码
import logging import re from datetime import datetime from collections import Counter # 1. 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # 2. 日志分析核心功能 class LogAnalyzer: def __init__(self, log_file_path): self.log_file_path = log_file_path self.all_log_lines = [] # 存储所有日志行 self.load_log_file() def load_log_file(self): """加载日志文件到内存(大文件可改为逐行处理)""" logger.info(f"开始加载日志文件:{self.log_file_path}") try: with open(self.log_file_path, "r", encoding="utf-8", errors="ignore") as f: self.all_log_lines = [line.strip() for line in f if line.strip()] logger.info(f"日志文件加载完成,共 {len(self.all_log_lines)} 行有效日志") except FileNotFoundError: logger.error(f"日志文件不存在:{self.log_file_path}") raise except Exception as e: logger.error(f"加载日志文件失败:{str(e)}") raise def filter_by_keyword(self, keywords, exclude_keywords=None): """ 按关键词过滤日志 :param keywords: 包含的关键词列表(任意匹配) :param exclude_keywords: 排除的关键词列表(任意匹配) :return: 过滤后的日志列表 """ if exclude_keywords is None: exclude_keywords = [] filtered_lines = [] for line in self.all_log_lines: # 检查是否包含任意关键词 include_flag = any(keyword.lower() in line.lower() for keyword in keywords) # 检查是否排除任意关键词 exclude_flag = any(exclude_keyword.lower() in line.lower() for exclude_keyword in exclude_keywords) if include_flag and not exclude_flag: filtered_lines.append(line) logger.info(f"按关键词过滤完成,共 {len(filtered_lines)} 行匹配日志") return filtered_lines def count_error_messages(self, error_patterns): """ 统计报错信息出现次数 :param error_patterns: 报错正则表达式列表 :return: 报错统计结果(Counter) """ error_messages = [] for line in self.all_log_lines: for pattern in error_patterns: match = re.search(pattern, line, re.IGNORECASE) if match: error_msg = match.group(0) if match.group(0) else line[:100] # 截取前100字符 error_messages.append(error_msg) break error_counter = Counter(error_messages) logger.info(f"报错信息统计完成,共识别 {len(error_counter)} 种不同报错") return error_counter def filter_by_time_range(self, start_time, end_time, time_pattern=r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"): """ 按时间范围筛选日志 :param start_time: 开始时间(字符串,格式匹配time_pattern) :param end_time: 结束时间(字符串,格式匹配time_pattern) :param time_pattern: 日志中的时间格式正则 :return: 过滤后的日志列表 """ # 解析开始/结束时间 try: start_dt = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") end_dt = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") except ValueError: logger.error("时间格式错误,需符合:YYYY-MM-DD HH:MM:SS") raise filtered_lines = [] for line in self.all_log_lines: # 提取日志中的时间 time_match = re.search(time_pattern, line) if not time_match: continue try: log_dt = datetime.strptime(time_match.group(0), "%Y-%m-%d %H:%M:%S") if start_dt <= log_dt <= end_dt: filtered_lines.append(line) except ValueError: continue logger.info(f"按时间范围过滤完成,共 {len(filtered_lines)} 行匹配日志") return filtered_lines def generate_analysis_report(self, report_path, keywords=None, error_patterns=None, time_range=None): """ 生成日志分析报告 :param report_path: 报告保存路径 :param keywords: 过滤关键词 :param error_patterns: 报错正则 :param time_range: 时间范围((start_time, end_time)) """ if keywords is None: keywords = [] if error_patterns is None: error_patterns = [r"error|exception|fail|fatal", r"500 Internal Server Error", r"404 Not Found"] if time_range is None: time_range = ("", "") logger.info(f"开始生成日志分析报告:{report_path}") report = [ f"# 日志分析报告", f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"日志文件:{self.log_file_path}", f"日志总行数:{len(self.all_log_lines)}", "="*60, "" ] # 1. 关键词过滤结果(若指定关键词) if keywords: keyword_lines = self.filter_by_keyword(keywords) report.append(f"## 一、关键词过滤结果(关键词:{','.join(keywords)})") report.append(f"匹配行数:{len(keyword_lines)}") report.append("前10条匹配日志:") for i, line in enumerate(keyword_lines[:10]): report.append(f" {i+1}. {line}") report.append("") # 2. 报错信息统计 error_counter = self.count_error_messages(error_patterns) report.append(f"## 二、报错信息统计") report.append(f"报错类型数:{len(error_counter)}") report.append(f"报错总次数:{sum(error_counter.values())}") report.append("报错Top10排名:") for i, (error_msg, count) in enumerate(error_counter.most_common(10), 1): report.append(f" {i}. 「{error_msg}」:{count} 次") report.append("") # 3. 时间范围过滤结果(若指定时间范围) if all(time_range): time_lines = self.filter_by_time_range(time_range[0], time_range[1]) report.append(f"## 三、时间范围过滤结果({time_range[0]} - {time_range[1]})") report.append(f"匹配行数:{len(time_lines)}") report.append("") # 4. 总结 report.append(f"## 四、分析总结") if sum(error_counter.values()) == 0: report.append("未检测到明显报错信息,日志整体正常。") else: report.append(f"检测到 {sum(error_counter.values())} 次报错,需重点关注Top3报错类型。") # 写入报告文件 with open(report_path, "w", encoding="utf-8") as f: f.write("\n".join(report)) logger.info(f"日志分析报告生成完成,保存路径:{report_path}") # 3. 主函数:使用日志分析工具 def main(): # 配置项 LOG_FILE = "/var/log/nginx/access.log" # 日志文件路径 REPORT_FILE = "log_analysis_report.txt" # 初始化日志分析器 try: analyzer = LogAnalyzer(LOG_FILE) # 生成分析报告(可自定义关键词、报错正则、时间范围) analyzer.generate_analysis_report( report_path=REPORT_FILE, keywords=["404", "500"], time_range=("2024-01-01 00:00:00", "2024-01-31 23:59:59") ) except Exception as e: logger.error(f"日志分析失败:{str(e)}") if __name__ == "__main__": main()使用说明
- 修改
LOG_FILE为目标日志文件路径(Nginx、Python、Java 日志均可); - 支持自定义
keywords(过滤关键词)、error_patterns(报错正则)、time_range(时间范围); - 大日志文件可优化为「逐行处理」,避免内存溢出;
- 生成的
log_analysis_report.txt包含详细分析结果,可直接用于运维排查。
五、总结
本文系统讲解了 Python 自动化 / 运维开发的核心库和 3 个落地实战场景,核心要点总结如下:
- 核心运维库:
paramiko实现基础 SSH/SFTP 操作,fabric简化批量运维,ansible适合复杂配置编排,psutil实现系统信息采集,四者覆盖运维核心需求; - 批量巡检脚本:基于
fabric+psutil实现多服务器指标采集,设置阈值告警,生成结构化报告,解决重复巡检痛点; - 自动化部署脚本:按「拉取代码→安装依赖→启停服务→验证可用性」编排任务,支持扩展回滚、多环境部署,提升部署效率和一致性;
- 日志分析工具:基于
re正则和Counter统计,支持关键词 / 时间过滤、报错统计,生成可视化报告,辅助快速排查问题。