English Русский Español Deutsch 日本語 Português
preview
基于转移熵的时间序列因果分析

基于转移熵的时间序列因果分析

MetaTrader 5示例 | 21 二月 2025, 11:45
262 0
Francis Dube
Francis Dube

概述

传递熵是一种统计工具,用于量化一个时间序列向另一个时间序列传递的信息量,从而揭示目标变量的性质和行为。在本文中,我们深入探讨了以传递熵为衡量标准的统计因果关系的概念。我们研究了这种方法如何揭示不同过程之间因果影响的方向。此外,我们还提供了用于测量传递熵的MQL5实现的详细描述,展示了如何将这种技术实际应用于分析可能耦合的时间序列。通过利用传递熵,我们旨在识别能够增强预测任务的变量。


因果关系

实证数据可能会具有欺骗性。仅仅因为两个变量似乎同步变化,并不意味着其中一个导致了另一个的变化,这就是“相关性不等于因果关系”这一说法的真实含义。相关性仅仅衡量两个变量之间的联系程度,而不是它们为何存在联系。例如,想象一下在夏天,冰淇淋销售量和某只股票价格之间存在很强的相关性。这并不意味着购买冰淇淋会使股票价格上涨!更有可能的原因是一个隐藏的因素,比如季节本身,分别影响了这两个变量。同样,一家公司的股票价格和黄金价格之间可能存在某种联系,但真正的原因可能是完全不同的东西,比如整体市场情绪或通货膨胀同时影响了这两个价格。这些例子突显了相关数据可能具有的误导性。它们展示了两者之间的联系,但并没有揭示其背后的原因。为了真正理解一件事情是否导致了另一件事情的发生,我们需要更先进的工具。

单摆

因果关系的概念,即一个事件导致另一个事件发生,是科学研究的基础。然而,精确地定义因果关系是一个多方面的挑战,它涉及深刻的哲学、物理和统计学考量。理想情况下,一个原因总是会产生单一的影响。然而,从通常复杂的影响力网络中分离出单一的因果因素可能很困难。例如,交易量的激增可能与股票价格的上涨相关,但其他因素,如市场情绪和经济数据的发布,也可能发挥着重要作用。在这些情况下,研究人员使用统计技术来推断因果关系。

因果关系的性质,无论是确定性的(保证的结果)还是概率性的(影响后续事件的可能性),都取决于底层过程。在确定性系统中,第一个事件明显导致第二个事件的发生,正如我们观察到的掉落物体的可预测下落。相反,在概率性系统中,重点转向第一个事件是否增强了我们预测第二个事件发生的能力。例如,虽然最近的降雨可能与随后花朵的盛开有关联,但其他环境因素也可能对此有影响。在这种情况下,问题变成了第一个事件的知识是否提高了我们预测第二个事件的能力。

经济学家克莱夫·格兰杰(Clive Granger)在诺伯特·维纳(Norbert Wiener)的工作基础上,发展了因果关系的概念,提出如果第二个信号的未来值可以通过使用第一个和第二个信号的过去信息来更好地解释,而不仅仅是使用第二个信号自身的滞后值,那么第一个信号就导致了第二个信号。格兰杰的因果关系定义基于两个原则。首先,效果不能在其原因之前出现。其次,原因会包含传递给效果的独特信息。这些原则表明,为了量化因果关系,我们需要理解所涉及变量的时间属性,以及它们信息含量的某种度量。这使得时间序列非常适合因果分析。

因果关系

根据时间序列数据的特性,我们可以分析一个序列在某一时刻的信息如何影响另一个序列在后续时刻的可预测性。格兰杰将因果关系定义为不确定性的减少。如果了解序列X的过去值能够改善我们对序列Y未来值的预测(相比仅使用Y自身的过去值),那么就说 X 对 Y 具有预测性。基于这一想法,格兰杰开发了使用滞后时间序列和自回归模型的因果关系检验方法。他提出,除非包含X的过去值能显著改善对Y未来值的预测,否则X和Y之间不存在因果关系。这种改善通常通过减少预测误差来衡量,例如在回归模型中获得更好的拟合度。格兰杰因果关系使我们能够从统计学上检测时间序列之间的关系。然而,需要注意它的局限性。格兰杰因果关系仅能识别方向性的关系,并不一定揭示明确的因果机制。可能存在我们没有数据作依据的其他原因在起作用。此外,由于格兰杰因果关系本质上是基于自回归框架构建的,它在揭示线性因果关系方面最为有效。非线性因果关系需要采用不同的方法。

从数学上讲,格兰杰因果关系可以通过考虑两个时间序列X和Y来表示。每个序列的滞后值分别用X(t−k) 和Y(t−k) 表示,其中k表示滞后的时间步长。。可接受的最大滞后时间用p表示。应用自回归模型时,Y的未来值会基于其自身的过去值进行回归分析。

自回归公式

这个表达式仅从Y的过去值来考虑Y的未来值。通过将X引入模型,未来值则可以同时用X和Y的过去值来表示。

向量自回归公式

如果将X的过去值引入模型能够显著改善对Y的预测(相比仅使用Y的过去值的模型),则称基于格兰杰因果关系(Granger-cause)X导致Y。这通常是通过检验系数是否联合为0的零假设来评估的。如果拒绝了这一零假设,则表明X提供了关于Y的显著预测信息,这些信息超出了Y自身过去值所包含的内容。为了检验基于格兰杰因果关系X是否导致Y,我们在假设检验下比较两个模型:

  • 零假设:基于格兰杰因果关系X不导致Y。
  • 备择假设:基于格兰杰因果关系X导致Y。

F检验用于通过检验两个模型的残差来比较受限模型(不含X)和非受限模型(含X)的拟合优度。受限模型的残差平方和来自不含X的模型,而非受限模型的残差平方和来自包含X的模型。F统计量的计算公式为:

F统计量公式

其中,n是观测值的数量。计算出的F统计量将与具有p和n - 2p - 1自由度的F分布的临界值进行比较。如果F统计量大于临界值,我们将拒绝零假设,并得出结论:基于格兰杰因果关系X导致Y。或者,也可以通过单因素方差分析(ANOVA)检验来计算F统计量。其公式如下:

基于方差分析的格兰杰因果关系

转移熵

在信息论的早期,科学家们使用互信息来理解耦合过程之间的相互作用。这一概念基于克劳德·香农的熵理论,告诉我们一个时间序列中的信息是否与另一个时间序列重叠。简单来说,它揭示了我们是否可以用比单独编码更少的信息来同时编码这两个序列。正因为如此,互信息有时被称为冗余信息。一个进程与另一个进程共享信息,从而使第二个进程能够高效地通过复用从第一个进程中已捕获的信息来进行描述。

正式地,当两个随机过程X(t)和Y(t)的边缘熵之和超过它们组合系统的联合熵时,就确立了它们之间的互信息。这种数学关系反映了与单个过程相比,组合系统不确定性的减少。换句话说,它捕捉到了一个进程的信息可用于减少另一个进程固有熵的程度。由于熵完全由底层概率分布决定,因此任何这样的分布都可以用一个相关的熵值来表征。对于给定的已知概率分布,这个值量化了与特定结果相关的意外程度。

在格兰杰因果关系的背景下,这一概念变得尤为相关。当研究时间序列之间的潜在因果关系时,目标是通过纳入潜在源过程的信息来减少与目标过程相关的不确定性。如果加入一个次要时间序列可以显著减少目标进程分布的熵,这表明源序列对目标序列存在统计上的因果影响。这种减少被称为传递熵


传递熵(TE)基于Kullback-Leibler散度的概念,用于衡量两个时间序列之间信息传递的方向。具体来说,传递熵基于条件互信息的概念,并可以使用Kullback-Leibler(KL)距离来表示,因此也被称为Kullback-Leibler散度或相对熵。KL散度用于衡量两个概率分布之间的差异。在传递熵中,KL距离衡量的是Y的当前状态和X、Y的过去状态的联合概率分布,与这些状态的边缘分布的乘积之间的差异。从数学上讲,时间序列X向时间序列Y的信息传递可以表示为:

转移熵公式



其中,y(t+1) 是Y的未来状态,y(t) 是Y的过去状态,而x(t) 是X的过去状态。该公式强调了传递熵衡量的是,在考虑了来自x(t)以及y(t)的信息后,y(t+1)的概率分布会发生多大的变化。

2009年,莱昂内尔·巴内特(Lionel Barnett)、亚当·巴内特(Adam Barrett)和阿尼尔·赛斯(Anil Seth)共同撰写了一篇论文,题为“格兰杰因果关系与传递熵对高斯变量等价”,证明了当时间序列遵循高斯分布时,传递熵等同于格兰杰因果关系F统计量的一半。

转移熵公式中的因果关系

这一结果为我们后续在代码中实现的线性传递熵的定义提供了基础。了考虑非线性因果关系,我们借鉴了托马斯·施赖伯(Thomas Schreiber)的研究工作,将时间序列视为具有不同转移概率分布的马尔可夫过程,用以扩展了不确定性减少的概念。

Schreiber在建模不确定性减少时,利用了信息论,将时间序列X(t)和Y(t)视为具有已知转移概率p(x)和q(x)的马尔可夫过程。与Granger的自回归模型依赖线性模型不同,这种方法使用条件互信息来描述信息传递。由于互信息是从熵的差异中得出的,条件互信息则是通过对每个熵项附加额外信息来进行条件化而获得的。然后,通过将滞后变量代入条件互信息方程来计算传递熵,从而使我们能够利用条件互熵来分析在特定滞后k下从X(t)到Y(t)的信息传递。

联合条件熵

独立条件熵

从计算的角度来看,这种方法很有吸引力,因为联合熵仅需概率分布即可计算。对于单个滞后k的传递熵,可以表示为四个独立的联合熵项,这些项可以通过从数据中获得的准确概率分布轻松计算。该公式的优点在于能够处理更多的滞后维度。然而,每个额外的滞后都会使状态空间的维度增加两倍,这显著影响了准确量化传递熵的能力,因为与估计概率密度相关的有限数据问题会呈指数级增长。

非线性熵项

这种方法的一个关键优势在于其非参数化特性。与其他方法不同,它除了假设数据的平稳性之外,并不对底层数据分布做出任何假设,从而允许在没有数据生成过程先验知识的情况下应用。然而,这一优势也伴随着一个警告:结果在很大程度上依赖于对底层分布的准确估计。传递熵需要利用有限的数据来近似随机过程的真实概率分布,并计算四个熵项。这种估计的准确性显著影响传递熵结果的可靠性。鉴于此,我们必须考虑计算出的熵值是虚假的可能性。因此,我们需要某种方法来验证结果的稳健性。

我们坚持采用非参数化方法来估计传递熵,这就带来了相当大的挑战,即确保结果传达的是某种真实信息,而不是垃圾信息。因此,我们有必要考虑一种更具信息量的传递熵解释方法,这涉及对估计值的统计显著性进行评估。常见的显著性检验包括将时间序列数据预设随机打乱的次数。然后为每个打乱顺序的版本计算传递熵。随后,通过计算传递熵低于原始值的比例来确定p值。

另一种方法要求计算结果与打乱顺序的数据均值之间的标准差数量。由于打乱数据会破坏时间结构,因此打乱后的传递熵值的均值应接近0。数据围绕这一均值的分布反映了原始结果的显著性。计算出的值称为z分数。与p值相比,z分数通常所需较少的打乱次数,因此计算效率更高。  

