2015年10月10日土曜日

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする


下記のの記事で、Lambda(Python)を使って定期的にSQSのメッセージを受信して
削除する仕組みを作ってみました。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
また下記の記事では、この仕組のLambdaのコード(Python)の改善もしました。
Boto3(Python)で"Service Resource"を使ってみた(Lambda)
しかしSQSのメッセージを受信して削除しているだけなので、実質、何もしてません。

今回は下図のように、SQSのメッセージを受信して「DynamoDBにデータをPUT」して
削除するようにしてみました。


Lambdaの設定


対象のコードは下記のようになりました。
import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
        logger.info(event)

        queue = boto3.resource('sqs').get_queue_by_name(
            QueueName = queueName
        )

        messages = queue.receive_messages(
            MaxNumberOfMessages = maxNumberOfMessages
        )

        entries = []
        items = []
        for message in messages:
            entries.append({
                "Id": message.message_id,
                "ReceiptHandle": message.receipt_handle
            })
            items.append({
                "uuid": uuid.uuid1().urn,
                "key1": json.loads(message.body)['key1'],
                "key2": json.loads(message.body)['key2'],
                "key3": json.loads(message.body)['key3']
            })

        table = boto3.resource('dynamodb').Table('Test')
        with table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item = item)

        response = {}
        if len(entries) != 0:
            response = queue.delete_messages(
                Entries = entries
            )

        logger.info(response)
        return response

    except Exception as e:
        logger.error(e)
        raise e

IAMの設定


LambdaからDynamoDBを操作できるようにするために、
IAMロール(lambda_basic_execution)にポリシーをアタッチします。


DynamoDBの準備


こんな感じに準備しています。


テスト


適当にAPI Gatewayに対してCURLにてデータをPOSTします。
(Post後LambdaがSQSにエンキューします)
$ curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "be6ea76d033276891dcd884cf81a8602", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "43fd09b4-e772-55e3-98b2-47263fc9b8b2"}, "MessageId": "f0e908a3-f08d-4ea9-9d22-41d09a011e8b"}
$ curl -d '{"key1":"value4","key2":"value5","key3":"value6"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "0593cbab7f22d4166ce2de3c2352e869", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "5ec87932-6f15-576e-9a91-37a0b7c1205a"}, "MessageId": "6c70aac1-525c-4d72-b1e4-3f891577b9d5"}
$ curl -d '{"key1":"value7","key2":"value8","key3":"value9"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "ee7ad5a858124910f959a2823d4ceab0", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "935d4868-f9d4-5e1b-b2fd-06fa21330d55"}, "MessageId": "7ae31368-c969-4958-998f-d11ecc620651"}

数分待つと(5分ごとにLambdaがSQSをポーリングしているので)、
DynamoDBにPostしたデータがPUTされていることがわかります。



0 コメント: