0%

MLLib Pipeline的实现分析(转)

文章来源:https://github.com/ColZer/DigAndBuried/blob/master/spark/mllib-pipeline.md

以下为原文:

在2014年11月,他就在Spark MLLib代码中CI了一个全新的package:”org.apache.spark.ml”, 和传统的”org.apache.spark.mllib”独立, 这个包即Spark MLLib的
Pipeline and Parameters

pipeline即机器学习流水线, 在实际的机器学习应用过程中,一个任务(Job)由很多过程(Stage)组成, 比如日志的收集,清理,到字段/属性/feature的提取, 多算法的组合而成的模型,
模型的离线训练和评估,到最后的模型在线服务和在线评估,所进行的每一个stage都可以概况为数据到数据的转换以及数据到模型的转换.

后面我们会看到,mllib pipeline中的stage也做了相应的划分.

Parameters即参数化,机器学习大部分的过程是一个参数调优的过程,一个模型应该很显示的告诉使用者,模型有那些参数可以进行设置,以及这些参数的默认值是怎么样的;在现有的机器学习算法中,
模型的参数可能是以模型类字段或通过一些函数进行设置,不是很直观的展现当前模型有哪些可以调优的参数;

针对这个问题,在这个版本中,提出了Parameters功能,通过trait的方式显式地指定stage拥有那些参数.

下面我们就针对这两个部分进行分析, pipeline到我今天学习为止,还只有三个PR,很多东西应该还在实验中,这里做一个分析仅供学习和备忘.

##一、Parameters

1
2
3
4
5
6
7
8
class LogisticRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

private var threshold: Option[Double] = Some(0.5)
//...
}

上面是传统的org.apache.spark.mllib包中一个分类器:LogisticRegressionModel, 如果理解logistics分类器,那么我知道其中的threshold为模型一个很重要的参数.
但是从对于一般的用户来说,我们只知道这个模型类中有一个threshold字段,并不能很清楚了该字段是否是模型可调优的参数,还是只是类的一个”全局变量”而已;

针对这个问题, 就有了Parameters参数化的概念,先直接看结果:

1
2
3
4
5
6
7
8
9
class LogisticRegressionModel private[ml] (
override val parent: LogisticRegression,
override val fittingParamMap: ParamMap,
weights: Vector)
extends Model[LogisticRegressionModel] with LogisticRegressionParams {

private[classification] trait LogisticRegressionParams extends Params
with HasRegParam with HasMaxIter with HasLabelCol with HasThreshold with HasFeaturesCol
with HasScoreCol with HasPredictionCol {

我们看到这里的LogisticRegressionModel实现了LogisticRegressionParams,而LogisticRegressionParams继承了Params类,并且”拥有”一组Param,即HasMaxIter, HasRegParam之类.
相比传统的LogisticRegressionModel, 这个版本我们可以清楚的看到,该模型有RegParam, MaxIter等7个可控参数,其中包括我们上面谈到的threshold参数, 即HasThreshold.

即Parameters 将mllib中的组件参数进行标准化和可视化,下面我们继续针对Parameters进行分析.

1
2
class Param[T] (val parent: Params,val name: String,val doc: String,
val defaultValue: Option[T] = None)extends Serializable {

Param表示一个参数,从概念上来说,一个参数有下面几个属性:

  • param的类型:即上面的[T], 它表示param值是何种类型
  • param的名称:即name
  • param的描述信息,和name相比, 它可以更长更详细, 即doc
  • param的默认值, 即defaultValue

针对param的类型,ml提供了一组默认的子类, 如IntParam,FloatParam之类的.就不详细展开

另外针对Param, 提供了接口来设置Param的值

1
2
3
def w(value: T): ParamPair[T] = this -> value
def ->(value: T): ParamPair[T] = ParamPair(this, value)
case class ParamPair[T](param: Param[T], value: T)

即将Param和value封装为paramPair类, paramPair是一个case类,没有其他的方法, 仅仅封装了Param和Value对. 因此我们可以通过param->value来创建一个ParamPair, 后面我们会看到怎么对ParamPair
进行管理.

上面谈到Param其中一个参数我们没有进行描述, 即”parent: Params”. 如果站在模型, 特征提取器等组件角度来, 它们应该是一个param的容器,它们的逻辑代码可以使用自身的容器中的param.
换句话说,如果一个组件继承自Params类,那么该组件就是一个被参数化的组件.

参考上面的LogisticRegressionModel, 它继承了LogisticRegressionParams, 而LogisticRegressionParams实现了Params, 此时LogisticRegressionModel就是一个被参数化的模型. 即自身是一个param容器

现在问题来了, 一个组件继承自Params, 然而我们没有通过add等方式来将相应的param添加到这个容器里面, 而是通过”with HasRegParam”方式来进行设置的,到底是怎么实现的呢?看一个rf现

1
2
3
4
5
6
7
8
9
private[ml] trait HasRegParam extends Params {
val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter")
def getRegParam: Double = get(regParam)
}

private[ml] trait HasMaxIter extends Params {
val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations")
def getMaxIter: Int = get(maxIter)
}

我们看到每个具体的RegParam都是继承自Params, 这个继承感觉意义不大,所有这里就不纠结它的继承机制, 核心是它的val regParam: DoubleParam常量的定义,这里的常量会被编译为一个函数,
其中函数为public, 返回值为DoubleParam, 参数为空. 为什么要强调这个呢?这是规范. 一个具体的Param只有这样的实现, 它才会被组件的Params容器捕获. 怎么捕获呢? 在Params中有这样一个代码:

1
2
3
4
5
6
7
8
9
10
// 使用 java 反射的方法列出所有的(没有入参和返回值为Param)的方法
def params: Array[Param[_]] = {
val methods = this.getClass.getMethods
methods.filter { m =>
Modifier.isPublic(m.getModifiers) &&
classOf[Param[_]].isAssignableFrom(m.getReturnType) &&
m.getParameterTypes.isEmpty
}.sortBy(_.getName)
.map(m => m.invoke(this).asInstanceOf[Param[_]])
}

这里就清晰了,一个组件, 继承Params类, 成为一个可参数化的组件, 该组件就有一个params方法可以返回该组件所有配置信息,即 Array[Param[_]]. 因为我们组件使用With方式继承了符合上面规范的”常量定义”,
这些param会被这里的def params所捕获, 从而返回该组件所有的Params;

不过还有一个问题,我们一直在说, 组件继承Params类, 成为一个可参数化的组件,但是我这里def params只是返回 Array[Param[_]], 而一个属性应该有Param和value组成, 即上面我们谈到ParamPair,
因此Params类中应该还有一个容器,维护Array[ParamPair], 或者还有一个Map,维护param->value对.

没错,这就是ParamMap的功能, 它维护了维护param->value对.并且每个实现Params的组件都有一个paramMap字段. 如下:

1
2
3
trait Params extends Identifiable with Serializable {
protected val paramMap: ParamMap = ParamMap.empty
}

具体的ParamMap实现这里就不分析, 就是一个Map的封装, 这里我们总结一下Parameters的功能:

Parameters即组件的可参数化, 一个组件,可以是模型,可以是特征选择器, 如果继承自Params, 那么它就是被参数化的组件, 将具体参数按照HasMaxIter类的规范进行定义, 然后通过With的
方式追加到组件中,从而表示该组件有相应的参数, 并通过调用Params中的getParam,set等方法来操作相应的param.

整体来说Parameters的功能就是组件参数标准化

二、Pipeline

模型训练是整个数据挖掘和机器学习的目标, 如果把整个过程看着为一个黑盒, 内部可以包含了很多很多的特征提取, 模型训练等子stage,
但是站在黑盒的外面, 整个黑盒的输出就是一个模型(Model), 我们目标就是训练出一个完美的模型, 然后再利于该模型去做服务.

这句话就是pipeline的核心, 首先pipeline是一个黑盒生成的过程, 它对外提供fit接口, 完成黑盒的训练, 生成一个黑盒模型, 即PipelineModel

如果站在白盒的角度来看, pipeline的黑盒中肯定维护了一组stage, 这些stage可以是原子的stage,也可能继续是一个黑盒模型, 在fit接口调用时候,
会按照流水线的顺序依次调用每个stage的fit/transform函数,最后输出PipelineModel.

2.1 PipelineStage

如上所言, mllib pipeline是基于Spark SQL中的schemeRDD来实现, pipeline中每一次处理操作都表现为对schemeRDD的scheme或数据进行处理, 这里的操作步骤被抽象为Stage, 即PipelineStage

1
2
3
abstract class PipelineStage extends Serializable with Logging {
private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType
}

抽象的PipelineStage的实现很简单, 只提供了transformSchema虚函数, 由具体的stage进行实现,从而在一定参数paramMap作用下,对scheme进行修改(transform).

上面我们也谈到, 具体的stage是在PipelineStage基础上划分为两大类, 即数据到数据的转换(transform)以及数据到模型的转换(fit).

  • Transformer: 数据到数据的转换
  • Estimator: 数据到模型的转换

2.1.1 Transformer

我们首先来看Transformer, 数据预处理, 特征选择与提取都表现为Transformer, 它对提供的SchemaRDD进行转换生成新的SchemaRDD, 如下所示:

1
2
3
abstract class Transformer extends PipelineStage with Params {
def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD
}

特殊的Transformer:模型

在mllib中, 有一种特殊的Transformer, 即模型(Model), 下面我们会看到模型是Estimator stage的产物,但是model本身是一个Transformer,
模型是经过训练和挖掘以后的一个对象, 它的一个主要功能就是预测/推荐服务, 即它可以对传入的dataset:SchemaRDD进行预测, 填充dataset中残缺的决策字段或评分字段, 返回更新后的SchemaRDD

2.1.2 Estimator

Estimator stage的功能是模型的估计/训练, 即它是一个SchemaRDD到Model的fit过程. 如下所示fit接口.

1
2
3
abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
def fit(dataset: SchemaRDD, paramMap: ParamMap): M
}

2.2 PipelineModel

PipelineModel是由一组Transformer组成, 在对dataset进行预测(transform)时, 是按照Transformer的有序性(Array)逐步的对dataset进行处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
class PipelineModel private[ml] (
override val parent: Pipeline,
override val fittingParamMap: ParamMap,
private[ml] val stages: Array[Transformer])
extends Model[PipelineModel] with Logging {

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
// Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
val map = (fittingParamMap ++ this.paramMap) ++ paramMap
transformSchema(dataset.schema, map, logging = true)
stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, map))
}
}

