Skip to content

Commit fbe8e16

Browse files
committed
MLE-27304 Small cleanup for Polaris
No change here, just moving InputStreamTee into RequestLoggerImpl as that's the only place where it's used. And it makes it easier to see that the potential thread safety issues of InputStreamTee won't happen in practice.
1 parent 8492540 commit fbe8e16

File tree

2 files changed

+125
-134
lines changed

2 files changed

+125
-134
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/impl/InputStreamTee.java

Lines changed: 0 additions & 128 deletions
This file was deleted.

marklogic-client-api/src/main/java/com/marklogic/client/impl/RequestLoggerImpl.java

Lines changed: 125 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
/*
2-
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.client.impl;
55

6-
import java.io.File;
7-
import java.io.InputStream;
8-
import java.io.OutputStream;
9-
import java.io.PrintStream;
10-
import java.io.Reader;
6+
import java.io.*;
117

128
import org.slf4j.Logger;
139
import org.slf4j.LoggerFactory;
@@ -117,4 +113,127 @@ public void close() {
117113
enabled = false;
118114
}
119115

116+
// Moved here in the 8.2 release to make it obvious it's only used by RequestLoggerImpl. It is only used by the
117+
// copyContent method in RequestLoggerImpl. So while Polaris rightfully complains about the thread safety issues of
118+
// this class, it is only used in a single thread context and so the thread safety issues are not a problem.
119+
private static class InputStreamTee extends InputStream {
120+
private InputStream in;
121+
private OutputStream tee;
122+
private long max = 0;
123+
private long sent = 0;
124+
125+
public InputStreamTee(InputStream in, OutputStream tee, long max) {
126+
super();
127+
this.in = in;
128+
this.tee = tee;
129+
this.max = max;
130+
}
131+
132+
@Override
133+
public int read() throws IOException {
134+
if (in == null) return -1;
135+
136+
if (sent >= max) return in.read();
137+
138+
int b = in.read();
139+
if (b == -1) {
140+
cleanupTee();
141+
return b;
142+
}
143+
144+
if (max == Long.MAX_VALUE) {
145+
tee.write(b);
146+
return b;
147+
}
148+
149+
tee.write(b);
150+
151+
sent++;
152+
if (sent == max) cleanupTee();
153+
154+
return b;
155+
}
156+
157+
@Override
158+
public int read(byte[] b) throws IOException {
159+
if (in == null) return -1;
160+
161+
if (sent >= max) return in.read(b);
162+
163+
return readTee(b, 0, in.read(b));
164+
}
165+
166+
@Override
167+
public int read(byte[] b, int off, int len) throws IOException {
168+
if (in == null) return -1;
169+
170+
if (sent >= max) return in.read(b, off, len);
171+
172+
return readTee(b, 0, in.read(b, off, len));
173+
}
174+
175+
private int readTee(byte[] b, int off, int resultLen) throws IOException {
176+
if (resultLen < 1) {
177+
if (resultLen == -1) cleanupTee();
178+
return resultLen;
179+
}
180+
181+
if (max == Long.MAX_VALUE) {
182+
tee.write(b, off, resultLen);
183+
return resultLen;
184+
}
185+
186+
int teeLen = ((sent + resultLen) <= max) ? resultLen : (int) (max - sent);
187+
sent += teeLen;
188+
tee.write(b, off, teeLen);
189+
190+
if (sent >= max) cleanupTee();
191+
192+
return resultLen;
193+
}
194+
195+
private void cleanupTee() throws IOException {
196+
if (tee == null) return;
197+
tee.flush();
198+
tee = null;
199+
}
200+
201+
@Override
202+
public void close() throws IOException {
203+
if (in == null) return;
204+
in.close();
205+
in = null;
206+
cleanupTee();
207+
}
208+
209+
@Override
210+
public int available() throws IOException {
211+
if (in == null) return 0;
212+
return in.available();
213+
}
214+
215+
@Override
216+
public boolean markSupported() {
217+
if (in == null) return false;
218+
return in.markSupported();
219+
}
220+
221+
@Override
222+
public synchronized void mark(int readLimit) {
223+
if (in == null) return;
224+
in.mark(readLimit);
225+
}
226+
227+
@Override
228+
public synchronized void reset() throws IOException {
229+
if (in == null) throw new IOException("Input Stream closed");
230+
in.reset();
231+
}
232+
233+
@Override
234+
public long skip(long n) throws IOException {
235+
if (in == null) throw new IOException("Input Stream closed");
236+
return in.skip(n);
237+
}
238+
}
120239
}

0 commit comments

Comments
 (0)