Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
N
NTRfC
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
TFD - Institut für Turbomaschinen und Fluid-Dynamik
tools
NTRfC
Commits
13fb9a09
Commit
13fb9a09
authored
1 year ago
by
Malte Nyhuis
Browse files
Options
Downloads
Plain Diff
Merge branch 'timeseries_finalization' into 'master'
Timeseries improvement See merge request
!33
parents
c82a7593
3cc1914b
No related branches found
No related tags found
1 merge request
!33
Timeseries improvement
Pipeline
#11373
failed
1 year ago
Stage: build
Stage: test
Stage: quality
Stage: docs
Stage: deploy
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
ntrfc/timeseries/stationarity.py
+122
-176
122 additions, 176 deletions
ntrfc/timeseries/stationarity.py
tests/timeseries/test_ntrfc_stationarity.py
+14
-14
14 additions, 14 deletions
tests/timeseries/test_ntrfc_stationarity.py
with
136 additions
and
190 deletions
ntrfc/timeseries/stationarity.py
+
122
−
176
View file @
13fb9a09
...
@@ -3,10 +3,13 @@ from matplotlib import pyplot as plt
...
@@ -3,10 +3,13 @@ from matplotlib import pyplot as plt
from
scipy
import
signal
from
scipy
import
signal
from
sklearn.decomposition
import
PCA
from
sklearn.decomposition
import
PCA
from
statsmodels.tsa.arima.model
import
ARIMA
from
statsmodels.tsa.arima.model
import
ARIMA
import
pandas
as
pd
from
ntrfc.math.methods
import
minmax_normalize
from
ntrfc.math.methods
import
minmax_normalize
def
optimal_bin_width
(
sample1
,
sample2
):
def
optimal_bin_width
(
sample1
,
sample2
):
"""
"""
Compute the optimal bin width using cross-validation estimator for mean integrated squared error.
Compute the optimal bin width using cross-validation estimator for mean integrated squared error.
...
@@ -43,19 +46,9 @@ def optimal_bin_width(sample1, sample2):
...
@@ -43,19 +46,9 @@ def optimal_bin_width(sample1, sample2):
return
bins
return
bins
def
var_compare
(
sample1
,
sample2
):
var1
=
np
.
var
(
sample1
)
var2
=
np
.
var
(
sample2
)
if
var1
==
var2
:
return
1.1
# variance is exactly the same, return value greater than 1
else
:
diff
=
np
.
abs
(
var1
-
var2
)
max_var
=
max
(
var1
,
var2
)
return
max
(
0
,
1
-
diff
/
max_var
)
def
pd
_compare
(
sample1
,
sample2
,
verbose
=
False
):
def
smd_probability
_compare
(
sample1
,
sample2
,
verbose
=
False
):
"""
Compare the probability distribution of two signals using the Freedman-Diaconis rule
"""
Compare the probability distribution of two signals using the Freedman-Diaconis rule
to determine the number of bins.
to determine the number of bins.
...
@@ -91,7 +84,7 @@ def pd_compare(sample1, sample2, verbose=False):
...
@@ -91,7 +84,7 @@ def pd_compare(sample1, sample2, verbose=False):
return
similarity
return
similarity
def
optimal_window_size
(
time_series
,
verbose
=
False
):
def
optimal_window_size
(
time_series
,
min_interval
=
0.05
,
max_interval
=
0.25
,
verbose
=
False
):
"""
"""
Determines the optimal window size for a given time series.
Determines the optimal window size for a given time series.
...
@@ -121,48 +114,48 @@ def optimal_window_size(time_series, verbose=False):
...
@@ -121,48 +114,48 @@ def optimal_window_size(time_series, verbose=False):
# Get the length of the time series and define a range of allowed window sizes
# Get the length of the time series and define a range of allowed window sizes
series_length
=
len
(
normalized_series
)
series_length
=
len
(
normalized_series
)
allowed_window_sizes
=
np
.
array
(
range
(
series_length
//
10
,
series_length
//
4
))
allowed_window_sizes
=
np
.
array
(
range
(
int
(
series_length
*
min_interval
),
int
(
series_length
*
max_interval
)
+
1
))
# Define a threshold for the sanity test
sanity_threshold
=
2.82
# Iterate through the allowed window sizes and perform the Kolmogorov-Smirnov test and correlation coefficient calculation
# Iterate through the allowed window sizes and perform the Kolmogorov-Smirnov test and correlation coefficient calculation
corr_coeff_scores
=
np
.
zeros
(
len
(
allowed_window_sizes
))
mean_scores
=
[]
pd_similiarity_scores
=
np
.
zeros
(
len
(
allowed_window_sizes
))
var_scores
=
[]
var_similiarity_scores
=
np
.
zeros
(
len
(
allowed_window_sizes
))
for
window_size
in
allowed_window_sizes
:
for
i
,
window_size
in
enumerate
(
allowed_window_sizes
):
check_window
=
normalized_series
[
-
window_size
*
2
:]
check_window
=
normalized_series
[
-
window_size
*
2
:]
first_subsequence
=
check_window
[:
-
window_size
]
check_window_df
=
pd
.
DataFrame
(
check_window
)
second_subsequence
=
check_window
[
-
window_size
:]
rolling_check
=
check_window_df
.
rolling
(
window_size
)
pd_similiarity
=
pd_compare
(
second_subsequence
,
first_subsequence
)
#
mean_scores
.
append
(
np
.
std
(
rolling_check
.
mean
()).
values
[
0
])
corr_coeff
=
(
1
+
np
.
corrcoef
(
first_subsequence
,
second_subsequence
)[
0
][
1
])
/
2
var_scores
.
append
(
np
.
std
(
rolling_check
.
var
()).
values
[
0
])
var_similiarity
=
var_compare
(
first_subsequence
,
second_subsequence
)
# Compute the correlation coefficient
if
np
.
array_equal
(
first_subsequence
,
first_subsequence
):
cumulated_scores
=
minmax_normalize
(
mean_scores
)
+
minmax_normalize
(
np
.
array
(
var_scores
))
corr_coeff
=
1
corr_coeff_scores
[
i
]
=
corr_coeff
optimal_window_size_index
=
np
.
argmin
(
cumulated_scores
)
pd_similiarity_scores
[
i
]
=
pd_similiarity
var_similiarity_scores
[
i
]
=
var_similiarity
cumulated_scores
=
corr_coeff_scores
+
pd_similiarity_scores
+
var_similiarity_scores
if
not
any
(
cumulated_scores
>=
sanity_threshold
):
return
False
,
False
,
False
optimal_window_size_index
=
np
.
argmax
(
cumulated_scores
)
opt_window_size
=
allowed_window_sizes
[
optimal_window_size_index
]
opt_window_size
=
allowed_window_sizes
[
optimal_window_size_index
]
opt_window
=
time_series
[
series_length
-
opt_window_size
*
2
:]
#
freqs
,
psd
=
signal
.
periodogram
(
opt_window
,
return_onesided
=
False
)
opt_window
=
time_series
[
-
opt_window_size
*
2
:]
assert
len
(
opt_window
)
==
opt_window_size
*
2
probability_similiarity
=
smd_probability_compare
(
opt_window
[:
opt_window_size
],
opt_window
[
opt_window_size
:])
if
probability_similiarity
<
0.99
:
return
False
,
False
,
False
# Compute the period of the time series
freqs
,
psd
=
signal
.
welch
(
time_series
,
fs
=
1
,
nperseg
=
series_length
//
4
)
maxpsdid
=
np
.
argmax
(
psd
)
maxpsdid
=
np
.
argmax
(
psd
)
if
maxpsdid
!=
0
:
if
maxpsdid
!=
0
:
tperiod
=
freqs
[
np
.
argmax
(
psd
)]
**
-
1
tperiod
=
freqs
[
np
.
argmax
(
psd
)]
**
-
1
nperiods
=
int
(
opt_window_size
/
tperiod
)
nperiods
=
(
opt_window_size
+
(
-
opt_window_size
%
tperiod
))
//
tperiod
else
:
else
:
tperiod
=
np
.
inf
tperiod
=
np
.
inf
nperiods
=
0
nperiods
=
0
# If verbose mode is enabled, display a plot of the correlation coefficient and KS test results for each window size
# If verbose mode is enabled, display a plot of the correlation coefficient and KS test results for each window size
if
verbose
:
if
verbose
:
plt
.
plot
(
corr_coeff_scores
)
plt
.
plot
(
cumulated_scores
)
plt
.
plot
(
pd_similiarity_scores
)
plt
.
axvline
(
optimal_window_size_index
)
plt
.
axvline
(
optimal_window_size_index
)
plt
.
legend
()
plt
.
legend
()
plt
.
show
()
plt
.
show
()
...
@@ -170,34 +163,11 @@ def optimal_window_size(time_series, verbose=False):
...
@@ -170,34 +163,11 @@ def optimal_window_size(time_series, verbose=False):
return
opt_window
,
opt_window_size
,
nperiods
return
opt_window
,
opt_window_size
,
nperiods
def
estimate_seasonal_error
(
series
,
opt_window_size
):
snr_winsize
=
int
(
np
.
ceil
((
opt_window_size
)
/
16
))
if
snr_winsize
>=
4
:
reconstructed_mean
,
fluct
,
snr
=
snr_pod
(
series
,
snr_winsize
)
else
:
reconstructed_mean
,
fluct
,
snr
=
snr_pod
(
series
,
len
(
series
)
//
2
)
# Calculate the rolling window of residuals
rolling_residuals
=
np
.
lib
.
stride_tricks
.
sliding_window_view
(
reconstructed_mean
,
opt_window_size
)
# Calculate the dominant frequency of residuals for each window
residual_freqs
=
[]
for
window
in
rolling_residuals
:
freqs
,
psd
=
signal
.
periodogram
(
window
,
return_onesided
=
False
)
dominant_freq
=
freqs
[
np
.
argmax
(
psd
)]
residual_freqs
.
append
(
dominant_freq
)
# Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies
residual_var
=
np
.
std
(
np
.
var
(
rolling_residuals
,
axis
=
1
))
residual_mean
=
np
.
std
(
np
.
mean
(
rolling_residuals
,
axis
=
1
))
residual_freq
=
np
.
std
(
residual_freqs
)
return
residual_mean
,
residual_var
,
residual_freq
def
estimate_stationarity
(
timeseries
,
verbose
=
False
):
def
estimate_stationarity
(
timeseries
,
verbose
=
False
):
sigma_threshold
=
4
sigma_threshold
=
3
normalized_series
=
minmax_normalize
(
timeseries
)
normalized_series
=
minmax_normalize
(
timeseries
)
#minmax_normalize(timeseries)
datalength
=
len
(
normalized_series
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
normalized_series
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
normalized_series
)
if
not
opt_window_size
:
if
not
opt_window_size
:
return
False
return
False
...
@@ -208,145 +178,121 @@ def estimate_stationarity(timeseries, verbose=False):
...
@@ -208,145 +178,121 @@ def estimate_stationarity(timeseries, verbose=False):
reference_mean
=
np
.
mean
(
reference_window
)
reference_mean
=
np
.
mean
(
reference_window
)
reference_autocorr_freq
=
reference_freqs
[
np
.
argmax
(
reference_psd
)]
reference_autocorr_freq
=
reference_freqs
[
np
.
argmax
(
reference_psd
)]
reference_variance
=
np
.
var
(
reference_window
)
reference_variance
=
np
.
var
(
reference_window
)
opt_rolling_window
=
np
.
lib
.
stride_tricks
.
sliding_window_view
(
opt_window
,
opt_window_size
)
residual_mean
,
residual_variance
,
residual_autocorr
=
estimate_residual_error
(
reference_window
,
opt_window_size
)
rolling_means
=
np
.
mean
(
opt_rolling_window
,
axis
=
1
)
seasonal_mean
,
seasonal_variance
,
seasonal_autocorr
=
estimate_seasonal_error
(
reference_window
,
rolling_vars
=
np
.
var
(
opt_rolling_window
,
axis
=
1
)
opt_window_size
//
nperiods
)
mean_jk_error
,
var_jk_error
,
accr_jk_error
=
estimate_error_jacknife
(
reference_window
,
assert
len
(
rolling_means
)
==
len
(
opt_rolling_window
)
block_size
=
opt_window_size
//
nperiods
)
assert
len
(
rolling_vars
)
==
len
(
opt_rolling_window
)
modelerror_mean
,
modelerror_freq
,
modelerror_var
=
estimate_model_error
(
reference_window
,
opt_window_size
,
nperiods
)
autocorr_limit
=
(
seasonal_autocorr
+
residual_autocorr
+
accr_jk_error
+
modelerror_freq
)
*
sigma_threshold
mean_uncertainty
=
np
.
std
(
rolling_means
)
mean_limit
=
(
seasonal_mean
+
residual_mean
+
mean_jk_error
+
modelerror_mean
)
*
sigma_threshold
var_uncertainty
=
np
.
std
(
rolling_vars
)
variance_limit
=
(
seasonal_variance
+
residual_variance
+
var_jk_error
+
modelerror_var
)
*
sigma_threshold
rolling_window
=
np
.
lib
.
stride_tricks
.
sliding_window_view
(
normalized_series
,
opt_window_size
)
freqs_rolling
=
[]
for
win
in
opt_rolling_window
:
freqs
,
psd
=
signal
.
periodogram
(
win
,
return_onesided
=
False
)
maxpsdid
=
np
.
argmax
(
psd
)
freq
=
freqs
[
maxpsdid
]
freqs_rolling
.
append
(
freq
)
def
cum_score
(
valid
):
freq_uncertainty
=
np
.
std
(
freqs_rolling
)
scores
=
np
.
cumsum
(
valid
[::
-
1
])[::
-
1
]
/
np
.
arange
(
1
,
len
(
valid
)
+
1
)[::
-
1
]
index
=
np
.
where
(
scores
>=
1
)[
0
][
0
]
return
scores
,
index
window_means
=
np
.
mean
(
rolling_window
,
axis
=
1
)
checkseries
=
normalized_series
[:
-
opt_window_size
*
2
]
mean_valid
=
np
.
isclose
(
abs
(
window_means
-
reference_mean
),
0
,
atol
=
mean_limit
)
if
not
any
(
mean_valid
):
checkseries_reversed
=
pd
.
DataFrame
(
checkseries
[::
-
1
])
return
False
rolling_win_reversed
=
checkseries_reversed
.
rolling
(
window
=
opt_window_size
)
mean_scores
,
mean_index
=
cum_score
(
mean_valid
)
rolling_means_reversed
=
rolling_win_reversed
.
mean
().
values
rolling_vars_reversed
=
rolling_win_reversed
.
var
().
values
window_corrs
=
[]
def
get_rolling_max_freq
(
window
):
for
window
in
rolling_window
:
freqs
,
psd
=
signal
.
periodogram
(
window
,
return_onesided
=
False
)
freqs
,
psd
=
signal
.
periodogram
(
window
,
return_onesided
=
False
)
window_corrs
.
append
(
freqs
[
np
.
argmax
(
psd
)])
return
freqs
[
np
.
argmax
(
psd
)]
autocorr_timescale_valid
=
np
.
isclose
(
abs
(
window_corrs
-
reference_autocorr_freq
),
0
,
atol
=
autocorr_limit
)
if
not
any
(
autocorr_timescale_valid
):
return
False
autocorr_timescale_scores
,
autocorr_timescale_index
=
cum_score
(
autocorr_timescale_valid
)
window_variances
=
np
.
var
(
rolling_window
,
axis
=
1
)
def
get_rolling_histcompare
(
window
):
variance_valid
=
np
.
isclose
(
abs
(
window_variances
-
reference_variance
),
0
,
atol
=
variance_limit
)
return
1
-
smd_probability_compare
(
window
,
reference_window
)
if
not
any
(
variance_valid
):
return
False
variance_scores
,
variance_index
=
cum_score
(
variance_valid
)
stationary_start_index
=
max
(
mean_index
,
autocorr_timescale_index
,
variance_index
)
if
verbose
:
plt
.
plot
(
abs
(
window_variances
-
reference_variance
),
label
=
"
var error
"
,
color
=
"
red
"
)
plt
.
hlines
(
y
=
(
variance_limit
),
xmin
=
0
,
xmax
=
len
(
window_variances
),
label
=
"
var_err+
"
)
plt
.
legend
()
plt
.
show
()
plt
.
plot
(
abs
(
window_corrs
-
reference_autocorr_freq
),
label
=
"
accr error
"
,
color
=
"
red
"
)
rolling_win_reversed
=
checkseries_reversed
.
rolling
(
window
=
opt_window_size
)
plt
.
hlines
(
y
=
(
autocorr_limit
),
xmin
=
0
,
xmax
=
len
(
window_corrs
),
label
=
"
accr_err+
"
)
rolling_freqs_reversed
=
rolling_win_reversed
.
apply
(
get_rolling_max_freq
,
raw
=
False
)
plt
.
legend
()
plt
.
show
()
plt
.
plot
(
abs
(
window_means
-
reference_mean
),
label
=
"
mean error
"
,
color
=
"
red
"
)
plt
.
hlines
(
y
=
(
mean_limit
),
xmin
=
0
,
xmax
=
len
(
window_means
),
label
=
"
mean_err+
"
)
plt
.
legend
()
plt
.
show
()
return
stationary_start_index
outer_mean_uncertainty
=
0.00175
outer_var_uncertainty
=
outer_mean_uncertainty
*
0.25
# variance can only be 25% of minmax normed series
outer_freq_uncertainty
=
0.05
outer_hists_errors
=
0.003
def
estimate_residual_error
(
reference
,
opt_window_size
):
rolling_means_errors_reversed
=
np
.
abs
(
rolling_means_reversed
-
reference_mean
)
"""
rolling_vars_errors_reversed
=
np
.
abs
(
rolling_vars_reversed
-
reference_variance
)
Estimates the residual errors for a given time series using an ARIMA model and a rolling window approach.
rolling_freqs_errors_reversed
=
np
.
array
([
np
.
abs
(
i
-
reference_autocorr_freq
)
for
i
in
rolling_freqs_reversed
.
values
])
rolling_hists_errors_reversed
=
rolling_win_reversed
.
apply
(
get_rolling_histcompare
,
raw
=
False
).
values
In the context of time series analysis, the residual error (or simply
"
residuals
"
) represents the difference
mean_limits
=
sigma_threshold
*
(
mean_uncertainty
)
+
outer_mean_uncertainty
between the actual values of the time series and the predicted values from a model. In other words, it is the
var_limits
=
sigma_threshold
*
(
var_uncertainty
)
+
outer_var_uncertainty
part of the time series that the model is unable to explain.
freq_limits
=
sigma_threshold
*
(
freq_uncertainty
)
+
outer_freq_uncertainty
hist_limits
=
sigma_threshold
*
(
outer_hists_errors
)
Args:
reference (array-like): A time series to estimate residual errors for.
opt_window_size (int): The size of the rolling window to use for calculating metrics.
Returns:
rolling_freqs_errors_inliers_reversed
=
rolling_freqs_errors_reversed
[
opt_window_size
:]
<=
freq_limits
Tuple: A tuple containing the following metrics of the residual errors:
rolling_means_errors_inliers_reversed
=
rolling_means_errors_reversed
[
opt_window_size
:]
<=
mean_limits
- residual_mean (float): The standard deviation of the mean residuals over the rolling windows.
rolling_vars_errors_inliers_reversed
=
rolling_vars_errors_reversed
[
opt_window_size
:]
<=
var_limits
- residual_var (float): The standard deviation of the residual variance over the rolling windows.
rolling_hists_errors_inliers_reversed
=
rolling_hists_errors_reversed
[
opt_window_size
:]
<=
hist_limits
- residual_freq (float): The standard deviation of the dominant frequencies of the residuals over the rolling windows.
"""
# Fit an ARIMA model to the reference data
arima_model
=
ARIMA
(
reference
,
order
=
(
1
,
1
,
1
))
arima_results
=
arima_model
.
fit
()
# Calculate the residuals
def
last_coherent_interval
(
arr
,
threshold
=
1
):
residuals
=
reference
-
arima_results
.
fittedvalues
reversed_arr
=
arr
[::
-
1
]
false_indices
=
np
.
where
(
reversed_arr
==
False
)[
0
]
last_false_index
=
false_indices
[
-
1
]
if
len
(
false_indices
)
>
0
else
0
coherent_arr
=
[
False
]
*
last_false_index
+
[
True
]
*
(
len
(
reversed_arr
)
-
last_false_index
)
success_rate
=
np
.
array
([
np
.
sum
(
coherent_arr
[
i
:])
/
len
(
coherent_arr
[
i
:])
for
i
in
range
(
len
(
coherent_arr
))])
answer_index
=
np
.
where
(
success_rate
>=
threshold
)[
0
][
0
]
return
len
(
arr
)
-
answer_index
# Calculate the rolling window of residuals
mean_index
=
datalength
-
last_coherent_interval
(
rolling_means_errors_inliers_reversed
)
-
3
*
opt_window_size
rolling_residuals
=
np
.
lib
.
stride_tricks
.
sliding_window_view
(
residuals
,
opt_window_size
)
# Calculate the dominant frequency of residuals for each window
hist_index
=
datalength
-
last_coherent_interval
(
rolling_hists_errors_inliers_reversed
)
-
3
*
opt_window_size
residual_freqs
=
[]
autocorr_timescale_index
=
datalength
-
last_coherent_interval
(
rolling_freqs_errors_inliers_reversed
)
-
3
*
opt_window_size
for
window
in
rolling_residuals
:
variance_index
=
datalength
-
last_coherent_interval
(
rolling_vars_errors_inliers_reversed
)
-
3
*
opt_window_size
freqs
,
psd
=
signal
.
periodogram
(
window
,
return_onesided
=
False
)
dominant_freq
=
freqs
[
np
.
argmax
(
psd
)]
residual_freqs
.
append
(
dominant_freq
)
# Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies
stationary_start_index
=
max
(
mean_index
,
autocorr_timescale_index
,
hist_index
,
variance_index
)
residual_var
=
np
.
std
(
np
.
var
(
rolling_residuals
,
axis
=
1
))
residual_mean
=
np
.
std
(
np
.
mean
(
rolling_residuals
,
axis
=
1
))
residual_freq
=
np
.
std
(
residual_freqs
)
return
residual_mean
,
residual_var
,
residual_freq
if
verbose
:
fig
,
axs
=
plt
.
subplots
(
5
,
1
,
figsize
=
(
24
,
20
))
axs
[
0
].
plot
(
normalized_series
,
label
=
"
normalized series
"
,
color
=
"
blue
"
)
axs
[
0
].
vlines
(
x
=
stationary_start_index
,
ymin
=
0
,
ymax
=
np
.
nanmax
(
normalized_series
),
label
=
"
stationary_start
"
,
color
=
"
k
"
)
def
estimate_model_error
(
reference_window
,
opt_window_size
,
nper
):
axs
[
1
].
plot
(
np
.
nan_to_num
(
rolling_means_errors_reversed
[::
-
1
],
0
),
label
=
"
mean error
"
,
color
=
"
red
"
)
ref_length
=
len
(
reference_window
)
axs
[
1
].
hlines
(
y
=
(
mean_limits
),
xmin
=
0
,
xmax
=
len
(
normalized_series
),
label
=
"
mean_limits
"
,
color
=
"
k
"
)
ref_mean
=
np
.
mean
(
reference_window
)
axs
[
1
].
vlines
(
x
=
stationary_start_index
,
ymin
=
0
,
ymax
=
max
(
mean_limits
,
np
.
nanmax
(
rolling_means_errors_reversed
)),
label
=
"
stationary_start
"
,
color
=
"
k
"
)
ref_var
=
np
.
var
(
reference_window
)
axs
[
1
].
vlines
(
x
=
mean_index
,
ymin
=
0
,
ymax
=
max
(
mean_limits
,
np
.
nanmax
(
rolling_means_errors_reversed
)),
label
=
"
mean_index
"
,
color
=
"
green
"
)
axs
[
1
].
legend
()
# Calculate the dominant frequency of residuals for each window
freqs
,
psd
=
signal
.
periodogram
(
reference_window
,
return_onesided
=
False
)
ref_freq
=
freqs
[
np
.
argmax
(
psd
)]
x
=
np
.
linspace
(
0
,
2
*
np
.
pi
*
nper
,
ref_length
)
axs
[
2
].
plot
(
np
.
nan_to_num
(
rolling_vars_errors_reversed
[::
-
1
],
0
),
label
=
"
variance error
"
,
color
=
"
red
"
)
axs
[
2
].
hlines
(
y
=
(
var_limits
),
xmin
=
0
,
xmax
=
len
(
normalized_series
),
label
=
"
var_limits
"
,
color
=
"
k
"
)
axs
[
2
].
vlines
(
x
=
stationary_start_index
,
ymin
=
0
,
ymax
=
max
(
var_limits
,
np
.
nanmax
(
rolling_vars_errors_reversed
)),
label
=
"
stationary_start
"
,
color
=
"
k
"
)
axs
[
2
].
vlines
(
x
=
variance_index
,
ymin
=
0
,
ymax
=
max
(
var_limits
,
np
.
nanmax
(
rolling_vars_errors_reversed
)),
label
=
"
variance_index
"
,
color
=
"
k
"
)
axs
[
2
].
legend
()
y
=
np
.
sin
(
x
)
+
1
rolling_residuals
=
np
.
lib
.
stride_tricks
.
sliding_window_view
(
y
,
opt_window_size
)
axs
[
3
].
plot
(
np
.
nan_to_num
(
rolling_freqs_errors_reversed
[::
-
1
],
0
),
label
=
"
autocorr error
"
,
color
=
"
red
"
)
# Calculate the dominant frequency of residuals for each window
axs
[
3
].
hlines
(
y
=
(
freq_limits
),
xmin
=
0
,
xmax
=
len
(
normalized_series
),
label
=
"
freq_limits
"
,
color
=
"
k
"
)
residual_ts
=
[]
axs
[
3
].
vlines
(
x
=
stationary_start_index
,
ymin
=
0
,
ymax
=
max
(
freq_limits
,
np
.
nanmax
(
rolling_freqs_errors_reversed
)),
label
=
"
stationary_start
"
,
color
=
"
k
"
)
for
window
in
rolling_residuals
:
axs
[
3
].
vlines
(
x
=
autocorr_timescale_index
,
ymin
=
0
,
ymax
=
max
(
freq_limits
,
np
.
nanmax
(
rolling_freqs_errors_reversed
)),
label
=
"
autocorr_timescale_index
"
,
color
=
"
k
"
)
freqs
,
psd
=
signal
.
periodogram
(
window
,
return_onesided
=
False
)
axs
[
3
].
legend
()
dominant_freq
=
freqs
[
np
.
argmax
(
psd
)]
residual_ts
.
append
(
dominant_freq
)
axs
[
4
].
plot
(
np
.
nan_to_num
(
rolling_hists_errors_reversed
[::
-
1
],
0
),
label
=
"
histogram error
"
,
color
=
"
red
"
)
y_mean
=
1
axs
[
4
].
hlines
(
y
=
(
hist_limits
),
xmin
=
0
,
xmax
=
len
(
normalized_series
),
label
=
"
hist_limits
"
,
color
=
"
k
"
)
y_var
=
0.5
axs
[
4
].
vlines
(
x
=
stationary_start_index
,
ymin
=
0
,
ymax
=
max
(
hist_limits
,
np
.
nanmax
(
rolling_hists_errors_reversed
)),
label
=
"
stationary_start
"
,
color
=
"
k
"
)
y_t
=
nper
/
ref_length
axs
[
4
].
vlines
(
x
=
hist_index
,
ymin
=
0
,
ymax
=
max
(
hist_limits
,
np
.
nanmax
(
rolling_hists_errors_reversed
)),
label
=
"
hist_index
"
,
color
=
"
k
"
)
axs
[
4
].
legend
()
# Calculate the standard deviation of mean residuals, residual variance, and dominant frequencies
plt
.
show
()
residual_var
=
np
.
var
(
rolling_residuals
,
axis
=
1
)
residual_mean
=
np
.
mean
(
rolling_residuals
,
axis
=
1
)
return
stationary_start_index
residual_freq
=
residual_ts
residual_length
=
len
(
residual_mean
)
mean_error
=
np
.
mean
(
np
.
abs
(
residual_mean
-
np
.
array
([
y_mean
]
*
residual_length
)))
/
y_mean
*
ref_mean
var_error
=
np
.
mean
(
np
.
abs
(
residual_var
-
np
.
array
([
y_var
]
*
residual_length
)))
/
y_var
*
ref_var
freq_error
=
np
.
mean
(
np
.
abs
(
residual_freq
-
np
.
array
([
y_t
]
*
residual_length
)))
/
y_t
*
ref_freq
return
mean_error
,
freq_error
,
var_error
def
estimate_error_jacknife
(
timeseries
,
block_size
=
20
,
n_samples
=
4000
):
def
estimate_error_jacknife
(
timeseries
,
block_size
=
20
,
n_samples
=
4000
):
...
...
This diff is collapsed.
Click to expand it.
tests/timeseries/test_ntrfc_stationarity.py
+
14
−
14
View file @
13fb9a09
...
@@ -11,24 +11,24 @@ def test_optimal_timewindow(verbose=False):
...
@@ -11,24 +11,24 @@ def test_optimal_timewindow(verbose=False):
nper
=
8
nper
=
8
x
=
np
.
linspace
(
0
,
2
*
np
.
pi
*
nper
,
res
)
x
=
np
.
linspace
(
0
,
2
*
np
.
pi
*
nper
,
res
)
min_interval
=
0.05
max_interval
=
0.25
# sin
# sin
# we have four periods, at least one period should be captured
# we have four periods, at least one period should be captured
# thats res // 4 as a return
# thats res // 4 as a return
ysin
=
np
.
sin
(
x
)
ysin
=
np
.
sin
(
x
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
ysin
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
ysin
,
min_interval
,
max_interval
)
assert
opt_window_size
==
res
//
(
nper
*
nperiods
)
-
1
assert
opt_window_size
==
res
//
(
nper
*
nperiods
)
assert
nperiods
==
1
assert
nperiods
==
1
# tanh
# tanh
eul
=
np
.
tanh
(
x
*
2
)
tanh
=
np
.
tanh
(
x
*
2
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
eu
l
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
tanh
,
min_interval
,
max_interva
l
)
assert
opt_window_size
==
res
//
10
assert
opt_window_size
==
res
*
min_interval
assert
nperiods
==
0
assert
nperiods
==
0
# euler
# euler
eul
=
np
.
e
**
(
-
x
*
60
)
eul
=
np
.
e
**
(
-
x
*
60
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
eul
)
opt_window
,
opt_window_size
,
nperiods
=
optimal_window_size
(
eul
,
min_interval
,
max_interval
)
assert
opt_window_size
==
res
//
10
assert
opt_window_size
==
res
*
min_interval
assert
nperiods
==
0
assert
nperiods
==
0
...
@@ -124,7 +124,7 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
...
@@ -124,7 +124,7 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
import
matplotlib.pyplot
as
plt
import
matplotlib.pyplot
as
plt
def
signalgen_abatingsine
(
amplitude
,
frequency
,
mean
,
abate
,
time
):
def
signalgen_abatingsine
(
amplitude
,
frequency
,
mean
,
abate
,
time
):
resolution
=
4
8
resolution
=
6
4
step
=
(
1
/
frequency
)
/
resolution
step
=
(
1
/
frequency
)
/
resolution
times
=
np
.
arange
(
0
,
time
,
step
)
times
=
np
.
arange
(
0
,
time
,
step
)
values
=
amplitude
*
np
.
sin
(
frequency
*
(
2
*
np
.
pi
)
*
times
)
+
mean
+
np
.
e
**
-
(
times
*
abate
)
values
=
amplitude
*
np
.
sin
(
frequency
*
(
2
*
np
.
pi
)
*
times
)
+
mean
+
np
.
e
**
-
(
times
*
abate
)
...
@@ -151,7 +151,8 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
...
@@ -151,7 +151,8 @@ def test_stationarity_uncertainties_abatingsine(verbose=False):
plt
.
figure
()
plt
.
figure
()
plt
.
plot
(
timesteps
,
values
)
plt
.
plot
(
timesteps
,
values
)
plt
.
axvline
(
timesteps
[
stationary_timestep
],
color
=
"
green
"
)
plt
.
axvline
(
timesteps
[
stationary_timestep
],
color
=
"
green
"
)
plt
.
axvline
(
well_computed_stationarity_limit
,
color
=
"
red
"
)
plt
.
axvline
(
well_computed_stationarity_limit
,
color
=
"
red
"
,
label
=
"
computed
"
)
plt
.
legend
()
plt
.
show
()
plt
.
show
()
assert
0.05
>
reldiff
(
stationary_time
,
well_computed_stationary_time
),
"
computation failed
"
assert
0.05
>
reldiff
(
stationary_time
,
well_computed_stationary_time
),
"
computation failed
"
...
@@ -164,7 +165,6 @@ def test_stationarity_uncertainties_abatingsinenoise(verbose=False):
...
@@ -164,7 +165,6 @@ def test_stationarity_uncertainties_abatingsinenoise(verbose=False):
import
matplotlib.pyplot
as
plt
import
matplotlib.pyplot
as
plt
def
signalgen_abatingsine
(
amplitude
,
noiseamplitude
,
frequency
,
mean
,
abate
,
time
):
def
signalgen_abatingsine
(
amplitude
,
noiseamplitude
,
frequency
,
mean
,
abate
,
time
):
# todo zu hoch aufgelöste signale können fehlerhaft sein
resolution
=
64
resolution
=
64
step
=
(
1
/
frequency
)
/
resolution
step
=
(
1
/
frequency
)
/
resolution
...
@@ -239,13 +239,13 @@ def test_stationarity_transientonly(verbose=False):
...
@@ -239,13 +239,13 @@ def test_stationarity_transientonly(verbose=False):
assert
statidx
==
False
assert
statidx
==
False
def
test_stationarity_
sine
transientonly
():
def
test_stationarity_transientonly
():
from
ntrfc.timeseries.stationarity
import
estimate_stationarity
from
ntrfc.timeseries.stationarity
import
estimate_stationarity
import
numpy
as
np
import
numpy
as
np
res
=
20000
res
=
20000
values
=
np
.
arang
e
(
0
,
10
,
res
)
values
=
np
.
linspac
e
(
0
,
10
,
res
)
stationary_timestep
=
estimate_stationarity
(
values
)
stationary_timestep
=
estimate_stationarity
(
values
)
assert
stationary_timestep
==
False
assert
stationary_timestep
==
False
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment