diff --git a/Sources/ContainerizationOS/Linux/Epoll.swift b/Sources/ContainerizationOS/Linux/Epoll.swift index f239be1f..9ef2ce8e 100644 --- a/Sources/ContainerizationOS/Linux/Epoll.swift +++ b/Sources/ContainerizationOS/Linux/Epoll.swift @@ -15,16 +15,17 @@ //===----------------------------------------------------------------------===// #if os(Linux) +import Foundation #if canImport(Musl) import Musl +private let _write = Musl.write #elseif canImport(Glibc) import Glibc +private let _write = Glibc.write #endif import CShim -import Foundation -import Synchronization // On glibc, epoll constants are EPOLL_EVENTS enum values. On musl they're // plain UInt32. These helpers normalize them to UInt32/Int32. @@ -35,113 +36,150 @@ private func epollMask(_ value: EPOLL_EVENTS) -> UInt32 { value.rawValue } private func epollFlag(_ value: EPOLL_EVENTS) -> Int32 { Int32(bitPattern: value.rawValue) } #endif -/// Register file descriptors to receive events via Linux's -/// epoll syscall surface. +/// A thin wrapper around the Linux epoll syscall surface. public final class Epoll: Sendable { - public typealias Mask = Int32 - public typealias Handler = (@Sendable (Mask) -> Void) + /// A set of epoll event flags. + public struct Mask: OptionSet, Sendable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let input = Mask(rawValue: epollMask(EPOLLIN)) + public static let output = Mask(rawValue: epollMask(EPOLLOUT)) - public static let maskIn: Int32 = Int32(bitPattern: epollMask(EPOLLIN)) - public static let maskOut: Int32 = Int32(bitPattern: epollMask(EPOLLOUT)) - public static let defaultMask: Int32 = maskIn | maskOut + public var isHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) + } + + public var isRemoteHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLRDHUP))) + } + + public var readyToRead: Bool { + self.contains(.input) + } + + public var readyToWrite: Bool { + self.contains(.output) + } + } + + /// An event returned by `wait()`. + public struct Event: Sendable { + public let fd: Int32 + public let mask: Mask + } private let epollFD: Int32 - private let handlers = SafeMap() - private let pipe = Pipe() // to wake up a waiting epoll_wait + private let eventFD: Int32 public init() throws { let efd = epoll_create1(Int32(EPOLL_CLOEXEC)) - guard efd > 0 else { + guard efd >= 0 else { throw POSIXError.fromErrno() } + + let evfd = eventfd(0, Int32(EFD_CLOEXEC | EFD_NONBLOCK)) + guard evfd >= 0 else { + let evfdErrno = POSIXError.fromErrno() + close(efd) + throw evfdErrno + } + self.epollFD = efd - try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in } + self.eventFD = evfd + + // Register the eventfd with epoll for shutdown signaling. + var event = epoll_event() + event.events = epollMask(EPOLLIN) + event.data.fd = self.eventFD + let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(efd, EPOLL_CTL_ADD, self.eventFD, ptr) + } + guard ctlResult == 0 else { + let ctlErrno = POSIXError.fromErrno() + close(evfd) + close(efd) + throw ctlErrno + } + } + + deinit { + close(epollFD) + close(eventFD) } - public func add( - _ fd: Int32, - mask: Int32 = Epoll.defaultMask, - handler: @escaping Handler - ) throws { + /// Register a file descriptor for edge-triggered monitoring. + public func add(_ fd: Int32, mask: Mask) throws { guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else { throw POSIXError.fromErrno() } - let events = epollMask(EPOLLET) | UInt32(bitPattern: mask) + let events = epollMask(EPOLLET) | mask.rawValue var event = epoll_event() event.events = events event.data.fd = fd try withUnsafeMutablePointer(to: &event) { ptr in - while true { - if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { - if errno == EAGAIN || errno == EINTR { - continue - } - throw POSIXError.fromErrno() - } - break + if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { + throw POSIXError.fromErrno() } } + } - self.handlers.set(fd, handler) + /// Remove a file descriptor from the monitored collection. + public func delete(_ fd: Int32) throws { + var event = epoll_event() + let result = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32 + } + if result != 0 { + if !acceptableDeletionErrno() { + throw POSIXError.fromErrno() + } + } } - /// Run the main epoll loop. + /// Wait for events. /// - /// max events to return in a single wait - /// timeout in ms. - /// -1 means block forever. - /// 0 means return immediately if no events. - public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws { - var events: [epoll_event] = .init( - repeating: epoll_event(), - count: maxEvents - ) + /// Returns ready events, an empty array on timeout, or `nil` on shutdown. + public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event]? { + var events: [epoll_event] = .init(repeating: epoll_event(), count: maxEvents) while true { let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout) - guard n >= 0 else { + if n < 0 { if errno == EINTR || errno == EAGAIN { - continue // go back to epoll_wait + continue } - throw POSIXError.fromErrno() + preconditionFailure("epoll_wait failed unexpectedly: \(POSIXError.fromErrno())") } if n == 0 { - return // if epoll wait times out, then n will be 0 + return [] } + var result: [Event] = [] + result.reserveCapacity(Int(n)) for i in 0...size) + precondition(n == MemoryLayout.size, "eventfd write failed: \(POSIXError.fromErrno())") } // The errno's here are acceptable and can happen if the caller @@ -149,52 +187,6 @@ public final class Epoll: Sendable { private func acceptableDeletionErrno() -> Bool { errno == ENOENT || errno == EBADF || errno == EPERM } - - /// Shutdown the epoll handler. - public func shutdown() throws { - // wakes up epoll_wait and triggers a shutdown - try self.pipe.fileHandleForWriting.close() - } - - private final class SafeMap: Sendable { - let dict = Mutex<[Key: Value]>([:]) - - func set(_ key: Key, _ value: Value) { - dict.withLock { @Sendable in - $0[key] = value - } - } - - func get(_ key: Key) -> Value? { - dict.withLock { @Sendable in - $0[key] - } - } - - func del(_ key: Key) { - dict.withLock { @Sendable in - _ = $0.removeValue(forKey: key) - } - } - } -} - -extension Epoll.Mask { - public var isHangup: Bool { - (self & Int32(bitPattern: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) != 0 - } - - public var isRhangup: Bool { - (self & Int32(bitPattern: epollMask(EPOLLRDHUP))) != 0 - } - - public var readyToRead: Bool { - (self & Int32(bitPattern: epollMask(EPOLLIN))) != 0 - } - - public var readyToWrite: Bool { - (self & Int32(bitPattern: epollMask(EPOLLOUT))) != 0 - } } #endif // os(Linux) diff --git a/vminitd/Sources/vminitd/IOPair.swift b/vminitd/Sources/vminitd/IOPair.swift index 4b8a78aa..7243e16f 100644 --- a/vminitd/Sources/vminitd/IOPair.swift +++ b/vminitd/Sources/vminitd/IOPair.swift @@ -69,7 +69,7 @@ final class IOPair: Sendable { // Remove the fd from our global epoll instance first. let readFromFd = self.from.fileDescriptor do { - try ProcessSupervisor.default.poller.delete(readFromFd) + try ProcessSupervisor.default.unregisterFd(readFromFd) } catch { logger?.error("failed to delete fd from epoll \(readFromFd): \(error)") } @@ -118,7 +118,7 @@ final class IOPair: Sendable { let readFrom = OSFile(fd: readFromFd) let writeTo = OSFile(fd: writeToFd) - try ProcessSupervisor.default.poller.add(readFromFd, mask: Epoll.maskIn) { mask in + try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in self.io.withLock { io in if io.closed { return diff --git a/vminitd/Sources/vminitd/ProcessSupervisor.swift b/vminitd/Sources/vminitd/ProcessSupervisor.swift index 6bfe3e9d..b2d04ce8 100644 --- a/vminitd/Sources/vminitd/ProcessSupervisor.swift +++ b/vminitd/Sources/vminitd/ProcessSupervisor.swift @@ -20,7 +20,8 @@ import Logging import Synchronization final class ProcessSupervisor: Sendable { - let poller: Epoll + private let poller: Epoll + private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:]) private let queue: DispatchQueue // `DispatchSourceSignal` is thread-safe. @@ -47,11 +48,46 @@ final class ProcessSupervisor: Sendable { self.poller = try! Epoll() self.state = Mutex(State()) let t = Thread { - try! self.poller.run() + while true { + guard let events = self.poller.wait() else { + return + } + if events.isEmpty { + return + } + for event in events { + let handler = self.handlers.withLock { $0[event.fd] } + handler?(event.mask) + } + } } t.start() } + /// Register a file descriptor for epoll monitoring with a handler. + /// + /// The handler is stored before the fd is added to epoll, ensuring no + /// events are missed. + func registerFd( + _ fd: Int32, + mask: Epoll.Mask = [.input, .output], + handler: @escaping @Sendable (Epoll.Mask) -> Void + ) throws { + self.handlers.withLock { $0[fd] = handler } + do { + try self.poller.add(fd, mask: mask) + } catch { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + throw error + } + } + + /// Remove a file descriptor from epoll monitoring and discard its handler. + func unregisterFd(_ fd: Int32) throws { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + try self.poller.delete(fd) + } + func ready() { self.source.setEventHandler { self.handleSignal() @@ -123,6 +159,6 @@ final class ProcessSupervisor: Sendable { deinit { source.cancel() - try? poller.shutdown() + poller.shutdown() } } diff --git a/vminitd/Sources/vminitd/VsockProxy.swift b/vminitd/Sources/vminitd/VsockProxy.swift index fabdc353..31314c15 100644 --- a/vminitd/Sources/vminitd/VsockProxy.swift +++ b/vminitd/Sources/vminitd/VsockProxy.swift @@ -234,8 +234,8 @@ extension VsockProxy { ) do { - try ProcessSupervisor.default.poller.delete(clientFile.fileDescriptor) - try ProcessSupervisor.default.poller.delete(serverFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(clientFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(serverFile.fileDescriptor) try conn.close() try relayTo.close() } catch { @@ -244,7 +244,7 @@ extension VsockProxy { c.resume() } - try! ProcessSupervisor.default.poller.add(clientFile.fileDescriptor) { mask in + try! ProcessSupervisor.default.registerFd(clientFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromClient { let (fromEof, toEof) = Self.transferData( fromFile: &clientFile, @@ -270,7 +270,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromClient { + } else if mask.isRemoteHangup && !eofFromClient { // half close, shut down client to server transfer // we should see no more EPOLLIN events on the client fd // and no more EPOLLOUT events on the server fd @@ -296,7 +296,7 @@ extension VsockProxy { } } - try! ProcessSupervisor.default.poller.add(serverFile.fileDescriptor) { mask in + try! ProcessSupervisor.default.registerFd(serverFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromServer { let (fromEof, toEof) = Self.transferData( fromFile: &serverFile, @@ -322,7 +322,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromServer { + } else if mask.isRemoteHangup && !eofFromServer { // half close, shut down server to client transfer // we should see no more EPOLLIN events on the server fd // and no more EPOLLOUT events on the client fd