From fae0a64b50cc1ef40ec375e421d9efe380068110 Mon Sep 17 00:00:00 2001 From: tung_tung Date: Fri, 6 Feb 2026 18:19:56 +0700 Subject: [PATCH 1/2] COR-6010: implement headset service --- Src/CortexClient.cs | 28 +- Src/DataStreamManager.cs | 136 +---- Src/DataStreamProcess.cs | 34 +- Src/EmotivUnityItf.cs | 2 +- Src/HeadsetFinder.cs | 89 ---- Src/Runtime/Models/CortexErrorCode.cs | 1 + Src/Runtime/Models/DataSamples.cs | 45 ++ Src/Runtime/Models/SessionInfo.cs | 43 ++ Src/Runtime/SDK/EmotivCortexRuntime.cs | 4 + Src/Runtime/SDK/Headset/HeadsetService.cs | 580 +++++++++++++++++++++ Src/Runtime/SDK/Headset/IHeadsetService.cs | 52 ++ Src/Runtime/SDK/IEmotivCortexRuntime.cs | 1 + Src/SessionHandler.cs | 7 +- 13 files changed, 756 insertions(+), 266 deletions(-) delete mode 100644 Src/HeadsetFinder.cs create mode 100644 Src/Runtime/Models/DataSamples.cs create mode 100644 Src/Runtime/Models/SessionInfo.cs create mode 100644 Src/Runtime/SDK/Headset/HeadsetService.cs create mode 100644 Src/Runtime/SDK/Headset/IHeadsetService.cs diff --git a/Src/CortexClient.cs b/Src/CortexClient.cs index 1855dc8..d39f5fe 100644 --- a/Src/CortexClient.cs +++ b/Src/CortexClient.cs @@ -69,6 +69,8 @@ public abstract class CortexClient public event EventHandler UserLogoutNotify; public event EventHandler GetLicenseInfoDone; public event EventHandler<(CortexErrorCode error, License data)> GetLicenseInfoResult; + public event EventHandler<(CortexErrorCode error, string message)> RefreshHeadsetResult; + public event EventHandler<(CortexErrorCode error, List data)> QueryHeadsetResult; public event EventHandler CreateSessionOK; public event EventHandler UpdateSessionOK; public event EventHandler SubscribeDataDone; @@ -194,6 +196,14 @@ public void OnMessageReceived(string receievedMsg) { GetLicenseInfoResult?.Invoke(this, (CortexErrorCode.UnknownError, null)); } + else if (method == "queryHeadsets") + { + QueryHeadsetResult?.Invoke(this, (CortexErrorCode.UnknownError, null)); + } + else if (method == "controlDevice") + { + RefreshHeadsetResult?.Invoke(this, (CortexErrorCode.UnknownError, messageError)); + } } else { // handle response @@ -278,7 +288,8 @@ private void HandleResponse(string method, JToken data) foreach (JObject item in data) { headsetLists.Add(new Headset(item)); } - QueryHeadsetOK(this, headsetLists); + // QueryHeadsetOK(this, headsetLists); + QueryHeadsetResult?.Invoke(this, (CortexErrorCode.OK, headsetLists)); } else if (method == "controlDevice") { @@ -287,6 +298,10 @@ private void HandleResponse(string method, JToken data) { HeadsetDisConnectedOK(this, true); } + else if (command == "refresh") + { + RefreshHeadsetResult?.Invoke(this, (CortexErrorCode.OK, command)); + } } else if (method == "getUserLogin") { @@ -778,14 +793,19 @@ public void ControlDevice(string command, string headsetId, JObject mappings) } // CreateSession - // Required params: cortexToken, status - public void CreateSession(string cortexToken, string headsetId, string status) + // Required params: status + public void CreateSession(string headsetId, string status = "active") { JObject param = new JObject(); if (!String.IsNullOrEmpty(headsetId)) { param.Add("headset", headsetId); } - param.Add("cortexToken", cortexToken); + if (string.IsNullOrEmpty(CurrentCortexToken)) + { + UnityEngine.Debug.LogWarning("CreateSession requested but no cortex token is available."); + return; + } + param.Add("cortexToken", CurrentCortexToken); param.Add("status", status); SendTextMessage(param, "createSession", true); } diff --git a/Src/DataStreamManager.cs b/Src/DataStreamManager.cs index 35bfd6e..e88b2d9 100644 --- a/Src/DataStreamManager.cs +++ b/Src/DataStreamManager.cs @@ -112,12 +112,10 @@ private void Init() { _dsProcess.ProcessInit(); _dsProcess.SubscribedOK += OnSubscribedOK; - _dsProcess.HeadsetConnectNotify += OnHeadsetConnectNotify; _dsProcess.StreamStopNotify += OnStreamStopNotify; _dsProcess.LicenseValidTo += OnLicenseValidTo; _dsProcess.SessionActivedOK += OnSessionActivedOK; _dsProcess.CreateSessionFail += OnCreateSessionFail; - _dsProcess.QueryHeadsetOK += OnQueryHeadsetOK; _dsProcess.UserLogoutNotify += OnUserLogoutNotify; _dsProcess.SessionClosedNotify += OnSessionClosedNotify; _dsProcess.HeadsetScanFinished += OnHeadsetScanFinished; @@ -174,8 +172,6 @@ private void OnSessionClosedNotify(object sender, string sessionId) _wantedHeadsetId = ""; _connectHeadsetState = ConnectHeadsetStates.No_Connect; _detectedHeadsets.Clear(); - // start scanning headset again - _dsProcess.RefreshHeadset(); ResetDataBuffers(); } } @@ -198,34 +194,6 @@ private void OnUserLogoutNotify(object sender, string message) } } - private void OnQueryHeadsetOK(object sender, List headsets) - { - lock(_locker) - { - _detectedHeadsets.Clear(); - string strOut = ""; - foreach(var headset in headsets) { - _detectedHeadsets.Add(headset); - string headsetId = headset.HeadsetID; - if (_readyCreateSession && headsetId == _wantedHeadsetId && - (headset.Status == "connected")) { - UnityEngine.Debug.Log("The headset " + headsetId + " is connected. Start creating session."); - - _readyCreateSession = false; - // create session - _dsProcess.CreateSession(headsetId, true); - } - strOut += headset.HeadsetID + "-" + headset.HeadsetConnection + "-" + headset.Status + "; "; - } - UnityEngine.Debug.Log("DataStreamManager-OnQueryHeadsetOK: " + strOut); - if (string.IsNullOrEmpty(strOut)) - { - strOut = "No headset available at " + DateTime.Now.ToString("HH:mm:ss"); - } - MessageQueryHeadsetOK(this, strOut); - } - } - private void OnCreateSessionFail(object sender, string errorMsg) { lock (_locker) @@ -250,9 +218,6 @@ private void OnSessionActivedOK(object sender, SessionEventArgs sessionInfo) _connectHeadsetState = ConnectHeadsetStates.Session_Created; - // stop query headset if session is created - _dsProcess.StopQueryHeadset(); - // subscribe data _dsProcess.SubscribeData(); } @@ -265,10 +230,6 @@ private void OnSessionActivedOK(object sender, SessionEventArgs sessionInfo) private void OnLicenseValidTo(object sender, DateTime validTo) { - if (!_isSessActivated) { - // start scanning headset again - _dsProcess.RefreshHeadset(); - } LicenseValidTo(this, validTo); } @@ -341,33 +302,6 @@ private void OnStreamStopNotify(object sender, List streams) } } - private void OnHeadsetConnectNotify(object sender, HeadsetConnectEventArgs e) - { - lock (_locker) - { - string headsetId = e.HeadsetId; - UnityEngine.Debug.Log("OnHeadsetConnectNotify for headset " + headsetId + " while wantedHeadset : " + _wantedHeadsetId + "_readyCreateSession" + _readyCreateSession); - if (e.IsSuccess && _readyCreateSession && - (headsetId == _wantedHeadsetId)) { - UnityEngine.Debug.Log("Connect the headset " + headsetId + " successfully. Start creating session."); - _readyCreateSession = false; - // create session - _dsProcess.CreateSession(headsetId, true); - } - else if (!e.IsSuccess && headsetId == _wantedHeadsetId) { - UnityEngine.Debug.Log("Connect the headset " + headsetId + " unsuccessfully. Message : " + e.Message); - HeadsetConnectFail(this, headsetId); - _wantedHeadsetId = ""; // reset headsetId - _isSessActivated = false; - - _connectHeadsetState = ConnectHeadsetStates.Session_Failed; - } - else { - UnityEngine.Debug.Log("OnHeadsetConnectNotify: " + headsetId + ". Message : " + e.Message); - } - } - } - private void OnSubscribedOK(object sender, Dictionary e) { lock (_locker) @@ -559,11 +493,6 @@ private void CloseSession() private void OnHeadsetScanFinished(object sender, string message) { UnityEngine.Debug.Log(message); - if (!_isSessActivated) { - // start scanning headset again - UnityEngine.Debug.Log("Start scanning headset again."); - _dsProcess.RefreshHeadset(); - } } /// @@ -581,61 +510,7 @@ public void StartAuthorize(string licenseKey = "", object context = null) /// the id of headset you want to retrieve data. public void StartDataStream(List streamNameList, string headsetId) { - lock (_locker) - { - // if (!string.IsNullOrEmpty(_wantedHeadsetId)) { - // UnityEngine.Debug.Log("The data streams has already started for headset " - // + _wantedHeadsetId + ". Please wait..."); - // return; - // } - - if (string.IsNullOrEmpty(headsetId)) { - if (_detectedHeadsets.Count > 0) { - // get the first headset in the list - _wantedHeadsetId = _detectedHeadsets[0].HeadsetID; - } - else { - UnityEngine.Debug.LogError("No headset available."); - // query headset - // _dsProcess.QueryHeadsets(""); - return; - } - - } - else { - bool _foundHeadset = false; - foreach (var item in _detectedHeadsets) { - if (item.HeadsetID == headsetId){ - _foundHeadset = true; - } - } - if (_foundHeadset) { - _wantedHeadsetId = headsetId; - } - else { - UnityEngine.Debug.LogError("The headset " + headsetId + " is not found. Please check the headset Id."); - return; - } - } - - UnityEngine.Debug.Log("DataStreamManager-StartDataStream: " + _wantedHeadsetId); - - _connectHeadsetState = ConnectHeadsetStates.Headset_Connecting; - - foreach(var curStream in streamNameList) { - _dsProcess.AddStreams(curStream); - } - // check headset connected - if (!isConnectedHeadset(_wantedHeadsetId)) { - _readyCreateSession = true; - _dsProcess.StartConnectToDevice(_wantedHeadsetId); - } - else if (!_isSessActivated) { - UnityEngine.Debug.Log("The headset " + _wantedHeadsetId + " has already connected. Start creating session."); - _readyCreateSession = false; - _dsProcess.CreateSession(_wantedHeadsetId, true); - } - } + } /// @@ -990,13 +865,6 @@ public double GetPMData(string label) return _pmBuff.GetData(label); } - /// - /// Query headsets. - /// - public void QueryHeadsets(string headsetId = "") { - _dsProcess.QueryHeadsets(headsetId); - } - /// /// Get detected headsets. /// @@ -1013,8 +881,6 @@ public List GetDetectedHeadsets() { public void Stop() { // close data stream CloseSession(); - // stop query headset - _dsProcess.StopQueryHeadset(); _dsProcess.CloseCortexClient(); } diff --git a/Src/DataStreamProcess.cs b/Src/DataStreamProcess.cs index 6a37065..82f85f2 100644 --- a/Src/DataStreamProcess.cs +++ b/Src/DataStreamProcess.cs @@ -14,7 +14,6 @@ public class DataStreamProcess static readonly object _locker = new object(); private CortexClient _ctxClient = CortexClient.Instance; private List _streams; - private HeadsetFinder _headsetFinder = HeadsetFinder.Instance; private Authorizer _authorizer = Authorizer.Instance; private SessionHandler _sessionHandler = SessionHandler.Instance; @@ -50,12 +49,6 @@ public event EventHandler SessionActivedOK remove { _sessionHandler.SessionActived -= value; } } public event EventHandler CreateSessionFail; - - public event EventHandler> QueryHeadsetOK - { - add { _headsetFinder.QueryHeadsetOK += value; } - remove { _headsetFinder.QueryHeadsetOK -= value; } - } public event EventHandler UserLogoutNotify; // inform license valid to date // For test @@ -117,7 +110,6 @@ public void ProcessInit() private void OnUserLogoutNotify(object sender, string message) { // UnityEngine.Debug.Log("OnUserLogoutNotify: " + message); - StopQueryHeadset(); // Clear session data Clear(); UserLogoutNotify(this, message); @@ -351,9 +343,8 @@ public void CreateSession(string headsetId, bool isActiveSession) // Wait a moment before creating session System.Threading.Thread.Sleep(1000); // CreateSession - string cortexToken = _authorizer.CortexToken; UnityEngine.Debug.Log("Create Session with headset " + headsetId); - _sessionHandler.Create(cortexToken, headsetId, isActiveSession); + _sessionHandler.Create(headsetId, isActiveSession); } /// @@ -424,22 +415,6 @@ public void CloseSession() _sessionHandler.CloseSession(cortexToken); } - /// - /// Start query headsets to get headsets information. - /// - public void QueryHeadsets(string headsetId = "") - { - _ctxClient.QueryHeadsets(headsetId); - } - - /// - /// Stop query headsets. - /// - public void StopQueryHeadset() - { - _headsetFinder.StopQueryHeadset(); - } - /// /// Force close websocket client. /// @@ -448,13 +423,6 @@ public void CloseCortexClient() _ctxClient.Close(); } - /// - /// Refresh headset to trigger scan btle devices from Cortex - /// - public void RefreshHeadset() { - _headsetFinder.RefreshHeadset(); - } - // log out public void Logout() { _authorizer.Logout(); diff --git a/Src/EmotivUnityItf.cs b/Src/EmotivUnityItf.cs index 6771f9c..a758589 100644 --- a/Src/EmotivUnityItf.cs +++ b/Src/EmotivUnityItf.cs @@ -312,7 +312,7 @@ public void AcceptEulaAndPrivacyPolicy() /// The headset id of specific headset if want get headset information of a specific headset. /// If use empty string it will query all headsets public void QueryHeadsets(string headsetId = "") { - _dsManager.QueryHeadsets(headsetId); + // _dsManager.QueryHeadsets(headsetId); } /// diff --git a/Src/HeadsetFinder.cs b/Src/HeadsetFinder.cs deleted file mode 100644 index 30dced9..0000000 --- a/Src/HeadsetFinder.cs +++ /dev/null @@ -1,89 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using Newtonsoft.Json.Linq; -using System.Timers; -using UnityEngine; - -namespace EmotivUnityPlugin -{ - /// - /// Reponsible for finding headsets. - /// - public class HeadsetFinder - { - private CortexClient _ctxClient = CortexClient.Instance; - - /// - /// Timer for querying headsets - /// - private Timer _aTimer = null; - - // Event - public event EventHandler HeadsetDisConnectedOK; - public event EventHandler> QueryHeadsetOK; - - public HeadsetFinder() - { - _ctxClient = CortexClient.Instance; - _ctxClient.QueryHeadsetOK += OnQueryHeadsetReceived; - _ctxClient.HeadsetDisConnectedOK += OnHeadsetDisconnectedOK; - } - - public static HeadsetFinder Instance { get; } = new HeadsetFinder(); - - private void OnHeadsetDisconnectedOK(object sender, bool e) - { - HeadsetDisConnectedOK(this, true); - } - - private void OnQueryHeadsetReceived(object sender, List headsets) - { - QueryHeadsetOK(this, headsets); - } - - /// - /// Init headset finder - /// - public void FinderInit() - { - SetQueryHeadsetTimer(); - } - - public void StopQueryHeadset() { - if (_aTimer != null && _aTimer.Enabled) { - UnityEngine.Debug.Log("Stop query headset"); - _aTimer.Stop(); - } - } - public void RefreshHeadset() { - _ctxClient.ControlDevice("refresh", "", null); - } - - /// - /// Setup query headset timer - /// - private void SetQueryHeadsetTimer() - { - if (_aTimer != null) { - _aTimer.Enabled = true; - return; - } - - _aTimer = new Timer(Config.QUERY_HEADSET_TIME); - - // Hook up the Elapsed event for the timer. - _aTimer.Elapsed += OnTimedEvent; - _aTimer.AutoReset = true; - _aTimer.Enabled = true; - } - - /// - /// Handle timeout. Retry query headsets. - /// - private void OnTimedEvent(object sender, ElapsedEventArgs e) - { - _ctxClient.QueryHeadsets(""); - } - } -} diff --git a/Src/Runtime/Models/CortexErrorCode.cs b/Src/Runtime/Models/CortexErrorCode.cs index 9ce8022..317bbbb 100644 --- a/Src/Runtime/Models/CortexErrorCode.cs +++ b/Src/Runtime/Models/CortexErrorCode.cs @@ -6,6 +6,7 @@ public enum CortexErrorCode NoUserLogin, // Indicates that there is no user logged in. AuthorizationFailed, // Indicates that the authorization process failed overall. LicenseError, // Indicates that there is an issue with the license (e.g., invalid or expired license). + HeadsetNotFound, // Indicates that the specified headset could not be found. UnknownError } } diff --git a/Src/Runtime/Models/DataSamples.cs b/Src/Runtime/Models/DataSamples.cs new file mode 100644 index 0000000..52e255e --- /dev/null +++ b/Src/Runtime/Models/DataSamples.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; + +namespace Emotiv.Cortex.Models +{ + public enum DataSampleType + { + CQ, + MentalCommand + } + + public interface IDataSample + { + DataSampleType Type { get; } + double Timestamp { get; } + } + + public class CQDataSample : IDataSample + { + public CQDataSample(double timestamp, IReadOnlyDictionary values) + { + Timestamp = timestamp; + Values = values ?? new Dictionary(); + } + + public DataSampleType Type => DataSampleType.CQ; + public double Timestamp { get; } + public IReadOnlyDictionary Values { get; } + } + + public class MentalCommandDataSample : IDataSample + { + public MentalCommandDataSample(double timestamp, string action, float power) + { + Timestamp = timestamp; + Action = action ?? string.Empty; + Power = power; + } + + public DataSampleType Type => DataSampleType.MentalCommand; + public double Timestamp { get; } + public string Action { get; } + public float Power { get; } + } +} diff --git a/Src/Runtime/Models/SessionInfo.cs b/Src/Runtime/Models/SessionInfo.cs new file mode 100644 index 0000000..9b8b5bd --- /dev/null +++ b/Src/Runtime/Models/SessionInfo.cs @@ -0,0 +1,43 @@ +using EmotivUnityPlugin; + +namespace Emotiv.Cortex.Models +{ + public enum SessionStatus + { + Opened = 0, + Activated = 1, + Closed = 2 + } + + public sealed class SessionInfo + { + public string SessionId { get; private set; } + public SessionStatus Status { get; private set; } + public string ApplicationId { get; private set; } + public string HeadsetId { get; private set; } + + public static SessionInfo FromSessionEventArgs(SessionEventArgs sessionInfo) + { + return new SessionInfo + { + SessionId = sessionInfo.SessionId, + Status = MapStatus(sessionInfo.Status), + ApplicationId = sessionInfo.ApplicationId, + HeadsetId = sessionInfo.HeadsetId + }; + } + + private static SessionStatus MapStatus(EmotivUnityPlugin.SessionStatus status) + { + switch (status) + { + case EmotivUnityPlugin.SessionStatus.Opened: + return SessionStatus.Opened; + case EmotivUnityPlugin.SessionStatus.Activated: + return SessionStatus.Activated; + default: + return SessionStatus.Closed; + } + } + } +} diff --git a/Src/Runtime/SDK/EmotivCortexRuntime.cs b/Src/Runtime/SDK/EmotivCortexRuntime.cs index e0a5f41..2c13fcf 100644 --- a/Src/Runtime/SDK/EmotivCortexRuntime.cs +++ b/Src/Runtime/SDK/EmotivCortexRuntime.cs @@ -10,6 +10,7 @@ public class EmotivCortexRuntime : IEmotivCortexRuntime private readonly CortexRuntimeContext _context; private CortexClient _client; private IAuthService _auth; + private IHeadsetService _headset; public EmotivCortexRuntime() { @@ -43,6 +44,9 @@ private void OnCortexConnectionStared(object sender, bool isConnected) } public IAuthService Auth => _auth ??= new AuthService(_context, _client); + + public IHeadsetService Headset + => _headset ??= HeadsetService.Instance; public void Dispose() { diff --git a/Src/Runtime/SDK/Headset/HeadsetService.cs b/Src/Runtime/SDK/Headset/HeadsetService.cs new file mode 100644 index 0000000..0f7b5de --- /dev/null +++ b/Src/Runtime/SDK/Headset/HeadsetService.cs @@ -0,0 +1,580 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Emotiv.Cortex.Models; +using EmotivUnityPlugin; +using Newtonsoft.Json.Linq; + +namespace Emotiv.Cortex.Service +{ + public class HeadsetService : IHeadsetService + { + public static HeadsetService Instance { get; } = new HeadsetService(); + + private readonly CortexClient _client; + private readonly List _headsets = new List(); + private bool _refreshAndQueryInProgress; + private bool _sessionCreated; + private readonly object _sampleLock = new object(); + private readonly Dictionary _latestSamples = new Dictionary(StringComparer.Ordinal); + private readonly Dictionary> _streamHeaders = new Dictionary>(StringComparer.Ordinal); + private bool _streamDataHooked; + + public HeadsetService() + { + _client = CortexClient.Instance; + _client.HeadsetScanFinished += OnHeadsetScanFinished; + HookStreamData(); + } + + internal HeadsetService(CortexClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _client.HeadsetScanFinished += OnHeadsetScanFinished; + HookStreamData(); + } + + public async Task ScanHeadsetAsync() + { + var queryCode = await RefreshAndQueryAsync(); + if (queryCode != CortexErrorCode.OK) + { + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(queryCode)); + } + + return CortexResult.Success(); + } + + public List GetHeadsets() + { + return new List(_headsets); + } + + public bool TakeLatestSample(string stream, out IDataSample sample) + { + sample = null; + if (string.IsNullOrWhiteSpace(stream)) + { + return false; + } + + lock (_sampleLock) + { + return _latestSamples.TryGetValue(stream, out sample) && sample != null; + } + } + + private void OnHeadsetScanFinished(object sender, string message) + { + if (!_sessionCreated) { + _ = RefreshAndQueryAsync(); + } + + } + + private async Task RefreshAndQueryAsync() + { + if (_refreshAndQueryInProgress) + { + return CortexErrorCode.UnknownError; + } + + _refreshAndQueryInProgress = true; + try + { + var refreshTcs = new TaskCompletionSource(); + EventHandler<(CortexErrorCode error, string message)> refreshResultHandler = null; + + refreshResultHandler = (sender, result) => + { + CleanupRefresh(); + refreshTcs.TrySetResult(result.error); + }; + + void CleanupRefresh() + { + _client.RefreshHeadsetResult -= refreshResultHandler; + } + + _client.RefreshHeadsetResult += refreshResultHandler; + RefreshHeadset(); + + var refreshCode = await refreshTcs.Task; + + if (refreshCode != CortexErrorCode.OK) + { + return refreshCode; + } + + var queryTcs = new TaskCompletionSource(); + EventHandler<(CortexErrorCode error, List data)> queryResultHandler = null; + + queryResultHandler = (sender, result) => + { + CleanupQuery(); + if (result.error != CortexErrorCode.OK) + { + queryTcs.TrySetResult(result.error); + return; + } + + _headsets.Clear(); + if (result.data != null) + { + _headsets.AddRange(result.data); + } + queryTcs.TrySetResult(CortexErrorCode.OK); + }; + + void CleanupQuery() + { + _client.QueryHeadsetResult -= queryResultHandler; + } + + _client.QueryHeadsetResult += queryResultHandler; + QueryHeadsets(); + + return await queryTcs.Task; + } + finally + { + _refreshAndQueryInProgress = false; + } + } + + + + public async Task> ConnectHeadsetAsync( + string headsetId, + Dictionary mappings = null, + IReadOnlyList streams = null) + { + // check headset exists in the list + var headset = _headsets.FirstOrDefault(h => string.Equals(h.HeadsetID, headsetId, StringComparison.Ordinal)); + if (headset == null) + { + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(CortexErrorCode.HeadsetNotFound)); + } + + + var connectCode = await ConnectHeadsetAsync(headsetId, mappings); + if (connectCode != CortexErrorCode.OK) + { + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(connectCode)); + } + + var sessionResult = await CreateSessionAsync(headsetId); + if (sessionResult.Code != CortexErrorCode.OK) + { + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(sessionResult.Code)); + } + _sessionCreated = true; + + if (streams != null && streams.Count > 0) + { + var subscribeCode = await SubscribeAsync(sessionResult.Data.SessionId, streams); + if (subscribeCode != CortexErrorCode.OK) + { + return CortexResult.Success(sessionResult.Data); + } + } + + return CortexResult.Success(sessionResult.Data); + } + + public async Task DisconnectHeadsetAsync(string headsetId) + { + if (string.IsNullOrWhiteSpace(headsetId)) + { + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(CortexErrorCode.UnknownError)); + } + + var tcs = new TaskCompletionSource(); + EventHandler okHandler = null; + EventHandler errorHandler = null; + + okHandler = (sender, result) => + { + Cleanup(); + tcs.TrySetResult((result ? CortexErrorCode.OK : CortexErrorCode.UnknownError)); + }; + + errorHandler = (sender, error) => + { + if (error.MethodName != "controlDevice") + { + return; + } + Cleanup(); + tcs.TrySetResult((CortexErrorCode.UnknownError)); + }; + + void Cleanup() + { + _client.HeadsetDisConnectedOK -= okHandler; + _client.ErrorMsgReceived -= errorHandler; + } + + _client.HeadsetDisConnectedOK += okHandler; + _client.ErrorMsgReceived += errorHandler; + _client.ControlDevice("disconnect", headsetId, null); + + var code = await tcs.Task; + return code == CortexErrorCode.OK + ? CortexResult.Success() + : CortexResult.Fail(CortexErrorMapper.FromErrorCode(code)); + } + + private async Task ConnectHeadsetAsync(string headsetId, Dictionary mappings) + { + var tcs = new TaskCompletionSource(); + EventHandler connectHandler = null; + EventHandler errorHandler = null; + + connectHandler = (sender, info) => + { + if (!string.Equals(info.HeadsetId, headsetId, StringComparison.Ordinal)) + { + return; + } + Cleanup(); + tcs.TrySetResult(info.IsSuccess ? CortexErrorCode.OK : CortexErrorCode.UnknownError); + }; + + errorHandler = (sender, error) => + { + if (error.MethodName != "controlDevice") + { + return; + } + Cleanup(); + tcs.TrySetResult(CortexErrorCode.UnknownError); + }; + + void Cleanup() + { + _client.HeadsetConnectNotify -= connectHandler; + _client.ErrorMsgReceived -= errorHandler; + } + + _client.HeadsetConnectNotify += connectHandler; + _client.ErrorMsgReceived += errorHandler; + + _client.ControlDevice("connect", headsetId, ToMappings(mappings)); + return await tcs.Task; + } + + private async Task<(CortexErrorCode Code, SessionInfo Data)> CreateSessionAsync(string headsetId) + { + var tcs = new TaskCompletionSource<(CortexErrorCode Code, SessionInfo Data)>(); + EventHandler okHandler = null; + EventHandler errorHandler = null; + + okHandler = (sender, sessionInfo) => + { + if (!string.Equals(sessionInfo.HeadsetId, headsetId, StringComparison.Ordinal)) + { + return; + } + Cleanup(); + tcs.TrySetResult((CortexErrorCode.OK, SessionInfo.FromSessionEventArgs(sessionInfo))); + }; + + errorHandler = (sender, error) => + { + if (error.MethodName != "createSession") + { + return; + } + Cleanup(); + tcs.TrySetResult((CortexErrorCode.UnknownError, null)); + }; + + void Cleanup() + { + _client.CreateSessionOK -= okHandler; + _client.ErrorMsgReceived -= errorHandler; + } + + _client.CreateSessionOK += okHandler; + _client.ErrorMsgReceived += errorHandler; + _client.CreateSession(headsetId); + + return await tcs.Task; + } + + private async Task SubscribeAsync(string sessionId, IReadOnlyList streams) + { + if (string.IsNullOrEmpty(sessionId)) + { + return CortexErrorCode.UnknownError; + } + + var tcs = new TaskCompletionSource(); + EventHandler okHandler = null; + EventHandler errorHandler = null; + + okHandler = (sender, result) => + { + CacheStreamHeaders(result); + Cleanup(); + var hasSuccess = result.SuccessList != null && result.SuccessList.Count > 0; + tcs.TrySetResult(hasSuccess ? CortexErrorCode.OK : CortexErrorCode.UnknownError); + }; + + errorHandler = (sender, error) => + { + if (error.MethodName != "subscribe") + { + return; + } + Cleanup(); + tcs.TrySetResult(CortexErrorCode.UnknownError); + }; + + void Cleanup() + { + _client.SubscribeDataDone -= okHandler; + _client.ErrorMsgReceived -= errorHandler; + } + + _client.SubscribeDataDone += okHandler; + _client.ErrorMsgReceived += errorHandler; + _client.Subscribe(_client.CurrentCortexToken, sessionId, streams.ToList()); + + return await tcs.Task; + } + + private void HookStreamData() + { + if (_streamDataHooked) + { + return; + } + + _client.StreamDataReceived += OnStreamDataReceived; + _streamDataHooked = true; + } + + private void OnStreamDataReceived(object sender, StreamDataEventArgs e) + { + if (e == null || string.IsNullOrEmpty(e.StreamName)) + { + return; + } + + switch (e.StreamName) + { + case DataStreamName.DevInfos: + if (TryBuildCQSample(e, out var cqSample)) + { + SetLatestSample(DataStreamName.DevInfos, cqSample); + } + break; + case DataStreamName.MentalCommands: + if (TryBuildMentalCommandSample(e, out var comSample)) + { + SetLatestSample(DataStreamName.MentalCommands, comSample); + } + break; + } + } + + private void SetLatestSample(string stream, IDataSample sample) + { + if (sample == null || string.IsNullOrEmpty(stream)) + { + return; + } + lock (_sampleLock) + { + _latestSamples[stream] = sample; + } + } + + private bool TryBuildCQSample(StreamDataEventArgs e, out IDataSample sample) + { + sample = null; + if (e.Data == null || e.Data.Count == 0) + { + return false; + } + + if (!TryGetTimestamp(e.Data, out var timestamp)) + { + return false; + } + + IReadOnlyList headers = null; + lock (_sampleLock) + { + _streamHeaders.TryGetValue(DataStreamName.DevInfos, out headers); + } + + var values = new Dictionary(StringComparer.OrdinalIgnoreCase); + if (headers != null) + { + for (var i = 0; i < headers.Count && (i + 1) < e.Data.Count; i++) + { + var key = headers[i]; + if (IsNonChannelHeader(key)) + { + continue; + } + var value = Convert.ToSingle(e.Data[i + 1]); + values[key] = value; + } + } + else + { + // throw exception or return false if headers are not available, as we don't know how to parse the data + return false; + } + + sample = new CQDataSample(timestamp, values); + return true; + } + + private bool TryBuildMentalCommandSample(StreamDataEventArgs e, out IDataSample sample) + { + sample = null; + if (e.Data == null || e.Data.Count < 3) + { + return false; + } + + if (!TryGetTimestamp(e.Data, out var timestamp)) + { + return false; + } + + var action = Convert.ToString(e.Data[1]); + var power = Convert.ToSingle(e.Data[2]); + sample = new MentalCommandDataSample(timestamp, action, power); + return true; + } + + private static bool TryGetTimestamp(ArrayList data, out double timestamp) + { + timestamp = 0; + if (data == null || data.Count == 0) + { + return false; + } + + try + { + timestamp = Convert.ToDouble(data[0]); + return true; + } + catch + { + return false; + } + } + + private void CacheStreamHeaders(MultipleResultEventArgs result) + { + if (result?.SuccessList == null || result.SuccessList.Count == 0) + { + return; + } + + lock (_sampleLock) + { + foreach (JObject ele in result.SuccessList) + { + var streamName = (string)ele["streamName"]; + if (string.IsNullOrEmpty(streamName)) + { + continue; + } + + var header = (JArray)ele["cols"]; + if (header == null) + { + continue; + } + + _streamHeaders[streamName] = NormalizeHeaders(streamName, header); + } + } + } + + + private static IReadOnlyList NormalizeHeaders(string streamName, JArray header) + { + UnityEngine.Debug.Log("Normalizing headers for stream: " + streamName + " with header: " + header.Count); + if (streamName == DataStreamName.DevInfos && header.Count > 0) + { + var devCols = new List(); + devCols.Add(header[0].ToString()); + devCols.Add(header[1].ToString()); + + if (header.Count > 2 && header[2] is JArray channelList) + { + for (var i = 0; i < channelList.Count; i++) + { + devCols.Add(channelList[i].ToString()); + } + } + + if (header.Count > 3) + { + for (var id = 3; id < header.Count; id++) + { + devCols.Add(header[id].ToString()); + } + } + + return devCols; + } + + var cols = new List(header.Count); + foreach (var token in header) + { + cols.Add(token.ToString()); + } + return cols; + } + + private static bool IsNonChannelHeader(string header) + { + if (string.IsNullOrWhiteSpace(header)) + { + return true; + } + + return header.Equals("interpolated", StringComparison.OrdinalIgnoreCase) + || header.Equals("counter", StringComparison.OrdinalIgnoreCase) + || header.Equals("timestamp", StringComparison.OrdinalIgnoreCase); + } + + private static JObject ToMappings(Dictionary mappings) + { + if (mappings == null || mappings.Count == 0) + { + return null; + } + + var obj = new JObject(); + foreach (var kvp in mappings) + { + obj[kvp.Key] = kvp.Value; + } + return obj; + } + + private void RefreshHeadset() + { + _client.ControlDevice("refresh", string.Empty, null); + } + + private void QueryHeadsets() + { + _client.QueryHeadsets(string.Empty); + } + } +} diff --git a/Src/Runtime/SDK/Headset/IHeadsetService.cs b/Src/Runtime/SDK/Headset/IHeadsetService.cs new file mode 100644 index 0000000..3b61e1a --- /dev/null +++ b/Src/Runtime/SDK/Headset/IHeadsetService.cs @@ -0,0 +1,52 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Emotiv.Cortex.Models; +using EmotivUnityPlugin; + +namespace Emotiv.Cortex.Service +{ + public interface IHeadsetService + { + /// + /// Scans for available headsets and queries their status. Automatically retries the process when warning 142 is received (scanning finish warning). + /// The API only returns success or fail (no data). To get the headset list, use GetHeadsets(). + /// + /// Success or fail result. No headset data returned. + Task ScanHeadsetAsync(); + + /// + /// Retrieves the list of available headsets. This is a synchronous API. + /// + /// List of available headsets. + List GetHeadsets(); + + /// + /// Connects to the headset with the specified headsetId. If not already connected, creates a working session with the headset. + /// If streams is not null, subscribes to the specified data streams (e.g., "eeg", "mot", "dev", ...). + /// + /// The ID of the headset to connect. + /// Optional. Channel mappings for the EPOC FLEX headset only. + /// Optional. List of data streams to subscribe (e.g., "eeg", "mot", "dev"). + /// Session info for the connected headset. + Task> ConnectHeadsetAsync( + string headsetId, + Dictionary mappings = null, + IReadOnlyList streams = null + ); + + /// + /// Disconnects the headset with the specified headsetId. + /// + /// The ID of the headset to disconnect. + /// Success or fail result. + Task DisconnectHeadsetAsync(string headsetId); + + /// + /// Takes the latest sample of the specified subscribed data stream. + /// + /// The name of the data stream (must be subscribed). + /// Output parameter for the latest sample. + /// True if a sample is available; otherwise, false. + bool TakeLatestSample(string stream, out IDataSample sample); + } +} diff --git a/Src/Runtime/SDK/IEmotivCortexRuntime.cs b/Src/Runtime/SDK/IEmotivCortexRuntime.cs index 2b86c33..a4d6008 100644 --- a/Src/Runtime/SDK/IEmotivCortexRuntime.cs +++ b/Src/Runtime/SDK/IEmotivCortexRuntime.cs @@ -5,5 +5,6 @@ namespace Emotiv.Cortex.Service public interface IEmotivCortexRuntime : IDisposable { IAuthService Auth { get; } + IHeadsetService Headset { get; } } } \ No newline at end of file diff --git a/Src/SessionHandler.cs b/Src/SessionHandler.cs index 13c8344..617b886 100644 --- a/Src/SessionHandler.cs +++ b/Src/SessionHandler.cs @@ -111,13 +111,12 @@ private void UpdateSessionOk(object sender, SessionEventArgs sessionInfo) /// Open a session with an EMOTIV headset. /// A application can open only one session at a time with a given headset. /// - public void Create(string cortexToken, string headsetId, bool activeSession = false) + public void Create(string headsetId, bool activeSession = false) { - if (!String.IsNullOrEmpty(cortexToken) && - !String.IsNullOrEmpty(headsetId)) + if (!String.IsNullOrEmpty(headsetId)) { string status = activeSession ? "active" : "open"; - _ctxClient.CreateSession(cortexToken, headsetId, status); + _ctxClient.CreateSession(headsetId, status); } else { UnityEngine.Debug.Log("CreateSession: Invalid parameters"); From d40031c781957a91699b008624059f6e3354a247 Mon Sep 17 00:00:00 2001 From: tung_tung Date: Tue, 3 Mar 2026 18:32:00 +0700 Subject: [PATCH 2/2] update rework --- Src/CortexClient.cs | 7 +- Src/DataStreamManager.cs | 136 ++++++++++++++++++- Src/DataStreamProcess.cs | 34 ++++- Src/EmotivUnityItf.cs | 2 +- Src/HeadsetFinder.cs | 89 ++++++++++++ Src/Runtime/Models/CortexErrorCode.cs | 1 + Src/Runtime/Models/CortexRuntimeContext.cs | 50 +++++++ Src/Runtime/SDK/EmotivCortexRuntime.cs | 2 +- Src/Runtime/SDK/Headset/HeadsetService.cs | 149 ++++++++------------- Src/SessionHandler.cs | 7 +- 10 files changed, 373 insertions(+), 104 deletions(-) create mode 100644 Src/HeadsetFinder.cs diff --git a/Src/CortexClient.cs b/Src/CortexClient.cs index d39f5fe..c474559 100644 --- a/Src/CortexClient.cs +++ b/Src/CortexClient.cs @@ -69,7 +69,7 @@ public abstract class CortexClient public event EventHandler UserLogoutNotify; public event EventHandler GetLicenseInfoDone; public event EventHandler<(CortexErrorCode error, License data)> GetLicenseInfoResult; - public event EventHandler<(CortexErrorCode error, string message)> RefreshHeadsetResult; + public event EventHandler<(CortexErrorCode error, string command)> ControlDeviceResult; public event EventHandler<(CortexErrorCode error, List data)> QueryHeadsetResult; public event EventHandler CreateSessionOK; public event EventHandler UpdateSessionOK; @@ -202,7 +202,8 @@ public void OnMessageReceived(string receievedMsg) } else if (method == "controlDevice") { - RefreshHeadsetResult?.Invoke(this, (CortexErrorCode.UnknownError, messageError)); + string command = (string)error["command"]; + ControlDeviceResult?.Invoke(this, (CortexErrorCode.UnknownError, command)); } } else { @@ -300,7 +301,7 @@ private void HandleResponse(string method, JToken data) } else if (command == "refresh") { - RefreshHeadsetResult?.Invoke(this, (CortexErrorCode.OK, command)); + ControlDeviceResult?.Invoke(this, (CortexErrorCode.OK, command)); } } else if (method == "getUserLogin") diff --git a/Src/DataStreamManager.cs b/Src/DataStreamManager.cs index e88b2d9..35bfd6e 100644 --- a/Src/DataStreamManager.cs +++ b/Src/DataStreamManager.cs @@ -112,10 +112,12 @@ private void Init() { _dsProcess.ProcessInit(); _dsProcess.SubscribedOK += OnSubscribedOK; + _dsProcess.HeadsetConnectNotify += OnHeadsetConnectNotify; _dsProcess.StreamStopNotify += OnStreamStopNotify; _dsProcess.LicenseValidTo += OnLicenseValidTo; _dsProcess.SessionActivedOK += OnSessionActivedOK; _dsProcess.CreateSessionFail += OnCreateSessionFail; + _dsProcess.QueryHeadsetOK += OnQueryHeadsetOK; _dsProcess.UserLogoutNotify += OnUserLogoutNotify; _dsProcess.SessionClosedNotify += OnSessionClosedNotify; _dsProcess.HeadsetScanFinished += OnHeadsetScanFinished; @@ -172,6 +174,8 @@ private void OnSessionClosedNotify(object sender, string sessionId) _wantedHeadsetId = ""; _connectHeadsetState = ConnectHeadsetStates.No_Connect; _detectedHeadsets.Clear(); + // start scanning headset again + _dsProcess.RefreshHeadset(); ResetDataBuffers(); } } @@ -194,6 +198,34 @@ private void OnUserLogoutNotify(object sender, string message) } } + private void OnQueryHeadsetOK(object sender, List headsets) + { + lock(_locker) + { + _detectedHeadsets.Clear(); + string strOut = ""; + foreach(var headset in headsets) { + _detectedHeadsets.Add(headset); + string headsetId = headset.HeadsetID; + if (_readyCreateSession && headsetId == _wantedHeadsetId && + (headset.Status == "connected")) { + UnityEngine.Debug.Log("The headset " + headsetId + " is connected. Start creating session."); + + _readyCreateSession = false; + // create session + _dsProcess.CreateSession(headsetId, true); + } + strOut += headset.HeadsetID + "-" + headset.HeadsetConnection + "-" + headset.Status + "; "; + } + UnityEngine.Debug.Log("DataStreamManager-OnQueryHeadsetOK: " + strOut); + if (string.IsNullOrEmpty(strOut)) + { + strOut = "No headset available at " + DateTime.Now.ToString("HH:mm:ss"); + } + MessageQueryHeadsetOK(this, strOut); + } + } + private void OnCreateSessionFail(object sender, string errorMsg) { lock (_locker) @@ -218,6 +250,9 @@ private void OnSessionActivedOK(object sender, SessionEventArgs sessionInfo) _connectHeadsetState = ConnectHeadsetStates.Session_Created; + // stop query headset if session is created + _dsProcess.StopQueryHeadset(); + // subscribe data _dsProcess.SubscribeData(); } @@ -230,6 +265,10 @@ private void OnSessionActivedOK(object sender, SessionEventArgs sessionInfo) private void OnLicenseValidTo(object sender, DateTime validTo) { + if (!_isSessActivated) { + // start scanning headset again + _dsProcess.RefreshHeadset(); + } LicenseValidTo(this, validTo); } @@ -302,6 +341,33 @@ private void OnStreamStopNotify(object sender, List streams) } } + private void OnHeadsetConnectNotify(object sender, HeadsetConnectEventArgs e) + { + lock (_locker) + { + string headsetId = e.HeadsetId; + UnityEngine.Debug.Log("OnHeadsetConnectNotify for headset " + headsetId + " while wantedHeadset : " + _wantedHeadsetId + "_readyCreateSession" + _readyCreateSession); + if (e.IsSuccess && _readyCreateSession && + (headsetId == _wantedHeadsetId)) { + UnityEngine.Debug.Log("Connect the headset " + headsetId + " successfully. Start creating session."); + _readyCreateSession = false; + // create session + _dsProcess.CreateSession(headsetId, true); + } + else if (!e.IsSuccess && headsetId == _wantedHeadsetId) { + UnityEngine.Debug.Log("Connect the headset " + headsetId + " unsuccessfully. Message : " + e.Message); + HeadsetConnectFail(this, headsetId); + _wantedHeadsetId = ""; // reset headsetId + _isSessActivated = false; + + _connectHeadsetState = ConnectHeadsetStates.Session_Failed; + } + else { + UnityEngine.Debug.Log("OnHeadsetConnectNotify: " + headsetId + ". Message : " + e.Message); + } + } + } + private void OnSubscribedOK(object sender, Dictionary e) { lock (_locker) @@ -493,6 +559,11 @@ private void CloseSession() private void OnHeadsetScanFinished(object sender, string message) { UnityEngine.Debug.Log(message); + if (!_isSessActivated) { + // start scanning headset again + UnityEngine.Debug.Log("Start scanning headset again."); + _dsProcess.RefreshHeadset(); + } } /// @@ -510,7 +581,61 @@ public void StartAuthorize(string licenseKey = "", object context = null) /// the id of headset you want to retrieve data. public void StartDataStream(List streamNameList, string headsetId) { - + lock (_locker) + { + // if (!string.IsNullOrEmpty(_wantedHeadsetId)) { + // UnityEngine.Debug.Log("The data streams has already started for headset " + // + _wantedHeadsetId + ". Please wait..."); + // return; + // } + + if (string.IsNullOrEmpty(headsetId)) { + if (_detectedHeadsets.Count > 0) { + // get the first headset in the list + _wantedHeadsetId = _detectedHeadsets[0].HeadsetID; + } + else { + UnityEngine.Debug.LogError("No headset available."); + // query headset + // _dsProcess.QueryHeadsets(""); + return; + } + + } + else { + bool _foundHeadset = false; + foreach (var item in _detectedHeadsets) { + if (item.HeadsetID == headsetId){ + _foundHeadset = true; + } + } + if (_foundHeadset) { + _wantedHeadsetId = headsetId; + } + else { + UnityEngine.Debug.LogError("The headset " + headsetId + " is not found. Please check the headset Id."); + return; + } + } + + UnityEngine.Debug.Log("DataStreamManager-StartDataStream: " + _wantedHeadsetId); + + _connectHeadsetState = ConnectHeadsetStates.Headset_Connecting; + + foreach(var curStream in streamNameList) { + _dsProcess.AddStreams(curStream); + } + // check headset connected + if (!isConnectedHeadset(_wantedHeadsetId)) { + _readyCreateSession = true; + _dsProcess.StartConnectToDevice(_wantedHeadsetId); + } + else if (!_isSessActivated) { + UnityEngine.Debug.Log("The headset " + _wantedHeadsetId + " has already connected. Start creating session."); + _readyCreateSession = false; + _dsProcess.CreateSession(_wantedHeadsetId, true); + } + } } /// @@ -865,6 +990,13 @@ public double GetPMData(string label) return _pmBuff.GetData(label); } + /// + /// Query headsets. + /// + public void QueryHeadsets(string headsetId = "") { + _dsProcess.QueryHeadsets(headsetId); + } + /// /// Get detected headsets. /// @@ -881,6 +1013,8 @@ public List GetDetectedHeadsets() { public void Stop() { // close data stream CloseSession(); + // stop query headset + _dsProcess.StopQueryHeadset(); _dsProcess.CloseCortexClient(); } diff --git a/Src/DataStreamProcess.cs b/Src/DataStreamProcess.cs index 82f85f2..6a37065 100644 --- a/Src/DataStreamProcess.cs +++ b/Src/DataStreamProcess.cs @@ -14,6 +14,7 @@ public class DataStreamProcess static readonly object _locker = new object(); private CortexClient _ctxClient = CortexClient.Instance; private List _streams; + private HeadsetFinder _headsetFinder = HeadsetFinder.Instance; private Authorizer _authorizer = Authorizer.Instance; private SessionHandler _sessionHandler = SessionHandler.Instance; @@ -49,6 +50,12 @@ public event EventHandler SessionActivedOK remove { _sessionHandler.SessionActived -= value; } } public event EventHandler CreateSessionFail; + + public event EventHandler> QueryHeadsetOK + { + add { _headsetFinder.QueryHeadsetOK += value; } + remove { _headsetFinder.QueryHeadsetOK -= value; } + } public event EventHandler UserLogoutNotify; // inform license valid to date // For test @@ -110,6 +117,7 @@ public void ProcessInit() private void OnUserLogoutNotify(object sender, string message) { // UnityEngine.Debug.Log("OnUserLogoutNotify: " + message); + StopQueryHeadset(); // Clear session data Clear(); UserLogoutNotify(this, message); @@ -343,8 +351,9 @@ public void CreateSession(string headsetId, bool isActiveSession) // Wait a moment before creating session System.Threading.Thread.Sleep(1000); // CreateSession + string cortexToken = _authorizer.CortexToken; UnityEngine.Debug.Log("Create Session with headset " + headsetId); - _sessionHandler.Create(headsetId, isActiveSession); + _sessionHandler.Create(cortexToken, headsetId, isActiveSession); } /// @@ -415,6 +424,22 @@ public void CloseSession() _sessionHandler.CloseSession(cortexToken); } + /// + /// Start query headsets to get headsets information. + /// + public void QueryHeadsets(string headsetId = "") + { + _ctxClient.QueryHeadsets(headsetId); + } + + /// + /// Stop query headsets. + /// + public void StopQueryHeadset() + { + _headsetFinder.StopQueryHeadset(); + } + /// /// Force close websocket client. /// @@ -423,6 +448,13 @@ public void CloseCortexClient() _ctxClient.Close(); } + /// + /// Refresh headset to trigger scan btle devices from Cortex + /// + public void RefreshHeadset() { + _headsetFinder.RefreshHeadset(); + } + // log out public void Logout() { _authorizer.Logout(); diff --git a/Src/EmotivUnityItf.cs b/Src/EmotivUnityItf.cs index a758589..6771f9c 100644 --- a/Src/EmotivUnityItf.cs +++ b/Src/EmotivUnityItf.cs @@ -312,7 +312,7 @@ public void AcceptEulaAndPrivacyPolicy() /// The headset id of specific headset if want get headset information of a specific headset. /// If use empty string it will query all headsets public void QueryHeadsets(string headsetId = "") { - // _dsManager.QueryHeadsets(headsetId); + _dsManager.QueryHeadsets(headsetId); } /// diff --git a/Src/HeadsetFinder.cs b/Src/HeadsetFinder.cs new file mode 100644 index 0000000..30dced9 --- /dev/null +++ b/Src/HeadsetFinder.cs @@ -0,0 +1,89 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Newtonsoft.Json.Linq; +using System.Timers; +using UnityEngine; + +namespace EmotivUnityPlugin +{ + /// + /// Reponsible for finding headsets. + /// + public class HeadsetFinder + { + private CortexClient _ctxClient = CortexClient.Instance; + + /// + /// Timer for querying headsets + /// + private Timer _aTimer = null; + + // Event + public event EventHandler HeadsetDisConnectedOK; + public event EventHandler> QueryHeadsetOK; + + public HeadsetFinder() + { + _ctxClient = CortexClient.Instance; + _ctxClient.QueryHeadsetOK += OnQueryHeadsetReceived; + _ctxClient.HeadsetDisConnectedOK += OnHeadsetDisconnectedOK; + } + + public static HeadsetFinder Instance { get; } = new HeadsetFinder(); + + private void OnHeadsetDisconnectedOK(object sender, bool e) + { + HeadsetDisConnectedOK(this, true); + } + + private void OnQueryHeadsetReceived(object sender, List headsets) + { + QueryHeadsetOK(this, headsets); + } + + /// + /// Init headset finder + /// + public void FinderInit() + { + SetQueryHeadsetTimer(); + } + + public void StopQueryHeadset() { + if (_aTimer != null && _aTimer.Enabled) { + UnityEngine.Debug.Log("Stop query headset"); + _aTimer.Stop(); + } + } + public void RefreshHeadset() { + _ctxClient.ControlDevice("refresh", "", null); + } + + /// + /// Setup query headset timer + /// + private void SetQueryHeadsetTimer() + { + if (_aTimer != null) { + _aTimer.Enabled = true; + return; + } + + _aTimer = new Timer(Config.QUERY_HEADSET_TIME); + + // Hook up the Elapsed event for the timer. + _aTimer.Elapsed += OnTimedEvent; + _aTimer.AutoReset = true; + _aTimer.Enabled = true; + } + + /// + /// Handle timeout. Retry query headsets. + /// + private void OnTimedEvent(object sender, ElapsedEventArgs e) + { + _ctxClient.QueryHeadsets(""); + } + } +} diff --git a/Src/Runtime/Models/CortexErrorCode.cs b/Src/Runtime/Models/CortexErrorCode.cs index 317bbbb..65e0cff 100644 --- a/Src/Runtime/Models/CortexErrorCode.cs +++ b/Src/Runtime/Models/CortexErrorCode.cs @@ -7,6 +7,7 @@ public enum CortexErrorCode AuthorizationFailed, // Indicates that the authorization process failed overall. LicenseError, // Indicates that there is an issue with the license (e.g., invalid or expired license). HeadsetNotFound, // Indicates that the specified headset could not be found. + SubscriptionFailed, // Indicates that the subscription process failed. UnknownError } } diff --git a/Src/Runtime/Models/CortexRuntimeContext.cs b/Src/Runtime/Models/CortexRuntimeContext.cs index b14a2f1..902bcbf 100644 --- a/Src/Runtime/Models/CortexRuntimeContext.cs +++ b/Src/Runtime/Models/CortexRuntimeContext.cs @@ -1,4 +1,6 @@ +using System; using EmotivUnityPlugin; // TODO (Tung Nguyen): remove this line when move all old code to new SDK +using System.Collections.Generic; namespace Emotiv.Cortex.Models { public sealed class CortexRuntimeContext @@ -51,5 +53,53 @@ public void SetConnectionStatus(bool isConnected) } } + // ------------------------ + // Headsets + // ------------------------ + private List _headsets = new List(); + public List Headsets + { + get + { + lock (_lock) + { + return new List(_headsets); + } + } + } + + public void SetHeadsets(List headsets) + { + lock (_lock) + { + _headsets.Clear(); + if (headsets != null) + { + _headsets.AddRange(headsets); + } + } + } + + // Map headset id and session info + private Dictionary _sessionInfoByHeadsetId = new Dictionary(StringComparer.Ordinal); + public SessionInfo GetSessionInfoByHeadsetId(string headsetId) + { + lock (_lock) + { + if (_sessionInfoByHeadsetId.TryGetValue(headsetId, out var sessionInfo)) + { + return sessionInfo; + } + return null; + } + } + + public void SetSessionInfo(string headsetId, SessionInfo sessionInfo) + { + lock (_lock) + { + _sessionInfoByHeadsetId[headsetId] = sessionInfo; + } + } } } \ No newline at end of file diff --git a/Src/Runtime/SDK/EmotivCortexRuntime.cs b/Src/Runtime/SDK/EmotivCortexRuntime.cs index 2c13fcf..dfec828 100644 --- a/Src/Runtime/SDK/EmotivCortexRuntime.cs +++ b/Src/Runtime/SDK/EmotivCortexRuntime.cs @@ -46,7 +46,7 @@ public IAuthService Auth => _auth ??= new AuthService(_context, _client); public IHeadsetService Headset - => _headset ??= HeadsetService.Instance; + => _headset ??= new HeadsetService(_context, _client); public void Dispose() { diff --git a/Src/Runtime/SDK/Headset/HeadsetService.cs b/Src/Runtime/SDK/Headset/HeadsetService.cs index 0f7b5de..b592bb4 100644 --- a/Src/Runtime/SDK/Headset/HeadsetService.cs +++ b/Src/Runtime/SDK/Headset/HeadsetService.cs @@ -11,26 +11,17 @@ namespace Emotiv.Cortex.Service { public class HeadsetService : IHeadsetService { - public static HeadsetService Instance { get; } = new HeadsetService(); - + private readonly CortexRuntimeContext _context; private readonly CortexClient _client; - private readonly List _headsets = new List(); - private bool _refreshAndQueryInProgress; - private bool _sessionCreated; + private volatile bool _refreshAndQueryInProgress; private readonly object _sampleLock = new object(); private readonly Dictionary _latestSamples = new Dictionary(StringComparer.Ordinal); private readonly Dictionary> _streamHeaders = new Dictionary>(StringComparer.Ordinal); private bool _streamDataHooked; - public HeadsetService() - { - _client = CortexClient.Instance; - _client.HeadsetScanFinished += OnHeadsetScanFinished; - HookStreamData(); - } - - internal HeadsetService(CortexClient client) + public HeadsetService(CortexRuntimeContext context, CortexClient client) { + _context = context ?? throw new ArgumentNullException(nameof(context)); _client = client ?? throw new ArgumentNullException(nameof(client)); _client.HeadsetScanFinished += OnHeadsetScanFinished; HookStreamData(); @@ -49,7 +40,7 @@ public async Task ScanHeadsetAsync() public List GetHeadsets() { - return new List(_headsets); + return _context.Headsets; } public bool TakeLatestSample(string stream, out IDataSample sample) @@ -68,10 +59,8 @@ public bool TakeLatestSample(string stream, out IDataSample sample) private void OnHeadsetScanFinished(object sender, string message) { - if (!_sessionCreated) { - _ = RefreshAndQueryAsync(); - } - + UnityEngine.Debug.Log("Headset scan finished with message: " + message); + _ = RefreshAndQueryAsync(); } private async Task RefreshAndQueryAsync() @@ -85,20 +74,24 @@ private async Task RefreshAndQueryAsync() try { var refreshTcs = new TaskCompletionSource(); - EventHandler<(CortexErrorCode error, string message)> refreshResultHandler = null; + EventHandler<(CortexErrorCode error, string command)> refreshResultHandler = null; refreshResultHandler = (sender, result) => { + if (result.command != "refresh") + { + return; + } CleanupRefresh(); refreshTcs.TrySetResult(result.error); }; void CleanupRefresh() { - _client.RefreshHeadsetResult -= refreshResultHandler; + _client.ControlDeviceResult -= refreshResultHandler; } - _client.RefreshHeadsetResult += refreshResultHandler; + _client.ControlDeviceResult += refreshResultHandler; RefreshHeadset(); var refreshCode = await refreshTcs.Task; @@ -120,10 +113,13 @@ void CleanupRefresh() return; } - _headsets.Clear(); if (result.data != null) { - _headsets.AddRange(result.data); + _context.SetHeadsets(result.data); + } + else + { + _context.SetHeadsets(null); } queryTcs.TrySetResult(CortexErrorCode.OK); }; @@ -144,7 +140,6 @@ void CleanupQuery() } } - public async Task> ConnectHeadsetAsync( string headsetId, @@ -152,13 +147,12 @@ public async Task> ConnectHeadsetAsync( IReadOnlyList streams = null) { // check headset exists in the list - var headset = _headsets.FirstOrDefault(h => string.Equals(h.HeadsetID, headsetId, StringComparison.Ordinal)); + var headset = _context.Headsets.FirstOrDefault(h => string.Equals(h.HeadsetID, headsetId, StringComparison.Ordinal)); if (headset == null) { return CortexResult.Fail(CortexErrorMapper.FromErrorCode(CortexErrorCode.HeadsetNotFound)); } - var connectCode = await ConnectHeadsetAsync(headsetId, mappings); if (connectCode != CortexErrorCode.OK) { @@ -170,15 +164,18 @@ public async Task> ConnectHeadsetAsync( { return CortexResult.Fail(CortexErrorMapper.FromErrorCode(sessionResult.Code)); } - _sessionCreated = true; if (streams != null && streams.Count > 0) { - var subscribeCode = await SubscribeAsync(sessionResult.Data.SessionId, streams); - if (subscribeCode != CortexErrorCode.OK) + var subscribeResult = await SubscribeAsync(sessionResult.Data.SessionId, streams); + if (subscribeResult.IsSuccess) { return CortexResult.Success(sessionResult.Data); } + else { + return CortexResult.Fail(subscribeResult.Error); + } + } return CortexResult.Success(sessionResult.Data); @@ -192,33 +189,24 @@ public async Task DisconnectHeadsetAsync(string headsetId) } var tcs = new TaskCompletionSource(); - EventHandler okHandler = null; - EventHandler errorHandler = null; - - okHandler = (sender, result) => - { - Cleanup(); - tcs.TrySetResult((result ? CortexErrorCode.OK : CortexErrorCode.UnknownError)); - }; + EventHandler<(CortexErrorCode error, string command)> disconnectHandler = null; - errorHandler = (sender, error) => + disconnectHandler = (sender, result) => { - if (error.MethodName != "controlDevice") + if (result.command != "disconnect") { return; } Cleanup(); - tcs.TrySetResult((CortexErrorCode.UnknownError)); + tcs.TrySetResult(result.error); }; void Cleanup() { - _client.HeadsetDisConnectedOK -= okHandler; - _client.ErrorMsgReceived -= errorHandler; + _client.ControlDeviceResult -= disconnectHandler; } - _client.HeadsetDisConnectedOK += okHandler; - _client.ErrorMsgReceived += errorHandler; + _client.ControlDeviceResult += disconnectHandler; _client.ControlDevice("disconnect", headsetId, null); var code = await tcs.Task; @@ -230,37 +218,24 @@ void Cleanup() private async Task ConnectHeadsetAsync(string headsetId, Dictionary mappings) { var tcs = new TaskCompletionSource(); - EventHandler connectHandler = null; - EventHandler errorHandler = null; - - connectHandler = (sender, info) => - { - if (!string.Equals(info.HeadsetId, headsetId, StringComparison.Ordinal)) - { - return; - } - Cleanup(); - tcs.TrySetResult(info.IsSuccess ? CortexErrorCode.OK : CortexErrorCode.UnknownError); - }; + EventHandler<(CortexErrorCode error, string command)> connectHandler = null; - errorHandler = (sender, error) => + connectHandler = (sender, result) => { - if (error.MethodName != "controlDevice") + if (result.command != "connect") { return; } Cleanup(); - tcs.TrySetResult(CortexErrorCode.UnknownError); + tcs.TrySetResult(result.error); }; void Cleanup() { - _client.HeadsetConnectNotify -= connectHandler; - _client.ErrorMsgReceived -= errorHandler; + _client.ControlDeviceResult -= connectHandler; } - _client.HeadsetConnectNotify += connectHandler; - _client.ErrorMsgReceived += errorHandler; + _client.ControlDeviceResult += connectHandler; _client.ControlDevice("connect", headsetId, ToMappings(mappings)); return await tcs.Task; @@ -305,14 +280,14 @@ void Cleanup() return await tcs.Task; } - private async Task SubscribeAsync(string sessionId, IReadOnlyList streams) + private async Task SubscribeAsync(string sessionId, IReadOnlyList streams) { if (string.IsNullOrEmpty(sessionId)) { - return CortexErrorCode.UnknownError; + return CortexResult.Fail(CortexErrorMapper.FromErrorCode(CortexErrorCode.SubscriptionFailed)); } - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(); EventHandler okHandler = null; EventHandler errorHandler = null; @@ -321,7 +296,7 @@ private async Task SubscribeAsync(string sessionId, IReadOnlyLi CacheStreamHeaders(result); Cleanup(); var hasSuccess = result.SuccessList != null && result.SuccessList.Count > 0; - tcs.TrySetResult(hasSuccess ? CortexErrorCode.OK : CortexErrorCode.UnknownError); + tcs.TrySetResult(hasSuccess ? CortexResult.Success() : CortexResult.Fail(CortexErrorMapper.FromErrorCode(CortexErrorCode.SubscriptionFailed))); }; errorHandler = (sender, error) => @@ -331,7 +306,8 @@ private async Task SubscribeAsync(string sessionId, IReadOnlyLi return; } Cleanup(); - tcs.TrySetResult(CortexErrorCode.UnknownError); + var cortexError = new CortexError((CortexErrorCode)error.Code, error.MessageError); + tcs.TrySetResult(CortexResult.Fail(cortexError)); }; void Cleanup() @@ -416,14 +392,10 @@ private bool TryBuildCQSample(StreamDataEventArgs e, out IDataSample sample) var values = new Dictionary(StringComparer.OrdinalIgnoreCase); if (headers != null) { - for (var i = 0; i < headers.Count && (i + 1) < e.Data.Count; i++) + for (var i = 0; i < headers.Count && i < e.Data.Count; i++) { var key = headers[i]; - if (IsNonChannelHeader(key)) - { - continue; - } - var value = Convert.ToSingle(e.Data[i + 1]); + var value = Convert.ToSingle(e.Data[i]); values[key] = value; } } @@ -507,17 +479,19 @@ private void CacheStreamHeaders(MultipleResultEventArgs result) private static IReadOnlyList NormalizeHeaders(string streamName, JArray header) { UnityEngine.Debug.Log("Normalizing headers for stream: " + streamName + " with header: " + header.Count); - if (streamName == DataStreamName.DevInfos && header.Count > 0) + var cols = new List(); + cols.Add("TimeStamp"); + if (streamName == DataStreamName.DevInfos && header.Count >= 2) { - var devCols = new List(); - devCols.Add(header[0].ToString()); - devCols.Add(header[1].ToString()); + // the dev infos has data format kind of "Battery", "Signal", ["AF3","T7","Pz","T8","AF4","OVERALL"],"BatteryPercent" + cols.Add(header[0].ToString()); + cols.Add(header[1].ToString()); if (header.Count > 2 && header[2] is JArray channelList) { for (var i = 0; i < channelList.Count; i++) { - devCols.Add(channelList[i].ToString()); + cols.Add(channelList[i].ToString()); } } @@ -525,14 +499,13 @@ private static IReadOnlyList NormalizeHeaders(string streamName, JArray { for (var id = 3; id < header.Count; id++) { - devCols.Add(header[id].ToString()); + cols.Add(header[id].ToString()); } } - return devCols; + return cols; } - var cols = new List(header.Count); foreach (var token in header) { cols.Add(token.ToString()); @@ -540,18 +513,6 @@ private static IReadOnlyList NormalizeHeaders(string streamName, JArray return cols; } - private static bool IsNonChannelHeader(string header) - { - if (string.IsNullOrWhiteSpace(header)) - { - return true; - } - - return header.Equals("interpolated", StringComparison.OrdinalIgnoreCase) - || header.Equals("counter", StringComparison.OrdinalIgnoreCase) - || header.Equals("timestamp", StringComparison.OrdinalIgnoreCase); - } - private static JObject ToMappings(Dictionary mappings) { if (mappings == null || mappings.Count == 0) diff --git a/Src/SessionHandler.cs b/Src/SessionHandler.cs index 617b886..52e1edb 100644 --- a/Src/SessionHandler.cs +++ b/Src/SessionHandler.cs @@ -111,12 +111,13 @@ private void UpdateSessionOk(object sender, SessionEventArgs sessionInfo) /// Open a session with an EMOTIV headset. /// A application can open only one session at a time with a given headset. /// - public void Create(string headsetId, bool activeSession = false) + public void Create(string cortexToken, string headsetId, bool activeSession = false) { - if (!String.IsNullOrEmpty(headsetId)) + if (!String.IsNullOrEmpty(cortexToken) && + !String.IsNullOrEmpty(headsetId)) { string status = activeSession ? "active" : "open"; - _ctxClient.CreateSession(headsetId, status); + _ctxClient.CreateSession(headsetId, status); // temporary remove cortexToken parameter since update the function of CortexClient. The SessionHanlder will be removed in future updates. } else { UnityEngine.Debug.Log("CreateSession: Invalid parameters");