基于efk+logstash+kafka构建高吞吐量的日志收集平台

Filebeat采集日志->kafka topic存起来日志->logstash去kafka获取日志,进行格式转换->elasticsearch->kibana

1、安装zookeeper集群

准备三台机器安装zookeeper高可用集群

1
2
3
4
5
6
7
8
9
10
准备三台机器安装zookeeper高可用集群
zoo1: 192.168.101.31
zoo2: 192.168.101.32
zoo3: 192.168.101.33

#关闭selinux和firewalld
sed -i "s/^SELINUX.*/SELINUX=disabled/g" /etc/selinux/config
setenforce 0
systemctl stop firewalld
systemctl mask firewalld

1.1 zookeeper简介

zookeeper就是动物园管理员,他是用来管hadoop(大象)、Hive(蜜蜂)、pig(小猪)、kafka消息系统的管理员, Apache Hbase 和 Apache Solr 的分布式集群都用到了 zookeeper;Zookeeper是一个分布式的、开源的程序协调服务,是 hadoop 项目下的一个子项目。它提供的主要功能包括:配置管理、名字服务、分布式锁、集群管理。

1.2 Zookeeper主要作用

1. 2.1 节点选举

Master节点,主节点挂了之后,从节点就会接手工作 ,并且,保证这个节点是唯一的,这就是首脑模式,从而保证集群的高可用

1.2.2 统一配置文件管理

只需要部署一台服务器

则可以把相同的配置文件,同步更新到其他所有服务器,比如,修改了Hadoop,Kafka,redis统一配置等

1.2.3 发布与订阅消息

类似于消息队列,发布者把数据存在znode节点上,订阅者会读取这个数据

1.2.4 集群管理

集群中保证数据的一致性

Zookeeper的选举机制===>过半机制安装的台数===>奇数台(否则无法过半机制)

一般情况下10台服务器需安装ZK3台

​ 20台======>5台

​ 50台=======>7台

​ 100台=======>11台

1.2.5 zookeeper角色

leader 领导者 :负责发起选举和决议的,更新系统状态

follower 跟随者:接收客户端请求,给客户端返回结果,在选举的过程中参与投票

observer 观察者:接收客户端的连接,同步leader状态,不参与选主

1.3 解压zookeeper并作配置

1
2
3
scp apache-zookeeper-3.8.0-bin.tar.gz 192.168.101.31:/opt/
scp apache-zookeeper-3.8.0-bin.tar.gz 192.168.101.32:/opt/
scp apache-zookeeper-3.8.0-bin.tar.gz 192.168.101.33:/opt/
1
2
3
#在zoo1-3节点上解压文件,并修改名称
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper
1
2
3
#在/opt/zookeeper/目录下创建数据文件和日志文件目录
mkdir /opt/zookeeper/zkData
mkdir /opt/zookeeper/zkLog
1
2
3
4
5
6
#复制配置文件并修改(配置文件中的server.1对应的则是myid中的数值)
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/^dataDir.*/dataDir=\/opt\/zookeeper/g" zoo.cfg
echo "dataLogDir=/opt/zookeeper/zkLog" >> zoo.cfg
echo -e "server.1=192.168.101.31:2188:3888\nserver.2=192.168.101.32:2188:3888\nserver.3=192.168.101.33:2188:3888" >>zoo.cfg
1
2
3
4
5
#启动zookeeper
yum install java -y
cd /opt/zookeeper/
echo `hostname | awk -F "oo" {'print $2'}` > myid
cd /opt/zookeeper/bin && nohup ./zkServer.sh start ../conf/zoo.cfg &
1
2
3
#测试是否连接成功
cd /opt/zookeeper/bin/
./zkCli.sh -server 127.0.0.1:2181

image-20230307223738470

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#创建节点以及和它关联的字符串
[zk: 127.0.0.1:2181(CONNECTED) 0] create /test "zy"
Created /test


#获取刚才创建的节点信息
[zk: 127.0.0.1:2181(CONNECTED) 1] get /test
zy


#修改节点的信息
[zk: 127.0.0.1:2181(CONNECTED) 2] set /test "zyzy"
[zk: 127.0.0.1:2181(CONNECTED) 3] get /test
zyzy

image-20230307224734575

2、单节点安装kafka集群

2.1 kafka介绍

Kafka 是一种高吞吐量的分布式发布订阅消息系统,即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。采用生产者消费者模型

基于 Kafka-ZooKeeper 的分布式消息队列系统总体架构如下:

image-20230325120025073

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

上图仅描摹了一个总体架构,并没有对作为 Kafka 节点的 broker 进行深入刻画,事实上,它的内部细节相当复杂,如下图所示,Kafka 节点涉及 Topic、Partition 两个重要概念。

image-20230325120146535

2.2 kafka相关术语

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer:负责发布消息到Kafka broker,生产者,消息的发送者。

Consumer:消息消费者,向Kafka broker读取消息的客户端。

Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;

