Moving data from multiple sources into multiple databases is one of a data engineer’s responsibilities. It is usually best to start with the simplest, most direct approach, especially when the data is small: iterate over the rows one by one and insert each as you go. However, this approach does not scale well to larger datasets or higher-throughput streams.

In this blog post, we will demonstrate how to insert a JSON text file into a Google BigQuery table using Go and its native concurrency features (channels, goroutines, WaitGroup). Go is known as one of the best languages for writing high-performance programs because these features make concurrent and parallel programming easier.

Preparation

For preparation, let’s generate text files of various sizes (10 to 1,000,000 rows), with one JSON object per row, using the Python script gen-txt.py provided in the repository[1].

# Generate dataset
python3 gen-txt.py
# you will see files like this
# -rw-rw-r-- 1 pandu pandu  220 Mar  5 18:31 students-10.json.txt
# -rw-rw-r-- 1 pandu pandu 2.3K Mar  5 18:31 students-100.json.txt
# -rw-rw-r-- 1 pandu pandu  24K Mar  5 18:31 students-1000.json.txt
# -rw-rw-r-- 1 pandu pandu 244K Mar  5 18:31 students-10000.json.txt
# -rw-rw-r-- 1 pandu pandu 2.5M Mar  5 18:31 students-100000.json.txt
# -rw-rw-r-- 1 pandu pandu  26M Mar  5 18:31 students-1000000.json.txt

We will get text files like the one below. In this case, each row is about 22 characters long, i.e. roughly 22 bytes.

{"id":1,"name":"aaa"}
{"id":2,"name":"bbb"}
...
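The gen-txt.py script itself lives in the referenced repository[1]. Purely as an illustration of what it does, a rough Go equivalent might look like the sketch below (the row count and the naming scheme are assumptions, not the actual script):

package main

// Illustrative sketch only; the actual generator is the Python script in [1].
import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

func main() {
    n := 1000000 // number of rows to generate (assumption)
    f, err := os.Create(fmt.Sprintf("students-%d.json.txt", n))
    if err != nil {
        panic(err)
    }
    defer f.Close()

    w := bufio.NewWriter(f)
    defer w.Flush()
    for i := 1; i <= n; i++ {
        // One JSON object per line, e.g. {"id":1,"name":"aaa"}
        name := strings.Repeat(string(rune('a'+(i-1)%26)), 3)
        fmt.Fprintf(w, "{\"id\":%d,\"name\":\"%s\"}\n", i, name)
    }
}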

For BigQuery, make sure we have a GCP service account key (JSON file) with permission to modify the BigQuery table, and don’t forget to point the GOOGLE_APPLICATION_CREDENTIALS environment variable at it, for example: export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json. After that, we are all set.

Part One: Simple Approach

We will try three approaches to insertion. Let’s start with the simplest one: read the text file line by line and insert each line into a BigQuery table one at a time.

This first approach has three main components: 1) a File Reader, 2) a Channel c1, and 3) a Worker. The File Reader reads the text file and sends each line to channel c1. Channel c1 carries the string data between goroutines. The Worker receives string data from channel c1 and inserts it into a BigQuery table.

A channel is a native Go type that lets us send and receive values and share them across goroutines. Let’s define channel c1 first.

c1 := make(chan string)
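As a quick aside, here is a toy example (separate from our pipeline) of how sending and receiving on a channel works. The two-value receive form reports whether the channel has been closed; the Worker below relies on exactly this pattern.

// Toy example only, not part of the pipeline
ch := make(chan string)
go func() {
    ch <- "hello"
    close(ch)
}()
v, more := <-ch
log.Info(v, " ", more) // hello true
v, more = <-ch
log.Info(v, " ", more) // empty string, false: the channel is closed and drained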

Next, define the File Reader as an anonymous function and immediately run it as a goroutine. Putting the go keyword in front of a function call makes it run asynchronously, without blocking the calling (main) goroutine.

// Read file line by line and send it to channel
log.Info("Reading file")
go func(filepath string, chanStr chan<- string) {
    file, err := os.Open(filepath)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    counter := 0
    for scanner.Scan() {
        text := scanner.Text()
        chanStr <- text
        counter++
    }
    close(chanStr)
    log.Info("Finished reading ", counter, " rows")
}(FILEPATH, c1)

Next, define the Worker outside the main function. This function continuously receives string data from a channel, parses it, and inserts it into the BQ table. When the channel is closed, it stops.

func deployWorker(ch <-chan string, project, dataset, table string, wg *sync.WaitGroup) {
    // Create BQ client and context
    client, ctx := getBQClient(project)
    // Looping to receive data from channel
    for {
        strJSON, more := <-ch
        if !more {
            break
        } else {
            user := parseUserFromJSONStr(strJSON)
            users := []*User{&user}
            // Insert to BQ table
            bqErr := insertUsersToBQTable(ctx, client, dataset, table, users)
            if bqErr == nil {
                log.Info(fmt.Sprintf("Inserted %d rows - %s", len(users), strJSON))
            } else {
                log.Error(bqErr)
            }
        }
    }
    client.Close()
    wg.Done()
}
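The Worker relies on a few helpers (the User type, getBQClient, parseUserFromJSONStr, and insertUsersToBQTable) whose full definitions live in the repository[1]. Below is a minimal sketch of what they might look like using the cloud.google.com/go/bigquery client; treat the bodies as illustrative assumptions rather than the exact code.

// Illustrative sketch; assumed imports: context, encoding/json,
// cloud.google.com/go/bigquery, plus the log package used above.

type User struct {
    ID   int    `json:"id" bigquery:"id"`
    Name string `json:"name" bigquery:"name"`
}

func getBQClient(project string) (*bigquery.Client, context.Context) {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, project)
    if err != nil {
        log.Fatal(err)
    }
    return client, ctx
}

func parseUserFromJSONStr(strJSON string) User {
    var user User
    if err := json.Unmarshal([]byte(strJSON), &user); err != nil {
        log.Error(err)
    }
    return user
}

func insertUsersToBQTable(ctx context.Context, client *bigquery.Client, dataset, table string, users []*User) error {
    // Inserter.Put issues a single streaming-insert API call for the whole slice
    return client.Dataset(dataset).Table(table).Inserter().Put(ctx, users)
}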

Run it as a goroutine.

var wg sync.WaitGroup
wg.Add(1)
go deployWorker(c1, PROJECT, DATASET, TABLE, &wg)
wg.Wait()
log.Info("Done in ", time.Since(start).Seconds(), "seconds")

Don’t forget that goroutines are asynchronous: without synchronization, the main function would return immediately, before the worker finishes. To prevent that, Go has a native feature called WaitGroup, which lets us wait for goroutines to finish before proceeding to the next step. It is one of the most important Go primitives and is typically used to wait for multiple parallel goroutines.

This Go script is available at cmd/main1/main1.go[1].

Part Two: Multiple Rows Insertion

According to BigQuery Streaming Insert Documentation[2], multiple rows can be inserted in one API call. Let’s modify our program.

In this new architecture, we add two more components: a Buffer and a channel c2. The Buffer receives string data from c1, collects it temporarily into a slice of length n, and then sends the whole batch to c2 in one go. Let’s run it as a goroutine.

// Put string data to a buffer and send it to another channel
go func(chInput <-chan string, chOutput chan<- []string, n int) {
    rows := []string{}
    counter := 0
    for {
        row, more := <-chInput
        if !more {
            break
        }
        rows = append(rows, row)
        counter++
        if counter == n {
            chOutput <- rows
            rows = []string{}
            counter = 0
        }
    }
    // Flush any leftover rows that did not fill a complete batch
    if len(rows) > 0 {
        chOutput <- rows
    }
    close(chOutput)
}(c1, c2, BUFFER_LENGTH)

Channel c2 has type chan []string; it only sends and receives slices of strings.

c1 := make(chan string)
c2 := make(chan []string)

Next, modify the Worker to receive from the new []string channel, c2.

func deployWorker(ch <-chan []string, project, dataset, table string, wg *sync.WaitGroup) {
    // Create BQ client and context
    client, ctx := getBQClient(project)
    // Looping to receive data from channel
    for {
        strJSONs, more := <-ch
        if !more {
            break
        } else {
            // Parser also modified
            users := []*User{}
            for _, strJSON := range strJSONs {
                user := parseUserFromJSONStr(strJSON)
                users = append(users, &user)
            }
            // Insert to BQ table
            bqErr := insertUsersToBQTable(ctx, client, dataset, table, users)
            if bqErr == nil {
                log.Info(fmt.Sprintf("Inserted %d rows - %s", len(users), strJSONs))
            } else {
                log.Error(bqErr)
            }
        }
    }
    client.Close()
    wg.Done()
}

That completes the second approach. With the right value of n, it improves insertion speed significantly. This Go script is available at cmd/main2/main2.go[1].

Part Three: Multiple Workers and Multiple Rows Insertion

For the third and final approach, we will take full advantage of goroutines. Go can spawn goroutines in very large numbers, and they are much lighter-weight than threads or coroutine implementations in many other programming languages.

In this approach, we simply deploy more workers using a for loop. Let’s modify the main function a bit.

var wg sync.WaitGroup
for idx := 0; idx < WORKER_NUMBER; idx++ {
    wg.Add(1)
    go deployWorker(c2, PROJECT, DATASET, TABLE, &wg)
}
wg.Wait()
log.Info("Done in ", time.Since(start).Seconds(), "seconds")

In addition to n, another variable w is introduced. It determines how many workers are deployed, making insertion even faster. This Go script is available at cmd/main3/main3.go[1].
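Since the benchmark below sweeps several combinations of w and n, it is convenient to read them from command-line flags instead of hard-coding WORKER_NUMBER and BUFFER_LENGTH. A possible sketch using the standard flag package (the flag names and defaults are assumptions, not taken from the repository):

// Hypothetical flag parsing at the top of main; assumed import: flag
workerNumber := flag.Int("w", 4, "number of insert workers")
bufferLength := flag.Int("n", 100, "rows per streaming-insert request")
filePath := flag.String("file", "students-1000000.json.txt", "input JSON text file")
flag.Parse()
log.Info("Running with ", *workerNumber, " workers, batches of ", *bufferLength, " rows, file ", *filePath)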

Benchmark

To decide on the maximum buffer length and number of workers, we need to look at the BigQuery streaming API quotas and limits[3]:

Maximum rows per request: 10,000 rows per request; a maximum of 500 rows is recommended
Concurrent API requests, per user: 300
API requests per second, per user: 100

The benchmarks were run on the same machine type, an n1-standard-1 (1 vCPU, 3.75 GB memory)[4]. Using the JSON text files generated earlier at different sizes, we benchmark the three approaches and measure the time taken. Note that the largest configuration below (w = 300, n = 500) sits right at the concurrent-request limit and the recommended rows-per-request figure from the quotas above.

Script     Parameters      1,000 rows   10,000 rows   100,000 rows   1,000,000 rows
main1.go   w=1,   n=1      312.164s     3242.766s     n/a            n/a
main2.go   w=1,   n=100    4.599s       35.735s       381.251s       3738.669s
main3.go   w=4,   n=100    1.453s       9.137s        95.666s        939.175s
main3.go   w=16,  n=100    0.808s       2.938s        24.754s        224.630s
main3.go   w=64,  n=100    0.848s       1.643s        11.667s        62.624s
main3.go   w=300, n=500    0.934s       1.296s        4.081s         14.787s

As we can see, higher values of w and n make insertion faster. The highest configuration lets us insert one million rows in under 15 seconds!

Conclusion

In this blog post we learned how to use concurrency and parallelism to make our programs finish the job faster. We also explored Go and its native concurrency features (channels, goroutines, WaitGroup) to build a high-performance program.

One thing to remember is that everything comes with a tradeoff. We also need to watch out for memory usage and API rate limits when using a large number of goroutines.

Also check out my open-source project ps2bq, a tool for inserting GCP Pub/Sub messages into a BigQuery table. It was also developed in Go using the same concurrency/parallelism concepts.

Reference
