增加去重机制
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
mingsheng.li 2025-08-05 16:25:48 +08:00
parent c9e46a902f
commit d4d412a7d2

View File

@ -33,6 +33,7 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline):
self.group_cache = {}
self.batch_size = int(os.getenv('BATCH_SIZE', 1000))
self.batch_data = []
self.seen = set() #用于去重
self.processed_records = 0
self.current_date = None # 当前处理的日期
@ -158,12 +159,10 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline):
created_str = created.strftime('%Y-%m-%d %H:%M:%S') + '+00'
self.batch_data.append((
created_str,
mapping['control_group_controller_id'],
mapping['controller_point_id'],
real_value
))
key = (created_str, mapping['control_group_controller_id'], mapping['controller_point_id'], real_value)
if key not in self.seen:
self.seen.add(key)
self.batch_data.append(key)
if len(self.batch_data) >= self.batch_size:
self.flush_batch()
@ -195,6 +194,7 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline):
# 清空批次数据
self.batch_data = []
self.seen.clear()
def final_flush(self):
# 刷新剩余的批次数据