Add logic for writing data to CSV files

mingsheng.li 2025-07-31 17:37:21 +08:00
parent 7fcc377a27
commit 6fe643638e


@@ -1,4 +1,5 @@
 import os
+import csv
 import pandas as pd
 from dbfread import DBF
 from itertools import islice
@@ -13,11 +14,11 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
         # todo: enable for local debugging
-        # self.data_root = 'D:\disney_test'
-        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
+        self.data_root = 'D:\disney_test'
+        self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
         # todo: enable for local debugging
-        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
-        self.mapping_file = os.getenv('MAPPING_FILE')
+        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        # self.mapping_file = os.getenv('MAPPING_FILE')
         self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
@@ -46,7 +47,7 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Clean the data
         mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
         mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id',
-                                 'control_group_controller_id', 'controller_point_id']]
+                                 'control_group_controller_id', 'controller_point_id', 'control_group_name', 'control_group_id']]

         # Build the mapping dictionary
         mapping_dict = {}
@@ -58,7 +59,9 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
             mapping_dict[key].append({
                 'seq_id': int(row['data_field_sequence_id']),
                 'control_group_controller_id': int(row['control_group_controller_id']),
-                'point_id': int(row['controller_point_id'])
+                'point_id': int(row['controller_point_id']),
+                'control_group_name': row['control_group_name'],
+                'control_group_id': int(row['control_group_id'])
             })

         self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
@@ -73,10 +76,18 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Connect to the database
         # todo: enable when debugging locally
-        db_config = self.config.get_database_config()
-        self.db = Database(**db_config)
+        # db_config = self.config.get_database_config()
+        # self.db = Database(**db_config)
         # todo: enable when debugging locally
+        self.csv_file_path = 'D:\disney_test\debug_controller_log.csv'
+        # Initialize the CSV file with a header row
+        if not os.path.exists(self.csv_file_path):
+            with open(self.csv_file_path, 'w') as f:
+                csv.writer(f).writerow(
+                    ['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
+                     'control_group_id'])
+
         # Process files
         total_processed = 0
         for filename in os.listdir(self.dbf_dir):
@@ -88,7 +99,7 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         # Close the database connection
         # todo: enable when debugging locally
-        self.db.disconnect()
+        # self.db.disconnect()
         # todo: enable when debugging locally

         return total_processed
@@ -146,17 +157,24 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
                     formatted_time,
                     mapping['control_group_controller_id'],
                     mapping['point_id'],
-                    float(value)
+                    float(value),
+                    mapping['control_group_name'],
+                    mapping['control_group_id']
                 ))

                 # Batch insert
                 if len(batch_data) >= batch_size:
                     # todo: comment out for local debugging
-                    self.db.execute_batch(
-                        "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                        batch_data
-                    )
+                    # self.db.execute_batch(
+                    #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+                    #     batch_data
+                    # )
                     # todo: comment out for local debugging
+
+                    # Write to the CSV in append mode
+                    with open(self.csv_file_path, 'a', newline='') as f:
+                        csv.writer(f).writerows(batch_data)
+
                     processed_records += len(batch_data)
                     self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
                     batch_data = []
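
The batching logic this hunk redirects to CSV reduces to: accumulate row tuples, append them in chunks once batch_size is reached, then flush whatever is left (the final hunk below). A self-contained sketch of that flow, with illustrative names (flush_batches, rows) that are not taken from the pipeline:

    import csv

    def flush_batches(rows, csv_path, batch_size=1000):
        # Append row tuples to the CSV in chunks of batch_size, mirroring the
        # batch-insert loop above; the trailing flush handles remaining records.
        batch, written = [], 0
        for row in rows:
            batch.append(row)
            if len(batch) >= batch_size:
                with open(csv_path, 'a', newline='') as f:
                    csv.writer(f).writerows(batch)
                written += len(batch)
                batch = []
        if batch:
            with open(csv_path, 'a', newline='') as f:
                csv.writer(f).writerows(batch)
            written += len(batch)
        return written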
@@ -168,11 +186,16 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
             # Insert remaining records
             if batch_data:
                 # todo: comment out for local debugging
-                self.db.execute_batch(
-                    "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                    batch_data
-                )
+                # self.db.execute_batch(
+                #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+                #     batch_data
+                # )
                 # todo: comment out for local debugging
+
+                # Write to the CSV in append mode
+                with open(self.csv_file_path, 'a', newline='') as f:
+                    csv.writer(f).writerows(batch_data)
+
                 processed_records += len(batch_data)
                 self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")