umma.dev

AWS: Messaging and Queues

Getting Started with Amazon Simple Queue Service

Creating a Lambda Function to Send Messages to SQS

  • Author from scratch
  • Function name: <func-name>
  • Runtime: Python 3.12
  • Execution: choose an existing one or create a new one
  • Click Create function
  • Replace the existing code:
import json
import boto3
import random

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    queue_url = '<YOUR_QUEUE_URL>'

    # Generate multiple messages for different orders
    orders = [
        {"orderId": str(random.randint(10000, 99999)), "product": "Book", "quantity": 1},
        {"orderId": str(random.randint(10000, 99999)), "product": "Laptop", "quantity": 2},
        {"orderId": str(random.randint(10000, 99999)), "product": "Headphones", "quantity": 1},
        {"orderId": str(random.randint(10000, 99999)), "product": "Camera", "quantity": 1},
    ]

    for order in orders:
        # Send each order as a message
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(order)
        )
        print(f"Sent order {order['orderId']} - Response: {response['MessageId']}")

    return {
        'statusCode': 200,
        'body': json.dumps('All orders sent successfully!'),
        'orders: ' : orders
    }
  • Click Deploy
  • Click Test

Creating a Lambda Function to Receive Messages from SQS

  • Author from scratch
  • Function name: <func-name>
  • Runtime: Python 3.12
  • Execution role: choose an existing one or create one
  • Click Create function
  • Replace the code:
import json
import boto3
import random

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    queue_url = '<YOUR_QUEUE_URL>'

    # List of available delivery trucks
    delivery_trucks = ['Truck-1', 'Truck-2', 'Truck-3', 'Truck-4']

    # Randomly choose the number of messages to process (between 1 and 4)
    num_messages_to_process = random.randint(1, 4)

    # Poll messages (up to the randomly chosen number)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=num_messages_to_process,
        WaitTimeSeconds=10
    )

    messages = response.get('Messages', [])

    if not messages:
        return {
            'statusCode': 200,
            'body': 'No messages to process.'
        }

    # Store processed order details for the response
    processed_orders = []

    for message in messages:
        order = json.loads(message['Body'])

        # Randomly assign the order to a delivery truck
        assigned_truck = random.choice(delivery_trucks)

        # Record the processed order details
        processed_order = {
            "orderId": order['orderId'],
            "product": order['product'],
            "quantity": order['quantity'],
            "assignedTruck": assigned_truck
        }
        processed_orders.append(processed_order)

        # Simulate order processing log
        print(f"Order {processed_order['orderId']} - Product: {processed_order['product']} "
              f"(Quantity: {processed_order['quantity']}) has been assigned to {processed_order['assignedTruck']} "
              f"for delivery.")

        # Delete the message from the queue after processing
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    # Return the response as a dictionary to avoid escape characters
    return {
        'statusCode': 200,
        'body': {
            'message': 'Processed random orders and assigned to delivery trucks!',
            'processedOrders': processed_orders
        }
    }
  • Click Deploy
  • Click Test

Creating an Amazon SNS Standard Topic

Create an SNS Standard Topic

  • In SNS, look for Create topic
  • Type in a topic name and click Next step
  • Topic type: standard
  • Name: <name-of-topic>
  • Leave all other settings as default and click Create topic

Create a Lambda Function to Process SNS Messages

  • Author from scratch
  • Name: <func-name>
  • Runtime: Python 3.12
  • Execution role: create a new one or choose an existing one
  • Click Create function
  • Replace the code:
import json

def lambda_handler(event, context):
    # Print the event received from SNS
    print("Received event from SNS:")
    print(json.dumps(event, separators=(',', ':')))

    # Process each record in the event
    for record in event['Records']:
        # Access the SNS message inside the Sns object
        sns_message = record['Sns']['Message']
        print(f"Processing SNS message: {sns_message}")

    return {
        'statusCode': 200,
        'body': json.dumps('Messages processed successfully!')
    }
  • Click Deploy
  • Click Test

Create a Subscription for the Topic

  • In the SNS Topic created earlier, click Create subscription
  • Protocol: AWS Lambda
  • Endpoint: Lambda function created earlier
  • Click Create subscription

Publish a Message to the SNS Topic and Verify

  • In SNS, go the topic created and click on Topics (on the left)
  • Click Publish message (top right)
  • Message details:
    • Subject: <subject-test>
    • Message body: <message-body>
  • Click Publish message
  • In the Lambda function, go to Monitoring tab and then View logs in CloudWatch - check the log for something like Processing SNS message

Configuring Dead Letter Queues (DLQ) in Amazon SQS

Create the Main SQS Queue

  • Click on Create Queue
  • Type: standard
  • Name: <queue-name>
  • Leave all other settings as default and click Create Queue

Create the DLQ (Dead Letter Queue)

  • Click Create queue
  • Name: <dlq-name>
  • Type: standard
  • Leave all other settings as default and click Create Queue

Configure the Main Queue to use the DLQ

  • In the main queue you created earlier, click on Edit
    • Scroll down to the Dead-letter queue section and select Enable
    • Select the dlq created earlier and set the max receives to 3
    • Click Save changes

Simulate Processing Failures

  • Author from scratch
  • Function name: <func-name>
  • Runtime: Python 3.12
  • Execution role: choose an existing one or create one
  • Click Create function
  • Replace the code:
import json

     def lambda_handler(event, context):
         # Simulate failure by always returning an error
         print("Simulating message processing failure")
         raise Exception("Processing failed")
  • Click Deploy
  • Go to the Lambda triggers tab and click Configure Lambda function trigger
  • Choose the function created and click Save
  • Wait for the status to change to Enabled

Send Messages to Main Queue

  • Select the main queue and click Send and receive messages
  • Fill out the form and click Send message
  • Send multiple messages for testing purposes

## Observe Messages in the DLQ

  • In the dlq created, click Send and receive messages and then click Poll for messages
  • Check to confirm that failed messages have been transferred

Understanding Visibility Timeout in Amazon SQS

Create an SQS Queue

  • Standard queue
  • Name: <queue-name>
  • Configuration: set Default Visibility Timeout to 30 seconds
  • Leave all other settings as default and click Create Queue

Create a Lambda Function to Simulate Message Processing

  • Author from scratch
  • Name: <func-name>
  • Runtime: Python 3.12
  • Execution role: choose an existing one or create a new one
  • Click Create function
  • Replace the code:

import time

def lambda_handler(event, context):
    for record in event['Records']:
        print("Processing message:", record['body'])
        time.sleep(45)  # Simulate processing delay (longer than visibility timeout)
        print("Finished processing")
  • Click Deploy
  • Configuration tab > General configuration > Edit > Timeout to 30 seconds
  • In SQS, go the queue created > Lambda triggers > Configure Lambda function trigger
  • Choose lambda function previously created and click Save
  • Wait for status to change to Enabled

Send a Message to the Queue

  • In the queue created, click Send and receive messages
  • Fill out the form and click Send Message

Observe Message Behaviour with Visibility Timeout

  • In AWS Lambda, go to the Monitoring tab and click CloudWatch Logs Console
  • Looking for Processing message

Modify the Lambda Function to Simulate Successful Processing

  • In the Lambda function, modify the code:
import json

def lambda_handler(event, context):
    for record in event['Records']:
        print("Successfully processed message:", record['body'])
    return {
        'statusCode': 200,
        'body': 'Success'
    }
  • Click Deploy

Observe the Message Result

  • Check the logs to see if you can see Successfully processed message

Amazon SNS Email Subscription

Create an SNS Topic

  • Type: standard
  • Name: <topic-name>
  • Click Create Topic

Create an Email Subscription

  • In the Subscription tab click Create subscription
  • Topic ARN: (automatic)
  • Protocol: email
  • Endpoint: provide an email address
  • Click Create subscription

Confirm the Email Subscription

  • Click Confirm subscription in the email sent via AWS
  • Check the status to go from Pending confirmation to Confirmed

Publish a Test Message to the Topic

  • Click Publish message in SNS
  • Subject: <subject-title>
  • Message: <message-body>
  • Click Publish message
  • Check inbox

Creating a FIFO Topic in Amazon SNS

Create a FIFO Topic

  • In SNS, click Create topic and give the topic a name (make sure it ends in .fifo)
  • Topic type: FIFO
  • Content-based deduplication: ticked
  • Leave all other settings as default
  • Click Create topic

Publish a Message to the FIFO Topic

  • Once the topic has been created, you’ll be redirected to the details page
  • Click on Publish message
  • Message details section:
    • Subject: <subject-description>
    • Message Group ID: a unique identifier required for all FIFO topics (all messages with same group ID will be processed in the exact order they were received)
    • Message deduplication ID: if enabled, leave blank otherwise provide an ID
    • Message body: <message>
    • Leave all other settings as default
    • Click Publish message

Integrating SNS FIFO Topic with SQS FIFO Queue

Create a FIFO Topic

  • In Amazon SNS, click on Topics (on the left)
  • Click Create topic
  • Topic type: FIFO
  • Name: <Topic-Name.fifo>
  • Content-based deduplication: enabled
  • Leave all other settings as default and click Create topic

Create a SQS FIFO Queue

  • In Amazon SQS, click Create queue
  • Queue type: FIFO
  • Name: <Queue-Name.fifo>
  • Content-based deduplication: enabled
  • Leave all other settings as default and click Create queue

Subscribe the SQS FIFO Queue to the SNS FIFO Topic

  • Navigate to the topic created earlier and look for Subscriptions
  • Click on Create subscription
  • Choose Subscription Protocol
  • Protocol: Amazon SQS
  • Endpoint: <Queue-Name.fifo>
  • Enable raw message delivery: enabled
  • Click Create subscription

Configure SQS Queue for SNS

  • Navigate to the queue previously created
  • Look for Queue policies
  • Click on edit and paste the following policy to grant permission to the SNS FIFO topic:
{
  "Version": "2012-10-17",
  "Id": "QueuePolicy",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:<region>:<account-id>:MyFifoQueue.fifo",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:<region>:<account-id>:MyFifoTopic.fifo"
        }
      }
    }
  ]
}
  • Click Save

Publish Multiple Messages to Test FIFO Ordering

  • Go to the topic created and click on Publish message
  • Publish three messages, ensuring each has the same message group ID

Verifying FIFO Behaviour in the SQS Queue

  • Poll the SQS queue messages by navigating to the SQS console and select the queue created earlier
  • Click Send and receive messages
  • Click Poll for messages
  • Click on each ID