Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.core;

import java.util.concurrent.CompletionStage;

import io.reactivex.rxjava4.disposables.Disposable;

public record CompletionStageDisposable<T>(CompletionStage<T> stage, Disposable disposable) {

}
152 changes: 152 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/Streamable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.core;

import java.lang.reflect.InvocationTargetException;
import java.util.Objects;
import java.util.concurrent.*;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.exceptions.Exceptions;
import io.reactivex.rxjava4.functions.Consumer;
import io.reactivex.rxjava4.internal.operators.streamable.*;

/**
* The {@code IAsyncEnumerable} of the Java world.
* Runs best with Virtual Threads.
* TODO proper docs
* @param <T> the element type of the stream.
* @since 4.0.0
*/
public abstract class Streamable<@NonNull T> {

/**
* Realizes the stream and returns an interface that let's one consume it.
* @param cancellation where to register and listen for cancellation calls.
* @return the Streamer instance to consume.
*/
@NonNull
public abstract Streamer<T> stream(@NonNull DisposableContainer cancellation);

/**
* Realizes the stream and returns an interface that let's one consume it.
* @return the Streamer instance to consume.
*/
@NonNull
public final Streamer<T> stream() {
return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead
}

/**
* Returns an empty {@code Streamable} that never produces an item and just completes.
* @param <T> the element type
* @return the {@code Streamable} instance
*/
@NonNull
public static <@NonNull T> Streamable<T> empty() {
return new StreamableEmpty<>();
}

/**
* Returns a single-element {@code Streamable} that produces the constant item and completes.
* @param <T> the element type
* @param item the constant item to produce
* @return the {@code Streamable} instance
*/
public static <@NonNull T> Streamable<T> just(@NonNull T item) {
Objects.requireNonNull(item, "item is null");
return new StreamableJust<>(item);
}

/**
* Consumes elements from this {@code Streamable} via the provided executor service.
* @param consumer the callback that gets the elements until completion
* @return a Disposable that let's one cancel the sequence asynchronously.
*/
@NonNull
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer) {
CompositeDisposable canceller = new CompositeDisposable();
return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor());
}

/**
* Consumes elements from this {@code Streamable} via the provided executor service.
* @param consumer the callback that gets the elements until completion
* @param canceller the container to trigger cancellation of the sequence
* @return the {@code CompletionStage} that gets notified when the sequence ends
*/
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller) {
return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor());
}

/**
* Consumes elements from this {@code Streamable} via the provided executor service.
* @param consumer the callback that gets the elements until completion
* @param executor the service that hosts the blocking waits.
* @return a Disposable that let's one cancel the sequence asynchronously.
*/
@NonNull
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull ExecutorService executor) {
CompositeDisposable canceller = new CompositeDisposable();
return forEach(consumer, canceller, executor);
}

/**
* Consumes elements from this {@code Streamable} via the provided executor service.
* @param consumer the callback that gets the elements until completion
* @param canceller the container to trigger cancellation of the sequence
* @param executor the service that hosts the blocking waits.
* @return the {@code CompletionStage} that gets notified when the sequence ends
*/
@SuppressWarnings("unchecked")
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller, @NonNull ExecutorService executor) {
Objects.requireNonNull(consumer, "consumer is null");
Objects.requireNonNull(canceller, "canceller is null");
Objects.requireNonNull(executor, "executor is null");
final Streamable<T> me = this;
var future = executor.submit(() -> {
try (var str = me.stream(canceller)) {
while (!canceller.isDisposed()) {
if (str.next().toCompletableFuture().join()) {
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
} else {
break;
}
}
} catch (final Throwable crash) {
Exceptions.throwIfFatal(crash);
if (crash instanceof RuntimeException ex) {
throw ex;
}
if (crash instanceof Exception ex) {
throw ex;
}
throw new InvocationTargetException(crash);
}
return null;
});
canceller.add(Disposable.fromFuture(future));
return new CompletionStageDisposable<Void>(StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
}

/**
* Consume this {@code Streamable} via the given flow-reactive-streams subscriber.
* @param subscriber the subscriber to consume with.
* @param executor the service that hosts the blocking waits.
*/
public final void subscribe(@NonNull Flow.Subscriber<? super T> subscriber, @NonNull ExecutorService executor) {

}
}
60 changes: 60 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/Streamer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.core;

