diff --git a/ntrfc/timeseries/stationarity.py b/ntrfc/timeseries/stationarity.py
index f8fce52ffa0c3e70e3db22953e86fd97506340a7..e07de82018ce562250c9250f4ff366929d6ec6cc 100644
--- a/ntrfc/timeseries/stationarity.py
+++ b/ntrfc/timeseries/stationarity.py
@@ -3,10 +3,13 @@
 from matplotlib import pyplot as plt
 from scipy import signal
 from sklearn.decomposition import PCA
 from statsmodels.tsa.arima.model import ARIMA
+import pandas as pd
 
 from ntrfc.math.methods import minmax_normalize
+
+
 
 def optimal_bin_width(sample1, sample2):
     """
     Compute the optimal bin width using cross-validation estimator for mean integrated squared error.
@@ -43,19 +46,9 @@ def optimal_bin_width(sample1, sample2):
     return bins
 
 
-def var_compare(sample1, sample2):
-    var1 = np.var(sample1)
-    var2 = np.var(sample2)
-
-    if var1 == var2:
-        return 1.1  # variance is exactly the same, return value greater than 1
-    else:
-        diff = np.abs(var1 - var2)
-        max_var = max(var1, var2)
-        return max(0, 1 - diff / max_var)
-
-
-def pd_compare(sample1, sample2, verbose=False):
+def smd_probability_compare(sample1, sample2, verbose=False):
    """Compare the probability distribution of two signals using the
     Freedman-Diaconis rule to determine the number of bins.
@@ -91,7 +84,7 @@
     return similarity
 
 
-def optimal_window_size(time_series, verbose=False):
+def optimal_window_size(time_series, min_interval=0.05, max_interval=0.25, verbose=False):
     """
     Determines the optimal window size for a given time series.
 
@@ -121,48 +114,48 @@
     # Get the length of the time series and define a range of allowed window sizes
     series_length = len(normalized_series)
-    allowed_window_sizes = np.array(range(series_length // 10, series_length // 4))
+    allowed_window_sizes = np.array(range(int(series_length * min_interval), int(series_length * max_interval) + 1))
 
-    # Define a threshold for the sanity test
-    sanity_threshold = 2.82
     # Iterate through the allowed window sizes and perform the Kolmogorov-Smirnov test and correlation coefficient calculation
-    corr_coeff_scores = np.zeros(len(allowed_window_sizes))
-    pd_similiarity_scores = np.zeros(len(allowed_window_sizes))
-    var_similiarity_scores = np.zeros(len(allowed_window_sizes))
-    for i, window_size in enumerate(allowed_window_sizes):
+    mean_scores = []
+    var_scores = []
+    for window_size in allowed_window_sizes:
+
         check_window = normalized_series[-window_size * 2:]
-        first_subsequence = check_window[:-window_size]
-        second_subsequence = check_window[-window_size:]
-        pd_similiarity = pd_compare(second_subsequence, first_subsequence)  #
-        corr_coeff = (1 + np.corrcoef(first_subsequence, second_subsequence)[0][1]) / 2
-        var_similiarity = var_compare(first_subsequence, second_subsequence)
-        if np.array_equal(first_subsequence, first_subsequence):
-            corr_coeff = 1
-        corr_coeff_scores[i] = corr_coeff
-        pd_similiarity_scores[i] = pd_similiarity
-        var_similiarity_scores[i] = var_similiarity
-    cumulated_scores = corr_coeff_scores + pd_similiarity_scores + var_similiarity_scores
-    if not any(cumulated_scores >= sanity_threshold):
-        return False, False, False
-
-    optimal_window_size_index = np.argmax(cumulated_scores)
+        check_window_df = pd.DataFrame(check_window)
+        rolling_check = check_window_df.rolling(window_size)
+        mean_scores.append(np.std(rolling_check.mean()).values[0])
+        var_scores.append(np.std(rolling_check.var()).values[0])
+    # Sum the normalized stability scores; the steadiest window minimizes this
+    cumulated_scores = minmax_normalize(np.array(mean_scores)) + minmax_normalize(np.array(var_scores))
+
+    optimal_window_size_index = np.argmin(cumulated_scores)
     opt_window_size = allowed_window_sizes[optimal_window_size_index]
-    opt_window = time_series[series_length - opt_window_size * 2:]
-    freqs, psd = signal.periodogram(opt_window, return_onesided=False)
+    # compare the two halves of the candidate window
+    opt_window = time_series[-opt_window_size * 2:]
+
+    assert len(opt_window) == opt_window_size * 2
+    probability_similarity = smd_probability_compare(opt_window[:opt_window_size], opt_window[opt_window_size:])
+
+    if probability_similarity < 0.99:
+        return False, False, False
+
+    # Compute the period of the time series
+    freqs, psd = signal.welch(time_series, fs=1, nperseg=series_length // 4)
     maxpsdid = np.argmax(psd)
+
     if maxpsdid != 0:
         tperiod = freqs[np.argmax(psd)] ** -1
-        nperiods = int(opt_window_size / tperiod)
+        nperiods = (opt_window_size + (-opt_window_size % tperiod)) // tperiod  # ceil(opt_window_size / tperiod)
     else:
         tperiod = np.inf
         nperiods = 0
 
     # If verbose mode is enabled, display a plot of the correlation coefficient and KS test results for each window size
     if verbose:
-        plt.plot(corr_coeff_scores)
-        plt.plot(pd_similiarity_scores)
+        plt.plot(cumulated_scores)
         plt.axvline(optimal_window_size_index)
         plt.legend()
         plt.show()
@@ -170,34 +163,11 @@ def optimal_window_size(time_series, verbose=False):
     return opt_window, opt_window_size, nperiods
 
 
-def estimate_seasonal_error(series, opt_window_size):
-    snr_winsize = int(np.ceil((opt_window_size) / 16))
-    if snr_winsize >= 4:
-        reconstructed_mean, fluct, snr = snr_pod(series, snr_winsize)
-    else:
-        reconstructed_mean, fluct, snr = snr_pod(series, len(series) // 2)
-
-    # Calculate the rolling window of residuals
-    rolling_residuals = np.lib.stride_tricks.sliding_window_view(reconstructed_mean, opt_window_size)
-
-    # Calculate the dominant frequency of residuals for each window
-    residual_freqs = []
-    for window in rolling_residuals:
-        freqs, psd = signal.periodogram(window, return_onesided=False)
-        dominant_freq = freqs[np.argmax(psd)]
-        residual_freqs.append(dominant_freq)
-
-    # Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies
-    residual_var = np.std(np.var(rolling_residuals, axis=1))
-    residual_mean = np.std(np.mean(rolling_residuals, axis=1))
-    residual_freq = np.std(residual_freqs)
-
-    return residual_mean, residual_var, residual_freq
-
-
-def estimate_stationarity(timeseries, verbose=False):
-    sigma_threshold = 4
-    normalized_series = minmax_normalize(timeseries)
+def estimate_stationarity(timeseries, verbose=False):
+    sigma_threshold = 3
+    normalized_series = minmax_normalize(timeseries)
+    datalength = len(normalized_series)
     opt_window, opt_window_size, nperiods = optimal_window_size(normalized_series)
     if not opt_window_size:
         return False
@@ -208,145 +178,121 @@ def estimate_stationarity(timeseries, verbose=False):
     reference_mean = np.mean(reference_window)
     reference_autocorr_freq = reference_freqs[np.argmax(reference_psd)]
     reference_variance = np.var(reference_window)
+    opt_rolling_window = np.lib.stride_tricks.sliding_window_view(opt_window, opt_window_size)
 
-    residual_mean, residual_variance, residual_autocorr = estimate_residual_error(reference_window, opt_window_size)
-    seasonal_mean, seasonal_variance, seasonal_autocorr = estimate_seasonal_error(reference_window,
-                                                                                  opt_window_size // nperiods)
+    rolling_means = np.mean(opt_rolling_window, axis=1)
+    rolling_vars = np.var(opt_rolling_window, axis=1)
 
-    mean_jk_error, var_jk_error, accr_jk_error = estimate_error_jacknife(reference_window,
-                                                                         block_size=opt_window_size // nperiods)
-    modelerror_mean, modelerror_freq, modelerror_var = estimate_model_error(reference_window, opt_window_size, nperiods)
+    assert len(rolling_means) == len(opt_rolling_window)
+    assert len(rolling_vars) == len(opt_rolling_window)
 
-    autocorr_limit = (seasonal_autocorr + residual_autocorr + accr_jk_error + modelerror_freq) * sigma_threshold
-    mean_limit = (seasonal_mean + residual_mean + mean_jk_error + modelerror_mean) * sigma_threshold
-    variance_limit = (seasonal_variance + residual_variance + var_jk_error + modelerror_var) * sigma_threshold
+    mean_uncertainty = np.std(rolling_means)
+    var_uncertainty = np.std(rolling_vars)
 
-    rolling_window = np.lib.stride_tricks.sliding_window_view(normalized_series, opt_window_size)
+    freqs_rolling = []
+    for win in opt_rolling_window:
+        freqs, psd = signal.periodogram(win, return_onesided=False)
+        maxpsdid = np.argmax(psd)
+        freq = freqs[maxpsdid]
+        freqs_rolling.append(freq)
 
-    def cum_score(valid):
-        scores = np.cumsum(valid[::-1])[::-1] / np.arange(1, len(valid) + 1)[::-1]
-        index = np.where(scores >= 1)[0][0]
-        return scores, index
+    freq_uncertainty = np.std(freqs_rolling)
 
-    window_means = np.mean(rolling_window, axis=1)
-    mean_valid = np.isclose(abs(window_means - reference_mean), 0, atol=mean_limit)
-    if not any(mean_valid):
-        return False
-    mean_scores, mean_index = cum_score(mean_valid)
+    checkseries = normalized_series[:-opt_window_size * 2]
+
+    checkseries_reversed = pd.DataFrame(checkseries[::-1])
+    rolling_win_reversed = checkseries_reversed.rolling(window=opt_window_size)
+
+    rolling_means_reversed = rolling_win_reversed.mean().values
+    rolling_vars_reversed = rolling_win_reversed.var().values
 
-    window_corrs = []
-    for window in rolling_window:
+    def get_rolling_max_freq(window):
         freqs, psd = signal.periodogram(window, return_onesided=False)
-        window_corrs.append(freqs[np.argmax(psd)])
-    autocorr_timescale_valid = np.isclose(abs(window_corrs - reference_autocorr_freq), 0, atol=autocorr_limit)
-    if not any(autocorr_timescale_valid):
-        return False
-    autocorr_timescale_scores, autocorr_timescale_index = cum_score(autocorr_timescale_valid)
+        return freqs[np.argmax(psd)]
 
-    window_variances = np.var(rolling_window, axis=1)
-    variance_valid = np.isclose(abs(window_variances - reference_variance), 0, atol=variance_limit)
-    if not any(variance_valid):
-        return False
-    variance_scores, variance_index = cum_score(variance_valid)
+    def get_rolling_histcompare(window):
+        return 1 - smd_probability_compare(window, reference_window)
 
-    stationary_start_index = max(mean_index, autocorr_timescale_index, variance_index)
-    if verbose:
-        plt.plot(abs(window_variances - reference_variance), label="var error", color="red")
-        plt.hlines(y=(variance_limit), xmin=0, xmax=len(window_variances), label="var_err+")
-        plt.legend()
-        plt.show()
-        plt.plot(abs(window_corrs - reference_autocorr_freq), label="accr error", color="red")
-        plt.hlines(y=(autocorr_limit), xmin=0, xmax=len(window_corrs), label="accr_err+")
-        plt.legend()
-        plt.show()
+    rolling_win_reversed = checkseries_reversed.rolling(window=opt_window_size)
+    rolling_freqs_reversed = rolling_win_reversed.apply(get_rolling_max_freq, raw=False)
 
-        plt.plot(abs(window_means - reference_mean), label="mean error", color="red")
-        plt.hlines(y=(mean_limit), xmin=0, xmax=len(window_means), label="mean_err+")
-        plt.legend()
-        plt.show()
-    return stationary_start_index
+    outer_mean_uncertainty = 0.00175
+    outer_var_uncertainty = outer_mean_uncertainty * 0.25  # the variance of a minmax-normed series is at most 0.25
+    outer_freq_uncertainty = 0.05
+    outer_hists_errors = 0.003
 
-
-def estimate_residual_error(reference, opt_window_size):
-    """
-    Estimates the residual errors for a given time series using an ARIMA model and a rolling window approach.
+    rolling_means_errors_reversed = np.abs(rolling_means_reversed - reference_mean)
+    rolling_vars_errors_reversed = np.abs(rolling_vars_reversed - reference_variance)
+    rolling_freqs_errors_reversed = np.array([np.abs(i - reference_autocorr_freq) for i in rolling_freqs_reversed.values])
+    rolling_hists_errors_reversed = rolling_win_reversed.apply(get_rolling_histcompare, raw=False).values
 
-    In the context of time series analysis, the residual error (or simply "residuals") represents the difference
-    between the actual values of the time series and the predicted values from a model. In other words, it is the
-    part of the time series that the model is unable to explain.
+    mean_limits = sigma_threshold * mean_uncertainty + outer_mean_uncertainty
+    var_limits = sigma_threshold * var_uncertainty + outer_var_uncertainty
+    freq_limits = sigma_threshold * freq_uncertainty + outer_freq_uncertainty
+    hist_limits = sigma_threshold * outer_hists_errors
 
-    Args:
-        reference (array-like): A time series to estimate residual errors for.
-        opt_window_size (int): The size of the rolling window to use for calculating metrics.
-    Returns:
-        Tuple: A tuple containing the following metrics of the residual errors:
-            - residual_mean (float): The standard deviation of the mean residuals over the rolling windows.
-            - residual_var (float): The standard deviation of the residual variance over the rolling windows.
-            - residual_freq (float): The standard deviation of the dominant frequencies of the residuals over the rolling windows.
- """ - # Fit an ARIMA model to the reference data - arima_model = ARIMA(reference, order=(1, 1, 1)) - arima_results = arima_model.fit() + rolling_freqs_errors_inliers_reversed = rolling_freqs_errors_reversed[opt_window_size :] <= freq_limits + rolling_means_errors_inliers_reversed = rolling_means_errors_reversed[opt_window_size:] <= mean_limits + rolling_vars_errors_inliers_reversed = rolling_vars_errors_reversed[opt_window_size:] <= var_limits + rolling_hists_errors_inliers_reversed = rolling_hists_errors_reversed[opt_window_size:] <= hist_limits - # Calculate the residuals - residuals = reference - arima_results.fittedvalues + def last_coherent_interval(arr, threshold=1): + reversed_arr = arr[::-1] + false_indices = np.where(reversed_arr == False)[0] + last_false_index = false_indices[-1] if len(false_indices) > 0 else 0 + coherent_arr = [False] * last_false_index + [True] * (len(reversed_arr) - last_false_index) + success_rate = np.array([np.sum(coherent_arr[i:]) / len(coherent_arr[i:]) for i in range(len(coherent_arr))]) + answer_index = np.where(success_rate >= threshold)[0][0] + return len(arr) - answer_index - # Calculate the rolling window of residuals - rolling_residuals = np.lib.stride_tricks.sliding_window_view(residuals, opt_window_size) + mean_index = datalength - last_coherent_interval(rolling_means_errors_inliers_reversed)-3*opt_window_size - # Calculate the dominant frequency of residuals for each window - residual_freqs = [] - for window in rolling_residuals: - freqs, psd = signal.periodogram(window, return_onesided=False) - dominant_freq = freqs[np.argmax(psd)] - residual_freqs.append(dominant_freq) + hist_index = datalength - last_coherent_interval(rolling_hists_errors_inliers_reversed)-3*opt_window_size + autocorr_timescale_index = datalength - last_coherent_interval(rolling_freqs_errors_inliers_reversed)-3*opt_window_size + variance_index = datalength - last_coherent_interval(rolling_vars_errors_inliers_reversed)-3*opt_window_size - # Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies - residual_var = np.std(np.var(rolling_residuals, axis=1)) - residual_mean = np.std(np.mean(rolling_residuals, axis=1)) - residual_freq = np.std(residual_freqs) + stationary_start_index = max(mean_index,autocorr_timescale_index, hist_index, variance_index) - return residual_mean, residual_var, residual_freq + if verbose: + fig ,axs = plt.subplots(5,1,figsize=(24,20)) + axs[0].plot(normalized_series, label="normalized series", color="blue") + axs[0].vlines(x=stationary_start_index, ymin=0, ymax=np.nanmax(normalized_series), label="stationary_start",color="k") -def estimate_model_error(reference_window, opt_window_size, nper): - ref_length = len(reference_window) - ref_mean = np.mean(reference_window) - ref_var = np.var(reference_window) + axs[1].plot(np.nan_to_num(rolling_means_errors_reversed[::-1], 0), label="mean error", color="red") + axs[1].hlines(y=(mean_limits), xmin=0, xmax=len(normalized_series), label="mean_limits",color="k") + axs[1].vlines(x=stationary_start_index, ymin=0, ymax=max(mean_limits, np.nanmax(rolling_means_errors_reversed)), label="stationary_start", color="k") + axs[1].vlines(x=mean_index, ymin=0, ymax=max(mean_limits, np.nanmax(rolling_means_errors_reversed)), label="mean_index", color="green") + axs[1].legend() - # Calculate the dominant frequency of residuals for each window - freqs, psd = signal.periodogram(reference_window, return_onesided=False) - ref_freq = freqs[np.argmax(psd)] - x = np.linspace(0, 2 * np.pi * 
+        axs[2].plot(np.nan_to_num(rolling_vars_errors_reversed[::-1], 0), label="variance error", color="red")
+        axs[2].hlines(y=var_limits, xmin=0, xmax=len(normalized_series), label="var_limits", color="k")
+        axs[2].vlines(x=stationary_start_index, ymin=0, ymax=max(var_limits, np.nanmax(rolling_vars_errors_reversed)), label="stationary_start", color="k")
+        axs[2].vlines(x=variance_index, ymin=0, ymax=max(var_limits, np.nanmax(rolling_vars_errors_reversed)), label="variance_index", color="k")
+        axs[2].legend()
 
-    y = np.sin(x) + 1
-    rolling_residuals = np.lib.stride_tricks.sliding_window_view(y, opt_window_size)
-    # Calculate the dominant frequency of residuals for each window
-    residual_ts = []
-    for window in rolling_residuals:
-        freqs, psd = signal.periodogram(window, return_onesided=False)
-        dominant_freq = freqs[np.argmax(psd)]
-        residual_ts.append(dominant_freq)
-
-    y_mean = 1
-    y_var = 0.5
-    y_t = nper / ref_length
-
-    # Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies
-    residual_var = np.var(rolling_residuals, axis=1)
-    residual_mean = np.mean(rolling_residuals, axis=1)
-    residual_freq = residual_ts
-
-    residual_length = len(residual_mean)
-    mean_error = np.mean(np.abs(residual_mean - np.array([y_mean] * residual_length))) / y_mean * ref_mean
-    var_error = np.mean(np.abs(residual_var - np.array([y_var] * residual_length))) / y_var * ref_var
-    freq_error = np.mean(np.abs(residual_freq - np.array([y_t] * residual_length))) / y_t * ref_freq
-    return mean_error, freq_error, var_error
+        axs[3].plot(np.nan_to_num(rolling_freqs_errors_reversed[::-1], 0), label="autocorr error", color="red")
+        axs[3].hlines(y=freq_limits, xmin=0, xmax=len(normalized_series), label="freq_limits", color="k")
+        axs[3].vlines(x=stationary_start_index, ymin=0, ymax=max(freq_limits, np.nanmax(rolling_freqs_errors_reversed)), label="stationary_start", color="k")
+        axs[3].vlines(x=autocorr_timescale_index, ymin=0, ymax=max(freq_limits, np.nanmax(rolling_freqs_errors_reversed)), label="autocorr_timescale_index", color="k")
+        axs[3].legend()
+
+        axs[4].plot(np.nan_to_num(rolling_hists_errors_reversed[::-1], 0), label="histogram error", color="red")
+        axs[4].hlines(y=hist_limits, xmin=0, xmax=len(normalized_series), label="hist_limits", color="k")
+        axs[4].vlines(x=stationary_start_index, ymin=0, ymax=max(hist_limits, np.nanmax(rolling_hists_errors_reversed)), label="stationary_start", color="k")
+        axs[4].vlines(x=hist_index, ymin=0, ymax=max(hist_limits, np.nanmax(rolling_hists_errors_reversed)), label="hist_index", color="k")
+        axs[4].legend()
+        plt.show()
+
+    return stationary_start_index
 
 
 def estimate_error_jacknife(timeseries, block_size=20, n_samples=4000):
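Reviewer note: the rewritten optimal_window_size scores each candidate length w by how much the rolling mean and rolling variance still drift over the last 2 * w samples, keeps the candidate whose summed, minmax-normalized drift is smallest, and only accepts it if the two halves of that window pass the histogram comparison. Below is a minimal standalone sketch of just the selection criterion, assuming only numpy and pandas; minmax_normalize here is an illustrative stand-in for ntrfc.math.methods.minmax_normalize, and sketch_optimal_window is a hypothetical helper, not part of the module.

import numpy as np
import pandas as pd


def minmax_normalize(arr):
    # illustrative stand-in for ntrfc.math.methods.minmax_normalize
    arr = np.asarray(arr, dtype=float)
    return (arr - arr.min()) / (arr.max() - arr.min())


def sketch_optimal_window(series, min_interval=0.05, max_interval=0.25):
    # pick the window length whose rolling mean and rolling variance drift the least
    series = minmax_normalize(series)
    n = len(series)
    candidates = list(range(int(n * min_interval), int(n * max_interval) + 1))
    mean_scores, var_scores = [], []
    for w in candidates:
        tail = pd.Series(series[-2 * w:]).rolling(w)
        mean_scores.append(tail.mean().std())  # drift of the rolling mean
        var_scores.append(tail.var().std())    # drift of the rolling variance
    scores = minmax_normalize(mean_scores) + minmax_normalize(var_scores)
    return candidates[int(np.argmin(scores))]

Note the argmin where the old similarity scoring used argmax: drift is now a penalty, so the steadiest window wins.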
diff --git a/tests/timeseries/test_ntrfc_stationarity.py b/tests/timeseries/test_ntrfc_stationarity.py
index f8dd0f9daedd3dd7315aebde4693a6fcda757117..4ea82b67e7442a989230bf9f3438cb91511fac92 100644
--- a/tests/timeseries/test_ntrfc_stationarity.py
+++ b/tests/timeseries/test_ntrfc_stationarity.py
@@ -11,24 +11,25 @@ def test_optimal_timewindow(verbose=False):
     nper = 8
     x = np.linspace(0, 2 * np.pi * nper, res)
-
+    min_interval = 0.05
+    max_interval = 0.25
     # sin
     # we have four periods, at least one period should be captured
     # thats res // 4 as a return
     ysin = np.sin(x)
-    opt_window, opt_window_size, nperiods = optimal_window_size(ysin)
-    assert opt_window_size == res // (nper * nperiods) - 1
+    opt_window, opt_window_size, nperiods = optimal_window_size(ysin, min_interval, max_interval)
+    assert opt_window_size == res // (nper * nperiods)
     assert nperiods == 1
 
     # tanh
-    eul = np.tanh(x * 2)
-    opt_window, opt_window_size, nperiods = optimal_window_size(eul)
-    assert opt_window_size == res // 10
+    tanh = np.tanh(x * 2)
+    opt_window, opt_window_size, nperiods = optimal_window_size(tanh, min_interval, max_interval)
+    assert opt_window_size == res * min_interval
     assert nperiods == 0
 
     # euler
     eul = np.e ** (-x * 60)
-    opt_window, opt_window_size, nperiods = optimal_window_size(eul)
-    assert opt_window_size == res // 10
+    opt_window, opt_window_size, nperiods = optimal_window_size(eul, min_interval, max_interval)
+    assert opt_window_size == res * min_interval
     assert nperiods == 0
@@ -124,7 +124,7 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
     import matplotlib.pyplot as plt
 
     def signalgen_abatingsine(amplitude, frequency, mean, abate, time):
-        resolution = 48
+        resolution = 64
         step = (1 / frequency) / resolution
         times = np.arange(0, time, step)
         values = amplitude * np.sin(frequency * (2 * np.pi) * times) + mean + np.e ** -(times * abate)
@@ -151,7 +151,8 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
         plt.figure()
         plt.plot(timesteps, values)
         plt.axvline(timesteps[stationary_timestep], color="green")
-        plt.axvline(well_computed_stationarity_limit, color="red")
+        plt.axvline(well_computed_stationarity_limit, color="red", label="computed")
+        plt.legend()
         plt.show()
 
     assert 0.05 > reldiff(stationary_time, well_computed_stationary_time), "computation failed"
@@ -164,7 +165,6 @@ def test_stationarity_uncertainties_abatingsinenoise(verbose=False):
     import matplotlib.pyplot as plt
 
     def signalgen_abatingsine(amplitude, noiseamplitude, frequency, mean, abate, time):
-        # todo zu hoch aufgelöste signale können fehlerhaft sein
         resolution = 64
 
         step = (1 / frequency) / resolution
@@ -239,13 +239,13 @@ def test_stationarity_transientonly(verbose=False):
     assert statidx == False
 
 
-def test_stationarity_sinetransientonly():
+def test_stationarity_lineartransientonly():
     from ntrfc.timeseries.stationarity import estimate_stationarity
     import numpy as np
 
     res = 20000
-    values = np.arange(0, 10, res)
+    values = np.linspace(0, 10, res)
 
     stationary_timestep = estimate_stationarity(values)
     assert stationary_timestep == False
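Usage sketch: feeding estimate_stationarity a decaying sine like the one the abating-sine tests generate. The returned index marks the estimated start of the statistically stationary part, and False signals a transient-only series; the signal parameters below are illustrative, not taken from the tests.

import numpy as np
from ntrfc.timeseries.stationarity import estimate_stationarity

# decaying sine that settles into a statistically stationary oscillation
times = np.arange(0, 200, 1 / 64)
values = 0.5 * np.sin(2 * np.pi * times) + 1.0 + np.e ** (-times * 0.5)

stationary_index = estimate_stationarity(values)
if stationary_index is not False:
    print(f"stationary from t = {times[stationary_index]:.2f}")
else:
    print("no stationary window detected")  # transient-only signals return False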