网上借鉴了很多网友搭建的日志收集系统,跟着搭建了一下,搭建的过程中遇到了很多问题,最终经过各种尝试搭建成功,搭建过程分享给大家。本次搭建是基于docker和docker-compose。
1、vmware创建了两台虚拟机,虚拟机采用centos操作系统。
分别在两台机器上安装docker和docker-compose
安装详情参考安装docker和docker-compose
2、修改两台机器的ip
查看本机ip ifconfig
cd /etc/sysconfig/network-scripts/
vi ifcfg-ens33
##修改完毕重启网络服务
service network restart
3、在207机器上安装kafka,由于kafka依赖zookeeper,所以按照顺序安装
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p localhost:2181 -t wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.207:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.207:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
启动完以后进入kafka容器创建生产者和消费者测试
##进入容器
docker exec -it kafka /bin/bash
##进入到kafka目录
cd /opt/kafka_2.12-2.2.0/
##创建生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
##创建消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
4、在208上安装elk,elk的安装采用github上的docker-elk
下载项目之前需要安装git
##查看是否已经安装git
git --version
##安装git
yum install -y git
##查看是否安装成功
git --version
##删除git
yum remove git
按照github上的操作启动elk成功后并测试是否成功,若测试成功则进入下一步
5、由于上面并未和kafka产生关联,所以修改docker-elk安装目录下的配置文件logstash/confg/test.conf
input {
file {
path => "/logs/input/*"
}
stdin {}
kafka {
bootstrap_servers =>["192.168.1.207:9092"] #kafka的地址
topics => ["mytopic"] #消费topic 多个用逗号隔开
}
}
output {
file {
path => "/logs/output/%{+yyyy-MM-dd}/mylog.log" #日志输入到本地并按格式保存
}
stdout {
codec => rubydebug
}
elasticsearch {
hosts => "elasticsearch:9200" #es地址
index => "file-log-%{+YYYY.MM}" #es中index的名称
}
}
修改完之后重新启动docker-elk
6、springboot项目采用logback将日志信息输出到kafka中
引入依赖pom
<!--kafka依赖包-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
</dependency>
创建Formatter接口
package com.open.beijing.kafka.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
public interface Formatter {
String format(ILoggingEvent event);
}
package com.open.beijing.kafka.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.open.beijing.utils.JodaTimeUtil;
public class MessageFormatter implements Formatter{
@Override
public String format(ILoggingEvent event) {
return JodaTimeUtil.formatDateToString(event.getTimeStamp())+" "+event.getThreadName()+" "+event.getLevel()+" "+event.getFormattedMessage();
}
}
package com.open.beijing.kafka.log;
import java.util.Properties;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
将日志信息传入kafka中
**/
public class KafkaAppender extends AppenderBase<ILoggingEvent> {
private String topic;
private String zookeeperHost;
private Producer<String, String> producer;
private Formatter formatter;
private String brokerList;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getZookeeperHost() {
return zookeeperHost;
}
public void setZookeeperHost(String zookeeperHost) {
this.zookeeperHost = zookeeperHost;
}
public Formatter getFormatter() {
return formatter;
}
public void setFormatter(Formatter formatter) {
this.formatter = formatter;
}
public String getBrokerList() {
return brokerList;
}
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
@Override
public void start() {
if(this.formatter == null){
this.formatter = new MessageFormatter();
}
super.start();
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", this.brokerList);
ProducerConfig config = new ProducerConfig(props);
this.producer = new Producer<String, String>(config);
}
@Override
public void stop() {
super.stop();
this.producer.close();
}
@Override
protected void append(ILoggingEvent event) {
String payload = this.formatter.format(event);
KeyedMessage<String, String> data = new KeyedMessage(this.topic, payload);
this.producer.send(data);
}
}
7、在resource目录下创建logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="KAFKA" class="com.open.beijing.kafka.log.KafkaAppender">
<topic>mytopic</topic><!--kafka的topic-->
<brokerList>192.168.1.207:9092</brokerList> <!--kafka地址-->
</appender>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="off" />
<logger name="org.apache.kafka" level="off" />
<root level="warn">
<appender-ref ref="KAFKA"/>
</root>
</configuration>
8、启动项目,打开kibana查看数据 打开本地浏览器输入 http://192.168.1.208:5601
9、大功告成!!
转载于:https://my.oschina.net/momei/blog/3055722