Skip to content

Commit

Permalink
docs(samples): Add code sample for optimistic subscribe (#1182)
Browse files Browse the repository at this point in the history
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mukund-ananthu and gcf-owl-bot[bot] committed Jun 18, 2024
1 parent 5094605 commit d8e8aa5
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
93 changes: 93 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
# [END pubsub_create_pull_subscription]


def optimistic_subscribe(
project_id: str,
topic_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
# [START pubsub_optimistic_subscribe]
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except NotFound:
print(f"Subscription {subscription_path} not found, creating it.")

try:
# If the subscription does not exist, then create it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

if subscription:
print(f"Subscription {subscription.name} created")
else:
raise ValueError("Subscription creation failed.")

# Subscribe on the created subscription.
try:
streaming_pull_future = subscriber.subscribe(
subscription.name, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except Exception as e:
print(
f"Exception occurred when creating subscription and subscribing to it: {e}"
)
except Exception as e:
print(f"Exception occurred when attempting optimistic subscribe: {e}")
# [END pubsub_optimistic_subscribe]


def create_subscription_with_dead_letter_topic(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -1161,6 1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
optimistic_subscribe_parser.add_argument("topic_id")
optimistic_subscribe_parser.add_argument("subscription_id")
optimistic_subscribe_parser.add_argument(
"timeout", default=None, type=float, nargs="?"
)

receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
receive_parser.add_argument("subscription_id")
receive_parser.add_argument("timeout", default=None, type=float, nargs="?")
Expand Down Expand Up @@ -1303,6 1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
elif args.command == "optimistic-subscribe":
optimistic_subscribe(
args.project_id, args.topic_id, args.subscription_id, args.timeout
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_id, args.timeout)
elif args.command == "receive-custom-attributes":
Expand Down
50 changes: 50 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 234,56 @@ def test_create_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_optimistic_subscribe(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
publisher_client: pubsub_v1.PublisherClient,
capsys: CaptureFixture[str],
) -> None:
subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id)
# Ensure there is no pre-existing subscription.
# So that we can test the case where optimistic subscribe fails.
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

# Invoke optimistic_subscribe when the subscription is not present.
# This tests scenario where optimistic subscribe fails.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
out, _ = capsys.readouterr()
# Verify optimistic subscription failed.
assert f"Subscription {subscription_path} not found, creating it." in out
# Verify that subscription created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" in out
# Verify that subscription didn't already exist.
assert "Successfully subscribed until the timeout passed." not in out

# Invoke optimistic_subscribe when the subscription is present.
# This tests scenario where optimistic subscribe succeeds.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)

out, _ = capsys.readouterr()
# Verify optimistic subscription succeeded.
assert f"Subscription {subscription_path} not found, creating it." not in out
# Verify that subscription was not created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" not in out
# Verify that subscription already existed.
assert "Successfully subscribed until the timeout passed." in out

# Test case where optimistic subscribe throws an exception other than NotFound
# or TimeoutError.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5)
out, _ = capsys.readouterr()
assert "Exception occurred when attempting optimistic subscribe:" in out

# Clean up resources created during test.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_create_subscription_with_dead_letter_policy(
subscriber_client: pubsub_v1.SubscriberClient,
dead_letter_topic: str,
Expand Down

0 comments on commit d8e8aa5

Please sign in to comment.