-
-
Notifications
You must be signed in to change notification settings - Fork 231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Stream.share api #3080
add Stream.share api #3080
Changes from 1 commit
9775cb4
25f614b
e61d2f5
cccdd87
650b825
fe3dc3d
4f3f1d3
7da06ee
12a71b8
db839f5
ecb9e8e
7953e66
2e60d91
4849549
b6f27be
f9800cb
c14e39f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8637,42 8637,49 @@ export const shareRefCount = dual< | |
readonly connector?: Effect.Effect<PubSub.PubSub<Take.Take<A, E>>> | ||
}) => <R>( | ||
self: Stream.Stream<A, E, R> | ||
) => Stream.Stream<A, E, R>, | ||
) => Effect.Effect<Stream.Stream<A, E, R>, never, R | Scope.Scope>, | ||
<A, E, R>( | ||
self: Stream.Stream<A, E, R>, | ||
config: { | ||
readonly connector?: Effect.Effect<PubSub.PubSub<Take.Take<A, E>>> | ||
} | ||
) => Stream.Stream<A, E, R> | ||
) => Effect.Effect<Stream.Stream<A, E, R>, never, R | Scope.Scope> | ||
>( | ||
2, | ||
<A, E, R>( | ||
self: Stream.Stream<A, E, R>, | ||
config: { | ||
readonly connector?: Effect.Effect<PubSub.PubSub<Take.Take<A, E>>> | ||
} | ||
): Stream.Stream<A, E, R> => { | ||
let refCount = 0 | ||
let connector: PubSub.PubSub<Take.Take<A, E>> | null = null | ||
let fiber: Fiber.RuntimeFiber<void> | null = null | ||
): Effect.Effect<Stream.Stream<A, E, R>, never, R | Scope.Scope> => { | ||
return Effect.gen(function*() { | ||
refCount | ||
connector ??= yield* (config.connector ?? PubSub.bounded<Take.Take<A, E>>(16)) | ||
fiber ??= yield* Effect.forkDaemon(runIntoPubSub(self, connector)) | ||
return flattenTake(fromPubSub(connector)) | ||
}).pipe( | ||
unwrap, | ||
ensuring( | ||
Effect.suspend(() => { | ||
refCount-- | ||
const cleanup = fiber | ||
fiber = null | ||
return refCount === 0 && cleanup | ||
? Fiber.interrupt(cleanup) | ||
: Effect.void | ||
}) | ||
let refCount = 0 | ||
let connector: PubSub.PubSub<Take.Take<A, E>> | null = null | ||
let fiber: Fiber.RuntimeFiber<void> | null = null | ||
const scope = yield* Effect.scope | ||
const context = yield* Effect.context<R>() | ||
return Effect.gen(function*() { | ||
refCount | ||
connector ??= yield* (config.connector ?? PubSub.bounded<Take.Take<A, E>>(16)) | ||
fiber ??= yield* runIntoPubSub(self, connector).pipe( | ||
tim-smart marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Effect.locally(FiberRef.currentContext, context), | ||
Effect.forkIn(scope) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. declare const stream: Stream<number, never, Foo>
Effect.gen(function* () {
const shared = yield* sharedRefCount(stream)
yield* doSomethingWith(shared).pipe(
Effect.provide(Foo1Layer),
Effect.forkScoped,
)
yield* doSomethingWith(shared).pipe(
Effect.provide(Foo2Layer),
Effect.forkScoped
)
}) @tim-smart if I understand correctly, both instances would use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They would use whatever was supplied to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. despite this effect being ran as part of the stream evaluation? How? |
||
) | ||
return flattenTake(fromPubSub(connector)) | ||
}).pipe( | ||
unwrap, | ||
ensuring( | ||
Effect.suspend(() => { | ||
refCount-- | ||
const cleanup = fiber | ||
fiber = null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are clearing the fiber even if the ref count is not 0. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it doesn't matter what is the ref count, theres a new pubsub and new forked fiber for every evaluation of listening There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use Nullish coalescing assignment, so i don't think it's the case here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, only a new forked fiber on every eval, not new You should still instantiate the |
||
return refCount === 0 && cleanup | ||
? Fiber.interrupt(cleanup) | ||
: Effect.void | ||
}) | ||
) | ||
) | ||
) | ||
}) | ||
} | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.