diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 1ba2b236cd..501a089488 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -26,23 +26,31 @@ func (c *ChipIngressEmitter) Close() error { } func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return c.BatchEmit(ctx, NewMessage(body, attrKVs...)) +} - sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) - if err != nil { - return err - } +func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + events := make([]chipingress.CloudEvent, len(messages)) + for i, msg := range messages { + sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs) + if err != nil { + return err + } - event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...)) - if err != nil { - return err + event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs) + if err != nil { + return err + } + + events[i] = event } - eventPb, err := chipingress.EventToProto(event) + eventPb, err := chipingress.EventsToBatch(events) if err != nil { return fmt.Errorf("failed to convert event to proto: %w", err) } - _, err = c.client.Publish(ctx, eventPb) + _, err = c.client.PublishBatch(ctx, eventPb) if err != nil { return err } @@ -51,10 +59,7 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a } // ExtractSourceAndType extracts source domain and entity from the attributes -func ExtractSourceAndType(attrKVs ...any) (string, string, error) { - - attributes := newAttributes(attrKVs...) - +func ExtractSourceAndType(attributes Attributes) (string, string, error) { var sourceDomain string var entityType string diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 9d41e7d5d7..83639f5ae8 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -190,7 +190,7 @@ func TestExtractSourceAndType(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - domain, entity, err := beholder.ExtractSourceAndType(tt.attrs...) + domain, entity, err := beholder.ExtractSourceAndType(beholder.ExtractAttributes(tt.attrs)) if tt.wantErr { if err == nil { diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 328d877c49..1e94db05b3 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -31,8 +31,9 @@ import ( const defaultGRPCCompressor = "gzip" type Emitter interface { - // Sends message with bytes and attributes to OTel Collector + // Emit Sends message with bytes and attributes to OTel Collector Emit(ctx context.Context, body []byte, attrKVs ...any) error + BatchEmit(ctx context.Context, messages ...Message) error io.Closer } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index 5167efaece..865b83cedf 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -56,9 +56,12 @@ func (d *DualSourceEmitter) Close() error { } func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return d.BatchEmit(ctx, NewMessage(body, attrKVs...)) +} +func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages ...Message) error { // Emit via OTLP first - if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil { + if err := d.otelCollectorEmitter.BatchEmit(ctx, messages...); err != nil { return err } @@ -72,7 +75,7 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an ctx, cancel = d.stopCh.Ctx(ctx) defer cancel() - if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + if err := d.chipIngressEmitter.BatchEmit(ctx, messages...); err != nil { // If the chip ingress emitter fails, we ONLY log the error // because we still want to send the data to the OTLP collector and not cause disruption d.log.Infof("failed to emit to chip ingress: %v", err) diff --git a/pkg/beholder/message_emitter.go b/pkg/beholder/message_emitter.go index 0f6400d532..f13eddbe44 100644 --- a/pkg/beholder/message_emitter.go +++ b/pkg/beholder/message_emitter.go @@ -22,10 +22,15 @@ func (e messageEmitter) Close() error { return nil } // Emits logs the message, but does not wait for the message to be processed. // Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { - message := NewMessage(body, attrKVs...) - if err := message.Validate(); err != nil { - return err + return e.BatchEmit(ctx, NewMessage(body, attrKVs...)) +} + +func (e messageEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + for _, message := range messages { + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) } - e.messageLogger.Emit(ctx, message.OtelRecord()) return nil } diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 67580ec15c..3e27c98beb 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -108,6 +108,11 @@ func (e noopMessageEmitter) Close() error { return nil } func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { return nil } + +func (noopMessageEmitter) BatchEmit(ctx context.Context, messages ...Message) error { + return nil +} + func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error { return nil } diff --git a/pkg/chipingress/pb/chip_common.pb.go b/pkg/chipingress/pb/chip_common.pb.go index 33be5ae2cb..3308bf998a 100644 --- a/pkg/chipingress/pb/chip_common.pb.go +++ b/pkg/chipingress/pb/chip_common.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc v5.29.3 // source: pb/chip_common.proto @@ -77,8 +77,8 @@ type PathType int32 const ( PathType_S3 PathType = 0 // S3 storage PathType_LOCAL PathType = 1 // Local file system - PathType_GITHUB PathType = 3 // GitHub storage - PathType_OTHER PathType = 4 // Other storage types + PathType_GITHUB PathType = 2 // GitHub storage + PathType_OTHER PathType = 3 // Other storage types ) // Enum value maps for PathType. @@ -86,14 +86,14 @@ var ( PathType_name = map[int32]string{ 0: "S3", 1: "LOCAL", - 3: "GITHUB", - 4: "OTHER", + 2: "GITHUB", + 3: "OTHER", } PathType_value = map[string]int32{ "S3": 0, "LOCAL": 1, - "GITHUB": 3, - "OTHER": 4, + "GITHUB": 2, + "OTHER": 3, } ) @@ -515,8 +515,8 @@ const file_pb_chip_common_proto_rawDesc = "" + "\x02S3\x10\x00\x12\t\n" + "\x05LOCAL\x10\x01\x12\n" + "\n" + - "\x06GITHUB\x10\x03\x12\t\n" + - "\x05OTHER\x10\x04B\x06Z\x04./pbb\x06proto3" + "\x06GITHUB\x10\x02\x12\t\n" + + "\x05OTHER\x10\x03B\x06Z\x04./pbb\x06proto3" var ( file_pb_chip_common_proto_rawDescOnce sync.Once diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 7117f287c6..eefbc293c4 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc v5.29.3 // source: pb/chip_ingress.proto @@ -8,6 +8,7 @@ package pb import ( pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + status "google.golang.org/genproto/googleapis/rpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -22,17 +23,67 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// PublishOptions controls optional behaviour of PublishBatch. +type PublishOptions struct { + state protoimpl.MessageState `protogen:"open.v1"` + // allOrNothing makes the batch atomic: either all events are committed or none are. + // When unset, the server defaults to true (preserving the original atomic behaviour). + // Set to false to allow partial success, where individual results carry per-event errors. + AllOrNothing *bool `protobuf:"varint,1,opt,name=allOrNothing,proto3,oneof" json:"allOrNothing,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishOptions) Reset() { + *x = PublishOptions{} + mi := &file_pb_chip_ingress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishOptions) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishOptions) ProtoMessage() {} + +func (x *PublishOptions) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishOptions.ProtoReflect.Descriptor instead. +func (*PublishOptions) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishOptions) GetAllOrNothing() bool { + if x != nil && x.AllOrNothing != nil { + return *x.AllOrNothing + } + return false +} + // CloudEventBatch is used to send many ChipIngress type CloudEventBatch struct { - state protoimpl.MessageState `protogen:"open.v1"` - Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + // options are optional publish settings. When omitted, allOrNothing defaults to true. + Options *PublishOptions `protobuf:"bytes,2,opt,name=options,proto3,oneof" json:"options,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *CloudEventBatch) Reset() { *x = CloudEventBatch{} - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +95,7 @@ func (x *CloudEventBatch) String() string { func (*CloudEventBatch) ProtoMessage() {} func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +108,7 @@ func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use CloudEventBatch.ProtoReflect.Descriptor instead. func (*CloudEventBatch) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} } func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { @@ -67,6 +118,13 @@ func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { return nil } +func (x *CloudEventBatch) GetOptions() *PublishOptions { + if x != nil { + return x.Options + } + return nil +} + type PublishResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Results []*PublishResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` @@ -76,7 +134,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -88,7 +146,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -101,7 +159,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} } func (x *PublishResponse) GetResults() []*PublishResult { @@ -114,13 +172,14 @@ func (x *PublishResponse) GetResults() []*PublishResult { type PublishResult struct { state protoimpl.MessageState `protogen:"open.v1"` EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + Error *status.Status `protobuf:"bytes,2,opt,name=error,proto3,oneof" json:"error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *PublishResult) Reset() { *x = PublishResult{} - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +191,7 @@ func (x *PublishResult) String() string { func (*PublishResult) ProtoMessage() {} func (x *PublishResult) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +204,7 @@ func (x *PublishResult) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResult.ProtoReflect.Descriptor instead. func (*PublishResult) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} } func (x *PublishResult) GetEventId() string { @@ -155,6 +214,13 @@ func (x *PublishResult) GetEventId() string { return "" } +func (x *PublishResult) GetError() *status.Status { + if x != nil { + return x.Error + } + return nil +} + // EmptyRequest is just an empty request type EmptyRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -164,7 +230,7 @@ type EmptyRequest struct { func (x *EmptyRequest) Reset() { *x = EmptyRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -176,7 +242,7 @@ func (x *EmptyRequest) String() string { func (*EmptyRequest) ProtoMessage() {} func (x *EmptyRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -189,7 +255,7 @@ func (x *EmptyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use EmptyRequest.ProtoReflect.Descriptor instead. func (*EmptyRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} } // PingResponse responds to pings @@ -202,7 +268,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -214,7 +280,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -227,7 +293,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} } func (x *PingResponse) GetMessage() string { @@ -247,7 +313,7 @@ type StreamEventsRequest struct { func (x *StreamEventsRequest) Reset() { *x = StreamEventsRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -259,7 +325,7 @@ func (x *StreamEventsRequest) String() string { func (*StreamEventsRequest) ProtoMessage() {} func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -272,7 +338,7 @@ func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsRequest.ProtoReflect.Descriptor instead. func (*StreamEventsRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} } func (x *StreamEventsRequest) GetEvent() *pb.CloudEvent { @@ -292,7 +358,7 @@ type StreamEventsResponse struct { func (x *StreamEventsResponse) Reset() { *x = StreamEventsResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -304,7 +370,7 @@ func (x *StreamEventsResponse) String() string { func (*StreamEventsResponse) ProtoMessage() {} func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -317,7 +383,7 @@ func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. func (*StreamEventsResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} } func (x *StreamEventsResponse) GetEventId() string { @@ -344,7 +410,7 @@ type RegisterSchemaRequest struct { func (x *RegisterSchemaRequest) Reset() { *x = RegisterSchemaRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -356,7 +422,7 @@ func (x *RegisterSchemaRequest) String() string { func (*RegisterSchemaRequest) ProtoMessage() {} func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -369,7 +435,7 @@ func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaRequest.ProtoReflect.Descriptor instead. func (*RegisterSchemaRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} } func (x *RegisterSchemaRequest) GetSchemas() []*Schema { @@ -389,7 +455,7 @@ type RegisterSchemaResponse struct { func (x *RegisterSchemaResponse) Reset() { *x = RegisterSchemaResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -401,7 +467,7 @@ func (x *RegisterSchemaResponse) String() string { func (*RegisterSchemaResponse) ProtoMessage() {} func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -414,7 +480,7 @@ func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaResponse.ProtoReflect.Descriptor instead. func (*RegisterSchemaResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{9} } func (x *RegisterSchemaResponse) GetRegistered() []*RegisteredSchema { @@ -428,13 +494,21 @@ var File_pb_chip_ingress_proto protoreflect.FileDescriptor const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\x1a\x17google/rpc/status.proto\"J\n" + + "\x0ePublishOptions\x12'\n" + + "\fallOrNothing\x18\x01 \x01(\bH\x00R\fallOrNothing\x88\x01\x01B\x0f\n" + + "\r_allOrNothing\"\x93\x01\n" + "\x0fCloudEventBatch\x125\n" + - "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\x12=\n" + + "\aoptions\x18\x02 \x01(\v2\x1e.chipingress.pb.PublishOptionsH\x00R\aoptions\x88\x01\x01B\n" + + "\n" + + "\b_options\"J\n" + "\x0fPublishResponse\x127\n" + - "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\")\n" + + "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\"b\n" + "\rPublishResult\x12\x18\n" + - "\aeventId\x18\x01 \x01(\tR\aeventId\"\x0e\n" + + "\aeventId\x18\x01 \x01(\tR\aeventId\x12-\n" + + "\x05error\x18\x02 \x01(\v2\x12.google.rpc.StatusH\x00R\x05error\x88\x01\x01B\b\n" + + "\x06_error\"\x0e\n" + "\fEmptyRequest\"(\n" + "\fPingResponse\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"J\n" + @@ -468,42 +542,46 @@ func file_pb_chip_ingress_proto_rawDescGZIP() []byte { return file_pb_chip_ingress_proto_rawDescData } -var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_pb_chip_ingress_proto_goTypes = []any{ - (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch - (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse - (*PublishResult)(nil), // 2: chipingress.pb.PublishResult - (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest - (*PingResponse)(nil), // 4: chipingress.pb.PingResponse - (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest - (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse - (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest - (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse - (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent - (*Schema)(nil), // 10: chip_common.Schema - (*RegisteredSchema)(nil), // 11: chip_common.RegisteredSchema + (*PublishOptions)(nil), // 0: chipingress.pb.PublishOptions + (*CloudEventBatch)(nil), // 1: chipingress.pb.CloudEventBatch + (*PublishResponse)(nil), // 2: chipingress.pb.PublishResponse + (*PublishResult)(nil), // 3: chipingress.pb.PublishResult + (*EmptyRequest)(nil), // 4: chipingress.pb.EmptyRequest + (*PingResponse)(nil), // 5: chipingress.pb.PingResponse + (*StreamEventsRequest)(nil), // 6: chipingress.pb.StreamEventsRequest + (*StreamEventsResponse)(nil), // 7: chipingress.pb.StreamEventsResponse + (*RegisterSchemaRequest)(nil), // 8: chipingress.pb.RegisterSchemaRequest + (*RegisterSchemaResponse)(nil), // 9: chipingress.pb.RegisterSchemaResponse + (*pb.CloudEvent)(nil), // 10: io.cloudevents.v1.CloudEvent + (*status.Status)(nil), // 11: google.rpc.Status + (*Schema)(nil), // 12: chip_common.Schema + (*RegisteredSchema)(nil), // 13: chip_common.RegisteredSchema } var file_pb_chip_ingress_proto_depIdxs = []int32{ - 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent - 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 9, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 10, // 3: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema - 11, // 4: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema - 9, // 5: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 6: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 7: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 8: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 7, // 9: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest - 1, // 10: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 11: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse - 4, // 12: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 13: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 8, // 14: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse - 10, // [10:15] is the sub-list for method output_type - 5, // [5:10] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 10, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent + 0, // 1: chipingress.pb.CloudEventBatch.options:type_name -> chipingress.pb.PublishOptions + 3, // 2: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult + 11, // 3: chipingress.pb.PublishResult.error:type_name -> google.rpc.Status + 10, // 4: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 12, // 5: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 13, // 6: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 10, // 7: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 1, // 8: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 4, // 9: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 6, // 10: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 8, // 11: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 2, // 12: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 2, // 13: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse + 5, // 14: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 7, // 15: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 9, // 16: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 12, // [12:17] is the sub-list for method output_type + 7, // [7:12] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_pb_chip_ingress_proto_init() } @@ -512,13 +590,16 @@ func file_pb_chip_ingress_proto_init() { return } file_pb_chip_common_proto_init() + file_pb_chip_ingress_proto_msgTypes[0].OneofWrappers = []any{} + file_pb_chip_ingress_proto_msgTypes[1].OneofWrappers = []any{} + file_pb_chip_ingress_proto_msgTypes[3].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc)), NumEnums: 0, - NumMessages: 9, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index 1675f5d9c6..aa19c46947 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto"; import "pb/chip_common.proto"; +import "google/rpc/status.proto"; package chipingress.pb; @@ -28,9 +29,19 @@ service ChipIngress { rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse) {} } +// PublishOptions controls optional behaviour of PublishBatch. +message PublishOptions { + // allOrNothing makes the batch atomic: either all events are committed or none are. + // When unset, the server defaults to true (preserving the original atomic behaviour). + // Set to false to allow partial success, where individual results carry per-event errors. + optional bool allOrNothing = 1; +} + // CloudEventBatch is used to send many ChipIngress -message CloudEventBatch{ +message CloudEventBatch { repeated io.cloudevents.v1.CloudEvent events = 1; + // options are optional publish settings. When omitted, allOrNothing defaults to true. + optional PublishOptions options = 2; } message PublishResponse { @@ -39,6 +50,7 @@ message PublishResponse { message PublishResult { string eventId = 1; + optional google.rpc.Status error = 2; } // EmptyRequest is just an empty request diff --git a/pkg/chipingress/pb/generate.go b/pkg/chipingress/pb/generate.go new file mode 100644 index 0000000000..412ac85da9 --- /dev/null +++ b/pkg/chipingress/pb/generate.go @@ -0,0 +1,15 @@ +// To regenerate the Go bindings, ensure the following are installed: +// make install-protoc (installs protoc v5.29.3 + protoc-gen-go + protoc-gen-go-grpc) +// +// Then run from pkg/chipingress/: +// +// GOMODCACHE=$(go env GOMODCACHE) +// PROTO_TMP=$(mktemp -d) +// CLOUDEVENTS=$(find "$GOMODCACHE/github.com/cloudevents" -name "cloudevent.proto" | head -1 | xargs dirname | xargs dirname) +// mkdir -p "$PROTO_TMP/github.com/cloudevents/sdk-go/binding/format/protobuf" +// ln -s "$CLOUDEVENTS" "$PROTO_TMP/github.com/cloudevents/sdk-go/binding/format/protobuf/v2" +// +//go:generate protoc --proto_path=. --proto_path=$HOME/.local/include --proto_path=$PROTO_TMP --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/chip_ingress.proto pb/chip_common.proto + +package pb +