模型学习笔记08-时间序列模型

时间序列是按时间顺序排列的、随时间变化且相互关联的数据序列。分析时间序列的方法构成数据分析的一个重要领域,即时间序列分析。时间序列根据所研究的依据不同,可有不同的分类。

  • 按所研究的对象的多少分,有一元时间序列和多元时间序列。
  • 按时间的连续性可将时间序列分为离散时间序列和连续时间序列两种。
  • 按序列的统计特性分,有平稳时间序列和非平稳时间序列。如果一个时间序列的概率分布与时间t 无关,则称该序列为严格的(狭义的)平稳时间序列。如果序列的一、二阶矩存在,而且对任意时刻t 满足:
    • 均值为常数
    • 协方差为时间间隔τ 的函数。
  • 则称该序列为宽平稳时间序列,也叫广义平稳时间序列。我们以后所研究的时间序列主要是宽平稳时间序列。
  • 按时间序列的分布规律来分,有高斯型时间序列和非高斯型时间序列。

时间序列分析方法概述

时间序列预测技术就是通过对预测目标自身时间序列的处理,来研究其变化趋势的。一个时间序列往往是以下几类变化形式的叠加或耦合。

  • 长期趋势变动。它是指时间序列朝着一定的方向持续上升或下降,或停留在某一水平上的倾向,它反映了客观事物的主要变化趋势。
  • 季节变动。
  • 循环变动。通常是指周期为一年以上,由非季节因素引起的涨落起伏波形相似的波动。
  • 不规则变动。通常它分为突然变动和随机变动。

通常用T_t表示长期趋势项,S_t表示季节变动趋势项,C_t表示循环变动趋势项,R_t表示随机干扰项。常见的时间序列模型有以下几种类型:

  • 加法模型
    • y_t=T_t+S_t+C_t+R_t
  • 乘法模型
    • y_t=T_t·S_t·C_t·R_t
  • 混合模型
    • y_t=T_t·S_t+R_t
    • y_t=S_t+T_t·C_t·R_t

其中t_y是观测目标的观测记录,E(R_t)=0 ,E(R_t^2)=σ^2。如果在预测时间范围以内,无突然变动且随机变动的方差σ^2较小,并且有理由认为过去和现在的演变趋势将继续发展到未来时,可用一些经验方法进行预测。

移动平均法

移动平均法是根据时间序列资料逐渐推移,依次计算包含一定项数的时序平均数,以反映长期趋势的方法。当时间序列的数值由于受周期变动和不规则变动的影响,起伏较大,不易显示出发展趋势时,可用移动平均法,消除这些因素的影响,分析、预测序列的长期趋势。移动平均法有简单移动平均法,加权移动平均法,趋势移动平均法等。

简单移动平均法

公式

设观测的数列为y_1,...,y_T,取移动平均的项数N<T。一次简单移动平均值计算公式为:

\begin{aligned}
M_t^{(1)}
&=\frac{1}{N}(y_t+y_{t-1}+...+y_{t-N+1})\\
&=\frac{1}{N}(y_{t-1}+...+y_{t-N})+\frac{1}{N}(y_t-y_{t-N})\\
&=M_{t-1}^{(1)}+\frac{1}{N}(y_t-y_{t-N})\\
\end{aligned}

当预测目标的基本趋势是在某一水平上下波动时,可用一次简单移动平均方法建立预测模型:

\hat{y}_{t+1}=M^{(1)}_t=\frac{1}{N}(y_t+y_{t-1}+...+y_{t-N+1}),t=N,N+1,...,T

其预测标准误差为:

S=\sqrt{\frac{\sum^T_{t=N+1}(\hat y_t-y_t)^2}{T-N}}

最近 N 期序列值的平均值作为未来各期的预测结果。一般 N 取值范围:5 ≤ N ≤ 200 。当历史序列的基本趋势变化不大且序列中随机变动成分较多时, N 的取值应较大一些。否则 N 的取值应小一些。在有确定的季节变动周期的资料中,移动平均的项数应取周期长度。选择最佳 N 值的一个有效方法是,比较若干模型的预测误差。预测标准误差最小者为好。

计算示例

某企业 1 月~11 月份的销售收入时间序列如表所示。试用一次简单滑动平均法预测第 12 月份的销售收入。

月份1234567891011
收入533.8574.6606.9649.8705.1722.0816.4892.7963.91015.11102.7
class TimingModel:
    def getTimeSeries(self,ts):
        if type(ts)!=type(None):
            return ts
        else:
            return np.arange(0,self.t+1)
    def __init__(self,arr:np.ndarray,time_series=None) -> None:
        self.arr=arr/1.                                     # 时间序列数据
        self.t=self.arr.size                                # 时间序列长度
        self.time_series=self.getTimeSeries(time_series)    # 时间序列
class SimpleMovingAverage(TimingModel):    
    def simpleMovingAverage(self):
        M=np.array([],dtype=np.float64)
        t=0
        while t<self.t+1:
            M=np.insert(M,M.size,np.sum(self.arr[t-self.N:t])/self.N)
            t+=1
        M[M==0]=np.nan
        return M
    def calStdError(self):
        return np.sqrt(np.sum(np.power(self.M[self.N:-1]-self.arr[self.N:],2))/(self.t-self.N))
    def __init__(self,arr:np.ndarray,N,time_series=None) -> None:
        super().__init__(arr,time_series)
        self.N=N                                            # 移动平均的项数
        self.M=self.simpleMovingAverage()                   # 一次预测数列
        self.Mt=self.M[-1:][0]                              # 一次预测值
        self.std_err=self.calStdError()                     # 预测标准误差                           
    def show(self,lite=False):
        print(f'N={self.N}的预测结果如下:')
        raw_data=np.insert(self.arr,self.t,np.nan).reshape(1,self.t+1)
        est_data=self.M.reshape(1,self.M.size)
        columns=self.time_series
        index=['观测值','预测值']
        data=np.r_[raw_data,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'预测结果为:{round(self.Mt,4)},标准误差为{round(self.std_err,4)}')
data=np.array([533.8,574.6,606.9,649.8,705.1,722.0,816.4,892.7,963.9,1015.1,1102.7])
ts=['JAN','FEB','MAR','APR','MAY','JUN','JUL','AUG','SEPT','OCT','NOV','DEC']
profit=SimpleMovingAverage(data,4,time_series=ts)
profit.show()
N=4的预测结果如下:
JAN	FEB	MAR	APR	MAY	JUN	JUL	AUG	SEPT	OCT	NOV	DEC
观测值	533.8	574.6	606.9	649.8	705.100	722.0	816.40	892.700	963.90	1015.10	1102.700	NaN
预测值	NaN	NaN	NaN	NaN	591.275	634.1	670.95	723.325	784.05	848.75	922.025	993.6
预测结果为:993.6,标准误差为152.6845

简单移动平均法只适合做近期预测,而且是预测目标的发展趋势变化不大的情况。如果目标的发展趋势存在其它的变化,采用简单移动平均法就会产生较大的预测偏差和滞后。

加权移动平均法

公式

在简单移动平均公式中,每期数据在求平均时的作用是等同的。但是,每期数据所包含的信息量不一样,近期数据包含着更多关于未来情况的信息。因此,把各期数据等同看待是不尽合理的,应考虑各期数据的重要性,对近期数据给予较大的权重,这就是加权移动平均法的基本思想。

设观测的数列为y_1,...,y_T,加权移动平均公式为:

M_{tw}=\frac{w_1y_t+w_2y_{t-1}+...+w_Ny_{t-N+1}}{w_1+w_2+...+w_N}

式中M_{tw}为t期加权移动平均数;w_iy_{t-i+1}的权数,它体现了相应的y_t在加权平均数中的重要性。利用加权移动平均数来做预测,其预测公式为:

\hat y_{t+1}=M_{tw}

即以第t期加权移动平均数作为第t+1期的预测值。

计算示例

我国1979~1988年原煤产量如表所示,试用加权移动平均法预测1989年的产量。

