A pool of workers to process requests

I'm new to Go and looking for a way to handle 3000 queries using 100 workers and providing a connection for each worker (MySQL is already configured with over 100 connections). This is my attempt:

package main

import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {

    for _ = range jobs {

        _, e := query.Exec("a")

        if e != nil {

            panic(e.Error())
        }

        results <- 1
    }
}

func main() {

    workers := 100

    db, e := sql.Open("mysql", "foo:foo@/foo")

    if e != nil {

        panic(e.Error())
    }

    db.SetMaxOpenConns(workers)
    db.SetMaxIdleConns(workers)

    defer db.Close()

    query, e = db.Prepare("INSERT INTO foo (foo) values(?)")

    if e != nil {

        panic(e.Error())
    }

    total := 30000
    jobs := make(chan int, total)
    results := make(chan int, total)

    for w := 0; w < workers; w++ {

        go worker(jobs, results)
    }

    for j := 0; j < total; j++ {

        jobs <- j
    }

    close(jobs)

    for r := 0; r < total; r++ {

        <-results
    }
}

      

It works, but I'm not sure if this is the best way to do it.

Please, if you think this opinion is based or not a good question, just flag it closed and leave a comment explaining why.

+3


source to share


1 answer


That you're basically working, but to get rid of the buffering, you need to write to jobs

and read from results

at the same time. Otherwise, your process ends: workers cannot submit results because they are not receiving anything, and you cannot insert jobs because workers are locked.

Here's a playground example on how to make a work queue that pushes jobs in the background while getting results into main

:

package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
    for _ = range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)

    // start workers
    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
    }()

    // collect results
    for i := 0; i < total; i++ {
        <-results
        fmt.Printf(".")
    }

    close(jobs)
}

      



For this code to work, you need to know how many results you will get. If you don't know this (say, each job may produce zero or more results), you can use sync.WaitGroup

to wait for the job to complete, then close the result stream :

package main

import (
    "fmt"
    "sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    for _ = range jobs {
        // ...do work here...
        results <- 1
    }
    wg.Done()
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)
    wg := &sync.WaitGroup{}

    // start workers
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go worker(jobs, results, wg)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
        close(jobs)
        wg.Wait()
        // all workers are done so no more results
        close(results)
    }()

    // collect results
    for _ = range results {
        fmt.Printf(".")
    }

}

      

There are many other more sophisticated tricks you can do to stop all workers after an error occurs, put the results in the same order as the original assignments, or do similar things. Sounds like the main version works here.

+1


source







All Articles