diff --git a/.gitignore b/.gitignore index 377396019..72f2b55d0 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ magicblock-test-storage/ # AI related **/CLAUDE.md +config.json diff --git a/Cargo.lock b/Cargo.lock index 9d7115c11..fb63dced7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,6 +356,43 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-nats" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5af9ebfb0a14481d3eaf6101e6391261e4f30d25b26a7635ade8a39482ded0" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.5", + "regex", + "ring", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -538,6 +575,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" + [[package]] name = "bincode" version = "1.3.3" @@ -811,6 +854,9 @@ name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +dependencies = [ + "serde", +] [[package]] name = "bzip2-sys" @@ -1105,6 +1151,12 @@ dependencies = [ "sha2-const-stable", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -1173,6 +1225,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1357,6 +1428,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.5" @@ -1456,7 +1538,16 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7" dependencies = [ - "signature", + "signature 1.6.4", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature 2.2.0", ] [[package]] @@ -1466,13 +1557,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" dependencies = [ "curve25519-dalek 3.2.0", - "ed25519", + "ed25519 1.5.3", "rand 0.7.3", 
"serde", "sha2 0.9.9", "zeroize", ] +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek 4.1.3", + "ed25519 2.2.3", + "sha2 0.10.9", + "signature 2.2.0", + "subtle", +] + [[package]] name = "ed25519-dalek-bip32" version = "0.2.0" @@ -1480,7 +1584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d2be62a4061b872c8c0873ee4fc6f101ce7b889d039f019c5fa2af471a59908" dependencies = [ "derivation-path", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "hmac 0.12.1", "sha2 0.10.9", ] @@ -1750,6 +1854,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -2548,6 +2661,26 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -2692,6 +2825,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2932,6 +3085,26 @@ dependencies = [ "libc", ] +[[package]] +name = "machineid-rs" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ceb4d434d69d7199abc3036541ba6ef86767a4356e3077d5a3419f85b70b14" +dependencies = [ + "hex", + "hmac 0.12.1", + "md-5", + "serde", + "serde_json", + "sha-1", + "sha2 0.10.9", + "sysinfo", + "uuid", + "whoami", + "winreg 0.11.0", + "wmi", +] + [[package]] name = "magic-domain-program" version = "0.2.0" @@ -3463,6 +3636,30 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "magicblock-replicator" +version = "0.8.3" +dependencies = [ + "async-nats", + "bincode", + "bytes", + "futures", + "machineid-rs", + "magicblock-accounts-db", + "magicblock-core", + "magicblock-ledger", + "notify", + "serde", + "solana-hash", + "solana-transaction", + "solana-transaction-error", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tracing", + "url", +] + [[package]] name = "magicblock-rpc-client" version = "0.8.3" @@ -3489,7 +3686,7 @@ dependencies = [ name = "magicblock-table-mania" version = "0.8.3" dependencies = [ - "ed25519-dalek", + "ed25519-dalek 1.0.1", "magicblock-metrics", "magicblock-rpc-client", "rand 0.9.2", @@ -3626,6 +3823,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.6" @@ -3767,6 +3974,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519 2.2.3", + "ed25519-dalek 2.2.0", + "getrandom 0.2.16", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -3777,6 +3999,42 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.10.0", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.10.0", +] + +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3786,6 +4044,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num" version = "0.2.1" @@ -4060,6 +4327,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "pem-rfc7468" 
+version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -4164,6 +4440,16 @@ dependencies = [ "pinocchio-pubkey", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -4740,6 +5026,26 @@ dependencies = [ "unicode-width 0.2.0", ] +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -4787,7 +5093,7 @@ dependencies = [ "cfg-if", "libc", "rustix 1.1.2", - "windows", + "windows 0.62.2", ] [[package]] @@ -4878,7 +5184,7 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "webpki-roots 0.25.4", - "winreg", + "winreg 0.50.0", ] [[package]] @@ -5071,6 +5377,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.2" @@ -5120,6 +5439,16 @@ dependencies = [ 
"untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -5143,6 +5472,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scc" version = "2.4.0" @@ -5317,6 +5655,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -5394,6 +5752,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.10.6" @@ -5490,12 +5859,34 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature 2.2.0", + "zeroize", +] + [[package]] name = "signature" version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest 0.10.7", + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.8" @@ -6009,7 +6400,7 @@ checksum = "a1feafa1691ea3ae588f99056f4bdd1293212c7ece28243d7da257c443e84753" dependencies = [ "bytemuck", "bytemuck_derive", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "solana-feature-set", "solana-instruction", "solana-precompile-error", @@ -6296,7 +6687,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dbb7042c2e0c561afa07242b2099d55c57bd1b1da3b6476932197d84e15e3e4" dependencies = [ "bs58", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "ed25519-dalek-bip32", "rand 0.7.3", "solana-derivation-path", @@ -7214,7 +7605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47d251c8f3dc015f320b4161daac7f108156c837428e5a8cc61136d25beb11d6" dependencies = [ "bs58", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "rand 0.8.5", "serde", "serde-big-array", @@ -7823,6 +8214,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "spl-associated-token-account" version = "6.0.0" @@ -8323,6 +8724,21 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "sysinfo" +version = "0.29.11" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd727fc423c2060f6c92d9534cef765c65a6ed3f428a03d7def74a8c4348e666" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -8642,6 +9058,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.0", + "httparse", + "rand 0.8.5", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "toml" version = "0.5.11" @@ -8744,7 +9181,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.13.5", - "rustls-native-certs", + "rustls-native-certs 0.8.2", "rustls-pemfile 2.2.0", "socket2 0.5.10", "tokio", @@ -8969,6 +9406,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.20.1" @@ -9164,6 +9611,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = 
"want" version = "0.3.1" @@ -9194,6 +9651,12 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.106" @@ -9287,6 +9750,24 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -9299,6 +9780,17 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -9330,6 +9822,17 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-implement 0.48.0", + "windows-interface 0.48.0", + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.62.2" @@ -9357,8 +9860,8 @@ version = "0.62.2" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link", "windows-result", "windows-strings", @@ -9375,6 +9878,17 @@ dependencies = [ "windows-threading", ] +[[package]] +name = "windows-implement" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2ee588991b9e7e6c8338edf3333fbe4da35dc72092643958ebb43f0ab2c49c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -9386,6 +9900,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "windows-interface" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6fb8df20c9bcaa8ad6ab513f7b40104840c8867d5751126e4df3b08388d0cc7" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "windows-interface" version = "0.59.3" @@ -9680,6 +10205,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a1a57ff50e9b408431e8f97d5456f2807f8eb2a2cd79b06068fc87f8ecf189" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "winreg" version = "0.50.0" @@ -9696,6 +10231,20 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "wmi" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daffb44abb7d2e87a1233aa17fdbde0d55b890b32a23a1f908895b87fa6f1a00" +dependencies = [ + "chrono", + "futures", + "log", + "serde", + "thiserror 1.0.69", + "windows 0.48.0", +] + [[package]] name = "writeable" version = "0.6.2" diff --git 
a/Cargo.toml b/Cargo.toml index 833da8581..691b0a16c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "magicblock-ledger", "magicblock-metrics", "magicblock-processor", + "magicblock-replicator", "magicblock-rpc-client", "magicblock-table-mania", "magicblock-task-scheduler", @@ -51,9 +52,11 @@ agave-geyser-plugin-interface = { version = "2.2" } anyhow = "1.0.86" arc-swap = { version = "1.7" } assert_matches = "1.5.0" +async-nats = "0.46" async-trait = "0.1.77" base64 = "0.21.7" bincode = "1.3.3" +bytes = "1.0" borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] } bs58 = "0.5.1" byteorder = "1.5.0" @@ -117,6 +120,8 @@ magicblock-tui-client = { path = "./tools/magicblock-tui-client" } magicblock-validator-admin = { path = "./magicblock-validator-admin" } magicblock-version = { path = "./magicblock-version" } +machineid-rs = "1.2" + num-derive = "0.4" num-format = "0.4.4" num-traits = "0.2" diff --git a/magicblock-accounts-db/src/lib.rs b/magicblock-accounts-db/src/lib.rs index 86c75d9cd..d2b6013ab 100644 --- a/magicblock-accounts-db/src/lib.rs +++ b/magicblock-accounts-db/src/lib.rs @@ -434,6 +434,10 @@ impl AccountsDb { pub fn lock_database(&self) -> RwLockWriteGuard<'_, ()> { self.write_lock.write() } + + pub fn database_directory(&self) -> &Path { + self.snapshot_manager.database_path() + } } impl AccountsBank for AccountsDb { diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 22f2a6312..e47169edc 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -42,7 +42,9 @@ use magicblock_config::{ }; use magicblock_core::{ link::{ - blocks::BlockUpdateTx, link, transactions::TransactionSchedulerHandle, + blocks::BlockUpdateTx, + link, + transactions::{SchedulerMode, TransactionSchedulerHandle}, }, Slot, }; @@ -55,10 +57,7 @@ use magicblock_metrics::{metrics::TRANSACTION_COUNT, MetricsService}; use magicblock_processor::{ build_svm_env, 
loader::load_upgradeable_programs, - scheduler::{ - state::{SchedulerMode, TransactionSchedulerState}, - TransactionScheduler, - }, + scheduler::{state::TransactionSchedulerState, TransactionScheduler}, }; use magicblock_program::{ init_magic_sys, diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 899555e29..65750b7da 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -287,11 +287,11 @@ impl TransactionSchedulerHandle { txn: impl SanitizeableTransaction, ) -> TransactionResult { let mode = TransactionProcessingMode::Replay(position); - let transaction = txn.sanitize(true)?; + let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let txn = ProcessableTransaction { transaction, mode, - encoded: None, + encoded, }; self.0 .send(txn) @@ -321,3 +321,14 @@ impl TransactionSchedulerHandle { rx.await.map_err(|_| TransactionError::ClusterMaintenance) } } + +/// Scheduler execution mode (used in mode switching). +/// +/// Send via channel to transition the scheduler between modes. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SchedulerMode { + /// Accept client transactions with concurrent execution. + Primary, + /// Replay transactions with strict ordering. 
+ Replica, +} diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index d3c180f8d..69cd80d5d 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -17,7 +17,7 @@ use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::{ link::transactions::{ - ProcessableTransaction, TransactionProcessingMode, + ProcessableTransaction, SchedulerMode, TransactionProcessingMode, TransactionToProcessRx, }, Slot, @@ -27,7 +27,7 @@ use solana_account::{from_account, to_account}; use solana_program::slot_hashes::SlotHashes; use solana_program_runtime::loaded_programs::ProgramCache; use solana_sdk_ids::sysvar::{clock, slot_hashes}; -use state::{SchedulerMode, TransactionSchedulerState}; +use state::TransactionSchedulerState; use tokio::{ runtime::Builder, sync::mpsc::{channel, Receiver, Sender}, diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index d8acb5537..03b9a01af 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -10,7 +10,8 @@ use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::link::{ accounts::AccountUpdateTx, transactions::{ - ScheduledTasksTx, TransactionStatusTx, TransactionToProcessRx, + ScheduledTasksTx, SchedulerMode, TransactionStatusTx, + TransactionToProcessRx, }, }; use magicblock_ledger::Ledger; @@ -145,15 +146,3 @@ impl TransactionSchedulerState { } } } - -/// Scheduler execution mode command. -/// -/// Send via channel to transition the scheduler between modes. -/// See [`CoordinationMode`](super::coordinator::CoordinationMode) for internal state. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum SchedulerMode { - /// Accept client transactions with concurrent execution. - Primary, - /// Replay transactions with strict ordering. 
- Replica, -} diff --git a/magicblock-replicator/Cargo.toml b/magicblock-replicator/Cargo.toml new file mode 100644 index 000000000..12457e7fc --- /dev/null +++ b/magicblock-replicator/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "magicblock-replicator" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +async-nats = { workspace = true } +bincode = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +magicblock-accounts-db = { workspace = true } +magicblock-core = { workspace = true } +magicblock-ledger = { workspace = true } +machineid-rs = { workspace = true } +notify = { version = "8.0", features = ["macos_kqueue"] } +thiserror = { workspace = true } +tokio = { workspace = true, features = [ + "net", + "rt", + "macros", + "sync", + "io-util", + "fs", +] } +serde = { workspace = true, features = ["derive"] } +solana-hash = { workspace = true, features = ["serde"] } +solana-transaction = { workspace = true, features = ["serde"] } +solana-transaction-error = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/magicblock-replicator/src/error.rs b/magicblock-replicator/src/error.rs new file mode 100644 index 000000000..2fc69d7bc --- /dev/null +++ b/magicblock-replicator/src/error.rs @@ -0,0 +1,52 @@ +//! Error types for the replication protocol. + +use std::fmt::{Debug, Display}; + +use magicblock_ledger::errors::LedgerError; +use solana_transaction_error::TransactionError; + +/// Replication operation errors. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// NATS message broker error. + #[error("message broker error: {0}")] + Nats(async_nats::Error), + + /// I/O operation failed. + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + /// Serialization or deserialization failed. 
+ #[error("serialization error: {0}")] + SerDe(#[from] bincode::Error), + + /// Ledger access error. + #[error("ledger access error: {0}")] + Ledger(#[from] LedgerError), + + /// Transaction execution error. + #[error("transaction execution error: {0}")] + Transaction(#[from] TransactionError), + + /// Internal protocol violation or malformed data. + #[error("internal error: {0}")] + Internal(String), + + /// File system watcher error. + #[error("watcher error: {0}")] + Watcher(#[from] notify::Error), +} + +// async_nats::Error is actually async_nats::error::Error where K is the error kind. +// We need this generic impl to convert all variants. +impl From> for Error +where + K: Display + Debug + Clone + PartialEq + Sync + Send + 'static, +{ + fn from(value: async_nats::error::Error) -> Self { + Self::Nats(value.into()) + } +} + +/// Convenience alias for `Result`. +pub type Result = std::result::Result; diff --git a/magicblock-replicator/src/lib.rs b/magicblock-replicator/src/lib.rs new file mode 100644 index 000000000..9a616b463 --- /dev/null +++ b/magicblock-replicator/src/lib.rs @@ -0,0 +1,25 @@ +//! State replication protocol for streaming validator events via NATS JetStream. +//! +//! # Architecture +//! +//! The replicator enables primary-standby state replication using NATS JetStream: +//! +//! - **Producer**: Primary node publishes transactions, blocks, and superblocks +//! - **Consumer**: Standby nodes consume events to maintain synchronized state +//! - **Snapshots**: Periodic AccountsDb snapshots enable fast standby recovery +//! +//! # Wire Format +//! +//! Messages are serialized with bincode (4-byte discriminator + payload). 
+ +pub mod error; +pub mod nats; +pub mod proto; +pub mod service; +pub mod watcher; + +#[cfg(test)] +mod tests; + +pub use error::{Error, Result}; +pub use proto::{Message, TransactionIndex}; diff --git a/magicblock-replicator/src/nats/broker.rs b/magicblock-replicator/src/nats/broker.rs new file mode 100644 index 000000000..a43a846d2 --- /dev/null +++ b/magicblock-replicator/src/nats/broker.rs @@ -0,0 +1,186 @@ +//! NATS JetStream connection with initialized streams and buckets. + +use std::time::Duration; + +use async_nats::{ + jetstream::{ + kv, + object_store::{self, GetErrorKind, ObjectMetadata}, + stream::{self, Compression}, + Context, ContextBuilder, + }, + ConnectOptions, Event, ServerAddr, Subject, +}; +use bytes::Bytes; +use magicblock_core::Slot; +use tokio::{fs::File, io::AsyncReadExt}; +use tracing::{debug, error, info, instrument, warn}; +use url::Url; + +use super::{ + cfg, snapshot::SnapshotMeta, Consumer, Producer, Snapshot, Subjects, +}; +use crate::Result; + +/// NATS JetStream connection with initialized streams and buckets. +pub struct Broker { + pub(crate) ctx: Context, + pub(crate) sequence: u64, +} + +impl Broker { + /// Connects to NATS and initializes all JetStream resources. + /// + /// Resources are created idempotently - safe to call multiple times. 
+    pub async fn connect(url: Url) -> Result<Self> {
+        let addr = ServerAddr::from_url(url)?;
+
+        let client = ConnectOptions::new()
+            .max_reconnects(None)
+            .reconnect_delay_callback(|attempts| {
+                // Exponential backoff: RECONNECT_BASE_MS doubled on every
+                // further attempt, capped at RECONNECT_MAX_MS. Saturating
+                // arithmetic keeps large attempt counts from overflowing.
+                let exp = u32::try_from(attempts.saturating_sub(1))
+                    .unwrap_or(u32::MAX);
+                let ms = 2u64
+                    .saturating_pow(exp)
+                    .saturating_mul(cfg::RECONNECT_BASE_MS)
+                    .min(cfg::RECONNECT_MAX_MS);
+                Duration::from_millis(ms)
+            })
+            .event_callback(|event| async move {
+                match event {
+                    Event::Disconnected => warn!("NATS disconnected"),
+                    Event::Connected => info!("NATS connected"),
+                    Event::ClientError(e) => warn!(%e, "NATS client error"),
+                    other => debug!(?other, "NATS event"),
+                }
+            })
+            .connect(addr)
+            .await?;
+
+        let ctx = ContextBuilder::new()
+            .timeout(cfg::API_TIMEOUT)
+            .max_ack_inflight(cfg::MAX_ACK_INFLIGHT)
+            .backpressure_on_inflight(true)
+            .build(client);
+
+        // `sequence` starts at 0 and is overwritten by `init_resources`.
+        let mut broker = Self { ctx, sequence: 0 };
+        broker.init_resources().await?;
+        Ok(broker)
+    }
+
+    /// Initializes streams, object stores, and KV buckets.
+    ///
+    /// Creates (or updates) the event stream, the snapshot object store and
+    /// the producer-lock KV bucket, then seeds `self.sequence` from the
+    /// stream state.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first error reported by any of the three JetStream calls.
+    async fn init_resources(&mut self) -> Result<()> {
+        let info = self
+            .ctx
+            .create_or_update_stream(stream::Config {
+                name: cfg::STREAM.into(),
+                max_bytes: cfg::STREAM_BYTES,
+                subjects: Subjects::all().into_iter().map(Into::into).collect(),
+                max_age: cfg::TTL_STREAM,
+                duplicate_window: cfg::DUP_WINDOW,
+                description: Some("Magicblock validator events".into()),
+                compression: Some(Compression::S2),
+                ..Default::default()
+            })
+            .await?;
+
+        info!(stream = %info.config.name, messages = info.state.messages, "JetStream initialized");
+
+        self.ctx
+            .create_object_store(object_store::Config {
+                bucket: cfg::SNAPSHOTS.into(),
+                description: Some("AccountsDb snapshots".into()),
+                max_bytes: cfg::SNAPSHOT_BYTES,
+                ..Default::default()
+            })
+            .await?;
+
+        self.ctx
+            .create_key_value(kv::Config {
+                bucket: cfg::PRODUCER_LOCK.into(),
+                description: "Producer leader election".into(),
+                max_age: cfg::TTL_LOCK,
+                ..Default::default()
+            })
+            .await?;
+
+        // NOTE(review): this seeds the position from the *first* retained
+        // sequence of the stream, not the last one - confirm that is the
+        // intended starting point for snapshot tagging and consumer resets.
+        self.sequence = info.state.first_sequence;
+
+        Ok(())
+    }
+
+    /// Publishes a serialized message to the stream.
+ /// + /// If `ack` is true, waits for server acknowledgment and updates internal sequence. + pub async fn publish( + &mut self, + subject: Subject, + payload: Bytes, + ack: bool, + ) -> Result<()> { + let f = self.ctx.publish(subject, payload).await?; + if ack { + self.sequence = f.await?.sequence; + } + Ok(()) + } + + /// Retrieves the latest snapshot, if one exists. + pub async fn get_snapshot(&mut self) -> Result> { + let store = self.ctx.get_object_store(cfg::SNAPSHOTS).await?; + + let mut object = match store.get(cfg::SNAPSHOT_NAME).await { + Ok(obj) => obj, + Err(e) if e.kind() == GetErrorKind::NotFound => return Ok(None), + Err(e) => return Err(e.into()), + }; + + let info = object.info(); + let meta = SnapshotMeta::parse(info)?; + + let mut data = Vec::with_capacity(info.size); + object.read_to_end(&mut data).await?; + self.sequence = meta.sequence; + + Ok(Some(Snapshot { + data, + slot: meta.slot, + })) + } + + /// Uploads a snapshot in the background. + /// + /// The snapshot is tagged with the current stream sequence number, + /// allowing standbys to resume replay from the correct position. + #[instrument(skip(self, file))] + pub async fn put_snapshot(&self, slot: Slot, mut file: File) -> Result<()> { + let store = self.ctx.get_object_store(cfg::SNAPSHOTS).await?; + // Next sequence (snapshot captures state after last published message) + let sequence = self.sequence + 1; + + let meta = ObjectMetadata { + name: cfg::SNAPSHOT_NAME.into(), + metadata: SnapshotMeta { slot, sequence }.into_headers(), + ..Default::default() + }; + + // Background upload to avoid blocking + tokio::spawn(async move { + if let Err(error) = store.put(meta, &mut file).await { + error!(%error, "snapshot upload failed"); + } + }); + + Ok(()) + } + + /// Creates a consumer for receiving replicated events. + pub async fn create_consumer( + &self, + id: &str, + reset: bool, + ) -> Result { + Consumer::new(id, self, reset).await + } + + /// Creates a producer for publishing events. 
+ pub async fn create_producer(&self, id: &str) -> Result { + Producer::new(id, &self.ctx).await + } +} diff --git a/magicblock-replicator/src/nats/consumer.rs b/magicblock-replicator/src/nats/consumer.rs new file mode 100644 index 000000000..47a424f2c --- /dev/null +++ b/magicblock-replicator/src/nats/consumer.rs @@ -0,0 +1,77 @@ +//! Pull-based consumer for receiving replicated events. + +use async_nats::jetstream::consumer::{ + pull::{Config as PullConfig, Stream as MessageStream}, + AckPolicy, DeliverPolicy, PullConsumer, +}; +use tracing::warn; + +use super::cfg; +use crate::{nats::Broker, Result}; + +/// Pull-based consumer for receiving replicated events. +/// +/// Supports resuming from a specific sequence number for catch-up replay +/// after recovering from a snapshot. +pub struct Consumer { + inner: PullConsumer, +} + +impl Consumer { + pub(crate) async fn new( + id: &str, + broker: &Broker, + reset: bool, + ) -> Result { + let stream = broker.ctx.get_stream(cfg::STREAM).await?; + + let deliver_policy = if reset { + // Delete and recreate to change start position + if let Err(error) = stream.delete_consumer(id).await { + warn!(%error, "error removing consumer"); + } + DeliverPolicy::ByStartSequence { + start_sequence: broker.sequence, + } + } else { + DeliverPolicy::All + }; + + let inner = stream + .get_or_create_consumer( + id, + PullConfig { + durable_name: Some(id.into()), + ack_policy: AckPolicy::All, + ack_wait: cfg::ACK_WAIT, + max_ack_pending: cfg::MAX_ACK_PENDING, + deliver_policy, + ..Default::default() + }, + ) + .await?; + + Ok(Self { inner }) + } + + /// Returns a stream of messages from the consumer. + /// + /// Use this in a `tokio::select!` loop to process messages as they arrive. + /// Messages are fetched in batches for efficiency. 
+    pub async fn messages(&self) -> MessageStream {
+        loop {
+            let result = self
+                .inner
+                .stream()
+                .max_messages_per_batch(cfg::BATCH_SIZE)
+                .messages()
+                .await;
+            match result {
+                Ok(s) => break s,
+                Err(error) => {
+                    warn!(%error, "failed to create message stream");
+                    // Pause before retrying so that a persistent failure
+                    // does not degenerate into a tight loop.
+                    tokio::time::sleep(std::time::Duration::from_secs(1))
+                        .await;
+                }
+            }
+        }
+    }
+}
diff --git a/magicblock-replicator/src/nats/lock_watcher.rs b/magicblock-replicator/src/nats/lock_watcher.rs
new file mode 100644
index 000000000..303a84926
--- /dev/null
+++ b/magicblock-replicator/src/nats/lock_watcher.rs
@@ -0,0 +1,60 @@
+//! Lock watcher for detecting leader expiration.
+
+use async_nats::jetstream::kv::{Operation, Watch};
+use futures::StreamExt;
+use tracing::warn;
+
+use super::cfg;
+use crate::nats::Broker;
+
+/// Watches the leader lock for expiration/deletion.
+///
+/// Used by standby nodes to detect when the primary's lock expires,
+/// enabling faster takeover than waiting for the activity timeout.
+pub struct LockWatcher {
+    watch: Box<Watch>,
+}
+
+impl LockWatcher {
+    /// Creates a new lock watcher.
+    ///
+    /// Retries until both the lock bucket and the watch on the lock key
+    /// have been obtained, pausing between attempts; this function
+    /// therefore does not return while the broker keeps failing.
+    pub(crate) async fn new(broker: &Broker) -> Self {
+        // Delay between attempts, so that a failing broker is not hammered
+        // in a tight loop.
+        const RETRY_DELAY: std::time::Duration =
+            std::time::Duration::from_secs(1);
+
+        let watch = loop {
+            let store = match broker.ctx.get_key_value(cfg::PRODUCER_LOCK).await
+            {
+                Ok(s) => s,
+                Err(error) => {
+                    tracing::error!(%error, "failed to obtain lock object");
+                    tokio::time::sleep(RETRY_DELAY).await;
+                    continue;
+                }
+            };
+            match store.watch(cfg::LOCK_KEY).await {
+                Ok(w) => break Box::new(w),
+                Err(error) => {
+                    tracing::error!(%error, "failed to create lock watcher");
+                    tokio::time::sleep(RETRY_DELAY).await;
+                    continue;
+                }
+            }
+        };
+        Self { watch }
+    }
+
+    /// Waits for the lock to be deleted or expire.
+    ///
+    /// Returns when the lock key is deleted or purged (TTL expiry).
+    /// This signals that a takeover attempt should be made.
+ pub async fn wait_for_expiry(&mut self) { + while let Some(result) = self.watch.next().await { + let operation = match result { + Ok(entry) => entry.operation, + Err(e) => { + warn!(%e, "lock watch error"); + continue; + } + }; + if matches!(operation, Operation::Delete | Operation::Purge) { + return; + } + } + warn!("lock watch stream ended unexpectedly"); + } +} diff --git a/magicblock-replicator/src/nats/mod.rs b/magicblock-replicator/src/nats/mod.rs new file mode 100644 index 000000000..70cc0f511 --- /dev/null +++ b/magicblock-replicator/src/nats/mod.rs @@ -0,0 +1,100 @@ +//! NATS JetStream client for event replication. +//! +//! # Components +//! +//! - [`Broker`]: Connection manager with stream/bucket initialization +//! - [`Producer`]: Event publisher with distributed leader lock +//! - [`Consumer`]: Event subscriber for standby replay +//! - [`Snapshot`]: AccountsDb snapshot with positioning metadata +//! - [`LockWatcher`]: Watcher for leader lock expiration + +mod broker; +mod consumer; +mod lock_watcher; +mod producer; +mod snapshot; + +use async_nats::Subject; +pub use broker::Broker; +pub use consumer::Consumer; +pub use lock_watcher::LockWatcher; +pub use producer::Producer; +pub use snapshot::Snapshot; + +// ============================================================================= +// Configuration +// ============================================================================= + +/// Resource names and configuration constants. 
+mod cfg { + use std::time::Duration; + + pub const STREAM: &str = "EVENTS"; + pub const SNAPSHOTS: &str = "SNAPSHOTS"; + pub const PRODUCER_LOCK: &str = "PRODUCER"; + pub const LOCK_KEY: &str = "lock"; + pub const SNAPSHOT_NAME: &str = "accountsdb"; + + pub const META_SLOT: &str = "slot"; + pub const META_SEQUENCE: &str = "sequence"; + + // Size limits (256 GB stream, 512 GB snapshots) + pub const STREAM_BYTES: i64 = 256 * 1024 * 1024 * 1024; + pub const SNAPSHOT_BYTES: i64 = 512 * 1024 * 1024 * 1024; + + // Timeouts + pub const TTL_STREAM: Duration = Duration::from_secs(24 * 60 * 60); + pub const TTL_LOCK: Duration = Duration::from_secs(5); + pub const ACK_WAIT: Duration = Duration::from_secs(30); + pub const API_TIMEOUT: Duration = Duration::from_secs(2); + pub const DUP_WINDOW: Duration = Duration::from_secs(30); + + // Reconnect backoff (exponential: 100ms base, 5s max) + pub const RECONNECT_BASE_MS: u64 = 100; + pub const RECONNECT_MAX_MS: u64 = 5000; + + // Backpressure + pub const MAX_ACK_PENDING: i64 = 512; + pub const MAX_ACK_INFLIGHT: usize = 2048; + pub const BATCH_SIZE: usize = 512; +} + +// ============================================================================= +// Subjects +// ============================================================================= + +/// NATS subjects for event types. +/// +/// Provides both string constants for stream configuration and typed subjects +/// for publishing. +pub struct Subjects; + +impl Subjects { + pub const TRANSACTION: &'static str = "event.transaction"; + pub const BLOCK: &'static str = "event.block"; + pub const SUPERBLOCK: &'static str = "event.superblock"; + + /// All subjects for stream configuration. + pub const fn all() -> [&'static str; 3] { + [Self::TRANSACTION, Self::BLOCK, Self::SUPERBLOCK] + } + + const fn from(s: &'static str) -> Subject { + Subject::from_static(s) + } + + /// Typed subject for transaction events. 
+ pub fn transaction() -> Subject { + Self::from(Self::TRANSACTION) + } + + /// Typed subject for block events. + pub fn block() -> Subject { + Self::from(Self::BLOCK) + } + + /// Typed subject for superblock events. + pub fn superblock() -> Subject { + Self::from(Self::SUPERBLOCK) + } +} diff --git a/magicblock-replicator/src/nats/producer.rs b/magicblock-replicator/src/nats/producer.rs new file mode 100644 index 000000000..e2ab5137d --- /dev/null +++ b/magicblock-replicator/src/nats/producer.rs @@ -0,0 +1,71 @@ +//! Event producer with distributed lock for leader election. + +use async_nats::jetstream::{ + kv::{CreateErrorKind, Store, UpdateErrorKind}, + Context, +}; +use bytes::Bytes; +use tracing::warn; + +use super::cfg; +use crate::Result; + +/// Event producer with distributed lock for leader election. +/// +/// Only one producer can hold the lock at a time, ensuring exactly one +/// primary publishes events. The lock has a TTL and must be refreshed +/// periodically to maintain leadership. +pub struct Producer { + lock: Box, + id: Bytes, + revision: u64, +} + +impl Producer { + pub(crate) async fn new(id: &str, js: &Context) -> Result { + Ok(Self { + lock: Box::new(js.get_key_value(cfg::PRODUCER_LOCK).await?), + id: id.to_owned().into_bytes().into(), + revision: 0, + }) + } + + /// Attempts to acquire the leader lock. + /// + /// Returns `true` if this producer became the leader. + /// Returns `false` if another producer already holds the lock. + pub async fn acquire(&mut self) -> Result { + match self.lock.create(cfg::LOCK_KEY, self.id.clone()).await { + Ok(rev) => { + self.revision = rev; + Ok(true) + } + Err(e) if e.kind() == CreateErrorKind::AlreadyExists => Ok(false), + Err(e) => Err(e.into()), + } + } + + /// Refreshes the leader lock to prevent expiration. + /// + /// Returns `false` if we lost the lock (another producer took over). + /// This typically indicates a network partition or slow refresh. 
+ pub async fn refresh(&mut self) -> Result { + match self + .lock + .update(cfg::LOCK_KEY, self.id.clone(), self.revision) + .await + { + Ok(rev) => { + self.revision = rev; + Ok(true) + } + Err(e) if e.kind() == UpdateErrorKind::WrongLastRevision => { + Ok(false) + } + Err(e) => { + warn!(%e, "lock refresh failed"); + Err(e.into()) + } + } + } +} diff --git a/magicblock-replicator/src/nats/snapshot.rs b/magicblock-replicator/src/nats/snapshot.rs new file mode 100644 index 000000000..a7988eb7e --- /dev/null +++ b/magicblock-replicator/src/nats/snapshot.rs @@ -0,0 +1,48 @@ +//! AccountsDb snapshot with positioning metadata. + +use std::collections::HashMap; + +use async_nats::jetstream::object_store; +use magicblock_core::Slot; + +use super::cfg; +use crate::{Error, Result}; + +/// AccountsDb snapshot with positioning metadata. +#[derive(Debug)] +pub struct Snapshot { + /// Raw snapshot bytes. + pub data: Vec, + /// Slot at which the snapshot was taken. + pub slot: Slot, +} + +/// Metadata stored with each snapshot object. +pub(crate) struct SnapshotMeta { + pub(crate) slot: Slot, + pub(crate) sequence: u64, +} + +impl SnapshotMeta { + /// Parses required metadata fields from object info. 
+    pub(crate) fn parse(info: &object_store::ObjectInfo) -> Result<Self> {
+        // Looks up `key` in the object metadata and parses it as an
+        // integer. A missing key and a malformed value are reported as
+        // distinct errors so that corrupted metadata is not mistaken for
+        // absent metadata.
+        let get_parsed = |key: &str| -> Result<u64> {
+            let raw = info.metadata.get(key).ok_or_else(|| {
+                Error::Internal(format!("missing '{key}' in snapshot metadata"))
+            })?;
+            raw.parse::<u64>().map_err(|e| {
+                Error::Internal(format!(
+                    "invalid '{key}' in snapshot metadata: {e}"
+                ))
+            })
+        };
+
+        let slot = get_parsed(cfg::META_SLOT)?;
+        let sequence = get_parsed(cfg::META_SEQUENCE)?;
+
+        Ok(Self { slot, sequence })
+    }
+
+    /// Converts the metadata into the string key/value map stored with the
+    /// snapshot object (the inverse of [`Self::parse`]).
+    pub(crate) fn into_headers(self) -> HashMap<String, String> {
+        HashMap::from([
+            (cfg::META_SLOT.into(), self.slot.to_string()),
+            (cfg::META_SEQUENCE.into(), self.sequence.to_string()),
+        ])
+    }
+}
diff --git a/magicblock-replicator/src/proto.rs b/magicblock-replicator/src/proto.rs
new file mode 100644
index 000000000..92a113eab
--- /dev/null
+++ b/magicblock-replicator/src/proto.rs
@@ -0,0 +1,92 @@
+//! Protocol message types for replication.
+//!
+//! # Wire Format
+//!
+//! The enum variant index serves as an implicit type tag.
+
+use async_nats::Subject;
+use magicblock_core::Slot;
+use serde::{Deserialize, Serialize};
+use solana_hash::Hash;
+use solana_transaction::versioned::VersionedTransaction;
+
+use crate::nats::Subjects;
+
+/// Ordinal position of a transaction within a slot.
+pub type TransactionIndex = u32;
+
+/// Index for block boundary markers (TransactionIndex::MAX - 1).
+/// Used to identify Block messages in slot/index comparisons.
+pub const BLOCK_INDEX: TransactionIndex = TransactionIndex::MAX - 1;
+
+/// Index for superblock checkpoint markers (TransactionIndex::MAX).
+/// Used to identify SuperBlock messages in slot/index comparisons.
+pub const SUPERBLOCK_INDEX: TransactionIndex = TransactionIndex::MAX;
+
+/// Top-level replication message envelope.
+///
+/// Variant order is part of the wire format - reordering breaks compatibility.
+#[derive(Deserialize, Serialize, Clone, Debug)]
+pub enum Message {
+    /// Transaction executed at a specific slot position.
+ Transaction(Transaction), + /// Slot boundary with blockhash for confirmation. + Block(Block), + /// Periodic checkpoint for state verification. + SuperBlock(SuperBlock), +} + +impl Message { + pub(crate) fn subject(&self) -> Subject { + match self { + Self::Transaction(_) => Subjects::transaction(), + Self::Block(_) => Subjects::block(), + Self::SuperBlock(_) => Subjects::superblock(), + } + } + + pub(crate) fn slot_and_index(&self) -> (Slot, TransactionIndex) { + match self { + Self::Transaction(tx) => (tx.slot, tx.index), + Self::Block(block) => (block.slot, BLOCK_INDEX), + Self::SuperBlock(superblock) => (superblock.slot, SUPERBLOCK_INDEX), + } + } +} + +/// Transaction with slot context for ordered replay. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Transaction { + /// Slot where the transaction was executed. + pub slot: Slot, + /// Ordinal position within the slot. + pub index: TransactionIndex, + /// Bincode-encoded `VersionedTransaction`. + pub payload: Vec, +} + +/// Slot boundary marker with blockhash. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Block { + /// Slot number. + pub slot: Slot, + /// Blockhash for this slot. + pub hash: Hash, + /// Unix timestamp (seconds). + pub timestamp: i64, +} + +/// Periodic checkpoint for state verification and catch-up. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct SuperBlock { + pub slot: Slot, + /// Rolling checksum for verification. + pub checksum: u64, +} + +impl Transaction { + /// Deserializes the inner `VersionedTransaction`. + pub fn decode(&self) -> bincode::Result { + bincode::deserialize(&self.payload) + } +} diff --git a/magicblock-replicator/src/service/context.rs b/magicblock-replicator/src/service/context.rs new file mode 100644 index 000000000..fcd16d0f7 --- /dev/null +++ b/magicblock-replicator/src/service/context.rs @@ -0,0 +1,173 @@ +//! Shared context for primary and standby roles. 
+ +use std::sync::Arc; + +use machineid_rs::IdBuilder; +use magicblock_accounts_db::AccountsDb; +use magicblock_core::{ + link::transactions::{SchedulerMode, TransactionSchedulerHandle}, + Slot, +}; +use magicblock_ledger::Ledger; +use tokio::{ + fs::File, + sync::mpsc::{Receiver, Sender}, +}; +use tracing::info; + +use super::{Primary, Standby, CONSUMER_RETRY_DELAY}; +use crate::{ + nats::{Broker, Consumer, LockWatcher, Producer}, + proto::{self, TransactionIndex}, + watcher::SnapshotWatcher, + Error, Message, Result, +}; + +/// Shared state for both primary and standby roles. +pub struct ReplicationContext { + /// Node identifier for leader election. + pub id: String, + /// NATS broker. + pub broker: Broker, + /// Scheduler mode channel. + pub mode_tx: Sender, + /// Accounts database. + pub accountsdb: Arc, + /// Transaction ledger. + pub ledger: Arc, + /// Transaction scheduler. + pub scheduler: TransactionSchedulerHandle, + /// Current position. + pub slot: Slot, + pub index: TransactionIndex, +} + +impl ReplicationContext { + /// Creates context from ledger state. + pub async fn new( + broker: Broker, + mode_tx: Sender, + accountsdb: Arc, + ledger: Arc, + scheduler: TransactionSchedulerHandle, + ) -> Result { + let id = IdBuilder::new(machineid_rs::Encryption::SHA256) + .add_component(machineid_rs::HWIDComponent::SystemID) + .build("magicblock") + .map_err(|e| Error::Internal(e.to_string()))?; + + let (slot, index) = ledger + .get_latest_transaction_position()? + .unwrap_or_default(); + + info!(%id, slot, index, "context initialized"); + Ok(Self { + id, + broker, + mode_tx, + accountsdb, + ledger, + scheduler, + slot, + index, + }) + } + + /// Updates position. + pub fn update_position(&mut self, slot: Slot, index: TransactionIndex) { + self.slot = slot; + self.index = index; + } + + /// Writes block to ledger. 
+    pub async fn write_block(&self, block: &proto::Block) -> Result<()> {
+        self.ledger
+            .write_block(block.slot, block.timestamp, block.hash)?;
+        Ok(())
+    }
+
+    /// Verifies superblock checksum.
+    ///
+    /// Computes the local accountsdb checksum while holding the database
+    /// lock and compares it with the checksum carried by the superblock.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::Internal`] when the two checksums differ.
+    pub fn verify_checksum(&self, sb: &proto::SuperBlock) -> Result<()> {
+        let _lock = self.accountsdb.lock_database();
+        // SAFETY: Lock acquired above ensures no concurrent modifications
+        // during checksum computation.
+        let checksum = unsafe { self.accountsdb.checksum() };
+        if checksum == sb.checksum {
+            Ok(())
+        } else {
+            // The superblock carries the reference value ("expected"); the
+            // locally computed checksum is what we actually "got".
+            let msg = format!(
+                "accountsdb state mismatch at {}, expected {}, got {checksum}",
+                sb.slot, sb.checksum
+            );
+            Err(Error::Internal(msg))
+        }
+    }
+
+    /// Creates a snapshot watcher for the database directory.
+    pub fn create_snapshot_watcher(&self) -> Result<SnapshotWatcher> {
+        SnapshotWatcher::new(self.accountsdb.database_directory())
+    }
+
+    /// Attempts to acquire producer lock for primary role.
+    ///
+    /// Returns `Ok(Some(producer))` when the lock was acquired and
+    /// `Ok(None)` when it was not.
+    pub async fn try_acquire_producer(&self) -> Result<Option<Producer>> {
+        let mut producer = self.broker.create_producer(&self.id).await?;
+        producer
+            .acquire()
+            .await
+            .map(|acquired| acquired.then_some(producer))
+    }
+
+    /// Switches to replica mode.
+    ///
+    /// Best effort: a closed mode channel is logged, not propagated.
+    pub async fn enter_replica_mode(&self) {
+        if self.mode_tx.send(SchedulerMode::Replica).await.is_err() {
+            tracing::warn!("scheduler mode channel closed, replica mode not delivered");
+        }
+    }
+
+    /// Switches to primary mode.
+    ///
+    /// Best effort: a closed mode channel is logged, not propagated.
+    pub async fn enter_primary_mode(&self) {
+        if self.mode_tx.send(SchedulerMode::Primary).await.is_err() {
+            tracing::warn!("scheduler mode channel closed, primary mode not delivered");
+        }
+    }
+
+    /// Uploads snapshot.
+    pub async fn upload_snapshot(&self, file: File, slot: Slot) -> Result<()> {
+        self.broker.put_snapshot(slot, file).await
+    }
+
+    /// Creates consumer with retry.
+    ///
+    /// Loops until the broker returns a consumer, sleeping
+    /// `CONSUMER_RETRY_DELAY` between failed attempts.
+    pub async fn create_consumer(&self, reset: bool) -> Consumer {
+        loop {
+            match self.broker.create_consumer(&self.id, reset).await {
+                Ok(c) => return c,
+                Err(e) => {
+                    tracing::warn!(%e, "consumer creation failed, retrying");
+                    tokio::time::sleep(CONSUMER_RETRY_DELAY).await;
+                }
+            }
+        }
+    }
+
+    /// Transitions to primary role with the given producer.
+ pub async fn into_primary( + self, + producer: Producer, + messages: Receiver, + ) -> Result { + let snapshots = self.create_snapshot_watcher()?; + self.enter_primary_mode().await; + Ok(Primary::new(self, producer, messages, snapshots)) + } + + /// Transitions to standby role. + /// reset parameter controls where in the stream the consumption starts: + /// true - the last known position that we know + /// false - the last known position that message broker tracks for us + pub async fn into_standby( + self, + messages: Receiver, + reset: bool, + ) -> Result { + let consumer = Box::new(self.create_consumer(reset).await); + let watcher = LockWatcher::new(&self.broker).await; + self.enter_replica_mode().await; + Ok(Standby::new(self, consumer, messages, watcher)) + } +} diff --git a/magicblock-replicator/src/service/mod.rs b/magicblock-replicator/src/service/mod.rs new file mode 100644 index 000000000..4c89ae5ad --- /dev/null +++ b/magicblock-replicator/src/service/mod.rs @@ -0,0 +1,118 @@ +//! Primary-standby state synchronization via NATS JetStream. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────┐ +//! │ Service │ +//! └──────┬──────┘ +//! ┌─────────┴─────────┐ +//! ▼ ▼ +//! ┌─────────┐ ┌─────────┐ +//! │ Primary │ ←────→│ Standby │ +//! └────┬────┘ └────┬────┘ +//! │ │ +//! ┌───┴───┐ ┌───┴───┐ +//! │Publish│ │Consume│ +//! │Upload │ │Apply │ +//! │Refresh│ │Verify │ +//! └───────┘ └───────┘ +//! 
``` + +mod context; +mod primary; +mod standby; + +use std::{sync::Arc, thread::JoinHandle, time::Duration}; + +pub use context::ReplicationContext; +use magicblock_accounts_db::AccountsDb; +use magicblock_core::link::transactions::{ + SchedulerMode, TransactionSchedulerHandle, +}; +use magicblock_ledger::Ledger; +pub use primary::Primary; +pub use standby::Standby; +use tokio::{ + runtime::Builder, + sync::mpsc::{Receiver, Sender}, +}; + +use crate::{nats::Broker, Message, Result}; + +// ============================================================================= +// Constants +// ============================================================================= + +pub(crate) const LOCK_REFRESH_INTERVAL: Duration = Duration::from_secs(1); +pub(crate) const LEADER_TIMEOUT: Duration = Duration::from_secs(10); +const CONSUMER_RETRY_DELAY: Duration = Duration::from_secs(1); + +// ============================================================================= +// Service +// ============================================================================= + +/// Replication service with automatic role transitions. +pub enum Service { + Primary(Primary), + Standby(Standby), +} + +impl Service { + /// Creates service, attempting primary role first. + pub async fn new( + broker: Broker, + mode_tx: Sender, + accountsdb: Arc, + ledger: Arc, + scheduler: TransactionSchedulerHandle, + messages: Receiver, + reset: bool, + ) -> crate::Result { + let ctx = ReplicationContext::new( + broker, mode_tx, accountsdb, ledger, scheduler, + ) + .await?; + + // Try to become primary. + match ctx.try_acquire_producer().await? { + Some(producer) => { + Ok(Self::Primary(ctx.into_primary(producer, messages).await?)) + } + None => { + let standby = ctx.into_standby(messages, reset).await?; + Ok(Self::Standby(standby)) + } + } + } + + /// Runs service with automatic role transitions. 
+    pub async fn run(mut self) -> Result<()> {
+        loop {
+            // Each role runs until it hands over to the other one; only an
+            // error breaks the cycle.
+            self = match self {
+                Service::Primary(p) => Service::Standby(p.run().await?),
+                Service::Standby(s) => match s.run().await {
+                    Ok(p) => Service::Primary(p),
+                    Err(error) => {
+                        tracing::error!(%error, "unrecoverable replication failure");
+                        return Err(error);
+                    }
+                },
+            };
+        }
+    }
+
+    /// Spawns the service in a dedicated OS thread with a single-threaded runtime.
+    ///
+    /// Returns a `JoinHandle` that can be used to wait for the service to complete.
+    ///
+    /// # Panics
+    ///
+    /// The spawned thread panics if the runtime cannot be built.
+    pub fn spawn(self) -> JoinHandle<Result<()>> {
+        std::thread::spawn(move || {
+            let runtime = Builder::new_current_thread()
+                // The service relies on `tokio::time` (lock refresh and
+                // timeout intervals, retry sleeps); without the time driver
+                // those calls panic on this runtime.
+                .enable_all()
+                .thread_name("replication-service")
+                .build()
+                .expect("Failed to build replication service runtime");
+
+            runtime.block_on(tokio::task::unconstrained(self.run()))
+        })
+    }
+}
diff --git a/magicblock-replicator/src/service/primary.rs b/magicblock-replicator/src/service/primary.rs
new file mode 100644
index 000000000..c2b00a161
--- /dev/null
+++ b/magicblock-replicator/src/service/primary.rs
@@ -0,0 +1,93 @@
+//! Primary node: publishes events and holds leader lock.
+
+use tokio::sync::mpsc::Receiver;
+use tracing::{error, info, instrument, warn};
+
+use super::{ReplicationContext, LOCK_REFRESH_INTERVAL};
+use crate::{
+    nats::Producer, service::Standby, watcher::SnapshotWatcher, Message, Result,
+};
+
+/// Primary node: publishes events and holds leader lock.
+pub struct Primary {
+    pub(crate) ctx: ReplicationContext,
+    producer: Producer,
+    messages: Receiver<Message>,
+    snapshots: SnapshotWatcher,
+}
+
+impl Primary {
+    /// Creates a new primary instance.
+    pub fn new(
+        ctx: ReplicationContext,
+        producer: Producer,
+        messages: Receiver<Message>,
+        snapshots: SnapshotWatcher,
+    ) -> Self {
+        Self {
+            ctx,
+            producer,
+            messages,
+            snapshots,
+        }
+    }
+
+    /// Runs until leadership lost, returns standby on demotion.
+ #[instrument(skip(self))] + pub async fn run(mut self) -> Result { + let mut lock_tick = tokio::time::interval(LOCK_REFRESH_INTERVAL); + + loop { + tokio::select! { + Some(msg) = self.messages.recv() => { + if let Err(error) = self.publish(msg).await { + // publish should not easily fail, if that happens, it means + // the message broker has become unrecoverably unreacheable + warn!(%error, "failed to publish the message"); + return self.ctx.into_standby(self.messages, true).await; + } + } + + _ = lock_tick.tick() => { + let held = match self.producer.refresh().await { + Ok(h) => h, + Err(e) => { + warn!(%e, "lock refresh failed"); + false + } + }; + if !held { + info!("lost leadership, demoting"); + return self.ctx.into_standby(self.messages, true).await; + } + } + + Some((file, slot)) = self.snapshots.recv() => { + if let Err(e) = self.ctx.upload_snapshot(file, slot).await { + warn!(%e, "snapshot upload failed"); + } + } + } + } + } + + async fn publish(&mut self, msg: Message) -> Result<()> { + let payload = match bincode::serialize(&msg) { + Ok(p) => p, + Err(error) => { + error!(%error, "serialization failed, should never happen"); + return Ok(()); + } + }; + let subject = msg.subject(); + let (slot, index) = msg.slot_and_index(); + let ack = matches!(msg, Message::SuperBlock(_)); + + self.ctx + .broker + .publish(subject, payload.into(), ack) + .await?; + self.ctx.update_position(slot, index); + Ok(()) + } +} diff --git a/magicblock-replicator/src/service/standby.rs b/magicblock-replicator/src/service/standby.rs new file mode 100644 index 000000000..9bd834d1b --- /dev/null +++ b/magicblock-replicator/src/service/standby.rs @@ -0,0 +1,139 @@ +//! Standby node: consumes events and watches for leader failure. 
+
+use std::time::{Duration, Instant};
+
+use async_nats::Message as NatsMessage;
+use futures::StreamExt;
+use magicblock_core::{
+    link::transactions::{ReplayPosition, WithEncoded},
+    Slot,
+};
+use solana_transaction::versioned::VersionedTransaction;
+use tokio::sync::mpsc::Receiver;
+use tracing::{error, info, warn};
+
+use super::{ReplicationContext, LEADER_TIMEOUT};
+use crate::{
+    nats::{Consumer, LockWatcher},
+    proto::TransactionIndex,
+    service::Primary,
+    Message, Result,
+};
+
+/// Standby node: consumes events and watches for leader failure.
+pub struct Standby {
+    pub(crate) ctx: ReplicationContext,
+    consumer: Box<Consumer>,
+    messages: Receiver<Message>,
+    watcher: LockWatcher,
+    /// Time the last replicated message was received.
+    last_activity: Instant,
+}
+
+impl Standby {
+    /// Creates a new standby instance.
+    pub fn new(
+        ctx: ReplicationContext,
+        consumer: Box<Consumer>,
+        messages: Receiver<Message>,
+        watcher: LockWatcher,
+    ) -> Self {
+        Self {
+            ctx,
+            consumer,
+            messages,
+            watcher,
+            last_activity: Instant::now(),
+        }
+    }
+
+    /// Runs until leadership acquired, returns primary on promotion.
+    ///
+    /// Three events are multiplexed: an incoming replicated message, a
+    /// delete/purge of the leader lock, and a periodic check that fires a
+    /// takeover attempt once no message has arrived for `LEADER_TIMEOUT`.
+    pub async fn run(mut self) -> Result<Primary> {
+        let mut timeout_check = tokio::time::interval(Duration::from_secs(1));
+        let mut stream = self.consumer.messages().await;
+
+        loop {
+            tokio::select! {
+                result = stream.next() => {
+                    // An exhausted stream is replaced with a fresh one.
+                    let Some(result) = result else {
+                        stream = self.consumer.messages().await;
+                        continue;
+                    };
+                    match result {
+                        Ok(msg) => {
+                            self.handle_message(&msg).await;
+                            // The consumer is created with an explicit ack
+                            // policy and a `max_ack_pending` limit, so
+                            // messages must be acknowledged for delivery
+                            // to keep flowing and for the broker-side
+                            // position to advance.
+                            if let Err(error) = msg.ack().await {
+                                warn!(%error, "failed to acknowledge message");
+                            }
+                            self.last_activity = Instant::now();
+                        }
+                        Err(e) => warn!(%e, "message consumption stream error"),
+                    }
+                }
+
+                _ = self.watcher.wait_for_expiry() => {
+                    info!("leader lock expired, attempting takeover");
+                    if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await {
+                        info!("acquired leadership, promoting");
+                        return self.ctx.into_primary(producer, self.messages).await;
+                    }
+                }
+
+                _ = timeout_check.tick(), if self.last_activity.elapsed() > LEADER_TIMEOUT => {
+                    if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await {
+                        info!("acquired leadership via timeout, promoting");
+                        return self.ctx.into_primary(producer, self.messages).await;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Decodes one replicated message and applies it to local state.
+    ///
+    /// Messages at or before the current position are skipped. Failures
+    /// are logged and leave the position unchanged.
+    async fn handle_message(&mut self, msg: &NatsMessage) {
+        let message = match bincode::deserialize::<Message>(&msg.payload) {
+            Ok(m) => m,
+            Err(e) => {
+                warn!(%e, "deserialization failed");
+                return;
+            }
+        };
+        let (slot, index) = message.slot_and_index();
+
+        // Skip duplicates.
+        let obsolete = self.ctx.slot == slot && self.ctx.index >= index;
+        if self.ctx.slot > slot || obsolete {
+            return;
+        }
+
+        let result = match message {
+            Message::Transaction(tx) => {
+                self.replay_tx(tx.slot, tx.index, tx.payload).await
+            }
+            Message::Block(block) => self.ctx.write_block(&block).await,
+            Message::SuperBlock(sb) => {
+                self.ctx.verify_checksum(&sb).inspect_err(|error|
+                    error!(slot, %error, "accountsdb state has diverged")
+                )
+            }
+        };
+
+        if let Err(error) = result {
+            warn!(slot, index, %error, "message processing error");
+            return;
+        }
+        self.ctx.update_position(slot, index);
+    }
+
+    /// Decodes the transaction bytes and submits them to the scheduler for
+    /// replay at the given position.
+    async fn replay_tx(
+        &self,
+        slot: Slot,
+        index: TransactionIndex,
+        encoded: Vec<u8>,
+    ) -> Result<()> {
+        let pos = ReplayPosition {
+            slot,
+            index,
+            persist: true,
+        };
+        let tx: VersionedTransaction = bincode::deserialize(&encoded)?;
+        let tx = WithEncoded { txn: tx, encoded };
+        self.ctx.scheduler.replay(pos, tx).await?;
+        Ok(())
+    }
+}
diff --git a/magicblock-replicator/src/tests.rs b/magicblock-replicator/src/tests.rs
new file mode 100644
index 000000000..2b3247f05
--- /dev/null
+++ b/magicblock-replicator/src/tests.rs
@@ -0,0 +1,71 @@
+use std::{io::Write, path::Path, time::Duration};
+
+use tempfile::TempDir;
+use tokio::io::AsyncReadExt;
+
+use crate::watcher::*;
+
+#[tokio::test]
+async fn test_watcher_detects_new_snapshot() {
+    let temp_dir = TempDir::new().unwrap();
+    let mut watcher = SnapshotWatcher::new(temp_dir.path()).unwrap();
+
+    let test_data = b"test archive contents";
+    let snapshot_path = temp_dir.path().join("snapshot-000000000001.tar.gz");
+    std::fs::File::create(&snapshot_path)
+        .unwrap()
+        .write_all(test_data)
+        .unwrap();
+
+    let (mut file, slot) =
+        tokio::time::timeout(Duration::from_secs(2), watcher.recv())
+            .await
+            .expect("Timeout waiting for snapshot")
+            .expect("Channel closed");
+
+    assert_eq!(slot, 1);
+    let mut contents = Vec::new();
+    file.read_to_end(&mut contents).await.unwrap();
+    assert_eq!(contents, test_data);
+}
+ +#[tokio::test] +async fn test_watcher_ignores_non_snapshots() { + let temp_dir = TempDir::new().unwrap(); + let mut watcher = SnapshotWatcher::new(temp_dir.path()).unwrap(); + + let other_path = temp_dir.path().join("other.txt"); + std::fs::File::create(&other_path).unwrap(); + + let test_data = b"test archive"; + let snapshot_path = temp_dir.path().join("snapshot-000000000002.tar.gz"); + std::fs::File::create(&snapshot_path) + .unwrap() + .write_all(test_data) + .unwrap(); + + let (mut file, slot) = + tokio::time::timeout(Duration::from_secs(2), watcher.recv()) + .await + .expect("Timeout waiting for snapshot") + .expect("Channel closed"); + + assert_eq!(slot, 2); + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await.unwrap(); + assert_eq!(contents, test_data); +} + +#[test] +fn test_parse_slot() { + assert_eq!( + parse_slot(Path::new("snapshot-000000000001.tar.gz")), + Some(1) + ); + assert_eq!( + parse_slot(Path::new("/some/path/snapshot-000000000123.tar.gz")), + Some(123) + ); + assert_eq!(parse_slot(Path::new("other.txt")), None); + assert_eq!(parse_slot(Path::new("snapshot-invalid.tar.gz")), None); +} diff --git a/magicblock-replicator/src/watcher.rs b/magicblock-replicator/src/watcher.rs new file mode 100644 index 000000000..d7c962923 --- /dev/null +++ b/magicblock-replicator/src/watcher.rs @@ -0,0 +1,119 @@ +//! Directory watcher for AccountsDb snapshot archives. +//! +//! Monitors a directory for new `.tar.gz` snapshot files and yields them +//! as open [`tokio::fs::File`] handles via a channel for tokio::select compatibility. + +use std::path::{Path, PathBuf}; + +use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use tokio::{fs::File, sync::mpsc}; +use tracing::{error, info}; + +use crate::Result; + +const SNAPSHOT_EXTENSION: &str = "tar.gz"; +const SNAPSHOT_PREFIX: &str = "snapshot-"; + +/// Extracts the slot number from a snapshot filename. 
+/// +/// Expected format: `snapshot-{slot:0>12}.tar.gz` +/// Example: `snapshot-000000000001.tar.gz` -> `Some(1)` +pub fn parse_slot(path: &Path) -> Option { + path.file_name()? + .to_str()? + .strip_prefix(SNAPSHOT_PREFIX)? + .strip_suffix(&format!(".{SNAPSHOT_EXTENSION}"))? + .parse() + .ok() +} + +/// Watcher for snapshot archive files in a directory. +/// +/// Uses `notify` for filesystem events and yields open file handles +/// via an mpsc channel compatible with `tokio::select!`. +pub struct SnapshotWatcher { + _watcher: RecommendedWatcher, + rx: mpsc::Receiver, +} + +impl SnapshotWatcher { + /// Creates a new watcher monitoring the given directory. + /// + /// The watcher detects newly created `.tar.gz` files and opens them + /// for reading when [`Self::recv`] is called. + /// + /// # Errors + /// + /// Returns an error if the watcher cannot be initialized or the + /// directory cannot be accessed. + pub fn new(dir: &Path) -> Result { + let (tx, rx) = mpsc::channel(32); + + let mut watcher = + notify::recommended_watcher(move |res: notify::Result| { + match res { + Ok(event) => { + if let Some(path) = Self::process_event(&event) { + if let Err(e) = tx.blocking_send(path) { + error!("Failed to send snapshot event: {}", e); + } + } + } + Err(e) => { + error!("Watch error: {}", e); + } + } + })?; + + watcher.watch(dir, RecursiveMode::NonRecursive)?; + info!(dir = %dir.display(), "Snapshot watcher started"); + + Ok(Self { + _watcher: watcher, + rx, + }) + } + + /// Process a filesystem event and extract snapshot path if relevant. + fn process_event(event: &Event) -> Option { + if !matches!(event.kind, EventKind::Create(_)) { + return None; + } + + for path in &event.paths { + if Self::is_snapshot_file(path) { + info!(path = %path.display(), "Detected new snapshot"); + return Some(path.clone()); + } + } + + None + } + + /// Check if a path is a snapshot archive file. 
+ fn is_snapshot_file(path: &std::path::Path) -> bool { + path.is_file() + && path + .file_name() + .and_then(|n| n.to_str()) + .is_some_and(|n| n.ends_with(&format!(".{SNAPSHOT_EXTENSION}"))) + } + + /// Receive the next detected snapshot as an open file handle and slot. + /// + /// Opens the file for reading before returning. This method is + /// `tokio::select!` compatible. Returns `None` when the watcher + /// has been dropped. + pub async fn recv(&mut self) -> Option<(File, u64)> { + loop { + let path = self.rx.recv().await?; + let Some(slot) = parse_slot(&path) else { + continue; + }; + let Ok(file) = File::open(&path).await else { + continue; + }; + break Some((file, slot)); + } + } +} diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index c0adc2b61..eaedfa801 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -14,8 +14,9 @@ use magicblock_core::{ blocks::{BlockMeta, BlockUpdate, BlockUpdateTx}, link, transactions::{ - ReplayPosition, SanitizeableTransaction, TransactionResult, - TransactionSchedulerHandle, TransactionSimulationResult, + ReplayPosition, SanitizeableTransaction, SchedulerMode, + TransactionResult, TransactionSchedulerHandle, + TransactionSimulationResult, }, DispatchEndpoints, }, @@ -25,10 +26,7 @@ use magicblock_ledger::Ledger; use magicblock_processor::{ build_svm_env, loader::load_upgradeable_programs, - scheduler::{ - state::{SchedulerMode, TransactionSchedulerState}, - TransactionScheduler, - }, + scheduler::{state::TransactionSchedulerState, TransactionScheduler}, }; use solana_account::AccountSharedData; pub use solana_instruction::*;