广义 Goertzel 算法可实现比 FFT 更好的峰值检测:使用真实数据进行偏移频率?

Generalized Goertzel algorithm for better peaks detection than FFT: shifted frequencies with real data?

提问人:fede72bari 提问时间:9/29/2023 最后编辑:fede72bari 更新时间:10/5/2023 访问量:84

问:

我已经从 Matlab 代码中翻译了 Python 中广义 Goertzel 技术的算法,可以在这里找到。

我在真实数据上使用它时遇到了麻烦,而且只能在真实数据上使用它:生成具有 5 个正弦分量的测试“合成”信号,Goertzel 返回正确的频率,显然比 FFT 具有更好的精度;这两种技术是一致的。

但是,如果我提供 N 个样本的市场数据,则 FFT 给出的最低频率 f = 1/N 正如预期的那样;Goertzel 返回所有高于 1 的频率。数据的时间帧是 1 小时,但数据没有用时间戳标记,也可能是秒,所以我的期望是,计算频率变换的两种方法应该返回,除了精度不同之外,应该在频域上返回相同的谐波。

为什么我在一种方法中得到的频率最低,而使用另一种具有真实数据的方法却大于 1?

import numpy as np

def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
    # Check input arguments
    if len(indvec) < 1:
        raise ValueError('Not enough input arguments')
    if not isinstance(x, np.ndarray) or x.size == 0:
        raise ValueError('X must be a nonempty numpy array')
    if not isinstance(indvec, np.ndarray) or indvec.size == 0:
        raise ValueError('INDVEC must be a nonempty numpy array')
    if np.iscomplex(indvec).any():
        raise ValueError('INDVEC must contain real numbers')
    
    lx = len(x)
    x = x.reshape(lx, 1)  # forcing x to be a column vector
    
    # Initialization
    no_freq = len(indvec)
    y = np.zeros((no_freq,), dtype=complex)
    
    # Computation via second-order system
    for cnt_freq in range(no_freq):
        # Precompute constants
        pik_term = 2 * np.pi * indvec[cnt_freq] / lx
        cos_pik_term2 = 2 * np.cos(pik_term)
        cc = np.exp(-1j * pik_term)  # complex constant
        
        # State variables
        s0 = 0
        s1 = 0
        s2 = 0
        
        # Main loop
        for ind in range(lx - 1):
            s0 = x[ind] + cos_pik_term2 * s1 - s2
            s2 = s1
            s1 = s0
        
        # Final computations
        s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
        y[cnt_freq] = s0 - s1 * cc
        
        # Complex multiplication substituting the last iteration
        # and correcting the phase for potentially non-integer valued
        # frequencies at the same time
        y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
    
    return y

以下是合成测试 5 个分量信号的 FFT 和 Goertzel 变换图表

enter image description here

这里是 Goertzel 一个

enter image description here

原始频率是

signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]

相反,如果我尝试下载市场数据

data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")

并尝试转换 SPY 的数据['Close'],这就是我通过 N = 800 个样本的 FFT 转换得到的

enter image description here

以及前 2 个组件上的重建信号(不太好)

enter image description here

这就是我从 Goertzel 变换中得到的

enter image description here

请注意,FFT 上的第一个峰值低于 0.005,而 Goertzel 则高于 1。

这是我在 SPY 数据上测试 FFT 的方式

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):

    num_harmonics = num_harmonics *1
    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Calcola la FFT dei valori "Close"
    fft_result = np.fft.fft(data["Close"].values)
    frequency_range = np.fft.fftfreq(len(fft_result))


    print("Frequencies: ")
    print(frequency_range)
    print("N Frequencies: ")
    print(len(frequency_range))


    print("First frequencies magnitude: ")
    print(np.abs(fft_result[0:num_harmonics]))


    # Trova le armoniche dominanti
    # top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
    top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one

    print("Top harmonics: ")
    print(top_harmonics)

    # top_harmonics = [1, 4]#, 8, 5, 9]



    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Calcola la ricostruzione basata sulle prime N armoniche
    reconstructed_signal = np.zeros(len(data))

    time = np.linspace(0, num_samples, num_samples, endpoint=False)
    # print('time')
    # print(time)
    for harmonic_index in top_harmonics[:num_harmonics]:
      amplitude = np.abs(fft_result[harmonic_index]) #.real
      phase = np.angle(fft_result[harmonic_index])
      frequency = frequency_range[harmonic_index]
      reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)


    # print('first reconstructed_signal len')
    # print(len(reconstructed_signal))

    zeros = np.zeros(len(original_data) - 2*len(data))
    reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)

    # print('second reconstructed_signal len')
    # print(len(reconstructed_signal))

    reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)

    original_data['reconstructed_signal'] = reconstructed_signal


    # print('reconstructed_signal len')
    # print(len(reconstructed_signal))

    # print('original_data len')
    # print(len(original_data))

    # print('reconstructed_signal[300:320]')
    # print(reconstructed_signal[290:320])

    # print('original_data[300:320]')
    # print(original_data[290:320][['Close', 'reconstructed_signal']])


    # reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])

    # print('reconstructed_signal')
    # print(reconstructed_signal)

    # Converte i valori complessi in valori reali per la ricostruzione
    # reconstructed_signal_real = reconstructed_signal.real

    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")

    # Visualizza il grafico dello spettro
    fig_spectrum.show()

    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)