2.3 Pipeline

Pipeline首先是一个Estimator, fit输出的模型为 PipelineModel, 其次Pipeline也继承Params类, 即被参数化,
其中有一个参数, 即stages, 它的值为Array[PipelineStage], 该参数存储了Pipeline拥有的所有的stage;

Pipeline提供了fittransformSchema两个接口:

  • transformSchema 接口: 使用foldLeft函数, 对schema进行转换.
  • fit 接口: 遍历 stages列表。将 Estimator 类型的stage 转换为 model(model其实就是一种Transformer)
class Pipeline extends Estimator[PipelineModel] {
 val stages: Param[Array[PipelineStage]] = new Param(this, "stages", "stages of the pipeline")
 def setStages(value: Array[PipelineStage]): this.type = { set(stages, value); this }
 def getStages: Array[PipelineStage] = get(stages)

 override def fit(dataset: SchemaRDD, paramMap: ParamMap): PipelineModel = {
     transformSchema(dataset.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val theStages = map(stages)
     // 记录最后一个 Estimator 的位置
     var indexOfLastEstimator = -1
     theStages.view.zipWithIndex.foreach { case (stage, index) =>
       stage match {
         case _: Estimator[_] =>
           indexOfLastEstimator = index
         case _ =>
       }
     }
     var curDataset = dataset
     val transformers = ListBuffer.empty[Transformer]
     //遍历所有的stages
     theStages.view.zipWithIndex.foreach { case (stage, index) =>
       // 遍历在 indexOfLastEstimator 之前的stage
       if (index <= indexOfLastEstimator) {
         val transformer = stage match {
           // 若 stage 为 Estimator,则将 estimator.fit的结果返回 
           case estimator: Estimator[_] =>
             estimator.fit(curDataset, paramMap)
           // 若 stage 为 Transformer,直接返回
           case t: Transformer =>
             t
           case _ =>
             throw new IllegalArgumentException(
               s"Do not support stage $stage of type ${stage.getClass}")
         }
         curDataset = transformer.transform(curDataset, paramMap)
         // 将返回的 transformer 加入 transformers列表
         transformers += transformer
       } else {
         //直接将 stage 转为 Transformer
         transformers += stage.asInstanceOf[Transformer]
       }
     }

     new PipelineModel(this, map, transformers.toArray)
   }

 private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
     val map = this.paramMap ++ paramMap
     val theStages = map(stages)
     require(theStages.toSet.size == theStages.size,
            "Cannot have duplicate components in a pipeline.")
     theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur, paramMap))
        }
}

2.3 实例讲解

Transformer1 ---> Estimator1 --> Transformer2 --> Transformer3 --> Estimator2 --> Transformer4

我们知道Estimator和Transformer的区别是, Estimator需要经过一次fit操作, 才会输出一个Transformer, 而Transformer直接就是Transformer;

对于训练过程中的Transformer,只有一个数据经过Transformer操作后会被后面的Estimator拿来做fit操作的前提下,该Transformer操作才是有意义的,否则训练数据不应该经过该Transformer.

拿上面的Transformer4来说, 它后面没有Estimator操作, 如果对训练数据进行Transformer操作没有任何意义,但是在预测数据上还是有作用的,所以它可以直接用来构建PipelineModel.

对于训练过程中Estimator(非最后一个Estimator), 即上面的Estimator1,非Estimator2, 它训练数据上训练出的模型以后, 需要利用该模型对训练数据进行transform操作,输出的数据
继续进行后面Estimator和Transformer操作.

拿缺失值填充的例子来说, 我们可以利用当前数据,训练出一个模型, 该模型计算出每个字段的平均值, 然后我们理解利用这个模型对训练数据进行缺失值填充.

但是对于最后一个Estimator,其实是没有必要做这个”curDataset = transformer.transform(curDataset, paramMap)”操作的, 所以这里还是可以有优化的!!嘿嘿!!!

##三、总结

到目前为止,已经将Pipeline讲解的比较清楚了, 利用Pipeline可以将数据挖掘中各个步骤进行流水线化, api方便还是很简介清晰的!

最后拿孟祥瑞CI的描述信息中一个例子做结尾,其中pipeline包含两个stage,顺序为StandardScaler和LogisticRegression

val scaler = new StandardScaler() 
    .setInputCol("features") 
    .setOutputCol("scaledFeatures") 
val lr = new LogisticRegression() 
    .setFeaturesCol(scaler.getOutputCol) 

val pipeline = new Pipeline() 
    .setStages(Array(scaler, lr)) 
val model = pipeline.fit(dataset) 
val predictions = model.transform(dataset) 
    .select('label, 'score, 'prediction)