1. Background
Excerpt of the log file:
2020-12-25 00:02:58.049 INFO [job-0] com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator:50 - Total 1043200 records, 218743048 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 9.835s | All Task WaitReaderTime 1,240,595.000s | Transformer Success 1043201 records | Transformer Error 0 records | Transformer Filter 0 records | Transformer usedTime 0.338s | Percentage 0.00%
2020-12-25 00:03:02.571 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"input","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":30,"succLineNum":30,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.572 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"input","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":29,"succLineNum":29,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.635 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"input","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":22,"succLineNum":22,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.669 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"input","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":26,"succLineNum":26,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
Requirement
Extract the date plus the "allLineNum":29,"succLineNum":29,"errLineNum":0 portion from each matching log line and load it into an Oracle database.
Create the target table in Oracle:
create table WAYNE.LOG_KAFKA_BACKLOG
(
LOG_DATE DATE,
DAY_HOUR VARCHAR2(200),
ALLLINENUM VARCHAR2(200),
SUCCLINENUM VARCHAR2(200),
ERRLINENUM VARCHAR2(200)
);
2. Python implementation
Main dependencies: cx_Oracle and the Oracle Instant Client.
Installation references: https://cx-oracle.readthedocs.io/en/latest/ ,
https://blogs.oracle.com/linux/cx_oracle-rpms-have-landed-on-oracle-linux-yum-server
Yum repositories: http://yum.oracle.com/repo/OracleLinux/OL7/latest/, https://yum.oracle.com/repo/OracleLinux/OL7/UEKR6/x86_64/
Implementation:
#!/usr/bin/env python
import cx_Oracle
from datetime import datetime
import time
import shutil
import re

# Extract the fields we need from the log file with regular expressions
def get_data(file):
    date_patt = re.compile(r'\d{4}-\d{2}-\d{2}')
    time_patt = re.compile(r'\d{2}:\d{2}:\d{2}\.\d+')
    all_patt = re.compile(r'"allLineNum":\d+')
    suc_patt = re.compile(r'"succLineNum":\d+')
    err_patt = re.compile(r'"errLineNum":\d+')
    data_patt = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ .*,"allLineNum":\d+,"succLineNum":\d+,"errLineNum":\d+,.*')
    data_all = []
    # Read the log file line by line
    with open(file, 'r') as f:
        for log_line in f:
            m = data_patt.search(log_line)
            if not m:
                continue
            matched = m.group()
            date = datetime.strptime(date_patt.match(matched).group(), '%Y-%m-%d').date()
            # Collect the extracted fields into one row
            oral_data = [date]
            oral_data.append(time_patt.search(matched).group())
            oral_data.append(all_patt.search(matched).group().replace('"', ''))
            oral_data.append(suc_patt.search(matched).group().replace('"', ''))
            oral_data.append(err_patt.search(matched).group().replace('"', ''))
            data_all.append(tuple(oral_data))
    return data_all

# Load the extracted rows into Oracle
def to_oral(jdbc, file):
    conn = cx_Oracle.connect(jdbc)
    db = conn.cursor()
    data_list = get_data(file)
    sql = 'insert into WAYNE.LOG_KAFKA_BACKLOG values(:1,:2,:3,:4,:5)'
    # Bulk insert with executemany
    db.executemany(sql, data_list)
    conn.commit()
    db.close()
    conn.close()

if __name__ == '__main__':
    # Path of the log file
    log_file = '/root/log'
    file = log_file + time.strftime("%Y-%m-%d-%H%M%S")
    # Oracle connection string
    jdbc = 'WAYNE/123456@192.168.72.50:1521/ORCLDB'
    # Rename the current log file first; the Kafka job keeps writing to a
    # new file, so every run of this script effectively rotates the log
    shutil.move(log_file, file)
    to_oral(jdbc, file)
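As a quick sanity check, the extraction logic can be exercised on a single sample line without touching the file system or the database. This is a minimal sketch; the sample line is abbreviated from the log excerpt above, and only three of the five fields are shown:

```python
import re
from datetime import datetime

sample = ('2020-12-25 00:03:02.572 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] '
          'com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - '
          '######{"jobID":"","allLineNum":29,"succLineNum":29,"errLineNum":0,'
          '"checkTime":"2020-12-24 16:03:02"}')

# The same patterns used in get_data()
date_patt = re.compile(r'\d{4}-\d{2}-\d{2}')
time_patt = re.compile(r'\d{2}:\d{2}:\d{2}\.\d+')
all_patt = re.compile(r'"allLineNum":\d+')

log_date = datetime.strptime(date_patt.match(sample).group(), '%Y-%m-%d').date()
day_hour = time_patt.search(sample).group()   # first time match is the log timestamp
all_num = all_patt.search(sample).group().replace('"', '')

print(log_date, day_hour, all_num)
# prints: 2020-12-25 00:03:02.572 allLineNum:29
```

Note that the stored values keep the literal allLineNum:29 text, matching the VARCHAR2 columns in the table above.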
3. Shell implementation
Tool: sqlldr (SQL*Loader)
- Write the ctl control file
[oracle@oracle ~]$ cat log.ctl
load data
infile '/home/oracle/log.dat'
APPEND INTO table WAYNE.LOG_KAFKA_BACKLOG  -- target Oracle table
fields terminated by '@'                   -- fields are separated by @
trailing nullcols
(
LOG_DATE DATE "YYYY-MM-DD",
DAY_HOUR,
ALLLINENUM,
SUCCLINENUM,
ERRLINENUM
)
- The loader script
[oracle@oracle ~]$ cat LogToOracle.sh
#!/bin/bash
date=`date +%Y%m%d-%H%M%S`
newlog=/home/oracle/log-$date
# Rotate the current log by renaming it
mv /home/oracle/datax-`date +%Y-%m-%d`.log $newlog
# Extract the fields we need into a .dat file
grep "jobID" $newlog | awk -F' |,' '{print $1,$2,$15,$16,$17}' > log.dat
# Use @ as the field delimiter
sed -i 's/ /@/g' log.dat
# Run sqlldr with the control file, bad-record file, log and data paths
sqlldr userid=wayne/123456@orcldb control=/home/oracle/log.ctl bad=/home/oracle/bad.log log=/home/oracle/log.log data=/home/oracle/log.dat
Running the script also produces the log files log.log and bad.log.
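The grep/awk/sed steps can be sketched in Python for comparison. The awk version relies on exact field positions in the log line, so this sketch extracts the same pieces by regex instead; the sample line is abbreviated from the log excerpt in the background section:

```python
import re

# Date, time, and the three counters, in the order log.dat expects
line_patt = re.compile(
    r'(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}\.\d+).*'
    r'("allLineNum":\d+),("succLineNum":\d+),("errLineNum":\d+)')

def to_dat(log_line):
    """Return one @-delimited log.dat line, or None if the line doesn't match."""
    m = line_patt.search(log_line)
    return '@'.join(m.groups()) if m else None

sample = ('2020-12-25 00:03:02.571 INFO [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] '
          '######{"jobID":"","allLineNum":30,"succLineNum":30,"errLineNum":0,'
          '"checkTime":"2020-12-24 16:03:02"}')
print(to_dat(sample))
# prints: 2020-12-25@00:03:02.571@"allLineNum":30@"succLineNum":30@"errLineNum":0
```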
After the log is renamed and processed, the resulting log.dat looks like this:
[oracle@oracle ~]$ cat log.dat
2020-12-25@00:03:02.571@"allLineNum":30@"succLineNum":30@"errLineNum":0
2020-12-25@00:03:02.572@"allLineNum":29@"succLineNum":29@"errLineNum":0
2020-12-25@00:03:02.635@"allLineNum":22@"succLineNum":22@"errLineNum":0
2020-12-25@00:03:02.669@"allLineNum":26@"succLineNum":26@"errLineNum":0
2020-12-25@00:03:02.690@"allLineNum":27@"succLineNum":27@"errLineNum":0
2020-12-25@00:03:02.691@"allLineNum":15@"succLineNum":15@"errLineNum":0
2020-12-25@00:03:02.698@"allLineNum":23@"succLineNum":23@"errLineNum":0
2020-12-25@00:03:02.704@"allLineNum":25@"succLineNum":25@"errLineNum":0
2020-12-25@00:03:02.720@"allLineNum":29@"succLineNum":29@"errLineNum":0
2020-12-25@00:03:02.874@"allLineNum":24@"succLineNum":24@"errLineNum":0
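Each of these lines splits on '@' into exactly the five columns that the control file maps to LOG_DATE, DAY_HOUR, ALLLINENUM, SUCCLINENUM and ERRLINENUM; the last three columns keep the literal "name":value text, quotes included. A quick check:

```python
row = '2020-12-25@00:03:02.571@"allLineNum":30@"succLineNum":30@"errLineNum":0'
cols = row.split('@')
print(len(cols), cols[0], cols[2])
# prints: 5 2020-12-25 "allLineNum":30
```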
Comparing the two approaches, the shell version is clearly simpler and has no environment requirements, while the Python version needs the cx_Oracle module and the Oracle Instant Client installed on the Linux host; without a prepared Python environment it cannot run at all.
Copyright notice: this is an original article by wayne342175926, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.