2015年10月10日土曜日

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)


下記でSQSにエンキューする部分を作ったので、
"API Gateway → Lambda → SQS"でJSONデータをエンキューする
今度はデキューする部分を作ってみます。

今まではEC2を起動して、その中で定期的にSQSをポーリングするプログラムを
実行していましたが、今ならLambdaのScheduled Eventを使うことで、
EC2無しに定期的にSQSをポーリングすることができます。

絵にすると、下記のような感じです。
(上段は前述のリンクの記事で作成したので今回は下段のLambdaを作成します)


Lambdaの設定


コードは以下のとおりです。
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueUrl = 'https://sqs.ap-northeast-1.amazonaws.com/000000000000/test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
        logger.info(event)
        client = boto3.client('sqs')

        receiveResponse = client.receive_message(
            QueueUrl = queueUrl,
            MaxNumberOfMessages = maxNumberOfMessages
        )
        logger.info(receiveResponse)
        response = receiveResponse

        if receiveResponse.has_key('Messages'):
            entries = []
            for message in receiveResponse['Messages']:
                entries.append({
                    "Id": message['MessageId'],
                    "ReceiptHandle": message['ReceiptHandle']
                })
            deleteResponse = client.delete_message_batch(
                QueueUrl = queueUrl,
                Entries = entries
            )
            logger.info(deleteResponse)
            response = deleteResponse

        return response

    except Exception as e:
        logger.error(e)
        raise e
"receive_message"で最大10件メッセージを受け取って"delete_message_batch"で
まとめて削除しています。

タイムアウトの時間も1分に変更しています。


Lambdaのテスト


まず、キューにメッセージが存在することを確認して、


Lambdaをテストします。


下記のように二つのメッセージの削除が成功したというレスポンスが返ってきて、
{
  "Successful": [
    {
      "Id": "c820ef92-7d8d-4aa4-94a0-0843e683d158"
    },
    {
      "Id": "52fa1240-bd8b-43bf-ac43-739118e613e1"
    }
  ],
  "ResponseMetadata": {
    "HTTPStatusCode": 200,
    "RequestId": "257f3292-d917-517b-8995-7c00dc7bbaa0"
  }
}

実際にキューを確認すると、メッセージ数も0になっています。


Scheduled Eventの設定


上記の処理を5分ごとに実行するためにLambdaのScheduled Eventの設定を行います。


これで5分毎にSQSをポーリングし、メッセージがあれば、
最大10件まで削除するようになったはずです。

CloudWatchでの確認


最後にキューの中のメッセージの推移をCloudWatchで確認します。


60程度のメッセージが5分毎に10程度ずつ減少していることが確認できます。



0 コメント: