欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[Spark中移动平均法的实现]

程序员文章站 2022-07-14 13:03:24
...

一、 基本概念

       移动平均法是一种简单平滑预测技术,它的基本思想是:根据时间序列资料、逐项推移,依次计算包含一定项数的序时平均值,以反映短期趋势的方法。因此,当时间序列的数值由于受周期变动和随机波动的影响,起伏较大,不易显示出事件的发展趋势时,使用移动平均法可以消除这些因素的影响,显示出事件的发展方向与趋势(即趋势线),然后依趋势线分析预测序列的中短期趋势。

       移动平均法的应用比较广泛,尤其在股票,金融,期货等方向应用较多,通过计算移动平均值去预测短期内未来的走势等。同时,在企业中,企业通过实际数据值来预测未来一期或几期内公司产品的需求量、公司产能等的一种常用方法。移动平均法适用于即期预测

二、 移动平均法的表示方式

       移动平均法分为简单移动平均法和加权移动平均法,文本中主要介绍简单移动发在股票中的计算。

       首先要了解股票中的时间序列数据。时间序列数据表示一个变量在一段时间内的值,如1秒、1分钟、1小时、1天、1月、1季度或1年。我们可以以不严格地时间序列数据形式化表示为三元组序列:

            (k,t,v)

       这里的k是键(如股票代码),t是时间(天,小时,分钟或秒),v是相关联的值(如某一只骨片在时间点t的值)。一般地,只要在一段时间内记录相同的度量值,就会得到时间序列数据。例如,一个公司的股票的收盘价就是基于分钟、小时或天的时间序列数据。多个连续周期的时间序列数据平均值(按相同时间间隔得到的观察值,如每小时一次或每天一次)称为移动平均。

       简单移动平均算法的公式如下:

                             [Spark中移动平均法的实现]

三、移动平均法的Spark实现

3.1测试数据

     本文中的数据仅用于测试,数据不具有真实性,仅仅是为了实现移动平均法的计算使用。假设有股票的时间序列数据如下

股票代码,时间,收盘价

AA,2017-1-7,10.8

AA,2017-1-8,10.9

AA,2017-1-9,11

...,...,...

AA,2017-1-30,10.5

BB,2017-1-31,10.7

BB,2017-2-1,10.9

BB,2017-2-2,11.1

...,...,...

BB,2017-2-19,14.9

3.2内存中排序实现移动平均

/**
  * 在内存中进行排序计算移动平均值
  **/
object MovingAverageInMemory {
    def main(args: Array[String]): Unit = {
        if (args.length < 3) {
            println("Usage: MovingAverageInMemory <period> <input-path> <output-path>")
            sys.exit(1)
        }
        //移动宽度
        val period: Int = args(0).toInt
        //文件输入路径
        val inputPath: String = args(1)
        //输出路径
        val outputPath: String = args(2)

        val sparkConf: SparkConf = new SparkConf()
            .setMaster("local[1]")
            .setAppName("MovingAverageInMemory")
        //构建Spark上下文
        val sc: SparkContext = SparkContext.getOrCreate(sparkConf)
        //广播变量
        val brodcastPeriod: Broadcast[Int] = sc.broadcast(period)
        //读取文件原始数据
        val rawData: RDD[String] = sc.textFile(inputPath)
        val keyValue: RDD[(String, (String, Double))] = rawData.map(line => {
            val tokens = line.split(",")
            (tokens(0), (tokens(1), tokens(2).toDouble))
        })
        val groupValue: RDD[(String, List[(String, Double)])] = keyValue.combineByKey(
            (v: (String, Double)) => List(v),
            (c: List[(String, Double)], v: (String, Double)) => c :+ v,
            (c1: List[(String, Double)], c2: List[(String, Double)]) => c1 ::: c2
        )
        val movingAverage: RDD[(String, Seq[(String, Double)])] = groupValue.mapValues(values => {
            val dateFormat: SimpleDateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
            // 在内存中排序,对于大型数据集谨慎使用这样的排序
            val sortedValues: Seq[(Long, Double)] = values.map(s => (dateFormat.parse(s._1).getTime, s._2)).toSeq.sortBy(_._1)
            val queue: mutable.Queue[Double] = new scala.collection.mutable.Queue[Double]()
            for (tup <- sortedValues) yield {
                queue.enqueue(tup._2)
                if (queue.size > brodcastPeriod.value) {
                    queue.dequeue
                }
                (dateFormat.format(new java.util.Date(tup._1)), (queue.sum / queue.size))
            }
        })

        val formattedResult: RDD[String] = movingAverage.sortByKey().flatMap(kv => {
            kv._2.map(v => (kv._1 + "," + v._1 + "," + v._2.toString()))
        })

        //保存结果
        //formattedResult.saveAsTextFile(outputPath)
        formattedResult.foreach(println)
        sc.stop()
    }
}

