Amazon Simple Queue Service is a fully managed message queuing service that enables the developer to decouple and scale microservices, distributed systems, and serverless applications.
AWS SQS Consumer shortens the time needed to launch an SQS message consumer by letting developers focus on the business logic of consuming a message.
The key selling points of AWS SQS Consumer include:
- support for priority-based consumption of Messages across multiple Queues
- asynchronous (truly non-blocking) implementation that maximizes throughput and optimizes resource utilization by building on the asynchronous client of the AWS SDK for Java 2.0 and on Kotlin coroutines and channels
- out-of-the-box exception handling via Dead-Letter Queues
- hidden complexity of invoking the SQS ReceiveMessage and DeleteMessage APIs
- lower cost, because the default behavior is long polling
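To put the long-polling point in rough numbers: SQS bills per API request, and a ReceiveMessage call that returns nothing still counts as a request. The sketch below is only back-of-the-envelope arithmetic for a single poller on an empty queue. The helper name `emptyReceivesPerDay` and the 100 ms short-polling round trip are assumptions made for illustration, not figures from the library or from AWS.

```kotlin
// Hypothetical helper: how many ReceiveMessage calls a single poller issues per day
// against an empty queue, if each call takes `millisPerCall` milliseconds to return.
fun emptyReceivesPerDay(millisPerCall: Long): Long = 86_400_000L / millisPerCall

fun main() {
    // Long polling with a 20 second wait time (the maximum SQS allows).
    val longPolling = emptyReceivesPerDay(20_000)
    // Short polling in a tight loop; the 100 ms round trip is an assumed value.
    val shortPolling = emptyReceivesPerDay(100)

    println("long polling:  $longPolling requests/day")   // prints 4320
    println("short polling: $shortPolling requests/day")  // prints 864000
}
```

The real saving depends on how busy the queue is and how many pollers run in parallel, so treat the ratio as an upper-bound illustration.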
AWS SQS Consumer exposes two public interfaces to the user:
- QueueConsumer - the framework interface that starts and stops the process of polling messages and processing/consuming them
- MessageProcessor - the hook where the user implements the business logic for a given Message
Usage involves implementing a MessageProcessor and wiring up the QueueConsumer.
Implementing MessageProcessor
import org.apache.logging.log4j.kotlin.Logging
import software.amazon.awssdk.services.sqs.model.Message
class MyAwesomeMessageProcessor : MessageProcessor, Logging {
    override suspend fun processMessage(message: Message) {
        logger.info("my awesome message is ${message.body()}")
        // business logic goes here
    }
}
Wiring Up QueueConsumer
import org.apache.logging.log4j.kotlin.Logging
import org.seekerwing.aws.sqsconsumer.builder.SingleQueueConsumerBuilder
import org.seekerwing.aws.sqsconsumer.configuration.ConsumerConfiguration
import org.seekerwing.aws.sqsconsumer.configuration.MessageFetcherConfiguration
import org.seekerwing.aws.sqsconsumer.configuration.MessageProviderConfiguration
import org.seekerwing.aws.sqsconsumer.model.Queue
import org.seekerwing.aws.sqsconsumer.model.QueueContext
import software.amazon.awssdk.services.sqs.SqsAsyncClient
class MyAwesomeQueueConsumerApp : Logging {
    private fun execute(args: Array<String>) {
        logger.info("starting app with args ${args.contentToString()}")
        // message processor created in the previous step
        val messageProcessor = MyAwesomeMessageProcessor()
        // build the queue object from the SQS client, the queue URL, and the MessageProcessor that must be used to consume messages from the queue
        val queue = Queue(SqsAsyncClient.create(), "https://sqs.us-east-1.amazonaws.com/777777777777/my-awesome-queue", QueueContext(messageProcessor))
        // the parameters are the number of messages fetched per poll, the wait time when polling for messages, and the message visibility timeout
        // all the parameters are optional; override them as necessary or leave them at their defaults
        val messageFetcherConfiguration = MessageFetcherConfiguration(10, 20, 300)
        // build the message provider configuration from the queue, the message fetcher configuration, and the desired number of parallel pollers
        // the message fetcher configuration and the number of parallel pollers are optional; override them as necessary or leave them at their defaults
        val messageProviderConfiguration = MessageProviderConfiguration(queue, messageFetcherConfiguration, 10)
        // build the queue consumer from the message provider configuration and the consumer configuration (which defines the number of parallel message processors)
        // the consumer configuration is optional; override it as necessary or leave it at its default
        val queueConsumer = SingleQueueConsumerBuilder(messageProviderConfiguration, ConsumerConfiguration(50)).build()
        // start the queue consumer
        queueConsumer.start()
        // stop the queue consumer when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(Thread(Runnable { queueConsumer.stop() }))
        // wait for termination
        Thread.currentThread().join()
    }

    companion object {
        // @JvmStatic exposes main as a static method so the JVM can use it as the entry point
        @JvmStatic
        fun main(args: Array<String>) {
            MyAwesomeQueueConsumerApp().execute(args)
        }
    }
}
Q: My codebase is in Java. Can I use this library?
A: Kotlin is a JVM-based language, and one of its strengths is its interoperability with Java. You can read more about it at Calling Kotlin from Java and Calling Java code from Kotlin.
Q: How should I handle exceptions thrown while processing a message?
A: The library recommends that you let exceptions bubble up and rely on the SQS redrive policy and Dead-Letter Queue (DLQ) to handle them. In the normal course of events, when the MessageProcessor returns control to the library without throwing an exception, the library deletes the message from the queue, which marks the message as successfully processed. When the MessageProcessor throws an exception, the library does not delete the message. Once the message's visibility timeout expires without the message having been deleted by a consumer, SQS assumes that the consumer failed to process it and makes it available for consumption again. SQS repeats this until the message has been received the maximum number of times allowed by the queue's redrive policy (maxReceiveCount), after which it moves the message to the DLQ. We strongly recommend against writing explicit code to move messages to the DLQ, because it adds a point of failure and complicates maintenance of the user's codebase.
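The delete-on-success, redeliver-on-exception flow described in this answer can be illustrated with a toy in-memory model. This is a minimal sketch and not the library's implementation: `ToyQueue` and `consumeAll` are hypothetical names, the visibility timeout is collapsed to "immediately visible again", and message bodies double as message identifiers.

```kotlin
// Toy stand-in for an SQS queue with a redrive policy. Not the library's code.
class ToyQueue(private val maxReceiveCount: Int) {
    private val receiveCounts = LinkedHashMap<String, Int>() // message body -> times received
    val deadLetterQueue = mutableListOf<String>()

    fun send(body: String) { receiveCounts[body] = 0 }

    // Returns the next message, or null when the queue is empty. A message that has
    // already been received maxReceiveCount times is moved to the DLQ instead.
    fun receive(): String? {
        while (true) {
            val body = receiveCounts.keys.firstOrNull() ?: return null
            val count = receiveCounts.getValue(body)
            if (count >= maxReceiveCount) {
                receiveCounts.remove(body)
                deadLetterQueue.add(body)
                continue
            }
            receiveCounts[body] = count + 1
            return body
        }
    }

    fun delete(body: String) { receiveCounts.remove(body) }
}

// Delete on normal return; leave the message in the queue when the processor throws.
fun consumeAll(queue: ToyQueue, processMessage: (String) -> Unit) {
    while (true) {
        val body = queue.receive() ?: return
        try {
            processMessage(body)
            queue.delete(body)
        } catch (e: Exception) {
            // No delete here: the message will be received again on a later poll.
        }
    }
}

fun main() {
    val queue = ToyQueue(maxReceiveCount = 3)
    queue.send("ok")
    queue.send("poison")
    var attempts = 0
    consumeAll(queue) { body ->
        if (body == "poison") { attempts++; error("cannot process $body") }
    }
    println("attempts=$attempts dlq=${queue.deadLetterQueue}") // prints attempts=3 dlq=[poison]
}
```

Note that the processor contains no DLQ code at all; throwing is enough for the failing message to end up in the dead-letter list once its receive count is exhausted.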
Q: I hear that on AWS Lambda I only need to implement my business logic. Is that right?
A: Yes, you are right. AWS Lambda does the heavy lifting of polling for messages, handling errors, and re-driving messages so that the developer doesn't have to deal with it. You can read more about it at AWS Lambda Event Source Mapping, Asynchronous Invocation, and Error Handling and Automatic Retries in AWS Lambda.
Q: Why do I need this library if AWS Lambda solves for the message processing boilerplate?
A: AWS Lambda is great for most workloads and is gradually evolving to support more use cases. However, it is fairly common for developers to use other compute platforms for non-standard workloads. These include, but are not limited to, EC2, EKS, ECS, and Fargate. On compute platforms where no external actor polls the queue and hands messages to your compute layer, it is the developer's responsibility to write the code for SQS polling, message deletion, and error handling. This library attempts to take that complexity off the developer's hands, allowing the developer to focus on the business logic of processing/consuming the message.
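For a sense of the boilerplate in question, here is a rough sketch of a single poll, process, delete cycle written directly against the asynchronous AWS SDK for Java 2.x. It is not the library's internal code. The function names `longPollRequest` and `pollOnce` are hypothetical, the numeric values simply mirror the example configuration shown earlier, and the sketch assumes the kotlinx-coroutines `await` extension for `CompletableFuture` is on the classpath.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.future.await
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

// Hypothetical helper: builds a long-polling ReceiveMessage request.
fun longPollRequest(queueUrl: String): ReceiveMessageRequest =
    ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(10) // SQS returns at most 10 messages per call
        .waitTimeSeconds(20)     // long polling: wait up to 20 seconds for a message
        .visibilityTimeout(300)  // seconds the message stays hidden from other consumers
        .build()

// Hypothetical helper: one poll-process-delete cycle. A real consumer would call this
// in a loop, usually from several coroutines in parallel.
suspend fun pollOnce(client: SqsAsyncClient, queueUrl: String, handle: suspend (Message) -> Unit) {
    val messages = client.receiveMessage(longPollRequest(queueUrl)).await().messages()
    for (message in messages) {
        try {
            handle(message)
            // Delete only after the handler returned normally.
            val deleteRequest = DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(message.receiptHandle())
                .build()
            client.deleteMessage(deleteRequest).await()
        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            // Leave the message in the queue; SQS makes it visible again after the
            // visibility timeout and eventually moves it to the DLQ.
        }
    }
}
```

Even this stripped-down version leaves out the polling loop itself, parallelism, backpressure, and shutdown handling, which is the kind of plumbing the library aims to take care of.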