COR-6010: implement headset service#70
COR-6010: implement headset service#70tungntEmotiv wants to merge 1 commit intounity-plugin-with-api-v5from
Conversation
|
The issue is ready for review, and the below acceptance criteria have been met:
Code Reviewer could not determine whether the following acceptance criteria have been met:
|
| var values = new Dictionary<string, float>(StringComparer.OrdinalIgnoreCase); | ||
| if (headers != null) | ||
| { | ||
| for (var i = 0; i < headers.Count && (i + 1) < e.Data.Count; i++) |
There was a problem hiding this comment.
🔥 Code Bugs
The index mapping is incorrect: when IsNonChannelHeader returns true and continue is executed, the data index (i + 1) doesn't align with the header index, causing channel values to be mismatched.
Details
📖 Explanation: The loop iterates through headers with index i, and accesses e.Data[i + 1]. However, when a non-channel header is skipped via continue, the next iteration increments i but still uses i+1 for data access, causing misalignment. The data index should track independently or the logic should account for skipped headers differently.
There was a problem hiding this comment.
Pull request overview
This PR implements a new IHeadsetService and HeadsetService class as part of an ongoing migration from the legacy EmotivUnityPlugin to a new SDK layer (Emotiv.Cortex.Service). It provides async APIs for scanning headsets, connecting/disconnecting, creating sessions, subscribing to data streams, and reading the latest data samples. The old HeadsetFinder class is deleted and its responsibilities are absorbed into the new service.
Changes:
- New
IHeadsetServiceinterface andHeadsetServiceimplementation with async APIs (ScanHeadsetAsync,ConnectHeadsetAsync,DisconnectHeadsetAsync,TakeLatestSample). - New supporting models:
SessionInfo,DataSamples(CQDataSample,MentalCommandDataSample), andHeadsetNotFounderror code. - Removal of
HeadsetFinderand stripping of headset-query/connect logic fromDataStreamManager,DataStreamProcess, andEmotivUnityItf;CortexClient.CreateSessionnow readsCurrentCortexTokeninternally.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
Src/Runtime/SDK/Headset/IHeadsetService.cs |
New interface defining the headset service API |
Src/Runtime/SDK/Headset/HeadsetService.cs |
New implementation of the headset service |
Src/Runtime/SDK/IEmotivCortexRuntime.cs |
Adds IHeadsetService Headset property to runtime interface |
Src/Runtime/SDK/EmotivCortexRuntime.cs |
Wires Headset property via HeadsetService.Instance |
Src/Runtime/Models/SessionInfo.cs |
New model wrapping session data for the new SDK |
Src/Runtime/Models/DataSamples.cs |
New models for CQ and MentalCommand data samples |
Src/Runtime/Models/CortexErrorCode.cs |
Adds HeadsetNotFound error code |
Src/CortexClient.cs |
Removes cortex token from CreateSession signature; adds RefreshHeadsetResult and QueryHeadsetResult events |
Src/SessionHandler.cs |
Removes cortexToken parameter from Create method |
Src/DataStreamProcess.cs |
Removes headset finder references and cortex token from CreateSession call |
Src/DataStreamManager.cs |
Removes headset-related logic, event handlers, and empties StartDataStream body |
Src/EmotivUnityItf.cs |
Comments out QueryHeadsets body |
Src/HeadsetFinder.cs |
Deleted legacy class |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (streamName == DataStreamName.DevInfos && header.Count > 0) | ||
| { | ||
| var devCols = new List<string>(); | ||
| devCols.Add(header[0].ToString()); | ||
| devCols.Add(header[1].ToString()); |
There was a problem hiding this comment.
In NormalizeHeaders, when streamName == DataStreamName.DevInfos and header.Count > 0, the code unconditionally accesses header[1] at line 514. However, the guard only checks that header.Count > 0, so if the response only contains one element, this will throw an ArgumentOutOfRangeException at runtime. The guard should be header.Count >= 2 to safely access both header[0] and header[1].
| => _auth ??= new AuthService(_context, _client); | ||
|
|
||
| public IHeadsetService Headset | ||
| => _headset ??= HeadsetService.Instance; |
There was a problem hiding this comment.
EmotivCortexRuntime.Headset returns HeadsetService.Instance, the global singleton, rather than constructing a new HeadsetService with the _client that is already injected/managed by this runtime instance. This means all EmotivCortexRuntime instances share the exact same headset service regardless of their individual _client connection.
Compare with Auth, which correctly constructs new AuthService(_context, _client) using the runtime's own context and client. Headset should do the same by constructing new HeadsetService(_client) (using the internal constructor), so the service properly uses the runtime's specific CortexClient instance.
| => _headset ??= HeadsetService.Instance; | |
| => _headset ??= new HeadsetService(_client); |
There was a problem hiding this comment.
@tungntEmotiv only resolve the comment after you fix it.
| private async Task<(CortexErrorCode Code, SessionInfo Data)> CreateSessionAsync(string headsetId) | ||
| { | ||
| var tcs = new TaskCompletionSource<(CortexErrorCode Code, SessionInfo Data)>(); | ||
| EventHandler<SessionEventArgs> okHandler = null; | ||
| EventHandler<ErrorMsgEventArgs> errorHandler = null; | ||
|
|
||
| okHandler = (sender, sessionInfo) => | ||
| { | ||
| if (!string.Equals(sessionInfo.HeadsetId, headsetId, StringComparison.Ordinal)) | ||
| { | ||
| return; | ||
| } | ||
| Cleanup(); | ||
| tcs.TrySetResult((CortexErrorCode.OK, SessionInfo.FromSessionEventArgs(sessionInfo))); | ||
| }; | ||
|
|
||
| errorHandler = (sender, error) => | ||
| { | ||
| if (error.MethodName != "createSession") | ||
| { | ||
| return; | ||
| } | ||
| Cleanup(); | ||
| tcs.TrySetResult((CortexErrorCode.UnknownError, null)); | ||
| }; | ||
|
|
||
| void Cleanup() | ||
| { | ||
| _client.CreateSessionOK -= okHandler; | ||
| _client.ErrorMsgReceived -= errorHandler; | ||
| } | ||
|
|
||
| _client.CreateSessionOK += okHandler; | ||
| _client.ErrorMsgReceived += errorHandler; | ||
| _client.CreateSession(headsetId); | ||
|
|
||
| return await tcs.Task; | ||
| } |
There was a problem hiding this comment.
None of the async operations in HeadsetService (connect, create session, subscribe, disconnect, refresh, query) have any timeout or CancellationToken support. All operations await a TaskCompletionSource<T> that will never complete if the Cortex server doesn't respond (e.g., connection drop, firmware hang). This means callers can wait indefinitely. The interface IHeadsetService does not accept CancellationToken parameters either.
Consider adding CancellationToken overloads and/or timeout logic (e.g., tcs.Task.WaitAsync(TimeSpan)) for all network-bound operations.
| if (error.MethodName != "controlDevice") | ||
| { | ||
| return; | ||
| } |
There was a problem hiding this comment.
The controlDevice error handler in ConnectHeadsetAsync matches only on error.MethodName != "controlDevice", without checking the headset ID. If a "controlDevice" error arrives for a different headset than headsetId, the handler will still complete the current task with an error, even though the error wasn't related to this connect attempt. A check should be added to verify that the error is for the specific headset being connected.
| } | |
| } | |
| // Ensure this error is for the headset being connected. | |
| if (!string.Equals(error.HeadsetId, headsetId, StringComparison.Ordinal)) | |
| { | |
| return; | |
| } |
| public static HeadsetService Instance { get; } = new HeadsetService(); | ||
|
|
||
| private readonly CortexClient _client; | ||
| private readonly List<Headset> _headsets = new List<Headset>(); |
There was a problem hiding this comment.
The _headsets list is read without synchronization in ConnectHeadsetAsync (line 155, _headsets.FirstOrDefault(...)) and in GetHeadsets (line 52, new List<Headset>(_headsets)), while it is written (cleared and refilled) in the queryResultHandler callback inside RefreshAndQueryAsync. That callback runs on a CortexClient event thread, which is separate from the caller's async context.
Concurrent reads and writes on a non-thread-safe List<T> can cause exceptions or corrupted data. A lock (e.g., the existing _sampleLock, or a dedicated headset lock) should be used for all accesses to _headsets.
| public void QueryHeadsets(string headsetId = "") { | ||
| _dsManager.QueryHeadsets(headsetId); | ||
| // _dsManager.QueryHeadsets(headsetId); | ||
| } |
There was a problem hiding this comment.
The body of QueryHeadsets has been replaced with a comment (// _dsManager.QueryHeadsets(headsetId);), making the public method a silent no-op. Any existing callers (e.g., BCIGameItf.QueryHeadsets which calls emotivUnityItf.QueryHeadsets(...)) will silently receive no results. If this method is intentionally deprecated in favour of the new IHeadsetService.ScanHeadsetAsync, this should be documented and callers should be notified or updated. If it is not intentionally deprecated, the implementation should be restored or replaced with the correct call.
There was a problem hiding this comment.
the file will be not used in future when complete the change API of new unity-plugi. So i will revert the change
| { | ||
| if (error.MethodName != "controlDevice") | ||
| { | ||
| return; | ||
| } |
There was a problem hiding this comment.
The errorHandler inside DisconnectHeadsetAsync only checks error.MethodName != "controlDevice" and then resolves the task with a generic CortexErrorCode.UnknownError. However, the same "controlDevice" method name is shared by both connect and disconnect operations. If a connect error fires while a disconnect is awaiting, this handler will erroneously complete the disconnect task with an error, because there is no check confirming this error relates to a disconnect (e.g., checking the command or headset ID). The same issue exists in ConnectHeadsetAsync at lines 247-254 — it filters on method name only, not headset ID or command type.
| { | |
| if (error.MethodName != "controlDevice") | |
| { | |
| return; | |
| } | |
| { | |
| // Only handle errors for the specific disconnect operation we initiated. | |
| if (error.MethodName != "controlDevice") | |
| { | |
| return; | |
| } | |
| // Attempt to inspect error parameters to ensure this error is for | |
| // the "disconnect" command on the expected headsetId. | |
| try | |
| { | |
| // Many Cortex/Emotiv error payloads are JSON; use JObject for safe access. | |
| var errorParams = error.Params as JObject; | |
| var command = errorParams?["command"]?.ToString(); | |
| var errorHeadsetId = errorParams?["headset"]?.ToString(); | |
| if (!string.Equals(command, "disconnect", StringComparison.OrdinalIgnoreCase)) | |
| { | |
| // This error is for a different controlDevice command (e.g., "connect"). | |
| return; | |
| } | |
| if (!string.Equals(errorHeadsetId, headsetId, StringComparison.Ordinal)) | |
| { | |
| // This error is for a different headset; ignore it. | |
| return; | |
| } | |
| } | |
| catch | |
| { | |
| // If we cannot safely determine that the error belongs to this | |
| // disconnect operation, do not complete this task. | |
| return; | |
| } |
| private bool _refreshAndQueryInProgress; | ||
| private bool _sessionCreated; |
There was a problem hiding this comment.
The _refreshAndQueryInProgress boolean flag and the _sessionCreated flag are accessed from multiple threads without synchronization. OnHeadsetScanFinished is triggered by a CortexClient event (potentially on a network/IO thread), which reads _sessionCreated and calls RefreshAndQueryAsync() which reads and writes _refreshAndQueryInProgress. Meanwhile, ConnectHeadsetAsync writes _sessionCreated on the caller's async thread. Neither field is declared volatile nor protected by a lock.
This creates a race condition where:
- Two concurrent callers could both pass the
_refreshAndQueryInProgresscheck and start a double refresh. - Writes to
_sessionCreatedmay not be visible toOnHeadsetScanFinishedrunning on another thread.
Both fields should be declared volatile, or their accesses should be guarded with a lock or Interlocked operations.
| private bool _refreshAndQueryInProgress; | |
| private bool _sessionCreated; | |
| private volatile bool _refreshAndQueryInProgress; | |
| private volatile bool _sessionCreated; |
| var subscribeCode = await SubscribeAsync(sessionResult.Data.SessionId, streams); | ||
| if (subscribeCode != CortexErrorCode.OK) | ||
| { | ||
| return CortexResult<SessionInfo>.Success(sessionResult.Data); |
There was a problem hiding this comment.
When stream subscription fails (i.e., subscribeCode != CortexErrorCode.OK), the method returns CortexResult<SessionInfo>.Success(sessionResult.Data). This silently returns a success result even though the caller explicitly requested data streams that were not subscribed. The caller has no way to distinguish "connected with all streams subscribed" from "connected but streams failed".
This either needs to return a failure result when subscription fails, or the API contract (documented in the interface) should be updated to clearly state that success is returned regardless of subscription outcome. The current behavior is misleading.
| return CortexResult<SessionInfo>.Success(sessionResult.Data); | |
| return CortexResult<SessionInfo>.Fail(CortexErrorMapper.FromErrorCode(subscribeCode)); |
|
|
||
| public HeadsetService() | ||
| { | ||
| _client = CortexClient.Instance; |
There was a problem hiding this comment.
This should be set from out side.
There was a problem hiding this comment.
Oh, I see we already had HeadsetService(CortexClient client), so we should remove this constructor.
|
|
||
| public DataSampleType Type => DataSampleType.CQ; | ||
| public double Timestamp { get; } | ||
| public IReadOnlyDictionary<string, float> Values { get; } |
There was a problem hiding this comment.
Can you please write comment to explain the Values here, so that developers can understand how to read it? Or do we have another way to make Values easier to understand?
| } | ||
|
|
||
| public DataSampleType Type => DataSampleType.CQ; | ||
| public double Timestamp { get; } |
There was a problem hiding this comment.
Do we need to repeat this declaration in child class?
|
|
||
| public sealed class SessionInfo | ||
| { | ||
| public string SessionId { get; private set; } |
There was a problem hiding this comment.
we will not have SessionId in API v5, can you please move it to some internal place?
| public sealed class SessionInfo | ||
| { | ||
| public string SessionId { get; private set; } | ||
| public SessionStatus Status { get; private set; } |
There was a problem hiding this comment.
Same here.
Btw, why do game developers need to know SessionStatus?
| { | ||
| public string SessionId { get; private set; } | ||
| public SessionStatus Status { get; private set; } | ||
| public string ApplicationId { get; private set; } |
| { | ||
| public enum DataSampleType | ||
| { | ||
| CQ, |
There was a problem hiding this comment.
Please comment to explain
| return new List<Headset>(_headsets); | ||
| } | ||
|
|
||
| public bool TakeLatestSample(string stream, out IDataSample sample) |
There was a problem hiding this comment.
please use DataSampleType instead of string?
The PR to implement headset service APIs.
Please help to review. thanks
Rovo Dev code review: Rovo Dev has reviewed this pull request
Any suggestions or improvements have been posted as pull request comments.