diff --git a/packages/firebase_data_connect/firebase_data_connect/.metadata b/packages/firebase_data_connect/firebase_data_connect/.metadata new file mode 100644 index 000000000000..fca9f99c599e --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/.metadata @@ -0,0 +1,30 @@ +# This file tracks properties of this Flutter project. +# Used by Flutter tool to assess capabilities and perform upgrades etc. +# +# This file should be version controlled and should not be manually edited. + +version: + revision: "adc901062556672b4138e18a4dc62a4be8f4b3c2" + channel: "stable" + +project_type: app + +# Tracks metadata for the flutter migrate command +migration: + platforms: + - platform: root + create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + - platform: macos + create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2 + + # User provided section + + # List of Local paths (relative to this file) that should be + # ignored by the migrate tool. + # + # Files that are not part of the templates will be ignored by default. + unmanaged_files: + - 'lib/main.dart' + - 'ios/Runner.xcodeproj/project.pbxproj' diff --git a/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml b/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml new file mode 100644 index 000000000000..f9b303465f19 --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/analysis_options.yaml @@ -0,0 +1 @@ +include: package:flutter_lints/flutter.yaml diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart index 3983df6c5842..6b26001a8ac3 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart @@ -51,7 +51,7 @@ class Cache { String _constructCacheIdentifier() { final rawPrefix = - '${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport.transportOptions.host}'; + '${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport?.transportOptions.host}'; final prefixSha = convertToSha256(rawPrefix); final rawSuffix = dataConnect.auth?.currentUser?.uid ?? 'anon'; final suffixSha = convertToSha256(rawSuffix); diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart index 0768da15232c..9b991a946a79 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart @@ -332,11 +332,11 @@ class EntityNode { srcListMap.forEach((key, value) { List enodeList = []; List jsonList = value as List; - jsonList.forEach((jsonObj) { + for (var jsonObj in jsonList) { Map jmap = jsonObj as Map; EntityNode en = EntityNode.fromJson(jmap, cacheProvider); enodeList.add(en); - }); + } objLists?[key] = enodeList; }); } @@ -367,9 +367,9 @@ class EntityNode { if (nestedObjectLists != null) { nestedObjectLists!.forEach((key, edoList) { List> jsonList = []; - edoList.forEach((edo) { + for (var edo in edoList) { jsonList.add(edo.toJson(mode: mode)); - }); + } jsonData[key] = jsonList; }); } @@ -396,9 +396,9 @@ class EntityNode { Map nestedObjectListsJson = {}; nestedObjectLists!.forEach((key, edoList) { List> jsonList = []; - edoList.forEach((edo) { + for (var edo in edoList) { jsonList.add(edo.toJson(mode: mode)); - }); + } nestedObjectListsJson[key] = jsonList; }); jsonData[listsKey] = nestedObjectListsJson; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart index 9247287f5adf..df7b405ab788 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart @@ -102,6 +102,7 @@ abstract class DataConnectTransport { /// Invokes corresponding query endpoint. Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer serializer, @@ -111,6 +112,17 @@ abstract class DataConnectTransport { /// Invokes corresponding mutation endpoint. Future invokeMutation( + String operationId, + String queryName, + Deserializer deserializer, + Serializer serializer, + Variables? vars, + String? token, + ); + + /// Invokes corresponding stream query endpoint. + Stream invokeStreamQuery( + String operationId, String queryName, Deserializer deserializer, Serializer serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart index 43b7fd964418..32641baafa85 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart @@ -36,9 +36,7 @@ class DataConnectError extends FirebaseException { /// Error thrown when an operation is partially successful. class DataConnectOperationError extends DataConnectError { - DataConnectOperationError( - DataConnectErrorCode code, String message, this.response) - : super(code, message); + DataConnectOperationError(super.code, super.message, this.response); final DataConnectOperationFailureResponse response; } diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart index 5b359f6a15b3..44cbf9ca45c1 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart @@ -51,17 +51,51 @@ abstract class OperationRef { this.serializer, this.variables, ); - Variables? variables; - String operationName; - DataConnectTransport _transport; - Deserializer deserializer; - Serializer serializer; + final Variables? variables; + final String operationName; + final DataConnectTransport _transport; + final Deserializer deserializer; + final Serializer serializer; String? _lastToken; - FirebaseDataConnect dataConnect; + final FirebaseDataConnect dataConnect; + + late final String operationId = + createOperationId(operationName, variables, serializer); - Future> execute( - {QueryFetchPolicy fetchPolicy = QueryFetchPolicy.preferCache}); + static dynamic _sortKeys(dynamic value) { + if (value is Map) { + final sortedMap = {}; + final sortedKeys = value.keys.toList()..sort(); + for (final key in sortedKeys) { + sortedMap[key.toString()] = _sortKeys(value[key]); + } + return sortedMap; + } else if (value is List) { + return value.map(_sortKeys).toList(); + } + return value; + } + + static String createOperationId(String operationName, + Variables? vars, Serializer? serializer) { + if (vars != null && serializer != null) { + try { + final decoded = jsonDecode(serializer(vars)); + final sortedStr = jsonEncode(_sortKeys(decoded)); + final hashVars = convertToSha256(sortedStr); + return '$operationName::$hashVars'; + } catch (_) { + final rawVars = serializer(vars); + final hashVars = convertToSha256(rawVars); + return '$operationName::$hashVars'; + } + } else { + return operationName; + } + } + + Future> execute(); Future _shouldRetry() async { String? newToken; @@ -152,7 +186,7 @@ class QueryManager { try { await queryRef.execute(fetchPolicy: QueryFetchPolicy.cacheOnly); } catch (e) { - log('Error executing impacted query $e'); + log('Error executing impacted query $queryId $e'); } } } @@ -175,24 +209,20 @@ class QueryManager { StreamController> addQuery( QueryRef ref, ) { - final queryId = ref._queryId; + final queryId = ref.operationId; trackedQueries[queryId] = ref; final streamController = - StreamController>.broadcast(); + StreamController>.broadcast( + onCancel: () { + trackedQueries.remove(queryId); + ref._onAllSubscribersCancelled(); + }, + ); return streamController; } - static String createQueryId(String queryName, - QueryVariables? vars, Serializer varSerializer) { - if (vars != null) { - return '$queryName::${varSerializer(vars)}'; - } else { - return queryName; - } - } - void dispose() { _impactedQueriesSubscription?.cancel(); } @@ -216,7 +246,7 @@ class QueryRef extends OperationRef { variables, ); - QueryManager _queryManager; + final QueryManager _queryManager; @override Future> execute( @@ -239,9 +269,6 @@ class QueryRef extends OperationRef { } } - String get _queryId => - QueryManager.createQueryId(operationName, variables, serializer); - Future> _executeFromCache( QueryFetchPolicy fetchPolicy) async { if (dataConnect.cacheManager == null) { @@ -251,7 +278,7 @@ class QueryRef extends OperationRef { final cacheManager = dataConnect.cacheManager!; bool allowStale = fetchPolicy == QueryFetchPolicy.cacheOnly; //if its cache only, we always allow stale - final cachedData = await cacheManager.resultTree(_queryId, allowStale); + final cachedData = await cacheManager.resultTree(operationId, allowStale); if (cachedData != null) { try { @@ -280,6 +307,7 @@ class QueryRef extends OperationRef { try { ServerResponse serverResponse = await _transport.invokeQuery( + operationId, operationName, deserializer, serializer, @@ -288,7 +316,7 @@ class QueryRef extends OperationRef { ); if (dataConnect.cacheManager != null) { - await dataConnect.cacheManager!.update(_queryId, serverResponse); + await dataConnect.cacheManager!.update(operationId, serverResponse); } Data typedData = _convertBodyJsonToData(serverResponse.data); @@ -307,22 +335,109 @@ class QueryRef extends OperationRef { } StreamController>? _streamController; + Stream? _serverStream; + StreamSubscription? _serverStreamSubscription; + + void _onAllSubscribersCancelled() { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + log("QueryRef $operationId: All subscribers cancelled. Unsubscribed from server stream."); + } Stream> subscribe() { _streamController ??= _queryManager.addQuery(this); - execute(); + final stream = + _streamController!.stream.cast>(); + + // Return the stream to the caller, then execute fetches + Future.microtask(() async { + if (dataConnect.cacheManager != null) { + try { + await _executeFromCache(QueryFetchPolicy.cacheOnly); + } catch (err) { + log("Error fetching from cache during subscribe $err"); + // Ignore cache misses here, server stream will provide latest data + } + } + + // Initiate Web Socket stream only if not already streaming + if (_serverStream == null) { + _streamFromServer(); + } + }); + + return stream; + } + + void _streamFromServer() async { + bool shouldRetry = await _shouldRetry(); + log("QueryRef $operationId _streamFromServer loop started."); + try { + _serverStream = _transport.invokeStreamQuery( + operationId, + operationName, + deserializer, + serializer, + variables, + _lastToken, + ); - return _streamController!.stream.cast>(); + _serverStreamSubscription = _serverStream!.listen( + (serverResponse) async { + log("QueryRef $operationId _streamFromServer loop received snapshot."); + if (dataConnect.cacheManager != null) { + try { + await dataConnect.cacheManager! + .update(operationId, serverResponse); + } catch (e) { + log("QueryRef $operationId _streamFromServer loop cache update failed: $e"); + } + } + Data typedData = _convertBodyJsonToData(serverResponse.data); + + QueryResult res = + QueryResult(dataConnect, typedData, DataSource.server, this); + publishResultToStream(res); + }, + onError: (e) { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + + if (shouldRetry && + e is DataConnectError && + e.code == DataConnectErrorCode.unauthorized.toString()) { + _streamFromServer(); + } else { + publishErrorToStream(e); + } + }, + onDone: () { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + }, + ); + } catch (e) { + _serverStreamSubscription?.cancel(); + _serverStreamSubscription = null; + _serverStream = null; + log("QueryRef $operationId _streamFromServer loop Unknown loop failure: $e"); + publishErrorToStream(e); + } } void publishResultToStream(QueryResult result) { if (_streamController != null) { _streamController?.add(result); + } else { + log("QueryRef $operationId _streamFromServer loop _streamController is null"); } } - void publishErrorToStream(Error err) { + void publishErrorToStream(Object err) { if (_streamController != null) { _streamController?.addError(err); } @@ -331,24 +446,16 @@ class QueryRef extends OperationRef { class MutationRef extends OperationRef { MutationRef( - FirebaseDataConnect dataConnect, - String operationName, - DataConnectTransport transport, - Deserializer deserializer, - Serializer serializer, - Variables? variables, - ) : super( - dataConnect, - operationName, - transport, - deserializer, - serializer, - variables, - ); + super.dataConnect, + super.operationName, + super.transport, + super.deserializer, + super.serializer, + super.variables, + ); @override - Future> execute( - {QueryFetchPolicy fetchPolicy = QueryFetchPolicy.serverOnly}) async { + Future> execute() async { bool shouldRetry = await _shouldRetry(); try { // Logic below is duplicated due to the fact that `executeOperation` returns @@ -370,6 +477,7 @@ class MutationRef extends OperationRef { ) async { ServerResponse serverResponse = await _transport.invokeMutation( + operationId, operationName, deserializer, serializer, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart index d5813f94dc89..ec731343a66c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/firebase_data_connect.dart @@ -20,10 +20,8 @@ import 'package:firebase_data_connect/src/common/common_library.dart'; import 'package:firebase_data_connect/src/core/ref.dart'; import 'package:flutter/foundation.dart'; -import './network/transport_library.dart' - if (dart.library.io) './network/grpc_library.dart' - if (dart.library.js_interop) './network/rest_library.dart' - if (dart.library.html) './network/rest_library.dart'; +import './network/rest_library.dart'; +import './network/transport_library.dart'; import 'cache/cache_data_types.dart'; import 'cache/cache.dart'; @@ -67,9 +65,9 @@ class FirebaseDataConnect extends FirebasePluginPlatform { /// FirebaseAppCheck FirebaseAppCheck? appCheck; - /// Due to compatibility issues with grpc-web, we swap out the transport based on what platform the user is using. - /// For web, we use RestTransport. For mobile, we use GRPCTransport. - late DataConnectTransport transport; + /// Transport for connecting to the Data Connect service. + /// Routes between RestTransport and WebSocketTransport based on subscription status + DataConnectTransport? transport; /// FirebaseAuth FirebaseAuth? auth; @@ -91,15 +89,27 @@ class FirebaseDataConnect extends FirebasePluginPlatform { /// Checks whether the transport has been properly initialized. @visibleForTesting void checkTransport() { + if (transport != null) { + return; + } transportOptions ??= TransportOptions('firebasedataconnect.googleapis.com', null, true); - transport = getTransport( + final rest = RestTransport( + transportOptions!, + options, + app.options.appId, + _sdkType, + appCheck, + ); + final ws = WebSocketTransport( transportOptions!, options, app.options.appId, _sdkType, appCheck, + auth, ); + transport = _RoutingTransport(rest, ws); } @visibleForTesting @@ -120,7 +130,7 @@ class FirebaseDataConnect extends FirebasePluginPlatform { checkTransport(); checkAndInitializeCache(); String queryId = - QueryManager.createQueryId(operationName, vars, varsSerializer); + OperationRef.createOperationId(operationName, vars, varsSerializer); QueryRef? ref = _queryManager.trackedQueries[queryId] as QueryRef?; @@ -130,7 +140,7 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return QueryRef( this, operationName, - transport, + transport!, dataDeserializer, _queryManager, varsSerializer, @@ -147,10 +157,12 @@ class FirebaseDataConnect extends FirebasePluginPlatform { Variables? vars, ) { checkTransport(); + //initialize cache since mutations on a stream could result in subscribed query updates + checkAndInitializeCache(); return MutationRef( this, operationName, - transport, + transport!, dataDeserializer, varsSerializer, vars, @@ -167,11 +179,12 @@ class FirebaseDataConnect extends FirebasePluginPlatform { String mappedHost = automaticHostMapping ? getMappedHost(host) : host; transportOptions = TransportOptions(mappedHost, port, isSecure); - if (cacheManager != null) { - // dispose and clean this up. it will get reinitialized for newer QueryRefs that target the emulator. - cacheManager?.dispose(); - cacheManager = null; - } + // dispose and clean this up. it will get reinitialized for newer QueryRefs that target the emulator. + cacheManager?.dispose(); + cacheManager = null; + + // transport will get reinitialized for newer QueryRefs that target the emulator. + transport = null; } /// Currently cached DataConnect instances. Maps from app name to ConnectorConfigStr, DataConnect. @@ -199,16 +212,13 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return cachedInstances[app.name]![connectorConfig.toJson()]!; } - //TODO remove after testing since CS should be null by default - final resolvedCacheSettings = cacheSettings ?? CacheSettings(); - FirebaseDataConnect newInstance = FirebaseDataConnect( app: app, auth: auth, appCheck: appCheck, connectorConfig: connectorConfig, sdkType: sdkType, - cacheSettings: resolvedCacheSettings, + cacheSettings: cacheSettings, ); if (cachedInstances[app.name] == null) { cachedInstances[app.name] = {}; @@ -218,3 +228,96 @@ class FirebaseDataConnect extends FirebasePluginPlatform { return newInstance; } } + +class _RoutingTransport implements DataConnectTransport { + _RoutingTransport(this.rest, this.websocket); + final RestTransport rest; + final WebSocketTransport websocket; + + @override + FirebaseAppCheck? get appCheck => rest.appCheck; + @override + set appCheck(FirebaseAppCheck? val) { + rest.appCheck = val; + websocket.appCheck = val; + } + + @override + CallerSDKType get sdkType => rest.sdkType; + @override + set sdkType(CallerSDKType val) { + rest.sdkType = val; + websocket.sdkType = val; + } + + @override + TransportOptions get transportOptions => rest.transportOptions; + @override + set transportOptions(TransportOptions val) { + rest.transportOptions = val; + websocket.transportOptions = val; + } + + @override + DataConnectOptions get options => rest.options; + @override + set options(DataConnectOptions val) { + rest.options = val; + websocket.options = val; + } + + @override + String get appId => rest.appId; + @override + set appId(String val) { + rest.appId = val; + websocket.appId = val; + } + + @override + Future invokeMutation( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + if (websocket.isConnected) { + return websocket.invokeMutation( + operationId, queryName, deserializer, serializer, vars, token); + } + return rest.invokeMutation( + operationId, queryName, deserializer, serializer, vars, token); + } + + @override + Future invokeQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serialize, + Variables? vars, + String? token, + ) { + if (websocket.isConnected) { + return websocket.invokeQuery( + operationId, queryName, deserializer, serialize, vars, token); + } + return rest.invokeQuery( + operationId, queryName, deserializer, serialize, vars, token); + } + + @override + Stream invokeStreamQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + return websocket.invokeStreamQuery( + operationId, queryName, deserializer, serializer, vars, token); + } +} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart index 4bcbcd32a4c2..6c32fb50221c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/duration.pb.dart @@ -1,3 +1,4 @@ +// ignore_for_file: implementation_imports // // Generated code. Do not modify. // source: google/protobuf/duration.proto diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart index 42d55e426602..42164fbc928f 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/generated/google/protobuf/struct.pb.dart @@ -1,3 +1,4 @@ +// ignore_for_file: implementation_imports // // Generated code. Do not modify. // source: google/protobuf/struct.proto diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart deleted file mode 100644 index d46b30815133..000000000000 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_library.dart +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import 'dart:convert'; -import 'dart:developer'; - -import 'package:firebase_app_check/firebase_app_check.dart'; -import 'package:firebase_data_connect/src/generated/graphql_error.pb.dart'; -import 'package:grpc/grpc.dart'; - -import '../common/common_library.dart'; -import '../dataconnect_version.dart'; -import '../generated/connector_service.pbgrpc.dart'; -import '../generated/google/protobuf/struct.pb.dart'; - -part 'grpc_transport.dart'; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart deleted file mode 100644 index 180bb209168b..000000000000 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/grpc_transport.dart +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -part of 'grpc_library.dart'; - -/// Transport used for Android/iOS. Uses a GRPC transport instead of REST. -class GRPCTransport implements DataConnectTransport { - /// GRPCTransport creates a new channel - GRPCTransport( - this.transportOptions, - this.options, - this.appId, - this.sdkType, - this.appCheck, - ) { - bool isSecure = transportOptions.isSecure ?? true; - channel = ClientChannel( - transportOptions.host, - port: transportOptions.port ?? 443, - options: ChannelOptions( - credentials: (isSecure - ? const ChannelCredentials.secure() - : const ChannelCredentials.insecure()), - ), - ); - stub = ConnectorServiceClient(channel); - name = - 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}'; - } - - /// FirebaseAppCheck - @override - FirebaseAppCheck? appCheck; - - @override - CallerSDKType sdkType; - - /// Name of the endpoint. - late String name; - - /// ConnectorServiceClient used to execute the query/mutation. - late ConnectorServiceClient stub; - - /// ClientChannel used to configure connection to the GRPC server. - late ClientChannel channel; - - /// Current host configuration. - @override - TransportOptions transportOptions; - - /// Data Connect backend configuration options. - @override - DataConnectOptions options; - - /// Application ID - @override - String appId; - - Future> getMetadata(String? authToken) async { - String? appCheckToken; - try { - appCheckToken = await appCheck?.getToken(); - } catch (e) { - log('Unable to get app check token: $e'); - } - Map metadata = { - 'x-goog-request-params': 'location=${options.location}&frontend=data', - 'x-goog-api-client': getGoogApiVal(sdkType, packageVersion), - 'x-firebase-client': getFirebaseClientVal(packageVersion) - }; - - if (authToken != null) { - metadata['x-firebase-auth-token'] = authToken; - } - if (appCheckToken != null) { - metadata['X-Firebase-AppCheck'] = appCheckToken; - } - metadata['x-firebase-gmpid'] = appId; - return metadata; - } - - /// Invokes GPRC query endpoint. - @override - Future invokeQuery( - String queryName, - Deserializer deserializer, - Serializer? serializer, - Variables? vars, - String? authToken, - ) async { - ExecuteQueryResponse response; - - ExecuteQueryRequest request = - ExecuteQueryRequest(name: name, operationName: queryName); - if (vars != null && serializer != null) { - request.variables = getStruct(vars, serializer); - } - try { - response = await stub.executeQuery( - request, - options: CallOptions(metadata: await getMetadata(authToken)), - ); - return handleResponse( - CommonResponse.fromExecuteQuery(deserializer, response)); - } on Exception catch (e) { - if (e.toString().contains('invalid Firebase Auth Credentials')) { - throw DataConnectError( - DataConnectErrorCode.unauthorized, - 'Failed to invoke operation: $e', - ); - } - rethrow; - } - } - - /// Converts the variables into a proto Struct. - Struct getStruct( - Variables vars, - Serializer serializer, - ) { - Struct struct = Struct.create(); - struct.mergeFromProto3Json(jsonDecode(serializer(vars))); - return struct; - } - - /// Invokes GPRC mutation endpoint. - @override - Future invokeMutation( - String queryName, - Deserializer deserializer, - Serializer? serializer, - Variables? vars, - String? authToken, - ) async { - ExecuteMutationResponse response; - ExecuteMutationRequest request = - ExecuteMutationRequest(name: name, operationName: queryName); - if (vars != null && serializer != null) { - request.variables = getStruct(vars, serializer); - } - - try { - response = await stub.executeMutation( - request, - options: CallOptions(metadata: await getMetadata(authToken)), - ); - return handleResponse( - CommonResponse.fromExecuteMutation(deserializer, response)); - } on Exception catch (e) { - if (e.toString().contains('invalid Firebase Auth Credentials')) { - throw DataConnectError( - DataConnectErrorCode.unauthorized, - 'Failed to invoke operation: $e', - ); - } - rethrow; - } - } -} - -ServerResponse handleResponse(CommonResponse commonResponse) { - Map? jsond = commonResponse.data as Map?; - String jsonEncoded = jsonEncode(commonResponse.data); - - Map? jsonExt = - commonResponse.extensions as Map?; - - if (commonResponse.errors.isNotEmpty) { - Map? data = - jsonDecode(jsonEncoded) as Map?; - Data? decodedData; - List errors = commonResponse - .errors - .map((e) => DataConnectOperationFailureResponseErrorInfo( - e.path.values - .map((val) => val.hasStringValue() - ? DataConnectFieldPathSegment(val.stringValue) - : DataConnectListIndexPathSegment(val.numberValue.toInt())) - .toList(), - e.message)) - .toList(); - if (data != null) { - try { - decodedData = commonResponse.deserializer(jsonEncoded); - } catch (e) { - // nothing required - } - } - final response = - DataConnectOperationFailureResponse(errors, data, decodedData); - throw DataConnectOperationError(DataConnectErrorCode.other, - 'failed to invoke operation: ${response.errors}', response); - } - - // no errors - return a standard response - if (jsond != null) { - return ServerResponse(jsond, extensions: jsonExt); - } else { - return ServerResponse({}); - } -} - -/// Initializes GRPC transport for Data Connect. -DataConnectTransport getTransport( - TransportOptions transportOptions, - DataConnectOptions options, - String appId, - CallerSDKType sdkType, - FirebaseAppCheck? appCheck, -) => - GRPCTransport(transportOptions, options, appId, sdkType, appCheck); - -class CommonResponse { - CommonResponse(this.deserializer, this.data, this.errors, this.extensions); - static CommonResponse fromExecuteMutation( - Deserializer deserializer, ExecuteMutationResponse response) { - return CommonResponse( - deserializer, response.data.toProto3Json(), response.errors, null); - } - - static CommonResponse fromExecuteQuery( - Deserializer deserializer, ExecuteQueryResponse response) { - return CommonResponse(deserializer, response.data.toProto3Json(), - response.errors, response.extensions.toProto3Json()); - } - - final Deserializer deserializer; - final Object? data; - final List errors; - final Object? extensions; -} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart index 2680f692e350..4480096e49a3 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/rest_transport.dart @@ -150,6 +150,7 @@ class RestTransport implements DataConnectTransport { /// Invokes query REST endpoint. @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -169,6 +170,7 @@ class RestTransport implements DataConnectTransport { /// Invokes mutation REST endpoint. @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -184,6 +186,20 @@ class RestTransport implements DataConnectTransport { token, ); } + + /// WebSockets are now handled by WebSocketTransport in FirebaseDataConnect. + @override + Stream invokeStreamQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + throw UnsupportedError( + 'Streaming should be routed through WebSocketTransport'); + } } /// Initializes Rest transport for Data Connect. diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart new file mode 100644 index 000000000000..f64ee0bad44b --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/stream_protocol.dart @@ -0,0 +1,174 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// The kind of streaming request. +enum RequestKind { + subscribe, + execute, + resume, + cancel, +} + +/// Request to execute or subscribe to a Data Connect query or mutation. +class ExecuteRequest { + ExecuteRequest(this.operationName, this.variables); + + final String operationName; + final Map? variables; + + Map toJson() { + final Map data = {}; + data['operationName'] = operationName; + if (variables != null) { + data['variables'] = variables; + } + return data; + } +} + +/// Request to resume a query. +class ResumeRequest { + ResumeRequest(); + + Map toJson() { + return {}; + } +} + +/// StreamRequest defines the request of Data Connect's bi-directional streaming API. +class StreamRequest { + StreamRequest({ + this.name, + this.headers, + this.authToken, + this.appCheckToken, + this.requestId, + this.requestKind, + this.subscribe, + this.execute, + this.resume, + this.cancel, + this.dataEtag, + }); + + /// The resource name of the connector. + final String? name; + + /// Optional headers. + final Map? headers; + + /// Optional Auth token. + final String? authToken; + + /// Optional App Check token. + final String? appCheckToken; + + /// The request id used to identify a request within the stream. + final String? requestId; + + /// Kind of the request. + final RequestKind? requestKind; + + /// Subscribe to a Data Connect query. + final ExecuteRequest? subscribe; + + /// Execute a Data Connect query or mutation. + final ExecuteRequest? execute; + + /// Resume a query. + final ResumeRequest? resume; + + /// Signal that the client is no longer interested. + final bool? cancel; + + /// Etag for caching. + final String? dataEtag; + + Map toJson() { + final Map data = {}; + if (name != null) { + data['name'] = name; + } + if (headers != null) { + data['headers'] = headers; + } + if (authToken != null) { + data['authToken'] = authToken; + } + if (appCheckToken != null) { + data['appCheckToken'] = appCheckToken; + } + if (requestId != null) { + data['requestId'] = requestId; + } + if (dataEtag != null) { + data['dataEtag'] = dataEtag; + } + + if (subscribe != null) { + data['subscribe'] = subscribe!.toJson(); + } else if (execute != null) { + data['execute'] = execute!.toJson(); + } else if (resume != null) { + data['resume'] = resume!.toJson(); + } else if (cancel == true) { + data['cancel'] = {}; + } + + return data; + } +} + +/// StreamResponse defines the response of Data Connect's bi-directional streaming API. +class StreamResponse { + StreamResponse({ + this.requestId, + this.data, + this.dataEtag, + this.errors, + this.cancelled, + this.extensions, + }); + + factory StreamResponse.fromJson(Map json) { + if (json.containsKey('result')) { + json = json['result'] as Map; + } else if (json.containsKey('error')) { + final errObj = json['error'] as Map; + json = { + 'errors': [ + {'message': errObj['message']} + ] + }; + } + + List? errorsList = json['errors'] as List?; + + return StreamResponse( + requestId: json['requestId'] as String?, + data: json['data'] as Map?, + dataEtag: json['dataEtag'] as String?, + errors: errorsList, + cancelled: json['cancelled'] as bool?, + extensions: json['extensions'] as Map?, + ); + } + + final String? requestId; + final Map? data; + final String? dataEtag; + final List? errors; + final bool? cancelled; + final Map? extensions; +} diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart index 7da28d92b812..44e0391dd6c7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_library.dart @@ -12,8 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'dart:async'; +import 'dart:convert'; +import 'dart:developer' as developer; +import 'dart:math'; import 'package:firebase_app_check/firebase_app_check.dart'; +import 'package:firebase_auth/firebase_auth.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import '../common/common_library.dart'; +import '../dataconnect_version.dart'; +import 'stream_protocol.dart'; part 'transport_stub.dart'; +part 'websocket_transport.dart'; diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart index e00c53b5bac1..87b8445f3abe 100644 --- a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/transport_stub.dart @@ -50,6 +50,7 @@ class TransportStub implements DataConnectTransport { /// Stub for invoking a mutation. @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -60,9 +61,24 @@ class TransportStub implements DataConnectTransport { throw UnimplementedError(); } + /// Stub for subscribing to a query. + @override + Stream invokeStreamQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? token, + ) { + // TODO: implement invokeStreamQuery + throw UnimplementedError(); + } + /// Stub for invoking a query. @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serialize, diff --git a/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart new file mode 100644 index 000000000000..0fb62e5fde33 --- /dev/null +++ b/packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart @@ -0,0 +1,646 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +part of 'transport_library.dart'; + +/// WebSocketTransport makes requests out to the streaming endpoints of the configured backend, +/// multiplexing multiple subscriptions and unary operations over a single WebSocket connection. + +class _PendingUnary { + final Completer completer; + final String operationName; + final Map? variables; + final bool isMutation; + + _PendingUnary( + this.completer, this.operationName, this.variables, this.isMutation); +} + +class _PendingSubscription { + final String operationId; + final String queryName; + final Map? variables; + + _PendingSubscription(this.operationId, this.queryName, this.variables); +} + +class WebSocketTransport implements DataConnectTransport { + static const int _maxReconnectAttempts = 10; + static const int _maxReconnectDelayMs = 30000; + static const int _initialReconnectDelayMs = 1000; + + /// Initializes necessary protocol and port. + WebSocketTransport( + this.transportOptions, + this.options, + this.appId, + this.sdkType, + this.appCheck, [ + this.auth, + ]) { + final protocol = (transportOptions.isSecure ?? true) ? 'wss' : 'ws'; + final host = transportOptions.host; + final port = transportOptions.port ?? 443; + final location = options.location; + + _url = Uri( + scheme: protocol, + host: host, + port: port, + path: + '/ws/google.firebase.dataconnect.v1.ConnectorStreamService/Connect/locations/$location', + ).toString(); + + _currentUid = auth?.currentUser?.uid; + _authSubscription = auth?.idTokenChanges().listen((user) async { + final newUid = user?.uid; + // Disconnect and reconnect on any fundamental user change (login, logout, switch). + if (_currentUid != newUid) { + _disconnect(); + _scheduleReconnect(); + } else if (newUid != null && isConnected) { + // Token refreshed for the same user, push the new token natively down the socket. + try { + final token = await user?.getIdToken(); + final request = StreamRequest( + requestId: _generateRequestId('auth'), + headers: _buildHeaders(token, null), + ); + _send(request.toJson()); + } catch (_) { + // Ignored + } + } + _currentUid = newUid; + }); + } + + FirebaseAuth? auth; + String? _currentUid; + // ignore: unused_field + StreamSubscription? _authSubscription; //required to hold reference + + @override + FirebaseAppCheck? appCheck; + + @override + CallerSDKType sdkType; + + late String _url; + + @override + TransportOptions transportOptions; + + @override + DataConnectOptions options; + + @override + String appId; + + WebSocketChannel? _channel; + // ignore: unused_field + StreamSubscription? _channelSubscription; + + // Active listeners for stream subscriptions mapped by requestId. + final Map>> _streamListeners = + {}; + + // Pending information for subscriptions mapped by requestId. + final Map _pendingSubscriptions = {}; + + // Active completers for unary operations mapped by requestId. + final Map> _unaryListeners = {}; + + // Active subscriptions mapped by operationId => requestId. + final Map _activeSubscriptions = {}; + + bool _isReconnecting = false; + int _reconnectAttempts = 0; + bool _isExpectedDisconnect = false; + + void _checkIdleAndDisconnect() { + if (_streamListeners.isEmpty && _unaryListeners.isEmpty) { + _isExpectedDisconnect = true; + _disconnect(); + _clearState(); + } + } + + final Random _random = Random(); + static const String _chars = 'abcdefghijklmnopqrstuvwxyz0123456789'; + + String _generateRequestId(String operationName) { + final randStr = String.fromCharCodes(Iterable.generate( + 15, (_) => _chars.codeUnitAt(_random.nextInt(_chars.length)))); + return '${operationName}_$randStr'; + } + + void _send(Map json) { + if (_channel == null) return; + final encoded = jsonEncode(json); + if (encoded.isNotEmpty) { + developer.log("Sending stream message \n $encoded"); + _channel!.sink.add(encoded); + } + } + + bool get isConnected => _channel != null; + + Map _buildHeaders(String? authToken, String? appCheckToken) { + Map headers = { + 'x-goog-api-client': getGoogApiVal(sdkType, packageVersion), + 'x-firebase-client': getFirebaseClientVal(packageVersion) + }; + if (authToken != null) { + headers['X-Firebase-Auth-Token'] = authToken; + } + if (appCheckToken != null) { + headers['X-Firebase-AppCheck'] = appCheckToken; + } + headers['x-firebase-gmpid'] = appId; + return headers; + } + + Future? _connectionFuture; + + Future _ensureConnected(String? authToken) { + if (_channel != null) return Future.value(); + if (_connectionFuture != null) return _connectionFuture!; + _connectionFuture = _doConnect(authToken).whenComplete(() { + _connectionFuture = null; + }); + return _connectionFuture!; + } + + Future _doConnect(String? authToken) async { + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + _channel = WebSocketChannel.connect(Uri.parse(_url)); + _channelSubscription = _channel?.stream.listen( + _onMessage, + onError: _onError, + onDone: _onDone, + ); + + // reset this since an explicit connect was requested + _isExpectedDisconnect = false; + + try { + await _channel?.ready; + } catch (e) { + developer.log('WebSocket connection failed to become ready: $e'); + _channel = null; + throw DataConnectError( + DataConnectErrorCode.other, 'WebSocket connection failed: $e'); + } + + final initRequest = StreamRequest( + name: + 'projects/${options.projectId}/locations/${options.location}/services/${options.serviceId}/connectors/${options.connector}', + headers: headers, + ); + _send(initRequest.toJson()); + } + + // called when a message is received from the stream + void _onMessage(dynamic message) { + try { + var bodyString = ''; + if (message is List) { + bodyString = utf8.decode(message); + } else { + bodyString = message as String; + } + developer.log("Received stream response \n $bodyString"); + + final bodyJson = jsonDecode(bodyString) as Map; + final response = StreamResponse.fromJson(bodyJson); + + final requestId = response.requestId; + if (requestId == null) return; + + final serverResponse = ServerResponse( + response.data ?? {}, + extensions: response.extensions, + ); + + // Append errors if any exist on the stream payload + if (response.errors != null && response.errors!.isNotEmpty) { + // We simulate a DataConnectOperationError payload structure + // so that ref.dart can parse it correctly + serverResponse.data['errors'] = response.errors; + } + + if (_unaryListeners.containsKey(requestId)) { + final pendings = _unaryListeners.remove(requestId) ?? []; + for (final p in pendings) { + if (!p.completer.isCompleted) { + p.completer.complete(serverResponse); + } + } + _checkIdleAndDisconnect(); + } + + if (_streamListeners.containsKey(requestId)) { + final controllers = _streamListeners[requestId] ?? []; + if (response.cancelled == true) { + for (final controller in controllers) { + controller.close(); + } + _streamListeners.remove(requestId); + _activeSubscriptions.removeWhere((key, value) => value == requestId); + _pendingSubscriptions.remove(requestId); + _checkIdleAndDisconnect(); + } else { + for (final controller in controllers) { + controller.add(serverResponse); + } + } + } + } catch (e) { + // JSON decoding error or unknown format + developer.log('error decoding server response $e'); + } + } + + void _clearState([DataConnectError? error]) { + final e = error ?? + DataConnectError( + DataConnectErrorCode.other, 'WebSocket connection closed.'); + for (final pendings in _unaryListeners.values) { + for (final p in pendings) { + if (!p.completer.isCompleted) { + p.completer.completeError(e); + } + } + } + for (final controllers in _streamListeners.values) { + for (final controller in controllers) { + controller.addError(e); + controller.close(); + } + } + _unaryListeners.clear(); + _streamListeners.clear(); + _activeSubscriptions.clear(); + _pendingSubscriptions.clear(); + _isReconnecting = false; + _reconnectAttempts = 0; + } + + Timer? _reconnectTimer; + + void _scheduleReconnect() { + developer.log( + '${DateTime.now()} _scheduleReconnect $_reconnectAttempts $_isReconnecting $_isExpectedDisconnect'); + if (_isReconnecting || _isExpectedDisconnect) return; + _isReconnecting = true; + + if (_reconnectAttempts >= _maxReconnectAttempts) { + _clearState(DataConnectError(DataConnectErrorCode.other, + 'Network disconnected after max attempts.')); + return; + } + + final delay = min( + _initialReconnectDelayMs * pow(2, _reconnectAttempts).toInt(), + _maxReconnectDelayMs); + var startTime = DateTime.now(); + developer.log('$startTime scheduling _performReconnect in $delay ms'); + + _reconnectTimer?.cancel(); + _reconnectTimer = Timer(Duration(milliseconds: delay), () async { + developer.log( + '${DateTime.now()} calling delayed _performReconnect scheduled at $startTime'); + _performReconnect(); + }); + } + + Future _refreshAuthToken() async { + try { + return await auth?.currentUser?.getIdToken(); + } catch (_) { + // If fetching token fails, continue unauthenticated. + return null; + } + } + + Future _refreshAppCheckToken() async { + try { + if (appCheck != null) { + return await appCheck!.getToken(); + } + } catch (_) { + // Ignored: continue without AppCheck token if it fails. + } + return null; + } + + void _resubscribeActive(String? authToken, String? appCheckToken) { + for (final sub in _pendingSubscriptions.values) { + final reqId = _activeSubscriptions[sub.operationId]; + if (reqId == null) continue; + final headers = _buildHeaders(authToken, appCheckToken); + final request = StreamRequest( + requestId: reqId, + requestKind: RequestKind.subscribe, + subscribe: ExecuteRequest(sub.queryName, sub.variables), + headers: headers, + ); + _send(request.toJson()); + } + } + + void _replayQueriesAndFailMutations( + String? authToken, String? appCheckToken) { + final unariesToReplay = >{}; + for (final entry in _unaryListeners.entries) { + final reqId = entry.key; + final kept = <_PendingUnary>[]; + for (final p in entry.value) { + if (p.isMutation) { + p.completer.completeError(DataConnectError(DataConnectErrorCode.other, + 'Network reconnected; mutations cannot be safely retried.')); + } else { + kept.add(p); + final headers = _buildHeaders(authToken, appCheckToken); + final request = StreamRequest( + requestId: reqId, + requestKind: RequestKind.execute, + execute: ExecuteRequest(p.operationName, p.variables), + headers: headers, + ); + _send(request.toJson()); + } + } + if (kept.isNotEmpty) { + unariesToReplay[reqId] = kept; + } + } + _unaryListeners.clear(); + _unaryListeners.addAll(unariesToReplay); + } + + Future _performReconnect() async { + _channel?.sink.close(); + _channel = null; + _reconnectAttempts++; + + final authToken = await _refreshAuthToken(); + final appCheckToken = await _refreshAppCheckToken(); + + try { + await _ensureConnected(authToken); + + _reconnectAttempts = 0; + _isReconnecting = false; + + _resubscribeActive(authToken, appCheckToken); + _replayQueriesAndFailMutations(authToken, appCheckToken); + } catch (e) { + _isReconnecting = false; + _scheduleReconnect(); + } + } + + void _onError(dynamic error) { + if (_channel == null) return; + developer.log('WebSocket error: $error'); + _channel = null; + _isReconnecting = false; + _scheduleReconnect(); + } + + void _disconnect() { + _reconnectTimer?.cancel(); + _reconnectTimer = null; + _channel?.sink.close(); + _channel = null; + } + + void disconnect() { + _isExpectedDisconnect = true; + _disconnect(); + } + + void _onDone() { + if (_channel == null) return; + developer.log('WebSocket connection closed.'); + _channel = null; + _isReconnecting = false; + if (!_isExpectedDisconnect) { + _scheduleReconnect(); + } + } + + @override + Future invokeQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) async { + return _invokeUnary(operationId, queryName, deserializer, serializer, vars, + authToken, RequestKind.execute, false); + } + + @override + Future invokeMutation( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) async { + return _invokeUnary(operationId, queryName, deserializer, serializer, vars, + authToken, RequestKind.execute, true); + } + + Future _invokeUnary( + String operationId, + String operationName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + RequestKind requestKind, + bool isMutation, + ) async { + await _ensureConnected(authToken); + + final completer = Completer(); + + if (_activeSubscriptions.containsKey(operationId)) { + final existingRequestId = _activeSubscriptions[operationId]!; + Map? variablesMap; + if (vars != null && serializer != null) { + variablesMap = jsonDecode(serializer(vars)); + } + _unaryListeners.putIfAbsent(existingRequestId, () => []).add( + _PendingUnary(completer, operationName, variablesMap, isMutation)); + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + requestId: existingRequestId, + requestKind: RequestKind.resume, + resume: ResumeRequest(), + headers: headers, + ); + _send(request.toJson()); + + return completer.future; + } + + final requestId = _generateRequestId(operationId); + + Map? variables; + if (vars != null && serializer != null) { + variables = jsonDecode(serializer(vars)); + } + _unaryListeners + .putIfAbsent(requestId, () => []) + .add(_PendingUnary(completer, operationName, variables, isMutation)); + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + requestId: requestId, + requestKind: requestKind, + execute: ExecuteRequest(operationName, variables), + headers: headers, + ); + + _send(request.toJson()); + + return completer.future; + } + + @override + Stream invokeStreamQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) { + late StreamController controller; + + controller = StreamController( + onListen: () async { + try { + await _ensureConnected(authToken); + } catch (e) { + developer.log("Error subscribing - setting up stream $e"); + // Do NOT add error to sink here. The stream is designed to quietly + // add the query to `_pendingSubscriptions` below and silently + // retry when the network reconnects via `_scheduleReconnect`. + } + + if (_activeSubscriptions.containsKey(operationId)) { + final existingRequestId = _activeSubscriptions[operationId]!; + _streamListeners + .putIfAbsent(existingRequestId, () => []) + .add(controller); + return; + } + + final requestId = _generateRequestId(operationId); + _activeSubscriptions[operationId] = requestId; + _streamListeners.putIfAbsent(requestId, () => []).add(controller); + + Map? variables; + if (vars != null && serializer != null) { + variables = json.decode(serializer(vars)); + } + _pendingSubscriptions[requestId] = + _PendingSubscription(operationId, queryName, variables); + + if (!isConnected) { + // we are not connected - + // keep pending sub to use for retry + _scheduleReconnect(); + return; + } + + String? appCheckToken; + try { + appCheckToken = await appCheck?.getToken(); + } catch (_) { + // Ignored + } + + final headers = _buildHeaders(authToken, appCheckToken); + + final request = StreamRequest( + requestId: requestId, + requestKind: RequestKind.subscribe, + subscribe: ExecuteRequest(queryName, variables), + headers: headers, + ); + + _send(request.toJson()); + }, + onCancel: () { + if (!_activeSubscriptions.containsKey(operationId)) return; + final requestId = _activeSubscriptions[operationId]!; + + final listeners = _streamListeners[requestId]; + if (listeners != null) { + listeners.remove(controller); + if (listeners.isEmpty) { + _streamListeners.remove(requestId); + _activeSubscriptions.remove(operationId); + _pendingSubscriptions.remove(requestId); + + final cancelReq = StreamRequest( + requestId: requestId, + requestKind: RequestKind.cancel, + cancel: true, + ); + _send(cancelReq.toJson()); + _checkIdleAndDisconnect(); + } + } + }, + ); + + return controller.stream; + } +} diff --git a/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml b/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml index d0145d0290ac..3bd28b86b42c 100644 --- a/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml +++ b/packages/firebase_data_connect/firebase_data_connect/pubspec.yaml @@ -27,6 +27,7 @@ dependencies: protobuf: ^3.1.0 sqlite3: ^2.9.0 sqlite3_flutter_libs: ^0.5.40 + web_socket_channel: ^3.0.1 dev_dependencies: build_runner: ^2.4.12 diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart index dec53744b7e3..6848824996c4 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/cache/cache_manager_test.dart @@ -233,7 +233,7 @@ void main() { }); test('maxAge conformance', () async { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponseSuccess = http.Response('{"success": true}', 200); if (dataConnect.cacheManager == null) { diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart index 353f34aeec75..a3798daf98f7 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/common/common_library_test.dart @@ -103,8 +103,10 @@ void main() { test('should handle invokeQuery with proper deserializer', () async { const queryName = 'testQuery'; - final deserializer = (json) => json; + const queryId = 'testQueryId'; + deserializer(json) => json; final result = await transport.invokeQuery( + queryId, queryName, deserializer, emptySerializer, @@ -117,8 +119,10 @@ void main() { test('should handle invokeMutation with proper deserializer', () async { const queryName = 'testMutation'; - final deserializer = (json) => json; + const operationId = 'testMutationId'; + deserializer(json) => json; final result = await transport.invokeMutation( + operationId, queryName, deserializer, emptySerializer, @@ -134,17 +138,18 @@ void main() { // Test class extending DataConnectTransport for testing purposes class TestDataConnectTransport extends DataConnectTransport { TestDataConnectTransport( - TransportOptions transportOptions, - DataConnectOptions options, - String appId, - CallerSDKType sdkType, { + super.transportOptions, + super.options, + super.appId, + super.sdkType, { FirebaseAppCheck? appCheck, - }) : super(transportOptions, options, appId, sdkType) { + }) { this.appCheck = appCheck; } @override Future invokeQuery( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -157,6 +162,7 @@ class TestDataConnectTransport extends DataConnectTransport { @override Future invokeMutation( + String operationId, String queryName, Deserializer deserializer, Serializer? serializer, @@ -166,4 +172,16 @@ class TestDataConnectTransport extends DataConnectTransport { // Simulate mutation invocation logic here return ServerResponse({}); } + + @override + Stream invokeStreamQuery( + String operationId, + String queryName, + Deserializer deserializer, + Serializer? serializer, + Variables? vars, + String? authToken, + ) { + return Stream.value(ServerResponse({})); + } } diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart index 3cb6c9ce26c8..5735f324f3b4 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/common/dataconnect_error_test.dart @@ -79,8 +79,7 @@ void main() { group('Serializer and Deserializer', () { test('should serialize variables into string format', () { - Serializer> serializer = - (Map vars) => vars.toString(); + String serializer(Map vars) => vars.toString(); final inputVars = {'key1': 'value1', 'key2': 123}; final serializedString = serializer(inputVars); @@ -89,8 +88,7 @@ void main() { }); test('should deserialize string data into expected format', () { - Deserializer> deserializer = - (String data) => {'data': data}; + deserializer(String data) => {'data': data}; const inputData = '{"message": "Hello World"}'; final deserializedData = deserializer(inputData); diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart index 636cae0cde52..b057f4aeb71f 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart @@ -82,7 +82,7 @@ void main() { test( 'addQuery should create a new StreamController if query does not exist', () { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; String varSerializer(Object? _) { return 'varsAsStr'; } @@ -98,8 +98,7 @@ void main() { ); final stream = queryManager.addQuery(ref); - //expect(queryManager.trackedQueries['testQuery'], isNotNull); - expect(queryManager.trackedQueries['testQuery::varsAsStr'], isNotNull); + expect(queryManager.trackedQueries.values.contains(ref), isTrue); expect(stream, isA()); }); }); @@ -149,7 +148,7 @@ void main() { mockDataConnect.transport = transport; }); test('executeQuery should gracefully handle getIdToken failures', () async { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponseSuccess = http.Response('{"success": true}', 200); when(mockUser.getIdToken()).thenThrow(Exception('Auth error')); QueryRef ref = QueryRef( @@ -175,7 +174,7 @@ void main() { () async { final mockResponse = http.Response('{"error": "Unauthorized"}', 401); final mockResponseSuccess = http.Response('{"success": true}', 200); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; int count = 0; int idTokenCount = 0; QueryRef ref = QueryRef( @@ -219,7 +218,7 @@ void main() { }); test('throw Error if server throws one', () { - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final mockResponse = http.Response( ''' { @@ -285,10 +284,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, @@ -339,10 +338,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, @@ -390,10 +389,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) { + AbcHolder deserializer(String data) { Map decoded = jsonDecode(data) as Map; return AbcHolder(decoded['abc']!); - }; + } QueryRef ref = QueryRef( mockDataConnect, diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart index 2deb28f22749..b4edc25b2a81 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/firebase_data_connect_test.dart @@ -25,7 +25,16 @@ import 'package:mockito/mockito.dart'; @GenerateNiceMocks([MockSpec(), MockSpec()]) import 'firebase_data_connect_test.mocks.dart'; -class MockFirebaseAuth extends Mock implements FirebaseAuth {} +class MockFirebaseAuth extends Mock implements FirebaseAuth { + @override + Stream idTokenChanges() { + return super.noSuchMethod( + Invocation.method(#idTokenChanges, []), + returnValue: const Stream.empty(), + returnValueForMissingStub: const Stream.empty(), + ) as Stream; + } +} class MockFirebaseAppCheck extends Mock implements FirebaseAppCheck {} diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart index bad97d364d32..d3dbc2902d12 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/rest_transport_test.dart @@ -98,7 +98,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; expect( () => transport.invokeOperation( @@ -124,7 +124,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; expect( () => transport.invokeOperation( @@ -150,9 +150,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; - await transport.invokeQuery('testQuery', deserializer, null, null, null); + await transport.invokeQuery( + 'testQueryId', 'testQuery', deserializer, null, null, null); verify( mockHttpClient.post( @@ -178,9 +179,10 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Mutation Data'; + String deserializer(String data) => 'Deserialized Mutation Data'; await transport.invokeMutation( + 'testMutationId', 'testMutation', deserializer, null, @@ -215,7 +217,7 @@ void main() { when(mockUser.getIdToken()).thenAnswer((_) async => 'authToken123'); when(mockAppCheck.getToken()).thenAnswer((_) async => 'appCheckToken123'); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', @@ -250,7 +252,7 @@ void main() { when(mockUser.getIdToken()).thenAnswer((_) async => 'authToken123'); when(mockAppCheck.getToken()).thenAnswer((_) async => 'appCheckToken123'); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', @@ -297,7 +299,7 @@ void main() { ), ).thenAnswer((_) async => mockResponse); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; final result = await transport.invokeOperation( 'testQuery', @@ -327,7 +329,7 @@ void main() { when(mockUser.getIdToken()).thenThrow(Exception('Auth error')); when(mockAppCheck.getToken()).thenThrow(Exception('AppCheck error')); - final deserializer = (String data) => 'Deserialized Data'; + String deserializer(String data) => 'Deserialized Data'; await transport.invokeOperation( 'testQuery', diff --git a/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart b/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart index d8c6a865d316..daeffedd48cc 100644 --- a/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart +++ b/packages/firebase_data_connect/firebase_data_connect/test/src/network/transport_stub_test.dart @@ -65,6 +65,7 @@ void main() { expect( () async => transportStub.invokeMutation( + 'operationId', 'queryName', (json) => json, null, @@ -86,6 +87,7 @@ void main() { expect( () async => transportStub.invokeQuery( + 'operationId', 'queryName', (json) => json, null,