なんとな~くしあわせ?の日記

「そしてそれゆえ、知識そのものが力である」 (Nam et ipsa scientia potestas est.) 〜 フランシス・ベーコン

AWS GlueでSparkのDataframeを使う

AWS GlueでSparkのDataframeを使う

Glue上のクラス構造

docs.aws.amazon.com

  • 引用

Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルタ/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。
これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

こう書かれてはいるが、DynamicFrameは少し自由度が低いように感じられる(S3やRDSへの接続が可能なのはいいんだけど)。とは言えDynamicFrameからDataFrameへの変換、そしてその逆が可能であるので、やりたいことは実現できた。

DynamicFrameからDataFrameへの変換

toDF
toDF(options)

# DynamicFrame -> Spark DataFrame
src_s3_df = DynamicFrame.toDF(<元のDynamicFrame>)
  • DynamicRecords を DataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。

DataFrameからDynamicFrameへの変換

fromDF
fromDF(dataframe, glue_ctx, name)

# Spark DataFrame -> DynamicFrame
result_s3 = DynamicFrame.fromDF(<加工したDataFrame>, glueContext, 'result_s3')
  • DataFrame フィールドを DynamicRecord に変換することにより、DataFrame を DynamicFrame に変換します。新しい DynamicFrame を返します。

ここまでをまとめると以下のような感じ

f:id:panzer-jagdironscrap1:20180621180055p:plain

DataFrameを使った処理など

連番作成

最初はrow_number()を使おうとしたのだけど、うまくいかなかったので zipWithIndex を使った。DataFrameに生えているrddを使えばOK。中のデータをいじるときはrddを更新したほうがよさそう。

rdd_indexed = dataframe.rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1))
df = rdd_indexed.toDF(['id','score','rowNum'])
df.show()

カラムの追加、リネーム

withColumn, withColumnRenamedというものがDataFrameに生えているのでそれを使う。表形式のデータを変更したい場合はDataFrameに生えているメソッドでだいたい解決しそう。

.withColumn(lit('固定値'))
.withColumnRenamed('from', 'to')

あとはこの調子でデータを変形してDynamicFrameに戻せばまた連携できる