冥冥乃志

ソフトウェア開発会社でチームマネージャをしているエンジニアの雑記。アウトプットは少なめです。

follow us in feedly

AWS Glueのお試し:JSONを扱う時に注意事項があるっぽい

本当はもっと先のところまでやりたかったんですが、ちょっといくつか引っかかっているポイントがあって、一つわかったことがあるのでそこだけ先出しします。AWS GlueとAthenaでJSONを取り扱う時の注意事項の話。というか現象はわかったけど、どのアプローチが仕様的に正しいんだっけ?というのは未だわからず。

JSONファイルのクローラは作れたがジョブでデータを扱えない

前々回のエントリで、JSONファイルのテーブル定義を取得するクローラを作ることはできました。

で、これを普通にParquet形式に受け流すだけのジョブを作って実行して見たんですが、以下のようなエラー吐いてこけます。

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-19-251.ec2.internal, executor 1): com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

カンマのパースでこけてますね。もともと適当にGithubを探していて見つけたJSONファイルで、クローラを作る時にもカンマ区切りのオブジェクトの羅列になるように手を入れています。このオブジェクトの区切り方がおかしいということらしいです。

というわけで、レコードのオブジェクトの区切りをなくす、具体的には }, から } に変えてみることでジョブを実行することができました。返還後のParquetファイルをAthenaで検索することもできています。Athenaに投げたDDLは以下の通り。

CREATE EXTERNAL TABLE IF NOT EXISTS flights.airport_target_test (
  iata string,
  lon string,
  iso string,
  status int,
  name string,
  continent string,
  type string,
  lat string,
  size string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://***/'

Athenaで元データを検索してみる

それはそうとAthenaで変換元のJSONファイルを検索しようとして見たら、

HIVE_CURSOR_ERROR: Row is not a valid JSON Object - JSONException: A JSONObject text must end with '}' at 2 [character 3 line 1]

というエラーになって検索できません。どうにもJSONオブジェクトは } で終われ、と言われているようですが、そうなってるんですよね。

で、色々試した結果、おそらくAthenaとGlueで使っているJSONパーサーが違うんじゃないかという挙動です。元データは、以下のように「人間にとって」可読性がある程度高くなるようにカラムごとの改行を入れたデータでした。これで、Glueのクローラとジョブは実行できます。ジョブで変換したParquetファイルを検索しても、データはきちんと認識している模様。

{
    "iata": "UTK",
    "lon": "169.86667",
    "iso": "MH",
    "status": 1,
    "name": "Utirik Airport",
    "continent": "OC",
    "type": "airport",
    "lat": "11.233333",
    "size": "small"
}
{
    "iata": "FIV",
    "iso": "US",
    "status": 1,
    "name": "Five Finger CG Heliport",
    "continent": "NA",
    "type": "heliport",
    "size": null
}
...

Athenaでも検索可能なJSONファイルは以下で、1行1レコードになっていないとエラーになるパーサのようです。

{"iata": "UTK", "lon": "169.86667", "iso": "MH", "status": 1, "name": "Utirik Airport", "continent": "OC", "type": "airport", "lat": "11.233333", "size": "small"}
{"iata": "FIV", "iso": "US", "status": 1, "name": "Five Finger CG Heliport", "continent": "NA", "type": "heliport", "size": null}
...

あー、面倒臭い。

まとめ

まずこういうケースでバルクデータっぽく使われる場合のJSONの仕様ってあったっけ?というのがドキュメント読んでもよくわからないのと、GlueとAthenaのパースの扱いが違うのとか割と問題に気づくまでが面倒臭いのとか考えると、特に強い理由がなければCSVでもいいんじゃないのかなあ?というのが正直な感想です。CSVにしたくなければそのままJDBCをデータソースやターゲットに使うという方法もありますし。バルクデータという観点で行けば、CSVの方がデータの構成上サイズは小さくなりそうなので。

今年入ってからの印象的な本

本を読んでなかったわけではないですが、読書メモの取りまとめはちょっとサボってましたね。棚卸しがてら、今月までで印象に残った本をいくつか紹介しようと思います。

今年読んだ本

抜けはかなりあるんですが、だいたいこんなところですね。ゼルダペルソナ5に時間泥棒されてた割には読めてる気がしますね。

印象的な本

フィクション/ノンフィクション取り混ぜて読んでおりますので、分けてご紹介したいと思います。

ノンフィクション系

今年に入ってから何故か言語学に関する本で印象に残ったものが多かったです。特に専門というわけではなかったですが、去年認知言語学の本を読んで、人が言葉を理解するということに純粋に興味が湧いていました。そんなタイミングで発売された本が両方とも非常にわかりやすく、より興味を刺激してくれるものだった、というのは非常に良い縁だったと思います。

ちいさい言語学者の冒険――子どもに学ぶことばの秘密 (岩波科学ライブラリー)

ちいさい言語学者の冒険――子どもに学ぶことばの秘密 (岩波科学ライブラリー)

働きたくないイタチと言葉がわかるロボット  人工知能から考える「人と言葉」

働きたくないイタチと言葉がわかるロボット 人工知能から考える「人と言葉」

両者とも言語学者の方が一般向けに書かれた本です。

前者は子供が第一言語として母国語を覚えるまでに起きていることにフォーカスを当てたものです。我が家には子供がいないので、これがどれほど「あるある」なのかわかりませんが、意味も文法もわからないところから人が言語を理解する過程への考察というのが非常に刺激的な本です。

後者は厳密にいうと言語学というよりも自然言語処理のAIの説明です。物語ベースで段階を追いながら現段階で自然言語処理がどういうモデルで理解されているか、ということが書かれています。幅広い話題を扱いながら、言葉のチョイスがあくまで一般的な範囲に収められていて、相当な労力がかかっているのがわかります。

たのしいプロパガンダ (イースト新書Q)

たのしいプロパガンダ (イースト新書Q)

少し毛色を変えてこんなものを。

まあ、今のようなポジションで仕事してると、発信するということに否応無しに向き合う必要があります。これがまた難しいわけですよ。基本的に自分だけの問題ではないことはわかってるんですが、せめてもう少し楽しいものを、と思ったらやっぱりプロパガンダを参考にすべきかな、と。エンタメに混ぜるのが基本で、となった時点で仕事に直接使うことはできませんでしたが、応用を考えるきっかけにはなりました。

フィクション

フィクションはいつも通りSFですよ。ちょっと読むのをためらっていたものに手を出してみて、案の定な読書体験をしたのが印象的でしたね。

ディアスポラ (ハヤカワ文庫 SF)

ディアスポラ (ハヤカワ文庫 SF)

初イーガンですよ。いやあ、本当にこれは辛かった。小説で3ヶ月くらいかかるとかこれが初めてかもしれないです。イーガン作品の中で一番読みやすいとか本当なんですかね。。。要求される教養レベルが違いすぎる。

ただ、その知的議論の果てに見せてくれる世界が、SFというかセンスオブワンダーをちゃんと感じさせてくれて、読後感は素晴らしいものでした。人間の好奇心が行き着く先をみたければこの作品はおすすめです。

幼年期の終わり (光文社古典新訳文庫)

幼年期の終わり (光文社古典新訳文庫)

ここ数年は海外SFもちゃんと読もうと思って手に取っていますが、もともと日本SFを中心に読んでいたというのもあって、ファーストコンタクトものの中でも古典に入るこの作品も読んでなかったわけなんです。しかも、クラークの作品ともあって結構ハードSFなのかなあ、と思ってたんですが、そうでもないというのが意外でした。「正解するカド」に求めていたのは、ディアスポラの行き着く先だったりこの作品のラストだったりしたのだなあ、と認識させてくれた作品。訳も読みやすかったです。

大四国戦線

大四国戦線

事態にふさわしいとは思えない卑近な登場人物の応酬や、大四国と銘打っておきながら高知と徳島は我関せずという看板の偽りっぷり、筒井康隆フォロアーとしか思えないラストに至るパイ投げエスカレーション、どれをとっても180kmのナックルという感じの怪作でした。こんなのまともなキャッチャーとれねーよ。こういうのがいるから面白い、というKDPの真髄を見た、という感じ。おすすめはしませんが、同じカルマを感じる人だったら紹介する、という本です。

ラノベ、普段読まないんです。理由は主に文体で、改行が総じて多すぎるというのが一番強い理由ですね。私は改行をカット割りのように捉えて読んでいるので、あんなに開業されると無駄なカットが多すぎてうるさい映画のように感じてしまいます。

それでもこれを読み始めたのは最新刊の帯に円城塔が推薦文を書いてたから。まあ、この人が帯書いてる時点で普通の作品ではないはずなので、その期待を込めて、ですね。

1巻ではまだ、異能バトルものみたいな感じですが、ちょいちょい怪しい仕掛けとかもあって、先が楽しみです。しばらく追いかけてみようと思っています。

この作品、好きなんですが、いろんな感情をむき出しで描いていて感想をいうのが難しいんですよね。一つの町が隔離されて消えるまでに起こる様々なことをオムニバスで描いています。その中で示される感情も決して一つじゃない、何か入り混じった複雑な感じで味わい深い読後感があります。ラストの含みが特に好きで、あれをシーンとして描き切らなかったのはいいですね。一応おまけ漫画と最終巻の表紙でどうなったかはわかるのですが、一応ハッピーエンドでよかったです。

AWS Glueのお試し:ジョブを動かしてみるのと、ついでに気づいたAthenaとの連携を確認

前回、全体像を追いかけてクローラを実行するだけで結構なボリューム行ってしまったので続きです。

mao-instantlife.hatenablog.com

今回は右から左に流すジョブを作ってみるのと、その過程でわかったことを何点かまとめておきたいと思います。

Webコンソールからジョブを作る

ソースもターゲットが単一で、単にマッピングの変更があるレベルであればウィザードから作成するだけでジョブが作れます。下図のように、ソースもターゲットもウィザード上はラジオボタンなので、ソースやターゲットが単一でないようなレベルのジョブを作りたい場合は、メインのソースやターゲットのみ決定してテンプレートとして作った後でソースを編集するなどの方法を取る必要があります。

f:id:mao_instantlife:20170824193802p:plain

ちなみに、ジョブの全般的な設定でTemporary directoryの入力が必須なんですが、デフォルトでは何も入力されていません。スクリプトの場所はデフォルトで入ってるし、S3のバケットみてもジョブの実行後に残ってるような内容でもないみたいなので、適当にスクリプトの配下に置くようなデフォルト値を入れておくとかしておいてくれてもよかったんじゃないのかなあ、と思いました。

また、ジョブの実行パラメータやリトライ回数などの各種設定もウィザードからできます。この辺りはまだ試せてないですが、とりあえずジョブパラメータってなんでキーとバリュー両方指定するんだっけ、みたいな感じになってます。

f:id:mao_instantlife:20170824193824p:plain

今回は、ターゲットのスキーマをその場で生成するようなジョブを作ったので、マッピングはソースからの引き渡しになります(ちなみにこれで指定したターゲットのスキーマは使い捨てでData Catalogueには自動で登録されません)。ターゲットをData Catalogue上から指定して名前と型レベルで合わせてくれるかどうかは今後の検証で確認しようと思います。天下のAWSのプロダクトなので、やってくれると信じていますが。

f:id:mao_instantlife:20170824193845p:plain

マッピングは、相手がCSVだったりすると出力順がその後の処理に影響したりすることを考慮して、ターゲットの順序を変更したりすることが可能です。項目を追加することも可能です。

ざっと生成されたソースを眺めてみる

で、そんなこんなでウィザードから雑にでもジョブを作ると、ソースエディタが表示され、以下のような一番シンプルな形のジョブのコードが生成されます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ソースから生成可能なダイアログもエディタの左側に表示されています。ちなみにこのダイアログのノードをクリックするとソースの該当箇所をハイライトしてくれます。カーソルの移動してくれればいいのに。ちなみに、ドキュメントレベルで確認したことですが、ダイアグラムの生成はソースの アノテーション(##と@の項目指定) を利用している模様。

また、ダイアログ上でソースかターゲットをクリックして、右下部の「Schema」タブを選ぶとスキーマを確認することもできます。

各transformの引数にあるのはDynamicFrameと呼ばれるジョブ内で扱うデータフレームです。各transformの戻り値はこの型になっているようです(まだ全てのAPIドキュメントを確認できているわけではない)。なので、基本的なジョブの流れは、

  1. ソースから対象データのDynamicFrameを作る
  2. 各種transformを使ってDynamicFrameを操作
  3. 操作したDynamicFrameの内容をターゲットに出力

という感じですね。シンプル。

何はともあれキックしてみる -> 失敗 -> 成功

何もしてないのに失敗した、とかいうつもりはないです。何もしてないつもりだったんですけど、何かされていて実行したらこけました。エラーログは以下のような感じ。

Syntax Error: File "/tmp/***/script_2017-08-21-20-45-42.py", line 5
<output> = Spigot.apply(frame = <frame>, path = "<path>")
^
SyntaxError: invalid syntax

適当にソースエディタ触っているうちに、メニューのSpigotのボタンを押してしまい、自動生成されたソースが入っていたようです。この辺削除して実行するとうまくいきました。クリックした時のレスポンスがあまりにも薄いので気づきませんでした。ちょっとこの辺は改善していただけると嬉しかったり。

ちなみに、メニュー上transformと別になっていますが、Spigotもbuilt-in transformの一種です。DynamicFrameをJsonでS3に出力する機能を持っているようです。printfデバッグ的に使うやつですかね?他のtransformとは別のボタンになっているのは、transformがデータの操作を担うのに対して、これだけ出力を伴うからなんでしょうが、Spigotという単語に慣れてないだけに理解に時間がかかったポイントでした。英語大事。

transformで追加されるソースを眺めてみる

先ほど追加されてしまったSpigotのソースを改めてみてみましょう。アノテーションまで含めて以下のような感じになってます。

## @type: Spigot
## @args: [path = "<path>"]
## @return: <output>
## @inputs: [frame = <frame>]
<output> = Spigot.apply(frame = <frame>, path = "<path>")

<> の中身を適当にアプリに合わせて埋めていけ、という感じですね。他の部分と合わせてみると frame はインプットになるDynamicFrameのようです。

実行結果

ちなみに、サンプルに使ったおよそ90万件のデータをParquet形式でS3に吐き出す、というジョブで実行時間は約9分でした。10万件/分くらいの計算ですね。Parquetファイルの様式があまりわからないので、そこまで勘案して遅いか早いかはちょっとよくわかっていません。

なお、サブディレクトリでパーティション分けたデータソースは、パーティションごとに異なるparquetファイルに吐かれた模様です。

GlueとAthenaとの連携を確認する

実行できたので、ちゃんとtransformできてるかな、と確認したくなるのは人情です。適当にテキストエディタで開いて見てもいいんですが、せっかくなので今まで使ってなかったAthenaで読み込んで確認しようかな、とおもむろに開くと。。。

f:id:mao_instantlife:20170824193412p:plain

AthenaにAWS Glue Data Catalogueとかありますね。ってか、データベースにGlueで作成したデータベースとテーブルの定義があります。ちゃっかりAthenaと統合していたようです。Athena側にはドキュメントがありました。Glueのドキュメントでは気づかなかったです。こちらでも章立てして置いていい内容じゃないですかね。

Integration with AWS Glue — User Guide

今のところ確認しているのは、

  • Glueで作成したデータカタログ(データベースとテーブル)をAthenaで使う
  • Athenaでテーブル作成時にGlueのクローラを作ることができる
  • AthenaでマニュアルでGlueと連携するデータベースにテーブル作ったらGlueにも反映される

といったあたりの動き。ETLだと連携の中間データとか確認しづらかったり、そもそもファイルだと検索性悪かったりとかあるので、この連携は地味に嬉しいです。もちろんselectもちゃんと投げられます。素晴らしい。Redashかませば割といろんな用途が事足りる気がします。

まとめ

定型化しやすいところ、連携がシームレスだと嬉しいところをちゃんと抑えた展開をしている感じがしますね。データ連携基盤から分析のためのダッシュボード作りまでGlueとAthenaでかなりの部分が賄えそうです。ますますGlueが好きになりました。こういうギョーミーで派手さのかけらもないプロダクト好きです。

次はジョブの実行パラメータなんかを少し触ってみようかと思います。

AWS Glueのお試し:ドキュメントのざっと読みとクローラを使ってみる

先日、このようなブログが発信されました。

aws.amazon.com

この辺りのデータ環境周りを作るためのサービスは、どんな状況であっても覚えておいて損はないですし、好きなレイヤーでもあるのでちょっと触ってみました。なお、公開されているのはまだ北米リージョンのみです。

ドキュメント読みながら、使いながらメモ書きしてたらある程度のボリュームになったので、一旦区切って出します。

参考リソース

なんにせよ公式リソースが一番ですので、迷ったらここに当たってみましょう。

docs.aws.amazon.com

サンプルや各種ライブラリのリポジトリもすでに作られています。今はまだ少ないですが、これから充実して行くことを期待しています。

github.com

github.com

AWS Glue is 何

AWS上のフルマネージドなETLです。ETLはextract, transform, and loadの略で、ちょっとした規模の企業だと必ずあるデータ連携基盤みたいなものを構築するためのソリューションです。自前で構築しているところもあるでしょうが、ソリューションを使っているところもあります。ちなみに私が仕事で使ったことがあるのは、DataStageとHULFTですね。Embulkあたりも同じ目的で使うことができると思います。Glue(糊、接着剤)とはよく言ったもの。

AWS Glueは雑な理解ですが、様々なシステム、サブシステムが構築されているAWS上(JDBCはおそらくその限りではないはず)のデータリソースの

を取りまとめて、データのやり取りをするジョブにおいて

  • 変換ルール
  • トリガー

を定め、データ連携基盤を作るためのソリューション。

ドキュメントのざっと読みまとめ

先にドキュメントをざっと読んで、コンポーネントやモデルを理解しましょう。大まかに以下のコンポーネントの役割を覚えればいいのではないかと思います。なお、現段階でのジョブやトリガーについては使った上での理解ではないことをご承知おきください。

  • AWS Glue Data Catalog
    • Database
      • Table
      • Connection
    • Crawler
      • Classifier(Custom)
    • Job
      • Trigger

AWS Glue Data Catalog

一つのAWSアカウントに一つ対応するメタデータリポジトリで、全てのテーブル定義、ジョブ定義、クローラ定義を含みます。

Database

スキーマ情報の格納単位と理解すればいいでしょう。ほぼタグとしての扱いなので、現実のデータベースの境界線に必ずしも一致する必要はありません。その配下にデータベースに紐づいたテーブルやコネクションがぶら下がります。

Table,Connection

Tableは各種データソースのスキーマ情報です。データソースに対応しているのはS3かJDBCコネクションのいずれか。DynamoDBはそのままでは対応していないようですが、まあこれはデータ同士をマッピングして連携をするためのものなので、スキーマレスはあまりお呼びじゃないですよ、という感じですね。

ConnectionはJDBC接続の接続情報。スキーマ情報のクローリングやジョブの実行に使います。

Crawler,Classifier

Crawlerはデータストア(S3,JDBC)を指定して定期的にスキーマ情報を取得しに行くための仕掛けです。ETLはすでに作ってある各種サービスや社内システムを繋いで行くものなので、こういうの地味に便利。クローリングスケジュールも設定できて、スキーマ変更もある程度自動で追随できます。スケジュール実行とオンデマンド実行を選べます。

Classifierはデータソースのファイルやバイナリ形式などに応じたスキーマ情報定義のための分類子です。AWS Glueに用意されているものはBuilt-in Classifierと呼ばれ、これらはデータストア読み込み時に自動で確認されます。

docs.aws.amazon.com

上記のBuilt-inではないカスタムなClassifierを作ることもでき、それらはクローラに実行を指定することができます。

Job,Trigger

Jobはデータ変換処理を記載したものです。裏でApache Sparkが動いているらしく、PySpark形式に変換するため、スクリプトPythonで書きます。データ連携でよく利用する基本的な変換ロジック自体はTransformと呼ばれるAPIが提供されていて、自前でゴリゴリ書くというよりはAPIの組み合わせでなんとかすることを最初に考えた方が良さそうです。作成したJobをダイアグラムとしても確認できるようです。ダイアグラムベースに開発できるわけではないようなので、開発の補助的な役割と実行時の状態可視化に使うためのものではないかと思います。

Triggerは実行タイミングの制御です。AWS Glueで対応しているのはcron形式のスケジューラ、他のジョブの完了検知、オンデマンド実行です。AWSの他のサービスの実行状態なんかをトリガーにする場合は、おそらく対応するイベントで発火するLambdaファンクションを書いて、その中からオンデマンド実行という流れがいいのではないかと思います。

お試しの準備

ETLはそのソリューションの特性上、インターネットにオープンにして使うものではありませんのでVPCの利用が前提になります。ドキュメントにも一通りこの辺りは整備されていますので、参照して環境を作っていきましょう。

docs.aws.amazon.com

なお、S3 to S3のようにサービス上のやりとりになる場合はなくても動くようです。JDBCでアクセスするリソースの場合も同様にVPCの設定が必要です。

クローラを作ってみる

いつまでもドキュメント読むのも疲れるので、少し触って理解しましょう。クローラを作ってS3に放り込んでいるファイルからスキーマ情報を抜き出してみます。S3に放り込んだcsvスキーマ定義とjsonスキーマ定義のためのクローラを作ります。

なお、クローラの作り方そのものは先のブログや公式の方が詳しいので、そちらを説明するよりも、こういう風に使ったらこういう挙動になる、という感じの部分を主に載せておきます。というわけでクローラが対象にするファイルが一つだけのケースについてはドキュメント通りにやれば特に迷うことはないので割愛します。

同一ファイルを年次などの条件でディレクトリ分けしているケースの取り込み

S3データストアを利用する場合、対象として指定されたS3ディレクトリをルートとして、以下の条件が整っていれば、サブディレクトリをパーティションキーとしてスキーマ定義を行います。

  • 同一形式のファイルであること
  • 配下のファイルのカラム構成が同じであること
  • 圧縮の形式が同じであること

例えば、年度のごとのディレクトリを以下のように配置して、バケットをデータストアとして指定した場合のクローリング結果をみてみましょう。

/bucket-name
  /2017
    /csv
      /file.csv
  /2016
    /csv
      /file.csv

クローリングのスキーマの内容は以下のようになります。

f:id:mao_instantlife:20170819092240p:plain

ファイルの項目以外にパーティション用の項目が追加されていますね。まだジョブを触っていないので、具体的にどう使うかはイメージついてないですが、おそらくこれを使ってジョブで読み込むデータをフィルタリングしたり、振り分けたりするのでしょう。

実際のパーティションの状況もみることができます。

f:id:mao_instantlife:20170819092256p:plain

当たり前ですが、この辺、ディレクトリ構成なども含めたデータ設計をやっておかないと実務上で綺麗に仕掛けることは難しいと思います。

複数データストア(ファイル形式が異なる)から読み込んでみる

ドキュメント読んでいて気づいたのですが、意外とクローラのリミットが小さいんですよね(今後大きくなるかどうかは不明)。アカウントごとに10個ですよ。

docs.aws.amazon.com

なので、データストア一つにつき一つのクローラを定義する、といった富豪的な使い方ができません。データのグループごとにまとめて定時実行という戦略を立てる必要があります。この辺のグルーピングの仕方とか境界の引き方というのも割とセンスが出そうですね。

というわけで他のシステムが持っている関連する別テーブル(ファイル)がjsonで出力されているというシナリオで、先と同じクローラで一緒に処理をするということもできます。以下のようにクローラに対して複数のデータセットは普通に指定することができます。

f:id:mao_instantlife:20170819092355p:plain

jsonファイル指定時の注意

何ですが、実行時に少しハマりました。今回追加したデータは、airport.jsonというGithubに公開されていたjsonファイルを使ってクローラを実行したのですが、結果のClassifier情報が UNKNOWN になってしまいました。スキーマ情報も空。csvの方は今までと同様にスキーマ情報を取得できているので、うまく取得できなかったようです。

というわけで立てた仮説は以下。

  1. クローラごとにClassifierは固定される
  2. jsonファイルがおかしい

1を確認するために、jsonファイルのクローラを単独で作って実行。同じようにUNKNOWNになったので否定されました。ドキュメントを見ても仮説を裏付けるような記載はなかったので、データストアごとにClassifier指定のロジックは働くと見ていいでしょう。

2は、jsonがおかしいというよりもjsonファイルのClassiferが使っているライブラリのjsonパースが仕様に沿っていないというべきでしょう。この可能性に気づいたのはファイルの中身を改めて見た時です。このファイル、トップレベルに配列がきているデータなんですね。で、配列を外してカンマ区切りのオブジェクトの羅列にしてクローリングしてみると以下のように取り込めます。

f:id:mao_instantlife:20170819092335p:plain

これ、会社のプロダクトでも同じような経験があって、おそらくjsonのパースにjacksonを使っているのではないかと思われます。jsonの仕様としてトップレベルに配列を持ってくることは禁じられていないはずなのですが、デフォルトではこれをうまくパースしてくれないからClassifierがUNKNOWNになってしまった、といったところでしょうか。この辺りはjsonでデータ出力するときに考慮しておくべき内容ではないかと思います。

今回のまとめ

クローラのお手軽さがいいですね。RDSはまだ試していませんが(サンプル用にスキーマ作るの面倒臭い、誰かいいサンプル持ってないですか?)、使うハードルが同じくらいであれば非常に楽だと思います。ETLはデータを他のデータにロードするためのものなので、input/outputはある程度形式化でき、ジョブで実際に動くビジネスロジックもある程度形式化してきます。そのため、この辺りのものについては使い勝手以外の面で各ソリューションで差別化しづらいのかな、と。AWS Glueについてはそれよりも、連携基盤を作るために必要なデータを集めてくるところの手軽さに注力している感じがしていいですね。Lambdaと組み合わせると柔軟なジョブ起動もできそうなので、他の機能を試すのが楽しみです。

これから試そうと思っているのは以下です。またメモが溜まってきたらエントリにまとめて放出します。

  • ジョブ、トリガー
  • 開発用エンドポイントの作成 Zeppelinとか使うらしいですが、触ったことがないので全くイメージが湧きませんでした。作り方はドキュメントにあります。 Using Development Endpoints for Developing Scripts - AWS Glue
  • コンソールだけで開発するのしんどいのでGithubに置ける形でプロジェクト構成できないか調べてみる これ、Serverless Frameworkみたいなの作る、とか変な方向に行きそうで藪蛇感満載です。

Serverless Frameworkプロジェクトと開発者アセットのCDを回す

前回の続きです。

mao-instantlife.hatenablog.com

mao-instantlife.hatenablog.com

GithubにコードがプッシュされたらServerlessプロジェクトをデプロイし、デプロイ後のAPIドキュメントやクライアントライブラリの生成を継続的にする環境を作ってみましょう。

閑話:外部パッケージ管理の修正

最初からかよ、とか言わない。前回、とあるLambdaファンクションを作る際に外部ライブラリを必要としました。とりあえずは .gitignore に追加してローカルで pip install することでお茶を濁していましたが、やはりかっこ悪いですし、CD環境作るには微妙なところでもあるのでプラグインを導入してなんとかします。

github.com

やはり誰かが考えてるものですよね、遠慮なく乗っからせていただきます。ちなみに、クラメソさんの以下の記事を参考にして使い始めたけど、少しバージョンアップして使い方が変わったっぽいです。最新はドキュメントをみましょう。

dev.classmethod.jp

ローカルでは普通にグローバルに pip install してパスを通して使うという場合はプラグインを入れて requirements.txt を使う感じです。特にプロダクトコードに記載を加える必要はありません。

グローバルに pip install せずに requirements.txt の設定を元にローカルでもキックするよ、という場合は外部ライブラリをzipにまとめて使う設定にしましょう。 serverless.yml に以下の設定を追加します。

custom:
  pythonRequirements:
    zip: true

その上で、対象の外部ライブラリを使うコードの前に

import unzip_requirements

を追加します。

ダウンロードしたライブラリや自動生成されるコードのなどがあるので、以下は.gitignoreに追加しておくのが良いと思います。

  • .requirements
  • .requirements.zip
  • unzip_requirements.py

閑話休題:CD環境の整理

前回の構想、こんな感じでした。

Github(merge)-> CodeBuild -> CodeDeploy -(API Gateway)-> クライアントライブラリの生成とSwagger UIの更新

今回、CodePipelineで全体を管理できないか調べてみました。

CodePipeline is 何?

リリースのための手順、フローのモデル化、自動化のためのAWSのサービスです。で、調べ始めたんですが、割とつらみがあるっぽいです。

qiita.com

CircleCIでいいんじゃない?

ですよねー。

というわけでCircleCIをおさらいしてみました。前にちょっと使ってた時は1系だったので少し進化してましたね。

  • ビルドやテスト実行はDockerが基本になってる
  • 複数のイメージの連携もCirclCIがよしなにしてくれる
  • ジョブ毎に個別のイメージが使える
  • AWSへのデプロイ設定も楽

circleci.com

カスタムコンテナを使う

で、今回のプロジェクトはServerless Framework + Python3なわけですので、ビルド環境のイメージは専用に作ってあげる必要があります。基本的なビルド環境についてはCircleCIが使う各種ツールがインストールされた状態でのオフィシャルイメージがあるので、nodeから派生させることにしましょう。

イメージは .config/images にDockerfileをおくか、リポジトリにプッシュしておいてもいいです。構成が決まればイメージは共有できて、必要なミドルウェアは別のイメージで配備するというてもあるので、Dockerhubとかにプッシュしておくのが楽だと思います。

というわけで出来上がったのがこちらのイメージ。Dockerhubにも公開していますので、必要な方はご自由にどうぞ。

github.com

Serverless Framework + Python3で使うPythonのバージョンが3.6で、apt-getでインストールできるPythonが3.4だったし、いくつかググって出てきたバージョン3.6があると思しきリポジトリは追加しても404だったりで、結局イメージないでコンパイル・インストールしています。このあたりはもっとスマートに行きたいところだったんですが。。。

今回はデプロイまでなのでJREをインストールしていませんが、テストを流すとなるとDynamoDB Localを使うことになるのでJREのインストールが必要になります。これもこれで面倒臭いというか、LocalStackとかの利用で別のイメージ前提にした方がいいのではないかと考え中です。

github.com

qiita.com

閑話:serverless-step-functionsプラグインの制約にハマる

で、意気揚々とイメージ作ってデプロイしてみたら sls invoke stepf が動かないわけですよ。実はローカルで実行してみても同様だったと。StateMachineが見つからないとか言われます、そんなバカな。で、色々とごにょごにょしていて、StateMachineの設定からname属性を外し、命名をキャメルケースにしたらできました。実際にデプロイされた時の名前はこんな感じです。

MyAssistantGenerateClientStepFunctionsStateMachine-************

末尾はなんかハッシュっぽい、念のためマスクしています。この末尾の部分はデプロイするステージを変更したら変わりました。この辺普通に文字列でつけたりできないか確認したいところ。まあ、Step Functionsへのキックイベントも serverless.yml で書けるので実際問題はそこまできにする必要がないのかもしれませんが。

で、前回までの命名とname属性で登録された名前がこちら。

MyDashassistantDashgenerateDashclientDashdev-************

なんかname属性がある場合とか、変換とか文字数とかいくつか原因はありそうな気がします。とりあえず現状はそんなに細かい命名をしたいわけではないので、上の対応策でなんとかしました。

閑話休題:継続的デプロイの設定

というわけで今回の内容を反映した .circleci/config.yml がこちらです。走り過ぎても面倒臭いので現状はmasterへのプッシュ、マージの時のみ発動、CodeGeneratorもまだ常時キックする感じでもないためStepFunctionsの設定をパスするように変更しています。

version: 2
jobs:
  build:
    docker:
      - image: shinsukeabe/circleci-serverless-python3:latest
    steps:
      - checkout
      - run:
          name: install project dependencies plugins
          command: sudo npm install serverless-dynamodb-local
      - run:
          name: deploy serverless functions
          command: sls deploy -v
      - run:
          name: distribute developers assets
          command: sls invoke stepf --name myAssistantGenerateClient
general:
  branches:
    only:
      - master

github.com

まとめ

というわけで、CircleCIで自動デプロイまで仕掛けました。今回テストは仕掛けてませんが、ここまでできて入れば後は細かいつらみとの戦いになります。ここからしばらくはServerlessとLambdaの使い方を深めていく方向に試していこうかと思います。KMSとか使ってないですし、Lambdaに合ったデータの永続化のあり方とか。キリが良くなったら続けます。