在p值的情况下,目标是获得尽可能接近0的概率。而表示统计显著性的z分数应该高于3.0。


MQL5实现

用于量化传递熵及其显著性的工具的代码被封装在transfer_entropy.mqh文件中。该文件包含了CTransEntropy 类的定义,以及其他辅助类和函数。这个类为时间序列数据的统计分析提供了一个框架,专门用于评估变量之间的因果关系。它提供了两种不同的方法来量化线性格兰杰因果关系(线性传递熵)和非线性传递熵。这两种方法是双向计算的,从而更全面地展现了变量之间信息流。

为了解决数据中可能存在的非平稳性,该类引入了一个窗口化过程。用户可以定义窗口大小和步长,从而能够以较小的、重叠的片段分析数据。这种方法可以针对每个窗口得出特定的结果,有助于识别因果关系强度的时序变化。此外,它还减轻了分析非平稳数据的挑战。该类还提供了一个集成的显著性检验机制。用户可以在保留边缘分布的同时,指定数据打乱顺序的次数。基于这些打乱后的数据集,该类会计算每个方向上传递熵的p值和z分数。这些统计值为分析提供了重要的见解,用于判断观察到的因果关系或信息传递是否仅仅是偶然现象,从而增强了分析的稳健性。

通过无参数的默认构造函数来创建类的实例。

public:
                     CTransEntropy(void)
     {
      if(!m_transfer_entropies.Resize(2))
         Print(__FUNCTION__, " error ", GetLastError());

     }

用户应调用Initialize()方法,该方法使用给定的数据集初始化对象,并设置分析的各种参数。

bool              Initialize(matrix &in, ulong endog_index, ulong exog_index, ulong lag, bool maxLagOnly=true, ulong winsize=0,ulong winstride=0)
     {
      if(!lag || lag>in.Rows()/2)
        {
         Print(__FUNCTION__, " Invalid parameter(s) : lag must be > 0  and < rows/2");
         return false;
        }

      if(endog_index==exog_index)
        {
         Print(__FUNCTION__, " Invalid parameter(s) : endog cannot be = exog ");
         return false;
        }

      if(!m_dataset.Resize(in.Rows(),2))
        {
         Print(__FUNCTION__, " error ", GetLastError());
         return false;
        }

      if(!m_dataset.Col(in.Col(endog_index),0) || !m_dataset.Col(in.Col(exog_index),1))
        {
         Print(__FUNCTION__, " error ", GetLastError());
         return false;
        }

      if(!m_wins.Initialize(m_dataset,lag,maxLagOnly,winsize,winstride))
         return false;

      m_tlag = lag;
      m_endog = endog_index;
      m_exog = exog_index;
      m_maxlagonly = maxLagOnly;

      return true;
     }

第一个必需的参数是一个至少包含两列的矩阵,待分析的时间序列应置于在输入矩阵的列中。。如果处理的是非平稳数据,建议在分析前对数据进行差分处理。第二个和第三个参数是输入数据矩阵的列索引,分别指示内生(因变量)时间序列和外生(自变量)时间序列。

第四个参数是滞后参数,定义了分析中考虑的滞后值。接下来的布尔参数maxLagOnly决定了滞后值是否定义了一个单独的项(如果为 true),或者定义了包括滞后值在内的所有滞后项(如果为 false)。倒数第二个参数winsize表示窗口长度。如果设置为 0,则不对数据应用窗口化处理。最后,winstride 参数可选地设置了窗口操作的步长,定义了窗口在时间序列数据上滑动时连续窗口之间的步长。

该方法首先确保内生和外生索引不相同。如果它们相同,则打印错误消息并返回 false。调整内部矩阵m_dataset的大小,以存储待分析的双变量数据集。然后,将输入矩阵中由endog_index和exog_index指定的列分别复制到m_dataset的第一列和第二列。如果请求了窗口化处理,则使用辅助类CDataWindows对m_dataset矩阵进行窗口化处理。完成以上这些后,该方法将内部变量设置为提供的参数,以便后续使用。

//+------------------------------------------------------------------+
//|class that generates windows of the dataset to be analyzed        |
//+------------------------------------------------------------------+
class CDataWindows
  {
private:
   matrix m_dwins[],
          m_data;
   ulong  m_lag,
          m_win_size,
          m_stride_size;

   bool m_max_lag_only,
        m_has_windows;

   matrix            applylags(void)
     {
      matrix out=np::sliceMatrixRows(m_data,m_lag);

      if(m_max_lag_only)
        {
         if(!out.Resize(out.Rows(),m_data.Cols()+2))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return matrix::Zeros(1,1);
           }

         for(ulong i = 2; i<4; i++)
           {
            vector col = m_data.Col(i-2);
            col = np::sliceVector(col,0,col.Size()-m_lag);

            if(!out.Col(col,i))
              {
               Print(__FUNCTION__, " error ", GetLastError());
               return matrix::Zeros(1,1);
              }
           }
        }
      else
        {
         if(!out.Resize(out.Rows(),m_data.Cols()+(m_lag*2)))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return matrix::Zeros(1,1);
           }

         for(ulong i = 0,k = 2; i<2; i++)
           {
            for(ulong t = 1; t<(m_lag+1); t++,k++)
              {
               vector col = m_data.Col(i);
               col = np::sliceVector(col,m_lag-t,col.Size()-t);

               if(!out.Col(col,k))
                 {
                  Print(__FUNCTION__, " error ", GetLastError());
                  return matrix::Zeros(1,1);
                 }
              }

           }
        }

      return out;

     }

   bool              applywindows(void)
     {
      if(m_dwins.Size())
         ArrayFree(m_dwins);

      for(ulong i = (m_stride_size+m_win_size); i<m_data.Rows(); i+=ulong(MathMax(m_stride_size,1)))
        {
         if(ArrayResize(m_dwins,int(m_dwins.Size()+1),100)<0)
           {
            Print(__FUNCTION__," error ", GetLastError());
            return false;
           }
         m_dwins[m_dwins.Size()-1] = np::sliceMatrixRows(m_data,i-m_win_size,(i-m_win_size)+m_win_size);
        }

      return true;
     }


public:
                     CDataWindows(void)
     {

     }

                    ~CDataWindows(void)
     {

     }

   bool              Initialize(matrix &data, ulong lag, bool max_lag_only=true, ulong window_size=0, ulong window_stride =0)
     {
      if(data.Cols()<2)
        {
         Print(__FUNCTION__, " matrix should contain at least 2 columns ");
         return false;
        }

      m_data = data;

      m_max_lag_only = max_lag_only;

      if(lag)
        {
         m_lag = lag;
         m_data = applylags();
        }

      if(window_size)
        {
         m_win_size = window_size;
         m_stride_size = window_stride;
         m_has_windows = true;
         if(!applywindows())
            return false;
        }
      else
        {
         m_has_windows = false;

         if(m_dwins.Size())
            ArrayFree(m_dwins);

         if(ArrayResize(m_dwins,1)<0)
           {
            Print(__FUNCTION__," error ", GetLastError());
            return false;
           }

         m_dwins[0]=m_data;
        }

      return true;
     }

   matrix            getWindowAt(ulong ind)
     {
      if(ind < ulong(m_dwins.Size()))
         return m_dwins[ind];
      else
        {
         Print(__FUNCTION__, " Index out of bounds ");
         return matrix::Zeros(1,1);
        }
     }

   ulong             numWindows(void)
     {
      return ulong(m_dwins.Size());
     }

   bool              hasWindows(void)
     {
      return m_has_windows;
     }
  };

如果Initialize() 方法成功,用户可以调用Calculate_Linear_TE()或Calculate_NonLinear_TE()来分别测试线性传递熵和非线性传递熵。两种方法在完成时都会返回一个布尔值。Calculate_Linear_TE()方法可以接受一个可选参数n_shuffles。如果n_shuffles 为0(默认值),则不会进行显著性检验。

bool              Calculate_Linear_TE(ulong n_shuffles=0)
     {
      ulong c = m_wins.numWindows();

      matrix TE(c,2);
      matrix sTE(c,2);
      matrix pvals(c,2);
      matrix zscores(c,2);

      for(ulong i=0; i<m_wins.numWindows(); i++)
        {
         matrix df = m_wins.getWindowAt(i);

         m_transfer_entropies[0] = linear_transfer(df,0,1);

         m_transfer_entropies[1] = linear_transfer(df,1,0);


         if(!TE.Row(m_transfer_entropies,i))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return false;
           }

         SigResult rlts;

         if(n_shuffles)
           {
            significance(df,m_transfer_entropies,m_endog,m_exog,m_tlag,m_maxlagonly,n_shuffles,rlts);

            if(!sTE.Row(rlts.mean,i) || !pvals.Row(rlts.pvalue,i) || !zscores.Row(rlts.zscore,i))
              {
               Print(__FUNCTION__, " error ", GetLastError());
               return false;
              }

           }

        }

      m_results.TE_XY = TE.Col(0);
      m_results.TE_YX = TE.Col(1);
      m_results.p_value_XY = pvals.Col(0);
      m_results.p_value_YX = pvals.Col(1);
      m_results.z_score_XY = zscores.Col(0);
      m_results.z_score_YX = zscores.Col(1);
      m_results.Ave_TE_XY = sTE.Col(0);
      m_results.Ave_TE_YX = sTE.Col(1);

      return true;
     }

该方法通过格兰杰方式计算线性传递熵。这一功能在私有方法linear_transfer()中实现。该例程的最后两个参数用于识别输入矩阵中的因变量和自变量(列)。通过两次调用该方法并交换列索引,我们可以得到两个方向上的传递熵。

double            linear_transfer(matrix &testdata,long dep_index, long indep_index)
     {
      vector joint_residuals,independent_residuals;
      double entropy=0.0;

      OLS ols;

      double gc;
      vector y;
      matrix x,xx;

      matrix joint;
      if(m_maxlagonly)
         joint = np::sliceMatrixCols(testdata,2);
      else
        {
         if(!joint.Resize(testdata.Rows(), testdata.Cols()-1))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
         matrix sliced = np::sliceMatrixCols(testdata,2);
         if(!np::matrixCopyCols(joint,sliced,1) || !joint.Col(testdata.Col(indep_index),0))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
        }
      matrix indep = (m_maxlagonly)?np::sliceMatrixCols(testdata,dep_index+2,dep_index+3):np::sliceMatrixCols(testdata,(dep_index==0)?2:dep_index+m_tlag+1,(dep_index==0)?2+m_tlag:END);

      y = testdata.Col(dep_index);

      if(dep_index>indep_index)
        {
         if(m_maxlagonly)
           {
            if(!joint.SwapCols(0,1))
              {
               Print(__FUNCTION__, " error ", GetLastError());
               return entropy;
              }
           }
         else
           {
            for(ulong i = 0; i<m_tlag; i++)
              {
               if(!joint.SwapCols(i,i+m_tlag))
                 {
                  Print(__FUNCTION__, " error ", GetLastError());
                  return entropy;
                 }
              }
           }
        }

      if(!addtrend(joint,xx))
         return entropy;

      if(!ols.Fit(y,xx))
         return entropy;

      joint_residuals = ols.Residuals();

      if(!addtrend(indep,x))
         return entropy;

      if(!ols.Fit(y,x))
         return entropy;

      independent_residuals = ols.Residuals();

      gc = log(independent_residuals.Var()/joint_residuals.Var());

      entropy = gc/2.0;

      return entropy;

     }

方法Calculate_NonLinear_TE()除了n_shuffles外,还接受一个额外的参数numBins。这个参数定义了在估计变量的概率密度时使用的箱数。  

bool              Calculate_NonLinear_TE(ulong numBins, ulong n_shuffles=0)
     {
      ulong c = m_wins.numWindows();

      matrix TE(c,2);
      matrix sTE(c,2);
      matrix pvals(c,2);
      matrix zscores(c,2);

      for(ulong i=0; i<m_wins.numWindows(); i++)
        {
         matrix df = m_wins.getWindowAt(i);

         m_transfer_entropies[0] = nonlinear_transfer(df,0,1,numBins);

         m_transfer_entropies[1] = nonlinear_transfer(df,1,0,numBins);


         if(!TE.Row(m_transfer_entropies,i))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return false;
           }

         SigResult rlts;

         if(n_shuffles)
           {
            significance(df,m_transfer_entropies,m_endog,m_exog,m_tlag,m_maxlagonly,n_shuffles,rlts,numBins,NONLINEAR_TE);

            if(!sTE.Row(rlts.mean,i) || !pvals.Row(rlts.pvalue,i) || !zscores.Row(rlts.zscore,i))
              {
               Print(__FUNCTION__, " error ", GetLastError());
               return false;
              }

           }

        }

      m_results.TE_XY = TE.Col(0);
      m_results.TE_YX = TE.Col(1);
      m_results.p_value_XY = pvals.Col(0);
      m_results.p_value_YX = pvals.Col(1);
      m_results.z_score_XY = zscores.Col(0);
      m_results.z_score_YX = zscores.Col(1);
      m_results.Ave_TE_XY = sTE.Col(0);
      m_results.Ave_TE_YX = sTE.Col(1);

      return true;


     }

直方图方法被用于估计概率密度。 选择这种方法是因为它是最简单的实现方式。通过私有方法nonlinear_entropy()和get_entropy()计算广义传递熵。 

double            get_entropy(matrix &testdata, ulong num_bins)
     {

      vector hist;
      vector bounds[];
      hist=vector::Ones(10);

      if(!np::histogramdd(testdata,num_bins,hist,bounds))
        {
         Print(__FUNCTION__, " error ");
         return EMPTY_VALUE;
        }

      vector pdf = hist/hist.Sum();
      vector lpdf = pdf;

      for(ulong i = 0; i<pdf.Size(); i++)
        {
         if(lpdf[i]==0.0)
            lpdf[i] = 1.0;
        }

      vector ent = pdf*log(lpdf);

      return -1.0*ent.Sum();

     }

在nonlinear_transfer()方法中,将用于计算联合熵和独立条件熵的四个组成部分的值合并在一起,得到最终的估计值。

double            nonlinear_transfer(matrix &testdata,long dep_index, long indep_index, ulong numbins)
     {
      double entropy=0.0;

      matrix one;
      matrix two;
      matrix three;
      matrix four;

      if(m_maxlagonly)
        {
         if(!one.Resize(testdata.Rows(),3) || !two.Resize(testdata.Rows(),2) || !three.Resize(testdata.Rows(),2) || !four.Resize(testdata.Rows(),1) ||
            !one.Col(testdata.Col(dep_index),0) || !one.Col(testdata.Col(dep_index+2),1) || !one.Col(testdata.Col(indep_index+2),2) ||
            !two.Col(testdata.Col(indep_index+2),0) || !two.Col(testdata.Col(dep_index+2),1) ||
            !three.Col(testdata.Col(dep_index),0) || !three.Col(testdata.Col(dep_index+2),1) ||
            !four.Col(testdata.Col(dep_index),0))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
        }
      else
        {

         if(!one.Resize(testdata.Rows(), testdata.Cols()-1) || !two.Resize(testdata.Rows(), testdata.Cols()-2) ||
            !three.Resize(testdata.Rows(), m_tlag+1))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }

         matrix deplag = np::sliceMatrixCols(testdata,dep_index?dep_index+m_tlag+1:2,dep_index?END:2+m_tlag);
         matrix indlag = np::sliceMatrixCols(testdata,indep_index?indep_index+m_tlag+1:2,indep_index?END:2+m_tlag);
         //one
         if(!np::matrixCopyCols(one,deplag,1,1+m_tlag) || !np::matrixCopyCols(one,indlag,1+m_tlag) || !one.Col(testdata.Col(dep_index),0))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
         //two
         if(!np::matrixCopyCols(two,indlag,indlag.Cols()) || !np::matrixCopyCols(two,deplag,indlag.Cols()))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
         //three
         if(!np::matrixCopyCols(three,deplag,1) || !three.Col(testdata.Col(dep_index),0))
           {
            Print(__FUNCTION__, " error ", GetLastError());
            return entropy;
           }
         //four
         four = deplag;
        }

      double h1=get_entropy(one,numbins);
      double h2=get_entropy(two,numbins);
      double h3=get_entropy(three,numbins);
      double h4=get_entropy(four,numbins);

      // entropy = independent conditional entropy (h3-h4)  - joint conditional entropy (h1-h2)
      entropy = (h3-h4) - (h1-h2);

      return entropy;

     }

可以通过get_results()方法获取全面的测试结果,该方法返回一个向量结构。该结构的每个成员都代表结果的不同方面,每个向量的长度取决于Initialize()方法设置的参数以及执行的传递熵分析类型。

