Flume支持通过zookeeper来管理Agent的配置,但是这是一个实验性的功能。配置文件必须先上传到zookeeper中。以下Agent在Zookeeper节点树的结构:
- /flume
|- /a1 [Agent配置文件]
|- /a2 [Agent配置文件]
处理配置文件的类:
org.apache.flume.node.PollingZooKeeperConfigurationProvider : 如果zookeeper指定的路径有变更,就从Zookeeper重新获取配置文件。
org.apache.flume.node.StaticZooKeeperConfigurationProvider : 启动Flume后,不会重新加载配置文件,即使Zookeeper的配置文件有变更。
org.apache.flume.agent.embedded.MemoryConfigurationProvider : 从存储中读取配置文件。传入数据格式是Map。
org.apache.flume.node.PollingPropertiesFileConfigurationProvider : 定时冲硬盘读取配置文件。
org.apache.flume.node.AbstractZooKeeperConfigurationProvider创建Zookeeper客户端:
protected CuratorFramework createClient() {
return CuratorFrameworkFactory.newClient(zkConnString,
new ExponentialBackoffRetry(1000, 1));
}
Flume采用Curator作为zookeeper的客户端,Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
Curator的maven配置:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
<version>2.9.0</version>
</dependency>
Zookeeper还有一个原生态的客户端,maven配置:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
使用原生态的客户端,上传配置文件flume-zookeeper.properties到zookeeper集群:
@Test
public void uploadFileToZK() throws KeeperException, InterruptedException {
String propFilePath = "D:\\flume-zookeeper.properties";
ZooKeeper zk = null;
try {
zk = new ZooKeeper("10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181", 300000, new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
}
});
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (zk.exists("/flume", true) == null) {
zk.create("/flume", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
InputStream is = null;
ByteArrayOutputStream bytestream = null;
byte[] data = null;
try {
is = new FileInputStream(propFilePath);
bytestream = new ByteArrayOutputStream();
int ch;
while ((ch = is.read()) != -1) {
bytestream.write(ch);
}
data = bytestream.toByteArray();
System.out.println(new String(data));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
bytestream.close();
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 创建一个目录节点
Stat stat = zk.exists("/flume/a1", true);
if (stat == null) {
zk.create("/flume/a1", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zk.delete("/flume/a1", stat.getVersion());
zk.create("/flume/a1", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
@Test
public void get() throws KeeperException, InterruptedException {
ZooKeeper zk = null;
try {
zk = new ZooKeeper("10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181", 300000, new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
}
});
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(new String(zk.getData("/flume/a1", true, null)));
}
flume-zookeeper.properties配置文件内容:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sources.r1.type = AVRO
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /logs/
a1.sinks.k1.sink.rollInterval = 0
a1.channels = c1
a1.sources = r1
a1.sinks = k1
avro获取数据,通过Memory Channel写入文件中。
配置文件上传到Zookeeper后,通过如下命令启动Flume:
$ bin/flume-ng agent –conf conf -z 10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
参数名称
默认值
描述
z
|
–
|
Zookeeper的连接字符串hostname:port列表通过逗号分隔。
|
p
|
/flume
|
Agent配置文件的根路径。
|
zookeeper中的配置如果有更新,Flume会通过PollingZooKeeperConfigurationProvider类的refreshConfiguration方法重新加载配置:private void refreshConfiguration() throws IOException {
LOGGER.info("Refreshing configuration from ZooKeeper");
byte[] data = null;
ChildData childData = agentNodeCache.getCurrentData();
if (childData != null) {
data = childData.getData();
}
flumeConfiguration = configFromBytes(data);
eventBus.post(getConfiguration());
}
分享到:
相关推荐
zookeeper,kafka,storm,flume,spark的部署安装。安装之前要确认安装好了oracle jdk,如果系统自带OpenJDK,需要先卸载,可以参考cdh的安装文档 kafka和storm依赖zookeeper,所以要先安装好zookeeper并启动之后才能...
这是一套完整的集群搭建配置文件,包含了hadoop所有组件有:Hadoop,zookeeper,Kafka,spark,flume,Scala,hive等安装配置文件
本文档包括的内容如下: 1. Hadoop 的安装与配置 2. Hive 的安装与配置 3. Flume 的安装与配置 4. Kettle 的安装与配置 5. Mysql 的安装与配置 6. Zookeeper 的安装与配置 7. Otter 的安装与配置
/ class”开头的表示实战课程,然后通过流水线Flume + Kafka + SparkStreaming进行实时日志的收集,HBase来存储数据)*注意事项(使用的软件工具及环境配置) hadoop-2.6.0-cdh5.7.0 hbase-1.2.0-cdh5.7.0 zookeeper...
文章目录一、启动 Kafka二、创建 Topic 消息队列三、查询 kafka 消息队列四、启动 consumer 监控窗口五、写 Flume 自定义配置文件六、开启 Flume七、结果分析 一、启动 Kafka kafka-server-start.sh /opt/soft/kafka...
目录如下,第一章为centos7的基础配置,详细可以看我博客 第二章 hadoop 1 ...第五章 zookeeper 45 第六章 高可用hadoop配置 46 第七章 hive 51 第八章 flume 53 第九章Azkaban 64 第十章 sqoop 83
提供clouder manager 大数据综合paas层的安装与配置步骤,可以按文档安装hadoop的大数据集群,以及spark,flume,storm,kafaka, zookeeper等相关集群
01_新能源汽车课程介绍.mp4 02_新能源汽车数仓概述.mp4 03_数仓需求介绍及框架选择.mp4 04_数仓架构介绍.mp4 05_服务器选型介绍.mp4 06_集群资源规划介绍.mp4 ...25_采集日志flume的配置文件编写.mp4 26_
运行环境:centos 6.x、java、kafka、zookeeper、Flume、Hbase、HDFS、YARN、Spark、MySQl。 安装教程 spark_student是IDEA项目,直接使用IDEA打开。(需要自行配置运行环境) 在IDEA下运行,配置好maven。(国内...
Hadoop HA集群 + Zookeeper 进行集群配置、管理 Kafka集群 + Flume + Hive 进行数据采集、存储 SparkStreaming + SparkSQL 进行数据清洗、分析 Nginx负载均衡、反向代理服务器 MySQL集群 + MongoDB集群 + Redis集群 ...
大数据与云计算教程课件 优质大数据课程 14.Hadoop集群配置(共6页).pptx 大数据与云计算教程课件 优质大数据课程 15.Hive(共46页).pptx 大数据与云计算教程课件 优质大数据课程 16.Hive操作(共43页).pptx ...
大数据与云计算教程课件 优质大数据课程 14.Hadoop集群配置(共6页).pptx 大数据与云计算教程课件 优质大数据课程 15.Hive(共46页).pptx 大数据与云计算教程课件 优质大数据课程 16.Hive操作(共43页).pptx ...
大数据与云计算教程课件 优质大数据课程 14.Hadoop集群配置(共6页).pptx 大数据与云计算教程课件 优质大数据课程 15.Hive(共46页).pptx 大数据与云计算教程课件 优质大数据课程 16.Hive操作(共43页).pptx ...
Flume,分发采用Apache Kafka,实时处理采用Spark Streaming,入侵检测采用Spark MLlib,数据存储使用HDFS与Redis,可视化采用Flask、SocketIO、Echarts、Bootstrap 项目使用 简易使用说明 开箱即用,直接在v2...
大数据与云计算教程课件 优质大数据课程 14.Hadoop集群配置(共6页).pptx 大数据与云计算教程课件 优质大数据课程 15.Hive(共46页).pptx 大数据与云计算教程课件 优质大数据课程 16.Hive操作(共43页).pptx ...
Hadoop生态系统中有多种服务,例如HDFS,Map-Reduce,HBase,Hive,Yarn,Flume,Spark,Storm,Zookeeper等,这增加了部署和配置的复杂性。 构建Hadoop集群需要花费大量时间。 尽管有一些管理工具可帮助管理员自动...
14.Hadoop集群配置 15.Hive 16.Hive操作 17.Hive查询 18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28....
14.Hadoop集群配置 15.Hive 16.Hive操作 17.Hive查询 18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28....
14.Hadoop集群配置 15.Hive 16.Hive操作 17.Hive查询 18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28....
14.Hadoop集群配置 15.Hive 16.Hive操作 17.Hive查询 18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28....