@@ -1055,12 +1055,9 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10551055 ASSERT_EQ (ResultOk, client.createProducer (topicName, producer));
10561056
10571057 // Send messages with keys
1058- ASSERT_EQ (ResultOk,
1059- producer.send (MessageBuilder ().setPartitionKey (" key1" ).setContent (" value1" ).build ()));
1060- ASSERT_EQ (ResultOk,
1061- producer.send (MessageBuilder ().setPartitionKey (" key2" ).setContent (" value2" ).build ()));
1062- ASSERT_EQ (ResultOk,
1063- producer.send (MessageBuilder ().setPartitionKey (" key3" ).setContent (" value3" ).build ()));
1058+ ASSERT_EQ (ResultOk, producer.send (MessageBuilder ().setPartitionKey (" key1" ).setContent (" value1" ).build ()));
1059+ ASSERT_EQ (ResultOk, producer.send (MessageBuilder ().setPartitionKey (" key2" ).setContent (" value2" ).build ()));
1060+ ASSERT_EQ (ResultOk, producer.send (MessageBuilder ().setPartitionKey (" key3" ).setContent (" value3" ).build ()));
10641061
10651062 // Send a tombstone (null value) for key2
10661063 auto tombstone = MessageBuilder ().setPartitionKey (" key2" ).setNullValue ().build ();
@@ -1075,10 +1072,9 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10751072 // Trigger compaction via admin API
10761073 {
10771074 std::string compactUrl =
1078- adminUrl + " admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
1079- std::to_string (time (nullptr )) + " /compaction" ;
1080- // Note: Compaction is async, we just trigger it
1081- makePutRequest (compactUrl, " " );
1075+ adminUrl + " admin/v2/" + topicName.substr (strlen (" persistent://" )) + " /compaction" ;
1076+ int res = makePutRequest (compactUrl, " " );
1077+ ASSERT_TRUE (res == 204 || res == 409 ) << " Failed to trigger compaction, res: " << res;
10821078 }
10831079
10841080 // Create a reader with readCompacted enabled
@@ -1091,18 +1087,11 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10911087 std::map<std::string, std::string> keyValues;
10921088 std::set<std::string> nullValueKeys;
10931089
1094- for (int i = 0 ; i < 10 ; i++) {
1095- bool hasMessageAvailable = false ;
1096- ASSERT_EQ (ResultOk, reader.hasMessageAvailable (hasMessageAvailable));
1097- if (!hasMessageAvailable) {
1098- break ;
1099- }
1100-
1090+ bool hasMessageAvailable = false ;
1091+ ASSERT_EQ (ResultOk, reader.hasMessageAvailable (hasMessageAvailable));
1092+ while (hasMessageAvailable) {
11011093 Message msg;
1102- Result res = reader.readNext (msg, 3000 );
1103- if (res != ResultOk) {
1104- break ;
1105- }
1094+ ASSERT_EQ (ResultOk, reader.readNext (msg, 3000 ));
11061095
11071096 std::string key = msg.getPartitionKey ();
11081097 if (msg.hasNullValue ()) {
@@ -1112,6 +1101,8 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
11121101 keyValues[key] = msg.getDataAsString ();
11131102 LOG_INFO (" Received message for key: " << key << " , value: " << msg.getDataAsString ());
11141103 }
1104+
1105+ ASSERT_EQ (ResultOk, reader.hasMessageAvailable (hasMessageAvailable));
11151106 }
11161107
11171108 // Verify we received the tombstone for key2
0 commit comments