Go言語でJSONファイルをMongoDBへ挿入!並行処理でデータ挿入を効率化

最終更新日

本記事で実践すること

本記事の流れ

  1. MongoDBとの接続を複数のgoroutine内で確立
  2. 対象ディレクトリ内のJSONファイルを走査し開く
  3. 開いたJSONファイルの中身をMongoDBのコレクションへ挿入

今回使用したMongoDBのドライバはMongoDB公式でメンテナンスされているmongo-go-driverです。数年前まではmgoというドライバが人気のようでしたが、2020年3月現在、メンテナンスされていないので注意しましょう。

事前準備

まずは、mongo-db-driverをインストールしましょう。

$GOPATH配下へインストールするか、Go 1.11以降に導入されたGo Modulesという仕組みを使って、プロジェクト毎のライブラリバージョン管理をするか選択してください。

# GOPATH配下へインストール
go get go.mongodb.org/mongo-driver/mongo
# Go Modules
export GO111MODULE=on
go mod init github.com/$user_name/$repository_name
go get go.mongodb.org/mongo-driver/mongo

実装

それでは実際のコードを見ていきましょう。

1. MongoDBとの接続クライアント実装

package main
import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
const (
	// 接続先のDB情報を入力
	mongoDBHost   = ""
	mongoDBPort   = ""
	mongoUser     = ""
	mongoPassword = ""
	dbname        = ""
	colname       = ""
)
// mongo-driverのクライアントを自前で定義した構造体DBへセット
type DB struct {
	client *mongo.Client
}
// 実際にMongoDBへ接続するクライアントを内包したDBを返却
func ConnectMongoDB() (*DB, error) {
	ctx := context.Background()
	// 認証が必要な場合は、options.Credentialを作成
	credential := options.Credential{
		AuthSource: dbname,
		Username:   mongoUser,
		Password:   mongoPassword,
	}
	// 認証情報・接続情報を元にclientを作成
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://"+mongoDBHost+":"+mongoDBPort).SetAuth(credential))
	if err != nil {
		fmt.Println(err)
		return nil, err
	}
	return &DB{client}, nil
}
// DB構造体へInsert用のメソッドを定義
// JSONファイルから読み込んだバイトスライスを渡し、MongoDBへInsert
func (db *DB) InsertMongoDB(json []byte) {
	// 3秒でタイムアウトするコンテキストを作成
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	bsonMap := bson.M{}
	// JSONのバイトスライスをMongoDBのドキュメント型であるbsonへマップ
	err := bson.UnmarshalExtJSON([]byte(json), false, &bsonMap)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Insert先のコレクション名からクライアント作成
	collection := db.client.Database(dbname).Collection(colname)
	_, err = collection.InsertOne(ctx, bsonMap)
	if err != nil {
		fmt.Println(err)
		return
	}
}

2. 複数のgoroutine作成

package main
import (
	"context"
	"fmt"
	"io/ioutil"
	"json2mongo/database"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
const (
	mongoDBHost   = ""
	mongoDBPort   = ""
	mongoUser     = ""
	mongoPassword = ""
	dbname        = ""
	colname       = ""
	// JSONファイルを操作する大元のディレクトリパスを定義
	jsonRootDir   = ""
)
// ... 1. で定義した部分は省略
// 後ほど定義
// func readJSONFile(path string) []byte {}
// func insert2db(wg *sync.WaitGroup, c chan string, concurrentID int) {}
func main() {
	start := time.Now()
	// 10のバッファを持つ文字列型用のチャネルを作成
 	jsonPath := make(chan string, 10)
	// 非同期処理の待ち合わせ用にsync.WaitGroupを作成
	var wg sync.WaitGroup
	// 5個のgoroutineを起動しておく
	for i := 0; i < 5; i++ {
		wg.Add(1)
		// 関数insert2dbは後ほど作成
		go insert2db(&wg, jsonPath, i)
	}
}

3. ディレクトリ内のファイル走査をし、ファイルパスをgoroutineへ渡す

func main() {
	start := time.Now()
	jsonPath := make(chan string, 10)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go insert2db(&wg, jsonPath, i)
	}
	// ここまでは上で記述済
	// filepath.WalkメソッドでjsonRootDir内の全てのファイルを走査
	err := filepath.Walk(jsonRootDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			fmt.Println(err)
			return err
		}
		// ディレクトリの場合は何もしない
		if info.IsDir() {
			return nil
		}
		// goroutineへ渡したjsonPathチャネルへファイルパスを送信
		jsonPath <- path
		return nil
	})
	// チャネルを閉鎖
	close(jsonPath)
	// WaitGroupが終了するまで待機
	wg.Wait()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	end := time.Now()
	// 経過時間を標準出力へ出力
	fmt.Printf("%f [Sec]\n", (end.Sub(start)).Seconds())
}

4. goroutine内で呼び出すJSONファイル読み出し関数の定義

// パスからJSONファイルを開き中身を返す関数
func readJSONFile(path string) []byte {
	var json []byte
	json, err := ioutil.ReadFile(path)
 	 if err != nil {
  	 	fmt.Println(err)
		return json
	}
	return json
}
// MongoDBへインサートする関数
func insert2db(wg *sync.WaitGroup, c chan string, concurrentID int) {
	// 1. で定義したMongoDBクライアント作成関数から構造体を取得
	mongoClient, err := ConnectMongoDB()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	// チャネルが閉じられ、forループが終わると関数が終了とともに呼び出される
	// main()関数でwg.Wait()されているので、このgoroutineでの処理が完了したことを伝達
	defer wg.Done()
	for path := range c {
		s := time.Now()
		json := readJSONFile(path)
		e := time.Now()
		// ファイルサイズが大きいと開くのに1sかかることがあるため、念のため確認
		if (e.Sub(s)).Seconds() > 1 {
			fmt.Printf("%f [Sec], ID: %d, Path: %s\n", (e.Sub(s)).Seconds(), concurrentID, path)
		}
		// MongoDBへJSONデータをインサート
		mongoClient.InsertMongoDB(json)
	}
}

コンパイル

# linuxで実行する場合
env GOOS=linux GOARCH=amd64 go build -o ${binary_file_name}
# Macで事項する場合
go build -o ${binary_file_name}

実行

# ${binary_file_name} がjson2mongoの場合
# ファイル件数によっては時間がかかるのでバックグランド実行しておくと良い
./json2mongo > json2mongo.log &

おわりに

実際には、ファイルサイズが多様な数百万件のJSONファイルをMongoDBへ挿入する試験を行いました。

goroutine数は10でチャネルのバッファも10で実行したところ、ファイルサイズの小さいものは10並行で効率良く捌けていました。CPU使用率も100%近く出ており、I/O待ちの発生もほとんどなかったようです。

しかし一旦、数MBのファイルを扱い始めると、I/Oバウンドの処理となりI/O待ちプロセスが多くなってしまいました。Diskの性能があまり良くないサーバで試験を行ったため、まあ納得せざるを得ないでしょう。

vmstatなどのコマンドで実行時のリソースの使用状況を確認しつつ、並行数などを調整してチューニングを行うといいかもしれません。