diff --git a/docs/compute-pricing.md b/docs/compute-pricing.md index e2912bd46..c6598e760 100644 --- a/docs/compute-pricing.md +++ b/docs/compute-pricing.md @@ -5,8 +5,11 @@ This guide explains how to configure your node’s Docker compute environments a ## Overview - **Configuration**: Define compute environments via the `DOCKER_COMPUTE_ENVIRONMENTS` environment variable (JSON) or via `config.json` under `dockerComputeEnvironments`. +- **Environment**: A group of resources, payment options, and access lists. - **Resources**: Each environment declares resources (e.g. `cpu`, `ram`, `disk`, and optionally GPUs). You must declare a `disk` resource. - **Pricing**: For each chain and fee token, you set a `price` per resource. Cost is computed as **price × amount × duration (in minutes, rounded up)**. +- **Free**: Environments that do not require payment for the resources, but are most likely very limited in terms of resources available and job duration. +- **Image building**: **Free jobs cannot build images** (Dockerfiles are not allowed). For **paid jobs**, **image build time counts toward billable duration** and also consumes the job’s `maxJobDuration`. ## Pricing Units diff --git a/docs/env.md b/docs/env.md index e07f41b82..12627d223 100644 --- a/docs/env.md +++ b/docs/env.md @@ -218,6 +218,7 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of - **maxJobDuration**: Maximum duration in seconds for a free job. - **minJobDuration**: Minimum duration in seconds for a free job. - **maxJobs**: Maximum number of simultaneous free jobs. + - **allowImageBuild**: Whether building images is allowed on free environments. Default: false - **access**: Access control configuration for free compute jobs. Works the same as the main `access` field. - **addresses**: Array of Ethereum addresses allowed to run free compute jobs. - **accessLists**: Array of AccessList contract addresses for free compute access control. 
diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 603e8b188..6337792ea 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -106,6 +106,7 @@ export interface ComputeEnvironmentFreeOptions { maxJobs?: number // maximum number of simultaneous free jobs resources?: ComputeResource[] access: ComputeAccessList + allowImageBuild?: boolean } export interface ComputeEnvironmentBaseConfig { description?: string // v1 @@ -280,6 +281,8 @@ export interface DBComputeJob extends ComputeJob { encryptedDockerRegistryAuth?: string output?: string // this is always an ECIES encrypted string, that decodes to ComputeOutput interface jobIdHash: string + buildStartTimestamp?: string + buildStopTimestamp?: string } // make sure we keep them both in sync diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index da9de13e3..37f1067c0 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -327,8 +327,9 @@ export abstract class C2DEngine { for (const job of jobs) { if (job.environment === env.id) { if (job.queueMaxWaitTime === 0) { - const timeElapsed = - new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) + const timeElapsed = job.buildStartTimestamp + ? 
new Date().getTime() / 1000 - Number.parseFloat(job?.buildStartTimestamp) + : new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) totalJobs++ maxRunningTime += job.maxJobDuration - timeElapsed if (job.isFree) { diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 224414f86..9960f7ab0 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -72,6 +72,7 @@ export class C2DEngineDocker extends C2DEngine { private cpuAllocations: Map = new Map() private envCpuCores: number[] = [] private cpuOffset: number + public constructor( clusterConfig: C2DClusterInfo, db: C2DDatabase, @@ -414,11 +415,11 @@ export class C2DEngineDocker extends C2DEngine { } // Process each job to determine what operation is needed + let duration for (const job of jobs) { // Calculate algo duration - const algoDuration = - parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp) - job.algoDuration = algoDuration + duration = parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp) + duration += this.getValidBuildDurationSeconds(job) // Free jobs or jobs without payment info - mark as finished if (job.isFree || !job.payment) { @@ -455,7 +456,7 @@ export class C2DEngineDocker extends C2DEngine { continue } - let minDuration = Math.abs(algoDuration) + let minDuration = Math.abs(duration) if (minDuration > job.maxJobDuration) { minDuration = job.maxJobDuration } @@ -1107,6 +1108,13 @@ export class C2DEngineDocker extends C2DEngine { throw new Error(`additionalDockerFiles cannot be used with queued jobs`) } } + if ( + algorithm.meta.container && + algorithm.meta.container.dockerfile && + !env.free.allowImageBuild + ) { + throw new Error(`Building image is not allowed for free jobs`) + } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, @@ -1147,7 +1155,9 @@ export class C2DEngineDocker extends C2DEngine { algoDuration: 0, 
queueMaxWaitTime: queueMaxWaitTime || 0, encryptedDockerRegistryAuth, // we store the encrypted docker registry auth in the job - output + output, + buildStartTimestamp: '0', + buildStopTimestamp: '0' } if (algorithm.meta.container && algorithm.meta.container.dockerfile) { @@ -1606,6 +1616,19 @@ export class C2DEngineDocker extends C2DEngine { } if (job.status === C2DStatusNumber.ConfiguringVolumes) { + // we have the image (either pulled or built) + // if built, check if build process took all allocated time + // if yes, stop the job + const buildDuration = this.getValidBuildDurationSeconds(job) + if (buildDuration > 0 && buildDuration >= job.maxJobDuration) { + job.isStarted = false + job.status = C2DStatusNumber.PublishingResults + job.statusText = C2DStatusText.PublishingResults + job.algoStartTimestamp = '0' + job.algoStopTimestamp = '0' + job.isRunning = false + await this.db.updateJob(job) + } // create the volume & create container // TO DO C2D: Choose driver & size // get env info @@ -1814,7 +1837,13 @@ export class C2DEngineDocker extends C2DEngine { } const timeNow = Date.now() / 1000 - const expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration + let expiry + + const buildDuration = this.getValidBuildDurationSeconds(job) + if (buildDuration > 0) { + // if job has build time, reduce the remaining algorithm runtime budget + expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration - buildDuration + } else expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration CORE_LOGGER.debug( 'container running since timeNow: ' + timeNow + ' , Expiry: ' + expiry ) @@ -1964,6 +1993,14 @@ export class C2DEngineDocker extends C2DEngine { private allocateCpus(jobId: string, count: number): string | null { if (this.envCpuCores.length === 0 || count <= 0) return null + const existing = this.cpuAllocations.get(jobId) + if (existing && existing.length > 0) { + const cpusetStr = existing.join(',') + CORE_LOGGER.info( + `CPU affinity: reusing existing 
cores [${cpusetStr}] for job ${jobId}` + ) + return cpusetStr + } const usedCores = new Set() for (const cores of this.cpuAllocations.values()) { @@ -2341,7 +2378,7 @@ export class C2DEngineDocker extends C2DEngine { const imageLogFile = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' const controller = new AbortController() - const timeoutMs = 5 * 60 * 1000 + const timeoutMs = job.maxJobDuration * 1000 const timer = setTimeout(() => controller.abort(), timeoutMs) try { const pack = tarStream.pack() @@ -2355,18 +2392,29 @@ export class C2DEngineDocker extends C2DEngine { } } pack.finalize() + job.buildStartTimestamp = String(Date.now() / 1000) + await this.db.updateJob(job) - // Build the image using the tar stream as context (Node IncomingMessage extends stream.Readable) - const buildStream = (await this.docker.buildImage(pack, { + const cpuperiod = 100000 + const ramGb = this.getResourceRequest(job.resources, 'ram') + const ramBytes = + ramGb && ramGb > 0 ? ramGb * 1024 * 1024 * 1024 : 1024 * 1024 * 1024 + + const cpus = this.getResourceRequest(job.resources, 'cpu') + const cpuquota = cpus && cpus > 0 ? 
Math.floor(cpus * cpuperiod) : 50000 + + const buildOptions: Dockerode.ImageBuildOptions = { t: job.containerImage, - memory: 1024 * 1024 * 1024, // 1GB RAM in bytes - memswap: -1, // Disable swap - cpushares: 512, // CPU Shares (default is 1024) - cpuquota: 50000, // 50% of one CPU (100000 = 1 CPU) - cpuperiod: 100000, // Default period + memory: ramBytes, + memswap: ramBytes, // same as memory => no swap + cpushares: 1024, // CPU Shares (default is 1024) + cpuquota, // 100000 = 1 CPU with cpuperiod=100000 + cpuperiod, nocache: true, // prevent cache poison abortSignal: controller.signal - })) as Readable + } + // Build the image using the tar stream as context (Node IncomingMessage extends stream.Readable) + const buildStream = (await this.docker.buildImage(pack, buildOptions)) as Readable const onBuildData = (data: Buffer) => { try { @@ -2405,9 +2453,23 @@ export class C2DEngineDocker extends C2DEngine { } controller.signal.addEventListener('abort', onAbort, { once: true }) const onSuccess = () => { - finish(() => { + finish(async () => { detachBuildLog() controller.signal.removeEventListener('abort', onAbort) + + // Build stream completed, but does the image actually exist? + try { + await this.docker.getImage(job.containerImage).inspect() + } catch (e) { + return reject( + new Error( + `Cannot find image '${job.containerImage}' after building. 
Most likely it failed: ${ + (e as Error)?.message || String(e) + }` + ) + ) + } + CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`) this.updateImageUsage(job.containerImage).catch((e) => { CORE_LOGGER.debug(`Failed to track image usage: ${e.message}`) @@ -2430,6 +2492,7 @@ export class C2DEngineDocker extends C2DEngine { }) job.status = C2DStatusNumber.ConfiguringVolumes job.statusText = C2DStatusText.ConfiguringVolumes + job.buildStopTimestamp = String(Date.now() / 1000) await this.db.updateJob(job) } catch (err) { const aborted = @@ -2448,6 +2511,7 @@ export class C2DEngineDocker extends C2DEngine { } job.status = C2DStatusNumber.BuildImageFailed job.statusText = C2DStatusText.BuildImageFailed + job.buildStopTimestamp = String(Date.now() / 1000) job.isRunning = false job.dateFinished = String(Date.now() / 1000) await this.db.updateJob(job) @@ -2843,6 +2907,18 @@ export class C2DEngineDocker extends C2DEngine { } return false } + + private getValidBuildDurationSeconds(job: DBComputeJob): number { + const startRaw = job.buildStartTimestamp + const stopRaw = job.buildStopTimestamp + if (!startRaw || !stopRaw) return 0 + const start = Number.parseFloat(startRaw) + const stop = Number.parseFloat(stopRaw) + if (!Number.isFinite(start) || !Number.isFinite(stop)) return 0 + if (start <= 0) return 0 + if (stop < start) return 0 + return stop - start + } } // this uses the docker engine, but exposes only one env, the free one diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index d0596617e..b83aef083 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -48,7 +48,9 @@ function getInternalStructure(job: DBComputeJob): any { algoDuration: job.algoDuration, queueMaxWaitTime: job.queueMaxWaitTime, output: job.output, - jobIdHash: job.jobIdHash + jobIdHash: job.jobIdHash, + buildStartTimestamp: job.buildStartTimestamp, + buildStopTimestamp: 
job.buildStopTimestamp } return internalBlob } diff --git a/src/test/integration/getJobs.test.ts b/src/test/integration/getJobs.test.ts index eeae02ec5..3311ec7f9 100644 --- a/src/test/integration/getJobs.test.ts +++ b/src/test/integration/getJobs.test.ts @@ -60,7 +60,9 @@ function buildJob(overrides: Partial = {}): DBComputeJob { payment: overrides.payment, additionalViewers: overrides.additionalViewers || [], algoDuration: overrides.algoDuration || 0, - queueMaxWaitTime: overrides.queueMaxWaitTime || 0 + queueMaxWaitTime: overrides.queueMaxWaitTime || 0, + buildStartTimestamp: overrides.buildStartTimestamp || '0', + buildStopTimestamp: overrides.buildStopTimestamp || '0' } } diff --git a/src/test/unit/buildImage.test.ts b/src/test/unit/buildImage.test.ts new file mode 100644 index 000000000..caafbc234 --- /dev/null +++ b/src/test/unit/buildImage.test.ts @@ -0,0 +1,157 @@ +import { expect } from 'chai' +import sinon from 'sinon' +import { mkdirSync } from 'fs' +import os from 'os' +import path from 'path' +import { Readable } from 'stream' +import { C2DStatusNumber } from '../../@types/C2D/C2D.js' +import type { DBComputeJob } from '../../@types/C2D/C2D.js' + +function ensureTestEnv() { + // Several runtime modules validate env on import; provide safe defaults for unit tests. 
+ if (!process.env.PRIVATE_KEY) { + process.env.PRIVATE_KEY = `0x${'11'.repeat(32)}` + } +} + +async function makeEngine(opts: { tempFolder: string }) { + ensureTestEnv() + const { C2DEngineDocker } = + await import('../../components/c2d/compute_engine_docker.js') + const db = { + updateJob: sinon.stub().resolves(), + // buildImage() doesn't call getJobs*; keep minimal surface + getRunningJobs: sinon.stub().resolves([]), + getJobsByStatus: sinon.stub().resolves([]) + } as any + + const clusterConfig = { + type: 2, + hash: 'test-hash', + tempFolder: opts.tempFolder, + connection: { + // keep constructor happy + imageRetentionDays: 1, + imageCleanupInterval: 999999, + paymentClaimInterval: 999999 + } + } as any + + const engine = new C2DEngineDocker(clusterConfig, db, {} as any, {} as any, {} as any) + + // prevent side-effects during unit tests + ;(engine as any).cleanupJob = sinon.stub().resolves() + ;(engine as any).updateImageUsage = sinon.stub().resolves() + + return { engine, db } +} + +function makeJob(base: Partial = {}): DBComputeJob { + return { + jobId: 'job-123', + owner: '0x0', + environment: 'env-1', + dateCreated: String(Date.now() / 1000), + dateFinished: null as any, + clusterHash: 'test-hash', + isFree: false, + isRunning: true, + isStarted: false, + stopRequested: false, + status: C2DStatusNumber.BuildImage, + statusText: 'BuildImage', + resources: [ + { id: 'cpu', amount: 1 }, + { id: 'ram', amount: 1 }, + { id: 'disk', amount: 1 } + ], + maxJobDuration: 60, + queueMaxWaitTime: 0, + // timestamps + algoStartTimestamp: '0', + algoStopTimestamp: '0', + buildStartTimestamp: '0', + buildStopTimestamp: '0', + // algorithm/container + algorithm: { + did: 'did:op:algo', + serviceIndex: 0, + meta: { + container: { + image: 'dummy', + tag: 'latest', + entrypoint: 'node', + checksum: '0x0', + dockerfile: 'FROM alpine:3.18\nRUN echo hi\n' + } + } + } as any, + input: [] as any, + output: '' as any, + containerImage: 'ocean-node-test:job-123', + algoDuration: 
0, + encryptedDockerRegistryAuth: undefined, + payment: null as any, + additionalViewers: [], + logs: null as any, + results: null as any, + jobIdHash: '1', + ...base + } as DBComputeJob +} + +describe('C2DEngineDocker.buildImage', () => { + afterEach(() => { + sinon.restore() + }) + + it('marks build as failed if image is missing after build completes', async () => { + const tempFolder = path.join(os.tmpdir(), 'ocean-node-buildimage-test') + const { engine, db } = await makeEngine({ tempFolder }) + + const job = makeJob() + mkdirSync(path.join(tempFolder, job.jobId, 'data', 'logs'), { recursive: true }) + + const buildStream = new Readable({ read() {} }) + ;(engine as any).docker = { + buildImage: sinon.stub().resolves(buildStream), + getImage: sinon.stub().returns({ + inspect: sinon.stub().rejects(new Error('no such image')) + }) + } + + const p = (engine as any).buildImage(job, null) + await new Promise((resolve) => setImmediate(resolve)) + buildStream.emit('end') + await p + + expect(db.updateJob.called).to.equal(true) + const lastUpdate = db.updateJob.lastCall.args[0] as DBComputeJob + expect(lastUpdate.status).to.equal(C2DStatusNumber.BuildImageFailed) + }) + + it('only logs success when image exists', async () => { + const tempFolder = path.join(os.tmpdir(), 'ocean-node-buildimage-test-success') + const { engine, db } = await makeEngine({ tempFolder }) + + const job = makeJob({ containerImage: 'ocean-node-test:job-123-success' }) + mkdirSync(path.join(tempFolder, job.jobId, 'data', 'logs'), { recursive: true }) + + const buildStream = new Readable({ read() {} }) + ;(engine as any).docker = { + buildImage: sinon.stub().resolves(buildStream), + getImage: sinon.stub().returns({ + inspect: sinon.stub().resolves({}) + }) + } + + const p = (engine as any).buildImage(job, null) + await new Promise((resolve) => setImmediate(resolve)) + buildStream.emit('end') + await p + + const lastUpdate = db.updateJob.lastCall.args[0] as DBComputeJob + 
expect(lastUpdate.status).to.equal(C2DStatusNumber.ConfiguringVolumes) + expect(Number.parseFloat(lastUpdate.buildStopTimestamp)).to.be.greaterThan(0) + }) +}) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 07104524e..e18df6de8 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -153,7 +153,8 @@ export const ComputeEnvironmentFreeOptionsSchema = z.object({ .nullable() .optional() }) - .optional() + .optional(), + allowImageBuild: z.boolean().optional().default(false) }) export const C2DDockerConfigSchema = z.array(