import re
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text


class DataQualityMonitor:
    """Run SQL-based data-quality checks and summarise their results.

    Supported checks: completeness, accuracy, validity, uniqueness and
    timeliness. Each check returns a dict with at least ``metric``, ``value``
    and ``passed``.

    NOTE: table and column names are interpolated directly into the SQL text
    (identifiers cannot be bound as parameters), so they must come from
    trusted configuration. Literal values (allowed values, range bounds) are
    sent as bound parameters.
    """

    # Matches "<col> in[a, b, c]"; group 1 is the text between the brackets.
    _IN_LIST_PATTERN = re.compile(r"in\s*\[(.*)\]", re.IGNORECASE | re.DOTALL)
    # Matches "<col> between <low> and <high>"; groups are the two bounds.
    _BETWEEN_PATTERN = re.compile(
        r"between\s*(.+?)\s*and\s*(.+)", re.IGNORECASE | re.DOTALL
    )

    def __init__(self, db_connection):
        """Create the monitor.

        Args:
            db_connection: Database URL (or URL object) accepted by
                SQLAlchemy's ``create_engine``.
        """
        self.engine = create_engine(db_connection)
        self.rules = []

    def add_rule(self, rule: "QualityRule"):
        """Register *rule* so that :meth:`run_all_checks` evaluates it."""
        # String annotation: QualityRule is not defined in this block, and a
        # bare name would be evaluated when the class body executes.
        self.rules.append(rule)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _query_one_row(self, query, params=None):
        """Execute an aggregate *query* and return its single row as a dict.

        Keys are lower-cased in case the database reports unquoted column
        aliases in a different case.
        """
        df = pd.read_sql(text(query), self.engine, params=params)
        return {str(col).lower(): df[col].iloc[0] for col in df.columns}

    @staticmethod
    def _as_count(value):
        """Convert an aggregate result to ``int``, treating SQL NULL as 0.

        ``SUM`` over an empty table yields NULL rather than 0.
        """
        return 0 if pd.isna(value) else int(value)

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator`` as float, or 0.0 if the denominator is 0."""
        return float(numerator) / denominator if denominator else 0.0

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------
    def check_completeness(self, table_name, column_name, threshold=0.95):
        """Completeness check: share of non-NULL values in *column_name*.

        Args:
            table_name: Table to inspect (interpolated into SQL; must be trusted).
            column_name: Column whose NULLs are counted (interpolated; trusted).
            threshold: Minimum non-NULL share needed to pass (default 0.95).

        Returns:
            dict with ``metric`` ("completeness"), ``value`` (non-NULL rate;
            0.0 for an empty table) and ``passed``.
        """
        row = self._query_one_row(f"""
            SELECT
                COUNT(*) AS total_rows,
                SUM(CASE WHEN {column_name} IS NULL THEN 1 ELSE 0 END) AS null_count
            FROM {table_name}
        """)
        total = self._as_count(row["total_rows"])
        nulls = self._as_count(row["null_count"])
        completeness_rate = self._safe_ratio(total - nulls, total)
        return {
            "metric": "completeness",
            "value": completeness_rate,
            "passed": completeness_rate >= threshold,
        }

    def check_accuracy(self, table_name, column_name, reference_table,
                       reference_column, threshold=0.98):
        """Accuracy check against a reference table.

        Rows of *table_name* are LEFT JOINed to *reference_table* on an
        ``id`` column present in both. The rate is the number of distinct
        ``column_name`` values that equal ``reference_column`` on the joined
        row, divided by the number of distinct ``column_name`` values.

        Args:
            table_name: Table under test (interpolated into SQL; trusted).
            column_name: Column under test (interpolated; trusted).
            reference_table: Reference table (interpolated; trusted).
            reference_column: Reference column (interpolated; trusted).
            threshold: Minimum accuracy rate needed to pass (default 0.98).

        Returns:
            dict with ``metric`` ("accuracy"), ``value`` (0.0 when there are
            no distinct values) and ``passed``.
        """
        # reference_values is selected but not used in the rate.
        row = self._query_one_row(f"""
            SELECT
                COUNT(DISTINCT t.{column_name}) AS distinct_values,
                COUNT(DISTINCT r.{reference_column}) AS reference_values,
                COUNT(DISTINCT CASE WHEN t.{column_name} = r.{reference_column}
                                    THEN t.{column_name} END) AS matched_values
            FROM {table_name} t
            LEFT JOIN {reference_table} r ON t.id = r.id
        """)
        distinct_values = self._as_count(row["distinct_values"])
        matched_values = self._as_count(row["matched_values"])
        accuracy_rate = self._safe_ratio(matched_values, distinct_values)
        return {
            "metric": "accuracy",
            "value": accuracy_rate,
            "passed": accuracy_rate >= threshold,
        }

    def check_validity(self, table_name, column_name, valid_values=None,
                       min_val=None, max_val=None, threshold=0.99):
        """Validity check: share of rows whose value is in an allowed domain.

        The domain is either an explicit list (*valid_values*) or a numeric
        range given by *min_val* and/or *max_val* (inclusive). A non-empty
        *valid_values* takes precedence. NULL values count as invalid.

        Args:
            table_name: Table to inspect (interpolated into SQL; trusted).
            column_name: Column to inspect (interpolated; trusted).
            valid_values: Allowed values; each is bound as a string parameter,
                matching the original behaviour of quoting every value.
            min_val: Inclusive lower bound (optional).
            max_val: Inclusive upper bound (optional).
            threshold: Minimum valid share needed to pass (default 0.99).

        Returns:
            dict with ``metric`` ("validity"), ``value`` (0.0 for an empty
            table) and ``passed``.

        Raises:
            ValueError: if neither *valid_values* nor any bound is supplied.
        """
        if valid_values:
            # Bind every value; this also makes values containing quotes safe.
            params = {f"v{i}": str(v) for i, v in enumerate(valid_values)}
            placeholders = ", ".join(f":{name}" for name in params)
            condition = f"{column_name} IN ({placeholders})"
        elif min_val is not None and max_val is not None:
            params = {"min_val": min_val, "max_val": max_val}
            condition = f"{column_name} BETWEEN :min_val AND :max_val"
        elif min_val is not None:
            params = {"min_val": min_val}
            condition = f"{column_name} >= :min_val"
        elif max_val is not None:
            params = {"max_val": max_val}
            condition = f"{column_name} <= :max_val"
        else:
            raise ValueError(
                "check_validity requires valid_values or min_val/max_val"
            )

        row = self._query_one_row(f"""
            SELECT
                COUNT(*) AS total_rows,
                SUM(CASE WHEN {condition} THEN 1 ELSE 0 END) AS valid_count
            FROM {table_name}
        """, params)
        total = self._as_count(row["total_rows"])
        valid = self._as_count(row["valid_count"])
        validity_rate = self._safe_ratio(valid, total)
        return {
            "metric": "validity",
            "value": validity_rate,
            "passed": validity_rate >= threshold,
        }

    def check_uniqueness(self, table_name, column_name, threshold=1.0):
        """Uniqueness check: distinct values divided by total rows.

        ``COUNT(DISTINCT ...)`` ignores NULLs, so NULLs in the column lower
        the rate.

        Args:
            table_name: Table to inspect (interpolated into SQL; trusted).
            column_name: Column to inspect (interpolated; trusted).
            threshold: Minimum rate needed to pass (default 1.0, i.e. every
                row distinct).

        Returns:
            dict with ``metric`` ("uniqueness"), ``value`` (0.0 for an empty
            table) and ``passed``.
        """
        row = self._query_one_row(f"""
            SELECT
                COUNT(*) AS total_rows,
                COUNT(DISTINCT {column_name}) AS distinct_count
            FROM {table_name}
        """)
        total = self._as_count(row["total_rows"])
        distinct = self._as_count(row["distinct_count"])
        uniqueness_rate = self._safe_ratio(distinct, total)
        return {
            "metric": "uniqueness",
            "value": uniqueness_rate,
            "passed": uniqueness_rate >= threshold,
        }

    def check_timeliness(self, table_name, date_column, expected_freshness_hours=24):
        """Timeliness check: hours elapsed since the newest *date_column* value.

        Args:
            table_name: Table to inspect (interpolated into SQL; trusted).
            date_column: Timestamp/date column (interpolated; trusted).
            expected_freshness_hours: Maximum age in hours needed to pass.

        Returns:
            dict with ``metric`` ("timeliness"), ``value`` (age in hours, or 0
            when the column has no non-NULL values), ``passed`` and
            ``expected_hours``.
        """
        row = self._query_one_row(
            f"SELECT MAX({date_column}) AS latest_date FROM {table_name}"
        )
        latest_date = row["latest_date"]
        if pd.isna(latest_date):
            return {
                "metric": "timeliness",
                "value": 0,
                "passed": False,
                "expected_hours": expected_freshness_hours,
            }

        # Normalise date / str / datetime results to a Timestamp. Compare
        # against "now" in the same timezone; a naive value is compared with
        # naive local time, as before.
        latest_ts = pd.Timestamp(latest_date)
        now = pd.Timestamp.now(tz=latest_ts.tz)
        freshness_hours = (now - latest_ts).total_seconds() / 3600
        return {
            "metric": "timeliness",
            "value": freshness_hours,
            "passed": freshness_hours <= expected_freshness_hours,
            "expected_hours": expected_freshness_hours,
        }

    # ------------------------------------------------------------------
    # Rule execution and reporting
    # ------------------------------------------------------------------
    @classmethod
    def _parse_validity_expression(cls, expression):
        """Translate a validity rule expression into ``check_validity`` kwargs.

        Supported forms (keywords are case-insensitive):
            ``... in[a, b, c]``           -> ``valid_values=["a", "b", "c"]``
            ``... between <lo> and <hi>`` -> ``min_val=lo, max_val=hi`` (floats)
        Whitespace and surrounding single/double quotes are stripped from
        list items.

        Raises:
            ValueError: if the expression matches neither form, or a range
                bound is not numeric.
        """
        expression = expression or ""
        in_match = cls._IN_LIST_PATTERN.search(expression)
        if in_match:
            values = []
            for item in in_match.group(1).split(","):
                item = item.strip()
                if item:
                    values.append(item.strip("'\""))
            return {"valid_values": values}
        between_match = cls._BETWEEN_PATTERN.search(expression)
        if between_match:
            low, high = between_match.groups()
            return {"min_val": float(low), "max_val": float(high)}
        raise ValueError(f"Unsupported validity expression: {expression!r}")

    def _run_rule(self, rule):
        """Dispatch *rule* to its check; return None for unsupported rule types."""
        rule_type = rule.rule_type
        if rule_type == RuleType.COMPLETENESS:
            return self.check_completeness(rule.table_name, rule.column_name)
        if rule_type == RuleType.VALIDITY:
            try:
                kwargs = self._parse_validity_expression(rule.rule_expression)
            except ValueError as err:
                raise ValueError(f"Rule {rule.rule_id}: {err}") from err
            return self.check_validity(rule.table_name, rule.column_name, **kwargs)
        if rule_type == RuleType.UNIQUENESS:
            return self.check_uniqueness(rule.table_name, rule.column_name)
        if rule_type == RuleType.TIMELINESS:
            return self.check_timeliness(rule.table_name, rule.column_name)
        # Rule types not handled above are skipped.
        return None

    def run_all_checks(self):
        """Run every registered rule and return the list of result dicts.

        Each result from a check is extended with ``rule_id``, ``table_name``,
        ``column_name`` and ``check_time`` (ISO-8601 local time). Rules whose
        type is not dispatched are skipped.

        Raises:
            ValueError: if a validity rule's expression cannot be parsed.
        """
        results = []
        for rule in self.rules:
            result = self._run_rule(rule)
            if result is None:
                continue
            result.update({
                "rule_id": rule.rule_id,
                "table_name": rule.table_name,
                "column_name": rule.column_name,
                "check_time": datetime.now().isoformat(),
            })
            results.append(result)
        return results

    def generate_quality_report(self, results):
        """Build a summary report from the output of :meth:`run_all_checks`.

        Args:
            results: List of result dicts; each must contain ``passed``.

        Returns:
            dict with ``report_date``, ``summary`` (check counts, the
            percentage of passed checks as ``overall_score`` (0.0 when there
            are no results), and the failed rules) and ``detailed_results``.
        """
        passed_flags = [bool(r["passed"]) for r in results]
        total = len(results)
        passed_count = sum(passed_flags)
        failed_fields = ("rule_id", "table_name", "column_name", "metric", "value")
        summary = {
            "total_checks": total,
            "passed_checks": passed_count,
            "failed_checks": total - passed_count,
            "overall_score": passed_count / total * 100 if total else 0.0,
            "failed_rules": [
                {field: r.get(field) for field in failed_fields}
                for r, ok in zip(results, passed_flags)
                if not ok
            ],
        }
        # Assemble the report; it is returned to the caller, not persisted here.
        report = {
            "report_date": datetime.now().isoformat(),
            "summary": summary,
            "detailed_results": results,
        }
        return report