Initial merge of databridge, to be used for subsequent data export and import

2025-07-25 11:54:20 +08:00
commit 71045d7531
22 changed files with 815 additions and 0 deletions

src/core/config.py Normal file

@@ -0,0 +1,23 @@
import os
import logging


class Config:
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')

    def get_path(self, *args):
        """Build an absolute path under the data root."""
        return os.path.join(self.data_root, *args)

    def get_database_config(self):
        """Read the database configuration from environment variables."""
        return {
            'host': os.getenv('DB_HOST'),
            'port': os.getenv('DB_PORT', '5432'),
            'dbname': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD')
        }

src/core/database.py Normal file

@@ -0,0 +1,51 @@
import psycopg2
import logging
from psycopg2 import sql
from psycopg2.extras import execute_batch
class Database:
    def __init__(self, host, port, dbname, user, password):
        self.conn = None
        self.logger = logging.getLogger(self.__class__.__name__)
        self.db_config = {
            'host': host,
            'port': port,
            'dbname': dbname,
            'user': user,
            'password': password
        }
        self.connect()

    def connect(self):
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.logger.info(f"Connected to database: {self.db_config['dbname']}@{self.db_config['host']}")
        except Exception as e:
            self.logger.error(f"Database connection failed: {str(e)}")
            raise

    def disconnect(self):
        if self.conn:
            self.conn.close()
            self.logger.info("Database connection closed")

    def execute_batch(self, query, data_list):
        try:
            with self.conn.cursor() as cursor:
                execute_batch(cursor, query, data_list)
            self.conn.commit()
            self.logger.debug(f"Inserted {len(data_list)} records")
            return len(data_list)
        except Exception as e:
            self.logger.error(f"Batch execution failed: {str(e)}")
            self.conn.rollback()
            raise

    def table_exists(self, table_name):
        with self.conn.cursor() as cursor:
            cursor.execute(
                "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = %s)",
                (table_name,)
            )
            return cursor.fetchone()[0]
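
A minimal sketch (not part of this commit) of how Config and Database are expected to be wired together; the table and column names below are taken from the INSERT statement used later in the DBF pipeline and are only illustrative:

from core.config import Config
from core.database import Database

config = Config()
db = Database(**config.get_database_config())  # connects in __init__

if db.table_exists('target_table'):
    db.execute_batch(
        "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) "
        "VALUES (%s, %s, %s, %s)",
        [('2024-01-01 00:00:00+00', 1, 2, 3.14)],  # sample row, not real data
    )
db.disconnect()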

src/core/__init__.py Normal file (empty)

src/core/utils.py Normal file

@@ -0,0 +1,51 @@
import os
import logging
import hashlib
from datetime import datetime
def setup_logging(level="INFO"):
    """Configure logging."""
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler()]
    )


def generate_job_id(prefix=""):
    """Generate a unique job ID."""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return f"{prefix}{timestamp}"


def validate_directory(path, create=True):
    """Check that a directory exists, optionally creating it."""
    if not os.path.exists(path):
        if create:
            os.makedirs(path, exist_ok=True)
            logging.info(f"Created directory: {path}")
        else:
            raise ValueError(f"Directory not found: {path}")
    return True


def calculate_file_hash(filepath, algorithm="md5"):
    """Compute a file's hash."""
    hasher = hashlib.new(algorithm)
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            hasher.update(chunk)
    return hasher.hexdigest()


def size_to_human_readable(size_bytes):
    """Convert a byte count to a human-readable string."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} PB"


def parse_env_vars(prefix="DB_"):
    """Collect environment variables that start with the given prefix."""
    return {
        k[len(prefix):].lower(): v
        for k, v in os.environ.items()
        if k.startswith(prefix)
    }
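
A quick illustration of parse_env_vars (the variable values are placeholders):

import os
from core.utils import parse_env_vars

os.environ.update({'DB_HOST': 'localhost', 'DB_PORT': '5432'})
print(parse_env_vars('DB_'))  # includes {'host': 'localhost', 'port': '5432'}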

src/main.py Normal file

@@ -0,0 +1,54 @@
import argparse
import importlib
import logging
import os
import sys

from core.config import Config
from core.utils import setup_logging


def main():
    # Set up command-line arguments
    parser = argparse.ArgumentParser(description='Databridge Data Pipeline')
    parser.add_argument('--pipeline', type=str, required=True,
                        help='Pipeline type to execute')
    args = parser.parse_args()

    # Configure logging
    log_level = os.getenv('LOG_LEVEL', 'INFO')
    setup_logging(log_level)
    logger = logging.getLogger(__name__)

    try:
        # Dynamically load the pipeline module
        try:
            pipeline_module = importlib.import_module(f'pipelines.{args.pipeline}')
        except ImportError:
            logger.error(f"Pipeline module not found: {args.pipeline}")
            raise

        # Resolve the pipeline class (naming convention: PipelineName + "Pipeline")
        pipeline_class_name = args.pipeline.replace('_', ' ').title().replace(' ', '') + 'Pipeline'
        if not hasattr(pipeline_module, pipeline_class_name):
            logger.error(f"Pipeline class not found: {pipeline_class_name}")
            raise ImportError(f"Class {pipeline_class_name} not found in {args.pipeline} module")
        PipelineClass = getattr(pipeline_module, pipeline_class_name)

        # Create the configuration and pipeline instance
        config = Config()
        pipeline = PipelineClass(config)

        # Run the pipeline
        pipeline.run()
        logger.info("Pipeline completed successfully")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Pipeline execution failed: {str(e)}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
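
A small sketch of the class-name convention main.py relies on; 'dbf_to_postgres' is used here as an example value because it matches the DbfToPostgresPipeline class added further below (the actual module file name is assumed):

pipeline = 'dbf_to_postgres'
class_name = pipeline.replace('_', ' ').title().replace(' ', '') + 'Pipeline'
assert class_name == 'DbfToPostgresPipeline'

# Invocation, assuming src/ is the working directory or on PYTHONPATH:
#   python main.py --pipeline dbf_to_postgres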

src/pipelines/base_pipeline.py Normal file

@@ -0,0 +1,30 @@
import abc
import logging


class BasePipeline(metaclass=abc.ABCMeta):
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    @abc.abstractmethod
    def validate_config(self):
        """Validate the pipeline configuration."""
        pass

    @abc.abstractmethod
    def process(self):
        """Execute the pipeline's processing."""
        pass

    def run(self):
        """Run the pipeline."""
        try:
            self.logger.info(f"Starting {self.__class__.__name__} pipeline")
            self.validate_config()
            result = self.process()
            self.logger.info(f"Pipeline completed: {self.__class__.__name__}")
            return result
        except Exception as e:
            self.logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
            raise
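
A hypothetical subclass sketch (not in this commit) showing the contract BasePipeline imposes: subclasses implement validate_config() and process(), and callers invoke run(), which wraps both with logging and error handling:

from pipelines.base_pipeline import BasePipeline

class NoOpPipeline(BasePipeline):
    def validate_config(self):
        # Raise here if required settings are missing
        pass

    def process(self):
        # Do the actual work and return a result
        return 0

# NoOpPipeline(config).run()  # 'config' is a Config instance; runs validate_config(), then process()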


@@ -0,0 +1,58 @@
import os
import csv
import logging
from datetime import datetime

from core.database import Database
from core.utils import size_to_human_readable
from pipelines.base_pipeline import BasePipeline


class CSVExportPipeline(BasePipeline):
    def __init__(self, config):
        super().__init__(config)
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
        self.output_dir = os.getenv('OUTPUT_DIR', os.path.join(self.data_root, 'output'))
        self.query = os.getenv('EXPORT_QUERY', 'SELECT * FROM source_table')

    def validate_config(self):
        # Make sure the output directory exists
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir, exist_ok=True)
            self.logger.info(f"Created output directory: {self.output_dir}")

    def process(self):
        # Connect to the database
        db_config = self.config.get_database_config()
        db = Database(**db_config)

        # Run the query and export the result
        output_file = os.path.join(self.output_dir, f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
        self.logger.info(f"Exporting data to: {output_file}")
        with open(output_file, 'w', newline='') as csvfile:
            writer = None
            row_count = 0
            with db.conn.cursor() as cursor:
                cursor.execute(self.query)
                # Grab the column names
                column_names = [desc[0] for desc in cursor.description]
                writer = csv.DictWriter(csvfile, fieldnames=column_names)
                writer.writeheader()
                # Write the rows
                for row in cursor:
                    row_dict = dict(zip(column_names, row))
                    writer.writerow(row_dict)
                    row_count += 1
                    if row_count % 1000 == 0:
                        self.logger.info(f"Exported {row_count} rows...")

        # Close the database connection
        db.disconnect()

        # Report the result
        file_size = os.path.getsize(output_file)
        self.logger.info(f"Export completed: {row_count} rows, {size_to_human_readable(file_size)}")
        return row_count
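
The export is driven entirely by environment variables; a hypothetical setup (the values are placeholders, the variable names come from the code above):

import os
os.environ['EXPORT_QUERY'] = 'SELECT * FROM source_table'  # default shown above
os.environ['OUTPUT_DIR'] = '/data/output'                  # default: <DATA_PVC_MOUNT_PATH>/output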


@@ -0,0 +1,185 @@
import os
import pandas as pd
from dbfread import DBF
from itertools import islice
from datetime import datetime, timedelta
import logging

from core.database import Database
from core.utils import size_to_human_readable, calculate_file_hash
from pipelines.base_pipeline import BasePipeline


class DbfToPostgresPipeline(BasePipeline):
    def __init__(self, config):
        super().__init__(config)
        # TODO: uncomment for local debugging
        # self.data_root = 'D:\disney_test'
        # self.mapping_file = 'D:\disney_test\disney-mapping.xlsx'
        # TODO: uncomment for local debugging
        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
        self.mapping_file = os.getenv('MAPPING_FILE')
        self.dbf_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))
        self.db = None

    def validate_config(self):
        # Make sure the input directory exists
        if not os.path.exists(self.dbf_dir):
            raise ValueError(f"DBF directory not found: {self.dbf_dir}")
        # If a mapping file is configured, verify that it exists
        if self.mapping_file and not os.path.exists(self.mapping_file):
            self.logger.warning(f"Mapping file not found: {self.mapping_file}")
            self.mapping_file = None

    def load_mapping(self):
        """Load the field mapping."""
        if not self.mapping_file:
            self.logger.info("No mapping file provided, using default mapping")
            return {}
        try:
            self.logger.info(f"Loading mapping from {self.mapping_file}")
            mapping_df = pd.read_excel(self.mapping_file, sheet_name="Mapping")
            # Clean up the data
            mapping_df = mapping_df.dropna(subset=['AS_SERIAL', 'ID', 'data_field_sequence_id'])
            mapping_df = mapping_df[['AS_SERIAL', 'ID', 'data_field_sequence_id',
                                     'device_instance_capability_id', 'device_capability_point_id']]
            # Build the mapping dictionary
            mapping_dict = {}
            for _, row in mapping_df.iterrows():
                key = (str(row['AS_SERIAL']), str(row['ID']))
                if key not in mapping_dict:
                    mapping_dict[key] = []
                mapping_dict[key].append({
                    'seq_id': int(row['data_field_sequence_id']),
                    'cap_id': int(row['device_instance_capability_id']),
                    'point_id': int(row['device_capability_point_id'])
                })
            self.logger.info(f"Loaded {len(mapping_dict)} mapping entries")
            return mapping_dict
        except Exception as e:
            self.logger.error(f"Failed to load mapping: {str(e)}")
            return {}

    def process(self):
        # Load the field mapping
        mapping_dict = self.load_mapping()

        # Connect to the database
        # TODO: toggle when debugging locally
        db_config = self.config.get_database_config()
        self.db = Database(**db_config)
        # TODO: toggle when debugging locally

        # Process the files
        total_processed = 0
        for filename in os.listdir(self.dbf_dir):
            if filename.casefold().endswith('.dbf'):
                file_path = os.path.join(self.dbf_dir, filename)
                processed = self.process_file(file_path, mapping_dict)
                total_processed += processed
                self.logger.info(f"Processed {processed} records from {filename}")

        # Close the database connection
        # TODO: toggle when debugging locally
        self.db.disconnect()
        # TODO: toggle when debugging locally
        return total_processed

    def process_file(self, file_path, mapping_dict):
        self.logger.info(f"Processing file: {os.path.basename(file_path)}")
        try:
            # Gather file info
            file_size = os.path.getsize(file_path)
            file_hash = calculate_file_hash(file_path)
            self.logger.info(f"File info: Size={size_to_human_readable(file_size)}, Hash={file_hash}")

            dbf_table = DBF(file_path, encoding='utf-8')
            batch_data = []
            processed_records = 0
            batch_size = int(os.getenv('BATCH_SIZE', 1000))
            self.logger.info(f"DBF file {os.path.basename(file_path)} has {len(dbf_table.records)} records")

            # Chunked reading is a big pitfall; do not read in chunks
            # dbf_table = DBF(file_path, load=False, encoding='utf-8')
            # chunk_idx = 0
            # while True:
            #     chunk = list(islice(dbf_table._iter_records(), 100000))
            #     if not chunk:  # finished reading
            #         break
            #     chunk_idx += 1
            #     # Process these 100,000 rows
            #     self.logger.info(f"Handle chunk: #{chunk_idx} of file: {os.path.basename(file_path)}")

            for record in dbf_table:
                try:
                    as_serial = str(record.get('AS_SERIAL', '')).strip()
                    device_id = str(record.get('ID', '')).strip()
                    key = (as_serial, device_id)

                    # Skip records that have no mapping
                    if key not in mapping_dict:
                        continue

                    # Adjust the timestamp (+8 hours)
                    dt_str = record.get('DATETIME', '')
                    if not dt_str:
                        continue
                    original_time = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
                    target_time = original_time + timedelta(hours=8)
                    formatted_time = target_time.strftime('%Y-%m-%d %H:%M:%S+00')

                    # Handle each mapping entry
                    for mapping in mapping_dict[key]:
                        data_field = f"DATA{mapping['seq_id']:02d}"
                        value = record.get(data_field)
                        if value is None:
                            continue
                        batch_data.append((
                            formatted_time,
                            mapping['cap_id'],
                            mapping['point_id'],
                            float(value)
                        ))

                    # Batch insert
                    if len(batch_data) >= batch_size:
                        # TODO: comment out first when debugging locally
                        self.db.execute_batch(
                            "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) VALUES (%s, %s, %s, %s)",
                            batch_data
                        )
                        # TODO: comment out first when debugging locally
                        processed_records += len(batch_data)
                        self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")
                        batch_data = []
                except Exception as e:
                    self.logger.warning(f"Skipping record due to error: {str(e)}")
                    continue

            # Insert the remaining records
            if batch_data:
                # TODO: comment out first when debugging locally
                self.db.execute_batch(
                    "INSERT INTO target_table (t, device_instance_capability_id, point_id, value) VALUES (%s, %s, %s, %s)",
                    batch_data
                )
                # TODO: comment out first when debugging locally
                processed_records += len(batch_data)
                self.logger.debug(f"Processed {processed_records} records from {os.path.basename(file_path)}")

            self.logger.info(f"Processed {processed_records} records from {os.path.basename(file_path)}")
            return processed_records
        except Exception as e:
            self.logger.error(f"Failed to process file {file_path}: {str(e)}")
            return 0
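
For reference, the shape of the dictionary produced by load_mapping(), reconstructed from the code above; the key and values are illustrative sample data, not real mapping entries:

mapping_dict = {
    ('12345', '7'): [                # (AS_SERIAL, ID) read from the Excel sheet
        {'seq_id': 1,                # data_field_sequence_id -> reads column DATA01
         'cap_id': 100,              # device_instance_capability_id
         'point_id': 200},           # device_capability_point_id
    ],
}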

src/pipelines/__init__.py Normal file (empty)