How do I divide array processing among goroutines?
I have a chunk of 30,000 lines. How do I split this work across, say, 10 goroutines, each taking 3,000 rows from a slice, extracting some data from them, and appending the results to a new slice?
So in the end I would have 10 slices with 3,000 processed results each. What is the template for solving this problem?
I have looked through this article, but I am not sure which of these patterns applies to my case.
Read items from the slice over a channel, fan out to balance the load across workers, process the lines in goroutines, then fan the results back in on a single channel to avoid contention.
You may also want to cap the maximum number of concurrent goroutines.
Be aware that slices are not safe for concurrent writes.
Useful information:
https://blog.golang.org/pipelines
https://talks.golang.org/2012/concurrency.slide#1
https://blog.golang.org/advanced-go-concurrency-patterns
https://talks.golang.org/2013/advconc.slide#1
I agree with @JimB on limiting goroutines. However, since this is your request, I would probably do something like the following. If you really want each goroutine to handle exactly 3,000 items, it would be easier to create a 2d slice ([[3000 items], [3000 items], ...]) and run one goroutine per index of that 2d slice. Otherwise, METHOD 1 below simply limits the number of goroutines to 10.
METHOD 1
package main
import (
	"crypto/rand"
	"fmt"
	"log"
	"sync"
	"time"
)

var s []string

// generate some mock data
func init() {
	s = make([]string, 30000)
	n := 5
	for i := 0; i < 30000; i++ {
		b := make([]byte, n)
		if _, err := rand.Read(b); err != nil {
			panic(err)
		}
		s[i] = fmt.Sprintf("%X", b)
	}
}

func main() {
	ch := make(chan string)
	var mut sync.Mutex
	counter := 0
	// limit the number of goroutines to 10
	for w := 0; w < 10; w++ {
		go func(ch chan string, mut *sync.Mutex) {
			for {
				// claim the next index under the mutex to avoid a race condition
				mut.Lock()
				i := counter
				counter++
				mut.Unlock()
				// every item has been claimed; exit the goroutine
				if i >= len(s) {
					return
				}
				// get the string, do some work, then send the result on the channel
				myString := s[i]
				ch <- myString
			}
		}(ch, &mut)
	}
	// timing the loop: if you play with the number of goroutines above,
	// you will see how it affects throughput
	t := time.Now()
	for i := 0; i < len(s); i++ {
		result := <-ch
		log.Println(time.Since(t), result, i)
	}
}
METHOD 2's init function creates a 2d array split into 10 chunks, each containing 3,000 elements. If you are partitioning your data this way, the logic below needs very little modification to work.
package main

import (
	"crypto/rand"
	"fmt"
	"log"
	"sync"
)

var chunkedSlice [10][3000]string

// generate some mock data:
// a 2d array where each chunk holds 3000 items,
// 10 chunks in total, 1 goroutine per chunk
func init() {
	n := 5
	for i := 0; i < 10; i++ {
		for j := 0; j < 3000; j++ {
			b := make([]byte, n)
			if _, err := rand.Read(b); err != nil {
				panic(err)
			}
			chunkedSlice[i][j] = fmt.Sprintf("%X", b)
		}
	}
}

func main() {
	// channel to send parsed data to
	ch := make(chan string)
	var wg sync.WaitGroup
	// 10 chunks, one goroutine each
	for i := range chunkedSlice {
		wg.Add(1)
		// pass the array by pointer so each goroutine avoids copying 3000
		// strings; indexing chunkedSlice[i] directly also sidesteps the
		// loop-variable reuse problem in Go versions before 1.22
		go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) {
			defer wg.Done()
			for j := 0; j < len(chunk); j++ {
				// parse the data (emulated here by passing it through)
				parsedData := chunk[j]
				// send parsed data to the channel
				ch <- parsedData
			}
		}(ch, &wg, &chunkedSlice[i])
	}
	// wait for all the goroutines to finish, then close the channel
	go func() {
		wg.Wait()
		close(ch)
	}()
	var counter int // count to verify that the right number of items was parsed
	// drain the channel
	for res := range ch {
		log.Println(res, counter)
		counter++
	}
}