diff --git a/k8s/job-templates/dbf-import-ctllog-pwc-job.yaml b/k8s/job-templates/dbf-import-ctllog-pwc-job.yaml
new file mode 100644
index 0000000..908af42
--- /dev/null
+++ b/k8s/job-templates/dbf-import-ctllog-pwc-job.yaml
@@ -0,0 +1,59 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: dbf-import-ctllog-pwc-job-{{JOB_ID}}
+  namespace: {{NAMESPACE}}
+spec:
+  ttlSecondsAfterFinished: 86400
+  backoffLimit: 0
+  template:
+    spec:
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: {{JOB_HOST_KEY}}
+                operator: In
+                values:
+                - {{JOB_HOST_NAME}}
+      containers:
+      - name: importer
+        image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}}
+        args: ["--pipeline", "dbf_to_postgres_ctllog-pwc"]
+        env:
+        - name: DATA_PVC_MOUNT_PATH
+          value: "/data"
+        - name: DBF_INPUT_DIR
+          value: "/data/dbf-input"
+        - name: MAPPING_FILE
+          value: "/data/disney-mapping-elec-v3.xlsx"
+        - name: DB_HOST
+          value: "{{DB_HOST}}"
+        - name: DB_PORT
+          value: "{{DB_PORT}}"
+        - name: DB_NAME
+          value: "{{DB_NAME}}"
+        - name: DB_USER
+          value: "{{DB_USER}}"
+        - name: DB_PASSWORD
+          value: "{{DB_PASSWORD}}"
+        - name: BATCH_SIZE
+          value: "{{BATCH_SIZE}}"
+        - name: LOG_LEVEL
+          value: "{{LOG_LEVEL}}"
+        volumeMounts:
+        - name: data-volume
+          mountPath: "/data"
+        resources:
+          requests:
+            cpu: "500m"
+            memory: "800Mi"
+          limits:
+            cpu: "1000m"
+            memory: "1700Mi"
+      volumes:
+      - name: data-volume
+        persistentVolumeClaim:
+          claimName: {{DATA_PVC_NAME}}
+      restartPolicy: Never
\ No newline at end of file
diff --git a/scripts/deploy-dbf-import-ctllog-pwc-disney.sh b/scripts/deploy-dbf-import-ctllog-pwc-disney.sh
new file mode 100644
index 0000000..ec0bade
--- /dev/null
+++ b/scripts/deploy-dbf-import-ctllog-pwc-disney.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+set -euo pipefail
+
+# Default configuration
+JOB_ID=$(date +%Y%m%d-%H%M%S)
+IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
+IMAGE_TAG=${IMAGE_TAG:-"dev"}
+BATCH_SIZE=${BATCH_SIZE:-"50"}
+LOG_LEVEL=${LOG_LEVEL:-"INFO"}
+DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
+JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
+JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
+# Database configuration (override these before use)
+DB_HOST=${DB_HOST:-"db"}
+DB_PORT=${DB_PORT:-"6432"}
+DB_NAME=${DB_NAME:-"idrc"}
+DB_USER=${DB_USER:-"teramesh"}
+DB_PASSWORD=${DB_PASSWORD:-"2iqTCHwnf75stGBzM8le"}
+
+NAMESPACE=${NAMESPACE:-"default"}
+
+# Check that the template file exists
+TEMPLATE_FILE="dbf-import-ctllog-pwc-job.yaml"
+if [ ! -f "$TEMPLATE_FILE" ]; then
+    echo "Template file not found: $TEMPLATE_FILE"
+    exit 1
+fi
+
+# Substitute template variables directly (without envsubst)
+OUTPUT_FILE="dbf-import-ctllog-pwc-job-${JOB_ID}.yaml"
+sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
+    -e "s|{{NAMESPACE}}|$NAMESPACE|g" \
+    -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
+    -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
+    -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
+    -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
+    -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
+    -e "s|{{DB_HOST}}|$DB_HOST|g" \
+    -e "s|{{DB_PORT}}|$DB_PORT|g" \
+    -e "s|{{DB_NAME}}|$DB_NAME|g" \
+    -e "s|{{DB_USER}}|$DB_USER|g" \
+    -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
+    -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
+    -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
+    "$TEMPLATE_FILE" > "$OUTPUT_FILE"
+
+# Validate before deploying
+echo "Validating generated YAML..."
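+# Client-side dry run: catches templating and YAML errors before anything reaches the cluster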
+kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client + +# 部署Job +kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" + +echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-pwc-job-${JOB_ID}" +echo "To view logs: kubectl logs job/dbf-import-ctllog-pwc-job-${JOB_ID} -n $NAMESPACE" \ No newline at end of file diff --git a/scripts/deploy-dbf-import-ctllog-pwc-test.sh b/scripts/deploy-dbf-import-ctllog-pwc-test.sh new file mode 100644 index 0000000..0531d53 --- /dev/null +++ b/scripts/deploy-dbf-import-ctllog-pwc-test.sh @@ -0,0 +1,55 @@ +#!/bin/bash +set -e + +# 默认配置 +JOB_ID=$(date +%Y%m%d-%H%M%S) +IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"} +IMAGE_TAG=${IMAGE_TAG:-"dev"} +BATCH_SIZE=${BATCH_SIZE:-"1000"} +LOG_LEVEL=${LOG_LEVEL:-"INFO"} +DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"} +JOB_HOST_KEY=${JOB_HOST_KEY:-"openebs.io/nodeid"} +JOB_HOST_NAME=${JOB_HOST_NAME:-"node008-zina"} +# 数据库配置(使用时需要修改) +DB_HOST=${DB_HOST:-"test-db.db.svc.cluster.local"} +DB_PORT=${DB_PORT:-"6432"} +DB_NAME=${DB_NAME:-"idrc"} +DB_USER=${DB_USER:-"idrc"} +DB_PASSWORD=${DB_PASSWORD:-"a8aa283c1b3ca0bdfe1d2669dd400f3d"} + +NAMESPACE=${NAMESPACE:-"db"} + +# 检查模板文件 +TEMPLATE_FILE="dbf-import-ctllog-pwc-job.yaml" +if [ ! -f "$TEMPLATE_FILE" ]; then + echo "Template file not found: $TEMPLATE_FILE" + exit 1 +fi + +# 直接替换模板变量(不使用envsubst) +OUTPUT_FILE="dbf-import-ctllog-pwc-job-${JOB_ID}.yaml" +sed -e "s|{{JOB_ID}}|$JOB_ID|g" \ + -e "s|{{NAMESPACE}}|$NAMESPACE|g" \ + -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \ + -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \ + -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \ + -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \ + -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \ + -e "s|{{DB_HOST}}|$DB_HOST|g" \ + -e "s|{{DB_PORT}}|$DB_PORT|g" \ + -e "s|{{DB_NAME}}|$DB_NAME|g" \ + -e "s|{{DB_USER}}|$DB_USER|g" \ + -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \ + -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \ + -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \ + "$TEMPLATE_FILE" > "$OUTPUT_FILE" + +# 部署前验证 +echo "Validating generated YAML..." 
+kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client + +# 部署Job +kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" + +echo "Job deployed in namespace $NAMESPACE: dbf-import-ctllog-pwc-job-${JOB_ID}" +echo "To view logs: kubectl logs job/dbf-import-ctllog-pwc-job-${JOB_ID} -n $NAMESPACE" \ No newline at end of file diff --git a/src/pipelines/dbf_to_postgres_ctllog_pwr.py b/src/pipelines/dbf_to_postgres_ctllog_pwr.py new file mode 100644 index 0000000..8161148 --- /dev/null +++ b/src/pipelines/dbf_to_postgres_ctllog_pwr.py @@ -0,0 +1,202 @@ +import os +import csv +import pandas as pd +from dbfread import DBF +from itertools import islice +from datetime import datetime, timedelta +import logging +from core.database import Database +from core.utils import size_to_human_readable, calculate_file_hash +from pipelines.base_pipeline import BasePipeline + +class DbfToPostgresCtllogPwrPipeline(BasePipeline): + def __init__(self, config): + super().__init__(config) + # todo:本地调试打开 + self.data_root = 'D:\disney_test' + self.mapping_file = 'D:\disney_test\disney-mapping-elec-v3.xlsx' + # todo:本地调试打开 + # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data') + # self.mapping_file = os.getenv('MAPPING_FILE') + self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input')) + + # todo:debug use + self.csv_file_path = 'D:\disney_test\debug_controller_log_elec.csv' + # 初始化CSV文件 + if not os.path.exists(self.csv_file_path): + with open(self.csv_file_path, 'w') as f: + csv.writer(f).writerow( + ['created', 'control_group_controller_id', 'point_id', 'real_value']) + # todo:debug use + + self.db = None + self.group_cache = {} + self.batch_size = int(os.getenv('BATCH_SIZE', 1000)) + self.batch_data = [] + self.processed_records = 0 + self.current_date = None # 当前处理的日期 + + + def validate_config(self): + # 确保目录存在 + if not os.path.exists(self.dbf_dir): + raise ValueError(f"DBF directory not found: {self.dbf_dir}") + + # 如果有映射文件,验证存在 + if self.mapping_file and not os.path.exists(self.mapping_file): + self.logger.warning(f"Mapping file not found: {self.mapping_file}") + self.mapping_file = None + + def load_mapping(self): + """加载映射关系""" + if not self.mapping_file: + self.logger.info("No mapping file provided, using default mapping") + return {} + + try: + self.logger.info(f"Loading mapping from {self.mapping_file}") + mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping") + + # 清理数据 - 只保留有ACCOUNT值的行 + mapping_df = mapping_df.dropna(subset=['ACCOUNT']) + + # 创建映射字典 {ACCOUNT: [mapping_entries]} + mapping_dict = {} + for _, row in mapping_df.iterrows(): + account = str(row['ACCOUNT']) + if account not in mapping_dict: + mapping_dict[account] = [] + + mapping_dict[account].append({ + 'control_group_controller_id': int(row['control_group_controller_id']), + 'controller_point_id': int(row['controller_point_id']), + 'data_field_name': row['data_field_name'] + }) + + self.logger.info(f"Loaded {len(mapping_dict)} mapping entries") + return mapping_dict + + except Exception as e: + self.logger.error(f"Failed to load mapping: {str(e)}") + return {} + + + def process(self): + # 加载映射关系 + mapping_dict = self.load_mapping() + + # 连接数据库 + # todo:本地调试时关闭 + # db_config = self.config.get_database_config() + # self.db = Database(**db_config) + # todo:本地调试时关闭 + + # 处理文件 + total_processed = 0 + for filename in os.listdir(self.dbf_dir): + if filename.casefold().endswith('.dbf'): + file_path = os.path.join(self.dbf_dir, filename) + processed = self.process_file(file_path, mapping_dict) + 
+                total_processed += processed
+                self.logger.info(f"Processed {processed} records from {filename}")
+
+        # Close the database connection
+        # TODO: disabled for local debugging -- re-enable for deployment
+        # self.db.disconnect()
+        # TODO: disabled for local debugging -- re-enable for deployment
+        return total_processed
+
+    def process_file(self, file_path, mapping_dict):
+        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
+        start_count = self.processed_records  # records flushed before this file
+        try:
+            # Gather file info
+            file_size = os.path.getsize(file_path)
+            file_hash = calculate_file_hash(file_path)
+            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")
+            dbf_table = DBF(file_path, encoding='utf-8', ignore_missing_memofile=True)
+            self.logger.info(f"DBF file {os.path.basename(file_path)} contains {len(dbf_table)} records")
+
+            # Process the DBF records
+            for record in dbf_table:
+                self.process_record(record, mapping_dict)
+
+            # Make sure any remaining batched rows are written
+            self.final_flush()
+
+            self.logger.info(f"Processed {self.processed_records - start_count} records from {os.path.basename(file_path)}")
+            return self.processed_records - start_count
+
+        except Exception as e:
+            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
+            return 0
+
+    def process_record(self, record, mapping_dict):
+        """Transform a single DBF record into one target row per mapping entry."""
+        try:
+            account = str(record.get('ACCOUNT', ''))
+            if not account or account not in mapping_dict:
+                return
+
+            for mapping in mapping_dict[account]:
+                data_field = mapping['data_field_name']
+
+                # Pick the source fields based on the data field type
+                if data_field == "MAXIMUM":
+                    created = record.get('TIMEDATEMA')
+                    real_value = record.get('MAXIMUM')
+                elif data_field == "TOTALIZE":
+                    created = record.get('TIMEDATE')
+                    real_value = record.get('TOTALIZE')
+                else:
+                    continue  # skip unsupported field types
+
+                # Skip rows that are missing a required field
+                if created is None or real_value is None:
+                    continue
+
+                # Truncate to millisecond precision and tag as UTC
+                created_str = created.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + '+00'
+
+                self.batch_data.append((
+                    created_str,
+                    mapping['control_group_controller_id'],
+                    mapping['controller_point_id'],
+                    real_value
+                ))
+
+                if len(self.batch_data) >= self.batch_size:
+                    self.flush_batch()
+
+        except Exception as e:
+            self.logger.error(f"Error processing record: {e}")
+
+    def flush_batch(self):
+        """Run the batch insert and clear the batch buffer."""
+        if not self.batch_data:
+            return
+
+        # Actual database insert (disabled for local debugging)
+        # self.db.execute_batch(
+        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
+        #     self.batch_data
+        # )
+
+        # TODO: debug-only CSV output
+        with open(self.csv_file_path, "a", newline="") as f:
+            writer = csv.writer(f)
+            writer.writerows(self.batch_data)
+        # TODO: debug-only CSV output
+
+        # Update the processed-record count
+        processed_count = len(self.batch_data)
+        self.processed_records += processed_count
+        self.logger.info(f"Inserted {processed_count} records, total {self.processed_records}")
+
+        # Clear the batch buffer
+        self.batch_data = []
+
+    def final_flush(self):
+        # Flush whatever is left in the current batch
+        self.flush_batch()
+
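For reference, the database write that flush_batch leaves commented out corresponds to a plain batched INSERT. The sketch below is illustrative only and is not part of the patch: core.database.Database is internal here and its execute_batch signature is only inferred from the call site, so this version uses psycopg2 directly with the same SQL and the job's DB_* environment variables. The helper name insert_controller_log_rows is hypothetical.

# Illustrative sketch (not part of the patch): the batched INSERT that
# flush_batch() is expected to perform once the Database calls are re-enabled.
# Assumes psycopg2 is available and a controller_log table matches the
# pipeline's column list; connection settings mirror the job's DB_* env vars.
import os

import psycopg2
from psycopg2.extras import execute_batch

INSERT_SQL = (
    "INSERT INTO controller_log "
    "(created, control_group_controller_id, point_id, real_value) "
    "VALUES (%s, %s, %s, %s)"
)

def insert_controller_log_rows(rows):
    """Insert (created, control_group_controller_id, point_id, real_value) tuples."""
    conn = psycopg2.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        port=int(os.getenv('DB_PORT', '5432')),
        dbname=os.getenv('DB_NAME', 'idrc'),
        user=os.getenv('DB_USER', ''),
        password=os.getenv('DB_PASSWORD', ''),
    )
    try:
        with conn:  # commit on success, roll back on error
            with conn.cursor() as cur:
                execute_batch(cur, INSERT_SQL, rows,
                              page_size=int(os.getenv('BATCH_SIZE', '1000')))
    finally:
        conn.close()

In debug mode the same tuples land in debug_controller_log_elec.csv; in production flush_batch would hand self.batch_data to a helper like this instead of the CSV writer.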