# Flume Exercises
# Exercise 1
Requirement: use Flume to listen on a port, collect the data arriving on it, and print it to the console.
```properties
# Agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Channel selector
a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
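To try this out, you could start the agent and then write to the port with netcat. A minimal sketch, assuming the config above is saved as `job/flume-netcat-logger.conf` under the Flume home directory (both names are placeholders):

```bash
# Start the agent; the logger sink prints events to the console
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume-netcat-logger.conf \
  -Dflume.root.logger=INFO,console

# In a second terminal, send some test data to the monitored port
nc localhost 44444
```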
# Exercise 2
Monitor the Hive log in real time and upload it to HDFS.
```properties
# Agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Channel selector
a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
# Prefix for the files created on HDFS
a1.sinks.k1.hdfs.filePrefix = upload-
# Round event timestamps down to the hour when expanding
# time escapes in the path
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
# Use the local time rather than a timestamp header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Plain text output instead of SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file every 60 s or at ~128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
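Note that `rollSize = 134217700` sits just below 128 MB (134217728 bytes), so each rolled file fits in a single HDFS block of the default size. A launch sketch, assuming the config is saved as `job/flume-file-hdfs.conf` (the HDFS sink also needs the Hadoop client jars on Flume's classpath):

```bash
# Start the agent
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume-file-hdfs.conf

# Generate log traffic by running some Hive commands, then inspect
# the target directory; substitute whatever hdfs.path points at
hadoop fs -ls -R /flume
```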
# Exercise 3
Use Flume to watch all the files in a directory and upload them to HDFS.
```properties
# Agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/upload
# Suffix appended to files once they are fully ingested
a1.sources.r1.fileSuffix = .COMPLETED
# Store the absolute path of the source file in an event header
a1.sources.r1.fileHeader = true
# Skip files that are still being written (*.tmp)
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)

# Channel selector
a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink (same HDFS settings as in Exercise 2)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
a1.sinks.k1.hdfs.filePrefix = upload-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
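The Spooling Directory Source treats files as immutable: a file must not be modified after it is placed in the directory, and reusing the name of an already-ingested file makes the source fail. A test sketch, assuming the config is saved as `job/flume-dir-hdfs.conf`:

```bash
# Start the agent
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume-dir-hdfs.conf

# Copy a file into the watched directory; once ingested it is
# renamed with the .COMPLETED suffix
cp /etc/hosts /opt/module/flume/upload/
ls /opt/module/flume/upload/
```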
# Exercise 4
Use Flume to watch a directory of files that are appended to in real time, and upload them to HDFS.
```properties
# Agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# JSON file recording the last read offset of every tailed file
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
# Two file groups matched by regex
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files/.*log.*

# Channel selector
a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink (same HDFS settings as in Exercise 2)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
a1.sinks.k1.hdfs.filePrefix = upload-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
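Unlike spooldir, TAILDIR follows files as they grow and persists its read offsets, so the agent resumes where it left off after a restart. A test sketch, assuming the config is saved as `job/flume-taildir-hdfs.conf`:

```bash
# Start the agent
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume-taildir-hdfs.conf

# Append to a file matching one of the file groups; the new lines
# are shipped and the offset is recorded in the position file
echo hello >> /opt/module/flume/files/file1.txt
cat /opt/module/flume/tail_dir.json
```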
# Exercise 5
Flume-1 monitors a file for changes and passes the new content to Flume-2, which stores it in HDFS. At the same time, Flume-1 passes the new content to Flume-3, which writes it to the local filesystem.
Flume-1 (hadoop102): a Taildir source fanned out over two memory channels to two avro sinks.

```properties
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*log.*

# Replicating selector: every event is copied to both channels
a1.sources.r1.selector.type = replicating

# Describe the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sinks: one avro sink per downstream agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 8888

# Bind: the source feeds both channels, each sink drains one
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
Flume-2 (hadoop103): an avro source on port 6666, storing to HDFS.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 6666

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink (same HDFS settings as in Exercise 2)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
a1.sinks.k1.hdfs.filePrefix = upload-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Flume-3 (hadoop104): an avro source on port 8888, writing to the local filesystem with a File Roll sink.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 8888

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink: roll files into a local directory,
# which must already exist
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/module/flume/datas/flume3

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
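Start the downstream agents before Flume-1 so its avro sinks can connect. A sketch, with assumed config file names:

```bash
# hadoop104: the File Roll sink does not create its output
# directory, so create it first
mkdir -p /opt/module/flume/datas/flume3
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume3-avro-file.conf

# hadoop103:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume2-avro-hdfs.conf

# hadoop102, last:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume1-taildir-avro.conf
```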
# Exercise 6
Flume-1 monitors a port; the sinks in its sink group connect to Flume-2 and Flume-3 respectively. Use a FailoverSinkProcessor to provide failover.
Flume-1 (hadoop102): a netcat source whose failover sink group prefers Flume-2 (priority 10) and falls back to Flume-3 (priority 5).

```properties
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink processor: the highest-priority healthy sink gets all events
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# Maximum back-off (ms) for a failed sink
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 1111

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 2222

# Bind: both sinks share the channel through the sink group
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
```
Flume-2 (hadoop103): an avro source on port 1111, logging to the console.

```properties
# Step 1: agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Step 2: source
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 1111

# Step 3: channel selector
a1.sources.r1.selector.type = replicating

# Step 4: channel
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Step 5: sink processor; DefaultSinkProcessor is used when none is set

# Step 6: sink
# Describe the sink
a1.sinks.k1.type = logger

# Step 7: bind the source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Flume-3 (hadoop104): an avro source on port 2222, logging to the console.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 2222

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
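To watch the failover happen, start the two logger agents first, then Flume-1, and feed the monitored port. A sketch with assumed config file names:

```bash
# hadoop103 and hadoop104: start the avro -> logger agents
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume2.conf -Dflume.root.logger=INFO,console

# hadoop102: start Flume-1 and send data
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume1.conf
nc localhost 44444
```

While both downstream agents are up, only hadoop103 (priority 10) prints the events; kill its agent and, after the back-off, the events arrive on hadoop104 (priority 5) instead.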
# Exercise 7
Flume-1 on hadoop102 monitors the file /opt/module/group.log, and Flume-2 on hadoop103 monitors a data stream on a port. Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, which prints the final result to the console.
Flume-1 (hadoop102): a Taildir source forwarding to the aggregator on hadoop104.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*log.*

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Flume-2 (hadoop103): a netcat source forwarding to the same aggregator port.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 3333

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Flume-3 (hadoop104): one avro source on port 4141 aggregates both upstream streams and prints them to the console.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
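Start the aggregator first, then the two upstream agents. A sketch with assumed config file names:

```bash
# hadoop104: the aggregator, printing to the console
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/flume3.conf -Dflume.root.logger=INFO,console

# hadoop102: taildir -> avro
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume1.conf

# hadoop103: netcat -> avro, then feed the port
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume2.conf
nc localhost 3333
```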
# Exercise 8
Requirement:
- a1 (hadoop102) reads data with a Taildir source watching /var/log/*.log and replicates its output to a2 and a3.
- a2 (hadoop103) receives a1's data and writes it to HDFS, failing over to a local File Roll sink.
- a3 (hadoop104) receives a1's data and prints it to the console.
a1 (hadoop102): a Taildir source replicated over two channels to avro sinks for a2 and a3.

```properties
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.*

# Replicating selector: every event is copied to both channels
a1.sources.r1.selector.type = replicating

# Describe the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 8888

# Bind: the source feeds both channels, each sink drains one
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
a2 (hadoop103): an avro source on port 6666; a failover sink group prefers the HDFS sink (priority 10) with a fallback sink (priority 5).

```properties
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 6666

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink processor: failover, HDFS first, fallback second
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sinks (same HDFS settings as in Exercise 2)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
a1.sinks.k1.hdfs.filePrefix = upload-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Fallback sink (a logger here, though the requirement asks for file_roll)
a1.sinks.k2.type = logger

# Bind: both sinks share the channel through the sink group
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
```
a3 (hadoop104): an avro source on port 8888, printing to the console.

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 8888

a1.sources.r1.selector.type = replicating

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
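As in the previous exercises, start the downstream agents before a1. A sketch with assumed config file names:

```bash
# hadoop104 (a3):
bin/flume-ng agent --conf conf/ --name a1 \
  --conf-file job/a3.conf -Dflume.root.logger=INFO,console

# hadoop103 (a2):
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/a2.conf

# hadoop102 (a1):
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/a1.conf
```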