AWS GlueでSparkのDataframeを使う
AWS GlueでSparkのDataframeを使う
Glue上のクラス構造
- 引用
Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルタ/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。
これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。
こう書かれてはいるが、DynamicFrameは少し自由度が低いように感じられる(S3やRDSへの接続が可能なのはいいんだけど)。とは言えDynamicFrameからDataFrameへの変換、そしてその逆が可能であるので、やりたいことは実現できた。
DynamicFrameからDataFrameへの変換
toDF
toDF(options)
# DynamicFrame -> Spark DataFrame
src_s3_df = DynamicFrame.toDF(<元のDynamicFrame>)
- DynamicRecords を DataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。
DataFrameからDynamicFrameへの変換
fromDF fromDF(dataframe, glue_ctx, name) # Spark DataFrame -> DynamicFrame result_s3 = DynamicFrame.fromDF(<加工したDataFrame>, glueContext, 'result_s3')
- DataFrame フィールドを DynamicRecord に変換することにより、DataFrame を DynamicFrame に変換します。新しい DynamicFrame を返します。
ここまでをまとめると以下のような感じ
DataFrameを使った処理など
連番作成
最初はrow_number()を使おうとしたのだけど、うまくいかなかったので zipWithIndex を使った。DataFrameに生えているrddを使えばOK。中のデータをいじるときはrddを更新したほうがよさそう。
rdd_indexed = dataframe.rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1)) df = rdd_indexed.toDF(['id','score','rowNum']) df.show()
カラムの追加、リネーム
withColumn, withColumnRenamedというものがDataFrameに生えているのでそれを使う。表形式のデータを変更したい場合はDataFrameに生えているメソッドでだいたい解決しそう。
.withColumn(lit('固定値')) .withColumnRenamed('from', 'to')
あとはこの調子でデータを変形してDynamicFrameに戻せばまた連携できる