随着时间的积累,日志数据会越来越多。在实际应用场景中,为了满足大数据实时检索的需求,您可以使用Filebeat采集日志数据,将Kafka作为Filebeat的输出端。Kafka实时接收到Filebeat采集的数据后,以Logstash作为输出端输出。输出到Logstash中的数据在格式或内容上可能不能满足您的需求,此时可以通过Logstash的filter插件过滤数据。最后将满足需求的数据输出到ES中进行分布式检索,并通过Kibana进行数据分析与展示。简单流程如下。
-
Kafka是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。本文使用阿里云消息队列Kafka版,详情请参见什么是消息队列Kafka版。
-
Elasticsearch是一个基于Lucene的实时分布式的搜索与分析引擎,是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。它提供了一个分布式服务,可以使您快速的近乎于准实时的存储、查询和分析超大数据集,通常被用来作为构建复杂查询特性和需求强大应用的基础引擎或技术。
阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0等版本,并提供了商业插件X-Pack服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。本文使用阿里云Elasticsearch为您演示,单击此处即可免费试用。 -
Logstash是一个开源的服务处理管道,能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。
阿里云Logstash作为服务器端的数据处理管道,提供了100%兼容开源Logstash的能力。除了支持所有官方预置插件外,还致力于打造包含logstash-input-sls、logstash-input-oss、logstash-output-oss等适用各类场景的插件中心,为您提供更为强大的数据处理和搬迁能力,实现云上数据生态打通。本文使用阿里云Logstash为您演示,单击此处即可免费试用。
在阿里云ELK(Elasicsearch、Logstash、Kibana)生态下,Elasticsearch作为实时分布式搜索和分析引擎,Kibana为Elasticsearch提供了强大的可视化界面,Logstash提供了数据采集、转换、优化和输出的能力,可以被广泛应用于实时日志处理、全文搜索和数据分析等领域。
操作流程
-
准备工作
完成环境准备,包括创建实例、安装Filebeat等。
-
步骤一:配置Filebeat
配置Filebeat的input为系统日志,output为Kafka,将日志数据采集到Kafka的指定Topic中。
-
步骤二:配置Logstash管道
配置Logstash管道的input为Kafka,output为阿里云ES,使用Logstash消费Topic中的数据并传输到阿里云ES中。
-
步骤三:查看日志消费状态
在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。
-
步骤四:通过Kibana过滤日志数据
在Kibana控制台的Discover页面,通过Filter过滤出Kafka相关的日志。
准备工作
-
创建阿里云ES实例,并开启实例的自动创建索引功能。
具体操作步骤请参见创建阿里云Elasticsearch实例和开启自动创建索引,本文以6.7版本为例。
-
创建阿里云Logstash实例。要求该实例与阿里云ES实例的版本相同,并且在同一专有网络VPC(Virtual Private Cloud)下。
具体操作步骤请参见创建阿里云Logstash实例。
-
购买并部署阿里云消息队列Kafka版实例、创建Topic和Consumer Group。
本文使用VPC实例,并要求该实例与阿里云ES实例在同一VPC下,具体操作步骤请参见VPC接入。
创建Topic和Consumer Group的具体步骤请参见步骤三:创建资源。
-
创建阿里云ECS实例,并且该ECS实例与阿里云ES实例和Logstash实例处于同一VPC下。
具体操作步骤请参见使用向导创建实例。
说明: 该ECS实例用来安装Filebeat,由于Filebeat目前仅支持Aliyun Linux、RedHat和CentOS这三种操作系统,因此在创建时请选择其中一种操作系统。
-
在ECS实例上安装Filebeat。
具体操作步骤请参见Install Filebeat。本文使用6.8.5版本。
步骤一:配置Filebeat
-
连接安装了Filebeat的ECS服务器。
具体操作步骤请参见连接实例。
-
进入Filebeat安装目录,执行以下命令,创建并配置filebeat.kafka.yml文件。
cd filebeat-6.8.5-linux-x86_64 vi filebeat.kafka.yml
filebeat.kafka.yml配置如下。
filebeat.prospectors: - type: log enabled: true paths: - /var/log/*.log output.kafka: hosts: ["172.16.**.**:9092","172.16.**.**:9092","172.16.**.**:9092"] topic: estest version: 0.10.2
参数 说明 type
输入类型。设置为log,表示输入源为日志。 enabled
设置配置是否生效。true表示生效,false表示不生效。 paths
需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。 hosts
消息队列Kafka实例的接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点。 topic
日志输出到消息队列Kafka的Topic,请指定为您已创建的Topic。 version
Kafka的版本,可在消息队列Kafka的实例详情页面获取。 说明: 不配置此参数会报错。 -
启动Filebeat。
./filebeat -e -c filebeat.kafka.yml
步骤二:配置Logstash管道
-
登录阿里云Elasticsearch控制台。
-
在顶部菜单栏处,选择地域。
-
在左侧导航栏,单击Logstash实例,再在实例列表中单击目标实例ID。
-
在左侧导航栏,单击管道管理。
-
在管道列表区域,单击创建管道。
-
在创建管道任务页面,输入管道ID并配置管道。
本文使用的管道配置如下。
input { kafka { bootstrap_servers => ["172.**.**.92:9092,172.16.**.**:9092,172.16.**.**:9092"] group_id => "es-test" topics => ["estest"] codec => json } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user =>"elastic" password =>"<your_password>" index => "kafka‐%{+YYYY.MM.dd}" } }
input参数说明
参数 说明 bootstrap_servers
消息队列Kafka实例的接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点。 group_id
指定为您已创建的Consumer Group的名称。 topics
指定为您已创建的Topic的名称,需要与Filebeat中配置的Topic名称保持一致。 codec
设置为json,表示解析json格式的字段,便于在Kibana中分析。 output参数说明
参数 说明 hosts
阿里云ES的访问地址,取值为 http://<阿里云ES实例的内网地址>:9200
。
说明: 您可在阿里云ES实例的基本信息页面获取其内网地址,详情请参见查看实例的基本信息。user
访问阿里云ES的用户名,默认为elastic。您也可以使用自建用户,详情请参见创建用户。 password
访问阿里云ES的密码,在创建实例时设置。如果忘记密码,可进行重置,重置密码的注意事项及操作步骤请参见重置实例访问密码。 index
索引名称。设置为 kafka‐%{+YYYY.MM.dd}
表示索引名称以kafka为前缀,以日期为后缀,例如kafka-2020.05.27
。更多Config配置详情请参见Logstash配置文件说明。
-
单击下一步,配置管道参数。
参数 说明 管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。 管道批大小 单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。 管道批延迟 创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。 队列类型 用于事件缓冲的内部排队模型。可选值: MEMORY:默认值。基于内存的传统队列。 PERSISTED:基于磁盘的ACKed队列(持久队列)。 队列最大字节数 请确保该值小于您的磁盘总容量。默认值:1024MB。 队列检查点写入数 启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。 警告: 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。
-
单击保存或者保存并部署。
- 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
- 保存并部署:保存并且部署后,会触发实例重启,使配置生效。
步骤三:查看日志消费状态
-
进入消息队列Kafka控制台。
-
在左侧导航栏,单击Consumer Group管理。
-
在Consumer Group管理页面,单击目标消息队列Kafka实例。
-
单击对应Consumer Group右侧操作列下的消费状态。
-
在消费状态对话框中,单击对应Topic右侧操作列下的详情,查看详细消费状态。
正常情况下,返回结果如下。
步骤四:通过Kibana过滤日志数据
-
登录目标阿里云ES实例的Kibana控制台。
具体步骤请参见登录Kibana控制台。
-
创建一个索引模式。
-
在左侧导航栏,单击Management。
-
在Kibana区域,单击Index Patterns。
-
单击Create index pattern。
-
输入Index pattern(本文使用kafka-*),单击Next step。
-
选择Time Filter field name(本文选择**@timestamp**),单击Create index pattern。
-
-
在左侧导航栏,单击Discover。
-
从页面左侧的下拉列表中,选择您已创建的索引模式(kafka-*)。
-
在页面右上角,选择一段时间,查看对应时间段内的Filebeat采集的日志数据。
-
单击Add a filter,在Add filter页面中设置过滤条件,查看符合对应过滤条件的日志数据。