Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Stream): broadcastDynamicRefCount and new PubSub options support #3080

Draft
wants to merge 29 commits into
base: next-minor
Choose a base branch
from

Conversation

dilame
Copy link
Contributor

@dilame dilame commented Jun 24, 2024

Type

  • Refactor
  • Feature
  • Bug Fix
  • Optimization
  • Documentation Update

Description

Stream.share({ connector: PubSub.unbounded() }) returns a new Stream that multicasts (shares) the original Stream by forking runIntoPubSub process in forkScoped mode. As long as there is at least one consumer, this Stream will be run and emitting data. When all consumers have exited, it will kill forked daemon.

I understand that this PR is kinda incomplete, i just need to know if you are interested in this operator at all, so i could finish.
Please, let me know.

Related

I have not filled an issue on GitHub, but i posted a question in Discord, i will repeat it here

I have a WebSocket connection to a remote server. To subscribe to a topic, I send a subscribe@someTopic message, and to unsubscribe, I send an unsubscribe@someTopic message. Each topic is represented as a broadcasted Stream.

The challenge is that if two different parts of my code subscribe to the same topic, they should use the same WS connection, but should be able to work independently at the same time. This is difficult to achieve with the classic Scope finalization model because if one of the two consumers closes the scope, it will send the unsubscribe@someTopic signal to the server, causing the second consumer to stop receiving events without notice.

Using a global scope is not a viable solution because I need to ensure the WebSocket connection remains clean and unsubscribe from the topic when there are no active consumers.

The share operator addresses this issue by allowing multiple independent consumers to subscribe to the same topic without interfering with each other. This ensures that each consumer can independently manage its subscription and receive events without being affected by other consumers.

And, actually, there are much more scenarios when such share is invaluable.

@dilame dilame requested a review from mikearnaldi as a code owner June 24, 2024 23:40
Copy link

changeset-bot bot commented Jun 24, 2024

🦋 Changeset detected

Latest commit: 68dc7ef

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 30 packages
Name Type
effect Minor
@effect/cli Major
@effect/cluster-browser Major
@effect/cluster-node Major
@effect/cluster-workflow Major
@effect/cluster Major
@effect/experimental Major
@effect/opentelemetry Major
@effect/platform-browser Major
@effect/platform-bun Major
@effect/platform-node-shared Major
@effect/platform-node Major
@effect/platform Major
@effect/printer-ansi Major
@effect/printer Major
@effect/rpc-http Major
@effect/rpc Major
@effect/schema Major
@effect/sql-d1 Major
@effect/sql-drizzle Major
@effect/sql-mssql Major
@effect/sql-mysql2 Major
@effect/sql-pg Major
@effect/sql-sqlite-bun Major
@effect/sql-sqlite-node Major
@effect/sql-sqlite-react-native Major
@effect/sql-sqlite-wasm Major
@effect/sql Major
@effect/typeclass Major
@effect/vitest Major

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@tim-smart
Copy link
Member

I think a better primitive is a kind of ScopedRef that does ref counting. You can then wrap a PubSub or Queue with it and call Stream.fromQueue etc.

It could also be used for other applications.

@dilame
Copy link
Contributor Author

dilame commented Jun 25, 2024

As i said, this PR is WIP. It should have options like

interface Options {
  readonly connector: Effect.Effect<
      PubSub.PubSub<Take.Take<A, E>> | Queue.Queue<Take.Take<A, E>>
    >;
  readonly resetOnRefCountZero?: boolean;
  readonly resetOnError?: boolean;
  readonly resetOnComplete?: boolean;
}

Let's say we have

Stream.share({
  connector: PubSub.replay(1),
  resetOnRefCountZero: false;
})

That means if there was 2 consumers, then they both unsubscribed (0 consumers at now), then 1 consumer subscribed – it will receive the last value from stream. As i understand, it is impossible to achieve with ScopedRef idea.
Also, i don't quite understand how to implement such a ScopedRef primitive

@dilame
Copy link
Contributor Author

dilame commented Jun 25, 2024

@tim-smart could you maybe provide an example of code of how to use hypothetical ScopedRef to solve the described problem?

@github-actions github-actions bot force-pushed the stream-share-auto branch 2 times, most recently from 26d52b5 to abff7f3 Compare June 26, 2024 02:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Waiting on Author
Development

Successfully merging this pull request may close these issues.

None yet

6 participants