share facebook facebook twitter menu hatena pocket slack

2018.08.27 MON

PySparkでMySQLからのデータ取得&集計方法

河村 敏貴

WRITTEN BY 河村 敏貴

MySQLに対してSQLでよくやるようなデータの取得や集計などをPySparkのDataFrameだとどうやるのか調べてみましたので、備忘録として残しておきたいと思います。
検証環境は以前紹介したDockerではじめるPySparkをベースにDockerで環境を構築しいます。
こういった検証にDockerはすごく便利でいいですね

環境

  • PySpark 2.2
  • MySQL5.7

データはMySQLの公式でサンプルとして提供されているworldデータベースを利用します。

環境の構築

利用するDockerイメージ

  • cloudfish/pyspark-notebook(公式のpysapark-notebookにJDBCを同梱したイメージ)
  • kakakakakku/mysql-57-world-database(mysqlはworldデータベースを取り込んだイメージ。Redash を詳しく学べる「Redash ハンズオン資料」を作ったのイメージを利用させていただいています。)
  • phpmyadmin/phpmyadmin(データ確認用として公式のphpmyadminのイメージ)

以下の通りdocker-compose.ymlを作成します。

version: '2'
services:
 pyspark:
    image: cloudfish/pyspark-notebook
    volumes:
       - LOCAL_PATH:/home/jovyan/work
    ports:
      - "8888:8888"
    command: bash -c "start-notebook.sh --NotebookApp.token=''"
    links:
      - dbserver
    environment:
      GRANT_SUDO: "yes"
  dbserver:
    image: kakakakakku/mysql-57-world-database
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
  phpmyadmi:
    image: phpmyadmin/phpmyadmin
    ports: 
      - "18080:80"
    links:
      - "dbserver"
    environment:
      PMA_HOST: dbserver
      PMA_USER: root 
      PMA_PASSWORD: ""

Docker起動

docker-compose up

Jupyter Notebook画面確認
http://localhost:8888

phpmyadmin画面確認
http://localhost:18080

PySparkの実行確認

早速サンプルデータベースで実行確認を進めていきます。worldデータベースは以下のようなテーブルが含まれています。
これらのテーブルを使ってデータを取得してみたいと思います。

Tables_in_world
city
country
countrylanguage

画面右端のNewボタンをクリックしPython3を選択し、開いた画面で以下を入力していきます。

以下のコードはコードセルごとに入力してください。入力後Shift + Enterでコードが実行されます。

Sparkの初期化処理

from pyspark.sql import SQLContext, Row
from pyspark import SparkContext

sc = SparkContext("local", "First App")

※2回実行するとエラーになります。

JDBC接続処理

JDBCに接続しています。

sqlContext = SQLContext(sc)
jdbc_url="jdbc:mysql://dbserver/mysql"
driver_class="com.mysql.jdbc.Driver"

DB_USER="root"
DB_PASSWORD=""

def load_dataframe(table):
  df=sqlContext.read.format("jdbc").options(
    url =jdbc_url,
    driver=driver_class,
   dbtable=table,
   user=DB_USER,
    password=DB_PASSWORD
  ).load()
  return df

データ取得処理

データをDataFrameに取得します
テーブル指定でデータ取得する場合(countryテーブル、cityテーブルを取得)

df_country = load_dataframe("world.country")
# 実行されるSQL:SELECT * FROM world.country WHERE 1=0
df_city = load_dataframe("world.city")
# 実行されるSQL:SELECT * FROM world.city WHERE 1=0

SQLを指定してデータ取得する場合(cityテーブルの国コードがJPNのものだけを取得)

df_city_japan = load_dataframe("(select * from world.city where CountryCode='JPN') city_japan")
# 実行されるSQL:SELECT * FROM (select * from world.city where CountryCode='JPN') city_japan WHERE 1=0

MySQL側でどのようなSQLが流れるのか見てみたところ、テーブルにセットした内容がFROM句の後にセットされるようです。

カラム指定

Nameカラムを表示

df_country.select("Name").show()

条件検索

国名がJapanのデータを抽出。

df_country.filter(df_country["Name"] == "Japan").show()
df_country.where(df_country["Name"] == "Japan").select("Code","GNP").show()

isNull

独立年がNullのデータを抽出

df_country.where(df_country["IndepYear"].isNull()).show()

like

国名がJで始まるデータを抽出

df_country.where(df_country["Name"].like("J%")).show()

Case When式

人口が100000人より大きい場合は「Big」、小さい場合は「Small」を表示

from pyspark.sql import functions as F
df_country.select(df_country["Name"], F.when(df_country["Population"] > 100000,"Big").otherwise("Small").alias("CountryDiv")).show()

substr

国名を3文字切り出して表示

df_country.select(df_country["Name"].substr(1,3)).show()

limit

5件のみ抽出

df_country.limit(5).show()

Join

countryテーブルとcityテーブルを結合し国名がJapanのものを抽出

df_join = df_country.alias('country').join(df_city.alias('city'),(df_city["countryCode"] == df_country["Code"]) & (df_country["Name"]=="Japan")).show()

OrderBy

GNPが高い国順に表示

df_country.orderBy("GNP" , ascending=False).select("Code","Name","GNP").show()

GroupBy

国ごとにグループ化し、cityの数、人口の平均と合計を集計

from pyspark.sql import functions as F

df_city.groupBy("countryCode") \
 .agg( \
    F.count(df_city["Name"]).alias("total_count"), \
    F.avg(df_city["Population"]).alias("avg_population"), \
    F.sum(df_city["Population"]).alias("sum_population") \
).show()

まとめ

書き方は少し慣れる必要がありますが、かなりSQLに近いイメージでデータ取得が可能なことが分かりました。
今回はDBに対して実行しましたが、ファイルに対しても同様に実行可能です。

元記事はこちら

PySparkでMySQLからのデータ取得&集計方法

河村 敏貴

河村 敏貴

クラウド業界に興味を持ち、業務系のアプリエンジニアからインフラエンジニア見習いに転身しました。日々新しい技術に触れることができる環境でもがきつつも楽しい毎日を過ごしています。 河村 敏貴

cloudpack

cloudpackは、Amazon EC2やAmazon S3をはじめとするAWSの各種プロダクトを利用する際の、導入・設計から運用保守を含んだフルマネージドのサービスを提供し、バックアップや24時間365日の監視/障害対応、技術的な問い合わせに対するサポートなどを行っております。
AWS上のインフラ構築およびAWSを活用したシステム開発など、案件のご相談はcloudpack.jpよりご連絡ください。