import os import csv import re import pandas as pd from dbfread import DBF from itertools import islice from datetime import datetime, timedelta import logging from core.database import Database from core.utils import size_to_human_readable, calculate_file_hash from pipelines.base_pipeline import BasePipeline class LogToPostgresCtllogPipeline(BasePipeline): def __init__(self, config): super().__init__(config) # todo:本地调试打开 self.data_root = 'D:\disney_test' self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx' # todo:本地调试打开 # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data') # self.mapping_file = os.getenv('MAPPING_FILE') self.log_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input')) # todo:debug use self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv' # 初始化CSV文件 if not os.path.exists(self.csv_file_path): with open(self.csv_file_path, 'w') as f: csv.writer(f).writerow( ['created', 'control_group_controller_id', 'point_id', 'real_value']) # todo:debug use self.db = None self.group_cache = {} self.batch_size = int(os.getenv('BATCH_SIZE', 1000)) self.batch_data = [] self.seen = set() # 用于去重 self.processed_records = 0 self.current_date = None # 当前处理的日期 def validate_config(self): # 确保目录存在 if not os.path.exists(self.log_dir): raise ValueError(f"DBF directory not found: {self.log_dir}") # 如果有映射文件,验证存在 if self.mapping_file and not os.path.exists(self.mapping_file): self.logger.warning(f"Mapping file not found: {self.mapping_file}") self.mapping_file = None def load_mapping(self): """加载映射关系""" if not self.mapping_file: self.logger.info("No mapping file provided, using default mapping") return {} try: self.logger.info(f"Loading mapping from {self.mapping_file}") mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping") # 清理数据 mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id']) mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id', 'disney_device_point_name', 'control_group_controller_id', 'controller_point_id', 'control_group_name', 'control_group_id']] # 创建映射字典 mapping_dict = {} for _, row in mapping_df.iterrows(): key = (str(row['AS_SERIAL']), str(row['ID'])) if key not in mapping_dict: mapping_dict[key] = [] mapping_dict[key].append({ 'seq_id': int(row['data_field_sequence_id']), 'disney_device_point_name': row['disney_device_point_name'].strip(), 'control_group_controller_id': int(row['control_group_controller_id']), 'point_id': int(row['controller_point_id']), 'control_group_name': row['control_group_name'].strip(), 'control_group_id': int(row['control_group_id']) }) self.logger.info(f"Loaded {len(mapping_dict)} mapping entries") return mapping_dict except Exception as e: self.logger.error(f"Failed to load mapping: {str(e)}") return {} def process(self): # 加载映射关系 mapping_dict = self.load_mapping() # 连接数据库 # todo:本地调试时关闭 # db_config = self.config.get_database_config() # self.db = Database(**db_config) # todo:本地调试时关闭 # 处理文件 total_processed = 0 for root, dirs, files in os.walk(self.log_dir): for filename in files: if filename.casefold().endswith('.log'): full_path = os.path.join(root, filename) # 获取文件名(不含后缀),文件名即为Disney设备ID:mb-xxx。即对应mapping里的ID字段 device_id = os.path.splitext(filename)[0].upper() # 获取文件所在目录的名称(直接父目录),目录名是Dsiney楼栋的mac地址,即对应mapping里的AS_SERIAL as_serial = os.path.basename(root) processed = self.process_log_file(full_path, as_serial, device_id, mapping_dict) total_processed += processed self.logger.info(f"Processed {processed} records from {filename}") # 关闭数据库连接 # todo:本地调试时关闭 # self.db.disconnect() # todo:本地调试时关闭 return total_processed def clean_header(self, header): """更健壮的列名清理""" # 先移除可能存在的引号 header = header.strip().strip("'").strip('"') # 移除括号及内容 return re.sub(r'\s*\([^)]*\)', '', header).strip() def clean_header_debug(self, headers): """带调试信息的列名清理""" cleaned_headers = [] print(f"开始处理CSV列名,共 {len(headers)} 列") print(f"原始列名: {headers}") for i, original_header in enumerate(headers): try: print(f"\n处理第 {i + 1}/{len(headers)} 列: '{original_header}'") # 类型检查 if not isinstance(original_header, str): print(f"警告: 列名不是字符串类型,类型为 {type(original_header)}") header_str = str(original_header) print(f"转换为字符串: '{header_str}'") else: header_str = original_header # 长度信息 orig_length = len(header_str) print(f"原始长度: {orig_length} 字符") # 移除引号 stripped = header_str.strip().strip("'").strip('"') stripped_length = len(stripped) print(f"移除引号后: '{stripped}' ({stripped_length} 字符)") # 移除括号内容 cleaned = re.sub(r'\s*\([^)]*\)', '', stripped).strip() cleaned_length = len(cleaned) print(f"移除括号内容后: '{cleaned}' ({cleaned_length} 字符)") # 空值检查 if not cleaned: print(f"警告: 清理后列名为空,使用原始值") cleaned = f"Column_{i + 1}" cleaned_headers.append(cleaned) print(f"处理完成: '{original_header}' => '{cleaned}'") except Exception as e: print(f"\n错误! 处理列名失败: '{original_header}'") print(f"错误详情: {str(e)}") print(f"使用原始列名作为后备") if original_header: cleaned_headers.append(original_header) else: cleaned_headers.append(f"Column_{i + 1}") print(f"\n列名处理完成") print(f"清理后列名: {cleaned_headers}") return cleaned_headers def process_log_file(self, file_path, as_serial, device_id, mapping_dict): log_table = [] self.logger.info(f"Processing file: {os.path.basename(file_path)}") try: # 获取文件信息 file_size = os.path.getsize(file_path) file_hash = calculate_file_hash(file_path) self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}") with open(file_path, 'r', newline='', encoding='utf-8-sig') as file: # 创建CSV读取器 csv_reader = csv.reader(file) try: # 读取列名行 headers = next(csv_reader) except StopIteration: self.logger.info("CSV is empty") headers = [] # 使用带调试的列名清理 cleaned_headers = self.clean_header_debug(headers) # 创建DictReader dict_reader = csv.DictReader( file, fieldnames=cleaned_headers ) for row in dict_reader: log_table.append(row) # 处理log表 self.logger.info(f"the Log file: {os.path.basename(file_path)} have record: #{len(log_table)}") for record in log_table: self.process_record(record, as_serial, device_id, mapping_dict) # 确保所有剩余数据都被处理 self.final_flush() self.logger.info(f"Processed {self.processed_records} records from {os.path.basename(file_path)}") return self.processed_records except Exception as e: self.logger.error(f"Failed to process file {file_path}: {str(e)}") return def process_record(self, record, as_serial, device_id, mapping_dict): """处理单个记录""" try: # 从文件名得到modbus设备名 # 从文件名的上一层文件夹名得到AS_SERIAL key = (as_serial, device_id) if key not in mapping_dict: return dt_str = record.get('time', '') if not dt_str: return # 去掉两端的单引号 clean_dt_str = dt_str.strip().strip("'").strip('"') record_time = datetime.strptime(clean_dt_str, '%Y-%m-%d %H:%M:%S') # 检查日期变化 record_date = record_time.date() if self.current_date is None: self.current_date = record_date elif record_date != self.current_date: # 日期变化时,处理前一天的缓存数据 self.process_day_cache() self.current_date = record_date hour_key = record_time.replace(minute=0, second=0, microsecond=0) # seq_id不再有作用,需要根据disney_device_point_name去记录里查找 for mapping in mapping_dict[key]: data_field = mapping['disney_device_point_name'] # data_field = f"DATA{mapping['seq_id']:02d}" value = record.get(data_field) if value is None: continue try: float_value = float(value) except ValueError: continue # todo 改造一下,log格式的数据质量比较好,不需要这么复杂的处理,可以直接写入,按照pwc的处理即可 # 创建分组键 group_key = ( mapping['control_group_controller_id'], mapping['point_id'], hour_key ) # 添加到分组缓存 self.add_to_group_cache(group_key, record_time, float_value, mapping) # 检查批次大小 if len(self.batch_data) >= self.batch_size: self.flush_batch() # todo 改造一下,log格式的数据质量比较好,不需要这么复杂的处理,可以直接写入,按照pwc的处理即可 except Exception as e: self.logger.error(f"Error processing record: {e}") def flush_batch(self): """执行批量插入并清空批次数据""" if not self.batch_data: return # 实际插入数据库 # self.db.execute_batch( # "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)", # [(data[0], data[1], data[2], data[3]) for data in self.batch_data] # ) # todo: debug时写入CSV(调试用) with open(self.csv_file_path, "a", newline="") as f: writer = csv.writer(f) writer.writerows(self.batch_data) # todo: debug时写入CSV # 更新处理记录数 processed_count = len(self.batch_data) self.processed_records += processed_count self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}") # 清空批次数据 self.batch_data = [] self.seen.clear() def final_flush(self): # 刷新剩余的批次数据 self.flush_batch()