From 099efe221f869b601fe8def81b131ac1b40a4138 Mon Sep 17 00:00:00 2001 From: Tomek Zebrowski Date: Sat, 4 Apr 2026 20:26:47 +0200 Subject: [PATCH 1/5] feat: long trips recording --- common/src/main/java/org/obd/graphs/Cache.kt | 4 - .../obd/graphs/bl/trip/DefaultTripManager.kt | 208 ++++++++++-------- .../java/org/obd/graphs/bl/trip/TripCache.kt | 2 +- .../org/obd/graphs/bl/trip/TripDescParser.kt | 14 +- .../org/obd/graphs/bl/trip/TripManager.kt | 4 +- .../java/org/obd/graphs/bl/trip/TripModel.kt | 6 +- 6 files changed, 138 insertions(+), 100 deletions(-) diff --git a/common/src/main/java/org/obd/graphs/Cache.kt b/common/src/main/java/org/obd/graphs/Cache.kt index 51d70664..ad236085 100644 --- a/common/src/main/java/org/obd/graphs/Cache.kt +++ b/common/src/main/java/org/obd/graphs/Cache.kt @@ -24,10 +24,6 @@ class Cache { fun updateEntry(name: String, value: Any) { cache[name] = value } - - fun initCache(m: MutableMap) { - cache = m - } } val cacheManager = Cache() diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt index 7cb2849e..8cb630e0 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt @@ -18,7 +18,9 @@ package org.obd.graphs.bl.trip import android.content.Context import android.util.Log -import org.obd.graphs.bl.datalogger.DataLoggerRepository +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch import org.obd.graphs.bl.datalogger.MetricsProcessor import org.obd.graphs.bl.datalogger.scaleToRange import org.obd.graphs.getContext @@ -32,15 +34,18 @@ import java.io.FileOutputStream import java.text.SimpleDateFormat import java.util.Date import java.util.Locale +import java.util.concurrent.Executors val tripManager: TripManager = DefaultTripManager() private const val LOGGER_TAG = "TripManager" private const val MIN_TRIP_LENGTH = 5 private const val TRIP_DIRECTORY = "trips" - private const val TRIP_FILE_PREFIX = "trip" +// Holds exactly 30 minutes of data per sensor at 10Hz +private const val MAX_CACHED_METRICS_PER_SENSOR = 18000 + internal class DefaultTripManager : TripManager, MetricsProcessor { @@ -49,9 +54,16 @@ internal class DefaultTripManager : private val tripModelSerializer = TripModelSerializer() private val tripCache = TripCache() - private val tripDescParser = TripDescParser() + // Properties for streaming + private var activeFileOutputStream: FileOutputStream? = null + private var activeTripFileName: String? = null + + // Single thread dispatcher for sequential, non-blocking file operations + private val fileIoDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val fileIoScope = CoroutineScope(fileIoDispatcher) + override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}" override fun postValue(obdMetric: ObdMetric) { @@ -61,27 +73,36 @@ internal class DefaultTripManager : val key = obdMetric.command.pid.id val newRecord = if (obdMetric.isNumber()) Entry(ts, obdMetric.scaleToRange(), key) else Entry(ts, obdMetric.value, key) + val metric = Metric( + entry = newRecord, + ts = obdMetric.timestamp, + rawAnswer = obdMetric.raw + ) + + // STREAM TO FILE (Sequential, Non-Blocking via single-thread dispatcher) + fileIoScope.launch { + try { + val jsonLine = tripModelSerializer.serializer.writeValueAsString(metric) + "\n" + activeFileOutputStream?.write(jsonLine.toByteArray()) + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to stream line to JSONL file", e) + } + } + + // UPDATE RAM CACHE (With 30-min Cap) if (trip.entries.containsKey(key)) { val tripEntry = trip.entries[key]!! - tripEntry.metrics.add( - Metric( - entry = newRecord, - ts = obdMetric.timestamp, - rawAnswer = obdMetric.raw - ) - ) + tripEntry.metrics.add(metric) + + // Memory Protection: Cap the list size + if (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) { + tripEntry.metrics.removeAt(0) + } } else { trip.entries[key] = SensorData( id = key, - metrics = - mutableListOf( - Metric( - entry = newRecord, - ts = obdMetric.timestamp, - rawAnswer = obdMetric.raw - ) - ) + metrics = mutableListOf(metric) ) } } @@ -103,61 +124,69 @@ internal class DefaultTripManager : override fun startNewTrip(newTs: Long) { Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") updateCache(newTs) + + // Generate the file name + val fileName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl" + activeTripFileName = fileName + + // Open the file stream for appending on the sequential thread + fileIoScope.launch { + try { + val file = getTripFile(getContext()!!, fileName) + activeFileOutputStream = FileOutputStream(file, true) + Log.i(LOGGER_TAG, "Opened stream for file: $fileName") + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to open file stream for streaming", e) + } + } } - override fun saveCurrentTrip(f: () -> Unit) { + override fun saveCurrentTrip() { tripCache.getTrip { trip -> val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip") val tripLength = getTripLength(trip) - Log.i(LOGGER_TAG, "Recorded trip, length: ${tripLength}s") + Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s") - if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) { - val tripStartTs = trip.startTs - - val filter = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs" - val alreadySaved = findAllTripsBy(filter) + val fileNameToProcess = activeTripFileName - if (alreadySaved.isNotEmpty()) { - Log.e( - LOGGER_TAG, - "It seems that Trip which start same date='$filter' is already saved." - ) - } else { + if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) { + fileIoScope.launch { try { - f() - val histogram = DataLoggerRepository.getDiagnostics().histogram() - val pidDefinitionRegistry = DataLoggerRepository.getPidDefinitionRegistry() - - trip.entries.forEach { (t, u) -> - val p = pidDefinitionRegistry.findBy(t) - p?.let { - val histogramSupplier = histogram.findBy(it) - u.max = histogramSupplier.max - u.min = histogramSupplier.min - u.mean = histogramSupplier.mean + activeFileOutputStream?.flush() + activeFileOutputStream?.close() + activeFileOutputStream = null + + fileNameToProcess?.let { currentName -> + val currentFile = getTripFile(getContext()!!, currentName) + if (currentFile.exists()) { + val finalName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-${trip.startTs}-$tripLength.jsonl" + val finalFile = getTripFile(getContext()!!, finalName) + currentFile.renameTo(finalFile) + Log.i(LOGGER_TAG, "Trip stream closed and renamed to: '$finalName'") } } - - val content: String = - tripModelSerializer.serializer.writeValueAsString(trip) - - val fileName = - "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs-$tripLength.json" - Log.i( - LOGGER_TAG, - "Saving the trip to the file: '$fileName'. Length: ${tripLength}s" - ) - writeFile(getContext()!!, fileName, content) - Log.i( - LOGGER_TAG, - "Trip was written to the file: '$fileName'. Length: ${tripLength}s" - ) } catch (e: java.lang.Exception) { - Log.e(LOGGER_TAG, "Failed to save trip", e) + Log.e(LOGGER_TAG, "Failed to finalize streaming trip file", e) + } finally { + activeTripFileName = null } } } else { - Log.w(LOGGER_TAG, "Trip was not saved. Trip time is less than ${MIN_TRIP_LENGTH}s") + Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.") + fileIoScope.launch { + try { + activeFileOutputStream?.close() + activeFileOutputStream = null + + fileNameToProcess?.let { + getTripFile(getContext()!!, it).delete() + } + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to delete short trip file", e) + } finally { + activeTripFileName = null + } + } } } } @@ -174,9 +203,8 @@ internal class DefaultTripManager : files .filter { if (filter.isNotEmpty()) it.startsWith(filter) else true } .filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") } + .filter { it.contains("${profile.getCurrentProfile()}-") } .filter { - it.contains("${profile.getCurrentProfile()}-") - }.filter { try { tripDescParser.decodeTripName(it).size > 3 } catch (e: Throwable) { @@ -207,51 +235,57 @@ internal class DefaultTripManager : } else { val file = File(getTripsDirectory(getContext()!!), tripName) try { - val trip: Trip = tripModelSerializer.deserializer.readValue(file, Trip::class.java) + val parts = tripDescParser.decodeTripName(tripName) + val startTs = parts[2].toLongOrNull() ?: System.currentTimeMillis() + val trip = Trip(startTs = startTs) + + // Read line-by-line (JSONL) instead of as one massive object + file.forEachLine { line -> + if (line.isNotBlank()) { + val metric = tripModelSerializer.deserializer.readValue(line, Metric::class.java) + val key = metric.entry.data + + if (!trip.entries.containsKey(key)) { + trip.entries[key] = SensorData(id = key) + } + + val sensorData = trip.entries[key]!! + sensorData.metrics.add(metric) + } + } + + // Calculate historical min/max/mean from loaded values + trip.entries.values.forEach { sensorData -> + val values = sensorData.metrics.mapNotNull { it.entry.y.toString().toFloatOrNull() } + if (values.isNotEmpty()) { + sensorData.min = values.minOrNull() ?: 0f + sensorData.max = values.maxOrNull() ?: 0f + sensorData.mean = values.average() + } + } + Log.i(LOGGER_TAG, "Trip '${file.absolutePath}' was loaded from the storage.") Log.i(LOGGER_TAG, "Trip selected PIDs ${trip.entries.keys}") Log.i(LOGGER_TAG, "Number of entries ${trip.entries.values.size} collected within the trip") tripCache.updateTrip(trip) tripVirtualScreenManager.updateReservedVirtualScreen( - trip.entries.keys - .map { it.toString() } - .toList() + trip.entries.keys.map { it.toString() }.toList() ) } catch (e: Throwable) { - Log.e(LOGGER_TAG, "Did not find trip '$tripName'.", e) + Log.e(LOGGER_TAG, "Did not find or failed to parse trip '$tripName'.", e) updateCache(System.currentTimeMillis()) } } } - private fun writeFile( - context: Context, - fileName: String, - content: String - ) { - var fd: FileOutputStream? = null - try { - val file = getTripFile(context, fileName) - fd = - FileOutputStream(file).apply { - write(content.toByteArray()) - } - } finally { - fd?.run { - flush() - close() - } - } - } - private fun getTripFile( context: Context, fileName: String ): File = File(getTripsDirectory(context), fileName) private fun updateCache(newTs: Long) { - val trip = Trip(startTs = newTs, entries = mutableMapOf()) + val trip = Trip(startTs = newTs) tripCache.updateTrip(trip) Log.i(LOGGER_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'") } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt index f933a847..07fc9d03 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt @@ -24,7 +24,7 @@ private const val CACHE_TRIP_PROPERTY_NAME = "cache.trip.current" internal class TripCache { init { - val trip = Trip(startTs = System.currentTimeMillis(), entries = mutableMapOf()) + val trip = Trip(startTs = System.currentTimeMillis()) updateTrip(trip) Log.i("tripCache", "Init Trip with stamp: ${trip.startTs}") } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt index 1911c889..dca6c79e 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt @@ -23,16 +23,22 @@ class TripDescParser { val p = decodeTripName(fileName) val profileId = p[1] val profiles = profile.getAvailableProfiles() - val profileLabel = profiles[profileId]!! + val profileLabel = profiles[profileId] ?: "Unknown" + + val startTime = if (p.size > 2) p[2] else "" + val tripTimeSec = if (p.size > 3) p[3] else "0" return TripFileDesc( fileName = fileName, profileId = profileId, profileLabel = profileLabel, - startTime = p[2], - tripTimeSec = p[3] + startTime = startTime, + tripTimeSec = tripTimeSec ) } - fun decodeTripName(fileName: String) = fileName.substring(0, fileName.length - 5).split("-") + fun decodeTripName(fileName: String): List { + val nameWithoutExtension = fileName.substringBeforeLast(".") + return nameWithoutExtension.split("-") + } } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt index 2fbf622e..093e9b74 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripManager.kt @@ -28,7 +28,7 @@ private const val LOG_TAG = "TripManager" interface TripManager : MetricsProcessor { fun getCurrentTrip(): Trip fun startNewTrip(newTs: Long) - fun saveCurrentTrip(f: () -> Unit) + fun saveCurrentTrip() fun getTripsDirectory(context: Context): String fun saveCurrentTripAsync(){ @@ -37,7 +37,7 @@ interface TripManager : MetricsProcessor { runAsync (wait = false) { try { - tripManager.saveCurrentTrip {} + tripManager.saveCurrentTrip() } finally { sendBroadcastEvent(SCREEN_UNLOCK_PROGRESS_EVENT) } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt index 525f678e..5fbfed44 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt @@ -18,6 +18,8 @@ package org.obd.graphs.bl.trip import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.obd.metrics.transport.message.ConnectorResponse +import java.util.Collections +import java.util.concurrent.ConcurrentHashMap data class TripFileDesc( val fileName: String, @@ -44,7 +46,7 @@ data class Metric( @JsonIgnoreProperties(ignoreUnknown = true) data class SensorData( val id: Long, - val metrics: MutableList, + val metrics: MutableList = Collections.synchronizedList(ArrayList()), var min: Number = 0, var max: Number = 0, var mean: Number = 0 @@ -66,4 +68,4 @@ data class SensorData( } @JsonIgnoreProperties(ignoreUnknown = true) -data class Trip(val startTs: Long, val entries: MutableMap) +data class Trip(val startTs: Long, val entries: MutableMap = ConcurrentHashMap()) From f2d5af93f423509516e90393e99ca4daceccca43 Mon Sep 17 00:00:00 2001 From: Tomek Zebrowski Date: Sat, 4 Apr 2026 21:26:27 +0200 Subject: [PATCH 2/5] feat: add debug info about number of stored metrics --- .../obd/graphs/bl/trip/DefaultTripManager.kt | 66 +++++++++++-------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt index 8cb630e0..aeec5c44 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt @@ -35,10 +35,11 @@ import java.text.SimpleDateFormat import java.util.Date import java.util.Locale import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicLong val tripManager: TripManager = DefaultTripManager() -private const val LOGGER_TAG = "TripManager" +private const val LOG_TAG = "TripManager" private const val MIN_TRIP_LENGTH = 5 private const val TRIP_DIRECTORY = "trips" private const val TRIP_FILE_PREFIX = "trip" @@ -60,6 +61,9 @@ internal class DefaultTripManager : private var activeFileOutputStream: FileOutputStream? = null private var activeTripFileName: String? = null + // Thread-safe counter for total items written to disk + private val totalMetricsSaved = AtomicLong(0) + // Single thread dispatcher for sequential, non-blocking file operations private val fileIoDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val fileIoScope = CoroutineScope(fileIoDispatcher) @@ -79,22 +83,22 @@ internal class DefaultTripManager : rawAnswer = obdMetric.raw ) - // STREAM TO FILE (Sequential, Non-Blocking via single-thread dispatcher) fileIoScope.launch { try { val jsonLine = tripModelSerializer.serializer.writeValueAsString(metric) + "\n" - activeFileOutputStream?.write(jsonLine.toByteArray()) + activeFileOutputStream?.let { + it.write(jsonLine.toByteArray()) + totalMetricsSaved.incrementAndGet() + } } catch (e: Exception) { - Log.e(LOGGER_TAG, "Failed to stream line to JSONL file", e) + Log.e(LOG_TAG, "Failed to stream line to JSONL file", e) } } - // UPDATE RAM CACHE (With 30-min Cap) if (trip.entries.containsKey(key)) { val tripEntry = trip.entries[key]!! tripEntry.metrics.add(metric) - // Memory Protection: Cap the list size if (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) { tripEntry.metrics.removeAt(0) } @@ -107,7 +111,7 @@ internal class DefaultTripManager : } } } catch (e: Throwable) { - Log.e(LOGGER_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e) + Log.e(LOG_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e) } } @@ -117,13 +121,14 @@ internal class DefaultTripManager : } val trip = tripCache.getTrip()!! - Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") + Log.i(LOG_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") return trip } override fun startNewTrip(newTs: Long) { - Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") + Log.i(LOG_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") updateCache(newTs) + totalMetricsSaved.set(0) // Generate the file name val fileName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl" @@ -134,9 +139,9 @@ internal class DefaultTripManager : try { val file = getTripFile(getContext()!!, fileName) activeFileOutputStream = FileOutputStream(file, true) - Log.i(LOGGER_TAG, "Opened stream for file: $fileName") + Log.i(LOG_TAG, "Opened stream for file: $fileName") } catch (e: Exception) { - Log.e(LOGGER_TAG, "Failed to open file stream for streaming", e) + Log.e(LOG_TAG, "Failed to open file stream for streaming", e) } } } @@ -145,11 +150,13 @@ internal class DefaultTripManager : tripCache.getTrip { trip -> val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip") val tripLength = getTripLength(trip) - Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s") + Log.i(LOG_TAG, "Stopping trip, length: ${tripLength}s") + // Capture the current file name in case another trip starts immediately val fileNameToProcess = activeTripFileName if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) { + // Close and rename on the sequential thread to guarantee all pending writes finish first fileIoScope.launch { try { activeFileOutputStream?.flush() @@ -162,17 +169,20 @@ internal class DefaultTripManager : val finalName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-${trip.startTs}-$tripLength.jsonl" val finalFile = getTripFile(getContext()!!, finalName) currentFile.renameTo(finalFile) - Log.i(LOGGER_TAG, "Trip stream closed and renamed to: '$finalName'") + + val totalItems = totalMetricsSaved.get() + val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) + Log.i(LOG_TAG, "Trip stream closed. File: '$finalName' | Saved: $totalItems items | Size: ${String.format("%.2f", fileSizeMb)} MB") } } } catch (e: java.lang.Exception) { - Log.e(LOGGER_TAG, "Failed to finalize streaming trip file", e) + Log.e(LOG_TAG, "Failed to finalize streaming trip file", e) } finally { activeTripFileName = null } } } else { - Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.") + Log.w(LOG_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.") fileIoScope.launch { try { activeFileOutputStream?.close() @@ -182,7 +192,7 @@ internal class DefaultTripManager : getTripFile(getContext()!!, it).delete() } } catch (e: Exception) { - Log.e(LOGGER_TAG, "Failed to delete short trip file", e) + Log.e(LOG_TAG, "Failed to delete short trip file", e) } finally { activeTripFileName = null } @@ -192,11 +202,11 @@ internal class DefaultTripManager : } override fun findAllTripsBy(filter: String): MutableCollection { - Log.i(LOGGER_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}") + Log.i(LOG_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}") val files = File(getTripsDirectory(getContext()!!)).list() if (files == null) { - Log.i(LOGGER_TAG, "No files were found in the trips directory.") + Log.i(LOG_TAG, "No files were found in the trips directory.") return mutableListOf() } else { val result = @@ -211,24 +221,24 @@ internal class DefaultTripManager : false } }.mapNotNull { fileName -> - Log.d(LOGGER_TAG, "Found trip which fits the conditions: $fileName") + Log.d(LOG_TAG, "Found trip which fits the conditions: $fileName") tripDescParser.getTripDesc(fileName) }.sortedByDescending { it.startTime.toLongOrNull() } .toMutableList() - Log.i(LOGGER_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}") + Log.i(LOG_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}") return result } } override fun deleteTrip(trip: TripFileDesc) { - Log.i(LOGGER_TAG, "Deleting '${trip.fileName}' from the storage.") + Log.i(LOG_TAG, "Deleting '${trip.fileName}' from the storage.") val file = File(getTripsDirectory(getContext()!!), trip.fileName) file.delete() - Log.i(LOGGER_TAG, "Trip '${trip.fileName}' has been deleted from the storage.") + Log.i(LOG_TAG, "Trip '${trip.fileName}' has been deleted from the storage.") } override fun loadTrip(tripName: String) { - Log.i(LOGGER_TAG, "Loading '$tripName' from disk.") + Log.i(LOG_TAG, "Loading '$tripName' from disk.") if (tripName.isEmpty()) { updateCache(System.currentTimeMillis()) @@ -264,16 +274,16 @@ internal class DefaultTripManager : } } - Log.i(LOGGER_TAG, "Trip '${file.absolutePath}' was loaded from the storage.") - Log.i(LOGGER_TAG, "Trip selected PIDs ${trip.entries.keys}") - Log.i(LOGGER_TAG, "Number of entries ${trip.entries.values.size} collected within the trip") + Log.i(LOG_TAG, "Trip '${file.absolutePath}' was loaded from the storage.") + Log.i(LOG_TAG, "Trip selected PIDs ${trip.entries.keys}") + Log.i(LOG_TAG, "Number of entries ${trip.entries.values.size} collected within the trip") tripCache.updateTrip(trip) tripVirtualScreenManager.updateReservedVirtualScreen( trip.entries.keys.map { it.toString() }.toList() ) } catch (e: Throwable) { - Log.e(LOGGER_TAG, "Did not find or failed to parse trip '$tripName'.", e) + Log.e(LOG_TAG, "Did not find or failed to parse trip '$tripName'.", e) updateCache(System.currentTimeMillis()) } } @@ -287,7 +297,7 @@ internal class DefaultTripManager : private fun updateCache(newTs: Long) { val trip = Trip(startTs = newTs) tripCache.updateTrip(trip) - Log.i(LOGGER_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'") + Log.i(LOG_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'") } private fun getTripLength(trip: Trip): Long = From 8ae1fb4f89e16f559a6b10049852ba5d1af98d51 Mon Sep 17 00:00:00 2001 From: Tomek Zebrowski Date: Sun, 5 Apr 2026 15:19:05 +0200 Subject: [PATCH 3/5] feat: add support to jsonl files --- .../obd/graphs/bl/trip/DefaultTripManager.kt | 242 ++++-------------- .../org/obd/graphs/bl/trip/TripFileManager.kt | 169 ++++++++++++ .../integrations/log/TripLogTransformer.kt | 89 +++++-- .../gcp/gdrive/TripLogTransformerTest.kt | 48 ++++ 4 files changed, 346 insertions(+), 202 deletions(-) create mode 100644 datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt index aeec5c44..0b3189ad 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt @@ -18,9 +18,6 @@ package org.obd.graphs.bl.trip import android.content.Context import android.util.Log -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch import org.obd.graphs.bl.datalogger.MetricsProcessor import org.obd.graphs.bl.datalogger.scaleToRange import org.obd.graphs.getContext @@ -29,46 +26,31 @@ import org.obd.graphs.preferences.Prefs import org.obd.graphs.preferences.isEnabled import org.obd.graphs.profile.profile import org.obd.metrics.api.model.ObdMetric -import java.io.File -import java.io.FileOutputStream import java.text.SimpleDateFormat import java.util.Date import java.util.Locale -import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicLong -val tripManager: TripManager = DefaultTripManager() +val tripManager: TripManager by lazy { DefaultTripManager() } -private const val LOG_TAG = "TripManager" +private const val LOGGER_TAG = "TripManager" private const val MIN_TRIP_LENGTH = 5 -private const val TRIP_DIRECTORY = "trips" private const val TRIP_FILE_PREFIX = "trip" -// Holds exactly 30 minutes of data per sensor at 10Hz private const val MAX_CACHED_METRICS_PER_SENSOR = 18000 internal class DefaultTripManager : TripManager, MetricsProcessor { - private val dateFormat: SimpleDateFormat = - SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault()) - private val tripModelSerializer = TripModelSerializer() + private val dateFormat: SimpleDateFormat = SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault()) private val tripCache = TripCache() private val tripDescParser = TripDescParser() - // Properties for streaming - private var activeFileOutputStream: FileOutputStream? = null - private var activeTripFileName: String? = null + private val repository: TripRepository by lazy { FileTripRepository(getContext()!!) } - // Thread-safe counter for total items written to disk - private val totalMetricsSaved = AtomicLong(0) + private var activeTripId: String? = null - // Single thread dispatcher for sequential, non-blocking file operations - private val fileIoDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val fileIoScope = CoroutineScope(fileIoDispatcher) - - override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}" + override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir("trips")?.absolutePath}" override fun postValue(obdMetric: ObdMetric) { try { @@ -83,17 +65,7 @@ internal class DefaultTripManager : rawAnswer = obdMetric.raw ) - fileIoScope.launch { - try { - val jsonLine = tripModelSerializer.serializer.writeValueAsString(metric) + "\n" - activeFileOutputStream?.let { - it.write(jsonLine.toByteArray()) - totalMetricsSaved.incrementAndGet() - } - } catch (e: Exception) { - Log.e(LOG_TAG, "Failed to stream line to JSONL file", e) - } - } + repository.saveMetric(metric) if (trip.entries.containsKey(key)) { val tripEntry = trip.entries[key]!! @@ -103,15 +75,11 @@ internal class DefaultTripManager : tripEntry.metrics.removeAt(0) } } else { - trip.entries[key] = - SensorData( - id = key, - metrics = mutableListOf(metric) - ) + trip.entries[key] = SensorData(id = key, metrics = mutableListOf(metric)) } } } catch (e: Throwable) { - Log.e(LOG_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e) + Log.e(LOGGER_TAG, "Failed to process metric for ${obdMetric.command.pid.pid}", e) } } @@ -121,191 +89,93 @@ internal class DefaultTripManager : } val trip = tripCache.getTrip()!! - Log.i(LOG_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") + Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") return trip } override fun startNewTrip(newTs: Long) { - Log.i(LOG_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") + Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'") updateCache(newTs) - totalMetricsSaved.set(0) - - // Generate the file name - val fileName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl" - activeTripFileName = fileName - - // Open the file stream for appending on the sequential thread - fileIoScope.launch { - try { - val file = getTripFile(getContext()!!, fileName) - activeFileOutputStream = FileOutputStream(file, true) - Log.i(LOG_TAG, "Opened stream for file: $fileName") - } catch (e: Exception) { - Log.e(LOG_TAG, "Failed to open file stream for streaming", e) - } - } + + activeTripId = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl" + repository.initStorage(activeTripId!!) } override fun saveCurrentTrip() { tripCache.getTrip { trip -> val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip") val tripLength = getTripLength(trip) - Log.i(LOG_TAG, "Stopping trip, length: ${tripLength}s") + val currentTripId = activeTripId ?: return@getTrip + + Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s") - // Capture the current file name in case another trip starts immediately - val fileNameToProcess = activeTripFileName + repository.releaseStorage(currentTripId) if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) { - // Close and rename on the sequential thread to guarantee all pending writes finish first - fileIoScope.launch { - try { - activeFileOutputStream?.flush() - activeFileOutputStream?.close() - activeFileOutputStream = null - - fileNameToProcess?.let { currentName -> - val currentFile = getTripFile(getContext()!!, currentName) - if (currentFile.exists()) { - val finalName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-${trip.startTs}-$tripLength.jsonl" - val finalFile = getTripFile(getContext()!!, finalName) - currentFile.renameTo(finalFile) - - val totalItems = totalMetricsSaved.get() - val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) - Log.i(LOG_TAG, "Trip stream closed. File: '$finalName' | Saved: $totalItems items | Size: ${String.format("%.2f", fileSizeMb)} MB") - } - } - } catch (e: java.lang.Exception) { - Log.e(LOG_TAG, "Failed to finalize streaming trip file", e) - } finally { - activeTripFileName = null - } - } + repository.updateTripMetadata(currentTripId, trip.startTs, tripLength, profile.getCurrentProfile()) } else { - Log.w(LOG_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.") - fileIoScope.launch { - try { - activeFileOutputStream?.close() - activeFileOutputStream = null - - fileNameToProcess?.let { - getTripFile(getContext()!!, it).delete() - } - } catch (e: Exception) { - Log.e(LOG_TAG, "Failed to delete short trip file", e) - } finally { - activeTripFileName = null - } - } + Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Discarding.") + repository.deleteTrip(currentTripId) } + + activeTripId = null } } override fun findAllTripsBy(filter: String): MutableCollection { - Log.i(LOG_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}") - - val files = File(getTripsDirectory(getContext()!!)).list() - if (files == null) { - Log.i(LOG_TAG, "No files were found in the trips directory.") - return mutableListOf() - } else { - val result = - files - .filter { if (filter.isNotEmpty()) it.startsWith(filter) else true } - .filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") } - .filter { it.contains("${profile.getCurrentProfile()}-") } - .filter { - try { - tripDescParser.decodeTripName(it).size > 3 - } catch (e: Throwable) { - false - } - }.mapNotNull { fileName -> - Log.d(LOG_TAG, "Found trip which fits the conditions: $fileName") - tripDescParser.getTripDesc(fileName) - }.sortedByDescending { it.startTime.toLongOrNull() } - .toMutableList() - Log.i(LOG_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}") - return result - } + return repository.findAllTripsBy(filter, profile.getCurrentProfile()) } override fun deleteTrip(trip: TripFileDesc) { - Log.i(LOG_TAG, "Deleting '${trip.fileName}' from the storage.") - val file = File(getTripsDirectory(getContext()!!), trip.fileName) - file.delete() - Log.i(LOG_TAG, "Trip '${trip.fileName}' has been deleted from the storage.") + repository.deleteTrip(trip.fileName) } - override fun loadTrip(tripName: String) { - Log.i(LOG_TAG, "Loading '$tripName' from disk.") + override fun loadTrip(tripId: String) { + Log.i(LOGGER_TAG, "Loading trip ID: '$tripId'") - if (tripName.isEmpty()) { + if (tripId.isEmpty()) { updateCache(System.currentTimeMillis()) - } else { - val file = File(getTripsDirectory(getContext()!!), tripName) - try { - val parts = tripDescParser.decodeTripName(tripName) - val startTs = parts[2].toLongOrNull() ?: System.currentTimeMillis() - val trip = Trip(startTs = startTs) - - // Read line-by-line (JSONL) instead of as one massive object - file.forEachLine { line -> - if (line.isNotBlank()) { - val metric = tripModelSerializer.deserializer.readValue(line, Metric::class.java) - val key = metric.entry.data - - if (!trip.entries.containsKey(key)) { - trip.entries[key] = SensorData(id = key) - } - - val sensorData = trip.entries[key]!! - sensorData.metrics.add(metric) - } - } + return + } - // Calculate historical min/max/mean from loaded values - trip.entries.values.forEach { sensorData -> - val values = sensorData.metrics.mapNotNull { it.entry.y.toString().toFloatOrNull() } - if (values.isNotEmpty()) { - sensorData.min = values.minOrNull() ?: 0f - sensorData.max = values.maxOrNull() ?: 0f - sensorData.mean = values.average() - } + try { + val parts = tripDescParser.decodeTripName(tripId) + val startTs = parts.getOrNull(2)?.toLongOrNull() ?: System.currentTimeMillis() + val trip = Trip(startTs = startTs) + + repository.loadTrip(tripId) { metric -> + val key = metric.entry.data + if (!trip.entries.containsKey(key)) { + trip.entries[key] = SensorData(id = key) } + trip.entries[key]!!.metrics.add(metric) + } - Log.i(LOG_TAG, "Trip '${file.absolutePath}' was loaded from the storage.") - Log.i(LOG_TAG, "Trip selected PIDs ${trip.entries.keys}") - Log.i(LOG_TAG, "Number of entries ${trip.entries.values.size} collected within the trip") - - tripCache.updateTrip(trip) - tripVirtualScreenManager.updateReservedVirtualScreen( - trip.entries.keys.map { it.toString() }.toList() - ) - } catch (e: Throwable) { - Log.e(LOG_TAG, "Did not find or failed to parse trip '$tripName'.", e) - updateCache(System.currentTimeMillis()) + trip.entries.values.forEach { sensorData -> + val values = sensorData.metrics.mapNotNull { it.entry.y.toString().toFloatOrNull() } + if (values.isNotEmpty()) { + sensorData.min = values.minOrNull() ?: 0f + sensorData.max = values.maxOrNull() ?: 0f + sensorData.mean = values.average() + } } + + Log.i(LOGGER_TAG, "Trip loaded successfully. PIDs: ${trip.entries.keys}") + tripCache.updateTrip(trip) + tripVirtualScreenManager.updateReservedVirtualScreen(trip.entries.keys.map { it.toString() }) + } catch (e: Throwable) { + Log.e(LOGGER_TAG, "Failed to load trip '$tripId'.", e) + updateCache(System.currentTimeMillis()) } } - private fun getTripFile( - context: Context, - fileName: String - ): File = File(getTripsDirectory(context), fileName) - private fun updateCache(newTs: Long) { val trip = Trip(startTs = newTs) tripCache.updateTrip(trip) - Log.i(LOG_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'") } private fun getTripLength(trip: Trip): Long = - if (trip.startTs == 0L) { - 0 - } else { - (Date().time - trip.startTs) / 1000 - } + if (trip.startTs == 0L) 0 else (Date().time - trip.startTs) / 1000 private fun formatTimestamp(ts: Long) = dateFormat.format(Date(ts)) } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt new file mode 100644 index 00000000..b543584f --- /dev/null +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt @@ -0,0 +1,169 @@ +/* + * Copyright 2019-2026, Tomasz Żebrowski + * + *

Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.obd.graphs.bl.trip + +import android.content.Context +import android.util.Log +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import java.io.File +import java.io.FileOutputStream +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicLong + +private const val LOGGER_TAG = "TripRepository" +private const val TRIP_DIRECTORY = "trips" +private const val TRIP_FILE_PREFIX = "trip" + +interface TripRepository { + fun initStorage(tripId: String) + fun releaseStorage(tripId: String) + + fun saveMetric(metric: Metric) + fun updateTripMetadata(tripId: String, tripStartTs: Long, tripLength: Long, profile: String) + fun deleteTrip(tripId: String) + + fun findAllTripsBy(filter: String, profile: String): MutableCollection + fun loadTrip(tripId: String, onMetricLoaded: (Metric) -> Unit) +} + +internal class FileTripRepository( + private val context: Context, + private val parser: TripDescParser = TripDescParser(), + private val serializer: TripModelSerializer = TripModelSerializer() +) : TripRepository { + + private var activeFileOutputStream: FileOutputStream? = null + private var activeTripId: String? = null + private val totalMetricsSaved = AtomicLong(0) + + // Single thread dispatcher guarantees sequential disk I/O + private val ioDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val ioScope = CoroutineScope(ioDispatcher) + + private fun getTripsDirectory() = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}" + private fun getTripFile(fileName: String): File = File(getTripsDirectory(), fileName) + + override fun initStorage(tripId: String) { + activeTripId = tripId + totalMetricsSaved.set(0) + + ioScope.launch { + try { + val file = getTripFile(activeTripId!!) + activeFileOutputStream = FileOutputStream(file, true) + Log.i(LOGGER_TAG, "Started saving trip to: $activeTripId") + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to initialize storage for trip", e) + } + } + } + + override fun releaseStorage(tripId: String) { + ioScope.launch { + try { + activeFileOutputStream?.flush() + activeFileOutputStream?.close() + activeFileOutputStream = null + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to release storage", e) + } + } + } + + override fun saveMetric(metric: Metric) { + ioScope.launch { + try { + activeFileOutputStream?.let { + val jsonLine = serializer.serializer.writeValueAsString(metric) + "\n" + it.write(jsonLine.toByteArray()) + totalMetricsSaved.incrementAndGet() + } + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to save metric", e) + } + } + } + + override fun updateTripMetadata(tripId: String, tripStartTs: Long, tripLength: Long, profile: String) { + ioScope.launch { + try { + val currentFile = getTripFile(tripId) + if (currentFile.exists()) { + val finalName = "$TRIP_FILE_PREFIX-$profile-$tripStartTs-$tripLength.jsonl" + val finalFile = getTripFile(finalName) + currentFile.renameTo(finalFile) + + val totalItems = totalMetricsSaved.get() + val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) + Log.i(LOGGER_TAG, "Trip finished. ID: '$finalName' | Saved: $totalItems metrics | Size: ${String.format("%.2f", fileSizeMb)} MB") + } + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to update trip metadata", e) + } finally { + activeTripId = null + } + } + } + + override fun deleteTrip(tripId: String) { + ioScope.launch { + try { + getTripFile(tripId).delete() + Log.i(LOGGER_TAG, "Deleted trip data: $tripId") + } catch (e: Exception) { + Log.e(LOGGER_TAG, "Failed to delete trip data", e) + } finally { + if (activeTripId == tripId) activeTripId = null + } + } + } + + override fun findAllTripsBy(filter: String, profile: String): MutableCollection { + val files = File(getTripsDirectory()).list() ?: return mutableListOf() + + return files + .filter { if (filter.isNotEmpty()) it.startsWith(filter) else true } + .filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") } + .filter { it.contains("$profile-") } + .filter { + try { + parser.decodeTripName(it).size > 3 + } catch (e: Throwable) { + false + } + }.mapNotNull { fileName -> + parser.getTripDesc(fileName) + }.sortedByDescending { it.startTime.toLongOrNull() } + .toMutableList() + } + + override fun loadTrip(tripId: String, onMetricLoaded: (Metric) -> Unit) { + val file = getTripFile(tripId) + if (file.exists()) { + file.forEachLine { line -> + if (line.isNotBlank()) { + val metric = serializer.deserializer.readValue(line, Metric::class.java) + onMetricLoaded(metric) + } + } + } else { + throw IllegalArgumentException("Trip data not found for ID: $tripId") + } + } +} diff --git a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt index 56c0e7a4..bdf5811f 100644 --- a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt +++ b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt @@ -20,6 +20,7 @@ import android.util.Log import com.google.gson.stream.JsonReader import com.google.gson.stream.JsonToken import com.google.gson.stream.JsonWriter +import java.io.EOFException import java.io.File import java.io.InputStreamReader import java.io.StringReader @@ -42,6 +43,8 @@ object TripLog { } } +private const val LOG_TAG = "DefaultJSONOutput" + private class DefaultJSONOutput( private val signalMapper: Map = mapOf(), private val valueMapper: (signal: Int, value: Any) -> Any @@ -54,6 +57,10 @@ private class DefaultJSONOutput( override fun transform(file: File, metadata: Map): File = file.inputStream().use { input -> + if (Log.isLoggable(LOG_TAG,Log.DEBUG)) { + Log.d(LOG_TAG, "Received file for transformation name=${file.name}, length=${file.length()}, metadata=$metadata") + } + process(JsonReader(InputStreamReader(input)), metadata) } @@ -61,7 +68,7 @@ private class DefaultJSONOutput( process(JsonReader(StringReader(log)), metadata) private fun process(reader: JsonReader, metadata: Map): File { - Log.d("DefaultJSONOutput", "Received $metadata") + val tempFile = File.createTempFile("json_buffer_", ".tmp").apply { deleteOnExit() @@ -71,7 +78,67 @@ private class DefaultJSONOutput( try { reader.isLenient = true - parseRootToMemory(reader, seriesMap) + + // Hybrid parsing loop supports both Legacy JSON and New JSONL + try { + while (reader.peek() != JsonToken.END_DOCUMENT) { + if (reader.peek() == JsonToken.BEGIN_OBJECT) { + reader.beginObject() + + var ts: Long = 0 + var signal = 0 + var value: Any = 0.0 + var isFlatMetric = false + + while (reader.hasNext()) { + when (reader.nextName()) { + // LEGACY FORMAT ROUTES + "startTs" -> reader.skipValue() + "entries" -> parseEntriesToMemory(reader, seriesMap) + + // NEW JSONL FORMAT ROUTES + "ts" -> { + isFlatMetric = true + ts = reader.nextLong() + } + "entry" -> { + isFlatMetric = true + reader.beginObject() + while (reader.hasNext()) { + when (reader.nextName()) { + "data" -> signal = reader.nextInt() + "y" -> { + value = if (reader.peek() == JsonToken.BEGIN_OBJECT) { + reader.readMap() + } else { + reader.nextDouble() + } + } + else -> reader.skipValue() + } + } + reader.endObject() + } + else -> reader.skipValue() + } + } + reader.endObject() + + if (isFlatMetric) { + val signalKey = signal.toString() + val mappedResult = valueMapper(signal, value) + + val series = seriesMap.getOrPut(signalKey) { SeriesData() } + series.timestamps.add(ts) + series.values.add(mappedResult) + } + } else { + reader.skipValue() + } + } + } catch (e: EOFException) { + // Safely reached the end of the stream + } tempFile.outputStream().bufferedWriter().use { fileWriter -> JsonWriter(fileWriter).use { writer -> @@ -126,6 +193,7 @@ private class DefaultJSONOutput( } return tempFile } catch (e: Exception) { + e.printStackTrace() tempFile.delete() throw e } finally { @@ -136,20 +204,9 @@ private class DefaultJSONOutput( } } - private fun parseRootToMemory( - reader: JsonReader, - seriesMap: MutableMap - ) { - reader.beginObject() - while (reader.hasNext()) { - if (reader.nextName() == "entries") { - parseEntriesToMemory(reader, seriesMap) - } else { - reader.skipValue() - } - } - reader.endObject() - } + // ========================================== + // LEGACY FORMAT PARSING HELPERS + // ========================================== private fun parseEntriesToMemory( reader: JsonReader, diff --git a/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt b/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt index f1a4b72b..6ca21b0d 100644 --- a/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt +++ b/integrations/src/test/java/org/obd/graphs/integrations/gcp/gdrive/TripLogTransformerTest.kt @@ -279,4 +279,52 @@ class TripLogTransformerTest { val expected = """{"signal_dictionary":{},"series":{}}""" assertEquals(expected, result) } + + @Test + fun `optimize should convert jsonl flat format to optimized columnar format`() { + val rawJsonl = """ + {"ts": 1000, "entry": {"x": 100.0, "y": 50.5, "data": 12}, "rawAnswer": "ignore me"} + {"ts": 2000, "entry": {"x": 101.0, "y": 60.5, "data": 12}, "rawAnswer": ""} + """.trimIndent() + + val transformer: TripLogTransformer = TripLog.transformer { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = + """{"signal_dictionary":{"12":"12"},"series":{"12":{"t":[1000,2000],"v":[50.5,60.5]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } + + @Test + fun `jsonl parsing should handle mixed signals`() { + val rawJsonl = """ + {"ts": 1000, "entry": {"data": 12, "y": 50.5}} + {"ts": 1500, "entry": {"data": 13, "y": 60.5}} + {"ts": 2000, "entry": {"data": 12, "y": 70.5}} + """.trimIndent() + + val signalMapper = mapOf(12 to "Boost", 13 to "RPM") + val transformer: TripLogTransformer = TripLog.transformer(signalMapper = signalMapper) { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = + """{"signal_dictionary":{"12":"Boost","13":"RPM"},"series":{"12":{"t":[1000,2000],"v":[50.5,70.5]},"13":{"t":[1500],"v":[60.5]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } + + @Test + fun `jsonl format should support map type in value field`() { + val rawJsonl = """ + {"ts": 1500, "entry": {"data": 99, "y": {"GPS altitude": 57.10662841796875, "GPS Location": { "altitude": 57.10662841796875, "accuracy": 46.843723, "latitude": 54.16406183, "longitude": 16.29066863}}}, "rawAnswer": "raw"} + """.trimIndent() + + val transformer: TripLogTransformer = TripLog.transformer { s, v -> v } + val result = transformer.transform(rawJsonl).readText() + + val expectedJson = """{"signal_dictionary":{"99":"99"},"series":{"99":{"t":[1500],"v":[{"GPS altitude":57.10662841796875,"GPS Location":{"altitude":57.10662841796875,"accuracy":46.843723,"latitude":54.16406183,"longitude":16.29066863}}]}}}""" + + Assertions.assertThat(result).isEqualTo(expectedJson) + } } From 243d59355f327f9740e2278f4a615763aa5ea04d Mon Sep 17 00:00:00 2001 From: Tomek Zebrowski Date: Sun, 5 Apr 2026 15:51:16 +0200 Subject: [PATCH 4/5] feat: naming refactoring --- .../obd/graphs/bl/trip/{TripFileManager.kt => TripRepository.kt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename datalogger/src/main/java/org/obd/graphs/bl/trip/{TripFileManager.kt => TripRepository.kt} (100%) diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt similarity index 100% rename from datalogger/src/main/java/org/obd/graphs/bl/trip/TripFileManager.kt rename to datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt From c2a385c4408659e8b33f28378224a347a94b11c4 Mon Sep 17 00:00:00 2001 From: Tomek Zebrowski Date: Sun, 5 Apr 2026 17:31:10 +0200 Subject: [PATCH 5/5] feat: minor improvements. change SensorData. metrics to ConcurrentLinkedDeque --- .../org/obd/graphs/ui/graph/GraphFragment.kt | 3 ++- .../obd/graphs/bl/trip/DefaultTripManager.kt | 24 +++++++++---------- .../java/org/obd/graphs/bl/trip/TripModel.kt | 4 ++-- .../org/obd/graphs/bl/trip/TripRepository.kt | 14 +++++++---- .../integrations/log/TripLogTransformer.kt | 7 +----- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt b/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt index 60fa3bea..58270e95 100644 --- a/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt +++ b/app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt @@ -73,6 +73,7 @@ import org.obd.metrics.pid.PidDefinitionRegistry import java.text.SimpleDateFormat import java.util.Date import java.util.Locale +import java.util.concurrent.ConcurrentLinkedDeque private const val LOG_TAG = "Graph" @@ -260,7 +261,7 @@ class GraphFragment : Fragment() { val sensorData = SensorData( id = it.command.pid.id, - metrics = mutableListOf(), + metrics = ConcurrentLinkedDeque(), min = hist.min, max = hist.max, mean = hist.mean diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt index 0b3189ad..4bed473b 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt @@ -67,15 +67,14 @@ internal class DefaultTripManager : repository.saveMetric(metric) - if (trip.entries.containsKey(key)) { - val tripEntry = trip.entries[key]!! - tripEntry.metrics.add(metric) - - if (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) { - tripEntry.metrics.removeAt(0) - } - } else { - trip.entries[key] = SensorData(id = key, metrics = mutableListOf(metric)) + val tripEntry = trip.entries.getOrPut(key) { + SensorData(id = key) + } + + tripEntry.metrics.add(metric) + + while (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) { + tripEntry.metrics.removeFirst() } } } catch (e: Throwable) { @@ -84,11 +83,12 @@ internal class DefaultTripManager : } override fun getCurrentTrip(): Trip { - if (null == tripCache.getTrip()) { - startNewTrip(System.currentTimeMillis()) + val trip = tripCache.getTrip() ?: run { + val newTs = System.currentTimeMillis() + startNewTrip(newTs) + tripCache.getTrip() ?: Trip(startTs = newTs) } - val trip = tripCache.getTrip()!! Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'") return trip } diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt index 5fbfed44..6cb77e6f 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripModel.kt @@ -18,8 +18,8 @@ package org.obd.graphs.bl.trip import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.obd.metrics.transport.message.ConnectorResponse -import java.util.Collections import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedDeque data class TripFileDesc( val fileName: String, @@ -46,7 +46,7 @@ data class Metric( @JsonIgnoreProperties(ignoreUnknown = true) data class SensorData( val id: Long, - val metrics: MutableList = Collections.synchronizedList(ArrayList()), + val metrics: ConcurrentLinkedDeque = ConcurrentLinkedDeque(), var min: Number = 0, var max: Number = 0, var mean: Number = 0 diff --git a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt index b543584f..22cc236e 100644 --- a/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt +++ b/datalogger/src/main/java/org/obd/graphs/bl/trip/TripRepository.kt @@ -52,7 +52,6 @@ internal class FileTripRepository( private var activeTripId: String? = null private val totalMetricsSaved = AtomicLong(0) - // Single thread dispatcher guarantees sequential disk I/O private val ioDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val ioScope = CoroutineScope(ioDispatcher) @@ -107,11 +106,16 @@ internal class FileTripRepository( if (currentFile.exists()) { val finalName = "$TRIP_FILE_PREFIX-$profile-$tripStartTs-$tripLength.jsonl" val finalFile = getTripFile(finalName) - currentFile.renameTo(finalFile) - val totalItems = totalMetricsSaved.get() - val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) - Log.i(LOGGER_TAG, "Trip finished. ID: '$finalName' | Saved: $totalItems metrics | Size: ${String.format("%.2f", fileSizeMb)} MB") + val isRenamed = currentFile.renameTo(finalFile) + + if (isRenamed) { + val totalItems = totalMetricsSaved.get() + val fileSizeMb = finalFile.length() / (1024.0 * 1024.0) + Log.i(LOGGER_TAG, "Trip finished. ID: '$finalName' | Saved: $totalItems metrics | Size: ${String.format("%.2f", fileSizeMb)} MB") + } else { + Log.e(LOGGER_TAG, "Failed to rename temporary trip file '$tripId' to '$finalName'.") + } } } catch (e: Exception) { Log.e(LOGGER_TAG, "Failed to update trip metadata", e) diff --git a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt index bdf5811f..cc0ab58b 100644 --- a/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt +++ b/integrations/src/main/java/org/obd/graphs/integrations/log/TripLogTransformer.kt @@ -57,7 +57,7 @@ private class DefaultJSONOutput( override fun transform(file: File, metadata: Map): File = file.inputStream().use { input -> - if (Log.isLoggable(LOG_TAG,Log.DEBUG)) { + if (Log.isLoggable(LOG_TAG, Log.DEBUG)) { Log.d(LOG_TAG, "Received file for transformation name=${file.name}, length=${file.length()}, metadata=$metadata") } @@ -68,7 +68,6 @@ private class DefaultJSONOutput( process(JsonReader(StringReader(log)), metadata) private fun process(reader: JsonReader, metadata: Map): File { - val tempFile = File.createTempFile("json_buffer_", ".tmp").apply { deleteOnExit() @@ -204,10 +203,6 @@ private class DefaultJSONOutput( } } - // ========================================== - // LEGACY FORMAT PARSING HELPERS - // ========================================== - private fun parseEntriesToMemory( reader: JsonReader, seriesMap: MutableMap