//+------------------------------------------------------------------+
//| Transfer entropy results struct                                  |
//+------------------------------------------------------------------+
struct TEResult
  {
   vector            TE_XY;
   vector            TE_YX;
   vector            p_value_XY;
   vector            p_value_YX;
   vector            z_score_XY;
   vector            z_score_YX;
   vector            Ave_TE_XY;
   vector            Ave_TE_YX;
  };

所得结果的结构属性如下所述。

结构属性
说明
TE_XY
转移熵从外生到
内生变量
TE_YX
转移熵从内生到
外生变量
z_score_XY
转移熵的显著性
从外生到内生变量
z_score_YX
转移熵的显著性
从内生到外生变量
p_value_XY
传递熵的p值显著性
从外生到内生变量
p_value_YX
传递熵的p值显著性
从内生到外生变量
Ave_TE_XY
平均转移熵从外生到
内生变量
Ave_TE_YX
平均转移熵从内生到
外生变量

调用get_transfer_entropies()将返回一个向量,包含数据集中最后一个窗口的估计传递熵值,这些值在两个方向上都有测量。结果的顺序遵循传递给类的原始数据的列顺序。因此,向量中的第一个熵值对应于第一列中的序列。


示例

该类的功能将通过对具有预设特征的随机生成序列进行测试来验证。这些序列是使用以下列出的函数生成的,这些函数都在generate_time_series.mqh中定义。

//+------------------------------------------------------------------+
//|Generate a random walk time series under Geometric Brownian Motion|
//+------------------------------------------------------------------+
vector random_series(double initial_val, ulong steps, ulong len, double muu, double sgma)
  {
   vector out(len);

   out[0] = initial_val;

   int err=0;

   for(ulong i=1; i<len; i++)
     {
      out[i] = out[i-1]*(1.0+(muu*(double(steps)/double(len)))+(MathRandomNormal(muu,sgma,err)*sqrt(double(steps)/double(len))));
      if(err)
        {
         Print(__FUNCTION__, " MathRandonNormal() ", GetLastError());
         return vector::Zeros(1);
        }
     }

   return out;
  }

random_series()函数会生成一个具有几何布朗运动特征的随机游走时间序列。其参数如下:

  • initial_val:时间序列的初始值。
  • steps:随机游走的总步数。
  • len:要生成的时间序列的长度。
  • muu:几何布朗运动(GBM)的漂移项(均值)。
  • sgma:几何布朗运动(GBM)的波动率(标准差)。
//+-----------------------------------------------------------------------------------------------+
//|Generate two time series under Geometric Brownian Motion with S2 dependent in part on S1-lagged|
//+-----------------------------------------------------------------------------------------------+
matrix coupled_random_series(double init_1,double init_2,ulong steps, ulong len, double muu_1, double muu_2, double sgma_1, double sgma_2,
                            double alpha, double epsilon, ulong lag)
  {

   vector gbm1 = random_series(init_1,steps,len,muu_1,sgma_1);
   vector gbm2 = random_series(init_2,steps,len,muu_2,sgma_2);

   if(gbm1.Size()!=gbm2.Size())
     {
      return matrix::Zeros(1,1);
     }

   matrix out(gbm2.Size()-lag,2);

   for(ulong i = lag; i<gbm2.Size(); i++)
     {
      gbm2[i]=(1.0-alpha)*(epsilon*gbm2[i-lag] + (1.0-epsilon) * gbm2[i]) + (alpha) * gbm1[i-lag];
      out[i-lag][0] = gbm2[i];
      out[i-lag][1] = gbm1[i];
     }

   return out;
  }

函数coupled_random_series()生成两个耦合的随机游走时间序列,其中第二个序列(gbm2)部分依赖于第一个序列(gbm1)的滞后值。该函数返回一个包含两列的矩阵,依赖的序列被置于在第一列。函数的参数如下:

  • init_1:第一个时间序列的初始值。
  • init_2:第二个时间序列的初始值。
  • steps:随机游走的总步数。
  • len:要生成的时间序列的长度。
  • muu_1:第一个序列的漂移项。
  • muu_2:第二个序列的漂移项。
  • sgma_1:第一个序列的波动率。
  • sgma_2:第二个序列的波动率。
  • alpha:一个混合参数,用于调整独立序列对依赖序列的影响。
  • epsilon:一个参数,用于调整滞后依赖序列值的影响。
  • lag:依赖序列对独立序列依赖的滞后。

为了说明CTransEntropy类的功能,我们准备了两个MetaTrader 5脚本。这两个脚本都展示了如何使用该类分析数据集,并检测出独立变量(时间序列)的滞后值,这些滞后值最能表征依赖变量(时间序列)中观察到的依赖性。第一种方法依赖于视觉检查,以从一组在不同滞后下分析传递熵的结果中确定最重要的方向熵值。这种方法在LagDetection.ex5脚本中实现。

//+------------------------------------------------------------------+
//|                                                 LagDetection.mq5 |
//|                                  Copyright 2024, MetaQuotes Ltd. |
//|                                             https://www.mql5.com |
//+------------------------------------------------------------------+
#property copyright "Copyright 2024, MetaQuotes Ltd."
#property link      "https://www.mql5.com"
#property version   "1.00"
#property script_show_inputs
#include<transfer_entropy.mqh>
#include<generate_time_series.mqh>
//--- input parameters

