Message queues
The Node.js Agent will automatically create spans for activity to/from your Amazon SQS message queues. To record these spans, your message queue activity must occur during a transaction. If you’re performing queue operations during an HTTP request from a supported framework, the agent will start a transaction automatically. However, if you’re performing queue operations in a stand-alone program (such as a message processor), you’ll need to use the Node.js Agent’s startTransaction() method to manually create transactions for your messages.
You can see an example of this in the following code sample.
    const apm = require("elastic-apm-node").start({/*...*/})
    const AWS = require("aws-sdk");

    // Set the region
    AWS.config.update({region: "us-west"});

    // Create an SQS service object
    const sqs = new AWS.SQS({apiVersion: "2012-11-05"});

    /* ... */

    const transaction = apm.startTransaction("Process Messages", "cli")
    sqs.receiveMessage(params, function(err, data) {
      if(err) {
        console.log("Receive Error", err);
      } else {
        // data.Messages is undefined when no messages were returned
        const messages = data.Messages || []
        console.log(`Length: ${messages.length}`)
        /* process messages */
      }
      // end the transaction
      transaction.end()
    })
Prior to calling the sqs.receiveMessage method, start a new transaction.
Only end the transaction after the queue’s processing callback finishes executing. This will ensure a transaction is active while processing your queue messages.
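If you repeat this start/end pattern in several places, a small wrapper can make it harder to end the transaction too early. The following is a minimal sketch, not part of the agent: `withTransaction` and its `done` callback are hypothetical names, and the only agent calls it relies on are `apm.startTransaction()`, `apm.captureError()`, and `transaction.end()`.

```javascript
// Hypothetical helper (not part of elastic-apm-node): starts a transaction,
// hands a `done` callback to `work`, and ends the transaction only when
// `done` is called.
function withTransaction(apm, name, type, work) {
  const transaction = apm.startTransaction(name, type);
  let finished = false;

  function done(err) {
    if (finished) return; // ignore repeated calls
    finished = true;
    if (err) apm.captureError(err); // report the failure to APM
    if (transaction) transaction.end();
  }

  try {
    work(done);
  } catch (err) {
    // a synchronous throw still ends the transaction
    done(err);
    throw err;
  }
}

// Usage with the SQS client from the example above:
//
// withTransaction(apm, "Process Messages", "cli", (done) => {
//   sqs.receiveMessage(params, (err, data) => {
//     /* process messages */
//     done(err);
//   });
// });
```

Passing `apm` in as an argument keeps the helper independent of how the agent was started.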
Distributed tracing and messaging queues
To enable queue scheduling and queue processing with distributed tracing, use the Node.js Agent’s API to store a traceparent header with your queue message; then, provide that traceparent header when starting a new transaction.
Here’s a new example that uses the Node.js Agent API to store the traceparent as a message attribute and then uses that attribute to link your new transaction with the original.
Storing the Traceparent
When sending the message, you’ll want to add the traceparent as one of the MessageAttributes.
    // stores the traceparent when sending the queue message
    const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ""

    // Use the Amazon SQS `MessageAttributes` to pass
    // on the traceparent header
    const params = {
      /* ... other params ... */
      MessageAttributes: {
        /* ... other attributes ... */
        "MyTraceparent": {
          DataType: "String",
          StringValue: traceParent
        }
      }
    }
    sqs.sendMessage(params, function(err, data) {
      /* ... */
    });
This will save the traceparent value so we can use it later on when receiving the messages.
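One case worth handling is a send that happens outside any transaction, where the traceparent is an empty string. The sketch below assumes SQS does not accept a String attribute with an empty value (its documentation says attribute values must not be empty), so it leaves the attribute out instead. `withTraceparent` is a hypothetical helper, and `MyTraceparent` is simply the attribute name used in the example above.

```javascript
// Attribute name from the example above; any name works as long as the
// sender and the receiver agree on it.
const TRACEPARENT_ATTRIBUTE = "MyTraceparent";

// Hypothetical helper: returns a copy of `params` with the traceparent
// attached as a message attribute. When there is no traceparent, the
// params are returned unchanged and no attribute is added.
function withTraceparent(params, traceparent) {
  if (!traceparent) return params;
  return {
    ...params,
    MessageAttributes: {
      ...params.MessageAttributes, // keep any existing attributes
      [TRACEPARENT_ATTRIBUTE]: {
        DataType: "String",
        StringValue: traceparent,
      },
    },
  };
}

// Usage:
//
// const traceparent = apm.currentTransaction
//   ? apm.currentTransaction.traceparent
//   : "";
// sqs.sendMessage(withTraceparent(params, traceparent), callback);
```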
Applying the Traceparent
When we receive our queue messages, we’ll check the message for our Traceparent header, and use it to start a new transaction. By starting a transaction with this traceparent header we’ll be linking the sending and receiving via distributed tracing.
    // uses the traceparent to start a transaction
    sqs.receiveMessage(params, function(err, data) {
      // data is null when the call fails
      if(err || !data.Messages) {
        return
      }
      // loop over your returned messages
      for(const message of data.Messages) {
        // start a transaction to process each message, using our previously
        // saved distributed tracing traceparent header
        let traceparent
        // MessageAttributes is absent when the message carries no attributes
        if(message.MessageAttributes && message.MessageAttributes.MyTraceparent) {
          traceparent = message.MessageAttributes.MyTraceparent.StringValue
        }
        const transactionMessage = apm.startTransaction("RECEIVE_TRANSACTION", "cli", {
          childOf: traceparent
        })
        /* ... process message ... */
        transactionMessage.end()
      }
    })
Even though we only scheduled one queue message, Amazon’s SQS API returns an array of messages. Therefore we’ll need to loop over each one.
We extract the traceparent header we’d previously saved, and use it to start a transaction.
Once we’re done processing a single message, we end the transaction and move on to the next.
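Note that SQS only returns message attributes that the receive call asks for through `MessageAttributeNames`, so the traceparent attribute has to be requested explicitly. The sketch below shows one way to do that and to read the value back defensively. Both helper names are hypothetical, and the format check is an optional precaution based on the W3C Trace Context layout (version 00), not something the agent requires.

```javascript
// Attribute name from the examples above.
const TRACEPARENT_ATTRIBUTE = "MyTraceparent";

// W3C Trace Context layout: version-traceid-parentid-flags, lowercase hex.
const TRACEPARENT_PATTERN =
  /^[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/;

// Hypothetical helper: returns a copy of the receive params that also
// requests the traceparent attribute.
function receiveParamsWithTraceparent(params) {
  const names = new Set(params.MessageAttributeNames || []);
  names.add(TRACEPARENT_ATTRIBUTE);
  return { ...params, MessageAttributeNames: [...names] };
}

// Hypothetical helper: returns the stored traceparent, or undefined when
// the attribute is missing or does not look like a traceparent header.
function extractTraceparent(message) {
  const attribute =
    message.MessageAttributes &&
    message.MessageAttributes[TRACEPARENT_ATTRIBUTE];
  const value = attribute && attribute.StringValue;
  return typeof value === "string" && TRACEPARENT_PATTERN.test(value)
    ? value
    : undefined;
}

// Usage inside the receive loop:
//
// sqs.receiveMessage(receiveParamsWithTraceparent(params), (err, data) => {
//   /* ... */
//   const transactionMessage = apm.startTransaction(
//     "RECEIVE_TRANSACTION", "cli", { childOf: extractTraceparent(message) }
//   );
// });
```

When `extractTraceparent` returns undefined, `childOf` is left unset and the transaction simply starts a new trace.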