log format data importing script
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent 59f6526cca
commit 55279cd40d
59  k8s/job-templates/log-import-ctllog-job.yaml  Normal file
@@ -0,0 +1,59 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: log-import-ctllog-job-{{JOB_ID}}
  namespace: {{NAMESPACE}}
spec:
  ttlSecondsAfterFinished: 86400
  backoffLimit: 0
  template:
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: {{JOB_HOST_KEY}}
                    operator: In
                    values:
                      - {{JOB_HOST_NAME}}
      containers:
        - name: importer
          image: {{IMAGE_REPO}}/databridge:{{IMAGE_TAG}}
          args: ["--pipeline", "log_to_postgres_ctllog"]
          env:
            - name: DATA_PVC_MOUNT_PATH
              value: "/data"
            - name: DBF_INPUT_DIR
              value: "/data/dbf-input"
            - name: MAPPING_FILE
              value: "/data/disney-mapping-v2.xlsx"
            - name: DB_HOST
              value: "{{DB_HOST}}"
            - name: DB_PORT
              value: "{{DB_PORT}}"
            - name: DB_NAME
              value: "{{DB_NAME}}"
            - name: DB_USER
              value: "{{DB_USER}}"
            - name: DB_PASSWORD
              value: "{{DB_PASSWORD}}"
            - name: BATCH_SIZE
              value: "{{BATCH_SIZE}}"
            - name: LOG_LEVEL
              value: "{{LOG_LEVEL}}"
          volumeMounts:
            - name: data-volume
              mountPath: "/data"
          resources:
            requests:
              cpu: "500m"
              memory: "800Mi"
            limits:
              cpu: "1000m"
              memory: "1700Mi"
      volumes:
        - name: data-volume
          persistentVolumeClaim:
            claimName: {{DATA_PVC_NAME}}
      restartPolicy: Never
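Note: the {{...}} tokens in this template are filled in by the deploy script that follows, using sed. Purely as an illustration (not part of this commit), a minimal Python sketch of the same substitution step, with a check that no placeholder is left unresolved, could look like this; the variable values shown are made-up examples.

# Hypothetical sketch of the {{VAR}} substitution the deploy script performs with sed.
# Not part of this commit; the sample values below are illustrative only.
import re

def render_template(template_text: str, values: dict) -> str:
    """Replace every {{KEY}} placeholder and fail if any remain unresolved."""
    rendered = template_text
    for key, value in values.items():
        rendered = rendered.replace("{{%s}}" % key, str(value))
    leftover = re.findall(r"\{\{[A-Z_]+\}\}", rendered)
    if leftover:
        raise ValueError(f"Unresolved placeholders: {leftover}")
    return rendered

# Usage sketch: read the template, substitute all fourteen placeholders, write the result.
# with open("log-import-ctllog-job.yaml") as f:
#     rendered = render_template(f.read(), {"JOB_ID": "20240101-120000", "NAMESPACE": "default", ...})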
55  scripts/deploy-log-import-ctllog-disney.sh  Normal file
@@ -0,0 +1,55 @@
#!/bin/bash
set -e

# Default configuration
JOB_ID=$(date +%Y%m%d-%H%M%S)
IMAGE_REPO=${IMAGE_REPO:-"harbor.dc.teramesh.cn/library/tools"}
IMAGE_TAG=${IMAGE_TAG:-"dev"}
BATCH_SIZE=${BATCH_SIZE:-"20"}
LOG_LEVEL=${LOG_LEVEL:-"INFO"}
DATA_PVC_NAME=${DATA_PVC_NAME:-"data-import-export-pvc"}
JOB_HOST_KEY=${JOB_HOST_KEY:-"kubernetes.io/hostname"}
JOB_HOST_NAME=${JOB_HOST_NAME:-"idrc-disney-1"}
# Database configuration (adjust before use)
DB_HOST=${DB_HOST:-"db"}
DB_PORT=${DB_PORT:-"6432"}
DB_NAME=${DB_NAME:-"idrc"}
DB_USER=${DB_USER:-"teramesh"}
DB_PASSWORD=${DB_PASSWORD:-"2iqTCHwnf75stGBzM8le"}

NAMESPACE=${NAMESPACE:-"default"}

# Check for the template file
TEMPLATE_FILE="log-import-ctllog-job.yaml"
if [ ! -f "$TEMPLATE_FILE" ]; then
    echo "Template file not found: $TEMPLATE_FILE"
    exit 1
fi

# Substitute template variables directly (without envsubst)
OUTPUT_FILE="log-import-ctllog-job-${JOB_ID}.yaml"
sed -e "s|{{JOB_ID}}|$JOB_ID|g" \
    -e "s|{{NAMESPACE}}|$NAMESPACE|g" \
    -e "s|{{IMAGE_REPO}}|$IMAGE_REPO|g" \
    -e "s|{{IMAGE_TAG}}|$IMAGE_TAG|g" \
    -e "s|{{DATA_PVC_NAME}}|$DATA_PVC_NAME|g" \
    -e "s|{{JOB_HOST_KEY}}|$JOB_HOST_KEY|g" \
    -e "s|{{JOB_HOST_NAME}}|$JOB_HOST_NAME|g" \
    -e "s|{{DB_HOST}}|$DB_HOST|g" \
    -e "s|{{DB_PORT}}|$DB_PORT|g" \
    -e "s|{{DB_NAME}}|$DB_NAME|g" \
    -e "s|{{DB_USER}}|$DB_USER|g" \
    -e "s|{{DB_PASSWORD}}|$DB_PASSWORD|g" \
    -e "s|{{BATCH_SIZE}}|$BATCH_SIZE|g" \
    -e "s|{{LOG_LEVEL}}|$LOG_LEVEL|g" \
    "$TEMPLATE_FILE" > "$OUTPUT_FILE"

# Validate before deploying
echo "Validating generated YAML..."
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE" --dry-run=client

# Deploy the Job
kubectl apply -f "$OUTPUT_FILE" -n "$NAMESPACE"

echo "Job deployed in namespace $NAMESPACE: log-import-ctllog-job-${JOB_ID}"
echo "To view logs: kubectl logs job/log-import-ctllog-job-${JOB_ID} -n $NAMESPACE"
@@ -15,20 +15,20 @@ class LogToPostgresCtllogPipeline(BasePipeline):
     def __init__(self, config):
         super().__init__(config)
         # todo: enable for local debugging
-        self.data_root = 'D:\disney_test'
-        self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
+        # self.data_root = 'D:\disney_test'
+        # self.mapping_file = 'D:\disney_test\disney-mapping-v2.xlsx'
         # todo: enable for local debugging
-        # self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
-        # self.mapping_file = os.getenv('MAPPING_FILE')
+        self.data_root = os.getenv('DATA_PVC_MOUNT_PATH', '/data')
+        self.mapping_file = os.getenv('MAPPING_FILE')
         self.log_dir = os.getenv('DBF_INPUT_DIR', os.path.join(self.data_root, 'dbf-input'))

         # todo: debug use
-        self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv'
-        # Initialize the CSV file
-        if not os.path.exists(self.csv_file_path):
-            with open(self.csv_file_path, 'w') as f:
-                csv.writer(f).writerow(
-                    ['created', 'control_group_controller_id', 'point_id', 'real_value'])
+        # self.csv_file_path = 'D:\disney_test\debug_controller_log_logformat.csv'
+        # # Initialize the CSV file
+        # if not os.path.exists(self.csv_file_path):
+        #     with open(self.csv_file_path, 'w') as f:
+        #         csv.writer(f).writerow(
+        #             ['created', 'control_group_controller_id', 'point_id', 'real_value'])
         # todo: debug use

         self.db = None
@@ -77,7 +77,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                 'disney_device_point_name': row['disney_device_point_name'].strip(),
                 'control_group_controller_id': int(row['control_group_controller_id']),
                 'point_id': int(row['controller_point_id']),
-                'control_group_name': row['control_group_name'].strip(),
+                'control_group_name': row['control_group_name'],
                 'control_group_id': int(row['control_group_id'])
             })

@@ -93,8 +93,8 @@ class LogToPostgresCtllogPipeline(BasePipeline):

         # Connect to the database
         # todo: disable for local debugging
-        # db_config = self.config.get_database_config()
-        # self.db = Database(**db_config)
+        db_config = self.config.get_database_config()
+        self.db = Database(**db_config)
         # todo: disable for local debugging

         # Process the files
@@ -114,7 +114,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):

         # Close the database connection
         # todo: disable for local debugging
-        # self.db.disconnect()
+        self.db.disconnect()
         # todo: disable for local debugging
         return total_processed
@@ -128,55 +128,55 @@ class LogToPostgresCtllogPipeline(BasePipeline):
     def clean_header_debug(self, headers):
         """Clean CSV column headers, with debug output"""
         cleaned_headers = []
-        print(f"Start processing CSV headers, {len(headers)} columns in total")
-        print(f"Original headers: {headers}")
+        # print(f"Start processing CSV headers, {len(headers)} columns in total")
+        # print(f"Original headers: {headers}")

         for i, original_header in enumerate(headers):
             try:
-                print(f"\nProcessing column {i + 1}/{len(headers)}: '{original_header}'")
+                # print(f"\nProcessing column {i + 1}/{len(headers)}: '{original_header}'")

                 # Type check
                 if not isinstance(original_header, str):
-                    print(f"Warning: header is not a string, type is {type(original_header)}")
+                    self.logger.info(f"Warning: header is not a string, type is {type(original_header)}")
                     header_str = str(original_header)
-                    print(f"Converted to string: '{header_str}'")
+                    self.logger.info(f"Converted to string: '{header_str}'")
                 else:
                     header_str = original_header

                 # Length info
                 orig_length = len(header_str)
-                print(f"Original length: {orig_length} characters")
+                # print(f"Original length: {orig_length} characters")

                 # Strip quotes
                 stripped = header_str.strip().strip("'").strip('"')
                 stripped_length = len(stripped)
-                print(f"After stripping quotes: '{stripped}' ({stripped_length} characters)")
+                # print(f"After stripping quotes: '{stripped}' ({stripped_length} characters)")

                 # Remove parenthesised content
                 cleaned = re.sub(r'\s*\([^)]*\)', '', stripped).strip()
                 cleaned_length = len(cleaned)
-                print(f"After removing parenthesised content: '{cleaned}' ({cleaned_length} characters)")
+                # print(f"After removing parenthesised content: '{cleaned}' ({cleaned_length} characters)")

                 # Empty-value check
                 if not cleaned:
-                    print(f"Warning: header is empty after cleaning, using the original value")
+                    self.logger.warning(f"Warning: header is empty after cleaning, using the original value")
                     cleaned = f"Column_{i + 1}"

                 cleaned_headers.append(cleaned)
-                print(f"Done: '{original_header}' => '{cleaned}'")
+                # print(f"Done: '{original_header}' => '{cleaned}'")

             except Exception as e:
-                print(f"\nError! Failed to process header: '{original_header}'")
-                print(f"Error details: {str(e)}")
-                print(f"Falling back to the original header")
+                self.logger.error(f"\nError! Failed to process header: '{original_header}'")
+                self.logger.error(f"Error details: {str(e)}")
+                self.logger.error(f"Falling back to the original header")

                 if original_header:
                     cleaned_headers.append(original_header)
                 else:
                     cleaned_headers.append(f"Column_{i + 1}")

-        print(f"\nHeader processing finished")
-        print(f"Cleaned headers: {cleaned_headers}")
+        # print(f"\nHeader processing finished")
+        self.logger.info(f"Cleaned headers: {cleaned_headers}")
         return cleaned_headers

     def process_log_file(self, file_path, as_serial, device_id, mapping_dict):
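For reference, the cleaning above strips whitespace and surrounding quotes from each header and then drops any parenthesised suffix. A standalone sketch of that same transformation, using made-up sample headers rather than real data:

# Standalone sketch of the header-cleaning rule used by clean_header_debug:
# strip whitespace/quotes, then remove any "(...)" suffix. Sample headers are invented.
import re

def clean_header(header: str) -> str:
    stripped = header.strip().strip("'").strip('"')
    return re.sub(r'\s*\([^)]*\)', '', stripped).strip()

print(clean_header("'Supply Air Temp (degC)'"))  # -> Supply Air Temp
print(clean_header('"Fan Speed (rpm)"'))         # -> Fan Speed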
@@ -203,6 +203,7 @@ class LogToPostgresCtllogPipeline(BasePipeline):
             cleaned_headers = self.clean_header_debug(headers)

             # Create the DictReader
+            self.logger.info(f"start to read record in the Log file: {os.path.basename(file_path)} ")
             dict_reader = csv.DictReader(
                 file,
                 fieldnames=cleaned_headers
@@ -243,17 +244,6 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                     clean_dt_str = dt_str.strip().strip("'").strip('"')
                     record_time = datetime.strptime(clean_dt_str, '%Y-%m-%d %H:%M:%S')

-                    # Check for a date change
-                    record_date = record_time.date()
-                    if self.current_date is None:
-                        self.current_date = record_date
-                    elif record_date != self.current_date:
-                        # When the date changes, flush the previous day's cached data
-                        self.process_day_cache()
-                        self.current_date = record_date
-
-                    hour_key = record_time.replace(minute=0, second=0, microsecond=0)
-
                     # seq_id is no longer used; look up the record by disney_device_point_name instead
                     for mapping in mapping_dict[key]:
                         data_field = mapping['disney_device_point_name']
@@ -267,21 +257,16 @@ class LogToPostgresCtllogPipeline(BasePipeline):
                             except ValueError:
                                 continue

-                            # todo: rework this; log-format data quality is good, so this complex handling is unnecessary and records can be written directly, following the pwc approach
-                            # Build the grouping key
-                            group_key = (
-                                mapping['control_group_controller_id'],
-                                mapping['point_id'],
-                                hour_key
-                            )
-
-                            # Add to the per-group cache
-                            self.add_to_group_cache(group_key, record_time, float_value, mapping)
+                            cgc_id = mapping['control_group_controller_id']
+                            point_id = mapping['point_id']
+                            record_insert = (record_time, cgc_id, point_id, float_value)
+                            if record_insert not in self.seen:
+                                self.seen.add(record_insert)
+                                self.batch_data.append(record_insert)

                             # Check the batch size
                             if len(self.batch_data) >= self.batch_size:
                                 self.flush_batch()
-                            # todo: rework this; log-format data quality is good, so this complex handling is unnecessary and records can be written directly, following the pwc approach

                         except Exception as e:
                             self.logger.error(f"Error processing record: {e}")
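The new code above replaces the per-hour grouping cache with in-memory de-duplication: each (created, control_group_controller_id, point_id, real_value) tuple is queued at most once before being flushed in batches. A minimal sketch of that pattern (names mirror the diff; in the pipeline, seen and batch_data are assumed to be initialised elsewhere as a set and a list):

# Minimal sketch of set-based de-duplication before batching; not the pipeline itself.
from datetime import datetime

seen = set()
batch_data = []

def queue_record(record_time: datetime, cgc_id: int, point_id: int, value: float):
    record = (record_time, cgc_id, point_id, value)
    if record not in seen:  # tuples are hashable, so membership checks are cheap
        seen.add(record)
        batch_data.append(record)

queue_record(datetime(2024, 1, 1, 0, 0, 0), 12, 3, 21.5)
queue_record(datetime(2024, 1, 1, 0, 0, 0), 12, 3, 21.5)  # duplicate, ignored
print(len(batch_data))  # 1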
@@ -291,16 +276,25 @@ class LogToPostgresCtllogPipeline(BasePipeline):
         if not self.batch_data:
             return

-        # Actually insert into the database
-        # self.db.execute_batch(
-        #     "INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value) VALUES (%s, %s, %s, %s)",
-        #     [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
-        # )
+        # todo: actually insert into the database
+        try:
+            self.db.execute_batch(
+                """
+                INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
+                VALUES (%s, %s, %s, %s)
+                ON CONFLICT (created, control_group_controller_id, point_id)
+                DO UPDATE SET real_value = EXCLUDED.real_value
+                """,
+                [(data[0], data[1], data[2], data[3]) for data in self.batch_data]
+            )
+        except Exception as e:
+            self.logger.error(f"Batch Insert data failed: {e}")
+        # todo: actually insert into the database

         # todo: write to CSV for debugging (debug use)
-        with open(self.csv_file_path, "a", newline="") as f:
-            writer = csv.writer(f)
-            writer.writerows(self.batch_data)
+        # with open(self.csv_file_path, "a", newline="") as f:
+        #     writer = csv.writer(f)
+        #     writer.writerows(self.batch_data)
         # todo: write to CSV for debugging

         # Update the processed record count
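Database.execute_batch is the project's own wrapper, so its exact signature is not shown in this diff. As a rough sketch only, the same batched upsert written against plain psycopg2 could look like the following; it assumes a unique constraint on (created, control_group_controller_id, point_id), and the connection parameters are placeholders, not the real credentials.

# Rough psycopg2 sketch of the batched upsert; the project's Database.execute_batch
# presumably wraps something similar. Connection parameters are placeholders.
import psycopg2
from psycopg2.extras import execute_batch

def flush_rows(rows):
    conn = psycopg2.connect(host="db", port=6432, dbname="idrc",
                            user="teramesh", password="<placeholder>")
    try:
        with conn, conn.cursor() as cur:  # commits on success, rolls back on error
            execute_batch(
                cur,
                """
                INSERT INTO controller_log (created, control_group_controller_id, point_id, real_value)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (created, control_group_controller_id, point_id)
                DO UPDATE SET real_value = EXCLUDED.real_value
                """,
                rows,
            )
    finally:
        conn.close()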