文章来源:https://github.com/ColZer/DigAndBuried/blob/master/spark/mllib-pipeline.md
以下为原文:
在2014年11月,他就在Spark MLLib代码中CI了一个全新的package:”org.apache.spark.ml”, 和传统的”org.apache.spark.mllib”独立, 这个包即Spark MLLib的
Pipeline and Parameters
pipeline即机器学习流水线, 在实际的机器学习应用过程中,一个任务(Job)由很多过程(Stage)组成, 比如日志的收集,清理,到字段/属性/feature的提取, 多算法的组合而成的模型,
模型的离线训练和评估,到最后的模型在线服务和在线评估,所进行的每一个stage都可以概况为数据到数据的转换以及数据到模型的转换.
后面我们会看到,mllib pipeline中的stage也做了相应的划分.
Parameters即参数化,机器学习大部分的过程是一个参数调优的过程,一个模型应该很显示的告诉使用者,模型有那些参数可以进行设置,以及这些参数的默认值是怎么样的;在现有的机器学习算法中,
模型的参数可能是以模型类字段或通过一些函数进行设置,不是很直观的展现当前模型有哪些可以调优的参数;
针对这个问题,在这个版本中,提出了Parameters功能,通过trait的方式显式地指定stage拥有那些参数.
下面我们就针对这两个部分进行分析, pipeline到我今天学习为止,还只有三个PR,很多东西应该还在实验中,这里做一个分析仅供学习和备忘.
##一、Parameters
1 | class LogisticRegressionModel ( |
上面是传统的org.apache.spark.mllib包中一个分类器:LogisticRegressionModel, 如果理解logistics分类器,那么我知道其中的threshold为模型一个很重要的参数.
但是从对于一般的用户来说,我们只知道这个模型类中有一个threshold字段,并不能很清楚了该字段是否是模型可调优的参数,还是只是类的一个”全局变量”而已;
针对这个问题, 就有了Parameters参数化的概念,先直接看结果:
1 | class LogisticRegressionModel private[ml] ( |
我们看到这里的LogisticRegressionModel实现了LogisticRegressionParams,而LogisticRegressionParams继承了Params类,并且”拥有”一组Param,即HasMaxIter, HasRegParam之类.
相比传统的LogisticRegressionModel, 这个版本我们可以清楚的看到,该模型有RegParam, MaxIter等7个可控参数,其中包括我们上面谈到的threshold参数, 即HasThreshold.
即Parameters 将mllib中的组件参数进行标准化和可视化,下面我们继续针对Parameters进行分析.
1 | class Param[T] (val parent: Params,val name: String,val doc: String, |
Param表示一个参数,从概念上来说,一个参数有下面几个属性:
- param的类型:即上面的[T], 它表示param值是何种类型
- param的名称:即name
- param的描述信息,和name相比, 它可以更长更详细, 即doc
- param的默认值, 即defaultValue
针对param的类型,ml提供了一组默认的子类, 如IntParam,FloatParam之类的.就不详细展开
另外针对Param, 提供了接口来设置Param的值
1 | def w(value: T): ParamPair[T] = this -> value |
即将Param和value封装为paramPair类, paramPair是一个case类,没有其他的方法, 仅仅封装了Param和Value对. 因此我们可以通过param->value来创建一个ParamPair, 后面我们会看到怎么对ParamPair
进行管理.
上面谈到Param其中一个参数我们没有进行描述, 即”parent: Params”. 如果站在模型, 特征提取器等组件角度来, 它们应该是一个param的容器,它们的逻辑代码可以使用自身的容器中的param.
换句话说,如果一个组件继承自Params类,那么该组件就是一个被参数化的组件.
参考上面的LogisticRegressionModel, 它继承了LogisticRegressionParams, 而LogisticRegressionParams实现了Params, 此时LogisticRegressionModel就是一个被参数化的模型. 即自身是一个param容器
现在问题来了, 一个组件继承自Params, 然而我们没有通过add等方式来将相应的param添加到这个容器里面, 而是通过”with HasRegParam”方式来进行设置的,到底是怎么实现的呢?看一个rf现
1 | private[ml] trait HasRegParam extends Params { |
我们看到每个具体的RegParam都是继承自Params, 这个继承感觉意义不大,所有这里就不纠结它的继承机制, 核心是它的val regParam: DoubleParam
常量的定义,这里的常量会被编译为一个函数,
其中函数为public, 返回值为DoubleParam, 参数为空. 为什么要强调这个呢?这是规范. 一个具体的Param只有这样的实现, 它才会被组件的Params容器捕获. 怎么捕获呢? 在Params中有这样一个代码:
1 | // 使用 java 反射的方法列出所有的(没有入参和返回值为Param)的方法 |
这里就清晰了,一个组件, 继承Params类, 成为一个可参数化的组件, 该组件就有一个params方法可以返回该组件所有配置信息,即 Array[Param[_]]. 因为我们组件使用With方式继承了符合上面规范的”常量定义”,
这些param会被这里的def params所捕获, 从而返回该组件所有的Params;
不过还有一个问题,我们一直在说, 组件继承Params类, 成为一个可参数化的组件,但是我这里def params只是返回 Array[Param[_]], 而一个属性应该有Param和value组成, 即上面我们谈到ParamPair,
因此Params类中应该还有一个容器,维护Array[ParamPair], 或者还有一个Map,维护param->value对.
没错,这就是ParamMap的功能, 它维护了维护param->value对.并且每个实现Params的组件都有一个paramMap字段. 如下:
1 | trait Params extends Identifiable with Serializable { |
具体的ParamMap实现这里就不分析, 就是一个Map的封装, 这里我们总结一下Parameters的功能:
Parameters即组件的可参数化, 一个组件,可以是模型,可以是特征选择器, 如果继承自Params, 那么它就是被参数化的组件, 将具体参数按照HasMaxIter类的规范进行定义, 然后通过With的
方式追加到组件中,从而表示该组件有相应的参数, 并通过调用Params中的getParam,set等方法来操作相应的param.
整体来说Parameters的功能就是组件参数标准化
二、Pipeline
模型训练是整个数据挖掘和机器学习的目标, 如果把整个过程看着为一个黑盒, 内部可以包含了很多很多的特征提取, 模型训练等子stage,
但是站在黑盒的外面, 整个黑盒的输出就是一个模型(Model), 我们目标就是训练出一个完美的模型, 然后再利于该模型去做服务.
这句话就是pipeline的核心, 首先pipeline是一个黑盒生成的过程, 它对外提供fit接口, 完成黑盒的训练, 生成一个黑盒模型, 即PipelineModel
如果站在白盒的角度来看, pipeline的黑盒中肯定维护了一组stage, 这些stage可以是原子的stage,也可能继续是一个黑盒模型, 在fit接口调用时候,
会按照流水线的顺序依次调用每个stage的fit/transform函数,最后输出PipelineModel.
2.1 PipelineStage
如上所言, mllib pipeline是基于Spark SQL中的schemeRDD来实现, pipeline中每一次处理操作都表现为对schemeRDD的scheme或数据进行处理, 这里的操作步骤被抽象为Stage, 即PipelineStage
1 | abstract class PipelineStage extends Serializable with Logging { |
抽象的PipelineStage的实现很简单, 只提供了transformSchema虚函数, 由具体的stage进行实现,从而在一定参数paramMap作用下,对scheme进行修改(transform).
上面我们也谈到, 具体的stage是在PipelineStage基础上划分为两大类, 即数据到数据的转换(transform)以及数据到模型的转换(fit).
- Transformer: 数据到数据的转换
- Estimator: 数据到模型的转换
2.1.1 Transformer
我们首先来看Transformer, 数据预处理, 特征选择与提取都表现为Transformer, 它对提供的SchemaRDD进行转换生成新的SchemaRDD, 如下所示:
1 | abstract class Transformer extends PipelineStage with Params { |
特殊的Transformer:模型:
在mllib中, 有一种特殊的Transformer, 即模型(Model), 下面我们会看到模型是Estimator stage的产物,但是model本身是一个Transformer,
模型是经过训练和挖掘以后的一个对象, 它的一个主要功能就是预测/推荐服务, 即它可以对传入的dataset:SchemaRDD进行预测, 填充dataset中残缺的决策字段或评分字段, 返回更新后的SchemaRDD
2.1.2 Estimator
Estimator stage的功能是模型的估计/训练, 即它是一个SchemaRDD到Model的fit过程. 如下所示fit接口.
1 | abstract class Estimator[M <: Model[M]] extends PipelineStage with Params { |
2.2 PipelineModel
PipelineModel是由一组Transformer组成, 在对dataset进行预测(transform)时, 是按照Transformer的有序性(Array)逐步的对dataset进行处理.
1 | class PipelineModel private[ml] ( |
2.3 Pipeline
Pipeline首先是一个Estimator, fit输出的模型为 PipelineModel, 其次Pipeline也继承Params类, 即被参数化,
其中有一个参数, 即stages, 它的值为Array[PipelineStage], 该参数存储了Pipeline拥有的所有的stage;
Pipeline提供了fit
和transformSchema
两个接口:
- transformSchema 接口: 使用foldLeft函数, 对schema进行转换.
- fit 接口: 遍历 stages列表。将 Estimator 类型的stage 转换为 model(model其实就是一种Transformer)
class Pipeline extends Estimator[PipelineModel] {
val stages: Param[Array[PipelineStage]] = new Param(this, "stages", "stages of the pipeline")
def setStages(value: Array[PipelineStage]): this.type = { set(stages, value); this }
def getStages: Array[PipelineStage] = get(stages)
override def fit(dataset: SchemaRDD, paramMap: ParamMap): PipelineModel = {
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val theStages = map(stages)
// 记录最后一个 Estimator 的位置
var indexOfLastEstimator = -1
theStages.view.zipWithIndex.foreach { case (stage, index) =>
stage match {
case _: Estimator[_] =>
indexOfLastEstimator = index
case _ =>
}
}
var curDataset = dataset
val transformers = ListBuffer.empty[Transformer]
//遍历所有的stages
theStages.view.zipWithIndex.foreach { case (stage, index) =>
// 遍历在 indexOfLastEstimator 之前的stage
if (index <= indexOfLastEstimator) {
val transformer = stage match {
// 若 stage 为 Estimator,则将 estimator.fit的结果返回
case estimator: Estimator[_] =>
estimator.fit(curDataset, paramMap)
// 若 stage 为 Transformer,直接返回
case t: Transformer =>
t
case _ =>
throw new IllegalArgumentException(
s"Do not support stage $stage of type ${stage.getClass}")
}
curDataset = transformer.transform(curDataset, paramMap)
// 将返回的 transformer 加入 transformers列表
transformers += transformer
} else {
//直接将 stage 转为 Transformer
transformers += stage.asInstanceOf[Transformer]
}
}
new PipelineModel(this, map, transformers.toArray)
}
private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
val map = this.paramMap ++ paramMap
val theStages = map(stages)
require(theStages.toSet.size == theStages.size,
"Cannot have duplicate components in a pipeline.")
theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur, paramMap))
}
}
2.3 实例讲解
Transformer1 ---> Estimator1 --> Transformer2 --> Transformer3 --> Estimator2 --> Transformer4
我们知道Estimator和Transformer的区别是, Estimator需要经过一次fit操作, 才会输出一个Transformer, 而Transformer直接就是Transformer;
对于训练过程中的Transformer,只有一个数据经过Transformer操作后会被后面的Estimator拿来做fit操作的前提下,该Transformer操作才是有意义的,否则训练数据不应该经过该Transformer.
拿上面的Transformer4来说, 它后面没有Estimator操作, 如果对训练数据进行Transformer操作没有任何意义,但是在预测数据上还是有作用的,所以它可以直接用来构建PipelineModel.
对于训练过程中Estimator(非最后一个Estimator), 即上面的Estimator1,非Estimator2, 它训练数据上训练出的模型以后, 需要利用该模型对训练数据进行transform操作,输出的数据
继续进行后面Estimator和Transformer操作.
拿缺失值填充的例子来说, 我们可以利用当前数据,训练出一个模型, 该模型计算出每个字段的平均值, 然后我们理解利用这个模型对训练数据进行缺失值填充.
但是对于最后一个Estimator,其实是没有必要做这个”curDataset = transformer.transform(curDataset, paramMap)”操作的, 所以这里还是可以有优化的!!嘿嘿!!!
##三、总结
到目前为止,已经将Pipeline讲解的比较清楚了, 利用Pipeline可以将数据挖掘中各个步骤进行流水线化, api方便还是很简介清晰的!
最后拿孟祥瑞CI的描述信息中一个例子做结尾,其中pipeline包含两个stage,顺序为StandardScaler和LogisticRegression
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
val lr = new LogisticRegression()
.setFeaturesCol(scaler.getOutputCol)
val pipeline = new Pipeline()
.setStages(Array(scaler, lr))
val model = pipeline.fit(dataset)
val predictions = model.transform(dataset)
.select('label, 'score, 'prediction)