Add scripts for importing historical electricity meter data
All checks were successful
continuous-integration/drone/push Build is passing

mingsheng.li 2025-08-05 14:17:32 +08:00
parent 134e31a5bb
commit af11bddb60
4 changed files with 371 additions and 0 deletions


@@ -0,0 +1,59 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: dbf-import-ctllog-pwc-job-{{JOB_ID}}
  namespace: {{NAMESPACE}}
spec:
  ttlSecondsAfterFinished: 86400
  backoffLimit: 0
  template:
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: {{JOB_HOST_KEY}}
                    operator: In
                    values:
                      - {{JOB_HOST_NAME}}
      containers:
        - name: importer
          image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}}
          args: ["--pipeline", "dbf_to_postgres_ctllog-pwc"]
          env:
            - name: DATA_PVC_MOUNT_PATH
              value: "/data"
            - name: DBF_INPUT_DIR
              value: "/data/dbf-input"
            - name: MAPPING_FILE
              value: "/data/disney-mapping-elec-v3.xlsx"
            - name: DB_HOST
              value: "{{DB_HOST}}"
            - name: DB_PORT
              value: "{{DB_PORT}}"
            - name: DB_NAME
              value: "{{DB_NAME}}"
            - name: DB_USER
              value: "{{DB_USER}}"
            - name: DB_PASSWORD
              value: "{{DB_PASSWORD}}"
            - name: BATCH_SIZE
              value: "{{BATCH_SIZE}}"
            - name: LOG_LEVEL
              value: "{{LOG_LEVEL}}"
          volumeMounts:
            - name: data-volume
              mountPath: "/data"
          resources:
            requests:
              cpu: "500m"
              memory: "800Mi"
            limits:
              cpu: "1000m"
              memory: "1700Mi"
      volumes:
        - name: data-volume
          persistentVolumeClaim:
            claimName: {{DATA_PVC_NAME}}
      restartPolicy: Never


@@ -0,0 +1,55 @@
#!/bin/bash
set -e
# Default configuration
JOB_ID=$(date +%Y%m%d-%H%M%S)
IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
IMAGE_TAG=${IMAGE_TAG:-"dev"}
BATCH_SIZE=${BATCH_SIZE:-"50"}
LOG_LEVEL=${LOG_LEVEL:-"INFO"}
DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
# Database configuration (modify before use)
DB_HOST=${DB_HOST:-"db"}
DB_PORT=${DB_PORT:-"6432"}
DB_NAME=${DB_NAME:-"idrc"}
DB_USER=${DB_USER:-"teramesh"}
DB_PASSWORD=${DB_PASSWORD:-"2iqTCHwnf75stGBzM8le"}
NAMESPACE=${NAMESPACE:-"default"}
# Check that the template file exists
TEMPLATE_FILE="dbf-import-ctllog-pwc-job.yaml"
if [ ! -f "$TEMPLATE_FILE" ]; then
    echo "Template file not found: $TEMPLATE_FILE"
    exit 1
fi
# Substitute the template variables directly instead of using envsubst
OUTPUT_FILE="dbf-import-ctllog-pwc-job-${JOB_ID}.yaml"
sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
-e "s|{{NAMESPACE}}|$NAMESPACE|g" \
-e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
-e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
-e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
-e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
-e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
-e "s|{{DB_HOST}}|$DB_HOST|g" \
-e "s|{{DB_PORT}}|$DB_PORT|g" \
-e "s|{{DB_NAME}}|$DB_NAME|g" \
-e "s|{{DB_USER}}|$DB_USER|g" \
-e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
-e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
-e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
"$TEMPLATE_FILE" > "$OUTPUT_FILE"
# Validate before deploying
echo "Validating generated YAML..."
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client
# Deploy the Job
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"
echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-pwc-job-${JOB_ID}"
echo "To view logs: kubectl logs job/dbf-import-ctllog-pwc-job-${JOB_ID} -n $NAMESPACE"


@@ -0,0 +1,55 @@
#!/bin/bash
set -e
# Default configuration
JOB_ID=$(date +%Y%m%d-%H%M%S)
IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
IMAGE_TAG=${IMAGE_TAG:-"dev"}
BATCH_SIZE=${BATCH_SIZE:-"1000"}
LOG_LEVEL=${LOG_LEVEL:-"INFO"}
DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
JOB_HOST_KEY=${JOB_HOST_KEY:-"openebs.io/nodeid"}
JOB_HOST_NAME=${JOB_HOST_NAME:-"node008-zina"}
# Database configuration (modify before use)
DB_HOST=${DB_HOST:-"test-db.db.svc.cluster.local"}
DB_PORT=${DB_PORT:-"6432"}
DB_NAME=${DB_NAME:-"idrc"}
DB_USER=${DB_USER:-"idrc"}
DB_PASSWORD=${DB_PASSWORD:-"a8aa283c1b3ca0bdfe1d2669dd400f3d"}
NAMESPACE=${NAMESPACE:-"db"}
# Check that the template file exists
TEMPLATE_FILE="dbf-import-ctllog-pwc-job.yaml"
if [ ! -f "$TEMPLATE_FILE" ]; then
    echo "Template file not found: $TEMPLATE_FILE"
    exit 1
fi
# Substitute the template variables directly instead of using envsubst
OUTPUT_FILE="dbf-import-ctllog-pwc-job-${JOB_ID}.yaml"
sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
-e "s|{{NAMESPACE}}|$NAMESPACE|g" \
-e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
-e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
-e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
-e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
-e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
-e "s|{{DB_HOST}}|$DB_HOST|g" \
-e "s|{{DB_PORT}}|$DB_PORT|g" \
-e "s|{{DB_NAME}}|$DB_NAME|g" \
-e "s|{{DB_USER}}|$DB_USER|g" \
-e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
-e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
-e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
"$TEMPLATE_FILE" > "$OUTPUT_FILE"
# Validate before deploying
echo "Validating generated YAML..."
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client
# Deploy the Job
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"
echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-pwc-job-${JOB_ID}"
echo "To view logs: kubectl logs job/dbf-import-ctllog-pwc-job-${JOB_ID} -n $NAMESPACE"


@@ -0,0 +1,202 @@
import os
import csv
import pandas as pd
from dbfread import DBF
from itertools import islice
from datetime import datetime, timedelta
import logging
from core.database import Database
from core.utils import size_to_human_readable, calculate_file_hash
from pipelines.base_pipeline import BasePipeline
class DbfToPostgresCtllogPwrPipeline(BasePipeline):
    def __init__(self, config):
        super().__init__(config)
        # TODO: enable when debugging locally
        self.data_root = r'D:\disney_test'
        self.mapping_file = r'D:\disney_test\disney-mapping-elec-v3.xlsx'
        # TODO: enable when debugging locally (end)
        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
        # self.mapping_file = os.getenv('MAPPING_FILE')
        self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
        # TODO: debug use only
        self.csv_file_path = r'D:\disney_test\debug_controller_log_elec.csv'
        # Initialize the CSV file with a header row
        if not os.path.exists(self.csv_file_path):
            with open(self.csv_file_path, 'w') as f:
                csv.writer(f).writerow(
                    ['created', 'control_group_controller_id', 'point_id', 'real_value'])
        # TODO: debug use only (end)
        self.db = None
        self.group_cache = {}
        self.batch_size = int(os.getenv('BATCH_SIZE', 1000))
        self.batch_data = []
        self.processed_records = 0
        self.current_date = None  # date currently being processed

    def validate_config(self):
        # Make sure the input directory exists
        if not os.path.exists(self.dbf_dir):
            raise ValueError(f"DBF directory not found: {self.dbf_dir}")
        # If a mapping file is configured, verify that it exists
        if self.mapping_file and not os.path.exists(self.mapping_file):
            self.logger.warning(f"Mapping file not found: {self.mapping_file}")
            self.mapping_file = None

    def load_mapping(self):
        """Load the ACCOUNT -> controller/point mapping."""
        if not self.mapping_file:
            self.logger.info("No mapping file provided, using default mapping")
            return {}
        try:
            self.logger.info(f"Loading mapping from {self.mapping_file}")
            mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping")
            # Clean the data: keep only rows that have an ACCOUNT value
            mapping_df = mapping_df.dropna(subset=['ACCOUNT'])
            # Build the mapping dictionary {ACCOUNT: [mapping_entries]}
            mapping_dict = {}
            for _, row in mapping_df.iterrows():
                account = str(row['ACCOUNT'])
                if account not in mapping_dict:
                    mapping_dict[account] = []
                mapping_dict[account].append({
                    'control_group_controller_id': int(row['control_group_controller_id']),
                    'controller_point_id': int(row['controller_point_id']),
                    'data_field_name': row['data_field_name']
                })
            self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
            return mapping_dict
        except Exception as e:
            self.logger.error(f"Failed to load mapping: {str(e)}")
            return {}

    def process(self):
        # Load the mapping
        mapping_dict = self.load_mapping()
        # Connect to the database
        # TODO: disabled while debugging locally
        # db_config = self.config.get_database_config()
        # self.db = Database(**db_config)
        # TODO: disabled while debugging locally (end)
        # Process the input files
        total_processed = 0
        for filename in os.listdir(self.dbf_dir):
            if filename.casefold().endswith('.dbf'):
                file_path = os.path.join(self.dbf_dir, filename)
                processed = self.process_file(file_path, mapping_dict)
                total_processed += processed
                self.logger.info(f"Processed {processed} records from {filename}")
        # Close the database connection
        # TODO: disabled while debugging locally
        # self.db.disconnect()
        # TODO: disabled while debugging locally (end)
        return total_processed

    def process_file(self, file_path, mapping_dict):
        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
        records_before = self.processed_records
        try:
            # Collect file information
            file_size = os.path.getsize(file_path)
            file_hash = calculate_file_hash(file_path)
            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
            dbf_table = DBF(file_path, encoding='utf-8', ignore_missing_memofile=True)
            self.logger.info(f"DBF file {os.path.basename(file_path)} has {len(dbf_table)} records")
            # Process the DBF table record by record
            for record in dbf_table:
                self.process_record(record, mapping_dict)
            # Make sure any remaining batched data is flushed
            self.final_flush()
            # Report the per-file count rather than the cumulative total
            processed_in_file = self.processed_records - records_before
            self.logger.info(f"Processed {processed_in_file} records from {os.path.basename(file_path)}")
            return processed_in_file
        except Exception as e:
            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
            return 0

    def process_record(self, record, mapping_dict):
        """Transform a single DBF record into one output row per mapped point."""
        try:
            account = str(record.get('ACCOUNT', ''))
            if not account or account not in mapping_dict:
                return
            for mapping in mapping_dict[account]:
                data_field = mapping['data_field_name']
                # Pick the source fields according to the mapped field type
                if data_field == "MAXIMUM":
                    created = record.get('TIMEDATEMA')
                    real_value = record.get('MAXIMUM')
                elif data_field == "TOTALIZE":
                    created = record.get('TIMEDATE')
                    real_value = record.get('TOTALIZE')
                else:
                    continue  # skip unsupported field types
                # Skip rows that are missing a required field
                if created is None or real_value is None:
                    continue
                created_str = created.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + '+00'
                self.batch_data.append((
                    created_str,
                    mapping['control_group_controller_id'],
                    mapping['controller_point_id'],
                    real_value
                ))
                if len(self.batch_data) >= self.batch_size:
                    self.flush_batch()
        except Exception as e:
            self.logger.error(f"Error processing record: {e}")

    def flush_batch(self):
        """Run the batch insert and clear the current batch."""
        if not self.batch_data:
            return
        # Actual database insert
        # self.db.execute_batch(
        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
        #     [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
        # )
        # TODO: while debugging, write to CSV instead
        with open(self.csv_file_path, "a", newline="") as f:
            writer = csv.writer(f)
            writer.writerows(self.batch_data)
        # TODO: while debugging, write to CSV instead (end)
        # Update the processed-record counter
        processed_count = len(self.batch_data)
        self.processed_records += processed_count
        self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}")
        # Clear the batch
        self.batch_data = []

    def final_flush(self):
        # Flush any remaining batched data
        self.flush_batch()
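
The production insert path goes through the project's core.database.Database wrapper, which is not part of this commit, so its exact API is not visible here. As a rough, hypothetical sketch of what flush_batch does once the commented-out branch is re-enabled, assuming a plain psycopg2 connection in place of the internal wrapper:

# Hypothetical standalone sketch; the pipeline itself uses core.database.Database,
# whose interface is not shown in this commit. The SQL is taken verbatim from flush_batch.
import psycopg2
from psycopg2.extras import execute_batch

def insert_controller_log(conn_params, rows, page_size=1000):
    """rows: iterable of (created, control_group_controller_id, point_id, real_value)."""
    sql = ("INSERT INTO controller_log "
           "(created, control_group_controller_id, point_id, real_value) "
           "VALUES (%s, %s, %s, %s)")
    with psycopg2.connect(**conn_params) as conn:
        with conn.cursor() as cur:
            # execute_batch groups rows into multi-statement round trips
            execute_batch(cur, sql, rows, page_size=page_size)
        # the connection context manager commits on success, rolls back on error

For a backfill of this size, batching the INSERTs (here via psycopg2's execute_batch) keeps round trips low; the pipeline's BATCH_SIZE setting plays the same role as page_size above.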