Skip to content

Commit b87dcaa

Browse files
authored
Close db query rs and stmt before processing results (#61)
* Update Java version to 24 * Close db query rs and stmt before processing results * Add Github Actions job that runs on feature branches * Method getMinSec returns milliseconds * Update timer handling * Fix end time handling * Simplify method closeQuery * Use seconds as float numbers in log statement
1 parent 53fc765 commit b87dcaa

8 files changed

Lines changed: 150 additions & 57 deletions

File tree

.github/workflows/test-and-build.yml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ jobs:
77
runs-on: ubuntu-latest
88
steps:
99
- uses: actions/checkout@v2
10-
- name: Set up JDK 11
10+
- name: Set up JDK 24
1111
uses: actions/setup-java@v3
1212
with:
1313
distribution: 'temurin'
14-
java-version: '11'
14+
java-version: '24'
1515
cache: 'maven'
1616
- name: Build with Maven
1717
run: mvn --file pom.xml clean install
@@ -79,3 +79,22 @@ jobs:
7979
username: ${{ secrets.DOCKER_USERNAME }}
8080
password: ${{ secrets.DOCKER_PASSWORD }}
8181
tags: aks-dev
82+
build-feature-branch-docker-image:
83+
needs: test
84+
runs-on: ubuntu-latest
85+
# Run only on branches starting with feature/
86+
if: startsWith(github.ref, 'refs/heads/feature/')
87+
steps:
88+
- uses: actions/checkout@v2
89+
- name: Download .jar file
90+
uses: actions/download-artifact@v4
91+
with:
92+
name: transitdata-pubtrans-source.jar
93+
path: target
94+
- name: Build and publish feature branch Docker image
95+
uses: elgohr/Publish-Docker-Github-Action@master
96+
with:
97+
name: hsldevcom/transitdata-pubtrans-source
98+
username: ${{ secrets.DOCKER_USERNAME }}
99+
password: ${{ secrets.DOCKER_PASSWORD }}
100+
tags: feature-branch

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM eclipse-temurin:11-alpine
1+
FROM eclipse-temurin:24-alpine
22
#Install curl for health check
33
RUN apk add --no-cache curl
44
ADD target/transitdata-pubtrans-source.jar /usr/app/transitdata-pubtrans-source.jar

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
<properties>
1616
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
17-
<maven.compiler.source>11</maven.compiler.source>
18-
<maven.compiler.target>11</maven.compiler.target>
17+
<maven.compiler.source>24</maven.compiler.source>
18+
<maven.compiler.target>24</maven.compiler.target>
1919
<common.version>2.0.3</common.version>
2020
</properties>
2121

@@ -103,8 +103,8 @@
103103
<artifactId>maven-compiler-plugin</artifactId>
104104
<version>3.8.0</version>
105105
<configuration>
106-
<source>11</source>
107-
<target>11</target>
106+
<source>24</source>
107+
<target>24</target>
108108
</configuration>
109109
</plugin>
110110

src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/ArrivalHandler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,11 @@
44
import fi.hsl.common.transitdata.TransitdataProperties;
55
import fi.hsl.common.transitdata.TransitdataSchema;
66
import fi.hsl.common.transitdata.proto.PubtransTableProtos;
7-
import org.apache.pulsar.client.api.Producer;
8-
import org.apache.pulsar.client.api.TypedMessageBuilder;
9-
import redis.clients.jedis.Jedis;
107

118
import java.sql.ResultSet;
129
import java.sql.SQLException;
13-
import java.util.LinkedList;
10+
import java.util.Map;
1411
import java.util.Optional;
15-
import java.util.Queue;
1612

1713
public class ArrivalHandler extends PubtransTableHandler {
1814

@@ -35,9 +31,15 @@ protected String getTimetabledDateTimeColumnName() {
3531
protected TransitdataSchema getSchema() {
3632
return schema;
3733
}
34+
35+
@Override
36+
protected Map<String, Long> getTableColumnToIdMap(ResultSet resultSet) throws SQLException {
37+
return Map.of();
38+
}
3839

3940
@Override
40-
protected byte[] createPayload(ResultSet resultSet, PubtransTableProtos.Common common, PubtransTableProtos.DOITripInfo tripInfo) throws SQLException {
41+
protected byte[] createPayload(PubtransTableProtos.Common common, Map<String,
42+
Long> columnToIdMap, PubtransTableProtos.DOITripInfo tripInfo) throws SQLException {
4143
PubtransTableProtos.ROIArrival.Builder arrivalBuilder = PubtransTableProtos.ROIArrival.newBuilder();
4244
arrivalBuilder.setSchemaVersion(arrivalBuilder.getSchemaVersion());
4345
arrivalBuilder.setCommon(common);

src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/DepartureHandler.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,17 @@
44
import fi.hsl.common.transitdata.TransitdataProperties;
55
import fi.hsl.common.transitdata.TransitdataSchema;
66
import fi.hsl.common.transitdata.proto.PubtransTableProtos;
7-
import org.apache.pulsar.client.api.Producer;
8-
import org.apache.pulsar.client.api.TypedMessageBuilder;
9-
import org.slf4j.Logger;
10-
import org.slf4j.LoggerFactory;
11-
import redis.clients.jedis.Jedis;
127

138
import java.sql.ResultSet;
149
import java.sql.SQLException;
15-
import java.util.LinkedList;
16-
import java.util.Optional;
17-
import java.util.Queue;
10+
import java.util.*;
1811

1912
public class DepartureHandler extends PubtransTableHandler {
2013

2114
static final TransitdataSchema schema;
15+
private static final String COLUMN_HAS_DESTINATION_DISPLAY_ID = "HasDestinationDisplayId";
16+
private static final String COLUMN_HAS_DESTINATION_STOP_AREA_GID = "HasDestinationStopAreaGid";
17+
private static final String COLUMN_HAS_SERVICE_REQUIREMENT_ID = "HasServiceRequirementId";
2218
static {
2319
int defaultVersion = PubtransTableProtos.ROIDeparture.newBuilder().getSchemaVersion();
2420
schema = new TransitdataSchema(TransitdataProperties.ProtobufSchema.PubtransRoiDeparture, Optional.of(defaultVersion));
@@ -37,19 +33,35 @@ protected String getTimetabledDateTimeColumnName() {
3733
protected TransitdataSchema getSchema() {
3834
return schema;
3935
}
36+
37+
@Override
38+
protected Map<String, Long> getTableColumnToIdMap(ResultSet resultSet) throws SQLException {
39+
Map<String, Long> columnToIdMap = new HashMap<>();
40+
if (resultSet.getBytes(COLUMN_HAS_DESTINATION_DISPLAY_ID) != null) {
41+
columnToIdMap.put(COLUMN_HAS_DESTINATION_DISPLAY_ID, resultSet.getLong(COLUMN_HAS_DESTINATION_DISPLAY_ID));
42+
}
43+
if (resultSet.getBytes(COLUMN_HAS_DESTINATION_STOP_AREA_GID) != null) {
44+
columnToIdMap.put(COLUMN_HAS_DESTINATION_STOP_AREA_GID, resultSet.getLong(COLUMN_HAS_DESTINATION_STOP_AREA_GID));
45+
}
46+
if (resultSet.getBytes(COLUMN_HAS_SERVICE_REQUIREMENT_ID) != null) {
47+
columnToIdMap.put(COLUMN_HAS_SERVICE_REQUIREMENT_ID, resultSet.getLong(COLUMN_HAS_SERVICE_REQUIREMENT_ID));
48+
}
49+
return columnToIdMap;
50+
}
4051

4152
@Override
42-
protected byte[] createPayload(ResultSet resultSet, PubtransTableProtos.Common common, PubtransTableProtos.DOITripInfo tripInfo) throws SQLException {
53+
protected byte[] createPayload(PubtransTableProtos.Common common, Map<String, Long> columnToIdMap,
54+
PubtransTableProtos.DOITripInfo tripInfo) throws SQLException {
4355
PubtransTableProtos.ROIDeparture.Builder departureBuilder = PubtransTableProtos.ROIDeparture.newBuilder();
4456
departureBuilder.setSchemaVersion(departureBuilder.getSchemaVersion());
4557
departureBuilder.setCommon(common);
4658
departureBuilder.setTripInfo(tripInfo);
47-
if (resultSet.getBytes("HasDestinationDisplayId") != null)
48-
departureBuilder.setHasDestinationDisplayId(resultSet.getLong("HasDestinationDisplayId"));
49-
if (resultSet.getBytes("HasDestinationStopAreaGid") != null)
50-
departureBuilder.setHasDestinationStopAreaGid(resultSet.getLong("HasDestinationStopAreaGid"));
51-
if (resultSet.getBytes("HasServiceRequirementId") != null)
52-
departureBuilder.setHasServiceRequirementId(resultSet.getLong("HasServiceRequirementId"));
59+
if (columnToIdMap.containsKey(COLUMN_HAS_DESTINATION_DISPLAY_ID))
60+
departureBuilder.setHasDestinationDisplayId(columnToIdMap.get(COLUMN_HAS_DESTINATION_DISPLAY_ID));
61+
if (columnToIdMap.containsKey(COLUMN_HAS_DESTINATION_STOP_AREA_GID))
62+
departureBuilder.setHasDestinationStopAreaGid(columnToIdMap.get(COLUMN_HAS_DESTINATION_STOP_AREA_GID));
63+
if (columnToIdMap.containsKey(COLUMN_HAS_SERVICE_REQUIREMENT_ID))
64+
departureBuilder.setHasServiceRequirementId(columnToIdMap.get(COLUMN_HAS_SERVICE_REQUIREMENT_ID));
5365
PubtransTableProtos.ROIDeparture departure = departureBuilder.build();
5466
return departure.toByteArray();
5567
}

src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import java.time.format.DateTimeFormatter;
1515
import java.time.temporal.ChronoUnit;
1616
import java.util.Collection;
17-
import java.util.Queue;
1817
import java.util.concurrent.TimeUnit;
1918

2019
public class PubtransConnector {
@@ -109,6 +108,28 @@ static boolean isCacheValid(OffsetDateTime lastCacheUpdate, final int cacheMaxAg
109108
log.info("Current time {}, last update {}} => mins from prev update: {}", now, lastCacheUpdate, minutesSinceUpdate);
110109
return minutesSinceUpdate <= cacheMaxAgeInMins;
111110
}
111+
112+
static void closeQuery(final ResultSet resultSet, final Statement statement) {
113+
if (resultSet != null) {
114+
try {
115+
resultSet.close();
116+
log.debug("ResultSet closed.");
117+
} catch (SQLException e) {
118+
log.error("Failed to close ResultSet", e);
119+
}
120+
}
121+
if (statement != null) {
122+
try {
123+
statement.close();
124+
log.debug("Statement closed.");
125+
} catch (SQLException e) {
126+
log.error("Failed to close Statement", e);
127+
}
128+
}
129+
if (resultSet == null && statement == null) {
130+
log.warn("ResultSet and Statement are null, nothing to close.");
131+
}
132+
}
112133

113134
public void queryAndProcessResults() throws SQLException, PulsarClientException {
114135

@@ -122,16 +143,11 @@ public void queryAndProcessResults() throws SQLException, PulsarClientException
122143
statement.setQueryTimeout(queryTimeoutSecs);
123144

124145
resultSet = statement.executeQuery();
125-
126-
produceMessages(handler.handleResultSet(resultSet));
127-
} finally {
128-
if (resultSet != null) try { resultSet.close(); } catch (Exception e) { log.error("Exception while closing result set", e); }
129-
if (statement != null) try { statement.close(); } catch (Exception e) { log.error("Exception while closing statement", e); }
130-
long queryDuration = System.currentTimeMillis() - queryStartTime;
131-
long secondsDuration = queryDuration / 1000;
132-
long minutesDuration = secondsDuration / 60;
133-
long remainingSecondsDuration = secondsDuration % 60;
134-
log.info("Database query executed in {} min {} sec", minutesDuration, remainingSecondsDuration);
146+
147+
produceMessages(handler.handleResultSet(resultSet, statement, queryStartTime));
148+
} catch (PulsarClientException | SQLException e) {
149+
closeQuery(resultSet, statement);
150+
throw e;
135151
}
136152
}
137153

src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.slf4j.LoggerFactory;
1111
import redis.clients.jedis.Jedis;
1212

13+
import java.sql.PreparedStatement;
1314
import java.sql.ResultSet;
1415
import java.sql.SQLException;
1516
import java.time.LocalDateTime;
@@ -25,6 +26,8 @@ public abstract class PubtransTableHandler {
2526
private Jedis jedis;
2627
private final String timeZone;
2728
private final boolean excludeMetroTrips;
29+
private record QueryResultItem(PubtransTableProtos.Common common, String key, long eventTimestampUtcMs,
30+
Map<String, Long> columnToIdMap) {};
2831

2932
public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) {
3033
lastModifiedTimeStamp = (System.currentTimeMillis() - 5000);
@@ -62,35 +65,58 @@ public static Optional<Long> toUtcEpochMs(String localTimestamp, String zoneId)
6265
return Optional.empty();
6366
}
6467
}
68+
69+
public static float getSeconds(long durationMs) {
70+
return durationMs / 1000.0f;
71+
}
72+
73+
abstract protected Map<String, Long> getTableColumnToIdMap(ResultSet resultSet) throws SQLException;
6574

66-
abstract protected byte[] createPayload(ResultSet resultSet, PubtransTableProtos.Common common, PubtransTableProtos.DOITripInfo tripInfo) throws SQLException;
75+
abstract protected byte[] createPayload(
76+
PubtransTableProtos.Common common, Map<String, Long> columnToIdMap,
77+
PubtransTableProtos.DOITripInfo tripInfo) throws SQLException;
6778

6879
abstract protected String getTimetabledDateTimeColumnName();
6980

7081
abstract protected TransitdataSchema getSchema();
7182

72-
public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultSet) throws SQLException {
83+
public Collection<TypedMessageBuilder<byte[]>> handleResultSet(
84+
ResultSet resultSet, PreparedStatement statement, long queryStartTime)
85+
throws SQLException {
7386
List<TypedMessageBuilder<byte[]>> messageBuilderQueue = new ArrayList<>();
7487

7588
long tempTimeStamp = getLastModifiedTimeStamp();
7689

7790
int count = 0;
7891
int metroTripCount = 0;
7992
Set<String> metroRouteIds = new HashSet<>();
93+
List<QueryResultItem> queryResultItems = new ArrayList<>();
94+
long queryDuration = -1L;
95+
long resultHandlerDuration = -1L;
96+
long queryAndResultHandlerDuration = -1L;
8097

8198
while (resultSet.next()) {
8299
count++;
83-
100+
84101
PubtransTableProtos.Common common = parseCommon(resultSet);
85102
final long eventTimestampUtcMs = common.getLastModifiedUtcDateTimeMs();
86-
103+
87104
final long delay = System.currentTimeMillis() - eventTimestampUtcMs;
88105
log.debug("Delay between current time and estimate publish time is {} ms", delay);
89-
106+
90107
final String key = resultSet.getString("IsOnDatedVehicleJourneyId") + resultSet.getString("JourneyPatternSequenceNumber");
91-
final long dvjId = common.getIsOnDatedVehicleJourneyId();
92-
final long scheduledJppId = common.getIsTimetabledAtJourneyPatternPointGid();
93-
final long targetedJppId = common.getIsTargetedAtJourneyPatternPointGid();
108+
final Map<String, Long> columnToIdMap = getTableColumnToIdMap(resultSet);
109+
queryResultItems.add(new QueryResultItem(common, key, eventTimestampUtcMs, columnToIdMap));
110+
}
111+
112+
PubtransConnector.closeQuery(resultSet, statement);
113+
long queryEndTime = System.currentTimeMillis();
114+
queryDuration = queryEndTime - queryStartTime;
115+
116+
for (QueryResultItem queryResultItem : queryResultItems) {
117+
final long dvjId = queryResultItem.common.getIsOnDatedVehicleJourneyId();
118+
final long scheduledJppId = queryResultItem.common.getIsTimetabledAtJourneyPatternPointGid();
119+
final long targetedJppId = queryResultItem.common.getIsTargetedAtJourneyPatternPointGid();
94120

95121
Optional<PubtransTableProtos.DOITripInfo> maybeTripInfo = getTripInfo(dvjId, scheduledJppId, targetedJppId);
96122
if (maybeTripInfo.isEmpty()) {
@@ -102,21 +128,28 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS
102128
metroTripCount++;
103129
metroRouteIds.add(tripInfo.getRouteId());
104130
} else {
105-
final byte[] data = createPayload(resultSet, common, tripInfo);
106-
TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema());
131+
final byte[] data = createPayload(queryResultItem.common, queryResultItem.columnToIdMap, tripInfo);
132+
TypedMessageBuilder<byte[]> msgBuilder = createMessage(queryResultItem.key,
133+
queryResultItem.eventTimestampUtcMs, dvjId, data, getSchema());
107134
messageBuilderQueue.add(msgBuilder);
108135
}
109136
}
110137

111138
//Update latest ts for next round
112-
if (eventTimestampUtcMs > tempTimeStamp) {
113-
tempTimeStamp = eventTimestampUtcMs;
139+
if (queryResultItem.eventTimestampUtcMs > tempTimeStamp) {
140+
tempTimeStamp = queryResultItem.eventTimestampUtcMs;
114141
}
115142
}
116-
117-
log.info("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {})",
118-
count, metroTripCount, metroRouteIds);
119-
143+
144+
long endTime = System.currentTimeMillis();
145+
resultHandlerDuration = endTime - queryEndTime;
146+
queryAndResultHandlerDuration = endTime - queryStartTime;
147+
148+
log.info("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {}). "
149+
+ "Operation took {} s (db query took {} s, handling results took {} s)",
150+
count, metroTripCount, metroRouteIds,
151+
getSeconds(queryAndResultHandlerDuration), getSeconds(queryDuration), getSeconds(resultHandlerDuration));
152+
120153
setLastModifiedTimeStamp(tempTimeStamp);
121154

122155
return messageBuilderQueue;

src/test/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandlerTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
import java.util.Optional;
66

77
import static org.junit.Assert.assertEquals;
8-
import static org.junit.Assert.assertTrue;
98

109
public class PubtransTableHandlerTest {
10+
private static final double DELTA = 1e-15;
11+
1112
@Test
1213
public void testTimestampConversionInUTC() {
1314
final String timezone = "UTC";
@@ -39,5 +40,15 @@ public void testDaylightSavings() {
3940
assertEquals(summerTime, summerTimeWithDaylightSavings);
4041
assertEquals(summerTimeWithDaylightSavings, winterTime + 1);
4142
}
42-
43+
44+
@Test
45+
public void testConvertMillisecondsSeconds() {
46+
assertEquals(0.0f, PubtransTableHandler.getSeconds(0), DELTA);
47+
assertEquals(0.011f, PubtransTableHandler.getSeconds(11), DELTA);
48+
assertEquals(0.5f, PubtransTableHandler.getSeconds(500), DELTA);
49+
assertEquals(0.6f, PubtransTableHandler.getSeconds(600), DELTA);
50+
assertEquals(1.0f, PubtransTableHandler.getSeconds(1000), DELTA);
51+
assertEquals(1.501f, PubtransTableHandler.getSeconds(1501), DELTA);
52+
assertEquals(59.0f, PubtransTableHandler.getSeconds(59000), DELTA);
53+
}
4354
}

0 commit comments

Comments
 (0)