From fc52eb7ef05bbcd9cf71142c8964cf9b61e434ee Mon Sep 17 00:00:00 2001 From: OutisLi Date: Sun, 11 Jan 2026 23:55:26 +0800 Subject: [PATCH 01/18] feat(pt): use num_epoch to set num_steps --- deepmd/pt/train/training.py | 114 ++++++++++++-- deepmd/utils/argcheck.py | 19 ++- source/tests/pt/test_sampler.py | 269 +++++++++++++++++++++++++++++++- 3 files changed, 385 insertions(+), 17 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index be480fda3f..90e4a1a845 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -147,9 +147,11 @@ def __init__( self.rank = dist.get_rank() if self.is_distributed else 0 self.world_size = dist.get_world_size() if self.is_distributed else 1 self.num_model = len(self.model_keys) + self.model_prob = None # Iteration config - self.num_steps = training_params["numb_steps"] + self.num_steps = training_params.get("numb_steps") + self.num_epoch = training_params.get("num_epoch") self.disp_file = training_params.get("disp_file", "lcurve.out") self.disp_freq = training_params.get("disp_freq", 1000) self.disp_avg = training_params.get("disp_avg", False) @@ -273,6 +275,47 @@ def get_dataloader_and_iter( valid_numb_batch, ) + def compute_total_numb_batch( + numb_batches: Iterable[int], + sampler_weights: np.ndarray, + ) -> int: + weights = np.asarray(sampler_weights, dtype=np.float64) + if weights.ndim != 1: + raise ValueError("Sampler weights must be 1D.") + if weights.size == 0: + raise ValueError("Sampler weights are empty.") + weight_sum = float(np.sum(weights)) + if weight_sum <= 0.0: + raise ValueError("Sampler weights must sum to a positive value.") + probs = weights / weight_sum + nbatches = np.asarray(numb_batches, dtype=np.float64) + if nbatches.shape[0] != probs.shape[0]: + raise ValueError("Number of batches and sampler weights must match.") + valid = probs > 0.0 + if not np.any(valid): + raise ValueError( + "Sampler probabilities must contain at least one positive entry." + ) + return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) + + def resolve_model_prob( + model_keys: list[str], + model_prob_config: dict[str, Any] | None, + model_training_data: dict[str, DpLoaderSet], + ) -> np.ndarray: + model_prob = np.zeros(len(model_keys), dtype=np.float64) + if model_prob_config is not None: + for ii, model_key in enumerate(model_keys): + if model_key in model_prob_config: + model_prob[ii] = float(model_prob_config[model_key]) + else: + for ii, model_key in enumerate(model_keys): + model_prob[ii] = float(len(model_training_data[model_key])) + sum_prob = float(np.sum(model_prob)) + if sum_prob <= 0.0: + raise ValueError("Sum of model prob must be larger than 0!") + return model_prob / sum_prob + def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -462,6 +505,56 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ), ) + # Resolve training steps + if not self.multi_task: + sampler_weights = to_numpy_array(self.training_dataloader.sampler.weights) + total_numb_batch = compute_total_numb_batch( + training_data.index, + sampler_weights, + ) + else: + per_task_total = [] + for model_key in self.model_keys: + sampler_weights = to_numpy_array( + self.training_dataloader[model_key].sampler.weights + ) + per_task_total.append( + compute_total_numb_batch( + training_data[model_key].index, + sampler_weights, + ) + ) + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, + ) + total_numb_batch = int( + np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) + ) + if self.num_steps is None: + if self.num_epoch is None: + raise ValueError( + "Either training.numb_steps or training.num_epoch must be set." + ) + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError("Total number of training batches must be positive.") + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) + elif self.num_epoch is not None: + log.warning( + "Both training.numb_steps and training.num_epoch are set; " + "using numb_steps=%d.", + self.num_steps, + ) + # Learning rate warmup_steps = training_params.get("warmup_steps", None) warmup_ratio = training_params.get("warmup_ratio", None) @@ -684,19 +777,12 @@ def single_model_finetune( ) # Get model prob for multi-task - if self.multi_task: - self.model_prob = np.array([0.0 for key in self.model_keys]) - if training_params.get("model_prob", None) is not None: - model_prob = training_params["model_prob"] - for ii, model_key in enumerate(self.model_keys): - if model_key in model_prob: - self.model_prob[ii] += float(model_prob[model_key]) - else: - for ii, model_key in enumerate(self.model_keys): - self.model_prob[ii] += float(len(self.training_data[model_key])) - sum_prob = np.sum(self.model_prob) - assert sum_prob > 0.0, "Sum of model prob must be larger than 0!" - self.model_prob = self.model_prob / sum_prob + if self.multi_task and self.model_prob is None: + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, + ) # Multi-task share params if shared_links is not None: diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 4b04269e3f..73a1ecd854 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3213,7 +3213,16 @@ def mixed_precision_args() -> list[Argument]: # ! added by Denghui. def training_args( multi_task: bool = False, ) -> list[Argument]: # ! modified by Ziyao: data configuration isolated. - doc_numb_steps = "Number of training batch. Each training uses one batch of data." + doc_numb_steps = "Number of training batches. Each training uses one batch of data. If set, this value takes precedence over num_epoch." + doc_num_epoch = ( + "Number of training epochs. " + "When numb_steps is not set, the total steps are computed as " + "ceil(num_epoch * total_numb_batch). For each training dataset, " + "total_numb_batch is computed as ceil(max_i(n_bch_i / p_i)), where p_i " + "is the sampling probability of system i after sys_probs/auto_prob. " + "In multi-task mode, total_numb_batch is the model_prob-weighted sum " + "over tasks." + ) doc_seed = "The random seed for getting frames from the training data set." doc_disp_file = "The file for printing learning curve." doc_disp_freq = "The frequency of printing learning curve." @@ -3314,7 +3323,13 @@ def training_args( args += [ mixed_precision_data, Argument( - "numb_steps", int, optional=False, doc=doc_numb_steps, alias=["stop_batch"] + "numb_steps", int, optional=True, doc=doc_numb_steps, alias=["stop_batch"] + ), + Argument( + "num_epoch", + [int, float], + optional=True, + doc=doc_only_pt_supported + doc_num_epoch, ), Argument("seed", [int, None], optional=True, doc=doc_seed), Argument( diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index 3d7143b350..0948073d87 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -7,11 +7,16 @@ ) import numpy as np +import pytest import torch from torch.utils.data import ( DataLoader, ) +import deepmd.pt.utils.dataloader as pt_dataloader +from deepmd.pt.utils import ( + dp_random, +) from deepmd.pt.utils.dataloader import ( DpLoaderSet, get_sampler_from_params, @@ -28,8 +33,25 @@ CUR_DIR = os.path.dirname(__file__) +class _SerialPool: + def __init__(self, *args, **kwargs) -> None: + pass + + def __enter__(self) -> "_SerialPool": + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + def map(self, func, iterable): + return [func(item) for item in iterable] + + class TestSampler(unittest.TestCase): def setUp(self) -> None: + self._monkeypatch = pytest.MonkeyPatch() + # Avoid SemLock/CUDA initialization failures in restricted CI by forcing a serial pool. + self._monkeypatch.setattr(pt_dataloader, "Pool", _SerialPool) with open(str(Path(__file__).parent / "water/se_e2_a.json")) as fin: content = fin.read() config = json.loads(content) @@ -40,6 +62,7 @@ def setUp(self) -> None: self.rcut = model_config["descriptor"]["rcut"] self.rcut_smth = model_config["descriptor"]["rcut_smth"] self.sel = model_config["descriptor"]["sel"] + self.type_map = model_config["type_map"] self.batch_size = config["training"]["training_data"]["batch_size"] self.systems = config["training"]["validation_data"]["systems"] if isinstance(self.systems, str): @@ -47,7 +70,7 @@ def setUp(self) -> None: self.my_dataset = DpLoaderSet( self.systems, self.batch_size, - model_config["type_map"], + self.type_map, seed=10, shuffle=False, ) @@ -55,6 +78,81 @@ def setUp(self) -> None: tf_random.seed(10) self.dp_dataset = DeepmdDataSystem(self.systems, self.batch_size, 1, self.rcut) + def tearDown(self) -> None: + self._monkeypatch.undo() + + def _make_dataloader(self, dataset: DpLoaderSet, sampler) -> DataLoader: + return DataLoader( + dataset, + sampler=sampler, + batch_size=None, + num_workers=0, + drop_last=False, + collate_fn=lambda batch: batch, + ) + + def _normalize_probs(self, weights: np.ndarray) -> np.ndarray: + weights = np.asarray(weights, dtype=np.float64) + return weights / np.sum(weights) + + def _compute_total_numb_batch(self, nbatches: np.ndarray, probs: np.ndarray) -> int: + return int(np.ceil(np.max(nbatches / probs))) + + def _sample_sid_counts( + self, dataloader: DataLoader, num_steps: int, nsystems: int + ) -> np.ndarray: + # === Step 1. Initialize Counters === + counts = np.zeros(nsystems, dtype=np.int64) + # === Step 2. Sample Steps === + with torch.device("cpu"): + iterator = iter(dataloader) + for _ in range(num_steps): + try: + batch_data = next(iterator) + except StopIteration: + iterator = iter(dataloader) + batch_data = next(iterator) + sid = batch_data["sid"] + if hasattr(sid, "item"): + sid = sid.item() + counts[int(sid)] += 1 + return counts + + def _sample_multitask_counts( + self, + dataloaders: dict[str, DataLoader], + model_prob: np.ndarray, + num_steps: int, + ) -> tuple[np.ndarray, dict[str, np.ndarray]]: + # === Step 1. Initialize Counters === + model_keys = list(dataloaders.keys()) + model_counts = np.zeros(len(model_keys), dtype=np.int64) + sid_counts = { + model_key: np.zeros(len(dataloaders[model_key].dataset), dtype=np.int64) + for model_key in model_keys + } + # === Step 2. Build Iterators and Sample Steps === + with torch.device("cpu"): + iters = { + model_key: iter(dataloaders[model_key]) for model_key in model_keys + } + for _ in range(num_steps): + model_index = dp_random.choice( + np.arange(len(model_keys), dtype=np.int_), p=model_prob + ) + model_key = model_keys[int(model_index)] + model_counts[int(model_index)] += 1 + try: + batch_data = next(iters[model_key]) + except StopIteration: + iters[model_key] = iter(dataloaders[model_key]) + batch_data = next(iters[model_key]) + sid = batch_data["sid"] + if hasattr(sid, "item"): + sid = sid.item() + sid_counts[model_key][int(sid)] += 1 + return model_counts, sid_counts + def test_sampler_debug_info(self) -> None: dataloader = DataLoader( self.my_dataset, @@ -126,6 +224,175 @@ def test_auto_prob_sys_size_ext_end2end(self): dp_probs = np.array(self.dp_dataset.sys_probs) self.assertTrue(np.allclose(my_probs, dp_probs)) + def test_sampling_stability_single_task(self) -> None: + # === Step 1. Build Dataset and Sampler === + systems = [ + str(Path(__file__).parent / "water/data/data_0"), + str(Path(__file__).parent / "water/data/data_1"), + str(Path(__file__).parent / "water/data/single"), + ] + dataset_epoch = DpLoaderSet( + systems, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + sys_probs = [0.2, 0.3, 0.5] + params = {"sys_probs": sys_probs, "auto_prob": "prob_sys_size"} + sampler_epoch = get_sampler_from_params(dataset_epoch, params) + probs = self._normalize_probs(np.asarray(sampler_epoch.weights)) + nbatches = np.asarray(dataset_epoch.index, dtype=np.float64) + total_numb_batch = self._compute_total_numb_batch(nbatches, probs) + num_epoch = 1.5 + num_steps = int(np.ceil(num_epoch * total_numb_batch)) + + # === Step 2. Sample Using Derived Steps === + torch.manual_seed(123) + dataloader_epoch = self._make_dataloader(dataset_epoch, sampler_epoch) + counts_epoch = self._sample_sid_counts( + dataloader_epoch, num_steps, len(dataset_epoch) + ) + empirical_epoch = counts_epoch / float(num_steps) + self.assertTrue(np.allclose(empirical_epoch, probs, atol=0.1)) + + # === Step 3. Sample Using Explicit Steps === + dataset_steps = DpLoaderSet( + systems, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + sampler_steps = get_sampler_from_params(dataset_steps, params) + torch.manual_seed(123) + dataloader_steps = self._make_dataloader(dataset_steps, sampler_steps) + counts_steps = self._sample_sid_counts( + dataloader_steps, num_steps, len(dataset_steps) + ) + self.assertTrue(np.array_equal(counts_epoch, counts_steps)) + + def test_sampling_stability_multi_task(self) -> None: + # === Step 1. Build Datasets and Samplers === + model_keys = ["model_1", "model_2"] + systems_1 = [ + str(Path(__file__).parent / "water/data/data_0"), + str(Path(__file__).parent / "water/data/data_1"), + ] + systems_2 = [ + str(Path(__file__).parent / "water/data/data_1"), + str(Path(__file__).parent / "water/data/single"), + ] + dataset_1 = DpLoaderSet( + systems_1, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + dataset_2 = DpLoaderSet( + systems_2, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + sampler_1 = get_sampler_from_params( + dataset_1, {"sys_probs": [0.7, 0.3], "auto_prob": "prob_sys_size"} + ) + sampler_2 = get_sampler_from_params( + dataset_2, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} + ) + probs_1 = self._normalize_probs(np.asarray(sampler_1.weights)) + probs_2 = self._normalize_probs(np.asarray(sampler_2.weights)) + per_task_total = np.array( + [ + self._compute_total_numb_batch( + np.asarray(dataset_1.index, dtype=np.float64), probs_1 + ), + self._compute_total_numb_batch( + np.asarray(dataset_2.index, dtype=np.float64), probs_2 + ), + ], + dtype=np.float64, + ) + model_prob = np.asarray([0.4, 0.6], dtype=np.float64) + model_prob = model_prob / np.sum(model_prob) + total_numb_batch = int(np.ceil(np.sum(per_task_total * model_prob))) + num_epoch = 1.5 + num_steps = int(np.ceil(num_epoch * total_numb_batch)) + + # === Step 2. Sample Using Derived Steps === + dataloaders_epoch = { + model_keys[0]: self._make_dataloader(dataset_1, sampler_1), + model_keys[1]: self._make_dataloader(dataset_2, sampler_2), + } + dp_random.seed(321) + torch.manual_seed(321) + model_counts_epoch, sid_counts_epoch = self._sample_multitask_counts( + dataloaders_epoch, model_prob, num_steps + ) + model_freq_epoch = model_counts_epoch / float(num_steps) + self.assertTrue(np.allclose(model_freq_epoch, model_prob, atol=0.1)) + if model_counts_epoch[0] == 0 or model_counts_epoch[1] == 0: + raise AssertionError("Model sampling produced zero counts for a task.") + self.assertTrue( + np.allclose( + sid_counts_epoch[model_keys[0]] / model_counts_epoch[0], + probs_1, + atol=0.1, + ) + ) + self.assertTrue( + np.allclose( + sid_counts_epoch[model_keys[1]] / model_counts_epoch[1], + probs_2, + atol=0.1, + ) + ) + + # === Step 3. Sample Using Explicit Steps === + dataset_1b = DpLoaderSet( + systems_1, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + dataset_2b = DpLoaderSet( + systems_2, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + sampler_1b = get_sampler_from_params( + dataset_1b, {"sys_probs": [0.7, 0.3], "auto_prob": "prob_sys_size"} + ) + sampler_2b = get_sampler_from_params( + dataset_2b, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} + ) + dataloaders_steps = { + model_keys[0]: self._make_dataloader(dataset_1b, sampler_1b), + model_keys[1]: self._make_dataloader(dataset_2b, sampler_2b), + } + dp_random.seed(321) + torch.manual_seed(321) + model_counts_steps, sid_counts_steps = self._sample_multitask_counts( + dataloaders_steps, model_prob, num_steps + ) + self.assertTrue(np.array_equal(model_counts_epoch, model_counts_steps)) + self.assertTrue( + np.array_equal( + sid_counts_epoch[model_keys[0]], sid_counts_steps[model_keys[0]] + ) + ) + self.assertTrue( + np.array_equal( + sid_counts_epoch[model_keys[1]], sid_counts_steps[model_keys[1]] + ) + ) + if __name__ == "__main__": unittest.main() From c9e5532d021045fa0bbf8a21d8edfa4cb06fab2e Mon Sep 17 00:00:00 2001 From: OutisLi Date: Mon, 12 Jan 2026 11:00:48 +0800 Subject: [PATCH 02/18] adopt --- deepmd/pt/train/training.py | 6 ++- deepmd/utils/argcheck.py | 27 +++++++++---- source/tests/pt/test_sampler.py | 69 ++++++++++++++++++++------------- 3 files changed, 67 insertions(+), 35 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 90e4a1a845..a01ec06f16 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -304,13 +304,17 @@ def resolve_model_prob( model_training_data: dict[str, DpLoaderSet], ) -> np.ndarray: model_prob = np.zeros(len(model_keys), dtype=np.float64) - if model_prob_config is not None: + if model_prob_config: for ii, model_key in enumerate(model_keys): if model_key in model_prob_config: model_prob[ii] = float(model_prob_config[model_key]) else: for ii, model_key in enumerate(model_keys): model_prob[ii] = float(len(model_training_data[model_key])) + if not np.all(np.isfinite(model_prob)): + raise ValueError("Model prob must be finite.") + if np.any(model_prob < 0.0): + raise ValueError("Model prob must be non-negative.") sum_prob = float(np.sum(model_prob)) if sum_prob <= 0.0: raise ValueError("Sum of model prob must be larger than 0!") diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 73a1ecd854..a3a65c3ba5 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3213,15 +3213,22 @@ def mixed_precision_args() -> list[Argument]: # ! added by Denghui. def training_args( multi_task: bool = False, ) -> list[Argument]: # ! modified by Ziyao: data configuration isolated. - doc_numb_steps = "Number of training batches. Each training uses one batch of data. If set, this value takes precedence over num_epoch." + doc_numb_steps = ( + "Number of training batches. Each training uses one batch of data. " + "If set, this value takes precedence over num_epoch. If both numb_steps " + "and num_epoch are not set, a ValueError is raised." + ) doc_num_epoch = ( - "Number of training epochs. " + "Number of training epochs (can be fractional). " "When numb_steps is not set, the total steps are computed as " - "ceil(num_epoch * total_numb_batch). For each training dataset, " - "total_numb_batch is computed as ceil(max_i(n_bch_i / p_i)), where p_i " - "is the sampling probability of system i after sys_probs/auto_prob. " - "In multi-task mode, total_numb_batch is the model_prob-weighted sum " - "over tasks." + "ceil(num_epoch * total_numb_batch). For each task, total_numb_batch " + "is computed as ceil(max_i(n_bch_i / p_i)), where n_bch_i is the number " + "of batches for system i and p_i is the sampling probability after " + "sys_probs/auto_prob normalization. In multi-task mode, model_prob is " + "normalized to sum to 1, per-task total_numb_batch values are computed " + "as above, and the final total_numb_batch is their model_prob-weighted " + "sum. At least one of numb_steps or num_epoch must be set; otherwise a " + "ValueError is raised." ) doc_seed = "The random seed for getting frames from the training data set." doc_disp_file = "The file for printing learning curve." @@ -3323,7 +3330,11 @@ def training_args( args += [ mixed_precision_data, Argument( - "numb_steps", int, optional=True, doc=doc_numb_steps, alias=["stop_batch"] + "numb_steps", + int, + optional=True, + doc=doc_numb_steps, + alias=["stop_batch", "num_steps"], ), Argument( "num_epoch", diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index 0948073d87..acc6a3f0b9 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -17,11 +17,6 @@ from deepmd.pt.utils import ( dp_random, ) -from deepmd.pt.utils.dataloader import ( - DpLoaderSet, - get_sampler_from_params, - get_weighted_sampler, -) from deepmd.tf.common import ( expand_sys_str, ) @@ -67,7 +62,7 @@ def setUp(self) -> None: self.systems = config["training"]["validation_data"]["systems"] if isinstance(self.systems, str): self.systems = expand_sys_str(self.systems) - self.my_dataset = DpLoaderSet( + self.my_dataset = pt_dataloader.DpLoaderSet( self.systems, self.batch_size, self.type_map, @@ -81,7 +76,9 @@ def setUp(self) -> None: def tearDown(self) -> None: self._monkeypatch.undo() - def _make_dataloader(self, dataset: DpLoaderSet, sampler) -> DataLoader: + def _make_dataloader( + self, dataset: pt_dataloader.DpLoaderSet, sampler + ) -> DataLoader: return DataLoader( dataset, sampler=sampler, @@ -96,6 +93,18 @@ def _normalize_probs(self, weights: np.ndarray) -> np.ndarray: return weights / np.sum(weights) def _compute_total_numb_batch(self, nbatches: np.ndarray, probs: np.ndarray) -> int: + # NOTE: This is a simplified test-only variant of training.py logic. + nbatches = np.asarray(nbatches, dtype=np.float64) + probs = np.asarray(probs, dtype=np.float64) + if nbatches.shape != probs.shape: + raise ValueError( + "nbatches and probs must have the same shape in this test helper." + ) + if not np.all(probs > 0.0): + raise ValueError( + "Zero or negative sampling probabilities are not supported in this " + "test helper." + ) return int(np.ceil(np.max(nbatches / probs))) def _sample_sid_counts( @@ -156,7 +165,9 @@ def _sample_multitask_counts( def test_sampler_debug_info(self) -> None: dataloader = DataLoader( self.my_dataset, - sampler=get_weighted_sampler(self.my_dataset, prob_style="prob_sys_size"), + sampler=pt_dataloader.get_weighted_sampler( + self.my_dataset, prob_style="prob_sys_size" + ), batch_size=None, num_workers=0, # setting to 0 diverges the behavior of its iterator; should be >=1 drop_last=False, @@ -171,7 +182,9 @@ def test_sampler_debug_info(self) -> None: def test_auto_prob_uniform(self) -> None: auto_prob_style = "prob_uniform" - sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style) + sampler = pt_dataloader.get_weighted_sampler( + self.my_dataset, prob_style=auto_prob_style + ) my_probs = np.array(sampler.weights) self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style) dp_probs = np.array(self.dp_dataset.sys_probs) @@ -179,7 +192,9 @@ def test_auto_prob_uniform(self) -> None: def test_auto_prob_sys_size(self) -> None: auto_prob_style = "prob_sys_size" - sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style) + sampler = pt_dataloader.get_weighted_sampler( + self.my_dataset, prob_style=auto_prob_style + ) my_probs = np.array(sampler.weights) self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style) dp_probs = np.array(self.dp_dataset.sys_probs) @@ -187,7 +202,9 @@ def test_auto_prob_sys_size(self) -> None: def test_auto_prob_sys_size_ext(self) -> None: auto_prob_style = "prob_sys_size;0:1:0.2;1:3:0.8" - sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style) + sampler = pt_dataloader.get_weighted_sampler( + self.my_dataset, prob_style=auto_prob_style + ) my_probs = np.array(sampler.weights) self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style) dp_probs = np.array(self.dp_dataset.sys_probs) @@ -195,7 +212,7 @@ def test_auto_prob_sys_size_ext(self) -> None: def test_sys_probs(self) -> None: sys_probs = [0.1, 0.4, 0.5] - sampler = get_weighted_sampler( + sampler = pt_dataloader.get_weighted_sampler( self.my_dataset, prob_style=sys_probs, sys_prob=True ) my_probs = np.array(sampler.weights) @@ -209,7 +226,7 @@ def test_sys_probs_end2end(self): "sys_probs": sys_probs, "auto_prob": "prob_sys_size", } # use sys_probs first - sampler = get_sampler_from_params(self.my_dataset, _params) + sampler = pt_dataloader.get_sampler_from_params(self.my_dataset, _params) my_probs = np.array(sampler.weights) self.dp_dataset.set_sys_probs(sys_probs=sys_probs) dp_probs = np.array(self.dp_dataset.sys_probs) @@ -218,7 +235,7 @@ def test_sys_probs_end2end(self): def test_auto_prob_sys_size_ext_end2end(self): auto_prob_style = "prob_sys_size;0:1:0.2;1:3:0.8" _params = {"sys_probs": None, "auto_prob": auto_prob_style} # use auto_prob - sampler = get_sampler_from_params(self.my_dataset, _params) + sampler = pt_dataloader.get_sampler_from_params(self.my_dataset, _params) my_probs = np.array(sampler.weights) self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style) dp_probs = np.array(self.dp_dataset.sys_probs) @@ -231,7 +248,7 @@ def test_sampling_stability_single_task(self) -> None: str(Path(__file__).parent / "water/data/data_1"), str(Path(__file__).parent / "water/data/single"), ] - dataset_epoch = DpLoaderSet( + dataset_epoch = pt_dataloader.DpLoaderSet( systems, self.batch_size, self.type_map, @@ -240,7 +257,7 @@ def test_sampling_stability_single_task(self) -> None: ) sys_probs = [0.2, 0.3, 0.5] params = {"sys_probs": sys_probs, "auto_prob": "prob_sys_size"} - sampler_epoch = get_sampler_from_params(dataset_epoch, params) + sampler_epoch = pt_dataloader.get_sampler_from_params(dataset_epoch, params) probs = self._normalize_probs(np.asarray(sampler_epoch.weights)) nbatches = np.asarray(dataset_epoch.index, dtype=np.float64) total_numb_batch = self._compute_total_numb_batch(nbatches, probs) @@ -257,14 +274,14 @@ def test_sampling_stability_single_task(self) -> None: self.assertTrue(np.allclose(empirical_epoch, probs, atol=0.1)) # === Step 3. Sample Using Explicit Steps === - dataset_steps = DpLoaderSet( + dataset_steps = pt_dataloader.DpLoaderSet( systems, self.batch_size, self.type_map, seed=10, shuffle=False, ) - sampler_steps = get_sampler_from_params(dataset_steps, params) + sampler_steps = pt_dataloader.get_sampler_from_params(dataset_steps, params) torch.manual_seed(123) dataloader_steps = self._make_dataloader(dataset_steps, sampler_steps) counts_steps = self._sample_sid_counts( @@ -283,24 +300,24 @@ def test_sampling_stability_multi_task(self) -> None: str(Path(__file__).parent / "water/data/data_1"), str(Path(__file__).parent / "water/data/single"), ] - dataset_1 = DpLoaderSet( + dataset_1 = pt_dataloader.DpLoaderSet( systems_1, self.batch_size, self.type_map, seed=10, shuffle=False, ) - dataset_2 = DpLoaderSet( + dataset_2 = pt_dataloader.DpLoaderSet( systems_2, self.batch_size, self.type_map, seed=10, shuffle=False, ) - sampler_1 = get_sampler_from_params( + sampler_1 = pt_dataloader.get_sampler_from_params( dataset_1, {"sys_probs": [0.7, 0.3], "auto_prob": "prob_sys_size"} ) - sampler_2 = get_sampler_from_params( + sampler_2 = pt_dataloader.get_sampler_from_params( dataset_2, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} ) probs_1 = self._normalize_probs(np.asarray(sampler_1.weights)) @@ -352,24 +369,24 @@ def test_sampling_stability_multi_task(self) -> None: ) # === Step 3. Sample Using Explicit Steps === - dataset_1b = DpLoaderSet( + dataset_1b = pt_dataloader.DpLoaderSet( systems_1, self.batch_size, self.type_map, seed=10, shuffle=False, ) - dataset_2b = DpLoaderSet( + dataset_2b = pt_dataloader.DpLoaderSet( systems_2, self.batch_size, self.type_map, seed=10, shuffle=False, ) - sampler_1b = get_sampler_from_params( + sampler_1b = pt_dataloader.get_sampler_from_params( dataset_1b, {"sys_probs": [0.7, 0.3], "auto_prob": "prob_sys_size"} ) - sampler_2b = get_sampler_from_params( + sampler_2b = pt_dataloader.get_sampler_from_params( dataset_2b, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} ) dataloaders_steps = { From c6fe4e5755646084df7c97facaba1fa47c8e0d71 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Mon, 12 Jan 2026 13:02:23 +0800 Subject: [PATCH 03/18] tf & pd --- deepmd/pd/train/training.py | 115 +++++++++++++++++++++++---- deepmd/pt/train/training.py | 9 +++ deepmd/tf/entrypoints/change_bias.py | 43 +++++++++- deepmd/tf/entrypoints/train.py | 59 +++++++++++++- deepmd/utils/argcheck.py | 8 +- 5 files changed, 214 insertions(+), 20 deletions(-) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index 7ba0255494..2178ee1b0b 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -132,7 +132,8 @@ def __init__( self.num_model = len(self.model_keys) # Iteration config - self.num_steps = training_params["numb_steps"] + self.num_steps = training_params.get("numb_steps") + self.num_epoch = training_params.get("num_epoch") self.acc_freq: int = training_params.get( "acc_freq", 1 ) # gradient accumulation steps @@ -210,6 +211,52 @@ def get_dataloader_and_buffer( valid_numb_batch, ) + def compute_total_numb_batch(numb_batches, sampler_weights) -> int: + weights = np.asarray(sampler_weights, dtype=np.float64) + if weights.ndim != 1: + raise ValueError("Sampler weights must be 1D.") + if weights.size == 0: + raise ValueError("Sampler weights are empty.") + if not np.all(np.isfinite(weights)): + raise ValueError("Sampler weights must be finite.") + if np.any(weights < 0.0): + raise ValueError("Sampler weights must be non-negative.") + weight_sum = float(np.sum(weights)) + if weight_sum <= 0.0: + raise ValueError("Sampler weights must sum to a positive value.") + probs = weights / weight_sum + nbatches = np.asarray(numb_batches, dtype=np.float64) + if nbatches.shape[0] != probs.shape[0]: + raise ValueError("Number of batches and sampler weights must match.") + valid = probs > 0.0 + if not np.any(valid): + raise ValueError( + "Sampler probabilities must contain at least one positive entry." + ) + return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) + + def resolve_model_prob( + model_keys, + model_prob_config, + model_training_data, + ) -> np.ndarray: + model_prob = np.zeros(len(model_keys), dtype=np.float64) + if model_prob_config: + for ii, model_key in enumerate(model_keys): + if model_key in model_prob_config: + model_prob[ii] = float(model_prob_config[model_key]) + else: + for ii, model_key in enumerate(model_keys): + model_prob[ii] = float(len(model_training_data[model_key])) + if not np.all(np.isfinite(model_prob)): + raise ValueError("Model prob must be finite.") + if np.any(model_prob < 0.0): + raise ValueError("Model prob must be non-negative.") + sum_prob = float(np.sum(model_prob)) + if sum_prob <= 0.0: + raise ValueError("Sum of model prob must be larger than 0!") + return model_prob / sum_prob + def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -390,6 +437,57 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ), ) + if not self.multi_task: + sampler_weights = to_numpy_array( + self.training_dataloader.batch_sampler.sampler.weights + ) + total_numb_batch = compute_total_numb_batch( + training_data.index, + sampler_weights, + ) + else: + per_task_total = [] + for model_key in self.model_keys: + sampler_weights = to_numpy_array( + self.training_dataloader[model_key].batch_sampler.sampler.weights + ) + per_task_total.append( + compute_total_numb_batch( + training_data[model_key].index, + sampler_weights, + ) + ) + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, + ) + total_numb_batch = int( + np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) + ) + if self.num_steps is None: + if self.num_epoch is None: + raise ValueError( + "Either training.numb_steps or training.num_epoch must be set." + ) + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError("Total number of training batches must be positive.") + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) + elif self.num_epoch is not None: + log.warning( + "Both training.numb_steps and training.num_epoch are set; " + "using numb_steps=%d.", + self.num_steps, + ) + # Learning rate self.warmup_steps = training_params.get("warmup_steps", 0) self.gradient_max_norm = training_params.get("gradient_max_norm", 0.0) @@ -682,21 +780,6 @@ def warm_up_linear(step: int, warmup_steps: int) -> float: ) self.optimizer = fleet.distributed_optimizer(self.optimizer) - # Get model prob for multi-task - if self.multi_task: - self.model_prob = np.array([0.0 for key in self.model_keys]) - if training_params.get("model_prob", None) is not None: - model_prob = training_params["model_prob"] - for ii, model_key in enumerate(self.model_keys): - if model_key in model_prob: - self.model_prob[ii] += float(model_prob[model_key]) - else: - for ii, model_key in enumerate(self.model_keys): - self.model_prob[ii] += float(len(self.training_data[model_key])) - sum_prob = np.sum(self.model_prob) - assert sum_prob > 0.0, "Sum of model prob must be larger than 0!" - self.model_prob = self.model_prob / sum_prob - # Tensorboard self.enable_tensorboard = training_params.get("tensorboard", False) self.tensorboard_log_dir = training_params.get("tensorboard_log_dir", "log") diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index a01ec06f16..0b2b9bfeac 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -284,6 +284,10 @@ def compute_total_numb_batch( raise ValueError("Sampler weights must be 1D.") if weights.size == 0: raise ValueError("Sampler weights are empty.") + if not np.all(np.isfinite(weights)): + raise ValueError("Sampler weights must be finite.") + if np.any(weights < 0.0): + raise ValueError("Sampler weights must be non-negative.") weight_sum = float(np.sum(weights)) if weight_sum <= 0.0: raise ValueError("Sampler weights must sum to a positive value.") @@ -309,6 +313,11 @@ def resolve_model_prob( if model_key in model_prob_config: model_prob[ii] = float(model_prob_config[model_key]) else: + if self.rank == 0: + log.info( + "training.model_prob is not set or empty; defaulting to the " + "number of systems per task." + ) for ii, model_key in enumerate(model_keys): model_prob[ii] = float(len(model_training_data[model_key])) if not np.all(np.isfinite(model_prob)): diff --git a/deepmd/tf/entrypoints/change_bias.py b/deepmd/tf/entrypoints/change_bias.py index bafbf674b4..4abfe3bc11 100644 --- a/deepmd/tf/entrypoints/change_bias.py +++ b/deepmd/tf/entrypoints/change_bias.py @@ -190,7 +190,48 @@ def _change_bias_checkpoint_file( data = _load_data_systems(datafile, system, trainer) # Get stop_batch and origin_type_map like in train.py - stop_batch = jdata.get("training", {}).get("numb_steps", 0) + def compute_total_numb_batch(nbatches, sys_probs) -> int: + weights = np.asarray(sys_probs, dtype=np.float64) + if weights.ndim != 1: + raise ValueError("Sampler probabilities must be 1D.") + if weights.size == 0: + raise ValueError("Sampler probabilities are empty.") + if not np.all(np.isfinite(weights)): + raise ValueError("Sampler probabilities must be finite.") + if np.any(weights < 0.0): + raise ValueError("Sampler probabilities must be non-negative.") + weight_sum = float(np.sum(weights)) + if weight_sum <= 0.0: + raise ValueError("Sampler probabilities must sum to a positive value.") + probs = weights / weight_sum + nbatches = np.asarray(nbatches, dtype=np.float64) + if nbatches.shape[0] != probs.shape[0]: + raise ValueError("Number of batches and sampler probabilities must match.") + valid = probs > 0.0 + if not np.any(valid): + raise ValueError( + "Sampler probabilities must contain at least one positive entry." + ) + return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) + + training_params = jdata.get("training", {}) + stop_batch = training_params.get("numb_steps") + num_epoch = training_params.get("num_epoch") + if stop_batch is None and num_epoch is not None: + if num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + total_numb_batch = compute_total_numb_batch(data.nbatches, data.sys_probs) + if total_numb_batch <= 0: + raise ValueError("Total number of training batches must be positive.") + stop_batch = int(np.ceil(num_epoch * total_numb_batch)) + log.info( + "Computed numb_steps=%d from num_epoch=%s and total_numb_batch=%d.", + stop_batch, + num_epoch, + total_numb_batch, + ) + if stop_batch is None: + stop_batch = 0 origin_type_map = jdata["model"].get("origin_type_map", None) if origin_type_map is not None and not origin_type_map: # get the type_map from data if not provided diff --git a/deepmd/tf/entrypoints/train.py b/deepmd/tf/entrypoints/train.py index 817a1baf3b..d3a4d8a099 100755 --- a/deepmd/tf/entrypoints/train.py +++ b/deepmd/tf/entrypoints/train.py @@ -12,6 +12,8 @@ Any, ) +import numpy as np + from deepmd.common import ( j_loader, ) @@ -252,7 +254,62 @@ def _do_work( modifier.build_fv_graph() # get training info - stop_batch = jdata["training"]["numb_steps"] + def compute_total_numb_batch(nbatches, sys_probs) -> int: + weights = np.asarray(sys_probs, dtype=np.float64) + if weights.ndim != 1: + raise ValueError("Sampler probabilities must be 1D.") + if weights.size == 0: + raise ValueError("Sampler probabilities are empty.") + if not np.all(np.isfinite(weights)): + raise ValueError("Sampler probabilities must be finite.") + if np.any(weights < 0.0): + raise ValueError("Sampler probabilities must be non-negative.") + weight_sum = float(np.sum(weights)) + if weight_sum <= 0.0: + raise ValueError("Sampler probabilities must sum to a positive value.") + probs = weights / weight_sum + nbatches = np.asarray(nbatches, dtype=np.float64) + if nbatches.shape[0] != probs.shape[0]: + raise ValueError("Number of batches and sampler probabilities must match.") + valid = probs > 0.0 + if not np.any(valid): + raise ValueError( + "Sampler probabilities must contain at least one positive entry." + ) + return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) + + training_params = jdata["training"] + stop_batch = training_params.get("numb_steps") + num_epoch = training_params.get("num_epoch") + if stop_batch is None: + if num_epoch is None: + raise ValueError( + "Either training.numb_steps or training.num_epoch must be set." + ) + if num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if train_data is None: + raise ValueError( + "training.num_epoch requires training data to compute total_numb_batch." + ) + total_numb_batch = compute_total_numb_batch( + train_data.nbatches, train_data.sys_probs + ) + if total_numb_batch <= 0: + raise ValueError("Total number of training batches must be positive.") + stop_batch = int(np.ceil(num_epoch * total_numb_batch)) + log.info( + "Computed numb_steps=%d from num_epoch=%s and total_numb_batch=%d.", + stop_batch, + num_epoch, + total_numb_batch, + ) + elif num_epoch is not None: + log.warning( + "Both training.numb_steps and training.num_epoch are set; " + "using numb_steps=%d.", + stop_batch, + ) origin_type_map = jdata["model"].get("origin_type_map", None) if ( origin_type_map is not None and not origin_type_map diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index a3a65c3ba5..fdea38cc35 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3286,7 +3286,11 @@ def training_args( ) doc_opt_type = "The type of optimizer to use." doc_kf_blocksize = "The blocksize for the Kalman filter." - doc_model_prob = "The visiting probability of each model for each training step in the multi-task mode." + doc_model_prob = ( + "The visiting probability of each model for each training step in the " + "multi-task mode. If not set or an empty dict, defaults to weights " + "proportional to the number of systems per task." + ) doc_data_dict = "The multiple definition of the data, used in the multi-task mode." doc_acc_freq = "Gradient accumulation steps (number of steps to accumulate gradients before performing an update)." doc_zero_stage = ( @@ -3340,7 +3344,7 @@ def training_args( "num_epoch", [int, float], optional=True, - doc=doc_only_pt_supported + doc_num_epoch, + doc=doc_num_epoch, ), Argument("seed", [int, None], optional=True, doc=doc_seed), Argument( From 01be4a56bbb28eacb02df020e4cc2952ebbee8be Mon Sep 17 00:00:00 2001 From: OutisLi Date: Mon, 12 Jan 2026 13:23:41 +0800 Subject: [PATCH 04/18] fix --- deepmd/pt/train/training.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 0b2b9bfeac..997151f789 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -309,6 +309,11 @@ def resolve_model_prob( ) -> np.ndarray: model_prob = np.zeros(len(model_keys), dtype=np.float64) if model_prob_config: + missing = [k for k in model_keys if k not in model_prob_config] + if missing: + raise ValueError( + f"training.model_prob must specify all tasks; missing: {missing}" + ) for ii, model_key in enumerate(model_keys): if model_key in model_prob_config: model_prob[ii] = float(model_prob_config[model_key]) From 6158e21b073c3d195a77676854e743809003f4c1 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 12:44:49 +0800 Subject: [PATCH 05/18] add numb_epoch alias for num_epoch --- deepmd/utils/argcheck.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index fdea38cc35..6f73a34476 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3345,6 +3345,7 @@ def training_args( [int, float], optional=True, doc=doc_num_epoch, + alias=["numb_epoch"], ), Argument("seed", [int, None], optional=True, doc=doc_seed), Argument( From e73e99e37e6740c8fe99f880d7784846cf0ebe3e Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 13:06:48 +0800 Subject: [PATCH 06/18] refactor docstring --- deepmd/utils/argcheck.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 6f73a34476..c7640f344c 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3226,9 +3226,13 @@ def training_args( "of batches for system i and p_i is the sampling probability after " "sys_probs/auto_prob normalization. In multi-task mode, model_prob is " "normalized to sum to 1, per-task total_numb_batch values are computed " - "as above, and the final total_numb_batch is their model_prob-weighted " - "sum. At least one of numb_steps or num_epoch must be set; otherwise a " - "ValueError is raised." + "as above, and the final total_numb_batch is their model_prob-weighted sum. " + "Note that in multi-task mode, this defines an 'expected epoch' where each " + "sample is visited once in expectation across all tasks, rather than a " + "full epoch for each individual task. For multi-task pretraining scenarios " + "where different tasks require different numbers of visits, using numb_steps " + "directly is recommended for more explicit control. At least one of numb_steps " + "or num_epoch must be set; otherwise a ValueError is raised." ) doc_seed = "The random seed for getting frames from the training data set." doc_disp_file = "The file for printing learning curve." From 3563492f9666ab689dde57cb2fcbf1cece5d4ee0 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 13:22:09 +0800 Subject: [PATCH 07/18] add num_epoch_dict for multitask training --- deepmd/pd/train/training.py | 70 +++++++++++++++----- deepmd/pt/train/training.py | 70 +++++++++++++++----- deepmd/utils/argcheck.py | 31 +++++++-- doc/train/multi-task-training.md | 8 +++ source/tests/pt/test_sampler.py | 107 +++++++++++++++++++++++++++++++ 5 files changed, 252 insertions(+), 34 deletions(-) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index 2178ee1b0b..38a2397d20 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -134,6 +134,7 @@ def __init__( # Iteration config self.num_steps = training_params.get("numb_steps") self.num_epoch = training_params.get("num_epoch") + self.num_epoch_dict = training_params.get("num_epoch_dict") self.acc_freq: int = training_params.get( "acc_freq", 1 ) # gradient accumulation steps @@ -466,24 +467,63 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) ) if self.num_steps is None: - if self.num_epoch is None: + # === Step 1. Check num_epoch_dict first (multi-task only) === + if self.multi_task and self.num_epoch_dict: + missing = [k for k in self.model_keys if k not in self.num_epoch_dict] + if missing: + raise ValueError( + f"training.num_epoch_dict must specify all tasks; missing: {missing}" + ) + # Validate epoch values + for model_key in self.model_keys: + epoch_value = self.num_epoch_dict[model_key] + if epoch_value is not None and epoch_value <= 0: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." + ) + # Compute steps needed for each task to complete its epochs + per_task_steps = [] + for ii, model_key in enumerate(self.model_keys): + epoch_value = self.num_epoch_dict[model_key] + if epoch_value is not None: + # steps_i = epoch_i * per_task_total[i] / model_prob[i] + steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] + per_task_steps.append(steps_i) + self.num_steps = int(np.ceil(np.max(per_task_steps))) + log.info( + "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", + self.num_steps, + self.num_epoch_dict, + { + k: int(np.ceil(v)) + for k, v in zip(self.model_keys, per_task_steps) + }, + ) + # === Step 2. Fall back to num_epoch === + elif self.num_epoch is None: raise ValueError( - "Either training.numb_steps or training.num_epoch must be set." + "Either training.numb_steps, training.num_epoch, or " + "training.num_epoch_dict (multi-task only) must be set." ) - if self.num_epoch <= 0: - raise ValueError("training.num_epoch must be positive.") - if total_numb_batch <= 0: - raise ValueError("Total number of training batches must be positive.") - self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) - log.info( - "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", - self.num_steps, - self.num_epoch, - total_numb_batch, - ) - elif self.num_epoch is not None: + else: + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError( + "Total number of training batches must be positive." + ) + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) + elif self.num_epoch is not None or ( + self.multi_task and self.num_epoch_dict is not None + ): log.warning( - "Both training.numb_steps and training.num_epoch are set; " + "Both training.numb_steps and training.num_epoch (or num_epoch_dict) are set; " "using numb_steps=%d.", self.num_steps, ) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 997151f789..7bc0e54473 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -152,6 +152,7 @@ def __init__( # Iteration config self.num_steps = training_params.get("numb_steps") self.num_epoch = training_params.get("num_epoch") + self.num_epoch_dict = training_params.get("num_epoch_dict") self.disp_file = training_params.get("disp_file", "lcurve.out") self.disp_freq = training_params.get("disp_freq", 1000) self.disp_avg = training_params.get("disp_avg", False) @@ -551,24 +552,63 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) ) if self.num_steps is None: - if self.num_epoch is None: + # === Step 1. Check num_epoch_dict first (multi-task only) === + if self.multi_task and self.num_epoch_dict: + missing = [k for k in self.model_keys if k not in self.num_epoch_dict] + if missing: + raise ValueError( + f"training.num_epoch_dict must specify all tasks; missing: {missing}" + ) + # Validate epoch values + for model_key in self.model_keys: + epoch_value = self.num_epoch_dict[model_key] + if epoch_value is not None and epoch_value <= 0: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." + ) + # Compute steps needed for each task to complete its epochs + per_task_steps = [] + for ii, model_key in enumerate(self.model_keys): + epoch_value = self.num_epoch_dict[model_key] + if epoch_value is not None: + # steps_i = epoch_i * per_task_total[i] / model_prob[i] + steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] + per_task_steps.append(steps_i) + self.num_steps = int(np.ceil(np.max(per_task_steps))) + log.info( + "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", + self.num_steps, + self.num_epoch_dict, + { + k: int(np.ceil(v)) + for k, v in zip(self.model_keys, per_task_steps) + }, + ) + # === Step 2. Fall back to num_epoch === + elif self.num_epoch is None: raise ValueError( - "Either training.numb_steps or training.num_epoch must be set." + "Either training.numb_steps, training.num_epoch, or " + "training.num_epoch_dict (multi-task only) must be set." ) - if self.num_epoch <= 0: - raise ValueError("training.num_epoch must be positive.") - if total_numb_batch <= 0: - raise ValueError("Total number of training batches must be positive.") - self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) - log.info( - "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", - self.num_steps, - self.num_epoch, - total_numb_batch, - ) - elif self.num_epoch is not None: + else: + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError( + "Total number of training batches must be positive." + ) + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) + elif self.num_epoch is not None or ( + self.multi_task and self.num_epoch_dict is not None + ): log.warning( - "Both training.numb_steps and training.num_epoch are set; " + "Both training.numb_steps and training.num_epoch (or num_epoch_dict) are set; " "using numb_steps=%d.", self.num_steps, ) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index c7640f344c..4ceb8c05eb 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3229,10 +3229,26 @@ def training_args( "as above, and the final total_numb_batch is their model_prob-weighted sum. " "Note that in multi-task mode, this defines an 'expected epoch' where each " "sample is visited once in expectation across all tasks, rather than a " - "full epoch for each individual task. For multi-task pretraining scenarios " - "where different tasks require different numbers of visits, using numb_steps " - "directly is recommended for more explicit control. At least one of numb_steps " - "or num_epoch must be set; otherwise a ValueError is raised." + "full epoch for each individual task. In multi-task mode, num_epoch_dict " + "takes precedence over num_epoch if both are set. For multi-task pretraining " + "scenarios where different tasks require different numbers of visits, using " + "numb_steps directly is recommended for more explicit control. At least one " + "of numb_steps or num_epoch (or num_epoch_dict in multi-task mode) must be " + "set; otherwise a ValueError is raised." + ) + doc_num_epoch_dict = ( + "Number of training epochs for each model branch in multi-task mode " + "(can be fractional). This is a dictionary mapping model keys to the " + "number of epochs to train that specific model. When set, the total " + "training steps are computed as max_i(num_epoch_dict[i] * per_task_total[i] / model_prob[i]), " + "ensuring each model completes at least its specified number of epochs. " + "The model requiring the most steps will complete approximately its target " + "epochs, while other models may complete more epochs. This is particularly " + "useful for multi-task fine-tuning scenarios where a data-rich pretrained model " + "is jointly trained with a data-scarce downstream task, and only the downstream " + "task's epoch count is of interest. In multi-task mode, this parameter takes " + "precedence over num_epoch if both are set. All model keys must be specified " + "in the dictionary." ) doc_seed = "The random seed for getting frames from the training data set." doc_disp_file = "The file for printing learning curve." @@ -3331,6 +3347,13 @@ def training_args( if not multi_task else [ Argument("model_prob", dict, optional=True, default={}, doc=doc_model_prob), + Argument( + "num_epoch_dict", + dict, + optional=True, + default={}, + doc=doc_num_epoch_dict, + ), Argument("data_dict", dict, data_args, repeat=True, doc=doc_data_dict), ] ) diff --git a/doc/train/multi-task-training.md b/doc/train/multi-task-training.md index 115c463cc2..867b1fef69 100644 --- a/doc/train/multi-task-training.md +++ b/doc/train/multi-task-training.md @@ -81,6 +81,14 @@ Specifically, there are several parts that need to be modified: You can specify any positive real number weight for each task. The higher the weight, the higher the probability of being sampled in each training. This setting is optional, and if not set, tasks will be sampled with equal weights. +- (Optional) {ref}`training/num_epoch_dict `: The number of training epochs for each model branch, specified as a dictionary mapping `model_key` to epoch values. + This allows different tasks to train for different numbers of epochs, which is particularly useful for multi-task fine-tuning scenarios + where a data-rich pretrained model is jointly trained with a data-scarce downstream task. + When set, the total training steps are computed as `max_i(num_epoch_dict[i] * per_task_total[i] / model_prob[i])`, + ensuring each model completes at least its specified number of epochs. + The model requiring the most steps will complete approximately its target epochs, while other models may complete more epochs. + In multi-task mode, this parameter takes precedence over `num_epoch` if both are set. + An example input for multi-task training two models in water system is shown as following: ```{literalinclude} ../../examples/water_multi_task/pytorch_example/input_torch.json diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index acc6a3f0b9..b7084c415e 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -410,6 +410,113 @@ def test_sampling_stability_multi_task(self) -> None: ) ) + def test_num_epoch_dict(self) -> None: + """Test num_epoch_dict calculation logic for multi-task training.""" + # === Step 1. Build Datasets === + model_keys = ["model_1", "model_2"] + systems_1 = [ + str(Path(__file__).parent / "water/data/data_0"), + str(Path(__file__).parent / "water/data/data_1"), + ] + systems_2 = [ + str(Path(__file__).parent / "water/data/data_1"), + str(Path(__file__).parent / "water/data/single"), + ] + dataset_1 = pt_dataloader.DpLoaderSet( + systems_1, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + dataset_2 = pt_dataloader.DpLoaderSet( + systems_2, + self.batch_size, + self.type_map, + seed=10, + shuffle=False, + ) + sampler_1 = pt_dataloader.get_sampler_from_params( + dataset_1, {"sys_probs": [0.7, 0.3], "auto_prob": "prob_sys_size"} + ) + sampler_2 = pt_dataloader.get_sampler_from_params( + dataset_2, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} + ) + probs_1 = self._normalize_probs(np.asarray(sampler_1.weights)) + probs_2 = self._normalize_probs(np.asarray(sampler_2.weights)) + + # === Step 2. Compute per-task total_numb_batch === + per_task_total = np.array( + [ + self._compute_total_numb_batch( + np.asarray(dataset_1.index, dtype=np.float64), probs_1 + ), + self._compute_total_numb_batch( + np.asarray(dataset_2.index, dtype=np.float64), probs_2 + ), + ], + dtype=np.float64, + ) + + # === Step 3. Test num_epoch_dict calculation === + model_prob = np.asarray([0.4, 0.6], dtype=np.float64) + model_prob = model_prob / np.sum(model_prob) + num_epoch_dict = {model_keys[0]: 2.0, model_keys[1]: 5.0} + + # Compute expected steps for each task + # steps_i = epoch_i * per_task_total[i] / model_prob[i] + per_task_steps = np.array( + [ + num_epoch_dict[model_keys[0]] * per_task_total[0] / model_prob[0], + num_epoch_dict[model_keys[1]] * per_task_total[1] / model_prob[1], + ], + dtype=np.float64, + ) + + # Total steps should be max of per-task steps + expected_num_steps = int(np.ceil(np.max(per_task_steps))) + + # Verify the calculation matches the expected formula + self.assertIsInstance(expected_num_steps, int) + self.assertGreater(expected_num_steps, 0) + + # Verify that running expected_num_steps would give each task at least + # its target epochs (may be more for tasks needing fewer steps) + expected_model_0_counts = expected_num_steps * model_prob[0] + expected_model_1_counts = expected_num_steps * model_prob[1] + + # Each task should complete at least its target epochs + expected_epochs_0 = expected_model_0_counts / per_task_total[0] + expected_epochs_1 = expected_model_1_counts / per_task_total[1] + + self.assertGreaterEqual( + expected_epochs_0, + num_epoch_dict[model_keys[0]], + msg="Model 0 should complete at least 2 epochs", + ) + self.assertGreaterEqual( + expected_epochs_1, + num_epoch_dict[model_keys[1]], + msg="Model 1 should complete at least 5 epochs", + ) + + # The task requiring the most steps should complete approximately its target + max_task_idx = int(np.argmax(per_task_steps)) + if max_task_idx == 0: + self.assertAlmostEqual( + expected_epochs_0, + num_epoch_dict[model_keys[0]], + delta=0.1, + msg="Model 0 (max steps) should complete approximately 2 epochs", + ) + else: + self.assertAlmostEqual( + expected_epochs_1, + num_epoch_dict[model_keys[1]], + delta=0.1, + msg="Model 1 (max steps) should complete approximately 5 epochs", + ) + if __name__ == "__main__": unittest.main() From 857c8b186a448f32e0f8bc5d3f93ae5b14d6cfaf Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 13:58:06 +0800 Subject: [PATCH 08/18] fix --- deepmd/pd/train/training.py | 34 ++++++++++++++++++++++++++-------- deepmd/pt/train/training.py | 29 +++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index 38a2397d20..385f8db2c9 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -227,6 +227,14 @@ def compute_total_numb_batch(numb_batches, sampler_weights) -> int: raise ValueError("Sampler weights must sum to a positive value.") probs = weights / weight_sum nbatches = np.asarray(numb_batches, dtype=np.float64) + if nbatches.ndim != 1: + raise ValueError("Number of batches must be 1D.") + if nbatches.size == 0: + raise ValueError("Number of batches is empty.") + if not np.all(np.isfinite(nbatches)): + raise ValueError("Number of batches must be finite.") + if np.any(nbatches < 0.0): + raise ValueError("Number of batches must be non-negative.") if nbatches.shape[0] != probs.shape[0]: raise ValueError("Number of batches and sampler weights must match.") valid = probs > 0.0 @@ -243,6 +251,11 @@ def resolve_model_prob( ) -> np.ndarray: model_prob = np.zeros(len(model_keys), dtype=np.float64) if model_prob_config: + missing = [k for k in model_keys if k not in model_prob_config] + if missing: + raise ValueError( + f"training.model_prob must specify all tasks; missing: {missing}" + ) for ii, model_key in enumerate(model_keys): if model_key in model_prob_config: model_prob[ii] = float(model_prob_config[model_key]) @@ -438,6 +451,7 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ), ) + per_task_total = [] if not self.multi_task: sampler_weights = to_numpy_array( self.training_dataloader.batch_sampler.sampler.weights @@ -447,7 +461,6 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: sampler_weights, ) else: - per_task_total = [] for model_key in self.model_keys: sampler_weights = to_numpy_array( self.training_dataloader[model_key].batch_sampler.sampler.weights @@ -482,22 +495,27 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." ) # Compute steps needed for each task to complete its epochs - per_task_steps = [] + per_task_steps: dict[str, float] = {} for ii, model_key in enumerate(self.model_keys): epoch_value = self.num_epoch_dict[model_key] if epoch_value is not None: + if self.model_prob[ii] <= 0.0: + raise ValueError( + f"training.model_prob['{model_key}'] must be positive when num_epoch_dict targets it." + ) # steps_i = epoch_i * per_task_total[i] / model_prob[i] steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] - per_task_steps.append(steps_i) - self.num_steps = int(np.ceil(np.max(per_task_steps))) + per_task_steps[model_key] = float(steps_i) + if not per_task_steps: + raise ValueError( + "training.num_epoch_dict must have at least one non-null epoch target." + ) + self.num_steps = int(np.ceil(np.max(list(per_task_steps.values())))) log.info( "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", self.num_steps, self.num_epoch_dict, - { - k: int(np.ceil(v)) - for k, v in zip(self.model_keys, per_task_steps) - }, + {k: int(np.ceil(v)) for k, v in per_task_steps.items()}, ) # === Step 2. Fall back to num_epoch === elif self.num_epoch is None: diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 7bc0e54473..6953fd7c31 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -294,6 +294,14 @@ def compute_total_numb_batch( raise ValueError("Sampler weights must sum to a positive value.") probs = weights / weight_sum nbatches = np.asarray(numb_batches, dtype=np.float64) + if nbatches.ndim != 1: + raise ValueError("Number of batches must be 1D.") + if nbatches.size == 0: + raise ValueError("Number of batches is empty.") + if not np.all(np.isfinite(nbatches)): + raise ValueError("Number of batches must be finite.") + if np.any(nbatches < 0.0): + raise ValueError("Number of batches must be non-negative.") if nbatches.shape[0] != probs.shape[0]: raise ValueError("Number of batches and sampler weights must match.") valid = probs > 0.0 @@ -525,6 +533,7 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ) # Resolve training steps + per_task_total = [] if not self.multi_task: sampler_weights = to_numpy_array(self.training_dataloader.sampler.weights) total_numb_batch = compute_total_numb_batch( @@ -532,7 +541,6 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: sampler_weights, ) else: - per_task_total = [] for model_key in self.model_keys: sampler_weights = to_numpy_array( self.training_dataloader[model_key].sampler.weights @@ -567,22 +575,27 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." ) # Compute steps needed for each task to complete its epochs - per_task_steps = [] + per_task_steps: dict[str, float] = {} for ii, model_key in enumerate(self.model_keys): epoch_value = self.num_epoch_dict[model_key] if epoch_value is not None: + if self.model_prob[ii] <= 0.0: + raise ValueError( + f"training.model_prob['{model_key}'] must be positive when num_epoch_dict targets it." + ) # steps_i = epoch_i * per_task_total[i] / model_prob[i] steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] - per_task_steps.append(steps_i) - self.num_steps = int(np.ceil(np.max(per_task_steps))) + per_task_steps[model_key] = float(steps_i) + if not per_task_steps: + raise ValueError( + "training.num_epoch_dict must have at least one non-null epoch target." + ) + self.num_steps = int(np.ceil(np.max(list(per_task_steps.values())))) log.info( "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", self.num_steps, self.num_epoch_dict, - { - k: int(np.ceil(v)) - for k, v in zip(self.model_keys, per_task_steps) - }, + {k: int(np.ceil(v)) for k, v in per_task_steps.items()}, ) # === Step 2. Fall back to num_epoch === elif self.num_epoch is None: From 99d889314813c4a9510cf77ea9451cc97dc8455a Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 16:33:10 +0800 Subject: [PATCH 09/18] refactor --- deepmd/pd/train/training.py | 152 ++++++++++++++++++------------- deepmd/pt/train/training.py | 152 ++++++++++++++++++------------- deepmd/tf/entrypoints/train.py | 6 -- deepmd/utils/argcheck.py | 101 +++++++++++++------- doc/train/multi-task-training.md | 12 +-- source/tests/pt/test_sampler.py | 61 +++++++------ 6 files changed, 278 insertions(+), 206 deletions(-) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index 385f8db2c9..bd05fc76f6 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -271,6 +271,55 @@ def resolve_model_prob( raise ValueError("Sum of model prob must be larger than 0!") return model_prob / sum_prob + def resolve_model_prob_from_epochs( + model_keys, + num_epoch_dict_config, + per_task_total, + ) -> tuple[np.ndarray, int, dict[str, float]]: + if not num_epoch_dict_config: + raise ValueError( + "training.num_epoch_dict must be set for multi-task epochs." + ) + missing = [k for k in model_keys if k not in num_epoch_dict_config] + if missing: + raise ValueError( + "training.num_epoch_dict must specify all tasks; " + f"missing: {missing}" + ) + epoch_targets = np.zeros(len(model_keys), dtype=np.float64) + for ii, model_key in enumerate(model_keys): + epoch_value = num_epoch_dict_config[model_key] + if epoch_value is None: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive." + ) + epoch_value = float(epoch_value) + if not np.isfinite(epoch_value) or epoch_value <= 0.0: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." + ) + epoch_targets[ii] = epoch_value + per_task_total = np.asarray(per_task_total, dtype=np.float64) + if per_task_total.ndim != 1: + raise ValueError("Per-task total batches must be 1D.") + if per_task_total.shape[0] != epoch_targets.shape[0]: + raise ValueError("Per-task totals and epoch targets must match.") + if not np.all(np.isfinite(per_task_total)): + raise ValueError("Per-task total batches must be finite.") + if np.any(per_task_total <= 0.0): + raise ValueError("Per-task total batches must be positive.") + per_task_steps = per_task_total * epoch_targets + total_target_steps = float(np.sum(per_task_steps)) + if total_target_steps <= 0.0: + raise ValueError("Sum of target steps must be positive.") + model_prob = per_task_steps / total_target_steps + num_steps = int(np.ceil(total_target_steps)) + per_task_steps_map = { + model_key: float(per_task_steps[ii]) + for ii, model_key in enumerate(model_keys) + } + return model_prob, num_steps, per_task_steps_map + def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -460,6 +509,24 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: training_data.index, sampler_weights, ) + if self.num_steps is None: + if self.num_epoch is None: + raise ValueError( + "Either training.numb_steps or training.num_epoch must be set." + ) + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError( + "Total number of training batches must be positive." + ) + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) else: for model_key in self.model_keys: sampler_weights = to_numpy_array( @@ -471,80 +538,35 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: sampler_weights, ) ) - self.model_prob = resolve_model_prob( - self.model_keys, - training_params.get("model_prob"), - training_data, - ) - total_numb_batch = int( - np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) - ) - if self.num_steps is None: - # === Step 1. Check num_epoch_dict first (multi-task only) === - if self.multi_task and self.num_epoch_dict: - missing = [k for k in self.model_keys if k not in self.num_epoch_dict] - if missing: - raise ValueError( - f"training.num_epoch_dict must specify all tasks; missing: {missing}" - ) - # Validate epoch values - for model_key in self.model_keys: - epoch_value = self.num_epoch_dict[model_key] - if epoch_value is not None and epoch_value <= 0: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." - ) - # Compute steps needed for each task to complete its epochs - per_task_steps: dict[str, float] = {} - for ii, model_key in enumerate(self.model_keys): - epoch_value = self.num_epoch_dict[model_key] - if epoch_value is not None: - if self.model_prob[ii] <= 0.0: - raise ValueError( - f"training.model_prob['{model_key}'] must be positive when num_epoch_dict targets it." - ) - # steps_i = epoch_i * per_task_total[i] / model_prob[i] - steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] - per_task_steps[model_key] = float(steps_i) - if not per_task_steps: - raise ValueError( - "training.num_epoch_dict must have at least one non-null epoch target." - ) - self.num_steps = int(np.ceil(np.max(list(per_task_steps.values())))) + if self.num_epoch_dict: + ( + self.model_prob, + self.num_steps, + per_task_steps, + ) = resolve_model_prob_from_epochs( + self.model_keys, + self.num_epoch_dict, + np.asarray(per_task_total, dtype=np.float64), + ) log.info( - "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", + "Computed model_prob=%s and num_steps=%d from num_epoch_dict=%s " + "with per-task target steps: %s.", + self.model_prob, self.num_steps, self.num_epoch_dict, {k: int(np.ceil(v)) for k, v in per_task_steps.items()}, ) - # === Step 2. Fall back to num_epoch === - elif self.num_epoch is None: - raise ValueError( - "Either training.numb_steps, training.num_epoch, or " - "training.num_epoch_dict (multi-task only) must be set." - ) else: - if self.num_epoch <= 0: - raise ValueError("training.num_epoch must be positive.") - if total_numb_batch <= 0: + if self.num_steps is None: raise ValueError( - "Total number of training batches must be positive." + "Either training.numb_steps (multi-task only) or " + "training.num_epoch_dict must be set." ) - self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) - log.info( - "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", - self.num_steps, - self.num_epoch, - total_numb_batch, + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, ) - elif self.num_epoch is not None or ( - self.multi_task and self.num_epoch_dict is not None - ): - log.warning( - "Both training.numb_steps and training.num_epoch (or num_epoch_dict) are set; " - "using numb_steps=%d.", - self.num_steps, - ) # Learning rate self.warmup_steps = training_params.get("warmup_steps", 0) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 6953fd7c31..3fe9f59d82 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -343,6 +343,55 @@ def resolve_model_prob( raise ValueError("Sum of model prob must be larger than 0!") return model_prob / sum_prob + def resolve_model_prob_from_epochs( + model_keys: list[str], + num_epoch_dict_config: dict[str, Any], + per_task_total: np.ndarray, + ) -> tuple[np.ndarray, int, dict[str, float]]: + if not num_epoch_dict_config: + raise ValueError( + "training.num_epoch_dict must be set for multi-task epochs." + ) + missing = [k for k in model_keys if k not in num_epoch_dict_config] + if missing: + raise ValueError( + "training.num_epoch_dict must specify all tasks; " + f"missing: {missing}" + ) + epoch_targets = np.zeros(len(model_keys), dtype=np.float64) + for ii, model_key in enumerate(model_keys): + epoch_value = num_epoch_dict_config[model_key] + if epoch_value is None: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive." + ) + epoch_value = float(epoch_value) + if not np.isfinite(epoch_value) or epoch_value <= 0.0: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." + ) + epoch_targets[ii] = epoch_value + per_task_total = np.asarray(per_task_total, dtype=np.float64) + if per_task_total.ndim != 1: + raise ValueError("Per-task total batches must be 1D.") + if per_task_total.shape[0] != epoch_targets.shape[0]: + raise ValueError("Per-task totals and epoch targets must match.") + if not np.all(np.isfinite(per_task_total)): + raise ValueError("Per-task total batches must be finite.") + if np.any(per_task_total <= 0.0): + raise ValueError("Per-task total batches must be positive.") + per_task_steps = per_task_total * epoch_targets + total_target_steps = float(np.sum(per_task_steps)) + if total_target_steps <= 0.0: + raise ValueError("Sum of target steps must be positive.") + model_prob = per_task_steps / total_target_steps + num_steps = int(np.ceil(total_target_steps)) + per_task_steps_map = { + model_key: float(per_task_steps[ii]) + for ii, model_key in enumerate(model_keys) + } + return model_prob, num_steps, per_task_steps_map + def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -540,6 +589,24 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: training_data.index, sampler_weights, ) + if self.num_steps is None: + if self.num_epoch is None: + raise ValueError( + "Either training.numb_steps or training.num_epoch must be set." + ) + if self.num_epoch <= 0: + raise ValueError("training.num_epoch must be positive.") + if total_numb_batch <= 0: + raise ValueError( + "Total number of training batches must be positive." + ) + self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) + log.info( + "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", + self.num_steps, + self.num_epoch, + total_numb_batch, + ) else: for model_key in self.model_keys: sampler_weights = to_numpy_array( @@ -551,80 +618,35 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: sampler_weights, ) ) - self.model_prob = resolve_model_prob( - self.model_keys, - training_params.get("model_prob"), - training_data, - ) - total_numb_batch = int( - np.ceil(np.sum(np.asarray(per_task_total) * self.model_prob)) - ) - if self.num_steps is None: - # === Step 1. Check num_epoch_dict first (multi-task only) === - if self.multi_task and self.num_epoch_dict: - missing = [k for k in self.model_keys if k not in self.num_epoch_dict] - if missing: - raise ValueError( - f"training.num_epoch_dict must specify all tasks; missing: {missing}" - ) - # Validate epoch values - for model_key in self.model_keys: - epoch_value = self.num_epoch_dict[model_key] - if epoch_value is not None and epoch_value <= 0: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." - ) - # Compute steps needed for each task to complete its epochs - per_task_steps: dict[str, float] = {} - for ii, model_key in enumerate(self.model_keys): - epoch_value = self.num_epoch_dict[model_key] - if epoch_value is not None: - if self.model_prob[ii] <= 0.0: - raise ValueError( - f"training.model_prob['{model_key}'] must be positive when num_epoch_dict targets it." - ) - # steps_i = epoch_i * per_task_total[i] / model_prob[i] - steps_i = epoch_value * per_task_total[ii] / self.model_prob[ii] - per_task_steps[model_key] = float(steps_i) - if not per_task_steps: - raise ValueError( - "training.num_epoch_dict must have at least one non-null epoch target." - ) - self.num_steps = int(np.ceil(np.max(list(per_task_steps.values())))) + if self.num_epoch_dict: + ( + self.model_prob, + self.num_steps, + per_task_steps, + ) = resolve_model_prob_from_epochs( + self.model_keys, + self.num_epoch_dict, + np.asarray(per_task_total, dtype=np.float64), + ) log.info( - "Computed num_steps=%d from num_epoch_dict=%s with per-task steps: %s.", + "Computed model_prob=%s and num_steps=%d from num_epoch_dict=%s " + "with per-task target steps: %s.", + self.model_prob, self.num_steps, self.num_epoch_dict, {k: int(np.ceil(v)) for k, v in per_task_steps.items()}, ) - # === Step 2. Fall back to num_epoch === - elif self.num_epoch is None: - raise ValueError( - "Either training.numb_steps, training.num_epoch, or " - "training.num_epoch_dict (multi-task only) must be set." - ) else: - if self.num_epoch <= 0: - raise ValueError("training.num_epoch must be positive.") - if total_numb_batch <= 0: + if self.num_steps is None: raise ValueError( - "Total number of training batches must be positive." + "Either training.numb_steps (multi-task only) or " + "training.num_epoch_dict must be set." ) - self.num_steps = int(np.ceil(self.num_epoch * total_numb_batch)) - log.info( - "Computed num_steps=%d from num_epoch=%s and total_numb_batch=%d.", - self.num_steps, - self.num_epoch, - total_numb_batch, + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, ) - elif self.num_epoch is not None or ( - self.multi_task and self.num_epoch_dict is not None - ): - log.warning( - "Both training.numb_steps and training.num_epoch (or num_epoch_dict) are set; " - "using numb_steps=%d.", - self.num_steps, - ) # Learning rate warmup_steps = training_params.get("warmup_steps", None) diff --git a/deepmd/tf/entrypoints/train.py b/deepmd/tf/entrypoints/train.py index d3a4d8a099..6b1af94a0f 100755 --- a/deepmd/tf/entrypoints/train.py +++ b/deepmd/tf/entrypoints/train.py @@ -304,12 +304,6 @@ def compute_total_numb_batch(nbatches, sys_probs) -> int: num_epoch, total_numb_batch, ) - elif num_epoch is not None: - log.warning( - "Both training.numb_steps and training.num_epoch are set; " - "using numb_steps=%d.", - stop_batch, - ) origin_type_map = jdata["model"].get("origin_type_map", None) if ( origin_type_map is not None and not origin_type_map diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 4ceb8c05eb..acaf0fc3f2 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3214,41 +3214,33 @@ def training_args( multi_task: bool = False, ) -> list[Argument]: # ! modified by Ziyao: data configuration isolated. doc_numb_steps = ( - "Number of training batches. Each training uses one batch of data. " - "If set, this value takes precedence over num_epoch. If both numb_steps " - "and num_epoch are not set, a ValueError is raised." + "Number of training steps (num_step). Each training uses one batch of data. " + "Mutually exclusive with num_epoch in single-task mode. In multi-task " + "mode, this is mutually exclusive with num_epoch_dict. " + "Accepted names: num_step, num_steps, numb_step, numb_steps." ) doc_num_epoch = ( - "Number of training epochs (can be fractional). " - "When numb_steps is not set, the total steps are computed as " - "ceil(num_epoch * total_numb_batch). For each task, total_numb_batch " - "is computed as ceil(max_i(n_bch_i / p_i)), where n_bch_i is the number " - "of batches for system i and p_i is the sampling probability after " - "sys_probs/auto_prob normalization. In multi-task mode, model_prob is " - "normalized to sum to 1, per-task total_numb_batch values are computed " - "as above, and the final total_numb_batch is their model_prob-weighted sum. " - "Note that in multi-task mode, this defines an 'expected epoch' where each " - "sample is visited once in expectation across all tasks, rather than a " - "full epoch for each individual task. In multi-task mode, num_epoch_dict " - "takes precedence over num_epoch if both are set. For multi-task pretraining " - "scenarios where different tasks require different numbers of visits, using " - "numb_steps directly is recommended for more explicit control. At least one " - "of numb_steps or num_epoch (or num_epoch_dict in multi-task mode) must be " - "set; otherwise a ValueError is raised." + "Number of training epochs (num_epoch; can be fractional) for single-task " + "mode only. Because each step samples the dataset stochastically, this " + "corresponds to an expected epoch count rather than a deterministic full " + "pass. When num_step is not set, the total steps are computed as " + "ceil(num_epoch * total_numb_batch). total_numb_batch is computed as " + "ceil(max_i(n_bch_i / p_i)), where n_bch_i is the number of batches for " + "system i and p_i is the sampling probability after sys_probs/auto_prob " + "normalization. Mutually exclusive with num_step. For multi-task mode, " + "use num_epoch_dict instead. Accepted names: num_epoch, num_epochs, " + "numb_epoch, numb_epochs." ) doc_num_epoch_dict = ( "Number of training epochs for each model branch in multi-task mode " "(can be fractional). This is a dictionary mapping model keys to the " - "number of epochs to train that specific model. When set, the total " - "training steps are computed as max_i(num_epoch_dict[i] * per_task_total[i] / model_prob[i]), " - "ensuring each model completes at least its specified number of epochs. " - "The model requiring the most steps will complete approximately its target " - "epochs, while other models may complete more epochs. This is particularly " - "useful for multi-task fine-tuning scenarios where a data-rich pretrained model " - "is jointly trained with a data-scarce downstream task, and only the downstream " - "task's epoch count is of interest. In multi-task mode, this parameter takes " - "precedence over num_epoch if both are set. All model keys must be specified " - "in the dictionary." + "number of epochs to train that specific model. When set, model_prob " + "is derived from the epoch targets and per-task total_numb_batch values: " + "model_prob[i] = num_epoch_dict[i] * per_task_total[i] / sum_j(num_epoch_dict[j] * per_task_total[j]). " + "Total training steps are computed as " + "ceil(sum_i(num_epoch_dict[i] * per_task_total[i])). " + "This parameter is mutually exclusive with training.model_prob and " + "training.num_step. All model keys must be specified in the dictionary." ) doc_seed = "The random seed for getting frames from the training data set." doc_disp_file = "The file for printing learning curve." @@ -3308,8 +3300,9 @@ def training_args( doc_kf_blocksize = "The blocksize for the Kalman filter." doc_model_prob = ( "The visiting probability of each model for each training step in the " - "multi-task mode. If not set or an empty dict, defaults to weights " - "proportional to the number of systems per task." + "multi-task mode. Only used when num_epoch_dict is not set. If not set " + "or an empty dict, defaults to weights proportional to the number of " + "systems per task." ) doc_data_dict = "The multiple definition of the data, used in the multi-task mode." doc_acc_freq = "Gradient accumulation steps (number of steps to accumulate gradients before performing an update)." @@ -3365,14 +3358,19 @@ def training_args( int, optional=True, doc=doc_numb_steps, - alias=["stop_batch", "num_steps"], + alias=[ + "stop_batch", + "num_step", + "num_steps", + "numb_step", + ], ), Argument( "num_epoch", [int, float], optional=True, doc=doc_num_epoch, - alias=["numb_epoch"], + alias=["num_epochs", "numb_epoch", "numb_epochs"], ), Argument("seed", [int, None], optional=True, doc=doc_seed), Argument( @@ -3653,8 +3651,43 @@ def training_args( ) ] + def training_extra_check(data: dict | None) -> bool: + if data is None: + return True + num_steps = data.get("numb_steps") + num_epoch = data.get("num_epoch") + num_epoch_dict = data.get("num_epoch_dict", {}) + model_prob = data.get("model_prob", {}) + if multi_task: + if num_epoch is not None: + raise ValueError( + "training.num_epoch is only supported in single-task mode." + ) + if num_epoch_dict: + if num_steps is not None: + raise ValueError( + "training.num_epoch_dict is mutually exclusive with training.num_step." + ) + if model_prob: + raise ValueError( + "training.num_epoch_dict is mutually exclusive with training.model_prob." + ) + else: + if num_steps is not None and num_epoch is not None: + raise ValueError( + "training.num_step and training.num_epoch are mutually exclusive." + ) + return True + doc_training = "The training options." - return Argument("training", dict, args, variants, doc=doc_training) + return Argument( + "training", + dict, + args, + variants, + doc=doc_training, + extra_check=training_extra_check, + ) def multi_model_args() -> list[Argument]: diff --git a/doc/train/multi-task-training.md b/doc/train/multi-task-training.md index 867b1fef69..2b420bb8a5 100644 --- a/doc/train/multi-task-training.md +++ b/doc/train/multi-task-training.md @@ -79,15 +79,15 @@ Specifically, there are several parts that need to be modified: - (Optional) {ref}`training/model_prob `: The sampling weight settings corresponding to each `model_key`, i.e., the probability weight in the training step. You can specify any positive real number weight for each task. The higher the weight, the higher the probability of being sampled in each training. - This setting is optional, and if not set, tasks will be sampled with equal weights. + This setting is optional, and if not set, tasks will be sampled with equal weights. It is only used when `num_epoch_dict` is not set. -- (Optional) {ref}`training/num_epoch_dict `: The number of training epochs for each model branch, specified as a dictionary mapping `model_key` to epoch values. +- (Optional) {ref}`training/num_epoch_dict `: The number of training epochs for each model branch, specified as a dictionary mapping `model_key` to epoch values (can be fractional). This allows different tasks to train for different numbers of epochs, which is particularly useful for multi-task fine-tuning scenarios where a data-rich pretrained model is jointly trained with a data-scarce downstream task. - When set, the total training steps are computed as `max_i(num_epoch_dict[i] * per_task_total[i] / model_prob[i])`, - ensuring each model completes at least its specified number of epochs. - The model requiring the most steps will complete approximately its target epochs, while other models may complete more epochs. - In multi-task mode, this parameter takes precedence over `num_epoch` if both are set. + When set, `model_prob` is derived from the epoch targets and per-task totals: + `model_prob[i] = num_epoch_dict[i] * per_task_total[i] / sum_j(num_epoch_dict[j] * per_task_total[j])`. + The total training steps are computed as `ceil(sum_i(num_epoch_dict[i] * per_task_total[i]))`. + This parameter is mutually exclusive with `training/model_prob` and `training/num_steps`. An example input for multi-task training two models in water system is shown as following: diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index b7084c415e..d6ca3b3749 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -333,11 +333,17 @@ def test_sampling_stability_multi_task(self) -> None: ], dtype=np.float64, ) - model_prob = np.asarray([0.4, 0.6], dtype=np.float64) - model_prob = model_prob / np.sum(model_prob) - total_numb_batch = int(np.ceil(np.sum(per_task_total * model_prob))) - num_epoch = 1.5 - num_steps = int(np.ceil(num_epoch * total_numb_batch)) + num_epoch_dict = {model_keys[0]: 1.5, model_keys[1]: 0.8} + target_steps = np.array( + [ + num_epoch_dict[model_keys[0]] * per_task_total[0], + num_epoch_dict[model_keys[1]] * per_task_total[1], + ], + dtype=np.float64, + ) + total_target_steps = float(np.sum(target_steps)) + model_prob = target_steps / total_target_steps + num_steps = int(np.ceil(total_target_steps)) # === Step 2. Sample Using Derived Steps === dataloaders_epoch = { @@ -459,22 +465,19 @@ def test_num_epoch_dict(self) -> None: ) # === Step 3. Test num_epoch_dict calculation === - model_prob = np.asarray([0.4, 0.6], dtype=np.float64) - model_prob = model_prob / np.sum(model_prob) num_epoch_dict = {model_keys[0]: 2.0, model_keys[1]: 5.0} - # Compute expected steps for each task - # steps_i = epoch_i * per_task_total[i] / model_prob[i] + # Compute expected steps and model_prob from epoch targets per_task_steps = np.array( [ - num_epoch_dict[model_keys[0]] * per_task_total[0] / model_prob[0], - num_epoch_dict[model_keys[1]] * per_task_total[1] / model_prob[1], + num_epoch_dict[model_keys[0]] * per_task_total[0], + num_epoch_dict[model_keys[1]] * per_task_total[1], ], dtype=np.float64, ) - - # Total steps should be max of per-task steps - expected_num_steps = int(np.ceil(np.max(per_task_steps))) + total_target_steps = float(np.sum(per_task_steps)) + model_prob = per_task_steps / total_target_steps + expected_num_steps = int(np.ceil(total_target_steps)) # Verify the calculation matches the expected formula self.assertIsInstance(expected_num_steps, int) @@ -500,22 +503,20 @@ def test_num_epoch_dict(self) -> None: msg="Model 1 should complete at least 5 epochs", ) - # The task requiring the most steps should complete approximately its target - max_task_idx = int(np.argmax(per_task_steps)) - if max_task_idx == 0: - self.assertAlmostEqual( - expected_epochs_0, - num_epoch_dict[model_keys[0]], - delta=0.1, - msg="Model 0 (max steps) should complete approximately 2 epochs", - ) - else: - self.assertAlmostEqual( - expected_epochs_1, - num_epoch_dict[model_keys[1]], - delta=0.1, - msg="Model 1 (max steps) should complete approximately 5 epochs", - ) + # All tasks should be scaled by the same rounding factor. + scale_0 = expected_epochs_0 / num_epoch_dict[model_keys[0]] + scale_1 = expected_epochs_1 / num_epoch_dict[model_keys[1]] + self.assertGreaterEqual( + scale_0, + 1.0, + msg="Rounding should not reduce expected epochs.", + ) + self.assertAlmostEqual( + scale_0, + scale_1, + delta=0.1, + msg="Rounding should scale all tasks consistently.", + ) if __name__ == "__main__": From a93a4f7eaa47e50ac6e98296371ff891dda3194d Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 16:52:56 +0800 Subject: [PATCH 10/18] fix pd --- deepmd/pd/train/training.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index bd05fc76f6..f309e18b05 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -130,6 +130,7 @@ def __init__( else 1 ) self.num_model = len(self.model_keys) + self.model_prob = None # Iteration config self.num_steps = training_params.get("numb_steps") @@ -753,6 +754,14 @@ def single_model_finetune( frz_model = paddle.jit.load(init_frz_model) self.model.set_state_dict(frz_model.state_dict()) + # Get model prob for multi-task + if self.multi_task and self.model_prob is None: + self.model_prob = resolve_model_prob( + self.model_keys, + training_params.get("model_prob"), + training_data, + ) + # Multi-task share params if shared_links is not None: self.wrapper.share_params( From 92b6aff38486fdd5cf42060882f6f0e2337d87d3 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Tue, 13 Jan 2026 23:34:33 +0800 Subject: [PATCH 11/18] merge commont functions --- deepmd/dpmodel/utils/__init__.py | 8 ++ deepmd/dpmodel/utils/training_utils.py | 188 +++++++++++++++++++++++++ deepmd/pd/train/training.py | 117 +-------------- deepmd/pt/train/training.py | 123 +--------------- deepmd/tf/entrypoints/change_bias.py | 27 +--- deepmd/tf/entrypoints/train.py | 27 +--- source/tests/pt/test_sampler.py | 44 +++--- 7 files changed, 235 insertions(+), 299 deletions(-) create mode 100644 deepmd/dpmodel/utils/training_utils.py diff --git a/deepmd/dpmodel/utils/__init__.py b/deepmd/dpmodel/utils/__init__.py index 5941d2c8d0..cd6eb696c9 100644 --- a/deepmd/dpmodel/utils/__init__.py +++ b/deepmd/dpmodel/utils/__init__.py @@ -36,6 +36,11 @@ save_dp_model, traverse_model_dict, ) +from .training_utils import ( + compute_total_numb_batch, + resolve_model_prob, + resolve_model_prob_from_epochs, +) __all__ = [ "AtomExcludeMask", @@ -49,6 +54,7 @@ "aggregate", "build_multiple_neighbor_list", "build_neighbor_list", + "compute_total_numb_batch", "extend_coord_with_ghosts", "get_graph_index", "get_multiple_nlist_key", @@ -60,6 +66,8 @@ "nlist_distinguish_types", "normalize_coord", "phys2inter", + "resolve_model_prob", + "resolve_model_prob_from_epochs", "save_dp_model", "to_face_distance", "traverse_model_dict", diff --git a/deepmd/dpmodel/utils/training_utils.py b/deepmd/dpmodel/utils/training_utils.py new file mode 100644 index 0000000000..72dfda930a --- /dev/null +++ b/deepmd/dpmodel/utils/training_utils.py @@ -0,0 +1,188 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import logging +from collections.abc import ( + Iterable, +) + +import numpy as np + +log = logging.getLogger(__name__) + + +def compute_total_numb_batch( + numb_batches: Iterable[int], + sampler_weights: np.ndarray, +) -> int: + """Compute total number of batches considering sampler weights. + + Parameters + ---------- + numb_batches : Iterable[int] + Number of batches for each data system. + sampler_weights : np.ndarray + Sampling weights for each data system. + + Returns + ------- + int + Total number of batches. + + Raises + ------ + ValueError + If input validation fails. + """ + weights = np.asarray(sampler_weights, dtype=np.float64) + if weights.ndim != 1: + raise ValueError("Sampler weights must be 1D.") + if weights.size == 0: + raise ValueError("Sampler weights are empty.") + if not np.all(np.isfinite(weights)): + raise ValueError("Sampler weights must be finite.") + if np.any(weights < 0.0): + raise ValueError("Sampler weights must be non-negative.") + weight_sum = float(np.sum(weights)) + if weight_sum <= 0.0: + raise ValueError("Sampler weights must sum to a positive value.") + probs = weights / weight_sum + nbatches = np.asarray(numb_batches, dtype=np.float64) + if nbatches.ndim != 1: + raise ValueError("Number of batches must be 1D.") + if nbatches.size == 0: + raise ValueError("Number of batches is empty.") + if not np.all(np.isfinite(nbatches)): + raise ValueError("Number of batches must be finite.") + if np.any(nbatches < 0.0): + raise ValueError("Number of batches must be non-negative.") + if nbatches.shape[0] != probs.shape[0]: + raise ValueError("Number of batches and sampler weights must match.") + valid = probs > 0.0 + if not np.any(valid): + raise ValueError( + "Sampler probabilities must contain at least one positive entry." + ) + return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) + + +def resolve_model_prob( + model_keys: list[str], + model_prob_config: dict[str, float] | None, + model_training_data: dict[str, object], + rank: int = 0, +) -> np.ndarray: + """Resolve model training probability for multi-task training. + + Parameters + ---------- + model_keys : list[str] + List of model keys. + model_prob_config : dict[str, float] | None + User-specified model probabilities. If None, use data size. + model_training_data : dict[str, object] + Training data for each model. + rank : int, optional + Process rank for distributed training, by default 0. + + Returns + ------- + np.ndarray + Normalized model probabilities. + + Raises + ------ + ValueError + If input validation fails. + """ + model_prob = np.zeros(len(model_keys), dtype=np.float64) + if model_prob_config: + missing = [k for k in model_keys if k not in model_prob_config] + if missing: + raise ValueError( + f"training.model_prob must specify all tasks; missing: {missing}" + ) + for ii, model_key in enumerate(model_keys): + if model_key in model_prob_config: + model_prob[ii] = float(model_prob_config[model_key]) + else: + if rank == 0: + log.info( + "training.model_prob is not set or empty; defaulting to the " + "number of systems per task." + ) + for ii, model_key in enumerate(model_keys): + model_prob[ii] = float(len(model_training_data[model_key])) + if not np.all(np.isfinite(model_prob)): + raise ValueError("Model prob must be finite.") + if np.any(model_prob < 0.0): + raise ValueError("Model prob must be non-negative.") + sum_prob = float(np.sum(model_prob)) + if sum_prob <= 0.0: + raise ValueError("Sum of model prob must be larger than 0!") + return model_prob / sum_prob + + +def resolve_model_prob_from_epochs( + model_keys: list[str], + num_epoch_dict_config: dict[str, float], + per_task_total: np.ndarray, +) -> tuple[np.ndarray, int, dict[str, float]]: + """Resolve model probability and training steps from epoch configuration. + + Parameters + ---------- + model_keys : list[str] + List of model keys. + num_epoch_dict_config : dict[str, float] + Target epochs for each task. + per_task_total : np.ndarray + Total batches per task. + + Returns + ------- + tuple[np.ndarray, int, dict[str, float]] + Model probabilities, total training steps, and per-task steps. + + Raises + ------ + ValueError + If input validation fails. + """ + if not num_epoch_dict_config: + raise ValueError("training.num_epoch_dict must be set for multi-task epochs.") + missing = [k for k in model_keys if k not in num_epoch_dict_config] + if missing: + raise ValueError( + f"training.num_epoch_dict must specify all tasks; missing: {missing}" + ) + epoch_targets = np.zeros(len(model_keys), dtype=np.float64) + for ii, model_key in enumerate(model_keys): + epoch_value = num_epoch_dict_config[model_key] + if epoch_value is None: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive." + ) + epoch_value = float(epoch_value) + if not np.isfinite(epoch_value) or epoch_value <= 0.0: + raise ValueError( + f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." + ) + epoch_targets[ii] = epoch_value + per_task_total = np.asarray(per_task_total, dtype=np.float64) + if per_task_total.ndim != 1: + raise ValueError("Per-task total batches must be 1D.") + if per_task_total.shape[0] != epoch_targets.shape[0]: + raise ValueError("Per-task totals and epoch targets must match.") + if not np.all(np.isfinite(per_task_total)): + raise ValueError("Per-task total batches must be finite.") + if np.any(per_task_total <= 0.0): + raise ValueError("Per-task total batches must be positive.") + per_task_steps = per_task_total * epoch_targets + total_target_steps = float(np.sum(per_task_steps)) + if total_target_steps <= 0.0: + raise ValueError("Sum of target steps must be positive.") + model_prob = per_task_steps / total_target_steps + num_steps = int(np.ceil(total_target_steps)) + per_task_steps_map = { + model_key: float(per_task_steps[ii]) for ii, model_key in enumerate(model_keys) + } + return model_prob, num_steps, per_task_steps_map diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index f309e18b05..d0691c6863 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -30,8 +30,11 @@ from deepmd.common import ( symlink_prefix_files, ) -from deepmd.dpmodel.utils.learning_rate import ( - BaseLR, +from deepmd.dpmodel.utils.learning_rate import BaseLR +from deepmd.dpmodel.utils import ( + compute_total_numb_batch, + resolve_model_prob, + resolve_model_prob_from_epochs, ) from deepmd.loggers.training import ( format_training_message, @@ -213,114 +216,6 @@ def get_dataloader_and_buffer( valid_numb_batch, ) - def compute_total_numb_batch(numb_batches, sampler_weights) -> int: - weights = np.asarray(sampler_weights, dtype=np.float64) - if weights.ndim != 1: - raise ValueError("Sampler weights must be 1D.") - if weights.size == 0: - raise ValueError("Sampler weights are empty.") - if not np.all(np.isfinite(weights)): - raise ValueError("Sampler weights must be finite.") - if np.any(weights < 0.0): - raise ValueError("Sampler weights must be non-negative.") - weight_sum = float(np.sum(weights)) - if weight_sum <= 0.0: - raise ValueError("Sampler weights must sum to a positive value.") - probs = weights / weight_sum - nbatches = np.asarray(numb_batches, dtype=np.float64) - if nbatches.ndim != 1: - raise ValueError("Number of batches must be 1D.") - if nbatches.size == 0: - raise ValueError("Number of batches is empty.") - if not np.all(np.isfinite(nbatches)): - raise ValueError("Number of batches must be finite.") - if np.any(nbatches < 0.0): - raise ValueError("Number of batches must be non-negative.") - if nbatches.shape[0] != probs.shape[0]: - raise ValueError("Number of batches and sampler weights must match.") - valid = probs > 0.0 - if not np.any(valid): - raise ValueError( - "Sampler probabilities must contain at least one positive entry." - ) - return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) - - def resolve_model_prob( - model_keys, - model_prob_config, - model_training_data, - ) -> np.ndarray: - model_prob = np.zeros(len(model_keys), dtype=np.float64) - if model_prob_config: - missing = [k for k in model_keys if k not in model_prob_config] - if missing: - raise ValueError( - f"training.model_prob must specify all tasks; missing: {missing}" - ) - for ii, model_key in enumerate(model_keys): - if model_key in model_prob_config: - model_prob[ii] = float(model_prob_config[model_key]) - else: - for ii, model_key in enumerate(model_keys): - model_prob[ii] = float(len(model_training_data[model_key])) - if not np.all(np.isfinite(model_prob)): - raise ValueError("Model prob must be finite.") - if np.any(model_prob < 0.0): - raise ValueError("Model prob must be non-negative.") - sum_prob = float(np.sum(model_prob)) - if sum_prob <= 0.0: - raise ValueError("Sum of model prob must be larger than 0!") - return model_prob / sum_prob - - def resolve_model_prob_from_epochs( - model_keys, - num_epoch_dict_config, - per_task_total, - ) -> tuple[np.ndarray, int, dict[str, float]]: - if not num_epoch_dict_config: - raise ValueError( - "training.num_epoch_dict must be set for multi-task epochs." - ) - missing = [k for k in model_keys if k not in num_epoch_dict_config] - if missing: - raise ValueError( - "training.num_epoch_dict must specify all tasks; " - f"missing: {missing}" - ) - epoch_targets = np.zeros(len(model_keys), dtype=np.float64) - for ii, model_key in enumerate(model_keys): - epoch_value = num_epoch_dict_config[model_key] - if epoch_value is None: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive." - ) - epoch_value = float(epoch_value) - if not np.isfinite(epoch_value) or epoch_value <= 0.0: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." - ) - epoch_targets[ii] = epoch_value - per_task_total = np.asarray(per_task_total, dtype=np.float64) - if per_task_total.ndim != 1: - raise ValueError("Per-task total batches must be 1D.") - if per_task_total.shape[0] != epoch_targets.shape[0]: - raise ValueError("Per-task totals and epoch targets must match.") - if not np.all(np.isfinite(per_task_total)): - raise ValueError("Per-task total batches must be finite.") - if np.any(per_task_total <= 0.0): - raise ValueError("Per-task total batches must be positive.") - per_task_steps = per_task_total * epoch_targets - total_target_steps = float(np.sum(per_task_steps)) - if total_target_steps <= 0.0: - raise ValueError("Sum of target steps must be positive.") - model_prob = per_task_steps / total_target_steps - num_steps = int(np.ceil(total_target_steps)) - per_task_steps_map = { - model_key: float(per_task_steps[ii]) - for ii, model_key in enumerate(model_keys) - } - return model_prob, num_steps, per_task_steps_map - def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -567,6 +462,7 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: self.model_keys, training_params.get("model_prob"), training_data, + rank=self.rank, ) # Learning rate @@ -760,6 +656,7 @@ def single_model_finetune( self.model_keys, training_params.get("model_prob"), training_data, + rank=self.rank, ) # Multi-task share params diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 3fe9f59d82..28c45627b6 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -23,6 +23,11 @@ from deepmd.common import ( symlink_prefix_files, ) +from deepmd.dpmodel.utils import ( + compute_total_numb_batch, + resolve_model_prob, + resolve_model_prob_from_epochs, +) from deepmd.loggers.training import ( format_training_message, format_training_message_per_task, @@ -276,122 +281,6 @@ def get_dataloader_and_iter( valid_numb_batch, ) - def compute_total_numb_batch( - numb_batches: Iterable[int], - sampler_weights: np.ndarray, - ) -> int: - weights = np.asarray(sampler_weights, dtype=np.float64) - if weights.ndim != 1: - raise ValueError("Sampler weights must be 1D.") - if weights.size == 0: - raise ValueError("Sampler weights are empty.") - if not np.all(np.isfinite(weights)): - raise ValueError("Sampler weights must be finite.") - if np.any(weights < 0.0): - raise ValueError("Sampler weights must be non-negative.") - weight_sum = float(np.sum(weights)) - if weight_sum <= 0.0: - raise ValueError("Sampler weights must sum to a positive value.") - probs = weights / weight_sum - nbatches = np.asarray(numb_batches, dtype=np.float64) - if nbatches.ndim != 1: - raise ValueError("Number of batches must be 1D.") - if nbatches.size == 0: - raise ValueError("Number of batches is empty.") - if not np.all(np.isfinite(nbatches)): - raise ValueError("Number of batches must be finite.") - if np.any(nbatches < 0.0): - raise ValueError("Number of batches must be non-negative.") - if nbatches.shape[0] != probs.shape[0]: - raise ValueError("Number of batches and sampler weights must match.") - valid = probs > 0.0 - if not np.any(valid): - raise ValueError( - "Sampler probabilities must contain at least one positive entry." - ) - return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) - - def resolve_model_prob( - model_keys: list[str], - model_prob_config: dict[str, Any] | None, - model_training_data: dict[str, DpLoaderSet], - ) -> np.ndarray: - model_prob = np.zeros(len(model_keys), dtype=np.float64) - if model_prob_config: - missing = [k for k in model_keys if k not in model_prob_config] - if missing: - raise ValueError( - f"training.model_prob must specify all tasks; missing: {missing}" - ) - for ii, model_key in enumerate(model_keys): - if model_key in model_prob_config: - model_prob[ii] = float(model_prob_config[model_key]) - else: - if self.rank == 0: - log.info( - "training.model_prob is not set or empty; defaulting to the " - "number of systems per task." - ) - for ii, model_key in enumerate(model_keys): - model_prob[ii] = float(len(model_training_data[model_key])) - if not np.all(np.isfinite(model_prob)): - raise ValueError("Model prob must be finite.") - if np.any(model_prob < 0.0): - raise ValueError("Model prob must be non-negative.") - sum_prob = float(np.sum(model_prob)) - if sum_prob <= 0.0: - raise ValueError("Sum of model prob must be larger than 0!") - return model_prob / sum_prob - - def resolve_model_prob_from_epochs( - model_keys: list[str], - num_epoch_dict_config: dict[str, Any], - per_task_total: np.ndarray, - ) -> tuple[np.ndarray, int, dict[str, float]]: - if not num_epoch_dict_config: - raise ValueError( - "training.num_epoch_dict must be set for multi-task epochs." - ) - missing = [k for k in model_keys if k not in num_epoch_dict_config] - if missing: - raise ValueError( - "training.num_epoch_dict must specify all tasks; " - f"missing: {missing}" - ) - epoch_targets = np.zeros(len(model_keys), dtype=np.float64) - for ii, model_key in enumerate(model_keys): - epoch_value = num_epoch_dict_config[model_key] - if epoch_value is None: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive." - ) - epoch_value = float(epoch_value) - if not np.isfinite(epoch_value) or epoch_value <= 0.0: - raise ValueError( - f"training.num_epoch_dict['{model_key}'] must be positive, got {epoch_value}." - ) - epoch_targets[ii] = epoch_value - per_task_total = np.asarray(per_task_total, dtype=np.float64) - if per_task_total.ndim != 1: - raise ValueError("Per-task total batches must be 1D.") - if per_task_total.shape[0] != epoch_targets.shape[0]: - raise ValueError("Per-task totals and epoch targets must match.") - if not np.all(np.isfinite(per_task_total)): - raise ValueError("Per-task total batches must be finite.") - if np.any(per_task_total <= 0.0): - raise ValueError("Per-task total batches must be positive.") - per_task_steps = per_task_total * epoch_targets - total_target_steps = float(np.sum(per_task_steps)) - if total_target_steps <= 0.0: - raise ValueError("Sum of target steps must be positive.") - model_prob = per_task_steps / total_target_steps - num_steps = int(np.ceil(total_target_steps)) - per_task_steps_map = { - model_key: float(per_task_steps[ii]) - for ii, model_key in enumerate(model_keys) - } - return model_prob, num_steps, per_task_steps_map - def single_model_stat( _model: Any, _data_stat_nbatch: int, @@ -646,6 +535,7 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: self.model_keys, training_params.get("model_prob"), training_data, + rank=self.rank, ) # Learning rate @@ -875,6 +765,7 @@ def single_model_finetune( self.model_keys, training_params.get("model_prob"), training_data, + rank=self.rank, ) # Multi-task share params diff --git a/deepmd/tf/entrypoints/change_bias.py b/deepmd/tf/entrypoints/change_bias.py index 4abfe3bc11..daf868fb4c 100644 --- a/deepmd/tf/entrypoints/change_bias.py +++ b/deepmd/tf/entrypoints/change_bias.py @@ -18,6 +18,9 @@ expand_sys_str, j_loader, ) +from deepmd.dpmodel.utils import ( + compute_total_numb_batch, +) from deepmd.tf.entrypoints.freeze import ( freeze, ) @@ -190,30 +193,6 @@ def _change_bias_checkpoint_file( data = _load_data_systems(datafile, system, trainer) # Get stop_batch and origin_type_map like in train.py - def compute_total_numb_batch(nbatches, sys_probs) -> int: - weights = np.asarray(sys_probs, dtype=np.float64) - if weights.ndim != 1: - raise ValueError("Sampler probabilities must be 1D.") - if weights.size == 0: - raise ValueError("Sampler probabilities are empty.") - if not np.all(np.isfinite(weights)): - raise ValueError("Sampler probabilities must be finite.") - if np.any(weights < 0.0): - raise ValueError("Sampler probabilities must be non-negative.") - weight_sum = float(np.sum(weights)) - if weight_sum <= 0.0: - raise ValueError("Sampler probabilities must sum to a positive value.") - probs = weights / weight_sum - nbatches = np.asarray(nbatches, dtype=np.float64) - if nbatches.shape[0] != probs.shape[0]: - raise ValueError("Number of batches and sampler probabilities must match.") - valid = probs > 0.0 - if not np.any(valid): - raise ValueError( - "Sampler probabilities must contain at least one positive entry." - ) - return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) - training_params = jdata.get("training", {}) stop_batch = training_params.get("numb_steps") num_epoch = training_params.get("num_epoch") diff --git a/deepmd/tf/entrypoints/train.py b/deepmd/tf/entrypoints/train.py index 6b1af94a0f..9e3ffeae52 100755 --- a/deepmd/tf/entrypoints/train.py +++ b/deepmd/tf/entrypoints/train.py @@ -17,6 +17,9 @@ from deepmd.common import ( j_loader, ) +from deepmd.dpmodel.utils import ( + compute_total_numb_batch, +) from deepmd.tf.env import ( GLOBAL_ENER_FLOAT_PRECISION, reset_default_tf_session_config, @@ -254,30 +257,6 @@ def _do_work( modifier.build_fv_graph() # get training info - def compute_total_numb_batch(nbatches, sys_probs) -> int: - weights = np.asarray(sys_probs, dtype=np.float64) - if weights.ndim != 1: - raise ValueError("Sampler probabilities must be 1D.") - if weights.size == 0: - raise ValueError("Sampler probabilities are empty.") - if not np.all(np.isfinite(weights)): - raise ValueError("Sampler probabilities must be finite.") - if np.any(weights < 0.0): - raise ValueError("Sampler probabilities must be non-negative.") - weight_sum = float(np.sum(weights)) - if weight_sum <= 0.0: - raise ValueError("Sampler probabilities must sum to a positive value.") - probs = weights / weight_sum - nbatches = np.asarray(nbatches, dtype=np.float64) - if nbatches.shape[0] != probs.shape[0]: - raise ValueError("Number of batches and sampler probabilities must match.") - valid = probs > 0.0 - if not np.any(valid): - raise ValueError( - "Sampler probabilities must contain at least one positive entry." - ) - return int(np.ceil(np.max(nbatches[valid] / probs[valid]))) - training_params = jdata["training"] stop_batch = training_params.get("numb_steps") num_epoch = training_params.get("num_epoch") diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index d6ca3b3749..52cddce742 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -14,6 +14,9 @@ ) import deepmd.pt.utils.dataloader as pt_dataloader +from deepmd.dpmodel.utils import ( + compute_total_numb_batch, +) from deepmd.pt.utils import ( dp_random, ) @@ -92,21 +95,6 @@ def _normalize_probs(self, weights: np.ndarray) -> np.ndarray: weights = np.asarray(weights, dtype=np.float64) return weights / np.sum(weights) - def _compute_total_numb_batch(self, nbatches: np.ndarray, probs: np.ndarray) -> int: - # NOTE: This is a simplified test-only variant of training.py logic. - nbatches = np.asarray(nbatches, dtype=np.float64) - probs = np.asarray(probs, dtype=np.float64) - if nbatches.shape != probs.shape: - raise ValueError( - "nbatches and probs must have the same shape in this test helper." - ) - if not np.all(probs > 0.0): - raise ValueError( - "Zero or negative sampling probabilities are not supported in this " - "test helper." - ) - return int(np.ceil(np.max(nbatches / probs))) - def _sample_sid_counts( self, dataloader: DataLoader, num_steps: int, nsystems: int ) -> np.ndarray: @@ -258,11 +246,13 @@ def test_sampling_stability_single_task(self) -> None: sys_probs = [0.2, 0.3, 0.5] params = {"sys_probs": sys_probs, "auto_prob": "prob_sys_size"} sampler_epoch = pt_dataloader.get_sampler_from_params(dataset_epoch, params) - probs = self._normalize_probs(np.asarray(sampler_epoch.weights)) nbatches = np.asarray(dataset_epoch.index, dtype=np.float64) - total_numb_batch = self._compute_total_numb_batch(nbatches, probs) + total_numb_batch = compute_total_numb_batch( + nbatches, np.asarray(sampler_epoch.weights) + ) num_epoch = 1.5 num_steps = int(np.ceil(num_epoch * total_numb_batch)) + probs = self._normalize_probs(np.asarray(sampler_epoch.weights)) # === Step 2. Sample Using Derived Steps === torch.manual_seed(123) @@ -324,11 +314,13 @@ def test_sampling_stability_multi_task(self) -> None: probs_2 = self._normalize_probs(np.asarray(sampler_2.weights)) per_task_total = np.array( [ - self._compute_total_numb_batch( - np.asarray(dataset_1.index, dtype=np.float64), probs_1 + compute_total_numb_batch( + np.asarray(dataset_1.index, dtype=np.float64), + np.asarray(sampler_1.weights), ), - self._compute_total_numb_batch( - np.asarray(dataset_2.index, dtype=np.float64), probs_2 + compute_total_numb_batch( + np.asarray(dataset_2.index, dtype=np.float64), + np.asarray(sampler_2.weights), ), ], dtype=np.float64, @@ -454,11 +446,13 @@ def test_num_epoch_dict(self) -> None: # === Step 2. Compute per-task total_numb_batch === per_task_total = np.array( [ - self._compute_total_numb_batch( - np.asarray(dataset_1.index, dtype=np.float64), probs_1 + compute_total_numb_batch( + np.asarray(dataset_1.index, dtype=np.float64), + np.asarray(sampler_1.weights), ), - self._compute_total_numb_batch( - np.asarray(dataset_2.index, dtype=np.float64), probs_2 + compute_total_numb_batch( + np.asarray(dataset_2.index, dtype=np.float64), + np.asarray(sampler_2.weights), ), ], dtype=np.float64, From 6ccac046dd02e6370938c7fa9660f815394e1951 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 02:10:05 +0000 Subject: [PATCH 12/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pd/train/training.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index d0691c6863..7d75a6679a 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -30,12 +30,14 @@ from deepmd.common import ( symlink_prefix_files, ) -from deepmd.dpmodel.utils.learning_rate import BaseLR from deepmd.dpmodel.utils import ( compute_total_numb_batch, resolve_model_prob, resolve_model_prob_from_epochs, ) +from deepmd.dpmodel.utils.learning_rate import ( + BaseLR, +) from deepmd.loggers.training import ( format_training_message, format_training_message_per_task, From a935a0a9c48f681926aec38bb7620312e5e1c8d6 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Sat, 17 Jan 2026 16:40:37 +0800 Subject: [PATCH 13/18] clear --- deepmd/tf/entrypoints/change_bias.py | 6 ++++++ deepmd/utils/argcheck.py | 9 +++++++++ source/tests/pt/test_sampler.py | 2 -- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/deepmd/tf/entrypoints/change_bias.py b/deepmd/tf/entrypoints/change_bias.py index daf868fb4c..e048dbb023 100644 --- a/deepmd/tf/entrypoints/change_bias.py +++ b/deepmd/tf/entrypoints/change_bias.py @@ -199,6 +199,12 @@ def _change_bias_checkpoint_file( if stop_batch is None and num_epoch is not None: if num_epoch <= 0: raise ValueError("training.num_epoch must be positive.") + # Apply sys_probs and auto_prob from original training config + # to ensure stop_batch calculation matches the original training + training_data_config = training_params.get("training_data", {}) + sys_probs = training_data_config.get("sys_probs", None) + auto_prob = training_data_config.get("auto_prob", "prob_sys_size") + data.set_sys_probs(sys_probs=sys_probs, auto_prob_style=auto_prob) total_numb_batch = compute_total_numb_batch(data.nbatches, data.sys_probs) if total_numb_batch <= 0: raise ValueError("Total number of training batches must be positive.") diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index acaf0fc3f2..6b95ca32b4 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3672,11 +3672,20 @@ def training_extra_check(data: dict | None) -> bool: raise ValueError( "training.num_epoch_dict is mutually exclusive with training.model_prob." ) + else: + if num_steps is None: + raise ValueError( + "Multi-task mode requires either training.numb_steps or training.num_epoch_dict." + ) else: if num_steps is not None and num_epoch is not None: raise ValueError( "training.num_step and training.num_epoch are mutually exclusive." ) + if num_steps is None and num_epoch is None: + raise ValueError( + "Single-task mode requires either training.numb_steps or training.num_epoch." + ) return True doc_training = "The training options." diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index 52cddce742..af0b5c1a98 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -440,8 +440,6 @@ def test_num_epoch_dict(self) -> None: sampler_2 = pt_dataloader.get_sampler_from_params( dataset_2, {"sys_probs": [0.4, 0.6], "auto_prob": "prob_sys_size"} ) - probs_1 = self._normalize_probs(np.asarray(sampler_1.weights)) - probs_2 = self._normalize_probs(np.asarray(sampler_2.weights)) # === Step 2. Compute per-task total_numb_batch === per_task_total = np.array( From 5c4fa92eb942e36af19c4fe8b523865f11bcdcff Mon Sep 17 00:00:00 2001 From: OutisLi Date: Mon, 26 Jan 2026 16:38:58 +0800 Subject: [PATCH 14/18] 1 --- deepmd/utils/argcheck.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 6b95ca32b4..8e44ebe832 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3366,7 +3366,7 @@ def training_args( ], ), Argument( - "num_epoch", + "numb_epoch", [int, float], optional=True, doc=doc_num_epoch, From b9813649bc18b5b4138635eb048cafe53cb65896 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Mon, 26 Jan 2026 16:39:10 +0800 Subject: [PATCH 15/18] 2 --- deepmd/utils/argcheck.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 8e44ebe832..b2f7534ba6 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3370,7 +3370,7 @@ def training_args( [int, float], optional=True, doc=doc_num_epoch, - alias=["num_epochs", "numb_epoch", "numb_epochs"], + alias=["num_epochs", "num_epoch", "numb_epochs"], ), Argument("seed", [int, None], optional=True, doc=doc_seed), Argument( From e7ef3bbd0ec2b8bf042dd11eb7fe052f36e6888d Mon Sep 17 00:00:00 2001 From: OutisLi Date: Fri, 13 Feb 2026 14:17:09 +0800 Subject: [PATCH 16/18] update --- deepmd/utils/argcheck.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index b2f7534ba6..9faf0c6113 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -3217,7 +3217,7 @@ def training_args( "Number of training steps (num_step). Each training uses one batch of data. " "Mutually exclusive with num_epoch in single-task mode. In multi-task " "mode, this is mutually exclusive with num_epoch_dict. " - "Accepted names: num_step, num_steps, numb_step, numb_steps." + "Accepted names: num_step, num_steps, numb_step, numb_steps, stop_batch." ) doc_num_epoch = ( "Number of training epochs (num_epoch; can be fractional) for single-task " @@ -3655,7 +3655,7 @@ def training_extra_check(data: dict | None) -> bool: if data is None: return True num_steps = data.get("numb_steps") - num_epoch = data.get("num_epoch") + num_epoch = data.get("numb_epoch") num_epoch_dict = data.get("num_epoch_dict", {}) model_prob = data.get("model_prob", {}) if multi_task: From 203bb349308a80fab830c8ffe9cc837b66152281 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Fri, 13 Feb 2026 15:11:23 +0800 Subject: [PATCH 17/18] fix: use numb_epoch key and correct model_prob doc - Change num_epoch to numb_epoch in change_bias.py and train.py - Fix model_prob default description in multi-task-training.md --- deepmd/tf/entrypoints/change_bias.py | 2 +- deepmd/tf/entrypoints/train.py | 2 +- doc/train/multi-task-training.md | 2 +- source/tests/pt/test_sampler.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/deepmd/tf/entrypoints/change_bias.py b/deepmd/tf/entrypoints/change_bias.py index e048dbb023..0363ac7437 100644 --- a/deepmd/tf/entrypoints/change_bias.py +++ b/deepmd/tf/entrypoints/change_bias.py @@ -195,7 +195,7 @@ def _change_bias_checkpoint_file( # Get stop_batch and origin_type_map like in train.py training_params = jdata.get("training", {}) stop_batch = training_params.get("numb_steps") - num_epoch = training_params.get("num_epoch") + num_epoch = training_params.get("numb_epoch") if stop_batch is None and num_epoch is not None: if num_epoch <= 0: raise ValueError("training.num_epoch must be positive.") diff --git a/deepmd/tf/entrypoints/train.py b/deepmd/tf/entrypoints/train.py index 9e3ffeae52..419ab4a1fe 100755 --- a/deepmd/tf/entrypoints/train.py +++ b/deepmd/tf/entrypoints/train.py @@ -259,7 +259,7 @@ def _do_work( # get training info training_params = jdata["training"] stop_batch = training_params.get("numb_steps") - num_epoch = training_params.get("num_epoch") + num_epoch = training_params.get("numb_epoch") if stop_batch is None: if num_epoch is None: raise ValueError( diff --git a/doc/train/multi-task-training.md b/doc/train/multi-task-training.md index 2b420bb8a5..bbb5a107cb 100644 --- a/doc/train/multi-task-training.md +++ b/doc/train/multi-task-training.md @@ -79,7 +79,7 @@ Specifically, there are several parts that need to be modified: - (Optional) {ref}`training/model_prob `: The sampling weight settings corresponding to each `model_key`, i.e., the probability weight in the training step. You can specify any positive real number weight for each task. The higher the weight, the higher the probability of being sampled in each training. - This setting is optional, and if not set, tasks will be sampled with equal weights. It is only used when `num_epoch_dict` is not set. + This setting is optional, and if not set, tasks will be sampled with weights proportional to the number of systems in each task. It is only used when `num_epoch_dict` is not set. - (Optional) {ref}`training/num_epoch_dict `: The number of training epochs for each model branch, specified as a dictionary mapping `model_key` to epoch values (can be fractional). This allows different tasks to train for different numbers of epochs, which is particularly useful for multi-task fine-tuning scenarios diff --git a/source/tests/pt/test_sampler.py b/source/tests/pt/test_sampler.py index af0b5c1a98..88848c7e02 100644 --- a/source/tests/pt/test_sampler.py +++ b/source/tests/pt/test_sampler.py @@ -35,7 +35,7 @@ class _SerialPool: def __init__(self, *args, **kwargs) -> None: pass - def __enter__(self) -> "_SerialPool": + def __enter__(self) -> "_SerialPool": # noqa: PYI034 return self def __exit__(self, exc_type, exc, tb) -> bool: From 8a5b2e91329028c76ec00d29d98b131f48b41c65 Mon Sep 17 00:00:00 2001 From: OutisLi Date: Sat, 21 Feb 2026 13:49:19 +0800 Subject: [PATCH 18/18] add examples --- deepmd/pd/train/training.py | 50 +++--- deepmd/pt/train/training.py | 46 +++-- .../water/se_e2_a/input_torch_num_epoch.json | 81 +++++++++ .../input_torch_num_epoch_dict.json | 163 ++++++++++++++++++ source/tests/common/test_examples.py | 5 + 5 files changed, 295 insertions(+), 50 deletions(-) create mode 100644 examples/water/se_e2_a/input_torch_num_epoch.json create mode 100644 examples/water_multi_task/pytorch_example/input_torch_num_epoch_dict.json diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index 7d75a6679a..dbcbe8d9f6 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -400,13 +400,6 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: per_task_total = [] if not self.multi_task: - sampler_weights = to_numpy_array( - self.training_dataloader.batch_sampler.sampler.weights - ) - total_numb_batch = compute_total_numb_batch( - training_data.index, - sampler_weights, - ) if self.num_steps is None: if self.num_epoch is None: raise ValueError( @@ -414,6 +407,13 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ) if self.num_epoch <= 0: raise ValueError("training.num_epoch must be positive.") + sampler_weights = to_numpy_array( + self.training_dataloader.batch_sampler.sampler.weights + ) + total_numb_batch = compute_total_numb_batch( + training_data.index, + sampler_weights, + ) if total_numb_batch <= 0: raise ValueError( "Total number of training batches must be positive." @@ -426,17 +426,24 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: total_numb_batch, ) else: - for model_key in self.model_keys: - sampler_weights = to_numpy_array( - self.training_dataloader[model_key].batch_sampler.sampler.weights - ) - per_task_total.append( - compute_total_numb_batch( - training_data[model_key].index, - sampler_weights, - ) - ) if self.num_epoch_dict: + if self.num_steps is not None: + raise ValueError( + "training.numb_steps and training.num_epoch_dict " + "are mutually exclusive." + ) + for model_key in self.model_keys: + sampler_weights = to_numpy_array( + self.training_dataloader[ + model_key + ].batch_sampler.sampler.weights + ) + per_task_total.append( + compute_total_numb_batch( + training_data[model_key].index, + sampler_weights, + ) + ) ( self.model_prob, self.num_steps, @@ -652,15 +659,6 @@ def single_model_finetune( frz_model = paddle.jit.load(init_frz_model) self.model.set_state_dict(frz_model.state_dict()) - # Get model prob for multi-task - if self.multi_task and self.model_prob is None: - self.model_prob = resolve_model_prob( - self.model_keys, - training_params.get("model_prob"), - training_data, - rank=self.rank, - ) - # Multi-task share params if shared_links is not None: self.wrapper.share_params( diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 28c45627b6..a23c7cb572 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -473,11 +473,6 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: # Resolve training steps per_task_total = [] if not self.multi_task: - sampler_weights = to_numpy_array(self.training_dataloader.sampler.weights) - total_numb_batch = compute_total_numb_batch( - training_data.index, - sampler_weights, - ) if self.num_steps is None: if self.num_epoch is None: raise ValueError( @@ -485,6 +480,13 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: ) if self.num_epoch <= 0: raise ValueError("training.num_epoch must be positive.") + sampler_weights = to_numpy_array( + self.training_dataloader.sampler.weights + ) + total_numb_batch = compute_total_numb_batch( + training_data.index, + sampler_weights, + ) if total_numb_batch <= 0: raise ValueError( "Total number of training batches must be positive." @@ -497,17 +499,22 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: total_numb_batch, ) else: - for model_key in self.model_keys: - sampler_weights = to_numpy_array( - self.training_dataloader[model_key].sampler.weights - ) - per_task_total.append( - compute_total_numb_batch( - training_data[model_key].index, - sampler_weights, - ) - ) if self.num_epoch_dict: + if self.num_steps is not None: + raise ValueError( + "training.numb_steps and training.num_epoch_dict " + "are mutually exclusive." + ) + for model_key in self.model_keys: + sampler_weights = to_numpy_array( + self.training_dataloader[model_key].sampler.weights + ) + per_task_total.append( + compute_total_numb_batch( + training_data[model_key].index, + sampler_weights, + ) + ) ( self.model_prob, self.num_steps, @@ -759,15 +766,6 @@ def single_model_finetune( f"Checkpoint loaded non-strictly. Missing keys: {missing}, Unexpected keys: {unexpected}" ) - # Get model prob for multi-task - if self.multi_task and self.model_prob is None: - self.model_prob = resolve_model_prob( - self.model_keys, - training_params.get("model_prob"), - training_data, - rank=self.rank, - ) - # Multi-task share params if shared_links is not None: _data_stat_protect = np.array( diff --git a/examples/water/se_e2_a/input_torch_num_epoch.json b/examples/water/se_e2_a/input_torch_num_epoch.json new file mode 100644 index 0000000000..61117e25b4 --- /dev/null +++ b/examples/water/se_e2_a/input_torch_num_epoch.json @@ -0,0 +1,81 @@ +{ + "model": { + "type_map": [ + "O", + "H" + ], + "descriptor": { + "type": "se_e2_a", + "sel": [ + 46, + 92 + ], + "rcut_smth": 0.50, + "rcut": 6.00, + "neuron": [ + 25, + 50, + 100 + ], + "resnet_dt": false, + "axis_neuron": 16, + "type_one_side": true, + "seed": 1, + "_comment": " that's all" + }, + "fitting_net": { + "neuron": [ + 240, + 240, + 240 + ], + "resnet_dt": true, + "seed": 1, + "_comment": " that's all" + }, + "data_stat_nbatch": 20, + "_comment": " that's all" + }, + "learning_rate": { + "type": "exp", + "decay_steps": 5000, + "start_lr": 0.001, + "stop_lr": 3.51e-8, + "_comment": "that's all" + }, + "loss": { + "type": "ener", + "start_pref_e": 0.02, + "limit_pref_e": 1, + "start_pref_f": 1000, + "limit_pref_f": 1, + "_comment": " that's all" + }, + "training": { + "stat_file": "./se_e2_a.hdf5", + "training_data": { + "systems": [ + "../data/data_0", + "../data/data_1", + "../data/data_2" + ], + "batch_size": 1, + "_comment": "that's all" + }, + "validation_data": { + "systems": [ + "../data/data_3" + ], + "batch_size": 1, + "numb_btch": 3, + "_comment": "that's all" + }, + "num_epoch": 100, + "seed": 10, + "disp_file": "lcurve.out", + "disp_freq": 100, + "save_freq": 10000, + "_comment": "that's all" + }, + "_comment": "that's all" +} diff --git a/examples/water_multi_task/pytorch_example/input_torch_num_epoch_dict.json b/examples/water_multi_task/pytorch_example/input_torch_num_epoch_dict.json new file mode 100644 index 0000000000..d3a50cf506 --- /dev/null +++ b/examples/water_multi_task/pytorch_example/input_torch_num_epoch_dict.json @@ -0,0 +1,163 @@ +{ + "_comment": "that's all", + "model": { + "shared_dict": { + "type_map_all": [ + "O", + "H" + ], + "dpa2_descriptor": { + "type": "dpa2", + "repinit": { + "tebd_dim": 8, + "rcut": 6.0, + "rcut_smth": 0.5, + "nsel": 120, + "neuron": [ + 25, + 50, + 100 + ], + "axis_neuron": 12, + "activation_function": "tanh", + "three_body_sel": 48, + "three_body_rcut": 4.0, + "three_body_rcut_smth": 3.5, + "use_three_body": true + }, + "repformer": { + "rcut": 4.0, + "rcut_smth": 3.5, + "nsel": 48, + "nlayers": 6, + "g1_dim": 128, + "g2_dim": 32, + "attn2_hidden": 32, + "attn2_nhead": 4, + "attn1_hidden": 128, + "attn1_nhead": 4, + "axis_neuron": 4, + "update_h2": false, + "update_g1_has_conv": true, + "update_g1_has_grrg": true, + "update_g1_has_drrd": true, + "update_g1_has_attn": false, + "update_g2_has_g1g1": false, + "update_g2_has_attn": true, + "update_style": "res_residual", + "update_residual": 0.01, + "update_residual_init": "norm", + "attn2_has_gate": true, + "use_sqrt_nnei": true, + "g1_out_conv": true, + "g1_out_mlp": true + }, + "precision": "float64", + "add_tebd_to_repinit_out": false, + "seed": 1, + "_comment": " that's all" + }, + "_comment": "that's all" + }, + "model_dict": { + "water_1": { + "type_map": "type_map_all", + "descriptor": "dpa2_descriptor", + "fitting_net": { + "neuron": [ + 240, + 240, + 240 + ], + "resnet_dt": true, + "seed": 1, + "_comment": " that's all" + } + }, + "water_2": { + "type_map": "type_map_all", + "descriptor": "dpa2_descriptor", + "fitting_net": { + "neuron": [ + 240, + 240, + 240 + ], + "resnet_dt": true, + "seed": 1, + "_comment": " that's all" + } + } + } + }, + "learning_rate": { + "type": "exp", + "decay_steps": 5000, + "start_lr": 0.0002, + "decay_rate": 0.98, + "stop_lr": 3.51e-08, + "_comment": "that's all" + }, + "loss_dict": { + "water_1": { + "type": "ener", + "start_pref_e": 0.02, + "limit_pref_e": 1, + "start_pref_f": 1000, + "limit_pref_f": 1, + "start_pref_v": 0, + "limit_pref_v": 0 + }, + "water_2": { + "type": "ener", + "start_pref_e": 0.02, + "limit_pref_e": 1, + "start_pref_f": 1000, + "limit_pref_f": 1, + "start_pref_v": 0, + "limit_pref_v": 0 + } + }, + "training": { + "num_epoch_dict": { + "water_1": 10, + "water_2": 20 + }, + "data_dict": { + "water_1": { + "training_data": { + "systems": [ + "../../water/data/data_0/", + "../../water/data/data_1/", + "../../water/data/data_2/" + ], + "batch_size": 1, + "_comment": "that's all" + }, + "validation_data": { + "systems": [ + "../../water/data/data_3/" + ], + "batch_size": 1, + "_comment": "that's all" + } + }, + "water_2": { + "training_data": { + "systems": [ + "../../water/data/data_0/", + "../../water/data/data_1/", + "../../water/data/data_2/" + ], + "batch_size": 1, + "_comment": "that's all" + } + } + }, + "seed": 10, + "disp_file": "lcurve.out", + "disp_freq": 100, + "save_freq": 100, + "_comment": "that's all" + } +} diff --git a/source/tests/common/test_examples.py b/source/tests/common/test_examples.py index 6c9f1e43a2..d463e9cd2b 100644 --- a/source/tests/common/test_examples.py +++ b/source/tests/common/test_examples.py @@ -63,6 +63,7 @@ p_examples / "property" / "train" / "input_torch.json", p_examples / "water" / "se_e3_tebd" / "input_torch.json", p_examples / "hessian" / "single_task" / "input.json", + p_examples / "water" / "se_e2_a" / "input_torch_num_epoch.json", ) input_files_multi = ( @@ -70,6 +71,10 @@ p_examples / "water_multi_task" / "pytorch_example" / "input_torch_sharefit.json", p_examples / "water_multi_task" / "pytorch_example" / "input_torch_with_alias.json", p_examples / "hessian" / "multi_task" / "input.json", + p_examples + / "water_multi_task" + / "pytorch_example" + / "input_torch_num_epoch_dict.json", )