Skip to content

一、概述

Flume是的一个分布式、高可用、高可靠的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时提供了对数据进行简单处理并写到各种数据接收方的能力。

Flume的设计原理是基于数据流的,能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。 Flume能够做到近似实时的推送,并且可以满足数据量是持续且量级很大的情况。比如它可以收集社交网站日志,并将这些数量庞大的日志数据从网站服务器上汇集起来,存储到HDFS或 HBase分布式数据库中。

Flume的应用场景:比如一个电商网站,想从网站访问者中访问一些特定的节点区域来分析消费者的购物意图和行为。为了实现这一点,需要收集到消费者访问的页面以及点击的产品等日志信息,并移交到大数据 Hadoop平台上去分析,可以利用 Flume做到这一点。现在流行的内容推送,比如广告定点投放以及新闻私人定制也是基于这个道理。

1.1 相关概念

Event

事件是Flume内部数据传输的最基本单元,将传输的数据进行封装。事件本身是由一个载有数据的字节数组和可选的headers头部信息构成,如下图所示。Flume以事件的形式将数据从源头传输到最终的目的地。

Agent

Flume Agent 是一个JVM进程,通过三个组件(source、channel、sink)将事件流从一个外部数据源收集并发送给下一个目的地。

Source

从数据发生器接收数据,并将数据以Flume的Event格式传递给一个或多个通道(Channel)

支持Source:

Channel

一种短暂的存储容器,位于 Source和Sink之间,起着桥梁的作用。 Channel将从Source处接收到的 Event格式的数据缓存起来,当Sink成功地将 Events发送到下一跳的Channel或最终目的地后, Events从 Channel移除。Channel是一个完整的事务,这一点保证了数据在收发的时候的一致性。可以把 Channel看成一个FIFO(先进先出)队列,当数据的获取速率超过流出速率时,将Event保存到队列中,再从队中一个个出来。

有以下几种Channel:

  • Memory Channel 事件存储在可配置容量的内存队列中,队列容量即为可存储最大事件数量,适用于高吞吐量场景,在agent出现错误时有可能会丢失部分数据
  • File Channel 基于文件系统的持久化存储
  • Spillable Memory Channel 内存和文件混合Channel,当内存队列满了之后,新的事件会存储在文件系统,目前处于实验阶段,不建议在生产环境中使用
  • JDBC Channe 事件存储在持久化的数据库中,目前只支持Derby
  • Kafka Channel 事件存储在Kafka集群中
  • Pseudo Transaction Channel 伪事务Channel,仅用于测试,不能在生产环境使用
  • Custom Channel 自定义Channel

Sink

获取Channel暂时保存的数据并进行处理。sink从channel中移除事件,并将其发送到下一个agent(简称下一跳)或者事件的最终目的地,比如HDFS。

Sink分类:

1.2 流程

(1)外部数据源(Web Server)将Flume可识别的 Event发送到 Source (2) Source收到 Event事件后存储到一个或多个Channel通道中。 (3)Channel保留 Event直到Sink将其处理完毕。 (4)Sink从 Channel中取出数据,并将其传输至外部存储(HDFS)。

1.3 可靠性

事件在每个agent的channel中短暂存储,然后事件被发送到下一个agent或者最终目的地。事件只有在存储在下一个channel或者最终存储后才从当前的channel中删除。

Flume使用事务的办法来保证Events的可靠传递。Source和Sink分别被封装在事务中,事务由保存Event的存储或者Channel提供。这就保证了Event在数据流的点对点传输中是可靠的。在多跳的数据流中,上一跳的sink和下一跳的source均运行事务来保证数据被安全地存储到下一跳的channel中。

二、SDK使用

2.1使用Demo

自定义Source

java
public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

添加MySource

java
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // 处理数据
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            // 接收新数据
            for (int i = 0; i < 10; i++) {
                Event e = new SimpleEvent();
                e.setBody(("data:"+i).getBytes());
                // 将数据存储到与Source关联的Channel中
                getChannelProcessor().processEvent(e);
                status = Status.READY;
            }
            Thread.sleep(5000);
        } catch (Throwable t) {
            // 打印日志
            status = Status.BACKOFF;
            // 抛出异常
            if (t instanceof Error) {
                throw (Error)t;
            }
        } finally {

        }
        return status;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    public void configure(Context context) {

    }


}

添加mySourceAgent.conf

properties
# 定义agent名称为a1


<NolebasePageProperties />




# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为mysource
a1.sources.r1.type = com.itheima.flume.source.MySource

# 配置sink类型为Logger
a1.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

shell
flume-ng agent -n a1 -c conf -f mySourceAgent.conf

自定义Sink

java
public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

添加MySink,可以参考LoggerSink

java
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory
            .getLogger(MySink.class);
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 开启事务
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        try {
            txn.begin();
            // 从channel中获取数据
            Event event = ch.take();
            if(event==null){
                status = Status.BACKOFF;
            }
            // 将事件发送到外部存储
            // storeSomeData(e);
            // 打印事件
            logger.info(new String(event.getBody()));
            // 提交事务
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // 打印异常日志
            status = Status.BACKOFF;
            // 抛出异常
            if (t instanceof Error) {
                throw (Error)t;
            }
        }finally {
            txn.close();
        }
        return status;
    }

    public void configure(Context context) {

    }
}

修改mySourceAgent.conf

properties
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为mysource
a1.sources.r1.type = com.itheima.flume.source.MySource

# 配置sink类型为MySink
a1.sinks.k1.type = com.itheima.flume.sink.MySink

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三、测试Flume

3.1 Netcat测试Flume

使用Flume监听某个端口,使用Netcat向这个端口发送数据,Flume将接收到的数据打印到控制台。

Netcat是一款TCP/UDP测试工具,可以通过以下命令安装

shell
yum install -y nc
  • NetCat TCP Source

    必须属性

    属性名默认值说明
    channels
    typenetcat
    bind绑定的主机名或者IP地址
    port绑定端口
  • Memory Channel

    必须属性

    属性名默认值说明
    typememory
  • Logger Sink

    必须属性

    属性名默认值说明
    channel
    typelogger

添加配置文件

在apache-flume-1.9.0-bin/conf目录下添加配置文件example.conf

properties
# example.conf: 单节点Flume配置
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink类型为Logger
a1.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

查看Flume使用命令

shell
flume-ng help

启动agent

shell
flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console

或者

shell
flume-ng agent -n a1 -c conf -f example.conf -Dflume.root.logger=INFO,console
shell
# 将绑定端口配置为IP地址,绑定为localhost或者127.0.0.1在另外一台机器上无法连接
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.85.132
a1.sources.r1.port = 44444

如果开启了防火墙,需要添加防火墙规则

shell
firewall-cmd --zone=public --add-port=44444/tcp --permanent
firewall-cmd --reload

3.2 使用telnet测试

shell
# 查看有无安装
yum list | grep telnet

# 安装
yum install -y telnet.x86_64
yum install -y telnet-server.x86_64

# 启动测试
telnet 127.0.0.1 44444

四、重要组件

4.1 数据持久化 File Channel

使用组件

属性设置

属性名默认值说明
typefile
checkpointDir~/.flume/file-channel/checkpoint检查点文件存放路径
dataDirs~/.flume/file-channel/data日志存储路径,多个路径使用逗号分隔. 使用不同的磁盘上的多个路径能提高file channel的性能

配置

添加配置文件file-channel.conf,添加一个FileChannel

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
# 多个channel使用空格分隔
a1.channels = c1 c2

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink类型为Logger
a1.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置FileChannel,checkpointDir为检查点文件存储目录,dataDirs为日志数据存储目录,
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/soft/bak/flume/checkpoint
a1.channels.c2.dataDirs = /opt/soft/bak/flume/data

# 将source和sink绑定到channel上
# source同时绑定到c1和c2上
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1

为了方便日志打印,可以将-Dflume.root.logger=INFO,console添加在conf的环境配置中,从模板复制一份配置

shell
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
# 添加JAVA_OPTS
export JAVA_OPTS="-Dflume.root.logger=INFO,console"

启动验证

启动Flume

shell
flume-ng agent -n a1 -c ./ -f file-channnel.conf

通过Netcat发送数据,,此时发送到c2的数据没有被消费,关闭Flume,修改配置文件

shell
# 将sink绑定到c2上
a1.sinks.k1.channel = c2

重启Flume,可以看到会重新消费c2的数据

4.2 日志文件监控 Exec Soucre

企业中应用程序部署后会将日志写入到文件中,可以使用Flume从各个日志文件将日志收集到日志中心以便于查找和分析。

Exec Soucre

Exec Source通过指定命令监控文件的变化,加粗属性为必须设置的。

属性名默认值说明
channels
typeexec
command要执行的命令
restartfalse如果执行命令挂了是否要重启
batchSize20同时往channel发送的最大行数
batchTimeout3000批量发送超时时间
selector.typereplicatingchannel选择器replicating 或者 multiplexing
selector.*通道选择器匹配属性
interceptors拦截器
interceptors.*

配置

添加配置文件exec-log.conf

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为exec,命令为 tail -F app.log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F app.log

# 配置sink类型为Logger
a1.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动验证

启动Flume

shell
flume-ng agent -n a1 -c conf -f file-log.conf -Dflume.root.logger=INFO,console

通过命令更新app.log文件

shell
echo "abcdef">> app.log

可以查看agent控制台接收到了最新的日志

4.3 解决重复消费 Taildir Source

属性名默认值说明
channels
typeTAILDIR.
filegroups可以定义多个组. 每个组里包含一序列被监控的文件
filegroups.<filegroupName>被监控文件的绝对路径,文件名支持正则表达式
positionFile~/.flume/taildir_position.json记录监控文件的绝对路径、上次读取位置的json文件

配置

新增dir-log.conf

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为TAILDIR
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/soft/flume/position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /opt/soft/apache-flume-1.9.0-bin/conf/applogs/.*log

# 配置sink类型为Logger
a1.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.4 多个agent模型

将多个Flume agent 程序连接在一起,其中一个agent的sink将数据发送到另一个agent的source。Avro文件格式是使用Flume通过网络发送数据的标准方法。从多个Web服务器收集日志,发送到一个或多个集中处理的agent,之后再发往日志存储中心.同样的日志发送到不同的目的地。

第一个agent从Netcat接收数据,增加一个channel和sink,将这个sink发送到第二个agent

第二个agent在监控文件变化的同时监控从sink发送来的事件,最终输出到控制台

Avro Sink属性

使用Avro Sink,必须设置以下属性

属性名默认值Description
channel
typeavro
hostname绑定的主机名或者IP地址
port监听端口

Avro Source 属性

使用Avro Source,必须设置以下属性

属性名默认值说明
channels
typeavro
bind绑定的主机名或者IP地址
port监听端口

配置

添加agent1配置文件

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink1类型为Logger
a1.sinks.k1.type = logger
# 配置sink2类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 55555

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

添加agent2配置文件

shell
# 定义agent名称为a2
# 设置3个组件的名称
a2.sources = r1 r2
a2.sinks = k1
a2.channels = c1

# 配置source类型为exec,命令为 tail -F app.log
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F app.log

# 配置source类型为avro
a2.sources.r2.type = avro
a2.sources.r2.bind = 192.168.85.132
a2.sources.r2.port = 55555

# 配置sink类型为Logger
a2.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a2.sources.r1.channels = c1
a2.sources.r2.channels = c1
a2.sinks.k1.channel = c1

启动验证

启动agent1和agent2

shell
flume-ng agent -n a1 -c conf -f agent1.conf
flume-ng agent -n a2 -c conf -f agent2.conf

先往app.log中写入日志,可以在agent2看到最新数据

打开Netcat连接到44444,发送数据,可以同时在agent1和agent2看到最新数据。

4.5 使用Flume导入数据到HDFS

属性

数据导出到HDFS需要使用HDFS Sink,需要配置属性如下:

属性名默认值说明
channel
typehdfs
hdfs.pathHDFS 文件路径 (例如 hdfs://namenode/flume/webdata/)
hdfs.fileTypeSequenceFile文件格式: SequenceFile, DataStream or CompressedStream (1)DataStream 不会压缩输出文件且不用设置 codeC (2)CompressedStream 需要设置 hdfs.codeC
hdfs.codeC压缩格式 : gzip, bzip2, lzo, lzop, snappy

注:使用HDFS Sink需要用到Hadoop的多个包,可以在装有Hadoop的主机上运行Flume,如果是单独部署的Flume,可以通过多个Agent的形式将单独部署的Flume Agent 日志数据发送到装有Hadoop的Flume Agent上。

配置

创建hdfs.conf

properties
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink类型为hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动验证

启动flume

shell
bin/flume-ng agent --conf conf/ --conf-file conf/hdfs.conf -Dfile.root.logger=debug,info,console --name hdfs

注:如果出现com.google.common.base.Preconditions.checkArgument 查看下flume/lib目录下

guava.jar版本是否与hadoop/share/hadoop/common/lib中的版本是否一致,不一致拷贝新版

在后台查看

shell
hadoop fs -cat /user/flume/messages/flume-.1578835130630

五、拦截器

拦截器可以修改或者丢弃事件,Flume支持链式调用拦截器,拦截器定义在source中

5.1 Host Interceptor

这个拦截器将运行agent的hostname 或者 IP地址写入到事件的headers中

属性名默认值说明
typehost
preserveExistingfalse如果header已经存在host, 是否要保留 - true保留原始的,false写入当前机器
useIPtruetrue为IP地址, false为 hostname.
hostHeaderhostheader中key的名称

打开另外一台虚拟机,安装好Flume

在flume/conf目录下新建app.log文件

shell
touch app.log

配置

添加agent3.conf,这个agent监控本地的app.log,将数据发送到虚拟机132上

shell
# 定义agent名称为a3
# 设置3个组件的名称
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 配置source类型为exec,命令为 tail -F app.log
a3.sources.r1.type = exec
a3.sources.r1.command = tail -F app.log
# 配置拦截器为host
a3.sources.r1.interceptors = i1
a3.sources.r1.interceptors.i1.type = host

# 配置sink类型为avro
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = 192.168.85.132
a3.sinks.k1.port = 55555

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

这里注意需要在132上开启防火墙端口

shell
firewall-cmd --zone=public --add-port=55555/tcp --permanent
firewall-cmd --reload

启动验证

启动agent3

shell
flume-ng agent -n a3 -c conf -f agent3.conf

可以在132的agent2控制台看到agent3已经连接成功

往135的app.log中写入数据

shell
echo "data from 135" >> app.log

可以在agent2上看到添加了headers

修改agent2.conf,添加拦截器

shell
# 配置拦截器
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = host
a2.sources.r2.interceptors.i1.preserveExisting = false

可以看到135发送来的事件中header被修改成了本机的

5.2 Timestamp Interceptor

这个拦截器将当前时间写入到事件的headers中

属性名默认值说明
typetimestamp
headerNametimestampheader中key的名称
preserveExistingfalseIf the timestamp already exists, should it be preserved - true or false
shell
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp

5.3 Static Interceptor

运行用户对所有的事件添加固定的header

属性名默认值说明
typestatic
preserveExistingtrueIf configured header already exists, should it be preserved - true or false
keykeyheader 中key名称
valuevalueheader 中value值
shell
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK

5.4 UUID Interceptor

shell
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

5.5 Search and Replace Interceptor

sh
a1.sources.r1.interceptors = i1 i2 i3 i4 i5
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i5.type = search_replace
a1.sources.r1.interceptors.i5.searchPattern = \\d{6}
a1.sources.r1.interceptors.i5.replaceString = ******

5.6 自定义拦截器

  • 依赖

    xml
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
  • 自定义拦截器

    java
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    
    public class MyInterceptor implements Interceptor {
        private static final Logger logger = LoggerFactory
                .getLogger(MyInterceptor.class);
        public void initialize() {
    
        }
    
        /**
         * 拦截单个事件
         * @param event
         * @return
         */
        public Event intercept(Event event) {
            String host = event.getHeaders().get("host");
            logger.info("接收到host为:"+host);
            if (host.equals("192.168.85.135")) {
                logger.info("丢弃事件");
                return null;
            }
            return event;
        }
    
        public List<Event> intercept(List<Event> list) {
            List<Event> newList = new ArrayList<Event>();
            for (Event event : list) {
                event = intercept(event);
                if(event!=null){
                    newList.add(event);
                }
            }
            return newList;
        }
    
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder{
    
            public Interceptor build() {
                return new MyInterceptor();
            }
    
            public void configure(Context context) {
    
            }
        }
    }
  • 将项目打成jar包后复制到Flume安装目录的lib目录中,并修改agent2.conf

    shell
    # 配置拦截器
    a2.sources.r2.interceptors = i1
    a2.sources.r2.interceptors.i1.type = com.itheima.flume.interceptor.MyInterceptor$Builder

六、Channel选择器

6.1 Replicating Channel Selector

复制选择器,如果没有指定,这个为默认选择器

可选属性如下

属性名默认值说明
selector.typereplicatingreplicating
selector.optionaloptional

使用案例:

shell
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

上面的配置中,c3是一个可选的channel,写入c3失败的话会被忽略,c1和c2没有标记为可选,如果写入c1和c2失败会导致事务的失败。

6.2 Multiplexing Channel Selector

多路channel选择器,可选属性如下

属性名默认值说明
selector.typereplicatingmultiplexing
selector.headerflume.selector.header键值Key
selector.default
selector.mapping.*路由

使用案例:

shell
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4

这里通过事件的header值来判断将事件发送到哪个channel,可以配合拦截器一起使用。

shell
# 创建multichannel目录
mkdir multichannel
cd multichannel

创建agent1.conf

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink1类型为Logger
a1.sinks.k1.type = logger

# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100


# 将source和sink绑定到channel上
a1.sources.r1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4

添加agent2.conf

shell
# 定义agent名称为a2
# 设置3个组件的名称
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 配置source类型为avro
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.85.132
a2.sources.r1.port = 4040


# 配置sink类型为logger
a2.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

添加agent3.conf

shell
# 定义agent名称为a3
# 设置3个组件的名称
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 配置source类型为avro
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.85.132
a3.sources.r1.port = 4041


# 配置sink类型为logger
a3.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

添加agent4.conf

shell
# 定义agent名称为a4
# 设置3个组件的名称
a4.sources = r1
a4.sinks = k1
a4.channels = c1

# 配置source类型为avro
a4.sources.r1.type = avro
a4.sources.r1.bind = 192.168.85.132
a4.sources.r1.port = 4042


# 配置sink类型为logger
a4.sinks.k1.type = logger

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

启动agent2、agent3、agent4和agent1

shell
flume-ng agent -n a2 -c conf -f agent2.conf
flume-ng agent -n a3 -c conf -f agent3.conf
flume-ng agent -n a4 -c conf -f agent4.conf
flume-ng agent -n a1 -c conf -f agent1.conf

使用Netcat往agent1发送消息,可以在agent2\3\4上看到消息

修改agent1.conf,配置通道选择器

shell
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ
# 配置通道选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4

运行所有agent,发送数据到agent1,可以看到只有agent2收到了数据,修改拦截器的值为US,结果是agent3收到数据。

七、Sink处理器

可以将多个sink放入到一个组中,Sink处理器能够对一个组中所有的sink进行负载均衡,在一个sink出现临时错误时进行故障转移。

必须设置属性:

属性名默认值说明
sinks组中多个sink使用空格分隔
processor.typedefaultdefault, failoverload_balance

举例:

shell
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover

7.1 Default Sink Processor

默认的Sink处理器只支持单个Sink

7.2 Failover Sink Processor

故障转移处理器维护了一个带有优先级的sink列表,故障转移机制将失败的sink放入到一个冷却池中,如果sink成功发送了事件,将其放入到活跃池中,sink可以设置优先级,数字越高,优先级越高,如果一个sink发送事件失败,下一个有更高优先级的sink将被用来发送事件,比如,优先级100的比优先级80的先被使用,如果没有设置优先级,按配置文件中配置的顺序决定。设置属性如下:

属性名默认值说明
sinks组内多个sinks空格分隔
processor.typedefaultfailover
processor.priority.优先级
processor.maxpenalty30000失败sink的最大冷却时间

示例如下:

shell
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

修改以上agent1.conf

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 配置sink1类型为Logger
a1.sinks.k1.type = logger

# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

启动agent2\3\4和agent1,通过Netcat发送消息到agent1,可以看到消息一直发送给其中一个agent(agent3优先级最高),将这个agent关闭后,消息会发送到其他agent。

7.3 Load balancing Sink Processor

负载均衡处理器,可以通过轮询或者随机的方式进行负载均衡,也可以通过继承AbstractSinkSelector 自定义负载均衡,设置属性如下:

属性名默认值说明
processor.sinks组内多个sinks空格分隔
processor.typedefaultload_balance
processor.backofffalse是否将失败的sink加入黑名单
processor.selectorround_robin轮询机制:round_robin, random 或者自定义
processor.selector.maxTimeOut30000黑名单有效时间(单位毫秒)

示例如下:

shell
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

修改上面的agent1.conf,将所有的sink都绑定到c1上

shell
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# 配置sink1类型为Logger
a1.sinks.k1.type = logger

# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042

# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

八、Ganglia监控

Ganglia是UC Berkeley发起的一个开源集群监视项目,设计用于测量数以千计的节点。Ganglia的核心包含gmond(监控守护进程)、gmetad(元数据守护进程)以及一个Web前端。主要是用来监控系统性能,如:cpu 、mem、硬盘利用率, I/O负载、网络流量情况等,通过曲线很容易见到每个节点的工作状态,对合理调整、分配系统资源,提高系统整体性能起到重要作用。

8.1 Ganglia安装

中心节点的安装

  • epel包的安装:yum install -y epel-release(解决不能yum安装某些安装包的问题)
  • gmetad的安装:yum install -y ganglia-gmetad
  • gmond的安装:yum install -y ganglia-gmond
  • rrdtool的安装:yum install -y rrdtool
  • httpd服务器的安装:yum install -y httpd
  • ganglia-web及php安装:yum install -y ganglia-web php

被监测节点的安装

  • epel包的安装:yum install -y epel-release(解决不能yum安装某些安装包的问题)
  • gmond的安装:yum install -y gmond(提示找不到,感觉应该换成上面那个yum install -y ganglia-gmond)

8.2 Ganglia配置

中心节点的配置

安装目录说明

  • ganglia配置文件目录:/etc/ganglia
  • rrd数据库存放目录:/var/lib/ganglia/rrds
  • ganglia-web安装目录:/usr/share/ganglia
  • ganglia-web配置目录:/etc/httpd/conf.d/ganglia.conf

相关配置文件修改 将ganglia-web的站点目录连接到httpd主站点目录

shell
$  ln -s /usr/share/ganglia /var/www/html

修改httpd主站点目录下ganglia站点目录的访问权限 将ganglia站点目录访问权限改为apache:apache,否则会报错

shell
$  chown -R apache:apache /var/www/html/ganglia
$  chmod -R 755 /var/www/html/ganglia

修改rrd数据库存放目录访问权限 将rrd数据库存放目录访问权限改为nobody:nobody,否则会报错

shell
$  chown -R nobody:nobody /var/lib/ganglia/rrds

修改ganglia-web的访问权限: 修改/etc/httpd/conf.d/ganglia.conf

shell
Alias /ganglia /usr/share/ganglia
<Location /ganglia> 
 Require all granted
 #Require ip 10.1.2.3
 #Require host example.org
</Location>

修改dwoo下面的权限

shell
chmod 777 /var/lib/ganglia/dwoo/compiled
chmod 777 /var/lib/ganglia/dwoo/cache

配置/etc/ganglia/gmetad.conf

shell
data_source  "my cluster" 192.168.85.132:8649(注意是所有节点都加上,如master:8649 slave0x:8649)
 
setuid_username nobody

配置/etc/ganglia/gmond.conf

shell
cluster { 
  name = "node01"
  ... 
} 
udp_send_channel { 
  # the host who gather this cluster's monitoring data and send these data   to gmetad node
  #注释掉多播模式的,以下出现这个都要注释掉
 #mcast_join = 239.2.11.71
 #添加单播模式的
 host = 192.168.85.132
 port = 8649 
} 
udp_recv_channel { 
  bind = 192.168.85.132
  port = 8649 
} 
tcp_accept_channel { 
  port = 8649 
}

被监测节点的配置

配置/etc/ganglia/gmond.conf

shell
cluster { 
  name = "hadoop cluster"
  ... 
} 
udp_send_channel { 
  # the host who gather this cluster's monitoring data and send these data   to gmetad node
 host = 192.168.26.139  
 port = 8649 
} 
udp_recv_channel { 
  port = 8649 
} 
tcp_accept_channel { 
  port = 8649 
}

8.3 Ganglia启动

中心节点的启动

start httpd, gmetad, gmond

shell
$ systemctl start httpd.service
$ systemctl start gmetad.service
$ systemctl start gmond.service
$ systemctl enable httpd.service
$ systemctl enable gmetad.service
$ systemctl enable gmond.service

被监测节点的启动

start gmond

shell
$ systemctl start gmond.service
$ systemctl enable gmond.service

关闭selinux

vi /etc/selinux/config,把SELINUX=enforcing改成SELINUX=disable;该方法需要重启机器。 可以使用命令setenforce 0来关闭selinux而不需要重启,刷新页面,即可访问;不过此法只是权宜之计,如果想永久修改selinux设置,还是要使用第一种方法

开启防火墙

shell
firewall-cmd --zone=public --add-port=80/tcp --permanent
firewall-cmd --reload

访问网页

浏览器访问 {namenode的ip}/ganglia即可

8.4 添加Flume监控

修改flume-env.sh配置,添加虚拟机选项

shell
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.85.132:8649 -Xms100m -Xmx200m"

8.5 启动flume

shell
flume-ng agent -n a1 -c conf -f example.conf -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.85.132:8649

查看ganglia页面

字段(图表名称)字段含义
EventPutAttemptCountsource尝试写入channel的事件总数量
EventPutSuccessCount成功写入channel且提交的事件总数量
EventTakeAttemptCountsink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据。
EventTakeSuccessCountsink成功读取的事件的总数量
StartTimechannel启动的时间(毫秒)
StopTimechannel停止的时间(毫秒)
ChannelSize目前channel中事件的总数量
ChannelFillPercentagechannel占用百分比
ChannelCapacitychannel的容量

九、安装

9.1 常规安装

Flume下载页面:http://flume.apache.org/download.html

sh
# 解压命令
tar xzf apache-flume-1.9.0-bin.tar.gz
# 进入到apache-flume-1.9.0-bin 目录
cd apache-flume-1.9.0-bin
# Flume使用需要依赖JDK1.8以上环境,确保已安装
# 将Flume安装目录配置到PATH中,方便在任意目录使用
vi /etc/profile
# 添加以下内容
export JAVA_HOME=/opt/soft/jdk1.8.0_231
export JRE_HOME=/opt/soft/jdk1.8.0_231/jre
export FLUME_HOME=/opt/soft/apache-flume-1.9.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLUME_HOME/bin:
# 保存成功后刷新
source /etc/profile
# 查看是否设置成功
echo $FLUME_HOME