なんとな~くしあわせ?の日記

「そしてそれゆえ、知識そのものが力である」 (Nam et ipsa scientia potestas est.) 〜 フランシス・ベーコン

PySparkでいろいろ調査したのでメモ書き

PySparkにはいろいろなファイルロード/セーブ方法がある。

メモ

ローカル環境でShift-JISファイルを読み込んでUTF-8で出力

  • 順当にリストをparallelizeしてRDDからDataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

path = './sample-sjis.txt'
with open(path) as f:
    str_list = map(lambda l: l.decode('shift-jis'), f.readlines())

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(boto3)

  • stringをreadしてdecodeしてUTF-8化してdataframe化
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import boto3
s3 = boto3.client('s3')
response = s3.get_object(Bucket='spark.freestylewiki.xyz', Key='sample-sjis.txt')
str_list = response['Body'].read().decode('shift-jis')

sc = SparkContext()
spark = SparkSession(sc)

row = Row('title')
rdd = sc.parallelize(str_list)
df = rdd.map(row).toDF()

df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('s3://spark.freestylewiki.xyz/sample-utf8.csv')

sc.stop()

S3上のShift-JISファイルを読み込んでUTF-8で出力(databricks)

  • 上のやり方だとメモリに乗らない場合まずい
  • databricksのライブラリ上で文字コード指定ができるようだ

- python - load big japanese files in hadoop - Stack Overflow

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pprint import pprint

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

sc = SparkContext()
spark = SparkSession(sc)

df = spark.read \
     .format('com.databricks.spark.csv') \
     .option('header', 'false') \
     .option('inferschema', 'false') \
     .option('charset', 'shift-jis') \
     .option('delimiter', '\t') \
     .load('sample-sjis.txt')

df = df.withColumnRenamed('_c0', 'title')
df.show()

# 書き出してみる
df.select('title') \
  .coalesce(1) \
  .write \
  .format('com.databricks.spark.csv') \
  .option("overwrite", "true") \
  .save('sample-utf8.csv')

sc.stop()

Neo4jRBでグラフDBをRailsから使う

Neo4j単体でも面白いのだけど、結果を現実世界に出すにはWEB画面を作るのが楽。というわけでNeo4jRBに触れてみる。とりあえず環境構築のみ。

環境構築

  • とりあえずDebianで環境構築をやってみる

neo4jの準備

各環境にあったパッケージリポジトリを使用しましょう。
Neo4j Debian Packages

書いてあるとおりやればサービスとしてneo4jが入ります

// リポジトリの追加
$ wget -O - https://debian.neo4j.org/neotechnology.gpg.key | sudo apt-key add -
$ echo 'deb https://debian.neo4j.org/repo stable/' | sudo tee /etc/apt/sources.list.d/neo4j.list
$ sudo apt-get update

// インストール
$ sudo apt-get install neo4j

一回DBを起動させておいてパスワード変更しといてください

$ sudo systemctl start neo4j

この後 http://localhost:7474 へ行って、id: neo4j, pass:neo4jでログイン、初回パスをneo4jから違うものに変更しておく。

Rubyの準備

とりあえずrbenvを使ってRuby 2.4に変えておけばいいんじゃないかと

Railsの準備

Rails 5.2.1を使った。

いざモックアップ作成

とりあえず公式ドキュメントの通りコマンドを叩く、
Setup — Neo4j.rb 7.1.4 documentation

だいたい以下のような感じで進めた。

  • 初回はrbenvに入ったrailsコマンドを使いたかったので以下のようになっている
$ rbenv exec bundle install rails
$ rbenv exec gem rails new myapp -m http://neo4jrb.io/neo4j/neo4j.rb -O
$ cd myapp
$ bundle install

Railsは基本的なCRUD処理はscaffoldで作ってくれる。以下は例。

$ rails generate scaffold User name:string email:string
$ rails s

とりあえずこれでNeo4Jのノードとちょっとしたアトリビュートを画面から追加できる。
リレーションなどは別APIを作ってみるか?

Neo4jでJOINクエリ

Neo4jでJOINクエリ

課題・やりたいこと

課題

やってること

レポーティング, OLAP

  • 関係するテーブルのデータの有無で条件分岐を作成する → 開発当初から条件分岐が多すぎてテストが辛い
  • 多少はSQLに対する慣れにより解消したが、しばらくソースを見ないと忘れるし追加があると追加自体が難しい
  • これはすでにSQLアンチパターンのスパゲッティクエリの状態である

なぜ、SQLでJOINしまくる羽目になるのか?

  • 正規化しているから、は1つの理由だが
  • そもそも論
    • つまり、関係データベースは名前に反して関係性を管理するのが苦手だ

https://image.slidesharecdn.com/rdbmstographsintro-151028191556-lva1-app6892/95/neo4jrdb-9-638.jpg?cb=1446059865

from グラフデータベース:Neo4j、そしてRDBからの移行手順の紹介 | PPT

やりたいこと

クエリを減らしてメンテナンス性をよくする、こんな風に

from How to do Joins in Apache Cassandra and DSE | DataStax

実践

まずはSQLでいうところのJOINクエリをやっていく、やること

  • データ設計の見直し
  • Neo4jはグラフDB操作用にCypher (Cypher (query language) - Wikipedia)というクエリ言語をもっている、基本はそいつを使ってクエリを実行する ** あとはSQLとの橋渡しをこの辺を見ながらやってみる

curiousraj.com

概要と手順

データ設計の見直し

だいたいは、ここの通りに実行 グラフデータベース:Neo4j、そしてRDBからの移行手順の紹介 | PPT

手順

  1. ノード間の関係を定義する(例)従業員 =(販売)=> 受注
  2. 外部キーを見つける
  3. (外部キー)-[:変化]->(関係)
  4. 外部キーを取り除く
  5. ジョインテーブルを見つける
  6. ジョインテーブルがそのまま関係データになる
  7. 属性付きのジョインテーブルは、プロパティ付きの関係データに変換
  8. グラフ化完了

使ったクエリ

プラットフォームの選択

Windowsの場合WSLを使用してDebianでインストールしたほうがいい気がする Neo4j Debian Packages

  • neo4j-shell
    • /etc/neo4j/neo4j.conf のコメントを外しておく
# Enable a remote shell server which Neo4j Shell clients can log in to.
dbms.shell.enabled=true
# The network interface IP the shell will listen on (use 0.0.0.0 for all interfaces).
dbms.shell.host=127.0.0.1
# The port the shell will listen on, default is 1337.
dbms.shell.port=1337

インストールした後はserviceコマンドで起動できる。

$ sudo service neo4j start
Neo4jのコンフィグまわり
  • デフォルトではCSVファイルインポートができないので、Settingsから以下の設定を有効化

これをやらないとクエリを投げてもLOADできない。

# Determines if Cypher will allow using file URLs when loading data using
# `LOAD CSV`. Setting this value to `false` will cause Neo4j to fail `LOAD CSV`
# clauses that load data from the file system.
dbms.security.allow_csv_import_from_file_urls=true
WSLの環境へヘッダありCSVを読み込む例

まず一度WSLのLinux環境へファイルを送る

$ sudo cp /mnt/c/tmp/xxx.csv /var/lib/neo4j/import/

その後LOADクエリを実行する

> LOAD CSV WITH HEADERS
FROM 'file:///xxx.csv' AS line
CREATE (:X { head1: line.head1 });

なんか読み込みめっちゃ早い

実際にRDBのデータをGDBに移行
  • テーブルのCSVデータを登録してインデックスを張る
LOAD CSV WITH HEADERS
FROM 'file:///table-A.csv' AS line
CREATE (:TableA { f_key: line.f_key
});
CREATE INDEX ON :TableA(f_key);

LOAD CSV WITH HEADERS
FROM 'file:///table-B.csv' AS line
CREATE (:TableB { f_key: line.f_key
});
CREATE INDEX ON :TableB(f_key);
  • 関係テーブルから関係性を取り出して設定する
LOAD CSV WITH HEADERS
FROM 'file:///relation.csv' AS line
WITH line.relation_from AS from_key
         , line.relation_to AS to_key

MERGE (from:TableA { f_key: from_key })
MERGE (to:TableB { f_key: to_key })
MERGE (from)-[r:RELATE]->(to);
  • さっそくJOINみたいなことをやる
-- マッチしたものだけ返す
MATCH (a:TableA)-[r:RELATE]->(b:TableB) return a,b;

-- マッチしなかったものも全件返す
OPTIONAL MATCH (a:TableA)-[r:RELATE]->(b:TableB) return a,b;

はまりポイント・イケてる点など

はまりポイント

  • CSVインポートが結構めんどくさい
  • relationを作成するときはインデックスを作成しないと遅い(処理が終わらない)

イケてる点

  • 意外に早い
  • RDBで関係テーブルを作成する際は関係の意味を複数持たせにくいが、Neo4jならば自由に持たせられる

懸念

  • 関係性がテーブルみたいにしっかりと残らないので、関係性の管理とかがちょいわかりにくい?