/**
 * Cross-resource constraint attached to a parent {@link ComputeResource}.
 *
 * Both bounds are ratios expressed per unit of the parent resource, so the
 * effective limit scales with the amount of the parent that is requested.
 * Either bound may be omitted.
 */
export interface ResourceConstraint {
  /** Identifier of the resource being constrained. */
  id: ComputeResourceType
  /** Minimum units of this resource per unit of the parent resource. */
  min?: number
  /** Maximum units of this resource per unit of the parent resource. */
  max?: number
}
/**
 * Enforces the cross-resource constraints declared on the environment's
 * resources against a request list, mutating `resources` in place.
 *
 * For every environment resource ("parent") requested with an amount > 0,
 * each of its constraints scales with that amount:
 *  - `min`: the constrained resource is raised to `parentAmount * min` when
 *    the requested amount is lower;
 *  - `max`: the request is rejected when the constrained resource exceeds
 *    `parentAmount * max`.
 *
 * The work is split in two phases so the outcome does not depend on the order
 * in which resources are listed: first all minimums are applied until the
 * amounts stop changing (a bump can raise a resource that is itself a parent),
 * then all maximums are validated against the final amounts.
 *
 * @param resources request list to validate; amounts may be raised in place
 * @param env compute environment whose resource definitions carry the constraints
 * @param isFree when true, constraints are read from `env.free.resources`
 * @throws Error when a required minimum exceeds the constrained resource's max,
 *   when a final amount exceeds a constraint maximum, or when the minimums
 *   never settle
 */
protected checkResourceConstraints(
  resources: ComputeResourceRequest[],
  env: ComputeEnvironment,
  isFree: boolean
): void {
  const envResources = isFree ? (env.free?.resources ?? []) : (env.resources ?? [])
  const parents = envResources.filter(
    (candidate) => candidate.constraints && candidate.constraints.length > 0
  )
  if (parents.length === 0) return

  // Phase 1: apply minimums. A chain of bumps advances at most one step per
  // constrained parent, so unless constraints amplify each other in a cycle
  // the amounts settle within `parents.length` passes; the rest is slack.
  const maxPasses = parents.length * 2 + 1
  let bumped = true
  for (let pass = 0; bumped; pass++) {
    if (pass >= maxPasses) {
      throw new Error(
        'Cannot satisfy resource constraints: minimum requirements do not settle (circular constraints?)'
      )
    }
    bumped = false
    for (const parent of parents) {
      const parentAmount = this.getResourceRequest(resources, parent.id)
      // a parent that is not requested imposes nothing
      if (!parentAmount || parentAmount <= 0) continue

      for (const constraint of parent.constraints ?? []) {
        if (constraint.min === undefined) continue
        const requiredMin = parentAmount * constraint.min
        const currentAmount = this.getResourceRequest(resources, constraint.id) ?? 0
        if (currentAmount >= requiredMin) continue

        const limits = this.getMaxMinResource(constraint.id, env, isFree)
        if (requiredMin > limits.max) {
          throw new Error(
            `Cannot satisfy constraint: ${parentAmount} ${parent.id} requires at least ${requiredMin} ${constraint.id}, but max is ${limits.max}`
          )
        }
        this.setResourceAmount(resources, constraint.id, requiredMin)
        bumped = true
      }
    }
  }

  // Phase 2: validate maximums against the final (possibly bumped) amounts.
  for (const parent of parents) {
    const parentAmount = this.getResourceRequest(resources, parent.id)
    if (!parentAmount || parentAmount <= 0) continue

    for (const constraint of parent.constraints ?? []) {
      if (constraint.max === undefined) continue
      const requiredMax = parentAmount * constraint.max
      const constrainedAmount = this.getResourceRequest(resources, constraint.id) ?? 0
      if (constrainedAmount > requiredMax) {
        throw new Error(
          `Too much ${constraint.id} for ${parentAmount} ${parent.id}. Max allowed: ${requiredMax}, requested: ${constrainedAmount}`
        )
      }
    }
  }
}

/**
 * Sets the requested amount of resource `id` in `resources`.
 *
 * Updates the first entry with a matching id; when no entry exists one is
 * appended, so an amount imposed by a constraint is never silently dropped.
 *
 * @param resources request list, mutated in place
 * @param id identifier of the resource to set
 * @param amount new requested amount
 */
protected setResourceAmount(
  resources: ComputeResourceRequest[],
  id: ComputeResourceType,
  amount: number
): void {
  const entry = resources.find((resource) => resource.id === id)
  if (entry) {
    entry.amount = amount
    return
  }
  resources.push({ id, amount })
}
/* eslint-disable require-await */
/**
 * Minimal concrete C2DEngine used by the unit tests in this file.
 *
 * Every abstract operation is stubbed with a fixed value so the inherited
 * resource helpers can be called without a real backend.
 *
 * NOTE(review): the Promise type arguments below were reconstructed from the
 * stub return values and this file's imports; confirm them against the
 * abstract declarations in C2DEngine.
 */
class TestC2DEngine extends C2DEngine {
  constructor() {
    // all five constructor dependencies are stubbed with null
    super(null, null, null, null, null)
  }

  async getComputeEnvironments(): Promise<ComputeEnvironment[]> {
    return []
  }

  async checkDockerImage(): Promise<ValidateParams> {
    return { valid: true, reason: null as string, status: 200 }
  }

  async startComputeJob(): Promise<ComputeJob[]> {
    return []
  }

  async stopComputeJob(): Promise<ComputeJob[]> {
    return []
  }

  async getComputeJobStatus(): Promise<ComputeJob[]> {
    return []
  }

  async getComputeJobResult(): Promise<{ stream: Readable; headers: any }> {
    return null
  }

  async cleanupExpiredStorage(): Promise<boolean> {
    return true
  }
}
/* eslint-enable require-await */

/**
 * Builds a ComputeEnvironment fixture with id 'test-env'.
 *
 * @param resources resource definitions used as the environment's `resources`
 * @param opts optional overrides: `freeResources` adds a `free` section holding
 *   those resources (otherwise `free` is undefined); `runningJobs` and
 *   `runningfreeJobs` default to 0; `maxJobs` defaults to 10
 * @returns the environment fixture; all queue and wait-time counters are 0
 */
function makeEnv(
  resources: any[],
  opts: {
    freeResources?: any[]
    runningJobs?: number
    runningfreeJobs?: number
    maxJobs?: number
  } = {}
): ComputeEnvironment {
  const { freeResources, runningJobs, runningfreeJobs, maxJobs } = opts
  // the free tier is only present when free resources were supplied
  const free = freeResources
    ? {
        resources: freeResources,
        access: { addresses: [], accessLists: null }
      }
    : undefined

  return {
    id: 'test-env',
    resources,
    free,
    runningJobs: runningJobs ?? 0,
    runningfreeJobs: runningfreeJobs ?? 0,
    queuedJobs: 0,
    queuedFreeJobs: 0,
    queMaxWaitTime: 0,
    queMaxWaitTimeFree: 0,
    runMaxWaitTime: 0,
    runMaxWaitTimeFree: 0,
    consumerAddress: '0x0',
    fees: {},
    access: { addresses: [], accessLists: null },
    platform: { architecture: 'x86_64', os: 'linux' },
    minJobDuration: 60,
    maxJobDuration: 3600,
    maxJobs: maxJobs ?? 10
  }
}
required minimum', async function () { + const resources = [ + { ...baseResources[0], constraints: [{ id: 'ram', min: 2, max: 8 }] }, + ...baseResources.slice(1) + ] + const env = makeEnv(resources) + // 4 cpu, 4 ram → ram < 4*2=8 → should be bumped to 8 + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 4 }, + { id: 'ram', amount: 4 }, + { id: 'disk', amount: 50 } + ] + const result = await engine.checkAndFillMissingResources(req, env, false) + const ramEntry = result.find((r) => r.id === 'ram') + expect(ramEntry.amount).to.equal(8) + }) + + it('resource above constraint max → throws meaningful error', async function () { + const resources = [ + { ...baseResources[0], constraints: [{ id: 'ram', min: 1, max: 3 }] }, + ...baseResources.slice(1) + ] + const env = makeEnv(resources) + // 4 cpu, 20 ram → ram > 4*3=12 → throws + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 4 }, + { id: 'ram', amount: 20 }, + { id: 'disk', amount: 50 } + ] + try { + await engine.checkAndFillMissingResources(req, env, false) + assert.fail('Expected error was not thrown') + } catch (err: any) { + expect(err.message).to.include('Too much ram') + expect(err.message).to.include('4 cpu') + expect(err.message).to.include('Max allowed: 12') + } + }) + + it('constraint involving GPU with 0 GPU requested → no constraint applied', async function () { + const resources = [ + ...baseResources, + { + id: 'gpu', + total: 4, + min: 0, + max: 4, + inUse: 0, + constraints: [{ id: 'ram', min: 8, max: 32 }] + } + ] + const env = makeEnv(resources) + // 0 gpu → gpu constraints should not be applied → ram stays at 4 + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 2 }, + { id: 'ram', amount: 4 }, + { id: 'disk', amount: 50 }, + { id: 'gpu', amount: 0 } + ] + const result = await engine.checkAndFillMissingResources(req, env, false) + const ramEntry = result.find((r) => r.id === 'ram') + expect(ramEntry.amount).to.equal(4) + }) + + it('no constraints defined → 
existing behavior unchanged', async function () { + const env = makeEnv(baseResources) + // below min → bumped to min; above max → throws + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 0 }, + { id: 'ram', amount: 0 }, + { id: 'disk', amount: 0 } + ] + const result = await engine.checkAndFillMissingResources(req, env, false) + const cpuEntry = result.find((r) => r.id === 'cpu') + const diskEntry = result.find((r) => r.id === 'disk') + expect(cpuEntry.amount).to.equal(1) // bumped to min + expect(diskEntry.amount).to.equal(10) // bumped to min + }) }) - it('testing checkIfResourcesAreAvailable', async function () { - // TO DO + + describe('testing checkIfResourcesAreAvailable', function () { + let engine: TestC2DEngine + + before(function () { + engine = new TestC2DEngine() + }) + + it('resources within env limits → passes', async function () { + const env = makeEnv([ + { id: 'cpu', total: 8, min: 1, max: 8, inUse: 2 }, + { id: 'ram', total: 32, min: 1, max: 32, inUse: 4 }, + { id: 'disk', total: 500, min: 10, max: 500, inUse: 50 } + ]) + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 4 }, + { id: 'ram', amount: 8 }, + { id: 'disk', amount: 100 } + ] + // should not throw + await engine.checkIfResourcesAreAvailable(req, env, false) + }) + + it('resources exceed env availability → throws', async function () { + const env = makeEnv([ + { id: 'cpu', total: 4, min: 1, max: 4, inUse: 3 }, + { id: 'ram', total: 32, min: 1, max: 32, inUse: 0 }, + { id: 'disk', total: 500, min: 10, max: 500, inUse: 0 } + ]) + const req: ComputeResourceRequest[] = [ + { id: 'cpu', amount: 4 }, // only 1 available (4-3) + { id: 'ram', amount: 8 }, + { id: 'disk', amount: 100 } + ] + try { + await engine.checkIfResourcesAreAvailable(req, env, false) + assert.fail('Expected error was not thrown') + } catch (err: any) { + expect(err.message).to.include('Not enough available cpu') + } + }) + + it('free resource limit exceeded → throws', async function () { + const 
/**
 * Validates one cross-resource constraint of a compute resource.
 *
 * `min` and `max` are per-unit ratios, so negative values are rejected, and a
 * constraint whose `min` exceeds its `max` is rejected at config load: it could
 * never be satisfied and would otherwise only fail when a job is requested.
 */
const ResourceConstraintSchema = z
  .object({
    id: z.string(),
    min: z.number().nonnegative().optional(),
    max: z.number().nonnegative().optional()
  })
  .refine(
    (constraint) =>
      constraint.min === undefined ||
      constraint.max === undefined ||
      constraint.min <= constraint.max,
    { message: 'constraint min must not be greater than max', path: ['min'] }
  )