0%

Flume初识

本文是尚硅谷Flume课程随手笔记,记录课程的一些实战的操作步骤


一、安装Flume并实现一个简易端口监控

实现监控端口案例,通过Flume来监听端口数据,并将数据打印到控制台


1. 首先进入官网下载Flume源码包


2. 修改flume-env.sh文件

1
export JAVA_HOME=/opt/module/jdk1.8.0_144

3. 添加配置文件

创建Flume Agent配置文件flume-netcat-logger.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Name the components on this agent a1:表示agent的名称
# r1:表示a1的Source的名称
a1.sources = r1
# k1:表示a1的Sink的名称
a1.sinks = k1
# c1:表示a1的Channel的名称
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为netcat端口类型
a1.sources.r1.type = netcat
# 表示a1的监听的主机
a1.sources.r1.bind = localhost
# 表示a1的监听的端口号
a1.sources.r1.port = 44444

# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示将r1和c1连接起来
a1.sources.r1.channels = c1
# 表示将k1和c1连接起来
a1.sinks.k1.channel = c1


4. 启动Flume

启动终端,在终端中属于下面的指令

1
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

使用下面的指令

1
nc localhost 44444

通过netcat发送数据,发现此时flume日志中显示接收到数据



二、 实时监控单个追加文件

实时监控 Hive 日志,并上传到 HDFS 中


1. 添加配置文件

创建本次程序的配置文件,创建 flume-file-logger.conf 文件,根据官网添加一下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Name the components on this agent a1:表示agent的名称
# r1:表示a1的Source的名称
a1.sources = r1
# k1:表示a1的Sink的名称
a1.sinks = k1
# c1:表示a1的Channel的名称
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为exec source类型
a1.sources.r1.type = exec
# 监控文件
a1.sources.r1.command = tail -F /Users/user/Library/Hive/logs/hive.log

# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示将r1和c1连接起来
a1.sources.r1.channels = c1
# 表示将k1和c1连接起来
a1.sinks.k1.channel = c1


2. 启动Flume

在终端中输入下面的指令

1
bin/flume-ng agent --conf conf --conf-file job/file-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console

之后启动该任务,并且可以监听hive的日志文件



三、 实时监控单个追加文件到HDFS

Flume监控Hive实时更新日志然后上传到HDFS


1. 添加相关依赖包

首先将需要使用的相关Jar包放入Flume的lib目录下

1
2
3
4
5
6
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar


2.添加配置文件

创建本次程序的配置文件,创建 flume-file-hdfs.conf 文件,根据官网添加一下配置,因为这里与上一节区别主要在sink部分,所以只需要在上一节配置的基础上对sink相关的部分进行修改即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Name the components on this agent a1:表示agent的名称
# r1:表示a1的Source的名称
a1.sources = r1
# k1:表示a1的Sink的名称
a1.sinks = k1
# c1:表示a1的Channel的名称
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为exec source类型
a1.sources.r1.type = exec
# 监控文件
a1.sources.r1.command = tail -F /Users/user/Library/Hive/logs/hive.log

# Describe the sink

# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/%Y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示将r1和c1连接起来
a1.sources.r1.channels = c1
# 表示将k1和c1连接起来
a1.sinks.k1.channel = c1


3. 启动Flume

在终端中输入下面的指令

1
bin/flume-ng agent --conf conf --conf-file job/file-file-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

之后启动该任务,并且可以监听文件的追加



四、 实时监控目录下的多个新文件至HDFS

当监控的目录下有新文件时,进行上传


1. 添加配置文件

写下如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Name the components on this agent a1:表示agent的名称
# r1:表示a1的Source的名称
a1.sources = r1
# k1:表示a1的Sink的名称
a1.sinks = k1
# c1:表示a1的Channel的名称
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为spooldir类型
a1.sources.r1.type = spooldir
# 监控文件的路径
a1.sources.r1.spoolDir = /Users/user/Library/Flume/upload
# 上传完成文件后缀
a1.sources.r1.fileSuffix = .COMPLETED
# 忽略所有以.tmp 结尾的文件,不上传
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)

# Describe the sink

# 表示a1的输出目的地是控制台hdfs类型
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/%Y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示将r1和c1连接起来
a1.sources.r1.channels = c1
# 表示将k1和c1连接起来
a1.sinks.k1.channel = c1


2. 启动Flume

在终端中输入下面的指令

1
bin/flume-ng agent --conf conf --conf-file job/file-dir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

之后我们向upload文件夹添加文件,发现成功上传至HDFS。

注: 但是它并不能监控动态变化的数据,在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件,上传完成的文件会以.COMPLETED 结尾,被监控文件夹每 500 毫秒扫描一次文件变动。



五、 实时监控目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。


1. 添加配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Name the components on this agent a1:表示agent的名称
# r1:表示a1的Source的名称
a1.sources = r1
# k1:表示a1的Sink的名称
a1.sinks = k1
# c1:表示a1的Channel的名称
a1.channels = c1

# Describe/configure the source
#表示a1的输入源类型为TAILDIR类型
a1.sources.r1.type = TAILDIR
# 指定position_file位置
a1.sources.r1.positionFile = /Users/user/Library/Flume/upload/tail_dir.json
# 文件组
a1.sources.r1.filegroups = f1 f2
# f1文件组
a1.sources.r1.filegroups.f1 = /Users/user/Library/Flume/upload/dict1/a.log
# f2文件组
a1.sources.r1.filegroups.f2 = /Users/user/Library/Flume/upload/dict2/.*.txt

# Describe the sink

# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/%Y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示将r1和c1连接起来
a1.sources.r1.channels = c1
# 表示将k1和c1连接起来
a1.sinks.k1.channel = c1

2. 启动Flume

在终端中输入下面的指令

1
bin/flume-ng agent --conf conf --conf-file job/file-taildir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

之后我们对监控的文件组中的文件进行追加,发现成功监听。

注: 在查看tail_dir.json中可以发现监听记录,同时每条记录都存在inode字段和pos字段,inode字段为文件唯一标识符,不随文件名称的变化而变化,pos则是上传文件的偏移量,正是通过这个来实现断点续传。