import os
import pandas as pd
from dbfread import DBF
from itertools import islice
from datetime import datetime, timedelta
import logging

from core.database import Database
from core.utils import size_to_human_readable, calculate_file_hash
from pipelines.base_pipeline import BasePipeline


class DbfToPostgresCtllogPipeline(BasePipeline):
    def __init__(self, config):
        super().__init__(config)
        # TODO: uncomment for local debugging
        # self.data_root = 'D:\disney_test'
        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
        # TODO: uncomment for local debugging
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
        self.mapping_file = os.getenv('MAPPING_FILE')
        self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
        self.db = None

    def validate_config(self):
        """Validate that the configured paths exist."""
        # Make sure the DBF input directory exists
        if not os.path.exists(self.dbf_dir):
            raise ValueError(f"DBF directory not found: {self.dbf_dir}")

        # If a mapping file is configured, verify that it exists
        if self.mapping_file and not os.path.exists(self.mapping_file):
            self.logger.warning(f"Mapping file not found: {self.mapping_file}")
            self.mapping_file = None

    def load_mapping(self):
        """Load the (AS_SERIAL, ID) -> point mapping from the Excel mapping file."""
        if not self.mapping_file:
            self.logger.info("No mapping file provided, using default mapping")
            return {}

        try:
            self.logger.info(f"Loading mapping from {self.mapping_file}")
            mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping")

            # Clean the data
            mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
            mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id',
                                     'control_group_controller_id', 'controller_point_id']]

            # Build the mapping dictionary
            mapping_dict = {}
            for _, row in mapping_df.iterrows():
                key = (str(row['AS_SERIAL']), str(row['ID']))
                if key not in mapping_dict:
                    mapping_dict[key] = []
                mapping_dict[key].append({
                    'seq_id': int(row['data_field_sequence_id']),
                    'control_group_controller_id': int(row['control_group_controller_id']),
                    'point_id': int(row['controller_point_id'])
                })

            self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
            return mapping_dict
        except Exception as e:
            self.logger.error(f"Failed to load mapping: {str(e)}")
            return {}

    def process(self):
        """Process every DBF file in the input directory; return the total record count."""
        # Load the mapping
        mapping_dict = self.load_mapping()

        # Connect to the database
        # TODO: toggle when debugging locally
        db_config = self.config.get_database_config()
        self.db = Database(**db_config)
        # TODO: toggle when debugging locally

        # Process the files
        total_processed = 0
        for filename in os.listdir(self.dbf_dir):
            if filename.casefold().endswith('.dbf'):
                file_path = os.path.join(self.dbf_dir, filename)
                processed = self.process_file(file_path, mapping_dict)
                total_processed += processed
                self.logger.info(f"Processed {processed} records from {filename}")

        # Close the database connection
        # TODO: toggle when debugging locally
        self.db.disconnect()
        # TODO: toggle when debugging locally

        return total_processed

    def process_file(self, file_path, mapping_dict):
        """Read one DBF file and batch-insert the mapped records into controller_log."""
        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
        try:
            # Gather file info
            file_size = os.path.getsize(file_path)
            file_hash = calculate_file_hash(file_path)
            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")

            dbf_table = DBF(file_path, encoding='utf-8')
            batch_data = []
            processed_records = 0
            batch_size = int(os.getenv('BATCH_SIZE', 1000))
            self.logger.info(f"DBF file {os.path.basename(file_path)} has {len(dbf_table.records)} records")

            # Chunked reading turned out to be a trap here; do not read in chunks
            # dbf_table = DBF(file_path, load=False, encoding='utf-8')
            # chunk_idx = 0
            # while True:
            #     chunk = list(islice(dbf_table._iter_records(), 100000))
            #     if not chunk:  # finished reading
            #         break
            #     chunk_idx += 1
            #     # process this chunk of 100,000 rows
            #     self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}")

            for record in dbf_table:
                try:
                    as_serial = str(record.get('AS_SERIAL', '')).strip()
                    device_id = str(record.get('ID', '')).strip()
                    key = (as_serial, device_id)

                    # Skip records that have no mapping
                    if key not in mapping_dict:
                        continue

                    # Handle the timestamp (+8 hours)
                    dt_str = record.get('DATETIME', '')
                    if not dt_str:
                        continue
                    original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
                    target_time = original_time + timedelta(hours=8)
                    formatted_time = target_time.strftime('%Y-%m-%d %H:%M:%S+00')

                    # Handle each mapped point
                    for mapping in mapping_dict[key]:
                        data_field = f"DATA{mapping['seq_id']:02d}"
                        value = record.get(data_field)
                        if value is None:
                            continue
                        batch_data.append((
                            formatted_time,
                            mapping['control_group_controller_id'],
                            mapping['point_id'],
                            float(value)
                        ))

                    # Batch insert
                    if len(batch_data) >= batch_size:
                        # TODO: comment out first when debugging locally
                        self.db.execute_batch(
                            "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
                            batch_data
                        )
                        # TODO: comment out first when debugging locally
                        processed_records += len(batch_data)
                        self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
                        batch_data = []
                except Exception as e:
                    self.logger.warning(f"Skipping record due to error: {str(e)}")
                    continue

            # Insert the remaining records
            if batch_data:
                # TODO: comment out first when debugging locally
                self.db.execute_batch(
                    "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
                    batch_data
                )
                # TODO: comment out first when debugging locally
                processed_records += len(batch_data)
                self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")

            self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
            return processed_records
        except Exception as e:
            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
            return 0
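

# Usage sketch (hedged): a minimal way this pipeline might be run standalone.
# The framework that normally constructs and passes in the config object is not
# shown in this module; _LocalConfig below is a hypothetical stand-in exposing
# only the method this class actually calls, get_database_config(). The database
# keyword names (host, port, dbname, user, password) are assumptions about the
# core.database.Database constructor and may differ in the real project.
if __name__ == '__main__':

    class _LocalConfig:
        """Hypothetical config stub illustrating the interface used by process()."""

        def get_database_config(self):
            # Assumed keyword names for Database(**db_config); adjust to match
            # the actual core.database.Database signature.
            return {
                'host': os.getenv('DB_HOST', 'localhost'),
                'port': int(os.getenv('DB_PORT', '5432')),
                'dbname': os.getenv('DB_NAME', 'postgres'),
                'user': os.getenv('DB_USER', 'postgres'),
                'password': os.getenv('DB_PASSWORD', ''),
            }

    logging.basicConfig(level=logging.INFO)
    pipeline = DbfToPostgresCtllogPipeline(_LocalConfig())
    pipeline.validate_config()
    total = pipeline.process()
    print(f"Total records processed: {total}")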