2011年6月28日火曜日

SQSのメッセージの処理状態をSimpleDBで管理

スズキです。

下記のFAQにある通り非常に稀なのですが、
SQSのメッセージは同じものが複数回配信される可能性があるようです。
Q: How many times will I receive each message?
それぞれのメッセージは何回受信しますか?

Amazon SQS is engineered to provide “at least once” delivery
of all messages in its queues.
SQSはキュー内の全てメッセージを"少なくても一回"配信するように処理しています。

Although most of the time each message will be delivered
to your application exactly once,

それぞれのメッセージは、ほとんどの場合、使用しているアプリケーションに対して
ちょうど一回配信されますが、

you should design your system so that processing a message
more than once does not create any errors or inconsistencies.

あなたはそのアプリケーションを、同じメッセージを一回以上処理しても
エラーが発生したりデータ不整合がおきないように設計すべきです。
ということで、既に処理したメッセージを受信した場合に処理を行わないように、
メッセージ処理に関するステータスをSimpleDBに保存して、
処理していないメッセージのみ処理するようなものを作ってみました。

SQSのキューの作成やメッセージの送信は下記を参考にして下さい。
PHPでSQS
まずは共通部分です。(いろいろ関係ないのも混じってますが...)

▼ common.php
define("AWS_KEY"              , "AAAAAAAA");
define("AWS_SECRET_KEY"       , "SSSSSSSS");
define("CP_SQS_URL_CRAWL"     , "https://sqs.ap-northeast-1.amazonaws.com/00000000/crawl");
define("CP_AS_NAME"           , "crawl");
define("CP_SDB_DOMAIN_MESSAGE", "message");
define("CP_SDB_DOMAIN_LOG"    , "log");
date_default_timezone_set("Asia/Tokyo");

SimpleDBのドメインの作成は、こんな感じです。

▼ create-domain.php
require_once("/opt/cloudpack/bin/common.php");
require_once("/opt/aws/php/sdk.class.php");
$sdb = new AmazonSDB();
$sdb->set_region(AmazonSDB::REGION_APAC_NE1);
$response = $sdb->createDomain(CP_SDB_DOMAIN_MESSAGE); 
var_dump($response);

そして、SQSのメッセージの取得と処理の部分です。
メッセージを処理する前にSimpleDBを確認するようにしています。

▼ get-message.php
require_once("/opt/cloudpack/bin/common.php");
require_once("/opt/aws/php/sdk.class.php");
$sqs = new AmazonSQS();
$sdb = new AmazonSDB();
$sdb->set_region(AmazonSDB::REGION_APAC_NE1);
$response = $sqs->receive_message(CP_SQS_URL_CRAWL);
print($response->isOK() . "\n");

// メッセージがあったら実行
if(isset($response->body->ReceiveMessageResult->Message)) {
    $message_id     = $response->body->ReceiveMessageResult->Message->MessageId;
    $body           = $response->body->ReceiveMessageResult->Message->Body;
    $receipt_handle = $response->body->ReceiveMessageResult->Message->ReceiptHandle;
    $response = $sdb->get_attributes(CP_SDB_DOMAIN_MESSAGE, $message_id);
    print($response->isOK() . "\n");

    // まだメッセージが処理されていなかったら実行
    if(!isset($response->body->GetAttributesResult->Attribute)) {
        print($body . "\n");
        sleep(5);
        // メッセージを処理したらSimpleDBに登録
        $response = $sdb->put_attributes(CP_SDB_DOMAIN_MESSAGE, $message_id, array(
            "timestamp" => time(),
            "body"      => $body
        ), true);
        print($response->isOK() . "\n");
    }

    $response = $sqs->delete_message(CP_SQS_URL_CRAWL, $receipt_handle);
    print($response->isOK() . "\n");
}

下記にもマージしておこう。
"SQS"と"CloudWatch"と"Auto Scaling"
--------
http://www.suz-lab.com

0 コメント: