share facebook facebook2 twitter menu hatena pocket slack

2016.06.13 MON

Boto3のヘルパークラスを作った【cloudpack大阪BLOG】

廣山 豊

WRITTEN BY廣山 豊

これまでBoto3を使う時にコピペで流用していたコード部分をライブラリ化しました。
ついでに、PyPIに登録して世にリリースしてみました。
以下で利用可能になります。

# pip install boto3helper

機能の紹介

いろいろ機能は作っていく予定ですが、今回は、各リソースに対してイテレート対策を紹介します。

Strategyパターン風にイテレートするよう実装しました。
Throttlingによるリトライも考慮しています。

これをコーディングすると、結構なボリュームになります。
例えば、各EC2インスタンスに対し処理(インスタンスIDを出力)を行う場合は、以下のようになります。

#!/usr/bin/python
# coding:utf-8

import boto3
from boto3.session import Session
import botocore

session ={"accessKey" : "hogehoge...",
                  "secretKey"  : "fugafuga...",
                  "region"        : "ap-northeast-1など"}

client = session.client('ec2')


def describe_instances_force_1st():
    try:
        response = client.describe_instances()
    except botocore.exceptions.ClientError:
        if not 'ThrottlingException' in sys.exc_info():
            raise
        time.sleep(1.0)
        response = describe_instances_force_1st()

    return response

def describe_instances_force(next_token):
    try:
        response = client.describe_instances(
            NextToken = next_token
        )
    except botocore.exceptions.ClientError:
        if not 'ThrottlingException' in sys.exc_info():
            raise
        time.sleep(1.0)
        response = describe_instances_force(next_token)

    return response


if __name__ == "__main__":

    first_call = True
    result     = True
    while True:
        if first_call:
            resources = describe_instances_force_1st()
            first_call = False
        else:
            resources = describe_instances_force(next_token)

        for reservation in resources['Reservations']:
            instances = reservation['Instances']
            for instance in instances:
                print('InstanceId'),
                print(instance['InstanceId'])

        # 次のページなし
        if not resources.has_key('NextToken'):
            break
        # 次のページへのポインタを取得
        next_token = resources['NextToken']

boto3helperを使うことで、以下のようになります。

#!/usr/bin/python
# coding:utf-8

import boto3helper

# 各インスタンスに対して実行する処理
def test_func(instance, args):
    print(args['arg1']),
    print(instance[args['arg2']])


if __name__ == "__main__":

    credential ={"accessKey" : "hogehoge...",
                         "secretKey"  : "fugafuga...",
                         "region"        : "ap-northeast-1など"}

    o = boto3helper.ec2_helper.EC2_Helper(credential)
    o.exec_func_each_instances(test_func,
                               arg1 = 'InstanceId: ',
                               arg2 = 'InstanceId')

テストコードが適当だったり、対応サービスも少なかったりまだまだ未熟なライブラリですが、コツコツとアップデートしておきますので、気になった方は使ってみてください。


github.com

元記事はこちら

Boto3のヘルパークラスを作った【cloudpack大阪BLOG】