diff --git a/README.md b/README.md index c2c45c9a..a20209fc 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ The library now supports reasoning traces through the `reasoning_content` field - [Installing](#installing-the-library) - [Additional Nvidia packages](#additional-nvidia-packages) + - [Optional logging dependencies](#optional-logging-dependencies) - [Using the library](#using-the-library) - [Data format](#data-format) - [Reasoning content support](#reasoning-content-support-1) @@ -68,6 +69,23 @@ Editable install (development) pip install -e .[cuda] ``` +### Optional Logging Dependencies + +The library supports optional logging backends for experiment tracking. Install the ones you need: + +```bash +# MLflow logging +pip install mlflow + +# Weights & Biases logging +pip install wandb + +# TensorBoard logging +pip install tensorboard +``` + +For more details on configuring logging, see the [Logging Documentation](docs/logging.md). + ## Using the library See the `examples` dir for guided sample notebooks on library usage. Below provides some added details on library options: diff --git a/pyproject.toml b/pyproject.toml index 12def6fc..d9ee3dab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,9 @@ optional-dependencies.cuda = { file = ["requirements-cuda.txt"] } optional-dependencies.rocm = { file = ["requirements-rocm.txt"] } optional-dependencies.hpu = { file = ["requirements-hpu.txt"] } optional-dependencies.deepspeed = { file = ["requirements-deepspeed.txt"] } +optional-dependencies.mlflow = { file = ["requirements-mlflow.txt"] } +optional-dependencies.wandb = { file = ["requirements-wandb.txt"] } +optional-dependencies.tensorboard = { file = ["requirements-tensorboard.txt"] } [tool.setuptools.packages.find] where = ["src"] diff --git a/requirements-mlflow.txt b/requirements-mlflow.txt new file mode 100644 index 00000000..6d0d17c2 --- /dev/null +++ b/requirements-mlflow.txt @@ -0,0 +1 @@ +mlflow diff --git a/requirements-tensorboard.txt b/requirements-tensorboard.txt new file mode 100644 index 00000000..8ba46efc --- /dev/null +++ b/requirements-tensorboard.txt @@ -0,0 +1 @@ +tensorboard diff --git a/requirements-wandb.txt b/requirements-wandb.txt new file mode 100644 index 00000000..fff96000 --- /dev/null +++ b/requirements-wandb.txt @@ -0,0 +1 @@ +wandb diff --git a/src/instructlab/training/config.py b/src/instructlab/training/config.py index 599bab4d..1b9a3589 100644 --- a/src/instructlab/training/config.py +++ b/src/instructlab/training/config.py @@ -305,3 +305,38 @@ class TrainingArgs(BaseModel): log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field( default="INFO" ) + + mlflow_tracking_uri: str | None = Field( + default=None, + description="MLflow tracking server URI (e.g., 'http://localhost:5000'). Falls back to MLFLOW_TRACKING_URI env var.", + ) + + mlflow_experiment_name: str | None = Field( + default=None, + description="MLflow experiment name. Falls back to MLFLOW_EXPERIMENT_NAME env var.", + ) + + mlflow_run_name: str | None = Field( + default=None, + description="MLflow run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}", + ) + + wandb_project: str | None = Field( + default=None, + description="Weights & Biases project name.", + ) + + wandb_entity: str | None = Field( + default=None, + description="Weights & Biases team/entity name.", + ) + + wandb_run_name: str | None = Field( + default=None, + description="Weights & Biases run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}", + ) + + tensorboard_log_dir: str | None = Field( + default=None, + description="Directory for TensorBoard logs. Defaults to ckpt_output_dir if not specified.", + ) diff --git a/src/instructlab/training/logger.py b/src/instructlab/training/logger.py index d92b8975..f72c221b 100644 --- a/src/instructlab/training/logger.py +++ b/src/instructlab/training/logger.py @@ -2,17 +2,18 @@ This module provides a logging system for training machine learning models, supporting multiple logging backends including TensorBoard (tensorboard), Weights & Biases (wandb), -and structured JSONL logging (async). +MLflow (mlflow), and structured JSONL logging (async). Example Usage: ```python from instructlab.training.logger import setup_metric_logger - # Setup logging with TensorBoard and wandb + # Setup logging with TensorBoard and wandb (auto-detected from params) setup_metric_logger( - loggers=["tensorboard", "wandb"], - run_name="my_training_run", - output_dir="logs" + output_dir="logs", + tensorboard_log_dir="logs/tensorboard", + wandb_project="my_project", + wandb_run_name="my_training_run", ) # Log metrics @@ -73,6 +74,12 @@ except ImportError: wandb = None # type: ignore +try: + # Third Party + import mlflow +except ImportError: + mlflow = None # type: ignore + # Third Party from rich.logging import RichHandler import torch @@ -412,7 +419,7 @@ def _setup(self): if SummaryWriter is None: msg = ( "Could not initialize TensorBoardHandler because package tensorboard could not be imported.\n" - "Please ensure it is installed by running 'pip install tensorboard'" + "Please ensure it is installed by running: pip install tensorboard" ) raise RuntimeError(msg) os.makedirs(self.tboard_init_kwargs["log_dir"], exist_ok=True) @@ -550,8 +557,8 @@ def _setup(self): """Initialize the wandb run with the configured settings.""" if wandb is None: msg = ( - "Could not initialize WandbLogger because package wandb could not be imported.\n" - "Please ensure it is installed by running 'pip install wandb'" + "Could not initialize WandbHandler because package wandb could not be imported.\n" + "Please ensure it is installed by running: pip install wandb" ) raise RuntimeError(msg) self._wandb_run = wandb.init(**self.wandb_init_kwargs) @@ -581,6 +588,164 @@ def emit(self, record: logging.LogRecord): self._wandb_run.log(flat_dict, step=step) +class MLflowHandler(logging.Handler): + """Logger that sends metrics to MLflow. + + This handler expects a (nested) dictionary of metrics to be logged with string keys. + A step can be specified by passing `extra={"step": }` to the logging method. + To log hyperparameters, pass a (nested) mapping of hyperparameters to the logging method + and set `extra={"hparams": True}`. + + Example: + ```python + import logging + from instructlab.training.logger import MLflowHandler + + # Create handler + handler = MLflowHandler( + level=logging.INFO, + run_name="experiment_{time}", + tracking_uri="http://localhost:5000", + experiment_name="my_experiment" + ) + + # Create logger + logger = logging.getLogger("metrics") + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + # Log metrics + logger.info( + { + "training": { + "loss": 0.5, + "accuracy": 0.95 + } + }, + extra={"step": 100} + ) + + # Log hyperparameters + logger.info( + { + "learning_rate": 0.001, + "batch_size": 32 + }, + extra={"hparams": True} + ) + ``` + """ + + def __init__( + self, + level: int = logging.INFO, + run_name: str | None = None, + tracking_uri: str | None = None, + experiment_name: str | None = None, + **mlflow_init_kwargs: Any, + ): + """Initialize the MLflow logger and check for required dependencies. + + Args: + level: The logging level for this handler + run_name: Name of the run, can contain placeholders + tracking_uri: MLflow tracking server URI (e.g., "http://localhost:5000") + experiment_name: Name of the MLflow experiment + **mlflow_init_kwargs: Additional keyword arguments passed to mlflow.start_run() + """ + super().__init__(level) + + self.run_name = _substitute_placeholders(run_name) + self.tracking_uri = tracking_uri + self.experiment_name = experiment_name + self.mlflow_init_kwargs = mlflow_init_kwargs.copy() + + self._mlflow_run = None + self._owns_mlflow_run = False + + def _setup(self): + """Initialize the MLflow run with the configured settings.""" + if mlflow is None: + msg = ( + "Could not initialize MLflowHandler because package mlflow could not be imported.\n" + "Please ensure it is installed by running: pip install mlflow" + ) + raise RuntimeError(msg) + + # Always set tracking URI and experiment first (before checking for active run) + # This ensures the client is configured correctly even if we reuse an existing run + if self.tracking_uri: + mlflow.set_tracking_uri(self.tracking_uri) + + if self.experiment_name: + mlflow.set_experiment(self.experiment_name) + + # Reuse existing active run if one exists, otherwise start a new one + active = mlflow.active_run() + if active is not None: + self._mlflow_run = active + self._owns_mlflow_run = False + else: + self._mlflow_run = mlflow.start_run( + run_name=self.run_name, **self.mlflow_init_kwargs + ) + self._owns_mlflow_run = True + + def emit(self, record: logging.LogRecord): + """Emit a log record to MLflow. + + Args: + record: The log record to emit + """ + if self._mlflow_run is None: + self._setup() + + if not isinstance(record.msg, Mapping): + warnings.warn( + f"MLflowHandler expected a mapping, got {type(record.msg)}. Skipping log. " + "Please ensure the handler is configured correctly to filter out non-mapping objects." + ) + return + + flat_dict = _flatten_dict(record.msg, sep=".") + step = getattr(record, "step", None) + + if getattr(record, "hparams", None): + # Log as parameters - MLflow params must be strings + params_dict = {k: str(v) for k, v in flat_dict.items()} + mlflow.log_params(params_dict) + return + + # Filter to only numeric values for metrics + metrics_dict = {} + skipped_keys = [] + for k, v in flat_dict.items(): + try: + metrics_dict[k] = float(v) + except (ValueError, TypeError): + # Skip non-numeric values for metrics + skipped_keys.append(k) + + if skipped_keys: + logging.debug( + f"MLflowHandler skipped non-numeric metrics: {skipped_keys}. " + "Only numeric values can be logged as MLflow metrics." + ) + + if metrics_dict: + mlflow.log_metrics(metrics_dict, step=step) + + def close(self): + """End the MLflow run and cleanup resources.""" + if self._mlflow_run is not None: + # Only end the run if we started it (not if we reused an existing one) + if self._owns_mlflow_run: + mlflow.end_run() + self._mlflow_run = None + self._owns_mlflow_run = False + super().close() + + class AsyncStructuredHandler(logging.Handler): """Logger that asynchronously writes data to a JSONL file. @@ -708,50 +873,93 @@ def setup_root_logger(level="DEBUG"): ) -def setup_metric_logger(loggers, run_name, output_dir): - """Configure the metric logging system with specified backends. +def setup_metric_logger( + output_dir, + *, + mlflow_tracking_uri: str | None = None, + mlflow_experiment_name: str | None = None, + mlflow_run_name: str | None = None, + wandb_project: str | None = None, + wandb_entity: str | None = None, + wandb_run_name: str | None = None, + tensorboard_log_dir: str | None = None, +): + """Configure the metric logging system with auto-detected backends. This function sets up a comprehensive logging configuration that supports multiple logging backends simultaneously. It configures filters, handlers, - and loggers for structured metric logging. + and loggers for structured metric logging. Backends are automatically + detected based on the presence of their configuration parameters. + + Note: + Run names are configured per-backend (e.g., `mlflow_run_name`, `wandb_run_name`) + rather than using a shared global run name. This design provides explicit control + over each backend's naming without coupling them together. File-based loggers + use default templates when no run name is specified: TensorBoard uses + "{time}_rank{rank}", and async JSONL uses "training_params_and_metrics_global{rank}", + ensuring unique identifiers across distributed runs. Args: - loggers: A string or list of strings specifying which logging backends to use. - Supported values: "tensorboard", "wandb", "async" - run_name: Name for the current training run. Can include placeholders like - {time}, {rank}, {utc_time}, {local_rank}. output_dir: Directory where log files will be stored + mlflow_tracking_uri: MLflow tracking server URI (e.g., "http://localhost:5000"). + Falls back to MLFLOW_TRACKING_URI environment variable if not provided. + When set (or env var present), MLflow logging is automatically enabled. + mlflow_experiment_name: MLflow experiment name. + Falls back to MLFLOW_EXPERIMENT_NAME environment variable if not provided. + mlflow_run_name: MLflow run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}. + wandb_project: Weights & Biases project name. + When set (or WANDB_PROJECT env var present), wandb logging is automatically enabled. + wandb_entity: Weights & Biases team/entity name. + wandb_run_name: Weights & Biases run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}. + tensorboard_log_dir: Directory for TensorBoard logs. + When set, TensorBoard logging is automatically enabled. Example: ```python - # Setup logging with multiple backends + # Setup logging with MLflow (auto-detected from tracking URI) setup_metric_logger( - loggers=["tensorboard", "wandb", "async"], - run_name="experiment_{time}", - output_dir="logs" + output_dir="logs", + mlflow_tracking_uri="http://localhost:5000", + mlflow_experiment_name="my_experiment", + mlflow_run_name="my_run" + ) + + # Setup logging with wandb (auto-detected from project) + setup_metric_logger( + output_dir="logs", + wandb_project="my_project", + wandb_run_name="my_run" ) - # Setup logging with a single backend + # Setup logging with TensorBoard (auto-detected from log_dir) setup_metric_logger( - loggers="tensorboard", - run_name="my_run", - output_dir="logs" + output_dir="logs", + tensorboard_log_dir="logs/tensorboard" ) ``` """ - if not loggers: - return - # Enable package logging propagate_package_logs() - if isinstance(loggers, str): - loggers = loggers.split(",") - loggers = [logger.strip() for logger in loggers] + # Auto-detect which loggers to enable based on configuration + detected_loggers = [] + if ( + mlflow_tracking_uri + or mlflow_experiment_name + or mlflow_run_name + or os.environ.get("MLFLOW_TRACKING_URI") + or os.environ.get("MLFLOW_EXPERIMENT_NAME") + ): + detected_loggers.append("mlflow") + if wandb_project or os.environ.get("WANDB_PROJECT"): + detected_loggers.append("wandb") + if tensorboard_log_dir: + detected_loggers.append("tensorboard") + + # Always include async logger for file-based logging alongside other loggers + loggers = [*detected_loggers, "async"] async_filters = ["is_mapping"] - if run_name is not None and "{rank}" not in run_name: - async_filters.append("is_rank0") logging_config = { "version": 1, @@ -768,19 +976,30 @@ def setup_metric_logger(loggers, run_name, output_dir): "async": { "()": AsyncStructuredHandler, "log_dir": output_dir, - "run_name": run_name, + "run_name": None, # Uses default template "filters": async_filters, }, "tensorboard": { "()": TensorBoardHandler, - "log_dir": output_dir, - "run_name": run_name, + "log_dir": tensorboard_log_dir or output_dir, + "run_name": None, # Uses default template "filters": ["is_mapping", "is_rank0"], }, "wandb": { "()": WandbHandler, "log_dir": output_dir, - "run_name": run_name, + "run_name": wandb_run_name, + "project": wandb_project, + "entity": wandb_entity, + "filters": ["is_mapping", "is_rank0"], + }, + "mlflow": { + "()": MLflowHandler, + "run_name": mlflow_run_name, + "tracking_uri": mlflow_tracking_uri + or os.environ.get("MLFLOW_TRACKING_URI"), + "experiment_name": mlflow_experiment_name + or os.environ.get("MLFLOW_EXPERIMENT_NAME"), "filters": ["is_mapping", "is_rank0"], }, }, diff --git a/src/instructlab/training/main_ds.py b/src/instructlab/training/main_ds.py index e05c9eae..c3af5ba8 100644 --- a/src/instructlab/training/main_ds.py +++ b/src/instructlab/training/main_ds.py @@ -275,7 +275,16 @@ def main(args): "DeepSpeed was selected and CPU offloading was requested, but DeepSpeedCPUAdam could not be imported. This likely means you need to build DeepSpeed with the CPU adam flags." ) - setup_metric_logger(args.logger_type, args.run_name, args.output_dir) + setup_metric_logger( + args.output_dir, + mlflow_tracking_uri=args.mlflow_tracking_uri, + mlflow_experiment_name=args.mlflow_experiment_name, + mlflow_run_name=args.mlflow_run_name, + wandb_project=args.wandb_project, + wandb_entity=args.wandb_entity, + wandb_run_name=args.wandb_run_name, + tensorboard_log_dir=args.tensorboard_log_dir, + ) metric_logger = logging.getLogger("instructlab.training.metrics") if os.environ["LOCAL_RANK"] == "0": metric_logger.info(vars(args), extra={"hparams": True}) @@ -460,7 +469,16 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: # Enable package logging propagation before setting up loggers propagate_package_logs(True) setup_root_logger(train_args.log_level) - setup_metric_logger("async", None, train_args.ckpt_output_dir) + setup_metric_logger( + train_args.ckpt_output_dir, + mlflow_tracking_uri=train_args.mlflow_tracking_uri, + mlflow_experiment_name=train_args.mlflow_experiment_name, + mlflow_run_name=train_args.mlflow_run_name, + wandb_project=train_args.wandb_project, + wandb_entity=train_args.wandb_entity, + wandb_run_name=train_args.wandb_run_name, + tensorboard_log_dir=train_args.tensorboard_log_dir, + ) logger = logging.getLogger("instructlab.training") logger.info("Starting training setup...") @@ -551,6 +569,22 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: ] ) + # Add optional logging parameters + if train_args.mlflow_tracking_uri is not None: + command.append(f"--mlflow_tracking_uri={train_args.mlflow_tracking_uri}") + if train_args.mlflow_experiment_name is not None: + command.append(f"--mlflow_experiment_name={train_args.mlflow_experiment_name}") + if train_args.mlflow_run_name is not None: + command.append(f"--mlflow_run_name={train_args.mlflow_run_name}") + if train_args.wandb_project is not None: + command.append(f"--wandb_project={train_args.wandb_project}") + if train_args.wandb_entity is not None: + command.append(f"--wandb_entity={train_args.wandb_entity}") + if train_args.wandb_run_name is not None: + command.append(f"--wandb_run_name={train_args.wandb_run_name}") + if train_args.tensorboard_log_dir is not None: + command.append(f"--tensorboard_log_dir={train_args.tensorboard_log_dir}") + if train_args.pretraining_config is not None: command.append(f"--block-size={train_args.pretraining_config.block_size}") command.append( @@ -767,8 +801,48 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: help="Save full model state using Accelerate after finishing an epoch.", ) parser.add_argument("--log_level", type=str, default="INFO") - parser.add_argument("--run_name", type=str, default=None) - parser.add_argument("--logger_type", type=str, default="async") + parser.add_argument( + "--mlflow_tracking_uri", + type=str, + default=None, + help="MLflow tracking server URI (e.g., 'http://localhost:5000')", + ) + parser.add_argument( + "--mlflow_experiment_name", + type=str, + default=None, + help="MLflow experiment name", + ) + parser.add_argument( + "--mlflow_run_name", + type=str, + default=None, + help="MLflow run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}", + ) + parser.add_argument( + "--wandb_project", + type=str, + default=None, + help="Weights & Biases project name", + ) + parser.add_argument( + "--wandb_entity", + type=str, + default=None, + help="Weights & Biases team/entity name", + ) + parser.add_argument( + "--wandb_run_name", + type=str, + default=None, + help="Weights & Biases run name. Supports placeholders: {time}, {rank}, {utc_time}, {local_rank}", + ) + parser.add_argument( + "--tensorboard_log_dir", + type=str, + default=None, + help="Directory for TensorBoard logs. Defaults to output_dir if not specified.", + ) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--mock_data", action="store_true") parser.add_argument("--mock_len", type=int, default=2600)