Spark BigDL: Writing a Deep Learning Neural Network
BigDL mainly provides implementations of various deep-learning network algorithms, and it can of course also be used to build a simple neural network, as this section shows.
I. Downloading the dependency package and initializing the system
1. First, download the BigDL release that matches your local Spark version from:
https://github.com/intel-analytics/BigDL/wiki/Downloads
2. After extracting the downloaded package on Linux, initialize the environment for Spark by running bin/bigdl.sh. The command is:
source bigdl.sh
3. When starting spark-shell, add the dependency jar to the driver classpath:
--driver-class-path bigdl-0.1.0-jar-with-dependencies.jar
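For example, a local spark-shell session with BigDL on the classpath could be started like this (a minimal sketch; adjust the jar path to wherever the package was extracted, here the same path used by the spark-submit commands below):
spark-shell \
--master local[4] \
--driver-class-path /root/data/dlLibs/lib/bigdl-0.1.0-jar-with-dependencies.jar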
II. Neural network algorithms
(I) Training the neural network (NN) model
1. Main class: kingpoint.nn.NnTrain
2. Code:
(1) Neural network model: NnModel
package kingpoint.nn
import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.numeric.NumericFloat
/**
* Created by llq on 2017/6/16.
*/
object NnModel {
/**
* Build the NN model from the layer configuration.
* @param layerNumber total number of layers, e.g. "4"
* @param neuralNetNumber comma-separated neuron count of each layer, e.g. "4,100,10,3"
* @return the assembled Sequential model
*/
def apply(layerNumber: String,neuralNetNumber:String): Module[Float] = {
val layerInt=layerNumber.toInt
val neuralNetArray=neuralNetNumber.split(",").map(_.toInt)
val model = Sequential()
//add the input layer: reshape each record into a flat vector
model.add(Reshape(Array(neuralNetArray(0))))
//add the hidden layers (every layer up to, but not including, the output layer)
for(i<- 0 to layerInt-3){
model.add(Linear(neuralNetArray(i), neuralNetArray(i+1)))
.add(Tanh())
}
//add the output layer
model.add(Linear(neuralNetArray(layerInt-2), neuralNetArray(layerInt-1)))
.add(LogSoftMax())
//return the assembled model
model
}
}
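As a quick check of what apply builds, the sketch below calls NnModel with the same values passed by the submit command in step 3 (4 layers, neuron counts 4,100,10,3); the resulting stack is Reshape(4) -> Linear(4,100) -> Tanh -> Linear(100,10) -> Tanh -> Linear(10,3) -> LogSoftMax:
//minimal usage sketch; the argument values are taken from the spark-submit example below
val model = NnModel("4", "4,100,10,3")
//printing the module lists the layer stack described above
println(model)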
(2) Model training: NnTrain
package kingpoint.nn
import java.io.File
import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.dataset.{MiniBatch, Sample, DataSet}
import com.intel.analytics.bigdl.nn.ClassNLLCriterion
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, T}
import kingpoint.lenet5.modelNameAccuary
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import scala.collection.mutable.ArrayBuffer
/**
* Read the training data from Hive and train the NN model.
* Created by llq on 2017/6/6.
*/
object NnTrain {
LoggerFilter.redirectSparkInfoLogs()
Logger.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level.INFO)
/**
* Convert each Row (label followed by the feature columns) into a Sample[Float].
* @param trainLabelDataRdd RDD of rows selected from Hive, label column first
* @return RDD of BigDL samples (feature tensor + label tensor)
*/
def inLoad(trainLabelDataRdd:RDD[Row]): RDD[Sample[Float]]={
trainLabelDataRdd.mapPartitions{iter=>
iter.map{row=>
val dataBuffer=new ArrayBuffer[Float]()
val data=row.mkString(",").split(",").map(_.toFloat)
dataBuffer ++= data
//drop the label, keeping only the feature values
dataBuffer.remove(0,1)
//the first value is the label; wrap it in a 1x1 tensor
val labelTensor=Tensor[Float](Array(data(0)),Array(1,1))
//wrap the feature values in a 1xN tensor
val trainTensor=Tensor[Float](dataBuffer.toArray,Array(1,dataBuffer.length))
Sample(trainTensor,labelTensor)
}
}
}
/**
* Scan the checkpoint directory and return the file name of the last
* (largest-numbered) model snapshot, e.g. "model.601".
* @param file the checkpoint directory written by the optimizer
*/
def lsLinuxCheckPointPath(file:File): String ={
val modelPattern="model".r
val numberPattern="[0-9]+".r
var maxIteration=0
if(file.isDirectory){
val fileArray=file.listFiles()
for(i<- 0 to fileArray.length-1){
//only look at files whose name contains "model"
if(modelPattern.findFirstIn(fileArray(i).getName).mkString(",")!=""){
//keep the largest snapshot number seen so far
val iterationNumber=numberPattern.findFirstIn(fileArray(i).getName).mkString(",").toInt
if(iterationNumber>maxIteration){
maxIteration=iterationNumber
}
}
}
}else{
throw new Exception("the checkpoint path is not a directory")
}
"model."+maxIteration
}
/**
* Main entry point: read the training data from Hive and train the NN model.
* @param args see the parameter list under "3. Submit command" below
*/
def main (args: Array[String]){
val conf = Engine.createSparkConf()
.setAppName("kingpoint.nn.NnTrain")
val sc = new SparkContext(conf)
val hiveContext=new HiveContext(sc)
Engine.init
/**
* Parameter parsing
*/
if(args.length<12){
System.err.println("Error: fewer than 12 parameters were supplied")
System.exit(1)
}
//Hive table holding the training data (dl.iris_train)
val databaseTableName=args(0)
//feature column names (f1,f2,f3,f4)
val featureName=List(args(1).split(","):_*)
//label column name (label)
val labelName=args(2)
//train/validation split ratio (8,2)
val trainValidationRatio=args(3)
//neural network configuration
val layerNumber=args(4) //number of layers: 4
val neuralNetNumber=args(5) //neuron count of each layer (4,100,10,3)
val batchSize=args(6).toInt //batch size (4)
val learningRate=args(7).toDouble //learning rate (0.01)
val learningRateDecay=args(8).toDouble //learning rate decay (0.0)
val maxEpoch=args(9).toInt //stop after this many epochs (25)
val modelSave=args(10) //directory where model checkpoints are saved (/root/data/model)
val outputTableName=args(11) //Hive table to store the training output (dl.nn_train)
/**
* Read the data and convert it into Sample format
*/
//put the label column in front of the feature columns
val featureLabelName=labelName::featureName
//read the Hive table, skipping any header row whose value equals the column name
val originData=hiveContext.sql("select * from "+databaseTableName+" where "+featureName(0)+" !='"+featureName(0)+"'")
//training data + label data, repartitioned for parallelism
val trainLabelDataRdd=originData.select(featureLabelName.head,featureLabelName.tail:_*).rdd.repartition(32)
//split into the training set and the validation set
val trainRatio=trainValidationRatio.split(",")(0).toDouble
val validationRatio=trainValidationRatio.split(",")(1).toDouble
val dataRdd=trainLabelDataRdd.randomSplit(Array(trainRatio,validationRatio))
val trainSetDataRdd=dataRdd(0)
val validationDataRdd=dataRdd(1)
//convert both sets into Sample records
val trainSampleData=inLoad(trainSetDataRdd)
val validationSampleData=inLoad(validationDataRdd)
/**
* Model configuration and training
*/
//build the NN model with the layer configuration supplied above
val model = NnModel(layerNumber,neuralNetNumber)
//hyper-parameters used by the gradient-descent optimizer
val state =
T(
"learningRate" -> learningRate,
"learningRateDecay" -> learningRateDecay
)
//optimizer: model, training set, loss criterion and batch size
val optimizer = Optimizer(
model = model,
sampleRDD = trainSampleData,
criterion = new ClassNLLCriterion[Float](),
batchSize=batchSize)
optimizer.setCheckpoint(modelSave, Trigger.everyEpoch)
//train the model: validation set, hyper-parameters, stopping condition, then start
optimizer
.setValidation(
trigger = Trigger.everyEpoch,
sampleRDD = validationSampleData,
vMethods = Array(new Top1Accuracy, new Loss[Float]),
batchSize=batchSize)
.setState(state)
.setEndWhen(Trigger.maxEpoch(maxEpoch)) //stop after maxEpoch epochs
.optimize()
//switch to evaluate mode and print the model structure
println(model.evaluate())
//find the last checkpoint snapshot and build its full path
val modelEpochFile=optimizer.getCheckpointPath().get+"/"+lsLinuxCheckPointPath(new File(optimizer.getCheckpointPath().get))
//compute the validation accuracy of the trained model
val validationFeatureMiniBatch=validationSampleData.map(sample=>MiniBatch[Float](sample.feature(),sample.label()))
val validator = Validator(model, DataSet.rdd(validationFeatureMiniBatch))
val result = validator.test(Array(new Top1Accuracy[Float]))
/**
* Save the model path and the validation accuracy
*/
val modelNameAccuaryRdd=sc.parallelize(List(modelNameAccuary(modelEpochFile,result(0)._1.toString)))
val modelNameAccuaryDf=hiveContext.createDataFrame(modelNameAccuaryRdd)
//save to Hive
modelNameAccuaryDf.show()
modelNameAccuaryDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName)
}
}
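To make the expected row layout concrete, the sketch below walks a single iris-style record through the same conversion that inLoad performs (the values are purely illustrative; the label comes first, followed by the four feature columns):
//what inLoad does with one row whose columns are "1.0,5.1,3.5,1.4,0.2"
import com.intel.analytics.bigdl.dataset.Sample
import com.intel.analytics.bigdl.tensor.Tensor
val values = "1.0,5.1,3.5,1.4,0.2".split(",").map(_.toFloat)
//the label (first value) becomes a 1x1 tensor
val labelTensor = Tensor[Float](Array(values(0)), Array(1, 1))
//the remaining four values become a 1x4 feature tensor
val featureTensor = Tensor[Float](values.drop(1), Array(1, values.length - 1))
val sample = Sample(featureTensor, labelTensor)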
3. Submit command:
spark-submit \
--master local[4] \
--driver-class-path /root/data/dlLibs/lib/bigdl-0.1.0-jar-with-dependencies.jar \
--class "kingpoint.nn.NnTrain" /root/data/SparkBigDL.jar \
dl.iris_train \
f1,f2,f3,f4 \
label \
8,2 \
4 \
4,100,10,3 \
4 \
0.01 \
0.0 \
25 \
/root/data/model \
dl.nn_train
(1) Hive table holding the training data: dl.iris_train
(2) Names of the columns to use as features, comma-separated: f1,f2,f3,f4
(3) Name of the column to use as the label: label
(4) Train/validation split ratio: 8,2
(5) Number of layers in the neural network: 4
(6) Neuron count of each layer: 4,100,10,3
(7) Batch size: 4
(8) Learning rate: 0.01
(9) learningRateDecay: 0.0
(10) Maximum number of epochs before training stops: 25
(11) Directory where model checkpoints are saved: /root/data/model
(12) Hive table where the training output is saved: dl.nn_train
The output is saved in Hive with two fields: the model save path (modelName) and the validation-set accuracy (accuary). Note that when you later test the model, you must look up the modelName value and pass it as the model-path argument of the test job.
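One convenient way to look up that value is to query the output table from spark-shell; a minimal sketch, assuming a HiveContext named hiveContext is already available:
//show the saved model path (modelName) and the validation accuracy (accuary)
hiveContext.table("dl.nn_train").show()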
(II) Testing the NN model
1. Main class: kingpoint.nn.NnTest
2. Code:
(1) DataRowToFloatRecords
package kingpoint.nn
import com.intel.analytics.bigdl.dataset.Transformer
import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import scala.collection.Iterator
import scala.collection.mutable.ArrayBuffer
/**
* Transform Row => FloatRecords
* Created by llq on 2017/6/21.
*/
/**
* Holds the feature values and the label of one record.
* @param data feature values
* @param label label value
*/
case class FloatRecords(data:Array[Float],label:Float)
object DataRowToFloatRecords {
val logger = Logger.getLogger(getClass)
def apply(featureName: List[String], labelName:String): DataRowToFloatRecords = {
new DataRowToFloatRecords(featureName,labelName)
}
}
/**
* transform [[Row]] to [[FloatRecords]]
* @param featureName column name
* @param labelName label name
*/
class DataRowToFloatRecords(featureName: List[String],labelName:String)
extends Transformer[Row, FloatRecords] {
override def apply(prev: Iterator[Row]): Iterator[FloatRecords] = {
prev.map(
data => {
val dataFloatArrayBuffer=new ArrayBuffer[Float]()
for(i<- 0 to featureName.length-1){
dataFloatArrayBuffer +=data.getAs[String](featureName(i)).toFloat
}
FloatRecords(dataFloatArrayBuffer.toArray, data.getAs[String](labelName).toFloat)
}
)
}
}
(2) FloatRecordsToVector
package kingpoint.nn
import com.intel.analytics.bigdl.dataset.Transformer
import org.apache.log4j.Logger
import org.apache.spark.mllib.linalg.DenseVector
import scala.collection.Iterator
/**
* Transform FloatRecords => (label, DenseVector)
* Created by llq on 2017/6/21.
*/
object FloatRecordsToVector {
val logger = Logger.getLogger(getClass)
def apply(): FloatRecordsToVector = {
new FloatRecordsToVector()
}
}
/**
* Convert data to (label,denseVector) of spark mllib
*/
class FloatRecordsToVector()
extends Transformer[FloatRecords, (Float,DenseVector)] {
override def apply(prev: Iterator[FloatRecords]): Iterator[(Float,DenseVector)] = {
prev.map(
data => {
(data.label,new DenseVector(data.data.map(_.toDouble)))
}
)
}
}
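The two transformers above are designed to be chained with BigDL's -> operator, which composes a Transformer[Row, FloatRecords] with a Transformer[FloatRecords, (Float, DenseVector)] into a single Transformer[Row, (Float, DenseVector)]; this is exactly the pipeline that NnTest builds below:
//sketch of the preprocessing pipeline used in NnTest (column names from the iris example)
val featureName = List("f1", "f2", "f3", "f4")
val transf = DataRowToFloatRecords(featureName, "label") -> FloatRecordsToVector()
//the composed transformer is applied partition by partition: dataFrame.rdd.mapPartitions(transf(_))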
(3) NnTest
package kingpoint.nn
import com.intel.analytics.bigdl.dataset.Transformer
import com.intel.analytics.bigdl.nn.Module
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter}
import kingpoint.lenet5.LenetTest
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.{SaveMode, Row, DataFrame}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.ml.{DLClassifier => SparkDLClassifier}
/**
* Test the trained NN model.
* Created by llq on 2017/6/16.
*/
/**
* Holds one preprocessed record in the pipeline: label + feature vector + record name.
* @param label label value
* @param features feature values as a DenseVector
* @param name record name (the dataName column)
*/
case class LabeledDataFloatName(label:Float,features:DenseVector,name:String)
object NnTest {
LoggerFilter.redirectSparkInfoLogs()
Logger.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level.INFO)
/**
* Apply the preprocessing pipeline to the DataFrame and rebuild it as
* label + transformed data (DenseVector) + name.
* @param data the raw test DataFrame read from Hive
* @param f the Row => (label, DenseVector) transformer pipeline
* @param dataName name of the column holding the record name
* @return DataFrame of LabeledDataFloatName rows
*/
def transformDF(data: DataFrame, f: Transformer[Row, (Float,DenseVector)],dataName:String): DataFrame = {
//run the pipeline, producing an RDD[(Float,DenseVector)]
val vectorRdd = data.rdd.mapPartitions(f(_))
//zip the transformed data back together with the record name
val dataRDD = data.rdd.zipPartitions(vectorRdd) { (a, b) =>
b.zip(a.map(_.getAs[String](dataName)))
.map(
v => LabeledDataFloatName(v._1._1, v._1._2,v._2)
)
}
data.sqlContext.createDataFrame(dataRDD)
}
def main(args: Array[String]) {
val conf = Engine.createSparkConf()
.setAppName("kingpoint.nn.NnTest")
val sc = new SparkContext(conf)
Engine.init
val hiveContext = new HiveContext(sc)
/**
* Parameter parsing
*/
//path of the trained model (/root/data/model/20170621_112534/model.601)
val modelPath=args(0)
//Hive table holding the test data (dl.iris_test)
val databaseTableName=args(1)
//feature column names (f1,f2,f3,f4)
val featureName=List(args(2).split(","):_*)
//label column name (label)
val labelName=args(3)
//record-name column (name)
val dataName=args(4)
//batch size (16)
val batchShape=args(5).toInt
//Hive table to store the per-record predictions (dl.nn_test)
val outputTableName=args(6)
//Hive table to store the evaluation result (dl.nn_test_evaluation)
val outputTableNameEvaluation=args(7)
/**
* Read the test data
*/
//put the name and label columns in front of the feature columns
val featureLabelName=dataName::labelName::featureName
//read the Hive table, skipping any header row whose value equals the column name
val originData=hiveContext.sql("select * from "+databaseTableName+" where "+featureName(0)+" !='"+featureName(0)+"'")
//test data + label data as a DataFrame
val testLabelDataDf=originData.select(featureLabelName.head,featureLabelName.tail:_*)
/**
* Load the model and run the test
*/
//load the trained model from the checkpoint file
val model = Module.load[Float](modelPath)
val valTrans = new SparkDLClassifier[Float]()
.setInputCol("features")
.setOutputCol("predict")
val paramsTrans = ParamMap(
valTrans.modelTrain -> model,
valTrans.batchShape ->
Array(batchShape, featureName.length))
//preprocessing pipeline: Row => FloatRecords => (label, DenseVector)
val transf =DataRowToFloatRecords(featureName,labelName) ->
FloatRecordsToVector()
//build the prediction DataFrame
val valDF = transformDF(testLabelDataDf, transf ,dataName)
val testResult=valTrans.transform(valDF, paramsTrans).select("label","name","predict")
testResult.show(30)
//compute the accuracy and wrap it in a DataFrame
val countAccuracyDf=hiveContext.createDataFrame(sc.parallelize(Seq(LenetTest.evaluationAccuracy(testResult))))
countAccuracyDf.show()
/**
* Save the results
*/
//save to Hive
testResult.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName)
countAccuracyDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableNameEvaluation)
}
}
3. Submit command:
spark-submit \
--master local[4] \
--driver-class-path /root/data/dlLibs/lib/bigdl-0.1.0-jar-with-dependencies.jar \
--class "kingpoint.nn.NnTest" /root/data/SparkBigDL.jar \
/root/data/model/20170621_112534/model.601 \
dl.iris_test \
f1,f2,f3,f4 \
label \
name \
16 \
dl.nn_test \
dl.nn_test_evaluation
(1) Model path: /root/data/model/20170621_112534/model.601
(2) Hive table holding the test data: dl.iris_test
(3) Names of the columns to use as features, comma-separated: f1,f2,f3,f4
(4) Name of the column to use as the label: label
(5) Name of the column to use as the record name: name
(6) Batch size: 16
(7) Hive table where the per-record predictions are saved: dl.nn_test
(8) Hive table where the evaluation result is saved: dl.nn_test_evaluation
4. Saved results
(1) dl.nn_test: the per-record predictions, with columns label, name and predict
(2) dl.nn_test_evaluation: the overall accuracy of the model on the test set
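To verify the run, both tables can be inspected from spark-shell in the same way; a minimal sketch, assuming a HiveContext named hiveContext:
//per-record predictions (label, name, predict)
hiveContext.table("dl.nn_test").show()
//overall accuracy on the test set
hiveContext.table("dl.nn_test_evaluation").show()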