年份1979198019811982198319841985198619871988
原煤产量6.356.206.226.667.157.898.728.949.289.8
class WeightedMovingAverage(TimingModel):
    def weightedMovingAverage(self):
        M=np.zeros(self.N)
        t=self.N
        while t<self.t+1:
            M=np.insert(M,M.size,(np.sum(np.flip(self.arr[t-self.N:t])*self.weight))/np.sum(self.weight))
            t+=1
        M[M==0]=np.nan
        return M
    def calRelativeError(self):
        return (self.arr-self.M[:-1])/self.arr
    def __init__(self,arr:np.ndarray,weight:np.ndarray,time_series=None) -> None:
        super().__init__(arr,time_series)
        self.weight=weight                                  # 分配权重
        self.N=self.weight.size                             # 权重长度
        self.M=self.weightedMovingAverage()                 # 一次预测数列
        self.Mt=self.M[-1:][0]                              # 一次预测结果
        self.relative_err=self.calRelativeError()           # 相对误差
        self.avg_rel_err=np.nanmean(self.relative_err)      # 平均相对误差
        self.adj_Mt=self.Mt/(1-self.avg_rel_err)            # 修正预测结果
    def show(self,lite=False):
        print(f'取weight={self.weight}的预测结果:')
        raw_data=np.insert(self.arr,self.t,np.nan).reshape(1,self.t+1)
        est_data=self.M.reshape(1,self.M.size)
        rel_err=np.insert(self.relative_err,self.t,np.nan).reshape(1,self.t+1)
        data=np.r_[raw_data,est_data,rel_err*100]
        columns=self.time_series
        index=['观测值','预测值','相对误差%']
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'平均相对误差={round(self.avg_rel_err*100,4)}%,修正预测值={round(self.adj_Mt,4)}')
data=np.array([6.35,6.20,6.22,6.66,7.15,7.89,8.72,8.94,9.28,9.8])
weight=np.array([3,2,1])
ts=np.arange(1979,1990)
coal=WeightedMovingAverage(data,weight,time_series=ts)
coal.show()
取weight=[3 2 1]的预测结果:
1979	1980	1981	1982	1983	1984	1985	1986	1987	1988	1989
观测值	6.35	6.2	6.22	6.6600	7.1500	7.8900	8.7200	8.9400	9.2800	9.8000	NaN
预测值	NaN	NaN	NaN	6.2350	6.4367	6.8317	7.4383	8.1817	8.6917	9.0733	9.4833
相对误差%	NaN	NaN	NaN	6.3814	9.9767	13.4136	14.6980	8.4825	6.3398	7.4150	NaN
平均相对误差=9.5296%,修正预测值=10.4822

在加权移动平均法中, w_t的选择,同样具有一定的经验性。一般的原则是:近期数据的权数大,远期数据的权数小。至于大到什么程度和小到什么程度,则需要按照预测者对序列的了解和分析来确定。

趋势移动平均法

公式

简单移动平均法和加权移动平均法,在时间序列没有明显的趋势变动时,能够准确反映实际情况。但当时间序列出现直线增加或减少的变动趋势时,用简单移动平均法和加权移动平均法来预测就会出现滞后偏差。因此,需要进行修正,修正的方法是作二次移动平均,利用移动平均滞后偏差的规律来建立直线趋势的预测模型。这就是趋势移动平均法。

一次移动的平均数为:

M^{(1)}_t=\frac{1}{N}(y_t+y_{t-1}+...+y_{t-N+1})

在一次移动平均的基础上再进行一次移动平均就是二次移动平均,其计算公式为:

M^{(2)}_t=\frac{1}{N}(M^{(1)}_t+M^{(1)}_{t-1}+...+M^{(1)}_{t-N+1})=M_{t-1}^{(2)}+\frac{1}{N}(M^{(1)}_t-M^{(1)}_{t-N})

下面讨论如何利用移动平均的滞后偏差建立直线趋势预测模型。

设时间序列{y_t}从某时期开始具有直线趋势,且认为未来时期也按此直线趋势变化,则可设此直线趋势预测模型为:

\hat y_{t+m}=a_t+b_tm,\ m=1,2,...

其中t 为当前时期数;m为由t 至预测期的时期数;a_t为截距;b_t为斜率。两者又称为平滑系数。

现在,我们根据移动平均值来确定平滑系数。由上式可知:

\begin{aligned}
&a_t=y_t\\
&y_{t-1}=y_t-b_t\\
&y_{t-2}=y_t-2b_t\\
&...\\
&y_{t-N+1}=y_t-(N-1)b_t
\end{aligned}

可以推得:

M^{(1)}_t=\frac{1}{N}(y_t+y_{t-1}+...+y_{t-N+1})=y_t-\frac{N-1}{2}

因此:

y_t-M_t^{(1)}=\frac{N-1}{2}b_t\\\ \\
y_{t-1}-M_{t-1}^{(1)}=\frac{N-1}{2}b_t

所以:

y_t-y_{t-1}=M_t^{(1)}-M_{t-1}^{(1)}=b_t\\\ \\
M_t^{(1)}-M_t^{(2)}=\frac{N-1}{2}b_t

于是可得平滑系数的计算公式为:

