From 6fe643638eb1f247e012c5bc7997075788a1edbc Mon Sep 17 00:00:00 2001
From: "mingsheng.li"
Date: Thu, 31 Jul 2025 17:37:21 +0800
Subject: [PATCH] Add logic to write the data to a CSV file
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/pipelines/dbf_to_postgres_ctllog.py | 59 +++++++++++++++++--------
 1 file changed, 41 insertions(+), 18 deletions(-)

diff --git a/src/pipelines/dbf_to_postgres_ctllog.py b/src/pipelines/dbf_to_postgres_ctllog.py
index 9c6e326..d2a7085 100644
--- a/src/pipelines/dbf_to_postgres_ctllog.py
+++ b/src/pipelines/dbf_to_postgres_ctllog.py
@@ -1,4 +1,5 @@
 import os
+import csv
 import pandas as pd
 from dbfread import DBF
 from itertools import islice
@@ -13,11 +14,11 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
         # todo: toggle for local debugging
-        # self.data_root = 'D:\disney_test'
-        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
+        self.data_root = r'D:\disney_test'
+        self.mapping_file = r'D:\disney_test\disney-mapping-v2.xlsx'
         # todo: toggle for local debugging
-        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
-        self.mapping_file = os.getenv('MAPPING_FILE')
+        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        # self.mapping_file = os.getenv('MAPPING_FILE')
 
         self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
@@ -46,7 +47,7 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Clean the data
         mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
         mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id',
-                                 'control_group_controller_id', 'controller_point_id']]
+                                 'control_group_controller_id', 'controller_point_id', 'control_group_name', 'control_group_id']]
 
         # Build the mapping dictionary
         mapping_dict = {}
@@ -58,7 +59,9 @@
             mapping_dict[key].append({
                 'seq_id': int(row['data_field_sequence_id']),
                 'control_group_controller_id': int(row['control_group_controller_id']),
-                'point_id': int(row['controller_point_id'])
+                'point_id': int(row['controller_point_id']),
+                'control_group_name': row['control_group_name'],
+                'control_group_id': int(row['control_group_id'])
             })
 
         self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
@@ -73,10 +76,18 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Connect to the database
         # todo: toggle for local debugging
-        db_config = self.config.get_database_config()
-        self.db = Database(**db_config)
+        # db_config = self.config.get_database_config()
+        # self.db = Database(**db_config)
         # todo: toggle for local debugging
+        self.csv_file_path = r'D:\disney_test\debug_controller_log.csv'
+        # Initialize the CSV file with a header row
+        if not os.path.exists(self.csv_file_path):
+            with open(self.csv_file_path, 'w', newline='') as f:
+                csv.writer(f).writerow(
+                    ['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
+                     'control_group_id'])
+
         # Process the files
         total_processed = 0
         for filename in os.listdir(self.dbf_dir):
@@ -88,7 +99,7 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Close the database connection
         # todo: toggle for local debugging
-        self.db.disconnect()
+        # self.db.disconnect()
         # todo: toggle for local debugging
 
         return total_processed
@@ -146,17 +157,24 @@
                         formatted_time,
                         mapping['control_group_controller_id'],
                         mapping['point_id'],
-                        float(value)
+                        float(value),
+                        mapping['control_group_name'],
+                        mapping['control_group_id']
                     ))
 
                     # Batch insert
                     if len(batch_data) >= batch_size:
                         # todo: commented out for local debugging
-                        self.db.execute_batch(
-                            "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                            batch_data
-                        )
+                        # self.db.execute_batch(
+                        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+                        #     batch_data
+                        # )
                         # todo: commented out for local debugging
+
+                        # Write to the CSV file in append mode
+                        with open(self.csv_file_path, 'a', newline='') as f:
+                            csv.writer(f).writerows(batch_data)
+
                         processed_records += len(batch_data)
                         self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
                         batch_data = []
@@ -168,11 +186,16 @@
             # Insert the remaining records
             if batch_data:
                 # todo: commented out for local debugging
-                self.db.execute_batch(
-                    "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                    batch_data
-                )
+                # self.db.execute_batch(
+                #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+                #     batch_data
+                # )
                 # todo: commented out for local debugging
+
+                # Write the remaining records to the CSV file in append mode
+                with open(self.csv_file_path, 'a', newline='') as f:
+                    csv.writer(f).writerows(batch_data)
+
                 processed_records += len(batch_data)
                 self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
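
A possible follow-up: the commented-out INSERT above still lists four columns
while each batch_data tuple now carries six values, so it would fail if
re-enabled unchanged, and the DB/CSV switch currently relies on commenting
blocks in and out. Below is a minimal sketch of an env-driven toggle for the
same flush step. The DEBUG_CSV_OUTPUT flag and the _flush_batch name are
hypothetical; the execute_batch call reuses the Database API already used in
this file, and the widened column list assumes controller_log gains the two
new columns.

    import csv
    import os

    def _flush_batch(self, batch_data):
        """Flush one batch either to the debug CSV file or to Postgres."""
        if os.getenv('DEBUG_CSV_OUTPUT') == '1':
            # Local debugging: append the rows to the CSV file whose header
            # is written in run().
            with open(self.csv_file_path, 'a', newline='') as f:
                csv.writer(f).writerows(batch_data)
        else:
            # Production: bulk insert; the column list matches the 6-tuples
            # built per record (created, control_group_controller_id,
            # point_id, real_value, control_group_name, control_group_id).
            self.db.execute_batch(
                "INSERT INTO controller_log (created, control_group_controller_id, "
                "point_id, real_value, control_group_name, control_group_id) "
                "VALUES (%s, %s, %s, %s, %s, %s)",
                batch_data
            )

Both flush sites (the in-loop batch and the trailing remainder) could then
call self._flush_batch(batch_data), and the todo markers become unnecessary.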