The AWS SQS Consumer reduces time to launch a SQS Message Consumer, by empowering developers to focus on business logic of consuming the message.

SeekerWing/aws-sqs-consumer

aws-sqs-consumer

Introduction

Amazon Simple Queue Service is a fully managed message queuing service that enables the developer to decouple and scale microservices, distributed systems, and serverless applications.

The AWS SQS Consumer reduces the time needed to launch an SQS message consumer by letting developers focus on the business logic of consuming messages.

The "unique selling proposition" of AWS SQS Consumer includes:

Design

design diagram

User Guide

AWS SQS Consumer has two public interfaces for the user.

  • QueueConsumer - the framework interface that starts and stops the polling of messages and their processing/consumption
  • MessageProcessor - the implementation hook where the user implements the business logic for a given Message

Using the library involves implementing a MessageProcessor and wiring up a QueueConsumer.

Implementing MessageProcessor

import org.apache.logging.log4j.kotlin.Logging
import software.amazon.awssdk.services.sqs.model.Message

class MyAwesomeMessageProcessor : MessageProcessor, Logging {

    override suspend fun processMessage(message: Message) {
        logger.info("my awesome message is ${message.body()}")
        // business logic goes here
    }
}

Wiring Up QueueConsumer

import org.apache.logging.log4j.kotlin.Logging
import org.seekerwing.aws.sqsconsumer.builder.SingleQueueConsumerBuilder
import org.seekerwing.aws.sqsconsumer.configuration.ConsumerConfiguration
import org.seekerwing.aws.sqsconsumer.configuration.MessageFetcherConfiguration
import org.seekerwing.aws.sqsconsumer.configuration.MessageProviderConfiguration
import org.seekerwing.aws.sqsconsumer.model.Queue
import org.seekerwing.aws.sqsconsumer.model.QueueContext
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class MyAwesomeQueueConsumerApp : Logging {

    private fun execute(args: Array<String>) {
        logger.info("starting app with args ${args.contentToString()}")

        // message processor created in previous step
        val messageProcessor = MyAwesomeMessageProcessor()

        // build queue object with the SQS Client and the Queue URL and the MessageProcessor that must be used to consume messages from the queue
        val queue = Queue(SqsAsyncClient.create(), "https://sqs.us-east-1.amazonaws.com/777777777777/my-awesome-queue", QueueContext(messageProcessor))

        // the parameters indicate number of messages fetched per poll, wait time when polling for messages, and message visibility timeout
        // all the parameters are optional, user may override them as and when necessary or leave them at defaults
        val messageFetcherConfiguration = MessageFetcherConfiguration(10, 20, 300)

        // build message provider configuration with previously built queue and message fetcher configuration and the desired number of parallel pollers
        // the message fetcher configuration and the number of parallel pollers configuration are optional, user may override them as and when necessary or leave them at defaults
        val messageProviderConfiguration = MessageProviderConfiguration(queue, messageFetcherConfiguration, 10)

        // build queue consumer with the previously built message provider configuration and consumer configuration (that defines number of parallel message processors)
        // the queue consumer configuration is optional, user may override as and when necessary or leave them at defaults
        val queueConsumer = SingleQueueConsumerBuilder(messageProviderConfiguration, ConsumerConfiguration(50)).build()

        // start queue consumer
        queueConsumer.start()
        Runtime.getRuntime().addShutdownHook(Thread(Runnable { queueConsumer.stop() }))
        // wait for termination
        Thread.currentThread().join()
    }

    companion object {
        // @JvmStatic exposes main as a static method so the JVM can use it as the entry point
        @JvmStatic
        fun main(args: Array<String>) {
            MyAwesomeQueueConsumerApp().execute(args)
        }
    }
}

FAQ

Q: I don't speak Kotlin, can I still use this library?

A: Kotlin is a JVM-based language, and one of its strengths is interoperability with Java. You can read more about it at "Calling Kotlin from Java" and "Calling Java code from Kotlin".

Q: How do I handle errors encountered while processing a message?

A: The library recommends that you let exceptions bubble up and rely on the SQS redrive policy and dead-letter queue (DLQ) to handle them. In the normal course of events, once the MessageProcessor returns control to the library without throwing an exception, the library deletes the message from the queue, which marks the message as successfully processed. When the MessageProcessor throws an exception, the library does not delete the message. The message's visibility timeout eventually expires, and because no consumer deleted the message in time, SQS assumes that processing failed and makes the message available to be consumed again. SQS repeats this until the message has been received more times than the maxReceiveCount set in the queue's redrive policy, after which it moves the message to the DLQ. We strongly recommend against writing explicit code to move messages to the DLQ, because it adds a point of failure and complicates maintenance of your codebase.
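To make the retry cycle described above concrete, here is a minimal in-memory sketch of it. It does not use the library or the AWS SDK: SimMessage, SimQueue and drain are hypothetical names invented for this illustration, and the visibility timeout is modelled simply as putting a failed message back on the queue.

```kotlin
// Hypothetical in-memory stand-in for an SQS queue with a redrive policy.
// None of these names come from the library or the AWS SDK.
data class SimMessage(val body: String, var receiveCount: Int = 0)

class SimQueue(private val maxReceiveCount: Int) {
    private val main = ArrayDeque<SimMessage>()
    val deadLetter = mutableListOf<SimMessage>()

    fun send(body: String) {
        main.addLast(SimMessage(body))
    }

    // Hand out the next message; one that has already been received
    // maxReceiveCount times goes to the dead-letter list instead.
    fun receive(): SimMessage? {
        while (true) {
            val message = main.removeFirstOrNull() ?: return null
            if (message.receiveCount >= maxReceiveCount) {
                deadLetter += message
                continue
            }
            message.receiveCount++
            return message
        }
    }

    // Models the visibility timeout expiring on a message that was never deleted.
    fun makeVisibleAgain(message: SimMessage) {
        main.addLast(message)
    }
}

// Consumer side: a message is only "deleted" when processing returns normally.
fun drain(queue: SimQueue, process: (String) -> Unit): List<String> {
    val processed = mutableListOf<String>()
    while (true) {
        val message = queue.receive() ?: break
        try {
            process(message.body)
            processed += message.body // success: the message is gone for good
        } catch (e: Exception) {
            queue.makeVisibleAgain(message) // failure: the message reappears later
        }
    }
    return processed
}

fun main() {
    val queue = SimQueue(maxReceiveCount = 3)
    queue.send("good")
    queue.send("poison")
    val processed = drain(queue) { body ->
        if (body == "poison") error("cannot process $body")
    }
    // prints: processed=[good] deadLetter=[poison]
    println("processed=$processed deadLetter=${queue.deadLetter.map { it.body }}")
}
```

Note that the consumer code never touches the dead-letter list. Throwing from the processing function is enough, which is the reason for the recommendation against hand-written DLQ handling.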

Q: I hear on AWS Lambda I only need to implement my business logic?

A: Yes, you are right. AWS Lambda does the heavy lifting of polling for messages, handling errors and re-driving messages so that the developer doesn't have to deal with it. You can read more about it at "AWS Lambda Event Source Mapping", "Asynchronous Invocation", and "Error Handling and Automatic Retries in AWS Lambda".

Q: Why do I need this library if AWS Lambda solves for the message processing boilerplate?

A: AWS Lambda is great for most workloads and is gradually evolving to support more use cases. However, it is fairly common for developers to use other compute platforms for non-standard workloads. These include, but are not limited to, EC2, EKS, ECS and Fargate. On compute platforms where no external actor polls the queue and hands messages to your compute layer, it is the developer's responsibility to write the code for SQS polling, message deletion and error handling. This library takes that complexity off the developer, who can then focus on the business logic of processing/consuming the message.
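For a sense of the boilerplate involved, the following is a rough sketch of a single poll-process-delete pass written directly against the synchronous client of the AWS SDK for Java v2. The pollOnce function and its parameters are hypothetical and are not part of this library; the values 10, 20 and 300 only mirror the MessageFetcherConfiguration example above. A real consumer would also need to loop, run pollers and processors in parallel, and shut down cleanly.

```kotlin
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

// Hypothetical helper: one poll-process-delete pass over a queue.
// Returns the number of messages that were processed and deleted.
fun pollOnce(client: SqsClient, queueUrl: String, process: (Message) -> Unit): Int {
    val request = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(10) // messages fetched per poll
        .waitTimeSeconds(20)     // long-poll wait time
        .visibilityTimeout(300)  // seconds a received message stays hidden
        .build()

    var succeeded = 0
    for (message in client.receiveMessage(request).messages()) {
        try {
            process(message)
        } catch (e: Exception) {
            // Do not delete: the message becomes visible again once the
            // visibility timeout expires, and SQS redelivers it.
            continue
        }
        // Processing succeeded, so remove the message from the queue.
        client.deleteMessage(
            DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(message.receiptHandle())
                .build()
        )
        succeeded++
    }
    return succeeded
}
```

Passing the business logic in as the process function keeps it separate from the queue plumbing, which is the same split the library makes between MessageProcessor and QueueConsumer.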
