diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml new file mode 100644 index 0000000..3c70115 --- /dev/null +++ b/.github/workflows/build_and_test.yml @@ -0,0 +1,143 @@ +name: Build and Test + +on: + pull_request: + branches: + - main + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest] + build_type: [Release] + include: + - os: ubuntu-latest + cc: gcc + cxx: g++ + - os: windows-latest + cc: cl + cxx: cl + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup CMake + uses: jwlawson/actions-setup-cmake@master + + - name: Configure CMake + run: cmake -B build + + - name: Build + run: cmake --build build --config Release + + - name: Test + run: cmake --build build --config Release --target run_test + + release: + runs-on: ubuntu-latest + needs: build + permissions: + contents: write + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Determine library version + id: version + run: | + python - <<'PY' + import os + import re + from pathlib import Path + + header = Path('include/libsharedmemory/libsharedmemory.hpp').read_text(encoding='utf-8') + + def macro(name: str) -> str: + match = re.search(rf'#define\s+{name}\s+(\d+)', header) + if not match: + raise SystemExit(f'Macro {name} not found in header') + return match.group(1) + + major = macro('LIBSHAREDMEMORY_VERSION_MAJOR') + minor = macro('LIBSHAREDMEMORY_VERSION_MINOR') + patch = macro('LIBSHAREDMEMORY_VERSION_PATCH') + version = f'v{major}.{minor}.{patch}' + + with open(os.environ['GITHUB_OUTPUT'], 'a', encoding='utf-8') as output: + output.write(f'version={version}\n') + PY + + - name: Check if tag already exists + id: tag_check + run: | + git fetch --tags --force + if git rev-parse "${{ steps.version.outputs.version }}" >/dev/null 2>&1; then + echo "exists=true" >>"$GITHUB_OUTPUT" + exit 1 + else + echo "exists=false" >>"$GITHUB_OUTPUT" + exit 0 + fi + + - name: Generate release notes + if: steps.tag_check.outputs.exists == 'false' + id: notes + run: | + python - <<'PY' + import os + import subprocess + + def git_output(args): + result = subprocess.run(args, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return result.stdout.strip() + + last_tag = "" + try: + last_tag = git_output(["git", "describe", "--tags", "--abbrev=0"]) + except subprocess.CalledProcessError: + last_tag = "" + + if last_tag: + log_range = f"{last_tag}..HEAD" + else: + log_range = "HEAD" + + try: + notes = git_output(["git", "log", log_range, "--pretty=format:- %s (%h)"]) + except subprocess.CalledProcessError: + notes = "" + + notes = notes.strip() + if not notes: + notes = "No commits since previous release." + + if last_tag: + header = f"Changes since {last_tag}:" + else: + header = "Changes since repository start:" + + body = f"{header}\n\n{notes}" + + with open(os.environ["GITHUB_OUTPUT"], "a", encoding="utf-8") as fh: + fh.write("body<") target_include_directories(lsm INTERFACE $) target_include_directories(lsm SYSTEM INTERFACE $/include>) -if (${CMAKE_GENERATOR} MATCHES "Visual") - target_compile_options(lsm INTERFACE -W3 -EHsc) +if (MSVC) + target_compile_options(lsm INTERFACE /W3 /EHsc /std:c++20) else() - target_compile_options(lsm INTERFACE -Wall -Wno-missing-braces -std=c++11 -fPIC) + target_compile_options(lsm INTERFACE -Wall -Wno-missing-braces -std=c++20 -fPIC) endif() option(LSM_BUILD_TEST "build test" ON) diff --git a/README.md b/README.md index 748c6d9..9522f9a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,10 @@ # `libsharedmemory` -`libsharedmemory` is a small C++11 header-only library for using shared memory on Windows, Linux and macOS. `libsharedmemory` makes it easy to transfer data between isolated host OS processes. It also helps inter-connecting modules of applications that are implemented in different programming languages. It allows for simple read/write data transfer using single, indexed memory address access and also using array-types like `std::string`, `float*`, `double*`. +`libsharedmemory` is a small C++20 header-only library for using shared memory on Windows, Linux and macOS. `libsharedmemory` makes it easy to transfer data between isolated host OS processes. It also helps inter-connecting modules of applications that are implemented in different programming languages. It supports: + +- Simple read/write data transfer using single, indexed memory address access +- Array-types like `std::string`, `float*`, `double*` +- Message queue functionality with `SharedMemoryQueue` for FIFO communication @@ -28,6 +32,34 @@ std::string dataString = read$.readString(); std::cout << "UTF8 string written and read" << dataString << std::endl; ``` +### Message Queue Example + +```cpp +// Create a message queue with capacity for 10 messages, max 256 bytes each +SharedMemoryQueue writer{"messageQueue", /*capacity*/ 10, /*maxMessageSize*/ 256, /*persistent*/ true, /*isWriter*/ true}; +SharedMemoryQueue reader{"messageQueue", /*capacity*/ 10, /*maxMessageSize*/ 256, /*persistent*/ true, /*isWriter*/ false}; + +// Enqueue messages from writer +writer.enqueue("First message"); +writer.enqueue("Second message"); + +// Dequeue messages from reader +std::string msg; +if (reader.dequeue(msg)) { + std::cout << "Received: " << msg << std::endl; +} + +// Peek at next message without removing it +if (reader.peek(msg)) { + std::cout << "Next message: " << msg << std::endl; +} + +// Check queue status +std::cout << "Queue size: " << reader.size() << std::endl; +std::cout << "Is empty: " << reader.isEmpty() << std::endl; +std::cout << "Is full: " << reader.isFull() << std::endl; +``` + ## Source code package management via `npm` In case you want to use this library in your codebase, @@ -45,27 +77,38 @@ reports that are announced when running `npm audit`. Finally, it's also much easier for you to install all project dependencies by just running `npm install` in your projects root directory. Managing third party code becomes obsolete at all. -## Limits +## Features -`libsharedmemory` does only support the following datatypes (array-like): -- `std::string` -- `float*` -- `double*` +### Stream-based Transfer +`libsharedmemory` supports the following datatypes for stream-based transfer: +- `std::string` (UTF-8 compatible) +- `float*` (arrays of floats) +- `double*` (arrays of doubles) Single value access via `.data()[index]` API: - all scalar datatypes supported in C/C++ -- This library doesn't care for endinanness. This should be naturally fine +### Message Queue +`SharedMemoryQueue` provides FIFO message queue functionality: +- Thread-safe enqueue/dequeue operations +- Configurable capacity and maximum message size +- Peek functionality to inspect messages without removing them +- Suitable for single producer, single consumer or single producer, multiple consumers patterns + +## Limits + +- This library doesn't care for endianness. This should be naturally fine because shared memory shouldn't be shared between different machine architectures. However, if you plan to copy the shared buffer onto a -network layer prototcol, make sure to add an endianess indication bit. +network layer protocol, make sure to add an endianness indication bit. - Although the binary memory layout should give you no headache when compiling/linking using different compilers, the behavior is undefined. -- At the time of writing, there is no support for shared memory persistency -on Windows. Shared memory is lost after the writing process is killed. +- **SharedMemoryQueue** currently works best for single producer, single consumer +or single producer, multiple consumers scenarios. Multiple concurrent producers +require additional external synchronization. ## Memory layout @@ -92,29 +135,11 @@ enum DataType { to indicate data change. Continuous data reader will thus be able to catch every data change. -## Build - -This project is meant to be built with `cmake` and `clang`. -However, it _should_ also build with MSVC and GCC. - -```sh -./build.sh -``` - -## Test - -Test executables are built automatically and can be executed -to verify the correct function of the implementation on your machine: - -```sh -./test.sh -``` - ## License `libsharedmemory` is released under the MIT license, see the `LICENSE` file. ## Roadmap -1) Windows shared memory persistency support -2) Multi-threaded non-blocking `onChange( lambda fn )` data change handler on the read stream \ No newline at end of file +1) Multi-threaded non-blocking `onChange( lambda fn )` data change handler on the read stream +2) Support for multiple concurrent producers in SharedMemoryQueue with lock-free atomic operations diff --git a/build.sh b/build.sh deleted file mode 100755 index f94e3dc..0000000 --- a/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -cmake -B build -cmake --build build \ No newline at end of file diff --git a/include/libsharedmemory/libsharedmemory.hpp b/include/libsharedmemory/libsharedmemory.hpp index 99add7b..db8539e 100644 --- a/include/libsharedmemory/libsharedmemory.hpp +++ b/include/libsharedmemory/libsharedmemory.hpp @@ -1,35 +1,55 @@ +#pragma once -#ifndef INCLUDE_LIBSHAREDMEMORY_HPP_ -#define INCLUDE_LIBSHAREDMEMORY_HPP_ +#define LIBSHAREDMEMORY_VERSION_MAJOR 1 +#define LIBSHAREDMEMORY_VERSION_MINOR 9 +#define LIBSHAREDMEMORY_VERSION_PATCH 0 #include -#define LIBSHAREDMEMORY_VERSION_MAJOR 0 -#define LIBSHAREDMEMORY_VERSION_MINOR 0 -#define LIBSHAREDMEMORY_VERSION_PATCH 9 - -#include #include #include -#include -#include +#include #include // nullptr_t, ptrdiff_t, std::size_t +#include +#include +#include +#include +#include // added for atomic queue counters + +#if defined(__APPLE__) || defined(__linux__) || defined(__unix__) || defined(_POSIX_VERSION) || defined(__ANDROID__) +#include // O_* constants +#include +#include // mmap, munmap +#include // shm functions, close +#endif #if defined(_WIN32) #define WIN32_LEAN_AND_MEAN +#ifndef NOMINMAX +#define NOMINMAX 1 +#endif #include +#include +#include +#undef min +#undef max #undef WIN32_LEAN_AND_MEAN +#include +#include #endif -namespace lsm { +namespace lsm +{ -enum Error { - kOK = 0, - kErrorCreationFailed = 100, - kErrorMappingFailed = 110, - kErrorOpeningFailed = 120, +enum class Error +{ + OK = 0, + CreationFailed = 100, + MappingFailed = 110, + OpeningFailed = 120, }; -enum DataType { +enum DataType : std::uint8_t +{ kMemoryChanged = 1, kMemoryTypeString = 2, kMemoryTypeFloat = 4, @@ -37,45 +57,68 @@ enum DataType { }; // byte sizes of memory layout -const size_t bufferSizeSize = 4; // size_t takes 4 bytes -const size_t sizeOfOneFloat = 4; // float takes 4 bytes -const size_t sizeOfOneChar = 1; // char takes 1 byte -const size_t sizeOfOneDouble = 8; // double takes 8 bytes -const size_t flagSize = 1; // char takes 1 byte - -class Memory { +inline constexpr std::size_t bufferSizeSize = 4; // store buffer length as 32-bit value +inline constexpr std::size_t sizeOfOneFloat = 4; // float takes 4 bytes +inline constexpr std::size_t sizeOfOneChar = 1; // char takes 1 byte +inline constexpr std::size_t sizeOfOneDouble = 8; // double takes 8 bytes +inline constexpr std::size_t flagSize = 1; // char takes 1 byte + +class Memory +{ public: // path should only contain alpha-numeric characters, and is normalized // on linux/macOS. - explicit Memory(std::string path, std::size_t size, bool persist); + explicit Memory(const std::string& path, std::size_t size, bool persist); // create a shared memory area and open it for writing - inline Error create() { return createOrOpen(true); }; + [[nodiscard]] Error create() + { + return createOrOpen(true); + } // open an existing shared memory for reading - inline Error open() { return createOrOpen(false); }; + [[nodiscard]] Error open() + { + return createOrOpen(false); + } - inline std::size_t size() { return _size; }; + [[nodiscard]] std::size_t size() const noexcept + { + return _size; + } + + [[nodiscard]] const std::string &path() const noexcept + { + return _path; + } - inline const std::string &path() { return _path; } + [[nodiscard]] void *data() const noexcept + { + return _data; + } - inline void *data() { return _data; } + [[nodiscard]] std::span as_bytes() const noexcept + { + return {static_cast(_data), _size}; + } - void destroy(); + void destroy() const; void close(); ~Memory(); private: - Error createOrOpen(bool create); + [[nodiscard]] Error createOrOpen(bool create); std::string _path; void *_data = nullptr; std::size_t _size = 0; bool _persist = true; #if defined(_WIN32) - HANDLE _handle; + HANDLE _handle = nullptr; + HANDLE _fileHandle = INVALID_HANDLE_VALUE; + std::string _persistFilePath; #else int _fd = -1; #endif @@ -86,130 +129,396 @@ class Memory { #include // CreateFileMappingA, OpenFileMappingA, etc. -Memory::Memory(const std::string path, const std::size_t size, const bool persist) : _path(path), _size(size), _persist(persist) {}; +namespace lsm_windows_detail +{ + constexpr std::size_t MAX_FS_PATH = 1024; + constexpr int UTIL_PERM_READ = 4; + constexpr int UTIL_PERM_WRITE = 2; + constexpr int UTIL_PERM_EXECUTE = 1; + + inline bool AssignPermissionsToFilesystemPath(const std::string path, int permissionsBitMask) + { + //for simplicity, creator will always have full permissions, + //while group and others will be assigned the same permissions based on the bitmask + char buffer[MAX_FS_PATH] = { 0 }; + PSID usersSid = nullptr; + + if (path.empty() || path.size() >= MAX_FS_PATH) + return false; + + if (!(permissionsBitMask & (UTIL_PERM_READ | UTIL_PERM_WRITE | UTIL_PERM_EXECUTE))) + { + return false; + } + + strcpy_s(buffer, sizeof(buffer), path.c_str()); + + if (!ConvertStringSidToSidA("S-1-5-32-545", &usersSid)) //built-in Users + return false; + + EXPLICIT_ACCESSA accessPermissions = {}; + + accessPermissions.grfAccessPermissions = 0; + + if (permissionsBitMask & UTIL_PERM_READ) + { + accessPermissions.grfAccessPermissions |= FILE_GENERIC_READ; + } + + if (permissionsBitMask & UTIL_PERM_WRITE) + { + accessPermissions.grfAccessPermissions |= FILE_GENERIC_WRITE; + accessPermissions.grfAccessPermissions |= DELETE; // this is the natural place to put this if needed later + } + + if (permissionsBitMask & UTIL_PERM_EXECUTE) + { + accessPermissions.grfAccessPermissions |= FILE_GENERIC_EXECUTE; + } + + accessPermissions.grfAccessMode = GRANT_ACCESS; + accessPermissions.grfInheritance = SUB_CONTAINERS_AND_OBJECTS_INHERIT; + accessPermissions.Trustee.TrusteeForm = TRUSTEE_IS_SID; + accessPermissions.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP; + accessPermissions.Trustee.ptstrName = (LPCH)usersSid; + + // Retrieve existing DACL to merge with new ACE + PACL oldDacl = nullptr; + PSECURITY_DESCRIPTOR pSD = nullptr; + DWORD result = GetNamedSecurityInfoA( + buffer, + SE_FILE_OBJECT, + DACL_SECURITY_INFORMATION, + nullptr, + nullptr, + &oldDacl, + nullptr, + &pSD + ); + + if (result != ERROR_SUCCESS) + { + LocalFree(usersSid); + return false; + } + + // Merge new ACE with existing DACL + PACL newAcl = nullptr; + result = SetEntriesInAclA(1, &accessPermissions, oldDacl, &newAcl); + + // Free security descriptor (contains oldDacl) + if (pSD) + LocalFree(pSD); + + if (result != ERROR_SUCCESS) + { + LocalFree(usersSid); + return false; + } + + result = SetNamedSecurityInfoA(buffer, SE_FILE_OBJECT, DACL_SECURITY_INFORMATION, NULL, NULL, newAcl, NULL); + + LocalFree(usersSid); + LocalFree(newAcl); + + return result == ERROR_SUCCESS; + } + + inline std::string GetSystemStorageDirectory() + { + char* programData = nullptr; + size_t len = 0; + if (_dupenv_s(&programData, &len, "PROGRAMDATA") == 0 && programData != nullptr) + { + std::filesystem::path storagePath = std::filesystem::path(programData) / "shared_memory"; + free(programData); + + // Create directory if it doesn't exist + std::error_code ec; + std::filesystem::create_directories(storagePath, ec); + if (ec) + { + // Directory creation failed + return {}; + } + + AssignPermissionsToFilesystemPath(storagePath.string(), UTIL_PERM_READ | UTIL_PERM_WRITE); + + return storagePath.string(); + } + // If PROGRAMDATA is not set, return empty string + return {}; + } + + inline std::string sanitize_name(const std::string& name) + { + std::string sanitized = name; + const std::string invalid = "\\/:*?\"<>|"; + for (size_t idx = 0; idx < sanitized.size(); ++idx) + { + const char ch = sanitized[idx]; + if (ch < 32 || invalid.find(ch) != std::string::npos) + { + sanitized[idx] = '_'; + } + } + return sanitized; + } + + inline std::string persistence_file_path(const std::string& name) + { + std::string basePath = GetSystemStorageDirectory(); + if (!basePath.empty()) + { + if (const char last = basePath[basePath.size() - 1]; + last != '\\' && last != '/') + { + basePath.push_back('\\'); + } + } + basePath += "lsm_"; + basePath += sanitize_name(name); + basePath += ".shm"; + return basePath; + } +} + +Memory::Memory(const std::string& path, std::size_t size, bool persist) : _path(path), _size(size), _persist(persist) +{ + if (_persist) + { + _persistFilePath = lsm_windows_detail::persistence_file_path(_path); + } +} + +Error Memory::createOrOpen(const bool create) +{ + const DWORD size_high_order = static_cast((static_cast(_size) >> 32) & 0xFFFFFFFFull); + const DWORD size_low_order = static_cast(static_cast(_size) & 0xFFFFFFFFull); + + if (_persist) + { + if (_persistFilePath.empty()) + { + _persistFilePath = lsm_windows_detail::persistence_file_path(_path); + } + + // OPEN_ALWAYS: Opens if exists, creates if not - allows for persistent reuse + // CREATE_ALWAYS: Always creates new, truncating existing - forces fresh start + const DWORD disposition = create ? CREATE_ALWAYS : OPEN_EXISTING; + HANDLE fileHandle = CreateFileA(_persistFilePath.c_str(), + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, + disposition, + FILE_ATTRIBUTE_NORMAL, + NULL); + + if (fileHandle == INVALID_HANDLE_VALUE) + { + return create ? Error::CreationFailed : Error::OpeningFailed; + } + + CloseHandle(fileHandle); // will be reopened, this is just to ensure the file exists and has correct permissions -Error Memory::createOrOpen(const bool create) { - if (create) { - DWORD size_high_order = 0; - DWORD size_low_order = static_cast(size_); + lsm_windows_detail::AssignPermissionsToFilesystemPath(_persistFilePath, lsm_windows_detail::UTIL_PERM_READ | lsm_windows_detail::UTIL_PERM_WRITE); + + fileHandle = CreateFileA(_persistFilePath.c_str(), + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, + disposition, + FILE_ATTRIBUTE_NORMAL, + NULL); + + if (fileHandle == INVALID_HANDLE_VALUE) + { + return create ? Error::CreationFailed : Error::OpeningFailed; + } - _handle = CreateFileMappingA(INVALID_HANDLE_VALUE, // use paging file - NULL, // default security - PAGE_READWRITE, // read/write access - size_high_order, size_low_order, - _path.c_str() // name of mapping object - ); + LARGE_INTEGER requiredSize; + requiredSize.QuadPart = static_cast(_size); + + // Always resize to required size in create mode to ensure proper initialization + bool resizeFile = create; + if (!create) + { + LARGE_INTEGER currentSize; + if (GetFileSizeEx(fileHandle, ¤tSize)) + { + resizeFile = currentSize.QuadPart < requiredSize.QuadPart; + } + } - if (!_handle) { - return kErrorCreationFailed; + if (resizeFile) + { + if (!SetFilePointerEx(fileHandle, requiredSize, NULL, FILE_BEGIN) || !SetEndOfFile(fileHandle)) + { + CloseHandle(fileHandle); + return Error::CreationFailed; + } } - } else { - _handle = OpenFileMappingA(FILE_MAP_READ, // read access - FALSE, // do not inherit the name - _path.c_str() // name of mapping object - ); - // TODO: Windows has no default support for shared memory persistence - // see: destroy() to implement that + _fileHandle = fileHandle; + + _handle = CreateFileMappingA(_fileHandle, + NULL, + PAGE_READWRITE, + size_high_order, + size_low_order, + NULL); + + if (!_handle) + { + CloseHandle(_fileHandle); + _fileHandle = INVALID_HANDLE_VALUE; + return Error::MappingFailed; + } + } + else + { + if (create) + { + // For ephemeral memory, try to create with the name + // If it already exists, we'll get a handle to the existing one + _handle = CreateFileMappingA(INVALID_HANDLE_VALUE, // use paging file + NULL, // default security + PAGE_READWRITE, // read/write access + size_high_order, size_low_order, + _path.c_str()); // name of mapping object + + if (!_handle) + { + return Error::CreationFailed; + } - if (!_handle) { - return kErrorOpeningFailed; + // Check if we opened an existing mapping (can happen in multi-process scenarios) + // If GetLastError() returns ERROR_ALREADY_EXISTS, the mapping already existed + // For ephemeral memory in create mode, this is typically fine since the + // destructor will clean it up when all processes are done + } + else + { + _handle = OpenFileMappingA(FILE_MAP_ALL_ACCESS, // read/write access + FALSE, // do not inherit the name + _path.c_str()); // name of mapping object + + if (!_handle) + { + return Error::OpeningFailed; + } } } - // TODO: might want to use GetWriteWatch to get called whenever - // the memory section changes - // https://docs.microsoft.com/de-de/windows/win32/api/memoryapi/nf-memoryapi-getwritewatch?redirectedfrom=MSDN + // Change detection relies on explicit flags to keep the implementation lightweight - const DWORD access = create ? FILE_MAP_ALL_ACCESS : FILE_MAP_READ; + const DWORD access = FILE_MAP_ALL_ACCESS; // always request read/write view _data = MapViewOfFile(_handle, access, 0, 0, _size); - if (!_data) { - return kErrorMappingFailed; + if (!_data) + { + close(); + return Error::MappingFailed; } - return kOK; + return Error::OK; } -void Memory::destroy() { +void Memory::destroy() const +{ + if (_persistFilePath.empty()) + { + return; + } - // TODO: Windows needs priviledges to define a shared memory (file mapping) - // OBJ_PERMANENT; furthermore, ZwCreateSection would need to be used. - // Instead of doing this; saving a file here (by name, temp dir) - // and reading memory from file in createOrOpen seems more suitable. - // Especially, because files can be removed on reboot using: - // MoveFileEx() with the MOVEFILE_DELAY_UNTIL_REBOOT flag and lpNewFileName - // set to NULL. + if (!DeleteFileA(_persistFilePath.c_str())) + { + MoveFileExA(_persistFilePath.c_str(), NULL, MOVEFILE_DELAY_UNTIL_REBOOT); + } } -void Memory::close() { - if (_data) { +void Memory::close() +{ + if (_data) + { UnmapViewOfFile(_data); _data = nullptr; } - CloseHandle(_handle); + if (_handle) + { + CloseHandle(_handle); + _handle = nullptr; + } + if (_fileHandle != INVALID_HANDLE_VALUE) + { + CloseHandle(_fileHandle); + _fileHandle = INVALID_HANDLE_VALUE; + } } -Memory::~Memory() { +Memory::~Memory() +{ close(); - if (!_persist) { - destroy(); + if (!_persist) + { + destroy(); } } #endif // defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +// POSIX shared memory implementation #if defined(__APPLE__) || defined(__linux__) || defined(__unix__) || defined(_POSIX_VERSION) || defined(__ANDROID__) -#include // for O_* constants -#include // mmap, munmap -#include // for mode constants -#include // unlink - -#if defined(__APPLE__) - -#include - -#endif // __APPLE__ - -#include - -inline Memory::Memory(const std::string path, const std::size_t size, const bool persist) : _size(size), _persist(persist) { +inline Memory::Memory(const std::string& path, const std::size_t size, const bool persist) : _size(size), _persist(persist) +{ _path = "/" + path; -}; +} -inline Error Memory::createOrOpen(const bool create) { - if (create) { +inline Error Memory::createOrOpen(const bool create) +{ + if (create) + { // shm segments persist across runs, and macOS will refuse // to ftruncate an existing shm segment, so to be on the safe // side, we unlink it beforehand. const int ret = shm_unlink(_path.c_str()); - if (ret < 0) { - if (errno != ENOENT) { - return kErrorCreationFailed; + if (ret < 0) + { + if (errno != ENOENT) + { + return Error::CreationFailed; } } } - const int flags = create ? (O_CREAT | O_RDWR) : O_RDONLY; + const int flags = create ? (O_CREAT | O_RDWR) : O_RDWR; + + _fd = shm_open(_path.c_str(), flags, 0777); + fchmod(_fd, 0777); //explicit - _fd = shm_open(_path.c_str(), flags, 0755); - if (_fd < 0) { - if (create) { - return kErrorCreationFailed; - } else { - return kErrorOpeningFailed; + if (_fd < 0) + { + if (create) + { + return Error::CreationFailed; + } + else + { + return Error::OpeningFailed; } } - if (create) { + if (create) + { // this is the only way to specify the size of a // newly-created POSIX shared memory object - int ret = ftruncate(_fd, _size); - if (ret != 0) { - return kErrorCreationFailed; + const int ret = ftruncate(_fd, static_cast(_size)); + if (ret != 0) + { + return Error::CreationFailed; } } - const int prot = create ? (PROT_READ | PROT_WRITE) : PROT_READ; + constexpr int prot = PROT_READ | PROT_WRITE; _data = mmap(nullptr, // addr _size, // length @@ -219,96 +528,108 @@ inline Error Memory::createOrOpen(const bool create) { 0 // offset ); - if (_data == MAP_FAILED) { - return kErrorMappingFailed; + if (_data == MAP_FAILED) + { + return Error::MappingFailed; } - if (!_data) { - return kErrorMappingFailed; + if (!_data) + { + return Error::MappingFailed; } - return kOK; + return Error::OK; } -inline void Memory::destroy() { shm_unlink(_path.c_str()); } +inline void Memory::destroy() const +{ + shm_unlink(_path.c_str()); +} -inline void Memory::close() { - munmap(_data, _size); - ::close(_fd); +inline void Memory::close() +{ + munmap(_data, _size); + if (_fd >= 0) + { + const int fd_to_close = _fd; + _fd = -1; + // call POSIX close directly to avoid name collision with method close() + ::close(fd_to_close); + } } -inline Memory::~Memory() { +inline Memory::~Memory() +{ close(); - if (!_persist) { + if (!_persist) + { destroy(); } } -#endif // defined(__APPLE__) || defined(__linux__) || defined(__unix__) || defined(_POSIX_VERSION) || defined(__ANDROID__) +#endif // POSIX implementation -class SharedMemoryReadStream { +class SharedMemoryReadStream +{ public: - - SharedMemoryReadStream(const std::string name, const std::size_t bufferSize, const bool isPersistent): - _memory(name, bufferSize, isPersistent) { - - if (_memory.open() != kOK) { - throw "Shared memory segment could not be opened."; + SharedMemoryReadStream(const std::string& name, const std::size_t bufferSize, const bool isPersistent): + _memory(name, bufferSize, isPersistent) + { + if (_memory.open() != Error::OK) + { + throw std::runtime_error("Shared memory segment could not be opened."); } } - inline char readFlags() { - char* memory = (char*) _memory.data(); - return memory[0]; + [[nodiscard]] char readFlags() const noexcept + { + const auto memory = static_cast(_memory.data()); + return memory[0]; + } + + [[nodiscard]] bool hasNewData() const noexcept + { + const char flags = readFlags(); + return !!(flags & kMemoryChanged); } - inline void close() { _memory.close(); } + void markAsRead() const noexcept + { + auto memory = static_cast(_memory.data()); + memory[0] &= ~kMemoryChanged; + } + void close() + { + _memory.close(); + } - inline size_t readSize(char dataType) { - void *memory = _memory.data(); - std::size_t size = 0; + [[nodiscard]] size_t readSize(const char /*dataType*/) const noexcept + { + const auto memory = static_cast(_memory.data()); + std::uint32_t storedSize = 0; + std::memcpy(&storedSize, &memory[flagSize], bufferSizeSize); + return static_cast(storedSize); + } - // TODO(kyr0): should be clarified why we need to use size_t there - // for the size to be received correctly, but in float, we need int - // Might be prone to undefined behaviour; should be tested - // with various compilers; otherwise use memcpy() for the size - // and align the memory with one cast. + [[nodiscard]] size_t readLength(const char dataType) const noexcept + { + const size_t size = readSize(dataType); - if (dataType & kMemoryTypeDouble) { - size_t *intMemory = (size_t *)memory; - // copy size data to size variable - std::memcpy(&size, &intMemory[flagSize], bufferSizeSize); + if (dataType & kMemoryTypeString) + { + return size / sizeOfOneChar; } - if (dataType & kMemoryTypeFloat) { - int* intMemory = (int*) memory; - // copy size data to size variable - std::memcpy(&size, &intMemory[flagSize], bufferSizeSize); + if (dataType & kMemoryTypeFloat) + { + return size / sizeOfOneFloat; } - if (dataType & kMemoryTypeString) { - char* charMemory = (char*) memory; - // copy size data to size variable - std::memcpy(&size, &charMemory[flagSize], bufferSizeSize); + if (dataType & kMemoryTypeDouble) + { + return size / sizeOfOneDouble; } - return size; - } - - inline size_t readLength(char dataType) { - size_t size = readSize(dataType); - - if (dataType & kMemoryTypeString) { - return size / sizeOfOneChar; - } - - if (dataType & kMemoryTypeFloat) { - return size / sizeOfOneFloat; - } - - if (dataType & kMemoryTypeDouble) { - return size / sizeOfOneDouble; - } - return 0; + return 0; } /** @@ -317,98 +638,117 @@ class SharedMemoryReadStream { * * @return float* */ - // TODO: might wanna use templated functions here like: readNumericArray() - inline double* readDoubleArray() { - void *memory = _memory.data(); - std::size_t size = readSize(kMemoryTypeDouble); - double* typedMemory = (double*) memory; - - // allocating memory on heap (this might leak) - double *data = new double[size / sizeOfOneDouble](); - - // copy to data buffer - std::memcpy(data, &typedMemory[flagSize + bufferSizeSize], size); - - return data; + [[nodiscard]] double* readDoubleArray() const + { + return readNumericArray(kMemoryTypeDouble, sizeOfOneDouble); } /** * @brief Returns a float* read from shared memory * Caller has the obligation to call delete [] on the returning float*. - * - * @return float* + * + * @return float* */ - inline float* readFloatArray() { - void *memory = _memory.data(); - float *typedMemory = (float *)memory; - - std::size_t size = readSize(kMemoryTypeFloat); - - // allocating memory on heap (this might leak) - float *data = new float[size / sizeOfOneFloat](); - - // copy to data buffer - std::memcpy(data, &typedMemory[flagSize + bufferSizeSize], size); - - return data; + [[nodiscard]] float* readFloatArray() const + { + return readNumericArray(kMemoryTypeFloat, sizeOfOneFloat); } - inline std::string readString() { - char* memory = (char*) _memory.data(); + [[nodiscard]] std::string readString() const + { + const auto memory = static_cast(_memory.data()); - std::size_t size = readSize(kMemoryTypeString); + const std::size_t size = readSize(kMemoryTypeString); // create a string that copies the data from memory - std::string data = - std::string(&memory[flagSize + bufferSizeSize], size); - + auto data = std::string(&memory[flagSize + bufferSizeSize], size); + return data; } private: + template + [[nodiscard]] T* readNumericArray(const char typeFlag, const std::size_t elementSize) const + { + const auto memory = static_cast(_memory.data()); + const std::size_t byteSize = readSize(typeFlag); + const std::size_t length = byteSize / elementSize; + + auto data = new T[length](); + std::memcpy(data, &memory[flagSize + bufferSizeSize], byteSize); + + return data; + } + Memory _memory; }; -class SharedMemoryWriteStream { +class SharedMemoryWriteStream +{ public: + SharedMemoryWriteStream(const std::string& name, const std::size_t bufferSize, const bool isPersistent): + _memory(name, bufferSize, isPersistent) + { + if (_memory.create() != Error::OK) + { + throw std::runtime_error("Shared memory segment could not be created."); + } + } - SharedMemoryWriteStream(const std::string name, const std::size_t bufferSize, const bool isPersistent): - _memory(name, bufferSize, isPersistent) { + void close() + { + _memory.close(); + } - if (_memory.create() != kOK) { - throw "Shared memory segment could not be created."; - } + [[nodiscard]] bool isMessageRead() const noexcept + { + const auto memory = static_cast(_memory.data()); + const char flags = memory[0]; + return !(flags & kMemoryChanged); } - inline void close() { - _memory.close(); + void waitForRead() const noexcept + { + while (!isMessageRead()) + { + std::this_thread::yield(); + } } // https://stackoverflow.com/questions/18591924/how-to-use-bitmask - inline char getWriteFlags(const char type, - const char currentFlags) { + [[nodiscard]] static constexpr char getWriteFlags(const char type, const char currentFlags) noexcept + { char flags = type; - if ((currentFlags & (kMemoryChanged)) == kMemoryChanged) { + if ((currentFlags & (kMemoryChanged)) == kMemoryChanged) + { // disable flag, leave rest untouched flags &= ~kMemoryChanged; - } else { + } + else + { // enable flag, leave rest untouched flags ^= kMemoryChanged; } return flags; } - inline void write(const std::string& string) { - char* memory = (char*) _memory.data(); + void write(std::string_view string) const + { + const auto memory = static_cast(_memory.data()); + + if (string.size() > std::numeric_limits::max()) + { + throw std::runtime_error("String payload exceeds maximum shared memory size."); + } // 1) copy change flag into buffer for change detection - char flags = getWriteFlags(kMemoryTypeString, ((char*) _memory.data())[0]); + const char flags = getWriteFlags(kMemoryTypeString, memory[0]); std::memcpy(&memory[0], &flags, flagSize); // 2) copy buffer size into buffer (meta data for deserializing) const char *stringData = string.data(); - const std::size_t bufferSize = string.size(); + const auto bufferSize = static_cast(string.size()); // write data std::memcpy(&memory[flagSize], &bufferSize, bufferSizeSize); @@ -417,46 +757,293 @@ class SharedMemoryWriteStream { std::memcpy(&memory[flagSize + bufferSizeSize], stringData, bufferSize); } - // TODO: might wanna use template function here for numeric arrays, - // like void writeNumericArray( data, std::size_t length) - inline void write(float* data, std::size_t length) { - float* memory = (float*) _memory.data(); + void write(std::span data) const + { + writeNumericArray(data, kMemoryTypeFloat); + } - char flags = getWriteFlags(kMemoryTypeFloat, ((char*) _memory.data())[0]); - std::memcpy(&memory[0], &flags, flagSize); + void write(const float* data, const std::size_t length) const + { + write(std::span(data, length)); + } - // 2) copy buffer size into buffer (meta data for deserializing) - const std::size_t bufferSize = length * sizeOfOneFloat; - std::memcpy(&memory[flagSize], &bufferSize, bufferSizeSize); - - // 3) copy float* into memory buffer - std::memcpy(&memory[flagSize + bufferSizeSize], data, bufferSize); + void write(std::span data) const + { + writeNumericArray(data, kMemoryTypeDouble); } - inline void write(double* data, std::size_t length) { - double* memory = (double*) _memory.data(); + void write(const double* data, const std::size_t length) const + { + write(std::span(data, length)); + } - char flags = getWriteFlags(kMemoryTypeDouble, ((char*) _memory.data())[0]); - std::memcpy(&memory[0], &flags, flagSize); + void destroy() const + { + _memory.destroy(); + } - // 2) copy buffer size into buffer (meta data for deserializing) - const std::size_t bufferSize = length * sizeOfOneDouble; +private: + template + requires std::is_floating_point_v + void writeNumericArray(std::span data, const char typeFlag) const + { + const std::size_t length = data.size(); + + if (length > 0 && length > (std::numeric_limits::max() / sizeof(T))) + { + throw std::runtime_error("Numeric payload exceeds maximum shared memory size."); + } + + const auto memory = static_cast(_memory.data()); + + const char flags = getWriteFlags(typeFlag, memory[0]); + std::memcpy(&memory[0], &flags, flagSize); + const auto bufferSize = static_cast(length * sizeof(T)); std::memcpy(&memory[flagSize], &bufferSize, bufferSizeSize); - - // 3) copy double* into memory buffer - std::memcpy(&memory[flagSize + bufferSizeSize], data, bufferSize); + std::memcpy(&memory[flagSize + bufferSizeSize], data.data(), bufferSize); } - inline void destroy() { - _memory.destroy(); - } + Memory _memory; +}; +/** + * @brief Queue structure for shared memory + * Layout: [writeIndex(4)][readIndex(4)][capacity(4)][count(4)][maxMessageSize(4)][messages...] + */ +class SharedMemoryQueue +{ private: + static constexpr std::size_t kWriteIndexOffset = 0; + static constexpr std::size_t kReadIndexOffset = 4; + static constexpr std::size_t kCapacityOffset = 8; + static constexpr std::size_t kCountOffset = 12; + static constexpr std::size_t kMaxMessageSizeOffset = 16; + static constexpr std::size_t kHeaderSize = 20; + Memory _memory; -}; + std::uint32_t _capacity; + std::uint32_t _maxMessageSize; + bool _isWriter; + + [[nodiscard]] std::uint32_t readUInt32(std::size_t offset) const noexcept + { + const auto memory = static_cast(_memory.data()); + std::uint32_t value = 0; + std::memcpy(&value, &memory[offset], sizeof(std::uint32_t)); + return value; + } + + void writeUInt32(std::size_t offset, std::uint32_t value) const noexcept + { + auto memory = static_cast(_memory.data()); + std::memcpy(&memory[offset], &value, sizeof(std::uint32_t)); + } + [[nodiscard]] std::size_t getMessageOffset(std::uint32_t index) const noexcept + { + // Each slot contains: [length(4)][data(maxMessageSize)] + return kHeaderSize + index * (_maxMessageSize + sizeof(std::uint32_t)); + } -}; // namespace lsm + // Helper to access atomic count field in shared memory + [[nodiscard]] std::atomic& atomicCount() const noexcept + { + auto memory = static_cast(_memory.data()); + return *reinterpret_cast*>(&memory[kCountOffset]); + } + +public: + /** + * @brief Create or open a shared memory queue + * @param name Queue name + * @param capacity Maximum number of messages in queue + * @param maxMessageSize Maximum size of each message in bytes + * @param isPersistent Whether the queue persists after process exit + * @param isWriter True to create/write, false to open/read + */ + SharedMemoryQueue(const std::string& name, std::uint32_t capacity, + std::uint32_t maxMessageSize, bool isPersistent, bool isWriter) + : _memory(name, kHeaderSize + capacity * (maxMessageSize + sizeof(std::uint32_t)), isPersistent) + , _capacity(capacity) + , _maxMessageSize(maxMessageSize) + , _isWriter(isWriter) + { + if (isWriter) + { + if (_memory.create() != Error::OK) + { + throw std::runtime_error("Shared memory queue could not be created."); + } + + // Initialize queue metadata + writeUInt32(kWriteIndexOffset, 0); + writeUInt32(kReadIndexOffset, 0); + writeUInt32(kCapacityOffset, capacity); + // construct atomic count with placement new to ensure proper atomic object initialization + auto memory = static_cast(_memory.data()); + new (&memory[kCountOffset]) std::atomic(0); + writeUInt32(kMaxMessageSizeOffset, maxMessageSize); + } + else + { + if (_memory.open() != Error::OK) + { + throw std::runtime_error("Shared memory queue could not be opened."); + } + + // Read queue metadata + _capacity = readUInt32(kCapacityOffset); + _maxMessageSize = readUInt32(kMaxMessageSizeOffset); + } + } + + [[nodiscard]] bool isEmpty() const noexcept + { + return atomicCount().load(std::memory_order_acquire) == 0; + } + + [[nodiscard]] bool isFull() const noexcept + { + return atomicCount().load(std::memory_order_acquire) >= _capacity; + } + + [[nodiscard]] std::uint32_t size() const noexcept + { + return atomicCount().load(std::memory_order_acquire); + } + + [[nodiscard]] std::uint32_t capacity() const noexcept + { + return _capacity; + } + + /** + * @brief Enqueue a message (writer only) + * @param message Message to enqueue + * @return true if message was enqueued, false if queue is full + */ + bool enqueue(std::string_view message) + { + if (!_isWriter) + { + throw std::runtime_error("Cannot enqueue from a reader queue instance."); + } + + if (message.size() > _maxMessageSize) + { + throw std::runtime_error("Message exceeds maximum message size."); + } + + if (isFull()) + { + return false; + } + + const std::uint32_t writeIndex = readUInt32(kWriteIndexOffset); + const std::size_t offset = getMessageOffset(writeIndex); + + auto memory = static_cast(_memory.data()); + + // Write message length + const auto messageLength = static_cast(message.size()); + std::memcpy(&memory[offset], &messageLength, sizeof(std::uint32_t)); + + // Write message data + std::memcpy(&memory[offset + sizeof(std::uint32_t)], message.data(), messageLength); + + // Update write index (circular) + const std::uint32_t newWriteIndex = (writeIndex + 1) % _capacity; + writeUInt32(kWriteIndexOffset, newWriteIndex); + + // atomic increment of count + atomicCount().fetch_add(1, std::memory_order_release); + + return true; + } + + /** + * @brief Dequeue a message (reader only) + * @param message Output parameter for dequeued message + * @return true if message was dequeued, false if queue is empty + */ + bool dequeue(std::string& message) + { + if (_isWriter) + { + throw std::runtime_error("Cannot dequeue from a writer queue instance."); + } + + if (isEmpty()) + { + return false; + } + + const std::uint32_t readIndex = readUInt32(kReadIndexOffset); + const std::size_t offset = getMessageOffset(readIndex); + + const auto memory = static_cast(_memory.data()); + + // Read message length + std::uint32_t messageLength = 0; + std::memcpy(&messageLength, &memory[offset], sizeof(std::uint32_t)); -#endif // INCLUDE_LIBSHAREDMEMORY_HPP_ \ No newline at end of file + // Read message data + message.resize(messageLength); + std::memcpy(&message[0], &memory[offset + sizeof(std::uint32_t)], messageLength); + + // Update read index (circular) + const std::uint32_t newReadIndex = (readIndex + 1) % _capacity; + writeUInt32(kReadIndexOffset, newReadIndex); + + // atomic decrement of count + atomicCount().fetch_sub(1, std::memory_order_release); + + return true; + } + + /** + * @brief Peek at the next message without dequeuing (reader only) + * @param message Output parameter for peeked message + * @return true if message was peeked, false if queue is empty + */ + bool peek(std::string& message) const + { + if (_isWriter) + { + throw std::runtime_error("Cannot peek from a writer queue instance."); + } + + if (isEmpty()) + { + return false; + } + + const std::uint32_t readIndex = readUInt32(kReadIndexOffset); + const std::size_t offset = getMessageOffset(readIndex); + + const auto memory = static_cast(_memory.data()); + + // Read message length + std::uint32_t messageLength = 0; + std::memcpy(&messageLength, &memory[offset], sizeof(std::uint32_t)); + + // Read message data + message.resize(messageLength); + std::memcpy(&message[0], &memory[offset + sizeof(std::uint32_t)], messageLength); + + return true; + } + + void close() + { + _memory.close(); + } + + void destroy() const + { + _memory.destroy(); + } +}; + +}; // namespace lsm diff --git a/package-lock.json b/package-lock.json deleted file mode 100644 index 1b33488..0000000 --- a/package-lock.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "name": "cpp_libsharedmemory", - "version": "0.0.5", - "lockfileVersion": 1 -} diff --git a/package.json b/package.json deleted file mode 100644 index 946fe26..0000000 --- a/package.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "name": "cpp_libsharedmemory", - "version": "0.0.9", - "description": "C++11 header-only library for using shared memory on Windows, Linux and macOS", - "main": "include/libsharedmemory/libsharedmemory.hpp", - "directories": { - "test": "test" - }, - "files": [ - "include" - ], - "scripts": { - "build": "./build.sh", - "pretest": "npm run build", - "test": "./test.sh" - }, - "repository": { - "type": "git", - "url": "git+https://github.com/kyr0/libsharedmemory.git" - }, - "keywords": [ - "cpp", - "shared", - "memory", - "cross-platform", - "library" - ], - "author": "Aron Homberg ", - "license": "MIT", - "bugs": { - "url": "https://github.com/kyr0/libsharedmemory/issues" - }, - "homepage": "https://github.com/kyr0/libsharedmemory#readme" -} \ No newline at end of file diff --git a/test.sh b/test.sh deleted file mode 100755 index b17d410..0000000 --- a/test.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -./build/test/lsm_test && echo "ALL TESTS SUCCESSFUL!" || echo "TEST(S) FAILED!" \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 501ce96..02d60c2 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2,7 +2,7 @@ set(source_files test.cc) add_executable(lsm_test ${source_files}) target_link_libraries(lsm_test PUBLIC lsm) target_include_directories(lsm_test PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) -set_property(TARGET lsm_test PROPERTY CXX_STANDARD 11) +set_property(TARGET lsm_test PROPERTY CXX_STANDARD 20) if(MSVC) target_compile_definitions(lsm_test PRIVATE TYPE_SAFE_TEST_NO_STATIC_ASSERT) @@ -10,4 +10,10 @@ elseif(CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 5.0) target_compile_definitions(lsm_test PRIVATE TYPE_SAFE_TEST_NO_STATIC_ASSERT) endif() -add_test(NAME test COMMAND lsm_test) \ No newline at end of file +add_test(NAME test COMMAND lsm_test) + +add_custom_target(run_test + COMMAND lsm_test + DEPENDS lsm_test + WORKING_DIRECTORY ${CMAKE_PROJECT_DIR} +) diff --git a/test/test.cc b/test/test.cc index b5d33dd..64d9bde 100644 --- a/test/test.cc +++ b/test/test.cc @@ -4,23 +4,40 @@ #include #include #include +#include +#include +#include +#include +#include +#include using namespace std; using namespace lsm; -const lest::test specification[] = { - CASE("shared memory can be created and opened and transfer uint8_t") { +namespace +{ +inline void log_test_message(const std::string& message) +{ + static int counter = 0; + std::cout << ++counter << ". " << message << std::endl; +} +} + +const lest::test specification[] = +{ + CASE("shared memory can be created and opened and transfer uint8_t") + { Memory memoryWriter {"lsmtest", 64, true}; - EXPECT(kOK == memoryWriter.create()); + EXPECT(Error::OK == memoryWriter.create()); - ((uint8_t*)memoryWriter.data())[0] = 0x11; - ((uint8_t*)memoryWriter.data())[1] = 0x34; + static_cast(memoryWriter.data())[0] = 0x11; + static_cast(memoryWriter.data())[1] = 0x34; Memory memoryReader{"lsmtest", 64, true}; - EXPECT(kOK == memoryReader.open()); + EXPECT(Error::OK == memoryReader.open()); - std::cout << "1. single uint8_t: SUCCESS" << std::endl; + log_test_message("single uint8_t: SUCCESS"); EXPECT(0x11 == ((uint8_t*)memoryReader.data())[0]); EXPECT(0x34 == ((uint8_t *)memoryReader.data())[1]); @@ -29,35 +46,38 @@ const lest::test specification[] = { memoryReader.close(); }, - CASE("non-existing shared memory objects err") { + CASE("non-existing shared memory objects err") + { Memory memoryReader{"lsmtest2", 64, true}; - EXPECT(kErrorOpeningFailed == memoryReader.open()); - std::cout << "2. error when opening non-existing segment: SUCCESS" << std::endl; + EXPECT(Error::OpeningFailed == memoryReader.open()); + log_test_message("error when opening non-existing segment: SUCCESS"); }, - CASE("using MemoryStreamWriter and MemoryStreamReader to transfer std::string") { - - std::string dataToTransfer = "{ foo: 'coolest IPC ever! 🧑‍💻' }"; + CASE("using MemoryStreamWriter and MemoryStreamReader to transfer std::string") + { + const std::string dataToTransfer = "{ foo: 'coolest IPC ever! 🧑‍💻' }"; SharedMemoryWriteStream write${"jsonPipe", 65535, true}; SharedMemoryReadStream read${"jsonPipe", 65535, true}; write$.write(dataToTransfer); - std::string dataString = read$.readString(); + const std::string dataString = read$.readString(); - std::cout << "3. std::string (UTF8): SUCCESS | " << dataString << std::endl; + std::ostringstream msg; + msg << "std::string (UTF8): SUCCESS | " << dataString; + log_test_message(msg.str()); EXPECT(dataToTransfer == dataString); write$.close(); read$.close(); - } - , - - CASE("Write more then less, then read") { + }, - for (int i=0; i<1000; i++) { + CASE("Write more then less, then read") + { + for (int i=0; i<1000; i++) + { SharedMemoryWriteStream write${"varyingDataSizePipe", 65535, true}; SharedMemoryReadStream read${"varyingDataSizePipe", 65535, true}; @@ -74,33 +94,32 @@ const lest::test specification[] = { write$.close(); read$.close(); } - std::cout << "4. std::string more/less: SUCCESS; 1000 runs" - << std::endl; - + log_test_message("std::string more/less: SUCCESS; 1000 runs"); }, - CASE("Write a lot") { - std::string blob = - "ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab" - "😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃a" - "b😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃" - "ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab" - "😃ab😃ab😃ab😃ab😃ab"; + CASE("Write a lot") + { + const std::string blob = + "ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab" + "😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃a" + "b😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃" + "ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab😃ab" + "😃ab😃ab😃ab😃ab😃ab"; - SharedMemoryWriteStream write${"blobDataSizePipe", 65535, true}; - SharedMemoryReadStream read${"blobDataSizePipe", 65535, true}; + const SharedMemoryWriteStream write${"blobDataSizePipe", 65535, true}; + const SharedMemoryReadStream read${"blobDataSizePipe", 65535, true}; write$.write(blob); - std::string dataString = read$.readString(); + const std::string dataString = read$.readString(); EXPECT(blob == dataString); - std::cout << "5. std::string blob: SUCCESS" << std::endl; + log_test_message("std::string blob: SUCCESS"); }, - CASE("Can read flags, sets the right datatype and data change bit flips") { - + CASE("Can read flags, sets the right datatype and data change bit flips") + { SharedMemoryWriteStream write${"blobDataSizePipe2", 65535, true}; SharedMemoryReadStream read${"blobDataSizePipe2", 65535, true}; @@ -108,19 +127,21 @@ const lest::test specification[] = { char flagsData = read$.readFlags(); - EXPECT(read$.readLength(kMemoryTypeString) == 4); + EXPECT(read$.readLength(kMemoryTypeString) == 4UL); std::bitset<8> flags(flagsData); EXPECT(!!(flagsData & kMemoryTypeString)); - std::cout << "6. status flag shows string data type flag: SUCCESS: 0b" - << flags << std::endl; + std::ostringstream statusMsg; + statusMsg << "status flag shows string data type flag: SUCCESS: 0b" << flags; + log_test_message(statusMsg.str()); EXPECT(!!(flagsData & kMemoryChanged)); - std::cout << "6.1 status flag has the change bit set: SUCCESS: 0b" - << flags << std::endl; + std::ostringstream changeMsg; + changeMsg << "status flag has the change bit set: SUCCESS: 0b" << flags; + log_test_message(changeMsg.str()); write$.write("foo!"); @@ -135,55 +156,56 @@ const lest::test specification[] = { std::bitset<8> flags3(flagsData3); EXPECT(!!(flagsData3 & kMemoryChanged)); - std::cout - << "6.2 status bit flips to zero when writing again: SUCCESS: 0b" - << flags2 << std::endl; + std::ostringstream zeroMsg; + zeroMsg << "status bit flips to zero when writing again: SUCCESS: 0b" << flags2; + log_test_message(zeroMsg.str()); - std::cout - << "6.3 status bit flips to one when writing again: SUCCESS: 0b" - << flags3 << std::endl; + std::ostringstream oneMsg; + oneMsg << "status bit flips to one when writing again: SUCCESS: 0b" << flags3; + log_test_message(oneMsg.str()); write$.close(); read$.close(); }, - CASE("Can write and read a float* array") { - - float numbers[72] = { - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, - 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 6.14f, - }; - - SharedMemoryWriteStream write${"numberPipe", 65535, true}; - SharedMemoryReadStream read${"numberPipe", 65535, true}; - - write$.write(numbers, 72); - - EXPECT(read$.readLength(kMemoryTypeFloat) == 72); - - char flagsData = read$.readFlags(); - std::bitset<8> flags(flagsData); - - std::cout - << "Flags for float* read: 0b" - << flags << std::endl; - EXPECT(!!(flagsData & kMemoryTypeFloat)); - EXPECT(!!(flagsData & kMemoryChanged)); - - float* numbersReadPtr = read$.readFloatArray(); - - EXPECT(numbers[0] == numbersReadPtr[0]); - EXPECT(numbers[1] == numbersReadPtr[1]); - EXPECT(numbers[2] == numbersReadPtr[2]); - EXPECT(numbers[3] == numbersReadPtr[3]); - EXPECT(numbers[71] == numbersReadPtr[71]); - - std::cout << "7. float[72]: SUCCESS" << std::endl; - + CASE("Can write and read a float* array") + { + float numbers[72] = + { + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, + 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 3.14f, 1.3f, 3.4f, 6.14f, + }; + + SharedMemoryWriteStream write${"numberPipe", 65535, true}; + SharedMemoryReadStream read${"numberPipe", 65535, true}; + + write$.write(numbers, 72); + + EXPECT(read$.readLength(kMemoryTypeFloat) == 72UL); + + char flagsData = read$.readFlags(); + std::bitset<8> flags(flagsData); + + std::ostringstream floatFlagMsg; + floatFlagMsg << "Flags for float* read: 0b" << flags; + log_test_message(floatFlagMsg.str()); + EXPECT(!!(flagsData & kMemoryTypeFloat)); + EXPECT(!!(flagsData & kMemoryChanged)); + + float* numbersReadPtr = read$.readFloatArray(); + + EXPECT(numbers[0] == numbersReadPtr[0]); + EXPECT(numbers[1] == numbersReadPtr[1]); + EXPECT(numbers[2] == numbersReadPtr[2]); + EXPECT(numbers[3] == numbersReadPtr[3]); + EXPECT(numbers[71] == numbersReadPtr[71]); + + log_test_message("float[72]: SUCCESS"); + write$.write(numbers, 72); char flagsData2 = read$.readFlags(); @@ -197,68 +219,68 @@ const lest::test specification[] = { std::bitset<8> flags3(flagsData3); EXPECT(!!(flagsData3 & kMemoryChanged)); - std::cout - << "7.1 status bit flips to zero when writing again: SUCCESS: 0b" - << flags2 << std::endl; + std::ostringstream floatZeroMsg; + floatZeroMsg << "status bit flips to zero when writing again: SUCCESS: 0b" << flags2; + log_test_message(floatZeroMsg.str()); - std::cout - << "7.2 status bit flips to one when writing again: SUCCESS: 0b" - << flags3 << std::endl; + std::ostringstream floatOneMsg; + floatOneMsg << "status bit flips to one when writing again: SUCCESS: 0b" << flags3; + log_test_message(floatOneMsg.str()); - - delete[] numbersReadPtr; - write$.close(); - read$.close(); + delete[] numbersReadPtr; + write$.close(); + read$.close(); }, - CASE("Can write and read a double* array") { - - double numbers[72] = { - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, - 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, - }; - - SharedMemoryWriteStream write${"numberPipe", 65535, true}; - SharedMemoryReadStream read${"numberPipe", 65535, true}; - - write$.write(numbers, 72); - - EXPECT(read$.readLength(kMemoryTypeDouble) == 72); - - char flagsData = read$.readFlags(); - std::bitset<8> flags(flagsData); - - std::cout - << "Flags for double* read: 0b" - << flags << std::endl; - EXPECT(!!(flagsData & kMemoryTypeDouble)); - EXPECT(!!(flagsData & kMemoryChanged)); - - double* numbersReadPtr = read$.readDoubleArray(); - - EXPECT(numbers[0] == numbersReadPtr[0]); - EXPECT(numbers[1] == numbersReadPtr[1]); - EXPECT(numbers[2] == numbersReadPtr[2]); - EXPECT(numbers[3] == numbersReadPtr[3]); - EXPECT(numbers[71] == numbersReadPtr[71]); - - std::cout << "8. double[72]: SUCCESS" << std::endl; + CASE("Can write and read a double* array") + { + double numbers[72] = + { + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + 1.38038450934, 3.43723642783, 3.1438540345, 331.390696969, + 3.483045044, 6.14848338383, 7.3293840293, 8.4234234, + }; + + SharedMemoryWriteStream write${"numberPipe", 65535, true}; + SharedMemoryReadStream read${"numberPipe", 65535, true}; + + write$.write(numbers, 72); + + EXPECT(read$.readLength(kMemoryTypeDouble) == 72UL); + + char flagsData = read$.readFlags(); + std::bitset<8> flags(flagsData); + + std::ostringstream doubleFlagMsg; + doubleFlagMsg << "Flags for double* read: 0b" << flags; + log_test_message(doubleFlagMsg.str()); + EXPECT(!!(flagsData & kMemoryTypeDouble)); + EXPECT(!!(flagsData & kMemoryChanged)); + + double* numbersReadPtr = read$.readDoubleArray(); + + EXPECT(numbers[0] == numbersReadPtr[0]); + EXPECT(numbers[1] == numbersReadPtr[1]); + EXPECT(numbers[2] == numbersReadPtr[2]); + EXPECT(numbers[3] == numbersReadPtr[3]); + EXPECT(numbers[71] == numbersReadPtr[71]); + + log_test_message("double[72]: SUCCESS"); write$.write(numbers, 72); @@ -273,19 +295,434 @@ const lest::test specification[] = { std::bitset<8> flags3(flagsData3); EXPECT(!!(flagsData3 & kMemoryChanged)); - std::cout - << "8.1 status bit flips to zero when writing again: SUCCESS: 0b" - << flags2 << std::endl; + std::ostringstream doubleZeroMsg; + doubleZeroMsg << "status bit flips to zero when writing again: SUCCESS: 0b" << flags2; + log_test_message(doubleZeroMsg.str()); - std::cout - << "8.2 status bit flips to one when writing again: SUCCESS: 0b" - << flags3 << std::endl; - delete[] numbersReadPtr; - write$.close(); - read$.close(); + std::ostringstream doubleOneMsg; + doubleOneMsg << "status bit flips to one when writing again: SUCCESS: 0b" << flags3; + log_test_message(doubleOneMsg.str()); + delete[] numbersReadPtr; + write$.close(); + read$.close(); + }, + + CASE("Persistent shared memory can be reopened") + { + const std::string pipeName = "persistSegmentTest"; + { + Memory writer{pipeName, 128, true}; + EXPECT(Error::OK == writer.create()); + + auto *bytes = static_cast(writer.data()); + bytes[0] = 0xAB; + bytes[1] = 0xCD; + + writer.close(); + } + + Memory reader{pipeName, 128, true}; + EXPECT(Error::OK == reader.open()); + + auto *readBytes = static_cast(reader.data()); + EXPECT(0xAB == readBytes[0]); + EXPECT(0xCD == readBytes[1]); + + log_test_message("Persistent segment reopened with data intact: SUCCESS"); + + reader.close(); + reader.destroy(); }, + + CASE("Ephemeral shared memory is removed after destruction") + { + const std::string pipeName = "ephemeralSegmentTest"; + { + Memory ephemeral{pipeName, 128, false}; + EXPECT(Error::OK == ephemeral.create()); + + static_cast(ephemeral.data())[0] = 0x77; + ephemeral.close(); + } + + Memory reopen{pipeName, 128, false}; + EXPECT(Error::OpeningFailed == reopen.open()); + + log_test_message("Ephemeral segment removed after destruction: SUCCESS"); + }, + + CASE("Shared memory streams handle empty strings") + { + const std::string pipeName = "emptyStringPipe"; + + SharedMemoryWriteStream writer{pipeName, 64, true}; + SharedMemoryReadStream reader{pipeName, 64, true}; + + const std::string emptyValue; + writer.write(emptyValue); + + EXPECT(0UL == reader.readLength(kMemoryTypeString)); + EXPECT(emptyValue == reader.readString()); + + log_test_message("Empty string round-trip through shared memory: SUCCESS"); + + writer.close(); + reader.close(); + writer.destroy(); + }, + CASE("Multiple read, one write") + { + const std::string pipeName = "multiReadPipe"; + + SharedMemoryWriteStream writer{pipeName, 128, true}; + SharedMemoryReadStream reader1{pipeName, 128, true}; + + const std::string message = "Hello, readers!"; + writer.write(message); + + EXPECT(message == reader1.readString()); + EXPECT(message == reader1.readString()); + + log_test_message("Multiple readers read the same data: SUCCESS"); + + writer.close(); + reader1.close(); + writer.destroy(); + }, + + CASE("Writer sends messages, reader consumes them") + { + const std::string pipeName = "messagePipe"; + + SharedMemoryWriteStream writer{pipeName, 256, true}; + SharedMemoryReadStream reader{pipeName, 256, true}; + + // Initially there should be no data available + EXPECT(!reader.hasNewData()); + EXPECT(writer.isMessageRead()); + + // Writer publishes first message + const std::string msg1 = "First message"; + writer.write(msg1); + + // Message should not be marked as read yet + EXPECT(!writer.isMessageRead()); + + // Reader should detect new data available + EXPECT(reader.hasNewData()); + + // Reader consumes the message + std::string read1 = reader.readString(); + EXPECT(msg1 == read1); + reader.markAsRead(); + + // After consumption, no new data should be available + EXPECT(!reader.hasNewData()); + EXPECT(writer.isMessageRead()); + + // Reading again should return same content but not be marked as "new" + std::string read1Again = reader.readString(); + EXPECT(msg1 == read1Again); + EXPECT(!reader.hasNewData()); + + // Writer waits for previous message to be consumed before writing next + writer.waitForRead(); + + // Writer publishes second message + const std::string msg2 = "Second message"; + writer.write(msg2); + + EXPECT(!writer.isMessageRead()); + + // New data should be available again + EXPECT(reader.hasNewData()); + + // Reader consumes second message + std::string read2 = reader.readString(); + EXPECT(msg2 == read2); + reader.markAsRead(); + + // No new data after consumption + EXPECT(!reader.hasNewData()); + EXPECT(writer.isMessageRead()); + + // Writer waits before publishing third message + writer.waitForRead(); + + // Writer publishes third message + const std::string msg3 = "Third message 🚀"; + writer.write(msg3); + + EXPECT(!writer.isMessageRead()); + EXPECT(reader.hasNewData()); + + // Reader consumes third message + std::string read3 = reader.readString(); + EXPECT(msg3 == read3); + reader.markAsRead(); + + EXPECT(!reader.hasNewData()); + EXPECT(writer.isMessageRead()); + + log_test_message("Writer sends messages, reader consumes them: SUCCESS"); + + writer.close(); + reader.close(); + writer.destroy(); + }, + + CASE("SharedMemoryQueue: Writer enqueues multiple messages, reader dequeues in FIFO order") + { + const std::string queueName = "testQueue"; + const std::uint32_t capacity = 10; + const std::uint32_t maxMessageSize = 256; + + SharedMemoryQueue writer{queueName, capacity, maxMessageSize, true, true}; + SharedMemoryQueue reader{queueName, capacity, maxMessageSize, true, false}; + + // Initially queue should be empty + EXPECT(writer.isEmpty()); + EXPECT(reader.isEmpty()); + EXPECT(writer.size() == 0); + EXPECT(reader.capacity() == capacity); + + // Writer enqueues multiple messages + EXPECT(writer.enqueue("First message")); + EXPECT(writer.enqueue("Second message")); + EXPECT(writer.enqueue("Third message")); + EXPECT(writer.enqueue("Fourth message")); + EXPECT(writer.enqueue("Fifth message")); + + // Check queue state + EXPECT(!writer.isEmpty()); + EXPECT(writer.size() == 5); + EXPECT(!writer.isFull()); + + // Reader sees the same state + EXPECT(!reader.isEmpty()); + EXPECT(reader.size() == 5); + + // Reader dequeues messages in FIFO order + std::string msg; + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "First message"); + EXPECT(reader.size() == 4); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Second message"); + EXPECT(reader.size() == 3); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Third message"); + EXPECT(reader.size() == 2); + + // Writer continues to enqueue while reader dequeues + EXPECT(writer.enqueue("Sixth message")); + EXPECT(writer.enqueue("Seventh message")); + EXPECT(writer.size() == 4); + + // Reader dequeues old messages first + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Fourth message"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Fifth message"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Sixth message"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Seventh message"); + + // Queue should be empty now + EXPECT(reader.isEmpty()); + EXPECT(writer.isEmpty()); + + // Dequeuing from empty queue should return false + EXPECT(!reader.dequeue(msg)); + + log_test_message("SharedMemoryQueue: FIFO message queue: SUCCESS"); + + writer.close(); + reader.close(); + writer.destroy(); + }, + + CASE("SharedMemoryQueue: Queue full behavior") + { + const std::string queueName = "fullQueue"; + const std::uint32_t capacity = 3; + const std::uint32_t maxMessageSize = 64; + + SharedMemoryQueue writer{queueName, capacity, maxMessageSize, true, true}; + SharedMemoryQueue reader{queueName, capacity, maxMessageSize, true, false}; + + // Fill the queue + EXPECT(writer.enqueue("Message 1")); + EXPECT(writer.enqueue("Message 2")); + EXPECT(writer.enqueue("Message 3")); + + EXPECT(writer.isFull()); + EXPECT(writer.size() == capacity); + + // Attempt to enqueue when full should fail + EXPECT(!writer.enqueue("Message 4")); + + // Dequeue one message + std::string msg; + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Message 1"); + + // Now we can enqueue again + EXPECT(!writer.isFull()); + EXPECT(writer.enqueue("Message 4")); + + // Dequeue remaining messages + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Message 2"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Message 3"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Message 4"); + + EXPECT(reader.isEmpty()); + + log_test_message("SharedMemoryQueue: Queue full behavior: SUCCESS"); + + writer.close(); + reader.close(); + writer.destroy(); + }, + + CASE("SharedMemoryQueue: Peek without dequeuing") + { + const std::string queueName = "peekQueue"; + const std::uint32_t capacity = 5; + const std::uint32_t maxMessageSize = 128; + + SharedMemoryQueue writer{queueName, capacity, maxMessageSize, true, true}; + SharedMemoryQueue reader{queueName, capacity, maxMessageSize, true, false}; + + // Enqueue messages + EXPECT(writer.enqueue("First")); + EXPECT(writer.enqueue("Second")); + EXPECT(writer.enqueue("Third")); + + std::string msg; + + // Peek at first message multiple times + EXPECT(reader.peek(msg)); + EXPECT(msg == "First"); + EXPECT(reader.size() == 3); // Size unchanged + + EXPECT(reader.peek(msg)); + EXPECT(msg == "First"); + EXPECT(reader.size() == 3); + + // Dequeue first message + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "First"); + EXPECT(reader.size() == 2); + + // Peek at second message + EXPECT(reader.peek(msg)); + EXPECT(msg == "Second"); + + // Dequeue all remaining + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Second"); + + EXPECT(reader.dequeue(msg)); + EXPECT(msg == "Third"); + + // Peek on empty queue should fail + EXPECT(!reader.peek(msg)); + EXPECT(reader.isEmpty()); + + log_test_message("SharedMemoryQueue: Peek functionality: SUCCESS"); + + writer.close(); + reader.close(); + writer.destroy(); + }, + + CASE("SharedMemoryQueue: Multithread producer-consumer") + { + const std::string queueName = "mtQueue1"; + constexpr std::uint32_t capacity = 20; + constexpr std::uint32_t maxMessageSize = 128; + + SharedMemoryQueue writer{queueName, capacity, maxMessageSize, true, true}; + + constexpr int numMessages = 100; + std::atomic messagesProduced{0}; + std::atomic messagesConsumed{0}; + std::atomic producerDone{false}; + + // Producer thread + std::thread producer([&]() + { + for (int i = 0; i < numMessages; ++i) + { + std::string msg = "Message " + std::to_string(i); + while (!writer.enqueue(msg)) + { + // Queue full, wait a bit + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + ++messagesProduced; + } + producerDone = true; + }); + + // Consumer thread + std::thread consumer([&]() + { + SharedMemoryQueue reader{queueName, capacity, maxMessageSize, true, false}; + + while (messagesConsumed < numMessages) + { + if (std::string msg; reader.dequeue(msg)) + { + // Verify message format (with single producer, FIFO order should be maintained) + EXPECT(msg.find("Message ") == 0); + ++messagesConsumed; + } + else + { + // Queue empty, wait a bit + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + } + + reader.close(); + }); + + producer.join(); + consumer.join(); + + EXPECT(messagesProduced == numMessages); + EXPECT(messagesConsumed == numMessages); + EXPECT(writer.isEmpty()); + + log_test_message("SharedMemoryQueue: Multithread producer-consumer: SUCCESS"); + + writer.close(); + writer.destroy(); + } + + // NOTE: Multiple concurrent producers accessing the same SharedMemoryQueue instance + // requires additional synchronization (mutex or atomic operations) to prevent race + // conditions. The current implementation works well for: + // - Single producer, single consumer + // - Single producer, multiple consumers (read-only operations) + // Future work: Add atomic operations for truly lock-free multiple producer support }; -int main (int argc, char *argv[]) { - return lest::run(specification, argc, argv); +int main (const int argc, char *argv[]) +{ + return lest::run(specification, argc, argv); }