运行结果:

AA,2017-01-01,10.2
AA,2017-01-02,10.25
AA,2017-01-03,10.299999999999999
AA,2017-01-04,10.35
AA,2017-01-05,10.4
AA,2017-01-06,10.450000000000001
AA,2017-01-07,10.5
AA,2017-01-08,10.55
AA,2017-01-09,10.600000000000001
AA,2017-01-10,10.65
AA,2017-01-11,10.75
AA,2017-01-12,10.85
AA,2017-01-13,10.95
AA,2017-01-14,11.05
AA,2017-01-15,11.15
AA,2017-01-16,11.25
AA,2017-01-17,11.35
AA,2017-01-18,11.45
AA,2017-01-19,11.55
AA,2017-01-20,11.65
AA,2017-01-21,11.75
AA,2017-01-22,11.85
AA,2017-01-23,11.95
AA,2017-01-24,12.05
AA,2017-01-25,12.15
AA,2017-01-26,12.25
AA,2017-01-27,12.35
AA,2017-01-28,12.45
AA,2017-01-29,12.28
AA,2017-01-30,12.120000000000001
BB,2017-01-31,10.7
BB,2017-02-01,10.8
BB,2017-02-02,10.9
BB,2017-02-03,11.0
BB,2017-02-04,11.1
BB,2017-02-05,11.200000000000001
BB,2017-02-06,11.3
BB,2017-02-07,11.4
BB,2017-02-08,11.422222222222222
BB,2017-02-09,11.47
BB,2017-02-10,11.620000000000001
BB,2017-02-11,11.78
BB,2017-02-12,11.95
BB,2017-02-13,12.129999999999999
BB,2017-02-14,12.32
BB,2017-02-15,12.52
BB,2017-02-16,12.73
BB,2017-02-17,12.95
BB,2017-02-18,13.25
BB,2017-02-19,13.55

3.3自动义排序实现移动平均

/**
  * 自定义排序计算移动平均值
  **/
object MovingAverageCustomSort {
    def main(args: Array[String]): Unit = {
        if (args.length < 3) {
            println("Usage: MovingAverageCustomSort <period> <input-path> <output-path>")
            sys.exit(1)
        }
        //移动宽度
        val period: Int = args(0).toInt
        //输入路径
        val inputPath: String = args(1)
        //输出路径
        val outputPath: String = args(2)
        //分区数
        val numPartitions: Int = 4
        val sparkConf: SparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("MovingAverageCustomSort")
        //构建Spark上下文
        val sc: SparkContext = SparkContext.getOrCreate(sparkConf)

        val brodcastPeriod: Broadcast[Int] = sc.broadcast(period)
        //读取原始文件数据
        val rawData: RDD[String] = sc.textFile(inputPath)

        // Key contains part of value (closing date in this case)
        val valueTokey: RDD[(CompositeKey, TimeSeriesData)] = rawData.map(line => {
            val tokens = line.split(",")
            val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
            val timestamp = dateFormat.parse(tokens(1)).getTime
            (CompositeKey(tokens(0), timestamp), TimeSeriesData(timestamp, tokens(2).toDouble))
        })

        //二次排序
        val sortedData: RDD[(CompositeKey, TimeSeriesData)] =
            valueTokey.repartitionAndSortWithinPartitions(new CompositeKeyPartitioner(numPartitions))

        val groupData: RDD[(String, Iterable[TimeSeriesData])] = sortedData.map(k => (k._1.stockSymbol, (k._2))).groupByKey()

        val movingAverage: RDD[(String, Iterable[(String, Double)])] = groupData.mapValues(values => {
            val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
            val queue = new scala.collection.mutable.Queue[Double]()
            for (timeSeriesData <- values) yield {
                queue.enqueue(timeSeriesData.closingStockPrice)
                if (queue.size > brodcastPeriod.value) {
                    queue.dequeue
                }
                (dateFormat.format(new java.util.Date(timeSeriesData.timeStamp)), (queue.sum / queue.size))
            }
        })

        val formattedResult: RDD[String] = movingAverage.sortByKey().flatMap(kv => {
            kv._2.map(v => (kv._1 + "," + v._1 + "," + v._2.toString()))
        })
        //保存结果
        //formattedResult.saveAsTextFile(outputPath)
        formattedResult.foreach(println)
        // done
        sc.stop()
    }
}