as well as the test of SPY data on Goertzel

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):


    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Frequenze desiderate
    frequency_range = np.arange(0, 20, 0.001) 


    # Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
    transform = goertzel_general_shortened(data['Close'].values, frequency_range)

    harmonics_amplitudes = np.abs(transform)

    frequency_range = frequency_range  
    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Visualizza il grafico dello spettro
    fig_spectrum.show()


    peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
    peak_frequencies = frequency_range[peaks_indexes] 
    peak_amplitudes = harmonics_amplitudes[peaks_indexes]

    print('peaks_indexes')
    print(peaks_indexes[0:30])
    print('peak_frequencies')
    print(peak_frequencies[0:30])
    print('peak_amplitudes')
    print(peak_amplitudes[0:30])

    lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies


    higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]

    print('higher_amplitudes_sort_peak_indexes')
    print(higher_amplitudes_sort_peak_indexes[0:10])



    # used_indexes = lower_freq_sort_peak_indexes
    used_indexes = higher_amplitudes_sort_peak_indexes

    # Creazione del segnale ricostruito utilizzando i picchi
    time = np.linspace(0, num_samples, num_samples, endpoint=False) 
    reconstructed_signal = np.zeros(len(time), dtype=float)

    print('num_samples')
    print(num_samples)
    print('time[0:20]')
    print(time[0:20])


    print('reconstructed_signal')
    print(reconstructed_signal[0:10])

    for index in used_indexes:

        phase = np.angle(transform[index])  
        amplitude = np.abs(transform[index])
        frequency = frequency_range[index]
        print('phase')
        print(phase)
        print('amplitude')
        print(amplitude)
        print('frequency')
        print(frequency)
        reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)


    # Estrai la parte reale del segnale ricostruito
    reconstructed_signal_real = reconstructed_signal

    print('reconstructed_signal[1]')
    print(reconstructed_signal[1])

    print('reconstructed_signal.shape')
    print(reconstructed_signal.shape)

    zeros = np.zeros(len(original_data) - 2*num_samples)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    original_data['reconstructed_signal'] = reconstructed_signal_real



    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")


    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo

analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)

Edit 30/09/2023

I tried to normalize the SPY data as suggested in the answers, but the problem is still there, here is the resulting chart

enter image description here

python fft 频率分析 goertzel-algorithm

评论

1赞 fede72bari 9/30/2023
you guys find a good one

答:

0赞 fede72bari 10/5/2023 #1

I finally found the point. The Goertzel transform basically counts how many times each harmonic stays inside the sampling period returning, in addition, its amplitude and phase. To get the frequency it is necessary to divide by the number of samples, something that probably is implicitly done by the standard FFT libraries. The example with 5 "synthetic" sin waves turned out to be similar between FTT and Goertzel because was sampled in 1 second, so, for example, a 50Hz harmonic had exactly 50 sin waves in one full circle angle that madethe Goertzel transform resulting to be correct too.

The correction is in the following added second line of code

transform = goertzel_general_shortened(data, frequency_range)

frequency_range = frequency_range/num_samples

harmonics_amplitudes = np.abs(transform)
peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
peak_frequencies = frequency_range[peaks_indexes]
peak_periods = 1 / frequency_range[peaks_indexes]
peak_amplitudes = harmonics_amplitudes[peaks_indexes]
peak_phases = np.angle(transform[peaks_indexes])

peak_amplitudes = peak_amplitudes*peak_frequencies
harmonics_amplitudes = harmonics_amplitudes*frequency_range