cloudpack の 自称 Sensu芸人 の かっぱこと 川原 洋平@inokara)です。

参考

本記事を書くにあたりまして以下の記事を参考にさせて頂きました。

有難うございます。


はじめに

イマイチ使い方が解っていないけど名前は格好いいなあと思っていた Amazon Data Pipeline についてお話を聞いた以降、ずっと気になっていたので触ってみました。(以下、 Amazon Data Pipeline は Data Pipeline と記載します)


とは言え…Amazon Data Pipeline って?

こちらの抜粋です。

AWS Data Pipeline は、指定された間隔で、信頼性のあるデータ処理やデータ移動(AWS のコンピューティングサービスやストレージサービス、ならびにオンプレミスのデータソース間)を行うことができるウェブサービスです。AWS Data Pipeline を使用すると、保存場所にあるお客様のデータに定期的にアクセスし、そのスケールで変換と処理を行い、その結果を Amazon S3、Amazon RDS、Amazon DynamoDB、Amazon Elastic MapReduce(EMR)のような AWS サービスに効率的に転送できます。

とあります。例えば、以下のような処理を行わせたい場合に EC2 上でスクリプト作って cron 仕込んで…と頑張れば出来そうなことを Amazon のサービスを利用することでスクリプトや cron のメンテナンスを行う手間を省くことが出来ます。

  • Amazon S3 に保存されている CSV や TSV データを定期的に Amazon RDS や Amazon EMR でバッチ処理させたい
  • バッチ処理させた結果を Amazon S3 に保存させたい
  • オンプレミスの DB サーバーから取得したデータを Amazon EMR で分析させて Amazon の RDS や Redshift に保存したい

上記の例を見て頂くとお気づきかと思いますが Data Pipeline を一言で言うと…「データを中心とした保存や処理のワークフローを提供するサービス」という感じでしょうか。

尚、Data Pipeline は以下のような機能を提供します。

  • ジョブのスケジュール設定、実行、および再試行ロジック
  • データの実行処理の追跡
  • 処理が失敗した際の通知
  • 処理に必要なリソース

まさにバッチ処理スクリプトと cron の機能そのものですね。


ひとまず Hello Data Pipeline

とりあえず Data Pipeline を使ってみたいと思います。

何をする?

以下を参考に Amazon S3 から Amazon S3 へのデータコピーをやってみたいと思います。

以下のようなイメージです。

01

非常にシンプルな構成ですね。

設定の流れ

設定は以下のような流れで設定します。

  1. from のバケットを用意する(DataNodes にて設定する)
  2. to のバケットを用意する(DataNodes にて設定する)
  3. データコピー作業で利用するリソースを定義する(Resources にて設定する)
  4. バケット間のデータコピー作業を定義する(Activities にて設定する)

尚、Data Pipeline には VisioCacoo のように構成図を書くようなノリで各種の機能アイコンを並べて視覚的に設定を行うことが出来ます。

02

設定

Pipeline を作成

まずは Create Pipeline を選択して Pipeline を作成します。

03

Run every にて 15 minute(s) を設定します。尚、Data Pipeline では 15 分毎が最小単位となるので注意しましょう。尚、ドキュメントには…

The minimum period is 15 minutes and the maximum period is 3 years.

とありますので最長で 3 年毎のタスクを Data Pipeline に任せられるようです。気の長い話ですなあ。

ということで、Name と Description と Run every を設定したら Create をクリックします。

DataNodes で S3 のバケットをそれぞれ定義する

DataNodes メニューでコピー元とコピー先のバケットを定義します。

04

Add data node をクリックして DataNode を作ります。今回はデータのコピー元とコピー先で二つの DataNode を作成します。青で囲まれた DataNode をコピー元(Name は from_ にします)、緑で囲まれた DataNode をコピー先(Name を to_ にします)とします。それぞれの DataNode では Name 以外に以下のような項目を設定します。

  • Type は S3DataNode を選択する(他にも DynamoDBDataNode / RedshiftDataNode 等が選択出来ますが詳しくはこちら
  • Schedule は Pipeline 作成時点の 15 分毎の設定が既に適用されている
  • Add an optional field… から File Path を選択する
  • File Path を選択すると図のように File Path にテキスト入力ボックスが現れるのでコピー元、コピー先のオブジェクトを選択する

尚、指定方法についてはドキュメントの以下の項目が参考になりました。

#{@scheduledStartTime.format(‘YYYYMMDD’)} こんな感じでタスクスケジュールの開始日時を表現出来るので日付でディレクトリを作成する際等に有効かと思います。

Activities でデータコピーを定義する

Activities でデータのコピー作業を定義します。

05

Add activity をクリックして Activity を作成します。オレンジで囲まれた歯車アイコンが出来ます。Name は適当に付けます。(今回は copy という名前にしました。)続いて以下の作業を行います。

  • Type にて CopyActivity を選択する(他にも EmrActivity や Hive Activity がありますが詳しくは詳しくはこちら
  • Output に出力先に指定している DataNode 名(to_)
  • Input に入力元に指定している DataNode 名(from_)
  • Add an optional field… から Runs On を選択する
  • Runs On プルダウンから DefaultResource1 を選択する

Runs On で指定している DefaultResource1 はまだ定義していないので次のステップで定義します。

Resources でデータコピーで利用するリソースを定義する

Resources でデータコピーで利用するリソースを設定します。

06

Activities の Runs On で指定した DefaultResource1 が既に作成されているので以下の項目を設定します。

  • Type プルダウンから Ec2Resource を選択する
  • Add an optional field… から Runs On を選択する
  • Instance Type にて t1.micro を設定する

尚、Instance Type を指定しない場合には m1.small がデフォルトで利用される。利用出来るインスタンスタイプはこちらをご確認下さい。

Pipeline を保存、そして有効にする

ひと通り設定が終わったら Save pipeline をクリックします。

07

上記のように Warning! が表示されますが、今回は無視させて頂きます。ちなみに、Warning や Error に関しては下記のように確認出来ます。

08

Pipeline 自体のログを出力する logUri を設定しろとのことですね。尚、ログは Amazon S3 のバケットに保存することが出来ます。また、terminateAfter は使い終わったリソース(インスタンス)を停止するまでにどのくらい待つかを指定します。今回はお試しなのでどちらも指定しません。

09

次に Activate をクリックすると再び Warning! が表示されますが Continue をクリックして続けると以下のように表示されます。

10


で、Data Pipeline されているか確認

設定した Pipeline が動作しているかざくっと確認してみましょう。

Pipeline のリスト

11

上記のように設定した Pipeline が確認出来ます。

コピー元

コピー元のバケットを確認してみましょう。

12

sample.txt はあらかじめてアップロードしておきました。

コピー先

暫く時間を置いた後でコピー先のバケットを確認してみましょう。

13

sample.txt が見事にコピーされていますね。


最後に

今回やったこと

  • Data Pipeline がどんなサービスがざっくり理解しました
  • Data Pipeline で簡単な Pipeline を設定しました
  • 簡単なバケット間のファイルコピーを設定しました

次にやってみたいこと

  • データコピーだけではなくもう少し複雑な作業を試してみたいと思います

感想

  • Data Pipeline は使えば Cron を使ったバッチ処理を Amazon のサービスにお任せ出来るのは嬉しい
  • こちらの資料にあるように Data Pipeline と S3 が AWS のビッグデータ関連サービス(どのサービスまでが範疇かな…)の中核になるサービスではないかな…

元記事は、こちら