input double   Init1=100.0;
input double   Init2=90.0;
input ulong    Steps=1;
input ulong    Len=500;
input double   Avg1=0;
input double   Avg2=0;
input double   Sigma1=1;
input double   Sigma2=1;
input double   Alph=0.5;
input double   Epsilon=0.3;
input ulong    Lag=3;
input bool     UseSeed = true;
input ulong    Bins=3;
input ENUM_TE_TYPE testtype=NONLINEAR_TE;
input ulong    NumLagsToTest = 10;
input int      PlotViewTimeInSecs = 20;
//+------------------------------------------------------------------+
//| Script program start function                                    |
//+------------------------------------------------------------------+
void OnStart()
  {
   if(UseSeed)
    {
     MathSrand(256);
    }
//---
   if(!NumLagsToTest)
     {
      Print(" Invalid input parameter value for \'NumLagsToTest\'. It must be > 0 ");
      return;
     }

   matrix series = coupled_random_series(Init1,Init2,Steps,Len,Avg1,Avg2,Sigma1,Sigma2,Alph,Epsilon,Lag);

   series = log(series);

   series = np::diff(series,1,false);
   
   matrix entropies(NumLagsToTest,2);

   for(ulong k = 0; k<NumLagsToTest; k++)
     {
      CTransEntropy ote;

      if(!ote.Initialize(series,0,1,k+1))
         return;

      if((testtype==NONLINEAR_TE && !ote.Calculate_NonLinear_TE(Bins)) ||
         (testtype==LINEAR_TE && !ote.Calculate_Linear_TE()))
         return;

      vector res = ote.get_transfer_entropies();

      entropies.Row(res,k);
     }

   Print(" entropies ", entropies);

   CGraphic* g = np::plotMatrix(entropies,"Transfer Entropies","Col 0,Col 1","Lag","TE");

   if(g==NULL)
      return;
   else
     {
      Sleep(int(MathAbs(PlotViewTimeInSecs))*1000);
      g.Destroy();
      delete g;
     }

   return;
  }
//+------------------------------------------------------------------+

脚本的前11个用户可访问的输入参数控制生成序列的属性。最后4个输入参数配置用于分析的各个方面:

  • Bins:设置用于估计数据概率密度的直方图方法的箱数。
  • testtype:启用选择线性或非线性传递熵分析。
  • NumLagsToTest:设置测试将要进行的最大滞后数量,从1开始。
  • PlotViewTimeInSecs:确定图表在程序退出前保持可见的时间。
  • UseSeed:如果为真,则启用随机数生成器的种子,以确保测试结果的可重复性。

脚本生成两个具有预设依赖关系的时间序列,并在不同的滞后下估计传递熵。请注意,数据在分析前进行了差分处理。在这种情况下可能并不是必要的,但这是一个良好的实践。然后将结果(传递熵)在图表上可视化,在垂直轴上绘制传递熵,在水平轴上绘制对应的滞后。如果测试成功,图表应在用于生成随机序列的滞后处显示出一个明显的峰值。

运行该程序表明,线性测试成功识别了用于生成序列的滞后依赖关系。回想一下,依赖序列位于随机生成数据集的第一列。

滞后检测图:线性测试

再次使用非线性测试选项进行测试,结果类似。在这种情况下,熵值明显较小。这可能是由于使用直方图方法估计数据的概率分布存在局限性。还应注意,所选的箱数会影响估计的传递熵。  

滞后检测结果:非线性测试

在下一个演示中,我们测试在特定滞后处获得的熵值的显著性。这一功能在脚本LagDetectionUsingSignificance.ex5中实现。

//+------------------------------------------------------------------+
//|                                LagDetectionUsingSignificance.mq5 |
//|                                  Copyright 2024, MetaQuotes Ltd. |
//|                                             https://www.mql5.com |
//+------------------------------------------------------------------+
#property copyright "Copyright 2024, MetaQuotes Ltd."
#property link      "https://www.mql5.com"
#property version   "1.00"
#property script_show_inputs
#include<transfer_entropy.mqh>
#include<generate_time_series.mqh>
//--- input parameters
input double   Init1=100.0;
input double   Init2=90.0;
input ulong    Steps=1;
input ulong    Len=500;
input double   Avg1=0;
input double   Avg2=0;
input double   Sigma1=1;
input double   Sigma2=1;
input double   Alph=0.5;
input double   Epsilon=0.3;
input ulong    Lag=3;
input bool     UseSeed = true;
input ulong    Bins=3;
input ENUM_TE_TYPE testtype=LINEAR_TE;
input ulong    LagToTest = 3;
input ulong    NumIterations = 100;
//+------------------------------------------------------------------+
//| Script program start function                                    |
//+------------------------------------------------------------------+
void OnStart()
  {
   if(UseSeed)
    {
     MathSrand(256);
    }
//---
   if(!LagToTest)
     {
      Print(" Invalid input parameter value for \'LagToTest\'. It must be > 0 ");
      return;
     }

   matrix series = coupled_random_series(Init1,Init2,Steps,Len,Avg1,Avg2,Sigma1,Sigma2,Alph,Epsilon,Lag);

   series = log(series);

   series = np::diff(series,1,false);

   matrix entropies(1,2);


   CTransEntropy ote;

   if(!ote.Initialize(series,0,1,LagToTest))
      return;

   if((testtype==NONLINEAR_TE && !ote.Calculate_NonLinear_TE(Bins,NumIterations)) ||
      (testtype==LINEAR_TE && !ote.Calculate_Linear_TE(NumIterations)))
      return;

   vector res = ote.get_transfer_entropies();

   entropies.Row(res,0);

   TEResult alres = ote.get_results();

   Print(" significance: ", " pvalue 1->0 ",alres.p_value_XY, " pvalue 0->1 ",alres.p_value_YX);
   Print(" zscore 1->0 ",alres.z_score_XY, " zscore 0->1 ",alres.z_score_YX);

   return;
  }
//+------------------------------------------------------------------+

脚本具有类似的用户可调节输入参数,最后两个除外:

  • LagToTest:设置将要进行测试的具体滞后值。
  • NumIterations:定义数据将被随机打乱以进行显著性检验的次数。

脚本生成一对相关序列,并在选定的滞后值处进行测试。传递熵及其对应的p值和z分数将被写入终端的“Experts”标签页。

滞后3时的脚本参数

在第一次运行中,脚本的LagToTest和Lag参数被设置为相同的值。结果如下所示:这表明矩阵第一列中的序列依赖于第二列中的序列。

JS      0       21:33:43.464    LagDetectionUsingSignificance (Crash 1000 Index,M10)     significance:  pvalue 1->0 [0] pvalue 0->1 [0.66]
LE      0       21:33:43.464    LagDetectionUsingSignificance (Crash 1000 Index,M10)     zscore 1->0 [638.8518379295961] zscore 0->1 [-0.5746565128024472]

在第二次运行中,我们只修改了LagToTest参数的值,并将这些结果与前一次运行的结果进行比较。

滞后5时的脚本参数


注意p值和z分数的差异。在这种情况下,p值和z分数都不显著。

RQ      0       21:33:55.147    LagDetectionUsingSignificance (Crash 1000 Index,M10)     significance:  pvalue 1->0 [0.37] pvalue 0->1 [0.85]
GS      0       21:33:55.147    LagDetectionUsingSignificance (Crash 1000 Index,M10)     zscore 1->0 [-0.2224969673139822] zscore 0->1 [-0.6582062358345131]

尽管测试结果表明CTransEntropy类表现良好,但在使用较大的滞后值进行分析时,尤其是当启用多个滞后项的选项(maxLagOnly为 false)时,存在一个显著的局限性。这种情况在非线性测试中尤为突出。这主要是由于使用直方图方法来估计数据的分布。使用直方图方法估计概率密度存在明显的缺点。箱宽(或箱数)的选择显著影响直方图的外观和准确性。箱宽太小可能导致直方图过于嘈杂和碎片化,而箱宽太大则可能掩盖重要细节并平滑掉特征。最大的问题是直方图主要用于一维数据。对于更高维度的数据,箱数会呈指数增长。如果需要考虑的滞后项很多,对可用计算资源的需求可能会显著增加。因此,建议在使用广义传递熵进行多滞后分析时,保持最大滞后数量较小。  


结论

总体而言,CTransEntropy类能够在线性和非线性情境中分析传递熵。通过实际演示,我们展示了它检测和量化一个时间序列对另一个时间序列影响的能力,通过视觉检查和显著性检验使结果得到验证。该类能够有效处理各种场景,为时间序列应用中的因果关系提供了宝贵的依据。然而,用户应注意分析多个滞后值时的计算挑战,尤其是在应用非线性方法时。为了确保高效的性能和准确的结果,建议限制所考虑滞后的数量。总之,CTransEntropy 类是一个实用的工具,可用于揭示复杂的依赖关系并增强对动态系统的理解。

文件
说明
Mql5\include\generate_time_series.mqh
包含生成随机时间序列的函数
Mql5\include\ np.mqh
向量和矩阵的实用函数集
Mql5\include\ OLS.mqh
包含实现普通最小二乘法回归的OLS类的定义
Mql5\include\ TestUtilities.mqh
提供了一系列用于准备数据集以供OLS评估的工具集
Mql5\include\ transfer_entropy
包含实现传递熵分析的CTransEntropy类的定义
Mql5\scripts\LagDetection.mq5
一个展现CTransEntropy类功能的脚本
Mql5\scripts\LagDetectionUsingSignificance.mq5
第二个脚本,展现了使用CTransEntropy结果的不同解释方法


本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/15393

附加的文件 |
np.mqh (51.13 KB)
OLS.mqh (13.36 KB)
TestUtilities.mqh (4.36 KB)
LagDetection.mq5 (2.43 KB)
Mql5.zip (18.26 KB)
基于套接字(Sockets)的Twitter情绪分析 基于套接字(Sockets)的Twitter情绪分析
这种创新的交易机器人将 MetaTrader 5 与 Python 结合,利用实时社交媒体情绪分析为自动化交易决策提供支持。通过分析与特定金融工具相关的 Twitter 情绪,该机器人将社交媒体趋势转化为可操作的交易信号。它采用客户端-服务器架构,并通过套接字通信实现无缝交互,将 MT5 的交易能力与 Python 的数据处理能力完美结合。该系统展示了将量化金融与自然语言处理相结合的潜力,提供了一种利用替代数据源的尖端算法交易方法。尽管显示出巨大潜力,但该机器人也突显了未来改进的方向,包括采用更先进的情绪分析技术以及改进风险管理策略。
使用MQL5与Python构建自我优化的智能交易系统 使用MQL5与Python构建自我优化的智能交易系统
在本文中,我们将讨论如何构建能够根据当前市场条件自主选择和更改交易策略的EA。我们将学习马尔可夫链(Markov Chains)以及它们如何帮助我们作为算法交易者。
将您自己的 LLM 集成到 EA 中(第 3 部分):使用 CPU 训练自己的 LLM 将您自己的 LLM 集成到 EA 中(第 3 部分):使用 CPU 训练自己的 LLM
在人工智能飞速发展的今天,大语言模型(LLM)是人工智能的重要组成部分,所以我们应该思考如何将强大的 LLM 融入到我们的算法交易中。对于大多数人来说,很难根据他们的需求微调这些强大的模型,在本地部署它们,然后将它们应用于算法交易。本系列文章将采取循序渐进的方法来实现这一目标。
在 MQL5 中创建交互式图形用户界面(第 2 部分):添加控制和响应 在 MQL5 中创建交互式图形用户界面(第 2 部分):添加控制和响应
通过动态功能增强 MQL5 图形用户界面(GUI)面板,可以大大改善用户的交易体验。通过整合互动元素、悬停效果和实时数据更新,该面板成为现代交易者的强大工具。