Skip to content
Snippets Groups Projects
Commit 8d2a1af4 authored by Malte Nyhuis's avatar Malte Nyhuis
Browse files

add stationaritycheck-function for timeseries

parent 1e6be92d
No related branches found
No related tags found
1 merge request!12Stationaritycheck
import numpy as np
import matplotlib.pyplot as plt
from ntrfc.timeseries.integral_scales import integralscales
from dataclasses import dataclass
def stationarity_uncertainties(timesteps, values, verbose=False):
'''
:param timesteps: 1D np.array()
:param values: 1D np.array()
:param verbose: bool
:return: stationary_timestep ,mean_value , uncertainty
'''
assert len(timesteps)==len(values)
numts = len(timesteps)
mins = 8
maxs = 100
idealsplits = int(numts/256)
maxsplits = mins if idealsplits < mins else maxs if idealsplits > maxs else idealsplits
check_splits = range(mins,maxsplits)
doinvestigate = []
for split in check_splits:
timesplits = np.array_split(timesteps,split)
valsplit = np.array_split(values,split)
for cs,(ts,vs) in enumerate(zip(timesplits,valsplit)):
# https://www.statisticshowto.com/choose-bin-sizes-statistics/
numbins = int(1+3.22*np.log(len(ts)))
_, histogram_bins = np.histogram(values,bins=numbins)
mean_all = np.mean(vs)
var_all = np.var(vs)
autocorr_all,lengthscal_all = integralscales(vs,ts)
hist_all = np.histogram(vs,bins=histogram_bins,density=True)
investigate = investigation(mean_all,var_all,autocorr_all,hist_all,cs ,timesplits,valsplit)
doinvestigate.append(investigate)
for invest in doinvestigate:
invest.compare()
stationary_guesses = [i for i in doinvestigate if i.stationary==True]
best_guess = np.argmin([i.stationary_time for i in stationary_guesses])
return stationary_guesses[best_guess].stationary_time
@dataclass
class investigation:
mean_all: float
var_all: float
autocorr_all: float
hist_all: np.histogram
checkssplit: int
timesplits: np.array
valsplits: np.array
def plot_tobeinvestigated(self):
plt.figure()
for ts,vs in zip(self.timesplits,self.valsplits):
plt.plot(ts,vs)
plt.vlines(ts[-1],ymin=np.min(self.valsplits[0]),ymax=np.max(self.valsplits[0]))
if self.stationary_time:
plt.axvline(self.stationary_time)
plt.show()
def compare(self):
means = []
varis = []
acrrs = []
hists = []
valid_timesplits=self.timesplits[:]
del valid_timesplits[self.checkssplit]
valid_valsplits=self.valsplits[:]
del valid_valsplits[self.checkssplit]
for cs ,(ts,vs) in enumerate(zip(valid_timesplits,valid_valsplits)):
mean_all = np.mean(vs)
var_all = np.var(vs)
autocorr_all, lengthscal_all= integralscales(vs,ts)
hist_all = np.histogram(vs,bins=self.hist_all[1],density=True)
means.append(mean_all)
varis.append(var_all)
acrrs.append(autocorr_all)
hists.append(hist_all)
mean_reldiffs = []
var_reldiffs = []
accr_reldiffs = []
hist_reldiffs = []
for mean, vari, accr, hist in zip(means, varis,acrrs,hists):
mean_reldiffs.append(reldiff(self.mean_all,mean))
var_reldiffs.append(reldiff(self.var_all,vari))
accr_reldiffs.append(reldiff(self.autocorr_all,accr))
#https://mpatacchiola.github.io/blog/2016/11/12/the-simplest-classifier-histogram-intersection.html
histdiff =[1-return_intersection( _a, _b) for _a, _b in zip(self.hist_all[0], hist[0])]
meanhistdiff = np.nanmean(histdiff)
hist_reldiffs.append(meanhistdiff)
num_splits = len(self.timesplits)
self.mean_rdmeans = [np.mean(mean_reldiffs[i:]) for i in range(num_splits-1)]
self.var_rdmeans = [np.mean(var_reldiffs[i:]) for i in range(num_splits-1)]
self.accr_rdmeans = [np.mean(accr_reldiffs[i:]) for i in range(num_splits-1)]
self.hist_rdmeans = [np.mean(hist_reldiffs[i:]) for i in range(num_splits-1)]
if 0.05>np.nanmin(self.mean_rdmeans) and 0.05>np.nanmin(self.var_rdmeans) and 0.05>np.nanmin(self.accr_rdmeans) and 0.05>np.nanmin(self.hist_rdmeans):
self.stationary = True
self.scores = [np.nanmean([self.mean_rdmeans[i],self.var_rdmeans[i],self.accr_rdmeans[i],self.hist_rdmeans[i]]) for i in range(num_splits-1)]
best = np.nanargmin(self.scores)
self.score = self.scores[best]
self.stationary_time=self.timesplits[best][0]
else:
self.stationary = False
def reldiff(a,b):
#https://en.wikipedia.org/wiki/Relative_change_and_difference
if a==b:
return 0
else:
return abs(abs(a)-abs(b))/((abs(a)+abs(b))/2)
def return_intersection(hist_1, hist_2):
minima = np.minimum(hist_1, hist_2)
intersection = np.true_divide(np.sum(minima), np.sum(hist_2))
return intersection
import os
import pytest
ON_CI = 'CI' in os.environ
def test_stationarity_uncertainties_stationarysine(verbose=False):
from ntrfc.timeseries.uncertainty import stationarity_uncertainties
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
def signalgen_sine(amplitude, frequency, mean, time):
frequency_resolution = 512
tau = frequency ** -1
step = tau / frequency_resolution
times = np.arange(0, time, step)
values = amplitude * np.sin(frequency * (2 * np.pi) * times) + mean
return times, values
test_amplitudes = [1] #1,10
test_frequencies = [1]#1,10
test_times = [10] #10, 60
test_mean = [-1]#-1,0,1
maxperiods = -1
minperiods = np.inf
test_configs = list(product(test_amplitudes, test_frequencies, test_times, test_mean))
for amplitude, frequency, time, mean in test_configs:
equals_periods = time / frequency ** -1
if equals_periods > maxperiods:
maxperiods = int(equals_periods)
if minperiods > equals_periods:
minperiods = int(equals_periods)
timesteps, values = signalgen_sine(amplitude=amplitude, frequency=frequency, mean=mean, time=time)
stationary_timestep = stationarity_uncertainties(timesteps, values)
analytic_stationary_limit = 0.0
assert stationary_timestep == analytic_stationary_limit, "computation failed"
if verbose:
plt.figure()
plt.plot(timesteps,values)
plt.axvline(stationary_timestep)
def test_stationarity_uncertainties_abatingsine(verbose=False):
from ntrfc.timeseries.uncertainty import stationarity_uncertainties
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
def signalgen_abatingsine(amplitude, frequency, mean, abate, time):
resolution = 2048
step = (resolution * frequency ** -1) ** -1
times = np.arange(0, time, step)
values = amplitude * np.sin(frequency * (2 * np.pi) * times) + mean + np.e ** -(times * abate)
return times, values
test_amplitudes = [0.2]
test_frequencies = [10]
test_times = [20]
test_mean = [-1]
test_abate = [1, 10]
test_configs = list(product(test_amplitudes, test_frequencies, test_times, test_mean, test_abate))
for amplitude, frequency, time, mean, abate in test_configs:
timesteps, values = signalgen_abatingsine(amplitude=amplitude, frequency=frequency, mean=mean, time=time,
abate=abate)
stationary_timestep = stationarity_uncertainties(timesteps, values)
well_computed_stationarity_limit = -np.log(0.05) / abate
if verbose:
plt.figure()
plt.plot(timesteps,values)
plt.axvline(stationary_timestep)
plt.show()
assert stationary_timestep >well_computed_stationarity_limit , "computation failed"
def test_stationarity_uncertainties_abatingsinenoise(verbose=False):
from ntrfc.timeseries.uncertainty import stationarity_uncertainties
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
def signalgen_abatingsine(amplitude,noiseamplitude, frequency, mean, abate, time):
resolution = 2048
step = (resolution * frequency ** -1) ** -1
times = np.arange(0, time, step)
noise = np.random.normal(-1,1,len(times))*noiseamplitude
values = amplitude * np.sin(frequency * (2 * np.pi) * times) + mean + np.e ** -(times * abate)+noise
return times, values
test_amplitudes = [2]
test_noiseamplitude = [0.1]
test_frequencies = [6]
test_times = [20]
test_mean = [-1]
test_abate = [0.5]
test_configs = list(product(test_amplitudes,test_noiseamplitude, test_frequencies, test_times, test_mean, test_abate))
for amplitude, noiseamplitude, frequency, time, mean, abate in test_configs:
timesteps, values = signalgen_abatingsine(amplitude=amplitude, noiseamplitude=noiseamplitude,frequency=frequency, mean=mean, time=time,
abate=abate)
stationary_timestep = stationarity_uncertainties(timesteps, values)
well_computed_stationarity_limit = -np.log(0.05) / abate
if verbose:
plt.figure()
plt.plot(timesteps,values)
plt.axvline(stationary_timestep)
plt.show()
assert 3*well_computed_stationarity_limit > stationary_timestep > well_computed_stationarity_limit , "computation failed"
@pytest.mark.skipif(ON_CI, reason="currently cant render on ci-systems")
def test_stationarity_uncertainties_abating(verbose=False):
from ntrfc.timeseries.uncertainty import stationarity_uncertainties
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
def signalgen_abatingsine(noiseamplitude, mean, abate, time):
resolution = 30000
step = (time/resolution )
times = np.arange(0, time, step)
noise = np.random.normal(-1,1,len(times))*noiseamplitude
values = mean + np.e ** -(times * abate)+noise
return times, values
test_noiseamplitude = [0.01]
test_times = [10]
test_mean = [-1]
test_abate = [3]
test_configs = list(product(test_noiseamplitude, test_times, test_mean, test_abate))
for noiseamplitude, time, mean, abate in test_configs:
timesteps, values = signalgen_abatingsine(noiseamplitude=noiseamplitude, mean=mean, time=time,
abate=abate)
stationary_timestep = stationarity_uncertainties(timesteps, values)
well_computed_stationarity_limit = -np.log(0.05) / abate
if verbose:
plt.figure()
plt.plot(timesteps,values)
plt.axvline(stationary_timestep)
plt.show()
assert 3*well_computed_stationarity_limit > stationary_timestep >well_computed_stationarity_limit , "computation failed"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment