Skip to content

Commit

Permalink
Replace new DefaultStreamMessage() to StreamMessage.streaming() (#…
Browse files Browse the repository at this point in the history
…4717)



### Motivation:

> ```java
>    static <T> StreamMessageWriter<T> streaming() {
>       return new DefaultStreamMessage<>();
> ```
> So you want to deprecate this class in the follow-up PR?
> I mean deprecating the whole class because it doesn't need to be exposed to public.

- See #4696 (comment)
- We don't want to expose `new DefaultStreamMessage()` directly cause we have `StreamMessage.streaming()` instead 


### Modifications:
- Replace `new DefaultStreamMessage()` to `StreamMessage.streaming()`

### Result:
- Migrate the usage of `new DefaultStreamMessage()` with `StreamMessage.streaming()`.

**Related PR** #4696, **Issue** #4253
  • Loading branch information
injae-kim authored Mar 9, 2023
1 parent 1fc5470 commit 179d9a5
Show file tree
Hide file tree
Showing 29 changed files with 95 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public long notJmhEventLoop(StreamObjects streamObjects) throws Exception {
static StreamMessage<Integer> newStream(StreamObjects streamObjects) {
switch (streamObjects.streamType) {
case DEFAULT_STREAM_MESSAGE:
return new DefaultStreamMessage<>();
return StreamMessage.streaming();
case FIXED_STREAM_MESSAGE:
switch (streamObjects.num) {
case 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.DefaultStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.StreamWriter;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.internal.common.stream.NoopSubscription;
import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals;
Expand Down Expand Up @@ -68,7 +69,7 @@ final class MultipartEncoder implements StreamMessage<HttpData> {
private volatile CompletableFuture<Void> completionFuture;

@Nullable
private volatile DefaultStreamMessage<StreamMessage<HttpData>> emitter;
private volatile StreamWriter<StreamMessage<HttpData>> emitter;

MultipartEncoder(StreamMessage<BodyPart> publisher, String boundary) {
requireNonNull(boundary, "boundary");
Expand Down Expand Up @@ -150,8 +151,8 @@ public long demand() {
return emitter.demand();
}

private static DefaultStreamMessage<StreamMessage<HttpData>> newEmitter(Subscription upstream) {
final DefaultStreamMessage<StreamMessage<HttpData>> emitter =
private static StreamWriter<StreamMessage<HttpData>> newEmitter(Subscription upstream) {
final StreamWriter<StreamMessage<HttpData>> emitter =
new DefaultStreamMessage<StreamMessage<HttpData>>() {
@Override
protected void onRequest(long n) {
Expand Down Expand Up @@ -223,7 +224,7 @@ public void onSubscribe(Subscription subscription) {
assert downstream != null;

subscribed = true;
final DefaultStreamMessage<StreamMessage<HttpData>> newEmitter = newEmitter(subscription);
final StreamWriter<StreamMessage<HttpData>> newEmitter = newEmitter(subscription);

// The 'emitter' should be set before reading 'closed' flag.
// It guarantees that the emitter.abort() or downstream.onError() is always called with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.linecorp.armeria.internal.common.stream.AbortingSubscriber;
import com.linecorp.armeria.internal.common.stream.StreamMessageUtil;

abstract class AbstractStreamMessageWriter<T> extends CancellableStreamMessage<T> implements StreamWriter<T> {
abstract class AbstractStreamWriter<T> extends CancellableStreamMessage<T> implements StreamWriter<T> {

enum State {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

final class ByteStreamMessageOutputStream implements ByteStreamMessage {

private final StreamWriter<HttpData> outputStreamWriter = new DefaultStreamMessage<>();
private final StreamWriter<HttpData> outputStreamWriter = StreamMessage.streaming();
private final ByteStreamMessage delegate = ByteStreamMessage.of(outputStreamWriter);

private final Consumer<? super OutputStream> outputStreamConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
* @param <T> the type of element signaled
*/
@UnstableApi
public class DefaultStreamMessage<T> extends AbstractStreamMessageWriter<T> {
public class DefaultStreamMessage<T> extends AbstractStreamWriter<T> {

private static final Logger logger = LoggerFactory.getLogger(DefaultStreamMessage.class);

Expand Down Expand Up @@ -104,7 +104,10 @@ public class DefaultStreamMessage<T> extends AbstractStreamMessageWriter<T> {

/**
* Creates a new instance.
*
* @deprecated Use {@link StreamMessage#streaming()} instead.
*/
@Deprecated
public DefaultStreamMessage() {
queue = new MpscChunkedArrayQueue<>(INITIAL_CAPACITY, 1 << 30);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ default StreamMessage<T> peekError(Consumer<? super Throwable> action) {
* {@link StreamMessage} when any error occurs.
*
* <p>Example:<pre>{@code
* DefaultStreamMessage<Integer> stream = new DefaultStreamMessage<>();
* StreamWriter<Integer> stream = StreamMessage.streaming();
* stream.write(1);
* stream.write(2);
* stream.close(new IllegalStateException("Oops..."));
Expand All @@ -884,7 +884,7 @@ default StreamMessage<T> recoverAndResume(
* specified {@code causeClass}.
*
* <p>Example:<pre>{@code
* DefaultStreamMessage<Integer> stream = new DefaultStreamMessage<>();
* StreamWriter<Integer> stream = StreamMessage.streaming();
* stream.write(1);
* stream.write(2);
* stream.close(new IllegalStateException("Oops..."));
Expand All @@ -893,7 +893,7 @@ default StreamMessage<T> recoverAndResume(
*
* assert resumed.collect().join().equals(List.of(1, 2, 3, 4));
*
* DefaultStreamMessage<Integer> stream = new DefaultStreamMessage<>();
* StreamWriter<Integer> stream = StreamMessage.streaming();
* stream.write(1);
* stream.write(2);
* stream.write(3);
Expand All @@ -910,7 +910,7 @@ default StreamMessage<T> recoverAndResume(
*
* recoverChain.collect().join();
*
* DefaultStreamMessage<Integer> stream = new DefaultStreamMessage<>();
* StreamWriter<Integer> stream = StreamMessage.streaming();
* stream.write(1);
* stream.write(2);
* stream.close(ClosedStreamException.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
/**
* Default {@link HttpRequest} implementation.
*/
@SuppressWarnings("deprecation")
public class DefaultHttpRequest extends DefaultStreamMessage<HttpObject> implements HttpRequestWriter {

private final RequestHeaders headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
/**
* Default {@link HttpResponse} instance.
*/
@SuppressWarnings("deprecation")
public class DefaultHttpResponse extends DefaultStreamMessage<HttpObject> implements HttpResponseWriter {
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void regularFixedStreamMessageShouldAbortElements() {
@Test
void defaultStreamMessageShouldAbortElements() {
final StreamMessage<Integer> inner = StreamMessage.of(1);
final DefaultStreamMessage<StreamMessage<Integer>> defaultStreamMessage = new DefaultStreamMessage<>();
final StreamWriter<StreamMessage<Integer>> defaultStreamMessage = StreamMessage.streaming();
defaultStreamMessage.write(inner);
defaultStreamMessage.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ void testInitialState() {
@Test
void testSetDelegate() {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
m.delegate(new DefaultStreamMessage<>());
assertThatThrownBy(() -> m.delegate(new DefaultStreamMessage<>()))
m.delegate(StreamMessage.streaming());
assertThatThrownBy(() -> m.delegate(StreamMessage.streaming()))
.isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> m.delegate(null)).isInstanceOf(NullPointerException.class);
}
Expand Down Expand Up @@ -91,7 +91,7 @@ void testEarlyAbortWithSubscriber(@Nullable Throwable cause) {
}
assertAborted(m, cause);

final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
final StreamWriter<Object> d = StreamMessage.streaming();
m.delegate(d);
assertAborted(d, cause);
}
Expand All @@ -100,7 +100,7 @@ void testEarlyAbortWithSubscriber(@Nullable Throwable cause) {
@ArgumentsSource(AbortCauseArgumentProvider.class)
void testLateAbort(@Nullable Throwable cause) {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
final StreamWriter<Object> d = StreamMessage.streaming();

m.delegate(d);
if (cause == null) {
Expand All @@ -117,7 +117,7 @@ void testLateAbort(@Nullable Throwable cause) {
@ArgumentsSource(AbortCauseArgumentProvider.class)
void testLateAbortWithSubscriber(@Nullable Throwable cause) {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
final StreamWriter<Object> d = StreamMessage.streaming();
@SuppressWarnings("unchecked")
final Subscriber<Object> subscriber = mock(Subscriber.class);

Expand All @@ -143,7 +143,7 @@ void testLateAbortWithSubscriber(@Nullable Throwable cause) {
@Test
void testEarlySubscription() {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
final StreamWriter<Object> d = StreamMessage.streaming();
@SuppressWarnings("unchecked")
final Subscriber<Object> subscriber = mock(Subscriber.class);

Expand All @@ -157,7 +157,7 @@ void testEarlySubscription() {
@Test
void testLateSubscription() {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
final StreamWriter<Object> d = StreamMessage.streaming();

m.delegate(d);

Expand Down Expand Up @@ -190,7 +190,7 @@ private static void assertFailedSubscription(StreamMessage<?> m, Class<? extends
@Test
void testStreaming() {
final DeferredStreamMessage<String> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<String> d = new DefaultStreamMessage<>();
final StreamWriter<String> d = StreamMessage.streaming();
m.delegate(d);

final RecordingSubscriber subscriber = new RecordingSubscriber();
Expand All @@ -216,7 +216,7 @@ void testStreaming() {
@Test
void testStreamingError() {
final DeferredStreamMessage<String> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<String> d = new DefaultStreamMessage<>();
final StreamWriter<String> d = StreamMessage.streaming();
m.delegate(d);

final RecordingSubscriber subscriber = new RecordingSubscriber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public StreamMessage<Long> createPublisher(long elements) {
@Override
public StreamMessage<Long> createFailedPublisher() {
final DeferredStreamMessage<Long> stream = new DeferredStreamMessage<>();
final DefaultStreamMessage<Long> delegate = new DefaultStreamMessage<>();
final StreamWriter<Long> delegate = StreamMessage.streaming();
delegate.subscribe(new NoopSubscriber<>());
stream.delegate(delegate);
return stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void withPooledObjects(boolean filterSupportsPooledObjects, boolean subscribedWi
int expectedRefCntInFilter, int expectedRefCntInOnNext) {
final ByteBuf buf = newPooledBuffer();
final HttpData data = HttpData.wrap(buf).withEndOfStream();
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
final StreamWriter<HttpData> stream = StreamMessage.streaming();
stream.write(data);
stream.close();

Expand Down Expand Up @@ -108,7 +108,7 @@ public void onComplete() {
void notifyCancellation() {
final ByteBuf buf = newPooledBuffer();
final HttpData data = HttpData.wrap(buf).withEndOfStream();
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
final StreamWriter<HttpData> stream = StreamMessage.streaming();
stream.write(data);
stream.close();

Expand All @@ -126,7 +126,7 @@ protected HttpData filter(HttpData obj) {
void errorPropagation() {
final EventLoop eventLoop = eventLoopExtension.get();
final AtomicReference<Throwable> causeRef = new AtomicReference<>();
final DefaultStreamMessage<Integer> streamMessage = new DefaultStreamMessage<>();
final StreamWriter<Integer> streamMessage = StreamMessage.streaming();
streamMessage.write(1);
streamMessage.write(2);
streamMessage.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void mapError() {
.expectErrorMatches(cause -> cause == third)
.verify();

final DefaultStreamMessage<Integer> defaultStream = new DefaultStreamMessage<>();
final StreamWriter<Integer> defaultStream = StreamMessage.streaming();
defaultStream.write(1);
defaultStream.write(2);
defaultStream.close(first);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ void testAbortWithoutSubscriber(@Nullable Throwable cause) {
@Test
void notifyCancellation() {
final ByteBuf buf = newPooledBuffer();
final DefaultStreamMessage<HttpData> delegate = new DefaultStreamMessage<>();
final StreamWriter<HttpData> delegate = StreamMessage.streaming();
delegate.write(HttpData.wrap(buf));
final PublisherBasedStreamMessage<HttpData> p = new PublisherBasedStreamMessage<>(delegate);
SubscriptionOptionTest.notifyCancellation(buf, p);
}

@Test
void cancellationIsNotPropagatedByDefault() {
final DefaultStreamMessage<Integer> delegate = new DefaultStreamMessage<>();
final StreamWriter<Integer> delegate = StreamMessage.streaming();
final PublisherBasedStreamMessage<Integer> p = new PublisherBasedStreamMessage<>(delegate);

p.subscribe(new Subscriber<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ void emptyStreamMessage() {
// An empty stream isn't aborted.
assertThat(stream1.collect().join()).isEqualTo(ImmutableList.of());

final DefaultStreamMessage<Object> stream3 = new DefaultStreamMessage<>();
final StreamWriter<Object> stream3 = StreamMessage.streaming();
stream3.close();
assertThat(stream3.collect().join()).isEqualTo(ImmutableList.of());

final DefaultStreamMessage<Object> stream4 = new DefaultStreamMessage<>();
final StreamWriter<Object> stream4 = StreamMessage.streaming();
final CompletableFuture<List<Object>> collectingFuture = stream4.collect();
stream4.close();
assertThat(collectingFuture.join()).isEqualTo(ImmutableList.of());

final DefaultStreamMessage<Object> stream5 = new DefaultStreamMessage<>();
final StreamWriter<Object> stream5 = StreamMessage.streaming();
final Throwable cause = new IllegalStateException("oops");
stream5.abort(cause);
assertThatThrownBy(() -> stream5.collect().join())
Expand Down Expand Up @@ -109,7 +109,7 @@ void closeOrAbortAndCollect(int size, boolean fixedStream) {
void collectAndClose() {
final int size = 5;
final Map<HttpData, ByteBuf> data = newHttpData(size);
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
final StreamWriter<HttpData> stream = StreamMessage.streaming();
data.forEach((httpData, buf) -> stream.write(httpData));
final CompletableFuture<List<HttpData>> collectingFuture = stream.collect();
assertThat(collectingFuture).isNotDone();
Expand All @@ -122,7 +122,7 @@ void collectAndClose() {
void collectAndAbort() {
final int size = 5;
final Map<HttpData, ByteBuf> data = newHttpData(size);
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
final StreamWriter<HttpData> stream = StreamMessage.streaming();
data.forEach((httpData, buf) -> stream.write(httpData));
final CompletableFuture<List<HttpData>> collectingFuture = stream.collect();
assertThat(collectingFuture).isNotDone();
Expand Down Expand Up @@ -359,7 +359,7 @@ private static StreamMessage<HttpData> newStreamMessage(HttpData[] httpData, boo
if (fixedStream) {
return StreamMessage.of(httpData);
} else {
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
final StreamWriter<HttpData> stream = StreamMessage.streaming();
for (HttpData data : httpData) {
stream.write(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class StreamMessageDuplicatorCloseTest {

@Test
void closedDuplicator_elementsAreNotReleasedUntilSubscribedByAllSubscribers() {
final DefaultStreamMessage<HttpData> publisher = new DefaultStreamMessage<>();
final StreamWriter<HttpData> publisher = StreamMessage.streaming();
final ArrayList<ByteBuf> byteBufs = new ArrayList<>(60);
for (int i = 0; i < 60; i++) { // More than 50 that is the REQUEST_REMOVAL_THRESHOLD.
final ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4).writeInt(i);
Expand Down
Loading

0 comments on commit 179d9a5

Please sign in to comment.