好文档 - 专业文书写作范文服务资料分享网站

基于Flume的美团日志收集系统

天下 分享 时间: 加入收藏 我要投稿 点赞

4.7 提供实时流

美团的部分业务,如实时推荐,反爬虫服务等服务,需要处理实时的数据流。因此我们希望Flume能够导出一份实时流给Kafka/Storm系统。

一个非常重要的要求是实时数据流不应该受到其它Sink的速度影响,保证实时数据流的速度。这一点,我们是通过Collector中设置不同的Channel进行隔离,并且DualChannel的大容量保证了日志的处理不受Sink的影响。 5 系统监控

对于一个大型复杂系统来说,监控是必不可少的部分。设计合理的监控,可以对异常情况及时发现,只要有一部手机,就可以知道系统是否正常运作。对于美团的日志收集系统,我们建立了多维度的监控,防止未知的异常发生。 5.1 发送速度,拥堵情况,写Hdfs速度

通过发送给zabbix的数据,我们可以绘制出发送数量、拥堵情况和写Hdfs速度的图表,对于超预期的拥堵,我们会报警出来查找原因。

下面是Flume Collector HdfsSink写数据到Hdfs的速度截图:

下面是Flume Collector的FileChannel中拥堵的events数据量截图:

6

5.2 flume写hfds状态的监控

Flume写入Hdfs会先生成tmp文件,对于特别重要的日志,我们会每15分钟左右检查一下各个Collector是否都产生了tmp文件,对于没有正常产生tmp文件的Collector和日志我们需要检查是否有异常。这样可以及时发现Flume和日志的异常. 5.3 日志大小异常监控

对于重要的日志,我们会每个小时都监控日志大小周同比是否有较大波动,并给予提醒,这个报警有效的发现了异常的日志,且多次发现了应用方日志发送的异常,及时给予了对方反馈,帮助他们及早修复自身系统的异常。

通过上述的讲解,我们可以看到,基于Flume的美团日志收集系统已经是具备高可用性,高可靠性,可扩展等特性的分布式服务。

基于Flume的美团日志收集系统(二)改进和优化

问题导读:

1.Flume的存在些什么问题?

2.基于开源的Flume美团增加了哪些功能? 3.Flume系统如何调优?

在《基于Flume的美团日志收集系统(一)架构和设计》中,我们详述了基于Flume的美团日志收集系统的架构设计,以及为什么做这样的设计。在本节中,我们将会讲述在实际部署和使用过程中遇到的问题,对Flume的功能改进和对系统做的优化。 1 Flume的问题总结

在Flume的使用过程中,遇到的主要问题如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常;使用FileChannel又导致IO繁忙的问题;

b. HdfsSink的性能问题:使用HdfsSink向Hdfs写日志,在高峰时间速度较慢; c. 系统的管理问题:配置升级,模块重启等; 2 Flume的功能改进和优化点

从上面的问题中可以看到,有一些需求是原生Flume无法满足的,因此,基于开源的Flume我们增加了许多功能,修改了一些Bug,并且进行一些调优。下面将对一些主要的方面做一些说明。

2.1 增加Zabbix monitor服务

一方面,Flume本身提供了http, ganglia的监控服务,而我们目前主要使用zabbix做监控。因此,我们为Flume添加了zabbix监控模块,和sa的监控服务无缝融合。 另一方面,净化Flume的metrics。只将我们需要的metrics发送给zabbix,避免 zabbix server造成压力。目前我们最为关心的是Flume能否及时把应用端发送过来的日志写到Hdfs上, 对应关注的metrics为: ? Source : 接收的event数和处理的event数 ? Channel : Channel中拥堵的event数 ? Sink : 已经处理的event数

2.2 为HdfsSink增加自动创建index功能

7

首先,我们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink可以读取hadoop配置文件中提供的编码类列表,然后通过配置的方式获取使用何种压缩编码,我们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于以下测试数据: event大小(Byte) 544 544

