diff --git a/broker/client/client.go b/broker/client/client.go index a6874c2a..e7e8f4a4 100644 --- a/broker/client/client.go +++ b/broker/client/client.go @@ -91,12 +91,12 @@ func CreateIso18626ClientWithHttpClient(client *http.Client) Iso18626Client { func (c *Iso18626Client) MessageRequester(ctx common.ExtendedContext, event events.Event) { ctx = ctx.WithArgs(ctx.LoggerArgs().WithComponent(CLIENT_COMP)) - _, _ = c.eventBus.ProcessTask(ctx, event, c.createAndSendSupplyingAgencyMessage) + _, _ = c.eventBus.ProcessTaskBroadcast(ctx, event, c.createAndSendSupplyingAgencyMessage) } func (c *Iso18626Client) MessageSupplier(ctx common.ExtendedContext, event events.Event) { ctx = ctx.WithArgs(ctx.LoggerArgs().WithComponent(CLIENT_COMP)) - _, _ = c.eventBus.ProcessTask(ctx, event, c.createAndSendRequestOrRequestingAgencyMessage) + _, _ = c.eventBus.ProcessTaskBroadcast(ctx, event, c.createAndSendRequestOrRequestingAgencyMessage) } func (c *Iso18626Client) getSkippedSupplierAndPeer(ctx common.ExtendedContext, illTransId, symbol string) (*ill_db.LocatedSupplier, *ill_db.Peer, error) { diff --git a/broker/events/eventbus.go b/broker/events/eventbus.go index d211c2c2..ca0d427c 100644 --- a/broker/events/eventbus.go +++ b/broker/events/eventbus.go @@ -35,8 +35,10 @@ type EventBus interface { CreateNoticeWithParent(id string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain, parentId *string) (string, error) // Mark task for processing or fail if status is invalid (e.g already started) BeginTask(eventId string) (Event, error) + BeginTaskBroadcast(eventId string) (Event, error) // Mark task as completed or fail if status is invalid (e.g not started) CompleteTask(eventId string, result *EventResult, status EventStatus) (Event, error) + CompleteTaskBroadcast(eventId string, result *EventResult, status EventStatus) (Event, error) // Register handler for event (task or notice) created signal HandleEventCreated(eventName EventName, f func(ctx common.ExtendedContext, event Event)) // Register handler for task started signal @@ -45,6 +47,7 @@ type EventBus interface { HandleTaskCompleted(eventName EventName, f func(ctx common.ExtendedContext, event Event)) // Execute task processing function within an automatic Begin/Complete block. ProcessTask(ctx common.ExtendedContext, event Event, h func(common.ExtendedContext, Event) (EventStatus, *EventResult)) (Event, error) + ProcessTaskBroadcast(ctx common.ExtendedContext, event Event, h func(common.ExtendedContext, Event) (EventStatus, *EventResult)) (Event, error) FindAncestor(descendant *Event, eventName EventName) *Event GetLatestRequestEventByAction(ctx common.ExtendedContext, illTransId string, action string) (Event, error) } @@ -270,6 +273,14 @@ func (p *PostgresEventBus) createNotice(classId string, eventName EventName, dat } func (p *PostgresEventBus) BeginTask(eventId string) (Event, error) { + return p.beginTask(eventId, false) +} + +func (p *PostgresEventBus) BeginTaskBroadcast(eventId string) (Event, error) { + return p.beginTask(eventId, true) +} + +func (p *PostgresEventBus) beginTask(eventId string, broadcast bool) (Event, error) { var event Event err := p.repo.WithTxFunc(p.ctx, func(eventRepo EventRepo) error { var err error @@ -283,10 +294,15 @@ func (p *PostgresEventBus) BeginTask(eventId string) (Event, error) { if event.EventStatus != EventStatusNew { return fmt.Errorf("cannot begin task processing, event is not in state NEW but %s", event.EventStatus) } + broadcastValue := event.Broadcast + if broadcast { + broadcastValue = broadcast + } event, err = eventRepo.UpdateEventStatus(p.ctx, UpdateEventStatusParams{ ID: eventId, EventStatus: EventStatusProcessing, LastSignal: string(SignalTaskBegin), + Broadcast: broadcastValue, }) if err != nil { return err @@ -298,6 +314,14 @@ func (p *PostgresEventBus) BeginTask(eventId string) (Event, error) { } func (p *PostgresEventBus) CompleteTask(eventId string, result *EventResult, status EventStatus) (Event, error) { + return p.completeTask(eventId, result, status, false) +} + +func (p *PostgresEventBus) CompleteTaskBroadcast(eventId string, result *EventResult, status EventStatus) (Event, error) { + return p.completeTask(eventId, result, status, true) +} + +func (p *PostgresEventBus) completeTask(eventId string, result *EventResult, status EventStatus, broadcast bool) (Event, error) { var event Event err := p.repo.WithTxFunc(p.ctx, func(eventRepo EventRepo) error { var err error @@ -316,6 +340,9 @@ func (p *PostgresEventBus) CompleteTask(eventId string, result *EventResult, sta event.ResultData = *result } event.LastSignal = string(SignalTaskComplete) + if broadcast { + event.Broadcast = broadcast + } event, err = eventRepo.SaveEvent(p.ctx, SaveEventParams(event)) if err != nil { return err @@ -357,8 +384,16 @@ func (p *PostgresEventBus) HandleTaskCompleted(eventName EventName, f func(ctx c } func (p *PostgresEventBus) ProcessTask(ctx common.ExtendedContext, event Event, h func(common.ExtendedContext, Event) (EventStatus, *EventResult)) (Event, error) { + return p.processTask(ctx, event, h, p.BeginTask, p.CompleteTask) +} + +func (p *PostgresEventBus) ProcessTaskBroadcast(ctx common.ExtendedContext, event Event, h func(common.ExtendedContext, Event) (EventStatus, *EventResult)) (Event, error) { + return p.processTask(ctx, event, h, p.BeginTaskBroadcast, p.CompleteTaskBroadcast) +} + +func (p *PostgresEventBus) processTask(ctx common.ExtendedContext, event Event, h func(common.ExtendedContext, Event) (EventStatus, *EventResult), b func(eventId string) (Event, error), c func(eventId string, result *EventResult, status EventStatus) (Event, error)) (Event, error) { inEvent := &event - event, err := p.BeginTask(event.ID) + event, err := b(event.ID) if err != nil { p.getEventContext(inEvent).Logger().Warn("failed to start processing TASK event", "error", err, "eventName", inEvent.EventName) return event, err @@ -366,7 +401,7 @@ func (p *PostgresEventBus) ProcessTask(ctx common.ExtendedContext, event Event, status, result := h(ctx, event) - event, err = p.CompleteTask(event.ID, result, status) + event, err = c(event.ID, result, status) if err != nil { p.getEventContext(inEvent).Logger().Warn("failed to complete processing TASK event", "error", err, "eventName", inEvent.EventName) return event, err diff --git a/broker/sqlc/event_query.sql b/broker/sqlc/event_query.sql index 85ca63cd..8d3ea001 100644 --- a/broker/sqlc/event_query.sql +++ b/broker/sqlc/event_query.sql @@ -75,7 +75,7 @@ DELETE FROM event WHERE ill_transaction_id = $1; -- name: UpdateEventStatus :one -UPDATE event SET last_signal = $3, event_status = $2 +UPDATE event SET last_signal = $3, event_status = $2, broadcast = $4 WHERE id = $1 RETURNING sqlc.embed(event);