From 8c2d39040a4e795dfd72801b9ebaa420e849b4c5 Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Mon, 30 Mar 2026 19:31:14 -0700 Subject: [PATCH] Epoll: Rework epoll type I spent a little trying to fix all of the various issues with the epoll wrapper we had, before realizing most of them would be completely gone if we just reworked the type itself :). The callback nature, all of the handler/state tracking internally all (to me) has no purpose being in the type itself. All of this logic can live outside and the wrapper should just be a typesafe abstraction around it that you can build on. This is what this change aims to do. There should be zero behavioral difference here. --- Sources/ContainerizationOS/Linux/Epoll.swift | 212 +++++++++--------- vminitd/Sources/vminitd/IOPair.swift | 4 +- .../Sources/vminitd/ProcessSupervisor.swift | 42 +++- vminitd/Sources/vminitd/VsockProxy.swift | 12 +- 4 files changed, 149 insertions(+), 121 deletions(-) diff --git a/Sources/ContainerizationOS/Linux/Epoll.swift b/Sources/ContainerizationOS/Linux/Epoll.swift index f239be1f..9ef2ce8e 100644 --- a/Sources/ContainerizationOS/Linux/Epoll.swift +++ b/Sources/ContainerizationOS/Linux/Epoll.swift @@ -15,16 +15,17 @@ //===----------------------------------------------------------------------===// #if os(Linux) +import Foundation #if canImport(Musl) import Musl +private let _write = Musl.write #elseif canImport(Glibc) import Glibc +private let _write = Glibc.write #endif import CShim -import Foundation -import Synchronization // On glibc, epoll constants are EPOLL_EVENTS enum values. On musl they're // plain UInt32. These helpers normalize them to UInt32/Int32. @@ -35,113 +36,150 @@ private func epollMask(_ value: EPOLL_EVENTS) -> UInt32 { value.rawValue } private func epollFlag(_ value: EPOLL_EVENTS) -> Int32 { Int32(bitPattern: value.rawValue) } #endif -/// Register file descriptors to receive events via Linux's -/// epoll syscall surface. +/// A thin wrapper around the Linux epoll syscall surface. public final class Epoll: Sendable { - public typealias Mask = Int32 - public typealias Handler = (@Sendable (Mask) -> Void) + /// A set of epoll event flags. + public struct Mask: OptionSet, Sendable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let input = Mask(rawValue: epollMask(EPOLLIN)) + public static let output = Mask(rawValue: epollMask(EPOLLOUT)) - public static let maskIn: Int32 = Int32(bitPattern: epollMask(EPOLLIN)) - public static let maskOut: Int32 = Int32(bitPattern: epollMask(EPOLLOUT)) - public static let defaultMask: Int32 = maskIn | maskOut + public var isHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) + } + + public var isRemoteHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLRDHUP))) + } + + public var readyToRead: Bool { + self.contains(.input) + } + + public var readyToWrite: Bool { + self.contains(.output) + } + } + + /// An event returned by `wait()`. + public struct Event: Sendable { + public let fd: Int32 + public let mask: Mask + } private let epollFD: Int32 - private let handlers = SafeMap() - private let pipe = Pipe() // to wake up a waiting epoll_wait + private let eventFD: Int32 public init() throws { let efd = epoll_create1(Int32(EPOLL_CLOEXEC)) - guard efd > 0 else { + guard efd >= 0 else { throw POSIXError.fromErrno() } + + let evfd = eventfd(0, Int32(EFD_CLOEXEC | EFD_NONBLOCK)) + guard evfd >= 0 else { + let evfdErrno = POSIXError.fromErrno() + close(efd) + throw evfdErrno + } + self.epollFD = efd - try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in } + self.eventFD = evfd + + // Register the eventfd with epoll for shutdown signaling. + var event = epoll_event() + event.events = epollMask(EPOLLIN) + event.data.fd = self.eventFD + let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(efd, EPOLL_CTL_ADD, self.eventFD, ptr) + } + guard ctlResult == 0 else { + let ctlErrno = POSIXError.fromErrno() + close(evfd) + close(efd) + throw ctlErrno + } + } + + deinit { + close(epollFD) + close(eventFD) } - public func add( - _ fd: Int32, - mask: Int32 = Epoll.defaultMask, - handler: @escaping Handler - ) throws { + /// Register a file descriptor for edge-triggered monitoring. + public func add(_ fd: Int32, mask: Mask) throws { guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else { throw POSIXError.fromErrno() } - let events = epollMask(EPOLLET) | UInt32(bitPattern: mask) + let events = epollMask(EPOLLET) | mask.rawValue var event = epoll_event() event.events = events event.data.fd = fd try withUnsafeMutablePointer(to: &event) { ptr in - while true { - if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { - if errno == EAGAIN || errno == EINTR { - continue - } - throw POSIXError.fromErrno() - } - break + if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { + throw POSIXError.fromErrno() } } + } - self.handlers.set(fd, handler) + /// Remove a file descriptor from the monitored collection. + public func delete(_ fd: Int32) throws { + var event = epoll_event() + let result = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32 + } + if result != 0 { + if !acceptableDeletionErrno() { + throw POSIXError.fromErrno() + } + } } - /// Run the main epoll loop. + /// Wait for events. /// - /// max events to return in a single wait - /// timeout in ms. - /// -1 means block forever. - /// 0 means return immediately if no events. - public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws { - var events: [epoll_event] = .init( - repeating: epoll_event(), - count: maxEvents - ) + /// Returns ready events, an empty array on timeout, or `nil` on shutdown. + public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event]? { + var events: [epoll_event] = .init(repeating: epoll_event(), count: maxEvents) while true { let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout) - guard n >= 0 else { + if n < 0 { if errno == EINTR || errno == EAGAIN { - continue // go back to epoll_wait + continue } - throw POSIXError.fromErrno() + preconditionFailure("epoll_wait failed unexpectedly: \(POSIXError.fromErrno())") } if n == 0 { - return // if epoll wait times out, then n will be 0 + return [] } + var result: [Event] = [] + result.reserveCapacity(Int(n)) for i in 0...size) + precondition(n == MemoryLayout.size, "eventfd write failed: \(POSIXError.fromErrno())") } // The errno's here are acceptable and can happen if the caller @@ -149,52 +187,6 @@ public final class Epoll: Sendable { private func acceptableDeletionErrno() -> Bool { errno == ENOENT || errno == EBADF || errno == EPERM } - - /// Shutdown the epoll handler. - public func shutdown() throws { - // wakes up epoll_wait and triggers a shutdown - try self.pipe.fileHandleForWriting.close() - } - - private final class SafeMap: Sendable { - let dict = Mutex<[Key: Value]>([:]) - - func set(_ key: Key, _ value: Value) { - dict.withLock { @Sendable in - $0[key] = value - } - } - - func get(_ key: Key) -> Value? { - dict.withLock { @Sendable in - $0[key] - } - } - - func del(_ key: Key) { - dict.withLock { @Sendable in - _ = $0.removeValue(forKey: key) - } - } - } -} - -extension Epoll.Mask { - public var isHangup: Bool { - (self & Int32(bitPattern: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) != 0 - } - - public var isRhangup: Bool { - (self & Int32(bitPattern: epollMask(EPOLLRDHUP))) != 0 - } - - public var readyToRead: Bool { - (self & Int32(bitPattern: epollMask(EPOLLIN))) != 0 - } - - public var readyToWrite: Bool { - (self & Int32(bitPattern: epollMask(EPOLLOUT))) != 0 - } } #endif // os(Linux) diff --git a/vminitd/Sources/vminitd/IOPair.swift b/vminitd/Sources/vminitd/IOPair.swift index 4b8a78aa..7243e16f 100644 --- a/vminitd/Sources/vminitd/IOPair.swift +++ b/vminitd/Sources/vminitd/IOPair.swift @@ -69,7 +69,7 @@ final class IOPair: Sendable { // Remove the fd from our global epoll instance first. let readFromFd = self.from.fileDescriptor do { - try ProcessSupervisor.default.poller.delete(readFromFd) + try ProcessSupervisor.default.unregisterFd(readFromFd) } catch { logger?.error("failed to delete fd from epoll \(readFromFd): \(error)") } @@ -118,7 +118,7 @@ final class IOPair: Sendable { let readFrom = OSFile(fd: readFromFd) let writeTo = OSFile(fd: writeToFd) - try ProcessSupervisor.default.poller.add(readFromFd, mask: Epoll.maskIn) { mask in + try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in self.io.withLock { io in if io.closed { return diff --git a/vminitd/Sources/vminitd/ProcessSupervisor.swift b/vminitd/Sources/vminitd/ProcessSupervisor.swift index 6bfe3e9d..b2d04ce8 100644 --- a/vminitd/Sources/vminitd/ProcessSupervisor.swift +++ b/vminitd/Sources/vminitd/ProcessSupervisor.swift @@ -20,7 +20,8 @@ import Logging import Synchronization final class ProcessSupervisor: Sendable { - let poller: Epoll + private let poller: Epoll + private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:]) private let queue: DispatchQueue // `DispatchSourceSignal` is thread-safe. @@ -47,11 +48,46 @@ final class ProcessSupervisor: Sendable { self.poller = try! Epoll() self.state = Mutex(State()) let t = Thread { - try! self.poller.run() + while true { + guard let events = self.poller.wait() else { + return + } + if events.isEmpty { + return + } + for event in events { + let handler = self.handlers.withLock { $0[event.fd] } + handler?(event.mask) + } + } } t.start() } + /// Register a file descriptor for epoll monitoring with a handler. + /// + /// The handler is stored before the fd is added to epoll, ensuring no + /// events are missed. + func registerFd( + _ fd: Int32, + mask: Epoll.Mask = [.input, .output], + handler: @escaping @Sendable (Epoll.Mask) -> Void + ) throws { + self.handlers.withLock { $0[fd] = handler } + do { + try self.poller.add(fd, mask: mask) + } catch { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + throw error + } + } + + /// Remove a file descriptor from epoll monitoring and discard its handler. + func unregisterFd(_ fd: Int32) throws { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + try self.poller.delete(fd) + } + func ready() { self.source.setEventHandler { self.handleSignal() @@ -123,6 +159,6 @@ final class ProcessSupervisor: Sendable { deinit { source.cancel() - try? poller.shutdown() + poller.shutdown() } } diff --git a/vminitd/Sources/vminitd/VsockProxy.swift b/vminitd/Sources/vminitd/VsockProxy.swift index fabdc353..31314c15 100644 --- a/vminitd/Sources/vminitd/VsockProxy.swift +++ b/vminitd/Sources/vminitd/VsockProxy.swift @@ -234,8 +234,8 @@ extension VsockProxy { ) do { - try ProcessSupervisor.default.poller.delete(clientFile.fileDescriptor) - try ProcessSupervisor.default.poller.delete(serverFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(clientFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(serverFile.fileDescriptor) try conn.close() try relayTo.close() } catch { @@ -244,7 +244,7 @@ extension VsockProxy { c.resume() } - try! ProcessSupervisor.default.poller.add(clientFile.fileDescriptor) { mask in + try! ProcessSupervisor.default.registerFd(clientFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromClient { let (fromEof, toEof) = Self.transferData( fromFile: &clientFile, @@ -270,7 +270,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromClient { + } else if mask.isRemoteHangup && !eofFromClient { // half close, shut down client to server transfer // we should see no more EPOLLIN events on the client fd // and no more EPOLLOUT events on the server fd @@ -296,7 +296,7 @@ extension VsockProxy { } } - try! ProcessSupervisor.default.poller.add(serverFile.fileDescriptor) { mask in + try! ProcessSupervisor.default.registerFd(serverFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromServer { let (fromEof, toEof) = Self.transferData( fromFile: &serverFile, @@ -322,7 +322,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromServer { + } else if mask.isRemoteHangup && !eofFromServer { // half close, shut down server to client transfer // we should see no more EPOLLIN events on the server fd // and no more EPOLLOUT events on the client fd