From 76cbaa820063735bab728bcc71c66712ed945762 Mon Sep 17 00:00:00 2001
From: "mingsheng.li"
Date: Fri, 1 Aug 2025 10:48:24 +0800
Subject: [PATCH] Address review feedback on the data scheme: 1) time offset
 issue, fix: no longer shift times by +8 hours on import; 2) abnormal point
 values, fix: when one hour has several readings spaced less than 15 apart,
 drop the later one, and if one of them is an abnormal 0 value, drop the 0
 value
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/pipelines/dbf_to_postgres_ctllog.py | 292 +++++++++++++++---------
 1 file changed, 186 insertions(+), 106 deletions(-)

diff --git a/src/pipelines/dbf_to_postgres_ctllog.py b/src/pipelines/dbf_to_postgres_ctllog.py
index d2a7085..9b209f1 100644
--- a/src/pipelines/dbf_to_postgres_ctllog.py
+++ b/src/pipelines/dbf_to_postgres_ctllog.py
@@ -9,20 +9,34 @@ from core.database import Database
 from core.utils import size_to_human_readable, calculate_file_hash
 from pipelines.base_pipeline import BasePipeline
 
-
 class DbfToPostgresCtllogPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
         # todo: enable for local debugging
-        self.data_root = 'D:\disney_test'
-        self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
+        # self.data_root = 'D:\disney_test'
+        # self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
         # todo: enable for local debugging
-        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
-        # self.mapping_file = os.getenv('MAPPING_FILE')
-
+        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        self.mapping_file = os.getenv('MAPPING_FILE')
         self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
+        # todo: debug use
+        # self.csv_file_path = 'D:\disney_test\debug_controller_log.csv'
+        # # Initialize the CSV file
+        # if not os.path.exists(self.csv_file_path):
+        #     with open(self.csv_file_path, 'w') as f:
+        #         csv.writer(f).writerow(
+        #             ['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
+        #              'control_group_id'])
+        # todo: debug use
+        self.db = None
+        self.group_cache = {}
+        self.batch_size = int(os.getenv('BATCH_SIZE', 1000))
+        self.batch_data = []
+        self.processed_records = 0
+        self.current_date = None  # date currently being processed
+
     def validate_config(self):
         # Ensure the directories exist
@@ -75,18 +89,10 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
         mapping_dict = self.load_mapping()
 
         # Connect to the database
-        # todo: enable when debugging locally
-        # db_config = self.config.get_database_config()
-        # self.db = Database(**db_config)
-        # todo: enable when debugging locally
-
-        self.csv_file_path = 'D:\disney_test\debug_controller_log.csv'
-        # Initialize the CSV file
-        if not os.path.exists(self.csv_file_path):
-            with open(self.csv_file_path, 'w') as f:
-                csv.writer(f).writerow(
-                    ['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
-                     'control_group_id'])
+        # todo: disable when debugging locally
+        db_config = self.config.get_database_config()
+        self.db = Database(**db_config)
+        # todo: disable when debugging locally
 
         # Process the files
         total_processed = 0
@@ -98,9 +104,9 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
                 self.logger.info(f"Processed {processed} records from {filename}")
 
         # Close the database connection
-        # todo: enable when debugging locally
-        # self.db.disconnect()
-        # todo: enable when debugging locally
+        # todo: disable when debugging locally
+        self.db.disconnect()
+        # todo: disable when debugging locally
         return total_processed
 
     def process_file(self, file_path, mapping_dict):
@@ -112,96 +118,170 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
             self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
 
             dbf_table = DBF(file_path, encoding='utf-8')
-            batch_data = []
-            processed_records = 0
-            batch_size = int(os.getenv('BATCH_SIZE', 1000))
             self.logger.info(f"the DBF file: {os.path.basename(file_path)} have record: #{len(dbf_table.records)}")
-            # Chunked reading is a big pitfall; do not read in chunks
-            # dbf_table = DBF(file_path, load=False, encoding='utf-8')
-            # chunk_idx = 0
-            # while True:
-            #     chunk = list(islice(dbf_table._iter_records(), 100000))
-            #     if not chunk:  # finished reading
-            #         break
-            #     chunk_idx += 1
-            #     # Process these 100,000 rows
-            #     self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}")
+            # Process the DBF table
             for record in dbf_table:
-                try:
-                    as_serial = str(record.get('AS_SERIAL', '')).strip()
-                    device_id = str(record.get('ID', '')).strip()
-                    key = (as_serial, device_id)
+                self.process_record(record, mapping_dict)
 
-                    # Skip records that have no mapping
-                    if key not in mapping_dict:
-                        continue
+            # Make sure all remaining data is processed
+            self.final_flush()
 
-                    # Handle the timestamp (+8 hours)
-                    dt_str = record.get('DATETIME', '')
-                    if not dt_str:
-                        continue
-
-                    original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
-                    target_time = original_time + timedelta(hours=8)
-                    formatted_time = target_time.strftime('%Y-%m-%d %H:%M:%S+00')
-
-                    # Process each mapping
-                    for mapping in mapping_dict[key]:
-                        data_field = f"DATA{mapping['seq_id']:02d}"
-                        value = record.get(data_field)
-                        if value is None:
-                            continue
-
-                        batch_data.append((
-                            formatted_time,
-                            mapping['control_group_controller_id'],
-                            mapping['point_id'],
-                            float(value),
-                            mapping['control_group_name'],
-                            mapping['control_group_id']
-                        ))
-
-                        # Batch insert
-                        if len(batch_data) >= batch_size:
-                            # todo: commented out for local debugging
-                            # self.db.execute_batch(
-                            #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                            #     batch_data
-                            # )
-                            # todo: commented out for local debugging
-
-                            # Write to CSV in append mode
-                            with open(self.csv_file_path, 'a', newline='') as f:
-                                csv.writer(f).writerows(batch_data)
-
-                            processed_records += len(batch_data)
-                            self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
-                            batch_data = []
-
-                except Exception as e:
-                    self.logger.warning(f"Skipping record due to error: {str(e)}")
-                    continue
-
-            # Insert the remaining records
-            if batch_data:
-                # todo: commented out for local debugging
-                # self.db.execute_batch(
-                #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-                #     batch_data
-                # )
-                # todo: commented out for local debugging
-
-                # Write to CSV in append mode
-                with open(self.csv_file_path, 'a', newline='') as f:
-                    csv.writer(f).writerows(batch_data)
-
-                processed_records += len(batch_data)
-                self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
-
-            self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
-            return processed_records
+            self.logger.info(f"Processed {self.processed_records} records from {os.path.basename(file_path)}")
+            return self.processed_records
 
         except Exception as e:
             self.logger.error(f"Failed to process file {file_path}: {str(e)}")
-            return 0
\ No newline at end of file
+            return 0
+
+    def process_record(self, record, mapping_dict):
+        """Process a single record"""
+        try:
+            as_serial = str(record.get('AS_SERIAL', '')).strip()
+            device_id = str(record.get('ID', '')).strip()
+            key = (as_serial, device_id)
+
+            if key not in mapping_dict:
+                return
+
+            dt_str = record.get('DATETIME', '')
+            if not dt_str:
+                return
+
+            record_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
+
+            # Check for a date change
+            record_date = record_time.date()
+            if self.current_date is None:
+                self.current_date = record_date
+            elif record_date != self.current_date:
+                # When the date changes, process the previous day's cached data
+                self.process_day_cache()
+                self.current_date = record_date
+
+            hour_key = record_time.replace(minute=0, second=0, microsecond=0)
+
+            for mapping in mapping_dict[key]:
+                data_field = f"DATA{mapping['seq_id']:02d}"
+                value = record.get(data_field)
+                if value is None:
+                    continue
+
+                try:
+                    float_value = float(value)
+                except ValueError:
+                    continue
+
+                # Build the grouping key
+                group_key = (
+                    mapping['control_group_controller_id'],
+                    mapping['point_id'],
+                    hour_key
+                )
+
+                # Add to the group cache
+                self.add_to_group_cache(group_key, record_time, float_value, mapping)
+
+                # Check the batch size
+                if len(self.batch_data) >= self.batch_size:
+                    self.flush_batch()
+
+        except Exception as e:
+            self.logger.error(f"Error processing record: {e}")
+
+    def add_to_group_cache(self, group_key, record_time, value, mapping):
+        """Add a record to the group cache and apply the selection rules"""
+        # Get the cache for this group
+        group_records = self.group_cache.setdefault(group_key, [])
+
+        # Append the new record
+        group_records.append({
+            'time': record_time,
+            'value': value,
+            'mapping': mapping
+        })
+
+        # A point should have at most 5 records per hour; once 5 are cached, process and clear the group immediately
+        if len(group_records) >= 5:
+            self.process_and_remove_group(group_key)
+
+    def process_day_cache(self):
+        """Process all cached data for the current date"""
+        if not self.group_cache:
+            return
+
+        # Process all remaining groups
+        group_keys = list(self.group_cache.keys())
+        for group_key in group_keys:
+            self.process_and_remove_group(group_key)
+
+    def process_and_remove_group(self, group_key):
+        """Process and remove one group's data"""
+        if group_key not in self.group_cache:
+            return
+
+        records = self.group_cache[group_key]
+
+        # Apply the selection rules
+        selected_record = None
+
+        # 1. Prefer non-zero records
+        non_zero_records = [r for r in records if r['value'] != 0.0]
+        if non_zero_records:
+            # 2. Take the earliest record (timestamps are monotonically increasing, so the first is the earliest)
+            selected_record = non_zero_records[0]
+        else:
+            # 3. If every value is 0, take the first record
+            selected_record = records[0]
+
+        # Add it to the batch data
+        self.add_to_batch(selected_record)
+
+        # Remove this group from the cache
+        del self.group_cache[group_key]
+
+    def add_to_batch(self, record):
+        """Add the selected record to the batch data"""
+        formatted_time = record['time'].strftime('%Y-%m-%d %H:%M:%S')
+        mapping = record['mapping']
+
+        self.batch_data.append((
+            formatted_time,
+            mapping['control_group_controller_id'],
+            mapping['point_id'],
+            record['value'],
+            mapping['control_group_name'],
+            mapping['control_group_id']
+        ))
+
+    def flush_batch(self):
+        """Run the batch insert and clear the batch data"""
+        if not self.batch_data:
+            return
+
+        # Insert into the database
+        self.db.execute_batch(
+            "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+            [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+        )
+
+        # todo: write to CSV when debugging (debug use only)
+        # with open(self.csv_file_path, "a", newline="") as f:
+        #     writer = csv.writer(f)
+        #     writer.writerows(self.batch_data)
+        # todo: write to CSV when debugging
+
+        # Update the processed record count
+        processed_count = len(self.batch_data)
+        self.processed_records += processed_count
+        self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}")
+
+        # Clear the batch data
+        self.batch_data = []
+
+    def final_flush(self):
+        # Process all remaining group caches
+        self.process_day_cache()
+
+        # Flush the remaining batch data
+        self.flush_batch()
\ No newline at end of file
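
For readers reviewing the new de-duplication logic, the selection rule from the commit message (one value per point per hour: the earliest non-zero reading, or the first reading when every value in that hour is zero) can be illustrated with a short standalone sketch. It is not part of the patched module; the function name select_hourly_values and the sample data are invented for illustration, and it assumes readings for one (controller, point) pair arrive as (timestamp, value) pairs already sorted by time, as the DBF rows are read here.

# Standalone sketch of the per-hour selection rule; illustrative names only.
from collections import defaultdict
from datetime import datetime


def select_hourly_values(readings):
    """Keep one reading per hour: the earliest non-zero value,
    or the first reading if every value in that hour is zero."""
    by_hour = defaultdict(list)
    for ts, value in readings:
        # Group readings by the hour they fall into
        by_hour[ts.replace(minute=0, second=0, microsecond=0)].append((ts, value))

    selected = {}
    for hour, items in by_hour.items():
        non_zero = [item for item in items if item[1] != 0.0]
        # Earliest non-zero reading wins; otherwise fall back to the first reading
        selected[hour] = non_zero[0] if non_zero else items[0]
    return selected


if __name__ == '__main__':
    sample = [
        (datetime(2025, 8, 1, 10, 0, 5), 0.0),    # abnormal zero, dropped
        (datetime(2025, 8, 1, 10, 10, 5), 42.5),  # kept for the 10:00 hour
        (datetime(2025, 8, 1, 10, 20, 5), 43.1),  # later reading, dropped
        (datetime(2025, 8, 1, 11, 0, 5), 0.0),    # kept: only value in 11:00
    ]
    for hour, (ts, value) in sorted(select_hourly_values(sample).items()):
        print(hour, ts, value)

This mirrors what process_and_remove_group() does for each cached group, with the patch additionally capping each group at five cached readings per hour and flushing the cache whenever the date rolls over.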