Skip to content

Commit

Permalink
generate broadband signal in time domain
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiim committed Jan 4, 2025
1 parent 3863dd7 commit 090a511
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 37 deletions.
121 changes: 112 additions & 9 deletions doa_py/arrays.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC
from typing import Literal

import numpy as np

Expand Down Expand Up @@ -94,6 +95,7 @@ def received_signal(
nsamples=100,
amp=None,
unit="deg",
use_cache=False,
**kwargs,
):
"""Generate array received signal based on array signal model
Expand All @@ -113,6 +115,8 @@ def received_signal(
amp: The amplitude of each signal, 1d numpy array
unit: The unit of the angle, `rad` represents radian,
`deg` represents degree. Defaults to 'deg'.
use_cache (bool): If True, use cache to generate identical signals
(noise is random). Default to `False`.
**kwargs: Additional parameters for generating broadband signal,
check `gen` method of `BroadSignal`.
"""
Expand All @@ -121,22 +125,34 @@ def received_signal(

if isinstance(signal, BroadSignal):
received = self._gen_broadband(
signal,
snr,
nsamples,
angle_incidence,
amp,
signal=signal,
snr=snr,
nsamples=nsamples,
angle_incidence=angle_incidence,
amp=amp,
use_cache=use_cache,
**kwargs,
)
if isinstance(signal, NarrowSignal):
received = self._gen_narrowband(
signal, snr, nsamples, angle_incidence, amp
signal=signal,
snr=snr,
nsamples=nsamples,
angle_incidence=angle_incidence,
amp=amp,
use_cache=use_cache,
)

return received

def _gen_narrowband(
self, signal: NarrowSignal, snr, nsamples, angle_incidence, amp
self,
signal: NarrowSignal,
snr,
nsamples,
angle_incidence,
amp,
use_cache=False,
):
"""Generate narrowband received signal
Expand All @@ -151,7 +167,9 @@ def _gen_narrowband(
signal.frequency, angle_incidence, unit="rad"
)

incidence_signal = signal.gen(n=num_signal, nsamples=nsamples, amp=amp)
incidence_signal = signal.gen(
n=num_signal, nsamples=nsamples, amp=amp, use_cache=use_cache
)

received = manifold_matrix @ incidence_signal

Expand All @@ -167,9 +185,44 @@ def _gen_broadband(
nsamples,
angle_incidence,
amp,
use_cache=False,
calc_method: Literal["fft", "delay"] = "fft",
**kwargs,
):
assert calc_method in ["fft", "delay"], "Invalid calculation method"

if calc_method == "fft":
return self._gen_broadband_fft(
signal=signal,
snr=snr,
nsamples=nsamples,
angle_incidence=angle_incidence,
amp=amp,
use_cache=use_cache,
**kwargs,
)
else:
return self._gen_broadband_delay(
signal=signal,
snr=snr,
nsamples=nsamples,
angle_incidence=angle_incidence,
amp=amp,
use_cache=use_cache,
**kwargs,
)

def _gen_broadband_fft(
self,
signal: BroadSignal,
snr,
nsamples,
angle_incidence,
amp,
use_cache=False,
**kwargs,
):
"""Generate broadband received signal
"""Generate broadband received signal using FFT model
`azimuth` and `elevation` are already in radians
"""
Expand All @@ -184,6 +237,8 @@ def _gen_broadband(
n=num_signal,
nsamples=nsamples,
amp=amp,
use_cache=use_cache,
delay=None,
**kwargs,
)

Expand All @@ -209,6 +264,54 @@ def _gen_broadband(

return received

def _gen_broadband_delay(
self,
signal: BroadSignal,
snr,
nsamples,
angle_incidence,
amp,
use_cache=False,
**kwargs,
):
"""Generate broadband received signal by applying delay
`azimuth` and `elevation` are already in radians
"""
if angle_incidence.ndim == 1:
num_signal = angle_incidence.size
else:
num_signal = angle_incidence.shape[1]

# calculate time delay
angle_incidence = np.reshape(angle_incidence, (2, -1))
cos_cos = np.cos(angle_incidence[0]) * np.cos(angle_incidence[1])
sin_cos = np.sin(angle_incidence[0]) * np.cos(angle_incidence[1])
sin_ = np.sin(angle_incidence[1])
time_delay = (
1 / C * self.array_position @ np.vstack((cos_cos, sin_cos, sin_))
)

received = np.zeros((self.num_antennas, nsamples), dtype=np.complex128)

for i in range(self.num_antennas):
received[i, :] = np.sum(
signal.gen(
n=num_signal,
nsamples=nsamples,
amp=amp,
use_cache=use_cache,
delay=time_delay[i, :],
**kwargs,
),
axis=0,
)

if snr is not None:
received = self._add_awgn(received, snr)

return received

def _add_awgn(self, signal, snr_db):
sig_pow = np.mean(np.abs(signal) ** 2, axis=1)
noise_pow = sig_pow / 10 ** (snr_db / 10)
Expand Down
Loading

0 comments on commit 090a511

Please sign in to comment.