Flume收集Tomcat日志

Scroll Down

flume安装

从http://flume.apache.org/官网下载对应的版本的flume,下载之后解压:解压命令 tar xf apache-flume-1.7.0-bin.tar.gz 解压后目录结构如下
    + apache-flume-1.7.0-bin
      - bin 存放flume的启动脚本
      - conf 存放一些flume的的template配置。读者如果是刚上手不妨先运行这几个template
      - lib 存放flume的运行是以来的jar
      - docs 存放flume的一些官方文档,其中包含有api文档。还有官方的例子,以html呈现给用户
此时就算是将flume解压完毕了

编写agent配置文件

agent文件一般存放于installpath/conf下。flume整体是一个是数据流模型(Data Flow Model)Source通过通道添加事件,Sinks通过通道取事件。所以通道类似缓存的存在

   
   ===========================================================================
   |                    |                                                                                               |       kafka    |      
   |  webServer |       (Source)        (channels)        (Sinks)   |        hdfs    |
   |                    |                                                                                           | FileSystem |                              
   ===========================================================================

flume+kafka的AGENT配置文件

        agent.sources = s1
        agent.sinks = k1
        agent.channels = c1
        agent.sources.s1.type=exec
        agent.sources.s1.command=tail -F /opt/mood/frontend/webapps/frontend/logs/dc_access-1.log
        注:监控单文件的要执行的shell命令
        agent.sources.s1.channels=c1
        注:连接那个通道
        #通道配置
        agent.channels.c1.type=memory
        agent.channels.c1.capacity=10000
        agent.channels.c1.transactionCapacity=100
        #设置Kafka接收器
        agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
        #设置Kafka的broker地址和端口号
        agent.sinks.k1.brokerList=192.168.2.162:9092
        #设置Kafka的Topic
        agent.sinks.k1.topic=testKJ1
        #设置序列化方式
        agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
        agent.sinks.k1.channel=c1
   

对应的几种常用的Source

  • a1.sources.r1.type  =  avro
    

监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。

  • a1.sources.r1.type = spooldir

这个Source允许你将将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入通道,它会被重命名或可选的直接删除。要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。

  • a1.sources.r1.type = http

HTTP Source接受HTTP的GET和POST请求作为Flume的事件,其中GET方式应该只用于试验。该Source需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口,该处理器接受一个 HttpServletRequest对象, 并返回一个Flume Envent对象集合。从一个HTTP请求中得到的事件将在一个事务中提交到通道中。因此允许像文件通道那样对通道提高效率。如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。

  • a1.sources.r1.type = netcat

监控端口

  • a1.sources.r1.type = exec (shell命令)

监控单文件


对应的几种常用的Channels

  • agent.channels.c1.type=memory

Memory Channel是事件存储在一个内存队列中。速度快,吞吐量大。但会有代理出现故障后数据丢失的情况。

  • agent.channels.c1.type=jdbc

JDBC Channel是把事件存储在数据库。目前的JDBC Channel支持嵌入式Derby。主要是为了数据持久化,并且可恢复的特性。

  • agent.channels.c1.type=file

注意默认情况下,File Channel使用检查点(checkpointDir)和在用户目录(dataDirs)上指定的数据目录。所以在一个agent下面启动多个File Channel实例,只会有一个File channel能锁住文件目录,其他的都将初始化失败。因此,有必要提供明确的路径的所有已配置的通道,同时考虑最大吞吐率,检查点与数据目录最好是在不同的磁盘上。

  • agent.channels.c1.type=org.example.MyChannel

Custom Channel是对channel接口的实现。需要在classpath中引入实现类和相关的jar文件。这Channel对应的type是该类的完整路径


对应的几种常用的Sinks

  • agent.sinks.k1.type=hdfs

将事件写入到Hadoop分布式文件系统(HDFS)中。主要是Flume在hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景。

  • agent.sinks.k1.type=avro

Avro Sink主要用于Flume分层结构。Flumeevent 发送给这个sink的事件都会转换成Avro事件,发送到配置好的Avro主机和端口上。这些事件可以批量传输给通道。

  • agent.sinks.k1.type=file_roll

存储到本地存储中。但是会有个滚动间隔的设置,设置多长时间去生成文件(默认是30秒)。

  • agent.sinks.k1.type=null

丢弃从通道接收的所有事件。。。

  • agent.sinks.k1.type=hbase

HBaseSinks负责将数据写入到Hbase中。hbase的配置信息从classpath路径里面遇到的第一个hbase-site.xml文件中获取。在配置文件中指定的实现了HbaseEventSerializer接口的类,用于将事件转换成Hbase所表示的事件或者增量。然后将这些事件和增量写入Hbase中。Hbase Sink支持写数据到安全的Hbase。为了将数据写入安全的Hbase,用户代理运行必须对配置的table表有写权限。主要用来验证对KDC的密钥表可以在配置中指定。在Flume Agent的classpath路径下的Hbase-site.xml文件必须设置到Kerberos认证。注意这一点很重要,就是这个sinks 对格式的规范要求非常高。至于 AsyncHBaseSink则是异步的HBaseSinks。

  • agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink

将事件写入到Kafka中,适用大数据日志场景。


启动flume
在conf目录下执行的命令:

../bin/flume-ng agent -n agent -c conf -f ./kafkasink.conf -Dflume.root.logger=DEBUG,console

第一个agent表明以agent模式启动,第二个-n agent是指定该agent的name是agent在配置文件中有写到("agent"...),第三个conf -f ./kafkasink.conf是用来指定配置文件为刚才书写的kafkasink.conf,第四个是用来配置日志的输出级别启动时-D传入。flume内部就成了log4j。最后一个配置控制台输出。

该Demo监控公司的数据上传的access日志,将其打在Kafka指定的Topic上面所以启动一个消费者进程去消费该Topic上的消息,既可以看到结果。

注:实现该Demo的前提依赖于java环境和而且还需要保证zookeeper和kafka都必须是启动的状态。否则flume会报错提示连接不上去kafka

启动kfaka
        命令:     bin/zookeeper-server-start.sh config/zookeeper.properties
                   bin/kafka-server-start.sh config/server.properties
由于在flume中配置了(agent.sinks.k1.topic=testKJ1),在flume连接到kafak之后会创建Topic:"testKJ1",然后再Kafka的Data目录也会存放对应的topic目录
启动kafka的消费者
        命令: ./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
        执行此命令则会在控制台上打印出来Topic上的消息。