diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 668713d..fcf791a 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,11 +5,21 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.8.0" +__version__ = "1.9.0" # matching version of the SigMF specification __specification__ = "1.2.6" -from . import archive, archivereader, error, schema, sigmffile, utils, validate +from . import ( + archive, + archivereader, + error, + schema, + siggen, + sigmffile, + utils, + validate, +) from .archive import SigMFArchive from .archivereader import SigMFArchiveReader +from .siggen import SigMFGenerator from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromfile diff --git a/sigmf/error.py b/sigmf/error.py index 1551177..29e0d9e 100644 --- a/sigmf/error.py +++ b/sigmf/error.py @@ -35,3 +35,7 @@ def __init__(self, file_path, file_type="File"): class SigMFConversionError(SigMFError): """Exceptions related to converting to SigMF format.""" + + +class SigMFGeneratorError(SigMFError): + """Exceptions related to synthetic signal generation.""" diff --git a/sigmf/siggen.py b/sigmf/siggen.py new file mode 100644 index 0000000..cbcb615 --- /dev/null +++ b/sigmf/siggen.py @@ -0,0 +1,624 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Simple signal generator utilities for SigMF.""" + +import io +from typing import Optional + +import numpy as np + +from .error import SigMFGeneratorError +from .sigmffile import SigMFFile +from .utils import get_data_type_str, get_sigmf_iso8601_datetime_now + + +class SigMFGenerator: + """ + Builder pattern class for generating synthetic RF signals as SigMF files. + + Supports deterministic generation (with specified parameters) and random + generation (parameterless methods with seed-controlled randomness). + + Parameters + ---------- + seed : int, optional + Random seed for reproducible signal generation. + + Examples + -------- + >>> # deterministic 1khz tone + >>> gen = SigMFGenerator(seed=42) + >>> signal = gen.tone(1000).sample_rate(48000).duration(1.0).generate() + + >>> # multiple tones combined + >>> signal = gen.tone(1000).tone(1500).tone(2000).generate() + + >>> # tone plus sweep + >>> signal = SigMFGenerator().sample_rate(100e3).tone(440).sweep(1000, 5000).duration(0.5).generate() + """ + + def __init__(self, seed: Optional[int] = None): + # random state for reproducible generation across arch / platforms + self._rng = np.random.RandomState(seed) + self._seed = seed + + # signal components (list of dicts) + self._signal_components = [] + + # signal configuration + self._sample_rate_hz = None + self._duration_s = None + self._amplitude = 1.0 + self._snr_db = None + self._frequency_offset_hz = 0.0 + self._phase_offset_rad = 0.0 + + # metadata + self._author = None + self._description = None + self._comment = None + + def tone(self, frequency_hz: Optional[float] = None, amplitude: Optional[float] = None): + """ + Add a sinusoidal tone to the signal. + + Parameters + ---------- + frequency_hz : float, optional + Tone frequency in Hz. If None, will be randomly generated. + amplitude : float, optional + Tone amplitude (linear scale). If None, uses default amplitude. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + component = {"type": "tone"} + if frequency_hz is not None: + component["frequency_hz"] = float(frequency_hz) + if amplitude is not None: + component["amplitude"] = float(amplitude) + self._signal_components.append(component) + return self + + def sweep( + self, + start_frequency_hz: Optional[float] = None, + end_frequency_hz: Optional[float] = None, + amplitude: Optional[float] = None, + ): + """ + Add a linear frequency sweep to the signal. + + Parameters + ---------- + start_frequency_hz : float, optional + Starting frequency in Hz. If None, will be randomly generated. + end_frequency_hz : float, optional + Ending frequency in Hz. If None, will be randomly generated. + amplitude : float, optional + Sweep amplitude (linear scale). If None, uses default amplitude. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + component = {"type": "sweep"} + if start_frequency_hz is not None: + component["start_frequency_hz"] = float(start_frequency_hz) + if end_frequency_hz is not None: + component["end_frequency_hz"] = float(end_frequency_hz) + if amplitude is not None: + component["amplitude"] = float(amplitude) + self._signal_components.append(component) + return self + + def sample_rate(self, rate_hz: float): + """ + Set sample rate. + + Parameters + ---------- + rate_hz : float + Sample rate in Hz. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._sample_rate_hz = float(rate_hz) + return self + + def duration(self, duration_s: float): + """ + Set signal duration. + + Parameters + ---------- + duration_s : float + Duration in seconds. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._duration_s = float(duration_s) + return self + + def amplitude(self, amplitude: float): + """ + Set signal amplitude. + + Parameters + ---------- + amplitude : float + Signal amplitude (linear scale). + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._amplitude = float(amplitude) + return self + + def snr(self, snr_db: float): + """ + Add white gaussian noise at specified snr. + + Parameters + ---------- + snr_db : float + Signal-to-noise ratio in dB. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._snr_db = float(snr_db) + return self + + def frequency_offset(self, offset_hz: float): + """ + Add frequency offset to signal. + + Parameters + ---------- + offset_hz : float + Frequency offset in Hz. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._frequency_offset_hz = float(offset_hz) + return self + + def phase_offset(self, offset_rad: float): + """ + Add phase offset to signal. + + Parameters + ---------- + offset_rad : float + Phase offset in radians. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._phase_offset_rad = float(offset_rad) + return self + + def author(self, author: str): + """ + Set author metadata. + + Parameters + ---------- + author : str + Author name/email. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._author = str(author) + return self + + def description(self, description: str): + """ + Set description metadata. + + Parameters + ---------- + description : str + Signal description. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._description = str(description) + return self + + def comment(self, comment: str): + """ + Set comment metadata. + + Parameters + ---------- + comment : str + Comment text. + + Returns + ------- + SigMFGenerator + Self for method chaining. + """ + self._comment = str(comment) + return self + + def generate(self) -> SigMFFile: + """ + Generate the synthetic signal and return as sigmf file. + + Returns + ------- + SigMFFile + Generated signal file with metadata. + + Raises + ------ + SigMFGeneratorError + If required parameters are missing or invalid. + """ + # validation and random parameter generation + self._fill_random_parameters() + self._validate_parameters() + + # generate signal samples + samples = self._generate_samples() + + # create sigmf file with in-memory buffer + data_buffer = io.BytesIO() + data_buffer.write(samples.tobytes()) + data_buffer.seek(0) + + # build metadata + metadata = self._build_metadata(samples) + + # create sigmf file object + sigmf_file = SigMFFile(metadata=metadata) + sigmf_file.set_data_file(data_buffer=data_buffer) + + return sigmf_file + + def _fill_random_parameters(self) -> None: + """Fill unspecified parameters with random values.""" + # common sample rates to choose from + common_sample_rates = [8000, 22050, 44100, 48000, 96000, 192000, 1e6, 2e6] + + # set random sample rate if not specified + if self._sample_rate_hz is None: + self._sample_rate_hz = float(self._rng.choice(common_sample_rates)) + + # set random duration if not specified (0.1s to 5s) + if self._duration_s is None: + self._duration_s = self._rng.uniform(0.1, 5.0) + + # if no components specified, randomly generate some + if len(self._signal_components) == 0: + while True: + if self._rng.random() < 0.5: + self._signal_components.append({"type": "tone"}) + else: + self._signal_components.append({"type": "sweep"}) + if self._rng.random() <= 0.2: + # E[N] = 1 / threshold -> 5 components on average + break + + # fill parameters for each signal component + for component in self._signal_components: + # add random timing for each component + if "start_time_s" not in component: + # random start time in first 80% of total duration + max_start_time = self._duration_s * 0.8 + component["start_time_s"] = round(self._rng.uniform(0.0, max_start_time), 3) + + if "component_duration_s" not in component: + # random duration from start time to end (minimum 10% of total duration) + remaining_time = self._duration_s - component["start_time_s"] + min_duration = min(self._duration_s * 0.1, remaining_time) + component["component_duration_s"] = round(self._rng.uniform(min_duration, remaining_time), 3) + + # set amplitude if not specified + if "amplitude" not in component: + component["amplitude"] = self._amplitude + + if component["type"] == "tone": + if "frequency_hz" not in component: + # random frequency across full baseband: -nyquist to +nyquist (excluding DC ±100 Hz) + nyquist = self._sample_rate_hz / 2 + freq = self._rng.uniform(-nyquist + 100.0, nyquist - 100.0) + component["frequency_hz"] = round(freq, 1) + + elif component["type"] == "sweep": + if "start_frequency_hz" not in component: + component["start_frequency_hz"] = round(self._rng.uniform(100.0, self._sample_rate_hz / 4 * 0.8), 1) + if "end_frequency_hz" not in component: + start_freq = component["start_frequency_hz"] + # ensure end freq is different from start + if start_freq < self._sample_rate_hz / 4 * 0.5: + component["end_frequency_hz"] = round( + self._rng.uniform(start_freq * 1.5, self._sample_rate_hz / 4), 1 + ) + else: + component["end_frequency_hz"] = round(self._rng.uniform(100.0, start_freq * 0.7), 1) + + def _validate_parameters(self) -> None: + """Validate current parameters.""" + if self._sample_rate_hz <= 0: + raise SigMFGeneratorError(f"sample rate must be positive, got {self._sample_rate_hz}") + + if self._duration_s <= 0: + raise SigMFGeneratorError(f"duration must be positive, got {self._duration_s}") + + # validate frequencies against nyquist limit + nyquist = self._sample_rate_hz / 2 + for component in self._signal_components: + frequencies_to_check = [] + + if component["type"] == "tone": + frequencies_to_check = [(component["frequency_hz"], "tone frequency")] + + elif component["type"] == "sweep": + frequencies_to_check = [ + (component["start_frequency_hz"], "start frequency"), + (component["end_frequency_hz"], "end frequency"), + ] + + for freq, freq_name in frequencies_to_check: + if abs(freq) >= nyquist: + raise SigMFGeneratorError(f"{freq_name} {freq} hz exceeds nyquist limit {nyquist} hz") + + def _generate_samples(self) -> np.ndarray: + """Generate signal samples by combining all signal components with timing.""" + # calculate number of samples + num_samples = int(self._sample_rate_hz * self._duration_s) + time_samples = np.arange(num_samples, dtype=np.float64) / self._sample_rate_hz + + # initialize combined signal + combined_signal = np.zeros(num_samples, dtype=np.complex128) + + # generate and sum each signal component with timing + for component in self._signal_components: + # calculate component timing in samples + start_sample = int(component["start_time_s"] * self._sample_rate_hz) + component_duration_samples = int(component["component_duration_s"] * self._sample_rate_hz) + end_sample = min(start_sample + component_duration_samples, num_samples) + + if start_sample >= num_samples or end_sample <= start_sample: + continue # component is outside signal bounds + + # create time vector for this component only + component_samples = end_sample - start_sample + component_time = time_samples[start_sample:end_sample] - component["start_time_s"] + + if component["type"] == "tone": + freq_hz = component["frequency_hz"] + amplitude = component["amplitude"] + component_signal = amplitude * np.exp(2j * np.pi * freq_hz * component_time) + + elif component["type"] == "sweep": + start_freq = component["start_frequency_hz"] + end_freq = component["end_frequency_hz"] + amplitude = component["amplitude"] + component_duration = component["component_duration_s"] + + # linear frequency sweep over component duration + freq_slope = (end_freq - start_freq) / component_duration + # integrate to get phase + phase = 2 * np.pi * (start_freq * component_time + 0.5 * freq_slope * component_time**2) + component_signal = amplitude * np.exp(1j * phase) + + # apply tapering to avoid clicks (5ms taper or 10% of component duration, whichever is smaller) + taper_samples = min(int(0.005 * self._sample_rate_hz), component_samples // 10) + if taper_samples > 1: + # hann window tapering + taper_window = np.hanning(2 * taper_samples) + # apply fade-in + component_signal[:taper_samples] *= taper_window[:taper_samples] + # apply fade-out + component_signal[-taper_samples:] *= taper_window[taper_samples:] + + # add this component to combined signal at correct time + combined_signal[start_sample:end_sample] += component_signal + + # apply global frequency offset + if self._frequency_offset_hz != 0: + combined_signal *= np.exp(2j * np.pi * self._frequency_offset_hz * time_samples) + + # apply global phase offset + if self._phase_offset_rad != 0: + combined_signal *= np.exp(1j * self._phase_offset_rad) + + # add noise based on snr + if self._snr_db is not None: + signal_power = np.mean(np.abs(combined_signal) ** 2) + noise_power = signal_power / (10 ** (self._snr_db / 10)) + + # complex white gaussian noise + noise_real = self._rng.normal(0, np.sqrt(noise_power / 2), num_samples) + noise_imag = self._rng.normal(0, np.sqrt(noise_power / 2), num_samples) + noise = noise_real + 1j * noise_imag + + combined_signal += noise + + # convert to complex64 for sigmf + return combined_signal.astype(np.complex64) + + def _build_annotations(self, samples: np.ndarray) -> list: + """Build annotations describing each signal component with timing.""" + annotations = [] + generator_name = "sigmf-python SigMFGenerator" + + # create annotation for each signal component + for component in self._signal_components: + # calculate component timing in samples + start_sample = int(component["start_time_s"] * self._sample_rate_hz) + component_duration_samples = int(component["component_duration_s"] * self._sample_rate_hz) + end_sample = min(start_sample + component_duration_samples, len(samples)) + + if start_sample >= len(samples) or end_sample <= start_sample: + continue # skip components outside signal bounds + + # base annotation common to all components + base_annotation = { + SigMFFile.START_INDEX_KEY: start_sample, + SigMFFile.LENGTH_INDEX_KEY: end_sample - start_sample, + SigMFFile.GENERATOR_KEY: generator_name, + } + + if component["type"] == "tone": + base_freq = component["frequency_hz"] + total_freq = base_freq + self._frequency_offset_hz + bandwidth = 2.0 # narrow bandwidth for tone + + base_annotation.update( + { + SigMFFile.FLO_KEY: total_freq - bandwidth / 2, + SigMFFile.FHI_KEY: total_freq + bandwidth / 2, + SigMFFile.LABEL_KEY: f"{base_freq:.1f} Hz tone ({component['start_time_s']:.3f}-{component['start_time_s'] + component['component_duration_s']:.3f}s)", + } + ) + + elif component["type"] == "sweep": + start_freq = component["start_frequency_hz"] + self._frequency_offset_hz + end_freq = component["end_frequency_hz"] + self._frequency_offset_hz + + # include timing information in sweep label + start_time_ms = component["start_time_s"] * 1000 + end_time_ms = (component["start_time_s"] + component["component_duration_s"]) * 1000 + + base_annotation.update( + { + SigMFFile.FLO_KEY: min(start_freq, end_freq), + SigMFFile.FHI_KEY: max(start_freq, end_freq), + SigMFFile.LABEL_KEY: f"{component['start_frequency_hz']:.1f}-{component['end_frequency_hz']:.1f} Hz {component['type']} ({start_time_ms:.1f}-{end_time_ms:.1f} ms)", + } + ) + + annotations.append(base_annotation) + + # add user comment to first component if provided + if self._comment is not None and len(annotations) > 0: + annotations[0][SigMFFile.COMMENT_KEY] = self._comment + + # helper function to create full-signal annotations + def create_full_signal_annotation(label: str) -> dict: + return { + SigMFFile.START_INDEX_KEY: 0, + SigMFFile.LENGTH_INDEX_KEY: len(samples), + SigMFFile.GENERATOR_KEY: generator_name, + SigMFFile.LABEL_KEY: label, + } + + # noise annotation if snr was applied + if self._snr_db is not None: + noise_annotation = create_full_signal_annotation(f"AWGN {self._snr_db:.1f} dB SNR") + noise_annotation.update( + { + SigMFFile.FLO_KEY: 0.0, + SigMFFile.FHI_KEY: self._sample_rate_hz / 2, # full nyquist bandwidth + } + ) + annotations.append(noise_annotation) + + # frequency offset annotation if applied + if abs(self._frequency_offset_hz) > 0.1: # only annotate non-trivial offsets + offset_annotation = create_full_signal_annotation(f"freq offset {self._frequency_offset_hz:+.1f} Hz") + annotations.append(offset_annotation) + + # phase offset annotation if applied + if abs(self._phase_offset_rad) > 0.01: # only annotate non-trivial offsets + phase_deg = self._phase_offset_rad * 180 / np.pi + phase_annotation = create_full_signal_annotation(f"phase offset {phase_deg:+.1f}°") + annotations.append(phase_annotation) + + return annotations + + def _build_metadata(self, samples: np.ndarray) -> dict: + """Build sigmf metadata dict.""" + # build description based on signal components + if self._description is None: + + def get_component_description(component): + if component["type"] == "tone": + return f"{component['frequency_hz']:.1f} hz tone" + elif component["type"] == "sweep": + return f"{component['start_frequency_hz']:.1f}-{component['end_frequency_hz']:.1f} hz {component['type']}" + return component["type"] + + component_descriptions = [get_component_description(c) for c in self._signal_components] + + if len(component_descriptions) == 1: + desc = f"synthetic {component_descriptions[0]}" + else: + desc = f"synthetic signal with {', '.join(component_descriptions)}" + + if self._snr_db is not None: + desc += f" at {self._snr_db:.1f} db snr" + + self._description = desc + + # build generator info + generator_info = f"sigmf-python siggen.SigMFGenerator" + if self._seed is not None: + generator_info += f" (seed={self._seed:#x})" + + # create metadata structure + global_info = { + SigMFFile.DATATYPE_KEY: get_data_type_str(samples), + SigMFFile.SAMPLE_RATE_KEY: self._sample_rate_hz, + SigMFFile.NUM_CHANNELS_KEY: 1, + SigMFFile.GENERATOR_KEY: generator_info, + SigMFFile.DESCRIPTION_KEY: self._description, + } + + if self._author is not None: + global_info[SigMFFile.AUTHOR_KEY] = self._author + + # create capture info + capture_info = { + SigMFFile.START_INDEX_KEY: 0, + SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + } + + # add frequency if there's a single dominant tone component + tone_components = [c for c in self._signal_components if c["type"] == "tone"] + if len(tone_components) == 1 and len(self._signal_components) == 1: + dominant_freq = tone_components[0]["frequency_hz"] + self._frequency_offset_hz + capture_info[SigMFFile.FREQUENCY_KEY] = dominant_freq + + # create annotations for signal components + annotations = self._build_annotations(samples) + + return { + SigMFFile.GLOBAL_KEY: global_info, + SigMFFile.CAPTURE_KEY: [capture_info], + SigMFFile.ANNOTATION_KEY: annotations, + } diff --git a/tests/test_siggen.py b/tests/test_siggen.py new file mode 100644 index 0000000..6a638b8 --- /dev/null +++ b/tests/test_siggen.py @@ -0,0 +1,420 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for signal generation utilities.""" + +import unittest +from io import BytesIO + +import numpy as np +import numpy.testing as npt + +from sigmf import SigMFFile +from sigmf.error import SigMFGeneratorError +from sigmf.siggen import SigMFGenerator + + +class TestSigGen(unittest.TestCase): + """Test Signal Generator functionality.""" + + def setUp(self): + """setup test fixtures""" + self.seed = 0xDEADC0DE + self.test_sample_rate = 48000 + self.test_duration = 1.0 + self.test_freq = -1000.0 + + def test_deterministic_tone_generation(self): + """test deterministic tone generation with specified parameters""" + signal = ( + SigMFGenerator(self.seed) + .tone(self.test_freq) + .sample_rate(self.test_sample_rate) + .duration(self.test_duration) + .generate() + ) + + # verify metadata + self.assertEqual(signal.sample_rate, self.test_sample_rate) + self.assertEqual(signal.datatype, "cf32_le") + self.assertIn("-1000.0 hz tone", signal.description) + + # verify signal characteristics + samples = signal.read_samples() + expected_samples = int(self.test_sample_rate * self.test_duration) + self.assertEqual(len(samples), expected_samples) + + # verify it's complex data + self.assertTrue(np.iscomplexobj(samples)) + + # verify frequency content by checking dominant frequency + fft_samples = np.fft.fft(samples) + fft_freqs = np.fft.fftfreq(len(samples), 1 / self.test_sample_rate) + dominant_freq_idx = np.argmax(np.abs(fft_samples)) + dominant_freq = fft_freqs[dominant_freq_idx] + self.assertAlmostEqual(dominant_freq, self.test_freq, delta=10) # within 10 hz, signed + + def test_reproducible_generation(self): + """test that same seed produces identical results""" + # generate signal 3 times with same seed + # only difference will be datetime when capure is created + signal0 = SigMFGenerator(self.seed).generate() + signal1 = SigMFGenerator(self.seed).generate() + signal2 = SigMFGenerator(self.seed).generate() + + # set capture datetime identical + for sig in [signal0, signal1, signal2]: + sig.add_capture(0, {SigMFFile.DATETIME_KEY: "2026-01-01T00:00:00Z"}) + + # compare metadata (which includes checksums) + self.assertEqual(signal0, signal1) + self.assertEqual(signal0, signal2) + + def test_sweep_generation(self): + """test linear frequency sweep generation""" + signal = ( + SigMFGenerator() + .sweep(-500.0, 2000.0) + .sample_rate(self.test_sample_rate) + .duration(self.test_duration) + .generate() + ) + self.assertIn("-500.0-2000.0 hz sweep", signal.description) + + # random sweep (no args) also works + self.assertIn("sweep", SigMFGenerator().sweep().generate().description) + + def test_nominal_chaining(self): + """test builder pattern method chaining""" + signal = ( + SigMFGenerator(self.seed) + .tone(2000) + .sample_rate(44100) + .duration(0.5) + .amplitude(0.8) + .snr(15) + .frequency_offset(100) + .phase_offset(np.pi / 4) + .author("test@example.com") + .description("test signal") + .comment("test comment") + .generate() + ) + + # verify chaining worked + self.assertEqual(signal.get_global_info()[SigMFFile.AUTHOR_KEY], "test@example.com") + self.assertEqual(signal.description, "test signal") + + # should have multiple annotations: main signal + noise + freq offset + phase offset + annotations = signal.get_annotations() + self.assertGreaterEqual(len(annotations), 3) # at least main + noise + offsets + + # find main signal annotation (has comment) + main_annotation = next(ann for ann in annotations if SigMFFile.COMMENT_KEY in ann) + self.assertEqual(main_annotation[SigMFFile.COMMENT_KEY], "test comment") + + # verify there's a noise annotation + noise_annotations = [ann for ann in annotations if "AWGN" in ann.get(SigMFFile.LABEL_KEY, "")] + self.assertEqual(len(noise_annotations), 1) + + # verify there's a frequency offset annotation + freq_offset_annotations = [ann for ann in annotations if "freq offset" in ann.get(SigMFFile.LABEL_KEY, "")] + self.assertEqual(len(freq_offset_annotations), 1) + + def test_snr_noise_addition(self): + """test that snr parameter adds appropriate noise""" + # generate clean tone and noisy tone + clean_signal = ( + SigMFGenerator(self.seed) + .tone(1000) + .sample_rate(self.test_sample_rate) + .duration(0.1) + .generate() + ) + noisy_signal = ( + SigMFGenerator(self.seed) + .tone(1000) + .sample_rate(self.test_sample_rate) + .duration(0.1) + .snr(10) + .generate() + ) + + # noisy signal should have higher variance due to added noise + clean_power = np.mean(np.abs(clean_signal[:]) ** 2) + noisy_power = np.mean(np.abs(noisy_signal[:]) ** 2) + + # noisy signal should have more power due to added noise + self.assertGreater(noisy_power, clean_power) + + def test_frequency_offset(self): + """test frequency offset functionality""" + base_freq = 1000.0 + offset_freq = 500.0 + + signal = ( + SigMFGenerator(self.seed) + .tone(base_freq) + .frequency_offset(offset_freq) + .sample_rate(self.test_sample_rate) + .duration(self.test_duration) + .generate() + ) + + # verify frequency in capture metadata includes offset + captures = signal.get_captures() + self.assertEqual(captures[0][SigMFFile.FREQUENCY_KEY], base_freq + offset_freq) + + def test_metadata_completeness(self): + """test that generated metadata is complete and valid""" + signal = SigMFGenerator().tone(1000).generate() + + # verify required global fields + global_info = signal.get_global_info() + required_keys = [ + SigMFFile.DATATYPE_KEY, + SigMFFile.SAMPLE_RATE_KEY, + SigMFFile.VERSION_KEY, + SigMFFile.NUM_CHANNELS_KEY, + SigMFFile.GENERATOR_KEY, + SigMFFile.DESCRIPTION_KEY, + ] + + for key in required_keys: + self.assertIn(key, global_info) + + # verify captures exist + captures = signal.get_captures() + self.assertEqual(len(captures), 1) + self.assertIn(SigMFFile.START_INDEX_KEY, captures[0]) + self.assertIn(SigMFFile.DATETIME_KEY, captures[0]) + + # should be valid sigmf + signal.validate() + + def test_generator_info_includes_seed(self): + """test that generator metadata includes seed when provided""" + signal = SigMFGenerator(seed=self.seed).generate() + + generator_info = signal.get_global_info()[SigMFFile.GENERATOR_KEY] + self.assertIn(f"seed={self.seed:#x}", generator_info) + + def test_no_seed_in_generator_info(self): + """test that generator metadata excludes seed when not provided""" + signal = SigMFGenerator().generate() + + generator_info = signal.get_global_info()[SigMFFile.GENERATOR_KEY] + self.assertNotIn("seed=", generator_info) + + def test_data_buffer_creation(self): + """test that signals are created with in-memory buffers""" + signal = SigMFGenerator().generate() + + # should be able to read samples multiple times + samples_0 = signal.read_samples() + samples_1 = signal.read_samples() + npt.assert_array_equal(samples_0, samples_1) + + # verify data is complex64 + self.assertEqual(samples_0.dtype, np.complex64) + + def test_with_different_amplitudes(self): + """test amplitude parameter""" + amp_0 = 0.5 + amp_1 = 1.5 + + signal_0 = ( + SigMFGenerator(self.seed) + .tone(1000) + .amplitude(amp_0) + .sample_rate(48000) + .duration(0.1) + .generate() + ) + signal_1 = ( + SigMFGenerator(self.seed) + .tone(1000) + .amplitude(amp_1) + .sample_rate(48000) + .duration(0.1) + .generate() + ) + + power_0 = np.mean(np.abs(signal_0.read_samples()) ** 2) + power_1 = np.mean(np.abs(signal_1.read_samples()) ** 2) + + expected_power_ratio = (amp_1 / amp_0) ** 2 + actual_power_ratio = power_1 / power_0 + self.assertAlmostEqual(actual_power_ratio, expected_power_ratio, places=1) + + def test_automatic_annotations(self): + """test that appropriate annotations are automatically created""" + # tone with snr and frequency offset should create multiple annotations + signal = ( + SigMFGenerator() + .tone(1000) + .sample_rate(48000) + .duration(0.1) + .snr(15) + .frequency_offset(200) + .comment("test") + .generate() + ) + + annotations = signal.get_annotations() + + # should have main tone, noise, and offset annotations + self.assertEqual(len(annotations), 3) + + # find and verify main tone annotation + tone_annotation = next(ann for ann in annotations if "1000.0 Hz tone" in ann.get(SigMFFile.LABEL_KEY, "")) + # with temporal windowing, start index can be any valid sample index + self.assertGreaterEqual(tone_annotation[SigMFFile.START_INDEX_KEY], 0) + self.assertLess(tone_annotation[SigMFFile.START_INDEX_KEY], 48000 * 0.1) # less than total samples + self.assertEqual(tone_annotation[SigMFFile.GENERATOR_KEY], "sigmf-python SigMFGenerator") + self.assertIn(SigMFFile.FLO_KEY, tone_annotation) + self.assertIn(SigMFFile.FHI_KEY, tone_annotation) + self.assertEqual(tone_annotation[SigMFFile.COMMENT_KEY], "test") + + # verify tone frequency edges account for offset (1000 + 200 = 1200 Hz center) + center_freq = (tone_annotation[SigMFFile.FLO_KEY] + tone_annotation[SigMFFile.FHI_KEY]) / 2 + self.assertAlmostEqual(center_freq, 1200.0, places=1) + + # find and verify noise annotation + noise_annotation = next(ann for ann in annotations if "AWGN" in ann.get(SigMFFile.LABEL_KEY, "")) + self.assertIn("15.0 dB SNR", noise_annotation[SigMFFile.LABEL_KEY]) + self.assertEqual(noise_annotation[SigMFFile.FLO_KEY], 0.0) + self.assertEqual(noise_annotation[SigMFFile.FHI_KEY], 24000.0) # nyquist + + # find and verify frequency offset annotation + offset_annotation = next(ann for ann in annotations if "freq offset" in ann.get(SigMFFile.LABEL_KEY, "")) + self.assertIn("+200.0 Hz", offset_annotation[SigMFFile.LABEL_KEY]) + + def test_sweep_annotations(self): + """test sweep annotations have correct frequency bounds including negative""" + signal = SigMFGenerator().sweep(-2500, 2500).sample_rate(22050).generate() + + annotations = signal.get_annotations() + self.assertEqual(len(annotations), 1) # just main sweep annotation + + sweep_annotation = annotations[0] + self.assertEqual(sweep_annotation[SigMFFile.FLO_KEY], -2500.0) + self.assertEqual(sweep_annotation[SigMFFile.FHI_KEY], 2500.0) + self.assertIn("-2500.0-2500.0 Hz sweep", sweep_annotation[SigMFFile.LABEL_KEY]) + + def test_reverse_sweep_annotations(self): + """test reverse sweep crossing DC has correct bounds""" + signal = SigMFGenerator().sweep(3000, -800).sample_rate(48000).generate() + + annotations = signal.get_annotations() + sweep_annotation = annotations[0] + + # frequency bounds should be min/max regardless of sweep direction + self.assertEqual(sweep_annotation[SigMFFile.FLO_KEY], -800.0) + self.assertEqual(sweep_annotation[SigMFFile.FHI_KEY], 3000.0) + # but label should show original order + self.assertIn("3000.0--800.0 Hz sweep", sweep_annotation[SigMFFile.LABEL_KEY]) + + def test_minimal_annotations(self): + """test that simple signals get minimal but complete annotations""" + signal = SigMFGenerator().tone(440).sample_rate(44100).generate() + + annotations = signal.get_annotations() + self.assertEqual(len(annotations), 1) # just main signal, no noise/offsets + + annotation = annotations[0] + # with temporal windowing, start index can be any valid sample index + self.assertGreaterEqual(annotation[SigMFFile.START_INDEX_KEY], 0) + self.assertIn(SigMFFile.LENGTH_INDEX_KEY, annotation) + self.assertIn(SigMFFile.GENERATOR_KEY, annotation) + self.assertIn("440.0 Hz tone", annotation[SigMFFile.LABEL_KEY]) + + def test_phase_offset(self): + """test phase offset functionality""" + phase_offset = np.pi / 2 + + # use clean signals without noise for precise phase comparison + signal_0 = ( + SigMFGenerator(seed=42) + .tone(1000) + .sample_rate(48000) + .duration(0.1) + .amplitude(1.0) + .generate() + ) + signal_1 = ( + SigMFGenerator(seed=42) + .tone(1000) + .phase_offset(phase_offset) + .sample_rate(48000) + .duration(0.1) + .amplitude(1.0) + .generate() + ) + + # find where the actual signal starts by looking at annotations + start_idx_0 = signal_0.get_annotations()[0][SigMFFile.START_INDEX_KEY] + start_idx_1 = signal_1.get_annotations()[0][SigMFFile.START_INDEX_KEY] + + # both should start at the same sample index (same seed) + self.assertEqual(start_idx_0, start_idx_1) + + # compare samples from the actual signal start + some offset to avoid edge effects + sample_offset = 100 + if start_idx_0 + sample_offset < len(signal_0) and start_idx_1 + sample_offset < len(signal_1): + phase_diff = np.angle(signal_1[start_idx_0 + sample_offset]) - np.angle(signal_0[start_idx_0 + sample_offset]) + + # normalize to [-pi, pi] + phase_diff = (phase_diff + np.pi) % (2 * np.pi) - np.pi + + self.assertAlmostEqual(phase_diff, phase_offset, places=1) + + +class TestEdgeCases(unittest.TestCase): + """Test edge cases and error conditions.""" + + def test_zero_duration(self): + """test zero duration raises error""" + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().duration(0).generate() + + def test_negative_duration(self): + """test negative duration raises error""" + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().duration(-1.0).generate() + + def test_negative_sample_rate(self): + """test negative sample rate raises error""" + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().sample_rate(-8000).generate() + + def test_tone_nyquist_validation(self): + """test tone frequency exceeding nyquist raises error""" + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().tone(5000).sample_rate(8000).generate() + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().tone(-5000).sample_rate(8000).generate() + + def test_sweep_nyquist_validation(self): + """test sweep frequencies exceeding nyquist raise error""" + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().sweep(1000, 5000).sample_rate(8000).generate() + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().sweep(5000, 1000).sample_rate(8000).generate() + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().sweep(1000, -5000).sample_rate(8000).generate() + with self.assertRaises(SigMFGeneratorError): + SigMFGenerator().sweep(-5000, 1000).sample_rate(8000).generate() + + def test_sweep_same_start_end_frequency(self): + """test sweep with same start and end frequency""" + # should generate successfully (effectively a tone) + SigMFGenerator().sweep(333, 333).sample_rate(8000).duration(0.1).generate() + + + +if __name__ == "__main__": + unittest.main()