一、背景

日志文件部分内容

2020-12-25 00:02:58.049 INFO  [job-0] com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator:50 - Total 1043200 records, 218743048 bytes | Speed 0B/
s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 9.835s |  All Task WaitReaderTime 1,240,595.000s | Transfermor Success 1043201 records | Transformer Error 0 records | T
ransformer Filter 0 records | Transformer usedTime 0.338s | Percentage 0.00%
2020-12-25 00:03:02.571 INFO  [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"inp
ut","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":30,"succLineNum":30,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.572 INFO  [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"inp
ut","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":29,"succLineNum":29,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.635 INFO  [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"inp
ut","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":22,"succLineNum":22,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}
2020-12-25 00:03:02.669 INFO  [-ZB_CITY_IOTGROUPCODE_4G_2ORACLE-0-0-0-reader] com.alibaba.datax.plugin.reader.kafkareader.KafkaReaderHelper:410 - ######{"jobID":"","target":"","action":"inp
ut","media":"-","module":"datax","uuid":"","fileName":"","allLineNum":26,"succLineNum":26,"errLineNum":0,"checkTime":"2020-12-24 16:03:02","recTime":"0","endTime":"2020-12-24 16:03:02"}

需求
将日志文件中的日期以及”allLineNum”:29,“succLineNum”:29,“errLineNum”:0部分内容提取出来导入到oracle数据库
oracle数据库建表

create table ZNYW.LOG_KAFKA_BACKLOG
(
  LOG_DATE    DATE,
  DAY_HOUR    VARCHAR2(200),
  ALLLINENUM  VARCHAR2(200),
  SUCCLINENUM VARCHAR2(200),
  ERRLINENUM  VARCHAR2(200)
);

二、python实现

主要模块使用:cx_Oracle和Oracle Instant Client
安装方式参考:https://cx-oracle.readthedocs.io/en/latest/ ,

https://blogs.oracle.com/linux/cx_oracle-rpms-have-landed-on-oracle-linux-yum-server

yum源:http://yum.oracle.com/repo/OracleLinux/OL7/latest/,https://yum.oracle.com/repo/OracleLinux/OL7/UEKR6/x86_64/
实现代码:

#!/bin/python
import cx_Oracle
from datetime import datetime
import time
import shutil
import re
# 创建函数用正则的方式提取出日志文件中需要的数据
def get_data(file):
    date_patt = re.compile('\d{4}-\d{2}-\d{2}')
    time_patt = re.compile('\d{2}:\d{2}:\d{2}\.\d+')
    all_patt = re.compile('"allLineNum":\d+')
    suc_patt = re.compile('"succLineNum":\d+')
    err_patt = re.compile('"errLineNum":\d+')
    data_patt = re.compile('\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ .*,"allLineNum":\d+,"succLineNum":\d+,"errLineNum":\d+,.*')
    # data_patt = re.compile('\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+.*')
    data_all = []
    # 读取日志文件中的内容
    with open(file, 'r') as f:
        log_lists = f.readlines()
        for log_list in log_lists:
            # print(log_list)
            oral_data = []
            m = data_patt.search(log_list)
            if m:
                list = m.group()
                date_str = date_patt.match(list)
                date = datetime.strptime(date_str.group(),'%Y-%m-%d').date()
                #date = datetime.strptime(date_str.group(),'%Y-%m-%d')
                #date = "TO_DATE(" + date_str.group() + ",'YYYY-MM-DD')"
                # 将提取到的内容保存至oral_data列表中
                oral_data.append(date)
                time_str = time_patt.search(list)
                oral_data.append(time_str.group())
                all_str = all_patt.search(list)
                oral_data.append(all_str.group().replace('\"',''))
                suc_str =suc_patt.search(list)
                oral_data.append(suc_str.group().replace('\"',''))
                err_str =err_patt.search(list)
                oral_data.append(err_str.group().replace('\"',''))
                data_all.append(tuple(oral_data))
    return data_all
#  创建函数将提取到的日志内容导入oracle
def to_oral(jdbc):
    conn =cx_Oracle.connect(jdbc)
    db = conn.cursor()
    data_list = get_data(file)
    sql = 'insert into WAYNE.LOG_KAFKA_BACKLOG values(:1,:2,:3,:4,:5)'
    # 用sql批量导入
    db.executemany(sql,data_list)
    #db.execute(sql)
    #results = db.fetchall()
    #print(results)
    conn.commit()
    db.close()
    conn.close()
if __name__=='__main__':
    # 日志文件路径
    log_file = '/root/log'
    file = log_file +  time.strftime("%Y-%m-%d-%H%M%S")
    # oracle连接
    jdbc = 'WAYNE/123456@192.168.72.50:1521/ORCLDB'
    # 将原日志文件重命名,kafka程序会产生新的日志文件,相当于每次运行脚本都会先切割日志
    shutil.move(log_file,file)
    to_oral(jdbc)

三、shell实现

工具:splldr

  • 写ctl配置文件
[oracle@oracle ~]$ cat log.ctl
load data
infile '/home/oracle/log.dat'
APPEND INTO table  WAYNE.LOG_KAFKA_BACKLOG #oracle 表名
fields terminated by '@'                 #字段之间用@分隔
trailing nullcols
(
LOG_DATE DATE "YYYY-MM-DD",
DAY_HOUR,  
ALLLINENUM, 
SUCCLINENUM,
ERRLINENUM 
)
  • 实现脚本
[oracle@oracle ~]$ cat LogToOracle.sh 
#!/bin/bash
date=`date +%Y%m%d-%H%M%S`
newlog=/home/oracle/log-$date
# 将原日志切割重命名
mv /home/oracle/datax-`date +%Y-%m-%d`.log $newlog
# 导入需要的数据到dat文件中
grep "jobID" $newlog | awk -F' |,' '{print $1,$2,$15,$16,$17}' > log.dat
#指定用@符号分隔
sed -i 's/ /@/g' log.dat
#用sqlldr工具,指定控制文件和生成的日志路径
sqlldr userid = wayne/123456@orcldb control = /home/oracle/log.ctl bad = /home/oracle/bad.log log = /home/oracle/log.log data = /home/oracle/log.dat

执行脚本会生成日志文件log.log bad.log
日志文件改名处理后(log.dat)的效果

[oracle@oracle ~]$ cat log.dat 
2020-12-25@00:03:02.571@"allLineNum":30@"succLineNum":30@"errLineNum":0
2020-12-25@00:03:02.572@"allLineNum":29@"succLineNum":29@"errLineNum":0
2020-12-25@00:03:02.635@"allLineNum":22@"succLineNum":22@"errLineNum":0
2020-12-25@00:03:02.669@"allLineNum":26@"succLineNum":26@"errLineNum":0
2020-12-25@00:03:02.690@"allLineNum":27@"succLineNum":27@"errLineNum":0
2020-12-25@00:03:02.691@"allLineNum":15@"succLineNum":15@"errLineNum":0
2020-12-25@00:03:02.698@"allLineNum":23@"succLineNum":23@"errLineNum":0
2020-12-25@00:03:02.704@"allLineNum":25@"succLineNum":25@"errLineNum":0
2020-12-25@00:03:02.720@"allLineNum":29@"succLineNum":29@"errLineNum":0
2020-12-25@00:03:02.874@"allLineNum":24@"succLineNum":24@"errLineNum":0

以上两种方式可以看出用shell方式简便很多,而且没有环境限制,用python实现需要在linux环境安装相关模块,如果python环境没有准备好将无法实现。


版权声明:本文为wayne342175926原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/wayne342175926/article/details/115015613