[v1] upgrade batching (#9751)

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Yaowei Zheng
2026-01-12 00:21:36 +08:00
committed by GitHub
parent 15b87f3125
commit a296723697
18 changed files with 273 additions and 97 deletions

View File

@@ -34,30 +34,29 @@ from ...accelerator.interface import DistributedInterface
from ...config import BatchingStrategy
from ...utils import logging
from ...utils.helper import pad_and_truncate
from ...utils.types import BatchInput, ModelInput, TorchDataset
from ...utils.objects import StatefulBuffer
from ...utils.types import BatchInfo, BatchInput, ModelInput, TorchDataset
from .rendering import Renderer
logger = logging.get_logger(__name__)
def default_collate_fn(
buffer: list[ModelInput], buffer_tokens: int, micro_batch_size: int, num_micro_batch: int, cutoff_len: int
) -> tuple[list[ModelInput], int, list[BatchInput]]:
def default_collate_fn(buffer: StatefulBuffer, batch_info: BatchInfo) -> list[BatchInput] | None:
micro_batch_size = batch_info["micro_batch_size"]
num_micro_batch = batch_info["num_micro_batch"]
cutoff_len = batch_info["cutoff_len"]
batch_size = micro_batch_size * num_micro_batch
if len(buffer) < batch_size:
return buffer, buffer_tokens, None
samples = buffer[:batch_size]
buffer = buffer[batch_size:]
buffer_tokens -= sum(len(sample["input_ids"]) for sample in samples)
return None
samples = buffer.get(batch_size)
batch = []
for i in range(num_micro_batch):
micro_batch = samples[i * micro_batch_size : (i + 1) * micro_batch_size]
batch.append(default_collate(pad_and_truncate(micro_batch, cutoff_len)))
return buffer, buffer_tokens, batch
return batch
class BatchGenerator(Iterator):
@@ -105,9 +104,14 @@ class BatchGenerator(Iterator):
self._is_resuming: bool = False
self._data_iter = iter(self._data_provider)
self._buffer: list[ModelInput] = []
self._buffer_tokens: int = 0
self._max_buffer_tokens: int = self.micro_batch_size * self.num_micro_batch * self.cutoff_len
self._buffer = StatefulBuffer()
self._batch_info: BatchInfo = {
"micro_batch_size": self.micro_batch_size,
"num_micro_batch": self.num_micro_batch,
"cutoff_len": self.cutoff_len,
"data_iter": self._data_iter,
}
logger.info_rank0(
f"Init unified data loader with global batch size {self.global_batch_size}, "
@@ -145,7 +149,7 @@ class BatchGenerator(Iterator):
else:
from ...plugins.trainer_plugins.batching import BatchingPlugin
self._length = BatchingPlugin(self.batching_strategy).compute_length()
self._length = BatchingPlugin(self.batching_strategy).compute_length(self._data_provider)
raise NotImplementedError("Batching strategy other than NORMAL is not supported yet.")
def __len__(self) -> int:
@@ -161,38 +165,34 @@ class BatchGenerator(Iterator):
return self
def __next__(self):
batch = self._next_batch()
self._fill_buffer()
batch = self._generate_batch()
if batch is None:
raise StopIteration
return batch
def _next_batch(self) -> list[BatchInput] | None:
while self._buffer_tokens < self._max_buffer_tokens:
try:
samples: list[ModelInput] = next(self._data_iter)
except StopIteration:
break
num_tokens = sum(len(sample["input_ids"]) for sample in samples)
self._buffer.extend(samples)
self._buffer_tokens += num_tokens
return self._build_batch()
def _build_batch(self) -> list[BatchInput] | None:
def _fill_buffer(self) -> None:
if self.batching_strategy == BatchingStrategy.NORMAL:
self._buffer, self._buffer_tokens, batch = default_collate_fn(
self._buffer, self._buffer_tokens, self.micro_batch_size, self.num_micro_batch, self.cutoff_len
)
return batch
while len(self._buffer) < self.micro_batch_size * self.num_micro_batch:
try:
samples: list[ModelInput] = next(self._data_iter)
except StopIteration:
break
self._buffer.put(samples)
else:
from ...plugins.trainer_plugins.batching import BatchingPlugin
self._buffer, self._buffer_tokens, batch = BatchingPlugin(self.batching_strategy)(
self._buffer, self._buffer_tokens, self.micro_batch_size, self.num_micro_batch, self.cutoff_len
)
return batch
BatchingPlugin(self.batching_strategy).fill_buffer(self._buffer, self._batch_info)
def _generate_batch(self) -> list[BatchInput] | None:
if self.batching_strategy == BatchingStrategy.NORMAL:
return default_collate_fn(self._buffer, self._batch_info)
else:
from ...plugins.trainer_plugins.batching import BatchingPlugin
return BatchingPlugin(self.batching_strategy).generate_batch(self._buffer, self._batch_info)
def state_dict(self) -> dict[str, Any]:
return {

View File

@@ -22,7 +22,19 @@ from ...utils.types import Message, ModelInput, Processor, ToolCall
class RenderingPlugin(BasePlugin):
pass
def render_messages(
self,
processor: Processor,
messages: list[Message],
tools: str | None = None,
is_generate: bool = False,
) -> ModelInput:
"""Render messages in the template format."""
return self["render_messages"](processor, messages, tools, is_generate)
def parse_messages(self, generated_text: str) -> Message:
"""Parse messages in the template format."""
return self["parse_messages"](generated_text)
def _update_model_input(

View File

@@ -12,8 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils.objects import StatefulBuffer
from ...utils.plugin import BasePlugin
from ...utils.types import BatchInfo, BatchInput, DataLoader
class BatchingPlugin(BasePlugin):
pass
def compute_length(self, dataloader: DataLoader) -> int:
"""Compute the length of the batch generator."""
raise NotImplementedError()
def fill_buffer(self, buffer: StatefulBuffer, batch_info: BatchInfo) -> None:
"""Fill the buffer with data."""
raise NotImplementedError()
def generate_batch(self, buffer: StatefulBuffer, batch_info: BatchInfo) -> list[BatchInput] | None:
"""Generate a batch from the buffer."""
raise NotImplementedError()

View File

@@ -0,0 +1,67 @@
# Copyright 2025 Optuna, HuggingFace Inc. and the LlamaFactory team.
#
# This code is inspired by the HuggingFace's transformers library.
# https://github.com/huggingface/transformers/blob/v5.0.0rc0/src/transformers/utils/logging.py
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .types import ModelInput
class StatefulBuffer:
    """A FIFO buffer of model inputs that tracks its total token count.

    Samples are appended at the tail via :meth:`put` and consumed from the
    head via :meth:`get`; ``size`` reports the number of buffered tokens
    (sum of ``len(sample["input_ids"])`` over all buffered samples).
    """

    def __init__(self, max_buffer_size: int = 1_000_000_000) -> None:
        """Create an empty buffer holding at most ``max_buffer_size`` tokens."""
        self._buffer: "list[ModelInput]" = []
        self._buffer_size: int = 0  # running token count, kept in sync by put/get
        self._max_buffer_size: int = max_buffer_size

    def __len__(self) -> int:
        """Number of samples currently buffered."""
        return len(self._buffer)

    @property
    def size(self) -> int:
        """Number of tokens currently buffered."""
        return self._buffer_size

    def put(self, samples: "list[ModelInput]") -> None:
        """Append ``samples`` to the buffer.

        Raises:
            ValueError: if adding the samples would exceed the token capacity.
        """
        incoming_tokens = sum(len(item["input_ids"]) for item in samples)
        if self._buffer_size + incoming_tokens > self._max_buffer_size:
            raise ValueError(f"Buffer size exceeds max buffer size {self._max_buffer_size}.")

        self._buffer.extend(samples)
        self._buffer_size += incoming_tokens

    def get(self, value: int) -> "list[ModelInput]":
        """Remove and return up to ``value`` samples from the head of the buffer."""
        taken, self._buffer = self._buffer[:value], self._buffer[value:]
        self._buffer_size -= sum(len(item["input_ids"]) for item in taken)
        return taken

    def clear(self) -> None:
        """Discard all buffered samples and reset the token count."""
        self._buffer = []
        self._buffer_size = 0

    def state_dict(self) -> dict:
        """Returns the state of the buffer."""
        return {"buffer": self._buffer, "buffer_size": self._buffer_size}

    def load_state_dict(self, state_dict: dict) -> None:
        """Loads the state into the buffer."""
        self._buffer = state_dict["buffer"]
        self._buffer_size = state_dict["buffer_size"]

View File

@@ -15,6 +15,7 @@
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from . import logging
@@ -26,33 +27,37 @@ class BasePlugin:
"""Base class for plugins.
A plugin is a callable object that can be registered and called by name.
Example usage:
```python
class PrintPlugin(BasePlugin):
def again(self): # optional
self["again"]()
@PrintPlugin("hello").register()
def print_hello():
print("Hello world!")
@PrintPlugin("hello").register("again")
def print_hello_again():
print("Hello world! Again.")
PrintPlugin("hello")()
PrintPlugin("hello").again()
```
"""
_registry: dict[str, dict[str, Callable]] = defaultdict(dict)
def __init__(self, name: str | None = None):
"""Initialize the plugin with a name.
Args:
name (str): The name of the plugin.
"""
def __init__(self, name: str | None = None) -> None:
"""Initialize the plugin with a name."""
self.name = name
def register(self, method_name: str = "__call__"):
"""Decorator to register a function as a plugin.
Example usage:
```python
@PrintPlugin("hello").register()
def print_hello():
print("Hello world!")
@PrintPlugin("hello").register("again")
def print_hello_again():
print("Hello world! Again.")
```
"""
def register(self, method_name: str = "__call__") -> Callable:
"""Decorator to register a function as a plugin."""
if self.name is None:
raise ValueError("Plugin name should be specified.")
@@ -65,27 +70,16 @@ class BasePlugin:
return decorator
def __call__(self, *args, **kwargs):
"""Call the registered function with the given arguments.
def __call__(self, *args, **kwargs) -> Any:
"""Call the registered function with the given arguments."""
return self["__call__"](*args, **kwargs)
Example usage:
```python
PrintPlugin("hello")()
```
"""
if "__call__" not in self._registry[self.name]:
raise ValueError(f"Method __call__ of plugin {self.name} is not registered.")
def __getattr__(self, method_name: str) -> Callable:
"""Get the registered function with the given name."""
return self[method_name]
return self._registry[self.name]["__call__"](*args, **kwargs)
def __getattr__(self, method_name: str):
"""Get the registered function with the given name.
Example usage:
```python
PrintPlugin("hello").again()
```
"""
def __getitem__(self, method_name: str) -> Callable:
"""Get the registered function with the given name."""
if method_name not in self._registry[self.name]:
raise ValueError(f"Method {method_name} of plugin {self.name} is not registered.")
@@ -98,7 +92,8 @@ if __name__ == "__main__":
"""
class PrintPlugin(BasePlugin):
pass
def again(self): # optional
self["again"]()
@PrintPlugin("hello").register()
def print_hello():

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Literal, NotRequired, TypedDict, Union
@@ -161,3 +162,14 @@ class BatchInput(TypedDict, total=False):
"""Position ids for the model (optional)."""
token_type_ids: NotRequired[Tensor]
"""Token type ids used in DPO, 0 represents the chosen messages, 1 represents the rejected messages."""
class BatchInfo(TypedDict):
    """Batching configuration passed to collate functions and batching plugins."""

    micro_batch_size: int
    """Micro batch size."""
    num_micro_batch: int
    """Number of micro batches."""
    cutoff_len: int
    """Cutoff length."""
    data_iter: Iterator[list[ModelInput]]
    """Data iterator."""