1,时间偏差问题,对策:不再+8导入 2,异常数据点位值处理:对策:一个小时有多个数据,间隔小于15,去掉时间大的那个,如果是异常0值,去掉0值的
This commit is contained in:
parent
6fe643638e
commit
76cbaa8200
|
|
@ -9,20 +9,34 @@ from core.database import Database
|
||||||
from core.utils import size_to_human_readable, calculate_file_hash
|
from core.utils import size_to_human_readable, calculate_file_hash
|
||||||
from pipelines.base_pipeline import BasePipeline
|
from pipelines.base_pipeline import BasePipeline
|
||||||
|
|
||||||
|
|
||||||
class DbfToPostgresCtllogPipeline(BasePipeline):
|
class DbfToPostgresCtllogPipeline(BasePipeline):
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
super().__init__(config)
|
super().__init__(config)
|
||||||
# todo:本地调试打开
|
# todo:本地调试打开
|
||||||
self.data_root = 'D:\disney_test'
|
# self.data_root = 'D:\disney_test'
|
||||||
self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
|
# self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
|
||||||
# todo:本地调试打开
|
# todo:本地调试打开
|
||||||
# self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
|
self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
|
||||||
# self.mapping_file = os.getenv('MAPPING_FILE')
|
self.mapping_file = os.getenv('MAPPING_FILE')
|
||||||
|
|
||||||
self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
|
self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
|
||||||
|
|
||||||
|
# todo:debug use
|
||||||
|
# self.csv_file_path = 'D:\disney_test\debug_controller_log.csv'
|
||||||
|
# # 初始化CSV文件
|
||||||
|
# if not os.path.exists(self.csv_file_path):
|
||||||
|
# with open(self.csv_file_path, 'w') as f:
|
||||||
|
# csv.writer(f).writerow(
|
||||||
|
# ['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
|
||||||
|
# 'control_group_id'])
|
||||||
|
# todo:debug use
|
||||||
|
|
||||||
self.db = None
|
self.db = None
|
||||||
|
self.group_cache = {}
|
||||||
|
self.batch_size = os.getenv('BATCH_SIZE', 1000)
|
||||||
|
self.batch_data = []
|
||||||
|
self.processed_records = 0
|
||||||
|
self.current_date = None # 当前处理的日期
|
||||||
|
|
||||||
|
|
||||||
def validate_config(self):
|
def validate_config(self):
|
||||||
# 确保目录存在
|
# 确保目录存在
|
||||||
|
|
@ -75,18 +89,10 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
|
||||||
mapping_dict = self.load_mapping()
|
mapping_dict = self.load_mapping()
|
||||||
|
|
||||||
# 连接数据库
|
# 连接数据库
|
||||||
# todo:本地调试时打开
|
# todo:本地调试时关闭
|
||||||
# db_config = self.config.get_database_config()
|
db_config = self.config.get_database_config()
|
||||||
# self.db = Database(**db_config)
|
self.db = Database(**db_config)
|
||||||
# todo:本地调试时打开
|
# todo:本地调试时关闭
|
||||||
|
|
||||||
self.csv_file_path = 'D:\disney_test\debug_controller_log.csv'
|
|
||||||
# 初始化CSV文件
|
|
||||||
if not os.path.exists(self.csv_file_path):
|
|
||||||
with open(self.csv_file_path, 'w') as f:
|
|
||||||
csv.writer(f).writerow(
|
|
||||||
['created', 'control_group_controller_id', 'point_id', 'real_value', 'control_group_name',
|
|
||||||
'control_group_id'])
|
|
||||||
|
|
||||||
# 处理文件
|
# 处理文件
|
||||||
total_processed = 0
|
total_processed = 0
|
||||||
|
|
@ -98,9 +104,9 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
|
||||||
self.logger.info(f"Processed {processed} records from {filename}")
|
self.logger.info(f"Processed {processed} records from {filename}")
|
||||||
|
|
||||||
# 关闭数据库连接
|
# 关闭数据库连接
|
||||||
# todo:本地调试时打开
|
# todo:本地调试时关闭
|
||||||
# self.db.disconnect()
|
self.db.disconnect()
|
||||||
# todo:本地调试时打开
|
# todo:本地调试时关闭
|
||||||
return total_processed
|
return total_processed
|
||||||
|
|
||||||
def process_file(self, file_path, mapping_dict):
|
def process_file(self, file_path, mapping_dict):
|
||||||
|
|
@ -112,96 +118,175 @@ class DbfToPostgresCtllogPipeline(BasePipeline):
|
||||||
self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
|
self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
|
||||||
|
|
||||||
dbf_table = DBF(file_path, encoding='utf-8')
|
dbf_table = DBF(file_path, encoding='utf-8')
|
||||||
batch_data = []
|
|
||||||
processed_records = 0
|
|
||||||
batch_size = int(os.getenv('BATCH_SIZE', 1000))
|
|
||||||
self.logger.info(f"the DBF file: {os.path.basename(file_path)} have record: #{len(dbf_table.records)}")
|
self.logger.info(f"the DBF file: {os.path.basename(file_path)} have record: #{len(dbf_table.records)}")
|
||||||
|
|
||||||
# 分片读取是个大坑,不能分片
|
# 处理DBF表
|
||||||
# dbf_table = DBF(file_path, load=False, encoding='utf-8')
|
|
||||||
# chunk_idx = 0
|
|
||||||
# while True:
|
|
||||||
# chunk = list(islice(dbf_table._iter_records(), 100000))
|
|
||||||
# if not chunk: # 读完了
|
|
||||||
# break
|
|
||||||
# chunk_idx += 1
|
|
||||||
# # 处理这十万行
|
|
||||||
# self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}")
|
|
||||||
for record in dbf_table:
|
for record in dbf_table:
|
||||||
try:
|
self.process_record(record, mapping_dict)
|
||||||
as_serial = str(record.get('AS_SERIAL', '')).strip()
|
|
||||||
device_id = str(record.get('ID', '')).strip()
|
|
||||||
key = (as_serial, device_id)
|
|
||||||
|
|
||||||
# 跳过没有映射的记录
|
# 确保所有剩余数据都被处理
|
||||||
if key not in mapping_dict:
|
self.final_flush()
|
||||||
continue
|
|
||||||
|
|
||||||
# 处理时间 (+8小时)
|
self.logger.info(f"Processed {self.processed_records} records from {os.path.basename(file_path)}")
|
||||||
dt_str = record.get('DATETIME', '')
|
return self.processed_records
|
||||||
if not dt_str:
|
|
||||||
continue
|
|
||||||
|
|
||||||
original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
|
|
||||||
target_time = original_time + timedelta(hours=8)
|
|
||||||
formatted_time = target_time.strftime('%Y-%m-%d %H:%M:%S+00')
|
|
||||||
|
|
||||||
# 处理每个映射
|
|
||||||
for mapping in mapping_dict[key]:
|
|
||||||
data_field = f"DATA{mapping['seq_id']:02d}"
|
|
||||||
value = record.get(data_field)
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
batch_data.append((
|
|
||||||
formatted_time,
|
|
||||||
mapping['control_group_controller_id'],
|
|
||||||
mapping['point_id'],
|
|
||||||
float(value),
|
|
||||||
mapping['control_group_name'],
|
|
||||||
mapping['control_group_id']
|
|
||||||
))
|
|
||||||
|
|
||||||
# 批量插入
|
|
||||||
if len(batch_data) >= batch_size:
|
|
||||||
# todo:本地调试先注释掉
|
|
||||||
# self.db.execute_batch(
|
|
||||||
# "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
|
|
||||||
# batch_data
|
|
||||||
# )
|
|
||||||
# todo:本地调试先注释掉
|
|
||||||
|
|
||||||
# 以追加模式写入 CSV
|
|
||||||
with open(self.csv_file_path, 'a', newline='') as f:
|
|
||||||
csv.writer(f).writerows(batch_data)
|
|
||||||
|
|
||||||
processed_records += len(batch_data)
|
|
||||||
self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
|
|
||||||
batch_data = []
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning(f"Skipping record due to error: {str(e)}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# 插入剩余记录
|
|
||||||
if batch_data:
|
|
||||||
# todo:本地调试先注释掉
|
|
||||||
# self.db.execute_batch(
|
|
||||||
# "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
|
|
||||||
# batch_data
|
|
||||||
# )
|
|
||||||
# todo:本地调试先注释掉
|
|
||||||
|
|
||||||
# 以追加模式写入 CSV
|
|
||||||
with open(self.csv_file_path, 'a', newline='') as f:
|
|
||||||
csv.writer(f).writerows(batch_data)
|
|
||||||
|
|
||||||
processed_records += len(batch_data)
|
|
||||||
self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
|
|
||||||
|
|
||||||
self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
|
|
||||||
return processed_records
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Failed to process file {file_path}: {str(e)}")
|
self.logger.error(f"Failed to process file {file_path}: {str(e)}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def process_record(self, record, mapping_dict):
|
||||||
|
"""处理单个记录"""
|
||||||
|
try:
|
||||||
|
as_serial = str(record.get('AS_SERIAL', '')).strip()
|
||||||
|
device_id = str(record.get('ID', '')).strip()
|
||||||
|
key = (as_serial, device_id)
|
||||||
|
|
||||||
|
if key not in mapping_dict:
|
||||||
|
return
|
||||||
|
|
||||||
|
dt_str = record.get('DATETIME', '')
|
||||||
|
if not dt_str:
|
||||||
|
return
|
||||||
|
|
||||||
|
record_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
|
||||||
|
dt_str = record.get('DATETIME', '')
|
||||||
|
if not dt_str:
|
||||||
|
return
|
||||||
|
|
||||||
|
record_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
# 检查日期变化
|
||||||
|
record_date = record_time.date()
|
||||||
|
if self.current_date is None:
|
||||||
|
self.current_date = record_date
|
||||||
|
elif record_date != self.current_date:
|
||||||
|
# 日期变化时,处理前一天的缓存数据
|
||||||
|
self.process_day_cache()
|
||||||
|
self.current_date = record_date
|
||||||
|
|
||||||
|
hour_key = record_time.replace(minute=0, second=0, microsecond=0)
|
||||||
|
|
||||||
|
for mapping in mapping_dict[key]:
|
||||||
|
data_field = f"DATA{mapping['seq_id']:02d}"
|
||||||
|
value = record.get(data_field)
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
float_value = float(value)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 创建分组键
|
||||||
|
group_key = (
|
||||||
|
mapping['control_group_controller_id'],
|
||||||
|
mapping['point_id'],
|
||||||
|
hour_key
|
||||||
|
)
|
||||||
|
|
||||||
|
# 添加到分组缓存
|
||||||
|
self.add_to_group_cache(group_key, record_time, float_value, mapping)
|
||||||
|
|
||||||
|
# 检查批次大小
|
||||||
|
if len(self.batch_data) >= self.batch_size:
|
||||||
|
self.flush_batch()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error processing record: {e}")
|
||||||
|
|
||||||
|
def add_to_group_cache(self, group_key, record_time, value, mapping):
|
||||||
|
"""添加记录到分组缓存,并进行取舍处理"""
|
||||||
|
# 获取该分组的缓存
|
||||||
|
group_records = self.group_cache.setdefault(group_key, [])
|
||||||
|
|
||||||
|
# 添加新记录
|
||||||
|
group_records.append({
|
||||||
|
'time': record_time,
|
||||||
|
'value': value,
|
||||||
|
'mapping': mapping
|
||||||
|
})
|
||||||
|
|
||||||
|
# 一个点位一个小时数据不超过5条,如果超过5条记录,立即处理并清空缓存
|
||||||
|
if len(group_records) >= 5:
|
||||||
|
self.process_and_remove_group(group_key)
|
||||||
|
|
||||||
|
def process_day_cache(self):
|
||||||
|
"""处理当前日期的所有缓存数据"""
|
||||||
|
if not self.group_cache:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 处理所有剩余分组
|
||||||
|
group_keys = list(self.group_cache.keys())
|
||||||
|
for group_key in group_keys:
|
||||||
|
self.process_and_remove_group(group_key)
|
||||||
|
|
||||||
|
def process_and_remove_group(self, group_key):
|
||||||
|
"""处理并移除一个分组的数据"""
|
||||||
|
if group_key not in self.group_cache:
|
||||||
|
return
|
||||||
|
|
||||||
|
records = self.group_cache[group_key]
|
||||||
|
|
||||||
|
# 应用取舍规则
|
||||||
|
selected_record = None
|
||||||
|
|
||||||
|
# 1. 优先非零值记录
|
||||||
|
non_zero_records = [r for r in records if r['value'] != 0.0]
|
||||||
|
if non_zero_records:
|
||||||
|
# 2. 取时间最早的记录(数据单调递增,第一条即最早)
|
||||||
|
selected_record = non_zero_records[0]
|
||||||
|
else:
|
||||||
|
# 3. 如果所有值都是0,取第一条记录
|
||||||
|
selected_record = records[0]
|
||||||
|
|
||||||
|
# 添加到批次数据
|
||||||
|
self.add_to_batch(selected_record)
|
||||||
|
|
||||||
|
# 移除该分组缓存
|
||||||
|
del self.group_cache[group_key]
|
||||||
|
|
||||||
|
def add_to_batch(self, record):
|
||||||
|
"""将选定记录添加到批次数据"""
|
||||||
|
formatted_time = record['time'].strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
mapping = record['mapping']
|
||||||
|
|
||||||
|
self.batch_data.append((
|
||||||
|
formatted_time,
|
||||||
|
mapping['control_group_controller_id'],
|
||||||
|
mapping['point_id'],
|
||||||
|
record['value'],
|
||||||
|
mapping['control_group_name'],
|
||||||
|
mapping['control_group_id']
|
||||||
|
))
|
||||||
|
|
||||||
|
def flush_batch(self):
|
||||||
|
"""执行批量插入并清空批次数据"""
|
||||||
|
if not self.batch_data:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 实际插入数据库
|
||||||
|
self.db.execute_batch(
|
||||||
|
"INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
|
||||||
|
[(data[0], data[1], data[2], data[3]) for data in self.batch_data]
|
||||||
|
)
|
||||||
|
|
||||||
|
# todo: debug时写入CSV(调试用)
|
||||||
|
# with open(self.csv_file_path, "a", newline="") as f:
|
||||||
|
# writer = csv.writer(f)
|
||||||
|
# writer.writerows(self.batch_data)
|
||||||
|
# todo: debug时写入CSV
|
||||||
|
|
||||||
|
# 更新处理记录数
|
||||||
|
processed_count = len(self.batch_data)
|
||||||
|
self.processed_records += processed_count
|
||||||
|
self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}")
|
||||||
|
|
||||||
|
# 清空批次数据
|
||||||
|
self.batch_data = []
|
||||||
|
|
||||||
|
def final_flush(self):
|
||||||
|
# 处理所有剩余的分组缓存
|
||||||
|
self.process_day_cache()
|
||||||
|
|
||||||
|
# 刷新剩余的批次数据
|
||||||
|
self.flush_batch()
|
||||||
Loading…
Reference in New Issue
Block a user