jason810496/pgmq-sqlalchemy

pgmq-sqlalchemy

A more flexible Python client for the PGMQ Postgres extension. It uses the SQLAlchemy ORM and supports both async and sync engines and sessionmakers, or can be built from a DSN.

Features

  • Supports async and sync engines and sessionmakers, or can be built from a DSN.
  • Automatically creates the pgmq (or pg_partman) extension on the database if it does not exist.
  • Supports all Postgres DBAPIs supported by SQLAlchemy,
    e.g. psycopg, psycopg2, asyncpg.
    See SQLAlchemy PostgreSQL Dialects.

Installation

Install with pip:

pip install pgmq-sqlalchemy

Install with an additional DBAPI package:

pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"

Getting Started

Postgres Setup

Prerequisite: Postgres with the PGMQ extension installed.
For a quick setup with Docker:

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

For more information, see PGMQ.
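
The Docker command above exposes Postgres on localhost:5432 with the password postgres, which is where the DSN used in the examples below comes from. As a minimal sketch, the DSN can be assembled from its parts with only the standard library. The helper name build_postgres_dsn and its parameters are hypothetical and not part of pgmq-sqlalchemy; the `postgresql+driver://` form is the SQLAlchemy URL convention for selecting a DBAPI.

```python
from typing import Optional
from urllib.parse import quote


def build_postgres_dsn(
    user: str,
    password: str,
    host: str = "localhost",
    port: int = 5432,
    database: str = "postgres",
    driver: Optional[str] = None,
) -> str:
    """Hypothetical helper: build a SQLAlchemy-style Postgres DSN."""
    # "postgresql+asyncpg" style schemes select a specific DBAPI in SQLAlchemy.
    scheme = "postgresql" if driver is None else f"postgresql+{driver}"
    # Percent-encode credentials so characters like "@" or ":" stay unambiguous.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{scheme}://{credentials}@{host}:{port}/{database}"


# Matches the container started by the docker command above.
print(build_postgres_dsn("postgres", "postgres"))
# → postgresql://postgres:postgres@localhost:5432/postgres
```

Passing driver="asyncpg" yields a DSN that starts with postgresql+asyncpg://, which is the form SQLAlchemy expects for an async engine.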

Usage

Note

See the pgmq-sqlalchemy documentation for more examples and detailed usage.

For dispatcher.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue

postgres_dsn = "postgresql://postgres:postgres@localhost:5432/postgres"

# build the client from a DSN and create the queue
pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue("my_queue")

# send a single message
msg = {"key": "value", "key2": "value2"}
msg_id: int = pgmq.send("my_queue", msg)

# a list of messages can also be sent in one call
msg_ids: List[int] = pgmq.send_batch("my_queue", [msg, msg])
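
When there are many messages to dispatch, it may help to call send_batch in fixed-size chunks rather than in one large call. The following is only a sketch: chunked, send_in_chunks, and FakeQueue are hypothetical names, not part of pgmq-sqlalchemy, and FakeQueue stands in for a PGMQueue instance so the example runs without Postgres. The only library call assumed is send_batch(queue_name, messages) returning a list of ids, as shown above.

```python
from typing import Any, Dict, Iterator, List


def chunked(items: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of items, each holding at most size elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_chunks(queue: Any, queue_name: str,
                   messages: List[Dict[str, Any]], size: int = 100) -> List[int]:
    """Send messages through queue.send_batch, one chunk at a time."""
    msg_ids: List[int] = []
    for chunk in chunked(messages, size):
        msg_ids.extend(queue.send_batch(queue_name, chunk))
    return msg_ids


class FakeQueue:
    """In-memory stand-in (hypothetical) that records send_batch calls."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []
        self._next_id = 1

    def send_batch(self, queue_name: str, messages: List[Dict[str, Any]]) -> List[int]:
        self.calls.append((queue_name, list(messages)))
        ids = list(range(self._next_id, self._next_id + len(messages)))
        self._next_id += len(messages)
        return ids


demo_queue = FakeQueue()
demo_ids = send_in_chunks(demo_queue, "my_queue", [{"n": i} for i in range(5)], size=2)
print(demo_ids)               # ids assigned by the stand-in
print(len(demo_queue.calls))  # number of send_batch calls made
```

With a real client, pass the PGMQueue instance in place of FakeQueue; a suitable chunk size depends on message size and is left as a parameter here.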

For consumer.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

postgres_dsn = "postgresql://postgres:postgres@localhost:5432/postgres"

pgmq = PGMQueue(dsn=postgres_dsn)

# read a single message
msg: Message = pgmq.read("my_queue")

# read a batch of up to 10 messages
msgs: List[Message] = pgmq.read_batch("my_queue", 10)
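
A consumer usually reads in a loop until the queue is empty. The sketch below shows one possible shape for that loop and rests on several assumptions not stated in this README: that read returns None when no message is available, that a message exposes msg_id and message attributes, and that the client offers a delete(queue_name, msg_id) method. Check the documentation before relying on any of them. drain and InMemoryQueue are hypothetical names; InMemoryQueue replaces PGMQueue so the example runs without Postgres.

```python
from collections import deque
from types import SimpleNamespace
from typing import Any, Callable, List


def drain(queue: Any, queue_name: str,
          handler: Callable[[Any], None], max_messages: int = 100) -> int:
    """Read, handle, and delete messages until the queue is empty or the cap is hit."""
    processed = 0
    while processed < max_messages:
        msg = queue.read(queue_name)
        if msg is None:  # assumption: an empty queue yields None
            break
        handler(msg)
        # assumption: delete(queue_name, msg_id) removes a handled message
        queue.delete(queue_name, msg.msg_id)
        processed += 1
    return processed


class InMemoryQueue:
    """Hypothetical stand-in for PGMQueue, backed by a deque."""

    def __init__(self, payloads: List[dict]) -> None:
        self._pending = deque(
            SimpleNamespace(msg_id=i, message=p)
            for i, p in enumerate(payloads, start=1)
        )
        self.deleted: List[int] = []

    def read(self, queue_name: str) -> Any:
        # Peek at the oldest message; it stays queued until deleted.
        return self._pending[0] if self._pending else None

    def delete(self, queue_name: str, msg_id: int) -> bool:
        if self._pending and self._pending[0].msg_id == msg_id:
            self._pending.popleft()
            self.deleted.append(msg_id)
            return True
        return False


demo_seen: List[dict] = []
demo_consumer_queue = InMemoryQueue([{"key": "value"}, {"key": "value2"}])
demo_count = drain(demo_consumer_queue, "my_queue", lambda m: demo_seen.append(m.message))
print(demo_count, demo_seen)
```

Deleting only after the handler returns means a message whose handler raises is left in the queue for a later attempt.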

For monitor.py:

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics

postgres_dsn = "postgresql://postgres:postgres@localhost:5432/postgres"

pgmq = PGMQueue(dsn=postgres_dsn)

# get queue metrics
metrics: QueueMetrics = pgmq.metrics("my_queue")
print(metrics.queue_length)
print(metrics.total_messages)
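
The metrics above can feed a simple backlog check in a monitoring script. This is a minimal sketch: is_backlogged and FakeMetrics are hypothetical names, and FakeMetrics only mirrors the two attributes printed above (queue_length and total_messages) so the example runs without a database. The threshold value is arbitrary.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeMetrics:
    """Hypothetical stand-in exposing the two attributes used above."""
    queue_length: int
    total_messages: int


def is_backlogged(metrics: Any, threshold: int) -> bool:
    """Return True when more than threshold messages are waiting in the queue."""
    return metrics.queue_length > threshold


sample = FakeMetrics(queue_length=12, total_messages=40)
if is_backlogged(sample, threshold=10):
    print(f"backlog: {sample.queue_length} messages waiting")
```

With a real client, pass the object returned by pgmq.metrics("my_queue") instead of FakeMetrics.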

Issues / Contributing / Development

Feel free to open an issue or pull request!
See the Development section of the online documentation or CONTRIBUTING.md for more information.

TODO

  • Add a time-based partition option and validation to the create_partitioned_queue method.
  • Read (single/batch) from the archive table (read_archive method).
  • Detach the archive table (detach_archive method).
  • Add a set_vt utility method.
