第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定

Flume 實現(xiàn)自己的實時日志(4)

標(biāo)簽:
Java 大數(shù)據(jù)
最近接触到Flume,这里通过一些小案例做一些学习的分享。主要包括以下内容:
1-概念、2-源码编译、3-快速入门:http://idcbgp.cn/article/278218
4-源码解读:http://idcbgp.cn/article/278294
5-TAILDIR监听日志文件,源码修改、6-TAILDIR监听日志文件到HDFS的案例:
http://idcbgp.cn/article/278481
7-TAILDIR监听日志文件到Kafka的案例
8-TAILDIR监听日志文件到ES6.X版本的案例(包括自己实现ES高版本的Sink)
注:本系列所有文章基于Flume 1.7.0
所有分析和注释代码都在:https://github.com/lizu18xz/flume-release-1.7.0

TAILDIR监听日志文件到Kafka的案例

(1)配置文件
1.配置文件
[taildir-avro.conf]
agent.sources = avro_sources01
agent.channels = avro_channel01
agent.sinks = avro_sink01

agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.positionFile = /home/elasticsearch/data/flume/taildir_position.json
agent.sources.avro_sources01.filegroups = f1
agent.sources.avro_sources01.filegroups.f1 = /home/elasticsearch/data/weblog/.*
agent.sources.avro_sources01.serviceName = dataWorks
agent.sources.avro_sources01.channels = avro_channel01

agent.channels.avro_channel01.type = memory
agent.channels.avro_channel01.capacity = 1000000
agent.channels.avro_channel01.transactionCapacity = 2000

agent.sinks.avro_sink01.type = avro
agent.sinks.avro_sink01.hostname = 192.168.88.129
agent.sinks.avro_sink01.port = 4545
agent.sinks.avro_sink01.channel = avro_channel01

[avro-kafka.conf]
agent.sources = kafka_sources01
agent.channels = kafka_channel01
agent.sinks = kafka_sink01

agent.sources.kafka_sources01.type = avro
agent.sources.kafka_sources01.bind = 192.168.88.129
agent.sources.kafka_sources01.port = 4545
agent.sources.kafka_sources01.channels = kafka_channel01

agent.channels.kafka_channel01.type = memory
agent.channels.kafka_channel01.capacity = 1000000
agent.channels.kafka_channel01.transactionCapacity = 6000

agent.sinks.kafka_sink01.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink01.kafka.topic = flume-kafka
agent.sinks.kafka_sink01.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka_sink01.flumeBatchSize = 6000
agent.sinks.kafka_sink01.kafka.producer.acks = 1
agent.sinks.kafka_sink01.kafka.producer.linger.ms = 1
agent.sinks.kafka_sink01.kafka.producer.compression.type = snappy
agent.sinks.kafka_sink01.channel = kafka_channel01

(2)启动kafka和flume
启动消费日志到kafka生产者的agent
./bin/flume-ng agent --conf conf -f /home/hadoop/app/data/avro-kafka.conf -n agent -Dflume.root.logger=INFO,console

启动生产日志的agent
bin/flume-ng agent --conf conf -f /home/elasticsearch/data/flume/taildir-avro.conf -n agent -Dflume.root.logger=INFO,console

(3)生产数据,启动kafka消费者
echo "2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10" >log.txt

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume-kafka --from-beginning

注意,需要现在kafka 创建flume-kafka的topic.

(4)kafka sink参数解析
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#kafka-sink
可以在官网上面查看相关的参数
[type]	–	Must be set to org.apache.flume.sink.kafka.KafkaSink
类型必须这样设置,之前源码分析的时候讲过具体原因。
[kafka.bootstrap.servers,kafka.topic]都是基本的参数
[flumeBatchSize]  –  How many messages to process in one batch.一批处理多少条消息
注意和kafka 中的batch.size不是同一个:
如果接收的条数大于这个flumeBatchSize的数量,就会进行producer.flush(),
当达到配置的批次数量后,直接刷新,该方法会将数据全部生产到Kafka,Prevent(防止) 
linger.ms from holding the batch,防止linger.ms参数控制这个批次。
这个配置是flume进行控制的,和kafka的batch.size参数无关。
部分代码:

for (; processedEvents < batchSize; processedEvents += 1) {
        //从channel获取event
        event = channel.take();
        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        eventTopic = headers.get(TOPIC_HEADER);
        if (eventTopic == null) {
          eventTopic = topic;
        }
        eventKey = headers.get(KEY_HEADER);
      
        Integer partitionId = null;
        try {
          ProducerRecord<String, byte[]> record;
         
          if (partitionId != null) {
            record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
                serializeEvent(event, useAvroEventFormat));
          } else {
            record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
                serializeEvent(event, useAvroEventFormat));
          }
          kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
        } catch (NumberFormatException ex) {
          throw new EventDeliveryException("Non integer partition id specified", ex);
        } catch (Exception ex) {
          ...
          throw new EventDeliveryException("Could not send event", ex);
        }
      }
      //Prevent(防止) linger.ms from holding the batch,当达到配置的批次数量后,直接刷新,该方法会将数据全部生产到Kafka
      producer.flush();

[Other Kafka Producer Properties]	–	可以配置kafka producer的所有支持的
参数,For example: kafka.producer.linger.ms

[kafka.producer.linger.ms]    单位是ms      延迟 可以获取更多消息
[kafka.producer.batch.size]   单位是bytes   控制一次发送的大小,批处理

这两个都是kafka client里面的参数,当达到一个后就会进行批量发送数据了
关于kafka producer的相关参数可以到官网查看:
http://kafka.apache.org/documentation.html#producerapi

未完待续

8-TAILDIR监听日志文件到ES6.X版本的案例(包括自己实现ES高版本的Sink)

代码地址

https://github.com/lizu18xz/flume-release-1.7.0
點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
JAVA開發(fā)工程師
手記
粉絲
6390
獲贊與收藏
157

關(guān)注作者,訂閱最新文章

閱讀免費(fèi)教程

  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊有機(jī)會得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消