77using System . Collections . Concurrent ;
88using System . Collections . Generic ;
99using System . Collections . ObjectModel ;
10+ using System . ComponentModel ;
1011using System . Data ;
1112using System . Linq ;
1213using System . Runtime . CompilerServices ;
@@ -62,6 +63,8 @@ public abstract class AbstractInstance : IInstance
6263 private readonly ConcurrentDictionary < Tuple < PortAddress , IPv4Address > , DMXReceiveBag > receivedDMXBuffer = new ConcurrentDictionary < Tuple < PortAddress , IPv4Address > , DMXReceiveBag > ( ) ;
6364 private readonly ConcurrentDictionary < RDM_TransactionID , RDMMessage > artRDMdeBumbReceive = new ( ) ;
6465
66+ private readonly ConcurrentDictionary < IPv4Address , GatewayRDMFiFoQueue > artRDMgatewayQueues = new ( ) ;
67+
6568 private readonly SemaphoreSlim pollReplyProcessSemaphoreSlim = new SemaphoreSlim ( 1 ) ;
6669
6770 private readonly struct RDM_TransactionID : IEquatable < RDM_TransactionID >
@@ -735,6 +738,7 @@ public async Task SendArtRDM(RDMMessage rdmMessage)
735738 List < Tuple < IPv4Address , PortAddress > > tuples = new List < Tuple < IPv4Address , PortAddress > > ( ) ;
736739 foreach ( var port in ports )
737740 {
741+ RDMUID_ReceivedBag bag = port . KnownResponderRDMUIDs . FirstOrDefault ( b => b . Uid == rdmMessage . DestUID ) ;
738742 PortAddress pa = default ;
739743 if ( ! rdmMessage . Command . HasFlag ( ERDM_Command . RESPONSE ) && port . OutputPortAddress . HasValue )
740744 pa = port . OutputPortAddress . Value ;
@@ -744,17 +748,36 @@ public async Task SendArtRDM(RDMMessage rdmMessage)
744748 if ( tuples . Any ( t => t . Item2 == pa && t . Item1 == port . IpAddress ) ) // skip multiple send the same Package to the same IP
745749 continue ;
746750 tuples . Add ( new Tuple < IPv4Address , PortAddress > ( port . IpAddress , pa ) ) ;
747- ArtRDM artRDM = new ArtRDM ( pa , rdmMessage ) ;
748- tasks . Add ( Task . Run ( async ( ) => await TrySendPacket ( artRDM , port . IpAddress ) ) ) ;
751+ ArtRDM artRDM = new ArtRDM ( bag ? . PortAddress ?? pa , rdmMessage ) ;
752+ tasks . Add ( Task . Run ( async ( ) =>
753+ {
754+ await handleGatewayQueue ( artRDM , port . IpAddress ) ;
755+ await TrySendPacket ( artRDM , port . IpAddress ) ;
756+ } ) ) ;
749757 }
750758 await Task . WhenAll ( tasks ) ;
751759 }
752760 }
753761 private async Task sendArtRDM ( RDMMessage rdmMessage , PortAddress portAddress , IPv4Address ip )
754762 {
755763 ArtRDM artRDM = new ArtRDM ( portAddress , rdmMessage ) ;
764+ await handleGatewayQueue ( artRDM , ip ) ;
756765 await TrySendPacket ( artRDM , ip ) ;
757766 }
767+
768+ private async Task handleGatewayQueue ( ArtRDM artRDM , IPv4Address ip )
769+ {
770+ await Task . CompletedTask ;
771+ //if (artRDMgatewayQueues.TryGetValue(ip, out GatewayRDMFiFoQueue gatewayRDMFiFoQueue))
772+ // await gatewayRDMFiFoQueue.AwaitSlot();
773+ //else
774+ //{
775+ // gatewayRDMFiFoQueue = new GatewayRDMFiFoQueue(ip);
776+ // artRDMgatewayQueues.TryAdd(ip, gatewayRDMFiFoQueue);
777+ //}
778+ //gatewayRDMFiFoQueue.AddToQueue(artRDM);
779+
780+ }
758781 public async Task SendArtNzs ( ArtNzs artNzs , IPv4Address ip )
759782 {
760783 await TrySendPacket ( artNzs , ip ) ;
@@ -982,7 +1005,7 @@ protected void processArtTodData(ArtTodData artTodData, IPv4Address source)
9821005 . ToList ( ) ;
9831006
9841007 foreach ( var port in ports )
985- port . AddResponderRdmUIDs ( artTodData . Uids ) ;
1008+ port . AddResponderRdmUIDs ( artTodData . PortAddress , artTodData . BindIndex , artTodData . Uids ) ;
9861009 }
9871010 }
9881011 catch ( Exception ex ) { Logger . LogError ( ex ) ; }
@@ -994,13 +1017,13 @@ protected void processArtTodData(ArtTodData artTodData, IPv4Address source)
9941017 . ToList ( ) ;
9951018
9961019 foreach ( var config in configs )
997- config . AddDiscoveredRdmUIDs ( artTodData . Uids ) ;
1020+ config . AddDiscoveredRdmUIDs ( artTodData . PortAddress , artTodData . BindIndex , artTodData . Uids ) ;
9981021 }
9991022 catch ( Exception ex ) { Logger . LogError ( ex ) ; }
10001023
10011024 try
10021025 {
1003- AddRdmUIDs ( artTodData . Uids ) ;
1026+ AddRdmUIDs ( artTodData . PortAddress , artTodData . BindIndex , artTodData . Uids ) ;
10041027 }
10051028 catch ( Exception ex ) { Logger . LogError ( ex ) ; }
10061029 }
@@ -1011,6 +1034,10 @@ private async void processArtRDM(ArtRDM artRDM, IPv4Address source)
10111034 ControllerRDMUID_Bag bag = null ;
10121035 try
10131036 {
1037+ if ( artRDMgatewayQueues . TryGetValue ( source , out GatewayRDMFiFoQueue gatewayRDMFiFoQueue ) )
1038+ {
1039+ gatewayRDMFiFoQueue . Update ( artRDM ) ;
1040+ }
10141041 if ( ! artRDM . RDMMessage . Command . HasFlag ( ERDM_Command . RESPONSE ) )
10151042 {
10161043 if ( knownControllerRDMUIDs . TryGetValue ( artRDM . Source , out bag ) )
@@ -1223,18 +1250,18 @@ private NodeStatus getOwnNodeStatus()
12231250
12241251 return nodeStatus ;
12251252 }
1226- private void AddRdmUIDs ( params UID [ ] rdmuids )
1253+ private void AddRdmUIDs ( PortAddress portAddress , byte bindIndex , params UID [ ] rdmuids )
12271254 {
12281255 if ( rdmuids . Length == 0 )
12291256 return ;
12301257
12311258 foreach ( UID rdmuid in rdmuids )
12321259 {
12331260 if ( knownRDMUIDs . TryGetValue ( rdmuid , out RDMUID_ReceivedBag bag ) )
1234- bag . Seen ( ) ;
1261+ bag . Seen ( portAddress , bindIndex ) ;
12351262 else
12361263 {
1237- bag = new RDMUID_ReceivedBag ( rdmuid ) ;
1264+ bag = new RDMUID_ReceivedBag ( portAddress , bindIndex , rdmuid ) ;
12381265 if ( knownRDMUIDs . TryAdd ( rdmuid , bag ) )
12391266 RDMUIDReceived ? . InvokeFailSafe ( this , bag ) ;
12401267 }
@@ -1377,4 +1404,86 @@ protected virtual void Dispose()
13771404 {
13781405
13791406 }
1407+
1408+ public class GatewayRDMFiFoQueue : INotifyPropertyChanged
1409+ {
1410+ private int available = 0 ;
1411+ private byte max = 0 ;
1412+ private bool implemented ;
1413+
1414+ public readonly IPv4Address IpAddress ;
1415+
1416+ private readonly List < ArtRDM > bag = new List < ArtRDM > ( ) ;
1417+
1418+ public int Available
1419+ {
1420+ get { return available ; }
1421+ private set
1422+ {
1423+ if ( available == value )
1424+ return ;
1425+
1426+ available = value ;
1427+ PropertyChanged ? . InvokeFailSafe ( this , new PropertyChangedEventArgs ( nameof ( Available ) ) ) ;
1428+ }
1429+ }
1430+ public byte Max
1431+ {
1432+ get { return max ; }
1433+ }
1434+ public bool Implemented
1435+ {
1436+ get { return implemented ; }
1437+ }
1438+
1439+ public event PropertyChangedEventHandler PropertyChanged ;
1440+
1441+ public GatewayRDMFiFoQueue ( IPv4Address ipAddress )
1442+ {
1443+ IpAddress = ipAddress ;
1444+ }
1445+
1446+ internal void Update ( ArtRDM response )
1447+ {
1448+ if ( ! response . RDMMessage . Command . HasFlag ( ERDM_Command . RESPONSE ) )
1449+ return ;
1450+
1451+ max = response . FifoMax ;
1452+ implemented = max != 0 ;
1453+ if ( implemented )
1454+ {
1455+ Available = response . FifoAvailable ;
1456+ }
1457+ else
1458+ {
1459+ if ( bag . FirstOrDefault ( b => b . PortAddress == response . PortAddress && b . RDMMessage . DestUID == response . RDMMessage . SourceUID ) is ArtRDM request )
1460+ {
1461+ bag . Remove ( request ) ;
1462+ Available = 1 ;
1463+ }
1464+ }
1465+
1466+ }
1467+
1468+ internal void AddToQueue ( ArtRDM request )
1469+ {
1470+ bag . Add ( request ) ;
1471+ Available -- ;
1472+
1473+ Task . Run ( async ( ) =>
1474+ {
1475+ await Task . Delay ( 10000 ) ;
1476+ bag . Remove ( request ) ;
1477+ Available ++ ;
1478+ } ) ;
1479+ }
1480+ internal async Task AwaitSlot ( )
1481+ {
1482+ Task . Run ( async ( ) =>
1483+ {
1484+ while ( Available <= 0 )
1485+ await Task . Delay ( 30 ) ;
1486+ } ) . Wait ( 3000 ) ;
1487+ }
1488+ }
13801489}
0 commit comments