你好,游客 登录
背景:
阅读新闻

揭秘可视化自助机器学习平台

[日期:2018-06-21] 来源:公众账号  作者: [字体: ]

1 概述

数据管理平台(DMP)提供精准的人群画像,帮助用户挖掘潜在客户,提供精细化的数据营销服务。在广告营销领域,用户通常通过DMP定制标签,生成人群包并完成广告定向投放,为了提供给用户更为高级的数据挖掘的方式,我们开发了自助机器学习平台(神机平台),提供给用户打通融合一方数据和DMP数据能力,允许用户使用可视化方式构建模型并发布,通过预测打分生成人群包并用于广告营销等应用场景。

相对于其他通用机器学习平台,自助机器学习平台具备以下几个特点:1)数据单向流入保证数据安全;2)打通DMP构建模型产出人群包;3)支持用户上传自定义算法(Spark/Tensorflow);4)提供多种内置算法(逻辑回归,GBDT,线性回归,KMeans,ALS等)。下图简要概括了平台所具备的功能:

用户在前端界面通过拖拽组件的方式,完成数据上传、数据融合、构建模型、发布应用并产出人群包。系统采用三层架构:分为API接口层、逻辑功能层、计算存储层。系统架构如下图所示:

从功能模块划分:系统包括数据融合模块、模型任务模块、应用部署模块。技术方案主要包括Spray(接口层和模型服务Restful API),Hadoop/Spark/Tensorflow(存储计算平台),MLeap(模型序列化),以下章节将对算法后台的具体功能模块做详细介绍。

2 数据融合模块

2.1 数据接入

数据接入支持HTTP文件上传以及FTP文件上传,用户在上传数据时需要指定表名,字段名称以及字段类型,最后数据落入HDFS,接下来数据预处理阶段,需要完成1)字段类型检查2)空值处理3)生成列详细信息三种功能,每个功能通过定义一个Spark SQL UDF/UDAF完成:

1)字段检查:首先将用户上传的数据以文本格式加载DataFrame,然后根据用户定义字段类型检查与字段值是否符合,如果字段类型符合则根据字段类型定义Schema并转换为新的DataFrame,否则抛出异常并返回异常信息。

2)空值处理:根据用户指定字段类型和默认值,逐列填充空值。

3)生成列详细信息:该功能主要是为了优化用户体验,通常在特征处理阶段或算法执行阶段,需要指定Features列和Label列,比如Spark ML GBDT算法目前支持二分类问题且Label值必须为0|1,而逻辑回归算法则支持多分类,在特征处理阶段String类型字段明显只支持离散型编码,而int|double类型既支持离散型编码又支持连续型编码。那么在用户上传数据并新建表后,将数据表用于模型训练时,为了告知用户,哪些列可以用于构建Features以及可以选作Label,我们实现了这样的功能:统计每个字段值范围并根据字段类型,生成列的详细描述。对于某字段得到:{label: ["logisticregression", "gbdt"], feature:["categorical","continuous"]},表示该列可以做为逻辑回归和GBDT的Label,同时也可以选做连续型特征或者离散型特征。

2.2 数据融合

数据上传完成以后,在构建模型时用户首先会融合DMP标签。系统中有三种标签(基础标签、高级标签和定制化标签)可以选择,支持多种标识Key。例如当广告主在投放某种商业类型广告时,可以使用商业兴趣标签构建模型,通过预测筛选出对该广告感兴趣的人群,进而提升广告投放效果。前端展示的数据融合操作界面如下图所示:

