diff --git a/k8s/job-templates/log-import-ctllog-job.yaml b/k8s/job-templates/log-import-ctllog-job.yaml
new file mode 100644
index 0000000..d455b4f
--- /dev/null
+++ b/k8s/job-templates/log-import-ctllog-job.yaml
@@ -0,0 +1,59 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: log-import-ctllog-job-{{JOB_ID}}
+  namespace: {{NAMESPACE}}
+spec:
+  ttlSecondsAfterFinished: 86400
+  backoffLimit: 0
+  template:
+    spec:
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: {{JOB_HOST_KEY}}
+                operator: In
+                values:
+                - {{JOB_HOST_NAME}}
+      containers:
+      - name: importer
+        image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}}
+        args: ["--pipeline", "log_to_postgres_ctllog"]
+        env:
+        - name: DATA_PVC_MOUNT_PATH
+          value: "/data"
+        - name: DBF_INPUT_DIR
+          value: "/data/dbf-input"
+        - name: MAPPING_FILE
+          value: "/data/disney-mapping-v2.xlsx"
+        - name: DB_HOST
+          value: "{{DB_HOST}}"
+        - name: DB_PORT
+          value: "{{DB_PORT}}"
+        - name: DB_NAME
+          value: "{{DB_NAME}}"
+        - name: DB_USER
+          value: "{{DB_USER}}"
+        - name: DB_PASSWORD
+          value: "{{DB_PASSWORD}}"
+        - name: BATCH_SIZE
+          value: "{{BATCH_SIZE}}"
+        - name: LOG_LEVEL
+          value: "{{LOG_LEVEL}}"
+        volumeMounts:
+        - name: data-volume
+          mountPath: "/data"
+        resources:
+          requests:
+            cpu: "500m"
+            memory: "800Mi"
+          limits:
+            cpu: "1000m"
+            memory: "1700Mi"
+      volumes:
+      - name: data-volume
+        persistentVolumeClaim:
+          claimName: {{DATA_PVC_NAME}}
+      restartPolicy: Never
\ No newline at end of file
diff --git a/scripts/deploy-log-import-ctllog-disney.sh b/scripts/deploy-log-import-ctllog-disney.sh
new file mode 100644
index 0000000..138ae6e
--- /dev/null
+++ b/scripts/deploy-log-import-ctllog-disney.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+set -e
+
+# Default configuration
+JOB_ID=$(date +%Y%m%d-%H%M%S)
+IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
+IMAGE_TAG=${IMAGE_TAG:-"dev"}
+BATCH_SIZE=${BATCH_SIZE:-"20"}
+LOG_LEVEL=${LOG_LEVEL:-"INFO"}
+DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
+JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
+JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
+# Database configuration (adjust before use)
+DB_HOST=${DB_HOST:-"db"}
+DB_PORT=${DB_PORT:-"6432"}
+DB_NAME=${DB_NAME:-"idrc"}
+DB_USER=${DB_USER:-"teramesh"}
+DB_PASSWORD=${DB_PASSWORD:-"2iqTCHwnf75stGBzM8le"}
+
+NAMESPACE=${NAMESPACE:-"default"}
+
+# Check that the template file exists
+TEMPLATE_FILE="log-import-ctllog-job.yaml"
+if [ ! -f "$TEMPLATE_FILE" ]; then
+    echo "Template file not found: $TEMPLATE_FILE"
+    exit 1
+fi
+
+# Substitute template variables directly (without envsubst)
+OUTPUT_FILE="log-import-ctllog-job-${JOB_ID}.yaml"
+sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
+    -e "s|{{NAMESPACE}}|$NAMESPACE|g" \
+    -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
+    -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
+    -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
+    -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
+    -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
+    -e "s|{{DB_HOST}}|$DB_HOST|g" \
+    -e "s|{{DB_PORT}}|$DB_PORT|g" \
+    -e "s|{{DB_NAME}}|$DB_NAME|g" \
+    -e "s|{{DB_USER}}|$DB_USER|g" \
+    -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
+    -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
+    -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
+    "$TEMPLATE_FILE" > "$OUTPUT_FILE"
+
+# Validate before deploying
+echo "Validating generated YAML..."
+kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client
+
+# Deploy the Job
+kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"
+
+echo "Job deployed in namespace $NAMESPACE: log-import-ctllog-job-${JOB_ID}"
+echo "To view logs: kubectl logs job/log-import-ctllog-job-${JOB_ID} -n $NAMESPACE"
\ No newline at end of file
diff --git a/src/pipelines/log_to_postgres_ctllog.py b/src/pipelines/log_to_postgres_ctllog.py
index 47f7118..c2cd76d 100644
--- a/src/pipelines/log_to_postgres_ctllog.py
+++ b/src/pipelines/log_to_postgres_ctllog.py
@@ -15,20 +15,20 @@ class LogToPostgresCtllogPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
         # todo: enable for local debugging
-        self.data_root = 'D:\disney_test'
-        self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
+        # self.data_root = 'D:\disney_test'
+        # self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
         # todo: enable for local debugging
-        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
-        # self.mapping_file = os.getenv('MAPPING_FILE')
+        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        self.mapping_file = os.getenv('MAPPING_FILE')
         self.log_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))

         # todo: debug use
-        self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv'
-        # Initialize the CSV file
-        if not os.path.exists(self.csv_file_path):
-            with open(self.csv_file_path, 'w') as f:
-                csv.writer(f).writerow(
-                    ['created', 'control_group_controller_id', 'point_id', 'real_value'])
+        # self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv'
+        # # Initialize the CSV file
+        # if not os.path.exists(self.csv_file_path):
+        #     with open(self.csv_file_path, 'w') as f:
+        #         csv.writer(f).writerow(
+        #             ['created', 'control_group_controller_id', 'point_id', 'real_value'])
         # todo: debug use

         self.db = None
@@ -77,7 +77,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                     'disney_device_point_name': row['disney_device_point_name'].strip(),
                     'control_group_controller_id': int(row['control_group_controller_id']),
                     'point_id': int(row['controller_point_id']),
-                    'control_group_name': row['control_group_name'].strip(),
+                    'control_group_name': row['control_group_name'],
                     'control_group_id': int(row['control_group_id'])
                 })

@@ -93,8 +93,8 @@ class LogToPostgresCtllogPipeline(BasePipeline):

         # Connect to the database
         # todo: disable for local debugging
-        # db_config = self.config.get_database_config()
-        # self.db = Database(**db_config)
+        db_config = self.config.get_database_config()
+        self.db = Database(**db_config)
         # todo: disable for local debugging

         # Process the files
@@ -114,7 +114,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):

         # Close the database connection
         # todo: disable for local debugging
-        # self.db.disconnect()
+        self.db.disconnect()
         # todo: disable for local debugging

         return total_processed
@@ -128,55 +128,55 @@ class LogToPostgresCtllogPipeline(BasePipeline):
     def clean_header_debug(self, headers):
         """Clean CSV column headers, with debug output."""
         cleaned_headers = []
-        print(f"Start processing CSV headers, {len(headers)} columns in total")
-        print(f"Original headers: {headers}")
+        # print(f"Start processing CSV headers, {len(headers)} columns in total")
+        # print(f"Original headers: {headers}")

         for i, original_header in enumerate(headers):
             try:
-                print(f"\nProcessing column {i + 1}/{len(headers)}: '{original_header}'")
+                # print(f"\nProcessing column {i + 1}/{len(headers)}: '{original_header}'")

                 # Type check
                 if not isinstance(original_header, str):
-                    print(f"Warning: header is not a string, type is {type(original_header)}")
+                    self.logger.info(f"Warning: header is not a string, type is {type(original_header)}")
                     header_str = str(original_header)
-                    print(f"Converted to string: '{header_str}'")
+                    self.logger.info(f"Converted to string: '{header_str}'")
                 else:
                     header_str = original_header

                 # Length info
                 orig_length = len(header_str)
-                print(f"Original length: {orig_length} characters")
+                # print(f"Original length: {orig_length} characters")

                 # Strip surrounding quotes
                 stripped = header_str.strip().strip("'").strip('"')
                 stripped_length = len(stripped)
-                print(f"After stripping quotes: '{stripped}' ({stripped_length} characters)")
+                # print(f"After stripping quotes: '{stripped}' ({stripped_length} characters)")

                 # Remove parenthesized content
                 cleaned = re.sub(r'\s*\([^)]*\)', '', stripped).strip()
                 cleaned_length = len(cleaned)
-                print(f"After removing parenthesized content: '{cleaned}' ({cleaned_length} characters)")
+                # print(f"After removing parenthesized content: '{cleaned}' ({cleaned_length} characters)")

                 # Empty-value check
                 if not cleaned:
-                    print(f"Warning: header is empty after cleaning, using a fallback name")
+                    self.logger.warning("Warning: header is empty after cleaning, using a fallback name")
                     cleaned = f"Column_{i + 1}"

                 cleaned_headers.append(cleaned)
-                print(f"Finished: '{original_header}' => '{cleaned}'")
+                # print(f"Finished: '{original_header}' => '{cleaned}'")

             except Exception as e:
-                print(f"\nError! Failed to process header: '{original_header}'")
-                print(f"Error details: {str(e)}")
-                print(f"Falling back to the original header")
+                self.logger.error(f"Error! Failed to process header: '{original_header}'")
+                self.logger.error(f"Error details: {str(e)}")
+                self.logger.error("Falling back to the original header")
                 if original_header:
                     cleaned_headers.append(original_header)
                 else:
                     cleaned_headers.append(f"Column_{i + 1}")

-        print(f"\nHeader processing finished")
-        print(f"Cleaned headers: {cleaned_headers}")
+        # print(f"\nHeader processing finished")
+        self.logger.info(f"Cleaned headers: {cleaned_headers}")

         return cleaned_headers

     def process_log_file(self, file_path, as_serial, device_id, mapping_dict):
@@ -203,6 +203,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):
             cleaned_headers = self.clean_header_debug(headers)
             # Create the DictReader
+            self.logger.info(f"Start reading records from log file: {os.path.basename(file_path)}")
             dict_reader = csv.DictReader(
                 file,
                 fieldnames=cleaned_headers
             )
@@ -243,17 +244,6 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                         clean_dt_str = dt_str.strip().strip("'").strip('"')
                         record_time = datetime.strptime(clean_dt_str, '%Y-%m-%d %H:%M:%S')

-                        # Check for a date change
-                        record_date = record_time.date()
-                        if self.current_date is None:
-                            self.current_date = record_date
-                        elif record_date != self.current_date:
-                            # When the date changes, flush the previous day's cached data
-                            self.process_day_cache()
-                            self.current_date = record_date
-
-                        hour_key = record_time.replace(minute=0, second=0, microsecond=0)
-
                         # seq_id is no longer used; look up values in the record by disney_device_point_name
                         for mapping in mapping_dict[key]:
                             data_field = mapping['disney_device_point_name']
@@ -267,21 +257,16 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                             try:
                                 float_value = float(value)
                             except ValueError:
                                 continue

-                            # todo: rework this; log-format data is clean enough that records can be written directly, following the pwc handling
-                            # Build the grouping key
-                            group_key = (
-                                mapping['control_group_controller_id'],
-                                mapping['point_id'],
-                                hour_key
-                            )
-
-                            # Add to the group cache
-                            self.add_to_group_cache(group_key, record_time, float_value, mapping)
+                            cgc_id = mapping['control_group_controller_id']
+                            point_id = mapping['point_id']
+                            record_insert = (record_time, cgc_id, point_id, float_value)
+                            if record_insert not in self.seen:
+                                self.seen.add(record_insert)
+                                self.batch_data.append(record_insert)
                             # Check the batch size
                             if len(self.batch_data) >= self.batch_size:
                                 self.flush_batch()
-                            # todo: rework this; log-format data is clean enough that records can be written directly, following the pwc handling
                         except Exception as e:
                             self.logger.error(f"Error processing record: {e}")
@@ -291,16 +276,25 @@ class LogToPostgresCtllogPipeline(BasePipeline):
         if not self.batch_data:
             return

-        # Actual database insert
-        # self.db.execute_batch(
-        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-        #     [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
-        # )
+        # todo: actual database insert
+        try:
+            self.db.execute_batch(
+                """
+                INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
+                VALUES (%s, %s, %s, %s)
+                ON CONFLICT (created, control_group_controller_id, point_id)
+                DO UPDATE SET real_value = EXCLUDED.real_value
+                """,
+                [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+            )
+        except Exception as e:
+            self.logger.error(f"Batch insert failed: {e}")
+        # todo: actual database insert

         # todo: write to CSV when debugging (debug use)
-        with open(self.csv_file_path, "a", newline="") as f:
-            writer = csv.writer(f)
-            writer.writerows(self.batch_data)
+        # with open(self.csv_file_path, "a", newline="") as f:
+        #     writer = csv.writer(f)
+        #     writer.writerows(self.batch_data)
         # todo: write to CSV when debugging

         # Update the processed-record count
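
Note on the new upsert in flush_batch(): the ON CONFLICT (created, control_group_controller_id, point_id) clause only succeeds if controller_log already has a unique index or constraint on exactly those three columns; that index is not created by this diff and is assumed to exist. The sketch below is a minimal, standalone illustration of the dedup-then-batch-upsert flow the pipeline now follows. It uses plain psycopg2 rather than the project's Database wrapper (whose execute_batch is assumed to behave like psycopg2.extras.execute_batch), and the connection settings are placeholders.

    # Assumed schema prerequisite for ON CONFLICT (not part of this change):
    #   CREATE UNIQUE INDEX controller_log_created_cgc_point_uq
    #       ON controller_log (created, control_group_controller_id, point_id);
    from datetime import datetime

    import psycopg2
    from psycopg2.extras import execute_batch

    UPSERT_SQL = """
        INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (created, control_group_controller_id, point_id)
        DO UPDATE SET real_value = EXCLUDED.real_value
    """

    def flush(conn, batch):
        """Upsert one batch of (created, cgc_id, point_id, value) tuples and clear it."""
        if not batch:
            return
        with conn.cursor() as cur:
            execute_batch(cur, UPSERT_SQL, batch)
        conn.commit()
        batch.clear()

    def ingest(conn, records, batch_size=20):
        """Mirror of the pipeline's self.seen / self.batch_data handling."""
        seen = set()   # drops exact duplicate rows before they reach the database
        batch = []
        for rec in records:
            if rec in seen:
                continue
            seen.add(rec)
            batch.append(rec)
            if len(batch) >= batch_size:
                flush(conn, batch)
        flush(conn, batch)  # flush the final partial batch

    if __name__ == "__main__":
        conn = psycopg2.connect(host="db", port=6432, dbname="idrc",
                                user="teramesh", password="<DB_PASSWORD>")
        ingest(conn, [(datetime(2024, 1, 1, 8, 0, 0), 1, 101, 21.5)])

One design note: the seen set grows for the lifetime of a run, which is fine for per-file imports but worth keeping in mind for very large inputs.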
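For reference, the header cleaning performed by clean_header_debug() boils down to: strip whitespace and surrounding quotes, drop any parenthesized suffix, and fall back to a generated Column_N name when nothing is left. A tiny standalone illustration (the sample header strings are invented, not taken from real log files):

    import re

    def clean_header(header: str, index: int) -> str:
        """Same cleaning steps as clean_header_debug, minus the logging."""
        stripped = header.strip().strip("'").strip('"')           # remove surrounding quotes
        cleaned = re.sub(r'\s*\([^)]*\)', '', stripped).strip()   # remove "(...)" suffixes
        return cleaned or f"Column_{index + 1}"                   # fallback for empty results

    for i, h in enumerate(["'Supply Temp (degC)'", '"Fan Speed (rpm)"', "  ( )  "]):
        print(repr(h), "->", repr(clean_header(h, i)))
    # "'Supply Temp (degC)'" -> 'Supply Temp'
    # '"Fan Speed (rpm)"'    -> 'Fan Speed'
    # '  ( )  '              -> 'Column_3'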