streams take does not terminate the stream early #368

Closed
lukestephenson opened this issue May 15, 2024 · 11 comments
Labels
bug Something isn't working

Comments

@lukestephenson
Contributor

Consider the following application which emits 10,000 stream elements, but a downstream take(5) limits that to the first 5 operations. I'd expect this to signal early termination up the stream and stop emitting elements.

package kyo.example

import kyo.{KyoApp, Streams}
import kyo._
import scala.concurrent.duration._

object KyoBufferExample {
  val seq = (1 to 10000)
  val streamApp = Streams.initSeq(seq)
    .transform { i =>
      for {
        _ <- Consoles.println(s"pre take got $i")
      } yield i
    }
    .take(5)
    .transform { i =>
      for {
        _ <- Consoles.println(s"post take got $i")
        _ <- Fibers.sleep(1.seconds)
      } yield i
    }
    .runFold(0)(_ + _)

  def main(args: Array[String]): Unit = {
    KyoApp.run(streamApp)
  }

}

However, when this is run, after the first 5 elements are consumed, the remaining elements are still printed to the console as "pre take"

The equivalent zio example completes early:

    ZStream.range(1, 20, 1)
      .tap(i => Console.printLine(s"got $i"))
      .take(2)
      .tap(i => Console.printLine(s"take got $i").delay(1.second))
      .runCollect
@hearnadam added the bug label May 15, 2024
@hearnadam
Collaborator

This seems to have been introduced by stream chunking: #270

@fwbrasil, correct me if I am wrong, but I think we need a Chunk[V].map which produces Chunk[U < S] instead of Chunk[U] < S
https://github.com/getkyo/kyo/blob/main/kyo-core/shared/src/main/scala/kyo/chunks.scala#L84-L95
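
To make that contrast concrete, here is a rough sketch of the two signature shapes being discussed, using a made-up Pending[A, S] stand-in for kyo's A < S pending type; this is not the actual kyo.Chunk API.

// Hypothetical sketch only; `Pending[A, S]` stands in for kyo's `A < S`.
trait Pending[+A, -S]

trait ChunkShapes[V]:
  // Current shape: mapping with an effectful function yields a single pending
  // computation for the whole mapped chunk, so every element's effect is run
  // before anything is observable downstream.
  def mapCurrent[U, S](f: V => Pending[U, S]): Pending[ChunkShapes[U], S]

  // Suggested shape: each element's effect stays suspended inside the chunk,
  // which would let a downstream handler stop before forcing the remaining elements.
  def mapSuspended[U, S](f: V => Pending[U, S]): ChunkShapes[Pending[U, S]]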

@lukestephenson
Contributor Author

@hearnadam It doesn't specifically look related to chunking to me. Check out the top level definition of take:

def take(n: Int): Stream[T, V, S] =
    if n <= 0 then
        runDiscard
    else
        Streams[V].handle(handlers[V, Any].takeHandler)(n, s)

The way I read that is that, when invoked as take(0), it still processes the stream (runDiscard), just without passing elements through any further downstream handlers (but anything upstream will still run to stream completion).

@fwbrasil
Collaborator

Yes, it doesn't seem related to chunking. I think we need a way for handlers to signal to producers that there is no interest in new elements anymore. It seems an easy way to do that is to resume with a Boolean instead of a Unit, indicating whether the producer should continue or not.
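
As a minimal, illustrative sketch of that idea outside of kyo's actual handler machinery (the names produce and emit are made up): the value the consumer resumes with tells the producer whether to keep emitting.

object BooleanAckSketch:
  // The downstream handler answers `true` to request more elements and
  // `false` once it has seen enough (e.g. after take(5) is satisfied).
  def produce(elements: Iterator[Int])(emit: Int => Boolean): Unit =
    var continue = true
    while continue && elements.hasNext do
      continue = emit(elements.next())

  @main def runBooleanAckSketch(): Unit =
    var taken = 0
    produce(Iterator.from(1)) { i =>
      taken += 1
      println(s"got $i")
      taken < 5 // resume with false after the fifth element, stopping the producer
    }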

hearnadam added a commit to hearnadam/kyo that referenced this issue May 16, 2024
fwbrasil added a commit that referenced this issue May 16, 2024
Reproduce #368: Stream transform test
@fwbrasil
Collaborator

fwbrasil commented May 16, 2024

I've explored the solution I mentioned but it's not clear to me what the behavior should be with chunking. Does tap break chunking in ZIO?

@hearnadam
Collaborator

hearnadam commented May 17, 2024

ZStreams have operators that break chunking, as well as operators that expose the underlying chunk. Choosing an API here will depend on what's most important to end users. In my opinion, it's generally non-obvious that operators such as mapZIO break chunking...

https://github.com/zio/zio/blob/series/2.x/streams/shared/src/main/scala/zio/stream/ZStream.scala#L1850-L1853

To directly answer your question, yes .tap breaks chunking:

        for
            ref <- Ref.make(0)
            inc = ref.update(_ + 1)
            _ <- ZStream
                .range(0, 128, chunkSize = 8)
                .tap(_ => inc)
                .mapChunks { c =>
                    assert(c.size == 1)
                    c
                }
                .take(10)
                .runCollect
            count <- ref.get
        yield assert(count == 10)
        end for

@lukestephenson
Contributor Author

@hearnadam While there is a decision to make here about whether take(x) should break chunks / whether upstream effects should be limited to the number of elements taken, the main issue is that the upstream keeps emitting elements until it completes (rather than short-circuiting early). The chunking decision only affects the current chunk, not all the following chunks. (Side note: I'm not a fan of the ZStream approach of treating chunking as a hidden concern, and I hope kyo can avoid those limitations.)

@fwbrasil
Collaborator

fwbrasil commented May 18, 2024

Thank you for the clarifications and links! If I understood correctly, we need to keep chunking, but any handler in the chain of transformations should be able to halt execution for all its nested handlers, even if they're still processing a chunk. That kind of "bi-directional" communication between nested handlers would be too expensive; it'd require double the effect suspensions.

Maybe we could have two separate effects:

  1. The current Streams effect with the IOs/Fibers implementations removed: a 100% pure effect, no chunking
  2. Another effect adopting a pull-based approach using IOs/Fibers and chunking

I'm not sure what to name them, but they both seem like valuable, independent directions for evolving the handling of this kind of computation.
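
For illustration only, a very rough sketch of what the pull-based variant (option 2) could look like, ignoring kyo's effect types entirely; the names and shapes here are hypothetical.

// Hypothetical shape, not a proposed kyo API.
trait PullStreamSketch[V]:
  // The consumer asks for the next chunk; None means the stream is finished.
  // Because the consumer drives the loop, it can simply stop pulling once
  // take(n) is satisfied, so early termination falls out naturally.
  def pull(): Option[Vector[V]]

def takeSketch[V](stream: PullStreamSketch[V], n: Int): Vector[V] =
  var out = Vector.empty[V]
  var done = false
  while !done && out.size < n do
    stream.pull() match
      case Some(chunk) => out = (out ++ chunk).take(n)
      case None        => done = true
  out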

@fwbrasil
Collaborator

@lukestephenson @hearnadam would you have some feedback on my last comment? I can probably pick this up in the next weeks.

@lukestephenson
Contributor Author

It seems an easy way to do that is to resume with a Boolean instead of a Unit, indicating whether the producer should continue or not.

This sounds like the best approach to me. Although I'd prefer to avoid the raw Boolean and have something like:

opaque type Ack = Boolean

object Ack {
  val Stop: Ack = false
  val Continue: Ack = true
}

If I understood correctly, we need to keep chunking but any handler in the chain of transformations should be able to halt execution for all its nested handlers, even they're still processing a chunk.

I don't care about the semantics within the processing of a chunk. If take has indicated termination, I don't think that needs to terminate processing of the in-flight chunk, and I agree that would be too expensive. I don't want it to continue past the current chunk, though.
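
As a rough illustration of that chunk-level semantics (hypothetical names, not kyo's handler machinery): the producer always finishes the chunk it is emitting, but consults the Ack before producing the next one.

object ChunkAckSketch:
  // Reuses the shape of the Ack sketch above; illustrative only.
  opaque type Ack = Boolean
  object Ack:
    val Stop: Ack = false
    val Continue: Ack = true

  // emit handles one whole chunk and answers whether more chunks are wanted:
  // the in-flight chunk is never interrupted, but nothing is produced past it
  // once Stop is returned.
  def produceChunks[V](chunks: Iterator[Vector[V]])(emit: Vector[V] => Ack): Unit =
    var ack: Ack = Ack.Continue
    while ack == Ack.Continue && chunks.hasNext do
      ack = emit(chunks.next())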

@fwbrasil
Collaborator

I don't care about the semantics within the processing of a chunk. If take has indicated termination, I don't think that needs to terminate processing of the in-flight chunk, and I agree that would be too expensive. I don't want it to continue past the current chunk, though.

If this works, we could still keep a single Streams impl. I think we'd also need a mechanism to automatically break chunks if they're too large? What do you think @hearnadam?

@fwbrasil mentioned this issue Jun 4, 2024
hearnadam pushed a commit that referenced this issue Jun 5, 2024
I noticed two `Chunk` bugs while working on #368. `Chunk.changes` was ignoring the `first` param and `Chunk.copyTo` wasn't considering the offset correctly for appends.
fwbrasil added a commit that referenced this issue Jul 5, 2024
This PR introduces a new `Emit` effect to be used as the underlying effect of the new `Stream` implementation. The effect replaces `Sum` since it has very similar functionality.

The `Stream` implementation introduces two major changes:

1. The type now has only two parameters, `Stream[S, V]`, and the underlying computation now produces an `Ack`. It's a simplification that reduces the expressivity of the effect, but I think usability is more important. We can later add a separate implementation like `Flow[S, V, A]` with functionality like the previous Stream implementation if needed. I placed the `S` parameter as the first one; it's similar to the new `Fiber[E, A]` design.

2. The implementation now properly handles early termination (#368). I won't close the ticket for now since I still need to add the `IO`-based operations as extension methods when porting `kyo-core`.

I've also made a minor improvement to `Var.update` so it returns the updated value.
@fwbrasil
Collaborator

fixed by #572
