Go言語でJSONファイルをMongoDBへ挿入!並行処理でデータ挿入を効率化
本記事で実践すること
本記事の流れ
- MongoDBとの接続を複数のgoroutine内で確立
- 対象ディレクトリ内のJSONファイルを走査し開く
- 開いたJSONファイルの中身をMongoDBのコレクションへ挿入
今回使用したMongoDBのドライバはMongoDB公式でメンテナンスされているmongo-go-driver
です。数年前まではmgo
というドライバが人気のようでしたが、2020年3月現在、メンテナンスされていないので注意しましょう。
事前準備
まずは、mongo-db-driver
をインストールしましょう。
$GOPATH
配下へインストールするか、Go 1.11以降に導入されたGo Modulesという仕組みを使って、プロジェクト毎のライブラリバージョン管理をするか選択してください。
# GOPATH配下へインストール go get go.mongodb.org/mongo-driver/mongo # Go Modules export GO111MODULE=on go mod init github.com/$user_name/$repository_name go get go.mongodb.org/mongo-driver/mongo
実装
それでは実際のコードを見ていきましょう。
1. MongoDBとの接続クライアント実装
package main import ( "context" "fmt" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( // 接続先のDB情報を入力 mongoDBHost = "" mongoDBPort = "" mongoUser = "" mongoPassword = "" dbname = "" colname = "" ) // mongo-driverのクライアントを自前で定義した構造体DBへセット type DB struct { client *mongo.Client } // 実際にMongoDBへ接続するクライアントを内包したDBを返却 func ConnectMongoDB() (*DB, error) { ctx := context.Background() // 認証が必要な場合は、options.Credentialを作成 credential := options.Credential{ AuthSource: dbname, Username: mongoUser, Password: mongoPassword, } // 認証情報・接続情報を元にclientを作成 client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://"+mongoDBHost+":"+mongoDBPort).SetAuth(credential)) if err != nil { fmt.Println(err) return nil, err } return &DB{client}, nil } // DB構造体へInsert用のメソッドを定義 // JSONファイルから読み込んだバイトスライスを渡し、MongoDBへInsert func (db *DB) InsertMongoDB(json []byte) { // 3秒でタイムアウトするコンテキストを作成 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() bsonMap := bson.M{} // JSONのバイトスライスをMongoDBのドキュメント型であるbsonへマップ err := bson.UnmarshalExtJSON([]byte(json), false, &bsonMap) if err != nil { fmt.Println(err) return } // Insert先のコレクション名からクライアント作成 collection := db.client.Database(dbname).Collection(colname) _, err = collection.InsertOne(ctx, bsonMap) if err != nil { fmt.Println(err) return } }
2. 複数のgoroutine作成
package main import ( "context" "fmt" "io/ioutil" "json2mongo/database" "os" "path/filepath" "strings" "sync" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( mongoDBHost = "" mongoDBPort = "" mongoUser = "" mongoPassword = "" dbname = "" colname = "" // JSONファイルを操作する大元のディレクトリパスを定義 jsonRootDir = "" ) // ... 1. で定義した部分は省略 // 後ほど定義 // func readJSONFile(path string) []byte {} // func insert2db(wg *sync.WaitGroup, c chan string, concurrentID int) {} func main() { start := time.Now() // 10のバッファを持つ文字列型用のチャネルを作成 jsonPath := make(chan string, 10) // 非同期処理の待ち合わせ用にsync.WaitGroupを作成 var wg sync.WaitGroup // 5個のgoroutineを起動しておく for i := 0; i < 5; i++ { wg.Add(1) // 関数insert2dbは後ほど作成 go insert2db(&wg, jsonPath, i) } }
3. ディレクトリ内のファイル走査をし、ファイルパスをgoroutineへ渡す
func main() { start := time.Now() jsonPath := make(chan string, 10) var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go insert2db(&wg, jsonPath, i) } // ここまでは上で記述済 // filepath.WalkメソッドでjsonRootDir内の全てのファイルを走査 err := filepath.Walk(jsonRootDir, func(path string, info os.FileInfo, err error) error { if err != nil { fmt.Println(err) return err } // ディレクトリの場合は何もしない if info.IsDir() { return nil } // goroutineへ渡したjsonPathチャネルへファイルパスを送信 jsonPath <- path return nil }) // チャネルを閉鎖 close(jsonPath) // WaitGroupが終了するまで待機 wg.Wait() if err != nil { fmt.Println(err) os.Exit(1) } end := time.Now() // 経過時間を標準出力へ出力 fmt.Printf("%f [Sec]\n", (end.Sub(start)).Seconds()) }
4. goroutine内で呼び出すJSONファイル読み出し関数の定義
// パスからJSONファイルを開き中身を返す関数 func readJSONFile(path string) []byte { var json []byte json, err := ioutil.ReadFile(path) if err != nil { fmt.Println(err) return json } return json } // MongoDBへインサートする関数 func insert2db(wg *sync.WaitGroup, c chan string, concurrentID int) { // 1. で定義したMongoDBクライアント作成関数から構造体を取得 mongoClient, err := ConnectMongoDB() if err != nil { fmt.Println(err) os.Exit(1) } // チャネルが閉じられ、forループが終わると関数が終了とともに呼び出される // main()関数でwg.Wait()されているので、このgoroutineでの処理が完了したことを伝達 defer wg.Done() for path := range c { s := time.Now() json := readJSONFile(path) e := time.Now() // ファイルサイズが大きいと開くのに1sかかることがあるため、念のため確認 if (e.Sub(s)).Seconds() > 1 { fmt.Printf("%f [Sec], ID: %d, Path: %s\n", (e.Sub(s)).Seconds(), concurrentID, path) } // MongoDBへJSONデータをインサート mongoClient.InsertMongoDB(json) } }
コンパイル
# linuxで実行する場合 env GOOS=linux GOARCH=amd64 go build -o ${binary_file_name} # Macで事項する場合 go build -o ${binary_file_name}
実行
# ${binary_file_name} がjson2mongoの場合 # ファイル件数によっては時間がかかるのでバックグランド実行しておくと良い ./json2mongo > json2mongo.log &
おわりに
実際には、ファイルサイズが多様な数百万件のJSONファイルをMongoDBへ挿入する試験を行いました。
goroutine数は10でチャネルのバッファも10で実行したところ、ファイルサイズの小さいものは10並行で効率良く捌けていました。CPU使用率も100%近く出ており、I/O待ちの発生もほとんどなかったようです。
しかし一旦、数MBのファイルを扱い始めると、I/Oバウンドの処理となりI/O待ちプロセスが多くなってしまいました。Diskの性能があまり良くないサーバで試験を行ったため、まあ納得せざるを得ないでしょう。
vmstat
などのコマンドで実行時のリソースの使用状況を確認しつつ、並行数などを調整してチューニングを行うといいかもしれません。