replicas:partition 的副本,保障 partition 的高可用;

leader:replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;

follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;

controller:Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;

ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 meta 信息等

2.3 配置kafka

1
2
3
4
5
6
7
8
9
10
#在zoo3节点上
cd /opt/kafka/config


#修改配置文件
[root@zoo3 config]# cat server.properties | grep ^listen
listeners=PLAINTEXT://192.168.101.33:9092

[root@zoo3 config]# cat server.properties | grep ^zookeeper
zookeeper.connect=192.168.101.31:2181,192.168.101.32:2181,192.168.101.33:2181

zookeeper在kafka中的作用:管理broker、consumer,创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。

2.4 启动kafka

1
cd /opt/kafka/bin && ./kafka-server-start.sh -daemon ../config/server.properties

2.5 登录zookeeper客户端查看/brokers/ids

1
2
3
4
5
6
[root@zoo3 bin]# cd /opt/zookeeper/bin/
[root@zoo3 bin]# ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0]

#当出现[0]时测试通过

3、生产者和消费者测试

3.1 创建主题quickstart-events

1
2
3
[root@zoo3 bin]# cd /opt/kafka/bin/
[root@zoo3 bin]# ./kafka-topics.sh --create --topic quickstart-events --bootstrap-server 192.168.101.33:9092
Created topic quickstart-events.

3.2 查看topic

1
2
3
[root@zoo3 bin]# ./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 192.168.101.33:9092 
Topic: quickstart-events TopicId: myzlWd3_RLawswW8xfRfbw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

3.3 topic写入消息

1
2
3
4
[root@zoo3 bin]# ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.101.33:9092 
>hello
>word
>zy

3.4 打开新的终端从topic读取消息

1
2
[root@zoo3 ~]# cd /opt/kafka/bin/
[root@zoo3 bin]# ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.101.33:9092

image-20230318132112412

4、安装kafka高可用集群

将kafka安装文件拷贝到zoo1和zoo2中,这两台服务器不同的地方在于配置文件中的borker.id和listeners监听

1
2
3
#在zoo3上拷贝文件
scp -r kafka 192.168.101.31:/opt/
scp -r kafka 192.168.101.32:/opt

4.1 修改kafka配置文件

1
2
3
4
5
6
7
8
9
10
11
12
#zoo1上
[root@zoo1 config]# cat server.properties | grep ^listeners
listeners=PLAINTEXT://192.168.101.31:9092
[root@zoo1 config]# cat server.properties | grep ^broker.id
broker.id=1


#zoo2上
[root@zoo2 config]# cat server.properties | grep ^listeners
listeners=PLAINTEXT://192.168.101.32:9092
[root@zoo2 config]# cat server.properties | grep ^broker.id
broker.id=2

4.2 启动kafka

1
2
3
[root@zoo1 ~]# cd /opt/kafka/bin/ && ./kafka-server-start.sh -daemon ../config/server.properties

[root@zoo2 ~]# cd /opt/kafka/bin/ && ./kafka-server-start.sh -daemon ../config/server.properties

4.3 登陆zookeeper客户端查看/brokers/ids

1
2
3
[root@zoo3 bin]# cd /opt/zookeeper/bin/ && ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]

5、部署filebeat服务

在zoo2上部署

filebeat是轻量级的日志收集组件

zoo2安装nginx,利用filebeat采集nginx日志(测试用,看能否产生日志)

1
[root@zoo2 ~]# yum install nginx -y && systemctl start  nginx

5.1 kafka集群创建topic(存放日志数据)

1
2
[root@zoo3 bin]# cd /opt/kafka/bin/ && ./kafka-topics.sh --create --topic test-topic --bootstrap-server 192.168.101.31:9092,192.168.101.32:9092,192.168.101.33:9092
Created topic test-topic.

5.2 配置和安装filebeat服务

1
[root@zoo2 ~]# tar -xvf filebeat-7.13.1-linux-x86_64.tar.gz -C /opt/

5.3 启动filebeat nginx模块

1
2
3
[root@zoo2 filebeat]# cd /opt/filebeat/
[root@zoo2 filebeat]# ./filebeat modules enable nginx
Enabled nginx

5.4 编写文件filebeat_nginx.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@zoo2 filebeat]# cat filebeat_nginx.yaml 
filebeat.modules:
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log*"]
error:
enabled: true
var.paths: ["/var/log/nginx/error.log*"]

#----------------------------------Kafka output--------------------------------#
output.kafka:
enabled: true
hosts: ['192.168.101.31:9092', '192.168.101.32:9092','192.168.101.33:9092']
topic: 'test-topic' #kafka的topic,需要提前创建好,上面步骤已经创建过了
required_acks: 1 #default
compression: gzip #default
max_message_bytes: 1000000 #default
codec.format:
string: '%{[message]}'

5.5 启动filebeat(直接启动会报错)

修改filebeat参数,将false改成true