1)数据标签准备:标签数据主要来自两种数据源,基础标签和高级标签来源DMP离线批量数据,定制化的标签通过API请求其他服务获得。与数据上传阶段类似,标签数据处理的技术方案选取DataFrame+Spark SQL+UDF/UDAF。在选取存储方案时,考虑到数据标签纬度很高(如果加入关键字等高纬度特征达到百万至千万纬),同时如果每个特征保存为一列,超长的列数使DataFrame处理起来效率变的很差。所以我们将数据标签重新编码并保存为SparseVector格式存储为DataFrame一列,这样DMP标签只有两列[key,tags],提升处理效率同时减少存储空间(相对纯文本+gzip压缩格式存储,DataFrame+SparseVector+Parquet+Gzip压缩格式减少40%存储空间)。SparseVector格式如下图所示,其中tags表示用户标签,当中三项分别表示标签总长度,标签index,标签值。

user_id

tags

   

14399

(5000,[1478,1479,1481,1482,1483,1485,1487,1489],[2.0,2.0,1.0,2.0,1.0,1.0,1.0,1.0])

23999

(5000,[1479,1484,1485,1486,1487,1491,1495,1502],[2.0,2.0,3.0,1.0,1.0,2.0,1.0,12.0])

28799

(5000,[1479,1488,1490,1492,1493,1496,1498,1500],[1.0,1.0,2.0,2.0,1.0,2.0,1.0,3.0])

2)数据融合流程:完成标签勾选点击提交任务后会触发请求到服务端,请求Body体格式如下:

{"in_table_name":"ctr_data_in","out_table_name":"ctr_data_out","join_cols":["tag1", "tag2"],"join_keys":[{"key_name":"user_id", "key_type":"qq"}]}

解析参数,并执行以下流程:

1)SparkSession通过in_table_name获取数据表路径加载数据。

2)Key列批量转换,将join_keys中用户标识列通过ID打通转化为DMP识别Key。

3)通过解析join_cols和和join_keys动态生成sql调用SparkSession执行。

4)如果join_cols包含自定义标签则需要通过调用API请求获取。

5)根据out_table_name将DataFrame写入指定路径。

6)前端根据任务ID轮询查询任务状态,成功返回成功状态码,失败返回捕获到Execption信息。

3 模型任务模块

数据上传完成并融合DMP标签后,用户就可以通过拖拽组件方式构建模型任务图,前端界面如下图所示,模型任务图中包含若干节点任务,我们在实现该模块功能时将其拆分为三个子模块1)模型任务表示2)节点任务执行3)模型任务保存。

3.1 模型任务表示

拓扑任务解析与表达:当用户点击新建模型下方运行按钮时,服务端收到请求首先要解析任务图的拓扑结构,然后得到如下参数表示形式:

[(b)=DataLoader[].load(a)~(c)=FeaturesEncoder[inputCols:f1|f2:seq[string]].transform(b)(d,e)=DataSpliter[weight:0.4:double].split(c)~(f)=LogisticRegression[maxItex:10:int,l1:0.1:double].fit(d)~(g)=GBDT[maxItex:10:int,depth:4:int].fit(d)~(h)=Combiner[type:1:int].combine(f,g)~(i)=Predictor[].predict(h,e)~(j)=Evaluator[].evaluate(i)]

上述表示形式解释:1)最外层[]表示一个完整的包含多个节点的拓扑序列;2)连接符号~表示拓扑节点先后关系;3)在=号前面部分表示节点的返回值,在=号后面部分表示类函数调用;4)类名后面[,,]表示类成员变量赋值以及相应类型;5)函数名后()表示函数接受参数,参数来源于前面节点的返回。

参数配置时我们通常习惯于使用xml或者json结构等,但之所以在表示图任务的Pipeline时,采用了自定义的表达式方式,主要原因有以下几个:1)上述表示方式相对xml与json变的更简洁且可读性变的更强;2)更容易得到任务图和Pipiline的关系,便于直接描述节点间的输入输出关系;3)容易扩展为DSL,尤其在交互式编程环境下更容易实现模型任务图构建执行。

上述得到的全图拓扑结构,即表示用户运行图中所有节点,为了避免运行已经运行过的节点,还有另外三种运行模式,例如当用户在逻辑回归节点图标右击执行以下操作:

1)执行到当前节点。从当前节点向上回溯,获取所有父节点,获取拓扑序列的最小子集。得到:

[(b)=DataLoader[].load(a)~(c)=FeaturesEncoder[inputCols:f1|f2:seq[string]].transform(b)(d,e)=DataSpliter[weight:0.4:double].split(c)~(f)=LogisticRegression[maxItex:10:int,l1:0.1:double].fit(d)]

2)从当前节点执行。从当前节点向下遍历,获取所有子节点,同时找出所有子节点的父节点(父节点应该是已经运行成功的状态,加入父节点保证子节点输入),从而获取拓扑序列的最小子集。

[(d,e)=DataSpliter[weight:0.4:double].split(c)~(f)=LogisticRegression[maxItex:10:int,l1:0.1:double].fit(d)~(g)=GBDT[maxItex:10:int,depth:4:int].fit(d)~(h)=Combiner[type:1:int].combine(f,g)~(i)=Predictor[].predict(h,e)~(j)=Evaluator[].evaluate(i)]

3)执行当前节点。找出当前节点的父节点(父节点应该是已经运行成功的状态,加入父节点保证子节点输入),得到拓扑序列的最小子集。得到:

[(d,e)=DataSpliter[weight:0.4:double].split(c)~(f)=LogisticRegression[maxItex:10:int,l1:0.1:double].fit(d)]

3.2 模型任务运行

通过对模型任务图的拓扑解析,我们将其表示为类DSL配置后,接下来还需要完成1)节点抽象化2)节点功能实现3)自定义算法上传,最后开始4)模型任务运行。

3.2.1 节点抽象化

在介绍节点抽象化之前,我们先了解下Spark ML的几个概念:

1)DataFrame/DataSet:是Spark SQL中的一种数据类型,结构中保存了数据表中Schema,并支持列式存储。在Spark ML中算法统一使用DataFrame/DataSet做为接口参数。

2)Transformer:将传入的DataFrame转换成另一个DataFrame。例如数据预处理的数据归一化,特征工程中的OneHot编码都被定义为Transformer。

3)Estimator:将传入的DataFrame用于模型训练,例如分类算法中逻辑回归算法,在Estimator中调用fit函数完成训练过程并返回模型,返回的模型必须是Transformer。这样的话模型的Transform函数可以使用模型完成对数据的预测,通过对数据追加列的方式保存预测结果,返回新的DataFrame。

4)Pipeline:Transformer和Estimator都继承与PipilineSpage,Pipeline将多个PipelineStage组合在一起组成一个ML Workflow,管理并执行,运行流程如下图所示:

总之,完整的Pipeline工作流工作机制可描述为:训练阶段Pipeline做为一个Estimator调用fit函数,该函数循环调用Pipeline中所有Stages中fit函数,将Stages返回的所有Transformer再封装成一个PipelineModel,PipelineModel同时也是一个Transformer。预测阶段PipelineModel调用transform函数,该函数循环调用所有Stages的transform函数,逐步向下传递DataFrame并完成数据处理。

Spark Pipeline定义非常简洁,Stages间输入输出均为DataFrame,但Pipeline局限于单条工作流。例如对于我们系统中支持的数据拆分节点和模型组合节点对应的多输入多输出功能,无法使用Pipeline执行任务,另外对于数据分拆节点也无需保存在模型中。为此我们做了以下改进:

1)抽象出Auxiliarier类型,Auxiliarier也是继承于PipelineStage,在不同阶段与Estimator、Transfromer相比具有不同的功能。Auxiliarier的fit函数和transfrom函数接受任意类型的变长参数。数据分拆和模型组合均是Auxiliarier,相对于Estimator和Transformer有以下区别:

节点类型

训练阶段

预测阶段

保存阶段

       

Estimator

训练模型

数据转换

保存参数

Transformer

数据转换

数据转换

保存模型

Auxiliarier

训练模型/数据转换

数据转换

可以不做保存

节点类型

fit

transform

     

