提问人:fede72bari 提问时间:9/29/2023 最后编辑:fede72bari 更新时间:10/5/2023 访问量:84
广义 Goertzel 算法可实现比 FFT 更好的峰值检测:使用真实数据进行偏移频率?
Generalized Goertzel algorithm for better peaks detection than FFT: shifted frequencies with real data?
问:
我已经从 Matlab 代码中翻译了 Python 中广义 Goertzel 技术的算法,可以在这里找到。
我在真实数据上使用它时遇到了麻烦,而且只能在真实数据上使用它:生成具有 5 个正弦分量的测试“合成”信号,Goertzel 返回正确的频率,显然比 FFT 具有更好的精度;这两种技术是一致的。
但是,如果我提供 N 个样本的市场数据,则 FFT 给出的最低频率 f = 1/N 正如预期的那样;Goertzel 返回所有高于 1 的频率。数据的时间帧是 1 小时,但数据没有用时间戳标记,也可能是秒,所以我的期望是,计算频率变换的两种方法应该返回,除了精度不同之外,应该在频域上返回相同的谐波。
为什么我在一种方法中得到的频率最低,而使用另一种具有真实数据的方法却大于 1?
import numpy as np
def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
# Check input arguments
if len(indvec) < 1:
raise ValueError('Not enough input arguments')
if not isinstance(x, np.ndarray) or x.size == 0:
raise ValueError('X must be a nonempty numpy array')
if not isinstance(indvec, np.ndarray) or indvec.size == 0:
raise ValueError('INDVEC must be a nonempty numpy array')
if np.iscomplex(indvec).any():
raise ValueError('INDVEC must contain real numbers')
lx = len(x)
x = x.reshape(lx, 1) # forcing x to be a column vector
# Initialization
no_freq = len(indvec)
y = np.zeros((no_freq,), dtype=complex)
# Computation via second-order system
for cnt_freq in range(no_freq):
# Precompute constants
pik_term = 2 * np.pi * indvec[cnt_freq] / lx
cos_pik_term2 = 2 * np.cos(pik_term)
cc = np.exp(-1j * pik_term) # complex constant
# State variables
s0 = 0
s1 = 0
s2 = 0
# Main loop
for ind in range(lx - 1):
s0 = x[ind] + cos_pik_term2 * s1 - s2
s2 = s1
s1 = s0
# Final computations
s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
y[cnt_freq] = s0 - s1 * cc
# Complex multiplication substituting the last iteration
# and correcting the phase for potentially non-integer valued
# frequencies at the same time
y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
return y
以下是合成测试 5 个分量信号的 FFT 和 Goertzel 变换图表
这里是 Goertzel 一个
原始频率是
signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]
相反,如果我尝试下载市场数据
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
并尝试转换 SPY 的数据['Close'],这就是我通过 N = 800 个样本的 FFT 转换得到的
以及前 2 个组件上的重建信号(不太好)
这就是我从 Goertzel 变换中得到的
请注意,FFT 上的第一个峰值低于 0.005,而 Goertzel 则高于 1。
这是我在 SPY 数据上测试 FFT 的方式
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
num_harmonics = num_harmonics *1
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Calcola la FFT dei valori "Close"
fft_result = np.fft.fft(data["Close"].values)
frequency_range = np.fft.fftfreq(len(fft_result))
print("Frequencies: ")
print(frequency_range)
print("N Frequencies: ")
print(len(frequency_range))
print("First frequencies magnitude: ")
print(np.abs(fft_result[0:num_harmonics]))
# Trova le armoniche dominanti
# top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one
print("Top harmonics: ")
print(top_harmonics)
# top_harmonics = [1, 4]#, 8, 5, 9]
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Calcola la ricostruzione basata sulle prime N armoniche
reconstructed_signal = np.zeros(len(data))
time = np.linspace(0, num_samples, num_samples, endpoint=False)
# print('time')
# print(time)
for harmonic_index in top_harmonics[:num_harmonics]:
amplitude = np.abs(fft_result[harmonic_index]) #.real
phase = np.angle(fft_result[harmonic_index])
frequency = frequency_range[harmonic_index]
reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)
# print('first reconstructed_signal len')
# print(len(reconstructed_signal))
zeros = np.zeros(len(original_data) - 2*len(data))
reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)
# print('second reconstructed_signal len')
# print(len(reconstructed_signal))
reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)
original_data['reconstructed_signal'] = reconstructed_signal
# print('reconstructed_signal len')
# print(len(reconstructed_signal))
# print('original_data len')
# print(len(original_data))
# print('reconstructed_signal[300:320]')
# print(reconstructed_signal[290:320])
# print('original_data[300:320]')
# print(original_data[290:320][['Close', 'reconstructed_signal']])
# reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])
# print('reconstructed_signal')
# print(reconstructed_signal)
# Converte i valori complessi in valori reali per la ricostruzione
# reconstructed_signal_real = reconstructed_signal.real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# Visualizza il grafico dello spettro
fig_spectrum.show()
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
as well as the test of SPY data on Goertzel
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Frequenze desiderate
frequency_range = np.arange(0, 20, 0.001)
# Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
transform = goertzel_general_shortened(data['Close'].values, frequency_range)
harmonics_amplitudes = np.abs(transform)
frequency_range = frequency_range
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Visualizza il grafico dello spettro
fig_spectrum.show()
peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
peak_frequencies = frequency_range[peaks_indexes]
peak_amplitudes = harmonics_amplitudes[peaks_indexes]
print('peaks_indexes')
print(peaks_indexes[0:30])
print('peak_frequencies')
print(peak_frequencies[0:30])
print('peak_amplitudes')
print(peak_amplitudes[0:30])
lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies
higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]
print('higher_amplitudes_sort_peak_indexes')
print(higher_amplitudes_sort_peak_indexes[0:10])
# used_indexes = lower_freq_sort_peak_indexes
used_indexes = higher_amplitudes_sort_peak_indexes
# Creazione del segnale ricostruito utilizzando i picchi
time = np.linspace(0, num_samples, num_samples, endpoint=False)
reconstructed_signal = np.zeros(len(time), dtype=float)
print('num_samples')
print(num_samples)
print('time[0:20]')
print(time[0:20])
print('reconstructed_signal')
print(reconstructed_signal[0:10])
for index in used_indexes:
phase = np.angle(transform[index])
amplitude = np.abs(transform[index])
frequency = frequency_range[index]
print('phase')
print(phase)
print('amplitude')
print(amplitude)
print('frequency')
print(frequency)
reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)
# Estrai la parte reale del segnale ricostruito
reconstructed_signal_real = reconstructed_signal
print('reconstructed_signal[1]')
print(reconstructed_signal[1])
print('reconstructed_signal.shape')
print(reconstructed_signal.shape)
zeros = np.zeros(len(original_data) - 2*num_samples)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
original_data['reconstructed_signal'] = reconstructed_signal_real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
Edit 30/09/2023
I tried to normalize the SPY data as suggested in the answers, but the problem is still there, here is the resulting chart
答:
I finally found the point. The Goertzel transform basically counts how many times each harmonic stays inside the sampling period returning, in addition, its amplitude and phase. To get the frequency it is necessary to divide by the number of samples, something that probably is implicitly done by the standard FFT libraries. The example with 5 "synthetic" sin waves turned out to be similar between FTT and Goertzel because was sampled in 1 second, so, for example, a 50Hz harmonic had exactly 50 sin waves in one full circle angle that madethe Goertzel transform resulting to be correct too.
The correction is in the following added second line of code
transform = goertzel_general_shortened(data, frequency_range)
frequency_range = frequency_range/num_samples
harmonics_amplitudes = np.abs(transform)
peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
peak_frequencies = frequency_range[peaks_indexes]
peak_periods = 1 / frequency_range[peaks_indexes]
peak_amplitudes = harmonics_amplitudes[peaks_indexes]
peak_phases = np.angle(transform[peaks_indexes])
peak_amplitudes = peak_amplitudes*peak_frequencies
harmonics_amplitudes = harmonics_amplitudes*frequency_range
评论