Feature engineering for time series data using Numba
Given a sequence of measurements values: np.ndarray
and observation times times: np.ndarray
, we want to engineer features for a machine learning model that captures the temporal trends and statistics over different temporal windows. Our data is irregularly sampled and have missing data.
Features representing the magnitude, dispersion, direction of change, and temporal trends are derived. Every time point is represented with 4 categories of features:
- Magnitude of the most recent observation within the last 6h for vital signs and 24h for laboratory measurements.
- Dispersion measured as the range over a short and long-time window.
- Direction of change (increasing, decreasing, no change) over a short and long-time window.
- Exponential moving averages (EMA) with varying decay rates that specify how the much impact each past observation has on the current mean. EMA features were calculated on the forward filled magnitudes and using an EMA algorithm specifically for irregularly sampled time series.
The engineered features include:
dt
: time elapsed since the measurement was madeval
: most recent measurement, measurements are forward filled up to a maximum duration aftersrng
: range as \(\frac{x_{max}-x_{min}}{x_{max}+x_{min}}100\) over a short windowssgn
: sign of the change (-1 or +1) between the first and last measurement in a short window, windows without measurements are filled with 0lrng
: range as (max-min)/(max+min) * 100 over a long windowlsgn
: sign of the change (-1 or +1) between the first and last measurement in a long window, windows without measurements are filled with 0sema
: slow exponential moving average, calculated after forward fillingfema
: fast exponential moving average, calculated after forward filling
We also need to define the following settings for every variable:
forward_fill_duration
: duration in minutes to forward fill a missing variableshort_rolling_window_size
: short rolling window size in minuteslong_rolling_window_size
: long rolling window size in minutesslow_ema_tau
: decay rate for slow exponential moving average in minutesfast_ema_tau
: decay rate for fast exponential moving average in minutes
Some derived features have no missing data, including dt
, ssgn
, srng
, lsgn
, and lrng
. Other variables like val
, sema
, fema
have missing values encoded as NaN. Missing data is forward filled up to forward_fill_duration
and all missing values after the forward filling time are NaN.
from numba import njit, prange
@njit()
def ema(times, values, tau):
"""Exponential moving average for irregularly sampled time series.
Units for ``times`` and ``tau`` should be in hours.
Args:
times (np.ndarray): 1D array of measurement times in hours
values (np.ndarray): 1D array of measurements
tau (float): time decay in hours
"""
= len(values)
n = np.empty(n, dtype=np.float64) * np.nan
ret 0] = values[0]
ret[= 0
last_i for i in range(1, n):
if np.isnan(times[i]) | np.isnan(values[i]):
continue
= (times[i] - times[last_i]) / tau
alpha = np.exp(-alpha)
w if alpha > 1e-6:
= (1 - w) / alpha
w2 else:
# use Taylor expansion for numerical stability
= 1 - (alpha / 2) + (alpha * alpha / 6) - (alpha * alpha * alpha / 24)
w2 = (ret[last_i] * w) + (values[i] * (1 - w2)) + (values[last_i] * (w2 - w))
ret[i] = i
last_i return ret
@njit()
def ffill(times, values):
"""Forward fill an array of values and times.
Times indicate the time of last observation.
Args:
arr (np.ndarray): array of values with nan
times (np.ndarray): array of times
Returns:
2D array with forward filled times (index 0)
and values (index 1).
"""
= values.copy()
out_values = times.copy()
out_times = values.shape[0]
n = np.zeros((n, 2), dtype=np.float64)
out = np.nan
lastval = np.nan
lasttime for row_idx in range(n):
if np.isfinite(values[row_idx]):
= values[row_idx]
lastval = times[row_idx]
lasttime = lastval
out_values[row_idx] = lasttime
out_times[row_idx] 0] = out_times
out[:, 1] = out_values
out[:, return out
@njit()
def sliding_windows(times, window_size):
= times - times.reshape(-1, 1)
tdiff = (tdiff >= 0) & (tdiff <= window_size)
twindows return twindows
@njit()
def feature_engineering(
times,
values,
forward_fill_duration,
short_rolling_window_size,
long_rolling_window_size,
slow_ema_tau,
fast_ema_tau,
):"""Feature engineering.
1. Age of the observation
2. Most recent forward-filled measurement
3. Dispersion as (max-min)/(max+min) over short window
4. Sign of change between first and last value in short window
5. Dispersion as (max-min)/(max+min) over long window
6. Sign of change between first and last value in long window
7. slow EMA
8. fast EMA
Forward filling replaces nan values up to a maximum duration given by
`forward_fill_duration`. Any remaining missing values should be imputed
with the median or by sampling from a standard reference range.
Args:
values (np.ndarry): 2D array of observations
times (np.ndarry): 1D array of observation times
forward_fill_duration (float): duration to forward fill missing values
short_rolling_window_size (float): observation duration for rolling
statistics with a short lookback window
long_rolling_window_size (float): observation duration for rolling
statistics with a long lookback window
slow_ema_tau (float): slow decay rate weights more of the past
fast_ema_tau (float): fast decay rate weights more of the present
Returns:
2D matrix with shape [n_samples, 8], where the features are:
suffixes = ['dt', 'val', 'short_rng', 'short_sgn', 'long_rng', 'long_sgn', 'slow_ema', 'fast_ema']
"""
= 0
DT_IX = 1
VAL_IX = 2
SRNG_IX = 3
SSGN_IX = 4
LRNG_IX = 5
LSGN_IX = 6
SEMA_IX = 7
FEMA_IX
= values.shape[0]
n = np.zeros((n, 8)) * np.nan
derived
= np.isnan(values)
missing
# return a median imputed array if there are no observations
if np.all(missing):
= 0.
derived[:, SSGN_IX] = 0.
derived[:, LSGN_IX] = 0.
derived[:, SRNG_IX] = 0.
derived[:, LRNG_IX] return derived
# indicate samples within a fixed window for each sample
= sliding_windows(times, short_rolling_window_size)
short_windows = short_windows & ~missing.reshape(-1,1)
short_windows = sliding_windows(times, long_rolling_window_size)
long_windows = long_windows & ~missing.reshape(-1,1)
long_windows
# forward fill
= ffill(times, values)
xfill = times - xfill[:,0]
dt
# 0. Age of the measurement
= dt
derived[:, DT_IX]
# 1. last measured value
= xfill[:, 1]
derived[:, VAL_IX] = ~np.isnan(dt)
expired = dt[expired] >= forward_fill_duration
expired[expired] = np.nan
derived[expired, VAL_IX]
# rolling statistics
for t in range(n):
# samples in this window
= short_windows[:, t]
short_inds = long_windows[:, t]
long_inds = values[short_inds]
short_vals = values[long_inds]
long_vals
if short_vals.size > 0:
# 2. rolling dispersion
= np.max(short_vals)
high = np.min(short_vals)
low = high + low
total if total > 1e-6:
= (high - low) / (total + 1e-6) * 100
derived[t, SRNG_IX] else:
= 0.
derived[t, SRNG_IX]
# 3. sign of change between last and first value
if short_vals.size >= 2:
= np.sign(short_vals[-1] - short_vals[0])
derived[t, SSGN_IX] else:
= 0.
derived[t, SSGN_IX] else:
= 0.
derived[t,SRNG_IX] = 0.
derived[t,SSGN_IX]
if long_vals.size > 0:
# 4. rolling dispersion
= np.max(long_vals)
high = np.min(long_vals)
low = high + low
total if total > 1e-6:
= (high - low) / (total + 1e-6) * 100
derived[t, LRNG_IX] else:
= 0.
derived[t, LRNG_IX]
# 5. sign of change between last and first value
if long_vals.size >= 2:
= np.sign(long_vals[-1] - long_vals[0])
derived[t, LSGN_IX] else:
= 0.
derived[t, LSGN_IX] else:
= 0.
derived[t, LRNG_IX] = 0.
derived[t, LSGN_IX]
# exponential moving average using the forward filled data
= np.empty(n, dtype=np.float64) * np.nan
ema_slow = np.empty(n, dtype=np.float64) * np.nan
ema_fast = np.isfinite(derived[:, 1])
m = derived[m, 1]
vals if vals.size > 0:
= ema(times[m], derived[m, 1], slow_ema_tau)
ema_slow = ema(times[m], derived[m, 1], fast_ema_tau)
ema_fast # 6. slow EMA
= ema_slow
derived[m, SEMA_IX] # 7. fast EMA
= ema_fast
derived[m, FEMA_IX]
return derived