share facebook facebook2 twitter menu hatena pocket slack

2011.07.04 MON

SQSのメッセージの処理状態をSimpleDBで管理

鈴木 宏康

WRITTEN BY 鈴木 宏康

とても稀にですが、下記のFAQにもある通りSQSのメッセージは、
同じものが複数回配信される可能性があるようです。

Q: How many times will I receive each message?
それぞれのメッセージは何回受信しますか?

Amazon SQS is engineered to provide “at least once” delivery
of all messages in its queues.
SQSはキュー内の全てメッセージを”少なくても一回”配信するように処理しています。

Although most of the time each message will be delivered
to your application exactly once,

それぞれのメッセージは、ほとんどの場合、使用しているアプリケーションに対して
ちょうど一回配信されますが、

you should design your system so that processing a message
more than once does not create any errors or inconsistencies.

あなたはそのアプリケーションを、同じメッセージを一回以上処理しても
エラーが発生したりデータ不整合がおきないように設計すべきです。

ということなので、既に処理したメッセージを受信した場合に処理を行わないように、
メッセージ処理に関するステータスをSimpleDBに保存し、
処理していないメッセージのみ処理するようなものを作りました。

SQSのキューの作成やメッセージの送信は、PHPでSQSの利用下記を参考にして下さい。

まずは共通部分になります。(少し関係ないものも混じっています)

▼ common.php

define("AWS_KEY"              , "AAAAAAAA");
define("AWS_SECRET_KEY" , "SSSSSSSS");
define("CP_SQS_URL_CRAWL" , "https://sqs.ap-northeast-1.amazonaws.com/00000000/crawl");
define("CP_AS_NAME" , "crawl");
define("CP_SDB_DOMAIN_MESSAGE", "message");
define("CP_SDB_DOMAIN_LOG" , "log");
date_default_timezone_set("Asia/Tokyo");

SimpleDBのドメインの作成は、下記になります。

▼ create-domain.php

require_once("/opt/cloudpack/bin/common.php");
require_once("/opt/aws/php/sdk.class.php");
$sdb = new AmazonSDB();
$sdb->set_region(AmazonSDB::REGION_APAC_NE1);
$response = $sdb->createDomain(CP_SDB_DOMAIN_MESSAGE);
var_dump($response);

そして、SQSのメッセージの取得と処理の部分です。
メッセージを処理する前に、SimpleDBを確認するようにしています。

▼ get-message.php

require_once("/opt/cloudpack/bin/common.php");
require_once("/opt/aws/php/sdk.class.php");
$sqs = new AmazonSQS();
$sdb = new AmazonSDB();
$sdb->set_region(AmazonSDB::REGION_APAC_NE1);
$response = $sqs->receive_message(CP_SQS_URL_CRAWL);
print($response->isOK() . "n");

// メッセージがあったら実行
if(isset($response->body->ReceiveMessageResult->Message)) {
$message_id = $response->body->ReceiveMessageResult->Message->MessageId;
$body = $response->body->ReceiveMessageResult->Message->Body;
$receipt_handle = $response->body->ReceiveMessageResult->Message->ReceiptHandle;
$response = $sdb->get_attributes(CP_SDB_DOMAIN_MESSAGE, $message_id);
print($response->isOK() . "n");

// まだメッセージが処理されていなかったら実行
if(!isset($response->body->GetAttributesResult->Attribute)) {
print($body . "n");
sleep(5);
// メッセージを処理したらSimpleDBに登録
$response = $sdb->put_attributes(CP_SDB_DOMAIN_MESSAGE, $message_id, array(
"timestamp" => time(),
"body" => $body
));
print($response->isOK() . "n");
}

$response = $sqs->delete_message(CP_SQS_URL_CRAWL, $receipt_handle);
print($response->isOK() . "n");
}

こちらの記事はなかの人(suz-lab)監修のもと掲載しています。
元記事は、こちら

鈴木 宏康

鈴木 宏康

愛知県生まれ。東京工業大学大学院修士課程修了。在学時より、ベンチャー企業でインターネットに関する業務に携わり、現在はクラウド(主にAmazon Web Services)上での開発・運用を軸とした事業の、業務の中心として活躍。