冥冥乃志

ソフトウェア開発会社でチームマネージャをしているエンジニアの雑記。アウトプットは少なめです。

follow us in feedly

今年入ってからの印象的な本

本を読んでなかったわけではないですが、読書メモの取りまとめはちょっとサボってましたね。棚卸しがてら、今月までで印象に残った本をいくつか紹介しようと思います。

今年読んだ本

抜けはかなりあるんですが、だいたいこんなところですね。ゼルダペルソナ5に時間泥棒されてた割には読めてる気がしますね。

印象的な本

フィクション/ノンフィクション取り混ぜて読んでおりますので、分けてご紹介したいと思います。

ノンフィクション系

今年に入ってから何故か言語学に関する本で印象に残ったものが多かったです。特に専門というわけではなかったですが、去年認知言語学の本を読んで、人が言葉を理解するということに純粋に興味が湧いていました。そんなタイミングで発売された本が両方とも非常にわかりやすく、より興味を刺激してくれるものだった、というのは非常に良い縁だったと思います。

ちいさい言語学者の冒険――子どもに学ぶことばの秘密 (岩波科学ライブラリー)

ちいさい言語学者の冒険――子どもに学ぶことばの秘密 (岩波科学ライブラリー)

働きたくないイタチと言葉がわかるロボット  人工知能から考える「人と言葉」

働きたくないイタチと言葉がわかるロボット 人工知能から考える「人と言葉」

両者とも言語学者の方が一般向けに書かれた本です。

前者は子供が第一言語として母国語を覚えるまでに起きていることにフォーカスを当てたものです。我が家には子供がいないので、これがどれほど「あるある」なのかわかりませんが、意味も文法もわからないところから人が言語を理解する過程への考察というのが非常に刺激的な本です。

後者は厳密にいうと言語学というよりも自然言語処理のAIの説明です。物語ベースで段階を追いながら現段階で自然言語処理がどういうモデルで理解されているか、ということが書かれています。幅広い話題を扱いながら、言葉のチョイスがあくまで一般的な範囲に収められていて、相当な労力がかかっているのがわかります。

たのしいプロパガンダ (イースト新書Q)

たのしいプロパガンダ (イースト新書Q)

少し毛色を変えてこんなものを。

まあ、今のようなポジションで仕事してると、発信するということに否応無しに向き合う必要があります。これがまた難しいわけですよ。基本的に自分だけの問題ではないことはわかってるんですが、せめてもう少し楽しいものを、と思ったらやっぱりプロパガンダを参考にすべきかな、と。エンタメに混ぜるのが基本で、となった時点で仕事に直接使うことはできませんでしたが、応用を考えるきっかけにはなりました。

フィクション

フィクションはいつも通りSFですよ。ちょっと読むのをためらっていたものに手を出してみて、案の定な読書体験をしたのが印象的でしたね。

ディアスポラ (ハヤカワ文庫 SF)

ディアスポラ (ハヤカワ文庫 SF)

初イーガンですよ。いやあ、本当にこれは辛かった。小説で3ヶ月くらいかかるとかこれが初めてかもしれないです。イーガン作品の中で一番読みやすいとか本当なんですかね。。。要求される教養レベルが違いすぎる。

ただ、その知的議論の果てに見せてくれる世界が、SFというかセンスオブワンダーをちゃんと感じさせてくれて、読後感は素晴らしいものでした。人間の好奇心が行き着く先をみたければこの作品はおすすめです。

幼年期の終わり (光文社古典新訳文庫)

幼年期の終わり (光文社古典新訳文庫)

ここ数年は海外SFもちゃんと読もうと思って手に取っていますが、もともと日本SFを中心に読んでいたというのもあって、ファーストコンタクトものの中でも古典に入るこの作品も読んでなかったわけなんです。しかも、クラークの作品ともあって結構ハードSFなのかなあ、と思ってたんですが、そうでもないというのが意外でした。「正解するカド」に求めていたのは、ディアスポラの行き着く先だったりこの作品のラストだったりしたのだなあ、と認識させてくれた作品。訳も読みやすかったです。

大四国戦線

大四国戦線

事態にふさわしいとは思えない卑近な登場人物の応酬や、大四国と銘打っておきながら高知と徳島は我関せずという看板の偽りっぷり、筒井康隆フォロアーとしか思えないラストに至るパイ投げエスカレーション、どれをとっても180kmのナックルという感じの怪作でした。こんなのまともなキャッチャーとれねーよ。こういうのがいるから面白い、というKDPの真髄を見た、という感じ。おすすめはしませんが、同じカルマを感じる人だったら紹介する、という本です。

ラノベ、普段読まないんです。理由は主に文体で、改行が総じて多すぎるというのが一番強い理由ですね。私は改行をカット割りのように捉えて読んでいるので、あんなに開業されると無駄なカットが多すぎてうるさい映画のように感じてしまいます。

それでもこれを読み始めたのは最新刊の帯に円城塔が推薦文を書いてたから。まあ、この人が帯書いてる時点で普通の作品ではないはずなので、その期待を込めて、ですね。

1巻ではまだ、異能バトルものみたいな感じですが、ちょいちょい怪しい仕掛けとかもあって、先が楽しみです。しばらく追いかけてみようと思っています。

この作品、好きなんですが、いろんな感情をむき出しで描いていて感想をいうのが難しいんですよね。一つの町が隔離されて消えるまでに起こる様々なことをオムニバスで描いています。その中で示される感情も決して一つじゃない、何か入り混じった複雑な感じで味わい深い読後感があります。ラストの含みが特に好きで、あれをシーンとして描き切らなかったのはいいですね。一応おまけ漫画と最終巻の表紙でどうなったかはわかるのですが、一応ハッピーエンドでよかったです。

AWS Glueのお試し:ジョブを動かしてみるのと、ついでに気づいたAthenaとの連携を確認

前回、全体像を追いかけてクローラを実行するだけで結構なボリューム行ってしまったので続きです。

mao-instantlife.hatenablog.com

今回は右から左に流すジョブを作ってみるのと、その過程でわかったことを何点かまとめておきたいと思います。

Webコンソールからジョブを作る

ソースもターゲットが単一で、単にマッピングの変更があるレベルであればウィザードから作成するだけでジョブが作れます。下図のように、ソースもターゲットもウィザード上はラジオボタンなので、ソースやターゲットが単一でないようなレベルのジョブを作りたい場合は、メインのソースやターゲットのみ決定してテンプレートとして作った後でソースを編集するなどの方法を取る必要があります。

f:id:mao_instantlife:20170824193802p:plain

ちなみに、ジョブの全般的な設定でTemporary directoryの入力が必須なんですが、デフォルトでは何も入力されていません。スクリプトの場所はデフォルトで入ってるし、S3のバケットみてもジョブの実行後に残ってるような内容でもないみたいなので、適当にスクリプトの配下に置くようなデフォルト値を入れておくとかしておいてくれてもよかったんじゃないのかなあ、と思いました。

また、ジョブの実行パラメータやリトライ回数などの各種設定もウィザードからできます。この辺りはまだ試せてないですが、とりあえずジョブパラメータってなんでキーとバリュー両方指定するんだっけ、みたいな感じになってます。

f:id:mao_instantlife:20170824193824p:plain

今回は、ターゲットのスキーマをその場で生成するようなジョブを作ったので、マッピングはソースからの引き渡しになります(ちなみにこれで指定したターゲットのスキーマは使い捨てでData Catalogueには自動で登録されません)。ターゲットをData Catalogue上から指定して名前と型レベルで合わせてくれるかどうかは今後の検証で確認しようと思います。天下のAWSのプロダクトなので、やってくれると信じていますが。

f:id:mao_instantlife:20170824193845p:plain

マッピングは、相手がCSVだったりすると出力順がその後の処理に影響したりすることを考慮して、ターゲットの順序を変更したりすることが可能です。項目を追加することも可能です。

ざっと生成されたソースを眺めてみる

で、そんなこんなでウィザードから雑にでもジョブを作ると、ソースエディタが表示され、以下のような一番シンプルな形のジョブのコードが生成されます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "flights", table_name = "flightsaws_glue_application_test", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-application-test-target"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ソースから生成可能なダイアログもエディタの左側に表示されています。ちなみにこのダイアログのノードをクリックするとソースの該当箇所をハイライトしてくれます。カーソルの移動してくれればいいのに。ちなみに、ドキュメントレベルで確認したことですが、ダイアグラムの生成はソースの アノテーション(##と@の項目指定) を利用している模様。

また、ダイアログ上でソースかターゲットをクリックして、右下部の「Schema」タブを選ぶとスキーマを確認することもできます。

各transformの引数にあるのはDynamicFrameと呼ばれるジョブ内で扱うデータフレームです。各transformの戻り値はこの型になっているようです(まだ全てのAPIドキュメントを確認できているわけではない)。なので、基本的なジョブの流れは、

  1. ソースから対象データのDynamicFrameを作る
  2. 各種transformを使ってDynamicFrameを操作
  3. 操作したDynamicFrameの内容をターゲットに出力

という感じですね。シンプル。

何はともあれキックしてみる -> 失敗 -> 成功

何もしてないのに失敗した、とかいうつもりはないです。何もしてないつもりだったんですけど、何かされていて実行したらこけました。エラーログは以下のような感じ。

Syntax Error: File "/tmp/***/script_2017-08-21-20-45-42.py", line 5
<output> = Spigot.apply(frame = <frame>, path = "<path>")
^
SyntaxError: invalid syntax

適当にソースエディタ触っているうちに、メニューのSpigotのボタンを押してしまい、自動生成されたソースが入っていたようです。この辺削除して実行するとうまくいきました。クリックした時のレスポンスがあまりにも薄いので気づきませんでした。ちょっとこの辺は改善していただけると嬉しかったり。

ちなみに、メニュー上transformと別になっていますが、Spigotもbuilt-in transformの一種です。DynamicFrameをJsonでS3に出力する機能を持っているようです。printfデバッグ的に使うやつですかね?他のtransformとは別のボタンになっているのは、transformがデータの操作を担うのに対して、これだけ出力を伴うからなんでしょうが、Spigotという単語に慣れてないだけに理解に時間がかかったポイントでした。英語大事。

transformで追加されるソースを眺めてみる

先ほど追加されてしまったSpigotのソースを改めてみてみましょう。アノテーションまで含めて以下のような感じになってます。

## @type: Spigot
## @args: [path = "<path>"]
## @return: <output>
## @inputs: [frame = <frame>]
<output> = Spigot.apply(frame = <frame>, path = "<path>")

<> の中身を適当にアプリに合わせて埋めていけ、という感じですね。他の部分と合わせてみると frame はインプットになるDynamicFrameのようです。

実行結果

ちなみに、サンプルに使ったおよそ90万件のデータをParquet形式でS3に吐き出す、というジョブで実行時間は約9分でした。10万件/分くらいの計算ですね。Parquetファイルの様式があまりわからないので、そこまで勘案して遅いか早いかはちょっとよくわかっていません。

なお、サブディレクトリでパーティション分けたデータソースは、パーティションごとに異なるparquetファイルに吐かれた模様です。

GlueとAthenaとの連携を確認する

実行できたので、ちゃんとtransformできてるかな、と確認したくなるのは人情です。適当にテキストエディタで開いて見てもいいんですが、せっかくなので今まで使ってなかったAthenaで読み込んで確認しようかな、とおもむろに開くと。。。

f:id:mao_instantlife:20170824193412p:plain

AthenaにAWS Glue Data Catalogueとかありますね。ってか、データベースにGlueで作成したデータベースとテーブルの定義があります。ちゃっかりAthenaと統合していたようです。Athena側にはドキュメントがありました。Glueのドキュメントでは気づかなかったです。こちらでも章立てして置いていい内容じゃないですかね。

Integration with AWS Glue — User Guide

今のところ確認しているのは、

  • Glueで作成したデータカタログ(データベースとテーブル)をAthenaで使う
  • Athenaでテーブル作成時にGlueのクローラを作ることができる
  • AthenaでマニュアルでGlueと連携するデータベースにテーブル作ったらGlueにも反映される

といったあたりの動き。ETLだと連携の中間データとか確認しづらかったり、そもそもファイルだと検索性悪かったりとかあるので、この連携は地味に嬉しいです。もちろんselectもちゃんと投げられます。素晴らしい。Redashかませば割といろんな用途が事足りる気がします。

まとめ

定型化しやすいところ、連携がシームレスだと嬉しいところをちゃんと抑えた展開をしている感じがしますね。データ連携基盤から分析のためのダッシュボード作りまでGlueとAthenaでかなりの部分が賄えそうです。ますますGlueが好きになりました。こういうギョーミーで派手さのかけらもないプロダクト好きです。

次はジョブの実行パラメータなんかを少し触ってみようかと思います。

AWS Glueのお試し:ドキュメントのざっと読みとクローラを使ってみる

先日、このようなブログが発信されました。

aws.amazon.com

この辺りのデータ環境周りを作るためのサービスは、どんな状況であっても覚えておいて損はないですし、好きなレイヤーでもあるのでちょっと触ってみました。なお、公開されているのはまだ北米リージョンのみです。

ドキュメント読みながら、使いながらメモ書きしてたらある程度のボリュームになったので、一旦区切って出します。

参考リソース

なんにせよ公式リソースが一番ですので、迷ったらここに当たってみましょう。

docs.aws.amazon.com

サンプルや各種ライブラリのリポジトリもすでに作られています。今はまだ少ないですが、これから充実して行くことを期待しています。

github.com

github.com

AWS Glue is 何

AWS上のフルマネージドなETLです。ETLはextract, transform, and loadの略で、ちょっとした規模の企業だと必ずあるデータ連携基盤みたいなものを構築するためのソリューションです。自前で構築しているところもあるでしょうが、ソリューションを使っているところもあります。ちなみに私が仕事で使ったことがあるのは、DataStageとHULFTですね。Embulkあたりも同じ目的で使うことができると思います。Glue(糊、接着剤)とはよく言ったもの。

AWS Glueは雑な理解ですが、様々なシステム、サブシステムが構築されているAWS上(JDBCはおそらくその限りではないはず)のデータリソースの

を取りまとめて、データのやり取りをするジョブにおいて

  • 変換ルール
  • トリガー

を定め、データ連携基盤を作るためのソリューション。

ドキュメントのざっと読みまとめ

先にドキュメントをざっと読んで、コンポーネントやモデルを理解しましょう。大まかに以下のコンポーネントの役割を覚えればいいのではないかと思います。なお、現段階でのジョブやトリガーについては使った上での理解ではないことをご承知おきください。

  • AWS Glue Data Catalog
    • Database
      • Table
      • Connection
    • Crawler
      • Classifier(Custom)
    • Job
      • Trigger

AWS Glue Data Catalog

一つのAWSアカウントに一つ対応するメタデータリポジトリで、全てのテーブル定義、ジョブ定義、クローラ定義を含みます。

Database

スキーマ情報の格納単位と理解すればいいでしょう。ほぼタグとしての扱いなので、現実のデータベースの境界線に必ずしも一致する必要はありません。その配下にデータベースに紐づいたテーブルやコネクションがぶら下がります。

Table,Connection

Tableは各種データソースのスキーマ情報です。データソースに対応しているのはS3かJDBCコネクションのいずれか。DynamoDBはそのままでは対応していないようですが、まあこれはデータ同士をマッピングして連携をするためのものなので、スキーマレスはあまりお呼びじゃないですよ、という感じですね。

ConnectionはJDBC接続の接続情報。スキーマ情報のクローリングやジョブの実行に使います。

Crawler,Classifier

Crawlerはデータストア(S3,JDBC)を指定して定期的にスキーマ情報を取得しに行くための仕掛けです。ETLはすでに作ってある各種サービスや社内システムを繋いで行くものなので、こういうの地味に便利。クローリングスケジュールも設定できて、スキーマ変更もある程度自動で追随できます。スケジュール実行とオンデマンド実行を選べます。

Classifierはデータソースのファイルやバイナリ形式などに応じたスキーマ情報定義のための分類子です。AWS Glueに用意されているものはBuilt-in Classifierと呼ばれ、これらはデータストア読み込み時に自動で確認されます。

docs.aws.amazon.com

上記のBuilt-inではないカスタムなClassifierを作ることもでき、それらはクローラに実行を指定することができます。

Job,Trigger

Jobはデータ変換処理を記載したものです。裏でApache Sparkが動いているらしく、PySpark形式に変換するため、スクリプトPythonで書きます。データ連携でよく利用する基本的な変換ロジック自体はTransformと呼ばれるAPIが提供されていて、自前でゴリゴリ書くというよりはAPIの組み合わせでなんとかすることを最初に考えた方が良さそうです。作成したJobをダイアグラムとしても確認できるようです。ダイアグラムベースに開発できるわけではないようなので、開発の補助的な役割と実行時の状態可視化に使うためのものではないかと思います。

Triggerは実行タイミングの制御です。AWS Glueで対応しているのはcron形式のスケジューラ、他のジョブの完了検知、オンデマンド実行です。AWSの他のサービスの実行状態なんかをトリガーにする場合は、おそらく対応するイベントで発火するLambdaファンクションを書いて、その中からオンデマンド実行という流れがいいのではないかと思います。

お試しの準備

ETLはそのソリューションの特性上、インターネットにオープンにして使うものではありませんのでVPCの利用が前提になります。ドキュメントにも一通りこの辺りは整備されていますので、参照して環境を作っていきましょう。

docs.aws.amazon.com

なお、S3 to S3のようにサービス上のやりとりになる場合はなくても動くようです。JDBCでアクセスするリソースの場合も同様にVPCの設定が必要です。

クローラを作ってみる

いつまでもドキュメント読むのも疲れるので、少し触って理解しましょう。クローラを作ってS3に放り込んでいるファイルからスキーマ情報を抜き出してみます。S3に放り込んだcsvスキーマ定義とjsonスキーマ定義のためのクローラを作ります。

なお、クローラの作り方そのものは先のブログや公式の方が詳しいので、そちらを説明するよりも、こういう風に使ったらこういう挙動になる、という感じの部分を主に載せておきます。というわけでクローラが対象にするファイルが一つだけのケースについてはドキュメント通りにやれば特に迷うことはないので割愛します。

同一ファイルを年次などの条件でディレクトリ分けしているケースの取り込み

S3データストアを利用する場合、対象として指定されたS3ディレクトリをルートとして、以下の条件が整っていれば、サブディレクトリをパーティションキーとしてスキーマ定義を行います。

  • 同一形式のファイルであること
  • 配下のファイルのカラム構成が同じであること
  • 圧縮の形式が同じであること

例えば、年度のごとのディレクトリを以下のように配置して、バケットをデータストアとして指定した場合のクローリング結果をみてみましょう。

/bucket-name
  /2017
    /csv
      /file.csv
  /2016
    /csv
      /file.csv

クローリングのスキーマの内容は以下のようになります。

f:id:mao_instantlife:20170819092240p:plain

ファイルの項目以外にパーティション用の項目が追加されていますね。まだジョブを触っていないので、具体的にどう使うかはイメージついてないですが、おそらくこれを使ってジョブで読み込むデータをフィルタリングしたり、振り分けたりするのでしょう。

実際のパーティションの状況もみることができます。

f:id:mao_instantlife:20170819092256p:plain

当たり前ですが、この辺、ディレクトリ構成なども含めたデータ設計をやっておかないと実務上で綺麗に仕掛けることは難しいと思います。

複数データストア(ファイル形式が異なる)から読み込んでみる

ドキュメント読んでいて気づいたのですが、意外とクローラのリミットが小さいんですよね(今後大きくなるかどうかは不明)。アカウントごとに10個ですよ。

docs.aws.amazon.com

なので、データストア一つにつき一つのクローラを定義する、といった富豪的な使い方ができません。データのグループごとにまとめて定時実行という戦略を立てる必要があります。この辺のグルーピングの仕方とか境界の引き方というのも割とセンスが出そうですね。

というわけで他のシステムが持っている関連する別テーブル(ファイル)がjsonで出力されているというシナリオで、先と同じクローラで一緒に処理をするということもできます。以下のようにクローラに対して複数のデータセットは普通に指定することができます。

f:id:mao_instantlife:20170819092355p:plain

jsonファイル指定時の注意

何ですが、実行時に少しハマりました。今回追加したデータは、airport.jsonというGithubに公開されていたjsonファイルを使ってクローラを実行したのですが、結果のClassifier情報が UNKNOWN になってしまいました。スキーマ情報も空。csvの方は今までと同様にスキーマ情報を取得できているので、うまく取得できなかったようです。

というわけで立てた仮説は以下。

  1. クローラごとにClassifierは固定される
  2. jsonファイルがおかしい

1を確認するために、jsonファイルのクローラを単独で作って実行。同じようにUNKNOWNになったので否定されました。ドキュメントを見ても仮説を裏付けるような記載はなかったので、データストアごとにClassifier指定のロジックは働くと見ていいでしょう。

2は、jsonがおかしいというよりもjsonファイルのClassiferが使っているライブラリのjsonパースが仕様に沿っていないというべきでしょう。この可能性に気づいたのはファイルの中身を改めて見た時です。このファイル、トップレベルに配列がきているデータなんですね。で、配列を外してカンマ区切りのオブジェクトの羅列にしてクローリングしてみると以下のように取り込めます。

f:id:mao_instantlife:20170819092335p:plain

これ、会社のプロダクトでも同じような経験があって、おそらくjsonのパースにjacksonを使っているのではないかと思われます。jsonの仕様としてトップレベルに配列を持ってくることは禁じられていないはずなのですが、デフォルトではこれをうまくパースしてくれないからClassifierがUNKNOWNになってしまった、といったところでしょうか。この辺りはjsonでデータ出力するときに考慮しておくべき内容ではないかと思います。

今回のまとめ

クローラのお手軽さがいいですね。RDSはまだ試していませんが(サンプル用にスキーマ作るの面倒臭い、誰かいいサンプル持ってないですか?)、使うハードルが同じくらいであれば非常に楽だと思います。ETLはデータを他のデータにロードするためのものなので、input/outputはある程度形式化でき、ジョブで実際に動くビジネスロジックもある程度形式化してきます。そのため、この辺りのものについては使い勝手以外の面で各ソリューションで差別化しづらいのかな、と。AWS Glueについてはそれよりも、連携基盤を作るために必要なデータを集めてくるところの手軽さに注力している感じがしていいですね。Lambdaと組み合わせると柔軟なジョブ起動もできそうなので、他の機能を試すのが楽しみです。

これから試そうと思っているのは以下です。またメモが溜まってきたらエントリにまとめて放出します。

  • ジョブ、トリガー
  • 開発用エンドポイントの作成 Zeppelinとか使うらしいですが、触ったことがないので全くイメージが湧きませんでした。作り方はドキュメントにあります。 Using Development Endpoints for Developing Scripts - AWS Glue
  • コンソールだけで開発するのしんどいのでGithubに置ける形でプロジェクト構成できないか調べてみる これ、Serverless Frameworkみたいなの作る、とか変な方向に行きそうで藪蛇感満載です。

Serverless Frameworkプロジェクトと開発者アセットのCDを回す

前回の続きです。

mao-instantlife.hatenablog.com

mao-instantlife.hatenablog.com

GithubにコードがプッシュされたらServerlessプロジェクトをデプロイし、デプロイ後のAPIドキュメントやクライアントライブラリの生成を継続的にする環境を作ってみましょう。

閑話:外部パッケージ管理の修正

最初からかよ、とか言わない。前回、とあるLambdaファンクションを作る際に外部ライブラリを必要としました。とりあえずは .gitignore に追加してローカルで pip install することでお茶を濁していましたが、やはりかっこ悪いですし、CD環境作るには微妙なところでもあるのでプラグインを導入してなんとかします。

github.com

やはり誰かが考えてるものですよね、遠慮なく乗っからせていただきます。ちなみに、クラメソさんの以下の記事を参考にして使い始めたけど、少しバージョンアップして使い方が変わったっぽいです。最新はドキュメントをみましょう。

dev.classmethod.jp

ローカルでは普通にグローバルに pip install してパスを通して使うという場合はプラグインを入れて requirements.txt を使う感じです。特にプロダクトコードに記載を加える必要はありません。

グローバルに pip install せずに requirements.txt の設定を元にローカルでもキックするよ、という場合は外部ライブラリをzipにまとめて使う設定にしましょう。 serverless.yml に以下の設定を追加します。

custom:
  pythonRequirements:
    zip: true

その上で、対象の外部ライブラリを使うコードの前に

import unzip_requirements

を追加します。

ダウンロードしたライブラリや自動生成されるコードのなどがあるので、以下は.gitignoreに追加しておくのが良いと思います。

  • .requirements
  • .requirements.zip
  • unzip_requirements.py

閑話休題:CD環境の整理

前回の構想、こんな感じでした。

Github(merge)-> CodeBuild -> CodeDeploy -(API Gateway)-> クライアントライブラリの生成とSwagger UIの更新

今回、CodePipelineで全体を管理できないか調べてみました。

CodePipeline is 何?

リリースのための手順、フローのモデル化、自動化のためのAWSのサービスです。で、調べ始めたんですが、割とつらみがあるっぽいです。

qiita.com

CircleCIでいいんじゃない?

ですよねー。

というわけでCircleCIをおさらいしてみました。前にちょっと使ってた時は1系だったので少し進化してましたね。

  • ビルドやテスト実行はDockerが基本になってる
  • 複数のイメージの連携もCirclCIがよしなにしてくれる
  • ジョブ毎に個別のイメージが使える
  • AWSへのデプロイ設定も楽

circleci.com

カスタムコンテナを使う

で、今回のプロジェクトはServerless Framework + Python3なわけですので、ビルド環境のイメージは専用に作ってあげる必要があります。基本的なビルド環境についてはCircleCIが使う各種ツールがインストールされた状態でのオフィシャルイメージがあるので、nodeから派生させることにしましょう。

イメージは .config/images にDockerfileをおくか、リポジトリにプッシュしておいてもいいです。構成が決まればイメージは共有できて、必要なミドルウェアは別のイメージで配備するというてもあるので、Dockerhubとかにプッシュしておくのが楽だと思います。

というわけで出来上がったのがこちらのイメージ。Dockerhubにも公開していますので、必要な方はご自由にどうぞ。

github.com

Serverless Framework + Python3で使うPythonのバージョンが3.6で、apt-getでインストールできるPythonが3.4だったし、いくつかググって出てきたバージョン3.6があると思しきリポジトリは追加しても404だったりで、結局イメージないでコンパイル・インストールしています。このあたりはもっとスマートに行きたいところだったんですが。。。

今回はデプロイまでなのでJREをインストールしていませんが、テストを流すとなるとDynamoDB Localを使うことになるのでJREのインストールが必要になります。これもこれで面倒臭いというか、LocalStackとかの利用で別のイメージ前提にした方がいいのではないかと考え中です。

github.com

qiita.com

閑話:serverless-step-functionsプラグインの制約にハマる

で、意気揚々とイメージ作ってデプロイしてみたら sls invoke stepf が動かないわけですよ。実はローカルで実行してみても同様だったと。StateMachineが見つからないとか言われます、そんなバカな。で、色々とごにょごにょしていて、StateMachineの設定からname属性を外し、命名をキャメルケースにしたらできました。実際にデプロイされた時の名前はこんな感じです。

MyAssistantGenerateClientStepFunctionsStateMachine-************

末尾はなんかハッシュっぽい、念のためマスクしています。この末尾の部分はデプロイするステージを変更したら変わりました。この辺普通に文字列でつけたりできないか確認したいところ。まあ、Step Functionsへのキックイベントも serverless.yml で書けるので実際問題はそこまできにする必要がないのかもしれませんが。

で、前回までの命名とname属性で登録された名前がこちら。

MyDashassistantDashgenerateDashclientDashdev-************

なんかname属性がある場合とか、変換とか文字数とかいくつか原因はありそうな気がします。とりあえず現状はそんなに細かい命名をしたいわけではないので、上の対応策でなんとかしました。

閑話休題:継続的デプロイの設定

というわけで今回の内容を反映した .circleci/config.yml がこちらです。走り過ぎても面倒臭いので現状はmasterへのプッシュ、マージの時のみ発動、CodeGeneratorもまだ常時キックする感じでもないためStepFunctionsの設定をパスするように変更しています。

version: 2
jobs:
  build:
    docker:
      - image: shinsukeabe/circleci-serverless-python3:latest
    steps:
      - checkout
      - run:
          name: install project dependencies plugins
          command: sudo npm install serverless-dynamodb-local
      - run:
          name: deploy serverless functions
          command: sls deploy -v
      - run:
          name: distribute developers assets
          command: sls invoke stepf --name myAssistantGenerateClient
general:
  branches:
    only:
      - master

github.com

まとめ

というわけで、CircleCIで自動デプロイまで仕掛けました。今回テストは仕掛けてませんが、ここまでできて入れば後は細かいつらみとの戦いになります。ここからしばらくはServerlessとLambdaの使い方を深めていく方向に試していこうかと思います。KMSとか使ってないですし、Lambdaに合ったデータの永続化のあり方とか。キリが良くなったら続けます。

Serverless Frameworkでデプロイしたサービスからswagger.jsonを作りSDKやドキュメントを生成する

先日の続きです。

前回はServerless Frameworkを使ってAPI Gatewayでサービスを公開してみる、というところまでやりました。

個人や自分のチームだけが使ってメンテナンスするシングルサービスで構成されるアプリケーションならこれでガンガン開発すれば良いと思います。ただ、ものによってはチームで複数のサービスから構成される複雑なシステムを作る、といったケースや、外部に公開するAPIをもつサービスの一部としてこれらのサービスを開発する、というケースも想定されます。このような場合に必要なのが、開発者向けのドキュメントやクライアントライブラリを提供することです。

今回はそのあたりの整備をやっていきたいと思います。例によってリポジトリはここです。

github.com

github.com

一つ増えていますが、これはクライアントライブラリなどの開発者アセットの生成と配備を行うLambdaファンクション群をまとめたプロジェクトです。複数のプロジェクトから使い回せるように別プロジェクトに分けてあります。

最終的には、Github(merge)-> CodeBuild -> CodeDeploy -(API Gateway)-> クライアントライブラリの生成とSwagger UIの更新、みたいな流れををCodePipelineで作りたいわけですが、長くなったので最後の「クライアントライブラリの生成とSwagger UIの更新」の部分にフォーカスしています。

とりあえず、何をするにしてもswaggerファイルがあれば勝てると思うのでそこを目指すことにしましょう。

API GatewayからSwaggerファイルをエクスポートする

API GatewayAPIにはSwaggerファイルの形式でエクスポートするための get-export があるのでそれを使います。REST APIもありますが、Lambdaで叩こうと思うのでboto3を使っています。

import boto3
import os
import logging

apigateway = boto3.client('apigateway')
s3 = boto3.resource('s3')


def generate_swagger(event, context):
    try:
        staged_api = [api for api in apigateway.get_rest_apis()['items']
                      if api['name'] == os.environ['REST_API_NAME']].pop()

        exported_api = apigateway.get_export(
            restApiId=staged_api['id'],
            stageName=os.environ['REST_API_STAGE_NAME'],
            exportType="swagger"
        )

        swagger_bucket = s3.Bucket(os.environ['SWAGGER_FILE_BUCKET'])

        swagger_file = swagger_bucket.Object('swagger.json')
        swagger_file.put(
            Body=exported_api['body'].read().decode('utf-8'),
            ContentEncoding='utf-8',
            ContentType='text/plane',
            ACL='public-read'
        )
        return "/".join(["https://s3-ap-northeast-1.amazonaws.com",
                         os.environ['SWAGGER_FILE_BUCKET'],
                         "swagger.json"])
    except Exception as e:
        logging.error("Generate Swagger-File Failed. Detail:{0}".format(e))
        raise Exception("Coudn't create swagger file. Detail:{0}".format(e))

apigateway.get_export をキックする前に apigateway.get_rest_apis を叩いているのは、 get_export でのAPI指定にIDが必要で、 serverless.yml に書かれている設定だけではまかなえなかったからです。これについてはデプロイする環境名など、アプリケーションの情報を使うのでアプリケーション内に含まれるLambdaファンクションとしています。

ちなみに、これは後々ElasticBeanstalkで立てるSwagger GeneratorがURLでアクセスする必要があるのでアクセス権を public-read にしています。

(当たり前だけど)ローカル実行時のユーザとデプロイ時のユーザが違うので注意

Serverless Frameworkにはローカル実行オプションがあって、リソースへのアクセスさえできればデプロイせずにも実行できるのですが、実行ユーザに関する注意点があることに今更気づきました。ローカル実行時のユーザが割と強い権限なので、 serverless.yml に権限つけ忘れても動いてしまうんですよね。で、デプロイして実行たら権限が足りなくてこける、という(デプロイ自体は成功する)。

今回は、API Gateway関連の権限をつけ忘れていたので、以下を参考にしました。

docs.aws.amazon.com

APIドキュメントを充実させる

ちなみに、普通にServerless Framework使っているだけの状態でSwaggerファイルを生成しても、連携するもしくはユーザになる開発者に有用なアセットを作ることができません。エンドポイントの説明とかモデルの定義とかが空の状態のままなんですね。というわけで、APIドキュメントを充実させるために以下のプラグインを導入します。

www.npmjs.com

基本的な使い方はドキュメントに書いてある通りなんですが、statusCodeはstringを要求されるみたいなので気をつけましょう。

Swaggerの定義では、リクエスト/レスポンスのモデルをJsonSchemaで定義します。このプラグインでは、それらの定義を別ファイルに切り出しておくことができます。それが以下の部分です。

    models:
      - name: "CreateLifeLogRequest"
        description: "ライフログを作成する時のリクエスト"
        contentType: "application/json"
        schema: "${file(models/create_life_log_request.json)}"
      - name: "LifeLog"
        description: "ライフログのモデル"
        contentType: "application/json"
        schema: "${file(models/lie_log.json)}"

実際のJsonSchemaの定義は以下。

{
  "type": "object",
  "properties": {
    "event": {
      "description": "ライフイベントの内容を記載してください。",
      "type": "string"
    },
    "insight": {
      "description": "ライフイベントが起こった時の感情を記載してください。(任意)",
      "type": "string"
    }
  },
  "required": ["event"]
}

で、モデルをJsonSchemaで表せるということは、Lambdaファンクション内でスキーマをベースにしたチェックができるということなんですね。

import json
import logging
import os
import time
import uuid
import jsonschema

from commons import aws_resources


def create_life_log(event, context):
    try:
        dynamodb = aws_resources.get_dynamodb(event)
        timestamp = int(time.time() * 1000)

        data = json.loads(event['body'])
        request_schema = json.load(open(os.environ['REQUEST_MODEL_SCHEMA']))
        jsonschema.validate(data, request_schema)
        table = dynamodb.Table(os.environ['LIFE_EVENT_TABLENAME'])

        item = {
            'id': str(uuid.uuid1()),
            'event': data['event'],
            'createdAt': timestamp,
            'updatedAt': timestamp
        }

        if 'insight' in data:
            item['insight'] = data['insight']

        table.put_item(Item=item)

        response = {
            "statusCode": 200,
            "body": json.dumps(item)
        }

        return response
    except KeyError as e:
        logging.error("Validation Failed")
        raise Exception("Coudn't create life log.Detail:{0}".format(e))
        return

jsonschema.validate(data, request_schema) の部分がそれです。ちなみに、Lambdaの実行環境には jsonschema は含まれていないので、プロジェクトにインストールしてデプロイ時のzipファイルに含まれるようにしてあげる必要があります。どうしようか迷ったんですが、とりあえずインストールした外部パッケージ自体はgitignoreには追加してます。

と、ここまで書きながら調べたら、 lambda-uploderというプラグインがあるんですね、今度使ってみたいと思います。

dev.classmethod.jp

SwaggerファイルからSDKを作る

さて、ここからは生成したSwaggerファイルを活用していきます。Swaggerファイルから各種コード生成を行うためのSwagger Codegeneratorというツールがあるので、活用します。

https://swagger.io/swagger-codegen/swagger.io

これは、Dockerhubにある公式イメージで、ジェネレータをWebサービスとして起動可能なので、Elastic Beanstalkを使ってサービスを立てます。その時Dockerrun.aws.jsonは以下のような感じ。

{
  "AWSEBDockerrunVersion": "1",
  "Image": {
    "Name": "swaggerapi/swagger-generator",
    "Update": true
  },
  "Ports": [
    {
      "ContainerPort": "8080"
    }
  ]
}

例えばJavaのクライアントライブラリを生成したい場合は以下のようにリクエストしてください。

curl -X POST --header 'Content-type: application/json' --header 'Accept: application/json' -d '{"swaggerUrl": "swagger.jsonのURL"}' EBのURL/api/gen/clients/java

このAPIが請け負うのはライブラリの生成要求のみで、ダウンロードは別のAPIが用意されています。先のAPIの結果から code を取得して以下のように投げます。

curl -X GET "EBのURL/api/gen/download/取得したcode" -H "accept: application/octet-stream" > library.zip

なお、これで生成されるライブラリのダウンロードはワンタイムな模様で、ダウンロードに失敗したら生成からやり直しが必要です。

これをLambdaファンクションとして書いたものが以下です。これは割と使い回せるので、別に配布用のサービスを作っています。

import json
import os
import boto3
import urllib.request
s3 = boto3.resource('s3')


def generate_client_api(swagger_url, language):
    api_base = {
        "url": "/".join([os.environ['SWAGGER_API_URL'],
                        os.environ['GENERATE_CLIENT_URL'],
                        language]),
        "method": "POST",
        "headers": {"Content-Type": "application/json"}
    }
    obj = {"swaggerUrl": swagger_url}
    json_data = json.dumps(obj).encode("utf-8")

    return urllib.request.Request(api_base['url'],
                                  data=json_data,
                                  method=api_base['method'],
                                  headers=api_base['headers'])


def download_client_api(code):
    return "/".join([os.environ['SWAGGER_API_URL'],
                     os.environ['DOWNLOAD_CLIENT_URL'],
                     code])


def distribute_client(event, context):
    request = generate_client_api(event['swaggerUrl'], event['language'])

    with urllib.request.urlopen(request) as res:
        code = json.loads(res.read().decode('utf-8'))['code']
        with urllib.request.urlopen(download_client_api(code)) as client_res:
            client_bucket = s3.Bucket(os.environ['CLIENT_SDK_BUCKET'])
            client_file = client_bucket.Object(
                event['serviceName'] +
                '/{0}/{0}-generated-client.zip'.format(event['language']))
            client_file.put(Body=client_res.read())

    response = {
        "statusCode": 200
    }

    return response

Swaggerファイルの生成とクライアントの生成をStepFunctionsで繋ぐ

Swaggerファイルとクライアントの生成を個別にLambdaファンクションで行うことには成功しました。どうせならこの辺繋げたいですよね?

というわけでStep Functionsを使って繋げましょう。StepFunctionsとは、視覚的なワークフローを使って、コンポーネントやLambdaファンクションを繋ぐ分散アプリケーションを構築するためのサービスです。

aws.amazon.com

前職時代にSOAPでこういうソリューションを使った記憶があって、大規模SIerが好きそうな感じがしすね。時代は巡る糸車。

ステートマシンという実行ワークフローを構築し、その中のタスクでLambdaの実行などをします。ステートマシンはjsonベースのAmazon State Languageで書かれるようです。

docs.aws.amazon.com

ドキュメントに日本語はありませんが、東京リージョンにもちゃんときているので、安心して使いましょう。なお、当然といえば当然ですが、さすがに同じアカウント内のLambdaファンクションじゃないとワークフローに使えません。

というわけで、こんな感じのStep Functionsを作りました。

f:id:mao_instantlife:20170721212911p:plain

