Skip to content

Commit 59a61ab

Browse files
fix PR comments by copilot
1 parent df544f8 commit 59a61ab

File tree

2 files changed

+37
-25
lines changed

2 files changed

+37
-25
lines changed

lib/MessageBuilder.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,35 @@ void MessageBuilder::checkMetadata() {
6060
MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) {
6161
checkMetadata();
6262
impl_->payload = SharedBuffer::copy((char*)data, size);
63+
impl_->metadata.clear_null_value();
6364
return *this;
6465
}
6566

6667
MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) {
6768
checkMetadata();
6869
impl_->payload = SharedBuffer::wrap((char*)data, size);
70+
impl_->metadata.clear_null_value();
6971
return *this;
7072
}
7173

7274
MessageBuilder& MessageBuilder::setContent(const std::string& data) {
7375
checkMetadata();
7476
impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length());
77+
impl_->metadata.clear_null_value();
7578
return *this;
7679
}
7780

7881
MessageBuilder& MessageBuilder::setContent(std::string&& data) {
7982
checkMetadata();
8083
impl_->payload = SharedBuffer::take(std::move(data));
84+
impl_->metadata.clear_null_value();
8185
return *this;
8286
}
8387

8488
MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
89+
checkMetadata();
8590
impl_->keyValuePtr = data.impl_;
91+
impl_->metadata.clear_null_value();
8692
return *this;
8793
}
8894

@@ -160,6 +166,7 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
160166
MessageBuilder& MessageBuilder::setNullValue() {
161167
checkMetadata();
162168
impl_->metadata.set_null_value(true);
169+
impl_->payload = SharedBuffer();
163170
return *this;
164171
}
165172

tests/ReaderTest.cc

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,12 +1055,9 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10551055
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
10561056

10571057
// Send messages with keys
1058-
ASSERT_EQ(ResultOk,
1059-
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
1060-
ASSERT_EQ(ResultOk,
1061-
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
1062-
ASSERT_EQ(ResultOk,
1063-
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
1058+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
1059+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
1060+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
10641061

10651062
// Send a tombstone (null value) for key2
10661063
auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
@@ -1074,11 +1071,16 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10741071

10751072
// Trigger compaction via admin API
10761073
{
1077-
std::string compactUrl =
1078-
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
1079-
std::to_string(time(nullptr)) + "/compaction";
1080-
// Note: Compaction is async, we just trigger it
1081-
makePutRequest(compactUrl, "");
1074+
// Build compaction URL directly from topicName to avoid mismatches
1075+
// topicName is "persistent://public/default/..." -> need "persistent/public/default/..."
1076+
std::string topicPath = topicName;
1077+
std::size_t schemePos = topicPath.find("://");
1078+
if (schemePos != std::string::npos) {
1079+
topicPath.erase(schemePos, 3);
1080+
}
1081+
std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction";
1082+
int res = makePutRequest(compactUrl, "");
1083+
ASSERT_TRUE(res == 204 || res == 409) << "Failed to trigger compaction, res: " << res;
10821084
}
10831085

10841086
// Create a reader with readCompacted enabled
@@ -1091,18 +1093,11 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
10911093
std::map<std::string, std::string> keyValues;
10921094
std::set<std::string> nullValueKeys;
10931095

1094-
for (int i = 0; i < 10; i++) {
1095-
bool hasMessageAvailable = false;
1096-
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
1097-
if (!hasMessageAvailable) {
1098-
break;
1099-
}
1100-
1096+
bool hasMessageAvailable = false;
1097+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
1098+
while (hasMessageAvailable) {
11011099
Message msg;
1102-
Result res = reader.readNext(msg, 3000);
1103-
if (res != ResultOk) {
1104-
break;
1105-
}
1100+
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
11061101

11071102
std::string key = msg.getPartitionKey();
11081103
if (msg.hasNullValue()) {
@@ -1112,11 +1107,21 @@ TEST(ReaderTest, testReadCompactedWithNullValue) {
11121107
keyValues[key] = msg.getDataAsString();
11131108
LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
11141109
}
1110+
1111+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
11151112
}
11161113

1117-
// Verify we received the tombstone for key2
1118-
// Note: Without compaction completing, we see all messages including the tombstone
1119-
// After compaction, we would only see the latest value for each key
1114+
// Verify we actually read messages (test should not silently succeed with no messages)
1115+
ASSERT_FALSE(keyValues.empty() && nullValueKeys.empty()) << "Expected to read at least one message";
1116+
1117+
// Verify concrete outcomes:
1118+
// - key1 should have the updated value "value1-updated"
1119+
// - key2 should either be a tombstone (null value) or absent after compaction
1120+
// - key3 should have value "value3"
1121+
ASSERT_TRUE(keyValues.count("key1") > 0) << "key1 should be present";
1122+
ASSERT_EQ(keyValues["key1"], "value1-updated") << "key1 should have the updated value";
1123+
ASSERT_TRUE(keyValues.count("key3") > 0) << "key3 should be present";
1124+
ASSERT_EQ(keyValues["key3"], "value3") << "key3 should have value3";
11201125
ASSERT_TRUE(nullValueKeys.count("key2") > 0 || keyValues.count("key2") == 0)
11211126
<< "key2 should either have a null value or be absent after compaction";
11221127

0 commit comments

Comments
 (0)