diff --git a/src/pipelines/dbf_to_postgres_ctllog_pwc.py b/src/pipelines/dbf_to_postgres_ctllog_pwc.py index 5957386..494a8d5 100644 --- a/src/pipelines/dbf_to_postgres_ctllog_pwc.py +++ b/src/pipelines/dbf_to_postgres_ctllog_pwc.py @@ -33,6 +33,7 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline): self.group_cache = {} self.batch_size = int(os.getenv('BATCH_SIZE', 1000)) self.batch_data = [] + self.seen = set() #用于去重 self.processed_records = 0 self.current_date = None # 当前处理的日期 @@ -158,12 +159,10 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline): created_str = created.strftime('%Y-%m-%d %H:%M:%S') + '+00' - self.batch_data.append(( - created_str, - mapping['control_group_controller_id'], - mapping['controller_point_id'], - real_value - )) + key = (created_str, mapping['control_group_controller_id'], mapping['controller_point_id'], real_value) + if key not in self.seen: + self.seen.add(key) + self.batch_data.append(key) if len(self.batch_data) >= self.batch_size: self.flush_batch() @@ -195,6 +194,7 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline): # 清空批次数据 self.batch_data = [] + self.seen.clear() def final_flush(self): # 刷新剩余的批次数据