share facebook facebook2 twitter menu hatena pocket slack

2017.04.26 WED

Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]

那須隆

WRITTEN BY 那須隆

ナスです。

elasticsearch service (ES) 2.3 の古くなったインデックスを削除することにしたんですが、完成までわりと苦労したので書きます。

まずは ES への接続

通常、ES へは curl で操作するんですが、この curl に AWS の認証情報をつけることができません。(いや、できるかもしれんけどわからない

今回は、Python で ES に接続します。elasticsearch モジュールがあったので、これも使います。
Python Elasticsearch Client — Elasticsearch 5.2.0 documentation

AWS 認証情報を作るところは、この aws4auth を使います。


pypi.python.org

ES 側は、アクセスポリシーで特定の IAM ロールからのみアクセス許可するようにしています。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/rolename"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:123456789012:domain/domainname/*"
    }
  ]
}

あとは、Lambda から ES へ接続するだけです。今回は下記のサンプルからほぼパクリました。本当にありがとうございます!

IAM ロールの切り替え(API) - AWS Identity and Access Management
AWS API を使用して、AWS アカウントのリソースへの一時的なアクセス権を付与する IAM ロールに切り替えます。

docs.aws.amazon.com

AWS LambdaからElasticsearch Serviceに接続するときに、IAM Roleを使う
Amazon ElasticSearch Serviceに接続するとき、アクセスポリシーを使い許可してあげる必要がある。いくつかテンプレートが用意されているのだが、その中の選択肢としてはIAM Userによる認証特定IPからの接続許可フルオープンとなっている。AWS Lambdaから接続するっ...

blog.youyo.info

接続できたらやりたいことを実装

テスト的に 1 週間経ったインデックスは削除するように作りました。作ったのがこれ↓
インデックス名に年月日が含まれていたので、この文字列から何日経過したかを判定しました。普段コードはあまり書かないので、変数名がもう一つかもしれません…

# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import requests
import json, os
import boto3
import datetime

region = 'ap-northeast-1'
es_endpoint = 'search-domainname-hogehoge.ap-northeast-1.es.amazonaws.com'
role_name = 'rolename'

def lambda_handler(event, context):
    es = connect_to_es()

    for key in (es.indices.get_alias()).keys():
        datename=key.split("-")
        pastdate=datetime.datetime.strptime(datename[len(datename)-1], '%Y.%m.%d')
        today=datetime.datetime.now()
        delta=(today-pastdate).days
        print(str(delta) + ' days has passed since index ' + key + ' was created.')
        if delta > 7 and es.indices.exists(index=key):
            if es.indices.delete(index=key):
                print('[INFO] Index ' + key + ' was deleted successfully.')
            else:
                print("[ERROR] Index " + key + " couldn't be deleted.")

def connect_to_es():
    credentials = get_credential()
    awsauth = AWS4Auth(
        credentials['access_key'],
        credentials['secret_key'],
        region,
        'es',
        session_token=credentials['token']
    )
    es = Elasticsearch(
        hosts=[{'host': es_endpoint, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    return es

def get_credential():
    sts_client = boto3.client('sts')
    assumedRoleObject = sts_client.assume_role(
        RoleArn="arn:aws:iam::123456789012:role/rolename",
        RoleSessionName="Access_to_ES_from_lambda"
    )
    credentials = assumedRoleObject['Credentials']
    return { 'access_key': credentials['AccessKeyId'],
             'secret_key': credentials['SecretAccessKey'],
             'token': credentials['SessionToken'] }

今まで curl で取ってきた JSON からデータ取り出していろいろして、ってやってたのが、elasticsearch モジュールに変えただけでむちゃくちゃシンプルなコードになりました!

本当は、Curator ってのを使って楽をしたかったのですが、Amazon ES のサポートが微妙だったので、今回はやめました。そのうち普通に使えるようになると信じてます。

elastic/curator
curator - Curator: Tending your Elasticsearch indices

github.com

ここまでたどり着くのに 1 週間弱もかかってしまったけど、これでもう困らないだろう。

元記事はこちら

Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]

那須隆

那須隆

ネットワークエンジニア、SAPコンサルタントを経て、cloudpackにJOIN。Webサイトや基幹システムのインフラ構築および運用を主に行い、シェルやPythonなどでスクリプトを組んで、インフラ運用の効率化を目指している。