第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

Flume 實(shí)現(xiàn)自己的實(shí)時(shí)日志(3)

標(biāo)簽:
Java 大數(shù)據(jù)
最近接触到Flume,这里通过一些小案例做一些学习的分享。主要包括以下内容:
1-概念、2-源码编译、3-快速入门:http://idcbgp.cn/article/278218
4-源码解读:http://idcbgp.cn/article/278294
5-TAILDIR监听日志文件,源码修改,新增系统名称
6-TAILDIR监听日志文件到HDFS的案例
7-TAILDIR监听日志文件到Kafka的案例
8-TAILDIR监听日志文件到ES6.X版本的案例(包括自己实现ES高版本的Sink)
注:本系列所有文章基于Flume 1.7.0
所有分析和注释代码都在:https://github.com/lizu18xz/flume-release-1.7.0

TAILDIR监听日志文件

对于TAILDIR的使用,大家可以到
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html去查看
具体的细节。
如果我想在每次收集到的日志前面都加上来自哪个应用或者系统的日志,那么就需要简单对
TAILDIR Source的代码进行修改。
比如:
源格式:2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10
修改后:serviceWeb 2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10

(1)源码修改,org.apache.flume.source.taildir.TaildirSource
首先关注configure方法,这里可以获取到配置文件配置的参数,我们可以在这里入手:
//add 新增,获取系统名称,默认是webLog
serviceName=context.getString("serviceName", "webLog");
然后找到process()方法,这里是具体监听文件的地方。
        //add 设置获取的系统名称
        tf.setServiceName(serviceName);
        if (tf.needTail()) {
          tailFileProcess(tf, true);
        }
最后会到org.apache.flume.source.taildir.TailFile里面真正获取数据
Event readEvent(boolean backoffWithoutNL, boolean addByteOffset)
修改此方法,获取内容,然后追加系统名称,重新构建event,然后返回
    String row=new String(line.line, Charset.defaultCharset());
    if(row.contains("INFO")==true || 
       row.contains("WARN")==true || row.contains("ERROR")==true ||
       row.contains("DEBUG")==true){
       row=serviceName+" "+row;
    }
    Event event = EventBuilder.withBody(row,Charset.defaultCharset());
    //Event event = EventBuilder.withBody(line.line);

(2)配置修改
经过以上修改后,可以在配置文件中新增serviceName的配置
agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.serviceName = dataWorks
(3)测试
打包测试

TAILDIR监听日志文件到HDFS的案例

后面的小案例我们都会引入avro source和avro sink.简单来说就是类似rpc服务一样。
可以到官网上面搜索:Setting multi-agent flow 就可以明白了。
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

(1)配置文件 
[taildir-avro.conf]
agent.sources = avro_sources01
agent.channels = avro_channel01
agent.sinks = avro_sink01

agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.positionFile = /home/elasticsearch/data/flume/taildir_position.json
agent.sources.avro_sources01.filegroups = f1
agent.sources.avro_sources01.filegroups.f1 = /home/elasticsearch/data/weblog/.*
agent.sources.avro_sources01.serviceName = dataWorks
agent.sources.avro_sources01.channels = avro_channel01

agent.channels.avro_channel01.type = memory
agent.channels.avro_channel01.capacity = 1000000
agent.channels.avro_channel01.transactionCapacity = 2000

agent.sinks.avro_sink01.type = avro
agent.sinks.avro_sink01.hostname = 192.168.88.129
agent.sinks.avro_sink01.port = 4545
agent.sinks.avro_sink01.channel = avro_channel01

[avro-hdfs.conf]
agent.sources = hdfs_sources01
agent.channels = hdfs_channel01
agent.sinks = hdfs_sink01

agent.sources.hdfs_sources01.type = avro
agent.sources.hdfs_sources01.bind = 192.168.88.129
agent.sources.hdfs_sources01.port = 4545
agent.sources.hdfs_sources01.channels = hdfs_channel01

agent.channels.hdfs_channel01.type = memory
agent.channels.hdfs_channel01.capacity = 1000000
agent.channels.hdfs_channel01.transactionCapacity = 6000

agent.sinks.hdfs_sink01.type = hdfs
agent.sinks.hdfs_sink01.hdfs.path = hdfs://192.168.88.129:8020/flumeSink/
agent.sinks.hdfs_sink01.hdfs.filePrefix = log-
agent.sinks.hdfs_sink01.hdfs.rollInterval = 0
agent.sinks.hdfs_sink01.hdfs.rollSize = 1048576
agent.sinks.hdfs_sink01.hdfs.rollCount = 0
agent.sinks.hdfs_sink01.hdfs.batchSize = 10
agent.sinks.hdfs_sink01.hdfs.writeFormat = text
agent.sinks.hdfs_sink01.hdfs.fileType = DataStream
agent.sinks.hdfs_sink01.channel = hdfs_channel01

(2)启动hdfs
(3)分别启动flume
启动消费日志的agent
./bin/flume-ng agent --conf conf -f /home/hadoop/app/data/avro-hdfs.conf -n agent -Dflume.root.logger=INFO,console

启动生产日志的agent
./bin/flume-ng agent --conf conf -f /home/elasticsearch/data/flume/taildir-avro.conf -n agent -Dflume.root.logger=INFO,console

(4)模拟生产日志信息
echo "2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10" >log.txt

(5)查看hdfs文件
hadoop fs -text /flumeSink/log-.1549943212865.tmp

(6)相关参数含义,来自官网:
HDFS Sink
hdfs.rollInterval	30	Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize	1024	File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount	10	Number of events written to file before it rolled (0 = never roll based on number of events)

未完待续

7-TAILDIR监听日志文件到Kafka的案例
8-TAILDIR监听日志文件到ES6.X版本的案例(包括自己实现ES高版本的Sink)

代码地址

https://github.com/lizu18xz/flume-release-1.7.0
點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消