programing tip

"스트림이 이미 작동되었거나 닫혔습니다"를 방지하기 위해 스트림을 복사합니다.

itbloger 2020. 8. 22. 08:17
반응형

"스트림이 이미 작동되었거나 닫혔습니다"를 방지하기 위해 스트림을 복사합니다.


두 번 처리 할 수 ​​있도록 Java 8 스트림을 복제하고 싶습니다. 나는 collect목록으로 할 수 있고 그로부터 새로운 스트림을 얻을 수 있습니다 .

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

하지만 좀 더 효율적이고 우아한 방법이 있어야한다고 생각합니다.

컬렉션으로 전환하지 않고 스트림을 복사하는 방법이 있습니까?

나는 실제로 Eithers 스트림으로 작업하고 있으므로 오른쪽 투영으로 이동하고 다른 방식으로 처리하기 전에 왼쪽 투영을 한 방향으로 처리하고 싶습니다. 이런 종류의 (지금까지는 toList트릭 을 사용해야합니다 ).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );

효율성에 대한 당신의 가정은 거꾸로 생각합니다. 데이터를 저장할 필요가 없기 때문에 데이터를 한 번만 사용하려는 경우 이러한 엄청난 효율성을 얻을 수 있으며, 스트림은 파이프 라인을 통해 전체 데이터를 효율적으로 흐르게하는 강력한 "루프 융합"최적화를 제공합니다.

동일한 데이터를 재사용하려면 정의에 따라 두 번 (결정적으로) 생성하거나 저장해야합니다. 이미 컬렉션에있는 경우 좋습니다. 두 번 반복하면 저렴합니다.

우리는 "포크 스트림"으로 디자인을 실험했습니다. 우리가 발견 한 것은이를 지원하는 데 실제 비용이 든다는 것입니다. 그것은 흔하지 않은 경우를 희생하여 일반적인 경우 (한 번 사용)를 부담했습니다. 큰 문제는 "두 파이프 라인이 동일한 속도로 데이터를 소비하지 않을 때 발생하는 일"을 처리하는 것이 었습니다. 이제 어쨌든 버퍼링으로 돌아갑니다. 이것은 분명히 그 무게를 지탱하지 못한 특징이었습니다.

동일한 데이터에 대해 반복적으로 작업하려면 데이터를 저장하거나 컨슈머로 작업을 구성하고 다음을 수행하십시오.

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

처리 모델이 이러한 종류의 "스트림 포크"에 더 적합하기 때문에 RxJava 라이브러리를 살펴볼 수도 있습니다.


java.util.function.Supplier를 사용하십시오 .

에서 http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

스트림 재사용

Java 8 스트림은 재사용 할 수 없습니다. 터미널 작업을 호출하자마자 스트림이 닫힙니다.

Stream<String> stream =

Stream.of("d2", "a2", "b1", "b3", "c")

.filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok

stream.noneMatch(s -> true);   // exception

동일한 스트림에서 anyMatch 후에 noneMatch를 호출하면 다음 예외가 발생합니다.

java.lang.IllegalStateException: stream has already been operated upon or closed

at 

java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)

at 

java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)

at com.winterbe.java8.Streams5.test7(Streams5.java:38)

at com.winterbe.java8.Streams5.main(Streams5.java:28)

이 한계를 극복하려면 실행하려는 모든 터미널 작업에 대해 새 스트림 체인을 만들어야합니다. 예를 들어 모든 중간 작업이 이미 설정된 새 스트림을 생성하는 스트림 공급자를 만들 수 있습니다.

Supplier<Stream<String>> streamSupplier =

    () -> Stream.of("d2", "a2", "b1", "b3", "c")

            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok

streamSupplier.get().noneMatch(s -> true);  // ok

에 대한 각 호출 get()은 원하는 터미널 작업을 호출하기 위해 저장되는 새 스트림 생성합니다.


우리는 구현했습니다 duplicate()에서 스트림을위한 방법을 jOOλ , 우리가에 대한 통합 테스트를 개선하기 위해 만든 오픈 소스 라이브러리 jOOQ을 . 기본적으로 다음과 같이 작성할 수 있습니다.

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internally, there is a buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed about at the same rate, and if you can live with the lack of thread-safety.

Here's how the algorithm works:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final List<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

More source code here

Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.


You could create a stream of runnables (for example):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Where failure and success are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.


Use supplier to produce the stream for each termination operation.

Supplier <Stream<Integer>> streamSupplier=()->list.stream();

Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.

Examples:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);

Another way to handle the elements multiple times is to use Stream.peek(Consumer):

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));

peek(Consumer) can be chained as many times as needed.

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));

cyclops-react, a library I contribute to, has a static method that will allow you duplicate a Stream (and returns a jOOλ Tuple of Streams).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

See comments, there is performance penalty that will be incurred when using duplicate on an existing Stream. A more performant alternative would be to use Streamable :-

There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream(stream) - can be used to create a Streamable that will lazily populate it's backing collection, in a way such that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.


For this particular problem you can use also partitioning. Something like

     // Partition Eighters into left and right
     List<Either<Pair<A, Throwable>, A>> results = doSomething();
     Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
     passingFailing.get(true) <- here will be all passing (left values)
     passingFailing.get(false) <- here will be all failing (right values)

We can make use of Stream Builder at the time of reading or iterating a stream. Here's the document of Stream Builder.

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html

Use case

Let's say we have employee stream and we need to use this stream to write employee data in excel file and then update the employee collection/table [This is just use case to show the use of Stream Builder]:

Stream.Builder<Employee> builder = Stream.builder();

employee.forEach( emp -> {
   //store employee data to excel file 
   // and use the same object to build the stream.
   builder.add(emp);
});

//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();

I had a similar problem, and could think of three different intermediate structures from which to create a copy of the stream: a List, an array and a Stream.Builder. I wrote a little benchmark program, which suggested that from a performance point of view the List was about 30% slower than the other two which were fairly similar.

The only drawback of converting to an array is that it is tricky if your element type is a generic type (which in my case it was); therefore I prefer to use a Stream.Builder.

I ended up writing a little function that creates a Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

I can then make a copy of any stream str by doing str.collect(copyCollector()) which feels quite in keeping with the idiomatic usage of streams.

참고URL : https://stackoverflow.com/questions/23860533/copy-a-stream-to-avoid-stream-has-already-been-operated-upon-or-closed

반응형