How to decouple array processing from goroutines?

I have a chunk of 30,000 lines. How do I split this chunk down to, say, 10 goroutines that will take 3000 rows from a slice, extract some data from it, and click on a new slice?

So in the end I will have 10 slices with 3000 processed results each. What is the template for solving this problem?

I have looked through this article but am not sure which of these patterns apply to my case.

+3


source to share


2 answers


Using a channel, read items from a slice, use Fan out to balance loads and transmit messages. Then process the lines in goroutines and collect the results back (fan) in a single rope to avoid mutual interference.

You might want to set the number of concurrent concurrent goroutines Max.

Be aware that slices are not thread safe when writing to them.



Useful information:

https://blog.golang.org/pipelines https://talks.golang.org/2012/concurrency.slide#1 https://blog.golang.org/advanced-go-concurrency-patterns https: // talks .golang.org / 2013 / advconc.slide # 1

+2


source


I agree with @JimB on limiting go procedures. However, since this is your request, I would probably do something like this. If you really want each gorountine to do 3000 items then it would be easier to create a 2d slice. [[3000 points], [3000 elements], ..], then in this 2d-array there is 1 normal process turn per index. Otherwise below is just limited to gorountines to 10. METHOD 1   main package

import (
    "crypto/rand"
    "fmt"
    "log"
    "sync"
    "time"
)

var s []string

// genetate some mock data
func init() {
    s = make([]string, 30000)
    n := 5
    for i := 0; i < 30000; i++ {
        b := make([]byte, n)
        if _, err := rand.Read(b); err != nil {
            panic(err)
        }
        s[i] = fmt.Sprintf("%X", b)
    }
}

func main() {
    // set the number of workers
    ch := make(chan string)
    var mut sync.Mutex
    counter := 0

    // limit the number of gorountines to 10
    for w := 0; w < 10; w++ {
        go func(ch chan string, mut *sync.Mutex) {
            for {
                // get and update counter using mux to stop race condtions
                mut.Lock()
                i := counter
                counter++
                mut.Unlock()
                // break the loop
                if counter > len(s) {
                    return
                }
                // get string
                myString := s[i]
                // to some work then pass to channel
                ch <- myString

            }
        }(ch, &mut)
    }
    // adding time.  If you play wiht the number of gorountines youll see how changing the number above can efficiency 
    t := time.Now()
    for i := 0; i < len(s); i++ {
        result := <-ch
        log.Println(time.Since(t), result, i)
    }
}

      



The METHOD2 init function creates a 2d array split into 10 arrays, each containing 3000 elements. If you are parsing your data this way, then the logic below needs very little modification to work

package main

import (
    "crypto/rand"
    "fmt"
    "log"
    "sync"
)

var chunkedSlice [10][3000]string

// genetate some mock data
// 2d array, each chunk has 3000 items in it
// there are 10 chunks, 1 go rountine per chunk
func init() {
    n := 5
    for i := 0; i < 10; i++ {
        for j := 0; j < 3000; j++ {
            b := make([]byte, n)
            if _, err := rand.Read(b); err != nil {
                panic(err)
            }
            chunkedSlice[i][j] = fmt.Sprintf("%X", b)
        }
    }
}

func main() {
    // channel to send parsed data to

    ch := make(chan string)
    var wg sync.WaitGroup

    // 10 chunks
    for _, chunk := range chunkedSlice {
        wg.Add(1)
        // if defining the 2d array e.g [10][3000]string, you need to pass it as a pointer to avoid stack error
        go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) {
            defer wg.Done()
            for i := 0; i < len(chunk); i++ {
                str := chunk[i]
                // fmt.Println(str)
                // parse the data (emulating)
                parsedData := str
                // send parsed data to the channel
                ch <- parsedData
            }
        }(ch, &wg, &chunk)
    }
    // wait for all the routines to finish and close the channel
    go func() {
        wg.Wait()
        close(ch)
    }()

    var counter int // adding to check that the right number of items was parsed
    // get the data from the channel
    for res := range ch {
        log.Println(res, counter)
        counter++
    }
}

      

0


source







All Articles