flume安装
从http://flume.apache.org/官网下载对应的版本的flume,下载之后解压:解压命令 tar xf apache-flume-1.7.0-bin.tar.gz 解压后目录结构如下
+ apache-flume-1.7.0-bin
- bin 存放flume的启动脚本
- conf 存放一些flume的的template配置。读者如果是刚上手不妨先运行这几个template
- lib 存放flume的运行是以来的jar
- docs 存放flume的一些官方文档,其中包含有api文档。还有官方的例子,以html呈现给用户
此时就算是将flume解压完毕了
编写agent配置文件
agent文件一般存放于installpath/conf下。flume整体是一个是数据流模型(Data Flow Model)Source通过通道添加事件,Sinks通过通道取事件。所以通道类似缓存的存在
===========================================================================
| | | kafka |
| webServer | (Source) (channels) (Sinks) | hdfs |
| | | FileSystem |
===========================================================================
flume+kafka的AGENT配置文件
agent.sources = s1
agent.sinks = k1
agent.channels = c1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /opt/mood/frontend/webapps/frontend/logs/dc_access-1.log
注:监控单文件的要执行的shell命令
agent.sources.s1.channels=c1
注:连接那个通道
#通道配置
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#设置Kafka接收器
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
agent.sinks.k1.brokerList=192.168.2.162:9092
#设置Kafka的Topic
agent.sinks.k1.topic=testKJ1
#设置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
对应的几种常用的Source
-
a1.sources.r1.type = avro
监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。
- a1.sources.r1.type = spooldir
这个Source允许你将将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入通道,它会被重命名或可选的直接删除。要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。
- a1.sources.r1.type = http
HTTP Source接受HTTP的GET和POST请求作为Flume的事件,其中GET方式应该只用于试验。该Source需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口,该处理器接受一个 HttpServletRequest对象, 并返回一个Flume Envent对象集合。从一个HTTP请求中得到的事件将在一个事务中提交到通道中。因此允许像文件通道那样对通道提高效率。如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。
- a1.sources.r1.type = netcat
监控端口
- a1.sources.r1.type = exec (shell命令)
监控单文件
对应的几种常用的Channels
- agent.channels.c1.type=memory
Memory Channel是事件存储在一个内存队列中。速度快,吞吐量大。但会有代理出现故障后数据丢失的情况。
- agent.channels.c1.type=jdbc
JDBC Channel是把事件存储在数据库。目前的JDBC Channel支持嵌入式Derby。主要是为了数据持久化,并且可恢复的特性。
- agent.channels.c1.type=file
注意默认情况下,File Channel使用检查点(checkpointDir)和在用户目录(dataDirs)上指定的数据目录。所以在一个agent下面启动多个File Channel实例,只会有一个File channel能锁住文件目录,其他的都将初始化失败。因此,有必要提供明确的路径的所有已配置的通道,同时考虑最大吞吐率,检查点与数据目录最好是在不同的磁盘上。
- agent.channels.c1.type=org.example.MyChannel
Custom Channel是对channel接口的实现。需要在classpath中引入实现类和相关的jar文件。这Channel对应的type是该类的完整路径
对应的几种常用的Sinks
- agent.sinks.k1.type=hdfs
将事件写入到Hadoop分布式文件系统(HDFS)中。主要是Flume在hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景。
- agent.sinks.k1.type=avro
Avro Sink主要用于Flume分层结构。Flumeevent 发送给这个sink的事件都会转换成Avro事件,发送到配置好的Avro主机和端口上。这些事件可以批量传输给通道。
- agent.sinks.k1.type=file_roll
存储到本地存储中。但是会有个滚动间隔的设置,设置多长时间去生成文件(默认是30秒)。
- agent.sinks.k1.type=null
丢弃从通道接收的所有事件。。。
- agent.sinks.k1.type=hbase
HBaseSinks负责将数据写入到Hbase中。hbase的配置信息从classpath路径里面遇到的第一个hbase-site.xml文件中获取。在配置文件中指定的实现了HbaseEventSerializer接口的类,用于将事件转换成Hbase所表示的事件或者增量。然后将这些事件和增量写入Hbase中。Hbase Sink支持写数据到安全的Hbase。为了将数据写入安全的Hbase,用户代理运行必须对配置的table表有写权限。主要用来验证对KDC的密钥表可以在配置中指定。在Flume Agent的classpath路径下的Hbase-site.xml文件必须设置到Kerberos认证。注意这一点很重要,就是这个sinks 对格式的规范要求非常高。至于 AsyncHBaseSink则是异步的HBaseSinks。
- agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
将事件写入到Kafka中,适用大数据日志场景。
启动flume
在conf目录下执行的命令:
../bin/flume-ng agent -n agent -c conf -f ./kafkasink.conf -Dflume.root.logger=DEBUG,console
第一个agent表明以agent模式启动,第二个-n agent是指定该agent的name是agent在配置文件中有写到("agent"...),第三个conf -f ./kafkasink.conf是用来指定配置文件为刚才书写的kafkasink.conf,第四个是用来配置日志的输出级别启动时-D传入。flume内部就成了log4j。最后一个配置控制台输出。
该Demo监控公司的数据上传的access日志,将其打在Kafka指定的Topic上面所以启动一个消费者进程去消费该Topic上的消息,既可以看到结果。
注:实现该Demo的前提依赖于java环境和而且还需要保证zookeeper和kafka都必须是启动的状态。否则flume会报错提示连接不上去kafka
启动kfaka
命令: bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
由于在flume中配置了(agent.sinks.k1.topic=testKJ1),在flume连接到kafak之后会创建Topic:"testKJ1",然后再Kafka的Data目录也会存放对应的topic目录
启动kafka的消费者
命令: ./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
执行此命令则会在控制台上打印出来Topic上的消息。