图解Spark 大数据快速分析实战

978-7-115-58011-5
作者: 王磊王磊
译者:
编辑: 谢晓芳谢晓芳

图书目录:

详情

本书共8章,内容主要包括Spark概述及入门实战,Spark的作业调度和资源分配算法,Spark SQL、DataFrame、Dataset的原理和实战,深入理解Spark数据源,流式计算的原理和实战,亿级数据处理平台Spark性能调优,Spark机器学习库,Spark 3.0的新特性和数据湖等。 本书适合Spark开发人员和Spark运维人员阅读。

图书摘要

版权信息

书名:图解Spark:大数据快速分析实战

ISBN:978-7-115-58011-5

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。


著    王 磊

责任编辑 谢晓芳

人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

读者服务热线:(010)81055410

反盗版热线:(010)81055315


本书共8章,内容主要包括Spark概述及入门实战,Spark的作业调度和资源分配算法,Spark SQL、DataFrame、Dataset的原理和实战,深入理解Spark数据源,流式计算的原理和实战,亿级数据处理平台Spark性能调优,Spark机器学习库,Spark 3.0的新特性和数据湖等。

本书适合Spark开发人员和Spark运维人员阅读。


本书的写作初衷是作者在工作中发现很多的Spark开发人员在日常工作中经常因为不理解Spark内核原理而陷入Spark开发的泥沼。尤其在进行几十亿甚至百亿级别数据的Spark任务开发时,虽然对于许多任务,开发人员很快就能实现功能代码的开发,但是在线上经常遇到任务处理超时、数据倾斜、内存溢出、任务分配不均等问题,而很多开发人员在面对这些问题时,常常会在不断尝试设置调优参数的过程中浪费太多宝贵的时间,最终收效甚微。这些问题的出现归根到底是因为不理解Spark内核的原理造成的,市面上的Spark书籍在介绍Spark内核的原理时大多以源码为基础,大量的源码和专业名词让很多读者望而生畏,好像必须要花很大力气才能理解Spark内核。因此,作者萌生了以图解的方式形象地介绍Spark内核的原理的想法,旨在让读者阅读起来既轻松有趣,又能全面理解Spark内核的原理。

本书在写作过程中尽量使用图文结合的方式展开介绍。本书对于每个知识点都配有图解,可以说,读者只要理解了图中的内容,基本上也就理解了对应的文字介绍部分,因此阅读起来会更加轻松愉悦和快速。同时本书的编写更贴近实战,尤其是Spark各种数据源的对接。数据格式原理的介绍、Spark性能调优、Spark延迟数据处理等内容都是笔者每次解决线上问题后的经验总结,阅读本书对于读者在日常工作中解决问题大有裨益。

本书内容主要包括Spark概述及入门实战,Spark的作业调度和资源分配算法,Spark SQL、DataFrame、Dataset的原理和实战,深入理解Spark数据源,流式计算原理和实战,亿级数据处理平台Spark性能调优,Spark机器学习库,Spark 3.0的新特性和数据湖等。全书内容丰富、翔实、简单易懂,旨在以最简单的方式讲解Spark内核复杂的原理。

本书共8章,主要内容如下。

第1章首先介绍Spark,然后讲述Spark的原理、特点和入门实战。

第2章主要介绍Spark的作业调度、Spark on YARN 资源调度、RDD概念、RDD 分区、RDD依赖关系、Stage、RDD 持久化、RDD检查点、RDD实战等。

第3章讲述Spark SQL、DataFrame、Dataset的原理和实战。

第4章讲述Spark数据源。

第5章讲述Spark流式计算的原理和实战,具体包括Spark Streaming的原理和实战、Spark Structured Streaming的原理和实战。

第6章讲述亿级数据处理平台Spark性能调优,具体包括内存调优、任务调优、数据本地性调优、算子调优、Spark SQL调优、Spark Shuffle调优、Spark Streaming调优、Spark数据倾斜问题处理。

第7章概述Spark机器学习、Spark机器学习常用统计方法、Spark分类模型、协同过滤和Spark聚类模型。

第8章讲述Spark 3.0的新特性和Spark未来的趋势——数据湖。

感谢人民邮电出版社的张涛编辑,他的鼓励和引导对本书的写作与出版有很大的帮助。

写技术书是很耗费精力的,我常常因为一句话或一张图能否准确表达含义而思考再三。出于工作的原因,我只能在晚上和周末写作,写作难度很大,整个写书过程持续一年之久,在收到写作邀请时本人还没有宝宝,现在宝宝王默白已经一岁有余,每次写书累的时候看一下宝贝王默白的笑容,所有的疲惫一下子都烟消云散了。真心祝愿王默白开心快乐地成长,同时也十分感谢妻子张艳娇女士,没有她的鼓励和支持,本书很难顺利出版。最后感谢父母和朋友在工作和生活中给予的关心和帮助。在这里衷心地祝愿大家身体健康,万事如意。


王磊,阿里云MVP(最有价值专家)、易点天下大数据架构师,《Offer来了:Java面试核心知识点精讲(原理篇)》和《Offer来了:Java面试核心知识点精讲(框架篇)》的作者,极客时间每日一课专栏作者;喜欢读书和研究新技术,长期从事物联网和大数据研发工作;有十余年丰富的物联网及大数据研发和技术架构经验,对物联网及大数据的原理和技术实现有深刻理解;长期从事海外项目的研发和交付工作,对异地多活数据中心的建设及高可用、高并发系统的设计有丰富的实战经验。


本书由异步社区出品,社区(https://www.epubit.com/)为您提供后续服务。

作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。

当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,单击“提交勘误”,输入勘误信息,单击“提交”按钮即可,如下图所示。本书的作者和编辑会对您提交的勘误进行审核,确认并接受后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。

我们的联系邮箱是contact@epubit.com.cn。

如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。

如果您有兴趣出版图书、录制教学视频,或者参与图书翻译、技术审校等工作,可以发邮件给我们;有意出版图书的作者也可以到异步社区投稿(直接访问www.epubit.com/contribute即可)。

如果您所在的学校、培训机构或企业想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。

如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接通过邮件发送给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值内容的动力之源。

“异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。

“异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、人工智能、测试、前端、网络技术等。

异步社区

微信服务号


在开始学习Spark之前,我们首先了解一下大数据的发展史。其实,大数据的应用很早就在一些知名的互联网公司中开始了,比如Facebook存储着全球30多亿用户的个人信息和日常每个用户在Facebook上发布的生活状态等内容;Google为全球搜索引擎巨头,其数据中心的规模很早就达到拍字节(PB[1])级别了。除此之外,还有Twitter、AWS、百度、腾讯等知名互联网公司。最初,这些大数据技术基本上是在各个大公司内部独立进行研发和使用的,它们并没有开源,并且每个公司使用的技术方案也有很大的差异。另外,这些技术方案属于公司内部,其他开发者还很难接触到大数据技术。

到了2003年和2004年,Google决定将其内部的部分大数据方案公开,并因此发表了关于分布式文件系统、分布式计算模型和BigTable的三篇著名的论文。随后,Hadoop之父Doug Cutting基于这三篇论文实现了一套开源的大数据解决方案,也就是大家熟知的Hadoop。Hadoop具体包括HDFS(Hadoop分布式文件系统)和MapReduce(分布式计算引擎),2008年1月,Hadoop开始成为Apache顶级孵化项目并迎来了快速发展。

大数据技术真正被大众认识是从2008年后Hadoop的兴起开始的,随后大数据开源技术迎来了发展的快车道。紧接着Twitter 开源了分布式流式计算框架Storm,再到后来便是大家熟知的Spark了。Spark提供了“流批一体”的解决方案并被广泛使用至今。最近兴起的Flink则立足于实时流计算,并且在不断创新,向“流批一体”的解决方案靠近,具体如图1-1所示。

图1-1 大数据的发展演进

在这些众多的技术中,Spark是目前大数据项目中应用最广泛的产品之一。虽然Flink的流式方案正在受到大家的热捧,但是为了在十几分钟甚至几分钟内完成太字节(TB)级别复杂数据的分析和计算,仍然需要使用Spark才行。Spark也是大数据计算和机器学习项目中使用最广泛的产品之一。

下面我们来看看什么是Spark。Spark是美国加州大学伯克利分校的AMP实验室推出的一种开源、通用的分布式大数据计算引擎。Spark从2010年开始正式对外开源;2012年,Spark的0.16版本开始快速推广并得到应用;2014年,Spark发布了1.0.0版本,Spark已经完全成熟,成为大数据开发的必备技术方案;接下来是2016年发布的2.0.0版本,此时Spark和Structured Streaming在生产环境中开始被大量使用;2020年6月,Spark发布了3.0.0版本,并进一步在SQL智能优化和AI方面做出重大改进。随着数据湖和AI的快速发展,Spark正以更灵活的方式拥抱数据湖和AI,Spark + AI将成为未来发展的重要方向。Spark的具体发展历程如图1-2所示。

图1-2 Spark的发展历程

首先,Spark作为目前大数据计算领域必备计算引擎已经成为不争的事实。其次,Spark的批量计算在生产环境中基本上完全替代了传统的MapReduce计算,Spark的流式计算则取代了大部分以Storm为基础的流式计算。最后,随着人工智能的迅速发展,Spark近几年也持续在机器学习和AI方向发力,在机器学习的模型训练中起到至关重要的作用。基于以上事实,无论是数据研发工程师还是机器学习等算法工程师,Spark都是必须掌握的一门技术。

那么,为什么Spark会拥有如此重要的地位呢?这和Spark本身的特点有直接关系。Spark的特点是计算速度快、易于使用,此外,Spark还提供了一站式大数据解决方案,支持多种资源管理器,且Spark生态圈丰富,具体如图1-3所示。

图1-3 Spark的特点

1.计算速度快

Spark将每个任务构造成DAG(Directed Acyclic Graph,有向无环图)来执行,其内部计算过程是基于RDD(Resilient Distributed Dataset,弹性分布式数据集)在内存中对数据进行迭代计算的,因此运行效率很高。

Spark官网上的数据表明,当Spark计算所需的数据在磁盘上时,Spark的数据处理速度是Hadoop MapReduce的10倍以上;当Spark计算所需的数据在内存中时,Spark的数据处理速度是Hadoop MapReduce的100倍以上。

2.易于使用

首先,Spark的算子十分丰富。Spark支持80多个高级的运算操作,开发人员只需要按照Spark封装好的API实现即可,不需要关心Spark的底层架构,使用起来易于上手,十分方便。其次,Spark支持多种编程语言,包括Java、Scala、Python等,这使得具有不同编程语言背景的开发人员都能快速开展Spark应用的开发并相互协作,而不用担心因编程语言不同带来的困扰。最后,由于Spark SQL的支持,Spark开发门槛进一步降低了,开发人员只需要将数据加载到Spark中并映射为对应的表,就可以直接使用SQL语句对数据进行分析和处理,使用起来既简单又方便。综上所述,Spark是一个易于使用的大数据平台。

3.一站式大数据解决方案

Spark提供了多种类型的开发库,包括Spark Core API、即时查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)、图计算(GraphX),使得开发人员可以在同一个应用程序中按需使用各种类库,而不用像传统的大数据方案那样将离线任务放在Hadoop MapReduce上运行,也不需要将实时流式计算任务放在Flink上运行并维护多个计算平台。Spark提供了从实时流式计算、离线计算、SQL计算、图计算到机器学习的一站式解决方案,为多场景应用的开发带来了极大便利。

