Add a pipeline that imports DBF files into the controller_log table
All checks were successful
continuous-integration/drone/push Build is passing
parent
73d163fe0f
commit
f1b24c6a7b
59
k8s/job-templates/dbf-import-ctllog-job.yaml
Normal file
@@ -0,0 +1,59 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: dbf-import-ctllog-job-{{JOB_ID}}
  namespace: {{NAMESPACE}}
spec:
  ttlSecondsAfterFinished: 86400
  backoffLimit: 0
  template:
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: {{JOB_HOST_KEY}}
                operator: In
                values:
                - {{JOB_HOST_NAME}}
      containers:
      - name: importer
        image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}}
        args: ["--pipeline", "dbf_to_postgres_ctllog"]
        env:
        - name: DATA_PVC_MOUNT_PATH
          value: "/data"
        - name: DBF_INPUT_DIR
          value: "/data/dbf-input"
        - name: MAPPING_FILE
          value: "/data/disney-mapping-v2.xlsx"
        - name: DB_HOST
          value: "{{DB_HOST}}"
        - name: DB_PORT
          value: "{{DB_PORT}}"
        - name: DB_NAME
          value: "{{DB_NAME}}"
        - name: DB_USER
          value: "{{DB_USER}}"
        - name: DB_PASSWORD
          value: "{{DB_PASSWORD}}"
        - name: BATCH_SIZE
          value: "{{BATCH_SIZE}}"
        - name: LOG_LEVEL
          value: "{{LOG_LEVEL}}"
        volumeMounts:
        - name: data-volume
          mountPath: "/data"
        resources:
          requests:
            cpu: "500m"
            memory: "800Mi"
          limits:
            cpu: "1000m"
            memory: "1700Mi"
      volumes:
      - name: data-volume
        persistentVolumeClaim:
          claimName: {{DATA_PVC_NAME}}
      restartPolicy: Never
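The template pins the Job to one node via nodeAffinity, so the {{JOB_HOST_KEY}}/{{JOB_HOST_NAME}} pair must match a real node label, and {{DATA_PVC_NAME}} must name an existing claim. A minimal pre-flight check, assuming the default values used by the deploy scripts below (kubernetes.io/hostname or openebs.io/nodeid as the label key, data-import-export-pvc in the default namespace):

# Show the node labels the Job's nodeAffinity will match against
kubectl get nodes -L kubernetes.io/hostname -L openebs.io/nodeid

# Confirm the shared PVC exists in the target namespace
kubectl get pvc data-import-export-pvc -n default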
55
scripts/deploy-dbf-import-ctllog-disney.sh
Normal file
@@ -0,0 +1,55 @@
#!/bin/bash
set -e

# Default configuration
JOB_ID=$(date +%Y%m%d-%H%M%S)
IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
IMAGE_TAG=${IMAGE_TAG:-"dev"}
BATCH_SIZE=${BATCH_SIZE:-"50"}
LOG_LEVEL=${LOG_LEVEL:-"INFO"}
DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
# Database configuration (adjust before use)
DB_HOST=${DB_HOST:-"db"}
DB_PORT=${DB_PORT:-"6432"}
DB_NAME=${DB_NAME:-"idrc"}
DB_USER=${DB_USER:-"teramesh"}
DB_PASSWORD=${DB_PASSWORD:-"2iqTCHwnf75stGBzM8le"}

NAMESPACE=${NAMESPACE:-"default"}

# Check that the template file exists
TEMPLATE_FILE="dbf-import-ctllog-job.yaml"
if [ ! -f "$TEMPLATE_FILE" ]; then
    echo "Template file not found: $TEMPLATE_FILE"
    exit 1
fi

# Substitute template variables directly (without envsubst)
OUTPUT_FILE="dbf-import-ctllog-job-${JOB_ID}.yaml"
sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
    -e "s|{{NAMESPACE}}|$NAMESPACE|g" \
    -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
    -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
    -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
    -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
    -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
    -e "s|{{DB_HOST}}|$DB_HOST|g" \
    -e "s|{{DB_PORT}}|$DB_PORT|g" \
    -e "s|{{DB_NAME}}|$DB_NAME|g" \
    -e "s|{{DB_USER}}|$DB_USER|g" \
    -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
    -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
    -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
    "$TEMPLATE_FILE" > "$OUTPUT_FILE"

# Validate before deploying
echo "Validating generated YAML..."
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client

# Deploy the Job
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"

echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-job-${JOB_ID}"
echo "To view logs: kubectl logs job/dbf-import-ctllog-job-${JOB_ID} -n $NAMESPACE"
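A minimal usage sketch for the script above. TEMPLATE_FILE is resolved relative to the working directory, so this assumes the job template from k8s/job-templates has been copied next to the script first; the overridden values are only examples:

cd scripts
cp ../k8s/job-templates/dbf-import-ctllog-job.yaml .

# Override any defaults through the environment, then deploy
NAMESPACE=default BATCH_SIZE=100 LOG_LEVEL=DEBUG ./deploy-dbf-import-ctllog-disney.sh

# Follow the importer output (the script prints the exact job name)
kubectl logs -f job/dbf-import-ctllog-job-<JOB_ID> -n default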
55
scripts/deploy-dbf-import-ctllog-test.sh
Normal file
@@ -0,0 +1,55 @@
#!/bin/bash
set -e

# Default configuration
JOB_ID=$(date +%Y%m%d-%H%M%S)
IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
IMAGE_TAG=${IMAGE_TAG:-"dev"}
BATCH_SIZE=${BATCH_SIZE:-"1000"}
LOG_LEVEL=${LOG_LEVEL:-"INFO"}
DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
JOB_HOST_KEY=${JOB_HOST_KEY:-"openebs.io/nodeid"}
JOB_HOST_NAME=${JOB_HOST_NAME:-"node008-zina"}
# Database configuration (adjust before use)
DB_HOST=${DB_HOST:-"test-db.db.svc.cluster.local"}
DB_PORT=${DB_PORT:-"6432"}
DB_NAME=${DB_NAME:-"idrc"}
DB_USER=${DB_USER:-"idrc"}
DB_PASSWORD=${DB_PASSWORD:-"a8aa283c1b3ca0bdfe1d2669dd400f3d"}

NAMESPACE=${NAMESPACE:-"db"}

# Check that the template file exists
TEMPLATE_FILE="dbf-import-ctllog-job.yaml"
if [ ! -f "$TEMPLATE_FILE" ]; then
    echo "Template file not found: $TEMPLATE_FILE"
    exit 1
fi

# Substitute template variables directly (without envsubst)
OUTPUT_FILE="dbf-import-ctllog-job-${JOB_ID}.yaml"
sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
    -e "s|{{NAMESPACE}}|$NAMESPACE|g" \
    -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
    -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
    -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
    -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
    -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
    -e "s|{{DB_HOST}}|$DB_HOST|g" \
    -e "s|{{DB_PORT}}|$DB_PORT|g" \
    -e "s|{{DB_NAME}}|$DB_NAME|g" \
    -e "s|{{DB_USER}}|$DB_USER|g" \
    -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
    -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
    -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
    "$TEMPLATE_FILE" > "$OUTPUT_FILE"

# Validate before deploying
echo "Validating generated YAML..."
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client

# Deploy the Job
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"

echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-job-${JOB_ID}"
echo "To view logs: kubectl logs job/dbf-import-ctllog-job-${JOB_ID} -n $NAMESPACE"
@@ -8,8 +8,8 @@ IMAGE_TAG=${IMAGE_TAG:-"dev"}
 BATCH_SIZE=${BATCH_SIZE:-"1000"}
 LOG_LEVEL=${LOG_LEVEL:-"INFO"}
 DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
-JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
-JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
+JOB_HOST_KEY=${JOB_HOST_KEY:-"openebs.io/nodeid"}
+JOB_HOST_NAME=${JOB_HOST_NAME:-"node008-zina"}
 # Database configuration (adjust before use)
 DB_HOST=${DB_HOST:-"test-db.db.svc.cluster.local"}
 DB_PORT=${DB_PORT:-"6432"}
184
src/pipelines/dbf_to_postgres_ctllog.py
Normal file
@@ -0,0 +1,184 @@
import os
import pandas as pd
from dbfread import DBF
from itertools import islice
from datetime import datetime, timedelta
import logging
from core.database import Database
from core.utils import size_to_human_readable, calculate_file_hash
from pipelines.base_pipeline import BasePipeline


class DbfToPostgresCtllogPipeline(BasePipeline):
    def __init__(self, config):
        super().__init__(config)
        # TODO: uncomment for local debugging
        # self.data_root = 'D:\disney_test'
        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
        # TODO: uncomment for local debugging
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
        self.mapping_file = os.getenv('MAPPING_FILE')

        self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))

        self.db = None

    def validate_config(self):
        # Make sure the input directory exists
        if not os.path.exists(self.dbf_dir):
            raise ValueError(f"DBF directory not found: {self.dbf_dir}")

        # If a mapping file is configured, verify that it exists
        if self.mapping_file and not os.path.exists(self.mapping_file):
            self.logger.warning(f"Mapping file not found: {self.mapping_file}")
            self.mapping_file = None

    def load_mapping(self):
        """Load the (AS_SERIAL, ID) -> controller point mapping."""
        if not self.mapping_file:
            self.logger.info("No mapping file provided, using default mapping")
            return {}

        try:
            self.logger.info(f"Loading mapping from {self.mapping_file}")
            mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping")

            # Clean the data
            mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
            mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id',
                                     'control_group_controller_id', 'controller_point_id']]

            # Build the mapping dictionary
            mapping_dict = {}
            for _, row in mapping_df.iterrows():
                key = (str(row['AS_SERIAL']), str(row['ID']))
                if key not in mapping_dict:
                    mapping_dict[key] = []

                mapping_dict[key].append({
                    'seq_id': int(row['data_field_sequence_id']),
                    'control_group_controller_id': int(row['control_group_controller_id']),
                    'point_id': int(row['controller_point_id'])
                })

            self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
            return mapping_dict
        except Exception as e:
            self.logger.error(f"Failed to load mapping: {str(e)}")
            return {}

    def process(self):
        # Load the mapping
        mapping_dict = self.load_mapping()

        # Connect to the database
        # TODO: toggle when debugging locally
        db_config = self.config.get_database_config()
        self.db = Database(**db_config)
        # TODO: toggle when debugging locally

        # Process the files
        total_processed = 0
        for filename in os.listdir(self.dbf_dir):
            if filename.casefold().endswith('.dbf'):
                file_path = os.path.join(self.dbf_dir, filename)
                processed = self.process_file(file_path, mapping_dict)
                total_processed += processed
                self.logger.info(f"Processed {processed} records from {filename}")

        # Close the database connection
        # TODO: toggle when debugging locally
        self.db.disconnect()
        # TODO: toggle when debugging locally
        return total_processed

    def process_file(self, file_path, mapping_dict):
        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
        try:
            # Gather basic file information
            file_size = os.path.getsize(file_path)
            file_hash = calculate_file_hash(file_path)
            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")

            dbf_table = DBF(file_path, encoding='utf-8')
            batch_data = []
            processed_records = 0
            batch_size = int(os.getenv('BATCH_SIZE', 1000))
            self.logger.info(f"DBF file {os.path.basename(file_path)} has {len(dbf_table.records)} records")

            # Chunked reading turned out to be a big pitfall; do not read in chunks
            # dbf_table = DBF(file_path, load=False, encoding='utf-8')
            # chunk_idx = 0
            # while True:
            #     chunk = list(islice(dbf_table._iter_records(), 100000))
            #     if not chunk:  # nothing left to read
            #         break
            #     chunk_idx += 1
            #     # process these 100,000 rows
            #     self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}")
            for record in dbf_table:
                try:
                    as_serial = str(record.get('AS_SERIAL', '')).strip()
                    device_id = str(record.get('ID', '')).strip()
                    key = (as_serial, device_id)

                    # Skip records that have no mapping
                    if key not in mapping_dict:
                        continue

                    # Handle the timestamp (+8 hours)
                    dt_str = record.get('DATETIME', '')
                    if not dt_str:
                        continue

                    original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
                    target_time = original_time + timedelta(hours=8)
                    formatted_time = target_time.strftime('%Y-%m-%d %H:%M:%S+00')

                    # Apply every mapping for this key
                    for mapping in mapping_dict[key]:
                        data_field = f"DATA{mapping['seq_id']:02d}"
                        value = record.get(data_field)
                        if value is None:
                            continue

                        batch_data.append((
                            formatted_time,
                            mapping['control_group_controller_id'],
                            mapping['point_id'],
                            float(value)
                        ))

                    # Insert in batches
                    if len(batch_data) >= batch_size:
                        # TODO: comment out when debugging locally
                        self.db.execute_batch(
                            "INSERT INTO current_states (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
                            batch_data
                        )
                        # TODO: comment out when debugging locally
                        processed_records += len(batch_data)
                        self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
                        batch_data = []

                except Exception as e:
                    self.logger.warning(f"Skipping record due to error: {str(e)}")
                    continue

            # Insert the remaining records
            if batch_data:
                # TODO: comment out when debugging locally
                self.db.execute_batch(
                    "INSERT INTO current_states (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
                    batch_data
                )
                # TODO: comment out when debugging locally
                processed_records += len(batch_data)
                self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")

            self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
            return processed_records

        except Exception as e:
            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
            return 0
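The pipeline only consumes what it finds on the shared volume: *.dbf files under DBF_INPUT_DIR and an Excel workbook at MAPPING_FILE whose "Mapping" sheet carries AS_SERIAL, ID, data_field_sequence_id, control_group_controller_id and controller_point_id columns. A hedged staging sketch, assuming a helper pod named data-staging already mounts the same PVC at /data (the pod name and the CTLLOG01.DBF file name are illustrative only):

# Stage the inputs on the PVC the Job will mount at /data
kubectl exec data-staging -n default -- mkdir -p /data/dbf-input
kubectl cp ./CTLLOG01.DBF default/data-staging:/data/dbf-input/CTLLOG01.DBF
kubectl cp ./disney-mapping-v2.xlsx default/data-staging:/data/disney-mapping-v2.xlsx

# Then deploy; the importer container runs with args ["--pipeline", "dbf_to_postgres_ctllog"]
# and picks up everything else from the environment set in the Job template
./scripts/deploy-dbf-import-ctllog-disney.sh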