
Spark BigDL: Writing a Neural Network for Deep Learning


BigDL provides implementations of a wide range of deep learning neural network algorithms, and it can of course also be used to build simple neural networks.

 

I. Downloading the dependency package and initializing the system

1. First, download the BigDL release that matches your local Spark version:

https://github.com/intel-analytics/BigDL/wiki/Downloads


2. Unpack the downloaded archive on Linux, then initialize the Spark environment by running bin/bigdl.sh from the package:

source bigdl.sh

 

3. When starting spark-shell, add the dependency jar:

--driver-class-path bigdl-0.1.0-jar-with-dependencies.jar
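
For example, after launching spark-shell with the jar on the driver class path, a quick way to confirm that BigDL is visible is to build a small module directly in the shell (a minimal sketch; the layer sizes are only for illustration):

import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.numeric.NumericFloat

val check = Sequential().add(Linear(4, 3)).add(LogSoftMax())
println(check)    //prints the module structure, confirming that the BigDL classes resolve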


II. The neural network algorithm

(I) Training the neural network (NN) model

1. Program to run: kingpoint.nn.NnTrain

2. Code

(1) The neural network model: NnModel

package kingpoint.nn

import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.numeric.NumericFloat

/**
 * Created by llq on 2017/6/16.
 */
object NnModel {
  /**
   * Build the neural network from the layer count and the per-layer neuron counts.
   * @param layerNumber total number of layers (must equal the number of entries in neuralNetNumber)
   * @param neuralNetNumber comma-separated neuron counts for each layer, e.g. "4,100,10,3"
   * @return the assembled BigDL model
   */
  def apply(layerNumber: String,neuralNetNumber:String): Module[Float] = {

    val layerInt=layerNumber.toInt
    val neuralNetArray=neuralNetNumber.split(",").map(_.toInt)

    val model = Sequential()
    //first layer: reshape each record into a flat input vector
    model.add(Reshape(Array(neuralNetArray(0))))

    //add hidden layers (Linear + Tanh) up to, but not including, the output layer
    for(i<- 0 to layerInt-3){
      model.add(Linear(neuralNetArray(i), neuralNetArray(i+1)))
        .add(Tanh())
    }

    //output layer: Linear + LogSoftMax
    model.add(Linear(neuralNetArray(layerInt-2), neuralNetArray(layerInt-1)))
      .add(LogSoftMax())
  }
}
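
For example, with layerNumber = "4" and neuralNetNumber = "4,100,10,3" (the values used in the training command further below), the factory above builds the same network as this explicit version:

import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.numeric.NumericFloat

val model = Sequential()
  .add(Reshape(Array(4)))                 //flatten each record into a 4-element input vector
  .add(Linear(4, 100)).add(Tanh())        //hidden layer 1
  .add(Linear(100, 10)).add(Tanh())       //hidden layer 2
  .add(Linear(10, 3)).add(LogSoftMax())   //3-class output; log-probabilities feed ClassNLLCriterion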

(2) Model training: NnTrain

package kingpoint.nn

import java.io.File
import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.dataset.DataSet.SeqFileFolder
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.dataset.{MiniBatch, Sample, ByteRecord, DataSet}
import com.intel.analytics.bigdl.nn.ClassNLLCriterion
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, T}
import kingpoint.lenet5.{modelNameAccuary, LeNet5, LabeledDataFileName}
import org.apache.hadoop.io.Text
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.hive.HiveContext

import scala.collection.mutable.ArrayBuffer


/**
 * Read the training data from Hive and train the NN model.
 * Created by llq on 2017/6/6.
 */
object NnTrain {
  LoggerFilter.redirectSparkInfoLogs()
  Logger.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level.INFO)

  /**
   * Convert each Row into a BigDL Sample[Float] (feature tensor + label tensor).
   * @param trainLabelDataRdd RDD of rows whose first column is the label, followed by the features
   * @return RDD of Sample[Float]
   */
  def inLoad(trainLabelDataRdd:RDD[Row]): RDD[Sample[Float]]={
    trainLabelDataRdd.mapPartitions{iter=>
      iter.map{row=>
        var dataBuffer=new ArrayBuffer[Float]()
        val data=row.mkString(",").split(",").map(_.toFloat)
        dataBuffer ++= data
        dataBuffer.remove(0,1)
        //the first column is the label; wrap it in a (1,1) tensor
        val labelTensor=Tensor[Float](Array(data(0)),Array(1,1))
        //the remaining columns are the features; wrap them in a (1, n) tensor
        val trainTensor=Tensor[Float](dataBuffer.toArray,Array(1,dataBuffer.length))

        Sample(trainTensor,labelTensor)
      }
    }
  }

  /**
   * Scan the model checkpoint directory and return the file name of the newest checkpoint.
   * @param file the checkpoint directory written by the optimizer
   * @return the checkpoint file name, e.g. "model.601"
   */
  def lsLinuxCheckPointPath(file:File): String ={
    val modelPattern="model".r
    val numberPattern="[0-9]+".r
    var epoch=0
    if(file.isDirectory){
      val fileArray=file.listFiles()
      for(i<- 0 to fileArray.length-1){
        //only consider files whose name contains "model"
        if(modelPattern.findFirstIn(fileArray(i).getName).mkString(",")!=""){
          //keep the largest iteration number seen so far
          val epochNumber=numberPattern.findFirstIn(fileArray(i).getName).mkString(",").toInt
          if(epochNumber>epoch){
            epoch=epochNumber
          }
        }
      }
    }else{
      throw new Exception("the path is not right")
    }
    "model."+epoch
  }

  /**
   * Main entry point: read the data from Hive and train the NN model.
   * @param args command-line arguments, see the spark-submit example below
   */
  def main (args: Array[String]){
    val conf = Engine.createSparkConf()
      .setAppName("kingpoint.nn.NnTrain")
    val sc = new SparkContext(conf)
    val hiveContext=new HiveContext(sc)
    Engine.init

    /**
     * Argument parsing
     */
    if(args.length<12){
      System.err.println("Error:the parameter is less than 12")
      System.exit(1)
    }
    //Hive table to read (e.g. dl.iris_train)
    val databaseTableName=args(0)
    //feature name(f1,f2,f3,f4)
    val featureName=List(args(1).split(","):_*)
    //label name(label)
    val labelName=args(2)
    //train/validation split ratio (e.g. 8,2)
    val trainValidationRatio=args(3)

    //neural network hyper-parameters
    val layerNumber=args(4)                    //number of layers in the network (e.g. 4)
    val neuralNetNumber=args(5)                //neuron count of each layer (e.g. 4,100,10,3)
    val batchSize=args(6).toInt                //batch size (e.g. 4)
    val learningRate=args(7).toDouble          //learning rate (e.g. 0.01)
    val learningRateDecay=args(8).toDouble     //learning rate decay (e.g. 0.0)
    val maxEpoch=args(9).toInt                 //stop training after this many epochs (e.g. 25)
    val modelSave=args(10)                     //model checkpoint directory (e.g. /root/data/model)
    val outputTableName=args(11)               //Hive table for the training output (e.g. dl.nn_train)

    /**
     * Read the data and convert it into Sample format
     */
    //put the label name first, followed by the feature names
    val featureLabelName=labelName::featureName

    //read from Hive; the where clause drops a header row whose values equal the column names
    val originData=hiveContext.sql("select * from "+databaseTableName+" where "+featureName(0)+" !='"+featureName(0)+"'")
    //training data + label data, repartitioned for parallel training
    //(repartition returns a new RDD, so the result must be assigned rather than discarded)
    val trainLabelDataRdd=originData.select(featureLabelName.head,featureLabelName.tail:_*).rdd.repartition(32)

    //split into a training set and a validation set
    //(randomSplit expects Array[Double] weights, so the ratios are parsed as Double)
    val trainRatio=trainValidationRatio.split(",")(0).toDouble
    val validationRatio=trainValidationRatio.split(",")(1).toDouble
    val dataRdd=trainLabelDataRdd.randomSplit(Array(trainRatio,validationRatio))
    val trainSetDataRdd=dataRdd(0)
    val validationDataRdd=dataRdd(1)

    //build Sample RDDs for training and validation
    val trainSampleData=inLoad(trainSetDataRdd)
    val validationSampleData=inLoad(validationDataRdd)
    /**
     * Model setup and training
     */
    //build the NN model with the given layer configuration
    val model = NnModel(layerNumber,neuralNetNumber)

    //learning-rate settings used by gradient descent
    val state =
      T(
        "learningRate" -> learningRate,
        "learningRateDecay" -> learningRateDecay
      )

    //create the optimizer: model, training samples, loss criterion and batch size
    val optimizer = Optimizer(
      model = model,
      sampleRDD = trainSampleData,
      criterion = new ClassNLLCriterion[Float](),
      batchSize=batchSize)

    optimizer.setCheckpoint(modelSave, Trigger.everyEpoch)

    //configure validation, learning-rate state and the stop condition, then start training
    optimizer
      .setValidation(
        trigger = Trigger.everyEpoch,
        sampleRDD = validationSampleData,
        vMethods = Array(new Top1Accuracy, new Loss[Float]),
        batchSize=batchSize)
      .setState(state)
      .setEndWhen(Trigger.maxEpoch(maxEpoch))    //stop after maxEpoch epochs
      .optimize()

    //print the model structure (evaluate() also switches the model into evaluation mode)
    println(model.evaluate())

    //find the newest checkpoint file name and build its full path
    val modelEpochFile=optimizer.getCheckpointPath().get+"/"+lsLinuxCheckPointPath(new File(optimizer.getCheckpointPath().get))

    //measure accuracy on the validation set
    val validationFeatureMiniBatch=validationSampleData.map(sample=>MiniBatch[Float](sample.feature(),sample.label()))
    val validator = Validator(model, DataSet.rdd(validationFeatureMiniBatch))
    val result = validator.test(Array(new Top1Accuracy[Float]))

    /**
     * Save the model path and the accuracy
     */
    val modelNameAccuaryRdd=sc.parallelize(List(modelNameAccuary(modelEpochFile,result(0)._1.toString)))
    val modelNameAccuaryDf=hiveContext.createDataFrame(modelNameAccuaryRdd)

    //save to Hive
    modelNameAccuaryDf.show()
    modelNameAccuaryDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName)

  }
}
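
To make the Row-to-Sample conversion in inLoad concrete, here is a minimal standalone sketch of what happens to one record. The numeric values are made up; the first column is the label because featureLabelName puts labelName first, and the label should be a 1-based class index (e.g. 1, 2 or 3 for the three Iris classes), since ClassNLLCriterion expects 1-based targets.

import com.intel.analytics.bigdl.dataset.Sample
import com.intel.analytics.bigdl.tensor.Tensor
import org.apache.spark.sql.Row

//one row as produced by originData.select(featureLabelName.head, featureLabelName.tail: _*)
val row = Row("1", "5.1", "3.5", "1.4", "0.2")                 //(label, f1, f2, f3, f4)
val data = row.mkString(",").split(",").map(_.toFloat)

val labelTensor = Tensor[Float](Array(data(0)), Array(1, 1))   //shape (1,1) holding the class index
val featureTensor = Tensor[Float](data.drop(1), Array(1, 4))   //shape (1,4) holding the four features
val sample = Sample(featureTensor, labelTensor)                //what inLoad emits for each record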

3. Command to run:

spark-submit \
--master local[4] \
--driver-class-path /root/data/dlLibs/lib/bigdl-0.1.0-jar-with-dependencies.jar \
--class "kingpoint.nn.NnTrain" /root/data/SparkBigDL.jar \
dl.iris_train \
f1,f2,f3,f4 \
label \
8,2 \
4 \
4,100,10,3 \
4 \
0.01 \
0.0 \
25 \
/root/data/model \
dl.nn_train

(1) Hive table to read (see the table-creation sketch after this list): dl.iris_train

(2) Names of the columns to use as features, comma-separated: f1,f2,f3,f4

(3) Name of the column to use as the label: label

(4) Train/validation split ratio: 8,2

(5) Number of layers in the network: 4

(6) Number of neurons in each layer: 4,100,10,3

(7) Batch size: 4

(8) Learning rate: 0.01

(9) learningRateDecay: 0.0

(10) Maximum number of epochs before training stops: 25

(11) Model save path: /root/data/model

(12) Name of the Hive table where the training output is saved: dl.nn_train
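
The job assumes dl.iris_train already exists in Hive with string-typed columns f1..f4 and label (the "where f1 != 'f1'" filter in NnTrain simply drops a CSV header row that was loaded as data). Below is one hypothetical way to create such a table from a local CSV; the file path and the 1-based label encoding (1..3 for the three Iris classes) are assumptions, not part of the original article:

hiveContext.sql("CREATE DATABASE IF NOT EXISTS dl")
hiveContext.sql(
  "CREATE TABLE IF NOT EXISTS dl.iris_train (f1 STRING, f2 STRING, f3 STRING, f4 STRING, label STRING) " +
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
//each CSV line looks like: 5.1,3.5,1.4,0.2,1   (features first, then a 1-based label)
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/data/iris_train.csv' INTO TABLE dl.iris_train")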


4. Saved results and output:

    The results are saved in Hive. The output fields are the model save path (modelName) and the accuracy on the validation set. Note: when you later test the model, look up the modelName value in this table and pass it as the model-path argument of the test job.
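
Since the test step needs the exact modelName value, one way to read it back is to query the output table (a minimal sketch; show(false) just avoids truncating the long path):

hiveContext.sql("select * from dl.nn_train").show(false)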


(II) Testing the neural network (NN) model

1. Program to run: kingpoint.nn.NnTest

2. Code:

(1)DataRowToFloatRecords

package kingpoint.nn

import com.intel.analytics.bigdl.dataset.Transformer
import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import scala.collection.Iterator
import scala.collection.mutable.ArrayBuffer

/**
 * Convert Row => FloatRecords.
 * Created by llq on 2017/6/21.
 */

/**
 * Holds the feature values and the label value of one record.
 * @param data feature values
 * @param label label value
 */
case class FloatRecords(data:Array[Float],label:Float)

object DataRowToFloatRecords {
  val logger = Logger.getLogger(getClass)

  def apply(featureName: List[String], labelName:String): DataRowToFloatRecords = {
    new DataRowToFloatRecords(featureName,labelName)
  }
}

/**
 * transform [[Row]] to [[FloatRecords]]
 * @param featureName column name
 * @param labelName label name
 */
class DataRowToFloatRecords(featureName: List[String],labelName:String)
  extends Transformer[Row, FloatRecords] {

  override def apply(prev: Iterator[Row]): Iterator[FloatRecords] = {
    prev.map(
      data => {
        val dataFloatArrayBuffer=new ArrayBuffer[Float]()
        for(i<- 0 to featureName.length-1){
          dataFloatArrayBuffer +=data.getAs[String](featureName(i)).toFloat
        }
        FloatRecords(dataFloatArrayBuffer.toArray, data.getAs[String](labelName).toFloat)
      }
    )
  }
}

(2)FloatRecordsToVector

package kingpoint.nn

import com.intel.analytics.bigdl.dataset.Transformer
import org.apache.log4j.Logger
import org.apache.spark.mllib.linalg.DenseVector
import scala.collection.Iterator

/**
 * FloatRecords =>DenseVector
 * Created by llq on 2017/6/21.
 */
object FloatRecordsToVector {
  val logger = Logger.getLogger(getClass)

  def apply(): FloatRecordsToVector = {
    new FloatRecordsToVector()
  }
}

/**
 * Convert data to (label,denseVector) of spark mllib
 */
class FloatRecordsToVector()
  extends Transformer[FloatRecords, (Float,DenseVector)] {

  override def apply(prev: Iterator[FloatRecords]): Iterator[(Float,DenseVector)] = {
    prev.map(
      data => {
        (data.label,new DenseVector(data.data.map(_.toDouble)))
      }
    )
  }
}


(3)NnTest

package kingpoint.nn

import com.intel.analytics.bigdl.dataset.Transformer
import com.intel.analytics.bigdl.nn.Module
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter}
import kingpoint.lenet5.LenetTest
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.{SaveMode, Row, DataFrame}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.ml.{DLClassifier => SparkDLClassifier}

/**
 * Test the trained neural network (NN) model.
 * Created by llq on 2017/6/16.
 */


/**
 * Holds one preprocessed record in the ML pipeline: label + features + record name.
 * @param label label value
 * @param features feature vector
 * @param name record identifier
 */
case class LabeledDataFloatName(label:Float,features:DenseVector,name:String)

object NnTest {
  LoggerFilter.redirectSparkInfoLogs()
  Logger.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level.INFO)

  /**
   * Apply the transformer to the DataFrame and rebuild a DataFrame of
   * label + transformed data (DenseVector) + name.
   * @param data input DataFrame
   * @param f transformer from Row to (label, DenseVector)
   * @param dataName name of the record-identifier column
   * @return DataFrame of LabeledDataFloatName
   */
  def transformDF(data: DataFrame, f: Transformer[Row, (Float,DenseVector)],dataName:String): DataFrame = {
    //run the transformer to produce RDD[(Float, DenseVector)]
    val vectorRdd = data.rdd.mapPartitions(f(_))
    //zip the transformed data back together with the record-name column
    val dataRDD = data.rdd.zipPartitions(vectorRdd) { (a, b) =>
      b.zip(a.map(_.getAs[String](dataName)))
        .map(
          v => LabeledDataFloatName(v._1._1, v._1._2,v._2)
        )
    }
    data.sqlContext.createDataFrame(dataRDD)
  }


  def main(args: Array[String]) {
    val conf = Engine.createSparkConf()
      .setAppName("kingpoint.nn.NnTest")
    val sc = new SparkContext(conf)
    Engine.init
    val hiveContext = new HiveContext(sc)

    /**
     * Argument parsing
     */
    //model path (e.g. /root/data/model/20170621_112534/model.601)
    val modelPath=args(0)
    //Hive table to read (e.g. dl.iris_test)
    val databaseTableName=args(1)
    //feature name(f1,f2,f3,f4)
    val featureName=List(args(2).split(","):_*)
    //label name(label)
    val labelName=args(3)
    //name of the record-identifier column (e.g. name)
    val dataName=args(4)
    //batchSize(16)
    val batchShape=args(5).toInt
    //Hive table for the per-record predictions (e.g. dl.nn_test)
    val outputTableName=args(6)
    //Hive table for the evaluation metrics (e.g. dl.nn_test_evaluation)
    val outputTableNameEvaluation=args(7)

    /**
     * Read the test data
     */
    //put the record name first, followed by the label and feature names
    val featureLabelName=dataName::labelName::featureName
    val originData=hiveContext.sql("select * from "+databaseTableName+" where "+featureName(0)+" !='"+featureName(0)+"'")

    //test data+label data:DataFrame
    val testLabelDataDf=originData.select(featureLabelName.head,featureLabelName.tail:_*)

    /**
     * Load the model and run the test
     */
    //load the trained model from the checkpoint path
    val model =  Module.load[Float](modelPath)

    val valTrans = new SparkDLClassifier[Float]()
      .setInputCol("features")
      .setOutputCol("predict")

    val paramsTrans = ParamMap(
      valTrans.modelTrain -> model,
      valTrans.batchShape ->
        Array(batchShape, featureName.length))

    //preprocessing pipeline: Row -> FloatRecords -> (label, DenseVector)
    val transf =DataRowToFloatRecords(featureName,labelName) ->
      FloatRecordsToVector()

    //build the prediction DataFrame
    val valDF = transformDF(testLabelDataDf, transf ,dataName)
    val testResult=valTrans.transform(valDF, paramsTrans).select("label","name","predict")
    testResult.show(30)

    //compute the accuracy and wrap it in a DataFrame
    val countAccuracyDf=hiveContext.createDataFrame(sc.parallelize(Seq(LenetTest.evaluationAccuracy(testResult))))
    countAccuracyDf.show()

    /**
     * Save the results
     */
    //save to Hive
    testResult.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName)
    countAccuracyDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableNameEvaluation)
  }
}


3. Command to run:
spark-submit \
--master local[4] \
--driver-class-path /root/data/dlLibs/lib/bigdl-0.1.0-jar-with-dependencies.jar \
--class "kingpoint.nn.NnTest" /root/data/SparkBigDL.jar \
/root/data/model/20170621_112534/model.601 \
dl.iris_test \
f1,f2,f3,f4 \
label \
name \
16 \
dl.nn_test \
dl.nn_test_evaluation

(1) Model path: /root/data/model/20170621_112534/model.601

(2) Hive table to read: dl.iris_test

(3) Names of the columns to use as features, comma-separated: f1,f2,f3,f4

(4) Name of the column to use as the label: label

(5) Name of the column to use as the record identifier: name

(6) batchSize: 16

(7) Name of the Hive table where the per-record predictions are saved: dl.nn_test

(8) Name of the Hive table where the evaluation metrics are saved: dl.nn_test_evaluation

 

4. Saved results

(1) dl.nn_test: the per-record predictions (label, name, predict)

(2) dl.nn_test_evaluation: the accuracy computed on the test set

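To inspect the two saved tables afterwards, a quick check from spark-shell or the job itself (column names follow the select("label","name","predict") in NnTest above) could look like:

hiveContext.sql("select * from dl.nn_test").show()              //per-record: label, name, predict
hiveContext.sql("select * from dl.nn_test_evaluation").show()   //overall accuracy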