4.支持多种资源管理器

Spark支持Standalone、Hadoop YARN、Apache Mesos、Kubernetes等多种资源管理器,用户可以根据现有的大数据平台灵活地选择运行模式。

5.Spark生态圈丰富

Spark生态圈以Spark Core为核心,支持从HDFS、Amazon S3、HBase、ElasticSearch、MongoDB、MySQL、Kafka等多种数据源读取数据。同时,Spark支持以Standalone、Hadoop YARN、Apache Mesos、Kubernetes为资源管理器调度任务,从而完成Spark应用程序的计算任务。另外,Spark应用程序还可以基于不同的组件来实现,如Spark Shell、Spark Submit、Spark Streaming、Spark SQL、BlinkDB(权衡查询)、Spark MLlib(机器学习)、GraphX(图计算)和SparkR(数学计算)等组件。Spark生态圈已经从大数据计算和数据挖掘扩展到图计算、机器学习、数学计算等多个领域。

图1-4对Spark的特点做了全面总结。

图1-4 Spark的特点

Spark的诸多优势使得Spark成为目前最流行的计算引擎,那么学好Spark的关键点都有哪些呢?具体如图1-5所示。

图1-5 学好Spark的关键点

(1)只有充分理解Spark分布式计算引擎背后的原理,才能为后续基于不同场景快速实现不同的功能以及进行任务优化打下坚实的基础。

(2)只有充分了解算子背后的原理,才能在不同场景中游刃有余地使用它们。

(3)通常,基于Spark实现某个数据分析功能相对而言比较简单,可能只需要简单的几行SQL代码就能实现。但是,我们在实践中经常会遇到数据倾斜、长尾任务、部分任务超时等情况,此时就需要熟悉数据模型和Spark算子的优化逻辑,并根据数据模型的特点和各个任务上数据的分布对其进行调优,以消除数据倾斜等问题,保障任务稳定运行。

(4)在对Spark的原理和使用有了一定的了解后,我们便可以尝试阅读Spark源码,这对于在实践中遇到问题时快速定位和处理问题会有很大的帮助。尤其在遇到错误时,我们可以通过源码快速了解出错的日志处Spark源码上下文的执行逻辑,从而快速定位问题,避免花费大量精力和反复尝试解决问题。

(5)“实践是检验真理的唯一标准”这句话同样适用于大数据领域。同样的代码在不同规模的数据集上有时候能正常运行并计算出结果,但有时候会出现计算超时或任务失败等情况,这在日常的大数据开发中是很常见的事情。大数据计算首先需要有大量的数据才能更好地验证应用程序的稳定性和健壮性,因此基于真实数据的实战是掌握Spark的关键。

(6)除了基于真实数据进行实战之外,丰富的业务场景也是学好Spark的关键点之一。只有在具备丰富的应用场景后,我们才能更好地理解Spark模块在不同场景中的应用,如Spark流式计算、Spark机器学习、Spark图计算等模块。

在了解了学好Spark的关键点之后,我们再来看一看Spark都有哪些学习难点。

本书由易到难,先深入剖析原理,再进行代码实战。本书会尽量避免介绍大量的原理,以免枯燥乏味;同时,本书也会尽量避免在读者不了解原理的情况下进行太多的源码实战,以免读者仅仅成为Spark API使用者。以上两种情况都不利于Spark的学习。

本书还将介绍 Spark 机器学习方面的内容。考虑到大部分读者可能未接触过机器学习,因此在介绍的时候,我们首先会对机器学习的基本原理进行介绍,以便读者在使用Spark提供的机器学习函数时,对背后的原理能有更清晰的认识。

在内容编排上,本书对基础概念的介绍力求简洁,对于高级特性,则尽可能详细介绍背后的原理。因此,无论是刚开始学习Spark,还是已有一定的Spark开发经验,本书都值得阅读。

阅读完本书后,您将全面掌握Spark内核原理、Spark资源调度、Spark离线计算、Spark流式计算、Spark任务调优、Spark机器学习等知识。本书最后还介绍了大数据的未来趋势及相关技术,比如数据湖和AI技术,为您未来决胜大数据计算打下坚实基础。

Spark为什么能在短时间内突然崛起?Spark相对Hadoop MapReduce有何优势?接下来,我们将介绍Spark相对于Hadoop MapReduce的3个核心优势——高性能、高容错性和通用性。

1.高性能

Spark继承了Hadoop MapReduce大数据计算的优点,但不同于MapReduce的是:MapReduce每次执行任务时的中间结果都需要存储到HDFS磁盘上,而Spark每次执行任务时的中间结果可以保存到内存中,因而不再需要读写HDFS磁盘上的数据,具体如图1-6所示。这里假设任务的计算逻辑需要执行两次迭代计算才能完成,在MapReduce任务的计算过程中,MapReduce任务首先从HDFS磁盘上读取数据,然后执行第一次迭代计算,等到第一次迭代计算完成后,才会将计算结果写入HDFS磁盘;当第二次迭代计算开始时,需要从HDFS磁盘上读取第一次迭代计算的结果并执行第二次迭代计算,并且等到第二次迭代计算完成后,才将计算结果写到HDFS磁盘上,此时整个迭代计算过程才完成。可以看出,在MapReduce任务的计算过程中,分别经历了两次HDFS磁盘上的数据读和两次HDFS磁盘上的数据写,而大数据计算产生的耗时很大一部分来自磁盘数据的读写,尤其是在数据超过TB(太字节)级别后,磁盘读写这个耗时因素将变得更加明显。

图1-6 对比MapReduce任务计算和Spark任务计算

为了解决数据读写磁盘慢的问题,Spark会将中间的计算结果保存到内存中(前提是内存中有足够的空间)。当后面的迭代计算需要用到这些数据时,Spark可直接从内存中读取它们。因为内存中数据的读写速度和磁盘上数据的读写速度不是一个级别,所以Spark通过从内存中读写数据,这样能够更快速地完成数据的处理。例如,对于同一个需要两次迭代计算的任务,在Spark任务的计算过程中,首先会从HDFS磁盘上读取数据并执行第一次迭代计算,在第一次迭代计算完成后,Spark会将计算结果保存到分布式内存中;等到执行第二次迭代计算时,Spark会直接从内存中读取第一次迭代计算的结果并执行第二次迭代计算,并在第二次迭代计算完成后,将最终结果写入HDFS磁盘。可以看出,Spark在任务执行过程中分别进行了一次HDFS磁盘读和一次HDFS磁盘写。也就是说,Spark仅在第一次读取源数据和最后一次将结果写出时,基于HDFS进行磁盘数据的读写,而计算过程中产生的中间数据都存放在内存中。因此,Spark的计算速度自然要比MapReduce快很多。

2.高容错性

对于任何一个分布式计算引擎来说,容错性都是必不可少的功能,因为几乎没有人能够忍受任务的失败和数据的错误或丢失。在单机环境下,开发人员可以通过锁、事务等方式保障数据的正确性。但是,对于分布式环境来说,既需要将数据打散分布在多个服务器上以并发执行,也需要保障集群中的每份数据都是正确的,后者相对来说实现难度就大多了。另外,由于网络故障、系统硬件故障等问题不可避免,因此分布式计算引擎还需要保障在系统发生故障时,能及时从故障中恢复并保障故障期间数据的正确性。

Spark从基于“血统”(lineage)的数据恢复和基于检查点(checkpoint)的容错两方面提高系统的容错性。

Spark引入了RDD的概念。RDD是分布在一个或多个节点上的只读数据的集合,这些集合是弹性的并且相互之间存在依赖关系,数据集之间的这种依赖关系又称为“血缘关系”。如果数据集中的一部分数据丢失,则可以根据“血缘关系”对丢失的数据进行重建。具体如图1-7所示,这里假设一个任务中包含了Map计算、Reduce计算和其他计算,当基于 Reduce计算的结果进行计算时,如果任务失败导致数据丢失,则可以根据之前Reduce计算的结果对数据进行重建,而不必从Map计算阶段重新开始计算。这样便根据数据的“血缘关系”快速完成了故障恢复。

图1-7 Spark基于“血缘关系”进行数据恢复

Spark任务在进行RDD计算时,可以通过检查点来实现容错。例如,当编写一个Spark Stream程序时,我们可以为其设置检查点,这样当出现故障时,便可以根据预先设置的检查点从故障点进行恢复,从而避免数据的丢失和保障系统的安全升级等。

如图1-8所示,这里通过val ssc = new StreamingContext(conf, Seconds(10))定义了一个名为ssc的StreamingContext,然后通过ssc.checkpoint(checkpointDir)设置了检查点。其中,checkpointDir为检查点的存储路径,当任务发生错误时,可从检查点恢复任务,从而有效保障了任务的安全性。

图1-8 Spark检查点容错

3.通用性

Spark是通用的大数据计算框架,这主要表现在两个方面:一是Spark相对于Hadoop来说支持更多的数据集操作,二是Spark支持更丰富的计算场景。

Hadoop只支持Map和Reduce操作,而Spark支持的数据集操作类型丰富得多,具体分为Transformation操作和Action操作两种。Transformation操作包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort和PartitionBy等操作,Action操作则包括Collect、Reduce、Lookup和Save等操作。另外,Spark的计算节点之间的通信模型不但支持Shuffle操作,而且支持用户命名、物化视图、控制中间结果的存储、数据分区等,具体如图1-9所示。

图1-9 Spark支持的数据集操作

缘于卓越的性能,Spark被广泛应用于复杂的批数据处理(batch data processing),这种场景下的数据延迟一般要求在几十分钟或几分钟;基于历史数据的交互式查询(interactive query)这种场景下的数据延迟一般也要求在几十分钟或几分钟;而基于实时数据流的数据处理(streaming data processing)场景下的数据延迟通常要求在数百毫秒到数秒之间。Spark还被广泛应用于图计算和机器学习领域。Spark常见的应用场景如图1-10所示。

图1-10 Spark常见的应用场景

上面总结了Spark 相对于Hadoop MapReduce都有哪些核心优势。表1-1从数据存储结构、编程范式、数据读写性能和任务执行方式的角度分别对比了Hadoop MapReduce和Spark的差别。

表1-1 Hadoop MapReduce和Spark的差别

Hadoop MapReduce

Spark

数据存储结构

在磁盘上存储HDFS 文件

在内存中构建RDD并对数据进行运算和缓存

编程范式

Map和Reduce

由Transformation操作和 Action操作组成的DAG

数据读写性能

中间的计算结果存储在磁盘上,I/O、序列化及反序列化代价大

中间的计算结果保存在内存中,存取速度比磁盘高了好几个数量级

任务执行方式

任务以进程的方式维护,需要数秒时间才能启动

任务以线程的方式维护,对于小数据集,读取时能够实现亚秒级的延迟

Spark生态也称为BDAS(伯克利数据分析栈),它由伯克利APMLab实验室打造,目标是在算法(algorithm)、机器(machine)和人(people)之间通过大规模集成来构建大数据应用的一个平台,具体关系如图1-11所示。BDAS通过对通信、大数据、机器学习、云计算等技术的运用以及资源的整合,试图通过对人类生活中海量的不透明数据进行收集、存储、分析和计算,来使人类从数字化的角度更好地理解我们自身所处的世界。

图1-11 Spark生态

从Spark生态的概念中可以看出,Spark生态的范围是十分广泛的。Spark生态中到底使用了哪些具体的技术呢?接下来我们从多语言支持、多调度框架的运行、多组件支撑下的多场景应用、多种存储介质、多数据格式等角度介绍Spark生态中一些常用的技术,具体如图1-12所示。

图1-12 Spark生态中常用的技术

Spark基于Spark Core建立了Spark SQL、Spark Streaming、GraphX、Spark MLlib、SparkR等核心组件,基于不同的组件可以实现不同的计算任务。

Spark模块的组成如图1-13所示。

图1-13 Spark模块的组成

从运行模式看,Spark任务的运行模式有本地模式、独立模式、Mesos模式、YARN模式和Kubernetes模式。

从数据源看,Spark任务的计算可以基于HDFS、AWS S3、ElasticSearch、HBase或Cassandra等多种数据源。

1.Spark Core

Spark Core的核心组件包括基础设施、存储系统、调度系统和计算引擎,具体如图1-14所示。其中,基础设施包括SparkConf(配置信息)、SparkContext(上下文信息)、Spark RPC(远程过程调用)、ListenerBus(事件监听总线)、MetricsSystem(度量系统)和SparkEvn(环境变量);存储系统包括内存和磁盘等;调度系统包括DAG调度器和任务调度器等;而计算引擎包括内存管理器、任务管理器和Shuffle管理器等。

图1-14 Spark Core的核心组件

1)Spark基础设施

Spark基础设施为其他组件提供最基础的服务,是Spark中最底层、最常用的一类组件,具体包括如下组件。

2)Spark存储系统

Spark存储系统用于管理Spark运行过程中数据的存储方式和存储位置。Spark存储系统如图1-15所示。Spark存储系统的设计采用内存优先的原则。Spark存储系统首先会将各个计算节点产生的数据存储在内存中,当内存不足时就将数据存储到磁盘上。这种内存优先的存储策略,使得Spark的计算性能无论是在实时流计算还是在批量计算的场景下都表现十分良好,同时使Spark的内存空间和磁盘存储空间得到了灵活控制。除此之外,Spark还可以通过网络将结果存储到远程存储(比如HDFS、AWS S3、阿里云OSS等)中,以实现分离计算和存储的目的。

图1-15 Spark存储系统

3)Spark调度系统

Spark调度系统主要由DAG调度器和任务调度器组成,如图1-16所示。DAG调度器的主要功能是创建作业(job),将DAG中的RDD划分到不同的Stage中,为Stage创建对应的任务(task)、批量提交任务等。任务调度器的主要功能是对任务进行批量调度。Spark使用的调度算法有先进先出(FIFO)、公平调度等。

图1-16 Spark调度系统

4)Spark计算引擎

Spark计算引擎由内存管理器、作业管理器、任务管理器、Shuffle管理器等组成。Spark计算引擎主要负责集群任务计算过程中内存的分配、作业和任务的运行、作业和任务状态的监控及管理等。

2.Spark SQL

Spark提供了两个抽象的编程对象,分别叫作DataFrame(数据框)和Dataset(数据集),它们是分布式SQL查询引擎的基础,Spark正是基于它们构建了基于SQL的数据处理方式,具体如图1-17所示。这使得分布式数据的处理变得十分简单,开发人员只需要将数据加载到Spark中并映射为表,就可以通过SQL语句来实现数据的分析。

图1-17 Spark SQL的构建

1)DataFrame

DataFrame是Spark SQL对结构化数据所做的抽象,可简单理解为DataFrame就是Spark中的数据表,DataFrame相比RDD多了数据的结构信息,即Schema信息。DataFrame的数据结构如下:DataFrame(表)= Data(表数据)+ Schema(表结构信息)。如图1-18所示,其中,DataFrame有Name、Legs、Size三个属性,第一条数据中的Name为pig,第二条数据中的Name为cat,第三条数据中的Name为dog。

图1-18 DataFrame的数据结构

在Spark中,RDD表示分布式数据集,而DataFrame表示分布式数据框,数据集和数据框最大的差别就在于数据框中的数据是结构化的。因此,基于数据框中的数据结构,Spark可以根据不同的数据结构对数据框上的运算自动进行不同维度的优化,从而避免不必要的数据读取等问题,提高程序的运行效率。

RDD和DataFrame的数据结构对比如图1-19所示。这里假设有一个Animal数据集,开发人员从RDD的角度仅能看到每条数据,但从DataFrame的角度能看到每条数据的内部结构,比如Name字段为string类型,Legs字段为int类型,Size字段为double类型。其中,Name字段表示动物的名称,Legs字段表示动物有几条腿,Size字段表示动物的体型大小。这样当Spark程序在DataFrame上对每条数据执行运算时,便可以有针对性地进行优化。例如,要读取Legs等于4的数据,Spark在Legs字段上进行逻辑运算时就会使用int类型的函数进行运算。在Java中,int型数据的存储结构和优化空间相比string型数据要好很多,因此执行效率也会高很多。

图1-19 对比RDD和DataFrame的数据结构

在Spark中,DataFrame可以通过多种方式来构建。例如,开发人员可通过Spark RDD构建DataFrame,可通过Hive读取数据并将它们转换为DataFrame,可通过读取CSV、JSON、XML、Parquet等文件并将它们转换为DataFrame,可通过读取RDBMS中的数据并将它们转换为DataFrame。除此之外,开发人员还可通过Cassandra或HBase这样的列式数据库来构建DataFrame。构建好DataFrame之后,开发人员便可以直接将DataFrame映射为表并在表上执行SQL语句以完成数据分析,如图1-20所示。

图1-20 DataFrame的构建

2)Dataset

Dataset是数据的分布式集合。Dataset结合了RDD强类型化的优点和Spark SQL优化后执行引擎的优点。可以从JVM对象构建Dataset,然后使用Map()、FlatMap()、Filter()等函数对其进行操作。此外,Spark还提供对Hive SQL的支持。

3.Spark Streaming

Spark Streaming为Spark提供了流式计算的能力。Spark Streaming支持从Kafka、HDFS、Twitter、AWS Kinesis、Flume和TCP服务等多种数据源获取数据,然后利用Spark计算引擎,在数据经过Spark Streaming的微批处理后,最终将计算结果写入Kafka、HDFS、Cassandra、Redis和Dashboard(报表系统)。此外,Spark Streaming还提供了基于时间窗口的批量流操作,用于对一定时间周期内的流数据执行批量处理。图1-21展示了Spark Streaming的流式计算架构。

图1-21 Spark Streaming的流式计算架构

4.GraphX

GraphX用于分布式图计算。利用Pregel提供的API,开发人员可以快速实现图计算的功能。

5.Spark MLlib

Spark MLlib(见图1-22)是Spark的机器学习库。Spark MLlib提供了统计、分类、回归等多种机器学习算法的实现,其简单易用的API降低了机器学习的门槛。

图1-22 Spark MLlib

6.SparkR

SparkR(见图1-23)是一个R语言包,它提供了一种轻量级的基于R语言使用Spark的方式。SparkR实现了分布式的数据框,支持类似于查询、过滤及聚合这样的操作,功能类似于R语言中的DataFrame包dplyr。SparkR使得Spark能够基于R语言更方便地处理大规模的数据集,同时SparkR还支持机器学习。

图1-23 SparkR架构

Spark运行模式指的是Spark在哪个资源调度平台上以何种方式(一般分单机和集群两种方式)运行。Spark运行模式主要包括local(本地模式)、standalone(独立模式)、on YARN、on Mesos、on Kubernetes以及on Cloud(运行在AWS等公有云平台上),如表1-2所示。

表1-2 Spark运行模式

运行模式

运行方式

说  明

local

单机方式

本地模式,常用于本地开发测试,本地模式又分为 local单线程和 local-cluster 多线程两种方式

standalone

单机方式

独立模式,运行在Spark自己的资源管理框架上,该框架采用主从结构设计

on YARN

集群方式

运行在YARN资源管理框架上,由YARN负责资源管理,Spark负责任务调度和计算

on Mesos

集群方式

运行在Mesos资源管理框架上,由Mesos负责资源管理,Spark负责任务调度和计算

on Kubernetes

集群方式

运行在Kubernetes上

on Cloud

集群方式

运行在 AWS、阿里云、华为云等公有云平台上

Spark集群主要由集群管理器(cluster manager)、工作节点(worker)、执行器(executor)、Spark应用程序(application)和驱动器(driver)5部分组成,如图1-24所示。

图1-24 Spark集群的角色组成

1.集群管理器

集群管理器用于Spark集群资源的管理和分配。

2.工作节点

工作节点用于执行提交到Spark中的任务。工作节点的工作职责和交互流程如图1-25所示。

图1-25 工作节点的工作职责和交互流程

(1)工作节点通过注册机制向集群管理器汇报自身的CPU和内存等资源使用情况。

(2)工作节点在Spark主节点的指示下创建并启动执行器,执行器是真正执行计算任务的组件。

(3)Spark主节点将任务分配给工作节点上的执行器并运行。

(4)工作节点同步资源信息和执行器状态信息给集群管理器。

3.执行器

执行器是真正执行计算任务的组件,它在工作节点上以一个进程的形式存在,这个进程负责任务的运行并将运行结果保存到内存中或磁盘上。

4.Spark应用程序

Spark应用程序是基于Spark API编写的,其中包括用于实现驱动器功能的驱动程序以及运行在集群的多个节点上的执行器程序。Spark应用程序由一个或多个作业组成,如图1-26所示。

图1-26 Spark应用程序

5.驱动器

驱动器包含了运行应用程序的主函数和构建SparkContext实例的程序。Spark应用程序通过驱动器来与集群管理器和执行器进行通信。驱动器既可以运行在应用程序节点上,也可以由应用程序提交给集群管理器,再由集群管理器安排给工作节点运行。当执行器运行完毕后,驱动器负责将SparkContext关闭。驱动器的主要职责如下。

(1)驱动器包含运行应用程序的主函数。

(2)在Spark中,SparkContext是在驱动器中创建的。SparkContext负责和集群管理器通信,进行资源的申请以及任务的分配和监控等。

(3)Spark在驱动器中划分RDD并生成DAG。

(4)Spark在驱动器中构建作业并将每个作业划分为多个Stage,各个Stage相互独立。作业是由多个Stage构建的并行计算任务,具体由Spark中的Action操作(如Count、Collect、Save等操作)触发。

(5)驱动器能与Spark中的其他组件进行资源协调。

(6)Spark在驱动器中生成任务并将任务发送到执行器上运行。

下面展示了一个读取JSON文件的简单Spark程序。

public class RDDSimple {
        //定义运行应用程序的主函数
    public static void main(String[] args) {
        //初始化SparkConf实例
        SparkConf conf = new
             SparkConf().setAppName(RDDSimple.class.getName()).setMaster("local");
        //初始化JavaSparkContext实例
        JavaSparkContext sc = new JavaSparkContext(conf);
        String filepath = "your_file_path/temp.json"; 
        //读取文件到Spark中
        JavaRDD<String> lines = sc.textFile(filepath);
        JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
        //通过reduce算子触发Action操作
        int totalLength = lineLengths.reduce((a, b) -> a + b);
        System.out.println("[ spark map reduce operation: ] count is:"+totalLength);
        //关闭JavaSparkContext并释放资源
         sc.close();
        }
 }       

上面这个简单的Spark程序包含了运行应用程序的主函数,主函数中定义了SparkConf实例conf和JavaSparkContext实例sc,可通过JavaSparkContext实例sc的textFile()方法读取名为temp.json 的JSON文件。读取结果为JavaRDD类型的数据,可通过调用RDD的map()方法来计算每条数据的长度并通过reduce()方法将所有的长度加起来。其中,RDD的reduce()方法会触发Action操作。统计完之后,需要调用JavaSparkContext实例sc的close()方法以关闭JavaSparkContext并释放资源。

在上面的Spark程序中,SparkConf和JavaSparkContext的初始化代码为驱动程序,运行在驱动器节点上,代码lines.map(s -> s.length())则运行在集群中一个或多个节点的执行器上。

1.SparkContext

SparkContext是整个Spark应用程序中非常重要的对象之一。SparkContext是应用程序和Spark集群交互的通道,如图1-27所示,主要用于初始化运行Spark应用程序所需的基础组件,具体包括如下组件。

另外,SchedulerBackend是调度器的通信终端,主要负责运行任务所需资源的申请。

图1-27 SparkContext

同时,SparkContext还负责向Spark管理节点注册应用程序等。

2.RDD

RDD是弹性分布式数据集,它是Spark对数据和计算模型所做的统一抽象。也就是说,RDD中既包含了数据,也包含了针对数据执行操作的算子。RDD可通过在其他RDD上执行算子操作转换而来,RDD之间是相互依赖的,从而形成了RDD之间的“血缘关系”,这又称为RDD之间的DAG。开发人员可通过一系列算子对RDD进行操作,比如进行Transformation操作和Action操作。

观察图1-28,开发人员可通过sc.textFile()方法从HDFS读取数据到Spark中并将其转换为RDD,然后在RDD上分别执行flatMap、map、reduceByKey算子操作,从而对RDD上的数据进行计算,计算完成后,可通过调用saveAsTextFile()方法将计算结果写到HDFS中。

图1-28 Spark RDD

具体的代码实现片段如下:

val rdd-0 = sc.textFile("your_text_file_path")
val rdd-1 = rdd-0.flatMap(x=> {x.split(" ")})
val rdd-2 = rdd-1.map(x => (x,1))
val rdd-3 = rdd-2.reduceByKey((pre, after) => pre + after)
rdd-3.saveAsTextFile(basePath+"simple-1")

从上述代码可以看出,rdd-0是通过调用sc.textFile()方法转换而来的,rdd-1是通过调用rdd-0的flatMap算子转换而来的,rdd-2是通过调用rdd-1的map算子转换而来的,rdd-3是通过调用rdd-2的reduceByKey算子转换而来的。RDD之间可以相互转换,从而形成了DAG。

在上述操作过程中,textFile、flatMap和map操作属于Transformation操作,reduceByKey和saveAsTextFile操作属于Action操作。在实践中,我们一般不会定义这么多RDD,而是通过链式编程一气呵成,具体的代码实现片段如下:

val rdd-0 = sc.textFile("your_text_file_path")
rdd-0.flatMap(x=> {x.split(" ")}).map(x => (x,1)).reduceByKey((pre, after) => pre + after)
     .saveAsTextFile(basePath+"simple-1")

3.DAG

DAG是有向无环图,通常用于建模。Spark是通过DAG对RDD之间的关系进行建模的。也就是说,DAG描述了RDD之间的依赖关系,这种依赖关系也叫作血缘关系。Spark通过Dependency对象来维护RDD之间的依赖关系。

当处理数据时,Spark会将RDD之间的依赖关系转换为DAG。基于DAG的血缘关系,当计算发生故障时,Spark便能够对RDD快速地进行数据恢复。观察图1-29,这里一共有4个RDD——RDD0、RDD1、RDD2、RDD3。其中,RDD0由外部数据源“数据输入1”转换而来;RDD1由外部数据源“数据输入2”转换而来;RDD2由RDD0和RDD1转换而来,并且在转换过程中发生了数据的Shuffle操作;RDD2在经过转换后生成了RDD3;RDD3执行完毕后,就会将计算结果写入“数据输出”。这几个RDD之间的依赖关系是:RDD3依赖于RDD2,RDD2依赖于RDD0和RDD1。因此,如果在执行RDD3计算时发生故障,那么只需要从RDD2开始重新计算RDD3,而不必从“数据输入1”和“数据输入2”重新开始计算。

图1-29 Spark DAG

4.DAG调度器

DAG调度器面向Stage级别,执行逻辑层面的调度。DAG调度器主要负责Stage的划分、提交、状态跟踪以及结果的获取,如图1-30所示。

图1-30 Spark DAG调度器

5.任务调度器

任务调度器的主要职责包括物理资源调度管理、任务集调度管理、任务执行、任务状态跟踪以及将任务的运行结果汇报给DAG调度器,如图1-31所示。

图1-31 Spark任务调度器

6.作业

Spark应用程序通常包含一个或多个作业。Spark将根据Action操作(如saveAsTextFile、Collect)划分作业并触发作业的执行,而一个作业又分为一个或多个可以并行计算的Stage(至于是否可以并行计算,则需要根据Stage的依赖关系来定)。Stage是根据Shuffle操作来划分的,一个Stage和一个任务集对应,任务集是多个任务的集合,每个任务则对应RDD中某个分区上数据的处理,如图1-32所示。

图1-32 Spark作业

7.Stage

DAG调度器会把DAG划分为相互依赖的多个Stage,Stage的划分依据则是RDD之间依赖的宽窄。当遇到宽依赖(数据发生了Shuffle操作)时,就划分出一个Stage,每个Stage中则包含一个或多个任务。然后,DAG调度器会将这些任务以任务集的形式提交给任务调度器并运行。和RDD之间的依赖关系类似,Stage之间也存在依赖关系。Spark中的Stage分为ShuffleMapStage和ResultStage两种类型。在Spark应用程序中,最后一个Stage为ResultStage,其他的Stage均为ShuffleMapStage。

观察图1-33,其中包含3个Stage,分别为Stage-1、Stage-2和Stage-3。其中,Stage-3依赖于Stage-1和Stage-2。由于A和B的依赖关系为宽依赖,也就是说,从A到B会发生数据的Shuffle操作,因此划分出一个Stage;由于F和G的依赖关系为宽依赖,因此同样划分出一个Stage;由于C和D、D和F、E和F的依赖关系均为窄依赖,因此它们都被划分到同一个Stage中,也就是划分到Stage-2中。

图1-33 Spark Stage

8.任务集

一组任务就是一个任务集,对应一个Stage。任务集中包含多个任务,并且同一任务集中的所有任务之间不会发生数据的Shuffle操作,因此,同一任务集中的所有任务可以相互不受影响地并行执行,如图1-34所示。

图1-34 Spark任务集

9.任务

任务是Spark中独立的工作单元,它以线程的方式在执行器上运行。一般情况下,一个任务对应一个线程,负责处理RDD中某个分区上的数据。任务根据返回类型的不同,又分为ShuffleMapTask和ResultTask两种。

10.总结

图1-35对Spark核心概念做了总结。开发人员构建出来的、可运行的Spark项目称为Spark应用程序,Spark应用程序包含了驱动程序,而驱动程序包含了SparkConf、SparkContext等核心组件的初始化代码。同时,SparkContext又包含了DAG调度器和任务调度器两个核心组件。在执行Spark应用程序的过程中,Spark会根据Action操作将Spark 应用程序划分为多个作业并交给DAG调度器处理,DAG调度器负责将作业构建为DAG并划分Stage,同时提交Stage到任务调度器。任务调度器负责加载并注册任务集到集群管理器,集群管理器负责集群管理、资源分配、任务分配并跟踪作业的提交和执行。实际情况是,任务是在工作节点的执行器上执行的。

图1-35 Spark核心概念

1.Spark作业运行流程简述

Spark应用程序以进程集合为单位运行在分布式集群上,可通过驱动程序的主函数创建SparkContext对象并通过SparkContext对象与集群进行交互,如图1-36所示。

图1-36 Spark作业运行流程

(1)Spark通过SparkContext向集群管理器申请运行应用程序所需的资源(CPU、内存等资源)。

(2)集群管理器分配执行应用程序所需的资源,并在工作节点上创建执行器。

(3)SparkContext将程序代码和任务发送到执行器上运行并收集运行结果到驱动程序节点上。程序代码一般为Jar包或Python文件。

2.Spark RDD迭代过程

Spark数据计算主要通过RDD迭代来完成,RDD是弹性分布式数据集,可以看作对各种数据计算模型所做的统一抽象。在Spark RDD迭代过程中,数据被分到多个分区以进行并行计算,分区的数量取决于应用程序对此是如何设定的。每个分区里的数据只会在一个任务上计算,所有分区可在多个机器节点的执行器上并行执行。

Spark RDD迭代过程如图1-37所示。

(1)SparkContext创建RDD对象,计算RDD之间的依赖关系并由此生成DAG。

(2)DAG调度器将DAG划分为多个Stage,并将Stage对应的任务集提交到集群管理器。Stage的划分依据就是RDD之间依赖的宽窄。当遇到宽依赖时,就划分出一个Stage,每个Stage包含一个或多个任务。

(3)任务调度器通过集群管理器为每个任务申请系统资源并将任务提交到工作节点以执行。

(4)工作节点上的执行器负责执行具体的任务。

图1-37 Spark RDD迭代过程

3.Spark作业运行的详细流程

在简要了解了Spark作业运行的流程之后,接下来介绍Spark作业运行的详细流程,如图1-38所示。

(1)SparkContext向资源管理器注册任务。

(2)资源管理器申请运行任务所需的执行器。

(3)资源管理器分配执行器。

(4)资源管理器启动执行器。

(5)执行器发送心跳到资源管理器。

(6)SparkContext根据代码构建DAG。

(7)DAG调度器将DAG划分为Stage。

(8)DAG调度器将Stage以任务集的方式发送给任务调度器。

(9)执行器向SparkContext申请任务。

图1-38 Spark作业运行的详细流程

(10)SparkContext发送应用程序代码到执行器。

(11)任务调度器将任务发送给执行器运行。

(12)在执行器上运行任务。

(13)应用程序完成运行,释放资源。

4.YARN资源管理器

YARN是一种分布式资源管理和任务调度框架,由资源管理器、节点管理器和应用程序管理器3个核心模块组成。其中,资源管理器负责集群资源的管理、监控和分配;节点管理器负责节点的维护;应用程序管理器负责具体应用程序的调度和协调。由于资源管理器负责所有应用程序的控制以及资源的分配权,因此每个应用程序管理器都会与资源管理器协商资源,同时与节点管理器通信并监控任务的运行。YARN 资源管理器如图1-39所示。

图1-39 YARN资源管理器

1)资源管理器

资源管理器负责整个集群的资源管理和分配。节点管理器以心跳的方式向资源管理器汇报CPU内存等资源的使用情况。资源管理器接收节点管理器的资源汇报信息,具体的资源处理则交给节点管理器负责。

2)节点管理器

节点管理器负责具体节点的资源管理和任务分配,相当于资源管理器,用来管理节点的代理节点,主要负责节点程序的运行以及资源的管理与监控。YARN集群的每个节点上都运行着一个节点管理器。

节点管理器定时向资源管理器汇报节点资源的使用情况和容器的运行状态。当资源管理器宕机时,节点管理器会自动连接资源管理器的备用节点。同时,节点管理器还会接收并处理来自应用程序管理器的容器启动、停止等请求。

3)应用程序管理器

每个应用程序都有一个应用程序管理器。应用程序管理器的主要职责如下。

4)容器

容器是对YARN集群中物理资源的抽象,它封装了每个节点上的资源(如内存、CPU、磁盘、网络等)信息。当应用程序管理器向资源管理器申请资源时,资源管理器为应用程序管理器返回的资源就是以容器表示的。YARN会将任务分配到容器中运行,同时任务只能使用容器中描述的资源,从而达到隔离资源的目的。

5.YARN任务的提交和运行流程

YARN任务的提交由客户端向资源管理器发起,然后由资源管理器启动应用程序管理器并为其分配用于运行作业的容器资源,应用程序管理器收到容器资源后便初始化容器,最后交由节点管理器启动容器并运行具体的任务。这里的任务既可以是MapReduce任务,也可以是Spark任务或Flink任务。任务运行完之后,应用程序管理器向资源管理器申请注销自己并释放资源。YARN任务的提交和运行流程如图1-40所示。

(1)客户端向资源管理器提交任务,其中包括启动应用程序必需的信息。

(2)资源管理器启动一个容器,并在这个容器中启动应用程序管理器。

(3)启动中的应用程序管理器向资源管理器注册自己,并在启动成功后与资源管理器保持心跳。

(4)应用程序管理器向资源管理器发送请求,申请相应数目的容器。

(5)资源管理器返回应用程序管理器申请的容器信息。

图1-40 YARN任务的提交和运行流程

(6)申请成功的容器由应用程序管理器进行初始化。

(7)在初始化容器的启动信息后,应用程序管理器与对应的节点管理器通信,要求节点管理器启动容器。

(8)应用程序管理器与节点管理器保持定时心跳,以便实时对节点管理器上运行的任务进行监控和管理。

(9)容器在运行期间,通过RPC协议向对应的应用程序管理器汇报自己的进度和状态等信息,应用程序管理器对容器进行监控。

(10)在应用程序运行期间,客户端通过RPC协议与应用程序管理器通信以获取应用程序的运行状态、进度更新等信息。

(11)应用程序完成运行,应用程序管理器向资源管理器申请注销自己,并释放占用的容器资源。

6.Spark应用程序在YARN上的执行流程

Spark应用程序在生产环境中一般运行在YARN上。下面介绍Spark应用程序在YARN 上的执行流程,如图1-41所示。

图1-41 Spark应用程序在YARN上的执行流程

(1)提交Spark应用程序和相关依赖到YARN资源管理器。

(2)Spark引擎加载应用程序管理器。

(3)Spark驱动器开始执行。

(4)SparkContext向应用程序管理器申请资源。

(5)应用程序管理器向YARN资源管理器申请容器资源。

(6)YARN节点管理器启动容器。

(7)YARN节点管理器启动Spark执行器。

(8)将执行器注册到Spark驱动器。

(9)SparkContext加载并运行任务。

前面介绍了Spark的概念、原理、特点、模块组成、集群角色和作业运行流程。下面开始进行Spark实战,具体内容包括Spark独立环境安装实战、YARN环境安装实战、Spark批处理作业入门实战和Spark流式作业入门实战。

Spark独立模式是Spark实现的资源调度框架,采用的是主备架构。下面介绍单机版Spark 独立环境的安装以及任务的快速运行。

(1)从Apache官网下载最新的Spark安装包。注意,我们需要下载编译好的带Hadoop的Spark版本并解压。

tar -zxvf spark-3.0.0-bin-hadoop2.7.tgz

(2)按照如下命令在profile文件中加入Spark环境变量。

#编辑profile文件
vim /etc/profile
#加入Spark环境变量
export SPARK_HOME=/your_spark_path/spark-3.0.0-bin-hadoop2.7/conf
export PATH=$PATH:$SPARK_HOME/bin
#执行source命令,使文件修改立刻生效
source /etc/profile

(3)按照如下命令编辑 spark-env.sh 配置文件。

#进入Spark的conf目录
cd  $SPARK_HOME/conf/
#根据Spark提供的spark-env.sh.template模板文件复制出新的名为spark-env.sh的配置文件
cp  spark-env.sh.template spark-env.sh
#编辑spark-env.sh文件
vim  spark-env.sh
#配置JAVA_HOME、SCALA_HOME、SPARK_HOME
export JAVA_HOME=/your_jdk_path/jdk1.8.0_201.jdk
#SCALA_HOME:Scala的安装路径
export SCALA_HOME=/your_scala_path/ /scala-2.12.12
#SPARK_HOME:Spark的安装路径
export SPARK_HOME= /your_spark_path/spark-3.0.0-bin-hadoop2.7/
#SPARK_MASTER_IP:Spark Master的IP地址
export SPARK_MASTER_IP=127.0.0.1 
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_EXECUTOR_MEMORY=512M
export SPARK_WORKER_MEMORY=1G
#Spark Master UI的IP地址
export master=spark://127.0.0.1:7070

这里的Java安装和Scala安装不再详细介绍。安装Scala时,需要注意的是,Scala的版本需要和Spark编译的版本相同。

(4)按照如下命令编辑slaves配置文件。

#进入Spark的conf目录 
cd /your_spark_path/spark-3.0.0-bin-hadoop2.7/conf/
#复制出一份新的名为slaves的配置文件
cp slaves.template slaves 
#在slaves配置文件中加入工作节点的host信息 
echo "localhost" >> slaves

(5)按照如下命令启动Spark。

#进入Spark的sbin目录
cd $SPARK_HOME/sbin/ 
#启动Spark
./start-all.sh

在浏览器的地址栏中输入http://127.0.0.1:8080并按Enter键,查看Spark Master页面,结果如图1-42所示。从图1-42可以看出,Spark Master的地址(URL)为spark://wangleigis163comdeMacBook- Pro.local:7077,运行中的工作节点(Alive Workers)的数量为1,正在使用的内存大小(Memory in use)为1024MB,集群的状态(Status)为运行中(ALIVE)。

图1-42 Spark Master页面

(6)按照如下命令提交Spark默认的examples示例程序到集群。

#进入Spark路径
cd $SPARK_HOME
#启动Spark Pi
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:
7077 examples/jars/spark-examples_2.12-3.0.0.jar

在上述代码中,class参数代表应用程序的入口类,master参数代表要将examples示例程序提交到哪个集群环境,最后一个参数examples/jars/spark-examples_2.12-3.0.0.jar代表Jar包路径。

运行结果如图1-43所示。从图1-43中可以看出,我们在集群上运行了一个名(Name)为Spark Pi的应用程序,提交时间(Submit Data)为2021-01-29 03:46:26,运行状态(State)为FINISHED,Spark Pi应用程序在运行过程中使用了512MB(1MB=1024KB)的内存和4个CPU核。

图1-43 examples示例程序的运行结果

(7)实现多节点部署。

为了实现多节点部署,我们需要执行以下操作:在各个服务器上配置相同的Scala和Java环境,然后在slaves配置文件加入其他服务器的IP地址并将安装包复制到其他服务器,最后在Master节点上执行./start-all.sh命令以启动集群。这里需要说明的是,执行集群启动命令的服务器节点会以Master角色启动。

在on YARN模式下,Spark通常以HDFS为数据存储,以YARN为资源管理器,并以Spark应用为计算引擎来完成数据的计算。

on YARN模式是Spark开发中最常用的模式。Hadoop中包含了HDFS、MapReduce和YARN,下面详细介绍如何安装Hadoop并且提交Spark任务到YARN。

(1)从Hadoop官网下载需要的Hadoop安装包,这里选择下载最新的3.3.0版本。

(2)在服务器上执行以下命令,完成 SSH 配置。

yum install ssh     #安装ssh模块
yum install rsync   #安装rsync模块
#生成密钥并写入~/.ssh目录
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
#将公钥写入authorized_keys。如果有多个服务器,那么需要将多个服务器的公钥写入同一个
#authorized_keys并将其复制到多个服务器上
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 
chmod 600 ~/.ssh/authorized_keys  #密钥文件授权
#免密登录服务器,第一次登录时需要输入yes,从而将服务器加入未知host列表
ssh 127.0.0.1

(3)执行如下tar命令,解压安装包。

tar -xzvf hadoop-3.3.0.tar.gz

(4)进入Hadoop解压目录,执行以下命令,在hadoop-env.sh中加入JDK的环境配置。

cd hadoop-3.3.0 
vim etc/hadoop/hadoop-env.sh 
#在etc/hadoop/hadoop-env.sh文件中加入JAVA_HOME
export JAVA_HOME=/your_java_path/jdk1.8.0_201.jdk/ 

(5)进入Hadoop解压目录,执行以下命令,在core-site.xml中加入HDFS配置,从而指明HDFS集群中fs.defaultFS的服务地址为hdfs://localhost:9000。

vim etc/hadoop/core-site.xml 
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

(6)进入Hadoop解压目录,执行以下命令,在hdfs-site.xml中加入HDFS文件的副本数,一般设置为3即可。

vim etc/hadoop/hdfs-site.xml
<configuration>
   <property> 
    <name>dfs.datanode.data.dir</name>
 <value>file:///your_file_path/dfs/data</value>
</property>
<property>
    <name>dfs.namenode.name.dir</name> 
 <value>file:/// your_file_path/dfs/name</value> 
</property>  
  <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>

(7)进入Hadoop解压目录,执行namenode -format命令,初始化NameNode。

bin/hdfs namenode -format

(8)进入Hadoop解压目录,执行sbin/start-all.sh命令,启动HDFS NameNode和DataNode。启动命令执行成功后,反馈如图1-44所示。从图1-44中可以看出,资源管理器和节点管理器都已启动成功。

图1-44 启动YARN

(9)在浏览器的地址栏中输入“http://127.0.0.1:9870/”并按Enter键,查看HDFS Web页面以及整个HDFS集群的统计信息。注意,新版Hadoop在HDFS中的默认端口号是9870。YARN HDFS管理界面如图1-45所示。

图1-45 YARN HDFS管理界面

(10)在浏览器的地址栏中输入http://127.0.0.1:8088/cluster 并按Enter键,查看YARN服务。YARN 管理界面如图1-46所示。

(11)在/etc/profile文件中配置Hadoop环境变量,具体配置信息如下。

export HADOOP_HOME=/your_spark_home/hadoop-3.3.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

图1-46 YARN管理界面

按照上述代码配置好Hadoop环境变量后,当Spark以--master yarn向YARN提交任务时,就会根据环境变量读取Hadoop配置,并将任务提交到Hadoop配置对应的集群上。所以一般情况下,任务会在设置好环境变量的YARN 资源管理器上统一提交。

(12)将Spark任务提交到YARN上运行,具体的提交命令如下。

cd $SPARK_HOME/ && bin/spark-submit --class org.apache.spark.examples.SparkPi --master
yarn  examples/jars/spark-examples_2.12-3.0.0.jar

任务提交后,在YARN管理界面上就可以看到任务已经成功提交到YARN集群上并运行,如图1-47所示。

图1-47 Spark任务已经成功提交到YARN集群上并运行

Spark批处理框架一般包括数据摄取层、数据存储层、计算层和调度层,如图1-48所示。

图1-48 Spark批处理框架

介绍完Spark批处理框架后,接下来我们开始进行Spark批处理作业入门实战。一个简单的Spark批处理应用程序的构建过程如下。

首先,基于Spark进行数据分析的前提是有数据。由于大部分基于Spark的分析数据也都是基于HDFS的,因此这里执行如下命令,向HDFS导入示例数据以进行Spark数据分析。

bin/hdfs dfs -put /your_data_path/people.json /input

然后,新建基于MVN 的Spark项目,并加入如下Spark依赖。

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.0.0</spark.version>
    <scala.version>2.12</scala.version>
</properties>
<dependencies>
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

接下来,编写SparkBatchDemo并创建SparkSession,读取HDFS数据后输出数据,输出数据的Schema信息,在DataFrame上执行select、filter、groupBy操作并将结果写入HDFS。SparkBatchDemo的实现代码如下。

import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.control.Exception
object SparkBatchDemo {
  def main(args: Array[String]) {
    try {
      //定义输入文件地址
      val hdfsSourcePath = "hdfs://127.0.0.1:9000/input/people.json"
      //创建SparkSession
      val spark = SparkSession
      .builder().appName("SparkBatchDemo")     // .master("local")
        .master("yarn")
        // .config("spark.some.config.option", "some-value")
        .getOrCreate()
      //数据加载
      val df = spark.read.json(hdfsSourcePath)
      //数据分析处理
      df.show() 
      df.printSchema()  //输出Schema信息
      df.select("name").show()
      //对每行数据中的age加1 
      import spark.implicits._
      df.select($"name", $"age" + 1).show()
      df.filter($"age" > 21).show()
      df.groupBy("age").count().show()
      //将数据分析结果写出(将结果写到HDFS中)
      val hdfsTargetPath = "hdfs://127.0.0.1:9000/input/people_result.json"
      //如果提示文件存在,就执行bin/hdfs dfs -rm -r /input/people_result.json以删除数据
      df.write.mode(SaveMode.Overwrite).json(hdfsTargetPath)
      //关闭SparkSession
      spark.close()
    } catch {
      case ex: Exception => {
        ex.printStackTrace()                      // 输出到标准err
        System.err.println("exception===>: ...")  // 输出到标准err
      }
    }
  }
}

上述代码中的核心逻辑如下。

val spark = SparkSession.builder().appName("SparkBatchDemo").master("yarn").getOrCreate()用于构建SparkSession实例spark。其中的appName表示应用程序的名称,master表示运行模式。如果master为yarn,就表示在YARN上运行;如果master为local,就表示在编译器中启动Spark环境以运行。

val df = spark.read.json(hdfsSourcePath)表示将hdfsSourcePath中的数据加载到Spark中并转换为DataFrame。hdfsSourcePath中的JSON数据如下:

{"name":"Michael","time":"2019-06-22 01:45:52.478","time1":"2019-06-22 02:45:52.478"}
{"name":"Andy", "age":30,"time":"2019-06-22 01:45:52.478","time1":"2019-06-22 02:45:52.478"}
{"name":"Justin", "age":19,"time":"2019-06-22 01:45:52.478","time1":"2019-06-22 02:45:52.478"}
{"name":"Andy", "age":29,"time":"2019-06-22 01:45:52.478","time1":"2019-06-22 02:45:52.478"}

df.show()用于显示数据,执行结果如下:

+-----+--------+--------------------+--------------------+
|  age|    name|time|                 time1|
+-----+--------+--------------------+--------------------+
|null | Michael|2019-06-22 01:45:...|2019-06-22 02:45:...|
|  30 |    Andy|2019-06-22 01:45:...|2019-06-22 02:45:...|
|  19 |  Justin|2019-06-22 01:45:...|2019-06-22 02:45:...|
|  29 |    Andy|2019-06-22 01:45:...|2019-06-22 02:45:...|
+----+-------+--------------------+--------------------+

可以看出,HDFS中的JSON数据已被解析并正确加载到Spark中了。

df.printSchema()用于输出df的Schema信息,执行结果如下:

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- time: string (nullable = true)
 |-- time1: string (nullable = true)

可以看出,Spark能够自动识别JSON中的数据格式和类型。其中的字段分别为age、name、time、time1,数据类型则分别为long、string、string、string。

df.select("name").show()只查询name字段,执行结果如下:

+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
| Andy|
+-------+

可以看出,Spark能够成功将name字段中的数据过滤出来。

df.select($"name", $"age"+1).show()能够查询name字段和age字段中的数据,并且对age字段中的数据执行加1操作,执行结果如下:

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
|   Andy|       30|
+-------+---------+

可以看出,Spark不仅查询出name字段和age字段中的数据,而且成功对age字段中的数据执行了加1操作。

df.filter($"age" > 21).show()用于过滤age>21的数据,执行结果如下:

+---+----+--------------------+--------------------+
|age|name|                time|               time1|
+---+----+--------------------+--------------------+
| 30|Andy|2019-06-22 01:45:...|2019-06-22 02:45:...|
| 29|Andy|2019-06-22 01:45:...|2019-06-22 02:45:...|
+---+----+--------------------+--------------------+

df.groupBy("age").count().show()用于根据age字段进行统计,也就是统计对于每个年龄有多少条数据,执行结果如下:

+----+-----+
| age|count|
+----+-----+
|  29|    1|
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

df.write.mode(SaveMode.Overwrite).json(hdfsTargetPath)用于将来自df的数据写到hdfsTargetPath对应的目录中,其中,SaveMode.Overwrite表示保存模式(SaveMode)为覆写(Overwrite)。如果目录对应的数据不存在,就直接写入;否则,覆写之前的数据。

spark.close()用于关闭SparkSession并释放资源。

通过以上分析,Spark 批量(离线)任务的代码实现步骤总结如下。

(1)创建SparkSession。

(2)加载数据。

(3)进行数据分析。

(4)写出数据分析结果。

(5)关闭SparkSession。

接下来,编译和提交任务。在编译器中执行如下命令,将刚才的任务提交到YARN上并执行。其中,参数--class "SparkBatchDemo"表示应用程序的入口类为SparkBatchDemo,--master yarn表示将应用程序提交到YARN上并运行,--deploy-mode cluster表示在YARN上以cluster模式运行。

cd $SPARK_HOME && ./bin/spark-submit --class "SparkBatchDemo" --master yarn 
  --deploy-mode cluster spark-1.0.jar

任务提交后,在YARN页面上即可看到任务的执行情况,如图1-49所示。

图1-49 Spark任务在YARN上的执行情况

单击任务,查看具体的日志信息,如图1-50所示。

图1-50 Spark任务在YARN上执行的日志信息

Spark Streaming是基于Spark API的流式计算扩展,它实现了一个高吞吐量、高容错的流式计算引擎。Spark Streaming首先从多种数据源(如Kafka、Flume、Kinesis或TCP服务等)获取数据,然后使用高级函数(如Map、Reduce、Join、Window函数)组成的计算逻辑单元对数据进行处理,最后将经过处理的数据实时推送到消息服务、文件系统、数据库等。除基本的流式计算外,应用程序还可以在数据流上应用Spark 的机器学习和图形处理算法。Spark Streaming数据流程如图1-51所示。

图1-51 Spark Streaming数据流程

下面介绍如何构建一个从Kafka中获取数据,然后实时处理数据并将处理结果写出的应用程序,具体步骤如下。

(1)构建基于MVN的Spark项目,并加入Spark Streaming和Kafka依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
    <!--<scope>provided</scope>-->
</dependency>

(2)安装并启动ZooKeeper和Kafka。

① 安装并启动ZooKeeper:ZooKeeper是开源的分布式程序协调服务器,它能够为分布式程序提供一致性服务,这种一致性是通过基于Poxos算法的ZAB协议完成的。ZooKeeper主要用于配置维护、域名管理、分布式同步、集群管理等。具体的安装方法详见官网https://zookeeper.apache.org/ doc/r3.6.2/zookeeperStarted.html。ZooKeeper安装好之后,可执行如下命令以启动ZooKeeper:

cd /your_zookeeper_path/apache-zookeeper-3.5.5-bin && bin/zkServer.sh start

② 安装并启动Kafka:Kafka是分布式消息平台,主要功能是发布和订阅消息,作用类似于消息队列或企业消息系统。Kafka主要用于在多个应用系统之间构建消息传递的管道。具体的安装方法详见官网http://kafka.apache.org/quickstart。Kafka安装好之后,可执行如下命令以启动Kafka:

cd /your_kafka_path/kafka_2.12-2.2.0 && nohup bin/kafka-server-start.sh config/server.
        properties

(3)编写Spark Streaming演示程序SparkStreamingDemo,实现实时从Kafka接收数据并将符合JSON要求的数据输出到HDFS中,代码如下:

object SparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    try {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "spark_stream_cg",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val topics = Array("spark_topic_1", "spark_topic_2")
      val conf = new SparkConf().setAppName("SparkStreamingDemo")
        //.setMaster("local")
       .setMaster("yarn")
       //创建streamingContext
      val streamingContext = new StreamingContext(conf, Seconds(30))
      val checkPointDirectory = "hdfs://127.0.0.1:9000/spark/checkpoint"
      streamingContext.checkpoint(checkPointDirectory);
      //数据接入(接入Kafka数据)
      val stream = KafkaUtils.createDirectStream[String, String](
        streamingContext,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      val etlResultDirectory = "hdfs://127.0.0.1:9000/spark/etl/"
      //定义数据计算逻辑
      val etlRes = stream.map(record => (record.value().toString)).filter(message =>  
                      None != JSON.parseFull(message))
      etlRes.count().print()
      //输出流式计算结果
      etlRes.saveAsTextFiles(etlResultDirectory)
      //启动streamingContext
      streamingContext.start()
      streamingContext.awaitTermination()
    } catch {
      case ex: Exception => {
        ex.printStackTrace()                      // 输出到标准err
        System.err.println("exception===>: ...")  // 输出到标准err
      }
    }
  }
}
}

上述代码中的核心逻辑如下。

① 定义了一个名为kafkaParams的实例对象,并通过"bootstrap.servers" -> "localhost:9092"设置Kafka服务的地址为localhost:9092,通过"key.deserializer" -> classOf[StringDeserializer]设置Kafka消息中键的序列化方式为StringDeserializer,通过"value.deserializer" -> classOf[StringDeserializer]设置Kafka消息中消息体的序列化方式为StringDeserializer,通过"enable.auto.commit" -> (false: java.lang.Boolean)设置Kafka Offset的提交方式为非自动提交。

② 通过val topics = Array("spark_topic_1", "spark_topic_2")定义了需要接入Kafka中有关哪些Topic的数据到Spark。

③ 通过val conf = new SparkConf().setAppName("SparkStreamingDemo").setMaster("yarn")定义了SparkConf实例conf。

④ 通过val streamingContext = new StreamingContext(conf, Seconds(30))创建了用于流式计算的StreamingContext实例streamingContext。

⑤ 通过streamingContext.checkpoint(checkPointDirectory)设置了流式计算的检查点,用于中间状态数据的存储和故障恢复。

⑥ 通过调用KafkaUtils的createDirectStream()方法实现了Kafka数据源的接入。

⑦ 通过调用map算子对每条消息进行处理(map(record => (record.value().toString)表示将Kafka中的每条消息转换为字符串并返回),然后调用filter算子以过滤出数据格式为标准JSON的数据。

⑧ 通过etlRes.count().print()输出数据。

⑨ 通过etlRes.saveAsTextFiles(etlResultDirectory)将数据保存到HDFS中。

⑩ 通过调用streamingContext.start()启动流式计算,这样便完成了整个流式计算的逻辑。

通过以上分析,流式计算的代码实现步骤总结如下。

① 创建StreamingContext。

② 接入流式数据。

③ 定义数据计算逻辑。

④ 输出流式计算结果。

⑤ 启动StreamingContext。

(4)添加依赖的Jar包:代码编写好之后,执行mvn install便可对它们进行打包。程序打包完之后,需要将Spark Streaming Kafka依赖包复制到Spark目录的lib子目录下,具体包括如下Jar包。

kafka-clients-2.4.1.jar 
spark-streaming-kafka-0-10_2.12-3.0.0.jar 
spark-token-provider-kafka-0-10_2.12-3.0.0.jar

(5)运行程序:程序打包完且依赖的Jar包复制好之后,执行如下命令即可启动任务。

cd $SPARK_HOME && nohup ./bin/spark-submit --class "SparkStreamingDemo" --master yarn 
--deploy-mode cluster /spark-1.0.jar &

(6)创建Kafka Topic:在命令行中执行如下命令,创建一个名为spark_topic_1的Topic。

./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 
--partitions 1 --topic spark_topic_1

(7)启动Kafka Producer:执行如下命令以启动Kafka Producer。

./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic spark_topic_1 

(8)向Kafka发送数据:在控制台中输入如下数据,按Enter键,将数据发送到Kafka。

{"name":"Justin", "age":19,"time":"2019-06-22 01:45:52.478","time1":"2019-06-22 02:45:52.478"

(9)查看实时计算结果:在任务运行过程中,通过YARN页面查看Spark Streaming程序在YANR上的运行状态,并查询运行过程中有关输出的日志。通过图1-52可以看出,Spark Streaming已经在接收来自Kafka的数据并进行处理了。

图1-52 Spark Streaming程序在YARN上运行

此外,我们还可以登录HDFS页面以查看数据写入情况,通过图1-53可以看出,经过Spark Streaming处理的数据已经被实时写入HDFS。

图1-53 经过Spark Streaming处理的数据已经被实时写入HDFS

[1] 1PB = 210TB = 220GB = 230MB = 240KB = 250B。


相关图书

SPSS医学数据统计与分析
SPSS医学数据统计与分析
首席数据官知识体系指南
首席数据官知识体系指南
大数据技术基础
大数据技术基础
时序大数据平台TDengine核心原理与实战
时序大数据平台TDengine核心原理与实战
大数据安全治理与防范——流量反欺诈实战
大数据安全治理与防范——流量反欺诈实战
大数据实时流处理技术实战——基于Flink+Kafka技术
大数据实时流处理技术实战——基于Flink+Kafka技术

相关文章

相关课程