@@ -126,6 +126,7 @@ public class WriteBatcherImpl
126126 private boolean initialized = false ;
127127 private CompletableThreadPoolExecutor threadPool = null ;
128128 private DocumentMetadataHandle defaultMetadata ;
129+ private DocumentWriteSetFilter documentWriteSetFilter ;
129130
130131 public WriteBatcherImpl (DataMovementManager moveMgr , ForestConfiguration forestConfig ) {
131132 super (moveMgr );
@@ -200,7 +201,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
200201 writeSet .getDocumentWriteSet ().add (doc );
201202 }
202203 if ( writeSet .getDocumentWriteSet ().size () > minBatchSize ) {
203- threadPool .submit ( new BatchWriter (writeSet ) );
204+ threadPool .submit ( new BatchWriter (writeSet , documentWriteSetFilter ) );
204205 }
205206 }
206207 return this ;
@@ -308,7 +309,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
308309 for (WriteEvent doc : batch .getItems ()) {
309310 writeSet .getDocumentWriteSet ().add (doc .getTargetUri (), doc .getMetadata (), doc .getContent ());
310311 }
311- BatchWriter runnable = new BatchWriter (writeSet );
312+ BatchWriter runnable = new BatchWriter (writeSet , documentWriteSetFilter );
312313 runnable .run ();
313314 }
314315 @ Override
@@ -379,7 +380,7 @@ private void flush(boolean waitForCompletion) {
379380 DocumentWriteOperation doc = iter .next ();
380381 writeSet .getDocumentWriteSet ().add (doc );
381382 }
382- threadPool .submit ( new BatchWriter (writeSet ) );
383+ threadPool .submit ( new BatchWriter (writeSet , documentWriteSetFilter ) );
383384 }
384385
385386 if (waitForCompletion ) awaitCompletion ();
@@ -597,7 +598,7 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
597598 for ( WriteEvent doc : writerTask .batchWriteSet ().getBatchOfWriteEvents ().getItems () ) {
598599 writeSet .getDocumentWriteSet ().add (doc .getTargetUri (), doc .getMetadata (), doc .getContent ());
599600 }
600- BatchWriter retryWriterTask = new BatchWriter (writeSet );
601+ BatchWriter retryWriterTask = new BatchWriter (writeSet , documentWriteSetFilter );
601602 Runnable fretryWriterTask = (Runnable ) threadPool .submit (retryWriterTask );
602603 threadPool .replaceTask (writerTask , fretryWriterTask );
603604 // jump to the next task
@@ -846,4 +847,10 @@ public void addAll(Stream<? extends DocumentWriteOperation> operations) {
846847 public DocumentMetadataHandle getDocumentMetadata () {
847848 return defaultMetadata ;
848849}
850+
851+ @ Override
852+ public WriteBatcher withDocumentWriteSetFilter (DocumentWriteSetFilter filter ) {
853+ this .documentWriteSetFilter = filter ;
854+ return this ;
855+ }
849856}
0 commit comments