2015年10月11日日曜日

"Scheduled Event"の"Lambda"の処理を並列化する


下記の記事でLambdaのScheduled Eventを使ってSQSを定期的にポーリングして
メッセージを処理する仕組みを作りました。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)
しかし、SQSのメッセージを一度に取得できるのは10件までで
(MaxNumberOfMessagesの最大が10)、そうなると下図のように100件の
メッセージを処理するのに1時間近くかかってしまいます。


ということで、5分毎のメッセージ処理を並列化してみました。仕組みとしては、
下図のように、5分毎に起動するLambdaファンクション(test-cron)は、
もうメッセージの処理は行わず、キュー上のメッセージの数に応じて
複数のメッセージを処理するLambdaファンクション(tset-sqs2ddb)をSNS経由で
呼び出す形としています。


全体の構成は、こんな感じになります。


上図までの経緯は、下記記事が参考になります。

それでは次の三つのLambdaファンクションを準備していきます。
  • test-api : API Gatewayのバックエンド。パラメーターをSQSにエンキュー。
  • test-cron : 5分毎に起動。メッセージ数に応じてSNSにPublish。
  • test-sqs2bdb : SNSから起動。SQSのメッセージの内容をDynamoDBにPUT。

Lambdaの設定(test-api)


コードは、こんな感じです。
import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'

def lambda_handler(event, context):

    try:
        logger.info(event)
        responses = []
        
        responses.append(
            boto3.resource('sqs').get_queue_by_name(
                QueueName = queueName
            ).send_message(
                MessageBody = json.dumps(event)
            )
        )

        logger.info(responses)
        return responses

    except Exception as e:
        logger.error(e)
        raise e
API Gatewayとの連携は下記を参考に。
"API Gateway"のバックエンドを"Lambda"にしてJSONデータをエコーさせる

Lambdaの設定(test-cron)


まずは、SNSのトピックを作成します。

 

Lambdaからトピックが利用できるようにするために
IAMロール(lambda_basic_execution)にポリシーをアタッチします。


準備はできました。そしてコードは、こんな感じです。
import math
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
arn = 'arn:aws:sns:ap-northeast-1:000000000000:test'

def lambda_handler(event, context):

    try:
        logger.info(event)
        responses = []

        numberOfMessages = boto3.resource('sqs').get_queue_by_name(
            QueueName = queueName
        ).attributes['ApproximateNumberOfMessages']
        numberOfPublish = int(math.ceil(float(numberOfMessages) / 10))

        for i in range(numberOfPublish):
            responses.append(
                boto3.resource('sns').Topic(arn).publish(
                    Message = 'test'
                )
            )

        logger.info(responses)
        return responses

    except Exception as e:
        logger.error(e)
        raise e
Publish(test-sqs2ddbを実行)する回数(numberOfPublish)の決定は次の通りです。
(メッセージ数 / 10)の切り上げ
これはtest-sqs2ddbが一度に取得できるメッセージ数の上限が10だからです。
(SQSのAPIであるReceiveMessageの属性MaxNumberOfMessagesの上限が10)

Scheduled Eventの設定は下記を参考に。
SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)


Lambdaの設定(test-sqs2bdb)

コードは、こんな感じです。
import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
        logger.info(event)
        responses = []

        queue = boto3.resource('sqs').get_queue_by_name(
            QueueName = queueName
        )

        messages = queue.receive_messages(
            MaxNumberOfMessages = maxNumberOfMessages
        )

        entries = []
        items = []
        for message in messages:
            entries.append({
                "Id": message.message_id,
                "ReceiptHandle": message.receipt_handle
            })
            items.append({
                "uuid": uuid.uuid1().urn,
                "key1": json.loads(message.body)['key1'],
                "key2": json.loads(message.body)['key2'],
                "key3": json.loads(message.body)['key3']
            })

        table = boto3.resource('dynamodb').Table('Test')
        with table.batch_writer() as batch:
            for item in items:
                batch.put_item(
                    Item = item
                )

        if len(entries) != 0:
            responses.append(
                queue.delete_messages(
                    Entries = entries
                )
            )

        logger.info(responses)
        return responses

    except Exception as e:
        logger.error(e)
        raise e

SNS Eventの設定も行います。


テスト


あらかじめ次のコマンドSQSにメッセージを作成しておきます。
$ curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod

手動でトピックをPublishしてみます。


するとLambda(test-sqs2bdb)が起動されDynamoDBのテーブルにデータが入ります。


次に同様に100件SQSにメッセージを作成しておきます。


手動でPublishしなくても5分毎にLambda(test-cron)が起動してPublishしているので、
やはり、DynamoDBのテーブルにデータが入ります。


下図のように、より短時間で100件のメッセージを処理していることもわかります。


今まで処理量は下図のように5分で10件でしたが、今回は5分で
全体のほとんど(80件程度)を処理してます。(まあそういう仕組みにしたんですが...)

0 コメント: