本章主要介绍Spark的机器学习套件MLlib。MLlib从功能上说与Scikit-Learn等机器学习库非常类似,但计算引擎采用的是Spark,即所有计算过程均实现了分布式,这也是它和其他机器学习库最大的不同。但读者在学习MLlib的时候,大可不必关注其分布式细节,这是MLlib组件与其他组件很不一样的地方,这里不用考虑GraphX、Structured Streaming中的关键抽象、分布式计算框架,而只需关注那些机器学习任务本身的一些东西,如参数、模型、工作流、测试、算法调优等。
本章包含以下内容:
- 机器学习;
- Spark MLlib与Spark ML;
- 数据预处理;
- 分类算法应用;
- 聚类算法应用;
- 推荐系统应用;
- 训练之后;
- 流式机器学习。
6.1 机器学习
在本节中,我们将试着从计算机科学、统计学和数据分析的定义角度来机器学习。机器学习是计算机科学的一个分支,为计算机提供了无须明确编程的学习能力(Arthur Samuel,1959)。这个研究领域是从人工智能中的模式识别和计算学习理论的研究中演化而来的。
更具体地说,机器学习探讨了启发式学习和基于数据进行预测的算法的研究和构建。这种算法通过从样本输入构建模型,通过制订数据驱动的预测来代替严格的静态程序代码。现在来看看卡耐基梅隆的Tom M. Mitchell教授对机器学习的定义,他从计算机科学的角度解释了机器学习的真正意义:
对于某类任务T和性能度量P,如果一个计算机程序在T上以P衡量的性能随着经验E而自我完善,那么就称这个计算机程序从经验E中学习。
基于该定义,我们能够得出计算机程序或者机器能够:
- 从历史数据中学习;
- 通过经验而获得提升;
- 交互式地增强可用于预测问题结果的模型。
典型的机器学习任务是概念学习、预测建模、聚类以及寻找有用的模式。最终目标是提高学习的自动化程度,从而不再需要人为地干预,或尽可能地降低人为干预的水平。
6.1.1 典型的机器学习工作流
一个典型的机器学习应用程序涉及从输入、处理到输出的几个步骤,形成一个科学的工作流程,如图6-1所示。典型的机器学习应用程序涉及以下步骤。
图6-1 机器学习流程
(1)加载样本数据。
(2)将数据解析为算法所需的格式。
(3)预处理数据并处理缺失值。
(4)将数据分成两组:一组用于构建模型(训练数据集),另一组用于测试模型(验证数据集)。
(5)运行算法来构建或训练你的ML模型。
(6)用训练数据进行预测并观察结果。
(7)使用测试数据测试和评估模型,或者使用第三个数据集(称为验证数据集)运用交叉验证技术验证模型。
(8)调整模型以获得更好的性能和准确性。
(9)调整模型扩展性,以便将来能够处理大量的数据集。
(10)部署模型。
在步骤4中,实验数据集是随机分割的,通常被分为一个训练数据集和一个称为采样的测试数据集。训练数据集用于训练模型,而测试数据集用于最终评估最佳模型的性能。更好的做法是尽可能多地使用训练数据集以提高泛化性能。另一方面,建议只使用一次测试数据集,以在计算预测误差和相关度量时避免过度拟合问题。
6.1.2 机器学习任务的学习类型
根据学习系统学习反馈的本质,机器学习任务通常被分为以下3类,即监督学习、无监督学习以及增强学习,如图6-2所示。
图6-2 机器学习的任务类型
1.监督学习
监督学习的目标是学习将输入映射到与现实世界相一致的输出的一般规则。例如,垃圾邮件过滤数据集通常包含垃圾邮件以及非垃圾邮件。因此,能够知道训练集中的数据是垃圾邮件还是正常邮件。我们有机会利用这些信息来训练模型,以便对新来的邮件进行分类。如图 6-3所示,该图为监督学习的示意图。算法找到所需的模式后,可以使用这些模式对未标记的测试数据进行预测。这是最常见的机器学习任务类型,MLlib 也不例外,其中大部分算法都是监督学习,如朴素贝叶斯、逻辑回归、随机森林等,监督学习的数据处理流程大致如图 6-3所示。
图6-3 监督学习流程
从图中可以看出,经过数据预处理后,数据被分为两部分,一部分为测试集,另一部分为训练集,通过学习算法,可以由训练集得到我们所需的模型,模型会用测试集进行验证,工程师会根据验证的情况对模型进行调优,实现一个数据驱动的优化过程。
2.无监督学习
在无监督学习中,数据没有相关的标签,也就是说无法区分训练集与测试集。因此,我们需要用算法上加上标签,如图 6-4 所示。因此,标签必须从数据集中推断出来,这意味着无监督学习算法的目标是通过描述结构,以某种结构化的方式对数据进行预处理。
图6-4 无监督学习流程
为了克服无监督学习中的这个障碍,通常使用聚类技术,基于某些相似性度量来对未标记样本进行分组。因此,无监督学习任务会涉及挖掘隐藏的模式、特征学习等。聚类是智能地对数据集中的元素进行分类的过程。总体思路是,同一个类中的两个元素比属于不同类中的元素彼此更为“接近”。“接近”的定义可以有很多种。
无监督的例子包括聚类、频繁模式挖掘以及降维等。MLlib也提供了 k均值聚类、潜在狄利克雷分布(Latent Dirichlet Allocation)、主成分分析(Principal Component Analysis)、奇异值分解(Singular value decomposition)等聚类与降维算法。
3.增强学习
作为一个人,我们也曾从过去的经验中学习。多年来积极的赞美和负面的批评都有助于塑造出今天的我们。通过与朋友、家人,甚至陌生人互动,我们可以了解什么让人开心,什么让人难过。当你执行某个操作时,你有时会立即得到奖励。例如,在附近找到购物中心可能会产生即时的满足感,但也有些时候,奖励不会马上兑现,比如长途跋涉去寻找某个地方。这些都与增强学习密切相关。
因此,增强学习是一种模型本身从一系列行为中学习的技术。数据集或样本的复杂性对于需要算法成功学习目标函数的增强学习非常重要。此外,为了达到最终目标,每条数据都需要做到一点,即在保证与外部环境相互作用的同时,应确保奖励函数的最大化,如图 6-5所示。
图6-5 增强学习流程
从图 6-5 中也可以看出增强学习与监督学习最大的不同是其训练集包含着一个尝试的过程,会试图从环境中获得评价或者反馈。如围棋这种博弈类游戏,会有两个代理互相用已有的模型制订策略,并根据最后的结果修正自己的模型的过程。谷歌公司的AlphaGo就是深度学习与增强学习相结合的一个很好的例子。总而言之,增强学习在行动——评价的环境中获得知识,改进行动方案以适应环境。增强学习在物联网环境、路线问题、股市交易、机器人等场景得到了广泛应用。
6.2 Spark MLlib与Spark ML
在Spark MLlib模块中,可以看到它的源码主要分为两个包:spark.ml与spark.mllib,我们将前者称为Spark ML API,后者称为Spark MLlib API,有些算法在两个包中都可以找到,如协同过滤,有些算法只有MLlib有,如SVD。除此以外,它们还有一些区别。
一言以蔽之,MLlib与ML之间最大的区别在于,ML基于DataFrame,而MLlib API基于RDD,这与GraphX和GraphFrame之间的关系类似。在Spark 2.0后,基于RDD的API,也就是MLlib API,就已经进入了维护状态,而Spark MLlib首要的API为ML API。虽然如此,Spark MLlib仍然会以修复Bug的方式支持MLlib API,但不会增加新特性了。在Spark 2.x的版本中,ML API会逐渐变得与MLlib API一样,在完成这个过程后,MLlib API会被弃用。在Spark 3.0中,MLlib API会被彻底舍弃。
Spark MLlib转而提供基于DataFrame API的原因有以下3点。
- 使用DataFrame可以享受到Spark 2.x带来的性能优化,以及跨语言的统一API。
- DataFrame易于使用ML Pipelines特性,这在特征转换方面尤其有用,其中Spark ML Pipelines会在下一节介绍。
- ML提供了一套跨语言以及跨机器学习算法的API。
本书提到的Spark ML,并不是一个正式的名字,而是指Spark MLlib中基于DataFrame的那一套API,它主要是由org.apache.spark.ml的包名,以及术语Spark ML Pipelines得名而来。此外,由于MLlib API是注定会被替代的,因此本书不会对其着墨太多,主要以Spark ML API为主。
Spark ML Pipelines
Spark MLlib想成为大数据机器学习的最佳实践,简化机器学习过程,并使其可扩展。Spark ML API引入了Pipelines API(管道),这类似于Python机器学习库Scikit-Learn中的Pipeline,它采用了一系列API定义并标准化了6.1.1节中工作流,它包含了数据收集、预处理、特征抽取、特征选择、模型拟合、模型验证、模型评估等一系列阶段。例如,对文档进行分类也许会包含分词、特征抽取、训练分类模型以及调优等过程。大多数机器学习库不是为分布式计算而设计的,也不提供Pipeline的创建与调优,而这就是Spark ML PipeLines要做的。
Spark ML Pipelines就是对分布式机器学习过程进行模块化的抽象,这样使得多个算法合并成一个Pipeline或者工作流变得更加容易,下面是Pipelines API的关键概念。
- DataFrame:DataFrame与Spark SQL中用到的DataFrame一样,是Spark的基础数据结构,贯穿了整个Pipeline。它可以存储文本、特征向量、训练集以及测试集。除了常见的类型,DataFrame还支持Spark MLlib特有的Vector类型。
- Transformer:Transformer对应了数据转换的过程,它接收一个DataFrame,在它的作用下,会生成一个新的DataFrame。在机器学习中,在涉及特征转换的过程中经常会用到,另外还会用于训练出的模型将特征数据集转换为带有预测结果的数据集的过程,Transformer都必须实现transform()方法。
- Estimator:从上面 Transformer 的定义可以得知,训练完成好的模型也是一个Transformer,那么Estimator包含了一个可以让数据集拟合出一个Transformer的算法,Estimator必须实现fit()方法。
- Pipeline:一个Pipeline将多个Transformer和Estimator组装成一个特定的机器学习工作流。
- Parameter:所有的Estimator和Transformer共用一套通用的API来指定参数。
文档分类是一个在自然语言处理中非常常见的应用,如垃圾邮件监测、情感分析等,下面通过一个文档分类的例子来让读者对Spark的Pipeline有一个感性的理解。简单来说,任何文档分类应用都需要以下4步。
(1)将文档分词。
(2)将分词的结果转换为词向量。
(3)学习模型。
(4)预测(是否为垃圾邮件或者正负情感)。
比如在垃圾邮件监测中,我们需要通过邮件正文甄别出哪些是垃圾邮件,邮件正文一般会是一段文字,如:代开各种发票,手续费极低,请联系我。这样一段文字是无法直接应用于Estimator的,需要将其转换为特征向量,一般做法是用一个词典构建一个向量空间,其中每一个维度都是一个词,出现过的为1,未出现的为0,再根据文档中出现的词语的频数,用TF-IDF算法为文档中出现的词维度赋予权重。这样的话,每个文档就能被转为一个等长的特征向量,如下:
(0, 0, …, 0.27, 0, 0, …, 0.1, 0)
接着就可以用来拟合模型并输出测试结果。
用一个流程图来表示整个过程,如图6-6所示,其中Tokenizer和HashingTF为Transformer,作用分别是分词和计算权重,训练出的模型也是Transformer,用来生成测试结果;Estimator采用的是逻辑回归算法(LR);DS0-DS3都是不同阶段输出的数据。这就是一个完整意义上的Pipeline。
图6-6 Spark ML Pipeline模型
下面用代码实现整个Pipeline,如下:
package com.spark.examples.mllib
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object PipelineExample{
def main(args: Array[String ]): Unit = {
val spark = SparkSession
.builder
.master("local[2]")
.appName("PipelineExample")
.getOrCreate()
import spark.implicits._
// 准备训练数据,其中最后一列就是该文档的标签,即是否为垃圾邮件
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// 配置整个Pipeline,由3个组件组成:tokenizer(Transformer)、hashingTF(Transformer)
// 和 lr(Estimator)
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// 拟合模型,得到结果
val model = pipeline.fit(training)
// 将模型持久化
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// 将Pipeline持久化
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// 加载模型
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// 准备无标签的测试集
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// 用模型预测测试集,得到预测结果(标签)
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
}
}
这样就用Spark完整实现了一个机器学习的流程。由上面的代码可以看出,这样的结构非常有利于复用Transformer与Estimator组件。
Spark MLlib(ML API)的算法包主要分为以下几个部分:
- 特征抽取、转换与选择;
- 分类和回归;
- 聚类;
- 协同过滤;
- 频繁项集挖掘。
其中每一类都有若干种算法的实现,用户可以利用Pipeline按需进行切换,下面将就这几个类别,分别实现一些真实数据的案例,让读者可以直接上手应用。
此外,在上面代码中,我们用Pipeline API将模型序列化成文件,这样的好处在于可以将模型看成一个黑盒,非常方便模型上线,而不用在上线应用时再去对模型进行硬编码,这将在6.8.3节中有所体现,类似于Python的Pickle库的用法。
6.3 数据预处理
在机器学习实践中,数据科学家拿到的数据通常是不尽如人意的,例如存在大量的缺失值、特征的值是不同的量纲、有一些无关的特征、特征的值需要再次处理等情况,这样的数据无法直接训练,因此我们需要对这些数据进行预处理。预处理在机器学习中是非常重要的步骤,如果没有按照正确的方法对数据进行预处理,往往会得到错误的训练结果。下面介绍几种常见的预处理方法。
6.3.1 数据标准化
通常,我们直接获得的数据包含了量纲,也就是单位,例如身高180 cm,体重75 kg,对于某些算法来说,如果特征的单位不统一,就无法直接进行计算,因此在很多情况下的预处理过程中,数据标准化是必不可少的。数据标准化的方法一般有z分数法、最大最小法等。
1.z分数法
这种方法根据原始数据(特征)的均值(mean)和标准差(standard deviation)进行数据的标准化,将原始数据转换为z分数,转换函数如下:
{:—}其中μ为所有样本数据的均值,σ为所有样本数据的标准差。Spark MLlib内置了z分数标准化转换功能的Transformer实现类StandardScaler,代码如下:
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// 计算汇总统计量,生成ScalerModel
val scalerModel = scaler.fit(dataFrame)
// 对特征进行标准化
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
2.最大最小法
这种方法也称为离差标准化,是对原始数据的线性变换,使结果值映射到 [0 - 1] 之间。转换函数如下:
其中max为样本数据的最大值,min为样本数据的最小值。这种方法的缺陷是当有新数据加入时,可能导致max和min的变化,需要重新定义,但这种情况在训练过程中很少见。这种方法对于方差特别小的特征可以增强其稳定性。Spark MLlib内置了最大最小转换功能的Transformer实现类MinMaxScaler。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// 计算汇总统计量,生成MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
3.p范数法
p范数法指的是通过计算样本的p范数,用该样本除以该样本的p范数,得到的值就是标准化的结果。p范数的计算公式如下:
当p = 1时,p范数也叫L1范数,此时L1等于样本的所有特征值的绝对值相加。当p = 2时也叫L2范数,此时L2等于样本x距离向量空间的原点的欧氏距离:
{:—}归一化的结果为:
Spark MLlib内置了p范数标准化转换功能的Transformer实现类Normalizer,其代码如下:
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// 设置p = 1
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
// 设置p = -∞
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
6.3.2 缺失值处理
对于缺失值的处理,需要根据数据的具体情况,比如如果特征值是连续型,通常用中位数来填充;如果特征值是标签型,通常用众数来补齐;某些情况下,还可以用一个显著区别于已有样本中该特征的值来补齐。Spark并没有提供预置的缺失值处理的Transformer,这通常需要自己实现,在后面的例子中,我们会实现一个对缺失值处理的自定义Transformer。
6.3.3 特征抽取
特征抽取(feature extraction)是从原有特征中按照某种映射关系生成原有特征的一个特征子集,而6.3.4节中将提到的特征选择(feature selection),是根据某种规则对原有特征筛选出一个特征子集。特征选择和特征抽取有相同之处,它们都试图减少数据集中的特征数目,但具体方法不同,特征抽取的方法主要是通过特征间的关系,如组合不同特征得到新的特征,这样就改变了原来的特征空间;而特征选择的方法是从原始特征数据集中选择出子集,是一种包含关系,没有更改原始的特征空间,如图6-7所示。
图6-7 特征抽取与特征选择
Spark 也提供了很多种特征抽取的方法,常见的如主成分分析、广泛应用于文本的Word2Vector等。
1.主成分分析
如果在样本中特征与特征互相关联,无关的特征太多,就会影响模式的发现。我们需要用降维技术从样本中生成用来代表原有特征的一个特征子集。
在了解主成分分析(PCA)之前,需要先了解协方差的概念,X特征与Y特征之间的协方差为:
如果协方差为正,说明X和Y是正相关关系。协方差为负说明是负相关关系。协方差为0时,X和Y相互独立。如果样本集D有n维特征,那么两两之间的协方差可以组成一个n×n的矩阵,如下是n = 3的情况:
然后需要对这个矩阵进行特征值分解,得到特征值和特征向量,再取出最大的m(m < n)个特征值对应的特征向量(w1, w2,…, wm),组成特征向量矩阵W,对每个样本xi执行如下操作,得到降维后的样本zi,如下:
则降维后的数据集为:
下面以数据挖掘领域著名的鸢尾花数据集(IRIS)来用PCA实现降维操作,鸢尾花数据集是常用的分类数据集,包含150个样本,分3种类别,各50条,每个样本4个维度,分别是花萼长度、花萼宽度、花瓣长度和花瓣宽度,代码如下:
package com.spark.examples.mllib
import org.apache.spark.ml.feature.{PCA, VectorAssembler}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.ml.Pipeline
object IRISPCA {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName("IRISPCA")
.getOrCreate()
// 数据结构为花萼长度、花萼宽度、花瓣长度、花瓣宽度
val fields = Array("id","Species","SepalLength","SepalWidth","PetalLength","PetalWidth")
val fieldsType = fields.map(
r => if (r == "id"||r == "Species")
{StructField(r, StringType)}
else
{StructField(r, DoubleType)}
)
val schema = StructType(fieldsType)
val featureCols = Array("SepalLength","SepalWidth","PetalLength","PetalWidth")
val data=spark.read.schema(schema).csv("data/iris")
val vectorAssembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
val vectorData=vectorAssembler.transform(data)
// 特征标准化
val standardScaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithMean(true)
.setWithStd(false)
.fit(vectorData)
val pca = new PCA()
.setInputCol("scaledFeatures")
.setOutputCol("pcaFeatures")
// 主成分个数,也就是降维后的维数
.setK(2)
val pipeline = new Pipeline()
.setStages(Array(vectorAssembler,standardScaler,pca))
val model = pca.fit(data)
// 对特征进行PCA降维
model.transform(data).select("Species", "pcaFeatures").show(100, false)
}
}
降维后的数据只有两个维度,如下:
2.Word2Vector
在自然语言处理领域,训练集通常为纯文本,这样的数据是无法直接训练的,前面提到的TF-IDF就是一种生成词向量的方式。但是TF-IDF的缺点在于单纯以“词频”衡量一个词的重要性,不够全面,忽略了上下文信息,例如“阿里巴巴成立达摩院”与“人工智能应用有望加速落地”字面无任何相似之处,但它们之间有很强的关联,这用TF-IDF却无法体现。Word2Vec最先出现在谷歌公司在2013年发表的论文“Efficient Estimation of Word Representation in Vector Space”中,作者是Mikolov。Word2Vec的基本思想是采用一个3层的神经网络将每个词映射成n维的实数向量,为接下来的聚类或者比较相似性等操作做准备。这个3层神经网络实际是在对语言模型进行建模,但在建模的同时也获得了单词在向量空间上的表示,即词向量,也就是说这个词向量是建模过程的中间产物,而这个中间产物才是Word2Vec的真正目标。
Word2Vec采用了两种语言模型:CBOW与Skip-gram,前者是根据上下文预测下一个词,后者是根据当前词预测上下文。以CBOW为例,如图6-8所示。
图6-8 CBOW语言模型
我们选择一个固定的窗口作为语境(上下文):t − 2 — t + 2,在输入层是4个n维的词向量(初始为随机值),隐藏层做的操作是累计求和操作,隐藏层包含n个结点,输出层是一棵巨大的二叉树,构建这棵二叉树的算法就是霍夫曼树,它的叶子结点代表了语料中的M个词语,语料中有多少个词,就有多少个叶子结点。假设左子树为1,右子树为0,这样每个叶子结点都有一个唯一的编码。最后输出的时候,CBOW采用了层次Softmax算法,隐藏层的每个结点都与树的每个结点相连,则霍夫曼树上的每个结点都会有M条边,每条边都有权重,对于输入的上下文,我们需要预测的是在上下文一定的情况下,使得预测词w的概率最大,以010110为例,说明霍夫曼树有5层,我们希望在根结点,词向量与根结点相连,也就是第一层,经过回归运算得到第一位等于0的概率尽量等于1,依次类推,在第二层,希望第二位的值等于1的概率尽可能等于 1。这样一直下去,路径上所有的权重乘积就是预测词在当前上下文的概率P(wt),而在语料中我们可以得到当前上下文的残差1 − P(wt),这样的话,就可以使用梯度下降来学习参数了。
由于不需要标注,Word2Vec本质上是一种无监督学习,对自然语言处理有兴趣的读者不妨仔细阅读谷歌公司的那篇论文。
Spark MLlib内置了Word2Vec算法的Transformer实现类Word2Vec,代码如下:
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// 每一行输入数据都是来源于某句话或是某个文档
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// 设置word2Vec参数
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
6.3.4 特征选择
特征选择的目标通常是提高预测准确性、提升训练性能、能够更好地解释模型。特征选择的一种很重要思想就是对每一维的特征打分,这样就能选出最重要的特征了,基于这种思想的方法有:卡方检验、信息增益以及相关系数。相关系数主要量化的是任意两个特征是否存在线性相关,信息增益会在下一节介绍,本小节主要介绍卡方检验。
卡方检验是以卡方分布为基础的一种常用假设检验方法,它的原假设是观察频数与期望频数没有差别。该检验的基本思想是:首先假设原假设成立,基于此前提计算出值,它表示观察值与理论值之间的偏离程度。根据卡方分布及自由度可以确定在原假设成立的条件下获得当前统计量及更极端情况的概率P。如果P值很小,说明观察值与理论值偏离程度太大,应当拒绝无效假设,表示比较资料之间有显著差异;否则就不能拒绝无效假设,尚不能认为样本所代表的实际情况和理论假设有差别。
假设样本的某一个特征,它的取值为A和B两个组,而样本的类别有0和1两类。对样本进行统计,可以得到表6-1所示的统计表。
表6-1 样本特征统计表
组别 | 0 | 1 | 合计 |
---|---|---|---|
A | 19 | 24 | 43 |
B | 34 | 10 | 44 |
合计 | 53 | 34 | 87 |
从表6-1中,我们可看出A和B组对分类结果有很大的影响,但这不排除抽样的影响,首先假设该特征有结果是独立无关的,随机取一个样本,属于类0的概率为(19 + 34) / (19 + 34 + 24 + 10) = 60.9%。接下来,我们需要根据表6-1得到一个理论值表,如表6-2所示。
表6-2 理论值表
组别 | 0 | 1 | 合计 |
---|---|---|---|
A | 43 × 0.609 = 26.2 | 43 × 0.391 = 16.8 | 43 |
B | 44 × 0.609 = 26.8 | 44 × 0.391 = 17.2 | 44 |
如果两个变量是独立无关的,那么表6-2中的理论值与实际值的差别会非常小。
χ 2值的计算公式为:
{:—}其中A为实际值,也就是表6-1中的4个数据,T为理论值,也就是表6-2中给出的4个数据,计算得到χ 2值为10.01。得到该值后,需要在给定的置信水平下查得卡方分布临界值,如表6-3所示。
表6-3 给定置信水平下的卡方统计量的值
显然10.01 > 7.88,也就是说该特征与分类结果无关的概率小于0.5%,换言之,我们应该保留这个特征。
Spark MLlib内置了卡方检验组件ChiSqSelector,代码如下:
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
// 配置卡方检验参数
val selector = new ChiSqSelector()
.setNumTopFeatures(3)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
6.4 分类算法应用
分类器是机器学习最常见的应用,MLlib中也内置了许多分类模型,本节将会介绍决策树和随机森林,以及用Spark MLlib的随机森林分类器实现根据身体监控数据判断人体状态的实例。
6.4.1 决策树
决策树是一种机器学习的方法,本质上是通过一种树形结构对样本进行分类,每个非叶子结点是一次判断,每个叶子结点代表了分类结果。决策树是一种典型的监督学习,需要一定量的样本来学习的一个树形结构,常见的决策树构造树算法有C4.5与ID3。
下面先来看一个例子,是信贷审批常见的场景:根据信息判断客户是否会逾期,表6-4所示是一些样本,一共有4个特征,以及最后的分类结果,为了简单这是一个二分类场景。
表6-4 信贷审批样本
客户编号 | 年龄 | 负债率 | 过去3个月逾期次数 | 月收入 | 分类:是否逾期 |
---|---|---|---|---|---|
1 | 24 | 15% | 0 | 4000 | 否 |
2 | 25 | 12% | 0 | 6000 | 否 |
3 | 25 | 20% | 0 | 2000 | 否 |
4 | 24 | 7% | 0 | 4000 | 否 |
5 | 27 | 70% | 0 | 8000 | 是 |
6 | 58 | 50% | 0 | 15000 | 是 |
7 | 40 | 45% | 1 | 20000 | 是 |
8 | 20 | 12% | 0 | 3000 | 否 |
9 | 35 | 30% | 0 | 4000 | 否 |
10 | 23 | 33% | 1 | 3000 | 是 |
根据这些样本,我们可以构造出一棵这样的树,如图6-9所示。
图6-9 构建决策树
构造一棵决策树需要从样本中学习结点分裂的时机,以及判断阈值,图6-9所示的A、B、C、D、E,也就是特征选择。决策树构造算法通常是先选择一个最优特征,将样本分为若干个子集,如果子集已经被正确分类,那么就构造叶子结点,将样本分到叶子结点中。如果某个子集没有被正确分类,那么就对这个子集继续选择特征,决策树构造是一个递归过程,递归停止条件就是所有样本被基本正确分类,这样整个过程就构造出了一棵决策树。从这个过程可以看出,决策树通常对训练数据有良好的表现,但对新样本却未必如此,容易出现过拟合。因此我们需要对已经生成的树进行剪枝,从而提升模型的泛化能力。如果特征过多,在一开始构造的时候,我们也会对特征进行选择,只留下足够有区分度的特征。决策树的生成对应于模型的局部选择,而剪枝对应于模型的全局选择。
从上面可以看出,构造决策树主要包含特征选择、决策树生成与决策树剪枝。
1.特征选择
本节主要介绍两种特征选择的方式:信息增益与信息增益率。在介绍这两种方式前,先来看看熵的概念:熵是表示随机变量的不确定性的度量。
定义 假设随机变量X的可能取值有x1, x2, …, xn,对于每一个可能的取值xi,其概率 P(X = xi) = pi ( i = 1, 2, …, n),则随机变量X的熵:
对样本集合D来说,随机变量X是样本的类别,即,假设样本有k个类别,每个类别的概率是,其中
表示类别k的样本个数,
表示样本总数,则对样本集合D来说,熵(经验熵)为:
而条件熵的概念为:
设有随机变量(X, Y ),其联合概率分布为:
{:—}条件熵H(Y|X)表示在已知随机变量X的条件下随机变量Y的不确定性。随机变量X给定的条件下随机变量Y的条件熵H(Y|X),定义为X给定条件下Y的条件概率分布的熵对X的数学期望:
当熵和条件熵中的概率由数据估计得到时,所对应的熵与条件熵分别称为经验熵与经验条件熵。从经验熵与经验条件熵可以得到信息增益的定义:
特征A对训练数据集D的信息增益g(D, A),定义为集合D的经验熵H(D)与特征A给定条件下D的经验熵与条件熵之差:
信息增益通常用来选择特征,经验熵H(D)表示的是对数据集D进行分类的不确定性。而经验条件熵H(D|A)表示在特征A给定的条件下对数据集D进行分类的不确定性,那么信息增益就表示由于特征A而使得对数据集D的分类的不确定性减少程度。显然,对数据集D而言,信息增益依赖于特征,不同特征往往具有不同的信息增益。信息增益大的特征具有更强的分类能力。根据信息增益准则的特征选择方法是:对训练数据集(或子集)D,计算其每个特征的信息增益,并选择信息增益最大的特征。
信息增益率是对信息增益的改进,特征A对训练数据集D的信息增益率gR(D, A)定义为其信息增益g(D, A)与训练数据集D的经验熵H(D)之比,如下:
除了信息增益与信息增益率之外,还可以用基尼系数来完成特征选择。
2.决策树生成
决策树生成算法与特征选择方法相对应,选用信息增益进行特征选择的是ID3算法,选择信息增益比进行特征选择的是 C4.5 算法,选择基尼系数来完成特征选择的是分类回归树(CART)算法。本节将介绍C4.5算法与ID3算法。
ID3算法如下:
(在决策树各个结点上应用信息增益准则选择特征,递归地构建决策树。)
给定训练数据集D,特征集S,阈值ϵ:
(1)若D中所有实例属于同一类Ck,则T为单结点树,并将类Ck作为该结点的类标记,返回T;
(2)若S =ø,则T为单结点树,并将D中实例数最大的类Ck作为该结点的类标记,返回T;
(3)否则,计算S中各特征对D的信息增益,选择信息增益最大的特征Sg;
(4)如果Sg的信息增益小于阈值ϵ,则置T为单结点树,并将D中实例数最大的类Ck作为该结点的类标记,返回T;
(5)否则,对Sg的每一个可能值ai,将D分割为若干个非空子集Di,将Di中实例数最大的类作为标记,构建子结点,由结点及其子结点构成树T,返回T;
(6)对第i个子结点,以Di为训练集,以S − Sg为特征集,递归调用第1~5步,得到字数Ti,返回Ti。
C4.5算法与ID3算法非常类似,只是在用到信息增益的地方换成了信息增益比。
3.剪枝
通常决策树在训练数据上表现很好,但是在测试数据上就不尽如人意,这就是模型过拟合。决策树剪枝主要分为预剪枝和后剪枝。预剪枝是在构造决策树的同时进行剪枝,通常作为停止条件,即设定一个熵的阈值,就算可以继续降低熵,也停止创建分支。而通常我们说的剪枝是指的后剪枝。后剪枝通常有以下两种做法。
- 应用交叉验证的思想,若局部剪枝能够使得模型在测试集上的错误率降低,则进行局部剪枝。
- 应用正则化的思想,综合考虑不确定性和模型复杂度来定出一个新的损失,用该损失作为一个结点是否应该局部剪枝的标准,这种做法的核心是定义新的代价函数,通常会采用树的结构复杂度与模型预测误差之和作为代价衡量。
例如,在ID3、C4.5中我们会应用前者做法,而在分类回归树中,我们会采取后者做法。
6.4.2 随机森林
在决策树的基础上,了解随机森林的原理相对容易。随机森林就是通过集成学习的思想将多棵树集成的一种算法,它的基本单元是决策树,而它本质上属于机器学习的一大分支——集成学习(ensemble learning)方法。集成学习是通过构建多个弱分类器,并按一定规则组合起来的分类系统,常常比单一分类器具有显著优越的泛化性能,常见集成学习算法有随机森林、AdaBoost、XgBoost、梯度提升树等,在风险建模、疾病预测等领域应用相当广泛。
随机森林将N棵决策树集成,每一棵决策树都是一个分类器,相当于每个分类器对结果进行投票,随机森林会综合所有分类结果并将票数最高的分类结果作为最后结果输出。那么可以想到,在随机森林中,生成每一棵决策树是算法的关键。每棵树的生成规则如下。
(1)如果训练集大小为N,对于每棵树而言,随机且有放回地从训练集中抽取N个训练样本(这种采样方式称为bootstrap sample方法),作为该树的训练集。
(2)如果每个样本的特征维度为M,指定一个常数m<<M,随机地从M个特征中选取m个特征子集,每次树进行分裂时,从这m个特征中选择最优的。
(3)每棵树都尽最大限度地生长,并且没有剪枝过程。
随机森林中所谓随机的含义,就是模型在这里引入了随机性(随机抽取训练集、随机抽取特征),两个随机性的引入对随机森林的分类性能至关重要。由于它们的引入,使得随机森林不容易陷入过拟合,并且具有很好的抗噪能力。随机森林的特点有:
- 在当前所有算法中,具有极佳的准确率,在国内外最近几年的数据挖掘大赛中,随机森林取得了令人瞩目的成绩;
- 能够高效地运行在大数据集上,很容易可以看出,随机森林是非常容易分布式的;
- 能够处理具有高维特征的输入样本;
- 能够评估各个特征在分类问题上的重要性;
- 在生成过程中,能够获取到内部生成误差的一种无偏估计;
- 对于缺省值问题也能够获得很好的结果。
6.4.3 人体状态监测器
本节将用真实的数据拟合出一个随机森林分类器,通过身体监测数据来判断人体状态,如走路、骑行、跑步、看电视,数据集包括时间戳、心跳、活动标签和3个传感器监测人体活动的监测数据,传感器分别佩戴在手上、胸部、踝关节处,每个传感器有17个检测指标(温度、3D加速度、陀螺仪和磁强计数据、方位数据)。数据集共计54个属性,3 850 505个样本,包含了18种人体活动。代码如下:
package com.spark.examples.mllib
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler}
import org.apache.spark.sql.types._
import scala.collection.mutable
object RandomForestBodyDetection {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RandomForestBodyDetection")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
// 读取数据集
val dataFiles = spark.read.textFile("data/bodydetect")
val rawData = dataFiles.map(r=>r.toString().split(" ")).rdd.map(row => {
val list = mutable.ArrayBuffer[Any]()
for (i <- row.toSeq) {
list.append(i)
}
Row.fromSeq(list.map(v=>if (v.toString.toUpperCase == "NAN") Double.NaN else v.toString.toDouble))
})
val schema = StructType(Array(
StructField("timestamp", DoubleType), StructField("activityId", DoubleType), StructField("hr", DoubleType),
StructField("hand_temp", DoubleType), StructField("hand_accel1X", DoubleType), StructField("hand_accel1Y", DoubleType),
StructField("hand_accel1Z", DoubleType), StructField("hand_accel2X", DoubleType), StructField("hand_accel2Y", DoubleType),
StructField("hand_accel2Z", DoubleType), StructField("hand_gyroX", DoubleType), StructField("hand_gyroY", DoubleType),
StructField("hand_gyroZ", DoubleType), StructField("hand_magnetX", DoubleType), StructField("hand_magnetY", DoubleType),
StructField("hand_magnetZ", DoubleType), StructField("hand_orientX", DoubleType), StructField("hand_orientY", DoubleType),
StructField("hand_orientZ", DoubleType), StructField("hand_orientD", DoubleType), StructField("chest_temp", DoubleType),
StructField("chest_accel1X", DoubleType), StructField("chest_accel1Y", DoubleType), StructField("chest_accel1Z", DoubleType),
StructField("chest_accel2X", DoubleType), StructField("chest_accel2Y", DoubleType), StructField("chest_accel2Z", DoubleType),
StructField("chest_gyroX", DoubleType), StructField("chest_gyroY", DoubleType), StructField("chest_gyroZ", DoubleType),
StructField("chest_magnetX", DoubleType), StructField("chest_magnetY", DoubleType), StructField("chest_magnetZ", DoubleType),
StructField("chest_orientX", DoubleType), StructField("chest_orientY", DoubleType), StructField("chest_orientZ", DoubleType),
StructField("chest_orientD", DoubleType), StructField("ankle_temp", DoubleType), StructField("ankle_accel1X", DoubleType),
StructField("ankle_accel1Y", DoubleType), StructField("ankle_accel1Z", DoubleType), StructField("ankle_accel2X", DoubleType),
StructField("ankle_accel2Y", DoubleType), StructField("ankle_accel2Z", DoubleType), StructField("ankle_gyroX", DoubleType),
StructField("ankle_gyroY", DoubleType), StructField("ankle_gyroZ", DoubleType), StructField("ankle_magnetX", DoubleType),
StructField("ankle_magnetY", DoubleType), StructField("ankle_magnetZ", DoubleType), StructField("ankle_orientX", DoubleType),
StructField("ankle_orientY", DoubleType), StructField("ankle_orientZ", DoubleType), StructField("ankle_orientD", DoubleType)))
val df = spark.createDataFrame(rawData,schema)
// 数据集的列名,sensor_name 表示某个传感器的某个指标数据,例如,手上的传感器的温度指标为 hand_temp
val allColumnNames = Array(
"timestamp", "activityId", "hr") ++ Array(
"hand", "chest", "ankle").flatMap(sensor =>
Array(
"temp",
"accel1X", "accel1Y", "accel1Z",
"accel2X", "accel2Y", "accel2Z",
"gyroX", "gyroY", "gyroZ",
"magnetX", "magnetY", "magnetZ",
"orientX", "orientY", "orientZ", "orientD").map(name => s"${sensor}_${name}")
)
// 数据集中不需要的列、时间戳和方位数据,分别表示手、胸部、踝关节上传感器的第一个方位指标
val ignoredColumns = Array(0, 16, 17, 18, 19, 33, 34, 35, 36, 50, 51, 52, 53)
val inputColNames = ignoredColumns.map(l => allColumnNames(l))
val columnNames = allColumnNames.
filter { !inputColNames.contains(_) }
// 滤掉不需要的列,并填充缺失值
val typeTransformer = new FillMissingValueTranformer().setInputCols(inputColNames)
// 构造标签列
val labelIndexer = new StringIndexer()
.setInputCol("activityId")
.setOutputCol("indexedLabel")
.fit(df)
// 构造特征列
val vectorAssembler = new VectorAssembler()
.setInputCols(columnNames)
.setOutputCol("featureVector")
// 配置分类器
val rfClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featureVector")
.setFeatureSubsetStrategy("auto")
.setNumTrees(350)
.setMaxBins(30)
.setMaxDepth(30)
.setImpurity("entropy")
.setCacheNodeIds(true)
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))
// 构建整个Pipeline
val pipeline = new Pipeline().setStages(
Array(typeTransformer,
labelIndexer,
vectorAssembler,
rfClassifier,
labelConverter))
val model = pipeline.fit(trainingData)
val predictionResultDF = model.transform(testData)
// 展示结果
predictionResultDF.select(
"hr", "hand_temp", "hand_accel1X", "hand_accel1Y", "hand_accel1Z", "hand_accel2X", "hand_accel2Y", "hand_accel2Z", "hand_gyroX", "hand_gyroY", "hand_gyroZ", "hand_magnetX", "hand_magnetY", "hand_magnetZ", "chest_temp", "chest_accel1X", "chest_accel1Y", "chest_accel1Z", "chest_accel2X", "chest_accel2Y", "chest_accel2Z", "chest_gyroX", "chest_gyroY", "chest_gyroZ","chest_magnetX", "chest_magnetY", "chest_magnetZ", "ankle_temp", "ankle_accel1X", "ankle_accel1Y", "ankle_accel1Z", "ankle_accel2X","ankle_accel2Y", "ankle_accel2Z", "ankle_gyroX", "ankle_gyroY", "ankle_gyroZ", "ankle_magnetX", "ankle_magnetY", "ankle_magnetZ", "indexedLabel", "predictedLabel")
.show(20)
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val predictionAccuracy = evaluator.evaluate(predictionResultDF)
// 模型性能
println("Testing Error = " + (1.0 - predictionAccuracy))
val randomForestModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println("Trained Random Forest Model is:\n" + randomForestModel.toDebugString)
}
}
在处理流程中,用到了自定义的 Transformer 过滤掉了不需要的数据并填充了缺失值对数据进行了预处理,自定义的Transformer代码如下:
package com.spark.examples.mllib
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.{BooleanType, NumericType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
// 继承基类Transformer
class FillMissingValueTranformer extends Transformer
{
val uid: String = Identifiable.randomUID("MissingValueTransformer")
final val inputCols = new Param[Array[String]](this, "inputCol", "The input column")
override def transformSchema(schema: StructType): StructType = {
// 检查输入和输出是否符合要求,比如数据类型
// 返回处理之后的schema
val inputColNames = $(inputCols)
val incorrectColumns = inputColNames.flatMap { name =>
schema(name).dataType match {
case _: NumericType | BooleanType => None
case other => Some(s"Data type $other of column $name is not supported.")
}
}
if (incorrectColumns.nonEmpty) {
throw new IllegalArgumentException(incorrectColumns.mkString("\n"))
}
StructType(schema.fields)
}
def setInputCols(value: Array[String]): this.type = set(inputCols, value)
override def transform(dataset: Dataset[_]): DataFrame = {
val inputColNames = $(inputCols)
var rawdata=dataset
for (i<-inputColNames) {rawdata=rawdata.drop(i)}
val allColumnNames = dataset.columns
// 过滤掉不需要的列名
val columnNames = allColumnNames.filter { !inputColNames.contains(_) }
// 心率的空值填充为60,其他属性的空值填充为0
var imputedValues:Map[String,Double]=Map()
for (colname<-columnNames){
if(colname=="hr"){imputedValues += (colname->60.0)}
else {imputedValues += (colname->0.0)}
}
val processdata=rawdata.na.drop(26,columnNames).na.fill(imputedValues)
processdata.toDF()
}
override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
6.4.4 集成学习
随机森林是由一系列决策树组成的分类器,效果比单个决策树分类器要好,这就是集成学习的思想,严格意义上来说,集成学习并不算是一种分类算法器,而是一种组合分类器的方法。在金融领域的模型工程中,集成学习远比常规的机器学习和深度学习应用得广泛。集成学习组合分类器的方法主要有两种,即Bagging和Boosting。
Bagging这种方式又称套袋法,它指的是根据均匀概率分布从数据中有放回抽样的技术,在每个抽样生成的样本集上训练一个基分类器,最后所有分类器再进行投票,得票最多的类即分类结果,如图6-10所示,这种方法的代表就是上面介绍的随机森林算法。
图6-10 Bagging集成学习
Boosting这种方式又称提升法,Boosting主要的思路是从初始训练集得到基学习器,根据其表现,对训练样本进行调整,使得之前分类错误的样本在后续训练过程中更受关注,用改变后的样本学习下一个分类器。重复学习T个分类器,T个分类器的结果加权得到最终结果。如图6-11所示,提升法的代表是AdaBoost、梯度提升决策树(GBDT)等。
图6-11 Boosting集成学习
可以看到,Bagging和Boosting的不同在于Bagging的训练集是随机的,各训练集是独立的,而Boosting的训练集不是独立的,每一次选择的训练集都依赖于上一次的学习结果,Bagging的基分类器没有权重,Boosting是根据每一次的训练误差得到该基分类器的权重。另外,Bagging能够很好地并行执行,而Boosting则只能在基分类器这个层次串行执行。
6.4.5 梯度提升决策树
Spark除了随机森林还实现了梯度提升决策树(Gradient Boosting Decision Tree,GBDT)算法,GBDT也是近年来在各大竞赛中无往不利的经典算法,也被认为是机器学习中性能最好的方法之一。
与随机森林采用普通决策树作为基分类器不同,GBDT采用分类回归树(Classification and Regression Tree,CART)作为基分类器。分类回归树对于分类问题就是二叉分类树,对回归问题来说就是回归二叉树,与决策树类似,CART也分为特征选择、生成与剪枝。它们之间最大的不同在于决策树采用信息增益与信息增益率来生成决策树,而CART采用基尼指数(分类问题)与平方误差(回归问题)进行特征选择来生成二叉树。GBDT核心在于每一棵树用梯度下降学习之前所有树预测结果和的残差。在前面的例子中应用GBDT非常简单,只需初始化好分类器,并将其加入Pipeline并替换rfCbassifier即可:
val gbdt = new GBTClassifier()
.setLabelCol("labelidx")
.setFeaturesCol("featureVector")
.setMaxDepth(10)
.setMaxBins(25)
.setMaxIter(100)
.setStepSize(0.2)
.setSubsamplingRate(0.65)
.setFeatureSubsetStrategy("auto")
目前应用得最多的XgBoost和微软公司新开源的LightGBM都是对GBDT的改进,其中XgBoost提供了Spark版的实现。
以上内容摘选自《Spark海量数据处理:技术详解与平台实战》
本书基于Spark发行版2.4,循序渐进,主要分为基本理论、应用实践和总结。本书主要有下面3个特点。
突出实践。本书第一部分包含20余个Spark应用的例子,绝大部分带有真实数据集,供读者实践;本书第二部分是一个选自生产环境的完整的真实案例,并针对本书做了相应的优化与简化。
层次分明,循序渐进。本书针对学习曲线进行了优化,对于应用型内容,主要突出使用方法与实践案例;对于原理与关键问题,会深入讲解,甚至少部分还会涉及源码解读与相关论文解析。
技术版本新。Spark 2.0是Spark一个非常重要的版本,在设计理念与使用方式上都与以前版本有较大不同,本书完稿前的新版本2.4.4,包含了这一版本的新特性,并根据社区进展对Spark 3.0的相关特性进行了讨论与展望。