A pool of workers to process requests
I'm new to Go and looking for a way to run 30,000 queries using 100 workers, with a connection available for each worker (MySQL is already configured to allow more than 100 connections). This is my attempt:
package main

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {
    for range jobs {
        _, e := query.Exec("a")
        if e != nil {
            panic(e.Error())
        }
        results <- 1
    }
}

func main() {
    workers := 100

    db, e := sql.Open("mysql", "foo:foo@/foo")
    if e != nil {
        panic(e.Error())
    }
    db.SetMaxOpenConns(workers)
    db.SetMaxIdleConns(workers)
    defer db.Close()

    query, e = db.Prepare("INSERT INTO foo (foo) values(?)")
    if e != nil {
        panic(e.Error())
    }

    total := 30000
    jobs := make(chan int, total)
    results := make(chan int, total)

    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }
    for j := 0; j < total; j++ {
        jobs <- j
    }
    close(jobs)
    for r := 0; r < total; r++ {
        <-results
    }
}
It works, but I'm not sure if this is the best way to do it.
If you think this question is opinion-based or otherwise a poor fit, please flag it for closure and leave a comment explaining why.
What you have basically works, but to get rid of the buffered channels you need to write to jobs and read from results at the same time. Otherwise your program deadlocks: the workers cannot send results because nothing is receiving them, and you cannot insert more jobs because the workers are blocked.
Here's a playground example of a work queue that pushes jobs from a background goroutine while main collects the results:
package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
    for range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)

    // start workers
    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
    }()

    // collect results
    for i := 0; i < total; i++ {
        <-results
        fmt.Printf(".")
    }
    close(jobs)
}
For this code to work, you need to know how many results you will get. If you don't (say, each job may produce zero or more results), you can use sync.WaitGroup to wait for the workers to finish and then close the results channel:
package main

import (
    "fmt"
    "sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)
    wg := &sync.WaitGroup{}

    // start workers
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go worker(jobs, results, wg)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
        close(jobs)
        wg.Wait()
        // all workers are done, so no more results are coming
        close(results)
    }()

    // collect results
    for range results {
        fmt.Printf(".")
    }
}
There are many more sophisticated tricks you can layer on top: stopping all workers after an error occurs, returning results in the same order as the original jobs, and so on. But it sounds like the basic version works here.