Skip to content

Commit 433fd85

Browse files
committed
[client] Support log scanner scan to arrow record batch
1 parent 57cb378 commit 433fd85

23 files changed

Lines changed: 1679 additions & 283 deletions

fluss-client/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@
4949
<version>${project.version}</version>
5050
</dependency>
5151

52+
<dependency>
53+
<groupId>org.apache.arrow</groupId>
54+
<artifactId>arrow-vector</artifactId>
55+
<version>${arrow.version}</version>
56+
<scope>provided</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.apache.arrow</groupId>
61+
<artifactId>arrow-memory-netty</artifactId>
62+
<version>${arrow.version}</version>
63+
<scope>provided</scope>
64+
</dependency>
65+
5266
<!-- test dependency -->
5367
<dependency>
5468
<groupId>org.apache.fluss</groupId>
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.log;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.metadata.MetadataUpdater;
22+
import org.apache.fluss.config.ConfigOptions;
23+
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.exception.AuthorizationException;
25+
import org.apache.fluss.exception.FetchException;
26+
import org.apache.fluss.metadata.TableBucket;
27+
import org.apache.fluss.metadata.TablePath;
28+
import org.apache.fluss.rpc.protocol.ApiError;
29+
import org.apache.fluss.rpc.protocol.Errors;
30+
31+
import org.slf4j.Logger;
32+
33+
import javax.annotation.Nullable;
34+
import javax.annotation.concurrent.ThreadSafe;
35+
36+
import java.util.ArrayList;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
/**
 * Shared implementation for polling completed fetches into scanner results.
 *
 * <p>Subclasses decide how records are materialized from a {@link CompletedFetch} (e.g. row
 * records vs. arrow batches) via {@link #fetchRecords}, {@link #recordCount} and
 * {@link #toResult}; this base class owns the common draining loop, offset bookkeeping and
 * fetch-error handling.
 *
 * @param <T> the per-record element type produced from a completed fetch
 * @param <R> the aggregated result type returned to the caller of {@link #collectFetch}
 */
@ThreadSafe
@Internal
abstract class AbstractLogFetchCollector<T, R> {
    // Subclass-provided logger so messages are attributed to the concrete collector class.
    protected final Logger log;
    protected final TablePath tablePath;
    // Tracks the consumed offset and high watermark per subscribed bucket.
    protected final LogScannerStatus logScannerStatus;
    // Upper bound on the number of records returned from a single collectFetch call.
    private final int maxPollRecords;
    // Used to refresh cluster metadata when fetches fail with leader/storage errors.
    private final MetadataUpdater metadataUpdater;

    /**
     * @param log the concrete subclass's logger
     * @param tablePath the table this collector fetches from
     * @param logScannerStatus shared per-bucket offset/watermark state
     * @param conf client configuration; supplies the max-poll-records limit
     * @param metadataUpdater used to trigger metadata refresh on retriable fetch errors
     */
    protected AbstractLogFetchCollector(
            Logger log,
            TablePath tablePath,
            LogScannerStatus logScannerStatus,
            Configuration conf,
            MetadataUpdater metadataUpdater) {
        this.log = log;
        this.tablePath = tablePath;
        this.logScannerStatus = logScannerStatus;
        this.maxPollRecords = conf.getInt(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS);
        this.metadataUpdater = metadataUpdater;
    }

    /**
     * Return the fetched log records, empty the record buffer and update the consumed position.
     *
     * <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
     *
     * @return The fetched records per partition
     * @throws FetchException If there is OffsetOutOfRange error in fetchResponse and the
     *     defaultResetPolicy is NONE
     */
    public R collectFetch(final LogFetchBuffer logFetchBuffer) {
        Map<TableBucket, List<T>> fetched = new HashMap<>();
        int recordsRemaining = maxPollRecords;

        try {
            while (recordsRemaining > 0) {
                CompletedFetch nextInLineFetch = logFetchBuffer.nextInLineFetch();
                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                    // No in-progress fetch: promote the next buffered fetch (if any) after
                    // initializing it (offset validation + error handling).
                    CompletedFetch completedFetch = logFetchBuffer.peek();
                    if (completedFetch == null) {
                        break;
                    }

                    if (!completedFetch.isInitialized()) {
                        try {
                            logFetchBuffer.setNextInLineFetch(initialize(completedFetch));
                        } catch (Exception e) {
                            // Remove a completedFetch upon a parse with exception if
                            // (1) it contains no records, and
                            // (2) there are no fetched records with actual content
                            // preceding this exception.
                            if (fetched.isEmpty() && completedFetch.sizeInBytes == 0) {
                                logFetchBuffer.poll();
                            }
                            throw e;
                        }
                    } else {
                        logFetchBuffer.setNextInLineFetch(completedFetch);
                    }

                    logFetchBuffer.poll();
                } else {
                    List<T> records = fetchRecords(nextInLineFetch, recordsRemaining);
                    if (!records.isEmpty()) {
                        TableBucket tableBucket = nextInLineFetch.tableBucket;
                        List<T> currentRecords = fetched.get(tableBucket);
                        if (currentRecords == null) {
                            fetched.put(tableBucket, records);
                        } else {
                            // this case shouldn't usually happen because we only send one fetch
                            // at a time per bucket, but it might conceivably happen in some rare
                            // cases (such as bucket leader changes). we have to copy to a new list
                            // because the old one may be immutable
                            List<T> mergedRecords =
                                    new ArrayList<>(records.size() + currentRecords.size());
                            mergedRecords.addAll(currentRecords);
                            mergedRecords.addAll(records);
                            fetched.put(tableBucket, mergedRecords);
                        }

                        recordsRemaining -= recordCount(records);
                    }
                }
            }
        } catch (FetchException e) {
            // If we already collected some records, return them and let the exception surface
            // on the next poll; only rethrow when there is nothing to hand back.
            if (fetched.isEmpty()) {
                throw e;
            }
        }

        return toResult(fetched);
    }

    /** Initialize a {@link CompletedFetch} object. */
    @Nullable
    private CompletedFetch initialize(CompletedFetch completedFetch) {
        TableBucket tb = completedFetch.tableBucket;
        ApiError error = completedFetch.error;

        try {
            if (error.isSuccess()) {
                return handleInitializeSuccess(completedFetch);
            } else {
                handleInitializeErrors(completedFetch, error.error(), error.messageWithFallback());
                return null;
            }
        } finally {
            if (error.isFailure()) {
                // we move the bucket to the end if there was an error. This way,
                // it's more likely that buckets for the same table can remain together
                // (allowing for more efficient serialization).
                logScannerStatus.moveBucketToEnd(tb);
            }
        }
    }

    /**
     * Validate a successful fetch against the current consumed position; returns the initialized
     * fetch, or {@code null} when the response is stale (unsubscribed bucket or offset mismatch).
     */
    private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
        TableBucket tb = completedFetch.tableBucket;
        long fetchOffset = completedFetch.nextFetchOffset();

        // we are interested in this fetch only if the beginning offset matches the
        // current consumed position.
        Long offset = logScannerStatus.getBucketOffset(tb);
        if (offset == null) {
            log.debug(
                    "Discarding stale fetch response for bucket {} since the expected offset is null which means the bucket has been unsubscribed.",
                    tb);
            return null;
        }
        if (offset != fetchOffset) {
            log.warn(
                    "Discarding stale fetch response for bucket {} since its offset {} does not match the expected offset {}.",
                    tb,
                    fetchOffset,
                    offset);
            return null;
        }

        long highWatermark = completedFetch.highWatermark;
        if (highWatermark >= 0) {
            log.trace("Updating high watermark for bucket {} to {}.", tb, highWatermark);
            logScannerStatus.updateHighWatermark(tb, highWatermark);
        }

        completedFetch.setInitialized();
        return completedFetch;
    }

    /**
     * Translate a fetch-level error code into either a metadata refresh (retriable
     * leader/storage errors) or a thrown exception (fatal errors).
     *
     * @throws FetchException for offset-out-of-range, corrupt-message and unexpected error codes
     * @throws AuthorizationException when the fetch was rejected for missing permissions
     */
    private void handleInitializeErrors(
            CompletedFetch completedFetch, Errors error, String errorMessage) {
        TableBucket tb = completedFetch.tableBucket;
        long fetchOffset = completedFetch.nextFetchOffset();
        if (error == Errors.NOT_LEADER_OR_FOLLOWER
                || error == Errors.LOG_STORAGE_EXCEPTION
                || error == Errors.KV_STORAGE_EXCEPTION
                || error == Errors.STORAGE_EXCEPTION
                || error == Errors.FENCED_LEADER_EPOCH_EXCEPTION) {
            log.debug(
                    "Error in fetch for bucket {}: {}:{}",
                    tb,
                    error.exceptionName(),
                    error.exception(errorMessage));
            metadataUpdater.checkAndUpdateMetadata(tablePath, tb);
        } else if (error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION) {
            log.warn("Received unknown table or bucket error in fetch for bucket {}", tb);
            metadataUpdater.checkAndUpdateMetadata(tablePath, tb);
        } else if (error == Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION) {
            throw new FetchException(
                    String.format(
                            "The fetching offset %s is out of range: %s",
                            fetchOffset, error.exception(errorMessage)));
        } else if (error == Errors.AUTHORIZATION_EXCEPTION) {
            throw new AuthorizationException(errorMessage);
        } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
            log.warn(
                    "Unknown server error while fetching offset {} for bucket {}: {}",
                    fetchOffset,
                    tb,
                    error.exception(errorMessage));
        } else if (error == Errors.CORRUPT_MESSAGE) {
            throw new FetchException(
                    String.format(
                            "Encountered corrupt message when fetching offset %s for bucket %s: %s",
                            fetchOffset, tb, error.exception(errorMessage)));
        } else {
            throw new FetchException(
                    String.format(
                            "Unexpected error code %s while fetching at offset %s from bucket %s: %s",
                            error, fetchOffset, tb, error.exception(errorMessage)));
        }
    }

    /**
     * Drain up to {@code maxRecords} elements from the in-line fetch; an empty list means the
     * fetch was stale and has been drained without advancing the consumed position.
     */
    protected abstract List<T> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords);

    /** Number of logical records represented by the fetched elements (may exceed list size). */
    protected abstract int recordCount(List<T> fetchedRecords);

    /** Wrap the per-bucket fetched elements into the collector's result type. */
    protected abstract R toResult(Map<TableBucket, List<T>> fetchedRecords);
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.log;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.metadata.MetadataUpdater;
22+
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.metadata.TablePath;
25+
import org.apache.fluss.record.ArrowBatchData;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.concurrent.ThreadSafe;
31+
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/** Collects Arrow batches from completed fetches. */
37+
@ThreadSafe
38+
@Internal
39+
public class ArrowLogFetchCollector
40+
extends AbstractLogFetchCollector<ArrowBatchData, ArrowScanRecords> {
41+
private static final Logger LOG = LoggerFactory.getLogger(ArrowLogFetchCollector.class);
42+
43+
public ArrowLogFetchCollector(
44+
TablePath tablePath,
45+
LogScannerStatus logScannerStatus,
46+
Configuration conf,
47+
MetadataUpdater metadataUpdater) {
48+
super(LOG, tablePath, logScannerStatus, conf, metadataUpdater);
49+
}
50+
51+
@Override
52+
protected List<ArrowBatchData> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
53+
TableBucket tb = nextInLineFetch.tableBucket;
54+
Long offset = logScannerStatus.getBucketOffset(tb);
55+
if (offset == null) {
56+
LOG.debug(
57+
"Ignoring fetched records for {} at offset {} since the current offset is null which means the bucket has been unsubscribed.",
58+
tb,
59+
nextInLineFetch.nextFetchOffset());
60+
} else {
61+
if (nextInLineFetch.nextFetchOffset() == offset) {
62+
List<ArrowBatchData> batches = nextInLineFetch.fetchArrowBatches(maxRecords);
63+
LOG.trace(
64+
"Returning {} fetched arrow batches at offset {} for assigned bucket {}.",
65+
batches.size(),
66+
offset,
67+
tb);
68+
69+
if (nextInLineFetch.nextFetchOffset() > offset) {
70+
LOG.trace(
71+
"Updating fetch offset from {} to {} for bucket {} and returning {} arrow batches from poll()",
72+
offset,
73+
nextInLineFetch.nextFetchOffset(),
74+
tb,
75+
batches.size());
76+
logScannerStatus.updateOffset(tb, nextInLineFetch.nextFetchOffset());
77+
}
78+
return batches;
79+
} else {
80+
// these records aren't next in line based on the last consumed offset, ignore them
81+
// they must be from an obsolete request
82+
LOG.warn(
83+
"Ignoring fetched records for {} at offset {} since the current offset is {}",
84+
nextInLineFetch.tableBucket,
85+
nextInLineFetch.nextFetchOffset(),
86+
offset);
87+
}
88+
}
89+
90+
LOG.trace("Draining fetched records for bucket {}", nextInLineFetch.tableBucket);
91+
nextInLineFetch.drain();
92+
return Collections.emptyList();
93+
}
94+
95+
@Override
96+
protected int recordCount(List<ArrowBatchData> fetchedRecords) {
97+
int count = 0;
98+
for (ArrowBatchData fetchedRecord : fetchedRecords) {
99+
count += fetchedRecord.getRecordCount();
100+
}
101+
return count;
102+
}
103+
104+
@Override
105+
protected ArrowScanRecords toResult(Map<TableBucket, List<ArrowBatchData>> fetchedRecords) {
106+
return new ArrowScanRecords(fetchedRecords);
107+
}
108+
}

0 commit comments

Comments
 (0)