なんとな~くしあわせ?の日記

「そしてそれゆえ、知識そのものが力である」 (Nam et ipsa scientia potestas est.) 〜 フランシス・ベーコン

PySparkでいろいろ調査したのでメモ書き

PySparkにはいろいろなファイルロード/セーブ方法がある。

メモ

ローカル環境でShift-JISファイルを読み込んでUTF-8で出力

  • 順当にリストをparallelizeしてRDDからDataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

path = './sample-sjis.txt'
with open(path) as f:
    str_list = map(lambda l: l.decode('shift-jis'), f.readlines())

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(boto3)

  • stringをreadしてdecodeしてUTF-8化してdataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import boto3
s3 = boto3.client('s3')
response = s3.get_object(Bucket='spark.freestylewiki.xyz', Key='sample-sjis.txt')
str_list = response['Body'].read().decode('shift-jis')

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('s3://spark.freestylewiki.xyz/sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(databricks)

  • 上のやり方だとメモリに乗らない場合まずい
  • databricksのライブラリ上で文字コード指定ができるようだ

- python - load big japanese files in hadoop - Stack Overflow

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

sc = SparkContext()
spark = SparkSession(sc)

df = spark.read \
     .format('com.databricks.spark.csv') \
     .option('header', 'false') \
     .option('inferschema', 'false') \
     .option('charset', 'shift-jis') \
     .option('delimiter', '\t') \
     .load('sample-sjis.txt')

df = df.withColumnRenamed('_c0', 'title')
df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()