美文网首页
Spark Streaming整合flume

Spark Streaming整合flume

作者: 机灵鬼鬼 | 来源:发表于2019-06-20 11:01 被阅读0次
两种整合方式一种方式是推送,另一种方式是拉取

Spark Streaming支持的数据源有两大类,第一大类是基础的数据源

第二大类是高级数据源

Spark Streaming和Flume的整合指南

下面是我写的样例:

第一步配置flume的agent,并启动flume,监控一个文件夹datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streaming.sources = spooldir-source

spooldir-memory-avro-streaming.sinks = avro-sink

spooldir-memory-avro-streaming.channels = memory-channel

# Describe/configure the source

##注意:不能往监控目中重复丢同名文件

## 通过spooldir来监控文件内容的变化

spooldir-memory-avro-streaming.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streaming.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streaming.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streaming.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streaming.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streaming.sinks.avro-sink.type = avro

spooldir-memory-avro-streaming.sinks.avro-sink.hostname=10.101.3.3

spooldir-memory-avro-streaming.sinks.avro-sink.port=44445

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用内存的方式

spooldir-memory-avro-streaming.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streaming.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

第二步编写streaming与flume整合的java代码

第三步提交jar到spark上,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

bin/spark-submit --class com.liushun.Flume2StreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

第四步 启动flume由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streaming.conf --name spooldir-memory-avro-streaming -Dflume.root.logger=INFO,console

注意:一定不能先启动flume,因为flume在启动时会预先测试监听端口是否存在,如果存在才继续启动,否则就会中断启动,报错。

第五步 验证

创建文件移动到usr/local/datatest目录下,观察streaming打印的日志是否跟我们预想的一样。

在运行程序时你可能会遇到ClassNotFound的错误,不用紧张,只需要根据错误,去你的maven仓库种找到相应的jar包添加到spark的jars目录中即可。

截止目前位置:flume主动推送到Streaming的方式介绍完毕,接下来我们介绍另外一种。

这一种方案,相较于第一种推送的方式更加高可用,更加健壮

那么我们要使用这种方式,那么具体步骤是什么尼?看下面

第一步:配置flume

(1)添加jar包

(2)编写运行的自定义的flume的agent配置文件

第二步编写Spark Streaming应用程序

以下是我的整合样例,仅供参考

第一步、配置flume的agent的自定义Sink,并启动flume,监控一个文件夹datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streamingfromSink.sources = spooldir-source

spooldir-memory-avro-streamingfromSink.sinks = spark-sink

spooldir-memory-avro-streamingfromSink.channels = memory-channel

# Describe/configure the source

##注意:不能往监控目中重复丢同名文件

## 通过spooldir来监控文件内容的变化

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.hostname=10.101.3.3

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.port=44445

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用内存的方式

spooldir-memory-avro-streamingfromSink.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

应用程序编写样例:

第三步启动flume的,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streamingfromSink.conf --name spooldir-memory-avro-streamingfromSink -Dflume.root.logger=INFO,console

第四步启动应用程序,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./spark-submit --class com.liushun.StreamingFromFlumeWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

验证结果:

相关文章

网友评论

      本文标题:Spark Streaming整合flume

      本文链接:https://www.haomeiwen.com/subject/oxxrfctx.html