
DeOldify Service Audit Logs: Recording Every Image Upload, Processing, and Download Event with Timestamps


张小明

Front-end Engineer


1. Why Do You Need Audit Logs?

Imagine you run a DeOldify black-and-white photo colorization service that processes user uploads every day. One day someone asks: "Who uploaded that family portrait at 3 p.m. yesterday, and who received the result?" Or: "How many images did we process this month, and what was the average processing time?"

If you can't answer, that's an awkward conversation.

This is where audit logs earn their keep. They are the service's dashcam, recording the full context of every operation. For an image-processing service like DeOldify, audit logs let you:

  • Trace operation history: who uploaded which image when, and who received the result
  • Analyze usage: request frequency, peak hours, processing success rate
  • Troubleshoot: when processing fails, quickly determine whether the image, the network, or the service is at fault
  • Audit security: monitor for abnormal access and prevent abuse
  • Bill accurately: if you charge by usage, the logs are the authoritative billing record

In this article we will add a complete audit logging system to a DeOldify service, recording timestamps and details for every key operation.

2. Designing the Audit Log System

2.1 What to Record?

A good audit log covers the "5W1H" elements:

  • Who: who initiated the request (IP address, user ID)
  • What: which operation was performed (upload, process, download)
  • When: when it happened (timestamp, to the millisecond)
  • Where: where the request came from (source IP, User-Agent)
  • Why: how the operation turned out (success/failure, error message)
  • How: how it was handled (processing duration, output format)

For a DeOldify service, we need to record:

  1. Image uploads: who uploaded which image, plus file size, format, and upload time
  2. Processing requests: start time, model used, parameter settings
  3. Processing results: end time, success or failure, elapsed time
  4. Result downloads: who downloaded the result and when
  5. Service status: startup, shutdown, and abnormal events
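Concretely, one audit record combining the 5W1H fields can serialize to a single JSON line. The sketch below builds such an entry; the field names follow the scheme above, but all values are made up for illustration:

```python
import json
from datetime import datetime

# A minimal sketch of one audit record covering the 5W1H fields.
# All values here are illustrative, not real service data.
entry = {
    "timestamp": datetime.now().isoformat(),  # When
    "client_ip": "203.0.113.7",               # Who (example address)
    "user_agent": "Mozilla/5.0",              # Where
    "operation": "image_upload",              # What
    "status": "success",                      # Why (outcome)
    "processing_time": 2.31,                  # How (seconds)
    "file_name": "family_photo.jpg",
}

# One JSON object per line is easy to grep and easy to parse later.
line = json.dumps(entry, ensure_ascii=False)
print(line)
```

Keeping one self-describing JSON object per event makes the file log greppable while staying trivially importable into a database later.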

2.2 Log Storage Options

Different scenarios call for different storage back ends:

| Storage | Pros | Cons | Best for |
| --- | --- | --- | --- |
| Log files | Simple, no extra dependencies, easy to inspect | Awkward to query, mediocre performance | Small scale, single-machine deployments |
| Database | Flexible queries, supports complex analysis | Requires a database service, adds overhead | Medium to large scale, needs statistics |
| Dedicated log platform | Purpose-built, real-time analysis, scales well | Complex architecture, steeper learning curve | Large scale, production environments |

For most DeOldify deployments, the practical choice is log files plus a database: files for day-to-day inspection, the database for statistical analysis.
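To make the file-plus-database combination concrete, here is a minimal sketch that writes each event to both sinks, using an in-memory SQLite database and a Python list standing in for the flat log file (schema and names are illustrative):

```python
import sqlite3

# Illustrative dual-sink logging: a list stands in for the text log file,
# and an in-memory SQLite database stands in for the real one.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit_logs (timestamp TEXT, operation TEXT, status TEXT)")

file_log = []  # in a real service this would be a log file on disk

def record(timestamp, operation, status):
    # One write per backend: a flat line for humans, a row for queries.
    file_log.append(f"{timestamp} {operation} {status}")
    conn.execute("INSERT INTO audit_logs VALUES (?, ?, ?)",
                 (timestamp, operation, status))

record("2026-03-04T21:00:00", "image_upload", "success")
record("2026-03-04T21:00:05", "image_process", "failed")

# The database side supports filtered queries the flat file cannot.
failed = conn.execute(
    "SELECT COUNT(*) FROM audit_logs WHERE status='failed'").fetchone()[0]
print(failed)
```

The point of the redundancy: `tail -f` on the file answers "what is happening right now", while the table answers "how often did X happen last week".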

3. Implementing Audit Logging

3.1 Modifying the DeOldify Service Code

First, we add logging to the existing DeOldify service. The example below uses a Flask application:

```python
import json
import logging
import os
import sqlite3
import time
from datetime import datetime
from functools import wraps

from flask import Flask, request, jsonify, has_request_context

app = Flask(__name__)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('deoldify_audit.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def init_db():
    """Initialize the audit database."""
    conn = sqlite3.connect('deoldify_audit.db')
    c = conn.cursor()
    # Create the audit log table
    c.execute('''
        CREATE TABLE IF NOT EXISTS audit_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            client_ip TEXT,
            user_agent TEXT,
            operation TEXT NOT NULL,
            file_name TEXT,
            file_size INTEGER,
            file_type TEXT,
            request_id TEXT,
            status TEXT NOT NULL,
            processing_time REAL,
            error_message TEXT,
            additional_info TEXT
        )
    ''')
    # Indexes to speed up common queries
    c.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON audit_logs(timestamp)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_operation ON audit_logs(operation)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_status ON audit_logs(status)')
    conn.commit()
    conn.close()


def log_audit_event(operation, status, **kwargs):
    """Write an audit record to both the log file and the database."""
    # Only touch `request` inside a request context; truth-testing the
    # proxy outside one raises a RuntimeError.
    if has_request_context():
        client_ip = request.remote_addr
        user_agent = request.headers.get('User-Agent', 'N/A')
    else:
        client_ip = 'N/A'
        user_agent = 'N/A'

    log_entry = {
        'timestamp': datetime.now().isoformat(),
        'client_ip': client_ip,
        'user_agent': user_agent,
        'operation': operation,
        'status': status,
        **kwargs
    }

    # File log
    logger.info(f"{operation} - {status} - IP: {client_ip} - "
                f"{kwargs.get('file_name', 'N/A')}")

    # Database log
    try:
        conn = sqlite3.connect('deoldify_audit.db')
        c = conn.cursor()
        c.execute('''
            INSERT INTO audit_logs
                (timestamp, client_ip, user_agent, operation, file_name,
                 file_size, file_type, request_id, status, processing_time,
                 error_message, additional_info)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            log_entry['timestamp'],
            log_entry['client_ip'],
            log_entry['user_agent'],
            log_entry['operation'],
            log_entry.get('file_name'),
            log_entry.get('file_size'),
            log_entry.get('file_type'),
            log_entry.get('request_id'),
            log_entry['status'],
            log_entry.get('processing_time'),
            log_entry.get('error_message'),
            json.dumps(log_entry.get('additional_info', {}))
        ))
        conn.commit()
        conn.close()
    except Exception as e:
        logger.error(f"Failed to save audit log to database: {e}")


def audit_log(operation_name):
    """Decorator that records started/success/failed events with timing."""
    def decorator(func):
        @wraps(func)  # preserve the function name so Flask endpoints stay unique
        def wrapper(*args, **kwargs):
            start_time = time.time()
            request_id = f"req_{int(start_time * 1000)}_{os.urandom(4).hex()}"
            try:
                log_audit_event(
                    operation=operation_name,
                    status='started',
                    request_id=request_id,
                    additional_info={'args': str(args), 'kwargs': str(kwargs)}
                )
                result = func(*args, **kwargs)
                log_audit_event(
                    operation=operation_name,
                    status='success',
                    request_id=request_id,
                    processing_time=time.time() - start_time
                )
                return result
            except Exception as e:
                log_audit_event(
                    operation=operation_name,
                    status='failed',
                    request_id=request_id,
                    processing_time=time.time() - start_time,
                    error_message=str(e)
                )
                raise
        return wrapper
    return decorator


@app.route('/colorize', methods=['POST'])
@audit_log('image_upload_and_process')
def colorize_image():
    """Handle a colorization request (with audit logging)."""
    if 'image' not in request.files:
        return jsonify({'success': False, 'error': 'No image file provided'}), 400

    file = request.files['image']
    # Capture file metadata for the audit trail
    file_info = {
        'file_name': file.filename,
        'file_size': len(file.read()),
        'file_type': file.content_type
    }
    file.seek(0)  # rewind after reading for the size
    # In a full implementation you would pass file_info into the decorator's
    # log context; here it is collected but not yet wired through.

    time.sleep(2)  # placeholder for the actual DeOldify processing

    return jsonify({
        'success': True,
        'message': 'Image processed successfully',
        'request_id': 'mock-request-id'
    })


@app.route('/download/<request_id>', methods=['GET'])
@audit_log('result_download')
def download_result(request_id):
    """Download a processing result (with audit logging)."""
    time.sleep(0.5)  # placeholder for streaming the file
    return jsonify({
        'success': True,
        'message': f'Downloaded result for request {request_id}'
    })


@app.route('/health', methods=['GET'])
def health_check():
    """Health check."""
    return jsonify({
        'status': 'healthy',
        'service': 'deoldify-colorization',
        'timestamp': datetime.now().isoformat()
    })


@app.route('/audit/logs', methods=['GET'])
def get_audit_logs():
    """Query audit logs with optional filters."""
    operation = request.args.get('operation')
    status = request.args.get('status')
    start_time = request.args.get('start_time')
    end_time = request.args.get('end_time')
    limit = int(request.args.get('limit', 100))

    query = "SELECT * FROM audit_logs WHERE 1=1"
    params = []
    if operation:
        query += " AND operation = ?"
        params.append(operation)
    if status:
        query += " AND status = ?"
        params.append(status)
    if start_time:
        query += " AND timestamp >= ?"
        params.append(start_time)
    if end_time:
        query += " AND timestamp <= ?"
        params.append(end_time)
    query += " ORDER BY timestamp DESC LIMIT ?"
    params.append(limit)

    conn = sqlite3.connect('deoldify_audit.db')
    conn.row_factory = sqlite3.Row  # rows come back dict-like
    c = conn.cursor()
    c.execute(query, params)
    rows = c.fetchall()

    logs = []
    for row in rows:
        log_dict = dict(row)
        if log_dict['additional_info']:
            log_dict['additional_info'] = json.loads(log_dict['additional_info'])
        logs.append(log_dict)
    conn.close()

    return jsonify({'success': True, 'count': len(logs), 'logs': logs})


@app.route('/audit/stats', methods=['GET'])
def get_audit_stats():
    """Aggregate service statistics."""
    conn = sqlite3.connect('deoldify_audit.db')
    c = conn.cursor()

    # Total request count
    c.execute("SELECT COUNT(*) FROM audit_logs")
    total_requests = c.fetchone()[0]

    # Success/failure breakdown
    c.execute("SELECT status, COUNT(*) FROM audit_logs GROUP BY status")
    status_stats = dict(c.fetchall())

    # Breakdown by operation type
    c.execute("SELECT operation, COUNT(*) FROM audit_logs GROUP BY operation")
    operation_stats = dict(c.fetchall())

    # Today's request count
    today = datetime.now().date().isoformat()
    c.execute("SELECT COUNT(*) FROM audit_logs WHERE timestamp LIKE ?",
              (f'{today}%',))
    today_requests = c.fetchone()[0]

    # Average processing time over successful requests only
    c.execute("SELECT AVG(processing_time) FROM audit_logs "
              "WHERE status='success' AND processing_time IS NOT NULL")
    avg_processing_time = c.fetchone()[0] or 0

    conn.close()

    return jsonify({
        'success': True,
        'stats': {
            'total_requests': total_requests,
            'today_requests': today_requests,
            'status_distribution': status_stats,
            'operation_distribution': operation_stats,
            'average_processing_time_seconds': round(avg_processing_time, 2)
        }
    })


if __name__ == '__main__':
    init_db()
    logger.info("DeOldify service starting with audit logging enabled")
    app.run(host='0.0.0.0', port=7860, debug=True)
```
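The `audit_log` decorator above is plain Python and can be exercised outside Flask. Here is a stripped-down version that appends events to an in-memory list instead of calling `log_audit_event`; the list sink and the demo function are illustrative, only the started/success/failed flow mirrors the service code:

```python
import os
import time
from functools import wraps

events = []  # stands in for log_audit_event's file/database sinks

def audit_log(operation_name):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            request_id = f"req_{int(start * 1000)}_{os.urandom(4).hex()}"
            events.append((operation_name, "started", request_id))
            try:
                result = func(*args, **kwargs)
                events.append((operation_name, "success", request_id))
                return result
            except Exception:
                events.append((operation_name, "failed", request_id))
                raise
        return wrapper
    return decorator

@audit_log("demo_op")
def work(x):
    # Illustrative workload: fails on negative input
    if x < 0:
        raise ValueError("negative input")
    return x * 2

work(21)
try:
    work(-1)
except ValueError:
    pass

print([e[1] for e in events])  # → ['started', 'success', 'started', 'failed']
```

Note that both the success and the failure path share one `request_id`, which is what lets you later join a user's error report to the exact failed run.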

3.2 A Log Query Tool

To make the logs easy to inspect and analyze, we can build a small command-line tool:

```python
#!/usr/bin/env python3
"""Query tool for the DeOldify audit log database."""
import argparse
import json
import sqlite3
from datetime import datetime, timedelta


def query_logs(db_path, operation=None, status=None,
               start_date=None, end_date=None, limit=50):
    """Print audit log records matching the given filters."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    # Build the filter conditions
    conditions = []
    params = []
    if operation:
        conditions.append("operation = ?")
        params.append(operation)
    if status:
        conditions.append("status = ?")
        params.append(status)
    if start_date:
        conditions.append("timestamp >= ?")
        params.append(start_date)
    if end_date:
        conditions.append("timestamp <= ?")
        params.append(end_date)

    # Assemble the SQL
    where_clause = " AND ".join(conditions) if conditions else "1=1"
    query = f"""
        SELECT * FROM audit_logs
        WHERE {where_clause}
        ORDER BY timestamp DESC
        LIMIT ?
    """
    params.append(limit)

    c.execute(query, params)
    rows = c.fetchall()

    print(f"\n{'=' * 80}")
    print(f"Audit log query results ({len(rows)} records)")
    print(f"{'=' * 80}\n")

    for row in rows:
        log = dict(row)
        print(f"Time:      {log['timestamp']}")
        print(f"Operation: {log['operation']} - {log['status']}")
        print(f"Client:    {log['client_ip']}")
        if log['file_name']:
            print(f"File:      {log['file_name']} ({log['file_size']} bytes)")
        if log['processing_time']:
            print(f"Duration:  {log['processing_time']:.2f}s")
        if log['error_message']:
            print(f"Error:     {log['error_message']}")
        print('-' * 40)

    conn.close()


def show_stats(db_path, days=7):
    """Print summary statistics for the last `days` days."""
    conn = sqlite3.connect(db_path)
    c = conn.cursor()

    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)
    date_range = (start_date.isoformat(), end_date.isoformat())

    print(f"\n{'=' * 80}")
    print(f"DeOldify service statistics ({start_date.date()} to {end_date.date()})")
    print(f"{'=' * 80}\n")

    # Total request count
    c.execute("SELECT COUNT(*) FROM audit_logs "
              "WHERE timestamp >= ? AND timestamp <= ?", date_range)
    total = c.fetchone()[0]
    print(f"Total requests: {total}")

    # Breakdown by operation type
    print("\nBy operation:")
    c.execute("""
        SELECT operation, COUNT(*) as count FROM audit_logs
        WHERE timestamp >= ? AND timestamp <= ?
        GROUP BY operation ORDER BY count DESC
    """, date_range)
    for op, count in c.fetchall():
        percentage = (count / total * 100) if total > 0 else 0
        print(f"  {op}: {count} ({percentage:.1f}%)")

    # Success/failure breakdown
    print("\nBy status:")
    c.execute("""
        SELECT status, COUNT(*) as count FROM audit_logs
        WHERE timestamp >= ? AND timestamp <= ?
        GROUP BY status ORDER BY count DESC
    """, date_range)
    for status, count in c.fetchall():
        percentage = (count / total * 100) if total > 0 else 0
        print(f"  {status}: {count} ({percentage:.1f}%)")

    # Average processing time (successful requests only)
    c.execute("""
        SELECT AVG(processing_time) FROM audit_logs
        WHERE status='success' AND processing_time IS NOT NULL
          AND timestamp >= ? AND timestamp <= ?
    """, date_range)
    avg_time = c.fetchone()[0] or 0
    print(f"\nAverage processing time: {avg_time:.2f}s")

    # Most active clients
    print("\nMost active clients (top 5):")
    c.execute("""
        SELECT client_ip, COUNT(*) as count FROM audit_logs
        WHERE timestamp >= ? AND timestamp <= ? AND client_ip != 'N/A'
        GROUP BY client_ip ORDER BY count DESC LIMIT 5
    """, date_range)
    for ip, count in c.fetchall():
        print(f"  {ip}: {count} requests")

    # Daily request trend
    print("\nDaily request trend:")
    c.execute("""
        SELECT DATE(timestamp) as day, COUNT(*) as count FROM audit_logs
        WHERE timestamp >= ? AND timestamp <= ?
        GROUP BY DATE(timestamp) ORDER BY day
    """, date_range)
    for day, count in c.fetchall():
        print(f"  {day}: {count} requests")

    conn.close()


def export_logs(db_path, output_file, format='json'):
    """Export all logs to JSON or CSV."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("SELECT * FROM audit_logs ORDER BY timestamp DESC")
    rows = c.fetchall()

    logs = []
    for row in rows:
        log = dict(row)
        if log['additional_info']:
            log['additional_info'] = json.loads(log['additional_info'])
        logs.append(log)

    if format == 'json':
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)
        print(f"Logs exported to {output_file} (JSON)")
    elif format == 'csv':
        import csv
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            if logs:
                writer = csv.DictWriter(f, fieldnames=logs[0].keys())
                writer.writeheader()
                writer.writerows(logs)
        print(f"Logs exported to {output_file} (CSV)")

    conn.close()


def main():
    parser = argparse.ArgumentParser(description='DeOldify audit log query tool')
    parser.add_argument('--db', default='deoldify_audit.db',
                        help='database file path (default: deoldify_audit.db)')
    subparsers = parser.add_subparsers(dest='command', help='subcommands')

    # query subcommand
    query_parser = subparsers.add_parser('query', help='query logs')
    query_parser.add_argument('--operation', help='operation type')
    query_parser.add_argument('--status', help='status')
    query_parser.add_argument('--start-date', help='start date (YYYY-MM-DD)')
    query_parser.add_argument('--end-date', help='end date (YYYY-MM-DD)')
    query_parser.add_argument('--limit', type=int, default=50,
                              help='max records to return')

    # stats subcommand
    stats_parser = subparsers.add_parser('stats', help='show statistics')
    stats_parser.add_argument('--days', type=int, default=7,
                              help='number of days to cover (default: 7)')

    # export subcommand
    export_parser = subparsers.add_parser('export', help='export logs')
    export_parser.add_argument('--output', required=True, help='output file')
    export_parser.add_argument('--format', choices=['json', 'csv'],
                               default='json', help='export format')

    args = parser.parse_args()

    if args.command == 'query':
        query_logs(args.db, operation=args.operation, status=args.status,
                   start_date=args.start_date, end_date=args.end_date,
                   limit=args.limit)
    elif args.command == 'stats':
        show_stats(args.db, days=args.days)
    elif args.command == 'export':
        export_logs(args.db, args.output, format=args.format)
    else:
        parser.print_help()


if __name__ == '__main__':
    main()
```

3.3 Log Rotation

To keep log files from growing without bound, configure log rotation:

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def setup_logging():
    """Configure size-based rotating file logging."""
    # Make sure the log directory exists
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    # Audit log: rotate by size, 10 MB per file, keep 10 backups
    audit_handler = RotatingFileHandler(
        filename=os.path.join(log_dir, 'audit.log'),
        maxBytes=10 * 1024 * 1024,
        backupCount=10,
        encoding='utf-8'
    )
    audit_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))

    # Error log: also size-rotated, 5 MB per file, keep 5 backups
    error_handler = RotatingFileHandler(
        filename=os.path.join(log_dir, 'error.log'),
        maxBytes=5 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
    ))

    # Console output
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))

    # Root logger configuration
    logging.basicConfig(
        level=logging.INFO,
        handlers=[audit_handler, error_handler, console_handler]
    )
```
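`RotatingFileHandler` rotates by size; if you would rather rotate by time (say, one audit file per day), the standard library also provides `TimedRotatingFileHandler`. A small sketch, writing to a temporary directory purely for illustration:

```python
import logging
import os
import tempfile
from logging.handlers import TimedRotatingFileHandler

# Time-based rotation: a new file each midnight, keeping 30 days of history.
# The temporary directory is only for this self-contained demo.
log_dir = tempfile.mkdtemp()
handler = TimedRotatingFileHandler(
    filename=os.path.join(log_dir, "audit.log"),
    when="midnight",   # rotate at midnight
    backupCount=30,    # keep 30 rotated files
    encoding="utf-8",
)
logger = logging.getLogger("audit_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("service started")
handler.flush()

print(os.path.exists(os.path.join(log_dir, "audit.log")))  # → True
```

Daily files map nicely onto daily reports and retention policies, at the cost of uneven file sizes on busy days.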

4. Putting Audit Logs to Work

4.1 A Monitoring Dashboard

We can build a simple web dashboard for a live view of the service:

```python
import json
import sqlite3
from datetime import datetime, timedelta

from flask import Flask, render_template

app = Flask(__name__)


@app.route('/dashboard')
def dashboard():
    """Render the monitoring dashboard."""
    conn = sqlite3.connect('deoldify_audit.db')
    c = conn.cursor()

    today = datetime.now().date().isoformat()

    # Today's request count
    c.execute("SELECT COUNT(*) FROM audit_logs WHERE timestamp LIKE ?",
              (f'{today}%',))
    today_requests = c.fetchone()[0]

    # Today's success/failure breakdown
    c.execute("SELECT status, COUNT(*) FROM audit_logs "
              "WHERE timestamp LIKE ? GROUP BY status", (f'{today}%',))
    today_status = dict(c.fetchall())

    # Per-minute request trend over the last hour
    one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
    c.execute("""
        SELECT strftime('%Y-%m-%d %H:%M', timestamp) as minute,
               COUNT(*) as count
        FROM audit_logs
        WHERE timestamp >= ?
        GROUP BY strftime('%Y-%m-%d %H:%M', timestamp)
        ORDER BY minute
    """, (one_hour_ago,))
    hourly_trend = [{'time': minute, 'count': count}
                    for minute, count in c.fetchall()]

    # Ten most recent log entries
    c.execute("""
        SELECT timestamp, operation, status, client_ip, file_name
        FROM audit_logs ORDER BY timestamp DESC LIMIT 10
    """)
    recent_logs = [{
        'timestamp': row[0],
        'operation': row[1],
        'status': row[2],
        'client_ip': row[3],
        'file_name': row[4] or 'N/A'
    } for row in c.fetchall()]

    conn.close()

    return render_template('dashboard.html',
                           today_requests=today_requests,
                           today_status=today_status,
                           hourly_trend=json.dumps(hourly_trend),
                           recent_logs=recent_logs)


if __name__ == '__main__':
    app.run(port=8080)
```

The matching HTML template:

```html
<!DOCTYPE html>
<html>
<head>
    <title>DeOldify Service Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .dashboard { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
        .card { background: #f5f5f5; padding: 20px; border-radius: 8px; }
        .stats { display: grid; grid-template-columns: repeat(3, 1fr); gap: 10px; }
        .stat-item { text-align: center; padding: 10px; }
        .success { color: green; }
        .failed { color: red; }
        table { width: 100%; border-collapse: collapse; }
        th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
    </style>
</head>
<body>
    <h1>DeOldify Service Dashboard</h1>
    <div class="dashboard">
        <div class="card">
            <h2>Today at a Glance</h2>
            <div class="stats">
                <div class="stat-item">
                    <div class="stat-value">{{ today_requests }}</div>
                    <div class="stat-label">Total requests</div>
                </div>
                <div class="stat-item success">
                    <div class="stat-value">{{ today_status.get('success', 0) }}</div>
                    <div class="stat-label">Succeeded</div>
                </div>
                <div class="stat-item failed">
                    <div class="stat-value">{{ today_status.get('failed', 0) }}</div>
                    <div class="stat-label">Failed</div>
                </div>
            </div>
        </div>
        <div class="card">
            <h2>Requests in the Last Hour</h2>
            <canvas id="trendChart" width="400" height="200"></canvas>
        </div>
        <div class="card" style="grid-column: span 2;">
            <h2>Recent Operations</h2>
            <table>
                <thead>
                    <tr>
                        <th>Time</th>
                        <th>Operation</th>
                        <th>Status</th>
                        <th>Client IP</th>
                        <th>File</th>
                    </tr>
                </thead>
                <tbody>
                    {% for log in recent_logs %}
                    <tr>
                        <td>{{ log.timestamp }}</td>
                        <td>{{ log.operation }}</td>
                        <td class="{{ 'success' if log.status == 'success' else 'failed' }}">
                            {{ log.status }}
                        </td>
                        <td>{{ log.client_ip }}</td>
                        <td>{{ log.file_name }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </div>
    </div>
    <script>
        // Render the request-trend chart
        const trendData = {{ hourly_trend|safe }};
        const ctx = document.getElementById('trendChart').getContext('2d');
        new Chart(ctx, {
            type: 'line',
            data: {
                labels: trendData.map(item => item.time.split(' ')[1]),
                datasets: [{
                    label: 'Requests',
                    data: trendData.map(item => item.count),
                    borderColor: 'rgb(75, 192, 192)',
                    tension: 0.1
                }]
            },
            options: {
                responsive: true,
                plugins: { legend: { display: false } }
            }
        });
    </script>
</body>
</html>
```

4.2 Automated Reports

With a scheduled job, the service can email a daily report automatically:

```python
import smtplib
import sqlite3
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def generate_daily_report():
    """Build yesterday's usage report as plain text."""
    conn = sqlite3.connect('deoldify_audit.db')
    c = conn.cursor()

    yesterday = (datetime.now() - timedelta(days=1)).date().isoformat()

    # Yesterday's headline numbers
    c.execute("""
        SELECT COUNT(*) as total,
               SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) as success,
               SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) as failed,
               AVG(CASE WHEN status='success' THEN processing_time ELSE NULL END) as avg_time
        FROM audit_logs WHERE timestamp LIKE ?
    """, (f'{yesterday}%',))
    total, success, failed, avg_time = c.fetchone()
    success = success or 0
    failed = failed or 0
    avg_time = avg_time or 0  # SUM/AVG return NULL when no rows match
    success_rate = (success / total * 100) if total > 0 else 0

    # Most active clients
    c.execute("""
        SELECT client_ip, COUNT(*) as count FROM audit_logs
        WHERE timestamp LIKE ? AND client_ip != 'N/A'
        GROUP BY client_ip ORDER BY count DESC LIMIT 3
    """, (f'{yesterday}%',))
    top_clients = c.fetchall()

    # Most common errors
    c.execute("""
        SELECT error_message, COUNT(*) as count FROM audit_logs
        WHERE timestamp LIKE ? AND status='failed'
          AND error_message IS NOT NULL
        GROUP BY error_message ORDER BY count DESC LIMIT 5
    """, (f'{yesterday}%',))
    common_errors = c.fetchall()

    conn.close()

    report = f"""
DeOldify Daily Report ({yesterday})
========================================

Overview:
---------
- Total requests: {total}
- Succeeded: {success} ({success_rate:.1f}%)
- Failed: {failed}
- Average processing time: {avg_time:.2f}s

Most active clients (top 3):
----------------------------
"""
    for ip, count in top_clients:
        report += f"- {ip}: {count} requests\n"

    if common_errors:
        report += "\nMost common errors:\n-------------------\n"
        for error, count in common_errors:
            report += f"- {error}: {count} occurrences\n"

    report += f"""
========================================
Report generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    return report


def send_email_report(receiver_email, report_content):
    """Email the report (SMTP settings below must be adapted)."""
    sender_email = "deoldify@example.com"
    password = "your_password"
    smtp_server = "smtp.example.com"
    smtp_port = 587

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = f"DeOldify Daily Report - {datetime.now().date()}"
    msg.attach(MIMEText(report_content, 'plain'))

    try:
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        server.login(sender_email, password)
        server.send_message(msg)
        server.quit()
        print(f"Report sent to {receiver_email}")
    except Exception as e:
        print(f"Failed to send email: {e}")


# Schedule this script with cron or a systemd timer
if __name__ == '__main__':
    report = generate_daily_report()
    print(report)
    send_email_report("admin@example.com", report)
```
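As the closing comment suggests, the report can be scheduled with cron. A crontab entry that runs it shortly after midnight might look like this (the script path and log path are illustrative, not part of the service above):

```shell
# m h dom mon dow  command
10 0 *   *   *    /usr/bin/python3 /opt/deoldify/daily_report.py >> /var/log/deoldify_report.log 2>&1
```

Running at 00:10 rather than 00:00 leaves a small buffer so that any request logged right at midnight has already been committed before the report queries yesterday's rows.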

5. Summary

By adding an audit logging system to the DeOldify service, we achieved the following:

5.1 What We Built

  1. Complete operation records: timestamps and details for every upload, processing run, and download
  2. Dual storage: file logs for day-to-day inspection, a database for statistical analysis
  3. Flexible querying: a command-line tool and web endpoints for searching and analyzing logs
  4. A live dashboard: a real-time view of service status and request trends
  5. Automated reports: periodic usage summaries to keep tabs on how the service is running

5.2 Practical Value

  • Troubleshooting: when a user reports a failure, the request ID pinpoints the problem quickly
  • Usage analysis: understand usage patterns and allocate resources accordingly
  • Security monitoring: spot abnormal access patterns early
  • Performance tuning: use processing-time statistics to optimize the service
  • Compliance: satisfy audit requirements for data processing

5.3 Going Further

If you need more powerful auditing, consider:

  1. A dedicated log platform: e.g. the ELK Stack (Elasticsearch, Logstash, Kibana)
  2. User authentication: attribute operations to specific users, not just IP addresses
  3. Log encryption: encrypt sensitive fields at rest
  4. Alerting: trigger alerts automatically when failures spike
  5. Integration with existing monitoring: e.g. Prometheus + Grafana
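Alerting, in particular, can start very small: periodically count recent failures in the audit table and fire an alert past a threshold. A self-contained sketch against an in-memory database (window, threshold, and schema here are illustrative):

```python
import sqlite3
from datetime import datetime, timedelta

# Illustrative failure-rate check: alert if more than `threshold`
# failed requests landed within the last `window_minutes`.
def failure_alert(conn, window_minutes=10, threshold=5):
    cutoff = (datetime.now() - timedelta(minutes=window_minutes)).isoformat()
    count = conn.execute(
        "SELECT COUNT(*) FROM audit_logs "
        "WHERE status='failed' AND timestamp >= ?", (cutoff,)).fetchone()[0]
    return count > threshold, count

# Seed a throwaway database with 6 failures and 3 successes
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit_logs (timestamp TEXT, status TEXT)")
now = datetime.now().isoformat()
conn.executemany("INSERT INTO audit_logs VALUES (?, ?)",
                 [(now, "failed")] * 6 + [(now, "success")] * 3)

alert, n = failure_alert(conn)
print(alert, n)  # → True 6
```

In production the `print` would become a webhook or email call, and the function would run from a scheduler rather than inline.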

Audit logs are not optional decoration; they are a cornerstone of reliable service operation. For an AI service like DeOldify, a complete operation record both helps you run the service well and provides the critical trail you need when something goes wrong.

