From bf1a46004cd8a98681941e1a70e7280c0549c024 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Tue, 24 Mar 2026 12:41:05 +0200 Subject: [PATCH] set cpu pinning for envs, release cpu once the job is done, handle the case when the node restarts --- src/components/c2d/compute_engine_docker.ts | 113 +++++++++++++++++++- src/components/c2d/compute_engines.ts | 11 +- 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 850161eeb..c8baa6216 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -69,14 +69,19 @@ export class C2DEngineDocker extends C2DEngine { private retentionDays: number private cleanupInterval: number private paymentClaimInterval: number + private cpuAllocations: Map = new Map() + private envCpuCores: number[] = [] + private cpuOffset: number public constructor( clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow, keyManager: KeyManager, - dockerRegistryAuths: dockerRegistrysAuth + dockerRegistryAuths: dockerRegistrysAuth, + cpuOffset: number = 0 ) { super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths) + this.cpuOffset = cpuOffset this.docker = null if (clusterConfig.connection.socketPath) { @@ -247,7 +252,15 @@ export class C2DEngineDocker extends C2DEngine { } this.envs[0].resources.push(cpuResources) this.envs[0].resources.push(ramResources) - /* TODO - get namedresources & discreete one + // Build the list of physical CPU core indices for this environment + this.envCpuCores = Array.from( + { length: cpuResources.total }, + (_, i) => this.cpuOffset + i + ) + CORE_LOGGER.info( + `CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})` + ) + /* TODO - get namedresources & discreete one if (sysinfo.GenericResources) { for (const [key, value] of Object.entries(sysinfo.GenericResources)) { for (const [type, val] of Object.entries(value)) { @@ -301,6 +314,9 @@ export class C2DEngineDocker extends C2DEngine { this.envs[0].id = this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees)) + // Rebuild CPU allocations from running containers (handles node restart) + await this.rebuildCpuAllocations() + // only now set the timer if (!this.cronTimer) { this.setNewTimer() @@ -1636,6 +1652,11 @@ export class C2DEngineDocker extends C2DEngine { if (cpus && cpus > 0) { hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod) + // Pin the container to specific physical CPU cores + const cpusetStr = this.allocateCpus(job.jobId, cpus) + if (cpusetStr) { + hostConfig.CpusetCpus = cpusetStr + } } const containerInfo: ContainerCreateOptions = { name: job.jobId + '-algoritm', @@ -1912,6 +1933,93 @@ export class C2DEngineDocker extends C2DEngine { } // eslint-disable-next-line require-await + private parseCpusetString(cpuset: string): number[] { + const cores: number[] = [] + if (!cpuset) return cores + for (const part of cpuset.split(',')) { + if (part.includes('-')) { + const [start, end] = part.split('-').map(Number) + for (let i = start; i <= end; i++) { + cores.push(i) + } + } else { + cores.push(Number(part)) + } + } + return cores + } + + private allocateCpus(jobId: string, count: number): string | null { + if (this.envCpuCores.length === 0 || count <= 0) return null + + const usedCores = new Set() + for (const cores of this.cpuAllocations.values()) { + for (const core of cores) { + usedCores.add(core) + } + } + + const freeCores: number[] = [] + for (const core of this.envCpuCores) { + if (!usedCores.has(core)) { + freeCores.push(core) + if (freeCores.length === count) break + } + } + + if (freeCores.length < count) { + CORE_LOGGER.warn( + `CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})` + ) + return null + } + + this.cpuAllocations.set(jobId, freeCores) + const cpusetStr = freeCores.join(',') + CORE_LOGGER.info(`CPU affinity: allocated cores [${cpusetStr}] to job ${jobId}`) + return cpusetStr + } + + private releaseCpus(jobId: string): void { + const cores = this.cpuAllocations.get(jobId) + if (cores) { + CORE_LOGGER.info( + `CPU affinity: released cores [${cores.join(',')}] from job ${jobId}` + ) + this.cpuAllocations.delete(jobId) + } + } + + /** + * On startup, inspects running Docker containers to rebuild the CPU allocation map. + */ + private async rebuildCpuAllocations(): Promise { + if (this.envCpuCores.length === 0) return + try { + const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) + for (const job of jobs) { + try { + const container = this.docker.getContainer(job.jobId + '-algoritm') + const info = await container.inspect() + const cpuset = info.HostConfig?.CpusetCpus + if (cpuset) { + const cores = this.parseCpusetString(cpuset) + if (cores.length > 0) { + this.cpuAllocations.set(job.jobId, cores) + CORE_LOGGER.info( + `CPU affinity: recovered allocation [${cpuset}] for running job ${job.jobId}` + ) + } + } + } catch (e) { + // Container may not exist yet (e.g., job is in pull/build phase) + } + } + } catch (e) { + CORE_LOGGER.error(`CPU affinity: failed to rebuild allocations: ${e.message}`) + } + } + private async cleanupJob(job: DBComputeJob) { // cleaning up // - claim payment or release lock @@ -1920,6 +2028,7 @@ export class C2DEngineDocker extends C2DEngine { // - delete container this.jobImageSizes.delete(job.jobId) + this.releaseCpus(job.jobId) try { const container = await this.docker.getContainer(job.jobId + '-algoritm') diff --git a/src/components/c2d/compute_engines.ts b/src/components/c2d/compute_engines.ts index 26ad035f9..4934d9416 100644 --- a/src/components/c2d/compute_engines.ts +++ b/src/components/c2d/compute_engines.ts @@ -21,6 +21,7 @@ export class C2DEngines { // if yes, do not create multiple engines if (config && config.c2dClusters) { this.engines = [] + let cpuOffset = 0 for (const cluster of config.c2dClusters) { if (cluster.type === C2DClusterType.DOCKER) { this.engines.push( @@ -29,9 +30,17 @@ export class C2DEngines { db, escrow, keyManager, - config.dockerRegistrysAuth + config.dockerRegistrysAuth, + cpuOffset ) ) + // Advance the CPU offset by this cluster's configured CPU total + if (cluster.connection?.resources) { + const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu') + if (cpuRes?.total) { + cpuOffset += cpuRes.total + } + } } } }