From 09a18f44ae66e36233d5c2941d802fbcab345657 Mon Sep 17 00:00:00 2001 From: Aashish <112133849+aashishpatil-g@users.noreply.github.com> Date: Fri, 27 Mar 2026 15:17:04 -0700 Subject: [PATCH 01/13] feat(fdc): Happy Path Implementation (#18151) * Initial Commit * Fix formatting and analyzer warnings * Fix tests and licenses * Denver feedback: var initialization best practice * sorted keys for id generation * Fix analyze info messages --- .../firebase_data_connect/.metadata | 30 ++ .../analysis_options.yaml | 1 + .../lib/src/cache/cache_data_types.dart | 12 +- .../lib/src/common/common_library.dart | 9 + .../lib/src/common/dataconnect_error.dart | 4 +- .../lib/src/core/ref.dart | 128 ++++-- .../lib/src/firebase_data_connect.dart | 108 ++++- .../google/protobuf/duration.pb.dart | 1 + .../generated/google/protobuf/struct.pb.dart | 1 + .../lib/src/network/grpc_transport.dart | 17 +- .../lib/src/network/rest_transport.dart | 13 + .../lib/src/network/stream_protocol.dart | 174 ++++++++ .../lib/src/network/transport_library.dart | 12 +- .../lib/src/network/transport_stub.dart | 15 +- .../lib/src/network/websocket_transport.dart | 410 ++++++++++++++++++ .../firebase_data_connect/pubspec.yaml | 1 + .../test/src/cache/cache_manager_test.dart | 2 +- .../src/cache/cache_manager_test.mocks.dart | 17 +- .../test/src/common/common_library_test.dart | 25 +- .../src/common/dataconnect_error_test.dart | 6 +- .../test/src/core/ref_test.dart | 20 +- .../test/src/firebase_data_connect_test.dart | 11 +- .../src/firebase_data_connect_test.mocks.dart | 17 +- .../test/src/network/rest_transport_test.dart | 16 +- .../network/rest_transport_test.mocks.dart | 9 +- 25 files changed, 947 insertions(+), 112 deletions(-) create mode 100644 packages/firebase_data_connect/firebase_data_connect/.metadata create mode 100644 packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml create mode 100644 packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart create mode 100644 packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart diff --git a/packages/firebase_data_connect/firebase_data_connect/.metadata b/packages/firebase_data_connect/firebase_data_connect/.metadata new file mode 100644 index 000000000000..fca9f99c599e --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/.metadata @@ -0,0 +1,30 @@ +# This file tracks properties of this Flutter project. +# Used by Flutter tool to assess capabilities and perform upgrades etc. +# +# This file should be version controlled and should not be manually edited. + +version: + revision: "adc901062556672b4138e18a4dc62a4be8f4b3c2" + channel: "stable" + +project_type: app + +# Tracks metadata for the flutter migrate command +migration: + platforms: + - platform: root + create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + - platform: macos + create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + + # User provided section + + # List of Local paths (relative to this file) that should be + # ignored by the migrate tool. + # + # Files that are not part of the templates will be ignored by default. + unmanaged_files: + - 'lib/main.dart' + - 'ios/Runner.xcodeproj/project.pbxproj' diff --git a/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml b/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml new file mode 100644 index 000000000000..f9b303465f19 --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml @@ -0,0 +1 @@ +include: package:flutter_lints/flutter.yaml diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart index 0768da15232c..9b991a946a79 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart @@ -332,11 +332,11 @@ class EntityNode { srcListMap.forEach((key, value) { List enodeList = []; List jsonList = value as List; - jsonList.forEach((jsonObj) { + for (var jsonObj in jsonList) { Map jmap = jsonObj as Map; EntityNode en = EntityNode.fromJson(jmap, cacheProvider); enodeList.add(en); - }); + } objLists?[key] = enodeList; }); } @@ -367,9 +367,9 @@ class EntityNode { if (nestedObjectLists != null) { nestedObjectLists!.forEach((key, edoList) { List> jsonList = []; - edoList.forEach((edo) { + for (var edo in edoList) { jsonList.add(edo.toJson(mode: mode)); - }); + } jsonData[key] = jsonList; }); } @@ -396,9 +396,9 @@ class EntityNode { Map nestedObjectListsJson = {}; nestedObjectLists!.forEach((key, edoList) { List> jsonList = []; - edoList.forEach((edo) { + for (var edo in edoList) { jsonList.add(edo.toJson(mode: mode)); - }); + } nestedObjectListsJson[key] = jsonList; }); jsonData[listsKey] = nestedObjectListsJson; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart index 9247287f5adf..5f523644870c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart @@ -117,4 +117,13 @@ abstract class DataConnectTransport { Variables? vars, String? token, ); + + /// Invokes corresponding stream query endpoint. + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer serializer, + Variables? vars, + String? token, + ); } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart index 43b7fd964418..309b0974d94b 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart @@ -36,9 +36,7 @@ class DataConnectError extends FirebaseException { /// Error thrown when an operation is partially successful. class DataConnectOperationError extends DataConnectError { - DataConnectOperationError( - DataConnectErrorCode code, String message, this.response) - : super(code, message); + DataConnectOperationError(super.code, String super.message, this.response); final DataConnectOperationFailureResponse response; } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 5b359f6a15b3..7a5fd8c2e198 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -53,15 +53,43 @@ abstract class OperationRef { ); Variables? variables; String operationName; - DataConnectTransport _transport; + final DataConnectTransport _transport; Deserializer deserializer; Serializer serializer; String? _lastToken; FirebaseDataConnect dataConnect; - Future> execute( - {QueryFetchPolicy fetchPolicy = QueryFetchPolicy.preferCache}); + static dynamic _sortKeys(dynamic value) { + if (value is Map) { + final sortedMap = {}; + final sortedKeys = value.keys.toList()..sort(); + for (final key in sortedKeys) { + sortedMap[key.toString()] = _sortKeys(value[key]); + } + return sortedMap; + } else if (value is List) { + return value.map(_sortKeys).toList(); + } + return value; + } + + static String createOperationId(String operationName, + Variables? vars, Serializer? serializer) { + if (vars != null && serializer != null) { + try { + final decoded = jsonDecode(serializer(vars)); + final sortedStr = jsonEncode(_sortKeys(decoded)); + return '$operationName::$sortedStr'; + } catch (_) { + return '$operationName::${serializer(vars)}'; + } + } else { + return operationName; + } + } + + Future> execute(); Future _shouldRetry() async { String? newToken; @@ -184,15 +212,6 @@ class QueryManager { return streamController; } - static String createQueryId(String queryName, - QueryVariables? vars, Serializer varSerializer) { - if (vars != null) { - return '$queryName::${varSerializer(vars)}'; - } else { - return queryName; - } - } - void dispose() { _impactedQueriesSubscription?.cancel(); } @@ -216,7 +235,7 @@ class QueryRef extends OperationRef { variables, ); - QueryManager _queryManager; + final QueryManager _queryManager; @override Future> execute( @@ -240,7 +259,7 @@ class QueryRef extends OperationRef { } String get _queryId => - QueryManager.createQueryId(operationName, variables, serializer); + OperationRef.createOperationId(operationName, variables, serializer); Future> _executeFromCache( QueryFetchPolicy fetchPolicy) async { @@ -311,9 +330,58 @@ class QueryRef extends OperationRef { Stream> subscribe() { _streamController ??= _queryManager.addQuery(this); - execute(); + final stream = + _streamController!.stream.cast>(); + + // Return the stream to the caller, then execute fetches + Future.microtask(() { + if (dataConnect.cacheManager != null) { + _executeFromCache(QueryFetchPolicy.cacheOnly) + .then((_) {}) + .catchError((err) { + log("Error fetching from cache during subscribe $err"); + // Ignore cache misses here, server stream will provide latest data + }); + } + + // Initiate Web Socket stream + _streamFromServer(); + }); + + return stream; + } + + void _streamFromServer() async { + bool shouldRetry = await _shouldRetry(); + try { + final stream = _transport.invokeStreamQuery( + operationName, + deserializer, + serializer, + variables, + _lastToken, + ); + + await for (final serverResponse in stream) { + if (dataConnect.cacheManager != null) { + await dataConnect.cacheManager!.update(_queryId, serverResponse); + } + Data typedData = _convertBodyJsonToData(serverResponse.data); - return _streamController!.stream.cast>(); + QueryResult res = + QueryResult(dataConnect, typedData, DataSource.server, this); + publishResultToStream(res); + } + } on DataConnectError catch (e) { + if (shouldRetry && + e.code == DataConnectErrorCode.unauthorized.toString()) { + _streamFromServer(); + } else { + publishErrorToStream(e); + } + } catch (e) { + publishErrorToStream(e as Error); + } } void publishResultToStream(QueryResult result) { @@ -322,7 +390,7 @@ class QueryRef extends OperationRef { } } - void publishErrorToStream(Error err) { + void publishErrorToStream(Object err) { if (_streamController != null) { _streamController?.addError(err); } @@ -331,24 +399,16 @@ class QueryRef extends OperationRef { class MutationRef extends OperationRef { MutationRef( - FirebaseDataConnect dataConnect, - String operationName, - DataConnectTransport transport, - Deserializer deserializer, - Serializer serializer, - Variables? variables, - ) : super( - dataConnect, - operationName, - transport, - deserializer, - serializer, - variables, - ); + super.dataConnect, + super.operationName, + super.transport, + super.deserializer, + super.serializer, + super.variables, + ); @override - Future> execute( - {QueryFetchPolicy fetchPolicy = QueryFetchPolicy.serverOnly}) async { + Future> execute() async { bool shouldRetry = await _shouldRetry(); try { // Logic below is duplicated due to the fact that `executeOperation` returns diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart index d5813f94dc89..ea1a7738a352 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart @@ -20,10 +20,8 @@ import 'package:firebase_data_connect/src/common/common_library.dart'; import 'package:firebase_data_connect/src/core/ref.dart'; import 'package:flutter/foundation.dart'; -import './network/transport_library.dart' - if (dart.library.io) './network/grpc_library.dart' - if (dart.library.js_interop) './network/rest_library.dart' - if (dart.library.html) './network/rest_library.dart'; +import './network/rest_library.dart'; +import './network/transport_library.dart'; import 'cache/cache_data_types.dart'; import 'cache/cache.dart'; @@ -93,13 +91,22 @@ class FirebaseDataConnect extends FirebasePluginPlatform { void checkTransport() { transportOptions ??= TransportOptions('firebasedataconnect.googleapis.com', null, true); - transport = getTransport( + final rest = RestTransport( transportOptions!, options, app.options.appId, _sdkType, appCheck, ); + final ws = WebSocketTransport( + transportOptions!, + options, + app.options.appId, + _sdkType, + appCheck, + auth, + ); + transport = _RoutingTransport(rest, ws); } @visibleForTesting @@ -120,7 +127,7 @@ class FirebaseDataConnect extends FirebasePluginPlatform { checkTransport(); checkAndInitializeCache(); String queryId = - QueryManager.createQueryId(operationName, vars, varsSerializer); + OperationRef.createOperationId(operationName, vars, varsSerializer); QueryRef? ref = _queryManager.trackedQueries[queryId] as QueryRef?; @@ -218,3 +225,92 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return newInstance; } } + +class _RoutingTransport implements DataConnectTransport { + _RoutingTransport(this.rest, this.websocket); + final RestTransport rest; + final WebSocketTransport websocket; + + @override + FirebaseAppCheck? get appCheck => rest.appCheck; + @override + set appCheck(FirebaseAppCheck? val) { + rest.appCheck = val; + websocket.appCheck = val; + } + + @override + CallerSDKType get sdkType => rest.sdkType; + @override + set sdkType(CallerSDKType val) { + rest.sdkType = val; + websocket.sdkType = val; + } + + @override + TransportOptions get transportOptions => rest.transportOptions; + @override + set transportOptions(TransportOptions val) { + rest.transportOptions = val; + websocket.transportOptions = val; + } + + @override + DataConnectOptions get options => rest.options; + @override + set options(DataConnectOptions val) { + rest.options = val; + websocket.options = val; + } + + @override + String get appId => rest.appId; + @override + set appId(String val) { + rest.appId = val; + websocket.appId = val; + } + + @override + Future invokeMutation( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + if (websocket.isConnected) { + return websocket.invokeMutation( + queryName, deserializer, serializer, vars, token); + } + return rest.invokeMutation( + queryName, deserializer, serializer, vars, token); + } + + @override + Future invokeQuery( + String queryName, + Deserializer deserializer, + Serializer? serialize, + Variables? vars, + String? token, + ) { + if (websocket.isConnected) { + return websocket.invokeQuery( + queryName, deserializer, serialize, vars, token); + } + return rest.invokeQuery(queryName, deserializer, serialize, vars, token); + } + + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + return websocket.invokeStreamQuery( + queryName, deserializer, serializer, vars, token); + } +} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart index 4bcbcd32a4c2..6c32fb50221c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart @@ -1,3 +1,4 @@ +// ignore_for_file: implementation_imports // // Generated code. Do not modify. // source: google/protobuf/duration.proto diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart index 42d55e426602..42164fbc928f 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart @@ -1,3 +1,4 @@ +// ignore_for_file: implementation_imports // // Generated code. Do not modify. // source: google/protobuf/struct.proto diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart index 180bb209168b..298fb25cfb4e 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart @@ -1,3 +1,4 @@ +// ignore_for_file: deprecated_member_use_from_same_package // Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,7 +15,8 @@ part of 'grpc_library.dart'; -/// Transport used for Android/iOS. Uses a GRPC transport instead of REST. +@Deprecated( + 'Use RestTransport and WebSocketTransport instead. The Data Connect SDK has moved away from gRPC.') class GRPCTransport implements DataConnectTransport { /// GRPCTransport creates a new channel GRPCTransport( @@ -167,6 +169,19 @@ class GRPCTransport implements DataConnectTransport { rethrow; } } + + /// Invokes stream query using WebSockets (even for GRPC clients we fall back to WebSockets for streaming right now). + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + throw UnsupportedError( + 'Streaming should be routed through WebSocketTransport'); + } } ServerResponse handleResponse(CommonResponse commonResponse) { diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart index 2680f692e350..ec8b87103aea 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart @@ -184,6 +184,19 @@ class RestTransport implements DataConnectTransport { token, ); } + + /// WebSockets are now handled by WebSocketTransport in FirebaseDataConnect. + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + throw UnsupportedError( + 'Streaming should be routed through WebSocketTransport'); + } } /// Initializes Rest transport for Data Connect. diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart new file mode 100644 index 000000000000..f64ee0bad44b --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart @@ -0,0 +1,174 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// The kind of streaming request. +enum RequestKind { + subscribe, + execute, + resume, + cancel, +} + +/// Request to execute or subscribe to a Data Connect query or mutation. +class ExecuteRequest { + ExecuteRequest(this.operationName, this.variables); + + final String operationName; + final Map? variables; + + Map toJson() { + final Map data = {}; + data['operationName'] = operationName; + if (variables != null) { + data['variables'] = variables; + } + return data; + } +} + +/// Request to resume a query. +class ResumeRequest { + ResumeRequest(); + + Map toJson() { + return {}; + } +} + +/// StreamRequest defines the request of Data Connect's bi-directional streaming API. +class StreamRequest { + StreamRequest({ + this.name, + this.headers, + this.authToken, + this.appCheckToken, + this.requestId, + this.requestKind, + this.subscribe, + this.execute, + this.resume, + this.cancel, + this.dataEtag, + }); + + /// The resource name of the connector. + final String? name; + + /// Optional headers. + final Map? headers; + + /// Optional Auth token. + final String? authToken; + + /// Optional App Check token. + final String? appCheckToken; + + /// The request id used to identify a request within the stream. + final String? requestId; + + /// Kind of the request. + final RequestKind? requestKind; + + /// Subscribe to a Data Connect query. + final ExecuteRequest? subscribe; + + /// Execute a Data Connect query or mutation. + final ExecuteRequest? execute; + + /// Resume a query. + final ResumeRequest? resume; + + /// Signal that the client is no longer interested. + final bool? cancel; + + /// Etag for caching. + final String? dataEtag; + + Map toJson() { + final Map data = {}; + if (name != null) { + data['name'] = name; + } + if (headers != null) { + data['headers'] = headers; + } + if (authToken != null) { + data['authToken'] = authToken; + } + if (appCheckToken != null) { + data['appCheckToken'] = appCheckToken; + } + if (requestId != null) { + data['requestId'] = requestId; + } + if (dataEtag != null) { + data['dataEtag'] = dataEtag; + } + + if (subscribe != null) { + data['subscribe'] = subscribe!.toJson(); + } else if (execute != null) { + data['execute'] = execute!.toJson(); + } else if (resume != null) { + data['resume'] = resume!.toJson(); + } else if (cancel == true) { + data['cancel'] = {}; + } + + return data; + } +} + +/// StreamResponse defines the response of Data Connect's bi-directional streaming API. +class StreamResponse { + StreamResponse({ + this.requestId, + this.data, + this.dataEtag, + this.errors, + this.cancelled, + this.extensions, + }); + + factory StreamResponse.fromJson(Map json) { + if (json.containsKey('result')) { + json = json['result'] as Map; + } else if (json.containsKey('error')) { + final errObj = json['error'] as Map; + json = { + 'errors': [ + {'message': errObj['message']} + ] + }; + } + + List? errorsList = json['errors'] as List?; + + return StreamResponse( + requestId: json['requestId'] as String?, + data: json['data'] as Map?, + dataEtag: json['dataEtag'] as String?, + errors: errorsList, + cancelled: json['cancelled'] as bool?, + extensions: json['extensions'] as Map?, + ); + } + + final String? requestId; + final Map? data; + final String? dataEtag; + final List? errors; + final bool? cancelled; + final Map? extensions; +} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart index 7da28d92b812..3b851c62989b 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,8 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'dart:async'; +import 'dart:convert'; +import 'dart:developer' as developer; +import 'dart:math'; import 'package:firebase_app_check/firebase_app_check.dart'; +import 'package:firebase_auth/firebase_auth.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import '../common/common_library.dart'; +import '../core/ref.dart'; +import '../dataconnect_version.dart'; +import 'stream_protocol.dart'; part 'transport_stub.dart'; +part 'websocket_transport.dart'; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart index e00c53b5bac1..a0083e0253d6 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -60,6 +60,19 @@ class TransportStub implements DataConnectTransport { throw UnimplementedError(); } + /// Stub for subscribing to a query. + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + // TODO: implement invokeStreamQuery + throw UnimplementedError(); + } + /// Stub for invoking a query. @override Future invokeQuery( diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart new file mode 100644 index 000000000000..03daed7602c0 --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -0,0 +1,410 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +part of 'transport_library.dart'; + +/// WebSocketTransport makes requests out to the streaming endpoints of the configured backend, +/// multiplexing multiple subscriptions and unary operations over a single WebSocket connection. +class WebSocketTransport implements DataConnectTransport { + /// Initializes necessary protocol and port. + WebSocketTransport( + this.transportOptions, + this.options, + this.appId, + this.sdkType, + this.appCheck, [ + this.auth, + ]) { + final protocol = (transportOptions.isSecure ?? true) ? 'wss' : 'ws'; + final host = transportOptions.host; + final port = transportOptions.port ?? 443; + final location = options.location; + + _url = Uri( + scheme: protocol, + host: host, + port: port, + path: '/v1/Connect/locations/$location', + ).toString(); + + _currentUid = auth?.currentUser?.uid; + _authSubscription = auth?.idTokenChanges().listen((user) async { + final newUid = user?.uid; + // Don't disconnect if auth state changes from not logged in to logged in. + // Only disconnect if logged in user changes. + if (_currentUid != null && _currentUid != newUid) { + _disconnect(); + } else if (newUid != null && isConnected) { + try { + final token = await user?.getIdToken(); + final request = StreamRequest( + requestId: _generateRequestId('auth'), + authToken: token, + ); + _channel!.sink.add(jsonEncode(request.toJson())); + } catch (_) { + // Ignored + } + } + _currentUid = newUid; + }); + } + + FirebaseAuth? auth; + String? _currentUid; + // ignore: unused_field + StreamSubscription? _authSubscription; //required to hold reference + + @override + FirebaseAppCheck? appCheck; + + @override + CallerSDKType sdkType; + + late String _url; + + @override + TransportOptions transportOptions; + + @override + DataConnectOptions options; + + @override + String appId; + + WebSocketChannel? _channel; + // ignore: unused_field + StreamSubscription? _channelSubscription; + + // Active listeners for stream subscriptions mapped by requestId. + final Map>> _streamListeners = + {}; + + // Active completers for unary operations mapped by requestId. + final Map>> _unaryListeners = {}; + + // Active subscriptions mapped by operationId => requestId. + final Map _activeSubscriptions = {}; + + final Random _random = Random(); + static const String _chars = 'abcdefghijklmnopqrstuvwxyz0123456789'; + + String _generateRequestId(String operationName) { + final randStr = String.fromCharCodes(Iterable.generate( + 15, (_) => _chars.codeUnitAt(_random.nextInt(_chars.length)))); + return '${operationName}_$randStr'; + } + + bool get isConnected => _channel != null; + + Map _buildHeaders(String? authToken, String? appCheckToken) { + Map headers = { + 'x-goog-api-client': getGoogApiVal(sdkType, packageVersion), + 'x-firebase-client': getFirebaseClientVal(packageVersion) + }; + if (authToken != null) { + headers['X-Firebase-Auth-Token'] = authToken; + } + if (appCheckToken != null) { + headers['X-Firebase-AppCheck'] = appCheckToken; + } + headers['x-firebase-gmpid'] = appId; + return headers; + } + + Future _ensureConnected(String? authToken) async { + if (_channel != null) return; + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + _channel = WebSocketChannel.connect(Uri.parse(_url)); + _channelSubscription = _channel!.stream.listen( + _onMessage, + onError: _onError, + onDone: _onDone, + ); + + final initRequest = StreamRequest( + name: + 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}', + headers: headers, + ); + _channel!.sink.add(jsonEncode(initRequest.toJson())); + } + + void _onMessage(dynamic message) { + try { + developer.log("Received stream response \n $message"); + final bodyJson = jsonDecode(message as String) as Map; + final response = StreamResponse.fromJson(bodyJson); + + final requestId = response.requestId; + if (requestId == null) return; + + final serverResponse = ServerResponse( + response.data ?? {}, + extensions: response.extensions, + ); + + // Append errors if any exist on the stream payload + if (response.errors != null && response.errors!.isNotEmpty) { + // We simulate a DataConnectOperationError payload structure + // so that ref.dart can parse it correctly + serverResponse.data['errors'] = response.errors; + } + + if (_unaryListeners.containsKey(requestId)) { + final completers = _unaryListeners.remove(requestId)!; + for (final completer in completers) { + completer.complete(serverResponse); + } + } + + if (_streamListeners.containsKey(requestId)) { + final controllers = _streamListeners[requestId]!; + if (response.cancelled == true) { + for (final controller in controllers) { + controller.close(); + } + _streamListeners.remove(requestId); + _activeSubscriptions.removeWhere((key, value) => value == requestId); + } else { + for (final controller in controllers) { + controller.add(serverResponse); + } + } + } + } catch (e) { + // JSON decoding error or unknown format + developer.log('error decoding server response $e'); + } + } + + void _onError(dynamic error) { + final e = + DataConnectError(DataConnectErrorCode.other, 'WebSocket error: $error'); + for (final completers in _unaryListeners.values) { + for (final completer in completers) { + completer.completeError(e); + } + } + for (final controllers in _streamListeners.values) { + for (final controller in controllers) { + controller.addError(e); + } + } + _unaryListeners.clear(); + _streamListeners.clear(); + _activeSubscriptions.clear(); + _channel = null; + } + + void _disconnect() { + _channel?.sink.close(); + } + + void _onDone() { + _channel = null; + for (final controllers in _streamListeners.values) { + for (final controller in controllers) { + controller.close(); + } + } + _unaryListeners.clear(); + _streamListeners.clear(); + _activeSubscriptions.clear(); + } + + @override + Future invokeQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) async { + return _invokeUnary(queryName, deserializer, serializer, vars, authToken, + RequestKind.execute); + } + + @override + Future invokeMutation( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) async { + return _invokeUnary(queryName, deserializer, serializer, vars, authToken, + RequestKind.execute); + } + + Future _invokeUnary( + String operationName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + RequestKind requestKind, + ) async { + await _ensureConnected(authToken); + + final operationId = + OperationRef.createOperationId(operationName, vars, serializer); + final completer = Completer(); + + if (_activeSubscriptions.containsKey(operationId)) { + final existingRequestId = _activeSubscriptions[operationId]!; + _unaryListeners.putIfAbsent(existingRequestId, () => []).add(completer); + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + authToken: authToken, + appCheckToken: appCheckToken, + requestId: existingRequestId, + requestKind: RequestKind.resume, + resume: ResumeRequest(), + headers: headers, + ); + _channel!.sink.add(jsonEncode(request.toJson())); + + return completer.future; + } + + final requestId = _generateRequestId(operationId); + _unaryListeners.putIfAbsent(requestId, () => []).add(completer); + + Map? variables; + if (vars != null && serializer != null) { + variables = json.decode(serializer(vars)); + } + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + authToken: authToken, + appCheckToken: appCheckToken, + requestId: requestId, + requestKind: requestKind, + execute: ExecuteRequest(operationName, variables), + headers: headers, + ); + + _channel!.sink.add(jsonEncode(request.toJson())); + + return completer.future; + } + + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) { + late StreamController controller; + final operationId = + OperationRef.createOperationId(queryName, vars, serializer); + + controller = StreamController( + onListen: () async { + await _ensureConnected(authToken); + + if (_activeSubscriptions.containsKey(operationId)) { + final existingRequestId = _activeSubscriptions[operationId]!; + _streamListeners + .putIfAbsent(existingRequestId, () => []) + .add(controller); + return; + } + + final requestId = _generateRequestId(operationId); + _activeSubscriptions[operationId] = requestId; + _streamListeners.putIfAbsent(requestId, () => []).add(controller); + + Map? variables; + if (vars != null && serializer != null) { + variables = json.decode(serializer(vars)); + } + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + authToken: authToken, + appCheckToken: appCheckToken, + requestId: requestId, + requestKind: RequestKind.subscribe, + subscribe: ExecuteRequest(queryName, variables), + headers: headers, + ); + + _channel!.sink.add(jsonEncode(request.toJson())); + }, + onCancel: () { + if (!_activeSubscriptions.containsKey(operationId)) return; + final requestId = _activeSubscriptions[operationId]!; + + final listeners = _streamListeners[requestId]; + if (listeners != null) { + listeners.remove(controller); + if (listeners.isEmpty) { + _streamListeners.remove(requestId); + _activeSubscriptions.remove(operationId); + + if (_channel != null) { + final cancelReq = StreamRequest( + requestId: requestId, + requestKind: RequestKind.cancel, + cancel: true, + ); + _channel!.sink.add(jsonEncode(cancelReq.toJson())); + } + } + } + }, + ); + + return controller.stream; + } +} diff --git a/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml b/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml index d0145d0290ac..3bd28b86b42c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml +++ b/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml @@ -27,6 +27,7 @@ dependencies: protobuf: ^3.1.0 sqlite3: ^2.9.0 sqlite3_flutter_libs: ^0.5.40 + web_socket_channel: ^3.0.1 dev_dependencies: build_runner: ^2.4.12 diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart index dec53744b7e3..6848824996c4 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart @@ -233,7 +233,7 @@ void main() { }); test('maxAge conformance', () async { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponseSuccess = http.Response('{"success": true}', 200); if (dataConnect.cacheManager == null) { diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart index 09f67581ad75..a99e554cb2d7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Mocks generated by Mockito 5.4.6 from annotations -// in firebase_data_connect/test/src/cache/cache_manager_test.dart. -// Do not manually edit this file. - // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i5; @@ -39,7 +35,6 @@ import 'package:mockito/src/dummies.dart' as _i4; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class -// ignore_for_file: invalid_use_of_internal_member class _FakeFirebaseOptions_0 extends _i1.SmartFake implements _i2.FirebaseOptions { @@ -166,28 +161,28 @@ class MockConnectorConfig extends _i1.Mock implements _i6.ConnectorConfig { ) as String); @override - set location(String? value) => super.noSuchMethod( + set location(String? _location) => super.noSuchMethod( Invocation.setter( #location, - value, + _location, ), returnValueForMissingStub: null, ); @override - set connector(String? value) => super.noSuchMethod( + set connector(String? _connector) => super.noSuchMethod( Invocation.setter( #connector, - value, + _connector, ), returnValueForMissingStub: null, ); @override - set serviceId(String? value) => super.noSuchMethod( + set serviceId(String? _serviceId) => super.noSuchMethod( Invocation.setter( #serviceId, - value, + _serviceId, ), returnValueForMissingStub: null, ); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart index 353f34aeec75..2d526f0ef8c0 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart @@ -103,7 +103,7 @@ void main() { test('should handle invokeQuery with proper deserializer', () async { const queryName = 'testQuery'; - final deserializer = (json) => json; + deserializer(json) => json; final result = await transport.invokeQuery( queryName, deserializer, @@ -117,7 +117,7 @@ void main() { test('should handle invokeMutation with proper deserializer', () async { const queryName = 'testMutation'; - final deserializer = (json) => json; + deserializer(json) => json; final result = await transport.invokeMutation( queryName, deserializer, @@ -134,12 +134,12 @@ void main() { // Test class extending DataConnectTransport for testing purposes class TestDataConnectTransport extends DataConnectTransport { TestDataConnectTransport( - TransportOptions transportOptions, - DataConnectOptions options, - String appId, - CallerSDKType sdkType, { + super.transportOptions, + super.options, + super.appId, + super.sdkType, { FirebaseAppCheck? appCheck, - }) : super(transportOptions, options, appId, sdkType) { + }) { this.appCheck = appCheck; } @@ -166,4 +166,15 @@ class TestDataConnectTransport extends DataConnectTransport { // Simulate mutation invocation logic here return ServerResponse({}); } + + @override + Stream invokeStreamQuery( + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) { + return Stream.value(ServerResponse({})); + } } diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart index 3cb6c9ce26c8..5735f324f3b4 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart @@ -79,8 +79,7 @@ void main() { group('Serializer and Deserializer', () { test('should serialize variables into string format', () { - Serializer> serializer = - (Map vars) => vars.toString(); + String serializer(Map vars) => vars.toString(); final inputVars = {'key1': 'value1', 'key2': 123}; final serializedString = serializer(inputVars); @@ -89,8 +88,7 @@ void main() { }); test('should deserialize string data into expected format', () { - Deserializer> deserializer = - (String data) => {'data': data}; + deserializer(String data) => {'data': data}; const inputData = '{"message": "Hello World"}'; final deserializedData = deserializer(inputData); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart index 636cae0cde52..4e615d7ab8f1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart @@ -82,7 +82,7 @@ void main() { test( 'addQuery should create a new StreamController if query does not exist', () { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; String varSerializer(Object? _) { return 'varsAsStr'; } @@ -149,7 +149,7 @@ void main() { mockDataConnect.transport = transport; }); test('executeQuery should gracefully handle getIdToken failures', () async { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponseSuccess = http.Response('{"success": true}', 200); when(mockUser.getIdToken()).thenThrow(Exception('Auth error')); QueryRef ref = QueryRef( @@ -175,7 +175,7 @@ void main() { () async { final mockResponse = http.Response('{"error": "Unauthorized"}', 401); final mockResponseSuccess = http.Response('{"success": true}', 200); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; int count = 0; int idTokenCount = 0; QueryRef ref = QueryRef( @@ -219,7 +219,7 @@ void main() { }); test('throw Error if server throws one', () { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponse = http.Response( ''' { @@ -285,10 +285,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, @@ -339,10 +339,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, @@ -390,10 +390,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart index 2deb28f22749..b4edc25b2a81 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart @@ -25,7 +25,16 @@ import 'package:mockito/mockito.dart'; @GenerateNiceMocks([MockSpec(), MockSpec()]) import 'firebase_data_connect_test.mocks.dart'; -class MockFirebaseAuth extends Mock implements FirebaseAuth {} +class MockFirebaseAuth extends Mock implements FirebaseAuth { + @override + Stream idTokenChanges() { + return super.noSuchMethod( + Invocation.method(#idTokenChanges, []), + returnValue: const Stream.empty(), + returnValueForMissingStub: const Stream.empty(), + ) as Stream; + } +} class MockFirebaseAppCheck extends Mock implements FirebaseAppCheck {} diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart index 06634cc3f1ec..a99e554cb2d7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Mocks generated by Mockito 5.4.6 from annotations -// in firebase_data_connect/test/src/firebase_data_connect_test.dart. -// Do not manually edit this file. - // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i5; @@ -39,7 +35,6 @@ import 'package:mockito/src/dummies.dart' as _i4; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class -// ignore_for_file: invalid_use_of_internal_member class _FakeFirebaseOptions_0 extends _i1.SmartFake implements _i2.FirebaseOptions { @@ -166,28 +161,28 @@ class MockConnectorConfig extends _i1.Mock implements _i6.ConnectorConfig { ) as String); @override - set location(String? value) => super.noSuchMethod( + set location(String? _location) => super.noSuchMethod( Invocation.setter( #location, - value, + _location, ), returnValueForMissingStub: null, ); @override - set connector(String? value) => super.noSuchMethod( + set connector(String? _connector) => super.noSuchMethod( Invocation.setter( #connector, - value, + _connector, ), returnValueForMissingStub: null, ); @override - set serviceId(String? value) => super.noSuchMethod( + set serviceId(String? _serviceId) => super.noSuchMethod( Invocation.setter( #serviceId, - value, + _serviceId, ), returnValueForMissingStub: null, ); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart index bad97d364d32..d5325e89faab 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart @@ -98,7 +98,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; expect( () => transport.invokeOperation( @@ -124,7 +124,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; expect( () => transport.invokeOperation( @@ -150,7 +150,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeQuery('testQuery', deserializer, null, null, null); @@ -178,7 +178,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Mutation Data'; + String deserializer(String data) => 'Deserialized Mutation Data'; await transport.invokeMutation( 'testMutation', @@ -215,7 +215,7 @@ void main() { when(mockUser.getIdToken()).thenAnswer((_) async => 'authToken123'); when(mockAppCheck.getToken()).thenAnswer((_) async => 'appCheckToken123'); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', @@ -250,7 +250,7 @@ void main() { when(mockUser.getIdToken()).thenAnswer((_) async => 'authToken123'); when(mockAppCheck.getToken()).thenAnswer((_) async => 'appCheckToken123'); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', @@ -297,7 +297,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final result = await transport.invokeOperation( 'testQuery', @@ -327,7 +327,7 @@ void main() { when(mockUser.getIdToken()).thenThrow(Exception('Auth error')); when(mockAppCheck.getToken()).thenThrow(Exception('AppCheck error')); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart index 828b48d4f7e7..909c251f0434 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Mocks generated by Mockito 5.4.6 from annotations -// in firebase_data_connect/test/src/network/rest_transport_test.dart. -// Do not manually edit this file. - // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i6; import 'dart:convert' as _i7; @@ -45,7 +41,6 @@ import 'package:mockito/src/dummies.dart' as _i8; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class -// ignore_for_file: invalid_use_of_internal_member class _FakeResponse_0 extends _i1.SmartFake implements _i2.Response { _FakeResponse_0( @@ -754,10 +749,10 @@ class MockFirebaseAppCheck extends _i1.Mock implements _i10.FirebaseAppCheck { ) as _i6.Stream); @override - set app(_i5.FirebaseApp? value) => super.noSuchMethod( + set app(_i5.FirebaseApp? _app) => super.noSuchMethod( Invocation.setter( #app, - value, + _app, ), returnValueForMissingStub: null, ); From a2461d8d758460fbaf8441d2b53343741f453953 Mon Sep 17 00:00:00 2001 From: Aashish <112133849+aashishpatil-g@users.noreply.github.com> Date: Thu, 2 Apr 2026 19:28:25 -0700 Subject: [PATCH 02/13] feat(fdc): Handle disconnects and reconnects (#18157) * Handle disconnects and reconnects * Address gemini review comments * Reconnection logic * Fixes to reconnect on startup * Fix to decode binary data from server * Handle gemini feedback * Update rest_transport mocks with new windows provider sig --- .../lib/src/core/ref.dart | 12 +- .../lib/src/network/websocket_transport.dart | 317 +++++++++++++++--- .../src/cache/cache_manager_test.mocks.dart | 17 +- .../src/firebase_data_connect_test.mocks.dart | 17 +- .../network/rest_transport_test.mocks.dart | 9 +- 5 files changed, 314 insertions(+), 58 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 7a5fd8c2e198..6b5f691d0888 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -334,14 +334,14 @@ class QueryRef extends OperationRef { _streamController!.stream.cast>(); // Return the stream to the caller, then execute fetches - Future.microtask(() { + Future.microtask(() async { if (dataConnect.cacheManager != null) { - _executeFromCache(QueryFetchPolicy.cacheOnly) - .then((_) {}) - .catchError((err) { + try { + await _executeFromCache(QueryFetchPolicy.cacheOnly); + } catch (err) { log("Error fetching from cache during subscribe $err"); // Ignore cache misses here, server stream will provide latest data - }); + } } // Initiate Web Socket stream @@ -380,7 +380,7 @@ class QueryRef extends OperationRef { publishErrorToStream(e); } } catch (e) { - publishErrorToStream(e as Error); + publishErrorToStream(e); } } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index 03daed7602c0..51e81237add4 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -16,7 +16,30 @@ part of 'transport_library.dart'; /// WebSocketTransport makes requests out to the streaming endpoints of the configured backend, /// multiplexing multiple subscriptions and unary operations over a single WebSocket connection. + +class _PendingUnary { + final Completer completer; + final String operationName; + final Map? variables; + final bool isMutation; + + _PendingUnary( + this.completer, this.operationName, this.variables, this.isMutation); +} + +class _PendingSubscription { + final String operationId; + final String queryName; + final Map? variables; + + _PendingSubscription(this.operationId, this.queryName, this.variables); +} + class WebSocketTransport implements DataConnectTransport { + static const int _maxReconnectAttempts = 10; + static const int _maxReconnectDelayMs = 30000; + static const int _initialReconnectDelayMs = 1000; + /// Initializes necessary protocol and port. WebSocketTransport( this.transportOptions, @@ -35,24 +58,26 @@ class WebSocketTransport implements DataConnectTransport { scheme: protocol, host: host, port: port, - path: '/v1/Connect/locations/$location', + path: + '/ws/google.firebase.dataconnect.v1.ConnectorStreamService/Connect/locations/$location', ).toString(); _currentUid = auth?.currentUser?.uid; _authSubscription = auth?.idTokenChanges().listen((user) async { final newUid = user?.uid; - // Don't disconnect if auth state changes from not logged in to logged in. - // Only disconnect if logged in user changes. - if (_currentUid != null && _currentUid != newUid) { + // Disconnect and reconnect on any fundamental user change (login, logout, switch). + if (_currentUid != newUid) { _disconnect(); + _scheduleReconnect(); } else if (newUid != null && isConnected) { + // Token refreshed for the same user, push the new token natively down the socket. try { final token = await user?.getIdToken(); final request = StreamRequest( requestId: _generateRequestId('auth'), authToken: token, ); - _channel!.sink.add(jsonEncode(request.toJson())); + _channel?.sink.add(jsonEncode(request.toJson())); } catch (_) { // Ignored } @@ -91,12 +116,27 @@ class WebSocketTransport implements DataConnectTransport { final Map>> _streamListeners = {}; + // Pending information for subscriptions mapped by requestId. + final Map _pendingSubscriptions = {}; + // Active completers for unary operations mapped by requestId. - final Map>> _unaryListeners = {}; + final Map> _unaryListeners = {}; // Active subscriptions mapped by operationId => requestId. final Map _activeSubscriptions = {}; + bool _isReconnecting = false; + int _reconnectAttempts = 0; + bool _isExpectedDisconnect = false; + + void _checkIdleAndDisconnect() { + if (_streamListeners.isEmpty && _unaryListeners.isEmpty) { + _isExpectedDisconnect = true; + _disconnect(); + _clearState(); + } + } + final Random _random = Random(); static const String _chars = 'abcdefghijklmnopqrstuvwxyz0123456789'; @@ -123,9 +163,18 @@ class WebSocketTransport implements DataConnectTransport { return headers; } - Future _ensureConnected(String? authToken) async { - if (_channel != null) return; + Future? _connectionFuture; + + Future _ensureConnected(String? authToken) { + if (_channel != null) return Future.value(); + if (_connectionFuture != null) return _connectionFuture!; + _connectionFuture = _doConnect(authToken).whenComplete(() { + _connectionFuture = null; + }); + return _connectionFuture!; + } + Future _doConnect(String? authToken) async { String? appCheckToken; try { appCheckToken = await appCheck?.getToken(); @@ -136,24 +185,44 @@ class WebSocketTransport implements DataConnectTransport { final headers = _buildHeaders(authToken, appCheckToken); _channel = WebSocketChannel.connect(Uri.parse(_url)); - _channelSubscription = _channel!.stream.listen( + _channelSubscription = _channel?.stream.listen( _onMessage, onError: _onError, onDone: _onDone, ); + // reset this since an explicit connect was requested + _isExpectedDisconnect = false; + + try { + await _channel?.ready; + } catch (e) { + developer.log('WebSocket connection failed to become ready: $e'); + _channel = null; + throw DataConnectError( + DataConnectErrorCode.other, 'WebSocket connection failed: $e'); + } + final initRequest = StreamRequest( name: 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}', headers: headers, ); - _channel!.sink.add(jsonEncode(initRequest.toJson())); + _channel?.sink.add(jsonEncode(initRequest.toJson())); } + // called when a message is received from the stream void _onMessage(dynamic message) { try { - developer.log("Received stream response \n $message"); - final bodyJson = jsonDecode(message as String) as Map; + var bodyString = ''; + if (message is List) { + bodyString = utf8.decode(message); + } else { + bodyString = message as String; + } + developer.log("Received stream response \n $bodyString"); + + final bodyJson = jsonDecode(bodyString) as Map; final response = StreamResponse.fromJson(bodyJson); final requestId = response.requestId; @@ -172,10 +241,13 @@ class WebSocketTransport implements DataConnectTransport { } if (_unaryListeners.containsKey(requestId)) { - final completers = _unaryListeners.remove(requestId)!; - for (final completer in completers) { - completer.complete(serverResponse); + final pendings = _unaryListeners.remove(requestId)!; + for (final p in pendings) { + if (!p.completer.isCompleted) { + p.completer.complete(serverResponse); + } } + _checkIdleAndDisconnect(); } if (_streamListeners.containsKey(requestId)) { @@ -186,6 +258,8 @@ class WebSocketTransport implements DataConnectTransport { } _streamListeners.remove(requestId); _activeSubscriptions.removeWhere((key, value) => value == requestId); + _pendingSubscriptions.remove(requestId); + _checkIdleAndDisconnect(); } else { for (final controller in controllers) { controller.add(serverResponse); @@ -198,39 +272,178 @@ class WebSocketTransport implements DataConnectTransport { } } - void _onError(dynamic error) { - final e = - DataConnectError(DataConnectErrorCode.other, 'WebSocket error: $error'); - for (final completers in _unaryListeners.values) { - for (final completer in completers) { - completer.completeError(e); + void _clearState([DataConnectError? error]) { + final e = error ?? + DataConnectError( + DataConnectErrorCode.other, 'WebSocket connection closed.'); + for (final pendings in _unaryListeners.values) { + for (final p in pendings) { + if (!p.completer.isCompleted) { + p.completer.completeError(e); + } } } for (final controllers in _streamListeners.values) { for (final controller in controllers) { controller.addError(e); + controller.close(); } } _unaryListeners.clear(); _streamListeners.clear(); _activeSubscriptions.clear(); + _pendingSubscriptions.clear(); + _isReconnecting = false; + _reconnectAttempts = 0; + } + + Timer? _reconnectTimer; + + void _scheduleReconnect() { + developer.log( + '${DateTime.now()} _scheduleReconnect $_reconnectAttempts $_isReconnecting $_isExpectedDisconnect'); + if (_isReconnecting || _isExpectedDisconnect) return; + _isReconnecting = true; + + if (_reconnectAttempts >= _maxReconnectAttempts) { + _clearState(DataConnectError(DataConnectErrorCode.other, + 'Network disconnected after max attempts.')); + return; + } + + final delay = min( + _initialReconnectDelayMs * pow(2, _reconnectAttempts).toInt(), + _maxReconnectDelayMs); + var startTime = DateTime.now(); + developer.log('$startTime scheduling _performReconnect in $delay ms'); + + _reconnectTimer?.cancel(); + _reconnectTimer = Timer(Duration(milliseconds: delay), () async { + developer.log( + '${DateTime.now()} calling delayed _performReconnect scheduled at $startTime'); + _performReconnect(); + }); + } + + Future _refreshAuthToken() async { + try { + return await auth?.currentUser?.getIdToken(); + } catch (_) { + // If fetching token fails, continue unauthenticated. + return null; + } + } + + Future _refreshAppCheckToken() async { + try { + if (appCheck != null) { + return await appCheck!.getToken(); + } + } catch (_) { + // Ignored: continue without AppCheck token if it fails. + } + return null; + } + + void _resubscribeActive(String? authToken, String? appCheckToken) { + for (final sub in _pendingSubscriptions.values) { + final reqId = _activeSubscriptions[sub.operationId]; + if (reqId == null) continue; + final headers = _buildHeaders(authToken, appCheckToken); + final request = StreamRequest( + authToken: authToken, + appCheckToken: headers['X-Firebase-AppCheck'], + requestId: reqId, + requestKind: RequestKind.subscribe, + subscribe: ExecuteRequest(sub.queryName, sub.variables), + headers: headers, + ); + _channel?.sink.add(jsonEncode(request.toJson())); + } + } + + void _replayQueriesAndFailMutations( + String? authToken, String? appCheckToken) { + final unariesToReplay = >{}; + for (final entry in _unaryListeners.entries) { + final reqId = entry.key; + final kept = <_PendingUnary>[]; + for (final p in entry.value) { + if (p.isMutation) { + p.completer.completeError(DataConnectError(DataConnectErrorCode.other, + 'Network reconnected; mutations cannot be safely retried.')); + } else { + kept.add(p); + final headers = _buildHeaders(authToken, appCheckToken); + final request = StreamRequest( + authToken: authToken, + appCheckToken: headers['X-Firebase-AppCheck'], + requestId: reqId, + requestKind: RequestKind.execute, + execute: ExecuteRequest(p.operationName, p.variables), + headers: headers, + ); + _channel?.sink.add(jsonEncode(request.toJson())); + } + } + if (kept.isNotEmpty) { + unariesToReplay[reqId] = kept; + } + } + _unaryListeners.clear(); + _unaryListeners.addAll(unariesToReplay); + } + + Future _performReconnect() async { + _channel?.sink.close(); _channel = null; + _reconnectAttempts++; + + final authToken = await _refreshAuthToken(); + final appCheckToken = await _refreshAppCheckToken(); + + try { + await _ensureConnected(authToken); + + _reconnectAttempts = 0; + _isReconnecting = false; + + _resubscribeActive(authToken, appCheckToken); + _replayQueriesAndFailMutations(authToken, appCheckToken); + } catch (e) { + _isReconnecting = false; + _scheduleReconnect(); + } + } + + void _onError(dynamic error) { + if (_channel == null) return; + developer.log('WebSocket error: $error'); + _channel = null; + _isReconnecting = false; + _scheduleReconnect(); } void _disconnect() { + _reconnectTimer?.cancel(); + _reconnectTimer = null; _channel?.sink.close(); + _channel = null; + } + + void disconnect() { + _isExpectedDisconnect = true; + _disconnect(); } void _onDone() { + if (_channel == null) return; + developer.log('WebSocket connection closed.'); _channel = null; - for (final controllers in _streamListeners.values) { - for (final controller in controllers) { - controller.close(); - } + _isReconnecting = false; + if (!_isExpectedDisconnect) { + _scheduleReconnect(); } - _unaryListeners.clear(); - _streamListeners.clear(); - _activeSubscriptions.clear(); } @override @@ -242,7 +455,7 @@ class WebSocketTransport implements DataConnectTransport { String? authToken, ) async { return _invokeUnary(queryName, deserializer, serializer, vars, authToken, - RequestKind.execute); + RequestKind.execute, false); } @override @@ -254,7 +467,7 @@ class WebSocketTransport implements DataConnectTransport { String? authToken, ) async { return _invokeUnary(queryName, deserializer, serializer, vars, authToken, - RequestKind.execute); + RequestKind.execute, true); } Future _invokeUnary( @@ -264,6 +477,7 @@ class WebSocketTransport implements DataConnectTransport { Variables? vars, String? authToken, RequestKind requestKind, + bool isMutation, ) async { await _ensureConnected(authToken); @@ -273,7 +487,12 @@ class WebSocketTransport implements DataConnectTransport { if (_activeSubscriptions.containsKey(operationId)) { final existingRequestId = _activeSubscriptions[operationId]!; - _unaryListeners.putIfAbsent(existingRequestId, () => []).add(completer); + Map? variablesMap; + if (vars != null && serializer != null) { + variablesMap = jsonDecode(serializer(vars)); + } + _unaryListeners.putIfAbsent(existingRequestId, () => []).add( + _PendingUnary(completer, operationName, variablesMap, isMutation)); String? appCheckToken; try { @@ -292,18 +511,20 @@ class WebSocketTransport implements DataConnectTransport { resume: ResumeRequest(), headers: headers, ); - _channel!.sink.add(jsonEncode(request.toJson())); + _channel?.sink.add(jsonEncode(request.toJson())); return completer.future; } final requestId = _generateRequestId(operationId); - _unaryListeners.putIfAbsent(requestId, () => []).add(completer); Map? variables; if (vars != null && serializer != null) { - variables = json.decode(serializer(vars)); + variables = jsonDecode(serializer(vars)); } + _unaryListeners + .putIfAbsent(requestId, () => []) + .add(_PendingUnary(completer, operationName, variables, isMutation)); String? appCheckToken; try { @@ -323,7 +544,7 @@ class WebSocketTransport implements DataConnectTransport { headers: headers, ); - _channel!.sink.add(jsonEncode(request.toJson())); + _channel?.sink.add(jsonEncode(request.toJson())); return completer.future; } @@ -342,7 +563,14 @@ class WebSocketTransport implements DataConnectTransport { controller = StreamController( onListen: () async { - await _ensureConnected(authToken); + try { + await _ensureConnected(authToken); + } catch (e) { + developer.log("Error subscribing - setting up stream $e"); + // Do NOT add error to sink here. The stream is designed to quietly + // add the query to `_pendingSubscriptions` below and silently + // retry when the network reconnects via `_scheduleReconnect`. + } if (_activeSubscriptions.containsKey(operationId)) { final existingRequestId = _activeSubscriptions[operationId]!; @@ -360,6 +588,15 @@ class WebSocketTransport implements DataConnectTransport { if (vars != null && serializer != null) { variables = json.decode(serializer(vars)); } + _pendingSubscriptions[requestId] = + _PendingSubscription(operationId, queryName, variables); + + if (!isConnected) { + // we are not connected - + // keep pending sub to use for retry + _scheduleReconnect(); + return; + } String? appCheckToken; try { @@ -379,7 +616,9 @@ class WebSocketTransport implements DataConnectTransport { headers: headers, ); - _channel!.sink.add(jsonEncode(request.toJson())); + if (_channel != null) { + _channel?.sink.add(jsonEncode(request.toJson())); + } }, onCancel: () { if (!_activeSubscriptions.containsKey(operationId)) return; @@ -391,6 +630,7 @@ class WebSocketTransport implements DataConnectTransport { if (listeners.isEmpty) { _streamListeners.remove(requestId); _activeSubscriptions.remove(operationId); + _pendingSubscriptions.remove(requestId); if (_channel != null) { final cancelReq = StreamRequest( @@ -398,8 +638,9 @@ class WebSocketTransport implements DataConnectTransport { requestKind: RequestKind.cancel, cancel: true, ); - _channel!.sink.add(jsonEncode(cancelReq.toJson())); + _channel?.sink.add(jsonEncode(cancelReq.toJson())); } + _checkIdleAndDisconnect(); } } }, diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart index a99e554cb2d7..09f67581ad75 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.mocks.dart @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Mocks generated by Mockito 5.4.6 from annotations +// in firebase_data_connect/test/src/cache/cache_manager_test.dart. +// Do not manually edit this file. + // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i5; @@ -35,6 +39,7 @@ import 'package:mockito/src/dummies.dart' as _i4; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class +// ignore_for_file: invalid_use_of_internal_member class _FakeFirebaseOptions_0 extends _i1.SmartFake implements _i2.FirebaseOptions { @@ -161,28 +166,28 @@ class MockConnectorConfig extends _i1.Mock implements _i6.ConnectorConfig { ) as String); @override - set location(String? _location) => super.noSuchMethod( + set location(String? value) => super.noSuchMethod( Invocation.setter( #location, - _location, + value, ), returnValueForMissingStub: null, ); @override - set connector(String? _connector) => super.noSuchMethod( + set connector(String? value) => super.noSuchMethod( Invocation.setter( #connector, - _connector, + value, ), returnValueForMissingStub: null, ); @override - set serviceId(String? _serviceId) => super.noSuchMethod( + set serviceId(String? value) => super.noSuchMethod( Invocation.setter( #serviceId, - _serviceId, + value, ), returnValueForMissingStub: null, ); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart index a99e554cb2d7..06634cc3f1ec 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.mocks.dart @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Mocks generated by Mockito 5.4.6 from annotations +// in firebase_data_connect/test/src/firebase_data_connect_test.dart. +// Do not manually edit this file. + // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i5; @@ -35,6 +39,7 @@ import 'package:mockito/src/dummies.dart' as _i4; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class +// ignore_for_file: invalid_use_of_internal_member class _FakeFirebaseOptions_0 extends _i1.SmartFake implements _i2.FirebaseOptions { @@ -161,28 +166,28 @@ class MockConnectorConfig extends _i1.Mock implements _i6.ConnectorConfig { ) as String); @override - set location(String? _location) => super.noSuchMethod( + set location(String? value) => super.noSuchMethod( Invocation.setter( #location, - _location, + value, ), returnValueForMissingStub: null, ); @override - set connector(String? _connector) => super.noSuchMethod( + set connector(String? value) => super.noSuchMethod( Invocation.setter( #connector, - _connector, + value, ), returnValueForMissingStub: null, ); @override - set serviceId(String? _serviceId) => super.noSuchMethod( + set serviceId(String? value) => super.noSuchMethod( Invocation.setter( #serviceId, - _serviceId, + value, ), returnValueForMissingStub: null, ); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart index 909c251f0434..828b48d4f7e7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.mocks.dart @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Mocks generated by Mockito 5.4.6 from annotations +// in firebase_data_connect/test/src/network/rest_transport_test.dart. +// Do not manually edit this file. + // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i6; import 'dart:convert' as _i7; @@ -41,6 +45,7 @@ import 'package:mockito/src/dummies.dart' as _i8; // ignore_for_file: unnecessary_parenthesis // ignore_for_file: camel_case_types // ignore_for_file: subtype_of_sealed_class +// ignore_for_file: invalid_use_of_internal_member class _FakeResponse_0 extends _i1.SmartFake implements _i2.Response { _FakeResponse_0( @@ -749,10 +754,10 @@ class MockFirebaseAppCheck extends _i1.Mock implements _i10.FirebaseAppCheck { ) as _i6.Stream); @override - set app(_i5.FirebaseApp? _app) => super.noSuchMethod( + set app(_i5.FirebaseApp? value) => super.noSuchMethod( Invocation.setter( #app, - _app, + value, ), returnValueForMissingStub: null, ); From 98c2e6c62241962aa50380a257d3371e343a227d Mon Sep 17 00:00:00 2001 From: Aashish <112133849+aashishpatil-g@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:45:27 -0700 Subject: [PATCH 03/13] feat(fdc): Hardening (#18173) * debug logging and hardering * Hardening: ensure only one multicast stream controller is created in a ref * Handle cancellations * Formatting fix --- .../lib/src/core/ref.dart | 92 ++++++++++++++----- .../lib/src/network/websocket_transport.dart | 4 +- .../test/src/core/ref_test.dart | 3 +- 3 files changed, 72 insertions(+), 27 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 6b5f691d0888..a65f85160735 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -80,9 +80,12 @@ abstract class OperationRef { try { final decoded = jsonDecode(serializer(vars)); final sortedStr = jsonEncode(_sortKeys(decoded)); - return '$operationName::$sortedStr'; + final hashVars = convertToSha256(sortedStr); + return '$operationName::$hashVars'; } catch (_) { - return '$operationName::${serializer(vars)}'; + final rawVars = serializer(vars); + final hashVars = convertToSha256(rawVars); + return '$operationName::$hashVars'; } } else { return operationName; @@ -180,7 +183,7 @@ class QueryManager { try { await queryRef.execute(fetchPolicy: QueryFetchPolicy.cacheOnly); } catch (e) { - log('Error executing impacted query $e'); + log('Error executing impacted query $queryId $e'); } } } @@ -207,7 +210,12 @@ class QueryManager { trackedQueries[queryId] = ref; final streamController = - StreamController>.broadcast(); + StreamController>.broadcast( + onCancel: () { + trackedQueries.remove(queryId); + ref._onAllSubscribersCancelled(); + }, + ); return streamController; } @@ -326,6 +334,15 @@ class QueryRef extends OperationRef { } StreamController>? _streamController; + Stream? _serverStream; + StreamSubscription? _serverStreamSubscription; + + void _onAllSubscribersCancelled() { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + log("QueryRef $_queryId: All subscribers cancelled. Unsubscribed from server stream."); + } Stream> subscribe() { _streamController ??= _queryManager.addQuery(this); @@ -344,8 +361,10 @@ class QueryRef extends OperationRef { } } - // Initiate Web Socket stream - _streamFromServer(); + // Initiate Web Socket stream only if not already streaming + if (_serverStream == null) { + _streamFromServer(); + } }); return stream; @@ -353,8 +372,9 @@ class QueryRef extends OperationRef { void _streamFromServer() async { bool shouldRetry = await _shouldRetry(); + log("QueryRef $_queryId _streamFromServer loop started."); try { - final stream = _transport.invokeStreamQuery( + _serverStream = _transport.invokeStreamQuery( operationName, deserializer, serializer, @@ -362,24 +382,46 @@ class QueryRef extends OperationRef { _lastToken, ); - await for (final serverResponse in stream) { - if (dataConnect.cacheManager != null) { - await dataConnect.cacheManager!.update(_queryId, serverResponse); - } - Data typedData = _convertBodyJsonToData(serverResponse.data); - - QueryResult res = - QueryResult(dataConnect, typedData, DataSource.server, this); - publishResultToStream(res); - } - } on DataConnectError catch (e) { - if (shouldRetry && - e.code == DataConnectErrorCode.unauthorized.toString()) { - _streamFromServer(); - } else { - publishErrorToStream(e); - } + _serverStreamSubscription = _serverStream!.listen( + (serverResponse) async { + log("QueryRef $_queryId _streamFromServer loop received snapshot."); + if (dataConnect.cacheManager != null) { + try { + await dataConnect.cacheManager!.update(_queryId, serverResponse); + } catch (e) { + log("QueryRef $_queryId _streamFromServer loop cache update failed: $e"); + } + } + Data typedData = _convertBodyJsonToData(serverResponse.data); + + QueryResult res = + QueryResult(dataConnect, typedData, DataSource.server, this); + publishResultToStream(res); + }, + onError: (e) { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + + if (shouldRetry && + e is DataConnectError && + e.code == DataConnectErrorCode.unauthorized.toString()) { + _streamFromServer(); + } else { + publishErrorToStream(e); + } + }, + onDone: () { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + }, + ); } catch (e) { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + log("QueryRef $_queryId _streamFromServer loop Unknown loop failure: $e"); publishErrorToStream(e); } } @@ -387,6 +429,8 @@ class QueryRef extends OperationRef { void publishResultToStream(QueryResult result) { if (_streamController != null) { _streamController?.add(result); + } else { + log("QueryRef $_queryId _streamFromServer loop _streamController is null"); } } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index 51e81237add4..76aac91955fc 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -617,7 +617,9 @@ class WebSocketTransport implements DataConnectTransport { ); if (_channel != null) { - _channel?.sink.add(jsonEncode(request.toJson())); + final encodedMessage = jsonEncode(request.toJson()); + developer.log('Sending subscribe message $encodedMessage'); + _channel?.sink.add(encodedMessage); } }, onCancel: () { diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart index 4e615d7ab8f1..b057f4aeb71f 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart @@ -98,8 +98,7 @@ void main() { ); final stream = queryManager.addQuery(ref); - //expect(queryManager.trackedQueries['testQuery'], isNotNull); - expect(queryManager.trackedQueries['testQuery::varsAsStr'], isNotNull); + expect(queryManager.trackedQueries.values.contains(ref), isTrue); expect(stream, isA()); }); }); From 1477a082af7c2bd9cb931de0cd272b2d15b35dd0 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 20:22:06 -0700 Subject: [PATCH 04/13] Cynthia feedback (final, ...) --- .../lib/src/common/dataconnect_error.dart | 2 +- .../firebase_data_connect/lib/src/core/ref.dart | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart index 309b0974d94b..32641baafa85 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart @@ -36,7 +36,7 @@ class DataConnectError extends FirebaseException { /// Error thrown when an operation is partially successful. class DataConnectOperationError extends DataConnectError { - DataConnectOperationError(super.code, String super.message, this.response); + DataConnectOperationError(super.code, super.message, this.response); final DataConnectOperationFailureResponse response; } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index a65f85160735..3e05859c53f3 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -1,4 +1,4 @@ -// Copyright 2026 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -51,14 +51,14 @@ abstract class OperationRef { this.serializer, this.variables, ); - Variables? variables; - String operationName; + final Variables? variables; + final String operationName; final DataConnectTransport _transport; - Deserializer deserializer; - Serializer serializer; + final Deserializer deserializer; + final Serializer serializer; String? _lastToken; - FirebaseDataConnect dataConnect; + final FirebaseDataConnect dataConnect; static dynamic _sortKeys(dynamic value) { if (value is Map) { From d2c86a70d62256aaceee855e2d19a3a1ea7f601c Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 22:05:02 -0700 Subject: [PATCH 05/13] Gemini feedback: checkTransport single instancing. --- .../lib/src/cache/cache.dart | 2 +- .../lib/src/firebase_data_connect.dart | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart index 3983df6c5842..6b26001a8ac3 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart @@ -51,7 +51,7 @@ class Cache { String _constructCacheIdentifier() { final rawPrefix = - '${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport.transportOptions.host}'; + '${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport?.transportOptions.host}'; final prefixSha = convertToSha256(rawPrefix); final rawSuffix = dataConnect.auth?.currentUser?.uid ?? 'anon'; final suffixSha = convertToSha256(rawSuffix); diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart index ea1a7738a352..4820a4effee1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart @@ -65,9 +65,9 @@ class FirebaseDataConnect extends FirebasePluginPlatform { /// FirebaseAppCheck FirebaseAppCheck? appCheck; - /// Due to compatibility issues with grpc-web, we swap out the transport based on what platform the user is using. - /// For web, we use RestTransport. For mobile, we use GRPCTransport. - late DataConnectTransport transport; + /// Transport for connecting to the Data Connect service. + /// Routes between RestTransport and WebSocketTransport based on subscription status + DataConnectTransport? transport; /// FirebaseAuth FirebaseAuth? auth; @@ -89,6 +89,9 @@ class FirebaseDataConnect extends FirebasePluginPlatform { /// Checks whether the transport has been properly initialized. @visibleForTesting void checkTransport() { + if (transport != null) { + return; + } transportOptions ??= TransportOptions('firebasedataconnect.googleapis.com', null, true); final rest = RestTransport( @@ -137,7 +140,7 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return QueryRef( this, operationName, - transport, + transport!, dataDeserializer, _queryManager, varsSerializer, @@ -154,10 +157,12 @@ class FirebaseDataConnect extends FirebasePluginPlatform { Variables? vars, ) { checkTransport(); + //initialize cache since mutations on a stream could result in subscribed query updates + checkAndInitializeCache(); return MutationRef( this, operationName, - transport, + transport!, dataDeserializer, varsSerializer, vars, @@ -174,11 +179,12 @@ class FirebaseDataConnect extends FirebasePluginPlatform { String mappedHost = automaticHostMapping ? getMappedHost(host) : host; transportOptions = TransportOptions(mappedHost, port, isSecure); - if (cacheManager != null) { - // dispose and clean this up. it will get reinitialized for newer QueryRefs that target the emulator. - cacheManager?.dispose(); - cacheManager = null; - } + // dispose and clean this up. it will get reinitialized for newer QueryRefs that target the emulator. + cacheManager?.dispose(); + cacheManager = null; + + // transport will get reinitialized for newer QueryRefs that target the emulator. + transport = null; } /// Currently cached DataConnect instances. Maps from app name to ConnectorConfigStr, DataConnect. @@ -206,16 +212,13 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return cachedInstances[app.name]![connectorConfig.toJson()]!; } - //TODO remove after testing since CS should be null by default - final resolvedCacheSettings = cacheSettings ?? CacheSettings(); - FirebaseDataConnect newInstance = FirebaseDataConnect( app: app, auth: auth, appCheck: appCheck, connectorConfig: connectorConfig, sdkType: sdkType, - cacheSettings: resolvedCacheSettings, + cacheSettings: cacheSettings, ); if (cachedInstances[app.name] == null) { cachedInstances[app.name] = {}; From 9c9602781b8d693accac0dfa4cede1f0b02b38e6 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 22:09:50 -0700 Subject: [PATCH 06/13] Gemini feedback: cancel and nullify authSubscription in disconnect --- .../lib/src/network/websocket_transport.dart | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index 76aac91955fc..b6d77f3cc4c1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -429,6 +429,8 @@ class WebSocketTransport implements DataConnectTransport { _reconnectTimer = null; _channel?.sink.close(); _channel = null; + _authSubscription?.cancel(); + _authSubscription = null; } void disconnect() { From b1a96a14ec62e8667c72546a99f54a5b3d7940b4 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 22:23:19 -0700 Subject: [PATCH 07/13] Reverting authSubscription nullifying This affects reconnects. Taking tech debt instead. --- .../lib/src/network/websocket_transport.dart | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index b6d77f3cc4c1..76aac91955fc 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -429,8 +429,6 @@ class WebSocketTransport implements DataConnectTransport { _reconnectTimer = null; _channel?.sink.close(); _channel = null; - _authSubscription?.cancel(); - _authSubscription = null; } void disconnect() { From 09567ce36a62885638050020dd7fda60170ae6f3 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 22:40:15 -0700 Subject: [PATCH 08/13] Handle low priority gemini feedback --- .../lib/src/network/websocket_transport.dart | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index 76aac91955fc..6c1349c8b358 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -75,7 +75,7 @@ class WebSocketTransport implements DataConnectTransport { final token = await user?.getIdToken(); final request = StreamRequest( requestId: _generateRequestId('auth'), - authToken: token, + headers: _buildHeaders(token, null), ); _channel?.sink.add(jsonEncode(request.toJson())); } catch (_) { @@ -241,7 +241,7 @@ class WebSocketTransport implements DataConnectTransport { } if (_unaryListeners.containsKey(requestId)) { - final pendings = _unaryListeners.remove(requestId)!; + final pendings = _unaryListeners.remove(requestId) ?? []; for (final p in pendings) { if (!p.completer.isCompleted) { p.completer.complete(serverResponse); @@ -251,7 +251,7 @@ class WebSocketTransport implements DataConnectTransport { } if (_streamListeners.containsKey(requestId)) { - final controllers = _streamListeners[requestId]!; + final controllers = _streamListeners[requestId] ?? []; if (response.cancelled == true) { for (final controller in controllers) { controller.close(); @@ -351,8 +351,6 @@ class WebSocketTransport implements DataConnectTransport { if (reqId == null) continue; final headers = _buildHeaders(authToken, appCheckToken); final request = StreamRequest( - authToken: authToken, - appCheckToken: headers['X-Firebase-AppCheck'], requestId: reqId, requestKind: RequestKind.subscribe, subscribe: ExecuteRequest(sub.queryName, sub.variables), @@ -376,8 +374,6 @@ class WebSocketTransport implements DataConnectTransport { kept.add(p); final headers = _buildHeaders(authToken, appCheckToken); final request = StreamRequest( - authToken: authToken, - appCheckToken: headers['X-Firebase-AppCheck'], requestId: reqId, requestKind: RequestKind.execute, execute: ExecuteRequest(p.operationName, p.variables), @@ -504,8 +500,6 @@ class WebSocketTransport implements DataConnectTransport { final headers = _buildHeaders(authToken, appCheckToken); final request = StreamRequest( - authToken: authToken, - appCheckToken: appCheckToken, requestId: existingRequestId, requestKind: RequestKind.resume, resume: ResumeRequest(), @@ -536,8 +530,6 @@ class WebSocketTransport implements DataConnectTransport { final headers = _buildHeaders(authToken, appCheckToken); final request = StreamRequest( - authToken: authToken, - appCheckToken: appCheckToken, requestId: requestId, requestKind: requestKind, execute: ExecuteRequest(operationName, variables), @@ -608,8 +600,6 @@ class WebSocketTransport implements DataConnectTransport { final headers = _buildHeaders(authToken, appCheckToken); final request = StreamRequest( - authToken: authToken, - appCheckToken: appCheckToken, requestId: requestId, requestKind: RequestKind.subscribe, subscribe: ExecuteRequest(queryName, variables), From 112c07a9cdc41322a1545acf907413f7d4cc8cae Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 23:44:24 -0700 Subject: [PATCH 09/13] Gemini feedback: optimized operation id creation initialized as a lazy var that is used everywhere instead of computing always --- .../lib/src/common/common_library.dart | 3 +++ .../firebase_data_connect/lib/src/core/ref.dart | 6 ++++++ .../lib/src/firebase_data_connect.dart | 14 +++++++++----- .../lib/src/network/grpc_transport.dart | 3 +++ .../lib/src/network/rest_transport.dart | 3 +++ .../lib/src/network/transport_library.dart | 1 - .../lib/src/network/transport_stub.dart | 3 +++ .../lib/src/network/websocket_transport.dart | 16 ++++++++-------- 8 files changed, 35 insertions(+), 14 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart index 5f523644870c..df7b405ab788 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart @@ -102,6 +102,7 @@ abstract class DataConnectTransport { /// Invokes corresponding query endpoint. Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer serializer, @@ -111,6 +112,7 @@ abstract class DataConnectTransport { /// Invokes corresponding mutation endpoint. Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer serializer, @@ -120,6 +122,7 @@ abstract class DataConnectTransport { /// Invokes corresponding stream query endpoint. Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 3e05859c53f3..66d761a5cde1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -60,6 +60,9 @@ abstract class OperationRef { final FirebaseDataConnect dataConnect; + late final String operationId = + createOperationId(operationName, variables, serializer); + static dynamic _sortKeys(dynamic value) { if (value is Map) { final sortedMap = {}; @@ -307,6 +310,7 @@ class QueryRef extends OperationRef { try { ServerResponse serverResponse = await _transport.invokeQuery( + operationId, operationName, deserializer, serializer, @@ -375,6 +379,7 @@ class QueryRef extends OperationRef { log("QueryRef $_queryId _streamFromServer loop started."); try { _serverStream = _transport.invokeStreamQuery( + operationId, operationName, deserializer, serializer, @@ -474,6 +479,7 @@ class MutationRef extends OperationRef { ) async { ServerResponse serverResponse = await _transport.invokeMutation( + operationId, operationName, deserializer, serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart index 4820a4effee1..ec731343a66c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart @@ -276,6 +276,7 @@ class _RoutingTransport implements DataConnectTransport { @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -284,14 +285,15 @@ class _RoutingTransport implements DataConnectTransport { ) { if (websocket.isConnected) { return websocket.invokeMutation( - queryName, deserializer, serializer, vars, token); + operationId, queryName, deserializer, serializer, vars, token); } return rest.invokeMutation( - queryName, deserializer, serializer, vars, token); + operationId, queryName, deserializer, serializer, vars, token); } @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serialize, @@ -300,13 +302,15 @@ class _RoutingTransport implements DataConnectTransport { ) { if (websocket.isConnected) { return websocket.invokeQuery( - queryName, deserializer, serialize, vars, token); + operationId, queryName, deserializer, serialize, vars, token); } - return rest.invokeQuery(queryName, deserializer, serialize, vars, token); + return rest.invokeQuery( + operationId, queryName, deserializer, serialize, vars, token); } @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -314,6 +318,6 @@ class _RoutingTransport implements DataConnectTransport { String? token, ) { return websocket.invokeStreamQuery( - queryName, deserializer, serializer, vars, token); + operationId, queryName, deserializer, serializer, vars, token); } } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart index 298fb25cfb4e..4fd12e6922c7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart @@ -95,6 +95,7 @@ class GRPCTransport implements DataConnectTransport { /// Invokes GPRC query endpoint. @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -139,6 +140,7 @@ class GRPCTransport implements DataConnectTransport { /// Invokes GPRC mutation endpoint. @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -173,6 +175,7 @@ class GRPCTransport implements DataConnectTransport { /// Invokes stream query using WebSockets (even for GRPC clients we fall back to WebSockets for streaming right now). @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart index ec8b87103aea..4480096e49a3 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart @@ -150,6 +150,7 @@ class RestTransport implements DataConnectTransport { /// Invokes query REST endpoint. @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -169,6 +170,7 @@ class RestTransport implements DataConnectTransport { /// Invokes mutation REST endpoint. @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -188,6 +190,7 @@ class RestTransport implements DataConnectTransport { /// WebSockets are now handled by WebSocketTransport in FirebaseDataConnect. @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart index 3b851c62989b..58a02dd11d29 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart @@ -21,7 +21,6 @@ import 'package:firebase_auth/firebase_auth.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import '../common/common_library.dart'; -import '../core/ref.dart'; import '../dataconnect_version.dart'; import 'stream_protocol.dart'; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart index a0083e0253d6..3b9badd74928 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart @@ -50,6 +50,7 @@ class TransportStub implements DataConnectTransport { /// Stub for invoking a mutation. @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -63,6 +64,7 @@ class TransportStub implements DataConnectTransport { /// Stub for subscribing to a query. @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -76,6 +78,7 @@ class TransportStub implements DataConnectTransport { /// Stub for invoking a query. @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serialize, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index 6c1349c8b358..bfb49aacd41d 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -444,29 +444,32 @@ class WebSocketTransport implements DataConnectTransport { @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, Variables? vars, String? authToken, ) async { - return _invokeUnary(queryName, deserializer, serializer, vars, authToken, - RequestKind.execute, false); + return _invokeUnary(operationId, queryName, deserializer, serializer, vars, + authToken, RequestKind.execute, false); } @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, Variables? vars, String? authToken, ) async { - return _invokeUnary(queryName, deserializer, serializer, vars, authToken, - RequestKind.execute, true); + return _invokeUnary(operationId, queryName, deserializer, serializer, vars, + authToken, RequestKind.execute, true); } Future _invokeUnary( + String operationId, String operationName, Deserializer deserializer, Serializer? serializer, @@ -477,8 +480,6 @@ class WebSocketTransport implements DataConnectTransport { ) async { await _ensureConnected(authToken); - final operationId = - OperationRef.createOperationId(operationName, vars, serializer); final completer = Completer(); if (_activeSubscriptions.containsKey(operationId)) { @@ -543,6 +544,7 @@ class WebSocketTransport implements DataConnectTransport { @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -550,8 +552,6 @@ class WebSocketTransport implements DataConnectTransport { String? authToken, ) { late StreamController controller; - final operationId = - OperationRef.createOperationId(queryName, vars, serializer); controller = StreamController( onListen: () async { From dab3ef721e4402bce126af413f5775c29b312ec8 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Mon, 6 Apr 2026 23:53:52 -0700 Subject: [PATCH 10/13] Remove _queryId and use the computed operationId --- .../lib/src/core/ref.dart | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 66d761a5cde1..44cbf9ca45c1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -209,7 +209,7 @@ class QueryManager { StreamController> addQuery( QueryRef ref, ) { - final queryId = ref._queryId; + final queryId = ref.operationId; trackedQueries[queryId] = ref; final streamController = @@ -269,9 +269,6 @@ class QueryRef extends OperationRef { } } - String get _queryId => - OperationRef.createOperationId(operationName, variables, serializer); - Future> _executeFromCache( QueryFetchPolicy fetchPolicy) async { if (dataConnect.cacheManager == null) { @@ -281,7 +278,7 @@ class QueryRef extends OperationRef { final cacheManager = dataConnect.cacheManager!; bool allowStale = fetchPolicy == QueryFetchPolicy.cacheOnly; //if its cache only, we always allow stale - final cachedData = await cacheManager.resultTree(_queryId, allowStale); + final cachedData = await cacheManager.resultTree(operationId, allowStale); if (cachedData != null) { try { @@ -319,7 +316,7 @@ class QueryRef extends OperationRef { ); if (dataConnect.cacheManager != null) { - await dataConnect.cacheManager!.update(_queryId, serverResponse); + await dataConnect.cacheManager!.update(operationId, serverResponse); } Data typedData = _convertBodyJsonToData(serverResponse.data); @@ -345,7 +342,7 @@ class QueryRef extends OperationRef { _serverStreamSubscription?.cancel(); _serverStreamSubscription = null; _serverStream = null; - log("QueryRef $_queryId: All subscribers cancelled. Unsubscribed from server stream."); + log("QueryRef $operationId: All subscribers cancelled. Unsubscribed from server stream."); } Stream> subscribe() { @@ -376,7 +373,7 @@ class QueryRef extends OperationRef { void _streamFromServer() async { bool shouldRetry = await _shouldRetry(); - log("QueryRef $_queryId _streamFromServer loop started."); + log("QueryRef $operationId _streamFromServer loop started."); try { _serverStream = _transport.invokeStreamQuery( operationId, @@ -389,12 +386,13 @@ class QueryRef extends OperationRef { _serverStreamSubscription = _serverStream!.listen( (serverResponse) async { - log("QueryRef $_queryId _streamFromServer loop received snapshot."); + log("QueryRef $operationId _streamFromServer loop received snapshot."); if (dataConnect.cacheManager != null) { try { - await dataConnect.cacheManager!.update(_queryId, serverResponse); + await dataConnect.cacheManager! + .update(operationId, serverResponse); } catch (e) { - log("QueryRef $_queryId _streamFromServer loop cache update failed: $e"); + log("QueryRef $operationId _streamFromServer loop cache update failed: $e"); } } Data typedData = _convertBodyJsonToData(serverResponse.data); @@ -426,7 +424,7 @@ class QueryRef extends OperationRef { _serverStreamSubscription?.cancel(); _serverStreamSubscription = null; _serverStream = null; - log("QueryRef $_queryId _streamFromServer loop Unknown loop failure: $e"); + log("QueryRef $operationId _streamFromServer loop Unknown loop failure: $e"); publishErrorToStream(e); } } @@ -435,7 +433,7 @@ class QueryRef extends OperationRef { if (_streamController != null) { _streamController?.add(result); } else { - log("QueryRef $_queryId _streamFromServer loop _streamController is null"); + log("QueryRef $operationId _streamFromServer loop _streamController is null"); } } From 22444de0ef30c26124b307cb91705ff27536c305 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Tue, 7 Apr 2026 00:17:31 -0700 Subject: [PATCH 11/13] Cynthia feedback on sending empty message to server --- .../lib/src/network/transport_stub.dart | 2 +- .../lib/src/network/websocket_transport.dart | 41 ++++++++++--------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart index 3b9badd74928..87b8445f3abe 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart @@ -1,4 +1,4 @@ -// Copyright 2026 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart index bfb49aacd41d..0fb62e5fde33 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -77,7 +77,7 @@ class WebSocketTransport implements DataConnectTransport { requestId: _generateRequestId('auth'), headers: _buildHeaders(token, null), ); - _channel?.sink.add(jsonEncode(request.toJson())); + _send(request.toJson()); } catch (_) { // Ignored } @@ -146,6 +146,15 @@ class WebSocketTransport implements DataConnectTransport { return '${operationName}_$randStr'; } + void _send(Map json) { + if (_channel == null) return; + final encoded = jsonEncode(json); + if (encoded.isNotEmpty) { + developer.log("Sending stream message \n $encoded"); + _channel!.sink.add(encoded); + } + } + bool get isConnected => _channel != null; Map _buildHeaders(String? authToken, String? appCheckToken) { @@ -208,7 +217,7 @@ class WebSocketTransport implements DataConnectTransport { 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}', headers: headers, ); - _channel?.sink.add(jsonEncode(initRequest.toJson())); + _send(initRequest.toJson()); } // called when a message is received from the stream @@ -356,7 +365,7 @@ class WebSocketTransport implements DataConnectTransport { subscribe: ExecuteRequest(sub.queryName, sub.variables), headers: headers, ); - _channel?.sink.add(jsonEncode(request.toJson())); + _send(request.toJson()); } } @@ -379,7 +388,7 @@ class WebSocketTransport implements DataConnectTransport { execute: ExecuteRequest(p.operationName, p.variables), headers: headers, ); - _channel?.sink.add(jsonEncode(request.toJson())); + _send(request.toJson()); } } if (kept.isNotEmpty) { @@ -506,7 +515,7 @@ class WebSocketTransport implements DataConnectTransport { resume: ResumeRequest(), headers: headers, ); - _channel?.sink.add(jsonEncode(request.toJson())); + _send(request.toJson()); return completer.future; } @@ -537,7 +546,7 @@ class WebSocketTransport implements DataConnectTransport { headers: headers, ); - _channel?.sink.add(jsonEncode(request.toJson())); + _send(request.toJson()); return completer.future; } @@ -606,11 +615,7 @@ class WebSocketTransport implements DataConnectTransport { headers: headers, ); - if (_channel != null) { - final encodedMessage = jsonEncode(request.toJson()); - developer.log('Sending subscribe message $encodedMessage'); - _channel?.sink.add(encodedMessage); - } + _send(request.toJson()); }, onCancel: () { if (!_activeSubscriptions.containsKey(operationId)) return; @@ -624,14 +629,12 @@ class WebSocketTransport implements DataConnectTransport { _activeSubscriptions.remove(operationId); _pendingSubscriptions.remove(requestId); - if (_channel != null) { - final cancelReq = StreamRequest( - requestId: requestId, - requestKind: RequestKind.cancel, - cancel: true, - ); - _channel?.sink.add(jsonEncode(cancelReq.toJson())); - } + final cancelReq = StreamRequest( + requestId: requestId, + requestKind: RequestKind.cancel, + cancel: true, + ); + _send(cancelReq.toJson()); _checkIdleAndDisconnect(); } } From f13a848d98415c8fa8892370afcc97f66835fa07 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Tue, 7 Apr 2026 00:38:15 -0700 Subject: [PATCH 12/13] Remove grpc transport since its no longer used. --- .../lib/src/network/grpc_library.dart | 27 -- .../lib/src/network/grpc_transport.dart | 260 ------------------ .../lib/src/network/transport_library.dart | 2 +- 3 files changed, 1 insertion(+), 288 deletions(-) delete mode 100644 packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart delete mode 100644 packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart deleted file mode 100644 index d46b30815133..000000000000 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import 'dart:convert'; -import 'dart:developer'; - -import 'package:firebase_app_check/firebase_app_check.dart'; -import 'package:firebase_data_connect/src/generated/graphql_error.pb.dart'; -import 'package:grpc/grpc.dart'; - -import '../common/common_library.dart'; -import '../dataconnect_version.dart'; -import '../generated/connector_service.pbgrpc.dart'; -import '../generated/google/protobuf/struct.pb.dart'; - -part 'grpc_transport.dart'; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart deleted file mode 100644 index 4fd12e6922c7..000000000000 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart +++ /dev/null @@ -1,260 +0,0 @@ -// ignore_for_file: deprecated_member_use_from_same_package -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -part of 'grpc_library.dart'; - -@Deprecated( - 'Use RestTransport and WebSocketTransport instead. The Data Connect SDK has moved away from gRPC.') -class GRPCTransport implements DataConnectTransport { - /// GRPCTransport creates a new channel - GRPCTransport( - this.transportOptions, - this.options, - this.appId, - this.sdkType, - this.appCheck, - ) { - bool isSecure = transportOptions.isSecure ?? true; - channel = ClientChannel( - transportOptions.host, - port: transportOptions.port ?? 443, - options: ChannelOptions( - credentials: (isSecure - ? const ChannelCredentials.secure() - : const ChannelCredentials.insecure()), - ), - ); - stub = ConnectorServiceClient(channel); - name = - 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}'; - } - - /// FirebaseAppCheck - @override - FirebaseAppCheck? appCheck; - - @override - CallerSDKType sdkType; - - /// Name of the endpoint. - late String name; - - /// ConnectorServiceClient used to execute the query/mutation. - late ConnectorServiceClient stub; - - /// ClientChannel used to configure connection to the GRPC server. - late ClientChannel channel; - - /// Current host configuration. - @override - TransportOptions transportOptions; - - /// Data Connect backend configuration options. - @override - DataConnectOptions options; - - /// Application ID - @override - String appId; - - Future> getMetadata(String? authToken) async { - String? appCheckToken; - try { - appCheckToken = await appCheck?.getToken(); - } catch (e) { - log('Unable to get app check token: $e'); - } - Map metadata = { - 'x-goog-request-params': 'location=${options.location}&frontend=data', - 'x-goog-api-client': getGoogApiVal(sdkType, packageVersion), - 'x-firebase-client': getFirebaseClientVal(packageVersion) - }; - - if (authToken != null) { - metadata['x-firebase-auth-token'] = authToken; - } - if (appCheckToken != null) { - metadata['X-Firebase-AppCheck'] = appCheckToken; - } - metadata['x-firebase-gmpid'] = appId; - return metadata; - } - - /// Invokes GPRC query endpoint. - @override - Future invokeQuery( - String operationId, - String queryName, - Deserializer deserializer, - Serializer? serializer, - Variables? vars, - String? authToken, - ) async { - ExecuteQueryResponse response; - - ExecuteQueryRequest request = - ExecuteQueryRequest(name: name, operationName: queryName); - if (vars != null && serializer != null) { - request.variables = getStruct(vars, serializer); - } - try { - response = await stub.executeQuery( - request, - options: CallOptions(metadata: await getMetadata(authToken)), - ); - return handleResponse( - CommonResponse.fromExecuteQuery(deserializer, response)); - } on Exception catch (e) { - if (e.toString().contains('invalid Firebase Auth Credentials')) { - throw DataConnectError( - DataConnectErrorCode.unauthorized, - 'Failed to invoke operation: $e', - ); - } - rethrow; - } - } - - /// Converts the variables into a proto Struct. - Struct getStruct( - Variables vars, - Serializer serializer, - ) { - Struct struct = Struct.create(); - struct.mergeFromProto3Json(jsonDecode(serializer(vars))); - return struct; - } - - /// Invokes GPRC mutation endpoint. - @override - Future invokeMutation( - String operationId, - String queryName, - Deserializer deserializer, - Serializer? serializer, - Variables? vars, - String? authToken, - ) async { - ExecuteMutationResponse response; - ExecuteMutationRequest request = - ExecuteMutationRequest(name: name, operationName: queryName); - if (vars != null && serializer != null) { - request.variables = getStruct(vars, serializer); - } - - try { - response = await stub.executeMutation( - request, - options: CallOptions(metadata: await getMetadata(authToken)), - ); - return handleResponse( - CommonResponse.fromExecuteMutation(deserializer, response)); - } on Exception catch (e) { - if (e.toString().contains('invalid Firebase Auth Credentials')) { - throw DataConnectError( - DataConnectErrorCode.unauthorized, - 'Failed to invoke operation: $e', - ); - } - rethrow; - } - } - - /// Invokes stream query using WebSockets (even for GRPC clients we fall back to WebSockets for streaming right now). - @override - Stream invokeStreamQuery( - String operationId, - String queryName, - Deserializer deserializer, - Serializer? serializer, - Variables? vars, - String? token, - ) { - throw UnsupportedError( - 'Streaming should be routed through WebSocketTransport'); - } -} - -ServerResponse handleResponse(CommonResponse commonResponse) { - Map? jsond = commonResponse.data as Map?; - String jsonEncoded = jsonEncode(commonResponse.data); - - Map? jsonExt = - commonResponse.extensions as Map?; - - if (commonResponse.errors.isNotEmpty) { - Map? data = - jsonDecode(jsonEncoded) as Map?; - Data? decodedData; - List errors = commonResponse - .errors - .map((e) => DataConnectOperationFailureResponseErrorInfo( - e.path.values - .map((val) => val.hasStringValue() - ? DataConnectFieldPathSegment(val.stringValue) - : DataConnectListIndexPathSegment(val.numberValue.toInt())) - .toList(), - e.message)) - .toList(); - if (data != null) { - try { - decodedData = commonResponse.deserializer(jsonEncoded); - } catch (e) { - // nothing required - } - } - final response = - DataConnectOperationFailureResponse(errors, data, decodedData); - throw DataConnectOperationError(DataConnectErrorCode.other, - 'failed to invoke operation: ${response.errors}', response); - } - - // no errors - return a standard response - if (jsond != null) { - return ServerResponse(jsond, extensions: jsonExt); - } else { - return ServerResponse({}); - } -} - -/// Initializes GRPC transport for Data Connect. -DataConnectTransport getTransport( - TransportOptions transportOptions, - DataConnectOptions options, - String appId, - CallerSDKType sdkType, - FirebaseAppCheck? appCheck, -) => - GRPCTransport(transportOptions, options, appId, sdkType, appCheck); - -class CommonResponse { - CommonResponse(this.deserializer, this.data, this.errors, this.extensions); - static CommonResponse fromExecuteMutation( - Deserializer deserializer, ExecuteMutationResponse response) { - return CommonResponse( - deserializer, response.data.toProto3Json(), response.errors, null); - } - - static CommonResponse fromExecuteQuery( - Deserializer deserializer, ExecuteQueryResponse response) { - return CommonResponse(deserializer, response.data.toProto3Json(), - response.errors, response.extensions.toProto3Json()); - } - - final Deserializer deserializer; - final Object? data; - final List errors; - final Object? extensions; -} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart index 58a02dd11d29..44e0391dd6c7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart @@ -1,4 +1,4 @@ -// Copyright 2026 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 9008a270510592bf5370bdb615ef8fc71f5b4802 Mon Sep 17 00:00:00 2001 From: Aashish Patil Date: Tue, 7 Apr 2026 00:44:50 -0700 Subject: [PATCH 13/13] Update tests to incorporate operationId --- .../test/src/common/common_library_test.dart | 7 +++++++ .../test/src/network/rest_transport_test.dart | 4 +++- .../test/src/network/transport_stub_test.dart | 2 ++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart index 2d526f0ef8c0..a3798daf98f7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart @@ -103,8 +103,10 @@ void main() { test('should handle invokeQuery with proper deserializer', () async { const queryName = 'testQuery'; + const queryId = 'testQueryId'; deserializer(json) => json; final result = await transport.invokeQuery( + queryId, queryName, deserializer, emptySerializer, @@ -117,8 +119,10 @@ void main() { test('should handle invokeMutation with proper deserializer', () async { const queryName = 'testMutation'; + const operationId = 'testMutationId'; deserializer(json) => json; final result = await transport.invokeMutation( + operationId, queryName, deserializer, emptySerializer, @@ -145,6 +149,7 @@ class TestDataConnectTransport extends DataConnectTransport { @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -157,6 +162,7 @@ class TestDataConnectTransport extends DataConnectTransport { @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -169,6 +175,7 @@ class TestDataConnectTransport extends DataConnectTransport { @override Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart index d5325e89faab..d3dbc2902d12 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart @@ -152,7 +152,8 @@ void main() { String deserializer(String data) => 'Deserialized Data'; - await transport.invokeQuery('testQuery', deserializer, null, null, null); + await transport.invokeQuery( + 'testQueryId', 'testQuery', deserializer, null, null, null); verify( mockHttpClient.post( @@ -181,6 +182,7 @@ void main() { String deserializer(String data) => 'Deserialized Mutation Data'; await transport.invokeMutation( + 'testMutationId', 'testMutation', deserializer, null, diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart index d8c6a865d316..daeffedd48cc 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart @@ -65,6 +65,7 @@ void main() { expect( () async => transportStub.invokeMutation( + 'operationId', 'queryName', (json) => json, null, @@ -86,6 +87,7 @@ void main() { expect( () async => transportStub.invokeQuery( + 'operationId', 'queryName', (json) => json, null,