![]() |
|||||||||||||||||||
Apache Spark 2.2のアプリケーション(Spark SQL編)Apache Sparkの環境を構築したらSpark SQLも使ってみよう。Spark SQLは、使い慣れたSQLやSQLライクなメソッド(Untyped Dataset Operations)でデータをSELECT、JOIN等することができる。SparkのアプリケーションはScalaの他、JavaやPythonでも書くことができるが、ここではScalaを用いる。Apache Sparkの環境構築方法については以下を参照していただきたい。 SparkSessionSpark SQLを使うには、手始めにSparkSessionを生成する。以下のコードではspark変数に生成したSparkSessionを格納している。このspark変数(SparkSession)を使ってCSVからデータを読み込むためのDataFrameReaderを生成する。
DataFrameReaderのcsvメソッドを使えば容易にCSVからデータを読み込むことができる。上記の例ではheaderオプションをtrueにして、CSVの先頭行を列名として使用する。このアプリケーションで読み込む"employee.csv"は以下のようなものである。 employeeid,name,code 0001,J.M. ケインズ,826 0002,ミルトン フリードマン,840 0003,アダム スミス,826 0004,J.S. ミル,826 0005,ポール サミュエルソン,840 0006,サイモン クズネッツ,840 0007,ポール クルーグマン,840 0008,N.グレゴリー マンキュー,840 0009,宇沢 弘文,392 このアプリケーションを実行すると、上記のCSVのデータからemployeeid列とname列がSELECTされ、コンソールに以下のような結果が得られる。 +----------+-------------+ |employeeid| name| +----------+-------------+ | 0001| J.M. ケインズ| | 0002| ミルトン フリードマン| | 0003| アダム スミス| | 0004| J.S. ミル| | 0005| ポール サミュエルソン| | 0006| サイモン クズネッツ| | 0007| ポール クルーグマン| | 0008|N.グレゴリー マンキュー| | 0009| 宇沢 弘文| +----------+-------------+ 結果をファイルに出力Sparkでデータを処理した結果はもちろんファイルに出力することもできる。DataFrameのwriteメソッドでDataFrameWriterを取得しcsvメソッドで結果をCSVに出力することができる。
普通にcsvメソッドで結果を出力すると、既にファイルが存在するとエラーになってしまうので、上記ではmode(SaveMode.Overwrite)でオプションを指定して上書き可能にしている。 もちろんSQLも使える使い慣れたSQLでデータを処理することもできる。ただし、DataFrameをSQLで処理する場合、ひと手間加える必要がある。予め、SQLで使用するテーブル名をcreateOrReplaceTempViewメソッドで登録しておかなければならない。以下では、df(DataFrame)に読み込んだCSVのデータを"employee"テーブルとして登録している。
上記の例では、Untyped Dataset Operationsのselectメソッド同様に、SQLのSELECT文でemployeeid列とname列を選択している。SQLであるから、例えばソートをするのも容易だ。上記のSQLにORDER BYを付け加えてemployeeid列を降順でソートしてみる。
上記のコードを実行した結果は以下である。 +----------+-------------+ |employeeid| name| +----------+-------------+ | 0009| 宇沢 弘文| | 0008|N.グレゴリー マンキュー| | 0007| ポール クルーグマン| | 0006| サイモン クズネッツ| | 0005| ポール サミュエルソン| | 0004| J.S. ミル| | 0003| アダム スミス| | 0002| ミルトン フリードマン| | 0001| J.M. ケインズ| +----------+-------------+ 2つのデータをJOINしてみるSQLが使えるのだから、当然2つのデータをJOINするなんてこともできる。ここで新たに"country.csv"を用意して読み込み、これと先ほどの"employee.csv"をJOINしてみる。"country.csv"は"employee.csv"にあった国コード(code)と国名(country)の対応表となっている。 code,country 840,米国 826,英国 392,日本 アプリケーションは2つのCSVを読み込み、それぞれテーブル名を登録して、2つのテーブルのJOINをSQLで行う。result(DataFrame)に対してはshowメソッドとwriteメソッドを実行するので、sqlで得られるresultはcacheメソッドで常時メモリ上に載せておく(とはいえ、今回のような少量のデータでは意味はないが)。
実行結果は以下になる。 +----------+-------------+-------+ |employeeid| name|country| +----------+-------------+-------+ | 0001| J.M. ケインズ| 英国| | 0002| ミルトン フリードマン| 米国| | 0003| アダム スミス| 英国| | 0004| J.S. ミル| 英国| | 0005| ポール サミュエルソン| 米国| | 0006| サイモン クズネッツ| 米国| | 0007| ポール クルーグマン| 米国| | 0008|N.グレゴリー マンキュー| 米国| | 0009| 宇沢 弘文| 日本| +----------+-------------+-------+ SQLでなく、Untyped Dataset Operationsで同様の処理を行うなら以下のようになる。
joinメソッドは以下のように書くこともできるので、JOINするキーの列名が異なる場合でも問題ない。
更に、データをAmazon S3に置きたくなったなら、こちらもご参照いただきたい。 (2018/01/05)
Copyright© 2004-2022 モバイル開発系(K) All rights reserved.
[Home]
|