share facebook facebook twitter menu hatena pocket slack

2015.12.07 MON

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

鈴木 宏康

WRITTEN BY 鈴木 宏康

下記のの記事で、Lambda(Python)を使って定期的にSQSのメッセージを受信して
削除する仕組みを作ってみました。

SQSのメッセージをLambdaで5分おきに処理する(Scheduled Event)

また下記の記事では、この仕組のLambdaのコード(Python)の改善もしました。

Boto3(Python)で”Service Resource”を使ってみた(Lambda)

しかしSQSのメッセージを受信して削除しているだけなので、実質、何もしてません。

今回は下図のように、SQSのメッセージを受信して「DynamoDBにデータをPUT」して
削除するようにしてみました。

Untitled(5) (2)

Lambdaの設定

対象のコードは下記のようになりました。

import json
import uuid
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'test'
maxNumberOfMessages = 10

def lambda_handler(event, context):

    try:
         logger.info(event)

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )

         messages = queue.receive_messages(
             MaxNumberOfMessages = maxNumberOfMessages
         )

         entries = []
         items = []
         for message in messages:
              entries.append({
                   "Id": message.message_id,
                   "ReceiptHandle": message.receipt_handle
              })
              items.append({
                   "uuid": uuid.uuid1().urn,
                   "key1": json.loads(message.body)['key1'],
                   "key2": json.loads(message.body)['key2'],
                   "key3": json.loads(message.body)['key3']
              })

         table = boto3.resource('dynamodb').Table('Test')
         with table.batch_writer() as batch:
             for item in items:
                  batch.put_item(Item = item)

         response = {}
         if len(entries) != 0:
             response = queue.delete_messages(
                  Entries = entries
             )

         logger.info(response)
         return response

     except Exception as e:
          logger.error(e)
          raise e

IAMの設定

LambdaからDynamoDBを操作できるようにするために、
IAMロール(lambda_basic_execution)にポリシーをアタッチします。

2015-10-10_00-58-29
2015-10-10_01-00-34
2015-10-10_01-01-51

DynamoDBの準備

こんな感じに準備しています。
2015-10-10_02-12-14

テスト

適当にAPI Gatewayに対してCURLにてデータをPOSTします。
(Post後LambdaがSQSにエンキューします)

 curl -d '{"key1":"value1","key2":"value2","key3":"value3"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "be6ea76d033276891dcd884cf81a8602", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "43fd09b4-e772-55e3-98b2-47263fc9b8b2"}, "MessageId": "f0e908a3-f08d-4ea9-9d22-41d09a011e8b"}
$ curl -d '{"key1":"value4","key2":"value5","key3":"value6"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "0593cbab7f22d4166ce2de3c2352e869", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "5ec87932-6f15-576e-9a91-37a0b7c1205a"}, "MessageId": "6c70aac1-525c-4d72-b1e4-3f891577b9d5"}
$ curl -d '{"key1":"value7","key2":"value8","key3":"value9"}' https://8vdagewkwe.execute-api.ap-northeast-1.amazonaws.com/prod
{"MD5OfMessageBody": "ee7ad5a858124910f959a2823d4ceab0", "ResponseMetadata": {"HTTPStatusCode": 200, "RequestId": "935d4868-f9d4-5e1b-b2fd-06fa21330d55"}, "MessageId": "7ae31368-c969-4958-998f-d11ecc620651"}

数分待つと(5分ごとにLambdaがSQSをポーリングしているので)、
DynamoDBにPostしたデータがPUTされていることがわかります。

2015-10-10_02-24-56

元記事はこちら

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

鈴木 宏康

鈴木 宏康

愛知県生まれ。東京工業大学大学院修士課程修了。在学時より、ベンチャー企業でインターネットに関する業務に携わり、現在はクラウド(主にAmazon Web Services)上での開発・運用を軸とした事業の、業務の中心として活躍。

cloudpack

cloudpackは、Amazon EC2やAmazon S3をはじめとするAWSの各種プロダクトを利用する際の、導入・設計から運用保守を含んだフルマネージドのサービスを提供し、バックアップや24時間365日の監視/障害対応、技術的な問い合わせに対するサポートなどを行っております。
AWS上のインフラ構築およびAWSを活用したシステム開発など、案件のご相談はcloudpack.jpよりご連絡ください。