PySparkでいろいろ調査したのでメモ書き
PySparkにはいろいろなファイルロード/セーブ方法がある。
メモ
ローカル環境でShift-JISファイルを読み込んでUTF-8で出力
- 順当にリストをparallelizeしてRDDからDataframe化
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') path = './sample-sjis.txt' with open(path) as f: str_list = map(lambda l: l.decode('shift-jis'), f.readlines()) sc = SparkContext() spark = SparkSession(sc) row = Row('title') rdd = sc.parallelize(str_list) df = rdd.map(row).toDF() df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('sample-utf8.csv') sc.stop()
S3上のShift-JISファイルを読み込んでUTF-8で出力(boto3)
- stringをreadしてdecodeしてUTF-8化してdataframe化
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') import boto3 s3 = boto3.client('s3') response = s3.get_object(Bucket='spark.freestylewiki.xyz', Key='sample-sjis.txt') str_list = response['Body'].read().decode('shift-jis') sc = SparkContext() spark = SparkSession(sc) row = Row('title') rdd = sc.parallelize(str_list) df = rdd.map(row).toDF() df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('s3://spark.freestylewiki.xyz/sample-utf8.csv') sc.stop()
S3上のShift-JISファイルを読み込んでUTF-8で出力(databricks)
- 上のやり方だとメモリに乗らない場合まずい
- databricksのライブラリ上で文字コード指定ができるようだ
- python - load big japanese files in hadoop - Stack Overflow
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext() spark = SparkSession(sc) df = spark.read \ .format('com.databricks.spark.csv') \ .option('header', 'false') \ .option('inferschema', 'false') \ .option('charset', 'shift-jis') \ .option('delimiter', '\t') \ .load('sample-sjis.txt') df = df.withColumnRenamed('_c0', 'title') df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('sample-utf8.csv') sc.stop()