DeOldify服务审计日志:记录所有图片上传、处理、下载行为与时间戳
1. 为什么需要审计日志?
想象一下,你搭建了一个DeOldify黑白图片上色服务,每天都有用户上传老照片进行处理。突然有一天,有人问:“昨天下午3点谁上传了那张全家福?处理结果发给谁了?”或者“我们这个月总共处理了多少张图片?平均处理时间是多少?”
如果你回答不上来,那就尴尬了。
这就是审计日志的价值所在。它就像服务的“行车记录仪”,完整记录每一次操作的来龙去脉。对于DeOldify这样的图像处理服务,审计日志能帮你:
- 追踪操作历史:谁在什么时候上传了什么图片,处理结果给了谁
- 分析使用情况:了解服务的使用频率、高峰时段、处理成功率
- 排查问题:当处理失败时,能快速定位是图片问题、网络问题还是服务问题
- 安全审计:监控异常访问,防止恶意使用
- 计费依据:如果按使用量收费,日志就是最准确的计费凭证
今天,我就带你为DeOldify服务添加一套完整的审计日志系统,记录所有关键操作的时间戳和详细信息。
2. 审计日志系统设计
2.1 记录哪些信息?
一个好的审计日志应该包含“5W1H”要素:
- Who:谁发起的请求(IP地址、用户ID)
- What:做了什么操作(上传、处理、下载)
- When:什么时候发生的(精确到毫秒的时间戳)
- Where:从哪里来的请求(来源IP、User-Agent)
- Why:操作结果如何(成功/失败、错误信息)
- How:如何处理的(处理时长、输出格式)
对于DeOldify服务,我们需要记录:
- 图片上传:谁上传了什么图片,文件大小、格式、上传时间
- 处理请求:处理开始时间、使用的模型、参数设置
- 处理结果:处理结束时间、是否成功、处理耗时
- 结果下载:谁下载了处理结果、下载时间
- 服务状态:服务启动、停止、异常事件
2.2 日志存储方案
根据使用场景不同,可以选择不同的存储方案:
| 存储方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 文件日志 | 简单、无需额外依赖、易于查看 | 查询复杂、性能一般 | 小规模、单机部署 |
| 数据库 | 查询灵活、支持复杂分析 | 需要数据库服务、有性能开销 | 中大规模、需要统计分析 |
| 日志系统 | 专业、支持实时分析、扩展性好 | 架构复杂、学习成本高 | 大规模、生产环境 |
对于大多数DeOldify服务,文件日志+数据库的组合是比较实用的选择。日常查看用文件日志,统计分析用数据库。
3. 实现审计日志功能
3.1 修改DeOldify服务代码
首先,我们需要在现有的DeOldify服务中添加日志记录功能。这里以Flask应用为例:
import logging import json import time from datetime import datetime from flask import Flask, request, jsonify import sqlite3 import os app = Flask(__name__) # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('deoldify_audit.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 初始化数据库 def init_db(): conn = sqlite3.connect('deoldify_audit.db') c = conn.cursor() # 创建审计日志表 c.execute(''' CREATE TABLE IF NOT EXISTS audit_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, client_ip TEXT, user_agent TEXT, operation TEXT NOT NULL, file_name TEXT, file_size INTEGER, file_type TEXT, request_id TEXT, status TEXT NOT NULL, processing_time REAL, error_message TEXT, additional_info TEXT ) ''') # 创建索引以提高查询性能 c.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON audit_logs(timestamp)') c.execute('CREATE INDEX IF NOT EXISTS idx_operation ON audit_logs(operation)') c.execute('CREATE INDEX IF NOT EXISTS idx_status ON audit_logs(status)') conn.commit() conn.close() # 记录审计日志 def log_audit_event(operation, status, **kwargs): """记录审计日志到文件和数据库""" # 获取请求信息 client_ip = request.remote_addr if request else 'N/A' user_agent = request.headers.get('User-Agent', 'N/A') if request else 'N/A' # 构建日志记录 log_entry = { 'timestamp': datetime.now().isoformat(), 'client_ip': client_ip, 'user_agent': user_agent, 'operation': operation, 'status': status, **kwargs } # 记录到文件日志 logger.info(f"{operation} - {status} - IP: {client_ip} - {kwargs.get('file_name', 'N/A')}") # 记录到数据库 try: conn = sqlite3.connect('deoldify_audit.db') c = conn.cursor() c.execute(''' INSERT INTO audit_logs (timestamp, client_ip, user_agent, operation, file_name, file_size, file_type, request_id, status, processing_time, error_message, additional_info) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( log_entry['timestamp'], log_entry['client_ip'], log_entry['user_agent'], log_entry['operation'], log_entry.get('file_name'), log_entry.get('file_size'), log_entry.get('file_type'), log_entry.get('request_id'), log_entry['status'], log_entry.get('processing_time'), log_entry.get('error_message'), json.dumps(log_entry.get('additional_info', {})) )) conn.commit() conn.close() except Exception as e: logger.error(f"Failed to save audit log to database: {e}") # 装饰器:自动记录处理时间 def audit_log(operation_name): def decorator(func): def wrapper(*args, **kwargs): start_time = time.time() request_id = f"req_{int(start_time * 1000)}_{os.urandom(4).hex()}" try: # 记录开始日志 log_audit_event( operation=operation_name, status='started', request_id=request_id, additional_info={'args': str(args), 'kwargs': str(kwargs)} ) # 执行原函数 result = func(*args, **kwargs) # 记录成功日志 processing_time = time.time() - start_time log_audit_event( operation=operation_name, status='success', request_id=request_id, processing_time=processing_time ) return result except Exception as e: # 记录失败日志 processing_time = time.time() - start_time log_audit_event( operation=operation_name, status='failed', request_id=request_id, processing_time=processing_time, error_message=str(e) ) raise e return wrapper return decorator # DeOldify处理端点(添加审计日志) @app.route('/colorize', methods=['POST']) @audit_log('image_upload_and_process') def colorize_image(): """处理图片上色请求""" # 检查是否有文件 if 'image' not in request.files: return jsonify({'success': False, 'error': 'No image file provided'}), 400 file = request.files['image'] # 记录文件信息 file_info = { 'file_name': file.filename, 'file_size': len(file.read()), 'file_type': file.content_type } # 重置文件指针 file.seek(0) # 这里添加额外的文件信息到日志上下文 # 在实际实现中,可能需要修改装饰器来传递这些信息 # 模拟处理过程 time.sleep(2) # 模拟处理时间 # 返回处理结果 return jsonify({ 'success': True, 'message': 'Image processed successfully', 'request_id': '模拟请求ID' }) # 下载端点(添加审计日志) @app.route('/download/<request_id>', methods=['GET']) @audit_log('result_download') def download_result(request_id): """下载处理结果""" # 模拟下载过程 time.sleep(0.5) return jsonify({ 'success': True, 'message': f'Downloaded result for request {request_id}' }) # 健康检查端点 @app.route('/health', methods=['GET']) def health_check(): """健康检查""" return jsonify({ 'status': 'healthy', 'service': 'deoldify-colorization', 'timestamp': datetime.now().isoformat() }) # 审计日志查询端点 @app.route('/audit/logs', methods=['GET']) def get_audit_logs(): """查询审计日志""" # 获取查询参数 operation = request.args.get('operation') status = request.args.get('status') start_time = request.args.get('start_time') end_time = request.args.get('end_time') limit = int(request.args.get('limit', 100)) # 构建查询 query = "SELECT * FROM audit_logs WHERE 1=1" params = [] if operation: query += " AND operation = ?" params.append(operation) if status: query += " AND status = ?" params.append(status) if start_time: query += " AND timestamp >= ?" params.append(start_time) if end_time: query += " AND timestamp <= ?" params.append(end_time) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) # 执行查询 conn = sqlite3.connect('deoldify_audit.db') conn.row_factory = sqlite3.Row # 返回字典格式 c = conn.cursor() c.execute(query, params) rows = c.fetchall() # 转换为字典列表 logs = [] for row in rows: log_dict = dict(row) if log_dict['additional_info']: log_dict['additional_info'] = json.loads(log_dict['additional_info']) logs.append(log_dict) conn.close() return jsonify({ 'success': True, 'count': len(logs), 'logs': logs }) # 服务统计端点 @app.route('/audit/stats', methods=['GET']) def get_audit_stats(): """获取服务统计信息""" conn = sqlite3.connect('deoldify_audit.db') c = conn.cursor() # 总请求数 c.execute("SELECT COUNT(*) FROM audit_logs") total_requests = c.fetchone()[0] # 成功/失败统计 c.execute("SELECT status, COUNT(*) FROM audit_logs GROUP BY status") status_stats = dict(c.fetchall()) # 操作类型统计 c.execute("SELECT operation, COUNT(*) FROM audit_logs GROUP BY operation") operation_stats = dict(c.fetchall()) # 今日请求数 today = datetime.now().date().isoformat() c.execute("SELECT COUNT(*) FROM audit_logs WHERE timestamp LIKE ?", (f'{today}%',)) today_requests = c.fetchone()[0] # 平均处理时间(仅限成功请求) c.execute("SELECT AVG(processing_time) FROM audit_logs WHERE status='success' AND processing_time IS NOT NULL") avg_processing_time = c.fetchone()[0] or 0 conn.close() return jsonify({ 'success': True, 'stats': { 'total_requests': total_requests, 'today_requests': today_requests, 'status_distribution': status_stats, 'operation_distribution': operation_stats, 'average_processing_time_seconds': round(avg_processing_time, 2) } }) if __name__ == '__main__': # 初始化数据库 init_db() # 记录服务启动 logger.info("DeOldify service starting with audit logging enabled") app.run(host='0.0.0.0', port=7860, debug=True)3.2 日志查询工具
为了方便查看和分析日志,我们可以创建一个命令行工具:
#!/usr/bin/env python3 """ DeOldify审计日志查询工具 """ import sqlite3 import json import argparse from datetime import datetime, timedelta import sys def query_logs(db_path, operation=None, status=None, start_date=None, end_date=None, limit=50): """查询审计日志""" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row c = conn.cursor() # 构建查询条件 conditions = [] params = [] if operation: conditions.append("operation = ?") params.append(operation) if status: conditions.append("status = ?") params.append(status) if start_date: conditions.append("timestamp >= ?") params.append(start_date) if end_date: conditions.append("timestamp <= ?") params.append(end_date) # 构建SQL where_clause = " AND ".join(conditions) if conditions else "1=1" query = f""" SELECT * FROM audit_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT ? """ params.append(limit) # 执行查询 c.execute(query, params) rows = c.fetchall() # 输出结果 print(f"\n{'='*80}") print(f"审计日志查询结果 (共 {len(rows)} 条记录)") print(f"{'='*80}\n") for row in rows: log = dict(row) print(f"时间: {log['timestamp']}") print(f"操作: {log['operation']} - {log['status']}") print(f"客户端: {log['client_ip']}") if log['file_name']: print(f"文件: {log['file_name']} ({log['file_size']} bytes)") if log['processing_time']: print(f"处理时间: {log['processing_time']:.2f}秒") if log['error_message']: print(f"错误: {log['error_message']}") print(f"{'-'*40}") conn.close() def show_stats(db_path, days=7): """显示统计信息""" conn = sqlite3.connect(db_path) c = conn.cursor() # 计算日期范围 end_date = datetime.now() start_date = end_date - timedelta(days=days) print(f"\n{'='*80}") print(f"DeOldify服务统计 ({start_date.date()} 至 {end_date.date()})") print(f"{'='*80}\n") # 总请求数 c.execute(""" SELECT COUNT(*) FROM audit_logs WHERE timestamp >= ? AND timestamp <= ? """, (start_date.isoformat(), end_date.isoformat())) total = c.fetchone()[0] print(f" 总请求数: {total}") # 按操作类型统计 print(f"\n 操作类型分布:") c.execute(""" SELECT operation, COUNT(*) as count FROM audit_logs WHERE timestamp >= ? AND timestamp <= ? GROUP BY operation ORDER BY count DESC """, (start_date.isoformat(), end_date.isoformat())) for op, count in c.fetchall(): percentage = (count / total * 100) if total > 0 else 0 print(f" {op}: {count} 次 ({percentage:.1f}%)") # 按状态统计 print(f"\n 成功/失败统计:") c.execute(""" SELECT status, COUNT(*) as count FROM audit_logs WHERE timestamp >= ? AND timestamp <= ? GROUP BY status ORDER BY count DESC """, (start_date.isoformat(), end_date.isoformat())) for status, count in c.fetchall(): percentage = (count / total * 100) if total > 0 else 0 print(f" {status}: {count} 次 ({percentage:.1f}%)") # 平均处理时间 c.execute(""" SELECT AVG(processing_time) FROM audit_logs WHERE status='success' AND processing_time IS NOT NULL AND timestamp >= ? AND timestamp <= ? """, (start_date.isoformat(), end_date.isoformat())) avg_time = c.fetchone()[0] or 0 print(f"\n⏱ 平均处理时间: {avg_time:.2f}秒") # 最活跃的客户端 print(f"\n 最活跃客户端 (Top 5):") c.execute(""" SELECT client_ip, COUNT(*) as count FROM audit_logs WHERE timestamp >= ? AND timestamp <= ? AND client_ip != 'N/A' GROUP BY client_ip ORDER BY count DESC LIMIT 5 """, (start_date.isoformat(), end_date.isoformat())) for ip, count in c.fetchall(): print(f" {ip}: {count} 次请求") # 每日请求趋势 print(f"\n 每日请求趋势:") c.execute(""" SELECT DATE(timestamp) as day, COUNT(*) as count FROM audit_logs WHERE timestamp >= ? AND timestamp <= ? GROUP BY DATE(timestamp) ORDER BY day """, (start_date.isoformat(), end_date.isoformat())) for day, count in c.fetchall(): print(f" {day}: {count} 次请求") conn.close() def export_logs(db_path, output_file, format='json'): """导出日志""" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute("SELECT * FROM audit_logs ORDER BY timestamp DESC") rows = c.fetchall() logs = [] for row in rows: log = dict(row) if log['additional_info']: log['additional_info'] = json.loads(log['additional_info']) logs.append(log) if format == 'json': with open(output_file, 'w', encoding='utf-8') as f: json.dump(logs, f, ensure_ascii=False, indent=2) print(f" 日志已导出到 {output_file} (JSON格式)") elif format == 'csv': import csv with open(output_file, 'w', newline='', encoding='utf-8') as f: if logs: writer = csv.DictWriter(f, fieldnames=logs[0].keys()) writer.writeheader() writer.writerows(logs) print(f" 日志已导出到 {output_file} (CSV格式)") conn.close() def main(): """主函数""" parser = argparse.ArgumentParser(description='DeOldify审计日志查询工具') parser.add_argument('--db', default='deoldify_audit.db', help='数据库文件路径 (默认: deoldify_audit.db)') subparsers = parser.add_subparsers(dest='command', help='子命令') # 查询命令 query_parser = subparsers.add_parser('query', help='查询日志') query_parser.add_argument('--operation', help='操作类型') query_parser.add_argument('--status', help='状态') query_parser.add_argument('--start-date', help='开始日期 (YYYY-MM-DD)') query_parser.add_argument('--end-date', help='结束日期 (YYYY-MM-DD)') query_parser.add_argument('--limit', type=int, default=50, help='返回条数') # 统计命令 stats_parser = subparsers.add_parser('stats', help='查看统计') stats_parser.add_argument('--days', type=int, default=7, help='统计天数 (默认: 7天)') # 导出命令 export_parser = subparsers.add_parser('export', help='导出日志') export_parser.add_argument('--output', required=True, help='输出文件') export_parser.add_argument('--format', choices=['json', 'csv'], default='json', help='导出格式') args = parser.parse_args() if args.command == 'query': query_logs( args.db, operation=args.operation, status=args.status, start_date=args.start_date, end_date=args.end_date, limit=args.limit ) elif args.command == 'stats': show_stats(args.db, days=args.days) elif args.command == 'export': export_logs(args.db, args.output, format=args.format) else: parser.print_help() if __name__ == '__main__': main()3.3 日志轮转配置
为了防止日志文件无限增长,我们需要配置日志轮转:
import logging from logging.handlers import RotatingFileHandler import os def setup_logging(): """配置日志系统""" # 创建日志目录 log_dir = 'logs' os.makedirs(log_dir, exist_ok=True) # 审计日志 - 按大小轮转 audit_handler = RotatingFileHandler( filename=os.path.join(log_dir, 'audit.log'), maxBytes=10*1024*1024, # 10MB backupCount=10, # 保留10个备份 encoding='utf-8' ) audit_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) # 错误日志 - 按时间轮转 error_handler = RotatingFileHandler( filename=os.path.join(log_dir, 'error.log'), maxBytes=5*1024*1024, # 5MB backupCount=5, # 保留5个备份 encoding='utf-8' ) error_handler.setLevel(logging.ERROR) error_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s' )) # 控制台输出 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) # 配置根日志 logging.basicConfig( level=logging.INFO, handlers=[audit_handler, error_handler, console_handler] )4. 审计日志的实际应用
4.1 监控面板
我们可以创建一个简单的Web监控面板来实时查看服务状态:
from flask import Flask, render_template import sqlite3 from datetime import datetime, timedelta import json app = Flask(__name__) @app.route('/dashboard') def dashboard(): """监控面板""" conn = sqlite3.connect('deoldify_audit.db') c = conn.cursor() # 获取今日数据 today = datetime.now().date().isoformat() # 今日请求数 c.execute(""" SELECT COUNT(*) FROM audit_logs WHERE timestamp LIKE ? """, (f'{today}%',)) today_requests = c.fetchone()[0] # 今日成功/失败数 c.execute(""" SELECT status, COUNT(*) FROM audit_logs WHERE timestamp LIKE ? GROUP BY status """, (f'{today}%',)) today_status = dict(c.fetchall()) # 最近1小时请求趋势 one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat() c.execute(""" SELECT strftime('%Y-%m-%d %H:%M', timestamp) as minute, COUNT(*) as count FROM audit_logs WHERE timestamp >= ? GROUP BY strftime('%Y-%m-%d %H:%M', timestamp) ORDER BY minute """, (one_hour_ago,)) hourly_trend = [] for minute, count in c.fetchall(): hourly_trend.append({ 'time': minute, 'count': count }) # 最近10条日志 c.execute(""" SELECT timestamp, operation, status, client_ip, file_name FROM audit_logs ORDER BY timestamp DESC LIMIT 10 """) recent_logs = [] for row in c.fetchall(): recent_logs.append({ 'timestamp': row[0], 'operation': row[1], 'status': row[2], 'client_ip': row[3], 'file_name': row[4] or 'N/A' }) conn.close() return render_template('dashboard.html', today_requests=today_requests, today_status=today_status, hourly_trend=json.dumps(hourly_trend), recent_logs=recent_logs) if __name__ == '__main__': app.run(port=8080)对应的HTML模板:
<!DOCTYPE html> <html> <head> <title>DeOldify服务监控面板</title> <script src="https://cdn.jsdelivr.net/npm/chart.js"></script> <style> body { font-family: Arial, sans-serif; margin: 20px; } .dashboard { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; } .card { background: #f5f5f5; padding: 20px; border-radius: 8px; } .stats { display: grid; grid-template-columns: repeat(3, 1fr); gap: 10px; } .stat-item { text-align: center; padding: 10px; } .success { color: green; } .failed { color: red; } table { width: 100%; border-collapse: collapse; } th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; } </style> </head> <body> <h1> DeOldify服务监控面板</h1> <div class="dashboard"> <div class="card"> <h2> 今日概览</h2> <div class="stats"> <div class="stat-item"> <div class="stat-value">{{ today_requests }}</div> <div class="stat-label">总请求数</div> </div> <div class="stat-item success"> <div class="stat-value">{{ today_status.get('success', 0) }}</div> <div class="stat-label">成功</div> </div> <div class="stat-item failed"> <div class="stat-value">{{ today_status.get('failed', 0) }}</div> <div class="stat-label">失败</div> </div> </div> </div> <div class="card"> <h2> 最近1小时请求趋势</h2> <canvas id="trendChart" width="400" height="200"></canvas> </div> <div class="card" style="grid-column: span 2;"> <h2> 最近操作记录</h2> <table> <thead> <tr> <th>时间</th> <th>操作</th> <th>状态</th> <th>客户端IP</th> <th>文件</th> </tr> </thead> <tbody> {% for log in recent_logs %} <tr> <td>{{ log.timestamp }}</td> <td>{{ log.operation }}</td> <td class="{{ 'success' if log.status == 'success' else 'failed' }}"> {{ log.status }} </td> <td>{{ log.client_ip }}</td> <td>{{ log.file_name }}</td> </tr> {% endfor %} </tbody> </table> </div> </div> <script> // 渲染趋势图表 const trendData = {{ hourly_trend|safe }}; const ctx = document.getElementById('trendChart').getContext('2d'); new Chart(ctx, { type: 'line', data: { labels: trendData.map(item => item.time.split(' ')[1]), datasets: [{ label: '请求数', data: trendData.map(item => item.count), borderColor: 'rgb(75, 192, 192)', tension: 0.1 }] }, options: { responsive: true, plugins: { legend: { display: false } } } }); </script> </body> </html>4.2 自动化报告
我们可以设置定时任务,每天自动发送服务报告:
import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import sqlite3 from datetime import datetime, timedelta def generate_daily_report(): """生成每日报告""" conn = sqlite3.connect('deoldify_audit.db') c = conn.cursor() # 获取昨日日期 yesterday = (datetime.now() - timedelta(days=1)).date().isoformat() # 昨日统计 c.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) as success, SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) as failed, AVG(CASE WHEN status='success' THEN processing_time ELSE NULL END) as avg_time FROM audit_logs WHERE timestamp LIKE ? """, (f'{yesterday}%',)) total, success, failed, avg_time = c.fetchone() success_rate = (success / total * 100) if total > 0 else 0 # 最活跃客户端 c.execute(""" SELECT client_ip, COUNT(*) as count FROM audit_logs WHERE timestamp LIKE ? AND client_ip != 'N/A' GROUP BY client_ip ORDER BY count DESC LIMIT 3 """, (f'{yesterday}%',)) top_clients = c.fetchall() # 最常见的错误 c.execute(""" SELECT error_message, COUNT(*) as count FROM audit_logs WHERE timestamp LIKE ? AND status='failed' AND error_message IS NOT NULL GROUP BY error_message ORDER BY count DESC LIMIT 5 """, (f'{yesterday}%',)) common_errors = c.fetchall() conn.close() # 生成报告内容 report = f""" DeOldify服务日报 ({yesterday}) ======================================== 总体统计: --------- • 总请求数: {total} • 成功: {success} ({success_rate:.1f}%) • 失败: {failed} • 平均处理时间: {avg_time:.2f}秒 最活跃客户端 (Top 3): -------------------- """ for ip, count in top_clients: report += f"• {ip}: {count} 次请求\n" if common_errors: report += f""" 最常见的错误: ------------- """ for error, count in common_errors: report += f"• {error}: {count} 次\n" report += f""" ======================================== 报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ return report def send_email_report(receiver_email, report_content): """发送邮件报告""" # 邮件配置(需要根据实际情况修改) sender_email = "deoldify@example.com" password = "your_password" smtp_server = "smtp.example.com" smtp_port = 587 # 创建邮件 msg = MIMEMultipart() msg['From'] = sender_email msg['To'] = receiver_email msg['Subject'] = f"DeOldify服务日报 - {datetime.now().date()}" # 添加正文 msg.attach(MIMEText(report_content, 'plain')) try: # 发送邮件 server = smtplib.SMTP(smtp_server, smtp_port) server.starttls() server.login(sender_email, password) server.send_message(msg) server.quit() print(f" 日报已发送到 {receiver_email}") except Exception as e: print(f" 发送邮件失败: {e}") # 定时发送报告(可以使用cron或systemd timer) if __name__ == '__main__': report = generate_daily_report() print(report) # 发送给管理员 send_email_report("admin@example.com", report)5. 总结
通过为DeOldify服务添加审计日志系统,我们实现了:
5.1 核心功能回顾
- 完整的行为记录:记录了所有图片上传、处理、下载操作的时间戳和详细信息
- 双重存储机制:文件日志用于日常查看,数据库用于统计分析
- 灵活的查询工具:提供了命令行工具和Web接口来查询和分析日志
- 实时监控面板:可以实时查看服务状态和请求趋势
- 自动化报告:定期生成服务使用报告,帮助了解服务运行状况
5.2 实际价值
- 问题排查:当用户反馈处理失败时,可以通过请求ID快速定位问题
- 使用分析:了解服务的使用模式,优化资源分配
- 安全监控:及时发现异常访问模式
- 服务优化:根据处理时间统计,优化服务性能
- 合规审计:满足数据处理的审计要求
5.3 扩展建议
如果你需要更强大的审计功能,可以考虑:
- 集成专业日志系统:如ELK Stack(Elasticsearch, Logstash, Kibana)
- 添加用户认证:记录具体用户的操作,而不仅仅是IP地址
- 实现日志加密:对敏感信息进行加密存储
- 添加告警机制:当出现大量失败请求时自动告警
- 集成到现有监控系统:如Prometheus + Grafana
审计日志不是可有可无的装饰品,而是服务可靠运行的重要保障。对于像DeOldify这样的AI服务,完整的操作记录不仅能帮助你更好地管理服务,还能在出现问题时提供关键的排查线索。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。