import java.util.NoSuchElementException;
import java.util.concurrent.CompletionStage;

import io.reactivex.rxjava4.annotations.NonNull;

/**
* A realized stream which can then be consumed asynchronously in steps.
* Think of it as the {@IAsyncEnumerator} of the Java world. Runs best on Virtual Threads.
* @param <T> the element type.
* TODO proper docs
* @since 4.0.0
*/
public interface Streamer<@NonNull T> extends AutoCloseable {

/**
* Determine if there are more elements available from the source.
* @return eventually true or false, indicating availability or termination
*/
@NonNull
CompletionStage<Boolean> next();

/**
* Returns the current element if {@link #next()} yielded {@code true}.
* Can be called multiple times between {@link #next()} calls.
* @return the current element
* @throws NoSuchElementException before the very first {@link #next()} or after {@link #next()} returned {@code false}
*/
@NonNull
T current();

/**
* Called when the stream ends or gets cancelled. Should be always invoked.
* TODO, this is inherited from {@code IAsyncDisposable} in C#...
* @return the stage you can await to cleanups to happen
*/
@NonNull
CompletionStage<Void> cancel();

/**
* Make this Streamer a resource and a Closeable.
*/
default void close() {
cancel().toCompletableFuture().join();
}
}
18 changes: 13 additions & 5 deletions src/main/java/io/reactivex/rxjava4/disposables/Disposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@

package io.reactivex.rxjava4.disposables;

import java.util.Objects;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.Future;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.functions.Action;
import io.reactivex.rxjava4.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava4.internal.functions.Functions;
import static java.util.concurrent.Flow.*;

import java.util.Objects;
import java.util.concurrent.Future;

/**
* Represents a disposable resource.
*/
public interface Disposable {
public interface Disposable extends AutoCloseable {
/**
* Dispose the resource, the operation should be idempotent.
*/
Expand All @@ -37,6 +37,14 @@ public interface Disposable {
*/
boolean isDisposed();

/**
* Dispose the resource, the operation should be idempotent.
* @since 4.0.0
*/
default void close() {
dispose();
}

/**
* Construct a {@code Disposable} by wrapping a {@link Runnable} that is
* executed exactly once when the {@code Disposable} is disposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Common interface to add and remove disposables from a container.
* @since 2.0
*/
public interface DisposableContainer {
public interface DisposableContainer extends Disposable {

/**
* Adds a disposable to this container or disposes it if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
}
}

@SuppressWarnings("resource")
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void subscribeActual(Observer<? super T> observer) {
return;
}

@SuppressWarnings("resource")
AmbCoordinator<T> ac = new AmbCoordinator<>(observer, count);
ac.subscribe(sources);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void subscribeActual(Observer<? super R> observer) {
return;
}

@SuppressWarnings("resource")
LatestCoordinator<T, R> lc = new LatestCoordinator<>(observer, combiner, count, bufferSize, delayError);
lc.subscribe(sources);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ protected void subscribeActual(Observer<? super T> observer) {
}
}

@SuppressWarnings("resource")
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void subscribeActual(Observer<? super R> observer) {
return;
}

@SuppressWarnings("resource")
ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.operators.streamable;

import java.util.NoSuchElementException;
import java.util.concurrent.*;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.disposables.DisposableContainer;

public final class StreamableEmpty<T> extends Streamable<T> {

@Override
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
return new EmptyStreamer<T>();
}

static final class EmptyStreamer<T> implements Streamer<T> {

@Override
public @NonNull CompletionStage<Boolean> next() {
return CompletableFuture.completedStage(false); // TODO would constant stages work here or is that contention?
}

@Override
public @NonNull T current() {
throw new NoSuchElementException("This Streamable/Streamer never has elements");
}

@Override
public @NonNull CompletionStage<Void> cancel() {
return CompletableFuture.completedStage(null); // TODO would constant stages work here or is that contention?
}
}
}
Loading