commit 71045d753103b6fd28a10cefc97ce9661370a2e2 Author: mingsheng.li Date: Fri Jul 25 11:54:20 2025 +0800 初始合入databridge,用于后续数据的导出导入 diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..9fcd73f --- /dev/null +++ b/.drone.yml @@ -0,0 +1,42 @@ +kind: pipeline +type: docker +name: build_dev + +trigger: + event: + - promote + target: + - dev + +clone: + disable: true + +steps: + +- name: clone + image: harbor.dc.teramesh.cn/library/bitnami/git:latest + pull: if-not-exists + commands: + - git clone $DRONE_REPO_LINK . + - git checkout $DRONE_COMMIT + +- name: build_dev + image: harbor.dc.teramesh.cn/library/moby/buildkit:master + pull: if-not-exists + environment: + PIP_INDEX_URL: + from_secret: PIP_INDEX_URL + HARBOR_DOCKER_AUTH: + from_secret: HARBOR_DOCKER_AUTH + commands: + - mkdir -p ~/.docker + - echo "$HARBOR_DOCKER_AUTH" > ~/.docker/config.json + - > + buildctl + --addr tcp://buildkitd:1234 + build + --frontend=dockerfile.v0 + --local context=app + --local dockerfile=app + --opt build-arg:PIP_INDEX_URL=$PIP_INDEX_URL + --output type=image,"name=harbor.dc.teramesh.cn/idrc/tools/databridge:dev",push=true \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fed8031 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Python +__pycache__/ +*.pyc +*.pyo +*.pyd +.python-version + +# Environment files +.env +.venv/ + +# OS +.DS_Store +Thumbs.db + +# IDE +.vscode/ +.idea/ + +*.egg-info/ +build/ +dist/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d3ef484 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +FROM harbor.dc.teramesh.cn/library/deploybase-python:3.11-slim + +# 设置环境变量 +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 + +# 安装系统依赖 +RUN apt-get update && apt-get install -y \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# 设置工作目录 +WORKDIR /app + +# 复制依赖文件并安装 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# 复制应用代码 +COPY . . + +# 设置入口点 +ENTRYPOINT ["python", "src/main.py"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0cb8832 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +# Databridge - Data Pipeline System + +Databridge is a flexible data pipeline system for processing and transferring data between various sources and destinations. It is designed to run on Kubernetes and supports multiple data processing pipelines. + +## Features + +- **DBF to PostgreSQL**: Import data from DBF files to PostgreSQL +- **CSV Export**: Export data from PostgreSQL to CSV files +- **Kubernetes Native**: Designed to run as Kubernetes Jobs +- **ZFS Storage**: Supports ZFS persistent storage +- **Parameterized Pipelines**: Flexible configuration via environment variables + +## Getting Started + +### Prerequisites + +- Kubernetes cluster +- ZFS storage provisioner +- PostgreSQL database + +### Installation + +1. 
**Deploy Storage Infrastructure**: + ```bash + kubectl apply -f k8s/pv.yaml + kubectl apply -f k8s/pvc.yaml + kubectl apply -f k8s/rbac.yaml \ No newline at end of file diff --git a/k8s/job-templates/csv-export-job.yaml b/k8s/job-templates/csv-export-job.yaml new file mode 100644 index 0000000..a2cfe9e --- /dev/null +++ b/k8s/job-templates/csv-export-job.yaml @@ -0,0 +1,40 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: csv-export-job-{{JOB_ID}} +spec: + ttlSecondsAfterFinished: 86400 + template: + spec: + containers: + - name: exporter + image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}} + args: ["--pipeline", "csv_export"] + env: + - name: DATA_PVC_MOUNT_PATH + value: "/data" + - name: OUTPUT_DIR + value: "/data/csv-exports" + - name: EXPORT_QUERY + value: "{{EXPORT_QUERY}}" + - name: DB_HOST + value: "{{DB_HOST}}" + - name: DB_PORT + value: "{{DB_PORT}}" + - name: DB_NAME + value: "{{DB_NAME}}" + - name: DB_USER + value: "{{DB_USER}}" + - name: DB_PASSWORD + value: "{{DB_PASSWORD}}" + - name: LOG_LEVEL + value: "{{LOG_LEVEL}}" + volumeMounts: + - name: data-volume + mountPath: "/data" + volumes: + - name: data-volume + persistentVolumeClaim: + claimName: {{DATA_PVC_NAME}} + restartPolicy: Never + backoffLimit: 1 \ No newline at end of file diff --git a/k8s/job-templates/dbf-import-job.yaml b/k8s/job-templates/dbf-import-job.yaml new file mode 100644 index 0000000..79691b8 --- /dev/null +++ b/k8s/job-templates/dbf-import-job.yaml @@ -0,0 +1,42 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: dbf-import-job-{{JOB_ID}} +spec: + ttlSecondsAfterFinished: 86400 + template: + spec: + containers: + - name: importer + image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}} + args: ["--pipeline", "dbf_to_postgres"] + env: + - name: DATA_PVC_MOUNT_PATH + value: "/data" + - name: DBF_INPUT_DIR + value: "/data/dbf-input" + - name: MAPPING_FILE + value: "/data/mapping.xlsx" + - name: DB_HOST + value: "{{DB_HOST}}" + - name: DB_PORT + value: "{{DB_PORT}}" + - name: DB_NAME + value: "{{DB_NAME}}" + - name: DB_USER + value: "{{DB_USER}}" + - name: DB_PASSWORD + value: "{{DB_PASSWORD}}" + - name: BATCH_SIZE + value: "{{BATCH_SIZE}}" + - name: LOG_LEVEL + value: "{{LOG_LEVEL}}" + volumeMounts: + - name: data-volume + mountPath: "/data" + volumes: + - name: data-volume + persistentVolumeClaim: + claimName: {{DATA_PVC_NAME}} + restartPolicy: Never + backoffLimit: 1 \ No newline at end of file diff --git a/k8s/pv.yaml b/k8s/pv.yaml new file mode 100644 index 0000000..0c010e9 --- /dev/null +++ b/k8s/pv.yaml @@ -0,0 +1,22 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: zfs-data-import-export-pv # 任意,但要保证唯一 +spec: + capacity: + storage: 50Gi # 与数据集配额一致即可 + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain # 删除 PV 时保留数据 + storageClassName: "" # 留空,防止动态 Provisioner 抢占 + volumeMode: Filesystem + local: + path: /data/data-import-export # ← 指向节点上 ZFS 挂载目录 + nodeAffinity: + required: + nodeSelectorTerms: + - matchExpressions: + - key: openebs.io/nodeid + operator: In + values: + - node008-zina diff --git a/k8s/pvc.yaml b/k8s/pvc.yaml new file mode 100644 index 0000000..c88a6fe --- /dev/null +++ b/k8s/pvc.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: data-import-export-pvc + namespace: db # 与 Job 同命名空间 +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 50Gi + storageClassName: "" # 必须与 PV 一致 + volumeName: zfs-data-import-export-pv # ← 显式绑定到刚才创建的 PV \ No newline at end of file diff --git a/k8s/rbac.yaml b/k8s/rbac.yaml 
new file mode 100644 index 0000000..58805f3 --- /dev/null +++ b/k8s/rbac.yaml @@ -0,0 +1,28 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: databridge-role +rules: +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +- apiGroups: [""] + resources: ["pods", "pods/log"] + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "create"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: databridge-role-binding +subjects: +- kind: ServiceAccount + name: default + namespace: default +roleRef: + kind: Role + name: databridge-role + apiGroup: rbac.authorization.k8s.io \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..082aa56 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tool-databridge" +version = "0.1.0" +description = "A data pipeline tool" +authors = [{ name = "Your Name", email = "you@example.com" }] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "pandas==1.5.3", + "numpy==1.24.4", + # 你可以继续添加其他依赖 +] + +[project.scripts] +databridge = "src.main:main" # 可选:命令行入口 + +[tool.setuptools.packages.find] +where = ["src"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6674db6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +pandas==1.5.3 +dbfread==2.0.7 +psycopg2-binary==2.9.6 +python-dotenv==1.0.0 +openpyxl>=3.1.0 \ No newline at end of file diff --git a/scripts/deploy-csv-export.sh b/scripts/deploy-csv-export.sh new file mode 100644 index 0000000..88805ca --- /dev/null +++ b/scripts/deploy-csv-export.sh @@ -0,0 +1,37 @@ +#!/bin/bash +set -e + +# 默认配置 +JOB_ID=$(date +%Y%m%d-%H%M%S) +IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/idrc/tools"} +IMAGE_TAG=${IMAGE_TAG:-"latest"} +DATA_PVC_NAME=${DATA_PVC_NAME:-"databridge-data-pvc"} +DB_HOST=${DB_HOST:-"postgres-service"} +DB_PORT=${DB_PORT:-"5432"} +DB_NAME=${DB_NAME:-"energy_data"} +DB_USER=${DB_USER:-"db_user"} +DB_PASSWORD=${DB_PASSWORD:-"db_password"} +EXPORT_QUERY=${EXPORT_QUERY:-"SELECT * FROM source_table"} +LOG_LEVEL=${LOG_LEVEL:-"INFO"} + +# 导出变量用于envsubst +export JOB_ID IMAGE_REPO IMAGE_TAG DATA_PVC_NAME +export DB_HOST DB_PORT DB_NAME DB_USER DB_PASSWORD +export EXPORT_QUERY LOG_LEVEL + +# 检查模板文件 +TEMPLATE_FILE="../k8s/job-templates/csv-export-job.yaml" +if [ ! 
-f "$TEMPLATE_FILE" ]; then + echo "Template file not found: $TEMPLATE_FILE" + exit 1 +fi + +# 处理模板 +OUTPUT_FILE="../k8s/jobs/csv-export-${JOB_ID}.yaml" +envsubst < "$TEMPLATE_FILE" > "$OUTPUT_FILE" + +# 部署Job +kubectl apply -f "$OUTPUT_FILE" + +echo "Job deployed: databridge-csv-export-${JOB_ID}" +echo "To view logs: kubectl logs job/databridge-csv-export-${JOB_ID}" \ No newline at end of file diff --git a/scripts/deploy-dbf-import.sh b/scripts/deploy-dbf-import.sh new file mode 100644 index 0000000..a549be2 --- /dev/null +++ b/scripts/deploy-dbf-import.sh @@ -0,0 +1,39 @@ +#!/bin/bash +set -e + +# 默认配置 +JOB_ID=$(date +%Y%m%d-%H%M%S) +IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/idrc/tools"} +IMAGE_TAG=${IMAGE_TAG:-"latest"} +BATCH_SIZE=${BATCH_SIZE:-"1000"} +LOG_LEVEL=${LOG_LEVEL:-"INFO"} +DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"} +# todo: 下面参数使用时需要修改 +DB_HOST=${DB_HOST:-"xx-postgres-service"} +DB_PORT=${DB_PORT:-"5432"} +DB_NAME=${DB_NAME:-"xx"} +DB_USER=${DB_USER:-"xx_db_user"} +DB_PASSWORD=${DB_PASSWORD:-"xx_db_password"} + + +# 导出变量用于envsubst +export JOB_ID IMAGE_REPO IMAGE_TAG DATA_PVC_NAME +export DB_HOST DB_PORT DB_NAME DB_USER DB_PASSWORD +export BATCH_SIZE LOG_LEVEL + +# 检查模板文件 +TEMPLATE_FILE="../k8s/job-templates/dbf-import-job.yaml" +if [ ! -f "$TEMPLATE_FILE" ]; then + echo "Template file not found: $TEMPLATE_FILE" + exit 1 +fi + +# 处理模板 +OUTPUT_FILE="../k8s/jobs/dbf-import-job-${JOB_ID}.yaml" +envsubst < "$TEMPLATE_FILE" > "$OUTPUT_FILE" + +# 部署Job +kubectl apply -f "$OUTPUT_FILE" + +echo "Job deployed: dbf-import-job-${JOB_ID}" +echo "To view logs: kubectl logs job/dbf-import-job-${JOB_ID}" \ No newline at end of file diff --git a/src/core/config.py b/src/core/config.py new file mode 100644 index 0000000..c0b75fa --- /dev/null +++ b/src/core/config.py @@ -0,0 +1,23 @@ +import os +import logging + + +class Config: + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data') + + def get_path(self, *args): + """构建绝对路径""" + return os.path.join(self.data_root, *args) + + def get_database_config(self): + """获取数据库配置""" + + return { + 'host': os.getenv('DB_HOST'), + 'port': os.getenv('DB_PORT', '5432'), + 'dbname': os.getenv('DB_NAME'), + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD') + } diff --git a/src/core/database.py b/src/core/database.py new file mode 100644 index 0000000..f244864 --- /dev/null +++ b/src/core/database.py @@ -0,0 +1,51 @@ +import psycopg2 +import logging +from psycopg2 import sql +from psycopg2.extras import execute_batch + + +class Database: + def __init__(self, host, port, dbname, user, password): + self.conn = None + self.logger = logging.getLogger(self.__class__.__name__) + self.db_config = { + 'host': host, + 'port': port, + 'dbname': dbname, + 'user': user, + 'password': password + } + self.connect() + + def connect(self): + try: + self.conn = psycopg2.connect(**self.db_config) + self.logger.info(f"Connected to database: {self.db_config['dbname']}@{self.db_config['host']}") + except Exception as e: + self.logger.error(f"Database connection failed: {str(e)}") + raise + + def disconnect(self): + if self.conn: + self.conn.close() + self.logger.info("Database connection closed") + + def execute_batch(self, query, data_list): + try: + with self.conn.cursor() as cursor: + execute_batch(cursor, query, data_list) + self.conn.commit() + self.logger.debug(f"Inserted {len(data_list)} records") + return len(data_list) + except Exception as e: + 
self.logger.error(f"Batch execution failed: {str(e)}") + self.conn.rollback() + raise + + def table_exists(self, table_name): + with self.conn.cursor() as cursor: + cursor.execute( + "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = %s)", + (table_name,) + ) + return cursor.fetchone()[0] \ No newline at end of file diff --git a/src/core/init.py b/src/core/init.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/utils.py b/src/core/utils.py new file mode 100644 index 0000000..af8ec5b --- /dev/null +++ b/src/core/utils.py @@ -0,0 +1,51 @@ +import os +import logging +import hashlib +from datetime import datetime + +def setup_logging(level="INFO"): + """配置日志""" + logging.basicConfig( + level=level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] + ) + +def generate_job_id(prefix=""): + """生成唯一Job ID""" + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + return f"{prefix}{timestamp}" + +def validate_directory(path, create=True): + """验证目录是否存在,可选创建""" + if not os.path.exists(path): + if create: + os.makedirs(path, exist_ok=True) + logging.info(f"Created directory: {path}") + else: + raise ValueError(f"Directory not found: {path}") + return True + +def calculate_file_hash(filepath, algorithm="md5"): + """计算文件哈希值""" + hasher = hashlib.new(algorithm) + with open(filepath, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hasher.update(chunk) + return hasher.hexdigest() + +def size_to_human_readable(size_bytes): + """字节数转可读格式""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.2f} PB" + +def parse_env_vars(prefix="DB_"): + """解析带前缀的环境变量""" + return { + k[len(prefix):].lower(): v + for k, v in os.environ.items() + if k.startswith(prefix) + } \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..007412c --- /dev/null +++ b/src/main.py @@ -0,0 +1,54 @@ +import argparse +import importlib +import logging +import os + +from core.config import Config +from core.utils import setup_logging + + +def main(): + # 设置命令行参数 + parser = argparse.ArgumentParser(description='Databridge Data Pipeline') + parser.add_argument('--pipeline', type=str, required=True, + help='Pipeline type to execute') + args = parser.parse_args() + + # 配置日志 + log_level = os.getenv('LOG_LEVEL', 'INFO') + setup_logging(log_level) + logger = logging.getLogger(__name__) + + try: + # 动态加载管道模块 + try: + pipeline_module = importlib.import_module(f'pipelines.{args.pipeline}') + except ImportError as e: + logger.error(f"Pipeline module not found: {args.pipeline}") + raise + + # 获取管道类 (命名约定: PipelineName + "Pipeline") + pipeline_class_name = args.pipeline.replace('_', ' ').title().replace(' ', '') + 'Pipeline' + if not hasattr(pipeline_module, pipeline_class_name): + logger.error(f"Pipeline class not found: {pipeline_class_name}") + raise ImportError(f"Class {pipeline_class_name} not found in {args.pipeline} module") + + PipelineClass = getattr(pipeline_module, pipeline_class_name) + + # 创建配置和管道实例 + config = Config() + pipeline = PipelineClass(config) + + # 运行管道 + pipeline.run() + + logger.info("Pipeline completed successfully") + exit(0) + + except Exception as e: + logger.error(f"Pipeline execution failed: {str(e)}", exc_info=True) + exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/pipelines/base_pipeline.py 
b/src/pipelines/base_pipeline.py
new file mode 100644
index 0000000..b65f915
--- /dev/null
+++ b/src/pipelines/base_pipeline.py
@@ -0,0 +1,30 @@
+import abc
+import logging
+
+
+class BasePipeline(metaclass=abc.ABCMeta):
+    def __init__(self, config):
+        self.config = config
+        self.logger = logging.getLogger(self.__class__.__name__)
+
+    @abc.abstractmethod
+    def validate_config(self):
+        """Validate the pipeline configuration"""
+        pass
+
+    @abc.abstractmethod
+    def process(self):
+        """Execute the pipeline processing"""
+        pass
+
+    def run(self):
+        """Run the pipeline"""
+        try:
+            self.logger.info(f"Starting {self.__class__.__name__} pipeline")
+            self.validate_config()
+            result = self.process()
+            self.logger.info(f"Pipeline completed: {self.__class__.__name__}")
+            return result
+        except Exception as e:
+            self.logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
+            raise
\ No newline at end of file
diff --git a/src/pipelines/csv_export.py b/src/pipelines/csv_export.py
new file mode 100644
index 0000000..b075d24
--- /dev/null
+++ b/src/pipelines/csv_export.py
@@ -0,0 +1,59 @@
+import os
+import csv
+import logging
+from datetime import datetime
+from core.database import Database
+from core.utils import size_to_human_readable
+from pipelines.base_pipeline import BasePipeline
+
+
+class CsvExportPipeline(BasePipeline):
+    def __init__(self, config):
+        super().__init__(config)
+        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        self.output_dir = os.getenv('OUTPUT_DIR', os.path.join(self.data_root, 'output'))
+        self.query = os.getenv('EXPORT_QUERY', 'SELECT * FROM source_table')
+
+    def validate_config(self):
+        # Make sure the output directory exists
+        if not os.path.exists(self.output_dir):
+            os.makedirs(self.output_dir, exist_ok=True)
+            self.logger.info(f"Created output directory: {self.output_dir}")
+
+    def process(self):
+        # Connect to the database
+        db_config = self.config.get_database_config()
+        db = Database(**db_config)
+
+        # Run the query and export the result
+        output_file = os.path.join(self.output_dir, f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
+        self.logger.info(f"Exporting data to: {output_file}")
+
+        with open(output_file, 'w', newline='') as csvfile:
+            writer = None
+            row_count = 0
+
+            with db.conn.cursor() as cursor:
+                cursor.execute(self.query)
+
+                # Fetch the column names
+                column_names = [desc[0] for desc in cursor.description]
+                writer = csv.DictWriter(csvfile, fieldnames=column_names)
+                writer.writeheader()
+
+                # Write the rows
+                for row in cursor:
+                    row_dict = dict(zip(column_names, row))
+                    writer.writerow(row_dict)
+                    row_count += 1
+
+                    if row_count % 1000 == 0:
+                        self.logger.info(f"Exported {row_count} rows...")
+
+        # Close the database connection
+        db.disconnect()
+
+        # Log the result
+        file_size = os.path.getsize(output_file)
+        self.logger.info(f"Export completed: {row_count} rows, {size_to_human_readable(file_size)}")
+        return row_count
\ No newline at end of file
diff --git a/src/pipelines/dbf_to_postgres.py b/src/pipelines/dbf_to_postgres.py
new file mode 100644
index 0000000..0c00152
--- /dev/null
+++ b/src/pipelines/dbf_to_postgres.py
@@ -0,0 +1,185 @@
+import os
+import pandas as pd
+from dbfread import DBF
+from itertools import islice
+from datetime import datetime, timedelta
+import logging
+from core.database import Database
+from core.utils import size_to_human_readable, calculate_file_hash
+from pipelines.base_pipeline import BasePipeline
+
+
+class DbfToPostgresPipeline(BasePipeline):
+    def __init__(self, config):
+        super().__init__(config)
+        # TODO: uncomment for local debugging
+        # self.data_root = 'D:\disney_test'
+        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
+        # TODO: uncomment for local debugging
+        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        self.mapping_file =
os.getenv('MAPPING_FILE') + + self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input')) + + self.db = None + + def validate_config(self): + # 确保目录存在 + if not os.path.exists(self.dbf_dir): + raise ValueError(f"DBF directory not found: {self.dbf_dir}") + + # 如果有映射文件,验证存在 + if self.mapping_file and not os.path.exists(self.mapping_file): + self.logger.warning(f"Mapping file not found: {self.mapping_file}") + self.mapping_file = None + + def load_mapping(self): + """加载映射关系""" + if not self.mapping_file: + self.logger.info("No mapping file provided, using default mapping") + return {} + + try: + self.logger.info(f"Loading mapping from {self.mapping_file}") + mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping") + + # 清理数据 + mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id']) + mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id', + 'device_instance_capability_id', 'device_capability_point_id']] + + # 创建映射字典 + mapping_dict = {} + for _, row in mapping_df.iterrows(): + key = (str(row['AS_SERIAL']), str(row['ID'])) + if key not in mapping_dict: + mapping_dict[key] = [] + + mapping_dict[key].append({ + 'seq_id': int(row['data_field_sequence_id']), + 'cap_id': int(row['device_instance_capability_id']), + 'point_id': int(row['device_capability_point_id']) + }) + + self.logger.info(f"Loaded {len(mapping_dict)} mapping entries") + return mapping_dict + except Exception as e: + self.logger.error(f"Failed to load mapping: {str(e)}") + return {} + + def process(self): + # 加载映射关系 + mapping_dict = self.load_mapping() + + # 连接数据库 + + # todo:本地调试时打开 + db_config = self.config.get_database_config() + self.db = Database(**db_config) + # todo:本地调试时打开 + + # 处理文件 + total_processed = 0 + for filename in os.listdir(self.dbf_dir): + if filename.casefold().endswith('.dbf'): + file_path = os.path.join(self.dbf_dir, filename) + processed = self.process_file(file_path, mapping_dict) + total_processed += processed + self.logger.info(f"Processed {processed} records from {filename}") + + # 关闭数据库连接 + # todo:本地调试时打开 + self.db.disconnect() + # todo:本地调试时打开 + return total_processed + + def process_file(self, file_path, mapping_dict): + self.logger.info(f"Processing file: {os.path.basename(file_path)}") + try: + # 获取文件信息 + file_size = os.path.getsize(file_path) + file_hash = calculate_file_hash(file_path) + self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}") + + dbf_table = DBF(file_path, encoding='utf-8') + batch_data = [] + processed_records = 0 + batch_size = int(os.getenv('BATCH_SIZE', 1000)) + self.logger.info(f"the DBF file: {os.path.basename(file_path)} have record: #{len(dbf_table.records)}") + + # 分片读取是个大坑,不能分片 + # dbf_table = DBF(file_path, load=False, encoding='utf-8') + # chunk_idx = 0 + # while True: + # chunk = list(islice(dbf_table._iter_records(), 100000)) + # if not chunk: # 读完了 + # break + # chunk_idx += 1 + # # 处理这十万行 + # self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}") + for record in dbf_table: + try: + as_serial = str(record.get('AS_SERIAL', '')).strip() + device_id = str(record.get('ID', '')).strip() + key = (as_serial, device_id) + + # 跳过没有映射的记录 + if key not in mapping_dict: + continue + + # 处理时间 (+8小时) + dt_str = record.get('DATETIME', '') + if not dt_str: + continue + + original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S') + target_time = original_time + timedelta(hours=8) + formatted_time = target_time.strftime('%Y-%m-%d 
%H:%M:%S+00')
+
+                    # Process each mapping for this record
+                    for mapping in mapping_dict[key]:
+                        data_field = f"DATA{mapping['seq_id']:02d}"
+                        value = record.get(data_field)
+                        if value is None:
+                            continue
+
+                        batch_data.append((
+                            formatted_time,
+                            mapping['cap_id'],
+                            mapping['point_id'],
+                            float(value)
+                        ))
+
+                    # Batch insert
+                    if len(batch_data) >= batch_size:
+                        # TODO: comment out for local debugging
+                        self.db.execute_batch(
+                            "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) VALUES (%s, %s, %s, %s)",
+                            batch_data
+                        )
+                        # TODO: comment out for local debugging
+                        processed_records += len(batch_data)
+                        self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
+                        batch_data = []
+
+                except Exception as e:
+                    self.logger.warning(f"Skipping record due to error: {str(e)}")
+                    continue
+
+            # Insert the remaining records
+            if batch_data:
+                # TODO: comment out for local debugging
+                self.db.execute_batch(
+                    "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) VALUES (%s, %s, %s, %s)",
+                    batch_data
+                )
+                # TODO: comment out for local debugging
+                processed_records += len(batch_data)
+                self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
+
+            self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
+            return processed_records
+
+        except Exception as e:
+            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
+            return 0
\ No newline at end of file
diff --git a/src/pipelines/init.py b/src/pipelines/init.py
new file mode 100644
index 0000000..e69de29
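For orientation, `src/main.py` resolves `--pipeline <name>` to a class named `<Name>Pipeline` inside the `pipelines` package by title-casing the underscore-separated name. A minimal standalone sketch of that convention (not part of the commit; the helper name is illustrative only):

```python
# Sketch of the pipeline-name -> class-name convention that src/main.py relies on.
# Pipeline classes must follow it exactly, otherwise the hasattr() check fails.
def pipeline_class_name(pipeline: str) -> str:
    return pipeline.replace('_', ' ').title().replace(' ', '') + 'Pipeline'


assert pipeline_class_name('dbf_to_postgres') == 'DbfToPostgresPipeline'
assert pipeline_class_name('csv_export') == 'CsvExportPipeline'
```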
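Adding a new pipeline then means dropping a module into `src/pipelines/` whose class follows that convention and implements the two abstract methods of `BasePipeline`. A minimal sketch, assuming it is saved as `src/pipelines/noop_example.py` (module, class, and behaviour are hypothetical):

```python
import os

from pipelines.base_pipeline import BasePipeline


class NoopExamplePipeline(BasePipeline):
    """Illustrative pipeline: lists the shared data mount and returns a count."""

    def validate_config(self):
        # Fail fast if the PVC mount is not available inside the container.
        if not os.path.isdir(self.config.data_root):
            raise ValueError(f"Data root not found: {self.config.data_root}")

    def process(self):
        entries = os.listdir(self.config.data_root)
        self.logger.info(f"Found {len(entries)} entries under {self.config.data_root}")
        return len(entries)
```

It would then be invoked with `--pipeline noop_example`, mirroring the existing `csv_export` and `dbf_to_postgres` arguments in the Job templates.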
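The Job templates use `{{VAR}}`-style placeholders, while the deploy scripts render them with `envsubst`, which only expands `$VAR`/`${VAR}` references, so the braced placeholders would pass through unchanged. One way to close that gap without touching the templates is a small renderer along these lines (a sketch under that assumption; the script name and CLI are not part of the commit):

```python
#!/usr/bin/env python3
"""Render a {{VAR}}-style Kubernetes Job template from environment variables."""
import os
import re
import sys


def render_template(path: str) -> str:
    text = open(path, encoding="utf-8").read()
    # Replace every {{NAME}} with $NAME from the environment; leave the
    # placeholder untouched if the variable is not set, so gaps stay visible.
    return re.sub(r"\{\{(\w+)\}\}",
                  lambda m: os.environ.get(m.group(1), m.group(0)),
                  text)


if __name__ == "__main__":
    # e.g. python render_template.py k8s/job-templates/dbf-import-job.yaml | kubectl apply -f -
    sys.stdout.write(render_template(sys.argv[1]))
```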
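For reference, the `core.database.Database` helper can be exercised the same way outside the pipelines. The connection values below are placeholders, and the INSERT mirrors the statement issued by `dbf_to_postgres`; the `target_table` schema itself is assumed, not defined in this commit:

```python
from core.database import Database

db = Database(host="localhost", port="5432", dbname="energy_data",
              user="db_user", password="db_password")  # placeholder credentials
try:
    rows = [
        # (t, device_instance_capability_id, point_id, value)
        ("2025-07-25 03:54:20+00", 101, 7, 42.5),
        ("2025-07-25 03:55:20+00", 101, 7, 43.0),
    ]
    db.execute_batch(
        "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) "
        "VALUES (%s, %s, %s, %s)",
        rows,
    )
finally:
    db.disconnect()
```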
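Before running the import Job it can be worth confirming that the input DBF files actually carry the fields the mapping expects (`AS_SERIAL`, `ID`, `DATETIME`, `DATAxx`). A quick inspection sketch using `dbfread`; the file path is a placeholder:

```python
from dbfread import DBF

table = DBF("/data/dbf-input/sample.dbf", encoding="utf-8")
print(table.field_names)

for record in table:
    # Records behave like dicts keyed by the DBF field names.
    print(record.get("AS_SERIAL"), record.get("ID"), record.get("DATETIME"))
    break  # the first record is enough for a sanity check
```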