Skip to content
Snippets Groups Projects
Commit 277c9b31 authored by Malte Nyhuis's avatar Malte Nyhuis
Browse files

new stationarity definition and tests

parent 16e3e66e
No related branches found
No related tags found
No related merge requests found
......@@ -4,8 +4,10 @@ from scipy.integrate import simps
from ntrfc.utils.math.methods import autocorr, zero_crossings
def integralscales(mean, fluctations, timesteps):
def integralscales(signal, timesteparray):
mean = np.mean(signal)
fluctations = signal-mean
timesteps = timesteparray.copy()
autocorrelated = autocorr(fluctations)
# we are integrating from zero to zero-crossing in the autocorrelation, we need the time to begin with zeros
# probably the used datasample is not beginning with 0. therefore:
......@@ -16,6 +18,8 @@ def integralscales(mean, fluctations, timesteps):
print("no zero crossing found, using first minimal value (possibly last timestep). check data quality!")
acorr_zero_crossings = np.where(autocorrelated == min(autocorrelated))[0][0]
if all(np.isnan(autocorrelated)) or acorr_zero_crossings==0:
return 0, 0
integral_time_scale = simps(autocorrelated[:acorr_zero_crossings], timesteps[:acorr_zero_crossings])
integral_length_scale = integral_time_scale * mean
......
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import pyplot as plt
from ntrfc.postprocessing.timeseries.integral_scales import integralscales
from postprocessing.timeseries.integral_scales import integralscales
"""
this module is supposed to return a timesstamp from a time-series, that equals the time when the transient signal ends
Ries 2018
https://link.springer.com/content/pdf/10.1007/s00162-018-0474-0.pdf
numerical solutions have a transient behaviour at the initial process
it is assumed that this initial transient can be reproduced by a sine, tanh and a noise-function
with these functions given, we can analytically define where the transient process ends
"""
def test_transientcheck(verbose=False):
"""
toddo: this test must be rewritten in a more defined way
:param verbose:
:return:
"""
class signal_generator:
"""
this is a signal-generator
def chunks(somelist, numchunks):
def split(list_a, chunk_size):
for i in range(0, len(list_a), chunk_size):
yield list_a[i:i + chunk_size]
it can be rewritten to serve more precise data about the signal-stationarity
"""
# resolution
timeresolution = 100 # resolution of times
return list(split(somelist, numchunks))
transientlimit = 0.95
def __init__(self):
# some kind of factor for the duration of an abating signal
self.tanh_lasting = np.random.randint(0, 100)
self.sin_lasting = np.random.randint(0, 100)
def timeseries_analysis(timesteps, signal):
ts = timesteps.copy()
stationarity, timescale = check_isconstant(signal)
if stationarity and not timescale:
return stationarity, timescale
stationarity, timescale = check_stationarity(ts, signal)
return stationarity, timescale
self.sin_omega = np.random.rand()
self.tanh_stationary_ts = np.arctanh(self.transientlimit) * self.tanh_lasting
self.sin_stationary_ts = -self.sin_lasting * (1 + np.log(1 - self.transientlimit))
# as defined in Ries 2018, the signal must be at least two times as long as the transient process
if self.sin_stationary_ts > self.tanh_stationary_ts:
self.time = 2 * self.sin_stationary_ts
else:
self.time = 2 * self.tanh_stationary_ts
# time equals approx 300 timescales, adjust to approx 1000
self.time *= 4
# defining the used timesteps (unnesessary, the signal-length could be normed to 1!)
self.timesteps = np.arange(0, self.time, self.timeresolution ** -1)
# damping the sine with euler
abate = np.e ** (-self.timesteps * self.sin_lasting ** -1)
self.sin_abate = abate / max(abate)
# values for the 'stationarity'. this can be defined because the function is analytical. but is this correct?
self.sin_stationary = np.argmax(self.sin_abate < (1 - self.transientlimit))
self.tanh_stationarity = np.argmax(np.tanh(self.timesteps * self.tanh_lasting ** -1) > self.transientlimit)
def tanh_signal(self):
ans = np.tanh(self.timesteps * self.tanh_lasting ** -1)
return ans # * self.tanh_sign
def sin_signal(self):
sinus = np.sin(self.timesteps * self.sin_omega) * self.sin_abate
return sinus # * 0.5
def noise_signal(self):
mu, sigma = 0, np.random.rand() # mean and standard deviation
s = np.random.normal(mu, sigma, size=len(self.timesteps))
t = self.time
Dt = t / len(self.timesteps)
# dt is the timescale of the noisy signal --> emulated length scale!
dt = t / 1000
timescale = int(dt / Dt)
weights = np.repeat(1.0, timescale) / timescale
out = np.convolve(s, weights, 'same')
out /= max(out)
out += np.sin(self.timesteps * dt ** -1) * 0.25
out /= max(out) * 2
return out
def generate(self):
sinus = self.sin_signal()
tanh = self.tanh_signal()
rausch = self.noise_signal()
sin_stats = (-1 + self.sin_abate) * -1
tanh_stats = np.sinh(self.timesteps * self.tanh_lasting ** -1) / np.cosh(
self.timesteps * self.tanh_lasting ** -1)
signal = sinus + tanh + rausch
return sinus, tanh, rausch, signal, sin_stats, tanh_stats
def plot(self, sinus, tanh, rausch, signal, stat_sin, stat_tanh):
fig, axs = plt.subplots(6, 1)
axs[0].plot(np.arange(0, self.time, self.timeresolution ** -1), sinus, color="orange",
label="abating sine signal")
axs[0].axvline(self.sin_stationary_ts)
axs[1].plot(np.arange(0, self.time, self.timeresolution ** -1), stat_sin, color="orange",
label="stationarity sine signal")
axs[1].axvline(self.sin_stationary_ts)
axs[1].fill_between(np.arange(0, self.time, self.timeresolution ** -1), stat_sin, color="orange")
axs[2].plot(np.arange(0, self.time, self.timeresolution ** -1), tanh, color="blue", label="tanh signal")
axs[2].axvline(self.tanh_stationary_ts)
axs[3].plot(np.arange(0, self.time, self.timeresolution ** -1), stat_tanh, color="blue",
label="stationarity tanh signal")
axs[3].fill_between(np.arange(0, self.time, self.timeresolution ** -1), stat_tanh, color="blue")
axs[4].plot(np.arange(0, self.time, self.timeresolution ** -1), rausch, color="black", label="noise")
axs[5].plot(np.arange(0, self.time, self.timeresolution ** -1), signal, color="red", label="signal")
for a in axs:
a.legend(loc="upper right")
plt.show()
sig_gen = signal_generator()
sinus, tanh, rausch, signal, stat_sin, stat_tanh = sig_gen.generate()
if verbose:
sig_gen.plot(sinus, tanh, rausch, signal, stat_sin, stat_tanh)
stationary, stationary_time = transientcheck(signal, sig_gen.timesteps)
assert stationary, "signal is stationary by definition"
def transientcheck(signal, timesteps):
"""
:param signal: timeseries
:return: time_of_stationarity
"""
second_half_id = int(len(signal) / 2)
second_half_of_signal = np.copy(signal[second_half_id:])
second_half_mean = np.mean(second_half_of_signal)
second_half_of_signal_fluctations = second_half_of_signal-second_half_mean
second_half_timesteps = np.copy(timesteps[second_half_id:])
integral_time_scale, integral_length_scale = integralscales(second_half_mean, second_half_of_signal_fluctations, timesteps)
integrals_window = 30
time_window = integrals_window * integral_time_scale
windows = []
window_upperlimit = time_window
windows_rms = []
windows_mean = []
window_signal = []
window_std = []
for signal_at_time, time in zip(signal, timesteps):
if time >= window_upperlimit:
window_upperlimit += time_window
windows.append(np.array(window_signal))
window_std.append(np.std(window_signal))
windows_mean.append(np.mean(window_signal))
windows_rms.append(np.sqrt(np.sum(windows[-1] ** 2) / len(windows[-1])))
window_signal = []
else:
window_signal.append(signal_at_time)
eps_helper = (integral_time_scale / (second_half_timesteps[-1] - second_half_timesteps[0]))
# todo the next line was most likely corrupted and therefore it is replaced
# eps_time_mean = np.std(second_half_of_signal_fluctations) / second_half_mean * (2 *eps_helper) ** .5
eps_time_mean = np.std(signal) / np.mean(second_half_of_signal) * (2 *eps_helper)** .5
eps_time_rms = eps_helper ** .5
no_windows_mean = len(windows_mean)
no_windows_rms = len(windows_rms)
confidence_mean_high = second_half_mean * (1 + 1.96 * eps_time_mean)
confidence_mean_low = second_half_mean * (1 - 1.96 * eps_time_mean)
confidence_rms_high = np.mean(windows_rms) * (1 + 1.96 * eps_time_rms)
confidence_rms_low = np.mean(windows_rms) * (1 - 1.96 * eps_time_rms)
mean_in_ci_range = [True if (confidence_mean_high >= i >= confidence_mean_low) else False for i in windows_mean]
rms_in_ci_range = [True if (confidence_rms_high >= i >= confidence_rms_low) else False for i in windows_rms]
mean_stationarity_fraction = np.array(
[sum(mean_in_ci_range[pt:]) / (no_windows_mean - pt) for pt in range(len(mean_in_ci_range))])
rms_stationarity_fraction = np.array(
[sum(rms_in_ci_range[pt:]) / (no_windows_rms - pt) for pt in range(len(rms_in_ci_range))])
# todo: the following line is not working as there might be an error in statwindow_mean
statwindow_mean_stationarity = np.where(mean_stationarity_fraction > 0.95)[0]
statwindow_rms_stationarity = np.where(rms_stationarity_fraction > 0.95)[0]
statwindow_mean = (statwindow_mean_stationarity[0] if len(statwindow_mean_stationarity)>0 else None)
statwindow_rms = (statwindow_rms_stationarity[0] if len(statwindow_mean_stationarity)>0 else None)
fig, axs = plt.subplots(3, 1)
axs[0].plot(windows_mean,color="green")
axs[0].hlines(confidence_mean_high, xmin=0, xmax=no_windows_mean,color="green")
axs[0].hlines(confidence_mean_low, xmin=0, xmax=no_windows_mean,color="green")
axs[1].plot(windows_rms,color="blue")
axs[1].hlines(confidence_rms_high, xmin=0, xmax=no_windows_rms,color="blue")
axs[1].hlines(confidence_rms_low, xmin=0, xmax=no_windows_rms,color="blue")
axs[2].plot(timesteps,signal,color="orange")
axs[2].vlines(statwindow_rms*time_window*integral_time_scale,ymin=min(signal),ymax=max(signal),color="k")
axs[2].vlines(statwindow_mean*integral_time_scale,ymin=min(signal),ymax=max(signal),color="k")
plt.show()
converged = (True if (statwindow_rms>0 and statwindow_mean>0) else False)
converged_time = (max([statwindow_rms,statwindow_mean])*time_window*integral_time_scale if converged==True else False)
return converged, converged_time
def parsed_timeseries_analysis(timesteps, signal, resolvechunks=20, verbose=True):
checksigchunks = chunks(signal, int(len(signal) / resolvechunks))
checktimechunks = chunks(timesteps, int(len(timesteps) / resolvechunks))
min_chunk = int(resolvechunks / 2)
signal_type, stationarity, stationarity_timestep, timescale, lengthscale = check_signal_stationarity(resolvechunks, signal, timesteps)
scales = (timescale,lengthscale)
if stationarity==True:
plt.figure()
plt.plot(timesteps, signal, color="k" )
plt.vlines(stationarity_timestep, ymin=0, ymax=2, linewidth=4, color="k", linestyles="dashed")
plt.xlim(0, timesteps[-1])
plt.ylim(-10, 10)
plt.show() #
return stationarity, timescale, stationarity_timestep
for i in range(min_chunk, resolvechunks + 1):
csig = np.concatenate([*checksigchunks[resolvechunks - i:]])
ctime = np.concatenate([*checktimechunks[resolvechunks - i:]])
signal_type,newstationarity,newstationarity_timestep, newtimescale,newlengthscale = check_signal_stationarity(resolvechunks, csig, ctime)
if newstationarity:
newscales = (timescale, lengthscale)
stationarity=True
scales = newscales
stationarity_timestep = newstationarity_timestep
if not newstationarity:
# when no further stationarity found, return status
# when done, return last status
if verbose:
plt.figure()
plt.plot(timesteps, signal)
plt.plot(ctime, csig, color="black", linewidth=0.1)
plt.vlines(stationarity_timestep, ymin=-10, ymax=10, linewidth=4, color="k", linestyles="dashed")
plt.axvspan(stationarity_timestep, max(timesteps), facecolor='grey', alpha=0.5)
plt.xlim(0, timesteps[-1])
plt.ylim(-10, 10)
plt.show() #
return stationarity, scales, stationarity_timestep
plt.figure()
plt.plot(timesteps, signal)
plt.plot(ctime, csig, color="black", linewidth=4)
plt.vlines(stationarity_timestep, ymin=0, ymax=2, linewidth=4, color="k", linestyles="dashed")
plt.xlim(0, timesteps[-1])
plt.ylim(-10, 10)
plt.show() #
return stationarity, scales, stationarity_timestep
def check_signal_stationarity(resolvechunks, signal, timesteps, verbose = True):
checksigchunks = chunks(signal, int(len(signal) / resolvechunks))
checktimechunks = chunks(timesteps, int(len(timesteps) / resolvechunks))
# a new approach could be to check chunkwise the stationarity by logic.
# a constant has a constant mean but no trend and no variation
# a trend has a constant trend, but no mean and no variation
# a correlating signal has a time and length scale, a mean, a constant variation and autocorrelation
mean = np.mean(signal)
means = np.mean(checksigchunks, axis=1)
var = np.std(signal)
vars = np.std(checksigchunks, axis=1)
sigma = var**.5
tolerance = var
const_mean = np.allclose(mean, means,rtol=0.08)
const_val = np.allclose(mean, signal,rtol=0.08)
const_var = np.allclose(var,vars,rtol=3)
#
if const_mean and const_var:
timescale, lengthscale = integralscales(signal, timesteps)
timescales, lengthscales = zip(*[integralscales(s, t) for s, t in zip(checksigchunks, checktimechunks)])
if (timesteps[-1]-timesteps[0])/timescale<30:
print("warning")
else:
timescale, lengthscale = 0,0
timescales, lengthscales = np.zeros(resolvechunks),np.zeros(resolvechunks)
const_tscales = np.allclose(timescales, timescale,rtol=tolerance)
const_lscales = np.allclose(lengthscales, lengthscale,rtol=tolerance)
if const_mean == const_val == const_var == True:
# constant signal
signal_type = "constant"
stationarity = True
stationarity_timestep = timesteps[0]
elif const_mean == const_var == True and const_val == False:
# stationary signal
signal_type = "stationary structured"
stationarity = True
stationarity_timestep = timesteps[0]
elif const_mean == True and (const_var == True or const_val == True):
# stationary signal
signal_type = "stationary unstructured"
stationarity = True
stationarity_timestep = timesteps[0]
elif const_var == const_val == True and const_mean == False:
# nonstationary signal
signal_type = "nonstationary"
stationarity = False
stationarity_timestep = -1
elif const_var == True and const_val == False and const_mean == False:
# nonstationary signal
signal_type = "nonstationary"
stationarity = False
stationarity_timestep = -1
elif const_var == False and const_val == True and const_mean == False:
# nonstationary signal
signal_type = "nonstationary"
stationarity = False
stationarity_timestep = -1
elif const_var == False and const_val == False and const_mean == False:
# nonstationary signal
signal_type = "nonstationary"
stationarity = False
stationarity_timestep = -1
else:
# nonstationary signal
signal_type = "nonstationary"
stationarity = False
stationarity_timestep = -1
return signal_type, stationarity, stationarity_timestep, timescale, lengthscale
......@@ -9,3 +9,4 @@ PyYaml==6.0
pandas==1.3.3
tqdm==4.59.0
importlib_resources==5.4.0
statsmodel==0.13.2
......@@ -9,3 +9,4 @@ PyYaml==6.0
pandas==1.3.3
tqdm==4.59.0
importlib_resources==5.4.0
statsmodel==0.13.2
import numpy as np
from ntrfc.postprocessing.timeseries.stationary_signal_check import parsed_timeseries_analysis
def constant_signal(numtimesteps, time):
timesteps = np.linspace(0, 1, numtimesteps) * time
signal = np.ones(len(timesteps))
return timesteps, signal
def ramp_signal(numtimesteps, time):
timesteps = np.linspace(0, 1, numtimesteps) * time
signal = np.linspace(0, 1, numtimesteps) * time
return timesteps, signal
def ramptoconst_signal(numtimesteps, time):
timesteps = np.linspace(0, 1, numtimesteps) * time
signal = np.linspace(0, 2, numtimesteps) * time
signal = np.clip(signal, a_min=0, a_max=1)
return timesteps, signal
def sine_signal(numtimesteps, time):
timesteps = np.linspace(0, 1, numtimesteps) * time
omega = 1000
signal = np.sin(timesteps * omega) + 1
return timesteps, signal
def noise_signal(numtimesteps, time):
mu, sigma = 0, 0.2 # mean and standard deviation
s = np.random.normal(mu, sigma, size=numtimesteps)
Dt = time / numtimesteps
timesteps = np.linspace(0, time, numtimesteps)
# dt is the timescale of the noisy signal --> emulated length scale!
dt = time / 1000
timescale = int(dt / Dt)
weights = np.repeat(1.0, timescale) / timescale
signal = np.convolve(s, weights, 'same') + 1
return timesteps, signal
def convergentsine_signal(numtimesteps, time):
omega = 1000
timesteps = np.linspace(0, time, numtimesteps)
signal = -3 * np.sin(timesteps*omega) / timesteps / 10
signal += 1
signal = np.nan_to_num(signal,0)
return timesteps, signal
def tanh_signal(numtimesteps, time):
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/3
tanhsignal = np.tanh(timesteps * stationary ** -1)
return timesteps, tanhsignal
def tanh_sin_signal(numtimesteps, time):
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/3
tanhsignal = np.tanh(timesteps * stationary ** -1)
timesteps = np.linspace(0, 1, numtimesteps) * time
omega = 10000 * np.random.rand()
signal = np.sin(timesteps * omega) + 1
return timesteps, signal*0.3+tanhsignal
def tanh_sin_noise_signal(numtimesteps, time):
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/4
tanhsignal = np.tanh(timesteps * stationary ** -1)
timesteps = np.linspace(0, 1, numtimesteps) * time
omega = 1000
signal = np.sin(timesteps * omega) + 1
mu, sigma = 0, 0.2 # mean and standard deviation
s = np.random.normal(mu, sigma, size=numtimesteps)
Dt = time / numtimesteps
# dt is the timescale of the noisy signal --> emulated length scale!
dt = time / 1000
timescale = int(dt / Dt)
weights = np.repeat(1.0, timescale) / timescale
noise = np.convolve(s, weights, 'same') + 1
omega = 1000
timesteps = np.linspace(0, time, numtimesteps)
nsignal = -3 * np.sin(timesteps*omega) / timesteps / 10
nsignal += 1
nsignal = np.nan_to_num(signal,0)
return timesteps, signal*0.1+tanhsignal*2+noise*0.1+signal*0.1+nsignal*0.1
def sine_abate_signal(numtimesteps, time):
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/4
abate = np.e ** (-timesteps * stationary ** -1)
omega = 1000
sinesignal = abate / max(abate) * np.sin(timesteps * omega)*0.4+1
return timesteps, sinesignal
def complex_signal(numtimesteps, time):
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/4
tanhsignal = np.tanh(timesteps * stationary ** -1)
timesteps = np.linspace(0, 1, numtimesteps) * time
omega = 1000
signal = np.sin(timesteps * omega) + 1
mu, sigma = 0, 0.2 # mean and standard deviation
s = np.random.normal(mu, sigma, size=numtimesteps)
Dt = time / numtimesteps
# dt is the timescale of the noisy signal --> emulated length scale!
dt = time / 1000
timescale = int(dt / Dt)
weights = np.repeat(1.0, timescale) / timescale
noise = np.convolve(s, weights, 'same') + 1
timesteps = np.linspace(0, time, numtimesteps)
stationary = time/4
abate = np.e ** (-timesteps * stationary ** -1)
omega = 1000
sinesignal = abate / max(abate) * np.sin(timesteps * omega)*0.4+1
omega = 1000
timesteps = np.linspace(0, time, numtimesteps)
cssignal = -3 * np.sin(timesteps*omega) / timesteps / 10
cssignal += 1
cssignal = np.nan_to_num(signal,0)
return timesteps, signal*0.1+tanhsignal*2+noise*0.1+sinesignal*0.1+0.2*cssignal
def test_analize_stationarity(verbose=True):
tight_tolerance = 1e-2
loose_tolerance = 1
signals = {"constant": constant_signal(1000, 1),
"ramp": ramp_signal(1000, 1),
"rampconst": ramptoconst_signal(10000, 1),
"sine": sine_signal(10000, 1),
"noise": noise_signal(10000, 1),
"convergentsine_signal": convergentsine_signal(10000, 10),
"tanh":tanh_signal(10000,10),
"tanh_sin":tanh_sin_signal(10000,10),
"tanh_sin_noise": tanh_sin_noise_signal(10000, 2.8),
"sine_abate":sine_abate_signal(40000, 100),
"complex":complex_signal(40000,60)
}
expections = {"constant": (True, (0, 0), 0),
"ramp": (False, (0, 0), -1),
"rampconst": (True, (0, 0), 0.5),
"sine": (True, (0.0005, 0.0005), 0),
"noise": (True, (0.0005, 0.0005), 0),
"convergentsine_signal": (True, (0.0005, 0.0005), 1),
"tanh":(True,(0,0),5),
"tanh_sin":(True,(0.0005,0.0005),4.5),
"tanh_sin_noise":(True,(0.0005,0.0005),0.98),
"sine_abate":(True,(0.00025,0.00025),45),
"complex":(True,(0.00025,0.0025),21)
}
for name, sig in signals.items():
stationarity, scales, stationary_ts = parsed_timeseries_analysis(*sig, resolvechunks=20, verbose=verbose)
exp_stationarity, exp_scales, exp_stationarity_ts = expections[name]
assert stationarity == exp_stationarity, f"{name} is expected to be stationary -> {exp_stationarity}"
assert np.allclose(scales, exp_scales, rtol=loose_tolerance), f"{name} should have a timescale"
assert np.isclose(stationary_ts, exp_stationarity_ts,
rtol=tight_tolerance), f"{name} is stationary at {exp_stationarity_ts} instead of computed {stationary_ts}"
print(f"successfully tested {name}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment