Skip to content

Commit d08cc31

Browse files
vaibhavtiwari33vtiwari5
andauthored
feat: Metadata propagation in source, source transformer, mapper, sink (#210)
Signed-off-by: vtiwari5 <vaibhav_tiwari1@intuit.com> Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com> Co-authored-by: vtiwari5 <vaibhav_tiwari1@intuit.com>
1 parent 3180f88 commit d08cc31

37 files changed

Lines changed: 1223 additions & 148 deletions

examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ public ResponseList processMessages(DatumIterator datumIterator) {
4444
log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders());
4545
if (writeToPrimarySink()) {
4646
log.info("Writing to onSuccess sink: {}", datum.getId());
47+
// Build the onSuccess message using builder for changing values, keys or userMetadata
4748
responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
48-
Message.builder()
49-
.value(String.format("Successfully wrote message with ID: %s",
50-
datum.getId()).getBytes())
51-
.build()));
49+
Message.fromDatum(datum)));
50+
// Or use on-success Message constructor:
51+
// responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
52+
// new Message(String.format("Successfully wrote message with ID: %s",
53+
// datum.getId()).getBytes(), datum.getKeys(), datum.getUserMetadata()))));
5254
} else {
5355
log.info("Writing to fallback sink: {}", datum.getId());
5456
responseListBuilder.addResponse(Response.responseFallback(datum.getId()));

src/main/java/io/numaproj/numaflow/mapper/Datum.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.numaproj.numaflow.mapper;
22

33

4+
import io.numaproj.numaflow.shared.SystemMetadata;
5+
import io.numaproj.numaflow.shared.UserMetadata;
6+
47
import java.time.Instant;
58
import java.util.Map;
69

@@ -36,4 +39,19 @@ public interface Datum {
3639
* @return returns the headers in the form of key value pair
3740
*/
3841
Map<String, String> getHeaders();
42+
43+
/**
44+
* method to get the metadata information added by the user.
45+
* It can be appended to and passed downstream.
46+
*
47+
* @return returns the UserMetadata object
48+
*/
49+
UserMetadata getUserMetadata();
50+
51+
/**
52+
* method to get the read-only system metadata information
53+
*
54+
* @return returns the SystemMetadata object
55+
*/
56+
SystemMetadata getSystemMetadata();
3957
}

src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.numaproj.numaflow.mapper;
22

33

4+
import io.numaproj.numaflow.shared.SystemMetadata;
5+
import io.numaproj.numaflow.shared.UserMetadata;
46
import lombok.AllArgsConstructor;
57

68
import java.time.Instant;
@@ -13,7 +15,8 @@ class HandlerDatum implements Datum {
1315
private Instant watermark;
1416
private Instant eventTime;
1517
private Map<String, String> headers;
16-
18+
private UserMetadata userMetadata;
19+
private SystemMetadata systemMetadata;
1720

1821
@Override
1922
public Instant getWatermark() {
@@ -35,4 +38,13 @@ public Map<String, String> getHeaders() {
3538
return this.headers;
3639
}
3740

41+
@Override
42+
public UserMetadata getUserMetadata() {
43+
return this.userMetadata;
44+
}
45+
46+
@Override
47+
public SystemMetadata getSystemMetadata() {
48+
return this.systemMetadata;
49+
}
3850
}

src/main/java/io/numaproj/numaflow/mapper/MapperActor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44
import akka.actor.Props;
55
import akka.japi.pf.ReceiveBuilder;
66
import com.google.protobuf.ByteString;
7+
import common.MetadataOuterClass;
78
import io.numaproj.numaflow.map.v1.MapOuterClass;
8-
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
9+
import io.numaproj.numaflow.shared.SystemMetadata;
10+
import io.numaproj.numaflow.shared.UserMetadata;
911

1012
import java.time.Instant;
1113
import java.util.ArrayList;
1214
import java.util.Arrays;
15+
import java.util.HashMap;
1316

1417
/**
1518
* Mapper actor that processes the map request. It invokes the mapper to process the request and
@@ -48,7 +51,9 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) {
4851
Instant.ofEpochSecond(
4952
mapRequest.getRequest().getEventTime().getSeconds(),
5053
mapRequest.getRequest().getEventTime().getNanos()),
51-
mapRequest.getRequest().getHeadersMap()
54+
mapRequest.getRequest().getHeadersMap(),
55+
new UserMetadata(mapRequest.getRequest().getMetadata()),
56+
new SystemMetadata(mapRequest.getRequest().getMetadata())
5257
);
5358
String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]);
5459
try {
@@ -89,6 +94,8 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
8994
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
9095
.addAllTags(message.getTags()
9196
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
97+
.setMetadata(message.getUserMetadata()
98+
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto())
9299
.build());
93100
});
94101
return responseBuilder.setId(ID).build();

src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import io.grpc.stub.StreamObserver;
88
import io.numaproj.numaflow.map.v1.MapGrpc;
99
import io.numaproj.numaflow.map.v1.MapOuterClass;
10+
import io.numaproj.numaflow.shared.SystemMetadata;
11+
import io.numaproj.numaflow.shared.UserMetadata;
1012
import lombok.Builder;
1113
import lombok.Getter;
1214
import lombok.extern.slf4j.Slf4j;
@@ -233,5 +235,7 @@ public static class TestDatum implements Datum {
233235
private final Instant eventTime;
234236
private final Instant watermark;
235237
private final Map<String, String> headers;
238+
private final UserMetadata userMetadata;
239+
private final SystemMetadata systemMetadata;
236240
}
237241
}
Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,36 @@
11
package io.numaproj.numaflow.mapper;
22

3+
import io.numaproj.numaflow.shared.UserMetadata;
34
import lombok.Getter;
45

6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.stream.Collectors;
9+
510
/** Message is used to wrap the data returned by Mapper. */
611
@Getter
712
public class Message {
813
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
914
private final String[] keys;
1015
private final byte[] value;
1116
private final String[] tags;
17+
private final UserMetadata userMetadata;
1218

1319
/**
14-
* used to create Message with value, keys and tags(used for conditional forwarding)
20+
* used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata
1521
*
1622
* @param value message value
1723
* @param keys message keys
1824
* @param tags message tags which will be used for conditional forwarding
25+
* @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex
1926
*/
20-
public Message(byte[] value, String[] keys, String[] tags) {
27+
public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) {
2128
// defensive copy - once the Message is created, the caller should not be able to modify it.
2229
this.keys = keys == null ? null : keys.clone();
2330
this.value = value == null ? null : value.clone();
2431
this.tags = tags == null ? null : tags.clone();
32+
// Copy the data using copy constructor to prevent mutation
33+
this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata);
2534
}
2635

2736
/**
@@ -30,7 +39,7 @@ public Message(byte[] value, String[] keys, String[] tags) {
3039
* @param value message value
3140
*/
3241
public Message(byte[] value) {
33-
this(value, null, null);
42+
this(value, null, null, null);
3443
}
3544

3645
/**
@@ -40,7 +49,18 @@ public Message(byte[] value) {
4049
* @param keys message keys
4150
*/
4251
public Message(byte[] value, String[] keys) {
43-
this(value, keys, null);
52+
this(value, keys, null, null);
53+
}
54+
55+
/**
56+
* used to create Message with value, keys and tags(used for conditional forwarding)
57+
*
58+
* @param value message value
59+
* @param keys message keys
60+
* @param tags message tags which will be used for conditional forwarding
61+
*/
62+
public Message(byte[] value, String[] keys, String[] tags) {
63+
this(value, keys, tags, null);
4464
}
4565

4666
/**
@@ -49,6 +69,6 @@ public Message(byte[] value, String[] keys) {
4969
* @return returns the Message which will be dropped
5070
*/
5171
public static Message toDrop() {
52-
return new Message(new byte[0], null, DROP_TAGS);
72+
return new Message(new byte[0], null, DROP_TAGS, null);
5373
}
5474
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.numaproj.numaflow.shared;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.stream.Collectors;
8+
9+
import common.MetadataOuterClass;
10+
import lombok.Getter;
11+
12+
/**
13+
* SystemMetadata is mapping of group name to key-value pairs
14+
* SystemMetadata wraps system-generated metadata groups per message.
15+
* It is read-only to UDFs
16+
*/
17+
public class SystemMetadata {
18+
private final Map<String, Map<String, byte[]>> data;
19+
20+
/**
21+
* Default constructor
22+
*/
23+
public SystemMetadata() {
24+
this.data = new HashMap<>();
25+
}
26+
27+
/**
28+
* Constructor from MetadataOuterClass.Metadata
29+
*
30+
* @param metadata is an instance of MetadataOuterClass.Metadata which contains system metadata
31+
*/
32+
public SystemMetadata(MetadataOuterClass.Metadata metadata) {
33+
if (metadata == null || metadata.getSysMetadataMap().isEmpty()) {
34+
this.data = new HashMap<>();
35+
return;
36+
}
37+
this.data = metadata.getSysMetadataMap()
38+
.entrySet().stream()
39+
// No null checks here as protobuf contract ensures that the data has no null values
40+
.collect(Collectors.toMap(
41+
Map.Entry::getKey,
42+
e -> new HashMap<>(e.getValue()
43+
.getKeyValueMap()
44+
.entrySet()
45+
.stream()
46+
.collect(Collectors.toMap(
47+
Map.Entry::getKey,
48+
e1 -> e1.getValue().toByteArray()
49+
))
50+
)
51+
));
52+
}
53+
54+
/**
55+
* Get the list of all groups present in the user metadata
56+
*
57+
* @return list of group names
58+
*/
59+
public List<String> getGroups() {
60+
return new ArrayList<>(this.data.keySet());
61+
}
62+
63+
/**
64+
* Get a list of key names within a given group
65+
*
66+
* @param group is the name of the group from which to get the key names
67+
* @return a list of key names within the group
68+
*/
69+
public List<String> getKeys(String group) {
70+
if (!this.data.containsKey(group)) {
71+
return new ArrayList<>();
72+
}
73+
return new ArrayList<>(this.data.get(group).keySet());
74+
}
75+
76+
/**
77+
* Get the value of a key in a group
78+
*
79+
* @param group Name of the group which contains the key holding required value
80+
* @param key Name of the key in the group for which value is required
81+
* @return Value of the key in the group or null if the group/key is not present
82+
*/
83+
public byte[] getValue(String group, String key) {
84+
Map<String, byte[]> groupData = this.data.get(group);
85+
if (groupData == null) {
86+
return null;
87+
}
88+
byte[] value = groupData.get(key);
89+
return value == null ? null : value.clone();
90+
}
91+
}

0 commit comments

Comments
 (0)