

Flume + Kafka + Storm Integration

2019-11-14 12:37:14
Source: reader contribution

Flume collects the data.

Kafka acts as the message queue (buffer).

Storm does the stream processing.

Flume version: apache-flume-1.7.0-bin

Kafka version: kafka_2.11-0.10.1.0 (note that some Flume versions are not compatible with some Kafka versions; with a mismatched pair, the data Flume collects never shows up in the Kafka topic. I got bitten by this.)

Storm version: apache-storm-0.9.2-incubating

1. Configuration (ZooKeeper must be installed first)

Flume configuration:

Create a new file named demoagent.conf under the conf directory.

(1) Listening-port configuration (netcat source)

A simple example:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2) Monitoring the output of a command (exec source)

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (exec instead of netcat)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/zzq/flumedemo/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Flume and Kafka integration

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (exec instead of netcat)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/zzq/flumedemo/test.log
a1.sources.r1.channels = c1

# Describe the sink (Kafka sink instead of logger)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = testKJ1
a1.sinks.k1.kafka.bootstrap.servers = weekend114:9092,weekend115:9092,weekend116:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The third configuration, Flume and Kafka integration, is the one we actually use here; put its contents into the demoagent.conf file:

[zzq@weekend110 conf]$ cat demoagent.conf
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (exec)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/zzq/flumedemo/test.log
a1.sources.r1.channels = c1

# Describe the sink (Kafka sink)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = testKJ2
a1.sinks.k1.kafka.bootstrap.servers = weekend110:9092
a1.sinks.k1.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Kafka configuration:

 

vim config/server.properties 
broker.id=1
zookeeper.connect=weekend114:2181,weekend115:2181,weekend116:2181

Add the broker id and the ZooKeeper address (mine is a ZooKeeper cluster).

Storm configuration:

Edit the configuration file storm.yaml:

# the ZooKeeper cluster hosts to use
storm.zookeeper.servers:
    - "weekend114"
    - "weekend115"
    - "weekend116"

# the host nimbus runs on
nimbus.host: "weekend114"

supervisor.slots.ports:
    - 6701
    - 6702
    - 6703
    - 6704
    - 6705

2. Startup

(1) Start Storm

On the nimbus host:
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &

On each supervisor host:
nohup ./storm supervisor 1>/dev/null 2>&1 &

(2) Start Kafka

Start the broker on every node:
bin/kafka-server-start.sh config/server.properties

Other useful Kafka operations:

Create a topic in the Kafka cluster:
bin/kafka-topics.sh --create --zookeeper weekend114:2181 --replication-factor 3 --partitions 1 --topic order

Write messages to a topic with a console producer:
bin/kafka-console-producer.sh --broker-list weekend110:9092 --topic order

Read messages from a topic with a console consumer:
bin/kafka-console-consumer.sh --zookeeper weekend114:2181 --from-beginning --topic order

Describe a topic's partition and replica status:
bin/kafka-topics.sh --describe --zookeeper weekend114:2181 --topic order

List all topics:
./bin/kafka-topics.sh --list --zookeeper weekend114:2181

(3) Start Flume
bin/flume-ng agent --conf conf --conf-file conf/demoagent.conf --name a1 -Dflume.root.logger=INFO,console

Now append a line to the /home/zzq/flumedemo/test.log file:

[zzq@weekend110 ~]$ echo '您好啊' >> /home/zzq/flumedemo/test.log

Now check the contents of the Kafka topic (for example with the console-consumer command shown above: bin/kafka-console-consumer.sh --zookeeper weekend114:2181 --from-beginning --topic testKJ2).

You can see that Kafka has received it. Next we use Storm to read from Kafka and do the stream processing.

Storm code download: http://download.csdn.net/detail/baidu_19473529/9746787
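The download link above may no longer be available, so here is a minimal sketch (not the author's downloadable code) of a Storm 0.9.x topology that reads the testKJ2 topic written by the Flume Kafka sink, using the KafkaSpout from the storm-kafka module. The class name KafkaTopology, the ZooKeeper offset root /kafkastorm, and the spout/bolt ids are placeholder names chosen for illustration; adjust the ZooKeeper hosts and topic to match your cluster.

// KafkaTopology.java - minimal sketch: read the testKJ2 topic and print every line
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaTopology {

    // A trivial bolt that just prints each line Flume pushed into Kafka.
    public static class PrintBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("from kafka: " + tuple.getString(0));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt, emits nothing
        }
    }

    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble used by the Kafka brokers (same hosts as in server.properties)
        ZkHosts zkHosts = new ZkHosts("weekend114:2181,weekend115:2181,weekend116:2181");

        // topic written by the Flume Kafka sink; "/kafkastorm" and "kafkaspout-id" are
        // arbitrary ZooKeeper paths/ids used by the spout to store its consumer offsets
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "testKJ2", "/kafkastorm", "kafkaspout-id");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("kafka-spout");

        // run locally for a quick test; on the cluster use StormSubmitter.submitTopology(...)
        new LocalCluster().submitTopology("kafka-storm-demo", new Config(), builder.createTopology());
    }
}

Build it against the storm-kafka artifact matching your Storm version (for 0.9.2-incubating that is roughly org.apache.storm:storm-kafka:0.9.2-incubating) plus the Kafka client libraries on the classpath.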

That completes the integration.