1
2
[root@zoo2 filebeat]# cat  filebeat.yml | grep reload.enabled
reload.enabled: true
1
[root@zoo2 filebeat]# nohup ./filebeat -e -c filebeat_nginx.yaml &

5.6 进行测试

请求nginx

1
[root@zoo2 filebeat]# curl 192.168.101.32

查看kafka topic是否有日志数据

1
2
[root@zoo3 bin]# cd /opt/kafka/bin/
[root@zoo3 bin]# ./kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.101.31:9092,192.168.101.32:9092,192.168.101.33:9092

image-20230319170557981

6、部署logstash

6.1 解压与下载软件

在zoo2上部署

logsstash是日志收集组建,但是占用的资源较多,一半是用来对数据格式进行转换

1
2
3
4
5
6
[root@zoo2 ~]# cd /opt/
[root@zoo2 opt]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.9.2.tar.gz


[root@zoo2 opt]# tar -xf logstash-7.9.2.tar.gz
[root@zoo2 opt]# mv logstash-7.9.2 logstash

6.2 添加logstash配置文件

bootstrap_servers =>指定kafka集群地址

hosts => 指定es主机地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@zoo2 ~]# cd /opt/logstash/config

[root@zoo2 config]# cat nginx.conf
input{
kafka {
bootstrap_servers => ["192.168.101.31:9092,192.168.101.32:9092 ,192.168.101.33:9092"]
auto_offset_reset => "latest"
consumer_threads => 3
decorate_events => true
topics => ["test-topic"]
codec => "json"
}
}
output {
elasticsearch {
hosts => ["192.168.101.32:9200"]
index => "kafkalog-%{+YYYY.MM.dd}" # 这里定义的index就是kibana里面显示的索引名称
}
}

6.3 启动logstash服务

1
2
[root@zoo2 config]# cd ../bin/
[root@zoo2 bin]# nohup ./logstash -f ../config/nginx.conf >> logstash.log &

7、部署es和kibana服务

在zoo2上部署

elasticsearch是一个实时的,分布式的,可扩展的搜索引擎,它允许进行全文本和结构化搜索以及对日志进行分析。它通常用于索引和搜索大量日志数据,也可以用于搜索许多不同种类的文档。elasticsearch具有三大功能,搜索、分析、存储数据

7.1 安装es服务

安装docker

1
2
3
4
5
[root@zoo2 ~]# mkdir /es_data
[root@zoo2 ~]# chmod 777 /es_data/
[root@zoo2 ~]# yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
[root@zoo2 ~]# yum install docker-ce -y
[root@zoo2 ~]# systemctl enable docker --now

导入es镜像

1
[root@zoo2 ~]# docker load -i elasticsearch.tar.gz 

创建es的docker容器

1
2
3
4
5
6
[root@zoo2 ~]# docker run -p 9200:9200 -p 9330:9300 -itd -e "discovery.type=single-node" --name es -v /es_data:/usr/share/elasticsearch/data docker.elastic.co/elasticsearch/elasticsearch:7.9.2


[root@zoo2 ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1b4bd9930406 docker.elastic.co/elasticsearch/elasticsearch:7.9.2 "/tini -- /usr/local…" About a minute ago Up About a minute 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9330->9300/tcp, :::9330->9300/tcp es

7.2 安装kibana

kibana是一个基于Web的图形界面,用于搜索、分析和可视化存储在Elasticsearch指标中的日志数据。Kibana功能众多,在“Visualize” 菜单界面可以将查询出的数据进行可视化展示,“Dev Tools” 菜单界面可以让户方便地通过浏览器直接与 Elasticsearch 进行交互,发送 RESTFUL对 Elasticsearch 数据进行增删改查。。它操作简单,基于浏览器的用户界面可以快速创建仪表板(dashboard)实时显示Elasticsearch查询动态。设置Kibana非常简单。无需编码或者额外的基础架构,几分钟内就可以完成Kibana安装并启动Elasticsearch索引监测。

解压docker镜像包

1
[root@zoo2 ~]# docker load -i kibana.tar.gz 

创建docker容器

1
[root@zoo2 ~]# docker run -p 5601:5601 -it -d  --link es -e ELASTICSEARCH_URL=http://192.168.101.32:9200 --name kibana kibana:7.9.2

修改kibana的配置

1
2
3
4
5
6
7
8
9
10
11
[root@zoo2 ~]# docker exec -it kibana /bin/bash
bash-4.2$ cat config/kibana.yml
#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.101.32:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

重启kibana容器

1
[root@zoo2 ~]# docker restart kibana

配置kibana ui界面

http://192.168.101.32:5601/app/home#/

image-20230321203753901

image-20230321212901541

image-20230321213006909

image-20230321213246702

image-20230321213311584

回到kibana首页,点击Discover查看日志:

image-20230321213742742

总结:

19.1 安装zookeeper

19.2 安装kafka

19.3 部署filebeat服务

19.4 部署logstash服务

19.5 部署es和kibana服务

19.6 配置kibana ui界面