Estimator

参数:DataFrame,返回:Transfromer

参数:DataFrame,返回:DataFrame

Transformer

参数:DataFrame,返回:DataFrame

参数:DataFrame,返回:DataFrame

Auxiliarier

参数:Any变长参数,返回:Any

参数:Any变长参数,返回:Any

加入Auxiliarier类型后继承关系如下图所示:

2)增加全局静态变量,用于保存模型任务图结构。例如对于模型任务拓扑序列:

[(b)=DataLoader[].load(a)~(c)=FeaturesEncoder[inputCols:f1|f2:seq[string]].transform(b)(d,e)=DataSpliter[weight:0.4:double].split(c)~(f)=LogisticRegression[maxItex:10:int,l1:0.1:double].fit(d)~(g)=GBDT[maxItex:10:int,depth:4:int].fit(d)~(h)=Combiner[type:1:int].combine(f,g)~(i)=Predictor[].predict(h,e)~(j)=Evaluator[].evaluate(i)]

得到每个Stages之间输入输出之间的关系,将7个Stage保存到全局的数据结构Array[case stage(...)]。每个Stage运行时从中取出Stage间的输入输出映射关系,并通过反射机制调用相应的类对象及函数,运行完成填入返回结果。

3)修改Pipeline/PipelineModel的fit/transform执行逻辑,相对与原生Pipeline顺序执行每个Stage,改造后的Pipeline具备以下几个功能:(1)保存并读取全局静态变量模型任务图结果;(2)Runtime时识别每个Stage调用函数的参数个数和参数类型;(3)根据保存的模型任务图结构,查找映射关系为每个Stage找到正确的参数输入;(4)将参数传入并顺序执行每个Stage(fit或transform函数);(5)得到返回结果将引用保存到全局变量;(6)如需要序列化并保存模型同样需要将全局变量模型任务图结构保存。

3.2.2 节点功能实现

模型任务图中主要包含以下几种类型节点:数据加载、数据预处理、特征处理、算法、模型组合、预测、模型评估。所有节点都要具备以下几种功能函数:1)对应的fit或transform函数处理主要逻辑;2)如果需要持久化保存到HDFS,需要定义save和load函数。其他辅助函数根据需要实现,如:节点状态更新功能、节点LOG日志生成功能、节点运行结果临时保存功能、节点生成预览信息功能。 开发算法时采用插件式开发的模式,新开发一个算法,需要在配置中注册算法名字和对应的包路径和类名比如:{"mylr","com.tancent.shenji.pluigns.MyLR"},再通过配置组装构建Pipeline。在模型任务图中比较重要的节点是:1)算法节点,目前支持的算法有逻辑回归、GBDT、线性回归、KMeans、ALS和自定义算法;2)模型评估节点。

1)算法节点:

节点运行:Pipeline在运行算法节点之前,首先通过查询,获取当前任务节点运行状态,如果节点运行状态已经成功,则调用算法的load函数直接加载已有模型。如果当前运行节点是非成功状态,根据算法插件配置反射生成类对象并初始化成员变量。最后调用算法fit函数完成模型训练。

节点保存:在算法节点保存的是算法模型。调用算法插件的save函数,在HDFS算法节点临时目录下保存模型,在下次运行时通过直接通过load函数加载模型。

节点预览:保存算法的训练信息。包括训练时候参数如迭代轮数,正则系数,训练耗时,每轮迭代损失等。前端所看到的效果如下图所示:

2)模型评估节点

节点运行:Pipeline在运行模型评估节点之前,同样先通过查询,获取当前任务节点运行状态,如果节点运行状态已经成功,则调用load函数直接加载模型评估结果。如果当前运行节点是非成功状态,根据算法插件配置反射生成类对象并初始化成员变量,调用transform函数完成模型评估功能。

节点保存/节点预览:两个节点的临时目录均保存模型评估报告,不同算法产生的评估不同,例如对于二分类问题,会生成AUC,Loss以及增加混淆矩阵等评估信息。 前端所看到的效果如下图所示:

