diff --git a/src/pipelines/dbf_to_postgres_ctllog.py b/src/pipelines/dbf_to_postgres_ctllog.py
index 547e66e..ceb42c9 100644
--- a/src/pipelines/dbf_to_postgres_ctllog.py
+++ b/src/pipelines/dbf_to_postgres_ctllog.py
@@ -96,9 +96,19 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
 
         # Process files
         total_processed = 0
+
+        file_list = []
         for filename in os.listdir(self.dbf_dir):
             if filename.casefold().endswith('.dbf'):
                 file_path = os.path.join(self.dbf_dir, filename)
+                # Get the last modification time
+                mtime = os.path.getmtime(file_path)
+                file_list.append((mtime, file_path, filename))
+
+        # Sort by last modification time (oldest first)
+        file_list.sort(key=lambda x: x[0])
+        # Process files in that order
+        for mtime, file_path, filename in file_list:
                 processed = self.process_file(file_path, mapping_dict)
                 total_processed += processed
                 self.logger.info(f"Processed {processed} records from {filename}")
@@ -265,10 +275,18 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
             return
 
         # Insert into the database
-        self.db.execute_batch(
-            "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-            [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
-        )
+        try:
+            self.db.execute_batch(
+                """
+                INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
+                VALUES (%s, %s, %s, %s)
+                ON CONFLICT (created, control_group_controller_id, point_id)
+                DO UPDATE SET real_value = EXCLUDED.real_value
+                """,
+                [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+            )
+        except Exception as e:
+            self.logger.error(f"Batch insert failed: {e}")
 
         # todo: write to CSV when debugging (debug only)
         # with open(self.csv_file_path, "a", newline="") as f:
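The hunk above (and the matching change in dbf_to_postgres_ctllog_pwc.py below) collects (mtime, path, name) tuples and sorts them so DBF files are processed oldest first. For reference, the same ordering can be expressed more compactly with sorted() and os.path.getmtime as the key; this is an illustrative sketch only, the helper name is not part of the patch:

import os

def dbf_files_oldest_first(dbf_dir):
    """Return full paths of *.dbf files in dbf_dir, oldest modification time first."""
    paths = [
        os.path.join(dbf_dir, name)
        for name in os.listdir(dbf_dir)
        if name.casefold().endswith('.dbf')
    ]
    # os.path.getmtime supplies the last-modification timestamp used as the sort key
    return sorted(paths, key=os.path.getmtime)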
diff --git a/src/pipelines/dbf_to_postgres_ctllog_pwc.py b/src/pipelines/dbf_to_postgres_ctllog_pwc.py
index 3c4a048..fb5a0ef 100644
--- a/src/pipelines/dbf_to_postgres_ctllog_pwc.py
+++ b/src/pipelines/dbf_to_postgres_ctllog_pwc.py
@@ -12,10 +12,10 @@ from pipelines.base_pipeline import BasePipeline
 class DbfToPostgresCtllogPwcPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
-        # todo: enable for local debugging
+        # todo:debug use
         # self.data_root = 'D:\disney_test'
         # self.mapping_file = 'D:\disney_test\disney-mapping-elec-v3.xlsx'
-        # todo: enable for local debugging
+        # todo:debug use
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
         self.mapping_file = os.getenv('MAPPING_FILE')
         self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
@@ -94,12 +94,22 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline):
 
         # Process files
         total_processed = 0
+
+        file_list = []
         for filename in os.listdir(self.dbf_dir):
             if filename.casefold().endswith('.dbf'):
                 file_path = os.path.join(self.dbf_dir, filename)
-                processed = self.process_file(file_path, mapping_dict)
-                total_processed += processed
-                self.logger.info(f"Processed {processed} records from {filename}")
+                # Get the last modification time
+                mtime = os.path.getmtime(file_path)
+                file_list.append((mtime, file_path, filename))
+
+        # Sort by last modification time (oldest first)
+        file_list.sort(key=lambda x: x[0])
+        # Process files in that order
+        for mtime, file_path, filename in file_list:
+            processed = self.process_file(file_path, mapping_dict)
+            total_processed += processed
+            self.logger.info(f"Processed {processed} records from {filename}")
 
         # Close the database connection
         # todo: disabled for local debugging
@@ -177,10 +187,18 @@ class DbfToPostgresCtllogPwcPipeline(BasePipeline):
             return
 
         # Insert into the database
-        self.db.execute_batch(
-            "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-            [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
-        )
+        try:
+            self.db.execute_batch(
+                """
+                INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
+                VALUES (%s, %s, %s, %s)
+                ON CONFLICT (created, control_group_controller_id, point_id)
+                DO UPDATE SET real_value = EXCLUDED.real_value
+                """,
+                [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+            )
+        except Exception as e:
+            self.logger.error(f"Batch insert failed: {e}")
 
         # todo: write to CSV when debugging (debug only)
         # with open(self.csv_file_path, "a", newline="") as f:
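Note that the new ON CONFLICT clause in both pipelines only works if controller_log has a unique index or constraint whose columns match the conflict target exactly; without one, PostgreSQL raises an error that there is no unique or exclusion constraint matching the ON CONFLICT specification, and the new except block would then log and swallow every batch. A minimal sketch of the supporting DDL, assuming no such constraint exists yet; the index name is illustrative and not part of this change:

# Hypothetical migration snippet; the index name is an assumption.
CONTROLLER_LOG_UPSERT_INDEX_DDL = """
CREATE UNIQUE INDEX IF NOT EXISTS controller_log_created_controller_point_key
    ON controller_log (created, control_group_controller_id, point_id);
"""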
diff --git a/src/pipelines/log_to_postgres_ctllog.py b/src/pipelines/log_to_postgres_ctllog.py
new file mode 100644
index 0000000..47f7118
--- /dev/null
+++ b/src/pipelines/log_to_postgres_ctllog.py
@@ -0,0 +1,317 @@
+import os
+import csv
+import re
+import pandas as pd
+from dbfread import DBF
+from itertools import islice
+from datetime import datetime, timedelta
+import logging
+from core.database import Database
+from core.utils import size_to_human_readable, calculate_file_hash
+from pipelines.base_pipeline import BasePipeline
+
+
+class LogToPostgresCtllogPipeline(BasePipeline):
+    def __init__(self, config):
+        super().__init__(config)
+        # todo: enable for local debugging
+        self.data_root = 'D:\disney_test'
+        self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
+        # todo: enable for local debugging
+        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        # self.mapping_file = os.getenv('MAPPING_FILE')
+        self.log_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
+
+        # todo:debug use
+        self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv'
+        # Initialise the CSV file
+        if not os.path.exists(self.csv_file_path):
+            with open(self.csv_file_path, 'w') as f:
+                csv.writer(f).writerow(
+                    ['created', 'control_group_controller_id', 'point_id', 'real_value'])
+        # todo:debug use
+
+        self.db = None
+        self.group_cache = {}
+        self.batch_size = int(os.getenv('BATCH_SIZE', 1000))
+        self.batch_data = []
+        self.seen = set()  # used for de-duplication
+        self.processed_records = 0
+        self.current_date = None  # date currently being processed
+
+    def validate_config(self):
+        # Make sure the input directory exists
+        if not os.path.exists(self.log_dir):
+            raise ValueError(f"DBF directory not found: {self.log_dir}")
+
+        # If a mapping file is configured, verify it exists
+        if self.mapping_file and not os.path.exists(self.mapping_file):
+            self.logger.warning(f"Mapping file not found: {self.mapping_file}")
+            self.mapping_file = None
+
+    def load_mapping(self):
+        """Load the mapping relationships."""
+        if not self.mapping_file:
+            self.logger.info("No mapping file provided, using default mapping")
+            return {}
+
+        try:
+            self.logger.info(f"Loading mapping from {self.mapping_file}")
+            mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping")
+
+            # Clean the data
+            mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
+            mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id', 'disney_device_point_name',
+                                     'control_group_controller_id', 'controller_point_id',
+                                     'control_group_name', 'control_group_id']]
+
+            # Build the mapping dictionary
+            mapping_dict = {}
+            for _, row in mapping_df.iterrows():
+                key = (str(row['AS_SERIAL']), str(row['ID']))
+                if key not in mapping_dict:
+                    mapping_dict[key] = []
+
+                mapping_dict[key].append({
+                    'seq_id': int(row['data_field_sequence_id']),
+                    'disney_device_point_name': row['disney_device_point_name'].strip(),
+                    'control_group_controller_id': int(row['control_group_controller_id']),
+                    'point_id': int(row['controller_point_id']),
+                    'control_group_name': row['control_group_name'].strip(),
+                    'control_group_id': int(row['control_group_id'])
+                })
+
+            self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
+            return mapping_dict
+
+        except Exception as e:
+            self.logger.error(f"Failed to load mapping: {str(e)}")
+            return {}
+
+    def process(self):
+        # Load the mapping
+        mapping_dict = self.load_mapping()
+
+        # Connect to the database
+        # todo: disabled for local debugging
+        # db_config = self.config.get_database_config()
+        # self.db = Database(**db_config)
+        # todo: disabled for local debugging
+
+        # Process files
+        total_processed = 0
+
+        for root, dirs, files in os.walk(self.log_dir):
+            for filename in files:
+                if filename.casefold().endswith('.log'):
+                    full_path = os.path.join(root, filename)
+                    # The file name (without extension) is the Disney device ID (mb-xxx), i.e. the ID field in the mapping
+                    device_id = os.path.splitext(filename)[0].upper()
+                    # The direct parent directory name is the Disney building MAC address, i.e. AS_SERIAL in the mapping
+                    as_serial = os.path.basename(root)
+                    processed = self.process_log_file(full_path, as_serial, device_id, mapping_dict)
+                    total_processed += processed
+                    self.logger.info(f"Processed {processed} records from {filename}")
+
+        # Close the database connection
+        # todo: disabled for local debugging
+        # self.db.disconnect()
+        # todo: disabled for local debugging
+        return total_processed
+
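The walk in process() above relies on a directory layout of <log_dir>/<AS_SERIAL>/<device-id>.log, where the directory is named after the building MAC address and the file after the Disney device ID. A small illustration of how one path becomes a mapping_dict key; the MAC address and device name in the path are hypothetical, only the '/data/dbf-input' default comes from the code:

import os

full_path = "/data/dbf-input/00-1A-2B-3C-4D-5E/mb-012.log"  # hypothetical example path
root, filename = os.path.split(full_path)
device_id = os.path.splitext(filename)[0].upper()           # "MB-012"
as_serial = os.path.basename(root)                          # "00-1A-2B-3C-4D-5E"
key = (as_serial, device_id)                                 # lookup key into mapping_dict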
+    def clean_header(self, header):
+        """More robust column-name cleaning."""
+        # Strip any surrounding quotes first
+        header = header.strip().strip("'").strip('"')
+        # Remove parentheses and their contents
+        return re.sub(r'\s*\([^)]*\)', '', header).strip()
+
+    def clean_header_debug(self, headers):
+        """Column-name cleaning with debug output."""
+        cleaned_headers = []
+        print(f"Cleaning CSV headers, {len(headers)} columns in total")
+        print(f"Original headers: {headers}")
+
+        for i, original_header in enumerate(headers):
+            try:
+                print(f"\nProcessing column {i + 1}/{len(headers)}: '{original_header}'")
+
+                # Type check
+                if not isinstance(original_header, str):
+                    print(f"Warning: header is not a string, type is {type(original_header)}")
+                    header_str = str(original_header)
+                    print(f"Converted to string: '{header_str}'")
+                else:
+                    header_str = original_header
+
+                # Length info
+                orig_length = len(header_str)
+                print(f"Original length: {orig_length} characters")
+
+                # Strip quotes
+                stripped = header_str.strip().strip("'").strip('"')
+                stripped_length = len(stripped)
+                print(f"After stripping quotes: '{stripped}' ({stripped_length} characters)")
+
+                # Remove parenthesised content
+                cleaned = re.sub(r'\s*\([^)]*\)', '', stripped).strip()
+                cleaned_length = len(cleaned)
+                print(f"After removing parentheses: '{cleaned}' ({cleaned_length} characters)")
+
+                # Empty check
+                if not cleaned:
+                    print(f"Warning: header is empty after cleaning, using a fallback name")
+                    cleaned = f"Column_{i + 1}"
+
+                cleaned_headers.append(cleaned)
+                print(f"Done: '{original_header}' => '{cleaned}'")
+
+            except Exception as e:
+                print(f"\nError! Failed to clean header: '{original_header}'")
+                print(f"Error details: {str(e)}")
+                print(f"Falling back to the original header")
+
+                if original_header:
+                    cleaned_headers.append(original_header)
+                else:
+                    cleaned_headers.append(f"Column_{i + 1}")
+
+        print(f"\nHeader cleaning finished")
+        print(f"Cleaned headers: {cleaned_headers}")
+        return cleaned_headers
+
+    def process_log_file(self, file_path, as_serial, device_id, mapping_dict):
+        log_table = []
+        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
+
+        try:
+            # Gather file info
+            file_size = os.path.getsize(file_path)
+            file_hash = calculate_file_hash(file_path)
+            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
+            with open(file_path, 'r', newline='', encoding='utf-8-sig') as file:
+                # Create the CSV reader
+                csv_reader = csv.reader(file)
+
+                try:
+                    # Read the header row
+                    headers = next(csv_reader)
+                except StopIteration:
+                    self.logger.info("CSV is empty")
+                    headers = []
+
+                # Clean the headers with the debug variant
+                cleaned_headers = self.clean_header_debug(headers)
+
+                # Build a DictReader for the remaining rows
+                dict_reader = csv.DictReader(
+                    file,
+                    fieldnames=cleaned_headers
+                )
+                for row in dict_reader:
+                    log_table.append(row)
+
+            # Process the log table
+            self.logger.info(f"Log file {os.path.basename(file_path)} has {len(log_table)} records")
+            for record in log_table:
+                self.process_record(record, as_serial, device_id, mapping_dict)
+
+            # Make sure any remaining data is flushed
+            self.final_flush()
+
+            self.logger.info(f"Processed {self.processed_records} records from {os.path.basename(file_path)}")
+            return self.processed_records
+
+        except Exception as e:
+            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
+            return 0
+
+    def process_record(self, record, as_serial, device_id, mapping_dict):
+        """Process a single record."""
+        try:
+            # The modbus device name comes from the file name
+            # AS_SERIAL comes from its parent directory name
+            key = (as_serial, device_id)
+
+            if key not in mapping_dict:
+                return
+
+            dt_str = record.get('time', '')
+            if not dt_str:
+                return
+
+            # Strip surrounding quotes
+            clean_dt_str = dt_str.strip().strip("'").strip('"')
+            record_time = datetime.strptime(clean_dt_str, '%Y-%m-%d %H:%M:%S')
+
+            # Check for a date change
+            record_date = record_time.date()
+            if self.current_date is None:
+                self.current_date = record_date
+            elif record_date != self.current_date:
+                # On a date change, process the previous day's cached data
+                self.process_day_cache()
+                self.current_date = record_date
+
+            hour_key = record_time.replace(minute=0, second=0, microsecond=0)
+
+            # seq_id is no longer used; look the value up by disney_device_point_name instead
+            for mapping in mapping_dict[key]:
+                data_field = mapping['disney_device_point_name']
+                # data_field = f"DATA{mapping['seq_id']:02d}"
+                value = record.get(data_field)
+                if value is None:
+                    continue
+
+                try:
+                    float_value = float(value)
+                except ValueError:
+                    continue
+
+                # todo: rework this; log-format data quality is good enough to write values directly, as in the pwc pipeline
+                # Build the grouping key
+                group_key = (
+                    mapping['control_group_controller_id'],
+                    mapping['point_id'],
+                    hour_key
+                )
+
+                # Add to the group cache
+                self.add_to_group_cache(group_key, record_time, float_value, mapping)
+
+                # Check the batch size
+                if len(self.batch_data) >= self.batch_size:
+                    self.flush_batch()
+                # todo: rework this; log-format data quality is good enough to write values directly, as in the pwc pipeline
+
+        except Exception as e:
+            self.logger.error(f"Error processing record: {e}")
+
+    def flush_batch(self):
+        """Run the batch insert and clear the batch."""
+        if not self.batch_data:
+            return
+
+        # Insert into the database
+        # self.db.execute_batch(
+        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+        #     [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+        # )
+
+        # todo: write to CSV when debugging (debug only)
+        with open(self.csv_file_path, "a", newline="") as f:
+            writer = csv.writer(f)
+            writer.writerows(self.batch_data)
+        # todo: write to CSV when debugging
+
+        # Update the processed record count
+        processed_count = len(self.batch_data)
+        self.processed_records += processed_count
+        self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}")
+
+        # Clear the batch data
+        self.batch_data = []
+        self.seen.clear()
+
+    def final_flush(self):
+        # Flush any remaining batch data
+        self.flush_batch()
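In the new log pipeline the database write in flush_batch is commented out and batches only go to the debug CSV, while process_record still routes values through add_to_group_cache and process_day_cache, which are not defined in the added file (they may be expected on BasePipeline); the inline todos say the log-format data is clean enough to write directly, following the pwc pipeline. A hedged sketch of what a production flush_batch could look like once self.db is connected in process(), reusing the same upsert as the two DBF pipelines; this is not part of the patch:

# Sketch only: mirrors the upsert used by the DBF pipelines and assumes self.db was
# initialised via Database(**db_config) in process(); the debug CSV branch is omitted.
def flush_batch(self):
    if not self.batch_data:
        return
    try:
        self.db.execute_batch(
            """
            INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (created, control_group_controller_id, point_id)
            DO UPDATE SET real_value = EXCLUDED.real_value
            """,
            [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
        )
    except Exception as e:
        self.logger.error(f"Batch insert failed: {e}")

    # Count the batch as processed and reset the buffers
    self.processed_records += len(self.batch_data)
    self.batch_data = []
    self.seen.clear()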