share facebook facebook twitter menu hatena pocket slack

2015.12.07 MON

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

鈴木 宏康

WRITTEN BY 鈴木 宏康

下記のの記事で、Lambda(Python)を使って定期的にSQSのメッセージを受信して
削除する仕組みを作ってみました。

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)

また下記の記事では、この仕組のLambdaのコード(Python)の改善もしました。

Boto3(Python)で”Service Resource”を使ってみた(Lambda)

しかしSQSのメッセージを受信して削除しているだけなので、実質、何もしてません。

今回は下図のように、SQSのメッセージを受信して「DynamoDBにデータをPUT」して
削除するようにしてみました。

Untitled(5) (2)

Lambdaの設定

対象のコードは下記のようになりました。

import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
         logger.info(event)

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )

         messages = queue.receive_messages(
             MaxNumberOfMessages = maxNumberOfMessages
         )

         entries = []
         items = []
         for message in messages:
              entries.append({
                   "Id": message.message_id,
                   "ReceiptHandle": message.receipt_handle
              })
              items.append({
                   "uuid": uuid.uuid1().urn,
                   "key1": json.loads(message.body)['key1'],
                   "key2": json.loads(message.body)['key2'],
                   "key3": json.loads(message.body)['key3']
              })

         table = boto3.resource('dynamodb').Table('Test')
         with table.batch_writer() as batch:
             for item in items:
                  batch.put_item(Item = item)

         response = {}
         if len(entries) != 0:
             response = queue.delete_messages(
                  Entries = entries
             )

         logger.info(response)
         return response

     except Exception as e:
          logger.error(e)
          raise e

IAMの設定

LambdaからDynamoDBを操作できるようにするために、
IAMロール(lambda_basic_execution)にポリシーをアタッチします。

2015-10-10_00-58-29
2015-10-10_01-00-34
2015-10-10_01-01-51

DynamoDBの準備

こんな感じに準備しています。
2015-10-10_02-12-14

テスト

適当にAPI Gatewayに対してCURLにてデータをPOSTします。
(Post後LambdaがSQSにエンキューします)

 curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "be6ea76d033276891dcd884cf81a8602", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "43fd09b4-e772-55e3-98b2-47263fc9b8b2"}, "MessageId": "f0e908a3-f08d-4ea9-9d22-41d09a011e8b"}
$ curl -d '{"key1":"value4","key2":"value5","key3":"value6"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "0593cbab7f22d4166ce2de3c2352e869", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "5ec87932-6f15-576e-9a91-37a0b7c1205a"}, "MessageId": "6c70aac1-525c-4d72-b1e4-3f891577b9d5"}
$ curl -d '{"key1":"value7","key2":"value8","key3":"value9"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "ee7ad5a858124910f959a2823d4ceab0", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "935d4868-f9d4-5e1b-b2fd-06fa21330d55"}, "MessageId": "7ae31368-c969-4958-998f-d11ecc620651"}

数分待つと(5分ごとにLambdaがSQSをポーリングしているので)、
DynamoDBにPostしたデータがPUTされていることがわかります。

2015-10-10_02-24-56

元記事はこちら

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

鈴木 宏康

鈴木 宏康

愛知県生まれ。東京工業大学大学院修士課程修了。在学時より、ベンチャー企業でインターネットに関する業務に携わり、現在はクラウド(主にAmazon Web Services)上での開発・運用を軸とした事業の、業務の中心として活躍。