美文网首页
数据采集工具Flume实践

数据采集工具Flume实践

作者: 羋学僧 | 来源:发表于2020-08-19 18:18 被阅读0次

Flume简介

Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方, 用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。

1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同 属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用 来采集流动型数据。

2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集, 它支持从很多数据源聚合数据到 HDFS。

3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义 扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。

4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶 级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、可靠性。

Flume体系结构/核心组件

介绍

Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字 节 数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进 行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个 缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

核心组件

Agent:能够独立执行一个数据收集任务的JVM进程,Flume 以 Agent 为最小的独立运行单位,一个 Agent 就是一个 JVM,每台机器运行一个Agent。单 Agent 由 Source、Sink 和 Channel 三大组件构,可以包含多个。

Client:生产数据运行在一个独立的线程

Source:一个Agent中的用来跟数据源对接的服务,从Client收集数据,传递给Channel

Channel:Agent内部的一个中转组件

Sink:一个Agent中的用来跟目的地进行对接的服务,从Channel收集数据

Event:在Source、Channel、Sink中间进行流转的消息的封装对象

常见的source分类:
avro source:接收网络端口中的数据
exec source:检测新增内容。tail -f 监听某个文件新增加的内容。
spooldir source:监控文件夹的,如果这个文件夹中的数据变化,就可以采集
customer source:自定义
常见的channel分类:
memory:内存,快,但是不安全
file:安全,但是效率低
jdbc:使用数据库进行保存
常见的sink分类:
loggerSink:做测试用
HDFSSink:离线数据的sink
KafkaSink:流式数据使用

Flume三大核心组件

Event
Event 是 Flume 数据传输的基本单元。   
Flume 以事件的形式将数据从源头传送到最终的目的地。   
Event 由可选的 header 和载有数据的一个 byte array 构成。    
      载有的数据度 flume 是不透明的。   
      Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。   
      Header 可以在上下文路由中使用扩展。
Client
Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体   
目的是从数据源系统中解耦 Flume   
在 Flume 的拓扑结构中不是必须的
Agent
一个 Agent 包含 source,channel,sink 和其他组件。   
它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。   
Agent 是 flume 流的基础部分。   
Flume为这些组件提供了配置,声明周期管理,监控支持。  
Agent 之 Source
Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个  
包含 event 驱动和轮询两种类型  
不同类型的 Source    
     与系统集成的 Source:Syslog,Netcat,监测目录池
     自动生成事件的 Source:Exec   用于 Agent 和 Agent
     之间通信的 IPC source:avro,thrift  
Source 必须至少和一个 channel 关联   
Agent 之 Channel
Channel 位于 Source 和 Sink 之间,用于缓存进来的 event  
当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除   
不同的 channel 提供的持久化水平也是不一样的    
    Memory Channel:volatile(不稳定的)   
    File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现    
    JDBC Channel:基于嵌入式 database 实现   
Channel 支持事务,提供较弱的顺序保证   
可以和任何数量的 source 和 sink 工作
Agent 之 Sink
Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除  
不同类型的 sink ,比如 HDFS,HBase 



Flume三大案例:

一、官方案例监控端口数据案例

1、在flume的目录下面创建文件夹

cd /home/bigdata/apps/apache-flume-1.7.0-bin/

mkdir job
cd job

2、定义配置文件telnet-logger.conf

vim telnet-logger.conf

添加内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

解释

# example.conf: A single-node Flume configuration
# a1:表示agent的名称
# Name the components on this agent
a1.sources = r1  #r1:表示a1的输入源
a1.sinks = k1  #k1:表示a1输出目的地
a1.channels = c1 #c1:表示a1的缓存区

# Describe/configure the source
a1.sources.r1.type = netcat #表示a的输入的数据源的类型是netcat端口类型
a1.sources.r1.bind = localhost #表示a1监听的主机
a1.sources.r1.port = 44444 #表示a1监听的端口号

# Describe the sink
a1.sinks.k1.type = logger #a1的输出目的地是控制台的logger类型

# Use a channel which buffers events in memory
a1.channels.c1.type = memory #a1的channel类型的memory
a1.channels.c1.capacity = 1000 #channel的总容量是1000个event
a1.channels.c1.transactionCapacity = 100 #传输的时候收集够了100个event在提交

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #将r1和c1连接起来
a1.sinks.k1.channel = c1 #将k1和c1连接起来

3、先开启flume监听端口

退到flume目录

cd /home/bigdata/apps/apache-flume-1.7.0-bin/

官方样例:

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

实际操作:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
参数说明:
    --conf conf: 表示配置文件存储在conf这个目录
    --name a1: 表示给agent起名为a1
    --conf-file job/telnet-logger.conf: flume本次启动读取的配置文件是在job文件夹下面的telnet-logger.conf文件
    -Dflume.root.logger=INFO,console 打印日志

4、使用telnet测试端口

yum -y install telnet
telnet localhost 44444

5、发送命令测试即可


二、监控目录中的文件到HDFS

1、创建配置文件dir-hdfs.conf

在job目录下面

vim dir-hdfs.conf

添加下面的内容:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

解释

a3.sources = r3 #定义source
a3.sinks = k3 #定义sink
a3.channels = c3 #定义channel

# Describe/configure the source
a3.sources.r3.type = spooldir #定义source的类型是目录
a3.sources.r3.spoolDir =/home/bigdata/data/flumedata  #监控的本地目录
a3.sources.r3.fileSuffix = .COMPLETED #上传完了的后缀
a3.sources.r3.fileHeader = true #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)  #忽略以tmp结尾的

# Describe the sink
a3.sinks.k3.type = hdfs #sink的类型hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata02:9000/flume/upload/%Y%m%d/%H  #本地文件上传到hdfs上面的路径
a3.sinks.k3.hdfs.filePrefix = upload- #上传到hdfs上面的文件的前缀
a3.sinks.k3.hdfs.round = true #是否按照时间滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1 #多长时间生成文件
a3.sinks.k3.hdfs.roundUnit = hour #单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地时间戳
a3.sinks.k3.hdfs.batchSize = 100 #到100个event刷写到hdfs
a3.sinks.k3.hdfs.fileType = DataStream #文件类型
a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700 #多大的时候生成新文件
a3.sinks.k3.hdfs.rollCount = 0 #多少个event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1 #最小副本数

# Use a channel which buffers events in memory
a3.channels.c3.type = memory #第一个案例中有
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3 #第一个案例中有
a3.sinks.k3.channel = c3

2、启动监控目录命令

cd ..
mkdir /home/bigdata/data/flumedata/

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf
cp text3.txt /home/bigdata/data/flumedata/

等600秒查看


新传一个文件立即查看,文件结尾.tmp

三、监控文件到HDFS

1、创建一个自动化文件

vim mydateauto.sh

写入:

#!/bin/bash

while true
do
    echo `date`
    sleep 1
done

然后运行测试:

sh mydateauto.sh 

然后修改配置,将输出的日志追加到某个文件中

#!/bin/bash

while true
do
        echo `date` >> /home/bigdata/data/flumedata/mydate.txt
        sleep 1
done

再次执行

sh mydateauto.sh 

就会在/home/bigdata/data/flumedata/的文件夹下面生成了mydate.txt文件

查看

tail -f /home/bigdata/data/flumedata/mydate.txt 

2、创建配置file-hdfs.conf

vim file-hdfs.conf

添加下面的内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

解释

# Name the components on this agent
a2.sources = r2 #定义source
a2.sinks = k2 #定义sink
a2.channels = c2 #定义channel

# Describe/configure the source
a2.sources.r2.type = exec #source的类型
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt #监控的本地文件
a2.sources.r2.shell = /bin/bash -c #执行脚本的绝对路径

# Describe the sink
a2.sinks.k2.type = hdfs 
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs- #前缀
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3、启动

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

相关文章

  • 数据采集工具Flume实践

    Flume简介 Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方, 用于...

  • Kafka学习笔记二:Flume+Kafka安装

    Flume介绍 Flume是流式日志采集工具,FLume提供对数据进行简单处理并且写到各种数据接收方(可定制)的能...

  • Flume架构与实践

    Flume架构与实践 Flume是一款在线数据采集的系统,典型的应用场景是作为数据的总线,在线的进行日志的采集、分...

  • Flume从入门到精通1:Flume简介及环境搭建

    Flume是Apache公司旗下的一款数据采集工具。我们知道,Sqoop也是一款数据采集工具,主要用来采集关系型数...

  • Flume基础学习

    Flume是一款非常优秀的日志采集工具。支持多种形式的日志采集,作为apache的顶级开源项目,Flume再大数据...

  • Flume 理论

    1. Flume 简介 Flume 是一个分布式的海量日志采集,聚合,转移工具。 大数据常用数据处理框架 这里只是...

  • 数据平台实践①——Flume+Kafka+SparkStream

    蜻蜓点水 Flume——数据采集 如果说,爬虫是采集外部数据的常用手段的话,那么,Flume就是采集内部数据的常用...

  • 2018-07-06 flume

    flume:数据采集工具 核心角色agent agent三大组件: 1.source:采集源2.sink:存放地3...

  • 项目技术选型

    数据采集传输 FLUME,DATAHUB,RDS FLUME,KAFKA,SQOOP,DATAX 数据存储 MAX...

  • Flume_数据采集工具简介

    大数据常用的采集工具 我们的数据源一般有两种 业务数据库mysql,oracle等 sqoop采集 日志 ...

网友评论

      本文标题:数据采集工具Flume实践

      本文链接:https://www.haomeiwen.com/subject/tcedjktx.html