From 6688448c49f4fab38fd4b579e1c25dc310821337 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Tue, 28 Jan 2025 16:51:47 +0100 Subject: [PATCH 1/6] Add trusted node verification for response validation Introduced TrustedNodeVerifier to handle comparison of responses between main and trusted nodes. This includes mechanisms for enabling/disabling verification, detailed error tracking, and discrepancy logging. Updated relevant components to integrate trusted node handling and stats collection. --- web3pi_proxy/config/conf.py | 5 ++ web3pi_proxy/core/proxy.py | 73 ++++++++++++++----- .../rpc/node/endpoint_pool/pool_manager.py | 15 ++++ .../endpoint_pool/trusted_node_verifier.py | 53 ++++++++++++++ .../connection/endpoint_connection_handler.py | 3 + .../connection/endpointconnection.py | 5 ++ .../connection/endpointconnectionstats.py | 24 +++++- .../core/rpc/node/rpcendpoint/endpointimpl.py | 4 + .../service/providers/serviceprovider.py | 21 +++++- 9 files changed, 181 insertions(+), 22 deletions(-) create mode 100644 web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py diff --git a/web3pi_proxy/config/conf.py b/web3pi_proxy/config/conf.py index 0b71e63..da86341 100644 --- a/web3pi_proxy/config/conf.py +++ b/web3pi_proxy/config/conf.py @@ -106,6 +106,11 @@ class AppConfig: LOADBALANCER: str = "LeastBusyLoadBalancer" + # enable or disable trusted node mirroring + ENABLE_TRUSTED_NODE_VERIFICATION: bool = False + # URL of the trusted node + TRUSTED_NODE_URL: str = None + def __init__(self): env = { **dotenv_values(".env"), # load shared development variables diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 04806b8..0c0a646 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -6,9 +6,12 @@ from concurrent.futures import ThreadPoolExecutor from typing import Callable +from mypy.nodes import node_kinds + from web3pi_proxy.config.conf import Config from web3pi_proxy.core.inbound.server import InboundServer from 
web3pi_proxy.core.interfaces.rpcrequest import RequestReaderMiddleware +from web3pi_proxy.core.rpc.node.endpoint_pool.trusted_node_verifier import TrustedNodeVerifier from web3pi_proxy.core.sockets.poller import ( get_poller, Poller, @@ -44,6 +47,7 @@ def __init__( middlewares: RequestMiddlewareDescr, connection_pool: EndpointConnectionPoolManager, state_updater: StateUpdater, + trusted_node_verifier: TrustedNodeVerifier ) -> None: self.request_reader = middlewares.instantiate() @@ -60,6 +64,8 @@ def __init__( self.num_workers = num_proxy_workers + self.trusted_node_verifier = trusted_node_verifier + @classmethod def __print_pre_init_info( cls, rr: RequestReaderMiddleware, cp: EndpointConnectionPoolManager @@ -143,6 +149,7 @@ def handle_client( active_client_connections: ClientSocketPool, ) -> None: endpoint_connection_handler = None + trusted_connection_handler = None try: req, err = self.request_reader.read_request(cs, RPCRequest()) @@ -170,6 +177,8 @@ def handle_client( # if self.is_cache_available: # TODO cache # self.read_cache() + + # Main node handling try: endpoint_connection_handler = self.connection_pool.get_connection(req) except Exception as error: @@ -183,26 +192,56 @@ def handle_client( ) return - try: - endpoint_connection_handler.send(req) - except ( - BrokenConnectionError - ): # TODO: Pick up new connection from pool if fresh connection failed - self.__logger.error( - f"Failed to send request with {endpoint_connection_handler}" + if Config.ENABLE_TRUSTED_NODE_VERIFICATION: + # Trusted node handling + try: + trusted_connection_handler = self.connection_pool.get_trusted_node_connection() + except Exception as error: + self.__logger.error("%s: %s", error.__class__, error) + self.__logger.error("Failed to establish trusted endpoint connection") + self.__logger.error("Stack trace:\n%s", + traceback.format_exc()) + trusted_connection_handler = None + + # Parallel request sending + with ThreadPoolExecutor(max_workers=2) as executor: + fut_main = 
executor.submit(endpoint_connection_handler.send, req) + fut_trusted = ( + executor.submit(trusted_connection_handler.send, req) + if Config.ENABLE_TRUSTED_NODE_VERIFICATION and trusted_connection_handler + else None ) - cs.send_all( - ErrorResponses.connection_error(req.id) - ) # TODO: detect wether client connection is closed - self.__manage_client_connection( - req.keep_alive, cs, client_poller, active_client_connections + + try: + main_response = fut_main.result() + trusted_response = fut_trusted.result() if fut_trusted else None + except Exception as e: + self.__logger.error(f"Error during request handling: {e}") + cs.send_all(ErrorResponses.connection_error(req.id)) + endpoint_connection_handler.update_error_stats(1) + endpoint_connection_handler.close() + + if trusted_connection_handler: + trusted_connection_handler.close() + self.__manage_client_connection( + req.keep_alive, cs, client_poller, active_client_connections + ) + return + + print(f"main_response: {main_response}") + print(f"trusted_response: {trusted_response}") + + if trusted_response and not self.trusted_node_verifier.verify(req, main_response, trusted_response): + endpoint_connection_handler.update_error_stats(0, 1) + self.__logger.error("""Trusted node response does not match node response""") + response_handler = self.__create_response_handler( + trusted_connection_handler, cs, req + ) + else: + response_handler = self.__create_response_handler( + endpoint_connection_handler, cs, req ) - endpoint_connection_handler.close() - return - response_handler = self.__create_response_handler( - endpoint_connection_handler, cs, req - ) try: endpoint_connection_handler.receive(response_handler) except ( diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py b/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py index a74be8b..10510e8 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py @@ -12,6 +12,7 @@ 
from web3pi_proxy.core.rpc.node.endpoint_pool.load_balancers import ( LoadBalancer, ) +from web3pi_proxy.core.rpc.node.endpoint_pool.trusted_node_verifier import TrustedNodeVerifier from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import ( EndpointConnectionDescriptor, ) @@ -204,6 +205,7 @@ def __init__( self, descriptors: List[Tuple[str, EndpointConnectionDescriptor]], load_balancer: LoadBalancer, + trusted_node_verifier: TrustedNodeVerifier or None ): self.load_balancer = load_balancer self.damage_controller = DamageController() @@ -228,6 +230,19 @@ def __init__( ) self.sync_controller_thread.start() + if trusted_node_verifier: + trusted_node_descr = trusted_node_verifier.get_trusted_node_connection_descriptor() + endpoint = RPCEndpoint.create('TRUSTED_NODE', trusted_node_descr) + self.trusted_node_pool = EndpointConnectionPool(endpoint) + else: + self.trusted_node_pool = None + + + def get_trusted_node_connection(self) -> EndpointConnectionHandler: + if self.trusted_node_pool is None: + raise ValueError("Trusted node is not configured") + return self.trusted_node_pool.get() + @property def endpoints(self) -> List[RPCEndpoint]: with self.__lock: diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py new file mode 100644 index 0000000..4c52450 --- /dev/null +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py @@ -0,0 +1,53 @@ +import json +from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import EndpointConnectionDescriptor +from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest + + +class TrustedNodeVerifier: + + def __init__(self, trusted_node_url: str): + self.trusted_node_connection_descriptor = EndpointConnectionDescriptor.from_url(trusted_node_url) + + def get_trusted_node_connection_descriptor(self): + return self.trusted_node_connection_descriptor + + def verify(self, req: RPCRequest, 
node_response: bytes, trusted_response: bytes) -> bool: + try: + node_response_json = json.loads(node_response) + trusted_response_json = json.loads(trusted_response) + + # Validate the structure of both responses and ensure they are JSON-RPC + if not self._is_valid_jsonrpc_response(node_response_json) or \ + not self._is_valid_jsonrpc_response(trusted_response_json): + self.log_discrepancy(node_response, trusted_response, reason="Invalid JSON-RPC structure") + return False + + # Strict compare the "result" field + if node_response_json.get("result") == trusted_response_json.get("result"): + print("Verified! OK") + return True + else: + self.log_discrepancy(node_response, trusted_response, reason="Result mismatch") + return False + + except json.JSONDecodeError as e: + # Handle invalid JSON formats + self.log_discrepancy(node_response, trusted_response, reason=f"Invalid JSON: {e}") + return False + except Exception as e: + # Handle unexpected exceptions + self.log_discrepancy(node_response, trusted_response, reason=f"Unexpected error: {e}") + return False + + def _is_valid_jsonrpc_response(self, response: dict) -> bool: + return ( + isinstance(response, dict) and + response.get("jsonrpc") == "2.0" and + ("result" in response or "error" in response) and + "id" in response + ) + + def log_discrepancy(self, node_response, trusted_response): + print("Discrepancy detected!") + print(f"Main Node Response: {node_response}") + print(f"Trusted Node Response: {trusted_response}") \ No newline at end of file diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpoint_connection_handler.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpoint_connection_handler.py index 61f93ed..9350fbe 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpoint_connection_handler.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpoint_connection_handler.py @@ -101,6 +101,9 @@ def update_request_stats(self, request: RPCRequest): def 
update_response_stats(self, response_bytes: bytearray) -> None: self.connection.update_endpoint_stats(bytearray(), response_bytes) + def update_error_stats(self, no_errors: int, no_verification_errors: int = 0) -> None: + self.connection.update_endpoint_error_stats(no_errors, no_verification_errors) + def release(self) -> None: if self.connection is not None: self.connection_pool.put(self.connection) diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py index 30543f4..08fb75d 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py @@ -71,3 +71,8 @@ def update_endpoint_stats( self, request_bytes: bytearray, response_bytes: bytearray ) -> None: self.endpoint.update_stats(request_bytes, response_bytes) + + def update_endpoint_error_stats( + self, no_errors: int, no_verification_errors: int + ) -> None: + self.endpoint.update_errors_stats(no_errors, no_verification_errors) diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py index 88f7edf..7f6f97b 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py @@ -14,6 +14,8 @@ def __init__(self): self._no_bytes_sent = 0 self._no_bytes_received = 0 self._no_requests_handled = 0 + self._no_errors = 0 + self._no_verification_errors = 0 self.__lock = Lock() @@ -33,13 +35,31 @@ def no_bytes_received(self): def no_requests_handled(self): return self._no_requests_handled + @property + def no_errors(self): + return self._no_errors + + @property + def no_verification_errors(self): + return self._no_verification_errors + def _update( - self, no_bytes_received: int, no_bytes_sent: int, 
no_requests_handled: int + self, no_bytes_received: int, no_bytes_sent: int, no_requests_handled: int, no_errors: int = 0, no_verification_errors: int = 0 ) -> None: with self.__lock: self._no_requests_handled += no_requests_handled self._no_bytes_sent += no_bytes_sent self._no_bytes_received += no_bytes_received + self._no_errors += no_errors + self._no_verification_errors += no_verification_errors + + def update_errors(self, no_errors: int) -> None: + if no_errors: + self._update(0, 0, 1, no_errors) + + def update_verification_errors(self, no_verification_errors: int) -> None: + if no_verification_errors: + self._update(0, 0, 1, 0, no_verification_errors) def update_request_bytes(self, req_data: bytearray) -> None: no_req_bytes = len(req_data) @@ -57,4 +77,6 @@ def to_dict(self): "req_no": self.no_requests_handled, "bytes_sent": self.no_bytes_sent, "bytes_received": self.no_bytes_received, + "errors_no": self._no_errors, + "trusted_verification_errors_no": self._no_verification_errors, } diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/endpointimpl.py b/web3pi_proxy/core/rpc/node/rpcendpoint/endpointimpl.py index a6a6cdb..44caecf 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/endpointimpl.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/endpointimpl.py @@ -30,6 +30,10 @@ def update_stats(self, request_bytes: bytearray, response_bytes: bytearray) -> N self.conn_stats.update_request_bytes(request_bytes) self.conn_stats.update_response_bytes(response_bytes) + def update_errors_stats(self, no_errors: int, no_verification_errors: int) -> None: + self.conn_stats.update_errors(no_errors) + self.conn_stats.update_verification_errors(no_verification_errors) + @classmethod def create(cls, name: str, conn_descr: EndpointConnectionDescriptor) -> RPCEndpoint: return RPCEndpoint(name, conn_descr) diff --git a/web3pi_proxy/service/providers/serviceprovider.py b/web3pi_proxy/service/providers/serviceprovider.py index a80f010..4421b07 100644 --- 
a/web3pi_proxy/service/providers/serviceprovider.py +++ b/web3pi_proxy/service/providers/serviceprovider.py @@ -8,6 +8,7 @@ from web3pi_proxy.core.rpc.node.endpoint_pool.pool_manager import ( EndpointConnectionPoolManager, ) +from web3pi_proxy.core.rpc.node.endpoint_pool.trusted_node_verifier import TrustedNodeVerifier from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import ( EndpointConnectionDescriptor, ) @@ -40,7 +41,7 @@ def configure_default_reader_middlewares( return RPCRequestMiddlewareFactory.create_default_descr(cli, call) @classmethod - def create_default_connection_pool(cls, endpoint_config: List[dict], loadbalancer_class: str): + def create_default_connection_pool(cls, endpoint_config: List[dict], loadbalancer_class: str, trusted_node_verifier: TrustedNodeVerifier): descriptors = [ ( entrypoint["name"], @@ -57,7 +58,8 @@ def create_default_connection_pool(cls, endpoint_config: List[dict], loadbalance load_balancer = load_balancers.ConstantLoadBalancer() else: raise Exception("fatal: inconsistent LOADBALANCER configuration") - return EndpointConnectionPoolManager(descriptors, load_balancer) + + return EndpointConnectionPoolManager(descriptors, load_balancer, trusted_node_verifier) @classmethod def create_default_response_handler(cls) -> RPCResponseHandler: @@ -71,6 +73,7 @@ def create_web3_rpc_proxy( proxy_address: str, proxy_port: int, num_proxy_workers: int, + trusted_node_verifier: TrustedNodeVerifier ) -> Web3RPCProxy: # Create default components middlewares = cls.configure_default_reader_middlewares(ssm) @@ -83,6 +86,7 @@ def create_web3_rpc_proxy( middlewares, connection_pool, ssm.get_service_state_updater_instance(), + trusted_node_verifier ) # Pass proxy stats to StateManager, so that it may be queried @@ -104,11 +108,20 @@ def create_default_web3_rpc_proxy( eth_endpoints.append(json.loads(eth_endpoint_data.config)) else: eth_endpoints = Config.ETH_ENDPOINTS + + trusted_node_verifier = ( + 
TrustedNodeVerifier(Config.TRUSTED_NODE_URL) + if Config.ENABLE_TRUSTED_NODE_VERIFICATION + else None + ) + print("trusted node", trusted_node_verifier) + # Create default components - connection_pool = cls.create_default_connection_pool(eth_endpoints, Config.LOADBALANCER) + connection_pool = cls.create_default_connection_pool(eth_endpoints, Config.LOADBALANCER, trusted_node_verifier) + return cls.create_web3_rpc_proxy( - ssm, connection_pool, proxy_listen_address, proxy_listen_port, num_proxy_workers + ssm, connection_pool, proxy_listen_address, proxy_listen_port, num_proxy_workers, trusted_node_verifier ) @classmethod From 6f8ef90e9150610696d9f82f26453d9183ffb236 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Wed, 29 Jan 2025 15:10:50 +0100 Subject: [PATCH 2/6] Improve trusted node verification and error handling. Refactored the trusted node verification logic to enhance comparison mechanisms like content length and body validation. --- web3pi_proxy/core/proxy.py | 100 ++++++++------ .../endpoint_pool/trusted_node_verifier.py | 124 ++++++++++++------ .../connection/endpointconnectionstats.py | 2 +- .../service/providers/serviceprovider.py | 2 - 4 files changed, 145 insertions(+), 83 deletions(-) diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 0c0a646..93cd4f0 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -5,9 +5,6 @@ from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from typing import Callable - -from mypy.nodes import node_kinds - from web3pi_proxy.config.conf import Config from web3pi_proxy.core.inbound.server import InboundServer from web3pi_proxy.core.interfaces.rpcrequest import RequestReaderMiddleware @@ -73,6 +70,10 @@ def __print_pre_init_info( print(f"Starting {Config.PROXY_NAME}, version {Config.PROXY_VER}") print(f"Provided request middleware chain: {rr}") + if Config.ENABLE_TRUSTED_NODE_VERIFICATION: + print(f"Trusted node verification enabled with node 
URL: {Config.TRUSTED_NODE_URL}") + else: + print(f"Trusted node verification disabled") endpoints = cp.endpoints print( @@ -126,10 +127,37 @@ def __create_response_handler( endpoint_connection_handler: EndpointConnectionHandler, cs: ClientSocket, req: RPCRequest, + trusted_connection_handler: EndpointConnectionHandler | None ) -> Callable: add_cors = req.cors_origin is not None # TODO CORS support here is very crude, needs improvement + def get_trusted_response(): + result_holder = {"result": None} + def callback(trusted_res: bytes): + self.__logger.debug(f"Trusted response received: {trusted_res}") + result_holder["result"] = trusted_res + trusted_connection_handler.receive(callback) + return result_holder["result"] + def response_handler(res: bytes): + if trusted_connection_handler: + executor = ThreadPoolExecutor(max_workers=1) + trusted_response_future = executor.submit(get_trusted_response) + try: + trusted_response = trusted_response_future.result() + self.__logger.debug(f"Trusted response processed: {trusted_response}") + except Exception as e: + self.__logger.error(f"Error receiving trusted response. {e}") + trusted_connection_handler.close() + trusted_response = None + else: + trusted_response = None + + + if trusted_response and not self.trusted_node_verifier.verify(req, res, trusted_response): + self.__logger.error("""Trusted node response does not match node response""") + endpoint_connection_handler.update_error_stats(0, 1) + if cs.socket.fileno() < 0: return nonlocal add_cors @@ -197,50 +225,39 @@ def handle_client( try: trusted_connection_handler = self.connection_pool.get_trusted_node_connection() except Exception as error: - self.__logger.error("%s: %s", error.__class__, error) - self.__logger.error("Failed to establish trusted endpoint connection") - self.__logger.error("Stack trace:\n%s", - traceback.format_exc()) + self.__logger.error(f"Failed to establish trusted endpoint connection. 
{error}") trusted_connection_handler = None - # Parallel request sending - with ThreadPoolExecutor(max_workers=2) as executor: - fut_main = executor.submit(endpoint_connection_handler.send, req) - fut_trusted = ( - executor.submit(trusted_connection_handler.send, req) - if Config.ENABLE_TRUSTED_NODE_VERIFICATION and trusted_connection_handler - else None + try: + endpoint_connection_handler.send(req) + except ( + BrokenConnectionError + ): # TODO: Pick up new connection from pool if fresh connection failed + self.__logger.error( + f"Failed to send request with {endpoint_connection_handler}" ) + cs.send_all( + ErrorResponses.connection_error(req.id) + ) # TODO: detect wether client connection is closed + endpoint_connection_handler.update_error_stats(1) + self.__manage_client_connection( + req.keep_alive, cs, client_poller, active_client_connections + ) + endpoint_connection_handler.close() + return + if trusted_connection_handler: try: - main_response = fut_main.result() - trusted_response = fut_trusted.result() if fut_trusted else None - except Exception as e: - self.__logger.error(f"Error during request handling: {e}") - cs.send_all(ErrorResponses.connection_error(req.id)) - endpoint_connection_handler.update_error_stats(1) - endpoint_connection_handler.close() - - if trusted_connection_handler: - trusted_connection_handler.close() - self.__manage_client_connection( - req.keep_alive, cs, client_poller, active_client_connections + trusted_connection_handler.send(req) + except BrokenConnectionError: + self.__logger.error( + f"Failed to send request to trusted node with {trusted_connection_handler}" ) - return - - print(f"main_response: {main_response}") - print(f"trusted_response: {trusted_response}") + trusted_connection_handler = None - if trusted_response and not self.trusted_node_verifier.verify(req, main_response, trusted_response): - endpoint_connection_handler.update_error_stats(0, 1) - self.__logger.error("""Trusted node response does not match node 
response""") - response_handler = self.__create_response_handler( - trusted_connection_handler, cs, req - ) - else: - response_handler = self.__create_response_handler( - endpoint_connection_handler, cs, req - ) + response_handler = self.__create_response_handler( + endpoint_connection_handler, cs, req, trusted_connection_handler + ) try: endpoint_connection_handler.receive(response_handler) @@ -271,8 +288,7 @@ def handle_client( endpoint_connection_handler.release() except Exception as e: - traceback.print_exc() - self.__logger.error(e) + self.__logger.error("Error while handling the client request", exc_info=True) print( f"Error while handling the client request {e}" ) # TODO is this a good error handling? diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py index 4c52450..bcba030 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py @@ -1,53 +1,101 @@ +import gzip import json + from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import EndpointConnectionDescriptor from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest +from web3pi_proxy.utils.logger import get_logger class TrustedNodeVerifier: - def __init__(self, trusted_node_url: str): - self.trusted_node_connection_descriptor = EndpointConnectionDescriptor.from_url(trusted_node_url) + self.__trusted_node_connection_descriptor = EndpointConnectionDescriptor.from_url(trusted_node_url) + self.__logger = get_logger("TrustedNodeVerifier") + + self.__comparison_rules = { + "web3_clientVersion": ["content_length", "body"], + "eth_getLogs": ["content_length"], + # TODO: define other rules + } def get_trusted_node_connection_descriptor(self): - return self.trusted_node_connection_descriptor + return self.__trusted_node_connection_descriptor - def verify(self, req: RPCRequest, node_response: bytes, 
trusted_response: bytes) -> bool: + def verify(self, req: RPCRequest, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: try: - node_response_json = json.loads(node_response) - trusted_response_json = json.loads(trusted_response) - - # Validate the structure of both responses and ensure they are JSON-RPC - if not self._is_valid_jsonrpc_response(node_response_json) or \ - not self._is_valid_jsonrpc_response(trusted_response_json): - self.log_discrepancy(node_response, trusted_response, reason="Invalid JSON-RPC structure") - return False - - # Strict compare the "result" field - if node_response_json.get("result") == trusted_response_json.get("result"): - print("Verified! OK") - return True - else: - self.log_discrepancy(node_response, trusted_response, reason="Result mismatch") - return False - - except json.JSONDecodeError as e: - # Handle invalid JSON formats - self.log_discrepancy(node_response, trusted_response, reason=f"Invalid JSON: {e}") + self.__logger.debug("Verifying response from trusted node...") + + comparison_types = self.__comparison_rules.get(req.method, ["content_length"]) + + for comparison_type in comparison_types: + comparison_method_name = f"_compare_{comparison_type}" + comparison_method = getattr(self, comparison_method_name, None) + + if comparison_method is None: + self.__logger.error(f"Comparison method '{comparison_type}' is not implemented.") + raise NotImplementedError(f"Comparison method '{comparison_type}' is not implemented.") + + if comparison_method(node_response_raw, trusted_response_raw) is False: + return False + + self.__logger.debug("Responses match successfully!") + return True + except Exception as e: + self.__logger.error(f"Error during verification: {e}") return False + + def __parse_headers(self, raw_response: bytes) -> dict: + try: + headers_raw, _ = raw_response.split(b"\r\n\r\n", 1) + headers_lines = headers_raw.split(b"\r\n")[1:] + + headers = {} + for line in headers_lines: + key, value = line.split(b": 
", 1) + headers[key.decode("utf-8").lower()] = value.decode("utf-8") + + return headers except Exception as e: - # Handle unexpected exceptions - self.log_discrepancy(node_response, trusted_response, reason=f"Unexpected error: {e}") + self.__logger.error(f"Failed to parse headers: {e}") + raise + + def __parse_body(self, raw_response: bytes, headers: dict) -> dict: + try: + _, body = raw_response.split(b"\r\n\r\n", 1) + + if headers.get("content-encoding") == "gzip": + body = gzip.decompress(body) + + body_json = json.loads(body.decode("utf-8")) + return body_json + except Exception as e: + self.__logger.error(f"Failed to parse body: {e}") + raise + + def _compare_content_length(self, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: + headers_node = self.__parse_headers(node_response_raw) + headers_trusted = self.__parse_headers(trusted_response_raw) + node_length = headers_node.get("content-length") + trusted_length = headers_trusted.get("content-length") + + if node_length != trusted_length: + self.__logger.warning( + f"Content-length mismatch: node({node_length}) vs trusted({trusted_length})" + ) + return False + + return True + + + def _compare_body(self, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: + headers_node = self.__parse_headers(node_response_raw) + headers_trusted = self.__parse_headers(trusted_response_raw) + body_node = self.__parse_body(node_response_raw, headers_node) + body_trusted = self.__parse_body(trusted_response_raw, headers_trusted) + + if body_node != body_trusted: + self.__logger.warning( + f"Body mismatch: node({json.dumps(body_node)}) vs trusted({json.dumps(body_trusted)})" + ) return False - def _is_valid_jsonrpc_response(self, response: dict) -> bool: - return ( - isinstance(response, dict) and - response.get("jsonrpc") == "2.0" and - ("result" in response or "error" in response) and - "id" in response - ) - - def log_discrepancy(self, node_response, trusted_response): - print("Discrepancy detected!") 
- print(f"Main Node Response: {node_response}") - print(f"Trusted Node Response: {trusted_response}") \ No newline at end of file + return True diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py index 7f6f97b..3042dd7 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnectionstats.py @@ -78,5 +78,5 @@ def to_dict(self): "bytes_sent": self.no_bytes_sent, "bytes_received": self.no_bytes_received, "errors_no": self._no_errors, - "trusted_verification_errors_no": self._no_verification_errors, + "verification_errors_no": self._no_verification_errors, } diff --git a/web3pi_proxy/service/providers/serviceprovider.py b/web3pi_proxy/service/providers/serviceprovider.py index 4421b07..292e68b 100644 --- a/web3pi_proxy/service/providers/serviceprovider.py +++ b/web3pi_proxy/service/providers/serviceprovider.py @@ -103,7 +103,6 @@ def create_default_web3_rpc_proxy( ) -> Web3RPCProxy: if Config.ETH_ENDPOINTS_STORE: eth_endpoints = [] - uuu = Endpoint.select(Endpoint.config) for eth_endpoint_data in Endpoint.select(Endpoint.config): eth_endpoints.append(json.loads(eth_endpoint_data.config)) else: @@ -114,7 +113,6 @@ def create_default_web3_rpc_proxy( if Config.ENABLE_TRUSTED_NODE_VERIFICATION else None ) - print("trusted node", trusted_node_verifier) # Create default components connection_pool = cls.create_default_connection_pool(eth_endpoints, Config.LOADBALANCER, trusted_node_verifier) From 88bb17cd2a391b1304ea2480ec134eb0048d1c7e Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Wed, 29 Jan 2025 15:28:11 +0100 Subject: [PATCH 3/6] Updated README.md for new configs: `ENABLE_TRUSTED_NODE_VERIFICATION` and `TRUSTED_NODE_URL` --- README.md | 62 +++++++++++++++++++------------------ web3pi_proxy/config/conf.py | 2 -- 2 files changed, 32 insertions(+), 32 
deletions(-) diff --git a/README.md b/README.md index 85e83fa..210af80 100644 --- a/README.md +++ b/README.md @@ -41,36 +41,38 @@ poetry install You can define the following environment variables, and you can place them in the .env file (all are optional): -| Variable | Default | Description | -|---------------------------------|----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `LOG_LEVEL` | `INFO` | Specifies the logging level. | -| `ADMIN_AUTH_TOKEN` | Randomly generated | Admin authentication token. | -| `ETH_ENDPOINTS` | `None` | A JSON list of endpoint descriptors for Ethereum nodes. Example: `[{"name": "rpi4", "url": "http://localhost:8545/"}]`. If defined, this list becomes static and cannot be managed via the admin panel. Leaving it undefined enables endpoint management via the admin panel and local database. | -| `DEFAULT_RECV_BUF_SIZE` | `8192` | Buffer size for socket receiving. | -| `PUBLIC_SERVICE` | `False` | Whether the service is public. | -| `USE_UPNP` | `True` | Enables UPnP if `PUBLIC_SERVICE` is `True`. | -| `UPNP_DISCOVERY_TIMEOUT` | `2.5` | Timeout for UPnP discovery in seconds. | -| `UPNP_LEASE_TIME` | `18000` (5 hours) | Lease time for UPnP in seconds. | -| `PROXY_LISTEN_ADDRESS` | `0.0.0.0` | Address for the proxy to listen on. | -| `PROXY_CONNECTION_ADDRESS` | `None` | Address clients use to connect to the proxy. Default is `None` (auto-resolved). | -| `PROXY_LISTEN_PORT` | `6512` | Port for the proxy to listen on. | -| `NUM_PROXY_WORKERS` | `150` | Number of workers handling proxy connections. | -| `MAX_PENDING_CLIENT_SOCKETS` | `10000` | Maximum number of pending client sockets. 
| -| `MAX_CONCURRENT_CONNECTIONS` | `21` | Maximum number of concurrent connections. | -| `IDLE_CONNECTION_TIMEOUT` | `300` | Timeout for idle connections in seconds. | -| `SSL_ENABLED` | `False` | Whether SSL is enabled. | -| `SSL_CERT_FILE` | `cert.pem` | Path to SSL certificate file. | -| `SSL_KEY_FILE` | `key.pem` | Path to SSL key file. | -| `CACHE_ENABLED` | `False` | Whether caching is enabled. | -| `CACHE_EXPIRY_MS` | `300000` (5 minutes) | Cache expiry time in milliseconds. | -| `JSON_RPC_REQUEST_PARSER_ENABLED` | `True` | Enables JSON-RPC request parsing. | -| `STATS_UPDATE_DELTA` | `12` | Update interval for stats in seconds. | -| `ADMIN_LISTEN_ADDRESS` | `0.0.0.0` | Address for the admin panel to listen on. | -| `ADMIN_CONNECTION_ADDRESS` | `None` | Address clients use to connect to the admin panel. Default is `None` (auto-resolved). | -| `ADMIN_LISTEN_PORT` | `6561` | Port for the admin panel to listen on. | -| `DB_FILE` | `web3pi_proxy.sqlite3` | Path to the database file. | -| `MODE` | `PROD` | Proxy mode (`DEV`, `SIM`, `PROD`). | -| `LOADBALANCER` | `LeastBusyLoadBalancer` | Load balancer strategy (`RandomLoadBalancer`, `LeastBusyLoadBalancer`, `ConstantLoadBalancer`). | +| Variable | Default | Description | +|---------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `LOG_LEVEL` | `INFO` | Specifies the logging level. | +| `ADMIN_AUTH_TOKEN` | Randomly generated | Admin authentication token. | +| `ETH_ENDPOINTS` | `None` | A JSON list of endpoint descriptors for Ethereum nodes. Example: `[{"name": "rpi4", "url": "http://localhost:8545/"}]`. If defined, this list becomes static and cannot be managed via the admin panel. 
Leaving it undefined enables endpoint management via the admin panel and local database. | +| `DEFAULT_RECV_BUF_SIZE` | `8192` | Buffer size for socket receiving. | +| `PUBLIC_SERVICE` | `False` | Whether the service is public. | +| `USE_UPNP` | `True` | Enables UPnP if `PUBLIC_SERVICE` is `True`. | +| `UPNP_DISCOVERY_TIMEOUT` | `2.5` | Timeout for UPnP discovery in seconds. | +| `UPNP_LEASE_TIME` | `18000` (5 hours) | Lease time for UPnP in seconds. | +| `PROXY_LISTEN_ADDRESS` | `0.0.0.0` | Address for the proxy to listen on. | +| `PROXY_CONNECTION_ADDRESS` | `None` | Address clients use to connect to the proxy. Default is `None` (auto-resolved). | +| `PROXY_LISTEN_PORT` | `6512` | Port for the proxy to listen on. | +| `NUM_PROXY_WORKERS` | `150` | Number of workers handling proxy connections. | +| `MAX_PENDING_CLIENT_SOCKETS` | `10000` | Maximum number of pending client sockets. | +| `MAX_CONCURRENT_CONNECTIONS` | `21` | Maximum number of concurrent connections. | +| `IDLE_CONNECTION_TIMEOUT` | `300` | Timeout for idle connections in seconds. | +| `SSL_ENABLED` | `False` | Whether SSL is enabled. | +| `SSL_CERT_FILE` | `cert.pem` | Path to SSL certificate file. | +| `SSL_KEY_FILE` | `key.pem` | Path to SSL key file. | +| `CACHE_ENABLED` | `False` | Whether caching is enabled. | +| `CACHE_EXPIRY_MS` | `300000` (5 minutes) | Cache expiry time in milliseconds. | +| `JSON_RPC_REQUEST_PARSER_ENABLED` | `True` | Enables JSON-RPC request parsing. | +| `STATS_UPDATE_DELTA` | `12` | Update interval for stats in seconds. | +| `ADMIN_LISTEN_ADDRESS` | `0.0.0.0` | Address for the admin panel to listen on. | +| `ADMIN_CONNECTION_ADDRESS` | `None` | Address clients use to connect to the admin panel. Default is `None` (auto-resolved). | +| `ADMIN_LISTEN_PORT` | `6561` | Port for the admin panel to listen on. | +| `DB_FILE` | `web3pi_proxy.sqlite3` | Path to the database file. | +| `MODE` | `PROD` | Proxy mode (`DEV`, `SIM`, `PROD`). 
| +| `LOADBALANCER` | `LeastBusyLoadBalancer` | Load balancer strategy (`RandomLoadBalancer`, `LeastBusyLoadBalancer`, `ConstantLoadBalancer`). | +| `ENABLE_TRUSTED_NODE_VERIFICATION` | `False` | Enables or disables response verification through a trusted node. If set to `True`, RPC responses are compared against the trusted node's response (when one is available) before being forwarded; a mismatch is logged and recorded in the endpoint's error stats, but the main node's response is still forwarded to the client. | +| `TRUSTED_NODE_URL` | `None` | Specifies the URL of the trusted Ethereum node used for response verification. This node acts as the reference point for validating RPC responses. | ## Run diff --git a/web3pi_proxy/config/conf.py b/web3pi_proxy/config/conf.py index da86341..26b4b0e 100644 --- a/web3pi_proxy/config/conf.py +++ b/web3pi_proxy/config/conf.py @@ -106,9 +106,7 @@ class AppConfig: LOADBALANCER: str = "LeastBusyLoadBalancer" - # enable or disable trusted node mirroring ENABLE_TRUSTED_NODE_VERIFICATION: bool = False - # URL of the trusted node TRUSTED_NODE_URL: str = None def __init__(self): From 201a66d6b2afb6254716a542871134739db6479f Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 30 Jan 2025 13:16:50 +0100 Subject: [PATCH 4/6] Refactor proxy and verifier modules for better readability --- web3pi_proxy/core/proxy.py | 76 +++++++++++++------ .../endpoint_pool/trusted_node_verifier.py | 50 +++++++++--- 2 files changed, 92 insertions(+), 34 deletions(-) diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 93cd4f0..3dfca10 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -8,7 +8,9 @@ from web3pi_proxy.config.conf import Config from web3pi_proxy.core.inbound.server import InboundServer from web3pi_proxy.core.interfaces.rpcrequest import RequestReaderMiddleware -from web3pi_proxy.core.rpc.node.endpoint_pool.trusted_node_verifier import TrustedNodeVerifier +from web3pi_proxy.core.rpc.node.endpoint_pool.trusted_node_verifier import ( + TrustedNodeVerifier, +) from web3pi_proxy.core.sockets.poller import ( get_poller, Poller, @@ -44,7 +46,7 @@ def
__init__( middlewares: RequestMiddlewareDescr, connection_pool: EndpointConnectionPoolManager, state_updater: StateUpdater, - trusted_node_verifier: TrustedNodeVerifier + trusted_node_verifier: TrustedNodeVerifier, ) -> None: self.request_reader = middlewares.instantiate() @@ -71,7 +73,9 @@ def __print_pre_init_info( print(f"Starting {Config.PROXY_NAME}, version {Config.PROXY_VER}") print(f"Provided request middleware chain: {rr}") if Config.ENABLE_TRUSTED_NODE_VERIFICATION: - print(f"Trusted node verification enabled with node URL: {Config.TRUSTED_NODE_URL}") + print( + f"Trusted node verification enabled with node URL: {Config.TRUSTED_NODE_URL}" + ) else: print(f"Trusted node verification disabled") @@ -127,15 +131,19 @@ def __create_response_handler( endpoint_connection_handler: EndpointConnectionHandler, cs: ClientSocket, req: RPCRequest, - trusted_connection_handler: EndpointConnectionHandler | None + trusted_connection_handler: EndpointConnectionHandler | None, ) -> Callable: - add_cors = req.cors_origin is not None # TODO CORS support here is very crude, needs improvement + add_cors = ( + req.cors_origin is not None + ) # TODO CORS support here is very crude, needs improvement def get_trusted_response(): result_holder = {"result": None} + def callback(trusted_res: bytes): self.__logger.debug(f"Trusted response received: {trusted_res}") result_holder["result"] = trusted_res + trusted_connection_handler.receive(callback) return result_holder["result"] @@ -145,7 +153,9 @@ def response_handler(res: bytes): trusted_response_future = executor.submit(get_trusted_response) try: trusted_response = trusted_response_future.result() - self.__logger.debug(f"Trusted response processed: {trusted_response}") + self.__logger.debug( + f"Trusted response processed: {trusted_response}" + ) except Exception as e: self.__logger.error(f"Error receiving trusted response. 
{e}") trusted_connection_handler.close() @@ -153,9 +163,12 @@ def response_handler(res: bytes): else: trusted_response = None - - if trusted_response and not self.trusted_node_verifier.verify(req, res, trusted_response): - self.__logger.error("""Trusted node response does not match node response""") + if trusted_response and not self.trusted_node_verifier.verify( + req, res, trusted_response + ): + self.__logger.error( + """Trusted node response does not match node response""" + ) endpoint_connection_handler.update_error_stats(0, 1) if cs.socket.fileno() < 0: @@ -163,9 +176,15 @@ def response_handler(res: bytes): nonlocal add_cors if add_cors: add_cors = False - res = res.replace(b"\r\n", b"\r\nAccess-Control-Allow-Origin: " + req.cors_origin + b"\r\n", 1) + res = res.replace( + b"\r\n", + b"\r\nAccess-Control-Allow-Origin: " + req.cors_origin + b"\r\n", + 1, + ) cs.send_all(res) - endpoint_connection_handler.update_response_stats(res) # TODO do we need bytearray here? + endpoint_connection_handler.update_response_stats( + res + ) # TODO do we need bytearray here? self.state_updater.record_rpc_response(req, res) return response_handler @@ -194,10 +213,10 @@ def handle_client( ) return - if req.http_method == b"OPTIONS": # TODO CORS are always included, is that right? - cs.send_all( - OptionsResponses.options_response(req) - ) + if ( + req.http_method == b"OPTIONS" + ): # TODO CORS are always included, is that right? 
+ cs.send_all(OptionsResponses.options_response(req)) self.__manage_client_connection( req.keep_alive, cs, client_poller, active_client_connections ) @@ -208,6 +227,7 @@ def handle_client( # Main node handling try: + endpoint_connection_handler = self.connection_pool.get_connection(req) except Exception as error: self.__logger.error("%s: %s", error.__class__, error) @@ -223,15 +243,19 @@ def handle_client( if Config.ENABLE_TRUSTED_NODE_VERIFICATION: # Trusted node handling try: - trusted_connection_handler = self.connection_pool.get_trusted_node_connection() + trusted_connection_handler = ( + self.connection_pool.get_trusted_node_connection() + ) except Exception as error: - self.__logger.error(f"Failed to establish trusted endpoint connection. {error}") + self.__logger.error( + f"Failed to establish trusted endpoint connection. {error}" + ) trusted_connection_handler = None try: endpoint_connection_handler.send(req) except ( - BrokenConnectionError + BrokenConnectionError ): # TODO: Pick up new connection from pool if fresh connection failed self.__logger.error( f"Failed to send request with {endpoint_connection_handler}" @@ -288,7 +312,9 @@ def handle_client( endpoint_connection_handler.release() except Exception as e: - self.__logger.error("Error while handling the client request", exc_info=True) + self.__logger.error( + "Error while handling the client request", exc_info=True + ) print( f"Error while handling the client request {e}" ) # TODO is this a good error handling? 
@@ -297,7 +323,9 @@ def handle_client( endpoint_connection_handler.release() @classmethod - def __print_post_init_info(cls, proxy_listen_address, proxy_listen_port: int) -> None: + def __print_post_init_info( + cls, proxy_listen_address, proxy_listen_port: int + ) -> None: print( "Proxy initialized and listening on {}".format( f"{proxy_listen_address}:{proxy_listen_port}" @@ -340,7 +368,9 @@ def main_loop(self) -> None: events = client_poller.poll(Config.BLOCKING_ACCEPT_TIMEOUT) for fd, ev in events: if fd == srv_socket.socket.fileno(): - cs = srv_socket.accept_awaiting_connection() # TODO connection hang up? errors? + cs = ( + srv_socket.accept_awaiting_connection() + ) # TODO connection hang up? errors? active_client_connections.add_cs_pending(cs) try: client_poller.register( @@ -355,9 +385,7 @@ def main_loop(self) -> None: break except KeyError: break - cs = active_client_connections.get_cs_and_set_in_use( - fd - ) + cs = active_client_connections.get_cs_and_set_in_use(fd) # TODO connection hang up? 
executor.submit( self.handle_client, diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py index bcba030..bda5d5d 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py @@ -1,38 +1,50 @@ import gzip import json -from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import EndpointConnectionDescriptor +from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import ( + EndpointConnectionDescriptor, +) from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest from web3pi_proxy.utils.logger import get_logger class TrustedNodeVerifier: def __init__(self, trusted_node_url: str): - self.__trusted_node_connection_descriptor = EndpointConnectionDescriptor.from_url(trusted_node_url) + self.__trusted_node_connection_descriptor = ( + EndpointConnectionDescriptor.from_url(trusted_node_url) + ) self.__logger = get_logger("TrustedNodeVerifier") self.__comparison_rules = { - "web3_clientVersion": ["content_length", "body"], - "eth_getLogs": ["content_length"], + "web3_clientVersion": ["content_length", "body_size"], + "eth_getLogs": ["content_length", "body_size", "body_content"], # TODO: define other rules } def get_trusted_node_connection_descriptor(self): return self.__trusted_node_connection_descriptor - def verify(self, req: RPCRequest, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: + def verify( + self, req: RPCRequest, node_response_raw: bytes, trusted_response_raw: bytes + ) -> bool: try: self.__logger.debug("Verifying response from trusted node...") - comparison_types = self.__comparison_rules.get(req.method, ["content_length"]) + comparison_types = self.__comparison_rules.get( + req.method, ["content_length", "body_size"] + ) for comparison_type in comparison_types: comparison_method_name = f"_compare_{comparison_type}" 
comparison_method = getattr(self, comparison_method_name, None) if comparison_method is None: - self.__logger.error(f"Comparison method '{comparison_type}' is not implemented.") - raise NotImplementedError(f"Comparison method '{comparison_type}' is not implemented.") + self.__logger.error( + f"Comparison method '{comparison_type}' is not implemented." + ) + raise NotImplementedError( + f"Comparison method '{comparison_type}' is not implemented." + ) if comparison_method(node_response_raw, trusted_response_raw) is False: return False @@ -71,7 +83,9 @@ def __parse_body(self, raw_response: bytes, headers: dict) -> dict: self.__logger.error(f"Failed to parse body: {e}") raise - def _compare_content_length(self, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: + def _compare_content_length( + self, node_response_raw: bytes, trusted_response_raw: bytes + ) -> bool: headers_node = self.__parse_headers(node_response_raw) headers_trusted = self.__parse_headers(trusted_response_raw) node_length = headers_node.get("content-length") @@ -85,13 +99,29 @@ def _compare_content_length(self, node_response_raw: bytes, trusted_response_raw return True + def _compare_body_size( + self, node_response_raw: bytes, trusted_response_raw: bytes + ) -> bool: + body_node = node_response_raw.split(b"\r\n\r\n", 1)[1] + body_trusted = trusted_response_raw.split(b"\r\n\r\n", 1)[1] + + if len(body_node) != len(body_trusted): + self.__logger.warning( + f"Body size mismatch: node({len(body_node)}) vs trusted({len(body_trusted)})" + ) + return False + + return True - def _compare_body(self, node_response_raw: bytes, trusted_response_raw: bytes) -> bool: + def _compare_body_content( + self, node_response_raw: bytes, trusted_response_raw: bytes + ) -> bool: headers_node = self.__parse_headers(node_response_raw) headers_trusted = self.__parse_headers(trusted_response_raw) body_node = self.__parse_body(node_response_raw, headers_node) body_trusted = self.__parse_body(trusted_response_raw, 
headers_trusted) + # TODO: implement more sophisticated comparisons.. if body_node != body_trusted: self.__logger.warning( f"Body mismatch: node({json.dumps(body_node)}) vs trusted({json.dumps(body_trusted)})" From c20556372f775db54d1cfd449b8719daadce334c Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 30 Jan 2025 13:22:23 +0100 Subject: [PATCH 5/6] Refactor trusted response handling in proxy module --- web3pi_proxy/core/proxy.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 3dfca10..8949f72 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -138,14 +138,15 @@ def __create_response_handler( ) # TODO CORS support here is very crude, needs improvement def get_trusted_response(): - result_holder = {"result": None} + result = None - def callback(trusted_res: bytes): + def on_response_received(trusted_res: bytes): self.__logger.debug(f"Trusted response received: {trusted_res}") - result_holder["result"] = trusted_res + nonlocal result + result = trusted_res - trusted_connection_handler.receive(callback) - return result_holder["result"] + trusted_connection_handler.receive(on_response_received) + return result def response_handler(res: bytes): if trusted_connection_handler: From ea4150b2cb66fe6f56fcab27a4f1fc4098916cac Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 30 Jan 2025 14:58:14 +0100 Subject: [PATCH 6/6] Fix response handling and header parsing logic. 
--- web3pi_proxy/core/proxy.py | 5 ++++- .../core/rpc/node/endpoint_pool/trusted_node_verifier.py | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 8949f72..5c8c065 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -143,7 +143,10 @@ def get_trusted_response(): def on_response_received(trusted_res: bytes): self.__logger.debug(f"Trusted response received: {trusted_res}") nonlocal result - result = trusted_res + if result is None: + result = trusted_res + else: + result += trusted_res trusted_connection_handler.receive(on_response_received) return result diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py index bda5d5d..63bc109 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/trusted_node_verifier.py @@ -57,7 +57,12 @@ def verify( def __parse_headers(self, raw_response: bytes) -> dict: try: - headers_raw, _ = raw_response.split(b"\r\n\r\n", 1) + if b"\r\n\r\n" in raw_response: + headers_raw, _ = raw_response.split(b"\r\n\r\n", 1) + else: + raise ValueError( + "Response does not contain valid HTTP header-body separator (\\r\\n\\r\\n)." + ) headers_lines = headers_raw.split(b"\r\n")[1:] headers = {}