2015年11月6日金曜日

Lambda(Python)からKinesisにPut(API)してLambda(Python)でGet(Event)する


こんな感じです。


API(Boto3)でPutするのは、まあ当たり前ですが、Getに関しては、
Lambdaの"event source"にKinesisというものがあり、それを設定することにより
イベントドリブンでKinesisのデータをLambdaで処理することができます。

ということで、作ってみます。

Kinesisの作成


AWSマネジメントコンソールより下記のように作成します。


IAMの設定


Lambdaに設定するIAMロール(lambda_basic_exection)にKinesisが扱える
マネージドポリシー(AmazonKinesisFullAccess)をアタッチします。

 

KinesisへPutするLambdaファンクション


コードは、こんな感じです。
import logging
import boto3

def lambda_handler(event, context):

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        logger.info(event)
        response = boto3.client('kinesis').put_record(
            StreamName = "test",
            Data = "test",
            PartitionKey = "test",
        )
        logger.info(response)
        return response

    except Exception as e:
        logger.error(e)
        raise e

実行すると、レスポンスは次のように返ってきます。
{
  "ShardId": "shardId-000000000000",
  "ResponseMetadata": {
    "HTTPStatusCode": 200,
    "RequestId": "eef4ccde-840d-11e5-9d26-6b863e58e437"
  },
  "SequenceNumber": "49556079134761214959115455796946952254907110183804076034"
}

Kinesisのモニタリングで確認しても、
ちゃんとPutレコードがカウントされていることがわかります。


KinesisからGetするLambdaファンクション


コードは、こんな感じです。
import logging
import base64

def lambda_handler(event, context):

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:

        logger.info(event)

        payloads = []
        for record in event['Records']:
            payload = base64.b64decode(record["kinesis"]["data"])
            payloads.append(payload)
        logger.info(payloads)

        return payloads

    except Exception as e:
        logger.error(e)
        raise e

そして、Event Sourceとして、Kinesisを追加します。


CloudWatch Logsを確認すると、ちゃんとKinesisのEvent Sourceを設定した、
Lambdaファンクションが実行されログが出力されていることがわかります。


CloudWatchのMetricsでもGetリクエストで確認することができます。


0 コメント: