diff --git a/Cargo.lock b/Cargo.lock index 0c85430..0f0affb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2229,6 +2229,26 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "order-entry" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "axum", + "clap", + "hotfix", + "hotfix-message", + "hotfix-web", + "serde", + "serde_json", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "owo-colors" version = "4.2.3" @@ -3196,23 +3216,6 @@ dependencies = [ "libc", ] -[[package]] -name = "simple-new-order" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "axum", - "clap", - "hotfix", - "hotfix-web", - "tokio", - "tokio-util", - "tracing", - "tracing-subscriber", - "uuid", -] - [[package]] name = "siphasher" version = "1.0.1" diff --git a/README.md b/README.md index 198b085..5f0f668 100644 --- a/README.md +++ b/README.md @@ -40,8 +40,22 @@ various QuickFIX implementations, with long-term plans to optimise further. | FIX 4.4 | Fully supported | | FIX 5.0 | Planned | -Check out the [examples](https://github.com/Validus-Risk-Management/hotfix/tree/main/examples) -to get started. +### Getting started + +The quickest way to see HotFIX in action is with Docker Compose. The +repository includes an [order-entry](https://github.com/Validus-Risk-Management/hotfix/tree/main/examples/order-entry) example +initiator and a dummy acceptor that you can start together: + +```shell +docker compose -f example.compose.yml up --build +``` + +Once both services are running, open +[http://localhost:9881/order](http://localhost:9881/order) to send FIX +orders through the web UI and watch the messages flow in real time. + +See the [examples](https://github.com/Validus-Risk-Management/hotfix/tree/main/examples) directory for more details and +additional ways to run. ### Prior Art diff --git a/dummy-executor/.gitignore b/dummy-executor/.gitignore new file mode 100644 index 0000000..a4d0c95 --- /dev/null +++ b/dummy-executor/.gitignore @@ -0,0 +1,21 @@ +# IDEs +.idea/ +.vscode/ + +# Go +go.work +go.work.sum +*.test + +# Test artifacts +*.out +coverage.* +*.coverprofile +profile.cov + +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib \ No newline at end of file diff --git a/dummy-executor/Dockerfile b/dummy-executor/Dockerfile new file mode 100644 index 0000000..1e2940c --- /dev/null +++ b/dummy-executor/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.25-alpine AS build +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -o /executor . + +FROM alpine:3.21 +COPY --from=build /executor /executor +COPY config/executor.cfg /config/executor.cfg +EXPOSE 9880 +ENTRYPOINT ["/executor"] diff --git a/dummy-executor/README.md b/dummy-executor/README.md new file mode 100644 index 0000000..4eefcf2 --- /dev/null +++ b/dummy-executor/README.md @@ -0,0 +1,13 @@ +This is a dummy acceptor executor using QuickFIX/Go. +As HotFIX currently doesn't support sell-side use cases, +there is no way to set up end-to-end examples where HotFIX +acts as both sides of the connection. + +The acceptor implementation here provides a "ping-pong" +server, which responds to new orders with an ack and +a fill. + +## Acknowledgement + +This product includes software developed by +quickfixengine.org (http://www.quickfixengine.org/). \ No newline at end of file diff --git a/dummy-executor/cmd/executor.go b/dummy-executor/cmd/executor.go new file mode 100644 index 0000000..91a798b --- /dev/null +++ b/dummy-executor/cmd/executor.go @@ -0,0 +1,208 @@ +package cmd + +import ( + "fmt" + "log" + "os" + "os/signal" + "strconv" + "syscall" + + "github.com/quickfixgo/enum" + "github.com/quickfixgo/field" + fix44er "github.com/quickfixgo/fix44/executionreport" + "github.com/quickfixgo/fix44/newordersingle" + "github.com/quickfixgo/quickfix" + "github.com/quickfixgo/quickfix/log/screen" + "github.com/shopspring/decimal" +) + +// Realistic FX mid-market rates for common currency pairs. +var fxRates = map[string]decimal.Decimal{ + "EUR/USD": decimal.NewFromFloat(1.0850), + "GBP/USD": decimal.NewFromFloat(1.2650), + "USD/JPY": decimal.NewFromFloat(149.50), + "USD/CHF": decimal.NewFromFloat(0.8820), + "AUD/USD": decimal.NewFromFloat(0.6540), + "USD/CAD": decimal.NewFromFloat(1.3580), + "NZD/USD": decimal.NewFromFloat(0.6120), + "EUR/GBP": decimal.NewFromFloat(0.8580), + "EUR/JPY": decimal.NewFromFloat(162.20), + "GBP/JPY": decimal.NewFromFloat(189.10), +} + +// Executor implements quickfix.Application and handles incoming FIX messages. +type Executor struct { + orderID int + execID int + *quickfix.MessageRouter +} + +func newExecutor() *Executor { + e := &Executor{MessageRouter: quickfix.NewMessageRouter()} + e.AddRoute(newordersingle.Route(e.onNewOrderSingle)) + return e +} + +func (e *Executor) genOrderID() string { + e.orderID++ + return strconv.Itoa(e.orderID) +} + +func (e *Executor) genExecID() string { + e.execID++ + return strconv.Itoa(e.execID) +} + +// OnCreate is called when a FIX session is created. +func (e *Executor) OnCreate(sessionID quickfix.SessionID) { + log.Printf("Session created: %s", sessionID) +} + +// OnLogon is called when a FIX session logs on. +func (e *Executor) OnLogon(sessionID quickfix.SessionID) { + log.Printf("Session logon: %s", sessionID) +} + +// OnLogout is called when a FIX session logs out. +func (e *Executor) OnLogout(sessionID quickfix.SessionID) { + log.Printf("Session logout: %s", sessionID) +} + +// ToAdmin is called for outgoing admin messages. +func (e *Executor) ToAdmin(msg *quickfix.Message, sessionID quickfix.SessionID) {} + +// ToApp is called for outgoing application messages. +func (e *Executor) ToApp(msg *quickfix.Message, sessionID quickfix.SessionID) error { return nil } + +// FromAdmin is called for incoming admin messages. +func (e *Executor) FromAdmin(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { + return nil +} + +// FromApp is called for incoming application messages and routes them. +func (e *Executor) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { + return e.Route(msg, sessionID) +} + +func (e *Executor) onNewOrderSingle(msg newordersingle.NewOrderSingle, sessionID quickfix.SessionID) quickfix.MessageRejectError { + clOrdID, err := msg.GetClOrdID() + if err != nil { + return err + } + + symbol, err := msg.GetSymbol() + if err != nil { + return err + } + + side, err := msg.GetSide() + if err != nil { + return err + } + + orderQty, err := msg.GetOrderQty() + if err != nil { + return err + } + + log.Printf("Received NewOrderSingle: ClOrdID=%s Symbol=%s Side=%s Qty=%s", + clOrdID, symbol, string(side), orderQty.String()) + + // Look up FX rate; default to 1.0000 for unknown pairs. + price, ok := fxRates[symbol] + if !ok { + price = decimal.NewFromFloat(1.0000) + log.Printf("Unknown symbol %s, using default rate 1.0000", symbol) + } + + orderID := e.genOrderID() + zero := decimal.NewFromInt(0) + + // --- ACK (New) --- + ack := fix44er.New( + field.NewOrderID(orderID), + field.NewExecID(e.genExecID()), + field.NewExecType(enum.ExecType_NEW), + field.NewOrdStatus(enum.OrdStatus_NEW), + field.NewSide(side), + field.NewLeavesQty(orderQty, 2), + field.NewCumQty(zero, 2), + field.NewAvgPx(zero, 2), + ) + ack.Set(field.NewClOrdID(clOrdID)) + ack.Set(field.NewSymbol(symbol)) + ack.Set(field.NewOrderQty(orderQty, 2)) + + if sendErr := quickfix.SendToTarget(ack.ToMessage(), sessionID); sendErr != nil { + log.Printf("Error sending ACK: %v", sendErr) + } else { + log.Printf("Sent ACK for OrderID=%s", orderID) + } + + // --- FILL --- + fill := fix44er.New( + field.NewOrderID(orderID), + field.NewExecID(e.genExecID()), + field.NewExecType(enum.ExecType_TRADE), + field.NewOrdStatus(enum.OrdStatus_FILLED), + field.NewSide(side), + field.NewLeavesQty(zero, 2), + field.NewCumQty(orderQty, 2), + field.NewAvgPx(price, 4), + ) + fill.Set(field.NewClOrdID(clOrdID)) + fill.Set(field.NewSymbol(symbol)) + fill.Set(field.NewOrderQty(orderQty, 2)) + fill.Set(field.NewLastQty(orderQty, 2)) + fill.Set(field.NewLastPx(price, 4)) + + if sendErr := quickfix.SendToTarget(fill.ToMessage(), sessionID); sendErr != nil { + log.Printf("Error sending FILL: %v", sendErr) + } else { + log.Printf("Sent FILL for OrderID=%s at %s", orderID, price.String()) + } + + return nil +} + +// Run starts the FIX acceptor with the given config file path and blocks until interrupted. +func Run(cfgFileName string) error { + cfg, err := os.Open(cfgFileName) + if err != nil { + return fmt.Errorf("open config: %w", err) + } + defer func(cfg *os.File) { + err := cfg.Close() + if err != nil { + log.Printf("Error closing config file: %v", err) + } + }(cfg) + + appSettings, err := quickfix.ParseSettings(cfg) + if err != nil { + return fmt.Errorf("parse settings: %w", err) + } + + app := newExecutor() + storeFactory := quickfix.NewMemoryStoreFactory() + logFactory := screen.NewLogFactory() + + acceptor, err := quickfix.NewAcceptor(app, storeFactory, appSettings, logFactory) + if err != nil { + return fmt.Errorf("create acceptor: %w", err) + } + + if err := acceptor.Start(); err != nil { + return fmt.Errorf("start acceptor: %w", err) + } + log.Printf("FIX acceptor started on port 9880, waiting for connections...") + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + + log.Println("Shutting down...") + acceptor.Stop() + return nil +} diff --git a/dummy-executor/config/executor.cfg b/dummy-executor/config/executor.cfg new file mode 100644 index 0000000..8563267 --- /dev/null +++ b/dummy-executor/config/executor.cfg @@ -0,0 +1,10 @@ +[DEFAULT] +SocketAcceptPort=9880 +SenderCompID=dummy-acceptor +TargetCompID=dummy-initiator +ResetOnLogon=Y +ResetOnLogout=Y +ResetOnDisconnect=Y + +[SESSION] +BeginString=FIX.4.4 diff --git a/dummy-executor/go.mod b/dummy-executor/go.mod new file mode 100644 index 0000000..b7895cf --- /dev/null +++ b/dummy-executor/go.mod @@ -0,0 +1,19 @@ +module dummy-executor + +go 1.25.6 + +require ( + github.com/quickfixgo/enum v0.1.0 + github.com/quickfixgo/field v0.1.0 + github.com/quickfixgo/fix44 v0.1.0 + github.com/quickfixgo/quickfix v0.9.7 + github.com/shopspring/decimal v1.4.0 +) + +require ( + github.com/pires/go-proxyproto v0.7.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/quagmt/udecimal v1.8.0 // indirect + github.com/quickfixgo/tag v0.1.0 // indirect + golang.org/x/net v0.24.0 // indirect +) diff --git a/dummy-executor/go.sum b/dummy-executor/go.sum new file mode 100644 index 0000000..b7fecca --- /dev/null +++ b/dummy-executor/go.sum @@ -0,0 +1,30 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs= +github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quagmt/udecimal v1.8.0 h1:d4MJNGb/dg8r03AprkeSiDlVKtkZnL10L3de/YGOiiI= +github.com/quagmt/udecimal v1.8.0/go.mod h1:ScmJ/xTGZcEoYiyMMzgDLn79PEJHcMBiJ4NNRT3FirA= +github.com/quickfixgo/enum v0.1.0 h1:TnCPOqxAWA5/IWp7lsvj97x7oyuHYgj3STBJlBzZGjM= +github.com/quickfixgo/enum v0.1.0/go.mod h1:65gdG2/8vr6uOYcjZBObVHMuTEYc5rr/+aKVWTrFIrQ= +github.com/quickfixgo/field v0.1.0 h1:JVO6fVD6Nkyy8e/ROYQtV/nQhMX/BStD5Lq7XIgYz2g= +github.com/quickfixgo/field v0.1.0/go.mod h1:Zu0qYmpj+gljlB2HgpUt9EcTIThs2lIQb8C57qbJr8o= +github.com/quickfixgo/fix44 v0.1.0 h1:g/rTl6mXDlG7iIMbY7zaPbHcj9N/B+tteOZ01yGzeSQ= +github.com/quickfixgo/fix44 v0.1.0/go.mod h1:d6Ia02Eq/JYgKCn/2V9FHxguAl1Alp/yu/xVpry82dA= +github.com/quickfixgo/quickfix v0.9.7 h1:vvx/cydUG6cnGDyYeUxKA5MTzbYFQulthdTHzmmsvmc= +github.com/quickfixgo/quickfix v0.9.7/go.mod h1:LpvubslWDsNapeQDvhYS2Qty9gJtm2vr/gSdUcpdEwU= +github.com/quickfixgo/tag v0.1.0 h1:R2A1Zf7CBE903+mOQlmTlfTmNZQz/yh7HunMbgcsqsA= +github.com/quickfixgo/tag v0.1.0/go.mod h1:l/drB1eO3PwN9JQTDC9Vt2EqOcaXk3kGJ+eeCQljvAI= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/dummy-executor/main.go b/dummy-executor/main.go new file mode 100644 index 0000000..78c18d0 --- /dev/null +++ b/dummy-executor/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "dummy-executor/cmd" + "log" + "os" +) + +func main() { + cfgFileName := "config/executor.cfg" + if len(os.Args) > 1 { + cfgFileName = os.Args[1] + } + if err := cmd.Run(cfgFileName); err != nil { + log.Fatalf("Error: %v", err) + } +} diff --git a/example.compose.yml b/example.compose.yml new file mode 100644 index 0000000..109cd48 --- /dev/null +++ b/example.compose.yml @@ -0,0 +1,14 @@ +services: + initiator-example: + build: + context: . + dockerfile: examples/order-entry/Dockerfile + environment: + RUST_LOG: debug + ports: + - "9881:9881" + + dummy-executor: + build: dummy-executor + ports: + - "9880:9880" diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 7f2ebe1..976f05b 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -18,6 +18,8 @@ use tracing_subscriber::EnvFilter; use crate::application::LoadTestingApplication; use crate::messages::{ExecutionReport, NewOrderSingle, OutboundMsg}; +const REPORTS_PER_ORDER: u32 = 2; + #[derive(ValueEnum, Clone, Debug)] #[clap(rename_all = "lower")] enum Database { @@ -143,7 +145,7 @@ async fn listen_for_reports(mut rx: UnboundedReceiver, message_ while let Some(_report) = rx.recv().await { count += 1; - if count == message_count { + if count == message_count * REPORTS_PER_ORDER { break; } } diff --git a/examples/load-testing/src/messages.rs b/examples/load-testing/src/messages.rs index ba00b2f..0dfd344 100644 --- a/examples/load-testing/src/messages.rs +++ b/examples/load-testing/src/messages.rs @@ -10,12 +10,12 @@ pub struct ExecutionReport { order_id: String, cl_ord_id: String, exec_id: String, - exec_type: u32, + exec_type: String, ord_status: OrdStatus, side: Side, symbol: String, - leaves_qty: u32, - cum_qty: u32, + leaves_qty: f64, + cum_qty: f64, avx_px: f64, } @@ -94,12 +94,12 @@ impl InboundMsg { order_id: message.get::<&str>(fix44::ORDER_ID).unwrap().to_string(), cl_ord_id: message.get::<&str>(fix44::CL_ORD_ID).unwrap().to_string(), exec_id: message.get::<&str>(fix44::EXEC_ID).unwrap().to_string(), - exec_type: message.get::(fix44::EXEC_TYPE).unwrap(), + exec_type: message.get::<&str>(fix44::EXEC_TYPE).unwrap().to_string(), ord_status: message.get::(fix44::ORD_STATUS).unwrap(), side: message.get::(fix44::SIDE).unwrap(), symbol: message.get::<&str>(fix44::SYMBOL).unwrap().to_string(), - leaves_qty: message.get::(fix44::LEAVES_QTY).unwrap(), - cum_qty: message.get::(fix44::CUM_QTY).unwrap(), + leaves_qty: message.get::(fix44::LEAVES_QTY).unwrap(), + cum_qty: message.get::(fix44::CUM_QTY).unwrap(), avx_px: message.get::(fix44::AVG_PX).unwrap(), }; Self::ExecutionReport(report) diff --git a/examples/order-entry/.dockerignore b/examples/order-entry/.dockerignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/examples/order-entry/.dockerignore @@ -0,0 +1 @@ +target/ diff --git a/examples/simple-new-order/.gitignore b/examples/order-entry/.gitignore similarity index 100% rename from examples/simple-new-order/.gitignore rename to examples/order-entry/.gitignore diff --git a/examples/simple-new-order/Cargo.toml b/examples/order-entry/Cargo.toml similarity index 77% rename from examples/simple-new-order/Cargo.toml rename to examples/order-entry/Cargo.toml index 81e7ad5..a0fad3a 100644 --- a/examples/simple-new-order/Cargo.toml +++ b/examples/order-entry/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "simple-new-order" +name = "order-entry" version = "0.1.0" authors.workspace = true edition.workspace = true @@ -10,10 +10,14 @@ publish = false hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb"] } hotfix-web = { path = "../../crates/hotfix-web", features = ["ui"] } +hotfix-message = { path = "../../crates/hotfix-message", features = ["fix44"] } + anyhow.workspace = true async-trait.workspace = true axum.workspace = true clap = { workspace = true, features = ["derive"] } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true tokio = { workspace = true, features = ["full"] } tokio-util.workspace = true tracing.workspace = true diff --git a/examples/order-entry/Dockerfile b/examples/order-entry/Dockerfile new file mode 100644 index 0000000..0fe82d5 --- /dev/null +++ b/examples/order-entry/Dockerfile @@ -0,0 +1,38 @@ +ARG RUST_VERSION=1.93.0 +ARG APP_NAME=order-entry +FROM rust:${RUST_VERSION}-slim-bullseye AS builder + +ARG APP_NAME +WORKDIR /app + +RUN --mount=type=bind,source=Cargo.toml,target=Cargo.toml \ + --mount=type=bind,source=Cargo.lock,target=Cargo.lock \ + --mount=type=bind,source=crates,target=crates \ + --mount=type=bind,source=examples,target=examples \ + --mount=type=cache,target=/app/target/ \ + --mount=type=cache,target=/usr/local/cargo/registry/ \ + <, + next_id: u64, +} + +impl SharedState { + pub fn new() -> Self { + Self { + messages: Vec::new(), + next_id: 1, + } + } + + pub fn push(&mut self, direction: &'static str, fix_string: String) { + let id = self.next_id; + self.next_id += 1; + self.messages.push(MessageLogEntry { + id, + direction, + fix_string, + }); + } +} + +#[derive(Clone)] +pub struct TestApplication { + pub shared_state: Arc>, +} + +impl TestApplication { + pub fn new(shared_state: Arc>) -> Self { + Self { shared_state } + } +} + +fn encode_pipe_separated(msg: &mut Message) -> String { + let config = EncodeConfig::with_separator(b'|'); + match msg.encode(&config) { + Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(), + Err(e) => format!(""), + } +} + +#[async_trait::async_trait] +impl Application for TestApplication { + type Outbound = OutboundMsg; + + async fn on_outbound_message(&self, msg: &OutboundMsg) -> OutboundDecision { + let mut fix_msg = Message::new("FIX.4.4", msg.message_type()); + msg.write(&mut fix_msg); + let fix_string = encode_pipe_separated(&mut fix_msg); + if let Ok(mut state) = self.shared_state.lock() { + state.push("OUT", fix_string); + } + OutboundDecision::Send + } + + async fn on_inbound_message(&self, msg: &Message) -> InboundDecision { + info!("received inbound message"); + let mut cloned = msg.clone(); + let fix_string = encode_pipe_separated(&mut cloned); + if let Ok(mut state) = self.shared_state.lock() { + state.push("IN", fix_string); + } + InboundDecision::Accept + } + + async fn on_logout(&mut self, _reason: &str) { + info!("we've been logged out"); + } + + async fn on_logon(&mut self) { + info!("we've been logged in"); + } +} diff --git a/examples/simple-new-order/src/main.rs b/examples/order-entry/src/main.rs similarity index 51% rename from examples/simple-new-order/src/main.rs rename to examples/order-entry/src/main.rs index f56a4ac..6041125 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/order-entry/src/main.rs @@ -1,23 +1,21 @@ mod application; mod messages; +mod web; + +use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; use hotfix::config::Config; -use hotfix::field_types::{Date, Timestamp}; -use hotfix::fix44; use hotfix::initiator::Initiator; -use hotfix::session::SessionHandle; -use hotfix_web::{RouterConfig, build_router_with_config}; use std::path::Path; -use tokio::select; -use tokio::task::spawn_blocking; use tokio_util::sync::CancellationToken; use tracing::info; use tracing_subscriber::EnvFilter; -use crate::application::TestApplication; -use crate::messages::{NewOrderSingle, OutboundMsg}; +use crate::application::{SharedState, TestApplication}; +use crate::messages::OutboundMsg; +use crate::web::{OrderAppState, start_web_service}; #[derive(ValueEnum, Clone, Debug)] #[clap(rename_all = "lower")] @@ -66,19 +64,28 @@ async fn main() -> Result<()> { } let db_config = args.database.unwrap_or(Database::Memory); - let app = TestApplication::default(); + let shared_state = Arc::new(Mutex::new(SharedState::new())); + let app = TestApplication::new(shared_state.clone()); let initiator = start_session(&args.config, &db_config, app).await?; let status_service_token = CancellationToken::new(); let session_handle = initiator.session_handle(); let child_token = status_service_token.child_token(); + let order_app_state = OrderAppState { + shared_state, + initiator: initiator.clone(), + }; tokio::spawn(async move { - if let Err(e) = start_web_service(session_handle, child_token).await { + if let Err(e) = start_web_service(session_handle, child_token, order_app_state).await { tracing::error!("web service error: {e:?}"); } }); - user_loop(&initiator).await?; + tokio::signal::ctrl_c() + .await + .context("failed to listen for ctrl-c")?; + info!("shutting down"); + status_service_token.cancel(); initiator .shutdown(false) @@ -87,60 +94,6 @@ async fn main() -> Result<()> { Ok(()) } -async fn user_loop(session: &Initiator) -> Result<()> { - loop { - println!("(q) to quit, (s) to send message"); - - let command_task = spawn_blocking(|| -> Result { - let mut input = String::new(); - std::io::stdin() - .read_line(&mut input) - .context("failed to read line from stdin")?; - Ok(input) - }); - - let input: String = command_task - .await - .context("failed to join blocking task")??; - - match input.trim() { - "q" => { - return Ok(()); - } - "s" => { - send_message(session).await?; - } - _ => { - println!("Unrecognised command"); - } - } - } -} - -async fn send_message(session: &Initiator) -> Result<()> { - let mut order_id = format!("{}", uuid::Uuid::new_v4()); - order_id.truncate(12); - let order = NewOrderSingle { - transact_time: Timestamp::utc_now(), - symbol: "EUR/USD".to_string(), - cl_ord_id: order_id, - side: fix44::Side::Buy, - order_qty: 230, - settlement_date: Date::new(2023, 9, 19).context("invalid settlement date")?, - currency: "USD".to_string(), - number_of_allocations: 1, - allocation_account: "acc1".to_string(), - allocation_quantity: 230, - }; - let msg = OutboundMsg::NewOrderSingle(order); - - session - .send_forget(msg) - .await - .context("failed to send message")?; - Ok(()) -} - async fn start_session( config_path: &str, db_config: &Database, @@ -158,7 +111,7 @@ async fn start_session( Initiator::start(session_config, app, store).await? } Database::File => { - let store = hotfix::store::file::FileStore::new("data", "simple-new-order-store") + let store = hotfix::store::file::FileStore::new("data", "order-entry-store") .context("failed to create file store")?; Initiator::start(session_config, app, store).await? } @@ -166,32 +119,3 @@ async fn start_session( Ok(initiator) } - -async fn start_web_service( - session_handle: SessionHandle, - cancellation_token: CancellationToken, -) -> Result<()> { - let config = RouterConfig { - enable_admin_endpoints: true, - }; - let router = build_router_with_config(session_handle, config); - let host_and_port = std::env::var("HOST_AND_PORT").unwrap_or("0.0.0.0:9881".to_string()); - let listener = tokio::net::TcpListener::bind(&host_and_port) - .await - .context("failed to bind TCP listener")?; - - info!("starting web interface on http://{host_and_port}"); - - select! { - result = axum::serve(listener, router) => { - if let Err(e) = result { - tracing::error!("status service error: {}", e); - } - }, - () = cancellation_token.cancelled() => { - info!("status service cancelled"); - } - } - - Ok(()) -} diff --git a/examples/order-entry/src/messages.rs b/examples/order-entry/src/messages.rs new file mode 100644 index 0000000..c69a092 --- /dev/null +++ b/examples/order-entry/src/messages.rs @@ -0,0 +1,119 @@ +use anyhow::{Context, Result}; +use hotfix::Message as HotfixMessage; +use hotfix::field_types::{Date, Timestamp}; +use hotfix::fix44; +use hotfix::message::{OutboundMessage, Part, RepeatingGroup}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone)] +pub struct NewOrderSingle { + // order details + pub transact_time: Timestamp, + pub symbol: String, // CCY1/CCY2 as string + pub cl_ord_id: String, // unique order ID assigned by the customer + pub side: fix44::Side, + pub order_qty: u32, + pub settlement_date: Date, + pub currency: String, // the dealt currency + + // allocation + pub number_of_allocations: u32, + pub allocation_account: String, + pub allocation_quantity: u32, +} + +#[derive(Debug, Clone)] +pub enum OutboundMsg { + NewOrderSingle(NewOrderSingle), +} + +impl OutboundMessage for OutboundMsg { + fn write(&self, msg: &mut HotfixMessage) { + match self { + OutboundMsg::NewOrderSingle(order) => { + // order details + msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); + msg.set(fix44::SYMBOL, order.symbol.as_str()); + msg.set(fix44::CL_ORD_ID, order.cl_ord_id.as_str()); + msg.set(fix44::SIDE, order.side); + msg.set(fix44::ORDER_QTY, order.order_qty); + msg.set(fix44::SETTL_DATE, order.settlement_date); + msg.set(fix44::CURRENCY, order.currency.as_str()); + + // allocations + msg.set(fix44::NO_ALLOCS, order.number_of_allocations); + let mut allocation = RepeatingGroup::new(fix44::NO_ALLOCS, fix44::ALLOC_ACCOUNT); + allocation.set(fix44::ALLOC_ACCOUNT, order.allocation_account.as_str()); + allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); + msg.set_groups(vec![allocation]).unwrap(); + } + } + } + + fn message_type(&self) -> &str { + match self { + OutboundMsg::NewOrderSingle(_) => "D", + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NewOrderSingleDto { + pub symbol: String, + pub cl_ord_id: String, + pub side: String, + pub order_qty: u32, + pub settlement_date: String, + pub currency: String, + pub number_of_allocations: u32, + pub allocation_account: String, + pub allocation_quantity: u32, +} + +impl NewOrderSingleDto { + pub fn into_order(self) -> Result { + let side = match self.side.as_str() { + "Buy" => fix44::Side::Buy, + "Sell" => fix44::Side::Sell, + other => anyhow::bail!("invalid side: {other}"), + }; + + let date_parts: Vec<&str> = self.settlement_date.split('-').collect(); + if date_parts.len() != 3 { + anyhow::bail!("invalid settlement_date format, expected YYYY-MM-DD"); + } + let year: u32 = date_parts[0].parse().context("invalid year")?; + let month: u32 = date_parts[1].parse().context("invalid month")?; + let day: u32 = date_parts[2].parse().context("invalid day")?; + let settlement_date = Date::new(year, month, day).context("invalid settlement date")?; + + Ok(NewOrderSingle { + transact_time: Timestamp::utc_now(), + symbol: self.symbol, + cl_ord_id: self.cl_ord_id, + side, + order_qty: self.order_qty, + settlement_date, + currency: self.currency, + number_of_allocations: self.number_of_allocations, + allocation_account: self.allocation_account, + allocation_quantity: self.allocation_quantity, + }) + } +} + +pub fn random_order_json() -> NewOrderSingleDto { + let mut order_id = format!("{}", uuid::Uuid::new_v4()); + order_id.truncate(12); + NewOrderSingleDto { + symbol: "EUR/USD".to_string(), + cl_ord_id: order_id, + side: "Buy".to_string(), + order_qty: 230, + settlement_date: "2023-09-19".to_string(), + currency: "USD".to_string(), + number_of_allocations: 1, + allocation_account: "acc1".to_string(), + allocation_quantity: 230, + } +} diff --git a/examples/order-entry/src/web.rs b/examples/order-entry/src/web.rs new file mode 100644 index 0000000..cfccdb0 --- /dev/null +++ b/examples/order-entry/src/web.rs @@ -0,0 +1,232 @@ +use std::sync::{Arc, Mutex}; + +use anyhow::{Context, Result}; +use axum::Router; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use axum::response::{Html, IntoResponse, Json}; +use axum::routing::{get, post}; +use hotfix::initiator::Initiator; +use hotfix::session::SessionHandle; +use hotfix_web::{RouterConfig, build_router_with_config}; +use serde::Deserialize; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use crate::application::SharedState; +use crate::messages::{NewOrderSingleDto, OutboundMsg, random_order_json}; + +#[derive(Clone)] +pub struct OrderAppState { + pub shared_state: Arc>, + pub initiator: Initiator, +} + +pub async fn start_web_service( + session_handle: SessionHandle, + cancellation_token: CancellationToken, + order_app_state: OrderAppState, +) -> Result<()> { + let config = RouterConfig { + enable_admin_endpoints: true, + }; + let hotfix_router = build_router_with_config(session_handle, config); + + let order_router = Router::new() + .route("/order", get(order_page)) + .route("/api/send-order", post(send_order)) + .route("/api/messages", get(get_messages)) + .route("/api/random-order", get(get_random_order)) + .with_state(order_app_state); + + let router = hotfix_router.merge(order_router); + + let host_and_port = std::env::var("HOST_AND_PORT").unwrap_or("0.0.0.0:9881".to_string()); + let listener = tokio::net::TcpListener::bind(&host_and_port) + .await + .context("failed to bind TCP listener")?; + + info!("starting web interface on http://{host_and_port}"); + + select! { + result = axum::serve(listener, router) => { + if let Err(e) = result { + tracing::error!("status service error: {}", e); + } + }, + () = cancellation_token.cancelled() => { + info!("status service cancelled"); + } + } + + Ok(()) +} + +async fn order_page() -> Html<&'static str> { + Html(ORDER_HTML) +} + +async fn send_order( + State(state): State, + Json(order_json): Json, +) -> impl IntoResponse { + let order = match order_json.into_order() { + Ok(o) => o, + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ); + } + }; + let msg = OutboundMsg::NewOrderSingle(order); + match state.initiator.send_forget(msg).await { + Ok(()) => (StatusCode::OK, Json(serde_json::json!({"status": "sent"}))), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ), + } +} + +#[derive(Deserialize)] +struct MessageQuery { + since: Option, +} + +async fn get_messages( + State(state): State, + Query(query): Query, +) -> Json { + let since = query.since.unwrap_or(0); + let shared = state.shared_state.lock().unwrap(); + let messages: Vec<_> = shared.messages.iter().filter(|m| m.id > since).collect(); + Json(serde_json::json!({ "messages": messages })) +} + +async fn get_random_order() -> Json { + Json(random_order_json()) +} + +const ORDER_HTML: &str = r##" + + + + + New Order Single + + + +
+

FIX New Order Single

+
+ +
+

Order JSON

+ +
+ + +
+
+
+ + +
+

FIX Message Log

+
+
No messages yet...
+
+
+
+
+ + + + +"##; diff --git a/examples/simple-new-order/README.md b/examples/simple-new-order/README.md deleted file mode 100644 index 5102226..0000000 --- a/examples/simple-new-order/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# Example initiator using HotFIX - -This dummy application demonstrates the current capabilities of HotFIX. -It connects to an acceptor, and sends a hard-coded single order. - -It's mainly used for testing session-level message flows, such as -logons, logouts, disconnects, and resends. - -## Running the app - -You need an acceptor capable of receiving FIX 4.4 messages. If you already have one, -great, otherwise [the QuickFIX/J example executor](https://github.com/quickfix-j/quickfixj/tree/master/quickfixj-examples/executor) -is straightforward to modify for this use-case. - -If you need to modify the configuration, you can find this in `./config/test-config.toml`. - -To run the application, just use `cargo run`. Pass in the config path and optionally -the log file path as CLI arguments, e.g. - -```shell -cargo run -- -c config/test-config.toml -``` - -Most of HotFIX's logging is `debug` level, so you may want to adjust -the log levels accordingly: - -```shell -RUST_LOG=info,hotfix=debug -``` - -## Message store selection - -By default, this example uses the file-system message store, which -requires no setup. Message state is persisted to files in the working -directory. - -Alternatively, you can try out the MongoDB store. This requires you to -spin up MongoDB locally first using the provided `docker-compose` file. -Once you have MongoDB running, you can run the application with the -`--database mongodb` flag to use it. diff --git a/examples/simple-new-order/docker-compose.yml b/examples/simple-new-order/docker-compose.yml deleted file mode 100644 index 7c4f7e6..0000000 --- a/examples/simple-new-order/docker-compose.yml +++ /dev/null @@ -1,22 +0,0 @@ -version: "3.10" - -services: - mongo: - image: mongo:6 - command: [--replSet, my-replica-set, --bind_ip_all, --port, "30001"] - ports: - - "30001:30001" - healthcheck: - test: test $$(mongosh --port 30001 --quiet --eval "try {rs.initiate({_id:'my-replica-set',members:[{_id:0,host:\"localhost:30001\"}]})} catch(e) {rs.status().ok}") -eq 1 - interval: 10s - start_period: 30s - - dynamo: - image: "amazon/dynamodb-local:latest" - user: root - container_name: dynamodb-local - ports: - - "8000:8000" - volumes: - - "./docker-data/dynamodb:/home/dynamodblocal/data" - working_dir: /home/dynamodblocal diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs deleted file mode 100644 index d3d7735..0000000 --- a/examples/simple-new-order/src/application.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::messages::OutboundMsg; -use hotfix::Application; -use hotfix::Message; -use hotfix::application::{InboundDecision, OutboundDecision}; -use tracing::info; - -#[derive(Default)] -pub struct TestApplication {} - -#[async_trait::async_trait] -impl Application for TestApplication { - type Outbound = OutboundMsg; - - async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { - OutboundDecision::Send - } - - async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision { - info!("received inbound message"); - InboundDecision::Accept - } - - async fn on_logout(&mut self, _reason: &str) { - info!("we've been logged out"); - } - - async fn on_logon(&mut self) { - info!("we've been logged in"); - } -} diff --git a/examples/simple-new-order/src/messages.rs b/examples/simple-new-order/src/messages.rs deleted file mode 100644 index 614aad2..0000000 --- a/examples/simple-new-order/src/messages.rs +++ /dev/null @@ -1,56 +0,0 @@ -use hotfix::Message as HotfixMessage; -use hotfix::field_types::{Date, Timestamp}; -use hotfix::fix44; -use hotfix::message::{OutboundMessage, Part, RepeatingGroup}; - -#[derive(Debug, Clone)] -pub struct NewOrderSingle { - // order details - pub transact_time: Timestamp, - pub symbol: String, // CCY1/CCY2 as string - pub cl_ord_id: String, // unique order ID assigned by the customer - pub side: fix44::Side, - pub order_qty: u32, - pub settlement_date: Date, - pub currency: String, // the dealt currency - - // allocation - pub number_of_allocations: u32, - pub allocation_account: String, - pub allocation_quantity: u32, -} - -#[derive(Debug, Clone)] -pub enum OutboundMsg { - NewOrderSingle(NewOrderSingle), -} - -impl OutboundMessage for OutboundMsg { - fn write(&self, msg: &mut HotfixMessage) { - match self { - OutboundMsg::NewOrderSingle(order) => { - // order details - msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); - msg.set(fix44::SYMBOL, order.symbol.as_str()); - msg.set(fix44::CL_ORD_ID, order.cl_ord_id.as_str()); - msg.set(fix44::SIDE, order.side); - msg.set(fix44::ORDER_QTY, order.order_qty); - msg.set(fix44::SETTL_DATE, order.settlement_date); - msg.set(fix44::CURRENCY, order.currency.as_str()); - - // allocations - msg.set(fix44::NO_ALLOCS, order.number_of_allocations); - let mut allocation = RepeatingGroup::new(fix44::NO_ALLOCS, fix44::ALLOC_ACCOUNT); - allocation.set(fix44::ALLOC_ACCOUNT, order.allocation_account.as_str()); - allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); - msg.set_groups(vec![allocation]).unwrap(); - } - } - } - - fn message_type(&self) -> &str { - match self { - OutboundMsg::NewOrderSingle(_) => "D", - } - } -}