3.2.3 自定义算法上传

由于提供的内置算法有限,对于想开发自己算法的用户,系统提供了算法插件开发方法。用户可以根据算法模版开发插件,模版中定义了几个必须的要重写的方法:train/predict/evaluate/save/load,Pipeline在不同执行阶段,调用上传算法的不同函数,完成模型任务图的执行。支持基于Spark集群或Tensorflow单机的算法,支持上传Jar包和Python脚本,Java模版示例如下: public class AlgorithmPlugin {

public LogisticRegressionModel model = null; private double threshold =0.5; public void setThreshold(double threshold) { this.threshold = threshold; } public LogisticRegressionModel train(Dataset trainData) { LogisticRegression lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.3) .setThreshold(this.threshold); this.model = lr.fit(trainData); return model; } public Dataset predict(Dataset testData) { return this.model.transform(testData); } public String evaluate(Dataset testData) { return "auc=0.8"; } public void save(String modelPath) { try { this.model.save(modelPath); } catch (Exception e) { e.printStackTrace(); } } public LogisticRegressionModel load(String modelPath) { this.model = LogisticRegressionModel.load(modelPath); return this.model; } }

3.2.4 模型任务运行

根据节点抽象类型Estimator/Transformer/Auxiliarier的不同特性,实现了节点具体功能以后,就可通过改写后的Pipeline执行模型任务拓扑序列,任务运行流程图如下:

3.3 模型任务保存

模型序列化模块采用MLeap,MLeap是一个通用的模型序列化开源包并提供执行环境。支持Spark、Scikit-learn、Tensorflow,可以将三种算法平台模型导出为MLeapModel,并提供通用的API方式加载模型提供服务。如果在系统中不想使用Python API接口或者你想脱离Spark平台加载并运行模型包预测本地数据,那么就可以选择MLeap做模型序列化。

MLeap支持绝大部分Spark原生算法的模型序列化,并且提供自定算法模型序列化方法,例如在平台中开发的特征抽取算法,需要根据MLeap提供的方法开发相应的插件。由于为了支持模型任务图运行,我们改造了Pipeline,相应的在MLeap序列化模型后,我们需要将模型任务图的配置并保存下来。

对于不同节点抽象类型Estimator/Transformer在保存阶段是需要保存模型或参数的,而对于Auxiliarier则可以不用保存。模型保存流程如下:

1)图拓扑序列回溯,找到Estimator/Transformer,对于本章开始的模型任务图,向上回溯得到两条Pipeline,即逻辑回归算法的Pipeline和GBDT算法的Pipeline。

2)单独保存需要保存的Auxiliarier。

3)保存模型任务图的结构。

将所有保存下来的模型和配置打包保存HDFS,保存模型即可以脱离Spark/Tensorflow/Scikit-learn环境,加载模型并提供预测服务。

4 应用部署模块

4.1 营销应用

营销应用主要功能是:当用户训练完成模型后,可以发布模型新建一个营销应用并关联模型ID,营销应用提供批量预测功能,上传数据并执行预测后,模型服务引擎加载模型完成预测打分,用户可以根据预测结果,生成相应人群包。前端对应的界面如下图所示:

4.2 在线应用

在线应用的主要功能是:当用户训练完成模型后,可以发布模型新建一个在线应用并关联模型ID,同时启动HTTP Server加载模型,返回用户URI和Assesskey,用户可以通过Restful API在线请求,前端相应的界面如下图所示:

5 总结

本文总体介绍了系统整体架构和功能实现,侧重讲解了模型任务节点抽象化,算法插件式开发和模型序列化等几个关键技术点。目前系统支持Spark ML部分算法和单机Tensorflow算法上传,后续对分布式深度学习的支持将是一个重要方向。

收藏 推荐 打印 | 录入:Cstor | 阅读:
相关新闻      
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款