冥冥乃志

ソフトウェア開発会社でチームマネージャをしているエンジニアの雑記。アウトプットは少なめです。

follow us in feedly

AWS Glueのお試し:ジョブを動かしてみるのと、ついでに気づいたAthenaとの連携を確認

前回、全体像を追いかけてクローラを実行するだけで結構なボリューム行ってしまったので続きです。

mao-instantlife.hatenablog.com

今回は右から左に流すジョブを作ってみるのと、その過程でわかったことを何点かまとめておきたいと思います。

Webコンソールからジョブを作る

ソースもターゲットが単一で、単にマッピングの変更があるレベルであればウィザードから作成するだけでジョブが作れます。下図のように、ソースもターゲットもウィザード上はラジオボタンなので、ソースやターゲットが単一でないようなレベルのジョブを作りたい場合は、メインのソースやターゲットのみ決定してテンプレートとして作った後でソースを編集するなどの方法を取る必要があります。

f:id:mao_instantlife:20170824193802p:plain

ちなみに、ジョブの全般的な設定でTemporary directoryの入力が必須なんですが、デフォルトでは何も入力されていません。スクリプトの場所はデフォルトで入ってるし、S3のバケットみてもジョブの実行後に残ってるような内容でもないみたいなので、適当にスクリプトの配下に置くようなデフォルト値を入れておくとかしておいてくれてもよかったんじゃないのかなあ、と思いました。

また、ジョブの実行パラメータやリトライ回数などの各種設定もウィザードからできます。この辺りはまだ試せてないですが、とりあえずジョブパラメータってなんでキーとバリュー両方指定するんだっけ、みたいな感じになってます。

f:id:mao_instantlife:20170824193824p:plain

今回は、ターゲットのスキーマをその場で生成するようなジョブを作ったので、マッピングはソースからの引き渡しになります(ちなみにこれで指定したターゲットのスキーマは使い捨てでData Catalogueには自動で登録されません)。ターゲットをData Catalogue上から指定して名前と型レベルで合わせてくれるかどうかは今後の検証で確認しようと思います。天下のAWSのプロダクトなので、やってくれると信じていますが。

f:id:mao_instantlife:20170824193845p:plain

マッピングは、相手がCSVだったりすると出力順がその後の処理に影響したりすることを考慮して、ターゲットの順序を変更したりすることが可能です。項目を追加することも可能です。

ざっと生成されたソースを眺めてみる

で、そんなこんなでウィザードから雑にでもジョブを作ると、ソースエディタが表示され、以下のような一番シンプルな形のジョブのコードが生成されます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ソースから生成可能なダイアログもエディタの左側に表示されています。ちなみにこのダイアログのノードをクリックするとソースの該当箇所をハイライトしてくれます。カーソルの移動してくれればいいのに。ちなみに、ドキュメントレベルで確認したことですが、ダイアグラムの生成はソースの アノテーション(##と@の項目指定) を利用している模様。

また、ダイアログ上でソースかターゲットをクリックして、右下部の「Schema」タブを選ぶとスキーマを確認することもできます。

各transformの引数にあるのはDynamicFrameと呼ばれるジョブ内で扱うデータフレームです。各transformの戻り値はこの型になっているようです(まだ全てのAPIドキュメントを確認できているわけではない)。なので、基本的なジョブの流れは、

  1. ソースから対象データのDynamicFrameを作る
  2. 各種transformを使ってDynamicFrameを操作
  3. 操作したDynamicFrameの内容をターゲットに出力

という感じですね。シンプル。

何はともあれキックしてみる -> 失敗 -> 成功

何もしてないのに失敗した、とかいうつもりはないです。何もしてないつもりだったんですけど、何かされていて実行したらこけました。エラーログは以下のような感じ。

Syntax Error: File "/tmp/***/script_2017-08-21-20-45-42.py", line 5
<output> = Spigot.apply(frame = <frame>, path = "<path>")
^
SyntaxError: invalid syntax

適当にソースエディタ触っているうちに、メニューのSpigotのボタンを押してしまい、自動生成されたソースが入っていたようです。この辺削除して実行するとうまくいきました。クリックした時のレスポンスがあまりにも薄いので気づきませんでした。ちょっとこの辺は改善していただけると嬉しかったり。

ちなみに、メニュー上transformと別になっていますが、Spigotもbuilt-in transformの一種です。DynamicFrameをJsonでS3に出力する機能を持っているようです。printfデバッグ的に使うやつですかね?他のtransformとは別のボタンになっているのは、transformがデータの操作を担うのに対して、これだけ出力を伴うからなんでしょうが、Spigotという単語に慣れてないだけに理解に時間がかかったポイントでした。英語大事。

transformで追加されるソースを眺めてみる

先ほど追加されてしまったSpigotのソースを改めてみてみましょう。アノテーションまで含めて以下のような感じになってます。

## @type: Spigot
## @args: [path = "<path>"]
## @return: <output>
## @inputs: [frame = <frame>]
<output> = Spigot.apply(frame = <frame>, path = "<path>")

<> の中身を適当にアプリに合わせて埋めていけ、という感じですね。他の部分と合わせてみると frame はインプットになるDynamicFrameのようです。

実行結果

ちなみに、サンプルに使ったおよそ90万件のデータをParquet形式でS3に吐き出す、というジョブで実行時間は約9分でした。10万件/分くらいの計算ですね。Parquetファイルの様式があまりわからないので、そこまで勘案して遅いか早いかはちょっとよくわかっていません。

なお、サブディレクトリでパーティション分けたデータソースは、パーティションごとに異なるparquetファイルに吐かれた模様です。

GlueとAthenaとの連携を確認する

実行できたので、ちゃんとtransformできてるかな、と確認したくなるのは人情です。適当にテキストエディタで開いて見てもいいんですが、せっかくなので今まで使ってなかったAthenaで読み込んで確認しようかな、とおもむろに開くと。。。

f:id:mao_instantlife:20170824193412p:plain

AthenaにAWS Glue Data Catalogueとかありますね。ってか、データベースにGlueで作成したデータベースとテーブルの定義があります。ちゃっかりAthenaと統合していたようです。Athena側にはドキュメントがありました。Glueのドキュメントでは気づかなかったです。こちらでも章立てして置いていい内容じゃないですかね。

Integration with AWS Glue — User Guide

今のところ確認しているのは、

  • Glueで作成したデータカタログ(データベースとテーブル)をAthenaで使う
  • Athenaでテーブル作成時にGlueのクローラを作ることができる
  • AthenaでマニュアルでGlueと連携するデータベースにテーブル作ったらGlueにも反映される

といったあたりの動き。ETLだと連携の中間データとか確認しづらかったり、そもそもファイルだと検索性悪かったりとかあるので、この連携は地味に嬉しいです。もちろんselectもちゃんと投げられます。素晴らしい。Redashかませば割といろんな用途が事足りる気がします。

まとめ

定型化しやすいところ、連携がシームレスだと嬉しいところをちゃんと抑えた展開をしている感じがしますね。データ連携基盤から分析のためのダッシュボード作りまでGlueとAthenaでかなりの部分が賄えそうです。ますますGlueが好きになりました。こういうギョーミーで派手さのかけらもないプロダクト好きです。

次はジョブの実行パラメータなんかを少し触ってみようかと思います。