diff --git a/dpnegf/negf/lead_property.py b/dpnegf/negf/lead_property.py index b91a4c4..80575dd 100644 --- a/dpnegf/negf/lead_property.py +++ b/dpnegf/negf/lead_property.py @@ -517,7 +517,7 @@ def _estimate_worker_memory(lead_L, lead_R, kpoint=None, temp_allocation_factor= return total_estimate -def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.7, min_workers=1, kpoint=None): +def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.7, min_workers=1, kpoint=None, n_cpus=None): """ Calculate safe number of parallel workers based on available system memory. @@ -533,6 +533,9 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0. Minimum number of workers to use. Default 1. kpoint : array-like, optional A sample k-point for fetching Hamiltonian matrices to estimate memory. + n_cpus : int or None, optional + Optional upper bound on CPU cores to consider. If provided, caps the + available CPU count before memory-based limiting. Returns ------- @@ -543,6 +546,11 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0. if cpu_count is None or cpu_count < 1: cpu_count = 1 log.warning("os.cpu_count() returned None or invalid value. Defaulting to 1 CPU core.") + if n_cpus is not None: + if isinstance(n_cpus, int) and n_cpus > 0: + cpu_count = min(cpu_count, n_cpus) + else: + log.warning(f"Requested n_cpus={n_cpus} is not a positive integer. Ignoring.") available_memory = psutil.virtual_memory().available memory_per_worker = _estimate_worker_memory(lead_L, lead_R, kpoint=kpoint) @@ -592,7 +600,7 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0. def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid, - self_energy_save_path=None, n_jobs=-1, batch_size=200): + self_energy_save_path=None, n_jobs=-1, batch_size=200, n_cpus=None): """ Computes and saves self-energy matrices for all combinations of k-points and energy values for left and right leads. @@ -615,9 +623,15 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid, self_energy_save_path : str or None, optional Directory to save self-energy files. If None, uses lead_L's results_path. n_jobs : int, optional - Number of parallel jobs to use. Default is -1 (use all available CPUs). + Requested number of parallel jobs. Default is -1 (auto-detect). The + final worker count may be capped by ``n_cpus`` and reduced further by + memory-based limiting. batch_size : int, optional Number of (k, e) tasks per parallel batch. Default is 200. + n_cpus : int or None, optional + Optional upper bound on CPU cores to consider when auto-detecting or + capping ``n_jobs``. If None, uses all available CPUs. Memory-based + limiting may still reduce the final worker count. Returns ------- @@ -633,7 +647,7 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid, # Calculate safe number of workers based on available memory # Use first k-point for memory estimation sample_kpoint = kpoints_grid[0] if len(kpoints_grid) > 0 else None - safe_n_jobs = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=n_jobs, kpoint=sample_kpoint) + safe_n_jobs = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=n_jobs, kpoint=sample_kpoint, n_cpus=n_cpus) if n_jobs == -1: log.info(f"Auto-detected safe n_jobs={safe_n_jobs} based on available memory") elif safe_n_jobs < n_jobs: diff --git a/dpnegf/tests/test_auto_memory.py b/dpnegf/tests/test_auto_memory.py index 42c05b5..859e0d9 100644 --- a/dpnegf/tests/test_auto_memory.py +++ b/dpnegf/tests/test_auto_memory.py @@ -358,6 +358,20 @@ def test_capped_by_cpu_count(self, mock_os, mock_psutil): # Should be capped at CPU count assert result <= 4 + @patch('dpnegf.negf.lead_property.psutil') + @patch('dpnegf.negf.lead_property.os') + def test_respects_n_cpus_cap(self, mock_os, mock_psutil): + """Test that n_cpus caps available CPU count.""" + mock_os.cpu_count.return_value = 8 + mock_psutil.virtual_memory.return_value = Mock(available=128 * 1024**3) + + lead_L = MockLead("lead_L", matrix_size=10) + lead_R = MockLead("lead_R", matrix_size=10) + + result = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, n_cpus=2) + + assert result == 2 + # ============================================================================= # Integration tests