streams take does not terminate the stream early #368
Comments
This seems to have been introduced by stream chunking (#270). @fwbrasil, correct me if I am wrong, but I think we need a …
@hearnadam It doesn't look specifically related to chunking to me. Check out the top-level definition in kyo/kyo-core/shared/src/main/scala/kyo/streams.scala (lines 22 to 26 in 3ebea0c).
The way I read that is, when invoked as …
Yes, it doesn't seem related to chunking. I think we need a way for handlers to signal to producers that there is no longer any interest in new elements. It seems an easy way to do that is to resume with a …
Reproduce #368: Stream transform test
I've explored the solution I mentioned, but it's not clear to me what the behavior should be with chunking. Does …
ZStreams have operators that break chunking, as well as operators that expose the underlying chunk. Choosing an API here will depend on what's most important to end users. In my opinion, it's generally non-obvious that operators such as … To directly answer your question, yes …
@hearnadam While there is a decision here about if …
Thank you for the clarifications and links! If I understood correctly, we need to keep chunking, but any handler in the chain of transformations should be able to halt execution for all its nested handlers, even if they're still processing a chunk. This kind of "bi-directional" communication between nested handlers would be too expensive: it would require double the effect suspensions. Maybe we could have two separate effects:
I'm not sure what to name those, but both seem like valuable, independent directions in which to evolve the handling of this kind of computation.
@lukestephenson @hearnadam would you have some feedback on my last comment? I can probably pick this up in the next few weeks.
This sounds like the best approach to me. Although I'd prefer to avoid the raw Boolean and have something like:
I don't care about the semantics within processing a chunk. If …
If this works, we could still keep a single …
I noticed two `Chunk` bugs while working on #368. `Chunk.changes` was ignoring the `first` param and `Chunk.copyTo` wasn't considering the offset correctly for appends.
This PR introduces a new `Emit` effect to be used as the underlying effect of the new `Stream` implementation. The effect replaces `Sum` since it has very similar functionality.

The `Stream` implementation introduces two major changes:

1. The type now has only two parameters, `Stream[S, V]`, and the underlying computation now produces an `Ack`. It's a simplification that reduces the expressivity of the effect, but I think usability is more important. We can later add a separate implementation like `Flow[S, V, A]` with functionality like the previous Stream implementation if needed. I placed the `S` parameter as the first one. It's similar to the new `Fiber[E, A]` design.
2. The implementation now properly handles early termination (#368).

I won't close the ticket for now since I still need to add the `IO`-based operations as extension methods when porting `kyo-core`. I've also made a minor improvement to `Var.update` so it returns the updated value.
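To make the acknowledgement idea concrete, here is a minimal, self-contained sketch in plain Scala. It is not Kyo's `Emit`, `Ack`, or `Stream` API: `EmitAckSketch`, `Continue`, `Stop`, `Producer`, `range`, `take`, and `toList` are all hypothetical names chosen for illustration, and the sketch ignores chunking and effect suspension. It only shows the general shape: every emission returns an acknowledgement, and a producer stops as soon as a downstream handler answers `Stop`.

```scala
// Hypothetical sketch: these names are illustrative and are NOT Kyo's API.
object EmitAckSketch {
  // The consumer's reply to each emitted value.
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  // A producer pushes values into an `emit` callback and must stop
  // as soon as the callback answers `Stop`.
  type Producer[V] = (V => Ack) => Unit

  // Emits start, start + 1, ... until `end` (exclusive) or until told to stop.
  // `onEmit` runs before each emission so callers can observe upstream work.
  def range(start: Int, end: Int)(onEmit: Int => Unit): Producer[Int] =
    emit => {
      var i = start
      var ack: Ack = Continue
      while (i < end && ack == Continue) {
        onEmit(i)
        ack = emit(i)
        i += 1
      }
    }

  // Forwards at most `n` values downstream, then answers `Stop` upstream.
  def take[V](n: Int)(upstream: Producer[V]): Producer[V] =
    emit =>
      if (n > 0) {
        var remaining = n
        upstream { v =>
          remaining -= 1
          val downstream = emit(v)
          // Stop once the quota is used up, or earlier if downstream asked to stop.
          if (remaining == 0) Stop else downstream
        }
      }

  // Runs a producer to completion and collects what it emitted.
  def toList[V](p: Producer[V]): List[V] = {
    val buf = List.newBuilder[V]
    p { v =>
      buf += v
      Continue
    }
    buf.result()
  }
}
```

With this shape, `EmitAckSketch.toList(EmitAckSketch.take(5)(EmitAckSketch.range(0, 10000)(i => println(s"pre take $i"))))` runs the upstream side only for the elements that `take` lets through, which is the behavior requested in #368.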
Fixed by #572.
Consider the following application, which emits 10,000 stream elements, but a downstream `take(5)` limits that to the first 5 elements. I'd expect this to signal early termination up the stream and stop emitting elements. However, when this is run, after the first 5 elements are consumed, the remaining elements are still printed to the console as "pre take".
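As a rough illustration of the expected behavior only, the sketch below uses the Scala standard library's lazy `Iterator` rather than Kyo streams; it is not the reproduction from this report. `LazyTakeSketch` and `upstreamWorkFor` are hypothetical names, and the counter stands in for the "pre take" print so the amount of upstream work can be inspected.

```scala
// Hypothetical illustration using scala.collection.Iterator, not Kyo streams.
object LazyTakeSketch {
  // Returns how many elements the stage before `take` actually processed,
  // together with the elements that reached the end of the pipeline.
  def upstreamWorkFor(total: Int, n: Int): (Int, List[Int]) = {
    var produced = 0
    val result = Iterator
      .range(0, total)
      .map { i =>
        produced += 1 // stands in for printing "pre take"
        i
      }
      .take(n)
      .toList
    (produced, result)
  }
}
```

Calling `LazyTakeSketch.upstreamWorkFor(10000, 5)` exercises the same pipeline shape as the report: a large source, a side-effecting stage, and a downstream `take(5)`. Because `Iterator` is pull-based, the side-effecting stage runs only for the elements that `take` requests.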
The equivalent zio example completes early: