文章目录
- 一、Flume基础介绍
- 1、Flume是什么
- 2、Flume能干什么
- 3、我们为什么要使用Flume
- 4、Flume基础架构
- 4.1 Agent
- 4.2 Source
- 4.3 Sink
- 4.4 Channel
- 4.5 Event
- 二、Flume快速入门
- 1、Flume安装部署
- 1.1 下载地址
- 1.2 安装部署
- 2、Flume官方入门案例
- 2.1 案例需求
- 2.2 实现步骤
- 三、Flume进阶
- 1、Flume事务
- 2、Flume内部原理
- 2.1 重要组件:
- 3、Flume拓扑结构
- 3.1 简单串联
- 3.2 复制和多路复用
- 3.3 负载均衡和故障转移
- 3.4 聚合
- 四、Flume高级
- 1、自定义组件
- 1.1 自定义Source
- 1.2 自定义Sink
- 2、Flume数据流监控
- 2.1 Ganglia介绍
- 2.2 Ganglia的安装和部署
- 2.3 操作Flume测试监控
一、Flume基础介绍
1、Flume是什么
下图为Flume图标(湍急的河流里有一个木头)
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。
2、Flume能干什么
作为大数据体系结构重要的一环,Flume作为日志收集数据,并可以在日志系统中定制Source,sink(也就是数据发送方,接收方);同时还可以对数据进行简单处理,并有将数据写入Hbase,HDFS数据存储系统的能力;另外,我常用Flume+Kafka+Redis+Spark Streaming来做系统的的实时运算。可见,Flume的功能还是很强大的。
3、我们为什么要使用Flume
为了方便读者理解,我是用图解的方式来解释,如下图所示。
从图中可以看出,我们可以使用Flume,实时读取服务器本地磁盘的数据,将数据写入到HDFS或者对接Kafka。在这个信息过载的时代,我们不可能对数以万计的数据信息都自己收集,而Flume可以满足我们这个愿望。
4、Flume基础架构
首先我还是利用图的方式进行直观的说明,然后下面逐一讲解。
4.1 Agent
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,Agent主要由:Source、Cannel、Sink三部分组成。
4.2 Source
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型,各种格式的日志数据,包括Avro、Thrift、Exec、Jms、Spooling、Directory、Netcat、Sequence、Generator、Syslog、Http、Legacy。
4.3 Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
4.4 Channel
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写出操作和几个Sink读取操作。Flume自带两种Channel:Memory Channel和File Channel以及Kafka Channel。Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
4.5 Event
传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该Event的一些属性,以K-V结构,Body用来存放该条数据,形式为字节数组,如下图所示。
二、Flume快速入门
1、Flume安装部署
1.1 下载地址
Flume官方下载地址
1.2 安装部署
①:将下载好的安装包(我用的是apache-flume-1.7.0-bin.tar.gz)上传到Linux的你常用的软件安装包保存路径(我自己的路径为/opt/software)
②:在当前目录解压此安装包,注意,我的/opt/software专门存放安装包,/opt/module专门存放安装的软件,所以按照自己的路径来,那么我就解压到/opt/module下
[ironmanjay@hadoop102 software]$ tar -zxf apache-flume-1.7.0bin.tar.gz -C /opt/module/
③:文件名太长了,修改一下
[ironmanjay@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
④:将flume/conf下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flume-env.sh 文件
[ironmanjay@hadoop102 module]$ mv flume/conf/flume-env.sh.template flume-env.sh
[ironmanjay@hadoop102 module]$ vi flume/conf/flume-env.sh
⑤:将Java的环境变量加入即可,在文件末尾添加这句话。注意:根据自己的Java安装路径添加
export JAVA_HOME=/opt/module/jdk1.8.0_144
2、Flume官方入门案例
2.1 案例需求
使用Flume监听一个端口,收集该端口数据,并打印到控制台
2.2 实现步骤
①:安装netcat工具
[ironmanjay@hadoop102 software]$ sudo yum install -y nc
②:判断44444端口是否被占用
[ironmanjay@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444
③:创建Flume Agent配置文件flume-netcat-logger.conf
在flume目录下创建job文件夹并进入job文件夹
[ironmanjay@hadoop102 flume]$ mkdir job
[ironmanjay@hadoop102 flume]$ cd job/
④:在job文件夹下创建Flume Agent配置文件flume-netcat-logger.conf,并添加如下内容
# 以下来源于官方手册
a1.sources = r1 # a1表示agent的的名称,r1表示a1的Source的名称
a1.sinks = k1 # k1表示a1的Sink名称
a1.channels = c1 # c1 表示a1的Channel的名称
a1.sources.r1.type = netcat # 表示a1的输入类型为netcat端口类型
a1.sources.r1.bind = localhost # 表示a1监听主机
a1.sources.r1.port = 44444 # 表示a1监听的端口号
a1.sinks.k1.type = logger # 表示a1的输出目的地是控制台logger类型
a1.channels.c1.type = memory # 表示a1的channel类型是memory内存型
a1.channels.c1.capacity = 1000 # 表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100 # 表示a1的channel传输时收集到了100event以后再去提交更多事务
a1.sources.r1.channels = c1 # 表示将r1和c1连接起来
a1.sinks.k1.channel = c1 # 表示将k1和c1连接起来
⑤:开启flume监听窗口
第一种写法:
[ironmanjay@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
第二种写法:
[ironmanjay@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf/-c:表示配置文件存储在 conf/目录
--name/-n: 表示给 agent 起名为 a1
--conf-file/-f: flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf
文件
-Dflume.root.logger=INFO,console : -D 表示 flume 运行时动态修改 flume.root.logger
参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、 info、 warn、error
⑥:使用netcat工具向本机的4
4444端口发送内容
[ironmanjay@hadoop102 ~]$ nc localhost 44444
hello
⑦:在Flume监听页面观察接收数据情况
三、Flume进阶
1、Flume事务
①:Put事务流程
· doPut:将批数据先写入临时缓冲区putList
· doCommit:检查channel内存队列是否足够合并
· doRollback:channel内存队列空间不足,回滚数据
②:Take事务
· doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
· doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
· doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列
2、Flume内部原理
2.1 重要组件:
①:ChannelSelector
ChannelSelecyor的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。ReplicationSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel
②:SinkProcessor
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor。DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以实现故障转移的功能
3、Flume拓扑结构
3.1 简单串联
这种模式是将多个Flume顺序连接起来了,从最初的Source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的Flume数量,Flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点Flume宕机,会影响整个传输系统
3.2 复制和多路复用
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个Channel中,或者将不同数据分发到不同的Channel中
,Sink可以选择传送到不同的目的地
3.3 负载均衡和故障转移
Flume支持使用将多个Sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor,可以实现负载均衡和错误恢复的功能
3.4 聚合
这种模式是我们最常见的,也非常实用。日常Web引用通常分布在上百个服务器,大者甚至上千、上万个服务器。产生的日志,处理起来也非常麻烦。用Flume这种组合方式能很好的解决这一问题,每台服务器部署一个Flume采集日志,传送到一个集中收集日志的Flume,再由此Flume上传到HDFS、Hbase、Hive等,进行日志分析
四、Flume高级
1、自定义组件
1.1 自定义Source
①:介绍
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling、directory、netcat、sequence等等。官方提供的Source类型已经很多,但是有时候并不能满组实际开放当中的需求,此时我们就需要根据实际需求自定义某些Source
②:需求
使用Flume接收数据,并给每条数据添加前缀,输出到控制台。前缀可以从Flume配置文件中配置。
③:步骤
· 导入pom依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
· 编写代码
package Source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
// 定义全局的前缀&后缀
private String prefix;
private String subfix;
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// 1、接收数据
for (int i = 0; i < 5; i++) {
// 2、构建事件对象
SimpleEvent event = new SimpleEvent();
// 3、给事件设置值
event.setBody((prefix + "--" + i + "--" + subfix).getBytes());
// 4、将事件传给channel
getChannelProcessor().processEvent(event);
status = Status.READY;
}
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回结果
return status;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
@Override
public void configure(Context context) {
// 读取配置信息,给前后缀赋值
prefix = context.getString("prefix");
subfix = context.getString("subfix", "IronmanJay");
}
}
④:测试
· 打包
将写好的代码打包,并放到flume的lib目录下(/opt/module/flume)下
· 配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = Source
a1.sources.r1.delay = 1000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
· 开启任务
[ironmanjay@hadoop102 flume]$ pwd /opt/module/flume
[ironmanjay@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
1.2 自定义Sink
①:介绍
Sink不断地轮询Channel中的事件并批量的移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或者下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。Sink组件目的地包括HDFS、logger、avro、thrift、ipc、file、null、Hbase、solr。官方提供的Sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink
②:需求
使用Flume接收数据,并在Sink端给每条数据添加前缀和后缀,输出到控制台。前后缀可在Flume任务配置文件中配置
③:编码
package Sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
// 获取Logger对象
private Logger logger = LoggerFactory.getLogger(MySink.class);
// 定义两个属性,前后缀
private String prefix;
private String subfix;
@Override
public void configure(Context context) {
// 读取配置文件,给前后缀赋值
prefix = context.getString("prefix");
subfix = context.getString("subfix", "IronmanJay");
}
@Override
public Status process() throws EventDeliveryException {
// 1、定义返回值
Status status = null;
// 2、获取channel
Channel channel = getChannel();
// 3、从channel获取事务
Transaction transaction = channel.getTransaction();
// 4、开启事务
transaction.begin();
try {
// 5、从channel获取数据
Event event = channel.take();
if (event != null) {
// 6、处理事件
String body = new String(event.getBody());
logger.info(prefix + body + subfix);
}
// 7、提交事务
transaction.commit();
// 8、提交成功,修改状态信息
status = Status.READY;
} catch (ChannelException e) {
e.printStackTrace();
// 9、提交失败
transaction.rollback();
// 10、修改状态
status = Status.BACKOFF;
} finally {
// 11、最终关闭事务
transaction.close();
}
// 12、返回状态信息
return status;
}
}
④:测试
· 打包
将写好的代码打包,并放到flume的lib目录下(/opt/module/flume)下
· 配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = Sink
a1.sinks.k1.suffix = :IronmanJay
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
· 开启任务
[ironmanjay@hadoop102 flume]$ bin/flume-ng agent -c conf/ -fjob/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[ironmanjay@hadoop102 ~]$ nc localhost 44444
hello
OK
ironmanjay
OK
2、Flume数据流监控
2.1 Ganglia介绍
Ganglia 由 gmond、 gmetad 和 gweb 三部分组成
- gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等
- gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务
- gweb(Ganglia Web) Ganglia 可视化工具, gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
2.2 Ganglia的安装和部署
①:安装httpd服务与php
[ironmanjay@hadoop102 flume]$ sudo yum -y install httpd php
②:安装其他依赖
[ironmanjay@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
[ironmanjay@hadoop102 flume]$ sudo yum -y install apr-devel
③:安装Ganglia
[ironmanjay@hadoop102 flume]$ sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
[ironmanjay@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
[ironmanjay@hadoop102 flume]$ sudo yum -y install ganglia-web
[ironmanjay@hadoop102 flume]$ sudo yum -y install ganglia-gmond
④:修改配置文件/etc/httpd/conf.d/ganglia.conf
[ironmanjay@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
修改为红颜色的配置:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
#Deny from all
Allow from all
# Allow from 127.0.0.1
# Allow from ::1
# Allow from .example.com
</Location>
⑤:修改配置文件/etc/ganglia/gmetad.conf
修改内容:
data_source "你的虚拟机名称" 你的虚拟机ip地址
⑥:修改配置文件/etc/ganglia/gmond.conf
[ironmanjay@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
修改为:
cluster {
name = "你的虚拟机名称"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source
address
# that resolves to the machine's hostname.
Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
host = 你的虚拟机ip
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
bind = 192.168.9.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you
really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
⑦:修改配置文件/etc/selinux/config
[ironmanjay@hadoop102 flume]$ sudo vim /etc/selinux/config
修改为:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
提示:selinux 本次生效关闭必须重启,如果此时不想重启,可以临时生效,如下:
[ironmanjay@hadoop102 flume]$ sudo setenforce 0
⑧:启动Ganglia
[ironmanjay@hadoop102 flume]$ sudo service httpd start
[ironmanjay@hadoop102 flume]$ sudo service gmetad start
[@hadoop102 flume]$ sudo service gmond start
⑨:打开网页浏览 Ganglia 页面
http://你的虚拟机ip/ganglia
提示: 如果完成以上操作出现权限不足错误,请修改/var/lib/ganglia 目录的权限:
[ironmanjay@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia
2.3 操作Flume测试监控
①:修改Flume安装目录/conf下的 flume-env.sh 配置
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=你的虚拟机ip:8649 -Xms100m -Xmx200m"
②:启动Flume任务
[ironmanjay@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger==INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=你的虚拟机ip:8649
③:发送数据观察Ganglia监测图
[ironmanjay@hadoop102 flume]$ nc localhost 44444
如图所示: