Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/compute-pricing.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ This guide explains how to configure your node’s Docker compute environments a
## Overview

- **Configuration**: Define compute environments via the `DOCKER_COMPUTE_ENVIRONMENTS` environment variable (JSON) or via `config.json` under `dockerComputeEnvironments`.
- **Environment**: Is a group of resources, payment and accesslists.
- **Resources**: Each environment declares resources (e.g. `cpu`, `ram`, `disk`, and optionally GPUs). You must declare a `disk` resource.
- **Pricing**: For each chain and fee token, you set a `price` per resource. Cost is computed as **price × amount × duration (in minutes, rounded up)**.
- **Free**: Environments which does not require a payment for the resources, but most likley are very limited in terms of resources available and job duration.
- **Image building**: **Free jobs cannot build images** (Dockerfiles are not allowed). For **paid jobs**, **image build time counts toward billable duration** and also consumes the job’s `maxJobDuration`.

## Pricing Units

Expand Down
1 change: 1 addition & 0 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of
- **maxJobDuration**: Maximum duration in seconds for a free job.
- **minJobDuration**: Minimum duration in seconds for a free job.
- **maxJobs**: Maximum number of simultaneous free jobs.
- **allowImageBuild**: If building images is allowed on free envs. Default: false
- **access**: Access control configuration for free compute jobs. Works the same as the main `access` field.
- **addresses**: Array of Ethereum addresses allowed to run free compute jobs.
- **accessLists**: Array of AccessList contract addresses for free compute access control.
Expand Down
3 changes: 3 additions & 0 deletions src/@types/C2D/C2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export interface ComputeEnvironmentFreeOptions {
maxJobs?: number // maximum number of simultaneous free jobs
resources?: ComputeResource[]
access: ComputeAccessList
allowImageBuild?: boolean
}
export interface ComputeEnvironmentBaseConfig {
description?: string // v1
Expand Down Expand Up @@ -280,6 +281,8 @@ export interface DBComputeJob extends ComputeJob {
encryptedDockerRegistryAuth?: string
output?: string // this is always an ECIES encrypted string, that decodes to ComputeOutput interface
jobIdHash: string
buildStartTimestamp?: string
buildStopTimestamp?: string
}

// make sure we keep them both in sync
Expand Down
5 changes: 3 additions & 2 deletions src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,9 @@ export abstract class C2DEngine {
for (const job of jobs) {
if (job.environment === env.id) {
if (job.queueMaxWaitTime === 0) {
const timeElapsed =
new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp)
const timeElapsed = job.buildStartTimestamp
? new Date().getTime() / 1000 - Number.parseFloat(job?.buildStartTimestamp)
: new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp)
totalJobs++
maxRunningTime += job.maxJobDuration - timeElapsed
if (job.isFree) {
Expand Down
108 changes: 92 additions & 16 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class C2DEngineDocker extends C2DEngine {
private cpuAllocations: Map<string, number[]> = new Map()
private envCpuCores: number[] = []
private cpuOffset: number

public constructor(
clusterConfig: C2DClusterInfo,
db: C2DDatabase,
Expand Down Expand Up @@ -414,11 +415,11 @@ export class C2DEngineDocker extends C2DEngine {
}

// Process each job to determine what operation is needed
let duration
for (const job of jobs) {
// Calculate algo duration
const algoDuration =
parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp)
job.algoDuration = algoDuration
duration = parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp)
duration += this.getValidBuildDurationSeconds(job)

// Free jobs or jobs without payment info - mark as finished
if (job.isFree || !job.payment) {
Expand Down Expand Up @@ -455,7 +456,7 @@ export class C2DEngineDocker extends C2DEngine {
continue
}

let minDuration = Math.abs(algoDuration)
let minDuration = Math.abs(duration)
if (minDuration > job.maxJobDuration) {
minDuration = job.maxJobDuration
}
Expand Down Expand Up @@ -1107,6 +1108,13 @@ export class C2DEngineDocker extends C2DEngine {
throw new Error(`additionalDockerFiles cannot be used with queued jobs`)
}
}
if (
algorithm.meta.container &&
algorithm.meta.container.dockerfile &&
!env.free.allowImageBuild
) {
throw new Error(`Building image is not allowed for free jobs`)
}

const job: DBComputeJob = {
clusterHash: this.getC2DConfig().hash,
Expand Down Expand Up @@ -1147,7 +1155,9 @@ export class C2DEngineDocker extends C2DEngine {
algoDuration: 0,
queueMaxWaitTime: queueMaxWaitTime || 0,
encryptedDockerRegistryAuth, // we store the encrypted docker registry auth in the job
output
output,
buildStartTimestamp: '0',
buildStopTimestamp: '0'
}

if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
Expand Down Expand Up @@ -1606,6 +1616,19 @@ export class C2DEngineDocker extends C2DEngine {
}

if (job.status === C2DStatusNumber.ConfiguringVolumes) {
// we have the image (etiher pulled or built)
// if built, check if build process took all allocated time
// if yes, stop the job
const buildDuration = this.getValidBuildDurationSeconds(job)
if (buildDuration > 0 && buildDuration >= job.maxJobDuration) {
job.isStarted = false
job.status = C2DStatusNumber.PublishingResults
job.statusText = C2DStatusText.PublishingResults
job.algoStartTimestamp = '0'
job.algoStopTimestamp = '0'
job.isRunning = false
await this.db.updateJob(job)
}
// create the volume & create container
// TO DO C2D: Choose driver & size
// get env info
Expand Down Expand Up @@ -1814,7 +1837,13 @@ export class C2DEngineDocker extends C2DEngine {
}

const timeNow = Date.now() / 1000
const expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration
let expiry

const buildDuration = this.getValidBuildDurationSeconds(job)
if (buildDuration > 0) {
// if job has build time, reduce the remaining algorithm runtime budget
expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration - buildDuration
} else expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration
CORE_LOGGER.debug(
'container running since timeNow: ' + timeNow + ' , Expiry: ' + expiry
)
Expand Down Expand Up @@ -1964,6 +1993,14 @@ export class C2DEngineDocker extends C2DEngine {

private allocateCpus(jobId: string, count: number): string | null {
if (this.envCpuCores.length === 0 || count <= 0) return null
const existing = this.cpuAllocations.get(jobId)
if (existing && existing.length > 0) {
const cpusetStr = existing.join(',')
CORE_LOGGER.info(
`CPU affinity: reusing existing cores [${cpusetStr}] for job ${jobId}`
)
return cpusetStr
}

const usedCores = new Set<number>()
for (const cores of this.cpuAllocations.values()) {
Expand Down Expand Up @@ -2341,7 +2378,7 @@ export class C2DEngineDocker extends C2DEngine {
const imageLogFile =
this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log'
const controller = new AbortController()
const timeoutMs = 5 * 60 * 1000
const timeoutMs = job.maxJobDuration * 1000
const timer = setTimeout(() => controller.abort(), timeoutMs)
try {
const pack = tarStream.pack()
Expand All @@ -2355,18 +2392,29 @@ export class C2DEngineDocker extends C2DEngine {
}
}
pack.finalize()
job.buildStartTimestamp = String(Date.now() / 1000)
await this.db.updateJob(job)

// Build the image using the tar stream as context (Node IncomingMessage extends stream.Readable)
const buildStream = (await this.docker.buildImage(pack, {
const cpuperiod = 100000
const ramGb = this.getResourceRequest(job.resources, 'ram')
const ramBytes =
ramGb && ramGb > 0 ? ramGb * 1024 * 1024 * 1024 : 1024 * 1024 * 1024

const cpus = this.getResourceRequest(job.resources, 'cpu')
const cpuquota = cpus && cpus > 0 ? Math.floor(cpus * cpuperiod) : 50000

const buildOptions: Dockerode.ImageBuildOptions = {
t: job.containerImage,
memory: 1024 * 1024 * 1024, // 1GB RAM in bytes
memswap: -1, // Disable swap
cpushares: 512, // CPU Shares (default is 1024)
cpuquota: 50000, // 50% of one CPU (100000 = 1 CPU)
cpuperiod: 100000, // Default period
memory: ramBytes,
memswap: ramBytes, // same as memory => no swap
cpushares: 1024, // CPU Shares (default is 1024)
cpuquota, // 100000 = 1 CPU with cpuperiod=100000
cpuperiod,
nocache: true, // prevent cache poison
abortSignal: controller.signal
})) as Readable
}
// Build the image using the tar stream as context (Node IncomingMessage extends stream.Readable)
const buildStream = (await this.docker.buildImage(pack, buildOptions)) as Readable

const onBuildData = (data: Buffer) => {
try {
Expand Down Expand Up @@ -2405,9 +2453,23 @@ export class C2DEngineDocker extends C2DEngine {
}
controller.signal.addEventListener('abort', onAbort, { once: true })
const onSuccess = () => {
finish(() => {
finish(async () => {
detachBuildLog()
controller.signal.removeEventListener('abort', onAbort)

// Build stream completed, but does the image actually exist?
try {
await this.docker.getImage(job.containerImage).inspect()
} catch (e) {
return reject(
new Error(
`Cannot find image '${job.containerImage}' after building. Most likely it failed: ${
(e as Error)?.message || String(e)
}`
)
)
}

CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`)
this.updateImageUsage(job.containerImage).catch((e) => {
CORE_LOGGER.debug(`Failed to track image usage: ${e.message}`)
Expand All @@ -2430,6 +2492,7 @@ export class C2DEngineDocker extends C2DEngine {
})
job.status = C2DStatusNumber.ConfiguringVolumes
job.statusText = C2DStatusText.ConfiguringVolumes
job.buildStopTimestamp = String(Date.now() / 1000)
await this.db.updateJob(job)
} catch (err) {
const aborted =
Expand All @@ -2448,6 +2511,7 @@ export class C2DEngineDocker extends C2DEngine {
}
job.status = C2DStatusNumber.BuildImageFailed
job.statusText = C2DStatusText.BuildImageFailed
job.buildStopTimestamp = String(Date.now() / 1000)
job.isRunning = false
job.dateFinished = String(Date.now() / 1000)
await this.db.updateJob(job)
Expand Down Expand Up @@ -2843,6 +2907,18 @@ export class C2DEngineDocker extends C2DEngine {
}
return false
}

private getValidBuildDurationSeconds(job: DBComputeJob): number {
const startRaw = job.buildStartTimestamp
const stopRaw = job.buildStopTimestamp
if (!startRaw || !stopRaw) return 0
const start = Number.parseFloat(startRaw)
const stop = Number.parseFloat(stopRaw)
if (!Number.isFinite(start) || !Number.isFinite(stop)) return 0
if (start <= 0) return 0
if (stop < start) return 0
return stop - start
}
}

// this uses the docker engine, but exposes only one env, the free one
Expand Down
4 changes: 3 additions & 1 deletion src/components/database/sqliteCompute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ function getInternalStructure(job: DBComputeJob): any {
algoDuration: job.algoDuration,
queueMaxWaitTime: job.queueMaxWaitTime,
output: job.output,
jobIdHash: job.jobIdHash
jobIdHash: job.jobIdHash,
buildStartTimestamp: job.buildStartTimestamp,
buildStopTimestamp: job.buildStopTimestamp
}
return internalBlob
}
Expand Down
4 changes: 3 additions & 1 deletion src/test/integration/getJobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ function buildJob(overrides: Partial<DBComputeJob> = {}): DBComputeJob {
payment: overrides.payment,
additionalViewers: overrides.additionalViewers || [],
algoDuration: overrides.algoDuration || 0,
queueMaxWaitTime: overrides.queueMaxWaitTime || 0
queueMaxWaitTime: overrides.queueMaxWaitTime || 0,
buildStartTimestamp: overrides.buildStartTimestamp || '0',
buildStopTimestamp: overrides.buildStopTimestamp || '0'
}
}

Expand Down
Loading
Loading