diff --git a/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt b/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt index 60fa3bea..58270e95 100644 --- a/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt +++ b/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt @@ -73,6 +73,7 @@ import org.obd.metrics.pid.PidDefinitionRegistry import java.text.SimpleDateFormat import java.util.Date import java.util.Locale +import java.util.concurrent.ConcurrentLinkedDeque private const val LOG_TAG = "Graph" @@ -260,7 +261,7 @@ class GraphFragment : Fragment() { val sensorData = SensorData( id = it.command.pid.id, - metrics = mutableListOf(), + metrics = ConcurrentLinkedDeque(), min = hist.min, max = hist.max, mean = hist.mean diff --git a/common/src/main/java/org/obd/graphs/Cache.kt b/common/src/main/java/org/obd/graphs/Cache.kt index 51d70664..ad236085 100644 --- a/common/src/main/java/org/obd/graphs/Cache.kt +++ b/common/src/main/java/org/obd/graphs/Cache.kt @@ -24,10 +24,6 @@ class Cache { fun updateEntry(name: String, value: Any) { cache[name] = value } - - fun initCache(m: MutableMap) { - cache = m - } } val cacheManager = Cache() diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt index 7cb2849e..4bed473b 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt @@ -18,7 +18,6 @@ package org.obd.graphs.bl.trip import android.content.Context import android.util.Log -import org.obd.graphs.bl.datalogger.DataLoggerRepository import org.obd.graphs.bl.datalogger.MetricsProcessor import org.obd.graphs.bl.datalogger.scaleToRange import org.obd.graphs.getContext @@ -27,32 +26,31 @@ import org.obd.graphs.preferences.Prefs import org.obd.graphs.preferences.isEnabled import org.obd.graphs.profile.profile import org.obd.metrics.api.model.ObdMetric -import java.io.File -import java.io.FileOutputStream import java.text.SimpleDateFormat import java.util.Date import java.util.Locale -val tripManager: TripManager = DefaultTripManager() +val tripManager: TripManager by lazy { DefaultTripManager() } private const val LOGGER_TAG = "TripManager" private const val MIN_TRIP_LENGTH = 5 -private const val TRIP_DIRECTORY = "trips" - private const val TRIP_FILE_PREFIX = "trip" +private const val MAX_CACHED_METRICS_PER_SENSOR = 18000 + internal class DefaultTripManager : TripManager, MetricsProcessor { - private val dateFormat: SimpleDateFormat = - SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault()) - private val tripModelSerializer = TripModelSerializer() + private val dateFormat: SimpleDateFormat = SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault()) private val tripCache = TripCache() - private val tripDescParser = TripDescParser() - override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}" + private val repository: TripRepository by lazy { FileTripRepository(getContext()!!) } + + private var activeTripId: String? = null + + override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir("trips")?.absolutePath}" override fun postValue(obdMetric: ObdMetric) { try { @@ -61,41 +59,36 @@ internal class DefaultTripManager : val key = obdMetric.command.pid.id val newRecord = if (obdMetric.isNumber()) Entry(ts, obdMetric.scaleToRange(), key) else Entry(ts, obdMetric.value, key) - if (trip.entries.containsKey(key)) { - val tripEntry = trip.entries[key]!! - tripEntry.metrics.add( - Metric( - entry = newRecord, - ts = obdMetric.timestamp, - rawAnswer = obdMetric.raw - ) - ) - } else { - trip.entries[key] = - SensorData( - id = key, - metrics = - mutableListOf( - Metric( - entry = newRecord, - ts = obdMetric.timestamp, - rawAnswer = obdMetric.raw - ) - ) - ) + val metric = Metric( + entry = newRecord, + ts = obdMetric.timestamp, + rawAnswer = obdMetric.raw + ) + + repository.saveMetric(metric) + + val tripEntry = trip.entries.getOrPut(key) { + SensorData(id = key) + } + + tripEntry.metrics.add(metric) + + while (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) { + tripEntry.metrics.removeFirst() } } } catch (e: Throwable) { - Log.e(LOGGER_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e) + Log.e(LOGGER_TAG, "Failed to process metric for ${obdMetric.command.pid.pid}", e) } } override fun getCurrentTrip(): Trip { - if (null == tripCache.getTrip()) { - startNewTrip(System.currentTimeMillis()) + val trip = tripCache.getTrip() ?: run { + val newTs = System.currentTimeMillis() + startNewTrip(newTs) + tripCache.getTrip() ?: Trip(startTs = newTs) } - val trip = tripCache.getTrip()!! Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") return trip } @@ -103,165 +96,86 @@ internal class DefaultTripManager : override fun startNewTrip(newTs: Long) { Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") updateCache(newTs) + + activeTripId = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl" + repository.initStorage(activeTripId!!) } - override fun saveCurrentTrip(f: () -> Unit) { + override fun saveCurrentTrip() { tripCache.getTrip { trip -> val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip") val tripLength = getTripLength(trip) - Log.i(LOGGER_TAG, "Recorded trip, length: ${tripLength}s") + val currentTripId = activeTripId ?: return@getTrip + + Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s") + + repository.releaseStorage(currentTripId) if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) { - val tripStartTs = trip.startTs - - val filter = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs" - val alreadySaved = findAllTripsBy(filter) - - if (alreadySaved.isNotEmpty()) { - Log.e( - LOGGER_TAG, - "It seems that Trip which start same date='$filter' is already saved." - ) - } else { - try { - f() - val histogram = DataLoggerRepository.getDiagnostics().histogram() - val pidDefinitionRegistry = DataLoggerRepository.getPidDefinitionRegistry() - - trip.entries.forEach { (t, u) -> - val p = pidDefinitionRegistry.findBy(t) - p?.let { - val histogramSupplier = histogram.findBy(it) - u.max = histogramSupplier.max - u.min = histogramSupplier.min - u.mean = histogramSupplier.mean - } - } - - val content: String = - tripModelSerializer.serializer.writeValueAsString(trip) - - val fileName = - "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs-$tripLength.json" - Log.i( - LOGGER_TAG, - "Saving the trip to the file: '$fileName'. Length: ${tripLength}s" - ) - writeFile(getContext()!!, fileName, content) - Log.i( - LOGGER_TAG, - "Trip was written to the file: '$fileName'. Length: ${tripLength}s" - ) - } catch (e: java.lang.Exception) { - Log.e(LOGGER_TAG, "Failed to save trip", e) - } - } + repository.updateTripMetadata(currentTripId, trip.startTs, tripLength, profile.getCurrentProfile()) } else { - Log.w(LOGGER_TAG, "Trip was not saved. Trip time is less than ${MIN_TRIP_LENGTH}s") + Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Discarding.") + repository.deleteTrip(currentTripId) } + + activeTripId = null } } override fun findAllTripsBy(filter: String): MutableCollection { - Log.i(LOGGER_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}") - - val files = File(getTripsDirectory(getContext()!!)).list() - if (files == null) { - Log.i(LOGGER_TAG, "No files were found in the trips directory.") - return mutableListOf() - } else { - val result = - files - .filter { if (filter.isNotEmpty()) it.startsWith(filter) else true } - .filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") } - .filter { - it.contains("${profile.getCurrentProfile()}-") - }.filter { - try { - tripDescParser.decodeTripName(it).size > 3 - } catch (e: Throwable) { - false - } - }.mapNotNull { fileName -> - Log.d(LOGGER_TAG, "Found trip which fits the conditions: $fileName") - tripDescParser.getTripDesc(fileName) - }.sortedByDescending { it.startTime.toLongOrNull() } - .toMutableList() - Log.i(LOGGER_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}") - return result - } + return repository.findAllTripsBy(filter, profile.getCurrentProfile()) } override fun deleteTrip(trip: TripFileDesc) { - Log.i(LOGGER_TAG, "Deleting '${trip.fileName}' from the storage.") - val file = File(getTripsDirectory(getContext()!!), trip.fileName) - file.delete() - Log.i(LOGGER_TAG, "Trip '${trip.fileName}' has been deleted from the storage.") + repository.deleteTrip(trip.fileName) } - override fun loadTrip(tripName: String) { - Log.i(LOGGER_TAG, "Loading '$tripName' from disk.") + override fun loadTrip(tripId: String) { + Log.i(LOGGER_TAG, "Loading trip ID: '$tripId'") - if (tripName.isEmpty()) { + if (tripId.isEmpty()) { updateCache(System.currentTimeMillis()) - } else { - val file = File(getTripsDirectory(getContext()!!), tripName) - try { - val trip: Trip = tripModelSerializer.deserializer.readValue(file, Trip::class.java) - Log.i(LOGGER_TAG, "Trip '${file.absolutePath}' was loaded from the storage.") - Log.i(LOGGER_TAG, "Trip selected PIDs ${trip.entries.keys}") - Log.i(LOGGER_TAG, "Number of entries ${trip.entries.values.size} collected within the trip") - - tripCache.updateTrip(trip) - tripVirtualScreenManager.updateReservedVirtualScreen( - trip.entries.keys - .map { it.toString() } - .toList() - ) - } catch (e: Throwable) { - Log.e(LOGGER_TAG, "Did not find trip '$tripName'.", e) - updateCache(System.currentTimeMillis()) - } + return } - } - private fun writeFile( - context: Context, - fileName: String, - content: String - ) { - var fd: FileOutputStream? = null try { - val file = getTripFile(context, fileName) - fd = - FileOutputStream(file).apply { - write(content.toByteArray()) + val parts = tripDescParser.decodeTripName(tripId) + val startTs = parts.getOrNull(2)?.toLongOrNull() ?: System.currentTimeMillis() + val trip = Trip(startTs = startTs) + + repository.loadTrip(tripId) { metric -> + val key = metric.entry.data + if (!trip.entries.containsKey(key)) { + trip.entries[key] = SensorData(id = key) + } + trip.entries[key]!!.metrics.add(metric) + } + + trip.entries.values.forEach { sensorData -> + val values = sensorData.metrics.mapNotNull { it.entry.y.toString().toFloatOrNull() } + if (values.isNotEmpty()) { + sensorData.min = values.minOrNull() ?: 0f + sensorData.max = values.maxOrNull() ?: 0f + sensorData.mean = values.average() } - } finally { - fd?.run { - flush() - close() } + + Log.i(LOGGER_TAG, "Trip loaded successfully. PIDs: ${trip.entries.keys}") + tripCache.updateTrip(trip) + tripVirtualScreenManager.updateReservedVirtualScreen(trip.entries.keys.map { it.toString() }) + } catch (e: Throwable) { + Log.e(LOGGER_TAG, "Failed to load trip '$tripId'.", e) + updateCache(System.currentTimeMillis()) } } - private fun getTripFile( - context: Context, - fileName: String - ): File = File(getTripsDirectory(context), fileName) - private fun updateCache(newTs: Long) { - val trip = Trip(startTs = newTs, entries = mutableMapOf()) + val trip = Trip(startTs = newTs) tripCache.updateTrip(trip) - Log.i(LOGGER_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'") } private fun getTripLength(trip: Trip): Long = - if (trip.startTs == 0L) { - 0 - } else { - (Date().time - trip.startTs) / 1000 - } + if (trip.startTs == 0L) 0 else (Date().time - trip.startTs) / 1000 private fun formatTimestamp(ts: Long) = dateFormat.format(Date(ts)) } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt index f933a847..07fc9d03 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt @@ -24,7 +24,7 @@ private const val CACHE_TRIP_PROPERTY_NAME = "cache.trip.current" internal class TripCache { init { - val trip = Trip(startTs = System.currentTimeMillis(), entries = mutableMapOf()) + val trip = Trip(startTs = System.currentTimeMillis()) updateTrip(trip) Log.i("tripCache", "Init Trip with stamp: ${trip.startTs}") } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt index 1911c889..dca6c79e 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt @@ -23,16 +23,22 @@ class TripDescParser { val p = decodeTripName(fileName) val profileId = p[1] val profiles = profile.getAvailableProfiles() - val profileLabel = profiles[profileId]!! + val profileLabel = profiles[profileId] ?: "Unknown" + + val startTime = if (p.size > 2) p[2] else "" + val tripTimeSec = if (p.size > 3) p[3] else "0" return TripFileDesc( fileName = fileName, profileId = profileId, profileLabel = profileLabel, - startTime = p[2], - tripTimeSec = p[3] + startTime = startTime, + tripTimeSec = tripTimeSec ) } - fun decodeTripName(fileName: String) = fileName.substring(0, fileName.length - 5).split("-") + fun decodeTripName(fileName: String): List { + val nameWithoutExtension = fileName.substringBeforeLast(".") + return nameWithoutExtension.split("-") + } } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt index 2fbf622e..093e9b74 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt @@ -28,7 +28,7 @@ private const val LOG_TAG = "TripManager" interface TripManager : MetricsProcessor { fun getCurrentTrip(): Trip fun startNewTrip(newTs: Long) - fun saveCurrentTrip(f: () -> Unit) + fun saveCurrentTrip() fun getTripsDirectory(context: Context): String fun saveCurrentTripAsync(){ @@ -37,7 +37,7 @@ interface TripManager : MetricsProcessor { runAsync (wait = false) { try { - tripManager.saveCurrentTrip {} + tripManager.saveCurrentTrip() } finally { sendBroadcastEvent(SCREEN_UNLOCK_PROGRESS_EVENT) } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt index 525f678e..6cb77e6f 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt @@ -18,6 +18,8 @@ package org.obd.graphs.bl.trip import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.obd.metrics.transport.message.ConnectorResponse +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedDeque data class TripFileDesc( val fileName: String, @@ -44,7 +46,7 @@ data class Metric( @JsonIgnoreProperties(ignoreUnknown = true) data class SensorData( val id: Long, - val metrics: MutableList, + val metrics: ConcurrentLinkedDeque = ConcurrentLinkedDeque(), var min: Number = 0, var max: Number = 0, var mean: Number = 0 @@ -66,4 +68,4 @@ data class SensorData( } @JsonIgnoreProperties(ignoreUnknown = true) -data class Trip(val startTs: Long, val entries: MutableMap) +data class Trip(val startTs: Long, val entries: MutableMap = ConcurrentHashMap()) diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt new file mode 100644 index 00000000..22cc236e --- /dev/null +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt @@ -0,0 +1,173 @@ +/* + * Copyright 2019-2026, Tomasz Żebrowski + * + *

Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.obd.graphs.bl.trip + +import android.content.Context +import android.util.Log +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import java.io.File +import java.io.FileOutputStream +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicLong + +private const val LOGGER_TAG = "TripRepository" +private const val TRIP_DIRECTORY = "trips" +private const val TRIP_FILE_PREFIX = "trip" + +interface TripRepository { + fun initStorage(tripId: String) + fun releaseStorage(tripId: String) + + fun saveMetric(metric: Metric) + fun updateTripMetadata(tripId: String, tripStartTs: Long, tripLength: Long, profile: String) + fun deleteTrip(tripId: String) + + fun findAllTripsBy(filter: String, profile: String): MutableCollection + fun loadTrip(tripId: String, onMetricLoaded: (Metric) -> Unit) +} + +internal class FileTripRepository( + private val context: Context, + private val parser: TripDescParser = TripDescParser(), + private val serializer: TripModelSerializer = TripModelSerializer() +) : TripRepository { + + private var activeFileOutputStream: FileOutputStream? = null + private var activeTripId: String? = null + private val totalMetricsSaved = AtomicLong(0) + + private val ioDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val ioScope = CoroutineScope(ioDispatcher) + + private fun getTripsDirectory() = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}" + private fun getTripFile(fileName: String): File = File(getTripsDirectory(), fileName) + + override fun initStorage(tripId: String) { + activeTripId = tripId + totalMetricsSaved.set(0) + + ioScope.launch { + try { + val file = getTripFile(activeTripId!!) + activeFileOutputStream = FileOutputStream(file, true) + Log.i(LOGGER_TAG, "Started saving trip to: $activeTripId") + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to initialize storage for trip", e) + } + } + } + + override fun releaseStorage(tripId: String) { + ioScope.launch { + try { + activeFileOutputStream?.flush() + activeFileOutputStream?.close() + activeFileOutputStream = null + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to release storage", e) + } + } + } + + override fun saveMetric(metric: Metric) { + ioScope.launch { + try { + activeFileOutputStream?.let { + val jsonLine = serializer.serializer.writeValueAsString(metric) + "\n" + it.write(jsonLine.toByteArray()) + totalMetricsSaved.incrementAndGet() + } + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to save metric", e) + } + } + } + + override fun updateTripMetadata(tripId: String, tripStartTs: Long, tripLength: Long, profile: String) { + ioScope.launch { + try { + val currentFile = getTripFile(tripId) + if (currentFile.exists()) { + val finalName = "$TRIP_FILE_PREFIX-$profile-$tripStartTs-$tripLength.jsonl" + val finalFile = getTripFile(finalName) + + val isRenamed = currentFile.renameTo(finalFile) + + if (isRenamed) { + val totalItems = totalMetricsSaved.get() + val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) + Log.i(LOGGER_TAG, "Trip finished. ID: '$finalName' | Saved: $totalItems metrics | Size: ${String.format("%.2f", fileSizeMb)} MB") + } else { + Log.e(LOGGER_TAG, "Failed to rename temporary trip file '$tripId' to '$finalName'.") + } + } + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to update trip metadata", e) + } finally { + activeTripId = null + } + } + } + + override fun deleteTrip(tripId: String) { + ioScope.launch { + try { + getTripFile(tripId).delete() + Log.i(LOGGER_TAG, "Deleted trip data: $tripId") + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to delete trip data", e) + } finally { + if (activeTripId == tripId) activeTripId = null + } + } + } + + override fun findAllTripsBy(filter: String, profile: String): MutableCollection { + val files = File(getTripsDirectory()).list() ?: return mutableListOf() + + return files + .filter { if (filter.isNotEmpty()) it.startsWith(filter) else true } + .filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") } + .filter { it.contains("$profile-") } + .filter { + try { + parser.decodeTripName(it).size > 3 + } catch (e: Throwable) { + false + } + }.mapNotNull { fileName -> + parser.getTripDesc(fileName) + }.sortedByDescending { it.startTime.toLongOrNull() } + .toMutableList() + } + + override fun loadTrip(tripId: String, onMetricLoaded: (Metric) -> Unit) { + val file = getTripFile(tripId) + if (file.exists()) { + file.forEachLine { line -> + if (line.isNotBlank()) { + val metric = serializer.deserializer.readValue(line, Metric::class.java) + onMetricLoaded(metric) + } + } + } else { + throw IllegalArgumentException("Trip data not found for ID: $tripId") + } + } +} diff --git a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt index 56c0e7a4..cc0ab58b 100644 --- a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt +++ b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt @@ -20,6 +20,7 @@ import android.util.Log import com.google.gson.stream.JsonReader import com.google.gson.stream.JsonToken import com.google.gson.stream.JsonWriter +import java.io.EOFException import java.io.File import java.io.InputStreamReader import java.io.StringReader @@ -42,6 +43,8 @@ object TripLog { } } +private const val LOG_TAG = "DefaultJSONOutput" + private class DefaultJSONOutput( private val signalMapper: Map = mapOf(), private val valueMapper: (signal: Int, value: Any) -> Any @@ -54,6 +57,10 @@ private class DefaultJSONOutput( override fun transform(file: File, metadata: Map): File = file.inputStream().use { input -> + if (Log.isLoggable(LOG_TAG, Log.DEBUG)) { + Log.d(LOG_TAG, "Received file for transformation name=${file.name}, length=${file.length()}, metadata=$metadata") + } + process(JsonReader(InputStreamReader(input)), metadata) } @@ -61,7 +68,6 @@ private class DefaultJSONOutput( process(JsonReader(StringReader(log)), metadata) private fun process(reader: JsonReader, metadata: Map): File { - Log.d("DefaultJSONOutput", "Received $metadata") val tempFile = File.createTempFile("json_buffer_", ".tmp").apply { deleteOnExit() @@ -71,7 +77,67 @@ private class DefaultJSONOutput( try { reader.isLenient = true - parseRootToMemory(reader, seriesMap) + + // Hybrid parsing loop supports both Legacy JSON and New JSONL + try { + while (reader.peek() != JsonToken.END_DOCUMENT) { + if (reader.peek() == JsonToken.BEGIN_OBJECT) { + reader.beginObject() + + var ts: Long = 0 + var signal = 0 + var value: Any = 0.0 + var isFlatMetric = false + + while (reader.hasNext()) { + when (reader.nextName()) { + // LEGACY FORMAT ROUTES + "startTs" -> reader.skipValue() + "entries" -> parseEntriesToMemory(reader, seriesMap) + + // NEW JSONL FORMAT ROUTES + "ts" -> { + isFlatMetric = true + ts = reader.nextLong() + } + "entry" -> { + isFlatMetric = true + reader.beginObject() + while (reader.hasNext()) { + when (reader.nextName()) { + "data" -> signal = reader.nextInt() + "y" -> { + value = if (reader.peek() == JsonToken.BEGIN_OBJECT) { + reader.readMap() + } else { + reader.nextDouble() + } + } + else -> reader.skipValue() + } + } + reader.endObject() + } + else -> reader.skipValue() + } + } + reader.endObject() + + if (isFlatMetric) { + val signalKey = signal.toString() + val mappedResult = valueMapper(signal, value) + + val series = seriesMap.getOrPut(signalKey) { SeriesData() } + series.timestamps.add(ts) + series.values.add(mappedResult) + } + } else { + reader.skipValue() + } + } + } catch (e: EOFException) { + // Safely reached the end of the stream + } tempFile.outputStream().bufferedWriter().use { fileWriter -> JsonWriter(fileWriter).use { writer -> @@ -126,6 +192,7 @@ private class DefaultJSONOutput( } return tempFile } catch (e: Exception) { + e.printStackTrace() tempFile.delete() throw e } finally { @@ -136,21 +203,6 @@ private class DefaultJSONOutput( } } - private fun parseRootToMemory( - reader: JsonReader, - seriesMap: MutableMap - ) { - reader.beginObject() - while (reader.hasNext()) { - if (reader.nextName() == "entries") { - parseEntriesToMemory(reader, seriesMap) - } else { - reader.skipValue() - } - } - reader.endObject() - } - private fun parseEntriesToMemory( reader: JsonReader, seriesMap: MutableMap diff --git a/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt b/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt index f1a4b72b..6ca21b0d 100644 --- a/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt +++ b/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt @@ -279,4 +279,52 @@ class TripLogTransformerTest { val expected = """{"signal_dictionary":{},"series":{}}""" assertEquals(expected, result) } + + @Test + fun `optimize should convert jsonl flat format to optimized columnar format`() { + val rawJsonl = """ + {"ts": 1000, "entry": {"x": 100.0, "y": 50.5, "data": 12}, "rawAnswer": "ignore me"} + {"ts": 2000, "entry": {"x": 101.0, "y": 60.5, "data": 12}, "rawAnswer": ""} + """.trimIndent() + + val transformer: TripLogTransformer = TripLog.transformer { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = + """{"signal_dictionary":{"12":"12"},"series":{"12":{"t":[1000,2000],"v":[50.5,60.5]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } + + @Test + fun `jsonl parsing should handle mixed signals`() { + val rawJsonl = """ + {"ts": 1000, "entry": {"data": 12, "y": 50.5}} + {"ts": 1500, "entry": {"data": 13, "y": 60.5}} + {"ts": 2000, "entry": {"data": 12, "y": 70.5}} + """.trimIndent() + + val signalMapper = mapOf(12 to "Boost", 13 to "RPM") + val transformer: TripLogTransformer = TripLog.transformer(signalMapper = signalMapper) { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = + """{"signal_dictionary":{"12":"Boost","13":"RPM"},"series":{"12":{"t":[1000,2000],"v":[50.5,70.5]},"13":{"t":[1500],"v":[60.5]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } + + @Test + fun `jsonl format should support map type in value field`() { + val rawJsonl = """ + {"ts": 1500, "entry": {"data": 99, "y": {"GPS altitude": 57.10662841796875, "GPS Location": { "altitude": 57.10662841796875, "accuracy": 46.843723, "latitude": 54.16406183, "longitude": 16.29066863}}}, "rawAnswer": "raw"} + """.trimIndent() + + val transformer: TripLogTransformer = TripLog.transformer { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = """{"signal_dictionary":{"99":"99"},"series":{"99":{"t":[1500],"v":[{"GPS altitude":57.10662841796875,"GPS Location":{"altitude":57.10662841796875,"accuracy":46.843723,"latitude":54.16406183,"longitude":16.29066863}}]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } }