Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQS Support #374

Merged
merged 9 commits into from
Sep 1, 2022
Merged

Add SQS Support #374

merged 9 commits into from
Sep 1, 2022

Conversation

tomazfernandes
Copy link
Contributor

@tomazfernandes tomazfernandes commented May 6, 2022

Resolves #344

Adds support for listening to SQS queues and handling messages. (WIP)

Features

  • See SqsIntegrationTests for usage examples and features
  • Provides almost all features from the previous version and a few new ones
  • Full Async support via CompletableFuture
  • Fully configurable via familiar Spring abstractions

Shortcomings (for now...)

  • No documentation or unit tests
  • Template not implemented
  • Support for FIFO queues not implemented
  • Lots of ground yet to cover 😄

I tried to follow the provided general guidelines and what I thought would make sense for the project. I'm ok with discussing and or changing any of it, both design and implementation-wise.

I'll wait for feedback before adding anything else - hopefully this will be a good enough fit for us to work together on going forward. If it doesn't look right, or perhaps another direction is preferred, no worries, we can set it aside and let the team come with something else later.

As a note, there's a messaging support module that provides a set of abstractions for adding support to any number of messaging systems - Kinesis comes to mind, but there sure are others in the AWS landscape. It was largely inspired by the design of the Spring Kafka project - that's a battle tested design that most users and engineers should be comfortable with. If that seems like an overkill for this project, it should be easy enough merging the two modules and getting rid of some abstractions.

This is a great project and I'm really thrilled to perhaps be taking a small part into building it! 😄

Thanks!

@github-actions github-actions bot added the type: dependency-upgrade Dependency version bump label May 6, 2022
@maciejwalkowiak
Copy link
Contributor

Thanks a lot @tomazfernandes🔥! I will try to review it in upcoming days. Regarding Kinesis - very likely we won't do anything about it since there is a Kinesis support in Spring Cloud Stream project, but perhaps we can use it for MQTT #38.

After very quick look - we've decided to drop @EnableXXX annotations and support only auto-configurations. There should be still a way for users who do not rely on autoconfigurations to use SQS integration, but then they should rather manually do what auto-configuration does. With @EnableXXX we would have to register bean definitions which causes headaches we wanted to avoid.

Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :)

@maciejwalkowiak maciejwalkowiak added component: sqs SQS integration related issue type: feature Integration with a new AWS service or bigger change in existing integration and removed type: dependency-upgrade Dependency version bump labels May 6, 2022
@maciejwalkowiak maciejwalkowiak added this to the 3.0.0 M3 milestone May 6, 2022
if (attemptsLeft == 0) {
throw new IllegalStateException("Could not retrieve properties for queue " queue, e);
}
Thread.sleep(8000 / attemptsLeft);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that AWS clients has a built-in retry mechanisms, perhaps we don't need to repeat on failure when getting queue attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @tomazfernandes🔥! I will try to review it in upcoming days.

Thanks for the swift response @maciejwalkowiak!

Regarding Kinesis - very likely we won't do anything about it since there is a Kinesis support in Spring Cloud Stream project, but perhaps we can use it for MQTT #38.

Sounds good! Maybe we can work on a PoC and see how it looks like.

After very quick look - we've decided to drop @EnableXXX annotations and support only auto-configurations.

Sure, I had noticed the project didn't make use of such annotations but decided to leave auto-configuration out of the scope of this first draft. I can add it.

Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :)

Thanks for looking into this! Please let me know if there are any suggestions or concerns - this is meant to be a draft we can improve on rather than some final solution.

Also, there are a few points I think we should address if we're moving forward with this, for example:

  • I tried to leverage existing code as much as possible, specially since it was well adapted to the Spring Messaging interfaces and I wanted to have a working draft quickly. But while I think the AbstractMessageHandler solution is great for what the previous version achieved - handling all queues from the same container and resolving methods at runtime - I'm not sure it's the best solution if we're having a single container per @SqsListener annotation, since we can just have the Method and invoke it directly.

  • The Spring Messaging project doesn't have much support for CompletableFuture-based async - I had to come up with a pretty hacky workaround to make a fully async solution. It does have support for reactive though. Maybe adding such support is something worth discussing with that team?

  • I broke down the container runtime logic into many small pieces - I like this kind of design where we can easily swap components to change behavior. But if it looks too fragmented we can change it to a single larger class.

Thanks again!

@tomazfernandes
Copy link
Contributor Author

Quick question @maciejwalkowiak - there seems to be an error related to setting the AWS region on the CI build. Since this is a proverbial case of it works on my machine 😄, I wonder if it is a known issue with a known solution. If not I can look into it.

Thanks!

@maciejwalkowiak
Copy link
Contributor

maciejwalkowiak commented May 6, 2022

Sounds good! Maybe we can work on a PoC and see how it looks like.

@arunkpatra plans to take a look so let's keep him in the look.

Regarding AWS region on CI - it is somewhat on purpose. We try to leverage Localstack as much as possible and unless we hit the wall and there will be a AWS feature unsupported by Localstack all tests should run against it. (you can take a look how other integration tests are set up).

I can't wait to dig deeper into this PR!

Regarding Sonar - feel free to question what it reports - we've set up a default configuration and not everything it reports necessarily is important from our POV.

One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible - it should be possible to adjust spring-integration-aws to the new implementation.

@tomazfernandes
Copy link
Contributor Author

@arunkpatra plans to take a look so let's keep him in the look.

Sure, no hurry.

Regarding AWS region on CI - it is somewhat on purpose. We try to leverage Localstack as much as possible and unless we hit the wall and there will be a AWS feature unsupported by Localstack all tests should run against it. (you can take a look how other integration tests are set up).

Sure, I'm using LocalStack for the tests. I guess this should be related to the changes in configuration properties you mentioned - I'll try that and update the PR.

I can't wait to dig deeper into this PR!

Cool, looking forward for your review too!

Regarding Sonar - feel free to question what it reports - we've set up a default configuration and not everything it reports necessarily is important from our POV.

I've actually enjoyed most of what it reported 😄 Of course, some less-than-critical things, but nothing to write home about.
What I didn't change was some things related to duplicated code in legacy classes and a couple Thread.sleep in ITs for seeing how the system performs under load - we should probably remove it afterwards.

One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible - it should be possible to adjust spring-integration-aws to the new implementation.

I see, maybe we'll need to make a couple adjustments, but at a first glance seems simple enough. I'll keep that in mind, and maybe we can involve them when it's appropriate.

Thanks!

@tomazfernandes
Copy link
Contributor Author

Sure, I'm using LocalStack for the tests. I guess this should be related to the changes in configuration properties you mentioned - I'll try that and update the PR.

Well, it's been a while since I've last dealt with AWS configuration issues, and I'm not sure exactly how these properties are mapped by the framework. I'll take a better look on Monday, and also implement auto-configuration for this - probably with that we'll have a properly configured SQSAsyncClient for the CI tests.

In the meantime, let me know if there are any further questions or concerns about any of this.

Thanks!

@maciejwalkowiak
Copy link
Contributor

@tomazfernandes I pushed fixing tests to your branch

@maciejwalkowiak maciejwalkowiak mentioned this pull request May 6, 2022
@tomazfernandes
Copy link
Contributor Author

@tomazfernandes I pushed fixing tests to your branch

Nice, thanks @maciejwalkowiak!

Copy link
Contributor

@eddumelendez eddumelendez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your contribution @tomazfernandes ! I have left some small comments. I would be checking spring kafka during the week in order to get more familiar with it.

<name>Spring Cloud AWS Messaging Support</name>
<description>Spring Cloud Support for AWS Messaging Services</description>

<properties>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those properties should be dropped. the project currently relies on spring-cloud-build and that's the place where the java version is specific

* @author Tomaz Fernandes
* @since 3.0
*/
public class MessagingUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given the methods in this class are just used in one single place I would suggest to keep simple validations where they are used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't really understand what you're suggesting - you mean having if (xpto != null) { ... } etc?

This class was inspired by SK's JavaUtils class and is used a lot there, so I think as we keep working on this we'll probably find quite a few uses for this pattern.

* @author Tomaz Fernandes
* @since 3.0
*/
public class MessagingConfigUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am in favor to move those constants to the places where they are used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #374 (comment)

Comment on lines 48 to 63
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<!-- AWS SDK v1 is required by testcontainers-localstack -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess tests are going to be implemented later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I prefer to have the team take a look at this PR first, and if we're moving through with it I can implement the unit tests and add docs for this.

spring-cloud-aws-sqs/pom.xml Outdated Show resolved Hide resolved
for (Map.Entry<MessageSystemAttributeName, String> attributeKeyValuePair : message.attributes().entrySet()) {
messageHeaders.put(attributeKeyValuePair.getKey().name(), attributeKeyValuePair.getValue());
}
return messageHeaders;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return messageHeaders;
return Collections.unmodifiableMap(messageHeaders);

Comment on lines 137 to 140
Map<String, Object> messageHeaders = new HashMap<>();
for (Map.Entry<MessageSystemAttributeName, String> attributeKeyValuePair : message.attributes().entrySet()) {
messageHeaders.put(attributeKeyValuePair.getKey().name(), attributeKeyValuePair.getValue());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be moved to make use of streams api

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But I think there's a good chance we'll end up doing some refactoring, or perhaps rewriting, to this and other parts I've brought from the previous version, so if that's ok I prefer deferring this change for a moment.


@Container
static LocalStackContainer localstack = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS).withReuse(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to use withReuse

Suggested change
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS).withReuse(true);
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might not be related to this PR, but the question has been raised I want to share what I have in Spring Integration AWS for Localstack Testcontainers: https://github.com/spring-projects/spring-integration-aws/blob/main/src/test/java/org/springframework/integration/aws/LocalstackContainerTest.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just replied here

#374 (comment)

Comment on lines 51 to 64
static void beforeAll() throws IOException, InterruptedException {
// create needed queues in SQS
// TODO: Not working as expected due to some port mapping issue - will look into in the future
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name",
RECEIVES_MESSAGE_QUEUE_NAME);
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name",
DOES_NOT_ACK_ON_ERROR_QUEUE_NAME);
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name",
RECEIVE_FROM_MANY_1_QUEUE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For initialization resources we can use docker-entrypoint-initaws.d

v0.9.0: Enhance integration with Serverless; refactor CloudFormation implementation; add support for Step Functions, IAM, STS; fix CloudFormation integration; support mounting Lambda code locally; add docker-entrypoint-initaws.d dir for initializing resources; add S3Event Parser for Lambda; fix S3 chunk encoding; fix S3 multipart upload notification; add dotnetcore2.1 and ruby2.5 Lambda runtimes; fix issues with JDK 9; install ES plugins available in AWS

From changelog

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then you can't use constants or any other variables there. As long as the number of commands is short, for me its easier to have them in the same file. I think we can start using docker-entrypoint-initaws.d but then it should be consistent across tests - so perhaps something for a separate PR.

@tomazfernandes
Copy link
Contributor Author

thanks for your contribution @tomazfernandes !

Thanks a lot for looking into this @eddumelendez!

I have left some small comments.

Sure, I'll address them, thanks.

I would be checking spring kafka during the week in order to get more familiar with it.

Sounds great! Let me know if there's anything I can help with - I've been contributing to that project for a bit more than a year now and know the codebase reasonably well at this point - and surely would enjoy discussing both that and this design.

Also, I'm not sure how familiar you are with the previous version of the SQS integration - that would be a good thing to look at as I took a few classes and code from there. Although of course we can rewrite anything we feel could be different.

A few quick notes:

  • I tried to make use of what I thought was essential from the Spring Kafka project, but I didn't try to make an exact copy - perhaps there are some things I may have missed
  • I got 'creative' for the container part - there we have a large KafkaMessageListenerContainer class that handles most runtime logic, here I tried to break it down into smaller components, such as Ack, MessageProducer and MessageListener
  • Some of the design for this PR was constrained by the use of the MessageHandler interface, which was the main logic for the previous version, including annotation processing and runtime method resolving / invoking. I think we could look into decoupling those roles, separating annotation processing from method invoking.

Please let me know if there's anything else I can help with. Thanks!

@tomazfernandes
Copy link
Contributor Author

I've made a few adjustments to performance and ran some preliminary tests against AWS.

For a load of 1K messages with a 1s processing time (Thread.sleep), the result was:
10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742

Running the same test with the current 2.4 version, the result was:
65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359

Of course, that difference is kind of expected given the 10 thread / queue limitation of the previous version, but still should be a happy gain for users. The performance is further improved when there's more than 1K messages in the queue e.g. from a previously interrupted test.

I think we can expect better performance from a true async workload, e.g. the listener using other async methods such as perhaps saving something to DynamoDB or any async http calls, and returning a CompletableFuture<Void>, instead of a blocking load such as Thread.sleep. I'll look into implementing such test.

Also, probably there's room for performance improvement in the logic itself - we can look into that in the future.

Thanks!

@tomazfernandes
Copy link
Contributor Author

I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface by a BeanPostProcessor for annotation processing / Endpoint creation and InvocableHandlerMethod for method invocation.

With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture<Void> and batch processing listener methods that receive a Collection<Message> as an argument.

And after that, add auto-configuration with this new structure.

Since that should be quite a bit of work, I think I'll let the team catch up with what we already have first, to make sure we're all in the same page.

Thanks and please let me know if there's anything I can help with.

Copy link
Contributor

@maciejwalkowiak maciejwalkowiak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation looks like you're definitely know what you're doing, but I am getting lost in abstractions.

I am 100% confident we should continue with this design but we either have to simplify it (for example by merging these two modules) or document it.

  1. Simplification - I understand the reasoning for keeping them separate. The only possible implementation I can think of is AWS IOT but not sure if this will really fit.
  2. Documentation - at least on the Javadoc level, it must come anyways. But I think it would be really really great if you could describe either with words or a diagram how all of these pieces are meant to work together.

I see that container can have single interceptor - perhaps we should consider adding more? For example, adding a possibility to measure execution time and send it via Micrometer, or adding tracing with Sleuth.

Please add auto-configuration (we do not need @EnableSqs) and tests. I imagine integration tests cover big chunk of this change and I do not mean that we have to have a unit test for each class, but I do believe some pieces here should be unit tested.

.collect(Collectors.toList());
}

private static SqsMessageProducer createMessageProducer(SqsAsyncClient sqsClient, String logicalEndpointName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see this method is used only in line 44 and does not add more than just calling constructor. Perhaps we can remove it and inline calling constructor in line 44?

Copy link
Contributor Author

@tomazfernandes tomazfernandes Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, TBH, I think there are a couple of things here to reason about and improve.

For example, I mostly relied on creating a SqsAsyncClient bean and injecting / passing it along to all containers.

But it seems that the client has a maximum number of concurrent connections, something around 40 if I recall correctly. So I'm thinking maybe we should have a SqsMessageProducerFactory that creates a separate SqsAsyncClient instance per container, and the corresponding MessageProducer.

How does that sound? We do have ConsumerFactory and ProducerFactory on Spring Kafka.

* @since 3.0
*/
@SpringBootTest
@DirtiesContext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should be able to get rid of this before merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly, you mean the Integration Test suite as a whole, or the DirtiesContext annotation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean @DirtiesContext

@maciejwalkowiak
Copy link
Contributor

I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface by a BeanPostProcessor for annotation processing / Endpoint creation and InvocableHandlerMethod for method invocation.

With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture and batch processing listener methods that receive a Collection as an argument.

And after that, add auto-configuration with this new structure.

Sounds like a reasonable thing to do. Go for it 👍

@tomazfernandes
Copy link
Contributor Author

Implementation looks like you're definitely know what you're doing, but I am getting lost in abstractions.

Yeah, well, I definitely mostly know what I'm doing 😄 The bits from Spring Kafka definitely help having a solid foundation to build upon.

I am 100% confident we should continue with this design but we either have to simplify it (for example by merging these two modules) or document it.

  1. Simplification - I understand the reasoning for keeping them separate. The only possible implementation I can think of is AWS IOT but not sure if this will really fit.
  2. Documentation - at least on the Javadoc level, it must come anyways. But I think it would be really really great if you could describe either with words or a diagram how all of these pieces are meant to work together.

Sure, that was something in my mind too, specially given this is a subproject rather than an entire project on its own.

I can add some javadocs and create some diagrams as you suggested to help everyone navigate the code easier. I actually looked into creating the diagrams today, but failed to find a proper tool. I'll give IntelliJ's UML diagrams another shot. Otherwise I can also write in words somewhere how the components are related - it's not that hard after you get the bigger picture.

That should give everyone a better sense on how the components are related and what role they play at assembly and runtime. I think then we'll be able to get a better feel of the overall complexity and if and how we should reduce it - including whether to join the modules or not.

I see that container can have single interceptor - perhaps we should consider adding more? For example, adding a possibility to measure execution time and send it via Micrometer, or adding tracing with Sleuth.

Yes, I'm aware of that. TBH, I just wanted to be able to implement the visibility extension feature, so I just added the one 😄 I'll change that to a list of interceptors.

Please add auto-configuration (we do not need @EnableSqs)

Sure. This will come after the BeanPostProcessor change, since that impacts how the feature is bootstrapped.

and tests. I imagine integration tests cover big chunk of this change and I do not mean that we have to have a unit test for each class, but I do believe some pieces here should be unit tested.

Sure, I'll implement UTs and ITs after this is a bit more mature. Same with javadocs and documentation.

@tomazfernandes
Copy link
Contributor Author

@maciejwalkowiak, I've added simple javadocs to the assembly-time components just to give a jump start on this.

Below is a high-level view of the design, focusing at first at assembly-time components and flows, which I think are harder to understand. Feel free to ask if you (or anyone) have any questions or suggestions.

Afterwards we can dive into the runtime components, and some other aspects of the design.

I hope this makes sense to everyone 😄

Component types

  • Assembly-time components are invoked during application startup to create the runtime components. Those are the components in the config and endpoint packages, such as MessageListenerContainerFactory and Endpoint.

  • Runtime components - basically everything used at runtime, such as method invocation, payload conversion, and the MessageListenerContainer with its many sub-components.

Abstraction levels

In terms of abstraction I think in three levels:

  • Interface level - top level abstractions. As a rule of thumb, should only interact with other interfaces.

  • Abstract level - base classes that implement the interfaces. Should only interact with other abstract-level components and interfaces.

  • Concrete level - lower level abstractions that represent concrete logic for a given messaging system (SQS).

In the messaging-support module we have the first two layers of abstraction, and in the sqs module we have the third.

@tomazfernandes
Copy link
Contributor Author

tomazfernandes commented Jun 3, 2022

Assembly time flow

This is a bit more complicated to illustrate in writing, but hopefully it'll be helpful.

  • SqsEndpointMessageHandler

The SqsEndpointMessageHandler bean detects @SqsListener annotations and extracts the information that will be used to populate the SQSEndpoint instances. It implements the EndpointRegistry interface, which means it'll store the endpoints for later processing.

It also gathers the methods that will be invoked to handle messages at runtime.

This is the part that should be replaced by the BeanPostProcessor approach.

  • EndpointProcessor

Then the EndpointProcessor fetches the EndpointRegistry bean (in our case the message handler above) and feeds the Endpoint instances to the corresponding MessageListenerContainerFactory instance. In this version, the Endpoint has the factoryBeanName field - Spring Kafka handles it differently and more complex.

The factory creates the appropriate MessageListenerContainer instance, which is then registered in the ListenerContainerRegistry.

  • MessageListenerContainerFactory

The MessageListenerContainerFactory bean name comes from the Endpoint instance, which can either have been specified through the @SqsListener annotation or have the default value.

Each factory will have a FactoryOptions instance (user provided or default). The AbstractFactory should only know about the AbstractFactoryOptions and FactoryOptions, so any configuration needed for the AbstractFactory to do its work should be there. Same goes for the AbstractEndpoint.

The factory will combine configuration from the FactoryOptions with configuration from the Endpoint to create a ContainerOptions instance.

Then it creates the MessageListenerContainer instance.

  • MessageListenerContainer

This is the class that will be used at runtime to fetch and process the actual messages. This part is more straightforward I guess. Currently, all queue names provided in the same @SqsListener annotations are handled by the same container, although each will have its own MessageProducer.

Again, the AsbtractMessageListenerContainer should only know about AbstractContainerOptions.

@tomazfernandes
Copy link
Contributor Author

Added a bunch of unit tests. Coverage is now at ~93% combined with integration tests.

@tomazfernandes
Copy link
Contributor Author

Fixed it, you can run again, thanks!

@tomazfernandes
Copy link
Contributor Author

One quick note: I think I may have eventually committed a CrossRegionS3Client.java that seems to be auto-generated by the maven build.

Not sure if it's a problem, haven't looked too deep into it and I can revert it if it is. Maybe we should add it to .gitignore and let the CI create it on the fly?

Thanks

}
else if (object instanceof Collection) {
if (this.parallelLifecycle) {
CompletableFuture.allOf(((Collection<?>) object).stream().map(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could try catch here for QueueAttributesResolvingException and add a failure analyzer for this exception so that the error massage in the console is self explanatory.

@maciejwalkowiak
Copy link
Contributor

We need to merge this tomazfernandes#1 and 👍 from my side to merge.

In subsequent PRs:

  1. add sample project (I'm already on it so don't bother, will send PR once this PR is merged)
  2. failure analyzer for QueueAttributesResolvingException
  3. SqsTemplate

2 & 3 can be done in M3.

@maciejwalkowiak
Copy link
Contributor

Regarding CrossRegionS3Client - it is committed on purpose so that we can see if there are any changes when AWS SDK is bumped.

@maciejwalkowiak
Copy link
Contributor

One more thing I noticed that is missing is package-info.java files for default @NotNull

tomazfernandes and others added 9 commits August 31, 2022 22:18
Resolves awspring#344

Initial draft
Improve TaskExecutor handling

Add Interceptor after processing stage

Move polling logic to source

Add BackPressureHandler

Add MessageProcessingPipeline

Add Batch Processing Support

Add Blocking Interfaces

Add Async / Blocking adapters

Review container interfaces

Add SqsException and SqsListenerException

Add SQS AutoConfiguration

Autoconfigure SqsAsyncClient and SqsMessageListenerContainerFactory beans

Import SqsBootstrapConfiguration

Add method-level javadocs

Add class-level javadocs

Add VisibilityExtensionInterceptor

Add SqsListenerCustomizer

Improve startup performance

Remove messaging support module

Join modules

Add MessageListener

Add javadocs for assembly time components

Add `id` to Container, Endpoint and Message

Add AbstractEndpoint
Add MessageVisibilityExtendingSinkAdapter

Improve Sink ThreadPool Configuration
Add Automatic Payload Conversion by Header

QueueAttributes improvements

Lifecycle improvements
Improve Thread Management
Improve BackPressureMode

Change package for Conversion Context

Improve Message Conversion

Improve Manual Acknowledgement

Extract AbstractMessageConvertingMessageSource

Use TaskExecutor interface instead of Executor

Add builders to Options, Factory and Container

Add QueueNotFoundStrategy

Add SQS auto-configuration to spring.factories

Add SQS starter

Disable formatting for some chained methods

Also some ternary conditions

Apply spotless

Add license header where missing

Begin Acknowledgement unit tests

Minor improvements

Adjust FIFO default acknowledgement orderings

Improve extensibility

Improve MessageListener creation

Introduce Buffering Acknowledgement Processor

Small fixes and improvements

Add Async and Fail on Ack integration tests

Improve BackPressure handling

Add set to ContainerOptions methods

Other minor improvements

Lifecycle Improvements

MessageProcessingPipeline fixes and improvements

Change afterProcessing return type to Void
Fix code highlighting in documentation

Add auto-configuration props to docs

Address suggestions

Add SqsListenerAnnotationBeanPostProcessorTests

Add SQS properties to auto-configuration

Use context ObjectMapper

Improve Visibility and tests
Add package-info.java

Increase Unit Test Coverage

Combine Acknowledgement Interfaces

Support more than 10 maxMessagesPerPoll

Add AcknowledgementOrdering.ORDERED_BY_GROUP

Add BatchAcknowledgement

Add AsyncBatchAcknowledgement

Add tests

Add AcknowledgementResultCallback

Add tests

Replace ExpressionHelper with StringValueResolver

Add maxMessagesPerPoll to SqsListener

Disable container reuse for TC

Use ApplicationContextRunner in tests

Address review suggestions

Clean up tests

Fix InterceptorIntegrationTest

Make ComponentFactory a collection

Add Unit Tests

Apply Spotless
@sonarcloud
Copy link

sonarcloud bot commented Sep 1, 2022

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 19 Code Smells

No Coverage information No Coverage information
0.5% 0.5% Duplication

@tomazfernandes tomazfernandes merged commit f507c9c into awspring:main Sep 1, 2022
@tomazfernandes
Copy link
Contributor Author

We need to merge this tomazfernandes#1 and 👍 from my side to merge.

I thought I had this change, but actually this is for the starter and I had the sqs module. Can you point this to main instead?

In subsequent PRs:

  1. add sample project (I'm already on it so don't bother, will send PR once this PR is merged)
  2. failure analyzer for QueueAttributesResolvingException
  3. SqsTemplate

2 & 3 can be done in M3.

Sure, let's see how this first milestone goes and what issues appear. Let me know if there's anything else needed for M2.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: sqs SQS integration related issue type: dependency-upgrade Dependency version bump type: documentation Documentation or Samples related issue type: feature Integration with a new AWS service or bigger change in existing integration
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Integration with SQS
6 participants