PySparkのハマり(メモ)
pipで取得したpysparkを起動させると、以下のようなエラーが出ることがある。
Sparkのバージョンは2.3.1。
(venv) C:\Users\hiroyuki.nagata\PycharmProjects\vjsk-etl-flow>pyspark Python 2.7.12 (v2.7.12:d33e0cf91556, Jun 27 2016, 15:24:40) [MSC v.1500 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. 2019-01-09 09:46:27 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Traceback (most recent call last): File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\shell.py", line 38, inSparkContext._ensure_initialized() File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\context.py", line 292, in _ensure_initialized SparkContext._gateway = gateway or launch_gateway(conf) File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\java_gateway.py", line 120, in launch_gateway auto_convert=True)) TypeError: __init__() got an unexpected keyword argument 'auth_token'
コードを辿ってみると、どうやら起動したPySparkが呼び出している関数の引数でpy4jを呼び出しているのだが、そいつの名前付き引数"auth_token"が、py4j側で削除されてるらしい。どう見てもpipのパッケージ管理でpysparkとpy4jのバージョンの整合性が無いように見える。
原因はpipで用意したpysparkがv2.2.1で、インストールされているsparkがv2.3.1であることだった。後で入れ替えとくか…
Go言語で継承みたいなこと
最近Go言語でつくるインタプリタを読んでいる。ほとんど趣味だが、lexer/parser(字句解析/構文解析)->ast化(抽象構文木)->eval(評価)までの流れを辿れるためとてもおもしろい。
- 作者: Thorsten Ball,設樂洋爾
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/06/16
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
Go言語で継承
これは、Scalaでtrait作ってwith句で使えるようにしたり、RubyでModuleをincludeしてやるのに似ている。
簡単な例
package main import ( "fmt" ) // 共通処理を入れたクラス、みたいなもの type CommonUtil struct {} // 名前を入れたらあいさつする関数 func (*CommonUtil) SayHello(name string) { fmt.Printf("SayHello! %s", name) } // 実際のモジュール type ActualModule struct { *CommonUtil // ここでEmbeddingする Name string } func main() { fmt.Println("Hello, Wandbox!") am := ActualModule{&CommonUtil{}, "hiroyuki"} // SayHelloを呼び出せる am.SayHello(am.Name) }
PySparkでいろいろ調査したのでメモ書き
PySparkにはいろいろなファイルロード/セーブ方法がある。
メモ
ローカル環境でShift-JISファイルを読み込んでUTF-8で出力
- 順当にリストをparallelizeしてRDDからDataframe化
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') path = './sample-sjis.txt' with open(path) as f: str_list = map(lambda l: l.decode('shift-jis'), f.readlines()) sc = SparkContext() spark = SparkSession(sc) row = Row('title') rdd = sc.parallelize(str_list) df = rdd.map(row).toDF() df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('sample-utf8.csv') sc.stop()
S3上のShift-JISファイルを読み込んでUTF-8で出力(boto3)
- stringをreadしてdecodeしてUTF-8化してdataframe化
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') import boto3 s3 = boto3.client('s3') response = s3.get_object(Bucket='spark.freestylewiki.xyz', Key='sample-sjis.txt') str_list = response['Body'].read().decode('shift-jis') sc = SparkContext() spark = SparkSession(sc) row = Row('title') rdd = sc.parallelize(str_list) df = rdd.map(row).toDF() df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('s3://spark.freestylewiki.xyz/sample-utf8.csv') sc.stop()
S3上のShift-JISファイルを読み込んでUTF-8で出力(databricks)
- 上のやり方だとメモリに乗らない場合まずい
- databricksのライブラリ上で文字コード指定ができるようだ
- python - load big japanese files in hadoop - Stack Overflow
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark import SparkContext from pyspark.sql import Row from pprint import pprint import sys reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext() spark = SparkSession(sc) df = spark.read \ .format('com.databricks.spark.csv') \ .option('header', 'false') \ .option('inferschema', 'false') \ .option('charset', 'shift-jis') \ .option('delimiter', '\t') \ .load('sample-sjis.txt') df = df.withColumnRenamed('_c0', 'title') df.show() # 書き出してみる df.select('title') \ .coalesce(1) \ .write \ .format('com.databricks.spark.csv') \ .option("overwrite", "true") \ .save('sample-utf8.csv') sc.stop()