なんとな~くしあわせ?の日記

「そしてそれゆえ、知識そのものが力である」 (Nam et ipsa scientia potestas est.) 〜 フランシス・ベーコン

PySparkのハマり(メモ)

pipで取得したpysparkを起動させると、以下のようなエラーが出ることがある。
Sparkのバージョンは2.3.1。

(venv) C:\Users\hiroyuki.nagata\PycharmProjects\vjsk-etl-flow>pyspark
Python 2.7.12 (v2.7.12:d33e0cf91556, Jun 27 2016, 15:24:40) [MSC v.1500 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
2019-01-09 09:46:27 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\shell.py", line 38, in 
    SparkContext._ensure_initialized()
  File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\context.py", line 292, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\java_gateway.py", line 120, in launch_gateway
    auto_convert=True))
TypeError: __init__() got an unexpected keyword argument 'auth_token'

コードを辿ってみると、どうやら起動したPySparkが呼び出している関数の引数でpy4jを呼び出しているのだが、そいつの名前付き引数"auth_token"が、py4j側で削除されてるらしい。どう見てもpipのパッケージ管理でpysparkとpy4jのバージョンの整合性が無いように見える。

原因はpipで用意したpysparkがv2.2.1で、インストールされているsparkがv2.3.1であることだった。後で入れ替えとくか…

Go言語で継承みたいなこと

最近Go言語でつくるインタプリタを読んでいる。ほとんど趣味だが、lexer/parser(字句解析/構文解析)->ast化(抽象構文木)->eval(評価)までの流れを辿れるためとてもおもしろい。

Go言語でつくるインタプリタ

Go言語でつくるインタプリタ

Go言語で継承

これは、Scalaでtrait作ってwith句で使えるようにしたり、RubyでModuleをincludeしてやるのに似ている。

簡単な例

package main

import (
    "fmt"
)

// 共通処理を入れたクラス、みたいなもの
type CommonUtil struct {}
// 名前を入れたらあいさつする関数
func (*CommonUtil) SayHello(name string) {
    fmt.Printf("SayHello! %s", name)
}

// 実際のモジュール
type ActualModule struct {
    *CommonUtil  // ここでEmbeddingする
    Name string
}

func main() {
    fmt.Println("Hello, Wandbox!")
    am := ActualModule{&CommonUtil{}, "hiroyuki"}
    // SayHelloを呼び出せる
    am.SayHello(am.Name)
}

PySparkでいろいろ調査したのでメモ書き

PySparkにはいろいろなファイルロード/セーブ方法がある。

メモ

ローカル環境でShift-JISファイルを読み込んでUTF-8で出力

  • 順当にリストをparallelizeしてRDDからDataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

path = './sample-sjis.txt'
with open(path) as f:
    str_list = map(lambda l: l.decode('shift-jis'), f.readlines())

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(boto3)

  • stringをreadしてdecodeしてUTF-8化してdataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import boto3
s3 = boto3.client('s3')
response = s3.get_object(Bucket='spark.freestylewiki.xyz', Key='sample-sjis.txt')
str_list = response['Body'].read().decode('shift-jis')

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('s3://spark.freestylewiki.xyz/sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(databricks)

  • 上のやり方だとメモリに乗らない場合まずい
  • databricksのライブラリ上で文字コード指定ができるようだ

- python - load big japanese files in hadoop - Stack Overflow

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

sc = SparkContext()
spark = SparkSession(sc)

df = spark.read \
     .format('com.databricks.spark.csv') \
     .option('header', 'false') \
     .option('inferschema', 'false') \
     .option('charset', 'shift-jis') \
     .option('delimiter', '\t') \
     .load('sample-sjis.txt')

df = df.withColumnRenamed('_c0', 'title')
df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()