banner

探寻从HDFS到Spark的高效数据通道:以小文件输入为案例

作者: 大数据观察来源: 大数据观察时间:2016-12-01 15:27:030

为了保证高效的数据移动,locality是大数据栈以及分布式应用程序所必须保证的性质,这一点在Spark中尤为明显。如果数据集大到不能保证完全放入内存,那就不能贸然使用cache()将数据固化到内存中。如果读取数据不能保证较好的locality性质的话,不论是对即席查询还是迭代计算都将面临输入瓶颈。而作为常用的分布式文件系统,HDFS承担着数据存储、一致性保证等关键问题。HDFS自开发之初就与Google GFS一脉相承,因此也继承了其无法较好的处理小文件的问题,但大量小文件输入又是分布式计算中常见场景。本文以小文件输入为案例,看看从HDFS到Spark的数据通道中到底发生了什么,并讨论如何设计有效的小文件输入。了解了这些话题,可以更高效的使用Spark。背景MLlib进展如火如荼,近期最令人振奋的消息莫过于MLlib对sparse vector的支持,以及随之而来的一系列重构和改进工作。机器学习一般算法的输入是训练集和测试集,通常来说是(label, key : value)这样的序对。对于这种输入,直接使用SparkContext提供的textFile()接口就好了,MLlib内部会转换成LabeledPoint类。但MLlib还缺少图模型算法,如LDA。LDA (Latent Allocation Dirichlet)算法常用来获取文档集合的主题,是机器学习中广泛应用的算法,其输入格式和核心组件与常见的机器学习分类、聚类算法不同。两个月之前,笔者有一份差不多要完成的基于Spark的并行LDA算法准备提交给Spark社区,同时也在准备酝酿已久的学术论文。当笔者完成了LDA算法的核心模块Gibbs sampling之后,突然发觉要想实现一个“可用的“LDA算法,不仅仅是一个核心功能这么简单,还牵扯到许多零碎的事情。所谓零碎的事情,其实并不简单。机器学习算法就是这样,学起来难,但是真正懂了之后发现核心算法特别简单,预处理又非常难。总之,机器学习算法学起来难的地方做起来简单,但是学起来简单的地方, 并不见得很快就能做好。

 

QCon全球软件开发大会(北京站)2014,4月25-27日,诚邀莅临。大部分的零碎工作在语料库的预处理和后续输出的模型的使用上,这些零碎的工作机器学习者们都不怎么注重,因为书本上很少会讲到这些知识。就拿后者来说,模型后续使用这件事儿看起来小,其实不然,这关系到机器学习算法的实际运用能力。我们做模型的最终目的除了发论文之外还是想让它对现实生活产生影响。学术派关注模型多,但对学术和工业的结合看的相对少一点,线下模型如何轻松部署?模型可否增量训练?模型的训练和使用是否可以同步进行?是否可以做到对模型的在线查询?这都是将机器学习“搬出实验室”的关键问题。这类问题在Strata大会上有很多工业界人士做了很好的讲解,比如这里。闲言莫谈,回到语料库的预处理工作。关于分词的问题不多谈,笔者学习ScalaNLP的做法,直接在Lucene的分词实现上裹了一层Scala的接口。但是在语料集的输入上花了很多时间。Spark目前所有的标准输入接口是SparkContext类中提供的textFile(path, miniSplits)接口,但该接口不适合语料库的直接输入,因为这是一个文本行处理函数,每次只能操作其中的一行文本。而LDA更期待的输入格式是Key-Value对,其中key是文件的绝对路径(便于分辨和去重),value是文件的全部内容。由于Spark下层多使用HDFS作为输入,因此笔者打算自己定制InputFormat。LDA应用场景首先得说明一些问题。LDA的实际使用场景有二:第一种是在实验室环境下使用,这是最直观的情况。例如你有一堆小文件存在本地磁盘上,即你的语料库。可能你想直接把它们上传到HDFS,或者在每台机器的磁盘上仍一份,甚至直接放在当前机器的本地磁盘好了(这种情况下不是真正的分布式,所有的Spark executor只会在你当前机器上启动),之后打开Spark调用其中的LDA算法。如果你只是打算做个实验,这样就足够了。换言之,这是一种offline的训练方式。第二种情况是工业应用,你可能不会有一堆离线的语料库,而是有一个流式管道,语料文本源源不断地传递过来,如twitter/weibo feed等。或许你可以把这些数据放到HDFS或HBase上,也有可能直接处理流数据,而不管最终存储。这是完全不同的应用场景,针对不同的场景要有不同的处理方式。不论是实验室环境下的尝试,还是工业应用,两者都很重要。本文只涉及offline的数据处理方法,因为offline的数据处理才更加需求让数据经过HDFS。离线LDA训练离线场景下或许我们不必理会语料集预处理的过程,直接交给最终用户好了。用户将语料集转换成你指定的样式,之后将预处理结果上载到HDFS,这样你的LDA程序可以直接访问这部分数据,而我们要做的只是规定好输入的样式,妙不可言。我们舒服了,用户吃些苦头。例如我们指定用户输入文件的每一行是一个完整的文件内容,开头处以Tab分割作为文件名。这样我们可以直接调用textFile()接口,自己切分一下就可以得到”文件名--文件内容”这个KV对。值得一提的是,这种离线场景下看似不好用的方法,在工业界线上训练过程中反倒可能有好的效果。比如一次记录过来就是一个KV对,这样就省去了这一步输入的处理。或许我们可以进一步帮帮用户。咱们写一个预处理程序,不论是串行的还是并行的,帮助用户进行预处理,Mahout就是这么做的。这种情况下,可能需要最终用户写一个ad-hoc的shell脚本组织所有的工作流和数据流。Mahout中的dirTosequentialFile就是把本地磁盘或者HDFS上的目录读入,将其中的小文件合并在一起转换成一个sequential file。但是,笔者觉得最好的方法还是将预处理过程与LDA训练过程融合起来,不要让用户做这么多工作就能调用Spark上的LDA,用户只需要指定文件路径即可。这时我们必须提供函数将语料库所有的文本和文件名读入。CombineFileInputFormat比较适合处理小文件,因此最初笔者实现了一个CombineFileInputFormat,一个CombineFileRecordReader,一个FileLineWritable以及一个类似于textFile()的接口。Interface exposed to end-user - SparkContext.scala