\left\{\begin{aligned}
&a_t=2M_t^{(1)}-M_t^{(2)}\\
&b_t=\frac{2}{N-1}(M_t^{(1)}-M_t^{(2)})
\end{aligned}\right.

计算示例

我国1965~1985年的发电总量如表所示,试预测1986年和1987年的发电总量。

年份t发电总量yt
19651676
19662825
19673774
19684716
19695940
197061159
197171384
197281524
197391668
1974101688
1975111958
1976122031
1977132234
1978142566
1979152820
1980163006
1981173093
1982183277
1983193514
1984203770
1985214107
class TrendMovingAverage(TimingModel):
    def getTimeSeries(self,ts):
        if type(ts)!=type(None):
            return ts
        else:
            return np.arange(0,self.t+2)
    def __init__(self,arr:np.ndarray,N,time_series=None) -> None:
        super().__init__(arr,time_series)
        self.N=N                                            # 移动平均的项数
        self.time_series=self.getTimeSeries(time_series)    # 时间序列
        self.M1=SimpleMovingAverage(self.arr,self.N).M
        self.M2=SimpleMovingAverage(self.M1,self.N).M
        self.M1t=self.M1[-1:][0]
        self.M2t=self.M2[-1:][0]
        self.a=2*self.M1t-self.M2t
        self.b=2*(self.M1t-self.M2t)/(self.N-1)
    def forecast(self,m=None,target=None):                      
        if type(target)!=type(None):
            this_m=target-int(self.time_series[len(self.time_series)-3])
        elif type(m)!=type(None):
            this_m=m
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return self.a+self.b*this_m
    def show(self,lite=False): 
        print(f'N={self.N}的预测结果如下:')
        raw_data=np.insert(self.arr,self.t,[np.nan,np.nan]).reshape(1,self.t+2)
        est1_data=np.insert(self.M1,self.M1.size,np.nan).reshape(1,self.t+2)
        est2_data=self.M2.reshape(1,self.t+2)
        columns=self.time_series
        index=['观测值','一次预测值','二次预测值']
        data=np.r_[raw_data,est1_data,est2_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'一次移动平均值={round(self.M1t,4)},二次移动平均值={round(self.M2t,4)}')
        print(f'预测模型为y_(t+m)={round(self.a,4)}+{round(self.b,4)}m')
        print(f'修正预测结果为:')
        print(f'    {self.time_series[len(self.time_series)-2]}={round(self.a+self.b,4)}')
        print(f'    {self.time_series[len(self.time_series)-1]}={round(self.a+2*self.b,4)}')
        print('可以使用forecast()方法进行进一步预测!')
file='E:\DEV\python\math\data\移动平均.xlsx'
data=pd.read_excel(file)
ts=np.arange(1965,1988)
arr=data['发电总量yt'].values

electricity=TrendMovingAverage(arr,6,time_series=ts)
electricity.show(lite=True)
print(electricity.forecast(target=1988))
N=6的预测结果如下:
1965	1966	1967	1968	1969	1970	1971	1972	1973	1974	...	1978	1979	1980	1981	1982	1983	1984	1985	1986	1987
观测值	676.0	825.0	774.0	716.0	940.0	1159.0	1384.0000	1524.0000	1668.0000	1688.0000	...	2566.0000	2820.0000	3006.0000	3093.0000	3277.0	3514.0000	3770.0000	4107.0000	NaN	NaN
一次预测值	NaN	NaN	NaN	NaN	NaN	NaN	848.3333	966.3333	1082.8333	1231.8333	...	1850.5000	2024.1667	2216.1667	2435.8333	2625.0	2832.6667	3046.0000	3246.6667	3461.1667	NaN
二次预测值	NaN	NaN	NaN	NaN	NaN	NaN	NaN	NaN	NaN	NaN	...	1324.5278	1471.8889	1628.7778	1792.8333	1966.5	2143.4167	2330.7222	2529.9722	2733.7222	2941.2222
3 rows × 23 columns

一次移动平均值=3461.1667,二次移动平均值=2941.2222
预测模型为y_(t+m)=3981.1111+207.9778m
修正预测结果为:
    1986=4189.0889
    1987=4397.0667
可以使用forecast()方法进行进一步预测!
4605.044444444444

指数平滑

一次移动平均实际上认为最近 N 期数据对未来值影响相同,都加权1/N;而N期以前的数据对未来值没有影响,加权为 0。但是,二次及更高次移动平均数的权数却不是1/N,且次数越高,权数的结构越复杂,但永远保持对称的权数,即两端项权数小,中间项权数大,不符合一般系统的动态性。一般说来历史数据对未来值的影响是随时间间隔的增长而递减的。所以,更切合实际的方法应是对各期观测值依时间顺序进行加权平均作为预测值。指数平滑法可满足这一要求,而且具有简单的递推形式。

指数平滑法根据平滑次数的不同,又分为一次指数平滑法、二次指数平滑法和三次指数平滑法等,分别介绍如下。

一次指数平滑

预测模型

设观测的数列为y_1,...,y_T,α为加权系数,0<α<1,一次指数平滑公式为:

S_t^{(1)}=αy_t+(1-α)S_{t-1}^{(1)}=S_{t-1}^{(1)}+α(y_t-S_{t-1}^{(1)})

以这种平滑值进行预测,就是一次指数平滑法。预测模型为:

\hat y_{t+1}=S_t^{(1)}=αy_t+(1-α)\hat y_t

也就是以第t期指数平滑值作为t +1期预测值。

加权系数的选择

在进行指数平滑时,加权系数的选择是很重要的。由预测模型可以看出,α 的大小规定了在新预测值中新数据和原预测值所占的比重。α 值越大,新数据所占的比重就愈大,原预测值所占的比重就愈小,反之亦然。若把预测模型改写为:

\hat y_{t+1}=\hat y_t+α(y_t-\hat y_t)

则从上式可看出,新预测值是根据预测误差对原预测值进行修正而得到的。α的大小则体现了修正的幅度,α值愈大,修正幅度愈大;α值愈小,修正幅度也愈小。

若选取α=0,则\hat y_{t+1}=\hat y_t,即下期预测值就等于本期预测值,在预测过程中不考虑任何新信息;若选取α=1,则\hat y_{t+1}=y_t,即下期预测值就等于本期观测值,完全不相信过去的信息。这两种极端情况很难做出正确的预测。因此,α值应根据时间序列的具体性质在0~1之间选择。具体如何选择一般可遵循下列原则:

  • 如果时间序列波动不大,比较平稳,则α应取小一点,如(0.1~0.5)。以减少修正幅度,使预测模型能包含较长时间序列的信息;
  • 如果时间序列具有迅速且明显的变动倾向,则α应取大一点,如(0.6~0.8)。使预测模型灵敏度高一些,以便迅速跟上数据的变化。

在实用上,类似移动平均法,多取几个α值进行试算,看哪个预测误差小,就采用哪个。

初始值的确定

用一次指数平滑法进行预测,除了选择合适的α外,还要确定初始值S_0^{(1)}。初始值是由预测者估计或指定的。当时间序列的数据较多,比如在20个以上时,初始值对以后的预测值影响很少,可选用第一期数据为初始值。如果时间序列的数据较少,在20个以下时,初始值对以后的预测值影响很大,这时,就必须认真研究如何正确确定初始值。一般以最初几期实际值的平均值作为初始值。

计算示例

某市 1976~1987 年某种电器销售额如表 4 所示。试用指数平滑法预测1988年该电器销售额。

年份197619771978197919801981198219831984198519861987
销售额505247514948514048525159
class ExponentialSmoothing(TimingModel):
    def __init__(self,arr:np.ndarray,alpha,init,time_series=None) -> None:
        super().__init__(arr,time_series)
        self.alpha=alpha
        self.init=init
class PrimaryExponentialSmoothing(ExponentialSmoothing):
    def primaryExponentialSmoothing(self):
        S=np.array([self.init],dtype=np.float64)
        for t in range(0,self.t):
            temp=S.size
            yt=S[-1:][0]
            S=np.insert(S,S.size,self.alpha*self.arr[t]+(1-self.alpha)*yt)
        return S
    def __init__(self,arr:np.ndarray,alpha,init,time_series=None) -> None:
        super().__init__(arr,alpha,init,time_series)
        self.S=self.primaryExponentialSmoothing()
        self.St=self.S[-1:][0]
        self.avg_err=np.mean(self.S[1:]-self.arr)
    def show(self,lite=False):
        print(f'alpha={self.alpha},init={round(self.init,4)}的预测结果如下:')
        raw_data=self.arr.reshape(1,self.t)
        est_data=self.S[1:].reshape(1,self.t)
        columns=self.time_series[:-1]
        index=['观测值','平滑值']
        data=np.r_[raw_data,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'预测结果为:{round(self.St,4)},平均误差为{round(self.avg_err,4)}')
data=np.array([50,52,47,51,49,48,51,40,48,52,51,59])
init=(data[0]+data[1])/2
alpha=0.2
ts=np.arange(1976,1989)
sale=PrimaryExponentialSmoothing(data,alpha,init,ts)
sale.show()
alpha=0.2,init=51.0的预测结果如下:
1976	1977	1978	1979	1980	1981	1982	1983	1984	1985	1986	1987
观测值	50.0	52.00	47.000	51.0000	49.0000	48.0000	51.0000	40.0000	48.0000	52.0000	51.0000	59.0000
平滑值	50.8	51.04	50.232	50.3856	50.1085	49.6868	49.9494	47.9595	47.9676	48.7741	49.2193	51.1754
预测结果为:51.1754,平均误差为-0.0585

二次指数平滑

公式

一次指数平滑法虽然克服了移动平均法的缺点。但当时间序列的变动出现直线趋势时,用一次指数平滑进行预测,仍存在明显的滞后偏差。因此,也必须加以修正。修正的方法与趋势移动平均法相同,即再作二次指数平滑,利用滞后偏差的规律建立直线趋势模型。这就是二次指数平滑法。其计算公式为:

S_t^{(1)}=αy_t+(1-α)S_{t-1}^{(1)}\\\ \\
S_t^{(2)}=αS_t^{(1)}+(1-α)S_{t-1}^{(2)}\\

式中S_t^{(1)}为一次指数的平滑值;S_t^{(2)}为二次指数的平滑值。当时间序列\{y_t\}从某时期开始具有直线趋势时,类似趋势移动平均法,可用直线趋势模型进行预测:

\hat y_{t+m}=a_t+b_tm,m=1,2,...\\\ \\
\left\{\begin{aligned}
&a_t=2S_t^{(1)}-S_t^{(2)}\\
&b_t=\frac{α}{1-α}(S_t^{(1)}-S_t^{(2)})
\end{aligned}\right.

计算示例

仍以我国1965~1985 年的发电总量资料为例,试用二次指数平滑法预测1986年和1987年的发电总量。

class QuadraticExponentialSmoothing(ExponentialSmoothing):
    def getTimeSeries(self,ts):
        if type(ts)!=type(None):
            return ts
        else:
            return np.arange(0,self.t+2)
    def __init__(self,arr:np.ndarray,alpha,init,time_series=None) -> None:
        super().__init__(arr,alpha,init,time_series)
        self.time_series=self.getTimeSeries(time_series)    # 时间序列
        self.S1=PrimaryExponentialSmoothing(self.arr,self.alpha,self.init).S[1:]
        self.S2=PrimaryExponentialSmoothing(self.S1,self.alpha,self.init).S[1:]
        self.S1t=self.S1[-1:][0]
        self.S2t=self.S2[-1:][0]
        self.a=2*self.S1t-self.S2t
        self.b=(self.S1t-self.S2t)*self.alpha/(1-self.alpha)
        self.est=(1+1/(1-self.alpha))*self.S1-(1/(1-self.alpha))*self.S2
        self.MSE=np.sum(np.power(self.arr-self.est,2))/self.t           # 均方误差
        self.MAPE=np.sum(np.abs((self.arr-self.est)/self.arr))/self.t   # 平均绝对误差
    def forecast(self,m=None,target=None):                      
        if type(target)!=type(None):
            this_m=target-int(self.time_series[len(self.time_series)-3])
        elif type(m)!=type(None):
            this_m=m
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return self.a+self.b*this_m   
    def show(self,lite=False): 
        print(f'alpha={self.alpha},init={round(self.init,4)}的预测结果如下:')
        raw_data=self.arr.reshape(1,self.t)
        est1_data=self.S1.reshape(1,self.t)
        est2_data=self.S2.reshape(1,self.t)
        est_data=np.insert(self.est,0,np.nan)[:-1].reshape(1,self.t)
        columns=self.time_series[:-2]
        index=['观测值','一次平滑值','二次平滑值','综合估计值']
        data=np.r_[raw_data,est1_data,est2_data,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'一次平滑值={round(self.S1t,4)},二次平滑值={round(self.S2t,4)}')
        print(f'预测模型为y_(t+m)={round(self.a,4)}+{round(self.b,4)}m,模型均方误差{round(self.MSE,4)},平均绝对误差{round(self.MAPE*100,4)}%')
        print(f'修正预测结果为:')
        print(f'    {self.time_series[len(self.time_series)-2]}={round(self.forecast(1),4)}')
        print(f'    {self.time_series[len(self.time_series)-1]}={round(self.forecast(2),4)}')
        print('可以使用forecast()方法进行进一步预测!')
file='E:\DEV\python\math\data\移动平均.xlsx'
data=pd.read_excel(file)
ts=np.arange(1965,1988)
arr=data['发电总量yt'].values
alpha=0.3
init=arr[0]
electricity=QuadraticExponentialSmoothing(arr,alpha,init,ts)
electricity.show(lite=True)
alpha=0.3,init=676的预测结果如下:
1965	1966	1967	1968	1969	1970	1971	1972	1973	1974	...	1976	1977	1978	1979	1980	1981	1982	1983	1984	1985
观测值	676.0	825.00	774.000	716.0000	940.0000	1159.0000	1384.0000	1524.0000	1668.0000	1688.0000	...	2031.0000	2234.0000	2566.0000	2820.0000	3006.0000	3093.0000	3277.0000	3514.0000	3770.0000	4107.0000
一次平滑值	676.0	720.70	736.690	730.4830	793.3381	903.0367	1047.3257	1190.3280	1333.6296	1439.9407	...	1726.0509	1878.4357	2084.7050	2305.2935	2515.5054	2688.7538	2865.2277	3059.8594	3272.9016	3523.1311
二次平滑值	676.0	689.41	703.594	711.6607	736.1639	786.2257	864.5557	962.2874	1073.6901	1183.5652	...	1432.7875	1566.4820	1721.9489	1896.9523	2082.5182	2264.3889	2444.6405	2629.2062	2822.3148	3032.5597
综合估计值	NaN	676.00	765.400	783.9700	757.3720	875.0155	1069.9094	1308.4256	1516.1002	1704.9718	...	2007.1517	2144.9987	2324.0838	2602.9280	2888.6381	3134.0586	3294.9894	3466.0664	3675.0782	3916.5969
4 rows × 21 columns

一次平滑值=3523.1311,二次平滑值=3032.5597
预测模型为y_(t+m)=4013.7025+210.2449m,模型均方误差10585.3928,平均绝对误差4.1903%
修正预测结果为:
    1986=4223.9474
    1987=4434.1923
可以使用forecast()方法进行进一步预测!

三次指数平滑

公式

当时间序列的变动表现为二次曲线趋势时,则需要用三次指数平滑法。三次指数平滑是在二次指数平滑的基础上,再进行一次平滑,其计算公式为:

S_t^{(1)}=αy_t+(1-α)S_{t-1}^{(1)}\\\ \\
S_t^{(2)}=αS_t^{(1)}+(1-α)S_{t-1}^{(2)}\\\ \\
S_t^{(3)}=αS_t^{(2)}+(1-α)S_{t-1}^{(3)}\\

预测模型为:

\hat y_{t+m}=a_t+b_tm+c_tm^2,m=1,2,...\\\ \\
\left\{\begin{aligned}
&a_t=3S_t^{(1)}-3S_t^{(2)}-S_t^{(3)}\\
&b_t=\frac{α^2}{2(1-α)^2}[(6-5α)S_t^{(1)}-2(5-4α)S_t^{(2)}+(4-3α)S_t^{(3)}]\\
&c_t=\frac{α^2}{2(1-α)^2}(S_t^{(1)}-2S_t^{(2)}+S_t^{(3)})
\end{aligned}\right.

计算示例

某省1978~1988年全民所有制单位固定资产投资总额如表 7 所示,试预测1989年和1990年固定资产投资总额。

年份19781979198019811982198319841985198619871988
投资总额20.0420.0625.7234.6151.7755.9280.65131.11148.58162.67232.26
class TripleExponentialSmoothing(QuadraticExponentialSmoothing):
    def calEst(self):
        A=(3-3*self.alpha+self.alpha**2)/(1-self.alpha)**2
        B=(3-self.alpha)/(1-self.alpha)**2
        C=1/(1-self.alpha)**2
        return A*self.S1-B*self.S2+C*self.S3
    def __init__(self,arr:np.ndarray,alpha,init,time_series=None) -> None:
        super().__init__(arr,alpha,init,time_series)
        self.S3=PrimaryExponentialSmoothing(self.S2,self.alpha,self.init).S[1:]
        self.S3t=self.S3[-1:][0]
        _alpha=self.alpha/(2*(1-self.alpha)**2)
        self.a=3*self.S1t-3*self.S2t+self.S3t
        self.b=_alpha*((6-5*self.alpha)*self.S1t-2*(5-4*self.alpha)*self.S2t+(4-3*self.alpha)*self.S3t)
        self.c=_alpha*self.alpha*(self.S1t-2*self.S2t+self.S3t)
        self.est=self.calEst()
        self.MSE=np.sum(np.power(self.arr-self.est,2))/self.t           # 均方误差
        self.MAPE=np.sum(np.abs((self.arr-self.est)/self.arr))/self.t   # 平均绝对误差
    def forecast(self,m=None,target=None):                      
        if type(target)!=type(None):
            this_m=target-int(self.time_series[len(self.time_series)-3])
        elif type(m)!=type(None):
            this_m=m
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return self.a+self.b*this_m+self.c*this_m**2
    def show(self,lite=False): 
        print(f'alpha={self.alpha},init={round(self.init,4)}的预测结果如下:')
        raw_data=self.arr.reshape(1,self.t)
        est1_data=self.S1.reshape(1,self.t)
        est2_data=self.S2.reshape(1,self.t)
        est3_data=self.S3.reshape(1,self.t)
        est_data=np.insert(self.est,0,np.nan)[:-1].reshape(1,self.t)
        columns=self.time_series[:-2]
        index=['观测值','一次平滑值','二次平滑值','三次平滑值','综合估计值']
        data=np.r_[raw_data,est1_data,est2_data,est3_data,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'一次平滑值={round(self.S1t,4)},二次平滑值={round(self.S2t,4)},三次平滑值={round(self.S3t,4)}')
        print(f'预测模型为y_(t+m)={round(self.a,4)}+{round(self.b,4)}m+{round(self.c,4)}m^2,'
              +f'模型均方误差{round(self.MSE,4)},平均绝对误差{round(self.MAPE*100,4)}%')
        print(f'修正预测结果为:')
        print(f'    {self.time_series[len(self.time_series)-2]}={round(self.forecast(1),4)}')
        print(f'    {self.time_series[len(self.time_series)-1]}={round(self.forecast(2),4)}')
        print('可以使用forecast()方法进行进一步预测!')
data=np.array([20.04,20.06,25.72,34.61,51.77,55.92,80.65,131.11,148.58,162.67,232.26])
ts=np.arange(1978,1991)
alpha=0.3
init=np.mean(data[:3])
investment=TripleExponentialSmoothing(data,alpha,init,ts)
investment.show(lite=True)
alpha=0.3,init=21.94的预测结果如下:
1978	1979	1980	1981	1982	1983	1984	1985	1986	1987	1988
观测值	20.0400	20.0600	25.7200	34.6100	51.7700	55.9200	80.6500	131.1100	148.5800	162.6700	232.2600
一次平滑值	21.3700	20.9770	22.3999	26.0629	33.7751	40.4185	52.4880	76.0746	97.8262	117.2793	151.7735
二次平滑值	21.7690	21.5314	21.7919	23.0732	26.2838	30.5242	37.1133	48.8017	63.5091	79.6401	101.2802
三次平滑值	21.8887	21.7815	21.7846	22.1712	23.4050	25.5408	29.0125	34.9493	43.5172	54.3541	68.4319
综合估计值	NaN	20.2300	19.5640	24.4942	34.5944	53.8901	64.5755	89.2963	142.4245	176.0860	196.2601
一次平滑值=151.7735,二次平滑值=101.2802,三次平滑值=68.4319
预测模型为y_(t+m)=219.912+38.3849m+1.6205m^2,模型均方误差266.7075,平均绝对误差8.9296%
修正预测结果为:
    1989=259.9174
    1990=303.1637
可以使用forecast()方法进行进一步预测!

指数平滑预测模型是以时刻t为起点,综合历史序列的信息,对未来进行预测的。选择合适的加权系数α 是提高预测精度的关键环节。根据实践经验,α 的取值范围一般以 0.1~0.3 为宜。α 值愈大,加权系数序列衰减速度愈快,所以实际上α取值大小起着控制参加平均的历史数据的个数的作用。α 值愈大意味着采用的数据愈少。因此,可以得到选择α值的一些基本准则:

  • 如果序列的基本趋势比较稳,预测偏差由随机因素造成,则α 值应取小一些,以减少修正幅度,使预测模型能包含更多历史数据的信息。
  • 如果预测目标的基本趋势已发生系统的变化,则α 值应取得大一些。这样,可以偏重新数据的信息对原模型进行大幅度修正,以使预测模型适应预测目标的新变化。

另外,由于指数平滑公式是递推计算公式,所以必须确定初始值。可以取前 3~5 个数据的算术平均值作为初始值。

差分指数平滑

在上节我们已经讲过,当时间序列的变动具有直线趋势时,用一次指数平滑法会出现滞后偏差,其原因在于数据不满足模型要求。因此,我们也可以从数据变换的角度来考虑改进措施,即在运用指数平滑法以前先对数据作一些技术上的处理,使之能适合于一次指数平滑模型,以后再对输出结果作技术上的返回处理,使之恢复为原变量的形态。差分方法是改变数据变动趋势的简易方法。下面我们讨论如何用差分方法来改进指数平滑法。

一阶差分指数平滑

公式

当时间序列呈直线增加时,可运用一阶差分指数平滑模型来预测。其公式如下:

\begin{aligned}
&∇y_t=y_t-y_{t-1} &(1)\\ 
&∇\hat y_{t+1}=α∇y_t+(1-α)∇\hat y_t &(2)\\
&\hat y_{t+1}=∇\hat y_{t+1}+y_t &(3)
\end{aligned}

其中的∇ 为差分记号。式(1)表示对呈现直线增加的序列作一阶差分,构成一个平稳的新序列;式(3)表示把经过一阶差分后的新序列的指数平滑预测值与变量当前的实际值迭加,作为变量下一期的预测值在前面我们已分析过,指数平滑值实际上是一种加权平均数。因此把序列中逐期增量的加权平均数(指数平滑值)加上当前值的实际数进行预测,比一次指数平滑法只用变量以往取值的加权平均数作为下一期的预测更合理。从而使预测值始终围绕实际值上下波动,从根本上解决了在有直线增长趋势的情况下,用一次指数平滑法所得出的结果始终落后于实际值的问题。

计算示例

某工业企业 1977~1986 年锅炉燃料消耗量资料如表 8 所示,试预测 1987 年的燃料消耗量。

年份1977197819791980198119821983198419851986
消耗量24262730323336404144
class FirstDifferenceExponentialSmoothing(ExponentialSmoothing):
    def calEstDif(self):
        est_dif=np.array([self.dif_arr[0]],dtype=np.float64)
        for t in range(1,self.t):
            res=self.alpha*self.dif_arr[t-1]+(1-self.alpha)*est_dif[t-1]
            est_dif=np.insert(est_dif,est_dif.size,res)
        return est_dif[1:]        
    def __init__(self,arr:np.ndarray,alpha,init,time_series=None) -> None:
        super().__init__(arr,alpha,init,time_series)
        self.dif_arr=self.arr[1:]-self.arr[:-1]     # 差分
        self.est_dif=self.calEstDif()               # 差分指数平滑值
        self.est=self.arr[1:]+self.est_dif          # 预测值
        self.est_vel=self.est[-1:][0]
        self.MSE=np.mean(np.power(self.est[:-1]-self.arr[2:],2))                    # 均方误差
        self.MAPE=np.mean(np.abs((self.arr[2:]-self.est[:-1])/self.arr[2:]))        # 平均绝对误差
    def show(self,lite=False):
        print(f'alpha={self.alpha},init={round(self.init,4)}的预测结果如下:')
        raw_data=self.arr.reshape(1,self.t)
        dif_arr=np.insert(self.dif_arr,0,np.nan).reshape(1,self.t)
        est_dif=np.insert(self.est_dif[:-1],0,[np.nan,np.nan]).reshape(1,self.t)
        est_data=np.insert(self.est[:-1],0,[np.nan,np.nan]).reshape(1,self.t)
        columns=self.time_series[:-1]
        index=['观测值','差分','差分平滑值','预测值']
        data=np.r_[raw_data,dif_arr,est_dif,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'预测结果为:{round(self.est_vel,4)},均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
data=np.array([24,26,27,30,32,33,36,40,41,44])
ts=np.arange(1977,1988)
alpha=0.4
init=data[0]
fuel=FirstDifferenceExponentialSmoothing(data,alpha,init,ts)
fuel.show()
alpha=0.4,init=24的预测结果如下:
1977	1978	1979	1980	1981	1982	1983	1984	1985	1986
观测值	24.0	26.0	27.0	30.0	32.00	33.000	36.0000	40.0000	41.0000	44.00
差分	NaN	2.0	1.0	3.0	2.00	1.000	3.0000	4.0000	1.0000	3.00
差分平滑值	NaN	NaN	2.0	1.6	2.16	2.096	1.6576	2.1946	2.9167	2.15
预测值	NaN	NaN	28.0	28.6	32.16	34.096	34.6576	38.1946	42.9167	43.15
预测结果为:46.49,均方误差为1.7056,平均绝对误差为3.3801%

二阶差分指数平滑

当时间序列呈现二次曲线增长时,可用二阶差分指数平滑模型来预测,计算公式如下:

\begin{aligned}
&∇y_t=y_t-y_{t-1} &(1)\\ 
&∇^2y_t=∇y_t-∇y_{t-1} &(2)\\ 
&∇^2\hat y_{t+1}=α∇^2y_t+(1-α)∇^2\hat y_t &(3)\\
&\hat y_{t+1}=∇^2\hat y_{t+1}+∇y_t+y_t &(4)
\end{aligned}

其中∇^2表示二阶差分。

差分方法和指数平滑法的联合运用,除了能克服一次指数平滑法的滞后偏差之外,对初始值的问题也有显著的改进。因为数据经过差分处理后,所产生的新序列基本上是平稳的。这时,初始值取新序列的第一期数据对于未来预测值不会有多大影响。其次,它拓展了指数平滑法的适用范围,使一些原来需要运用配合直线趋势模型处理的情况可用这种组合模型来取代。但是,对于指数平滑法存在的加权系数α 的选择问题,以及只能逐期预测问题,差分指数平滑模型也没有改进。

自适应滤波

自适应滤波法的基本过程

自适应滤波法与移动平均法、指数平滑法一样,也是以时间序列的历史观测值进行某种加权平均来预测的,它要寻找一组“最佳”的权数,其办法是先用一组给定的权数来计算一个预测值,然后计算预测误差,再根据预测误差调整权数以减少误差。这样反复进行,直至找出一组“最佳”权数,使误差减少到最低限度。由于这种调整权数的过程与通讯工程中的传输噪声过滤过程极为接近,故称为自适应滤波法。

自适应滤波法的基本预测公式为:

\hat y_{t+1}=w_1y_t+w_2y_{t-1}+...+w_Ny_{t-N+1}=\sum^N_{i=1}w_iy_{t-i+1}

\hat y_{t+1}为第t+1期的预测值,w_i为第t−i+1期的观测值权数,y_{t−i+1}为第t−i+1期的观测值,N为权数的个数。其调整权数的公式为:

w'_i=w_i+2k·e_{t+1}y_{t-i+1}

式中,i=1,2,L,N,t=N,N+1,L,n,n为序列数据的个数,w_i为调整前的第i个权数,w'_i为调整后的第i个权数,k为学习常数,e_{t+1}为第t+1期的预测误差。上式表明:调整后的一组权数应等于旧的一组权数加上误差调整项,这个调整项包括预测误差、原观测值和学习常数等三个因素。学习常数k的大小决定权数调整的速度。

下面举一个简单的例子来说明此法的全过程。设有一个时间序列包括 10 个观测值,如表所示。试用自适应滤波法,以两个权数来求第 11 期的预测值。

时期12345678910
观测值0.10.20.30.40.50.60.70.80.91.0

本例中N=2。取初始权数w1=0.5,w2=0.5,并设k=0.9,t的取值由N=2开始。

class AdaptiveFilter(TimingModel):
    def adjWeight(self,w,t,e) -> np.ndarray:
        return w+2*self.k*e*np.flip(self.arr[t-self.N:t])
    def adaptiveFilter(self,_w:np.ndarray,last_sum=0):        
        w=_w
        est_arr=np.array([],dtype=np.float64)   # 预测数列
        e_arr=np.array([],dtype=np.float64)     # 误差数列
        for t in range(self.N,self.t):
            est=np.sum(w*np.flip(self.arr[t-self.N:t]))
            e=self.arr[t]-est
            est_arr=np.insert(est_arr,est_arr.size,est)
            e_arr=np.insert(e_arr,e_arr.size,e)
            w=self.adjWeight(w,t,e)
        # 递归,重复实验减小误差
        if abs(np.sum(e_arr)-last_sum)<1e-9:
            return [est_arr,e_arr,w]
        else:
            return self.adaptiveFilter(w,np.sum(e_arr))
    def __init__(self,arr:np.ndarray,w,k,time_series=None) -> None:
        super().__init__(arr,time_series)
        self.w=np.array(w)                                      # 观测值权重
        self.N=self.w.size                                      # 权数个数
        self.k=k                                                # 学习常数
        self.est_arr=self.adaptiveFilter(self.w)[0]             # 预测数列
        self.e_arr=self.adaptiveFilter(self.w)[1]               # 预测误差
        self.best_w=self.adaptiveFilter(self.w)[2]              # 最佳权重
        self.est_val=np.sum(self.best_w*np.flip(self.arr[-2:])) # 预测值
        self.MSE=np.mean(np.power(self.est_arr-self.arr[self.N:],2))                    # 均方误差
        self.MAPE=np.mean(np.abs((self.arr[self.N:]-self.est_arr)/self.arr[self.N:]))   # 平均绝对误差
    def show(self,lite=False):
        print(f'初始权重w={self.w},学习常数k={self.k}的预测结果如下:')
        raw_data=self.arr.reshape(1,self.t)
        est_data=np.insert(self.est_arr,0,np.full(self.N,np.nan)).reshape(1,self.t)
        columns=self.time_series[:-1]
        index=['观测值','预测值']
        data=np.r_[raw_data,est_data]
        if len(columns)<15 or lite==True:
            display(pd.DataFrame(data,columns=columns,index=index).round(4))
        else:
            display(pd.DataFrame(data.T,columns=index,index=columns).round(4))
        print(f'修正后权重为:{self.best_w.round(4)}')
        print(f'预测结果为:{round(self.est_val,4)},均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
data=np.arange(0.1,1.1,0.1)
w=(0.5,0.5)
k=0.9
test=AdaptiveFilter(data,w,k)
test.show()
初始权重w=[0.5 0.5],学习常数k=0.9的预测结果如下:
0	1	2	3	4	5	6	7	8	9
观测值	0.1	0.2	0.3	0.4	0.5	0.6	0.7	0.8	0.9	1.0
预测值	NaN	NaN	0.3	0.4	0.5	0.6	0.7	0.8	0.9	1.0
修正后权重为:[ 2. -1.]
预测结果为:1.1,均方误差为0.0,平均绝对误差为0.0%

N,k值和初始权数的确定

在开始调整权数时,首先要确定权数个数N和学习常数k。一般说来,当时间序列的观测值呈季节变动时,N应取季节性长度值。如序列以一年为周期进行季节变动时,若数据是月度的,则取N=12,若季节是季度的,则取N=4。如果时间序列无明显的周期变动,则可用自相关系数法来确定,即取N为最高自相关系数的滞后时期。k的取值一般可定为1/N,也可以用不同的k值来进行计算,以确定一个能使S最小的k值。

初始权数的确定也很重要,如无其它依据,也可用1/N作为初始权系数用,即:

w_i=\frac{1}{N}(i=1,2,...,N)

自适应滤波法有两个明显的优点:一是技术比较简单,可根据预测意图来选择权数的个数和学习常数,以控制预测。也可以由计算机自动选定。二是它使用了全部历史数据来寻求最佳权系数,并随数据轨迹的变化而不断更新权数,从而不断改进预测。由于自适应滤波法的预测模型简单,又可以在计算机上对数据进行处理,所以这种预测方法应用较为广泛。

趋势外推

趋势外推法是根据事物的历史和现时资料,寻求事物发展规律,从而推测出事物未来状况的一种比较常用的预测方法。利用趋势外推法进行预测,主要包括六个阶段:

  • 选择应预测的参数;
  • 收集必要的数据;
  • 利用数据拟合曲线;
  • 趋势外推;
  • 预测说明;
  • 研究预测结果在进行决策中应用的可能性。

趋势外推法常用的典型数学模型有:指数曲线、修正指数曲线、生长曲线、包络曲线等。

指数曲线

一般来说,技术的进步和生产的增长,在其未达饱和之前的新生时期是遵循指数曲线增长规律的,因此可以用指数曲线对发展中的事物进行预测。

指数曲线的数学模型为:

y=y_0e^{K_t}

其中系数y0 和K值由历史数据利用回归方法求得。对上式取对数可得:

\ln y=\ln y_0+K_t

令:

Y=\ln y,\quad A=\ln y_0

则:

Y=A+Kt

其中A,K可以用最小二乘法求得。

修正指数曲线

利用指数曲线外推来进行预测时,存在着预测值随着时间的推移会无限增大的情况。这是不符合客观规律的。因为任何事物的发展都是有一定限度的。例如某种畅销产品,在其占有市场的初期是呈指数曲线增长的,但随着产品销售量的增加,产品总量接近于社会饱和量时。这时的预测模型应改用修正指数曲线。

\hat y_t=K+ab^t

在此数学模型中有三个参数 K, a 和b 要用历史数据来确定。修正指数曲线用于描述这样一类现象:

  • 初期增长迅速,随后增长率逐渐降低。
  • 当K>0,a<0,0<b<1时,t→∞,ab^t→0,即\hat y_t→K。

当K值可预先确定时,采用最小二乘法确定模型中的参数。而当K值不能预先确定时,应采用三和法。
把时间序列的n个观察值等分为三部分,每部分有m期,即n=3m。令每部分的趋势值之和等于相应的观察值之和,由此给出参数估计值。

记观察值的各部分之和:

S_1=\sum^m_{t=1}y_t,\ S_2=\sum^{2m}_{t=m+1}y_t,\ S_3=\sum^{3m}_{t=2m+1}y_t

则参数的推导值为:

\left\{\begin{aligned}
&b=\left(\frac{S_3-S_2}{S_2-S_1}\right)^{\frac{1}{m}}\\
&a=(S_2-S_1)\frac{b-1}{b(b^m-1)^2}\\
&K=\frac{1}{m}\left[S_1-\frac{ab(b^m-1)}{b-1}\right]
\end{aligned}\right.

至此三个参数全部确定了,于是就可以用模型进行预测。

值得注意的是,并不是任何一组数据都可以用修正指数曲线拟合。采用前应对数据进行检验,检验方法是看给定数据的逐期增长量的比率是否接近某一常数b 。即:

\frac{y_{t+1}-y_t}{y_t-y_{t-1}}≈b

计算示例:根据统计资料,某厂收音机连续 15 年的销售量如表,试用修正指数曲线预测1986年的销量。

时间196919701971197219731974197519761977197819791980198119821983
销售量42.147.552.757.762.567.171.575.779.883.787.591.194.697.9101.1
# 趋势外推
class TrendExtrapolation(TimingModel):
    # 三和法
    def doThreeSum(self,arr):
        S1=np.sum(arr[0:self.m])
        S2=np.sum(arr[self.m:2*self.m])
        S3=np.sum(arr[2*self.m:3*self.m])
        return{'S1':S1,'S2':S2,'S3':S3}
    # 计算系数
    def calCoefficient(self,S1,S2,S3):
        b=np.power((S3-S2)/(S2-S1),1/self.m)
        a=(S2-S1)*(b-1)/(b*(b**self.m-1)**2)
        K=(S1-a*b*(b**self.m-1)/(b-1))/self.m
        return{'b':b,'a':a,'K':K}
    def __init__(self,arr:np.ndarray,time_series=None) -> None:
        super().__init__(arr,time_series)
        if self.arr.size%3!=0:
            raise ValueError('输入观测值数据的长度必须为3的整数倍!')
        self.n=self.arr.size        # 观测值长度
        self.m=int(self.n/3)        # 分期长度
    # 完整预测
    def full_forecast(self):
        res=np.array([],dtype=np.float64)
        for t in range(0,self.t):
            res=np.insert(res,res.size,self.forecast(t+1))
        return res
    # 绘图
    def paint(self):
        plt.figure(figsize=(20,8),dpi=80)
        x=self.time_series[:-1]
        plt.plot(x,self.arr,x,self.est_arr,marker='o')
        plt.legend(labels=["Real Value","Estimated Value"],loc="lower right")
        plt.show()
class ModifiedIndexCurve(TrendExtrapolation):
    def calCascadeGrowthRate(self):
        return (self.arr[2:]-self.arr[1:-1])/(self.arr[1:-1]-self.arr[:-2])
    def forecast(self,t=None,target=None):                      
        if type(target)!=type(None):
            this_t=target-int(self.time_series[0])+1
        elif type(t)!=type(None):
            this_t=t
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return self.K+self.a*self.b**(this_t)
    def __init__(self,arr:np.ndarray,time_series=None) -> None:
        super().__init__(arr,time_series)
        # 逐期增长量比率
        self.cascade_growth_rate=self.calCascadeGrowthRate()
        # 三和法
        self.S1=self.doThreeSum(self.arr)['S1']
        self.S2=self.doThreeSum(self.arr)['S2']
        self.S3=self.doThreeSum(self.arr)['S3']
        # 系数计算
        self.b=self.calCoefficient(self.S1,self.S2,self.S3)['b']
        self.a=self.calCoefficient(self.S1,self.S2,self.S3)['a']
        self.K=self.calCoefficient(self.S1,self.S2,self.S3)['K']
        # 预测
        self.est_arr=self.full_forecast()                                   # 预测数列
        self.MSE=np.mean(np.power(self.est_arr-self.arr,2))                 # 均方误差
        self.MAPE=np.mean(np.abs((self.arr-self.est_arr)/self.arr))         # 平均绝对误差
    def show(self,target=None):
        print('预测结果如下:')
        print('观测值逐期增长量比值落在'+
              f'[{round(np.min(self.cascade_growth_rate),4)},{round(np.max(self.cascade_growth_rate),4)}]范围内;'+
              f'极差为{round(np.ptp(self.cascade_growth_rate),4)},均值为{round(np.mean(self.cascade_growth_rate),4)}'
              )
        print(f'三和法计算结果:S1={round(self.S1,4)},S2={round(self.S2,4)},S3={round(self.S3,4)}')
        print(f'系数计算结果:b={round(self.b,4)},a={round(self.a,4)},K={round(self.K,4)}')
        if type(target)!=type(None):
            print(f'预测结果为:{round(self.forecast(target=target),4)},均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        else:
            print(f'均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        print(f'可以用forecast()方法进行进一步预测!')
        self.paint()
data=np.array([42.1,47.5,52.7,57.7,62.5,67.1,71.5,75.7,79.8,83.7,87.5,91.1,94.6,97.9,101.1])
ts=np.arange(1969,1985)
radio=ModifiedIndexCurve(data,ts)
radio.show(1986)
预测结果如下:
观测值逐期增长量比值落在[0.9429,0.9762]范围内;极差为0.0333,均值为0.9606
三和法计算结果:S1=262.5,S2=377.8,S3=472.2
系数计算结果:b=0.9608,a=-143.2063,K=179.7162
预测结果为:110.0093,均方误差为0.0004,平均绝对误差为0.0272%
可以用forecast()方法进行进一步预测!

Gompertz曲线

Gompertz曲线是以英国统计学家和数学家B.Gompertz而命名的。Gompertz曲线所描述的现象的特点是:初期增长缓慢,以后逐渐加快,当达到一定程度后,增长率又逐渐下降,最后接近一条水平线。该曲线的两端都有渐近线,其上渐近线为Y=K,下渐近线为Y=0。Gompertz曲线通常用于描述事物的发展由萌芽、成长到饱和的周期过程。现实中有许多现象符合Gompertz曲线,如工业生产的增长、产品的寿命周期、一定时期内的人口增长等,因而该曲线被广泛应用于现象的趋势变动研究。

曲线的一般形式为:

\hat y_t=Ka^{b^t},K>0,0< a  \not= 1,0 < b \not=1

采用 Gompertz曲线前应对数据进行检验,检验方法是看给定数据的对数逐期增长量的比率是否接近某一常数b 。即:

\frac{\ln y_{t+1}-\ln y_t}{\ln y_t-\ln y_{t-1}}≈b

参数估计方法:

\hat y'_t=\ln \hat y_t,K'=\ln K,a'=\ln a

\hat y'_t=K'+a'b'

仿照修正指数曲线的三和法估计参数,令

S_1=\sum^m_{t=1}y'_t,\ S_2=\sum^{2m}_{t=m+1}y'_t,\ S_3=\sum^{3m}_{t=2m+1}y'_t,\ 其中y'_t=\ln y_t

则参数的推导值为:

\left\{\begin{aligned}
&b=\left(\frac{S_3-S_2}{S_2-S_1}\right)^{\frac{1}{m}}\\
&a'=(S_2-S_1)\frac{b-1}{b(b^m-1)^2}\\
&K'=\frac{1}{m}\left[S_1-\frac{ab(b^m-1)}{b-1}\right]
\end{aligned}\right.

计算示例:根据上题的数据,试确定收音机销售量的Gompertz曲线方程,求出各年收音机销售量的趋势值,并预测 1986 年的销售量。

class GompertzCurve(TrendExtrapolation):
    def calLogCascadeGrowthRate(self):
        return (np.log(self.arr[2:])-np.log(self.arr[1:-1]))/(np.log(self.arr[1:-1])-np.log(self.arr[:-2]))
    def forecast(self,t=None,target=None):                      
        if type(target)!=type(None):
            this_t=target-int(self.time_series[0])+1
        elif type(t)!=type(None):
            this_t=t
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return self.K*self.a**(self.b**this_t)
    def __init__(self,arr:np.ndarray,time_series=None) -> None:
        super().__init__(arr,time_series)
        # 对数逐期增长量比率
        self.log_cascade_growth_rate=self.calLogCascadeGrowthRate() 
        # 三和法
        self.S1=self.doThreeSum(np.log(self.arr))['S1']
        self.S2=self.doThreeSum(np.log(self.arr))['S2']
        self.S3=self.doThreeSum(np.log(self.arr))['S3']
        # 系数计算
        self.b=self.calCoefficient(self.S1,self.S2,self.S3)['b']
        self.a_=self.calCoefficient(self.S1,self.S2,self.S3)['a']
        self.K_=self.calCoefficient(self.S1,self.S2,self.S3)['K']
        self.a=np.exp(self.a_)
        self.K=np.exp(self.K_)
        # 预测
        self.est_arr=self.full_forecast()                                   # 预测数列
        self.MSE=np.mean(np.power(self.est_arr-self.arr,2))                 # 均方误差
        self.MAPE=np.mean(np.abs((self.arr-self.est_arr)/self.arr))         # 平均绝对误差
    def show(self,target=None):
        print('预测结果如下:')
        print('观测值对数逐期增长量比值落在'+
              f'[{round(np.min(self.log_cascade_growth_rate),4)},{round(np.max(self.log_cascade_growth_rate),4)}]范围内;'+
              f'极差为{round(np.ptp(self.log_cascade_growth_rate),4)},均值为{round(np.mean(self.log_cascade_growth_rate),4)}'
              )
        print(f'三和法计算结果:S1={round(self.S1,4)},S2={round(self.S2,4)},S3={round(self.S3,4)}')
        print(f'系数计算结果:b={round(self.b,4)},a\'={round(self.a_,4)},K\'={round(self.K_,4)},'+
              f'a={round(self.a,4)},K={round(self.K,4)}')
        if type(target)!=type(None):
            print(f'预测结果为:{round(self.forecast(target=target),4)},均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        else:
            print(f'均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        print(f'可以用forecast()方法进行进一步预测!')
        self.paint()
data=np.array([42.1,47.5,52.7,57.7,62.5,67.1,71.5,75.7,79.8,83.7,87.5,91.1,94.6,97.9,101.1])
ts=np.arange(1969,1985)
radio=GompertzCurve(data,ts)
radio.show(1986)
预测结果如下:
观测值对数逐期增长量比值落在[0.8608,0.938]范围内;极差为0.0772,均值为0.9036
三和法计算结果:S1=19.7558,S2=21.6094,S3=22.7333
系数计算结果:b=0.9048,a'=-1.2588,K'=4.8929,a=0.284,K=133.3341
预测结果为:108.3143,均方误差为0.07,平均绝对误差为0.3486%
可以用forecast()方法进行进一步预测!

Logistic曲线(生长曲线)

生物的生长过程经历发生、发展到成熟三个阶段,在三个阶段生物的生长速度是不一样的,例如南瓜的重量增长速度,在第一阶段增长的较慢,在发展时期则突然加快,而到了成熟期又趋减慢,形成一条S形曲线,这就是有名的Logistic曲线(生长曲线),很多事物,如技术和产品发展进程都有类似的发展过程,因此Logistic曲线在预测中有相当广泛的应用。

Logistic曲线的一般数学模型是:

y_t=\frac{1}{K+ab^t},K>0,a>0,0< b\not=1

其可以变换为:

y'_t=K+ab^t,\ y'_t=\frac{1}{y_t}

检验能否使用Logistic曲线的方法,是看给定数据倒数的逐期增长量的比率是否接近某一常数b。即

\frac{1/y_{t+1}-1/y_t}{1/y_t-1/y_{t-1}}≈b

仿照修正指数曲线的三和法估计参数,令

S_1=\sum^m_{t=1}y'_t,\ S_2=\sum^{2m}_{t=m+1}y'_t,\ S_3=\sum^{3m}_{t=2m+1}y'_t,\ 其中y'_t=\frac{1}{y_t}

则参数的推导值为:

\left\{\begin{aligned}
&b=\left(\frac{S_3-S_2}{S_2-S_1}\right)^{\frac{1}{m}}\\
&a'=(S_2-S_1)\frac{b-1}{b(b^m-1)^2}\\
&K'=\frac{1}{m}\left[S_1-\frac{ab(b^m-1)}{b-1}\right]
\end{aligned}\right.

计算示例:根据上题的数据,试确定收音机销售量的Logistic曲线方程,求出各年收音机销售量的趋势值,并预测 1986 年的销售量。

class LogisticCurve(TrendExtrapolation):
    def calInverseCascadeGrowthRate(self):
        return (1/self.arr[2:]-1/self.arr[1:-1])/(1/self.arr[1:-1]-1/self.arr[:-2])
    def forecast(self,t=None,target=None):                      
        if type(target)!=type(None):
            this_t=target-int(self.time_series[0])+1
        elif type(t)!=type(None):
            this_t=t
        else:
            raise ValueError('target和m至少有一个参数传入!')
        return 1/(self.K+self.a*self.b**this_t)
    def __init__(self,arr:np.ndarray,time_series=None) -> None:
        super().__init__(arr, time_series)
        # 倒数逐期增长量比率
        self.inverse_cascade_growth_rate=self.calInverseCascadeGrowthRate()
        # 三和法
        self.S1=self.doThreeSum(1/self.arr)['S1']
        self.S2=self.doThreeSum(1/self.arr)['S2']
        self.S3=self.doThreeSum(1/self.arr)['S3']
        # 系数计算
        self.b=self.calCoefficient(self.S1,self.S2,self.S3)['b']
        self.a=self.calCoefficient(self.S1,self.S2,self.S3)['a']
        self.K=self.calCoefficient(self.S1,self.S2,self.S3)['K']
        # 预测
        self.est_arr=self.full_forecast()                                   # 预测数列
        self.MSE=np.mean(np.power(self.est_arr-self.arr,2))                 # 均方误差
        self.MAPE=np.mean(np.abs((self.arr-self.est_arr)/self.arr))         # 平均绝对误差
    def show(self,target=None):
        print('预测结果如下:')
        print('观测值倒数逐期增长量比值落在'+
              f'[{round(np.min(self.inverse_cascade_growth_rate),4)},{round(np.max(self.inverse_cascade_growth_rate),4)}]范围内;'+
              f'极差为{round(np.ptp(self.inverse_cascade_growth_rate),4)},均值为{round(np.mean(self.inverse_cascade_growth_rate),4)}'
              )
        print(f'三和法计算结果:S1={round(self.S1,4)},S2={round(self.S2,4)},S3={round(self.S3,4)}')
        print(f'系数计算结果:b={round(self.b,4)},a={round(self.a,4)},K={round(self.K,4)}')
        if type(target)!=type(None):
            print(f'预测结果为:{round(self.forecast(target=target),4)},均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        else:
            print(f'均方误差为{round(self.MSE,4)},平均绝对误差为{round(self.MAPE*100,4)}%')
        print(f'可以用forecast()方法进行进一步预测!')
        self.paint()
data=np.array([42.1,47.5,52.7,57.7,62.5,67.1,71.5,75.7,79.8,83.7,87.5,91.1,94.6,97.9,101.1])
ts=np.arange(1969,1985)
radio=LogisticCurve(data,ts)
radio.show(1986)
预测结果如下:
观测值倒数逐期增长量比值落在[0.7693,0.9074]范围内;极差为0.1381,均值为0.8504
三和法计算结果:S1=0.0971,S2=0.0666,S3=0.0531
系数计算结果:b=0.8493,a=0.0174,K=0.0085
预测结果为:106.3981,均方误差为0.2546,平均绝对误差为0.649%
可以用forecast()方法进行进一步预测!

趋势线的选择

趋势线的选择有以下几种方式。

  • 由散点图选择趋势线。
  • 由数据本身的取值规律选择趋势线。
  • 比较预测误差大小
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