
A Principal Engineer Explains: How LinkedIn's Big Data Backend Works (Part 2)

Author: 大数据观察 | Source: 大数据观察 | Date: 2017-03-11 11:33

The author of this article is Jay Kreps, principal engineer at LinkedIn. The article comes from a piece he shared on LinkedIn; the original title is "The Log: What every software engineer should know about real-time data's unifying abstraction."

The content is highly practical and well worth studying. The article is presented in four parts; we recommend reading it through patiently.

Part One: What is a log?

Part Two: Data Integration

Part Three: Logs and real-time stream processing

Part Four: Building systems

Continued from the previous part: http://www.shuju.net/archives/6387


Part Two: Data Integration

Let me first say what I mean by “data integration” and why I think it’s important, then we’ll see how it relates back to logs.

Data integration is making all the data an organization has available in all its services and systems.

This phrase “data integration” isn’t all that common, but I don’t know a better one. The more recognizable term ETL usually covers only a limited part of data integration—populating a relational data warehouse. But much of what I am describing can be thought of as ETL generalized to cover real-time systems and processing flows.

You don’t hear much about data integration in all the breathless interest and hype around the idea of big data, but nonetheless, I believe this mundane problem of “making the data available” is one of the more valuable things an organization can focus on.

Effective use of data follows a kind of Maslow’s hierarchy of needs. The base of the pyramid involves capturing all the relevant data, being able to put it together in an applicable processing environment (be that a fancy real-time query system or just text files and python scripts). This data needs to be modeled in a uniform way to make it easy to read and process. Once these basic needs of capturing data in a uniform way are taken care of it is reasonable to work on infrastructure to process this data in various ways—MapReduce, real-time query systems, etc.

It’s worth noting the obvious: without a reliable and complete data flow, a Hadoop cluster is little more than a very expensive and difficult to assemble space heater. Once data and processing are available, one can move concern on to more refined problems of good data models and consistent well understood semantics. Finally, concentration can shift to more sophisticated processing—better visualization, reporting, and algorithmic processing and prediction.

In my experience, most organizations have huge holes in the base of this pyramid—they lack reliable complete data flow—but want to jump directly to advanced data modeling techniques. This is completely backwards.

So the question is, how can we build reliable data flow throughout all the data systems in an organization?

Data Integration: Two complications

Two trends make data integration harder.

The event data firehose

The first trend is the rise of event data. Event data records things that happen rather than things that are. In web systems, this means user activity logging, but also the machine-level events and statistics required to reliably operate and monitor a data center’s worth of machines. People tend to call this “log data” since it is often written to application logs, but that confuses form with function. This data is at the heart of the modern web: Google’s fortune, after all, is generated by a relevance pipeline built on clicks and impressions—that is, events.

And this stuff isn’t limited to web companies, it’s just that web companies are already fully digital, so they are easier to instrument. Financial data has long been event-centric. RFID adds this kind of tracking to physical objects. I think this trend will continue with the digitization of traditional businesses and activities.

This type of event data records what happened, and tends to be several orders of magnitude larger than traditional database uses. This presents significant challenges for processing.

The explosion of specialized data systems

The second trend comes from the explosion of specialized data systems that have become popular and often freely available in the last five years. Specialized systems exist for OLAP, search, simple online storage, batch processing, graph analysis, and so on.

The combination of more data of more varieties and a desire to get this data into more systems leads to a huge data integration problem.

Log-structured data flow

The log is the natural data structure for handling data flow between systems. The recipe is very simple:

Take all the organization’s data and put it into a central log for real-time subscription.

Each logical data source can be modeled as its own log. A data source could be an application that logs out events (say clicks or page views), or a database table that accepts modifications. Each subscribing system reads from this log as quickly as it can, applies each new record to its own store, and advances its position in the log. Subscribers could be any kind of data system—a cache, Hadoop, another database in another site, a search system, etc.
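As a concrete illustration of that read, apply, advance loop, here is a minimal sketch in Java. The LogReader and LocalStore interfaces are hypothetical stand-ins for whatever log and subscriber store are actually in play; only the shape of the loop matters.

```java
import java.util.List;

// Hypothetical interfaces standing in for a readable log and a subscriber's own store.
interface LogReader {
    List<LogEntry> readFrom(long offset, int maxRecords);
}

interface LocalStore {
    void apply(LogEntry entry);     // apply one change to the local copy
    long committedOffset();         // last offset durably applied
    void commitOffset(long offset);
}

record LogEntry(long offset, byte[] key, byte[] value) {}

class LogSubscriber {
    private final LogReader log;
    private final LocalStore store;

    LogSubscriber(LogReader log, LocalStore store) {
        this.log = log;
        this.store = store;
    }

    // Read as fast as we can, apply each new record to our own store,
    // and advance our position in the log.
    void run() {
        long position = store.committedOffset();
        while (true) {
            for (LogEntry entry : log.readFrom(position, 500)) {
                store.apply(entry);
                position = entry.offset() + 1;
            }
            store.commitOffset(position);
        }
    }
}
```

Because the subscriber owns its position, restarting it simply means resuming from the last committed offset.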

For example, the log concept gives a logical clock for each change against which all subscribers can be measured. This makes reasoning about the state of the different subscriber systems with respect to one another far simpler, as each has a “point in time” they have read up to.

To make this more concrete, consider a simple case where there is a database and a collection of caching servers. The log provides a way to synchronize the updates to all these systems and reason about the point of time of each of these systems. Let’s say we write a record with log entry X and then need to do a read from the cache. If we want to guarantee we don’t see stale data, we just need to ensure we don’t read from any cache which has not replicated up to X.
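A small sketch of that staleness check, under the assumption that each cache replica can report the highest log offset it has applied (the interfaces here are invented for illustration):

```java
import java.util.List;

// Hypothetical view of a cache replica that knows how far it has replicated the log.
interface CacheNode {
    long appliedOffset();    // highest log offset this replica has applied
    byte[] get(String key);
}

class ReadYourWrites {
    // Return a value at least as fresh as the write recorded at log offset x.
    static byte[] readNoOlderThan(List<CacheNode> caches, String key, long x) {
        for (CacheNode cache : caches) {
            if (cache.appliedOffset() >= x) {   // this replica has caught up to our write
                return cache.get(key);
            }
        }
        // No cache has replicated up to x yet, so fall back to the source of truth.
        return readFromDatabase(key);
    }

    static byte[] readFromDatabase(String key) {
        throw new UnsupportedOperationException("placeholder for a database read");
    }
}
```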

The log also acts as a buffer that makes data production asynchronous from data consumption. This is important for a lot of reasons, but particularly when there are multiple subscribers that may consume at different rates. This means a subscribing system can crash or go down for maintenance and catch up when it comes back: the subscriber consumes at a pace it controls. A batch system such as Hadoop or a data warehouse may consume only hourly or daily, whereas a real-time query system may need to be up-to-the-second. Neither the originating data source nor the log has knowledge of the various data destination systems, so consumer systems can be added and removed with no change in the pipeline.

“Each working data pipeline is designed like a log; each broken data pipeline is broken in its own way.”—Count Leo Tolstoy (translation by the author)

Of particular importance: the destination system only knows about the log and not any details of the system of origin. The consumer system need not concern itself with whether the data came from an RDBMS, a new-fangled key-value store, or was generated without a real-time query system of any kind. This seems like a minor point, but is in fact critical.

I use the term “log” here instead of “messaging system” or “pub sub” because it is a lot more specific about semantics and a much closer description of what you need in a practical implementation to support data replication. I have found that “publish subscribe” doesn’t imply much more than indirect addressing of messages—if you compare any two messaging systems promising publish-subscribe, you find that they guarantee very different things, and most models are not useful in this domain. You can think of the log as acting as a kind of messaging system with durability guarantees and strong ordering semantics. In distributed systems, this model of communication sometimes goes by the (somewhat terrible) name of atomic broadcast.

It’s worth emphasizing that the log is still just the infrastructure. That isn’t the end of the story of mastering data flow: the rest of the story is around metadata, schemas, compatibility, and all the details of handling data structure and evolution. But until there is a reliable, general way of handling the mechanics of data flow, the semantic details are secondary.

At LinkedIn

I got to watch this data integration problem emerge in fast-forward as LinkedIn moved from a centralized relational database to a collection of distributed systems.

These days our major data systems include:

Search

Social Graph

Voldemort (key-value store)

Espresso (document store)

Recommendation engine

OLAP query engine

Hadoop

Teradata

Ingraphs (monitoring graphs and metrics services)

Each of these is a specialized distributed system that provides advanced functionality in its area of specialty.

This idea of using logs for data flow has been floating around LinkedIn since even before I got here. One of the earliest pieces of infrastructure we developed was a service called databus that provided a log caching abstraction on top of our early Oracle tables to scale subscription to database changes so we could feed our social graph and search indexes.

I’ll give a little bit of the history to provide context. My own involvement in this started around 2008 after we had shipped our key-value store. My next project was to try to get a working Hadoop setup going, and move some of our recommendation processes there. Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms. So began a long slog.

We originally planned to just scrape the data out of our existing Oracle data warehouse. The first discovery was that getting data out of Oracle quickly is something of a dark art. Worse, the data warehouse processing was not appropriate for the production batch processing we planned for Hadoop—much of the processing was non-reversible and specific to the reporting being done. We ended up avoiding the data warehouse and going directly to source databases and log files. Finally, we implemented another pipeline to load data into our key-value store for serving results.

This mundane data copying ended up being one of the dominant items for the original development. Worse, any time there was a problem in any of the pipelines, the Hadoop system was largely useless—running fancy algorithms on bad data just produces more bad data.

Although we had built things in a fairly generic way, each new data source required custom configuration to set up. It also proved to be the source of a huge number of errors and failures. The site features we had implemented on Hadoop became popular and we found ourselves with a long list of interested engineers. Each user had a list of systems they wanted integration with and a long list of new data feeds they wanted.

ETL in Ancient Greece. Not much has changed.

A few things slowly became clear to me.

First, the pipelines we had built, though a bit of a mess, were actually extremely valuable. Just the process of making data available in a new processing system (Hadoop) unlocked a lot of possibilities. New computation was possible on the data that would have been hard to do before. Many new products and analysis just came from putting together multiple pieces of data that had previously been locked up in specialized systems.

Second, it was clear that reliable data loads would require deep support from the data pipeline. If we captured all the structure we needed, we could make Hadoop data loads fully automatic, so that no manual effort was expended adding new data sources or handling schema changes—data would just magically appear in HDFS and Hive tables would automatically be generated for new data sources with the appropriate columns.
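As a hedged sketch of what "schema in, table out" automation can look like, the snippet below turns a captured column list into a Hive CREATE EXTERNAL TABLE statement. The feed name, columns, and storage location are invented for the example; this is not LinkedIn's actual pipeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: derive a Hive DDL statement from a captured feed schema.
class HiveTableGenerator {
    static String createTableFor(String feedName, Map<String, String> columns) {
        String cols = columns.entrySet().stream()
                .map(e -> "  " + e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(",\n"));
        return "CREATE EXTERNAL TABLE IF NOT EXISTS " + feedName + " (\n" + cols + "\n)\n"
                + "STORED AS AVRO\n"
                + "LOCATION '/data/" + feedName + "'";
    }

    public static void main(String[] args) {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("member_id", "BIGINT");
        schema.put("page_key", "STRING");
        schema.put("viewed_at", "TIMESTAMP");
        System.out.println(createTableFor("page_view_event", schema));
    }
}
```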

Third, we still had very low data coverage. That is, if you looked at the overall percentage of the data LinkedIn had that was available in Hadoop, it was still very incomplete. And getting to completion was not going to be easy given the amount of effort required to operationalize each new data source.

The way we had been proceeding, building out custom data loads for each data source and destination, was clearly infeasible. We had dozens of data systems and data repositories. Connecting all of these would have led to building custom piping between each pair of systems, something like this:

Note that data flows in both directions: many systems, such as databases and Hadoop, are both sources and destinations of data transfer, so each system would need one pipeline to get data in and another to get data out. This clearly would have taken an army of people to build and would never have been operable; as we approached full connectivity we would have ended up with something like O(N²) pipelines.

Instead, we needed something generic like this:

As much as possible, we needed to isolate each consumer from the source of the data. They should ideally integrate with just a single data repository that would give them access to everything.

The idea is that adding a new data system—be it a data source or a data destination—should create integration work only to connect it to a single pipeline instead of each consumer of data.

This experience lead me to focus on building Kafka to combine what we had seen in messaging systems with the log concept popular in databases and distributed system internals. We wanted something to act as a central pipeline first for all activity data, and eventually for many other uses, including data deployment out of Hadoop, monitoring data, etc.

For a long time, Kafka was a little unique (some would say odd) as an infrastructure product—neither a database nor a log file collection system nor a traditional messaging system. But recently Amazon has offered a service that is very very similar to Kafka called Kinesis. The similarity goes right down to the way partitioning is handled, data is retained, and the fairly odd split in the Kafka API between high- and low-level consumers. I was pretty happy about this. A sign you’ve created a good infrastructure abstraction is that AWS offers it as a service! Their vision for this seems to be exactly similar to what I am describing: it is the piping that connects all their distributed systems—DynamoDB, RedShift, S3, etc.—as well as the basis for distributed stream processing using EC2.

Relationship to ETL and the Data Warehouse

Let’s talk data warehousing for a bit. The data warehouse is meant to be a repository of the clean, integrated data structured to support analysis. This is a great idea. For those not in the know, the data warehousing methodology involves periodically extracting data from source databases, munging it into some kind of understandable form, and loading it into a central data warehouse. Having this central location that contains a clean copy of all your data is a hugely valuable asset for data-intensive analysis and processing. At a high level, this methodology doesn’t change too much whether you use a traditional data warehouse like Oracle or Teradata or Hadoop, though you might switch up the order of loading and munging.

A data warehouse containing clean, integrated data is a phenomenal asset, but the mechanics of getting this are a bit out of date.

The key problem for a data-centric organization is coupling the clean integrated data to the data warehouse. A data warehouse is a piece of batch query infrastructure which is well suited to many kinds of reporting and ad hoc analysis, particularly when the queries involve simple counting, aggregation, and filtering. But having a batch system be the only repository of clean complete data means the data is unavailable for systems requiring a real-time feed—real-time processing, search indexing, monitoring systems, etc.

In my view, ETL is really two things. First, it is an extraction and data cleanup process—essentially liberating data locked up in a variety of systems in the organization and removing any system-specific nonsense. Secondly, that data is restructured for data warehousing queries (i.e. made to fit the type system of a relational DB, forced into a star or snowflake schema, perhaps broken up into a high performance column format, etc). Conflating these two things is a problem. The clean, integrated repository of data should be available in real time for low-latency processing as well as for indexing in other real-time storage systems.

I think this has the added benefit of making data warehousing ETL much more organizationally scalable. The classic problem of the data warehouse team is that they are responsible for collecting and cleaning all the data generated by every other team in the organization. The incentives are not aligned: data producers are often not very aware of the use of the data in the data warehouse and end up creating data that is hard to extract or requires heavy, hard to scale transformation to get into usable form. Of course, the central team never quite manages to scale to match the pace of the rest of the organization, so data coverage is always spotty, data flow is fragile, and changes are slow.

A better approach is to have a central pipeline, the log, with a well defined API for adding data. The responsibility of integrating with this pipeline and providing a clean, well-structured data feed lies with the producer of this data feed. This means that as part of their system design and implementation they must consider the problem of getting data out and into a well structured form for delivery to the central pipeline. The addition of new storage systems is of no consequence to the data warehouse team as they have a central point of integration. The data warehouse team handles only the simpler problem of loading structured feeds of data from the central log and carrying out transformation specific to their system.
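To make "a well defined API for adding data" concrete, here is a minimal producer publishing a structured feed to a Kafka topic using the current Java client (the client available when this article was written had a different API). The topic name and record fields are illustrative only.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProfileEditPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The producing team owns the shape of this record; here it is plain JSON,
            // keyed by member id so all edits for a member land in one partition.
            String value = "{\"member_id\": 42, \"field\": \"headline\", \"new_value\": \"Engineer\"}";
            producer.send(new ProducerRecord<>("profile-edits", "42", value));
        }
    }
}
```

The point is that the producer publishes a clean, structured feed once, and every downstream system integrates against the log rather than against the producer.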

This point about organizational scalability becomes particularly important when one considers adopting additional data systems beyond a traditional data warehouse. Say, for example, that one wishes to provide search capabilities over the complete data set of the organization. Or, say that one wants to provide sub-second monitoring of data streams with real-time trend graphs and alerting. In either of these cases, the infrastructure of the traditional data warehouse or even a Hadoop cluster is going to be inappropriate. Worse, the ETL processing pipeline built to support database loads is likely of no use for feeding these other systems, making bootstrapping these pieces of infrastructure as large an undertaking as adopting a data warehouse. This likely isn’t feasible and probably helps explain why most organizations do not have these capabilities easily available for all their data. By contrast, if the organization had built out feeds of uniform, well-structured data, getting any new system full access to all data requires only a single bit of integration plumbing to attach to the pipeline.

This architecture also raises a set of different options for where a particular cleanup or transformation can reside:

It can be done by the data producer prior to adding the data to the company wide log.

It can be done as a real-time transformation on the log (which in turn produces a new, transformed log)

It can be done as part of the load process into some destination data system

The best model is to have cleanup done prior to publishing the data to the log by the publisher of the data. This means ensuring the data is in a canonical form and doesn’t retain any hold-overs from the particular code that produced it or the storage system in which it may have been maintained. These details are best handled by the team that creates the data since they know the most about their own data. Any logic applied in this stage should be lossless and reversible.

Any kind of value-added transformation that can be done in real-time should be done as post-processing on the raw log feed produced. This would include things like sessionization of event data, or the addition of other derived fields that are of general interest. The original log is still available, but this real-time processing produces a derived log containing augmented data.
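Below is a sketch of such a derived-log job, assuming Kafka topics named page-views and page-views-enriched (both invented for the example): it consumes the raw log, adds derived fields, and publishes the result as a new log, leaving the original untouched.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Reads the raw event log and writes an augmented, derived log.
public class DerivedLogJob {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "page-view-enricher");
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("page-views"));
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    String enriched = enrich(rec.value());   // e.g. attach a session id
                    producer.send(new ProducerRecord<>("page-views-enriched", rec.key(), enriched));
                }
            }
        }
    }

    static String enrich(String rawJson) {
        // Placeholder for sessionization or other derived fields of general interest.
        return rawJson;
    }
}
```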

Finally, only aggregation that is specific to the destination system should be performed as part of the loading process. This might include transforming data into a particular star or snowflake schema for analysis and reporting in a data warehouse. Because this stage, which most naturally maps to the traditional ETL process, is now done on a far cleaner and more uniform set of streams, it should be much simplified.

Log Files and Events

Let’s talk a little bit about a side benefit of this architecture: it enables decoupled, event-driven systems.

The typical approach to activity data in the web industry is to log it out to text files where it can be scraped into a data warehouse or into Hadoop for aggregation and querying. The problem with this is the same as the problem with all batch ETL: it couples the data flow to the data warehouse’s capabilities and processing schedule.

At LinkedIn, we have built our event data handling in a log-centric fashion. We are using Kafka as the central, multi-subscriber event log. We have defined several hundred event types, each capturing the unique attributes about a particular type of action. This covers everything from page views, ad impressions, and searches, to service invocations and application exceptions.

To understand the advantages of this, imagine a simple event—showing a job posting on the job page. The job page should contain only the logic required to display the job. However, in a fairly dynamic site, this could easily become larded up with additional logic unrelated to showing the job. For example let’s say we need to integrate the following systems:

We need to send this data to Hadoop and data warehouse for offline processing purposes

We need to count the view to ensure that the viewer is not attempting some kind of content scraping

We need to aggregate this view for display in the Job poster’s analytics page

We need to record the view to ensure we properly impression cap any job recommendations for that user (we don’t want to show the same thing over and over)

Our recommendation system may need to record the view to correctly track the popularity of that job

Etc

Pretty soon, the simple act of displaying a job has become quite complex. And as we add other places where jobs are displayed—mobile applications, and so on—this logic must be carried over and the complexity increases. Worse, the systems that we need to interface with are now somewhat intertwined—the person working on displaying jobs needs to know about many other systems and features and make sure they are integrated properly. This is just a toy version of the problem, any real application would be more, not less, complex.

The “event-driven” style provides an approach to simplifying this. The job display page now just shows a job and records the fact that a job was shown along with the relevant attributes of the job, the viewer, and any other useful facts about the display of the job. Each of the other interested systems—the recommendation system, the security system, the job poster analytics system, and the data warehouse—all just subscribe to the feed and do their processing. The display code need not be aware of these other systems, and needn’t be changed if a new data consumer is added.
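Here is a minimal sketch of that decoupling, with a hypothetical EventLog wrapper over the log: the display code renders the job, records one event, and knows nothing about the systems that consume it.

```java
// All the display code does: show the job and record the fact that it was shown.
record JobViewEvent(long jobId, long viewerId, long timestampMillis, String pageKey) {}

interface EventLog {
    void publish(String eventType, Object event);   // hypothetical thin wrapper over the log
}

class JobPageController {
    private final EventLog eventLog;

    JobPageController(EventLog eventLog) {
        this.eventLog = eventLog;
    }

    String showJob(long jobId, long viewerId) {
        String html = renderJob(jobId);   // only display logic lives here
        // The security checker, analytics, recommendations, and the warehouse
        // all subscribe to "job-view" downstream; this code knows none of them.
        eventLog.publish("job-view",
                new JobViewEvent(jobId, viewerId, System.currentTimeMillis(), "job-page"));
        return html;
    }

    private String renderJob(long jobId) {
        return "<html>...job " + jobId + "...</html>";
    }
}
```

Adding a new consumer later means adding a new subscriber to the feed, not changing this controller.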

Building a Scalable Log

Of course, separating publishers from subscribers is nothing new. But if you want to keep a commit log that acts as a multi-subscriber real-time journal of everything happening on a consumer-scale website, scalability will be a primary challenge. Using a log as a universal integration mechanism is never going to be more than an elegant fantasy if we can’t build a log that is fast, cheap, and scalable enough to make this practical at scale.

Systems people typically think of a distributed log as a slow, heavy-weight abstraction (and usually associate it only with the kind of “metadata” uses for which Zookeeper might be appropriate). But with a thoughtful implementation focused on journaling large data streams, this need not be true. At LinkedIn we are currently running over 60 billion unique message writes through Kafka per day (several hundred billion if you count the writes from mirroring between datacenters).

We used a few tricks in Kafka to support this kind of scale:

Partitioning the log

Optimizing throughput by batching reads and writes

Avoiding needless data copies

In order to allow horizontal scaling we chop up our log into partitions:

Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some wall-clock time you might include in your messages). The assignment of the messages to a particular partition is controllable by the writer, with most users choosing to partition by some kind of key (e.g. user id). Partitioning allows log appends to occur without co-ordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size.
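A toy version of key-based partition assignment is shown below. Kafka's default partitioner actually hashes the key bytes with murmur2 rather than using hashCode, so treat this purely as an illustration of "same key, same partition, same order".

```java
class KeyPartitioner {
    // Assign a record to a partition based on its key, as the writer controls.
    static int partitionFor(String key, int numPartitions) {
        // Illustrative only: Kafka's default partitioner hashes key bytes with murmur2.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // All events for a given user id land in the same partition,
        // so per-key ordering is preserved even though there is no global order.
        System.out.println(partitionFor("user-42", 16));
        System.out.println(partitionFor("user-42", 16));   // same partition every time
    }
}
```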

Each partition is replicated across a configurable number of replicas, each of which has an identical copy of the partition’s log. At any time, a single one of them will act as the leader; if the leader fails, one of the replicas will take over as leader.

Lack of a global order across partitions is a limitation, but we have not found it to be a major one. Indeed, interaction with the log typically comes from hundreds or thousands of distinct processes so it is not meaningful to talk about a total order over their behavior. Instead, the guarantees that we provide are that each partition is order preserving, and Kafka guarantees that appends to a particular partition from a single sender will be delivered in the order they are sent.

A log, like a filesystem, is easy to optimize for linear read and write patterns. The log can group small reads and writes together into larger, high-throughput operations. Kafka pursues this optimization aggressively. Batching occurs from client to server when sending data, in writes to disk, in replication between servers, in data transfer to consumers, and in acknowledging committed data.
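For reference, the client-side batching knobs on the modern Java producer look roughly like this; batch.size, linger.ms, compression.type, and acks are real configuration keys, but the values here are arbitrary examples rather than recommended settings.

```java
import java.util.Properties;

class BatchingProducerConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Group small sends into larger batches on the client side:
        props.put("batch.size", "65536");   // bytes per partition batch
        props.put("linger.ms", "20");       // wait up to 20 ms to fill a batch
        props.put("compression.type", "lz4");
        props.put("acks", "all");           // wait for replicas to acknowledge
        return props;
    }
}
```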

Finally, Kafka uses a simple binary format that is maintained between in-memory log, on-disk log, and in network data transfers. This allows us to make use of numerous optimizations including zero-copy data transfer.
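The zero-copy transfer referred to here corresponds to the operating system's sendfile; in Java it is exposed as FileChannel.transferTo. A minimal sketch of sending a slice of a log segment file straight to a socket, without pulling the bytes into user space:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ZeroCopySend {
    // Send a byte range of a log segment file directly to a socket channel.
    static void sendChunk(Path segmentFile, SocketChannel socket, long position, long count)
            throws IOException {
        try (FileChannel channel = FileChannel.open(segmentFile, StandardOpenOption.READ)) {
            long sent = 0;
            while (sent < count) {
                sent += channel.transferTo(position + sent, count - sent, socket);
            }
        }
    }
}
```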

The cumulative effect of these optimizations is that you can usually write and read data at the rate supported by the disk or network, even while maintaining data sets that vastly exceed memory.

This write-up isn’t meant to be primarily about Kafka so I won’t go into further details. You can read a more detailed overview of LinkedIn’s approach here and a thorough overview of Kafka’s design here.

Original author: Jay Kreps