/**
  * 定义时间序列类
  **/
case class TimeSeriesData(timeStamp: Long, closingStockPrice: Double)

/**
  * 自定义排序复合类
  **/
case class CompositeKey(stockSymbol: String, timeStamp: Long)

object CompositeKey {
    implicit def ordering[A <: CompositeKey]: Ordering[A] = {
        Ordering.by(fk => (fk.stockSymbol, fk.timeStamp))
    }
}

/**
  * 排序分区数定义
  **/
class CompositeKeyPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) must greater than 0.")

    //分区数
    def numPartitions: Int = partitions

    def getPartition(key: Any): Int = key match {
        case k: CompositeKey => math.abs(k.stockSymbol.hashCode % numPartitions)
        case null => 0
        case _ => math.abs(key.hashCode % numPartitions)
    }

    override def equals(other: Any): Boolean = other match {
        case h: CompositeKeyPartitioner => h.numPartitions == numPartitions
        case _ => false
    }

    override def hashCode: Int = numPartitions
}

运行结果:

AA,2017-01-01,10.2
AA,2017-01-02,10.25
AA,2017-01-03,10.299999999999999
AA,2017-01-04,10.35
AA,2017-01-05,10.4
AA,2017-01-06,10.450000000000001
AA,2017-01-07,10.5
AA,2017-01-08,10.55
AA,2017-01-09,10.600000000000001
AA,2017-01-10,10.65
AA,2017-01-11,10.75
AA,2017-01-12,10.85
AA,2017-01-13,10.95
AA,2017-01-14,11.05
AA,2017-01-15,11.15
AA,2017-01-16,11.25
AA,2017-01-17,11.35
AA,2017-01-18,11.45
AA,2017-01-19,11.55
AA,2017-01-20,11.65
AA,2017-01-21,11.75
AA,2017-01-22,11.85
AA,2017-01-23,11.95
AA,2017-01-24,12.05
AA,2017-01-25,12.15
AA,2017-01-26,12.25
AA,2017-01-27,12.35
AA,2017-01-28,12.45
AA,2017-01-29,12.28
AA,2017-01-30,12.120000000000001
BB,2017-01-31,10.7
BB,2017-02-01,10.8
BB,2017-02-02,10.9
BB,2017-02-03,11.0
BB,2017-02-04,11.1
BB,2017-02-05,11.200000000000001
BB,2017-02-06,11.3
BB,2017-02-07,11.4
BB,2017-02-08,11.422222222222222
BB,2017-02-09,11.47
BB,2017-02-10,11.620000000000001
BB,2017-02-11,11.78
BB,2017-02-12,11.95
BB,2017-02-13,12.129999999999999
BB,2017-02-14,12.32
BB,2017-02-15,12.52
BB,2017-02-16,12.73
BB,2017-02-17,12.95
BB,2017-02-18,13.25
BB,2017-02-19,13.55

四、 移动平均的特点及存在问题

4.1 特点

    1、移动平均对原序列有修匀或平滑的作用,使得原序列的上下波动被削弱了,而且平均的时距项数N越大,对数列的修匀作用越强。

    2、移动平均时距项数N为奇数时,只需一次移动平均,其移动平均值作为移动平均项数的中间一期的趋势代表值;而当移动平均项数N为偶数时,移动平均值代表的是这偶数项的中间位置的水平,无法对正某一时期,则需要在进行一次相临两项平均值的移动平均,这才能使平均值对正某一时期,这称为移正平均,也成为中心化的移动平均数。

    3、当序列包含季节变动时,移动平均时距项数N应与季节变动长度一致,才能消除其季节变动;若序列包含周期变动时,平均时距项数N应和周期长度基本一致,才能较好的消除周期波动 [1]  

    4、移动平均的项数不宜过大。

4.2存在问题

    1移动平均值并不能总是很好地反映出趋势。由于是平均值,预测值总是停留在过去的水平上而无法预计会导致将来更高或更低的波动;

    2移动平均法要由大量的过去数据的记录;

    3、它通过引进愈来愈期的新数据,不断修改平均值,以之作为预测值。

    移动平均法的基本原理,是通过移动平均消除时间序列中的不规则变动和其他变动,从而揭示出时间序列的长期趋势。

[Spark中移动平均法的实现]

相关标签: 移动平均法