Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 111 additions & 2 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,19 @@ export class C2DEngineDocker extends C2DEngine {
private retentionDays: number
private cleanupInterval: number
private paymentClaimInterval: number
private cpuAllocations: Map<string, number[]> = new Map()
private envCpuCores: number[] = []
private cpuOffset: number
public constructor(
clusterConfig: C2DClusterInfo,
db: C2DDatabase,
escrow: Escrow,
keyManager: KeyManager,
dockerRegistryAuths: dockerRegistrysAuth
dockerRegistryAuths: dockerRegistrysAuth,
cpuOffset: number = 0
) {
super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths)
this.cpuOffset = cpuOffset

this.docker = null
if (clusterConfig.connection.socketPath) {
Expand Down Expand Up @@ -247,7 +252,15 @@ export class C2DEngineDocker extends C2DEngine {
}
this.envs[0].resources.push(cpuResources)
this.envs[0].resources.push(ramResources)
/* TODO - get namedresources & discreete one
// Build the list of physical CPU core indices for this environment
this.envCpuCores = Array.from(
{ length: cpuResources.total },
(_, i) => this.cpuOffset + i
)
CORE_LOGGER.info(
`CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})`
)
/* TODO - get named resources & discrete ones
if (sysinfo.GenericResources) {
for (const [key, value] of Object.entries(sysinfo.GenericResources)) {
for (const [type, val] of Object.entries(value)) {
Expand Down Expand Up @@ -301,6 +314,9 @@ export class C2DEngineDocker extends C2DEngine {
this.envs[0].id =
this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees))

// Rebuild CPU allocations from running containers (handles node restart)
await this.rebuildCpuAllocations()

// only now set the timer
if (!this.cronTimer) {
this.setNewTimer()
Expand Down Expand Up @@ -1636,6 +1652,11 @@ export class C2DEngineDocker extends C2DEngine {
if (cpus && cpus > 0) {
hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default
hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod)
// Pin the container to specific physical CPU cores
const cpusetStr = this.allocateCpus(job.jobId, cpus)
if (cpusetStr) {
hostConfig.CpusetCpus = cpusetStr
}
}
const containerInfo: ContainerCreateOptions = {
name: job.jobId + '-algoritm',
Expand Down Expand Up @@ -1912,6 +1933,93 @@ export class C2DEngineDocker extends C2DEngine {
}

// eslint-disable-next-line require-await
/**
 * Parses a cpuset list string such as "0-3,5" into individual core indices.
 *
 * Each comma-separated segment is either a single index ("5") or an inclusive
 * range ("0-3"). Whitespace around segments and bounds is ignored. Segments that
 * are empty or not made of non-negative integers are skipped instead of being
 * turned into core 0 / NaN (`Number('')` is 0 and `Number('x')` is NaN).
 * A range whose start is greater than its end contributes no cores.
 *
 * @param cpuset - cpuset list string; an empty string yields an empty list
 * @returns core indices in order of first appearance, without duplicates
 */
private parseCpusetString(cpuset: string): number[] {
  const cores: number[] = []
  if (!cpuset) return cores

  const isCoreIndex = /^\d+$/
  const seen = new Set<number>()

  for (const rawSegment of cpuset.split(',')) {
    const segment = rawSegment.trim()
    if (segment === '') continue

    const bounds = segment.split('-').map((bound) => bound.trim())
    // Accept only "N" or "N-M"; anything else is malformed and ignored.
    if (bounds.length > 2 || !bounds.every((bound) => isCoreIndex.test(bound))) {
      continue
    }

    const start = Number(bounds[0])
    const end = Number(bounds[bounds.length - 1])
    for (let core = start; core <= end; core++) {
      if (!seen.has(core)) {
        seen.add(core)
        cores.push(core)
      }
    }
  }
  return cores
}

/**
 * Reserves CPU cores from this environment's core list for a job.
 *
 * Picks the first cores of `envCpuCores` that no other job holds in
 * `cpuAllocations`, records them under `jobId` and returns them as a
 * comma-separated cpuset string.
 *
 * @param jobId - id of the job the cores are reserved for
 * @param count - number of CPUs requested; a fractional value is rounded up,
 *   because a core cannot be pinned partially
 * @returns the cpuset string (e.g. "2,3"), or null when the environment has no
 *   core list, `count` is not a positive finite number, or too few cores are free
 */
private allocateCpus(jobId: string, count: number): string | null {
  if (this.envCpuCores.length === 0 || !Number.isFinite(count) || count <= 0) {
    return null
  }

  // Work with whole cores. Comparing the collected length against a fractional
  // count would never match, and the job would be handed every free core.
  const needed = Math.ceil(count)

  const usedCores = new Set<number>()
  for (const [ownerId, cores] of this.cpuAllocations) {
    // This job's entry is overwritten below, so its current cores are reusable.
    if (ownerId === jobId) continue
    for (const core of cores) {
      usedCores.add(core)
    }
  }

  const freeCores: number[] = []
  for (const core of this.envCpuCores) {
    if (usedCores.has(core)) continue
    freeCores.push(core)
    if (freeCores.length === needed) break
  }

  if (freeCores.length < needed) {
    CORE_LOGGER.warn(
      `CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})`
    )
    return null
  }

  this.cpuAllocations.set(jobId, freeCores)
  const cpusetStr = freeCores.join(',')
  CORE_LOGGER.info(`CPU affinity: allocated cores [${cpusetStr}] to job ${jobId}`)
  return cpusetStr
}

/**
 * Drops the CPU core reservation recorded for a job, if there is one.
 *
 * @param jobId - id of the job whose entry in `cpuAllocations` is removed
 */
private releaseCpus(jobId: string): void {
  const held = this.cpuAllocations.get(jobId)
  // Nothing recorded for this job: no log line, no map change.
  if (!held) return

  CORE_LOGGER.info(
    `CPU affinity: released cores [${held.join(',')}] from job ${jobId}`
  )
  this.cpuAllocations.delete(jobId)
}

/**
 * Rebuilds the in-memory CPU allocation map from the containers of running jobs.
 *
 * For every job returned by `db.getRunningJobs` for this cluster's hash, the
 * `<jobId>-algoritm` container is inspected and the cores listed in its
 * `HostConfig.CpusetCpus` are recorded under the job id.
 *
 * Best effort: nothing is thrown. A missing container (HTTP 404) is expected and
 * skipped silently; any other inspection failure is logged as a warning, and a
 * failure to list the jobs is logged as an error.
 */
private async rebuildCpuAllocations(): Promise<void> {
  // Without a core list or a Docker client there is nothing to recover.
  if (this.envCpuCores.length === 0 || !this.docker) return
  try {
    const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)
    for (const job of jobs) {
      try {
        const container = this.docker.getContainer(job.jobId + '-algoritm')
        const info = await container.inspect()
        const cpuset = info.HostConfig?.CpusetCpus
        if (cpuset) {
          const cores = this.parseCpusetString(cpuset)
          if (cores.length > 0) {
            this.cpuAllocations.set(job.jobId, cores)
            CORE_LOGGER.info(
              `CPU affinity: recovered allocation [${cpuset}] for running job ${job.jobId}`
            )
          }
        }
      } catch (e) {
        // Container may not exist yet (e.g., job is in pull/build phase): skip quietly.
        const status = (e as { statusCode?: number } | null)?.statusCode
        if (status === 404) continue
        // Anything else (daemon unreachable, permission error, ...) should be visible.
        const reason = e instanceof Error ? e.message : String(e)
        CORE_LOGGER.warn(
          `CPU affinity: could not inspect container of job ${job.jobId}: ${reason}`
        )
      }
    }
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e)
    CORE_LOGGER.error(`CPU affinity: failed to rebuild allocations: ${reason}`)
  }
}

private async cleanupJob(job: DBComputeJob) {
// cleaning up
// - claim payment or release lock
Expand All @@ -1920,6 +2028,7 @@ export class C2DEngineDocker extends C2DEngine {
// - delete container

this.jobImageSizes.delete(job.jobId)
this.releaseCpus(job.jobId)

try {
const container = await this.docker.getContainer(job.jobId + '-algoritm')
Expand Down
11 changes: 10 additions & 1 deletion src/components/c2d/compute_engines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class C2DEngines {
// if yes, do not create multiple engines
if (config && config.c2dClusters) {
this.engines = []
let cpuOffset = 0
for (const cluster of config.c2dClusters) {
if (cluster.type === C2DClusterType.DOCKER) {
this.engines.push(
Expand All @@ -29,9 +30,17 @@ export class C2DEngines {
db,
escrow,
keyManager,
config.dockerRegistrysAuth
config.dockerRegistrysAuth,
cpuOffset
)
)
// Advance the CPU offset by this cluster's configured CPU total
if (cluster.connection?.resources) {
const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu')
if (cpuRes?.total) {
cpuOffset += cpuRes.total
}
}
}
}
}
Expand Down
Loading