压缩

sink.batch-size hdfs.batchSize

格式 300 300

10000 10000

bz2 lzo

总数据大小(G) 9.1 9.1

耗时(s)

平均events/s

压缩后大小(G) 1.36 3.49

2448 6833 612

27333

其次,我们的HdfsSink增加了创建lzo文件后自动创建index功能。Hadoop提供了对lzo创建索引,使得压缩文件是可切分的,这样Hadoop Job可以并行处理数据文件。HdfsSink本身lzo压缩,但写完lzo文件并不会建索引,我们在close文件之后添加了建索引功能。 /**

* Rename bucketPath file from .tmp to permanent location. */

private void renameBucket() throws IOException, InterruptedException { if(bucketPath.equals(targetPath)) { return; }

final Path srcPath = new Path(bucketPath); final Path dstPath = new Path(targetPath);

callWithTimeout(new CallRunner() { @Override

public Object call() throws Exception {

if(fileSystem.exists(srcPath)) { // could block

LOG.info(\ fileSystem.rename(srcPath, dstPath); // could block

//index the dstPath lzo file

if (codeC != null && \ LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());

lzoIndexer.index(dstPath); } }

return null; } }); }

2.3 增加HdfsSink的开关

8

我们在HdfsSink和DualChannel中增加开关,当开关打开的情况下,HdfsSink不再往Hdfs上写数据,并且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。

2.4 增加DualChannel

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。 其具体的逻辑如下: /***

* putToMemChannel indicate put event to memChannel or fileChannel

* takeFromMemChannel indicate take event from memChannel or fileChannel * */

private AtomicBoolean putToMemChannel = new AtomicBoolean(true); private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {

if (switchon && putToMemChannel.get()) { //往memChannel中写数据

memTransaction.put(event);

if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) { putToMemChannel.set(false); } } else {

//往fileChannel中写数据 fileTransaction.put(event); } }

Event doTake() {

Event event = null;

if ( takeFromMemChannel.get() ) { //从memChannel中取数据

event = memTransaction.take(); if (event == null) {

takeFromMemChannel.set(false); } } else {

//从fileChannel中取数据

event = fileTransaction.take(); if (event == null) {

takeFromMemChannel.set(true);

9

putToMemChannel.set(true); } }

return event; }

2.5 增加NullChannel

Flume提供了NullSink,可以把不需要的日志通过NullSink直接丢弃,不进行存储。然而,Source需要先将events存放到Channel中,NullSink再将events取出扔掉。为了提升性能,我们把这一步移到了Channel里面做,所以开发了NullChannel。 2.6 增加KafkaSink

为支持向Storm提供实时数据流,我们增加了KafkaSink用来向Kafka写实时数据流。其基本的逻辑如下:

public class KafkaSink extends AbstractSink implements Configurable { private String zkConnect; private Integer zkTimeout; private Integer batchSize; private Integer queueSize; private String serializerClass; private String producerType; private String topicPrefix;

private Producer producer;

public void configure(Context context) { //读取配置,并检查配置 }

@Override

public synchronized void start() { //初始化producer

}

@Override

public synchronized void stop() { //关闭producer

}

@Override

public Status process() throws EventDeliveryException {

Status status = Status.READY;

10

基于Flume的美团日志收集系统

4.7提供实时流美团的部分业务,如实时推荐,反爬虫服务等服务,需要处理实时的数据流。因此我们希望Flume能够导出一份实时流给Kafka/Storm系统。一个非常重要的要求是实时数据流不应该受到其它Sink的速度影响,保证实时数据流的速度。这一点,我们是通过Collector中设置不同的Channel进行隔离,并且DualChannel的大容量保证了日志的处
推荐度:
点击下载文档文档为doc格式
  • 正文标题

  • 上下篇章

  • 相关推荐

  • 精选图文

252473l9y91klhk34qsi