深度解析安全监控与响应:Python 实现 SIEM 日志分析实战
1. 技术分析
1.1 安全监控概述
安全监控是实时检测和响应安全事件的实践:
监控类型 日志监控: 分析日志数据 网络监控: 监控网络流量 主机监控: 监控主机活动 用户行为监控: 监控用户行为 监控目的: 实时检测 异常识别 威胁预警 快速响应1.2 安全响应
响应流程 检测阶段: 发现异常 分析阶段: 评估威胁 响应阶段: 采取行动 恢复阶段: 恢复正常 响应类型: 主动响应: 自动阻断 被动响应: 人工干预 预防性响应: 提前防护1.3 SIEM系统
SIEM功能 日志收集: 聚合日志 威胁检测: 识别威胁 安全分析: 分析事件 报告生成: 生成报告 SIEM特性: 实时监控 关联分析 威胁情报 合规报告2. 核心功能实现
2.1 安全监控系统
import time import threading class SecurityMonitor: def __init__(self): self.rules = [] self.alerts = [] self.is_running = False def add_rule(self, rule): self.rules.append(rule) def start_monitoring(self): self.is_running = True self._monitor_loop() def stop_monitoring(self): self.is_running = False def _monitor_loop(self): while self.is_running: for rule in self.rules: if rule.check(): alert = { 'timestamp': time.time(), 'rule_name': rule.name, 'severity': rule.severity, 'message': rule.message } self.alerts.append(alert) self._handle_alert(alert) time.sleep(1) def _handle_alert(self, alert): print(f"ALERT [{alert['severity'].upper()}]: {alert['message']}") def get_alerts(self, severity=None): if severity: return [a for a in self.alerts if a['severity'] == severity] return self.alerts def generate_summary(self): summary = { 'total_alerts': len(self.alerts), 'by_severity': { 'critical': 0, 'high': 0, 'medium': 0, 'low': 0 } } for alert in self.alerts: sev = alert['severity'] if sev in summary['by_severity']: summary['by_severity'][sev] += 1 return summary2.2 规则引擎
class SecurityRule: def __init__(self, name, condition, message, severity='medium'): self.name = name self.condition = condition self.message = message self.severity = severity def check(self): return self.condition() class RuleEngine: def __init__(self): self.rules = [] def add_rule(self, rule): self.rules.append(rule) def evaluate(self, event): matches = [] for rule in self.rules: if rule.condition(event): matches.append(rule) return matches def evaluate_all(self, events): results = [] for event in events: matching_rules = self.evaluate(event) if matching_rules: results.append({ 'event': event, 'matches': matching_rules }) return results2.3 威胁情报系统
class ThreatIntelligenceSystem: def __init__(self): self.threats = {} def add_threat(self, threat_id, info): self.threats[threat_id] = info def get_threat(self, threat_id): return self.threats.get(threat_id) def search_threats(self, **filters): results = [] for threat_id, info in self.threats.items(): match = True for key, value in filters.items(): if info.get(key) != value: match = False break if match: results.append(info) return results def check_ip(self, ip): malicious_ips = ['192.168.1.100', '10.0.0.5'] if ip in malicious_ips: return { 'threat': True, 'category': 'malicious_ip', 'confidence': 'high' } return { 'threat': False, 'category': None, 'confidence': None } def check_domain(self, domain): malicious_domains = ['malicious.com', 'phishing.net'] if domain in malicious_domains: return { 'threat': True, 'category': 'malicious_domain', 'confidence': 'high' } return { 'threat': False, 'category': None, 'confidence': None }2.4 安全响应自动化
class SecurityResponseAutomation: def __init__(self): self.actions = {} def register_action(self, name, action): self.actions[name] = action def execute_action(self, action_name, **params): if action_name in self.actions: return self.actions[action_name](**params) raise ValueError(f"Unknown action: {action_name}") def execute_response(self, alert): severity = alert['severity'] if severity == 'critical': self.execute_action('block_ip', ip=alert.get('source_ip')) self.execute_action('notify_admin') elif severity == 'high': self.execute_action('quarantine') self.execute_action('log_event', event=alert) else: self.execute_action('log_event', event=alert) def block_ip(self, ip): print(f"Blocking IP: {ip}") return True def quarantine(self): print("Quarantining affected system") return True def notify_admin(self): print("Notifying administrator") return True def log_event(self, event): print(f"Logging event: {event}") return True3. 性能对比
3.1 监控类型对比
| 类型 | 覆盖范围 | 实时性 | 复杂度 |
|---|---|---|---|
| 日志监控 | 高 | 中 | 低 |
| 网络监控 | 中 | 高 | 中 |
| 主机监控 | 低 | 高 | 中 |
3.2 SIEM系统对比
| 系统 | 功能 | 价格 | 易用性 |
|---|---|---|---|
| Splunk | 全面 | 高 | 中 |
| ELK Stack | 开源 | 低 | 中 |
| IBM QRadar | 企业级 | 高 | 低 |
3.3 响应类型对比
| 类型 | 速度 | 准确性 | 风险 |
|---|---|---|---|
| 自动响应 | 快 | 中 | 中 |
| 人工响应 | 慢 | 高 | 低 |
4. 最佳实践
4.1 监控配置示例
def configure_monitor(): monitor = SecurityMonitor() failed_login_count = [0] def check_failed_logins(): failed_login_count[0] += 1 return failed_login_count[0] >= 5 rule = SecurityRule( name='BruteForceDetection', condition=check_failed_logins, message='Multiple failed login attempts detected', severity='high' ) monitor.add_rule(rule) monitor.start_monitoring() time.sleep(3) summary = monitor.generate_summary() print(f"Monitor summary: {summary}") monitor.stop_monitoring()4.2 威胁情报示例
def threat_intelligence_example(): tis = ThreatIntelligenceSystem() tis.add_threat('CVE-2021-34527', { 'name': 'Print Spooler Vulnerability', 'severity': 'critical', 'description': 'Remote code execution vulnerability' }) ip_check = tis.check_ip('192.168.1.100') print(f"IP check result: {ip_check}") domain_check = tis.check_domain('malicious.com') print(f"Domain check result: {domain_check}")5. 总结
安全监控与响应是实时保护系统安全的关键:
- 安全监控:实时检测异常
- 规则引擎:定义检测规则
- 威胁情报:获取威胁信息
- 响应自动化:自动响应安全事件
对比数据如下:
- 网络监控实时性最高
- Splunk功能最全面
- 自动响应速度最快
- 推荐多层次监控体系
安全监控与响应需要结合人工和自动化,建立快速响应机制。