From 090a511b59f9594eea991a01556cbc00f384aa7a Mon Sep 17 00:00:00 2001 From: Q Xu Date: Sat, 4 Jan 2025 16:27:22 +0800 Subject: [PATCH] generate broadband signal in time domain --- doa_py/arrays.py | 121 +++++++++++++++++++++++++++++++++++++++---- doa_py/signals.py | 127 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 211 insertions(+), 37 deletions(-) diff --git a/doa_py/arrays.py b/doa_py/arrays.py index 4de08fc..97d9f1e 100644 --- a/doa_py/arrays.py +++ b/doa_py/arrays.py @@ -1,4 +1,5 @@ from abc import ABC +from typing import Literal import numpy as np @@ -94,6 +95,7 @@ def received_signal( nsamples=100, amp=None, unit="deg", + use_cache=False, **kwargs, ): """Generate array received signal based on array signal model @@ -113,6 +115,8 @@ def received_signal( amp: The amplitude of each signal, 1d numpy array unit: The unit of the angle, `rad` represents radian, `deg` represents degree. Defaults to 'deg'. + use_cache (bool): If True, use cache to generate identical signals + (noise is random). Default to `False`. **kwargs: Additional parameters for generating broadband signal, check `gen` method of `BroadSignal`. """ @@ -121,22 +125,34 @@ def received_signal( if isinstance(signal, BroadSignal): received = self._gen_broadband( - signal, - snr, - nsamples, - angle_incidence, - amp, + signal=signal, + snr=snr, + nsamples=nsamples, + angle_incidence=angle_incidence, + amp=amp, + use_cache=use_cache, **kwargs, ) if isinstance(signal, NarrowSignal): received = self._gen_narrowband( - signal, snr, nsamples, angle_incidence, amp + signal=signal, + snr=snr, + nsamples=nsamples, + angle_incidence=angle_incidence, + amp=amp, + use_cache=use_cache, ) return received def _gen_narrowband( - self, signal: NarrowSignal, snr, nsamples, angle_incidence, amp + self, + signal: NarrowSignal, + snr, + nsamples, + angle_incidence, + amp, + use_cache=False, ): """Generate narrowband received signal @@ -151,7 +167,9 @@ def _gen_narrowband( signal.frequency, angle_incidence, unit="rad" ) - incidence_signal = signal.gen(n=num_signal, nsamples=nsamples, amp=amp) + incidence_signal = signal.gen( + n=num_signal, nsamples=nsamples, amp=amp, use_cache=use_cache + ) received = manifold_matrix @ incidence_signal @@ -167,9 +185,44 @@ def _gen_broadband( nsamples, angle_incidence, amp, + use_cache=False, + calc_method: Literal["fft", "delay"] = "fft", + **kwargs, + ): + assert calc_method in ["fft", "delay"], "Invalid calculation method" + + if calc_method == "fft": + return self._gen_broadband_fft( + signal=signal, + snr=snr, + nsamples=nsamples, + angle_incidence=angle_incidence, + amp=amp, + use_cache=use_cache, + **kwargs, + ) + else: + return self._gen_broadband_delay( + signal=signal, + snr=snr, + nsamples=nsamples, + angle_incidence=angle_incidence, + amp=amp, + use_cache=use_cache, + **kwargs, + ) + + def _gen_broadband_fft( + self, + signal: BroadSignal, + snr, + nsamples, + angle_incidence, + amp, + use_cache=False, **kwargs, ): - """Generate broadband received signal + """Generate broadband received signal using FFT model `azimuth` and `elevation` are already in radians """ @@ -184,6 +237,8 @@ def _gen_broadband( n=num_signal, nsamples=nsamples, amp=amp, + use_cache=use_cache, + delay=None, **kwargs, ) @@ -209,6 +264,54 @@ def _gen_broadband( return received + def _gen_broadband_delay( + self, + signal: BroadSignal, + snr, + nsamples, + angle_incidence, + amp, + use_cache=False, + **kwargs, + ): + """Generate broadband received signal by applying delay + + `azimuth` and `elevation` are already in radians + """ + if angle_incidence.ndim == 1: + num_signal = angle_incidence.size + else: + num_signal = angle_incidence.shape[1] + + # calculate time delay + angle_incidence = np.reshape(angle_incidence, (2, -1)) + cos_cos = np.cos(angle_incidence[0]) * np.cos(angle_incidence[1]) + sin_cos = np.sin(angle_incidence[0]) * np.cos(angle_incidence[1]) + sin_ = np.sin(angle_incidence[1]) + time_delay = ( + 1 / C * self.array_position @ np.vstack((cos_cos, sin_cos, sin_)) + ) + + received = np.zeros((self.num_antennas, nsamples), dtype=np.complex128) + + for i in range(self.num_antennas): + received[i, :] = np.sum( + signal.gen( + n=num_signal, + nsamples=nsamples, + amp=amp, + use_cache=use_cache, + delay=time_delay[i, :], + **kwargs, + ), + axis=0, + ) + + if snr is not None: + received = self._add_awgn(received, snr) + + return received + def _add_awgn(self, signal, snr_db): sig_pow = np.mean(np.abs(signal) ** 2, axis=1) noise_pow = sig_pow / 10 ** (snr_db / 10) diff --git a/doa_py/signals.py b/doa_py/signals.py index c053cbe..df194c4 100644 --- a/doa_py/signals.py +++ b/doa_py/signals.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Literal, Optional, Union +from typing import Any, Literal, Optional, Union import numpy as np import numpy.typing as npt @@ -20,6 +20,9 @@ def __init__(self, rng: Optional[np.random.Generator] = None): else: self._rng = rng + # caches used to generate the identical signals + self._cache = {} + def set_rng(self, rng: np.random.Generator): """Setting random number generator @@ -29,6 +32,15 @@ def set_rng(self, rng: np.random.Generator): """ self._rng = rng + def _set_cache(self, key: str, value: Any): + """Set cache value + + Args: + key (str): Cache key + value (Any): Cache value + """ + self._cache[key] = value + def _get_amp( self, amp: Optional[ListLike], @@ -53,7 +65,11 @@ def _get_amp( @abstractmethod def gen( - self, n: int, nsamples: int, amp=Optional[ListLike] + self, + n: int, + nsamples: int, + amp=Optional[ListLike], + use_cache: bool = False, ) -> npt.NDArray[np.complex128]: """Generate sampled signals @@ -63,6 +79,8 @@ def gen( amp (np.array): Amplitude of the signals (1D array of size n), used to define different amplitudes for different signals. By default it will generate equal amplitude signal. + use_cache (bool): If True, use cache to generate identical signals. + Default to `False`. Returns: signal (np.array): Sampled signals @@ -92,7 +110,11 @@ def frequency(self): @abstractmethod def gen( - self, n: int, nsamples: int, amp: Optional[ListLike] = None + self, + n: int, + nsamples: int, + amp: Optional[ListLike] = None, + use_cache: bool = False, ) -> npt.NDArray[np.complex128]: raise NotImplementedError @@ -109,17 +131,28 @@ def __init__(self, fc, rng=None): """ super().__init__(fc, rng) - def gen(self, n, nsamples, amp=None) -> npt.NDArray[np.complex128]: + def gen( + self, n, nsamples, amp=None, use_cache=False + ) -> npt.NDArray[np.complex128]: amp = self._get_amp(amp, n) + if use_cache and not self._cache == {}: + # use cache + real = self._cache["real"] + imag = self._cache["imag"] + assert real.shape == (n, nsamples) and imag.shape == ( + n, + nsamples, + ), "Cache shape mismatch" + else: + # Generate random amp + real = self._rng.standard_normal(size=(n, nsamples)) + imag = self._rng.standard_normal(size=(n, nsamples)) + self._set_cache("real", real) + self._set_cache("imag", imag) + # Generate complex envelope - signal = amp @ ( - np.sqrt(1 / 2) - * ( - self._rng.standard_normal(size=(n, nsamples)) - + 1j * self._rng.standard_normal(size=(n, nsamples)) - ) - ) + signal = amp @ (np.sqrt(1 / 2) * (real + 1j * imag)) return signal @@ -145,22 +178,31 @@ def __init__( ), "This signal must be narrowband: freq_ratio in (0, 0.1)" self._freq_ratio = freq_ratio - def gen(self, n, nsamples, amp=None): + def gen(self, n, nsamples, amp=None, use_cache=False): amp = self._get_amp(amp, n) + if use_cache and not self._cache == {}: + freq = self._cache["freq"] + phase = self._cache["phase"] + assert freq.shape == (n, 1) and phase.shape == ( + n, + 1, + ), "Cache shape mismatch" + else: + # Generate random phase signal + freq = self._rng.uniform( + 0, self._freq_ratio * self._fc, size=(n, 1) + ) + phase = self._rng.uniform(0, 2 * np.pi, size=(n, 1)) + self._set_cache("freq", freq) + self._set_cache("phase", phase) + fs = self._fc * self._freq_ratio * 5 # Generate random frequency signal signal = ( amp - @ np.exp( - 1j - * 2 - * np.pi - * self._rng.uniform(0, self._freq_ratio * self._fc, size=(n, 1)) - / fs - * np.arange(nsamples) - ) - * np.exp(1j * self._rng.uniform(0, 2 * np.pi, size=(n, 1))) # phase + @ np.exp(1j * 2 * np.pi * freq / fs * np.arange(nsamples)) + * np.exp(1j * phase) # phase ) return signal @@ -189,8 +231,10 @@ def gen( n: int, nsamples: int, amp: Optional[ListLike] = None, + use_cache: bool = False, min_length_ratio: float = 0.1, no_overlap: bool = False, + delay: Optional[Union[npt.NDArray, int, float]] = None, ): """Generate sampled signals @@ -200,10 +244,13 @@ def gen( amp (np.array): Amplitude of the signals (1D array of size n), used to define different amplitudes for different signals. By default it will generate equal amplitude signal. + use_cache (bool): If True, use cache to generate identical signals. + Default to `False`. min_length_ratio (float): Minimum length ratio of the frequency range in (f_max - f_min) no_overlap (bool): If True, generate signals with non-overlapping subbands + delay (float | None): If not None, apply delay to all signals. Returns: signal (np.array): Sampled signals @@ -276,26 +323,50 @@ def __init__(self, f_min, f_max, fs, rng=None): super().__init__(f_min, f_max, fs, rng=rng) def gen( - self, n, nsamples, amp=None, min_length_ratio=0.1, no_overlap=False + self, + n, + nsamples, + amp=None, + use_cache=False, + min_length_ratio=0.1, + no_overlap=False, + delay=None, ) -> npt.NDArray[np.complex128]: amp = self._get_amp(amp, n) - if no_overlap: - fre_ranges = self._generate_non_overlapping_bands( - n, min_length_ratio - ) + + if use_cache and not self._cache == {}: + fre_ranges = self._cache["fre_ranges"] + phase = self._cache["phase"] + assert fre_ranges.shape == (n, 2) and phase.shape == ( + n, + 1, + ), "Cache shape mismatch" else: - fre_ranges = self._gen_fre_bands(n, min_length_ratio) + if no_overlap: + fre_ranges = self._generate_non_overlapping_bands( + n, min_length_ratio + ) + else: + fre_ranges = self._gen_fre_bands(n, min_length_ratio) + phase = self._rng.uniform(0, 2 * np.pi, size=(n, 1)) + self._set_cache("fre_ranges", fre_ranges) + self._set_cache("phase", phase) t = np.arange(nsamples) * 1 / self._fs f0 = fre_ranges[:, 0] k = (fre_ranges[:, 1] - fre_ranges[:, 0]) / t[-1] + if delay is not None: + if isinstance(delay, (int, float)): + delay = np.ones(n) * delay + t = t + (delay.reshape(-1, 1) / self._fs) + signal = np.exp( 1j * 2 * np.pi * (f0.reshape(-1, 1) * t + 0.5 * k.reshape(-1, 1) * t**2) - ) * np.exp(1j * self._rng.uniform(0, 2 * np.pi, size=(n, 1))) # phase + ) * np.exp(1j * phase) signal = amp @ signal