{
  "Comment": "Swaggerファイルの生成に成功したら、各言語向けにライブラリを生成する。",
  "StartAt": "SetServiceName",
  "States": {
    "SetServiceName": {
      "Comment": "サービス名の設定",
      "Type": "Pass",
      "Result": "my-assistant",
      "ResultPath": "$.serviceName",
      "Next": "GenerateSwaggerFile"
    },

    "GenerateSwaggerFile": {
      "Type": "Task",
      "Comment": "Swaggerファイルの生成",
      "Resource": "Swaggerファイル生成のLambdaファンクションARN",
      "Catch": [{
        "ErrorEquals": ["UnhandledError"],
        "Next": "GenerateErrorFallback"
      }],
      "ResultPath": "$.swaggerUrl",
      "Next": "GenerateClientParallel"
    },

    "GenerateErrorFallback": {
         "Type": "Pass",
         "Result": "This is a fallback from a generate swagger file Lambda function exception",
         "End": true
    },

    "GenerateClientParallel": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "SetLanguageJava",
          "States": {
            "SetLanguageJava": {
              "Comment": "言語設定にJavaを追加する",
              "Type": "Pass",
              "Result": "java",
              "ResultPath": "$.language",
              "Next": "GenerateClientJava"
            },

            "GenerateClientJava": {
              "Type": "Task",
              "Resource": "クライアント生成のLambdaファンクションARN",
              "InputPath": "$",
              "OutputPath": "$",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetLanguageRuby",
          "States": {
            "SetLanguageRuby": {
              "Comment": "言語設定にRubyを追加する",
              "Type": "Pass",
              "Result": "ruby",
              "ResultPath": "$.language",
              "Next": "GenerateClientRuby"
            },

            "GenerateClientRuby": {
              "Type": "Task",
              "Resource": "クライアント生成のLambdaファンクションARN",
              "InputPath": "$",
              "OutputPath": "$",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetLanguageJavascript",
          "States": {
            "SetLanguageJavascript": {
              "Comment": "言語設定にJavaScriptを追加する",
              "Type": "Pass",
              "Result": "javascript",
              "ResultPath": "$.language",
              "Next": "GenerateClientJavaScript"
            },

            "GenerateClientJavaScript": {
              "Type": "Task",
              "Resource": "クライアント生成のLambdaファンクションARN",
              "InputPath": "$",
              "OutputPath": "$",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetLanguageScala",
          "States": {
            "SetLanguageScala": {
              "Comment": "言語設定にScalaを追加する",
              "Type": "Pass",
              "Result": "scala",
              "ResultPath": "$.language",
              "Next": "GenerateClientScala"
            },

            "GenerateClientScala": {
              "Type": "Task",
              "Resource": "クライアント生成のLambdaファンクションARN",
              "InputPath": "$",
              "OutputPath": "$",
              "End": true
            }
          }
        }
      ]
    }
  }
}

Swaggerファイルの生成に成功したら、JavaRubyJavaScriptScalaのクライアント生成をパラレルで実行するようにしています。Swaggerファイルの生成に失敗した場合はLambdaファンクションから例外を発生させて、それをキャッチしてStateMachinを止めるように作ってあります。カスタムエラーもドキュメントにはあるのですが、うまく動きませんでした。

ドキュメントを読む限りは、他には分岐やリトライなどもあり、基本的なワークフローは問題なく実装できそうです。

ただし、StateMachineのinputとoutputは特殊で慣れが必要だな、と感じました。StateMachine自体のInputとOutputを全てのタスクでコンテキストとして共有して、やりとりしている感じです。全てのタスクにはStateMachineのInputが渡されます。そのため、あるタスクの結果を後続タスクに曳き回したいときは、Inputの構成にそれを追加してあげる必要が出てきます。 GenerateSwaggerFile タスクの "ResultPath": "$.swaggerUrl", となってる箇所がその箇所です。このタスクの処理結果をInputの swaggerUrl という属性に放り込んでいます。

ちなみに、Parallelで並列実行する場合は各ブランチごとにInput領域がコピーされるため、ブランチ内では独立してInput領域を扱うことができ、名前が被っても問題ありません。実行時に実際に渡されたInputを見てみましょう。Java生成ブランチでのものとRuby生成ブランチでのものです。

f:id:mao_instantlife:20170721212930p:plain

f:id:mao_instantlife:20170721212943p:plain

上記のように、同じInputの属性でも別々の値がセットされています。

StepFunctionsをServerlessフレームワークの設定として書く

ここまでくると、アプリケーションのサービスをデプロイしたらStep FunctionsのStateMachineも作って欲しいですよね?というわけで、こんなプラグインを導入しました。

github.com

このプラグインを導入すれば、 serverless.yml にStep Functionsの設定をかけるようになります。基本的にはjson形式がyaml形式になるだけです。

github.com

ただ、StateMachineの登録される名前がちょっとあれでですね、今回の設定でデプロイしたらこんな感じになりました。

MyDashassistantDashgenerateDashclientDashdev-HJGGH29DKCX7

ちょっと待って、 -Dash に変換されるとか望んでない。。。

SwaggerファイルからSwagger UIを作る

で、最後にSwagger UIなわけですが、これもswagger-uiもdockerイメージとして起動可能なのがわかったので、今回はちょっと省略させてもらいます。

https://hub.docker.com/r/swaggerapi/swagger-ui/hub.docker.com

基本的なEBの構成はイメージの変更だけでSwagger Codegenと同じでいいはず。環境変数でSwaggerファイルを参照するので、更新されたら念のためアプリケーションサーバの再起動をすれば問題なさそうです。

まとめ

長くなりましたが、これで開発者向けの各種アセットをワンアクションで生成、配備することができるようになりました。次は、コードの更新に合わせてそれを実行する、というプロセスができれば、Serverless Frameworkでマイクロサービスを作る上での基盤というかリファレンスアーキテクチャ的なものができるかな、と。次はその辺りに取り組んでみたいと思います。