Skip to content

Commit c1e3401

Browse files
author
Michal Tichák
committed
better --help and handling of streams
1 parent a06119a commit c1e3401

File tree

4 files changed

+123
-27
lines changed

4 files changed

+123
-27
lines changed

cmd/peanut/main.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,60 @@ func main() {
4040
fmt.Fprint(os.Stderr, `peanut — process execution and control utility for OCC / FairMQ processes
4141
4242
TUI mode (interactive, launched when no command is given):
43-
OCC_CONTROL_PORT=<port> peanut
44-
peanut -addr host:port -mode fmq
43+
peanut direct mode via OCC_CONTROL_PORT env var
44+
peanut -addr host:port direct mode (OCC protobuf, one button per transition)
45+
peanut -addr host:port -mode fmq fmq batched mode (drives full FairMQ sequence per transition)
46+
peanut -addr host:port -mode fmq-step fmq single-step mode (one button per raw FairMQ event)
4547
4648
CLI mode (non-interactive, launched when a command is given):
4749
peanut [flags] <command> [args]
48-
Run "peanut -addr x get-state" for full CLI usage.
4950
50-
Flags:
51+
TUI Flags:
52+
-addr string gRPC address (host:port); if empty, uses OCC_CONTROL_PORT env var in direct mode
53+
-mode string direct (default), fmq, or fmq-step
54+
55+
CLI Flags:
56+
-addr string gRPC address (default "localhost:47100")
57+
-mode string fmq (default) or direct
58+
-timeout duration unary call timeout (default 30s)
59+
-config string path to YAML/JSON file with arguments to push (inline key=val args take precedence)
60+
61+
CLI Commands:
62+
get-state
63+
Print the current FSM state.
64+
65+
transition <fromState> <toState> [key=val ...]
66+
High-level OCC transition. In fmq mode drives the full multi-step FairMQ sequence:
67+
STANDBY→CONFIGURED runs INIT DEVICE, COMPLETE INIT, BIND, CONNECT, INIT TASK
68+
CONFIGURED→RUNNING runs RUN
69+
RUNNING→CONFIGURED runs STOP
70+
CONFIGURED→STANDBY runs RESET TASK, RESET DEVICE
71+
In direct mode sends a single OCC protobuf Transition RPC.
72+
key=val pairs are forwarded as ConfigEntry arguments.
73+
74+
direct-step <srcState> <event> [key=val ...]
75+
Low-level: send a single raw OCC protobuf Transition RPC regardless of -mode.
76+
Events: CONFIGURE, RESET, START, STOP, RECOVER, EXIT
77+
78+
fmq-step <srcFMQState> <fmqEvent> [key=val ...]
79+
Low-level: send a single raw FairMQ gRPC Transition call regardless of -mode.
80+
FairMQ state/event names that contain spaces must be quoted.
81+
82+
state-stream
83+
Subscribe to StateStream; print updates until interrupted (ctrl-c to stop).
84+
85+
event-stream
86+
Subscribe to EventStream; print events until interrupted (ctrl-c to stop).
87+
88+
Examples:
89+
peanut -addr localhost:47100 get-state
90+
peanut -addr localhost:47100 transition STANDBY CONFIGURED chans.x.0.address=ipc://@foo
91+
peanut -addr localhost:47100 -config args.yaml transition STANDBY CONFIGURED
92+
peanut -addr localhost:47100 fmq-step IDLE "INIT DEVICE" chans.x.0.address=ipc://@foo
93+
peanut -addr localhost:47100 direct-step STANDBY CONFIGURE key=val
94+
peanut -addr localhost:47100 state-stream
95+
peanut -addr localhost:47100 -mode direct transition STANDBY CONFIGURED
5196
`)
52-
fs.PrintDefaults()
5397
}
5498
_ = fs.Parse(os.Args[1:])
5599

executor/executorcmd/nopb/occclient.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,42 @@ func (c *occClient) EventStream(ctx context.Context, in *pb.EventStreamRequest,
6161
return x, nil
6262
}
6363

64+
type occStateStreamClient struct {
65+
grpc.ClientStream
66+
}
67+
68+
func (x *occStateStreamClient) Recv() (*pb.StateStreamReply, error) {
69+
m := new(pb.StateStreamReply)
70+
if err := x.ClientStream.RecvMsg(m); err != nil {
71+
return nil, err
72+
}
73+
return m, nil
74+
}
75+
6476
func (c *occClient) StateStream(ctx context.Context, in *pb.StateStreamRequest, opts ...grpc.CallOption) (pb.Occ_StateStreamClient, error) {
65-
return nil, nil
77+
opts = append(opts,
78+
[]grpc.CallOption{
79+
grpc.CallContentSubtype("json"),
80+
}...,
81+
)
82+
streamDesc := grpc.StreamDesc{
83+
StreamName: "StateStream",
84+
Handler: nil,
85+
ServerStreams: true,
86+
ClientStreams: false,
87+
}
88+
stream, err := c.cc.NewStream(ctx, &streamDesc, "StateStream", opts...)
89+
if err != nil {
90+
return nil, err
91+
}
92+
x := &occStateStreamClient{stream}
93+
if err := x.ClientStream.SendMsg(in); err != nil {
94+
return nil, err
95+
}
96+
if err := x.ClientStream.CloseSend(); err != nil {
97+
return nil, err
98+
}
99+
return x, nil
66100
}
67101

68102
func (c *occClient) GetState(ctx context.Context, in *pb.GetStateRequest, opts ...grpc.CallOption) (*pb.GetStateReply, error) {

occ/peanut/cli.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939

4040
"github.com/AliceO2Group/Control/executor/executorcmd/nopb"
4141
"github.com/AliceO2Group/Control/executor/executorcmd/transitioner/fairmq"
42-
"github.com/AliceO2Group/Control/executor/protos"
42+
pb "github.com/AliceO2Group/Control/executor/protos"
4343
"github.com/AliceO2Group/Control/occ/peanut/flatten"
4444
)
4545

@@ -157,6 +157,9 @@ func RunCLI(args []string) error {
157157
if err != nil {
158158
return fmt.Errorf("StateStream: %w", err)
159159
}
160+
if stream == nil {
161+
return fmt.Errorf("StateStream not supported by this server (try polling with get-state)")
162+
}
160163
fmt.Fprintf(os.Stderr, "streaming state updates from %s (ctrl-c to stop)\n", *addr)
161164
for {
162165
msg, err := stream.Recv()
@@ -176,6 +179,9 @@ func RunCLI(args []string) error {
176179
if err != nil {
177180
return fmt.Errorf("EventStream: %w", err)
178181
}
182+
if stream == nil {
183+
return fmt.Errorf("EventStream not supported by this server")
184+
}
179185
fmt.Fprintf(os.Stderr, "streaming events from %s (ctrl-c to stop)\n", *addr)
180186
for {
181187
msg, err := stream.Recv()
@@ -256,37 +262,46 @@ func cliFMQTransition(ctx context.Context, client pb.OccClient, from, to string,
256262
}
257263
}
258264

265+
// fmqStepErr formats a FairMQ step failure, omitting the cause when err is nil
266+
// (state arrived but was wrong) to avoid a trailing ": <nil>" in the message.
267+
func fmqStepErr(step, want, got string, err error) error {
268+
if err != nil {
269+
return fmt.Errorf("%s: expected %q got %q: %w", step, want, got, err)
270+
}
271+
return fmt.Errorf("%s: expected %q got %q", step, want, got)
272+
}
273+
259274
func cliFMQConfigure(ctx context.Context, client pb.OccClient, args map[string]string) (string, error) {
260275
state, err := cliFMQDoStep(ctx, client, fairmq.IDLE, fairmq.EvtINIT_DEVICE, args)
261276
if err != nil || state != fairmq.INITIALIZING_DEVICE {
262-
return cliFMQToOCCState(state), fmt.Errorf("INIT DEVICE: expected %q got %q: %w", fairmq.INITIALIZING_DEVICE, state, err)
277+
return cliFMQToOCCState(state), fmqStepErr("INIT DEVICE", fairmq.INITIALIZING_DEVICE, state, err)
263278
}
264279
state, err = cliFMQDoStep(ctx, client, fairmq.INITIALIZING_DEVICE, fairmq.EvtCOMPLETE_INIT, nil)
265280
if err != nil || state != fairmq.INITIALIZED {
266-
return cliFMQToOCCState(state), fmt.Errorf("COMPLETE INIT: expected %q got %q: %w", fairmq.INITIALIZED, state, err)
281+
return cliFMQToOCCState(state), fmqStepErr("COMPLETE INIT", fairmq.INITIALIZED, state, err)
267282
}
268283
state, err = cliFMQDoStep(ctx, client, fairmq.INITIALIZED, fairmq.EvtBIND, nil)
269284
if err != nil || state != fairmq.BOUND {
270285
cliFMQDoStep(ctx, client, fairmq.INITIALIZED, fairmq.EvtRESET_DEVICE, nil) // rollback
271-
return cliFMQToOCCState(state), fmt.Errorf("BIND: expected %q got %q: %w", fairmq.BOUND, state, err)
286+
return cliFMQToOCCState(state), fmqStepErr("BIND", fairmq.BOUND, state, err)
272287
}
273288
state, err = cliFMQDoStep(ctx, client, fairmq.BOUND, fairmq.EvtCONNECT, nil)
274289
if err != nil || state != fairmq.DEVICE_READY {
275290
cliFMQDoStep(ctx, client, fairmq.BOUND, fairmq.EvtRESET_DEVICE, nil) // rollback
276-
return cliFMQToOCCState(state), fmt.Errorf("CONNECT: expected %q got %q: %w", fairmq.DEVICE_READY, state, err)
291+
return cliFMQToOCCState(state), fmqStepErr("CONNECT", fairmq.DEVICE_READY, state, err)
277292
}
278293
state, err = cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtINIT_TASK, nil)
279294
if err != nil || state != fairmq.READY {
280295
cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, nil) // rollback
281-
return cliFMQToOCCState(state), fmt.Errorf("INIT TASK: expected %q got %q: %w", fairmq.READY, state, err)
296+
return cliFMQToOCCState(state), fmqStepErr("INIT TASK", fairmq.READY, state, err)
282297
}
283298
return cliFMQToOCCState(state), nil
284299
}
285300

286301
func cliFMQReset(ctx context.Context, client pb.OccClient, args map[string]string) (string, error) {
287302
state, err := cliFMQDoStep(ctx, client, fairmq.READY, fairmq.EvtRESET_TASK, nil)
288303
if err != nil || state != fairmq.DEVICE_READY {
289-
return cliFMQToOCCState(state), fmt.Errorf("RESET TASK: expected %q got %q: %w", fairmq.DEVICE_READY, state, err)
304+
return cliFMQToOCCState(state), fmqStepErr("RESET TASK", fairmq.DEVICE_READY, state, err)
290305
}
291306
state, err = cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, args)
292307
return cliFMQToOCCState(state), err
@@ -366,15 +381,18 @@ func cliMergeKVs(base, override map[string]string) map[string]string {
366381
func cliUsage() {
367382
fmt.Fprint(os.Stderr, `peanut — process execution and control utility for OCC / FairMQ processes
368383
369-
TUI mode (interactive):
370-
OCC_CONTROL_PORT=<port> peanut
384+
TUI mode (interactive, no command given):
385+
peanut direct mode via OCC_CONTROL_PORT env var
386+
peanut -addr host:port direct mode (OCC protobuf)
387+
peanut -addr host:port -mode fmq fmq batched mode (full FairMQ sequence per transition)
388+
peanut -addr host:port -mode fmq-step fmq single-step mode (one button per raw FairMQ event)
371389
372-
CLI mode (non-interactive):
390+
CLI mode (non-interactive, command given):
373391
peanut [flags] <command> [args]
374392
375393
CLI Flags:
376394
-addr string gRPC address (default "localhost:47100")
377-
-mode string fmq (json codec, default) or direct (protobuf)
395+
-mode string fmq (FairMQ json codec, default) or direct (OCC protobuf)
378396
-timeout duration unary call timeout (default 30s)
379397
-config string path to YAML/JSON file with arguments to push (inline key=val args take precedence)
380398
@@ -383,21 +401,21 @@ CLI Commands:
383401
Print the current FSM state.
384402
385403
transition <fromState> <toState> [key=val ...]
386-
High-level OCC transition. In fairmq mode drives the full multi-step
404+
High-level OCC transition. In fmq mode drives the full multi-step
387405
FairMQ sequence automatically:
388406
STANDBY→CONFIGURED runs INIT DEVICE, COMPLETE INIT, BIND, CONNECT, INIT TASK
389407
CONFIGURED→RUNNING runs RUN
390408
RUNNING→CONFIGURED runs STOP
391409
CONFIGURED→STANDBY runs RESET TASK, RESET DEVICE
410+
In direct mode sends a single OCC protobuf Transition RPC.
392411
key=val pairs are forwarded as ConfigEntry arguments.
393412
394413
direct-step <srcState> <event> [key=val ...]
395-
Low-level: send a single raw OCC gRPC Transition call.
396-
Mirrors exactly what the TUI does for each button press.
414+
Low-level: send a single raw OCC protobuf Transition RPC regardless of -mode.
397415
Events: CONFIGURE, RESET, START, STOP, RECOVER, EXIT
398416
399417
fmq-step <srcFMQState> <fmqEvent> [key=val ...]
400-
Low-level: send a single raw FairMQ gRPC Transition call.
418+
Low-level: send a single raw FairMQ gRPC Transition call regardless of -mode.
401419
FairMQ state/event names that contain spaces must be quoted.
402420
403421
state-stream

occ/peanut/peanut.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,34 +270,34 @@ func tuiFMQTransition(evt string, args []*pb.ConfigEntry) (string, error) {
270270
func tuiFMQConfigure(args map[string]string) (string, error) {
271271
state, err := fmqDoStep(context.TODO(), occClient, fairmq.IDLE, fairmq.EvtINIT_DEVICE, args)
272272
if err != nil || state != fairmq.INITIALIZING_DEVICE {
273-
return cliFMQToOCCState(state), fmt.Errorf("INIT DEVICE: expected %q got %q: %w", fairmq.INITIALIZING_DEVICE, state, err)
273+
return cliFMQToOCCState(state), fmqStepErr("INIT DEVICE", fairmq.INITIALIZING_DEVICE, state, err)
274274
}
275275
state, err = fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZING_DEVICE, fairmq.EvtCOMPLETE_INIT, nil)
276276
if err != nil || state != fairmq.INITIALIZED {
277-
return cliFMQToOCCState(state), fmt.Errorf("COMPLETE INIT: expected %q got %q: %w", fairmq.INITIALIZED, state, err)
277+
return cliFMQToOCCState(state), fmqStepErr("COMPLETE INIT", fairmq.INITIALIZED, state, err)
278278
}
279279
state, err = fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZED, fairmq.EvtBIND, nil)
280280
if err != nil || state != fairmq.BOUND {
281281
fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZED, fairmq.EvtRESET_DEVICE, nil) // rollback
282-
return cliFMQToOCCState(state), fmt.Errorf("BIND: expected %q got %q: %w", fairmq.BOUND, state, err)
282+
return cliFMQToOCCState(state), fmqStepErr("BIND", fairmq.BOUND, state, err)
283283
}
284284
state, err = fmqDoStep(context.TODO(), occClient, fairmq.BOUND, fairmq.EvtCONNECT, nil)
285285
if err != nil || state != fairmq.DEVICE_READY {
286286
fmqDoStep(context.TODO(), occClient, fairmq.BOUND, fairmq.EvtRESET_DEVICE, nil) // rollback
287-
return cliFMQToOCCState(state), fmt.Errorf("CONNECT: expected %q got %q: %w", fairmq.DEVICE_READY, state, err)
287+
return cliFMQToOCCState(state), fmqStepErr("CONNECT", fairmq.DEVICE_READY, state, err)
288288
}
289289
state, err = fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtINIT_TASK, nil)
290290
if err != nil || state != fairmq.READY {
291291
fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, nil) // rollback
292-
return cliFMQToOCCState(state), fmt.Errorf("INIT TASK: expected %q got %q: %w", fairmq.READY, state, err)
292+
return cliFMQToOCCState(state), fmqStepErr("INIT TASK", fairmq.READY, state, err)
293293
}
294294
return cliFMQToOCCState(state), nil
295295
}
296296

297297
func tuiFMQReset(args map[string]string) (string, error) {
298298
state, err := fmqDoStep(context.TODO(), occClient, fairmq.READY, fairmq.EvtRESET_TASK, nil)
299299
if err != nil || state != fairmq.DEVICE_READY {
300-
return cliFMQToOCCState(state), fmt.Errorf("RESET TASK: expected %q got %q: %w", fairmq.DEVICE_READY, state, err)
300+
return cliFMQToOCCState(state), fmqStepErr("RESET TASK", fairmq.DEVICE_READY, state, err)
301301
}
302302
state, err = fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, args)
303303
return cliFMQToOCCState(state), err

0 commit comments

Comments
 (0)