def wholeTextFiles(path: String): RDD[(String, String)] =

结语分布式数据并行环境下,保持数据的本地性是非常重要的内容,事关分布式系统性能高下。要想更好的了解Spark是怎么运作的,输入也许是很重要的一个环节。举一个小例子,你或许有心情在一台不错的机器上使用Spark处理100GB的数据。按理说这不应算作多大的应用场景,但如果不仔细调整一下你的输入的话,你会发现Spark甚至会在这台机器上切分上千个partition来并行处理这份数据。而这上千个partition随便来一个shuffle造成的百万量级的shuffle数据交换会把Spark性能拖死。实际上,调用Hadoop的API访问本地磁盘的默认块大小为32MB,据其分块策略,当然会产生上千个partition。另外,如果你本地是一堆小文件,如LDA的语料库,你会发现Spark甚至会为每个文件分配一个或多个partition!所以,这下你应该知道为什么有时简单的Spark程序也会非常慢了吧。本文为了解决LDA小文件输入的问题,一步步揭开HDFS与Spark的数据通道的故事。总结来看,为了分布式使用各个机器,HDFS读取的时候将数据分成了各个分块,为了防止straggler的产生,MapReduce的读取模块会尽量保证各个分块在每台机器上的大小和个数均衡。为了保证较好的locality,Spark获取preferredLocation信息,尽量保证在临近的机器上读取所需的数据。为了合理读取小文件,CombineFileInputFormat合理安排小文件分片,既要保证数据在各个分块中均衡,又不能切断单个文件。为了保证HDFS与Spark之间的高效数据通道,正可谓”无所不用其极”。作者简介尹绪森,Intel实习生,熟悉并热爱机器学习相关内容,对自然语言处理、推荐系统等有所涉猎。目前致力于机器学习算法并行、凸优化层面的算法优化问题,以及大数据平台性能调优。对Spark、Mahout、GraphLab等开源项目有所尝试和理解,并希望从优化层向下,系统层向上对并行算法及平台做出贡献。

banner
看过还想看
可能还想看
热点推荐

永洪科技
致力于打造全球领先的数据技术厂商

申请试用
Copyright © 2012-2024开发者:北京永洪商智科技有限公司版本:V10.2
京ICP备12050607号-1京公网安备110110802011451号 隐私政策应用权限