Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .stats.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
configured_endpoints: 74
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/togetherai%2Ftogetherai-da5b9df3bfe0d31a76c91444c9eba060ad607d7d5a4e7483c5cc3fe2cac0f25e.yml
openapi_spec_hash: 7efd2ae2111f3a9bf190485828a13252
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/togetherai%2Ftogetherai-41003d5ea35fdc4ec5c03cd7933dc5fee020d818abb1ea71f1d20d767cce06a0.yml
openapi_spec_hash: 651ade27191eb5ad232ec508418fe4cb
config_hash: b66198d27b4d5c152688ff6cccfdeab5
3 changes: 3 additions & 0 deletions src/together/_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:
),
response=self.http_response,
client=cast(Any, self._client),
options=self._options,
),
)

Expand All @@ -162,6 +163,7 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:
cast_to=extract_stream_chunk_type(self._stream_cls),
response=self.http_response,
client=cast(Any, self._client),
options=self._options,
),
)

Expand All @@ -175,6 +177,7 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:
cast_to=cast_to,
response=self.http_response,
client=cast(Any, self._client),
options=self._options,
),
)

Expand Down
11 changes: 8 additions & 3 deletions src/together/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import inspect
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, AsyncIterator, cast
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, Optional, AsyncIterator, cast
from typing_extensions import Self, Protocol, TypeGuard, override, get_origin, runtime_checkable

import httpx
Expand All @@ -14,6 +14,7 @@

if TYPE_CHECKING:
from ._client import Together, AsyncTogether
from ._models import FinalRequestOptions


_T = TypeVar("_T")
Expand All @@ -23,7 +24,7 @@ class Stream(Generic[_T]):
"""Provides the core interface to iterate over a synchronous stream response."""

response: httpx.Response

_options: Optional[FinalRequestOptions] = None
_decoder: SSEBytesDecoder

def __init__(
Expand All @@ -32,10 +33,12 @@ def __init__(
cast_to: type[_T],
response: httpx.Response,
client: Together,
options: Optional[FinalRequestOptions] = None,
) -> None:
self.response = response
self._cast_to = cast_to
self._client = client
self._options = options
self._decoder = client._make_sse_decoder()
self._iterator = self.__stream__()

Expand Down Expand Up @@ -105,7 +108,7 @@ class AsyncStream(Generic[_T]):
"""Provides the core interface to iterate over an asynchronous stream response."""

response: httpx.Response

_options: Optional[FinalRequestOptions] = None
_decoder: SSEDecoder | SSEBytesDecoder

def __init__(
Expand All @@ -114,10 +117,12 @@ def __init__(
cast_to: type[_T],
response: httpx.Response,
client: AsyncTogether,
options: Optional[FinalRequestOptions] = None,
) -> None:
self.response = response
self._cast_to = cast_to
self._client = client
self._options = options
self._decoder = client._make_sse_decoder()
self._iterator = self.__stream__()

Expand Down
37 changes: 33 additions & 4 deletions src/together/lib/cli/api/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ def _human_readable_time(timedelta: float) -> str:
return " ".join(parts) if parts else "0s"


def generate_progress_text(
    finetune_job: Union[Data, FinetuneResponse, _FinetuneResponse], current_time: datetime
) -> str:
    """Generate a progress text for a finetune job.
    Args:
        finetune_job: The finetune job to generate a progress text for.
        current_time: The current time.
    Returns:
        A string representing the progress text.
    """
    raw_started_at = getattr(finetune_job, "started_at", None)
    if raw_started_at is None or not isinstance(raw_started_at, datetime):
        # No usable start timestamp: nothing to estimate from.
        return ""
    # started_at is reported in UTC; normalize to the local timezone for arithmetic.
    started_at = raw_started_at.astimezone()

    progress = finetune_job.progress
    if progress is None:
        return ""
    # Guard clauses: no estimate before the job has started, when the backend
    # reports no estimate is available, or when the remaining time is exhausted.
    if current_time < started_at:
        return ""
    if not progress.estimate_available:
        return ""
    if progress.seconds_remaining <= 0:
        return ""

    elapsed = (current_time - started_at).total_seconds()
    remaining = progress.seconds_remaining - elapsed
    # When the estimate is already overdue (remaining <= 0 after subtracting
    # elapsed time), show "N/A" rather than a negative duration.
    left = _human_readable_time(remaining) if remaining > 0 else "N/A"
    return f"{left} left"


def generate_progress_bar(
finetune_job: Union[Data, FinetuneResponse, _FinetuneResponse], current_time: datetime, use_rich: bool = False
) -> str:
Expand Down Expand Up @@ -122,10 +154,7 @@ def generate_progress_bar(
percentage = ratio_filled * 100
filled = math.ceil(ratio_filled * _PROGRESS_BAR_WIDTH)
bar = "█" * filled + "░" * (_PROGRESS_BAR_WIDTH - filled)
time_left = "N/A"
if finetune_job.progress.seconds_remaining > elapsed_time:
time_left = _human_readable_time(finetune_job.progress.seconds_remaining - elapsed_time)
time_text = f"{time_left} left"
time_text = generate_progress_text(finetune_job, current_time)
progress = f"Progress: {bar} [bold]{percentage:>3.0f}%[/bold] [yellow]{time_text}[/yellow]"

if use_rich:
Expand Down
2 changes: 1 addition & 1 deletion src/together/lib/cli/api/beta/jig/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DeployConfig:
port: int = 8000
environment_variables: dict[str, str] = field(default_factory=dict[str, str])
command: Optional[list[str]] = None
autoscaling: dict[str, str] = field(default_factory=dict[str, str])
autoscaling: dict[str, Union[str, int, float]] = field(default_factory=dict[str, Union[str, int, float]])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do autoscaling['target'] = float(autoscaling['target']) before sending it in the request? so that existing configs work with the backend change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still wondering if we can just greenfield this at this stage. Otherwise, very least we should show a deprecation warning for str floats imo.

health_check_path: str = "/health"
termination_grace_period_seconds: int = 300
volume_mounts: list[VolumeMount] = field(default_factory=list[VolumeMount])
Expand Down
30 changes: 22 additions & 8 deletions src/together/lib/cli/api/files/list.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,47 @@
from typing import Any, Dict, List
from textwrap import wrap
from datetime import datetime, timezone

import click
from tabulate import tabulate

from together import Together
from together.lib.utils import convert_bytes, convert_unix_timestamp
from together._utils._json import openapi_dumps
from together.lib.utils.tools import format_timestamp
from together.lib.cli.api._utils import handle_api_errors


@click.command()
@click.pass_context
@click.option("--json", is_flag=True, help="Print output in JSON format")
@handle_api_errors("Files")
def list(ctx: click.Context) -> None:
def list(ctx: click.Context, json: bool) -> None:
"""List files"""
client: Together = ctx.obj

response = client.files.list()

response.data = response.data or []

# Use a default datetime for None values to make sure the key function always returns a comparable value
# Sort newest to oldest
epoch_start = datetime.fromtimestamp(0, tz=timezone.utc)
response.data.sort(key=lambda x: x.created_at or epoch_start, reverse=True)

if json:
click.echo(openapi_dumps(response.data))
return

display_list: List[Dict[str, Any]] = []
for i in response.data or []:
for i in response.data:
display_list.append(
{
"File name": "\n".join(wrap(i.filename or "", width=30)),
"File ID": i.id,
"Size": convert_bytes(float(str(i.bytes))), # convert to string for mypy typing
"Created At": convert_unix_timestamp(i.created_at or 0),
"ID": click.style(i.id, fg="blue"),
"File name": click.style(i.filename or "", fg="blue"),
"Size": click.style(convert_bytes(float(str(i.bytes))), fg="blue"), # convert to string for mypy typing
"Created At": click.style(format_timestamp(convert_unix_timestamp(i.created_at or 0)), fg="blue"),
}
)
table = tabulate(display_list, headers="keys", tablefmt="grid", showindex=True)
table = tabulate(display_list, headers="keys")

click.echo(table)
18 changes: 15 additions & 3 deletions src/together/lib/cli/api/files/upload.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
import pathlib
from typing import get_args

import click

from together import Together
from together.types import FilePurpose
from together._utils._json import openapi_dumps
from together.lib.cli.api._utils import handle_api_errors


Expand All @@ -27,12 +27,24 @@
default=True,
help="Whether to check the file before uploading.",
)
@click.option(
"--json",
is_flag=True,
help="Output the response in JSON format",
)
@handle_api_errors("Files")
def upload(ctx: click.Context, file: pathlib.Path, purpose: FilePurpose, check: bool) -> None:
def upload(ctx: click.Context, file: pathlib.Path, purpose: FilePurpose, check: bool, json: bool) -> None:
"""Upload file"""

client: Together = ctx.obj

response = client.files.upload(file=file, purpose=purpose, check=check)

click.echo(json.dumps(response.model_dump(exclude_none=True), indent=4))
if json:
click.echo(openapi_dumps(response.model_dump(exclude_none=True)))
return

click.echo(
click.style("> Success! ", fg="blue")
+ f"File uploaded for {click.style(response.purpose, bold=True)}. File ID: {click.style(response.id, fg='green', bold=True)}"
)
49 changes: 27 additions & 22 deletions src/together/lib/cli/api/fine_tuning/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path

import click
from rich import print as rprint
from click.core import ParameterSource

from together import Together
Expand All @@ -13,19 +12,21 @@
from together.lib.cli.api._utils import INT_WITH_MAX, BOOL_WITH_AUTO, handle_api_errors
from together.lib.resources.fine_tuning import get_model_limits

_CONFIRMATION_MESSAGE = (
"You are about to create a fine-tuning job. "
"The estimated price of this job is {price}. "
"The actual cost of your job will be determined by the model size, the number of tokens "
"in the training file, the number of tokens in the validation file, the number of epochs, and "
"the number of evaluations. Visit https://www.together.ai/pricing to learn more about pricing.\n"
"{warning}"
"You can pass `-y` or `--confirm` to your command to skip this message.\n\n"
"Do you want to proceed?"
)

def get_confirmation_message(price: str, warning: str) -> str:
    """Build the confirmation prompt shown before launching a fine-tuning job.

    Args:
        price: Human-readable estimated price of the job (already formatted).
        warning: Extra warning text to include (e.g. insufficient funds), or "".

    Returns:
        The full multi-line prompt ending in "Do you want to proceed?".
    """
    return (
        "\nYou are about to create a fine-tuning job. The estimated price of this job is "
        + f"{click.style(f'{price}', fg='blue', bold=True)}\n\n"
        # The trailing space before the concatenation is required: without it
        # the rendered prompt reads "the number of tokensin the training file".
        + "The actual cost of your job will be determined by the model size, the number of tokens "
        + "in the training file, the number of tokens in the validation file, the number of epochs, and "
        + "the number of evaluations. Visit https://www.together.ai/pricing to learn more about pricing.\n"
        + warning
        + "\nDo you want to proceed?"
    )


_WARNING_MESSAGE_INSUFFICIENT_FUNDS = (
"The estimated price of this job is significantly greater than your current credit limit and balance combined. "
"\nThe estimated price of this job is significantly greater than your current credit limit and balance combined. "
"It will likely get cancelled due to insufficient funds. "
"Consider increasing your credit limit at https://api.together.xyz/settings/profile\n"
)
Expand Down Expand Up @@ -285,7 +286,10 @@ def create(
)

if model is None and from_checkpoint is None:
raise click.BadParameter("You must specify either a model or a checkpoint")
raise click.MissingParameter(
"",
param_type="option --model or --from-checkpoint",
)

model_name = model
if from_checkpoint is not None:
Expand Down Expand Up @@ -343,9 +347,11 @@ def create(

training_method_cls: pe_params.TrainingMethod
if training_method == "sft":
train_on_inputs = train_on_inputs or "auto"
training_args["train_on_inputs"] = train_on_inputs
training_method_cls = pe_params.TrainingMethodTrainingMethodSft(
method="sft",
train_on_inputs=train_on_inputs or "auto",
train_on_inputs=train_on_inputs,
)
else:
training_method_cls = pe_params.TrainingMethodTrainingMethodDpo(
Expand Down Expand Up @@ -396,7 +402,7 @@ def create(
else:
warning = ""

confirmation_message = _CONFIRMATION_MESSAGE.format(
confirmation_message = get_confirmation_message(
price=price,
warning=warning,
)
Expand All @@ -407,13 +413,12 @@ def create(
verbose=True,
)

report_string = f"Successfully submitted a fine-tuning job {response.id}"
# created_at reports UTC time, we use .astimezone() to convert to local time
formatted_time = response.created_at.astimezone().strftime("%m/%d/%Y, %H:%M:%S")
report_string += f" at {formatted_time}"
rprint(report_string)
else:
click.echo("No confirmation received, stopping job launch")
click.echo(
click.style(f"\n\nSuccess!", fg="green", bold=True)
+ click.style(" Your fine-tuning job ", fg="green")
+ click.style(response.id, fg="blue", bold=True)
+ click.style(" has been submitted.", fg="green")
)


def _check_path_exists(path_string: str) -> bool:
Expand Down
27 changes: 19 additions & 8 deletions src/together/lib/cli/api/fine_tuning/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import click

from together import NOT_GIVEN, NotGiven, Together
from together import NOT_GIVEN, APIError, NotGiven, Together, APIStatusError
from together.lib import DownloadManager
from together.lib.cli.api._utils import handle_api_errors
from together.types.finetune_response import TrainingTypeFullTrainingType, TrainingTypeLoRaTrainingType

_FT_JOB_WITH_STEP_REGEX = r"^ft-[\dabcdef-]+:\d+$"
Expand Down Expand Up @@ -40,6 +41,7 @@
default="merged",
help="Specifies checkpoint type. 'merged' and 'adapter' options work only for LoRA jobs.",
)
@handle_api_errors("Fine-tuning")
def download(
ctx: click.Context,
fine_tune_id: str,
Expand Down Expand Up @@ -84,11 +86,20 @@ def download(
if isinstance(output_dir, str):
output = Path(output_dir)

file_path, file_size = DownloadManager(client).download(
url=url,
output=output,
remote_name=remote_name,
fetch_metadata=True,
)
try:
file_path, file_size = DownloadManager(client).download(
url=url,
output=output,
remote_name=remote_name,
fetch_metadata=True,
)

click.echo(json.dumps({"object": "local", "id": fine_tune_id, "filename": file_path, "size": file_size}, indent=4))
click.echo(
json.dumps({"object": "local", "id": fine_tune_id, "filename": file_path, "size": file_size}, indent=4)
)
except APIStatusError as e:
raise APIError(
"Training job is not downloadable. This may be because the job is not in a completed state.",
request=e.request,
body=None,
) from e
Loading