From 6a4207827094b8b9c2f4622e56d6d59fefecf572 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 24 Mar 2026 10:41:19 -0300 Subject: [PATCH 1/5] beholder - batch emit events --- pkg/beholder/chip_ingress_emitter.go | 34 ++++++++++++++--------- pkg/beholder/chip_ingress_emitter_test.go | 2 +- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 1ba2b236cd..701dab08bd 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -26,23 +26,34 @@ func (c *ChipIngressEmitter) Close() error { } func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return c.BatchEmit(ctx, Message{ + Body: body, + Attrs: ExtractAttributes(attrKVs...), + }) +} - sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) - if err != nil { - return err - } +func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + events := make([]chipingress.CloudEvent, len(messages)) + for i, msg := range messages { + sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs) + if err != nil { + return err + } - event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...)) - if err != nil { - return err + event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs) + if err != nil { + return err + } + + events[i] = event } - eventPb, err := chipingress.EventToProto(event) + eventPb, err := chipingress.EventsToBatch(events) if err != nil { return fmt.Errorf("failed to convert event to proto: %w", err) } - _, err = c.client.Publish(ctx, eventPb) + _, err = c.client.PublishBatch(ctx, eventPb) if err != nil { return err } @@ -51,10 +62,7 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a } // ExtractSourceAndType extracts source domain and entity from the attributes -func ExtractSourceAndType(attrKVs ...any) (string, string, error) { - - attributes := newAttributes(attrKVs...) - +func ExtractSourceAndType(attributes Attributes) (string, string, error) { var sourceDomain string var entityType string diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 9d41e7d5d7..83639f5ae8 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -190,7 +190,7 @@ func TestExtractSourceAndType(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - domain, entity, err := beholder.ExtractSourceAndType(tt.attrs...) + domain, entity, err := beholder.ExtractSourceAndType(beholder.ExtractAttributes(tt.attrs)) if tt.wantErr { if err == nil { From a1c2789dbfed937455fd68038d112060df567b88 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 24 Mar 2026 10:50:18 -0300 Subject: [PATCH 2/5] extend interface --- pkg/beholder/chip_ingress_emitter.go | 5 +---- pkg/beholder/client.go | 3 ++- pkg/beholder/message_emitter.go | 13 +++++++++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 701dab08bd..501a089488 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -26,10 +26,7 @@ func (c *ChipIngressEmitter) Close() error { } func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { - return c.BatchEmit(ctx, Message{ - Body: body, - Attrs: ExtractAttributes(attrKVs...), - }) + return c.BatchEmit(ctx, NewMessage(body, attrKVs...)) } func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages ...Message) error { diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 328d877c49..1e94db05b3 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -31,8 +31,9 @@ import ( const defaultGRPCCompressor = "gzip" type Emitter interface { - // Sends message with bytes and attributes to OTel Collector + // Emit Sends message with bytes and attributes to OTel Collector Emit(ctx context.Context, body []byte, attrKVs ...any) error + BatchEmit(ctx context.Context, messages ...Message) error io.Closer } diff --git a/pkg/beholder/message_emitter.go b/pkg/beholder/message_emitter.go index 0f6400d532..f13eddbe44 100644 --- a/pkg/beholder/message_emitter.go +++ b/pkg/beholder/message_emitter.go @@ -22,10 +22,15 @@ func (e messageEmitter) Close() error { return nil } // Emits logs the message, but does not wait for the message to be processed. // Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { - message := NewMessage(body, attrKVs...) - if err := message.Validate(); err != nil { - return err + return e.BatchEmit(ctx, NewMessage(body, attrKVs...)) +} + +func (e messageEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + for _, message := range messages { + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) } - e.messageLogger.Emit(ctx, message.OtelRecord()) return nil } From 4b8a8416f39e965813ae636891cb7dc974a44f51 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 24 Mar 2026 11:30:00 -0300 Subject: [PATCH 3/5] fix emitters --- pkg/beholder/dual_source_emitter.go | 7 +++++-- pkg/beholder/noop.go | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index 5167efaece..865b83cedf 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -56,9 +56,12 @@ func (d *DualSourceEmitter) Close() error { } func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return d.BatchEmit(ctx, NewMessage(body, attrKVs...)) +} +func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages ...Message) error { // Emit via OTLP first - if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil { + if err := d.otelCollectorEmitter.BatchEmit(ctx, messages...); err != nil { return err } @@ -72,7 +75,7 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an ctx, cancel = d.stopCh.Ctx(ctx) defer cancel() - if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + if err := d.chipIngressEmitter.BatchEmit(ctx, messages...); err != nil { // If the chip ingress emitter fails, we ONLY log the error // because we still want to send the data to the OTLP collector and not cause disruption d.log.Infof("failed to emit to chip ingress: %v", err) diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 67580ec15c..3e27c98beb 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -108,6 +108,11 @@ func (e noopMessageEmitter) Close() error { return nil } func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { return nil } + +func (noopMessageEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + return nil +} + func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error { return nil } From c7be23de7d0850d0c881b33e22d3175eb56dc11a Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Thu, 26 Mar 2026 11:53:59 -0300 Subject: [PATCH 4/5] add error field to chip response --- pkg/chipingress/pb/chip_common.pb.go | 18 ++++---- pkg/chipingress/pb/chip_ingress.pb.go | 62 ++++++++++++++++----------- pkg/chipingress/pb/chip_ingress.proto | 2 + 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/pkg/chipingress/pb/chip_common.pb.go b/pkg/chipingress/pb/chip_common.pb.go index 33be5ae2cb..3308bf998a 100644 --- a/pkg/chipingress/pb/chip_common.pb.go +++ b/pkg/chipingress/pb/chip_common.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc v5.29.3 // source: pb/chip_common.proto @@ -77,8 +77,8 @@ type PathType int32 const ( PathType_S3 PathType = 0 // S3 storage PathType_LOCAL PathType = 1 // Local file system - PathType_GITHUB PathType = 3 // GitHub storage - PathType_OTHER PathType = 4 // Other storage types + PathType_GITHUB PathType = 2 // GitHub storage + PathType_OTHER PathType = 3 // Other storage types ) // Enum value maps for PathType. @@ -86,14 +86,14 @@ var ( PathType_name = map[int32]string{ 0: "S3", 1: "LOCAL", - 3: "GITHUB", - 4: "OTHER", + 2: "GITHUB", + 3: "OTHER", } PathType_value = map[string]int32{ "S3": 0, "LOCAL": 1, - "GITHUB": 3, - "OTHER": 4, + "GITHUB": 2, + "OTHER": 3, } ) @@ -515,8 +515,8 @@ const file_pb_chip_common_proto_rawDesc = "" + "\x02S3\x10\x00\x12\t\n" + "\x05LOCAL\x10\x01\x12\n" + "\n" + - "\x06GITHUB\x10\x03\x12\t\n" + - "\x05OTHER\x10\x04B\x06Z\x04./pbb\x06proto3" + "\x06GITHUB\x10\x02\x12\t\n" + + "\x05OTHER\x10\x03B\x06Z\x04./pbb\x06proto3" var ( file_pb_chip_common_proto_rawDescOnce sync.Once diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 7117f287c6..1c86f47db4 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc v5.29.3 // source: pb/chip_ingress.proto @@ -8,6 +8,7 @@ package pb import ( pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + status "google.golang.org/genproto/googleapis/rpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -114,6 +115,7 @@ func (x *PublishResponse) GetResults() []*PublishResult { type PublishResult struct { state protoimpl.MessageState `protogen:"open.v1"` EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + Error *status.Status `protobuf:"bytes,2,opt,name=error,proto3,oneof" json:"error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -155,6 +157,13 @@ func (x *PublishResult) GetEventId() string { return "" } +func (x *PublishResult) GetError() *status.Status { + if x != nil { + return x.Error + } + return nil +} + // EmptyRequest is just an empty request type EmptyRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -428,13 +437,15 @@ var File_pb_chip_ingress_proto protoreflect.FileDescriptor const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\x1a\x17google/rpc/status.proto\"H\n" + "\x0fCloudEventBatch\x125\n" + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + "\x0fPublishResponse\x127\n" + - "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\")\n" + + "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\"b\n" + "\rPublishResult\x12\x18\n" + - "\aeventId\x18\x01 \x01(\tR\aeventId\"\x0e\n" + + "\aeventId\x18\x01 \x01(\tR\aeventId\x12-\n" + + "\x05error\x18\x02 \x01(\v2\x12.google.rpc.StatusH\x00R\x05error\x88\x01\x01B\b\n" + + "\x06_error\"\x0e\n" + "\fEmptyRequest\"(\n" + "\fPingResponse\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"J\n" + @@ -480,30 +491,32 @@ var file_pb_chip_ingress_proto_goTypes = []any{ (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent - (*Schema)(nil), // 10: chip_common.Schema - (*RegisteredSchema)(nil), // 11: chip_common.RegisteredSchema + (*status.Status)(nil), // 10: google.rpc.Status + (*Schema)(nil), // 11: chip_common.Schema + (*RegisteredSchema)(nil), // 12: chip_common.RegisteredSchema } var file_pb_chip_ingress_proto_depIdxs = []int32{ 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 9, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 10, // 3: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema - 11, // 4: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema - 9, // 5: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 6: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 7: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 8: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 7, // 9: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest - 1, // 10: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 11: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse - 4, // 12: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 13: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 8, // 14: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse - 10, // [10:15] is the sub-list for method output_type - 5, // [5:10] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 10, // 2: chipingress.pb.PublishResult.error:type_name -> google.rpc.Status + 9, // 3: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 11, // 4: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 12, // 5: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 9, // 6: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 0, // 7: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 3, // 8: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 5, // 9: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 7, // 10: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 1, // 11: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 1, // 12: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse + 4, // 13: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 6, // 14: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 8, // 15: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 11, // [11:16] is the sub-list for method output_type + 6, // [6:11] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_pb_chip_ingress_proto_init() } @@ -512,6 +525,7 @@ func file_pb_chip_ingress_proto_init() { return } file_pb_chip_common_proto_init() + file_pb_chip_ingress_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index 1675f5d9c6..b8e6fd49b9 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto"; import "pb/chip_common.proto"; +import "google/rpc/status.proto"; package chipingress.pb; @@ -39,6 +40,7 @@ message PublishResponse { message PublishResult { string eventId = 1; + optional google.rpc.Status error = 2; } // EmptyRequest is just an empty request From 8a60daea606e3a4eccca45a0e42afff647dc37a3 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Thu, 26 Mar 2026 12:10:42 -0300 Subject: [PATCH 5/5] publish options --- pkg/chipingress/pb/chip_ingress.pb.go | 203 +++++++++++++++++--------- pkg/chipingress/pb/chip_ingress.proto | 12 +- pkg/chipingress/pb/generate.go | 15 ++ 3 files changed, 161 insertions(+), 69 deletions(-) create mode 100644 pkg/chipingress/pb/generate.go diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 1c86f47db4..eefbc293c4 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -23,17 +23,67 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// PublishOptions controls optional behaviour of PublishBatch. +type PublishOptions struct { + state protoimpl.MessageState `protogen:"open.v1"` + // allOrNothing makes the batch atomic: either all events are committed or none are. + // When unset, the server defaults to true (preserving the original atomic behaviour). + // Set to false to allow partial success, where individual results carry per-event errors. + AllOrNothing *bool `protobuf:"varint,1,opt,name=allOrNothing,proto3,oneof" json:"allOrNothing,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishOptions) Reset() { + *x = PublishOptions{} + mi := &file_pb_chip_ingress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishOptions) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishOptions) ProtoMessage() {} + +func (x *PublishOptions) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishOptions.ProtoReflect.Descriptor instead. +func (*PublishOptions) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishOptions) GetAllOrNothing() bool { + if x != nil && x.AllOrNothing != nil { + return *x.AllOrNothing + } + return false +} + // CloudEventBatch is used to send many ChipIngress type CloudEventBatch struct { - state protoimpl.MessageState `protogen:"open.v1"` - Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + // options are optional publish settings. When omitted, allOrNothing defaults to true. + Options *PublishOptions `protobuf:"bytes,2,opt,name=options,proto3,oneof" json:"options,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *CloudEventBatch) Reset() { *x = CloudEventBatch{} - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -45,7 +95,7 @@ func (x *CloudEventBatch) String() string { func (*CloudEventBatch) ProtoMessage() {} func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -58,7 +108,7 @@ func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use CloudEventBatch.ProtoReflect.Descriptor instead. func (*CloudEventBatch) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} } func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { @@ -68,6 +118,13 @@ func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { return nil } +func (x *CloudEventBatch) GetOptions() *PublishOptions { + if x != nil { + return x.Options + } + return nil +} + type PublishResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Results []*PublishResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` @@ -77,7 +134,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -89,7 +146,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -102,7 +159,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} } func (x *PublishResponse) GetResults() []*PublishResult { @@ -122,7 +179,7 @@ type PublishResult struct { func (x *PublishResult) Reset() { *x = PublishResult{} - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -134,7 +191,7 @@ func (x *PublishResult) String() string { func (*PublishResult) ProtoMessage() {} func (x *PublishResult) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -147,7 +204,7 @@ func (x *PublishResult) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResult.ProtoReflect.Descriptor instead. func (*PublishResult) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} } func (x *PublishResult) GetEventId() string { @@ -173,7 +230,7 @@ type EmptyRequest struct { func (x *EmptyRequest) Reset() { *x = EmptyRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -185,7 +242,7 @@ func (x *EmptyRequest) String() string { func (*EmptyRequest) ProtoMessage() {} func (x *EmptyRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -198,7 +255,7 @@ func (x *EmptyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use EmptyRequest.ProtoReflect.Descriptor instead. func (*EmptyRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} } // PingResponse responds to pings @@ -211,7 +268,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -223,7 +280,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -236,7 +293,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} } func (x *PingResponse) GetMessage() string { @@ -256,7 +313,7 @@ type StreamEventsRequest struct { func (x *StreamEventsRequest) Reset() { *x = StreamEventsRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -268,7 +325,7 @@ func (x *StreamEventsRequest) String() string { func (*StreamEventsRequest) ProtoMessage() {} func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -281,7 +338,7 @@ func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsRequest.ProtoReflect.Descriptor instead. func (*StreamEventsRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} } func (x *StreamEventsRequest) GetEvent() *pb.CloudEvent { @@ -301,7 +358,7 @@ type StreamEventsResponse struct { func (x *StreamEventsResponse) Reset() { *x = StreamEventsResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -313,7 +370,7 @@ func (x *StreamEventsResponse) String() string { func (*StreamEventsResponse) ProtoMessage() {} func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -326,7 +383,7 @@ func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. func (*StreamEventsResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} } func (x *StreamEventsResponse) GetEventId() string { @@ -353,7 +410,7 @@ type RegisterSchemaRequest struct { func (x *RegisterSchemaRequest) Reset() { *x = RegisterSchemaRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -365,7 +422,7 @@ func (x *RegisterSchemaRequest) String() string { func (*RegisterSchemaRequest) ProtoMessage() {} func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -378,7 +435,7 @@ func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaRequest.ProtoReflect.Descriptor instead. func (*RegisterSchemaRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} } func (x *RegisterSchemaRequest) GetSchemas() []*Schema { @@ -398,7 +455,7 @@ type RegisterSchemaResponse struct { func (x *RegisterSchemaResponse) Reset() { *x = RegisterSchemaResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -410,7 +467,7 @@ func (x *RegisterSchemaResponse) String() string { func (*RegisterSchemaResponse) ProtoMessage() {} func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -423,7 +480,7 @@ func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaResponse.ProtoReflect.Descriptor instead. func (*RegisterSchemaResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{9} } func (x *RegisterSchemaResponse) GetRegistered() []*RegisteredSchema { @@ -437,9 +494,15 @@ var File_pb_chip_ingress_proto protoreflect.FileDescriptor const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\x1a\x17google/rpc/status.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\x1a\x17google/rpc/status.proto\"J\n" + + "\x0ePublishOptions\x12'\n" + + "\fallOrNothing\x18\x01 \x01(\bH\x00R\fallOrNothing\x88\x01\x01B\x0f\n" + + "\r_allOrNothing\"\x93\x01\n" + "\x0fCloudEventBatch\x125\n" + - "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\x12=\n" + + "\aoptions\x18\x02 \x01(\v2\x1e.chipingress.pb.PublishOptionsH\x00R\aoptions\x88\x01\x01B\n" + + "\n" + + "\b_options\"J\n" + "\x0fPublishResponse\x127\n" + "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\"b\n" + "\rPublishResult\x12\x18\n" + @@ -479,44 +542,46 @@ func file_pb_chip_ingress_proto_rawDescGZIP() []byte { return file_pb_chip_ingress_proto_rawDescData } -var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_pb_chip_ingress_proto_goTypes = []any{ - (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch - (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse - (*PublishResult)(nil), // 2: chipingress.pb.PublishResult - (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest - (*PingResponse)(nil), // 4: chipingress.pb.PingResponse - (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest - (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse - (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest - (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse - (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent - (*status.Status)(nil), // 10: google.rpc.Status - (*Schema)(nil), // 11: chip_common.Schema - (*RegisteredSchema)(nil), // 12: chip_common.RegisteredSchema + (*PublishOptions)(nil), // 0: chipingress.pb.PublishOptions + (*CloudEventBatch)(nil), // 1: chipingress.pb.CloudEventBatch + (*PublishResponse)(nil), // 2: chipingress.pb.PublishResponse + (*PublishResult)(nil), // 3: chipingress.pb.PublishResult + (*EmptyRequest)(nil), // 4: chipingress.pb.EmptyRequest + (*PingResponse)(nil), // 5: chipingress.pb.PingResponse + (*StreamEventsRequest)(nil), // 6: chipingress.pb.StreamEventsRequest + (*StreamEventsResponse)(nil), // 7: chipingress.pb.StreamEventsResponse + (*RegisterSchemaRequest)(nil), // 8: chipingress.pb.RegisterSchemaRequest + (*RegisterSchemaResponse)(nil), // 9: chipingress.pb.RegisterSchemaResponse + (*pb.CloudEvent)(nil), // 10: io.cloudevents.v1.CloudEvent + (*status.Status)(nil), // 11: google.rpc.Status + (*Schema)(nil), // 12: chip_common.Schema + (*RegisteredSchema)(nil), // 13: chip_common.RegisteredSchema } var file_pb_chip_ingress_proto_depIdxs = []int32{ - 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent - 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 10, // 2: chipingress.pb.PublishResult.error:type_name -> google.rpc.Status - 9, // 3: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 11, // 4: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema - 12, // 5: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema - 9, // 6: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 7: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 8: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 9: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 7, // 10: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest - 1, // 11: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 12: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse - 4, // 13: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 14: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 8, // 15: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse - 11, // [11:16] is the sub-list for method output_type - 6, // [6:11] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 10, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent + 0, // 1: chipingress.pb.CloudEventBatch.options:type_name -> chipingress.pb.PublishOptions + 3, // 2: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult + 11, // 3: chipingress.pb.PublishResult.error:type_name -> google.rpc.Status + 10, // 4: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 12, // 5: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 13, // 6: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 10, // 7: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 1, // 8: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 4, // 9: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 6, // 10: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 8, // 11: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 2, // 12: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 2, // 13: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse + 5, // 14: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 7, // 15: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 9, // 16: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 12, // [12:17] is the sub-list for method output_type + 7, // [7:12] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_pb_chip_ingress_proto_init() } @@ -525,14 +590,16 @@ func file_pb_chip_ingress_proto_init() { return } file_pb_chip_common_proto_init() - file_pb_chip_ingress_proto_msgTypes[2].OneofWrappers = []any{} + file_pb_chip_ingress_proto_msgTypes[0].OneofWrappers = []any{} + file_pb_chip_ingress_proto_msgTypes[1].OneofWrappers = []any{} + file_pb_chip_ingress_proto_msgTypes[3].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc)), NumEnums: 0, - NumMessages: 9, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index b8e6fd49b9..aa19c46947 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -29,9 +29,19 @@ service ChipIngress { rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse) {} } +// PublishOptions controls optional behaviour of PublishBatch. +message PublishOptions { + // allOrNothing makes the batch atomic: either all events are committed or none are. + // When unset, the server defaults to true (preserving the original atomic behaviour). + // Set to false to allow partial success, where individual results carry per-event errors. + optional bool allOrNothing = 1; +} + // CloudEventBatch is used to send many ChipIngress -message CloudEventBatch{ +message CloudEventBatch { repeated io.cloudevents.v1.CloudEvent events = 1; + // options are optional publish settings. When omitted, allOrNothing defaults to true. + optional PublishOptions options = 2; } message PublishResponse { diff --git a/pkg/chipingress/pb/generate.go b/pkg/chipingress/pb/generate.go new file mode 100644 index 0000000000..412ac85da9 --- /dev/null +++ b/pkg/chipingress/pb/generate.go @@ -0,0 +1,15 @@ +// To regenerate the Go bindings, ensure the following are installed: +// make install-protoc (installs protoc v5.29.3 + protoc-gen-go + protoc-gen-go-grpc) +// +// Then run from pkg/chipingress/: +// +// GOMODCACHE=$(go env GOMODCACHE) +// PROTO_TMP=$(mktemp -d) +// CLOUDEVENTS=$(find "$GOMODCACHE/github.com/cloudevents" -name "cloudevent.proto" | head -1 | xargs dirname | xargs dirname) +// mkdir -p "$PROTO_TMP/github.com/cloudevents/sdk-go/binding/format/protobuf" +// ln -s "$CLOUDEVENTS" "$PROTO_TMP/github.com/cloudevents/sdk-go/binding/format/protobuf/v2" +// +//go:generate protoc --proto_path=. --proto_path=$HOME/.local/include --proto_path=$PROTO_TMP --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/chip_ingress.proto pb/chip_common.proto + +package pb +