Source code for torch_ecg.augmenters.baseline_wander

"""Add baseline wander composed of sinusoidal and Gaussian noise to the ECGs."""

import multiprocessing as mp
from itertools import repeat
from random import randint
from typing import Any, List, Optional, Sequence, Tuple, Union

import numpy as np
import torch
from numpy.typing import NDArray
from torch import Tensor

from ..cfg import DEFAULTS
from ..utils.utils_signal import get_ampl
from .base import Augmenter
from .registry import AUGMENTERS

__all__ = [
    "BaselineWanderAugmenter",
]


[docs] @AUGMENTERS.register(name="baseline_wander") @AUGMENTERS.register() class BaselineWanderAugmenter(Augmenter): """Generate baseline wander composed of sinusoidal and Gaussian noise. Parameters ---------- fs : int Sampling frequency of the ECGs to be augmented bw_fs : numpy.ndarray, optional Frequencies of the sinusoidal noises, of shape ``(n,)``, defaults to ``[0.33, 0.1, 0.05, 0.01]``. ampl_ratio : numpy.ndarray, optional Candidate ratios of noise amplitdes compared to the original ECGs for each `fs`, of shape ``(m, n)``, defaults to .. code-block:: python np.array( [ [0.01, 0.01, 0.02, 0.03], # low [0.01, 0.02, 0.04, 0.05], # low [0.1, 0.06, 0.04, 0.02], # low [0.02, 0.04, 0.07, 0.1], # low [0.05, 0.1, 0.16, 0.25], # medium [0.1, 0.15, 0.25, 0.3], # high [0.25, 0.25, 0.3, 0.35], # extremely high ] ) gaussian : numpy.ndarray, optional Candidate mean and std of the Gaussian noises, of shape ``(k, 2)``, defaults to .. code-block:: python np.array( [ # mean and std, in terms of ratio [0.0, 0.001], [0.0, 0.003], [0.0, 0.01], ] ) prob : float, default 0.5 Probability of performing the augmentation. inplace : bool, default True If True, ECG signal tensors will be modified inplace. kwargs : dict, optional Additional keyword arguments. Examples -------- .. code-block:: python blw = BaselineWanderAugmenter(300, prob=0.7) sig = torch.randn(32, 12, 5000) label = torch.ones((32, 20)) sig, _ = blw(sig, label) """ __name__ = "BaselineWanderAugmenter" def __init__( self, fs: int, bw_fs: Optional[NDArray] = None, ampl_ratio: Optional[NDArray] = None, gaussian: Optional[NDArray] = None, prob: float = 0.5, inplace: bool = True, **kwargs: Any, ) -> None: super().__init__() self.fs = fs self.bw_fs = bw_fs if bw_fs is not None else np.array([0.33, 0.1, 0.05, 0.01]) self.prob = prob assert 0 <= self.prob <= 1, "Probability must be between 0 and 1" self.ampl_ratio = ( ampl_ratio if ampl_ratio is not None else np.array( [ # default ampl_ratio [0.01, 0.01, 0.02, 0.03], # low [0.01, 0.02, 0.04, 0.05], # low [0.1, 0.06, 0.04, 0.02], # low [0.02, 0.04, 0.07, 0.1], # low [0.05, 0.1, 0.16, 0.25], # medium [0.1, 0.15, 0.25, 0.3], # high [0.25, 0.25, 0.3, 0.35], # extremely high ] ) ) if self.prob > 0: self.ampl_ratio = np.concatenate( ( np.zeros( ( int((1 - self.prob) * self.ampl_ratio.shape[0] / self.prob), self.ampl_ratio.shape[1], ) ), self.ampl_ratio, ) ) self.gaussian = ( gaussian if gaussian is not None else np.array( [ # default gaussian, mean and std, in terms of ratio [0.0, 0.001], [0.0, 0.003], [0.0, 0.01], ] ) ) if self.prob > 0: self.gaussian = np.concatenate( ( np.zeros( ( int((1 - self.prob) * self.gaussian.shape[0] / self.prob), self.gaussian.shape[1], ) ), self.gaussian, ) ) assert self.bw_fs.ndim == 1 and self.ampl_ratio.ndim == 2 and self.bw_fs.shape[0] == self.ampl_ratio.shape[1] self.inplace = inplace self._n_bw_choices = len(self.ampl_ratio) self._n_gn_choices = len(self.gaussian)
[docs] def forward( self, sig: Tensor, label: Optional[Tensor], *extra_tensors: Sequence[Tensor], **kwargs: Any ) -> Tuple[Tensor, ...]: """Forward function of the :class:`BaselineWanderAugmenter`. Parameters ---------- sig : torch.Tensor Batched ECGs to be augmented, of shape ``(batch, lead, siglen)``. label : torch.Tensor, optional Batched label tensor of the ECGs. Not used, but kept for consistency with other augmenters. extra_tensors : Sequence[torch.Tensor], optional, Not used, but kept for consistency with other augmenters. **kwargs : dict, optional Not used, but kept for consistency with other augmenters. Returns ------- sig : torch.Tensor The augmented ECGs. label : torch.Tensor Label tensor of the augmented ECGs, unchanged. extra_tensors : Sequence[torch.Tensor], optional Unchanged extra tensors. """ if not self.inplace: sig = sig.clone() if self.prob > 0: sig.add_(gen_baseline_wander(sig, self.fs, self.bw_fs, self.ampl_ratio, self.gaussian)) # type: ignore return (sig, label, *extra_tensors) # type: ignore
[docs] def extra_repr_keys(self) -> List[str]: return [ "fs", "bw_fs", "prob", "inplace", ] + super().extra_repr_keys()
def _get_ampl(sig: Tensor, fs: int) -> Tensor: """Get the amplitude of each lead. Parameters ---------- sig : torch.Tensor Batched ECG signal tensor, of shape ``(batch, lead, siglen)``. fs : int Sampling frequency of the ECGs. Returns ------- ampl : torch.Tensor Amplitude of each lead, of shape ``(batch, lead, 1)``. """ with mp.Pool(processes=max(1, mp.cpu_count() - 2)) as pool: ampl = pool.starmap( get_ampl, iterable=[(sig[i].cpu().numpy(), fs) for i in range(sig.shape[0])], ) ampl = torch.as_tensor(np.array(ampl), dtype=sig.dtype, device=sig.device).unsqueeze(-1) return ampl def _gen_gaussian_noise(siglen: int, mean: Union[float, int] = 0, std: Union[float, int] = 0) -> NDArray: """Generate 1d Gaussian noise of given length, mean, and standard deviation. Parameters ---------- siglen : int Length of the noise signal. mean : float or int, default 0 Mean value of the noise. std : float or int, default 0 Standard deviation of the noise. Returns ------- gn : numpy.ndarray Gaussian noise of given length, mean, and standard deviation. """ gn = DEFAULTS.RNG.normal(mean, std, siglen) return gn def _gen_sinusoidal_noise( siglen: int, start_phase: Union[float, int], end_phase: Union[float, int], amplitude: Union[float, int], amplitude_mean: Union[float, int] = 0, amplitude_std: Union[float, int] = 0, ) -> NDArray: """Generate 1d sinusoidal noise of given length, amplitude, start phase, and end phase. Parameters ---------- siglen : int Length of the (noise) signal. start_phase : float or int Start phase, with units in degrees. end_phase : float or int End phase, with units in degrees. amplitude : float or int Amplitude of the sinusoidal curve. amplitude_mean : float or int Mean amplitude of an extra Gaussian noise. amplitude_std : float or int, default 0 Standard deviation of an extra Gaussian noise Returns ------- sn : numpy.ndarray, Sinusoidal noise of given length, amplitude, start phase, and end phase. """ sn = np.linspace(start_phase, end_phase, siglen) sn = amplitude * np.sin(np.pi * sn / 180) sn += _gen_gaussian_noise(siglen, amplitude_mean, amplitude_std) return sn def _gen_baseline_wander( siglen: int, fs: Union[float, int], bw_fs: Union[float, int, Sequence[Union[float, int]]], amplitude: Union[float, int, Sequence[Union[float, int]]], amplitude_gaussian: Sequence[Union[float, int]] = [0, 0], ) -> NDArray: """Generate 1d baseline wander of given length, amplitude, and frequency. Parameters ---------- siglen : int Length of the (noise) signal. fs : float or int Sampling frequency of the original signal. bw_fs : float or int, or list of float or int Frequency (Frequencies) of the baseline wander. amplitude : float or int, or list of float or int Amplitude of the baseline wander (corr. to each frequency band). amplitude_gaussian : Tuple[float or int], default [0,0] 2-tuple of :class:`~float or int`. Mean and std of amplitude of an extra Gaussian noise. Returns ------- bw : numpy.ndarray Baseline wander of given length, amplitude, frequency. Example ------- >>> _gen_baseline_wander(4000, 400, [0.4,0.1,0.05], [0.1,0.2,0.4]) """ bw = _gen_gaussian_noise(siglen, amplitude_gaussian[0], amplitude_gaussian[1]) if isinstance(bw_fs, (int, float)): _bw_fs = [bw_fs] else: _bw_fs = bw_fs if isinstance(amplitude, (int, float)): _amplitude = list(repeat(amplitude, len(_bw_fs))) else: _amplitude = amplitude assert len(_bw_fs) == len(_amplitude) duration = siglen / fs for bf, a in zip(_bw_fs, _amplitude): start_phase = DEFAULTS.RNG_randint(0, 360) end_phase = duration * bf * 360 + start_phase bw += _gen_sinusoidal_noise(siglen, start_phase, end_phase, a, 0, 0) return bw def gen_baseline_wander( sig: Tensor, fs: Union[float, int], bw_fs: Union[float, int, Sequence[Union[float, int]]], ampl_ratio: NDArray, gaussian: NDArray, ) -> Tensor: """Generate 1d baseline wander of given length, amplitude, and frequency. Parameters ---------- sig : torch.Tensor Batched ECGs to be augmented, of shape (batch, lead, siglen). fs : float or int Sampling frequency of the original signal. bw_fs : float or int, or list of float or int, Frequency (Frequencies) of the baseline wander. ampl_ratio : numpy.ndarray, optional Candidate ratios of noise amplitdes compared to the original ECGs for each `fs`, of shape ``(m, n)``. gaussian : numpy.ndarray, optional Candidate mean and std of the Gaussian noises, of shape ``(k, 2)``. Returns ------- bw : torch.Tensor Baseline wander of given length, amplitude, frequency, of shape ``(batch, lead, siglen)``. """ batch, lead, siglen = sig.shape sig_ampl = _get_ampl(sig, fs) _n_bw_choices = len(ampl_ratio) _n_gn_choices = len(gaussian) with mp.Pool(processes=max(1, mp.cpu_count() - 2)) as pool: bw = pool.starmap( _gen_baseline_wander, iterable=[ ( siglen, fs, bw_fs, ampl_ratio[randint(0, _n_bw_choices - 1)], gaussian[randint(0, _n_gn_choices - 1)], ) for i in range(sig.shape[0]) for j in range(sig.shape[1]) ], ) bw = torch.as_tensor(np.array(bw), dtype=sig.dtype, device=sig.device).reshape(batch, lead, siglen) return bw