diff --git a/schema/building.fbs b/schema/building.fbs new file mode 100644 index 0000000..05146e6 --- /dev/null +++ b/schema/building.fbs @@ -0,0 +1,17 @@ +namespace com.dedicatedcode.paikka.flatbuffers; + +table Geometry { + data:[ubyte]; // WKB binary +} +table Building { + id: long; + name: string; // building type (residential, commercial, etc.) + code: string; // always "building" + geometry: Geometry; +} + +table BuildingList { + buildings: [Building]; +} + +root_type BuildingList; \ No newline at end of file diff --git a/scripts/filter_osm.sh b/scripts/filter_osm.sh index 492e638..b876351 100755 --- a/scripts/filter_osm.sh +++ b/scripts/filter_osm.sh @@ -54,8 +54,8 @@ echo "Input file: $INPUT_FILE" echo "Output file: $OUTPUT_FILE" echo "" osmium tags-filter "$INPUT_FILE" \ - nwr/amenity!=bench,drinking_water,waste_basket,bicycle_parking,vending_machine,parking_entrance,fire_hydrant,recycling \ - nwr/emergency!=fire_hydrant,defibrillator \ + nwr/amenity!=bench,drinking_water,waste_basket,bicycle_parking,vending_machine,parking_entrance,fire_hydrant,recycling,post_box,atm,loading_ramp,parcel_locker,trolley_bay \ + nwr/emergency!=fire_hydrant,defibrillator,fire_service_inlet \ nw/shop \ nw/tourism \ nw/leisure \ @@ -70,6 +70,7 @@ osmium tags-filter "$INPUT_FILE" \ w/building=yes,commercial,retail,industrial,office,apartments,residential \ r/boundary=administrative \ r/type=multipolygon \ + n/addr:* \ -o "$OUTPUT_FILE" --overwrite if [ $? 
-eq 0 ]; then diff --git a/src/main/java/com/dedicatedcode/paikka/controller/AdminController.java b/src/main/java/com/dedicatedcode/paikka/controller/AdminController.java index 5d8bf47..6146856 100644 --- a/src/main/java/com/dedicatedcode/paikka/controller/AdminController.java +++ b/src/main/java/com/dedicatedcode/paikka/controller/AdminController.java @@ -19,6 +19,7 @@ import com.dedicatedcode.paikka.service.ReverseGeocodingService; import com.dedicatedcode.paikka.service.BoundaryService; import com.dedicatedcode.paikka.service.MetadataService; +import com.dedicatedcode.paikka.service.BuildingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -44,11 +45,16 @@ public class AdminController { private final ReverseGeocodingService reverseGeocodingService; private final BoundaryService boundaryService; private final MetadataService metadataService; + private final BuildingService buildingService; - public AdminController(ReverseGeocodingService reverseGeocodingService, BoundaryService boundaryService, MetadataService metadataService) { + public AdminController(ReverseGeocodingService reverseGeocodingService, + BoundaryService boundaryService, + MetadataService metadataService, + BuildingService buildingService) { this.reverseGeocodingService = reverseGeocodingService; this.boundaryService = boundaryService; this.metadataService = metadataService; + this.buildingService = buildingService; } @PostMapping(value = "/refresh-db", produces = "application/json") @@ -64,6 +70,10 @@ public ResponseEntity refreshDatabase() { logger.info("Reloading boundaries database..."); boundaryService.reloadDatabase(); + // Reload the building service (buildings database) + logger.info("Reloading buildings database..."); + buildingService.reloadDatabase(); + // Reload metadata logger.info("Reloading metadata..."); metadataService.reload(); diff --git 
a/src/main/java/com/dedicatedcode/paikka/flatbuffers/Building.java b/src/main/java/com/dedicatedcode/paikka/flatbuffers/Building.java new file mode 100644 index 0000000..601913e --- /dev/null +++ b/src/main/java/com/dedicatedcode/paikka/flatbuffers/Building.java @@ -0,0 +1,58 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package com.dedicatedcode.paikka.flatbuffers; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class Building extends Table { + public static void ValidateVersion() { Constants.FLATBUFFERS_25_2_10(); } + public static Building getRootAsBuilding(ByteBuffer _bb) { return getRootAsBuilding(_bb, new Building()); } + public static Building getRootAsBuilding(ByteBuffer _bb, Building obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { __reset(_i, _bb); } + public Building __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public long id() { int o = __offset(4); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } + public String name() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer nameAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer nameInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + public String code() { int o = __offset(8); return o != 0 ? 
__string(o + bb_pos) : null; } + public ByteBuffer codeAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer codeInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + public com.dedicatedcode.paikka.flatbuffers.Geometry geometry() { return geometry(new com.dedicatedcode.paikka.flatbuffers.Geometry()); } + public com.dedicatedcode.paikka.flatbuffers.Geometry geometry(com.dedicatedcode.paikka.flatbuffers.Geometry obj) { int o = __offset(10); return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; } + + public static int createBuilding(FlatBufferBuilder builder, + long id, + int nameOffset, + int codeOffset, + int geometryOffset) { + builder.startTable(4); + Building.addId(builder, id); + Building.addGeometry(builder, geometryOffset); + Building.addCode(builder, codeOffset); + Building.addName(builder, nameOffset); + return Building.endBuilding(builder); + } + + public static void startBuilding(FlatBufferBuilder builder) { builder.startTable(4); } + public static void addId(FlatBufferBuilder builder, long id) { builder.addLong(0, id, 0L); } + public static void addName(FlatBufferBuilder builder, int nameOffset) { builder.addOffset(1, nameOffset, 0); } + public static void addCode(FlatBufferBuilder builder, int codeOffset) { builder.addOffset(2, codeOffset, 0); } + public static void addGeometry(FlatBufferBuilder builder, int geometryOffset) { builder.addOffset(3, geometryOffset, 0); } + public static int endBuilding(FlatBufferBuilder builder) { + int o = builder.endTable(); + return o; + } + + public static final class Vector extends BaseVector { + public Vector __assign(int _vector, int _element_size, ByteBuffer _bb) { __reset(_vector, _element_size, _bb); return this; } + + public Building get(int j) { return get(new Building(), j); } + public Building get(Building obj, int j) { return obj.__assign(__indirect(__element(j), bb), bb); } + } +} + diff --git 
a/src/main/java/com/dedicatedcode/paikka/flatbuffers/BuildingList.java b/src/main/java/com/dedicatedcode/paikka/flatbuffers/BuildingList.java new file mode 100644 index 0000000..edf97f1 --- /dev/null +++ b/src/main/java/com/dedicatedcode/paikka/flatbuffers/BuildingList.java @@ -0,0 +1,49 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package com.dedicatedcode.paikka.flatbuffers; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class BuildingList extends Table { + public static void ValidateVersion() { Constants.FLATBUFFERS_25_2_10(); } + public static BuildingList getRootAsBuildingList(ByteBuffer _bb) { return getRootAsBuildingList(_bb, new BuildingList()); } + public static BuildingList getRootAsBuildingList(ByteBuffer _bb, BuildingList obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { __reset(_i, _bb); } + public BuildingList __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public com.dedicatedcode.paikka.flatbuffers.Building buildings(int j) { return buildings(new com.dedicatedcode.paikka.flatbuffers.Building(), j); } + public com.dedicatedcode.paikka.flatbuffers.Building buildings(com.dedicatedcode.paikka.flatbuffers.Building obj, int j) { int o = __offset(4); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int buildingsLength() { int o = __offset(4); return o != 0 ? __vector_len(o) : 0; } + public com.dedicatedcode.paikka.flatbuffers.Building.Vector buildingsVector() { return buildingsVector(new com.dedicatedcode.paikka.flatbuffers.Building.Vector()); } + public com.dedicatedcode.paikka.flatbuffers.Building.Vector buildingsVector(com.dedicatedcode.paikka.flatbuffers.Building.Vector obj) { int o = __offset(4); return o != 0 ? 
obj.__assign(__vector(o), 4, bb) : null; } + + public static int createBuildingList(FlatBufferBuilder builder, + int buildingsOffset) { + builder.startTable(1); + BuildingList.addBuildings(builder, buildingsOffset); + return BuildingList.endBuildingList(builder); + } + + public static void startBuildingList(FlatBufferBuilder builder) { builder.startTable(1); } + public static void addBuildings(FlatBufferBuilder builder, int buildingsOffset) { builder.addOffset(0, buildingsOffset, 0); } + public static int createBuildingsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startBuildingsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static int endBuildingList(FlatBufferBuilder builder) { + int o = builder.endTable(); + return o; + } + public static void finishBuildingListBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); } + public static void finishSizePrefixedBuildingListBuffer(FlatBufferBuilder builder, int offset) { builder.finishSizePrefixed(offset); } + + public static final class Vector extends BaseVector { + public Vector __assign(int _vector, int _element_size, ByteBuffer _bb) { __reset(_vector, _element_size, _bb); return this; } + + public BuildingList get(int j) { return get(new BuildingList(), j); } + public BuildingList get(BuildingList obj, int j) { return obj.__assign(__indirect(__element(j), bb), bb); } + } +} + diff --git a/src/main/java/com/dedicatedcode/paikka/service/BuildingService.java b/src/main/java/com/dedicatedcode/paikka/service/BuildingService.java new file mode 100644 index 0000000..82911c9 --- /dev/null +++ b/src/main/java/com/dedicatedcode/paikka/service/BuildingService.java @@ -0,0 +1,276 @@ +/* + * This file is part of paikka. 
+ * + * Paikka is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 or + * any later version. + * + * Paikka is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Affero General Public License for more details. + * You should have received a copy of the GNU Affero General Public License + * along with Paikka. If not, see . + */ + +package com.dedicatedcode.paikka.service; + +import com.dedicatedcode.paikka.config.PaikkaConfiguration; +import com.dedicatedcode.paikka.flatbuffers.Geometry; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.io.WKBReader; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +/** + * Service for querying building information from the buildings database. 
+ */ +@Service +@ConditionalOnProperty(name = "paikka.import-mode", havingValue = "false", matchIfMissing = true) +public class BuildingService { + + private static final Logger logger = LoggerFactory.getLogger(BuildingService.class); + private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + + private final PaikkaConfiguration config; + private final S2Helper s2Helper; + private RocksDB buildingsDb; + + public BuildingService(PaikkaConfiguration config, S2Helper s2Helper) { + this.config = config; + this.s2Helper = s2Helper; + initializeRocksDB(); + } + + private void initializeRocksDB() { + if (buildingsDb != null) { + return; + } + + try { + RocksDB.loadLibrary(); + Path buildingsDbPath = Paths.get(config.getDataDir(), "buildings_shards"); + + // Check if the database directory exists + if (!buildingsDbPath.toFile().exists()) { + logger.warn("Buildings database not found at: {}", buildingsDbPath); + return; + } + + Options options = new Options().setCreateIfMissing(false); + this.buildingsDb = RocksDB.openReadOnly(options, buildingsDbPath.toString()); + logger.info("Successfully initialized RocksDB for buildings"); + } catch (Exception e) { + logger.warn("Failed to initialize RocksDB for buildings: {}", e.getMessage()); + this.buildingsDb = null; + } + } + + public synchronized void reloadDatabase() { + logger.info("Reloading buildings database..."); + + if (buildingsDb != null) { + try { + buildingsDb.close(); + logger.info("Closed existing buildings database connection"); + } catch (Exception e) { + logger.warn("Error closing existing buildings database: {}", e.getMessage()); + } + buildingsDb = null; + } + + initializeRocksDB(); + + if (buildingsDb != null) { + logger.info("Buildings database reloaded successfully"); + } else { + logger.warn("Buildings database reload completed but database is not available"); + } + } + + /** + * Get building information for a point (lat, lon) by checking if it is contained in a building. 
+ * This searches in the given shard and surrounding shards if needed. + */ + public BuildingInfo getBuildingInfo(double lat, double lon) { + if (buildingsDb == null) { + logger.debug("Buildings database not initialized"); + return null; + } + + try { + Point point = GEOMETRY_FACTORY.createPoint(new Coordinate(lon, lat)); + long centerShardId = s2Helper.getShardId(lat, lon); + List shardsToSearch = new ArrayList<>(); + shardsToSearch.add(centerShardId); + shardsToSearch.addAll(s2Helper.getNeighborShards(centerShardId)); + + for (Long shardId : shardsToSearch) { + byte[] key = s2Helper.longToByteArray(shardId); + byte[] data = buildingsDb.get(key); + + if (data != null) { + ByteBuffer buffer = ByteBuffer.wrap(data); + com.dedicatedcode.paikka.flatbuffers.BuildingList buildingList = com.dedicatedcode.paikka.flatbuffers.BuildingList.getRootAsBuildingList(buffer); + for (int i = 0; i < buildingList.buildingsLength(); i++) { + com.dedicatedcode.paikka.flatbuffers.Building building = buildingList.buildings(i); + if (building != null && building.geometry() != null) { + Geometry geometryFb = building.geometry(); + if (geometryFb.dataLength() > 0) { + byte[] wkbData = new byte[geometryFb.dataLength()]; + for (int j = 0; j < geometryFb.dataLength(); j++) { + wkbData[j] = (byte) geometryFb.data(j); + } + try { + WKBReader wkbReader = new WKBReader(); + org.locationtech.jts.geom.Geometry jtsGeometry = wkbReader.read(wkbData); + if (jtsGeometry.intersects(point)) { + return decodeBuildingInfo(building); + } + } catch (Exception e) { + logger.warn("Failed to read geometry for building {}", building.id(), e); + } + } + } + } + } + } + + logger.debug("Building containing point ({}, {}) not found in buildings database", lon, lat); + return null; + + } catch (RocksDBException e) { + logger.error("RocksDB error while querying building info for point ({}, {})", lon, lat, e); + return null; + } catch (Exception e) { + logger.error("Error while querying building info for point ({}, 
{})", lon, lat, e); + return null; + } + } + + private BuildingInfo decodeBuildingInfo(com.dedicatedcode.paikka.flatbuffers.Building building) { + try { + BuildingInfo info = new BuildingInfo(); + info.setOsmId(building.id()); + info.setLevel(100); + info.setName(building.name()); + info.setCode(building.code()); + info.setType(building.name()); + + // Decode geometry if present + if (building.geometry() != null) { + Geometry geometry = building.geometry(); + if (geometry.dataLength() > 0) { + byte[] wkbData = new byte[geometry.dataLength()]; + for (int i = 0; i < geometry.dataLength(); i++) { + wkbData[i] = (byte) geometry.data(i); + } + + try { + WKBReader wkbReader = new WKBReader(); + org.locationtech.jts.geom.Geometry jtsGeometry = wkbReader.read(wkbData); + info.setGeometry(jtsGeometry); + info.setBoundaryWkb(wkbData); + } catch (Exception e) { + logger.warn("Failed to decode geometry for building OSM ID {}: {}", building.id(), e.getMessage()); + } + } + } + + return info; + } catch (Exception e) { + logger.error("Failed to decode building info for OSM ID {}", building.id(), e); + return null; + } + } + + /** + * Check if the buildings database is available. + */ + public boolean isAvailable() { + return buildingsDb != null; + } + + /** + * Represents building information retrieved from the buildings database. 
+ */ + public static class BuildingInfo { + private long osmId; + private int level; + private String name; + private String code; + private String type; + private org.locationtech.jts.geom.Geometry geometry; + private byte[] boundaryWkb; + + public long getOsmId() { + return osmId; + } + + public void setOsmId(long osmId) { + this.osmId = osmId; + } + + public int getLevel() { + return level; + } + + public void setLevel(int level) { + this.level = level; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public org.locationtech.jts.geom.Geometry getGeometry() { + return geometry; + } + + public void setGeometry(org.locationtech.jts.geom.Geometry geometry) { + this.geometry = geometry; + } + + public byte[] getBoundaryWkb() { + return boundaryWkb; + } + + public void setBoundaryWkb(byte[] boundaryWkb) { + this.boundaryWkb = boundaryWkb; + } + } +} diff --git a/src/main/java/com/dedicatedcode/paikka/service/ReverseGeocodingService.java b/src/main/java/com/dedicatedcode/paikka/service/ReverseGeocodingService.java index 38623e3..955c90b 100644 --- a/src/main/java/com/dedicatedcode/paikka/service/ReverseGeocodingService.java +++ b/src/main/java/com/dedicatedcode/paikka/service/ReverseGeocodingService.java @@ -50,11 +50,13 @@ public class ReverseGeocodingService { private final PaikkaConfiguration config; private final S2Helper s2Helper; + private final BuildingService buildingService; private RocksDB shardsDb; - public ReverseGeocodingService(PaikkaConfiguration config, S2Helper s2Helper) { + public ReverseGeocodingService(PaikkaConfiguration config, S2Helper s2Helper, BuildingService buildingService) { this.config = config; this.s2Helper = s2Helper; + this.buildingService = 
buildingService; initializeRocksDB(); } @@ -176,10 +178,15 @@ public List findNearbyPOIs(double lat, double lon, String lang, int // Find the closest POIs up to the limit List closestPOIs = findClosestPOIs(allPOIs, lat, lon, limit); - // Convert POIs to response format + // Convert POIs to response format and enhance with building info if needed List results = new ArrayList<>(); for (POIData poi : closestPOIs) { - results.add(convertPOIToResponse(poi, lat, lon, lang)); + POIResponse response = convertPOIToResponse(poi, lat, lon, lang); + + // Enhance with building information if this is a building or related to a building + enhanceWithBuildingInfo(response, poi); + + results.add(response); } logger.debug("Final result: {} POIs found from {} searched shards", results.size(), searchedShards.size()); @@ -417,6 +424,35 @@ private POIResponse convertPOIToResponse(POIData poi, double queryLat, double qu return response; } + + private void enhanceWithBuildingInfo(POIResponse response, POIData poi) { + BuildingService.BuildingInfo buildingInfo = buildingService.getBuildingInfo(poi.lat(), poi.lon()); + if (buildingInfo != null) { + // Update type and subtype with building information + if (buildingInfo.getType() != null) { + response.setType("building"); + } + // For subtype, we can use building name or code + if (buildingInfo.getName() != null) { + response.setSubtype(buildingInfo.getName()); + } else if (buildingInfo.getCode() != null) { + response.setSubtype(buildingInfo.getCode()); + } + + // If building has boundary data, set it in the response + if (buildingInfo.getBoundaryWkb() != null && buildingInfo.getBoundaryWkb().length > 0) { + try { + WKBReader wkbReader = new WKBReader(); + Geometry geometry = wkbReader.read(buildingInfo.getBoundaryWkb()); + GeoJsonGeometry geoJsonGeometry = convertJtsToGeoJson(geometry); + response.setBoundary(geoJsonGeometry); + logger.debug("Enhanced POI {} with building boundary", poi.id()); + } catch (Exception e) { + logger.warn("Failed 
to convert building boundary to GeoJSON for POI {}: {}", poi.id(), e.getMessage()); + } + } + } + } private record POIData(long id, float lat, float lon, String type, String subtype, Map names, diff --git a/src/main/java/com/dedicatedcode/paikka/service/importer/ImportService.java b/src/main/java/com/dedicatedcode/paikka/service/importer/ImportService.java index b6de56a..968315e 100644 --- a/src/main/java/com/dedicatedcode/paikka/service/importer/ImportService.java +++ b/src/main/java/com/dedicatedcode/paikka/service/importer/ImportService.java @@ -67,9 +67,11 @@ public class ImportService { private final PaikkaConfiguration config; private final Map tagCache = new ConcurrentHashMap<>(1000); + private final int fileReadWindowSize; private final AtomicLong sequence = new AtomicLong(0); + private final AtomicLong buildingSequence = new AtomicLong(0); public ImportService(S2Helper s2Helper, GeometrySimplificationService geometrySimplificationService, PaikkaConfiguration config) { this.s2Helper = s2Helper; @@ -101,7 +103,10 @@ public void importData(String pbfFilePath, String dataDir) throws Exception { Path shardsDbPath = dataDirectory.resolve("poi_shards"); Path boundariesDbPath = dataDirectory.resolve("boundaries"); + Path buildingsDbPath = dataDirectory.resolve("buildings_shards"); + Path appendBuildingDbPath = dataDirectory.resolve("tmp/append_building"); Path gridIndexDbPath = dataDirectory.resolve("tmp/grid_index"); + Path buildingGridIndexDbPath = dataDirectory.resolve("tmp/building_grid_index"); Path appendDbPath = dataDirectory.resolve("tmp/append_poi"); Path nodeCacheDbPath = dataDirectory.resolve("tmp/node_cache"); Path wayIndexDbPath = dataDirectory.resolve("tmp/way_index"); @@ -112,7 +117,10 @@ public void importData(String pbfFilePath, String dataDir) throws Exception { cleanupDatabase(shardsDbPath); cleanupDatabase(boundariesDbPath); + cleanupDatabase(buildingsDbPath); + cleanupDatabase(appendBuildingDbPath); cleanupDatabase(gridIndexDbPath); + 
cleanupDatabase(buildingGridIndexDbPath); cleanupDatabase(nodeCacheDbPath); cleanupDatabase(wayIndexDbPath); cleanupDatabase(boundaryWayIndexDbPath); @@ -184,7 +192,10 @@ public void importData(String pbfFilePath, String dataDir) throws Exception { try (RocksDB shardsDb = RocksDB.open(poiShardsOpts, shardsDbPath.toString()); RocksDB boundariesDb = RocksDB.open(poiShardsOpts, boundariesDbPath.toString()); + RocksDB buildingsDb = RocksDB.open(poiShardsOpts, buildingsDbPath.toString()); + RocksDB appendBuildingDb = RocksDB.open(appendOpts, appendBuildingDbPath.toString()); RocksDB gridIndexDb = RocksDB.open(gridOpts, gridIndexDbPath.toString()); + RocksDB buildingGridIndexDb = RocksDB.open(gridOpts, buildingGridIndexDbPath.toString()); RocksDB nodeCache = RocksDB.open(nodeOpts, nodeCacheDbPath.toString()); RocksDB wayIndexDb = RocksDB.open(wayIndexOpts, wayIndexDbPath.toString()); RocksDB neededBoundaryWaysDb = RocksDB.open(wayIndexOpts, boundaryWayIndexDbPath.toString()); @@ -205,22 +216,27 @@ public void importData(String pbfFilePath, String dataDir) throws Exception { // PASS 2: Nodes Cache, Boundaries, POIs stats.printPhaseHeader("PASS 2: Nodes Cache, Boundaries, POIs"); long pass2Start = System.currentTimeMillis(); - stats.setCurrentPhase(3, "1.1.3: Caching node coordinates"); + stats.setCurrentPhase(3, "1.2: Caching node coordinates"); cacheNeededNodeCoordinates(pbfFile, neededNodesDb, nodeCache, stats); - stats.setCurrentPhase(4, "1.2: Processing administrative boundaries"); + stats.setCurrentPhase(4, "1.3: Processing administrative boundaries"); processAdministrativeBoundariesFromIndex(relIndexDb, nodeCache, wayIndexDb, gridIndexDb, boundariesDb, stats); - stats.setCurrentPhase(5, "2.1: Processing POIs & Sharding"); + + stats.setCurrentPhase(5, "1.4: Processing building boundaries"); + processBuildingBoundariesFromIndex(poiIndexDb, nodeCache, wayIndexDb, buildingGridIndexDb, appendBuildingDb, stats); + pass2PoiShardingFromIndex(nodeCache, wayIndexDb, appendDb, 
boundariesDb, poiIndexDb, gridIndexDb, stats); - stats.setCurrentPhase(6, "2.2: Compacting POIs"); + stats.setCurrentPhase(8, "2.2: Compacting POIs"); compactShards(appendDb, shardsDb, stats); + stats.setCurrentPhase(9, "2.3: Compacting Buildings"); + compactBuildingShards(appendBuildingDb, buildingsDb, stats); stats.stop(); stats.printPhaseSummary("PASS 2", pass2Start); shardsDb.compactRange(); boundariesDb.compactRange(); - + buildingsDb.compactRange(); stats.setTotalTime(System.currentTimeMillis() - totalStartTime); @@ -228,7 +244,10 @@ public void importData(String pbfFilePath, String dataDir) throws Exception { recordSizeMetrics(stats, shardsDbPath, boundariesDbPath, + buildingsDbPath, + appendBuildingDbPath, gridIndexDbPath, + buildingGridIndexDbPath, nodeCacheDbPath, wayIndexDbPath, boundaryWayIndexDbPath, @@ -327,7 +346,9 @@ private void pass1DiscoveryAndIndexing(Path pbfFile, RocksDB wayIndexDb, RocksDB try { if (type == EntityType.Node) { OsmNode node = (OsmNode) container.getEntity(); - if (isPoi(node)) { + boolean isAddressNode = hasAddressTags(node); + + if (isPoi(node) || isAddressNode) { PoiIndexRec rec = buildPoiIndexRecFromEntity(node); rec.lat = node.getLatitude(); rec.lon = node.getLongitude(); @@ -335,12 +356,16 @@ private void pass1DiscoveryAndIndexing(Path pbfFile, RocksDB wayIndexDb, RocksDB poiWriter.put(key, encodePoiIndexRec(rec)); neededWriter.put(s2Helper.longToByteArray(node.getId()), ONE); stats.incrementNodesFound(); + if (isAddressNode) { + stats.incrementAddressNodesFound(); + } } } else if (type == EntityType.Way) { OsmWay way = (OsmWay) container.getEntity(); boolean isPoi = isPoi(way); boolean isAdmin = isAdministrativeBoundaryWay(way); + if (isPoi || isAdmin) { stats.incrementWaysProcessed(); int n = way.getNumberOfNodes(); @@ -350,15 +375,18 @@ private void pass1DiscoveryAndIndexing(Path pbfFile, RocksDB wayIndexDb, RocksDB nodeIds[j] = nid; neededWriter.put(s2Helper.longToByteArray(nid), ONE); } - 
wayWriter.put(s2Helper.longToByteArray(way.getId()), s2Helper.longArrayToByteArray(nodeIds)); + wayWriter.put(s2Helper.longToByteArray(way.getId()), + s2Helper.longArrayToByteArray(nodeIds)); + if (isPoi) { PoiIndexRec rec = buildPoiIndexRecFromEntity(way); - // lat/lon remain NaN for ways — resolved in Pass 2 reader byte[] key = buildPoiKey((byte) 'W', way.getId()); poiWriter.put(key, encodePoiIndexRec(rec)); + if ("building".equals(rec.type)) { + stats.incrementBuildingsFound(); + } } } - } else if (type == EntityType.Relation) { OsmRelation relation = (OsmRelation) container.getEntity(); if (isAdministrativeBoundary(relation)) { @@ -417,11 +445,17 @@ private void pass2PoiShardingFromIndex(RocksDB nodeCache, try (ExecutorService executor = createExecutorService(numReaders); ReadOptions ro = new ReadOptions().setReadaheadSize(8 * 1024 * 1024)) { com.github.benmanes.caffeine.cache.Cache globalBoundaryCache = Caffeine.newBuilder().maximumSize(1000).recordStats().build(); ThreadLocal hierarchyCacheThreadLocal = ThreadLocal.withInitial(() -> new HierarchyCache(boundariesDb, gridIndexDb, s2Helper, globalBoundaryCache)); + + stats.setCurrentPhase(6, "1.5: Preparing POI Sharding"); long total = 0; try (RocksIterator it = poiIndexDb.newIterator(ro)) { - for (it.seekToFirst(); it.isValid(); it.next()) total++; + for (it.seekToFirst(); it.isValid(); it.next()) { + total++; + stats.incrementPoiIndexEntriesScanned(); + } } + stats.setCurrentPhase(7, "2.1: Processing POIs & Sharding"); long step = Math.max(1, total / numReaders); List splitKeys = new ArrayList<>(); try (RocksIterator it = poiIndexDb.newIterator(ro)) { @@ -453,6 +487,15 @@ private void pass2PoiShardingFromIndex(RocksDB nodeCache, rec.kind = kind; rec.id = id; + // Solution: The old workaround is removed. Enrichment is done at query time. + // We no longer try to enrich address nodes during import. + + // Buildings are processed in a separate phase and not included as point POIs. 
+ if ("building".equals(rec.type)) { + it.next(); + continue; + } + // For way POIs, lat/lon is NaN — resolve from nodeCache/wayIndexDb byte[] cacheWayNodes = null; if (Double.isNaN(rec.lat) && kind == 'W') { @@ -486,8 +529,6 @@ private void pass2PoiShardingFromIndex(RocksDB nodeCache, } - // ─── Worker threads: hierarchy resolution + sharding (unchanged) ─── - for (int i = 0; i < numReaders; i++) { executor.submit(() -> { stats.incrementActiveThreads(); @@ -508,7 +549,6 @@ private void pass2PoiShardingFromIndex(RocksDB nodeCache, double lon = rec.lon; byte[] boundaryWkb = null; - // For way POIs, still compute boundary WKB geometry if (rec.kind == 'W') { List coords = buildCoordinatesFromWay(nodeCache, item.cachedWayNodes); if (coords != null && coords.size() >= 3) { @@ -575,11 +615,6 @@ private void pass2PoiShardingFromIndex(RocksDB nodeCache, } } - /** - * Sorts a chunk of POI items by S2CellId (Hilbert curve order) for spatial - * locality, then emits to the processing queue in batches. - * Memory stays bounded: only one chunk is in memory at a time. - */ private void sortAndEmitChunk(List chunk, BlockingQueue> queue, ImportStatistics stats) throws InterruptedException { chunk.sort((a, b) -> Long.compareUnsigned(a.s2SortKey, b.s2SortKey)); @@ -641,13 +676,11 @@ private byte[] resolveWayCenter(PoiIndexRec rec, RocksDB nodeCache, RocksDB wayI private void cacheNeededNodeCoordinates(Path pbfFile, RocksDB neededNodesDb, RocksDB nodeCache, ImportStatistics stats) throws Exception { final int BATCH_SIZE = 50_000; - // The reader thread produces batches of nodes BlockingQueue> nodeBatchQueue = new LinkedBlockingQueue<>(200); int numProcessors = Math.max(1, config.getImportConfiguration().getThreads()); CountDownLatch latch = new CountDownLatch(numProcessors); try (ExecutorService executor = createExecutorService(numProcessors)) { - // The reader logic is good, no changes needed here. 
Thread reader = Thread.ofVirtual().start(() -> { List buf = new ArrayList<>(BATCH_SIZE); try { @@ -684,11 +717,8 @@ private void cacheNeededNodeCoordinates(Path pbfFile, RocksDB neededNodesDb, Roc for (int i = 0; i < numProcessors; i++) { executor.submit(() -> { stats.incrementActiveThreads(); - // --- OPTIMIZATION 1: ThreadLocal for reusable objects to reduce GC pressure --- final ThreadLocal nodeWriterLocal = ThreadLocal.withInitial(() -> new RocksBatchWriter(nodeCache, 50_000, stats)); - // Reuse a ByteBuffer for writing values final ThreadLocal valueBufferLocal = ThreadLocal.withInitial(() -> ByteBuffer.allocate(16)); - // Reuse a List of byte[] for multiGet keys. We only need one per thread. final ThreadLocal> keysListLocal = ThreadLocal.withInitial(ArrayList::new); try { @@ -697,9 +727,8 @@ private void cacheNeededNodeCoordinates(Path pbfFile, RocksDB neededNodesDb, Roc stats.setQueueSize(nodeBatchQueue.size()); if (nodes.isEmpty()) break; - // Reuse the keys list List keys = keysListLocal.get(); - keys.clear(); // Clear previous batch's keys + keys.clear(); for (OsmNode n : nodes) { keys.add(s2Helper.longToByteArray(n.getId())); } @@ -707,19 +736,14 @@ private void cacheNeededNodeCoordinates(Path pbfFile, RocksDB neededNodesDb, Roc List presence = neededNodesDb.multiGetAsList(keys); RocksBatchWriter nodeWriter = nodeWriterLocal.get(); - ByteBuffer valueBuffer = valueBufferLocal.get(); // Get the reusable buffer + ByteBuffer valueBuffer = valueBufferLocal.get(); - // --- OPTIMIZATION 2: Combine loops --- for (int idx = 0; idx < nodes.size(); idx++) { if (presence.get(idx) != null) { OsmNode n = nodes.get(idx); - - // Reset buffer position and write new data valueBuffer.clear(); valueBuffer.putDouble(n.getLatitude()); valueBuffer.putDouble(n.getLongitude()); - - // The key is already in the 'keys' list at the same index nodeWriter.put(keys.get(idx), valueBuffer.array()); stats.incrementNodesCached(); } @@ -737,8 +761,8 @@ private void 
cacheNeededNodeCoordinates(Path pbfFile, RocksDB neededNodesDb, Roc stats.recordError(ImportStatistics.Stage.CACHING_NODE_COORDINATES, Kind.STORE, null, "node-cache-writer-close", e); } nodeWriterLocal.remove(); - valueBufferLocal.remove(); // Clean up thread-local - keysListLocal.remove(); // Clean up thread-local + valueBufferLocal.remove(); + keysListLocal.remove(); stats.decrementActiveThreads(); latch.countDown(); } @@ -787,16 +811,15 @@ private void processAdministrativeBoundariesFromIndex(RocksDB relIndexDb, RocksD org.locationtech.jts.geom.Geometry simplified = geometrySimplificationService.simplifyByAdminLevel(geometry, rec.level); - // Simplification can produce invalid results too if (simplified == null || simplified.isEmpty() || !simplified.isValid()) { try { simplified = simplified != null ? simplified.buffer(0) : null; } catch (Exception e) { - simplified = geometry; // fall back to unsimplified + simplified = geometry; } } if (simplified == null || simplified.isEmpty() || !simplified.isValid()) { - simplified = geometry; // fall back to unsimplified + simplified = geometry; } return new BoundaryResultLite(rec.osmId, rec.level, rec.name, rec.code, simplified); @@ -805,7 +828,7 @@ private void processAdministrativeBoundariesFromIndex(RocksDB relIndexDb, RocksD stats.decrementActiveThreads(); } }); - ; + submitted.incrementAndGet(); for (Future f; (f = ecs.poll()) != null; ) { @@ -844,6 +867,164 @@ private void processAdministrativeBoundariesFromIndex(RocksDB relIndexDb, RocksD } } + private void processBuildingBoundariesFromIndex(RocksDB poiIndexDb, RocksDB nodeCache, RocksDB wayIndexDb, RocksDB buildingGridIndexDb, RocksDB appendBuildingDb, ImportStatistics stats) throws Exception { + int maxInFlight = 100; + Semaphore semaphore = new Semaphore(maxInFlight); + int batchSize = 1000; + int numThreads = Math.max(1, config.getImportConfiguration().getThreads()); + ExecutorService executor = createExecutorService(numThreads); + ExecutorCompletionService> 
ecs = new ExecutorCompletionService<>(executor); + + AtomicInteger submitted = new AtomicInteger(0); + AtomicLong collected = new AtomicLong(0); + + AtomicReference>> shardBufferRef = new AtomicReference<>(new ConcurrentHashMap<>()); + Runnable flushTask = () -> { + try { + Map> bufferToFlush = shardBufferRef.getAndSet(new ConcurrentHashMap<>()); + if (!bufferToFlush.isEmpty()) { + writeBuildingShardBatchAppendOnly(bufferToFlush, appendBuildingDb, stats); + } + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.STORE, null, "flush-building-append-batch", e); + } + }; + + try (PeriodicFlusher _ = PeriodicFlusher.start("building-shard-flush", 5, 5, flushTask); + RocksIterator it = poiIndexDb.newIterator()) { + + it.seekToFirst(); + List currentBatchIds = new ArrayList<>(batchSize); + List currentBatchRecs = new ArrayList<>(batchSize); + + while (it.isValid()) { + byte[] key = it.key(); + if (key[0] == 'W') { // Buildings must be ways + byte[] val = it.value(); + final PoiIndexRec rec = decodePoiIndexRec(val); + if ("building".equals(rec.type)) { + final long wayId = bytesToLong(key, 1); + currentBatchIds.add(wayId); + currentBatchRecs.add(rec); + + if (currentBatchIds.size() >= batchSize) { + semaphore.acquire(); + stats.incrementActiveThreads(); + List idsToProcess = new ArrayList<>(currentBatchIds); + List recsToProcess = new ArrayList<>(currentBatchRecs); + ecs.submit(() -> { + try { + List results = new ArrayList<>(); + for (int i = 0; i < idsToProcess.size(); i++) { + long wayIdInner = idsToProcess.get(i); + PoiIndexRec recInner = recsToProcess.get(i); + try { + org.locationtech.jts.geom.Geometry geometry = buildGeometryFromWay(wayIdInner, nodeCache, wayIndexDb, stats); + if (geometry == null) continue; + if (!geometry.isValid()) { + try { + geometry = geometry.buffer(0); + } catch (Exception e) { + continue; + } + } + if (geometry == null || geometry.isEmpty() || !geometry.isValid()) continue; + results.add(new 
BuildingData(wayIdInner, recInner.subtype, null, geometry)); + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.GEOMETRY, wayIdInner, "build-building-geometry-batch", e); + } + } + return results; + } finally { + semaphore.release(); + stats.decrementActiveThreads(); + } + }); + submitted.incrementAndGet(); + currentBatchIds.clear(); + currentBatchRecs.clear(); + + Future> f; + while ((f = ecs.poll()) != null) { + collected.incrementAndGet(); + try { + List batchResults = f.get(); + Map> currentActiveBuffer = shardBufferRef.get(); + for (BuildingData r : batchResults) { + Coordinate center = r.geometry().getEnvelopeInternal().centre(); + long shardId = s2Helper.getShardId(center.getY(), center.getX()); + currentActiveBuffer.computeIfAbsent(shardId, k -> new CopyOnWriteArrayList<>()).add(r); + stats.incrementBuildingsProcessed(); + } + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.CONCURRENCY, null, "building-future-poll", e); + } + } + } + } + } + it.next(); + } + + if (!currentBatchIds.isEmpty()) { + semaphore.acquire(); + stats.incrementActiveThreads(); + List idsToProcess = new ArrayList<>(currentBatchIds); + List recsToProcess = new ArrayList<>(currentBatchRecs); + ecs.submit(() -> { + try { + List results = new ArrayList<>(); + for (int i = 0; i < idsToProcess.size(); i++) { + long wayIdInner = idsToProcess.get(i); + PoiIndexRec recInner = recsToProcess.get(i); + try { + org.locationtech.jts.geom.Geometry geometry = buildGeometryFromWay(wayIdInner, nodeCache, wayIndexDb, stats); + if (geometry == null) continue; + if (!geometry.isValid()) { + try { + geometry = geometry.buffer(0); + } catch (Exception e) { + continue; + } + } + if (geometry == null || geometry.isEmpty() || !geometry.isValid()) continue; + results.add(new BuildingData(wayIdInner, recInner.subtype, null, geometry)); + } catch (Exception e) { + 
stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.GEOMETRY, wayIdInner, "build-building-geometry-batch", e); + } + } + return results; + } finally { + semaphore.release(); + stats.decrementActiveThreads(); + } + }); + submitted.incrementAndGet(); + } + + long remaining = submitted.get() - collected.get(); + for (int i = 0; i < remaining; i++) { + try { + Future> f = ecs.take(); + List batchResults = f.get(); + Map> currentActiveBuffer = shardBufferRef.get(); + for (BuildingData r : batchResults) { + Coordinate center = r.geometry().getEnvelopeInternal().centre(); + long shardId = s2Helper.getShardId(center.getY(), center.getX()); + currentActiveBuffer.computeIfAbsent(shardId, k -> new CopyOnWriteArrayList<>()).add(r); + stats.incrementBuildingsProcessed(); + } + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.CONCURRENCY, null, "building-future-take", e); + } + } + flushTask.run(); + } finally { + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.MINUTES); + } + } + private PoiData createPoiDataFromIndex(PoiIndexRec rec, double lat, double lon, List hierarchy, byte[] boundaryWkb) { AddressData addr = null; if (rec.street != null || rec.houseNumber != null || rec.postcode != null || rec.city != null || rec.country != null) { @@ -867,6 +1048,30 @@ private String intern(String s) { return tagCache.computeIfAbsent(s, k -> k); } + private boolean hasAddressTags(OsmEntity entity) { + for (int i = 0; i < entity.getNumberOfTags(); i++) { + if (entity.getTag(i).getKey().startsWith("addr:")) { + return true; + } + } + return false; + } + + private boolean isBuildingWay(OsmWay way) { + for (int i = 0; i < way.getNumberOfTags(); i++) { + OsmTag tag = way.getTag(i); + if ("building".equals(tag.getKey())) { + String val = tag.getValue(); + return switch (val) { + case "yes", "commercial", "retail", "industrial", + "office", "apartments", "residential" -> true; + default -> false; + }; + } + } + return 
false; + } + private void writeShardBatchAppendOnly(Map> shardBuffer, RocksDB appendDb, ImportStatistics stats) throws Exception { FlatBufferBuilder builder = new FlatBufferBuilder(1024 * 32); @@ -882,7 +1087,6 @@ private void writeShardBatchAppendOnly(Map> shardBuffer, Roc builder.clear(); - // Serialize ONLY the new POIs (no reading existing!) int[] poiOffsets = new int[pois.size()]; for (int i = 0; i < pois.size(); i++) { poiOffsets[i] = serializePoiData(builder, pois.get(i)); @@ -891,7 +1095,6 @@ private void writeShardBatchAppendOnly(Map> shardBuffer, Roc int poiListOffset = POIList.createPOIList(builder, poisVectorOffset); builder.finish(poiListOffset); - // Key = shardId (8 bytes) + sequence (8 bytes) — unique, no collision long seq = sequence.incrementAndGet(); byte[] key = new byte[16]; ByteBuffer.wrap(key).putLong(entry.getKey()).putLong(seq); @@ -961,12 +1164,172 @@ private int serializePoiData(FlatBufferBuilder builder, PoiData poi) { return POI.endPOI(builder); } + private void compactBuildingShards(RocksDB appendDb, RocksDB buildingsDb, ImportStatistics stats) { + stats.setCompactionStartTime(System.currentTimeMillis()); + stats.setCompactionEntriesTotal(buildingSequence.get()); + + Building reusableBuilding = new Building(); + Geometry reusableGeom = new Geometry(); + + try (RocksIterator iterator = appendDb.newIterator(); + WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) { + + iterator.seekToFirst(); + + long currentShardId = Long.MIN_VALUE; + List currentShardChunks = new ArrayList<>(); + + while (iterator.isValid()) { + byte[] key = iterator.key(); + long shardId = ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN).getLong(); + + if (shardId != currentShardId && currentShardId != Long.MIN_VALUE) { + flushCompactedBuildingShard(currentShardChunks, currentShardId, buildingsDb, writeOptions, reusableBuilding, reusableGeom, stats); + stats.incrementCompactionEntriesProcessed(currentShardChunks.size()); + currentShardChunks.clear(); + 
} + + currentShardId = shardId; + + byte[] value = iterator.value(); + byte[] valueCopy = new byte[value.length]; + System.arraycopy(value, 0, valueCopy, 0, value.length); + currentShardChunks.add(valueCopy); + + iterator.next(); + } + + if (currentShardId != Long.MIN_VALUE && !currentShardChunks.isEmpty()) { + flushCompactedBuildingShard(currentShardChunks, currentShardId, buildingsDb, writeOptions, reusableBuilding, reusableGeom, stats); + stats.incrementCompactionEntriesProcessed(currentShardChunks.size()); + } + } + } + + private void flushCompactedBuildingShard(List chunks, long shardId, RocksDB buildingsDb, WriteOptions writeOptions, Building reusableBuilding, Geometry reusableGeom, ImportStatistics stats) { + int totalBuildings = 0; + try { + for (byte[] chunk : chunks) { + ByteBuffer buf = ByteBuffer.wrap(chunk); + BuildingList buildingList = BuildingList.getRootAsBuildingList(buf); + totalBuildings += buildingList.buildingsLength(); + } + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.COMPACTING_POIS, Kind.DECODE, null, "flatbuffers-read:BuildingList", e); + } + + FlatBufferBuilder builder = new FlatBufferBuilder(Math.max(1024, totalBuildings * 256)); + int[] allOffsets = new int[totalBuildings]; + int idx = 0; + + for (byte[] chunk : chunks) { + ByteBuffer buf = ByteBuffer.wrap(chunk); + BuildingList buildingList = BuildingList.getRootAsBuildingList(buf); + int count = buildingList.buildingsLength(); + + for (int i = 0; i < count; i++) { + buildingList.buildings(reusableBuilding, i); + allOffsets[idx++] = copyBuildingFromFlatBuffer(builder, reusableBuilding, reusableGeom); + } + } + + int buildingsVec = BuildingList.createBuildingsVector(builder, allOffsets); + int buildingList = BuildingList.createBuildingList(builder, buildingsVec); + builder.finish(buildingList); + try { + buildingsDb.put(writeOptions, s2Helper.longToByteArray(shardId), builder.sizedByteArray()); + } catch (RocksDBException e) { + 
stats.recordError(ImportStatistics.Stage.COMPACTING_POIS, Kind.STORE, null, "rocks-put:buildings", e); + } + } + + private int copyBuildingFromFlatBuffer(FlatBufferBuilder builder, Building building, Geometry reusableGeom) { + String nameStr = building.name(); + String codeStr = building.code(); + int nameOff = nameStr != null ? builder.createString(nameStr) : 0; + int codeOff = codeStr != null ? builder.createString(codeStr) : 0; + + int geometryOff = 0; + if (building.geometry(reusableGeom) != null && reusableGeom.dataLength() > 0) { + ByteBuffer geometryBuf = reusableGeom.dataAsByteBuffer(); + if (geometryBuf != null) { + int geometryDataOff = Geometry.createDataVector(builder, geometryBuf); + geometryOff = Geometry.createGeometry(builder, geometryDataOff); + } + } + + Building.startBuilding(builder); + Building.addId(builder, building.id()); + if (nameOff != 0) Building.addName(builder, nameOff); + if (codeOff != 0) Building.addCode(builder, codeOff); + if (geometryOff != 0) Building.addGeometry(builder, geometryOff); + return Building.endBuilding(builder); + } + + private void writeBuildingShardBatchAppendOnly(Map> shardBuffer, RocksDB appendDb, ImportStatistics stats) throws Exception { + FlatBufferBuilder builder = new FlatBufferBuilder(1024 * 32); + + try (WriteBatch batch = new WriteBatch(); WriteOptions writeOptions = new WriteOptions()) { + + for (Iterator>> it = shardBuffer.entrySet().iterator(); it.hasNext(); ) { + Map.Entry> entry = it.next(); + List buildings = entry.getValue(); + if (buildings.isEmpty()) { + it.remove(); + continue; + } + + builder.clear(); + + int[] buildingOffsets = new int[buildings.size()]; + for (int i = 0; i < buildings.size(); i++) { + buildingOffsets[i] = serializeBuildingData(builder, buildings.get(i)); + } + int buildingsVectorOffset = BuildingList.createBuildingsVector(builder, buildingOffsets); + int buildingListOffset = BuildingList.createBuildingList(builder, buildingsVectorOffset); + 
builder.finish(buildingListOffset); + + long seq = buildingSequence.incrementAndGet(); + byte[] key = new byte[16]; + ByteBuffer.wrap(key).putLong(entry.getKey()).putLong(seq); + + batch.put(key, builder.sizedByteArray()); + it.remove(); + } + + try { + appendDb.write(writeOptions, batch); + } catch (RocksDBException e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.STORE, null, "rocks-write:append_building", e); + } + stats.incrementRocksDbWrites(); + } + } + + private int serializeBuildingData(FlatBufferBuilder builder, BuildingData building) { + int nameOff = building.name() != null ? builder.createString(building.name()) : 0; + int codeOff = building.code() != null ? builder.createString(building.code()) : 0; + + int geometryOff = 0; + if (building.geometry() != null) { + byte[] wkb = new WKBWriter().write(building.geometry()); + int geometryDataOff = Geometry.createDataVector(builder, wkb); + geometryOff = Geometry.createGeometry(builder, geometryDataOff); + } + + Building.startBuilding(builder); + Building.addId(builder, building.id()); + if (nameOff != 0) Building.addName(builder, nameOff); + if (codeOff != 0) Building.addCode(builder, codeOff); + if (geometryOff != 0) Building.addGeometry(builder, geometryOff); + return Building.endBuilding(builder); + } + private void compactShards(RocksDB appendDb, RocksDB shardsDb, ImportStatistics stats) { stats.setCompactionStartTime(System.currentTimeMillis()); stats.setCompactionEntriesTotal(sequence.get()); - // Reusable FlatBuffer accessor objects POI reusablePoi = new POI(); Name reusableName = new Name(); HierarchyItem reusableHier = new HierarchyItem(); @@ -979,14 +1342,12 @@ private void compactShards(RocksDB appendDb, RocksDB shardsDb, ImportStatistics iterator.seekToFirst(); long currentShardId = Long.MIN_VALUE; - // Collect raw byte[] chunks per shard, build FlatBuffer only at flush time List currentShardChunks = new ArrayList<>(); while (iterator.isValid()) { byte[] key = 
iterator.key(); long shardId = ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN).getLong(); - // Shard boundary — flush previous shard if (shardId != currentShardId && currentShardId != Long.MIN_VALUE) { flushCompactedShard(currentShardChunks, currentShardId, shardsDb, writeOptions, reusablePoi, reusableName, reusableHier, reusableAddr, reusableGeom, stats); stats.incrementCompactionEntriesProcessed(currentShardChunks.size()); @@ -995,7 +1356,6 @@ private void compactShards(RocksDB appendDb, RocksDB shardsDb, ImportStatistics currentShardId = shardId; - // IMPORTANT: copy the value bytes — RocksIterator may reuse the buffer byte[] value = iterator.value(); byte[] valueCopy = new byte[value.length]; System.arraycopy(value, 0, valueCopy, 0, value.length); @@ -1004,7 +1364,6 @@ private void compactShards(RocksDB appendDb, RocksDB shardsDb, ImportStatistics iterator.next(); } - // Flush last shard if (currentShardId != Long.MIN_VALUE && !currentShardChunks.isEmpty()) { flushCompactedShard(currentShardChunks, currentShardId, shardsDb, writeOptions, reusablePoi, reusableName, reusableHier, reusableAddr, reusableGeom, stats); stats.incrementCompactionEntriesProcessed(currentShardChunks.size()); @@ -1012,13 +1371,8 @@ private void compactShards(RocksDB appendDb, RocksDB shardsDb, ImportStatistics } } - /** - * Takes all raw FlatBuffer chunks for a single shard, reads each chunk's POIs, - * copies them into a fresh FlatBufferBuilder, and writes the merged result. 
- */ private void flushCompactedShard(List chunks, long shardId, RocksDB shardsDb, WriteOptions writeOptions, POI reusablePoi, Name reusableName, HierarchyItem reusableHier, Address reusableAddr, Geometry reusableGeom, ImportStatistics stats) { - // Count total POIs first int totalPois = 0; try { for (byte[] chunk : chunks) { @@ -1030,12 +1384,10 @@ private void flushCompactedShard(List chunks, long shardId, RocksDB shar stats.recordError(ImportStatistics.Stage.COMPACTING_POIS, Kind.DECODE, null, "flatbuffers-read:POIList", e); } - // Fresh builder per shard — no stale offsets FlatBufferBuilder builder = new FlatBufferBuilder(Math.max(1024, totalPois * 256)); int[] allOffsets = new int[totalPois]; int idx = 0; - // Process each chunk: the source ByteBuffer stays alive while we read from it for (byte[] chunk : chunks) { ByteBuffer buf = ByteBuffer.wrap(chunk); POIList poiList = POIList.getRootAsPOIList(buf); @@ -1118,7 +1470,6 @@ private int copyPoiFromFlatBuffer(FlatBufferBuilder builder, POI poi, Name reusa hierVecOff = POI.createHierarchyVector(builder, new int[0]); } - // Boundary geometry — use ByteBuffer slice for zero-copy transfer int boundaryOff = 0; if (poi.boundary(reusableGeom) != null && reusableGeom.dataLength() > 0) { ByteBuffer boundaryBuf = reusableGeom.dataAsByteBuffer(); @@ -1126,7 +1477,6 @@ private int copyPoiFromFlatBuffer(FlatBufferBuilder builder, POI poi, Name reusa int boundaryDataOff = Geometry.createDataVector(builder, boundaryBuf); boundaryOff = Geometry.createGeometry(builder, boundaryDataOff); } else { - // Fallback: manual copy if dataAsByteBuffer() returns null int len = reusableGeom.dataLength(); byte[] boundaryBytes = new byte[len]; for (int k = 0; k < len; k++) { @@ -1137,7 +1487,6 @@ private int copyPoiFromFlatBuffer(FlatBufferBuilder builder, POI poi, Name reusa } } - // --- Now build the POI table --- POI.startPOI(builder); POI.addId(builder, poi.id()); POI.addLat(builder, poi.lat()); @@ -1151,6 +1500,31 @@ private int 
copyPoiFromFlatBuffer(FlatBufferBuilder builder, POI poi, Name reusa return POI.endPOI(builder); } + private org.locationtech.jts.geom.Geometry buildGeometryFromWay(long wayId, RocksDB nodeCache, RocksDB wayIndexDb, ImportStatistics stats) { + try { + byte[] nodeSeq = wayIndexDb.get(s2Helper.longToByteArray(wayId)); + List coords = buildCoordinatesFromWay(nodeCache, nodeSeq); + + if (coords == null || coords.size() < 3) { + return null; + } + + if (!coords.getFirst().equals2D(coords.getLast())) { + coords.add(new Coordinate(coords.getFirst())); + } + + if (coords.size() < 4) { + return null; + } + + LinearRing shell = GEOMETRY_FACTORY.createLinearRing(coords.toArray(new Coordinate[0])); + return GEOMETRY_FACTORY.createPolygon(shell); + } catch (Exception e) { + stats.recordError(ImportStatistics.Stage.PROCESSING_BUILDINGS, Kind.GEOMETRY, wayId, "build-building-geometry", e); + return null; + } + } + private org.locationtech.jts.geom.Geometry buildGeometryFromRelRec(RelRec rec, RocksDB nodeCache, RocksDB wayIndexDb, ImportStatistics stats) { List> outerRings = buildConnectedRings(toList(rec.outer), nodeCache, wayIndexDb); List> innerRings = buildConnectedRings(toList(rec.inner), nodeCache, wayIndexDb); @@ -1244,40 +1618,44 @@ private boolean isPoi(OsmEntity entity) { switch (key) { case "amenity": return switch (val) { - case "bench", "drinking_water", "waste_basket", "bicycle_parking", "vending_machine", - "parking_entrance", "fire_hydrant" -> false; - + case "bench", "drinking_water", "waste_basket", "bicycle_parking", + "vending_machine", "parking_entrance", "fire_hydrant" -> false; default -> true; }; case "healthcare": return true; + case "emergency": return switch (val) { - case "fire_hydrant", "defibrillator", "fire_extinguisher", "siren", "life_ring", "lifeline", - "phone", "drinking_water" -> false; + case "fire_hydrant", "fire_service_inlet", "defibrillator", "fire_extinguisher", + "siren", "life_ring", "lifeline", "phone", "drinking_water", "yes" -> 
false; default -> true; }; + case "building": - if (switch (val) { - case "yes", "commercial", "retail", "industrial", "office", "apartments" -> true; - default -> false; - }) { - isInterestingBuilding = true; - } + isInterestingBuilding = true; break; - case "shop", "tourism", "leisure", "office", "craft", "place", "historic", "public_transport", - "aeroway": - // Exclude the specific sub-leisure types you mentioned earlier + case "natural": + // Filter out natural features that are not useful as POIs + return switch (val) { + case "tree", "wood", "scrub", "heath", "grassland", "fell", + "bare_rock", "scree", "shingle", "sand", "mud" -> false; + default -> true; + }; + + case "shop", "tourism", "leisure", "office", "craft", "place", + "historic", "public_transport", "aeroway": return !key.equals("leisure") || !List.of("picnic_table", "swimming_pool").contains(val); case "railway": if ("station".equals(val)) return true; break; + case "man_made": + return false; default: - // If it's any other key in your fast-key list (like 'natural'), allow it if (isPoiFastKey(key)) return true; } } @@ -1379,7 +1757,7 @@ private void storeBoundary(long osmId, int level, String name, String code, hasMir = true; } } catch (Exception e) { - // MIR computation failed — boundary still works, just no fast-path optimization + // MIR computation failed } int nameOffset = fbb.createString(name != null ? name : "Unknown"); @@ -1414,17 +1792,11 @@ private void storeBoundary(long osmId, int level, String name, String code, ArrayList covering = new ArrayList<>(); coverer.getCovering(rect, covering); - // Batched grid index update instead of per-cell synchronized writes batchUpdateGridIndex(gridsIndexDb, covering, osmId); boundariesWriter.put(s2Helper.longToByteArray(osmId), fbb.sizedByteArray()); } - /** - * Registers a boundary in the grid index for all covered cells. - * Uses chunked multiGet + WriteBatch for much better throughput than - * the previous per-cell synchronized approach. 
- */ private void batchUpdateGridIndex(RocksDB gridIndexDb, ArrayList cells, long osmId) throws RocksDBException { final int CHUNK_SIZE = 5_000; for (int offset = 0; offset < cells.size(); offset += CHUNK_SIZE) { @@ -1593,7 +1965,10 @@ private long computeDirectorySize(Path root) { private void recordSizeMetrics(ImportStatistics stats, Path shardsDbPath, Path boundariesDbPath, + Path buildingsDbPath, + Path appendBuildingDbPath, Path gridIndexDbPath, + Path buildingGridIndexDbPath, Path nodeCacheDbPath, Path wayIndexDbPath, Path boundaryWayIndexDbPath, @@ -1603,9 +1978,11 @@ private void recordSizeMetrics(ImportStatistics stats, Path appendDbPath) { long shards = computeDirectorySize(shardsDbPath); long boundaries = computeDirectorySize(boundariesDbPath); - long dataset = shards + boundaries; + long buildings = computeDirectorySize(buildingsDbPath); + long dataset = shards + boundaries + buildings; long grid = computeDirectorySize(gridIndexDbPath); + long buildingGrid = computeDirectorySize(buildingGridIndexDbPath); long node = computeDirectorySize(nodeCacheDbPath); long way = computeDirectorySize(wayIndexDbPath); long boundaryWay = computeDirectorySize(boundaryWayIndexDbPath); @@ -1613,13 +1990,16 @@ private void recordSizeMetrics(ImportStatistics stats, long rel = computeDirectorySize(relIndexDbPath); long poi = computeDirectorySize(poiIndexDbPath); long append = computeDirectorySize(appendDbPath); - long tmpTotal = grid + node + way + boundaryWay + needed + rel + poi + append; + long appendBuilding = computeDirectorySize(appendBuildingDbPath); + long tmpTotal = grid + buildingGrid + node + way + boundaryWay + needed + rel + poi + append + appendBuilding; stats.setShardsBytes(shards); stats.setBoundariesBytes(boundaries); + stats.setBuildingsBytes(buildings); stats.setDatasetBytes(dataset); stats.setTmpGridBytes(grid); + stats.setTmpBuildingGridBytes(buildingGrid); stats.setTmpNodeBytes(node); stats.setTmpWayBytes(way); stats.setTmpBoundaryWayBytes(boundaryWay); 
@@ -1645,6 +2025,9 @@ private record AddressData(String street, String houseNumber, String postcode, S private record BoundaryResultLite(long osmId, int level, String name, String code, org.locationtech.jts.geom.Geometry geometry) { } + private record BuildingData(long id, String name, String code, org.locationtech.jts.geom.Geometry geometry) { + } + private Long safeEntityId(EntityContainer container) { try { OsmEntity e = container.getEntity(); @@ -1778,20 +2161,38 @@ private PoiIndexRec buildPoiIndexRecFromEntity(OsmEntity e) { PoiIndexRec rec = new PoiIndexRec(); rec.type = "unknown"; rec.subtype = ""; + + boolean hasPoiType = false; + boolean hasAddressTags = false; + for (int i = 0; i < e.getNumberOfTags(); i++) { OsmTag t = e.getTag(i); - if (isPoiFastKey(t.getKey())) { + String k = t.getKey(); + + if (isPoiFastKey(k)) { rec.type = intern(t.getKey()); rec.subtype = intern(t.getValue()); - break; + hasPoiType = true; + } + + if (k.startsWith("addr:")) { + hasAddressTags = true; } } + + if (hasAddressTags && !hasPoiType) { + rec.type = "address"; + rec.subtype = "node"; + } + List names = new ArrayList<>(); Set dedup = new HashSet<>(); + for (int i = 0; i < e.getNumberOfTags(); i++) { OsmTag t = e.getTag(i); String k = t.getKey(), v = t.getValue(); if (v == null || v.trim().isEmpty()) continue; + if ("name".equals(k)) { if (dedup.add("default:" + v)) names.add(new NameData("default", v)); } else if (k.startsWith("name:")) { @@ -1807,6 +2208,7 @@ private PoiIndexRec buildPoiIndexRecFromEntity(OsmEntity e) { } } } + rec.names = names; return rec; } @@ -1866,7 +2268,7 @@ private byte[] encodePoiIndexRec(PoiIndexRec rec) { byte[] pcB = bytes(rec.postcode); byte[] cityB = bytes(rec.city); byte[] countryB = bytes(rec.country); - int cap = 16 // ← NEW: 8 bytes lat + 8 bytes lon + int cap = 16 + 4 + typeB.length + 4 + subtypeB.length + namesSize + 4 + streetB.length + 4 + hnB.length + 4 + pcB.length + 4 + cityB.length + 4 + countryB.length; ByteBuffer bb = 
ByteBuffer.allocate(cap); bb.putDouble(rec.lat); diff --git a/src/main/java/com/dedicatedcode/paikka/service/importer/ImportStatistics.java b/src/main/java/com/dedicatedcode/paikka/service/importer/ImportStatistics.java index 403c803..5d0ec0c 100644 --- a/src/main/java/com/dedicatedcode/paikka/service/importer/ImportStatistics.java +++ b/src/main/java/com/dedicatedcode/paikka/service/importer/ImportStatistics.java @@ -23,6 +23,23 @@ import java.util.concurrent.atomic.AtomicLong; class ImportStatistics { + + public void incrementBuildingsFound() { + buildingsFound.incrementAndGet(); + } + + public void incrementBuildingsProcessed() { + buildingsProcessed.incrementAndGet(); + } + + public void setBuildingsBytes(long buildings) { + this.buildingsBytes = buildings; + } + + public void setTmpBuildingGridBytes(long buildingGrid) { + this.tmpBuildingGridBytes = buildingGrid; + } + public enum Pass { ONE, TWO @@ -35,7 +52,10 @@ public enum Stage { SCAN_PBF_STRUCTURE("Scan PBF Structure"), CACHING_NODE_COORDINATES("Caching Node Coordinates"), PROCESSING_ADMIN_BOUNDARIES("Processing Admin Boundaries"), - COMPACTING_POIS("Compacting POIs"),; + COMPACTING_POIS("Compacting POIs"), + COMPACTING_BUILDINGS("Compacting Buildings"), + PROCESSING_BUILDINGS("Processing Buildings"), + PREPARING_POI_SHARDING("Preparing POI Sharding"); private final String shortName; Stage(String shortName) { @@ -85,12 +105,17 @@ public String toString() { private final AtomicLong boundariesProcessed = new AtomicLong(0); private final AtomicLong poisProcessed = new AtomicLong(0); private final AtomicLong poiIndexRecRead = new AtomicLong(0); + private final AtomicLong poiIndexEntriesScanned = new AtomicLong(0); private final AtomicBoolean poiIndexRecReadDone = new AtomicBoolean(false); private final AtomicLong rocksDbWrites = new AtomicLong(0); private final AtomicLong queueSize = new AtomicLong(0); private final AtomicLong activeThreads = new AtomicLong(0); private final AtomicLong boundaryWaysProcessed 
= new AtomicLong(0); private final AtomicLong boundaryPhaseEntitiesRead = new AtomicLong(0); + private final AtomicLong buildingsFound = new AtomicLong(0); + private final AtomicLong buildingsProcessed = new AtomicLong(0); + private final AtomicLong addressNodesFound = new AtomicLong(0); + private final AtomicLong addressNodesWithBuildingType = new AtomicLong(0); private volatile String currentPhase = "Initializing"; private volatile boolean running = true; @@ -105,7 +130,9 @@ public String toString() { private volatile long datasetBytes; private volatile long shardsBytes; private volatile long boundariesBytes; + private volatile long buildingsBytes; private volatile long tmpGridBytes; + private volatile long tmpBuildingGridBytes; private volatile long tmpNodeBytes; private volatile long tmpWayBytes; private volatile long tmpBoundaryWayBytes; @@ -115,7 +142,7 @@ public String toString() { private volatile long tmpTotalBytes; private volatile long tmpAppendBytes; - private final int TOTAL_STEPS = 6; + private final int TOTAL_STEPS = 9; private int currentStep = 0; public long getEntitiesRead() { @@ -182,6 +209,14 @@ public void incrementPoiIndexRecRead() { poiIndexRecRead.incrementAndGet(); } + public long getPoiIndexEntriesScanned() { + return poiIndexEntriesScanned.get(); + } + + public void incrementPoiIndexEntriesScanned() { + poiIndexEntriesScanned.incrementAndGet(); + } + public boolean isPoiIndexRecReadDone() { return poiIndexRecReadDone.get(); } @@ -234,9 +269,18 @@ public void incrementBoundaryPhaseEntitiesRead() { boundaryPhaseEntitiesRead.incrementAndGet(); } + public void incrementAddressNodesFound() { + addressNodesFound.incrementAndGet(); + } + + public void incrementAddressNodesWithBuildingType() { + addressNodesWithBuildingType.incrementAndGet(); + } + public void resetBoundaryPhaseEntitiesRead() { boundaryPhaseEntitiesRead.set(0); } + public String getCurrentPhase() { return currentPhase; } @@ -377,6 +421,22 @@ public void setTmpPoiBytes(long v) { 
this.tmpPoiBytes = v; } + public long getBuildingsBytes() { + return buildingsBytes; + } + + public long getTmpBuildingGridBytes() { + return tmpBuildingGridBytes; + } + + public long getBuildingsFound() { + return buildingsFound.get(); + } + + public long getBuildingsProcessed() { + return buildingsProcessed.get(); + } + public long getTmpAppendBytes() { return tmpAppendBytes; } @@ -448,7 +508,7 @@ public void startProgressReporter() { sb.append(String.format("\033[1;90m[%d/%d]\033[0m ", currentStep, TOTAL_STEPS)); if (phase.contains("1.1.1")) { - long pbfPerSec = phaseSeconds > 0 ? (long)(getEntitiesRead() / phaseSeconds) : 0; + long pbfPerSec = phaseSeconds > 0 ? (long) (getEntitiesRead() / phaseSeconds) : 0; sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mScanning PBF Structure\033[0m", formatTime(elapsed))); sb.append(String.format(" │ \033[32mPBF Entities:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(getEntitiesRead()), formatCompactRate(pbfPerSec))); @@ -456,9 +516,9 @@ public void startProgressReporter() { sb.append(String.format(" │ \033[34mWays Found:\033[0m %s", formatCompactNumber(getWaysProcessed()))); sb.append(String.format(" │ \033[35mRelations:\033[0m %s", formatCompactNumber(getRelationsFound()))); - } else if (phase.contains("1.1.2")) { + } else if (phase.contains("1.1.2")) { long boundaryEntities = getBoundaryPhaseEntitiesRead(); - long pbfPerSec = phaseSeconds > 0 ? (long)(boundaryEntities / phaseSeconds) : 0; + long pbfPerSec = phaseSeconds > 0 ? 
(long) (boundaryEntities / phaseSeconds) : 0; sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mIndexing Boundary Member Ways\033[0m", formatTime(elapsed))); sb.append(String.format(" │ \033[32mPBF Entities:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(boundaryEntities), formatCompactRate(pbfPerSec))); @@ -466,28 +526,51 @@ public void startProgressReporter() { sb.append(String.format(" │ \033[35mRelations:\033[0m %s", formatCompactNumber(getRelationsFound()))); } else if (phase.contains("1.1.3")) { - long nodesPerSec = phaseSeconds > 0 ? (long)(getNodesCached() / phaseSeconds) : 0; + long nodesPerSec = phaseSeconds > 0 ? (long) (getNodesCached() / phaseSeconds) : 0; sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mCaching Node Coordinates\033[0m", formatTime(elapsed))); sb.append(String.format(" │ \033[32mNodes Cached:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(getNodesCached()), formatCompactRate(nodesPerSec))); sb.append(String.format(" │ \033[36mQueue:\033[0m %s", formatCompactNumber(getQueueSize()))); sb.append(String.format(" │ \033[37mThreads:\033[0m %d", getActiveThreads())); - } else if (phase.contains("1.2")) { - long boundsPerSec = phaseSeconds > 0 ? (long)(getBoundariesProcessed() / phaseSeconds) : 0; + } else if (phase.contains("1.2:")) { + long nodesPerSec = phaseSeconds > 0 ? (long) (getNodesCached() / phaseSeconds) : 0; + sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mCaching Node Coordinates\033[0m", formatTime(elapsed))); + sb.append(String.format(" │ \033[32mNodes Cached:\033[0m %s \033[33m(%s/s)\033[0m", + formatCompactNumber(getNodesCached()), formatCompactRate(nodesPerSec))); + sb.append(String.format(" │ \033[36mQueue:\033[0m %s", formatCompactNumber(getQueueSize()))); + sb.append(String.format(" │ \033[37mThreads:\033[0m %d", getActiveThreads())); + + } else if (phase.contains("1.3:")) { + long boundsPerSec = phaseSeconds > 0 ? 
(long) (getBoundariesProcessed() / phaseSeconds) : 0; sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mProcessing Admin Boundaries\033[0m", formatTime(elapsed))); sb.append(String.format(" │ \033[32mBoundaries:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(getBoundariesProcessed()), formatCompactRate(boundsPerSec))); sb.append(String.format(" │ \033[37mThreads:\033[0m %d", getActiveThreads())); + } else if (phase.contains("1.4")) { + long buildingsPerSec = phaseSeconds > 0 ? (long) (getBuildingsProcessed() / phaseSeconds) : 0; + double percentage = getBuildingsFound() > 0 ? (double) getBuildingsProcessed() / getBuildingsFound() * 100.0 : 0.0; + sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mProcessing Building Boundaries\033[0m", formatTime(elapsed))); + sb.append(String.format(" │ \033[32mBuildings:\033[0m %s \033[33m(%s/s)\033[0m", + formatCompactNumber(getBuildingsProcessed()), formatCompactRate(buildingsPerSec))); + sb.append(String.format(" │ \033[35mProgress:\033[0m %.2f%%", percentage)); + sb.append(String.format(" │ \033[37mThreads:\033[0m %d", getActiveThreads())); + + } else if (phase.contains("1.5")) { + long scannedPerSec = phaseSeconds > 0 ? (long) (getPoiIndexEntriesScanned() / phaseSeconds) : 0; + sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mPreparing POI Sharding\033[0m", formatTime(elapsed))); + sb.append(String.format(" │ \033[32mPOI Index Scanned:\033[0m %s \033[33m(%s/s)\033[0m", + formatCompactNumber(getPoiIndexEntriesScanned()), formatCompactRate(scannedPerSec))); + } else if (phase.contains("2.1")) { - long poisPerSec = phaseSeconds > 0 ? (long)(getPoisProcessed() / phaseSeconds) : 0; - long poisReadSec = phaseSeconds > 0 ? (long)(getPoiIndexRecRead() / phaseSeconds) : 0; + long poisPerSec = phaseSeconds > 0 ? (long) (getPoisProcessed() / phaseSeconds) : 0; + long poisReadSec = phaseSeconds > 0 ? 
(long) (getPoiIndexRecRead() / phaseSeconds) : 0; long totalFromPhase1 = getNodesFound() + getWaysProcessed(); double percentage = totalFromPhase1 > 0 ? (double) getPoisProcessed() / totalFromPhase1 * 100.0 : 0.0; sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mProcessing POIs & Sharding\033[0m", formatTime(elapsed))); - sb.append(String.format(" │ \033[32mPOI Index Rec Read:\033[0m %s \033[33m%s\033[0m", formatCompactNumber(getPoiIndexRecRead()), isPoiIndexRecReadDone() ? "(done)" : String.format("(%s/s)",formatCompactRate(poisReadSec)))); + sb.append(String.format(" │ \033[32mPOI Index Rec Read:\033[0m %s \033[33m%s\033[0m", formatCompactNumber(getPoiIndexRecRead()), isPoiIndexRecReadDone() ? "(done)" : String.format("(%s/s)", formatCompactRate(poisReadSec)))); sb.append(String.format(" │ \033[32mPOIs Processed:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(getPoisProcessed()), formatCompactRate(poisPerSec))); sb.append(String.format(" │ \033[35mProgress:\033[0m %.4f%%", percentage)); sb.append(String.format(" │ \033[36mQueue:\033[0m %s", formatCompactNumber(getQueueSize()))); @@ -497,13 +580,24 @@ public void startProgressReporter() { long compactionElapsed = System.currentTimeMillis() - getCompactionStartTime(); double compactionPhaseSeconds = compactionElapsed / 1000.0; long shardsCompacted = getCompactionEntriesProcessed(); - long shardsPerSec = compactionPhaseSeconds > 0 ? (long)(shardsCompacted / compactionPhaseSeconds) : 0; + long shardsPerSec = compactionPhaseSeconds > 0 ? 
(long) (shardsCompacted / compactionPhaseSeconds) : 0; long remaining = getCompactionEntriesRemaining(); sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mCompacting POIs\033[0m", formatTime(elapsed))); sb.append(String.format(" │ \033[32mShards Compacted:\033[0m %s \033[33m(%s/s)\033[0m", formatCompactNumber(shardsCompacted), formatCompactRate(shardsPerSec))); sb.append(String.format(" │ \033[37mRemaining:\033[0m %s", formatCompactNumber(remaining))); + } else if (phase.contains("2.3")) { + long compactionElapsed = System.currentTimeMillis() - getCompactionStartTime(); + double compactionPhaseSeconds = compactionElapsed / 1000.0; + long shardsCompacted = getCompactionEntriesProcessed(); + long shardsPerSec = compactionPhaseSeconds > 0 ? (long) (shardsCompacted / compactionPhaseSeconds) : 0; + long remaining = getCompactionEntriesRemaining(); + sb.append(String.format("\033[1;36m[%s]\033[0m \033[1mCompacting Buildings\033[0m", formatTime(elapsed))); + sb.append(String.format(" │ \033[32mShards Compacted:\033[0m %s \033[33m(%s/s)\033[0m", + formatCompactNumber(shardsCompacted), formatCompactRate(shardsPerSec))); + sb.append(String.format(" │ \033[37mRemaining:\033[0m %s", formatCompactNumber(remaining))); + } else { sb.append(String.format("\033[1;36m[%s]\033[0m %s", formatTime(elapsed), phase)); } @@ -536,43 +630,52 @@ public void printFinalStatistics() { System.out.printf("\n\033[1;37m⏱️ Total Import Time:\033[0m \033[1;33m%s\033[0m%n%n", formatTime(getTotalTime())); System.out.println("\033[1;37m📊 Processing Summary:\033[0m"); - System.out.println("┌─────────────────┬─────────────────┬─────────────────┐"); - System.out.println("│ \033[1mEntity Type\033[0m │ \033[1mTotal Count\033[0m │ \033[1mAvg Speed\033[0m │"); - System.out.println("├─────────────────┼─────────────────┼─────────────────┤"); - System.out.printf("│ \033[32mPBF Entities\033[0m │ %15s │ %13s/s │%n", + System.out.println("┌────────────────────┬─────────────────┬─────────────────┐"); + 
System.out.println("│ \033[1mEntity Type\033[0m │ \033[1mTotal Count\033[0m │ \033[1mAvg Speed\033[0m │"); + System.out.println("├────────────────────┼─────────────────┼─────────────────┤"); + System.out.printf("│ \033[32mPBF Entities\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getEntitiesRead()), - formatCompactNumber((long)(getEntitiesRead() / totalSeconds))); - System.out.printf("│ \033[37mNodes Found\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber((long) (getEntitiesRead() / totalSeconds))); + System.out.printf("│ \033[37mNodes Found\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getNodesFound()), - formatCompactNumber((long)(getNodesFound() / totalSeconds))); - System.out.printf("│ \033[34mNodes Cached\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber((long) (getNodesFound() / totalSeconds))); + System.out.printf("│ \033[34mNodes Cached\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getNodesCached()), - formatCompactNumber((long)(getNodesCached() / totalSeconds))); - System.out.printf("│ \033[35mWays Processed\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber((long) (getNodesCached() / totalSeconds))); + System.out.printf("│ \033[35mWays Processed\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getWaysProcessed()), - formatCompactNumber((long)(getWaysProcessed() / totalSeconds))); - System.out.printf("│ \033[36mBoundaries\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber((long) (getWaysProcessed() / totalSeconds))); + System.out.printf("│ \033[36mBoundaries\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getBoundariesProcessed()), - formatCompactNumber((long)(getBoundariesProcessed() / totalSeconds))); - System.out.printf("│ \033[33mPOIs Created\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber((long) (getBoundariesProcessed() / totalSeconds))); + System.out.printf("│ \033[31mBuildings Found\033[0m │ %15s │ %13s/s │%n", + formatCompactNumber(getBuildingsFound()), + formatCompactNumber((long) (getBuildingsFound() / totalSeconds))); + 
System.out.printf("│ \033[31mBuildings Processed\033[0m│ %15s │ %13s/s │%n", + formatCompactNumber(getBuildingsProcessed()), + formatCompactNumber((long) (getBuildingsProcessed() / totalSeconds))); + System.out.printf("│ \033[33mPOIs Created\033[0m │ %15s │ %13s/s │%n", formatCompactNumber(getPoisProcessed()), - formatCompactNumber((long)(getPoisProcessed() / totalSeconds))); - System.out.println("└─────────────────┴─────────────────┴─────────────────┘"); + formatCompactNumber((long) (getPoisProcessed() / totalSeconds))); + System.out.println("└────────────────────┴─────────────────┴─────────────────┘"); - long totalObjects = getNodesFound() + getNodesCached() + getWaysProcessed() + getBoundariesProcessed() + getPoisProcessed(); + long totalObjects = getNodesFound() + getNodesCached() + getWaysProcessed() + + getBoundariesProcessed() + getBuildingsProcessed() + getPoisProcessed(); System.out.printf("%n\033[1;37m🚀 Overall Throughput:\033[0m \033[1;32m%s objects\033[0m processed at \033[1;33m%s objects/sec\033[0m%n", formatCompactNumber(totalObjects), - formatCompactNumber((long)(totalObjects / totalSeconds))); + formatCompactNumber((long) (totalObjects / totalSeconds))); System.out.printf("\033[1;37m💾 Database Operations:\033[0m \033[1;36m%s writes\033[0m%n", formatCompactNumber(getRocksDbWrites())); System.out.println("\n\033[1;37m📦 Dataset Size:\033[0m " + formatSize(getDatasetBytes())); - System.out.println(" • poi_shards: " + formatSize(getShardsBytes())); - System.out.println(" • boundaries: " + formatSize(getBoundariesBytes())); + System.out.println(" • poi_shards : " + formatSize(getShardsBytes())); + System.out.println(" • boundaries : " + formatSize(getBoundariesBytes())); + System.out.println(" • building_shards: " + formatSize(getBuildingsBytes())); System.out.println("\n\033[1;37m🧹 Temporary DBs:\033[0m " + formatSize(getTmpTotalBytes())); System.out.println(" • grid_index: " + formatSize(getTmpGridBytes())); + System.out.println(" • 
building_grid_index: " + formatSize(getTmpBuildingGridBytes())); System.out.println(" • node_cache: " + formatSize(getTmpNodeBytes())); System.out.println(" • way_index: " + formatSize(getTmpWayBytes())); System.out.println(" • boundary_way_index: " + formatSize(getTmpBoundaryWayBytes())); @@ -628,7 +731,7 @@ private String formatCompactNumber(long n) { if (n < 1_000_000) return String.format("%.2fk", n / 1000.0); return String.format("%.3fM", n / 1_000_000.0); } - + private String formatCompactRate(long n) { if (n < 1000) return String.valueOf(n); if (n < 1_000_000) return String.format("%.1fk", n / 1000.0);