Skip to content

Commit

Permalink
Merge branch 'feat/snapshot-submitted-event' into dockerify
Browse files Browse the repository at this point in the history
  • Loading branch information
xadahiya committed Jun 14, 2023
2 parents c8ba160 0a366c4 commit 6717d0d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 37 deletions.
102 changes: 81 additions & 21 deletions go/goutils/taskmgr/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 7,7 @@ import (
"net"
"strconv"

"github.com/cenkalti/backoff/v4"
"github.com/swagftw/gi"

"audit-protocol/goutils/settings"
Expand All @@ -18,10 19,18 @@ import (
)

type RabbitmqTaskMgr struct {
conn *amqp.Connection
settings *settings.SettingsObj
consumeConn *amqp.Connection
publishConn *amqp.Connection
settings *settings.SettingsObj
}

type ConnectionType string

const (
Consumer ConnectionType = "consumer"
Publisher ConnectionType = "publisher"
)

var _ taskmgr.TaskMgr = &RabbitmqTaskMgr{}

// NewRabbitmqTaskMgr returns a new rabbitmq task manager
Expand All @@ -31,14 40,31 @@ func NewRabbitmqTaskMgr() *RabbitmqTaskMgr {
log.WithError(err).Fatalf("failed to invoke settingsObj object")
}

conn, err := Dial(settingsObj)
consumeConn := new(amqp.Connection)
publishConn := new(amqp.Connection)

err = backoff.Retry(func() error {
consumeConn, err = Dial(settingsObj)

return err
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
if err != nil {
log.WithError(err).Fatalf("failed to connect to rabbitmq")
}

err = backoff.Retry(func() error {
publishConn, err = Dial(settingsObj)

return err
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
if err != nil {
log.WithError(err).Fatalf("failed to connect to rabbitmq")
}

taskMgr := &RabbitmqTaskMgr{
conn: conn,
settings: settingsObj,
consumeConn: consumeConn,
publishConn: publishConn,
settings: settingsObj,
}

if err := gi.Inject(taskMgr); err != nil {
Expand All @@ -50,6 76,7 @@ func NewRabbitmqTaskMgr() *RabbitmqTaskMgr {
return taskMgr
}

// TODO: improve publishing logic with channel pooling
func (r *RabbitmqTaskMgr) Publish(ctx context.Context, workerType worker.Type, msg []byte) error {
defer func() {
// recover from panic
Expand All @@ -58,26 85,32 @@ func (r *RabbitmqTaskMgr) Publish(ctx context.Context, workerType worker.Type, m
}
}()

channel, err := r.getChannel(workerType)
channel, err := r.getChannel(workerType, Publisher)
if err != nil {
return err
}

defer func(channel *amqp.Channel) {
err = channel.Close()
if err != nil {
log.WithError(err).Error("failed to close rabbitmq channel")
}
}(channel)

exchange := r.getExchange(workerType)
routingKey := r.getRoutingKeys(workerType)

err = channel.Publish(
exchange,
routingKey[0],
false, // mandatory
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Body: msg,
},
)

if err != nil {
log.WithError(err).
WithField("exchange", exchange).
Expand All @@ -90,22 123,38 @@ func (r *RabbitmqTaskMgr) Publish(ctx context.Context, workerType worker.Type, m

// getChannel returns a channel from the connection
// this method is also used to create a new channel if channel is closed
func (r *RabbitmqTaskMgr) getChannel(workerType worker.Type) (*amqp.Channel, error) {
if r.conn == nil || r.conn.IsClosed() {
func (r *RabbitmqTaskMgr) getChannel(workerType worker.Type, connectionType ConnectionType) (*amqp.Channel, error) {
var conn *amqp.Connection

switch connectionType {
case Consumer:
conn = r.consumeConn
case Publisher:
conn = r.publishConn
}

if conn == nil || conn.IsClosed() {
log.Debug("rabbitmq connection is closed, reconnecting")
var err error

r.conn, err = Dial(r.settings)
conn, err = Dial(r.settings)
if err != nil {
return nil, err
}

switch connectionType {
case Consumer:
r.consumeConn = conn
case Publisher:
r.publishConn = conn
}
}

channel, err := r.conn.Channel()
channel, err := conn.Channel()
if err != nil {
log.Errorf("failed to open a channel on rabbitmq: %v", err)

return nil, taskmgr.ErrConsumerInitFailed
return nil, err
}

exchange := r.getExchange(workerType)
Expand All @@ -114,7 163,7 @@ func (r *RabbitmqTaskMgr) getChannel(workerType worker.Type) (*amqp.Channel, err
if err != nil {
log.Errorf("failed to declare an exchange on rabbitmq: %v", err)

return nil, taskmgr.ErrConsumerInitFailed
return nil, err
}

// declare the queue
Expand All @@ -125,7 174,7 @@ func (r *RabbitmqTaskMgr) getChannel(workerType worker.Type) (*amqp.Channel, err
if err != nil {
log.Errorf("failed to declare a queue on rabbitmq: %v", err)

return nil, taskmgr.ErrConsumerInitFailed
return nil, err
}

// bind the queue to the exchange on the routing keys
Expand All @@ -134,7 183,7 @@ func (r *RabbitmqTaskMgr) getChannel(workerType worker.Type) (*amqp.Channel, err
if err != nil {
log.WithField("routingKey", routingKey).Errorf("failed to bind a queue on rabbitmq: %v", err)

return nil, taskmgr.ErrConsumerInitFailed
return nil, err
}
}

Expand All @@ -149,7 198,8 @@ func (r *RabbitmqTaskMgr) Consume(ctx context.Context, workerType worker.Type, m
log.Errorf("recovered from panic: %v", p)
}
}()
channel, err := r.getChannel(workerType)

channel, err := r.getChannel(workerType, Consumer)
if err != nil {
return err
}
Expand All @@ -162,7 212,7 @@ func (r *RabbitmqTaskMgr) Consume(ctx context.Context, workerType worker.Type, m
}(channel)

defer func() {
err = r.conn.Close()
err = r.consumeConn.Close()
if err != nil && !errors.Is(err, amqp.ErrClosed) {
log.Errorf("failed to close connection on rabbitmq: %v", err)
}
Expand Down Expand Up @@ -190,7 240,7 @@ func (r *RabbitmqTaskMgr) Consume(ctx context.Context, workerType worker.Type, m
connCloseChan := make(chan *amqp.Error, 1)
channelCloseChan := make(chan *amqp.Error, 1)

connCloseChan = r.conn.NotifyClose(connCloseChan)
connCloseChan = r.consumeConn.NotifyClose(connCloseChan)
channelCloseChan = channel.NotifyClose(channelCloseChan)

go func() {
Expand Down Expand Up @@ -272,15 322,25 @@ func (r *RabbitmqTaskMgr) getRoutingKeys(workerType worker.Type) []string {
}

func (r *RabbitmqTaskMgr) Shutdown(ctx context.Context) error {
if r.conn == nil || r.conn.IsClosed() {
if r.consumeConn == nil || r.consumeConn.IsClosed() {
return nil
}

if err := r.conn.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
if err := r.consumeConn.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
log.Errorf("failed to close connection on rabbitmq: %v", err)

return err
}

if r.publishConn == nil || r.publishConn.IsClosed() {
return nil
}

if err := r.publishConn.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
log.WithError(err).Error("failed to close connection on rabbitmq")

return err
}

return nil
}
5 changes: 0 additions & 5 deletions go/goutils/taskmgr/taskmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 2,6 @@ package taskmgr

import (
"context"
"errors"

"github.com/streadway/amqp"

Expand Down Expand Up @@ -78,7 77,3 @@ func (t *Task) Nack(requeue bool) error {
}

type ErrTaskMgr error

var (
ErrConsumerInitFailed = errors.New("failed to initialize consumer")
)
20 changes: 9 additions & 11 deletions go/payload-commit/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 31,17 @@ func (w *Worker) ConsumeTask() error {
// start consuming messages in separate go routine.
// messages will be sent back over taskChan.
go func() {
err := backoff.Retry(func() error {
err := w.taskmgr.Consume(context.Background(), workerInterface.TypePayloadCommitWorker, taskChan)
if err != nil {
log.WithError(err).Error("failed to consume the message, retrying")

return err
}
for {
_ = backoff.Retry(func() error {
err := w.taskmgr.Consume(context.Background(), workerInterface.TypePayloadCommitWorker, taskChan)
if err != nil {
log.WithError(err).Error("failed to consume the message, retrying")

return nil
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
return err
}

if err != nil {
log.WithError(err).Fatal("failed to consume the messages after max retries")
return nil
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
}
}()

Expand Down

0 comments on commit 6717d0d

Please sign in to comment.