54 lines
1.6 KiB
Python
54 lines
1.6 KiB
Python
import argparse
|
|
import importlib
|
|
import logging
|
|
import os
|
|
|
|
from core.config import Config
|
|
from core.utils import setup_logging
|
|
|
|
|
|
def main():
|
|
# 设置命令行参数
|
|
parser = argparse.ArgumentParser(description='Databridge Data Pipeline')
|
|
parser.add_argument('--pipeline', type=str, required=True,
|
|
help='Pipeline type to execute')
|
|
args = parser.parse_args()
|
|
|
|
# 配置日志
|
|
log_level = os.getenv('LOG_LEVEL', 'INFO')
|
|
setup_logging(log_level)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
# 动态加载管道模块
|
|
try:
|
|
pipeline_module = importlib.import_module(f'pipelines.{args.pipeline}')
|
|
except ImportError as e:
|
|
logger.error(f"Pipeline module not found: {args.pipeline}")
|
|
raise
|
|
|
|
# 获取管道类 (命名约定: PipelineName + "Pipeline")
|
|
pipeline_class_name = args.pipeline.replace('_', ' ').title().replace(' ', '') + 'Pipeline'
|
|
if not hasattr(pipeline_module, pipeline_class_name):
|
|
logger.error(f"Pipeline class not found: {pipeline_class_name}")
|
|
raise ImportError(f"Class {pipeline_class_name} not found in {args.pipeline} module")
|
|
|
|
PipelineClass = getattr(pipeline_module, pipeline_class_name)
|
|
|
|
# 创建配置和管道实例
|
|
config = Config()
|
|
pipeline = PipelineClass(config)
|
|
|
|
# 运行管道
|
|
pipeline.run()
|
|
|
|
logger.info("Pipeline completed successfully")
|
|
exit(0)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Pipeline execution failed: {str(e)}", exc_info=True)
|
|
exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |