How to limit the number of parallel threads

I have a VIDEO_URL array with thousands of videos to download. I want to use threads to do the job, but limit it to ten threads at a time. How can I rewrite the following code to achieve that?

@workers = []
VIDEO_URL.each do |video|
  @workers << Thread.new { dl_video(video) }
end
@workers.each(&:join)

      

Update

The downloads don't seem to block once there are more than 10 worker threads. Is the thread pool's blocking on I/O not taking effect?

If I download the videos without the thread pool, it works fine.

But if I download them using the thread pool, the videos don't download. The main thread should block once all 10 workers are busy, but that's not what happens. (Each video takes at least one minute to download.)

require 'thread/pool'
require 'open-uri'
require 'open_uri_redirections'

MAX_WORKERS = 10
@pool = Thread.pool(MAX_WORKERS)

def dl_video(video)
  File.open(video["title"], "wb") do |saved_file|
    @pool.process {
      saved_file.write open(video["link"], :allow_redirections => :safe).read
      # saved_file.write(HTTParty.get(video["link"]).parsed_response)
    }
  end
end
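A likely cause of the behavior described in the update: `File.open` returns as soon as its block finishes scheduling the pool task, so the file is closed before the worker thread ever writes to it (and the main loop never blocks, because scheduling a task is instant). A minimal stdlib-only sketch of that failure mode, using a plain `Thread` as a stand-in for the pool worker:

```ruby
require 'tempfile'

# Reproduces the bug pattern above: the file is opened in the main
# thread, the block merely schedules the write for later, and the
# file handle is closed before the worker gets around to writing.
write_error = nil
worker = nil
Tempfile.create("demo") do |saved_file|
  worker = Thread.new do
    sleep 0.1                    # simulate the slow download
    begin
      saved_file.write("data")   # the handle is already closed here
    rescue IOError => e
      write_error = e
    end
  end
end                              # block exits -> file is closed
worker.join
puts write_error.class           # => IOError
```

The fix is to move `File.open` inside `@pool.process`, so the worker thread itself opens, writes, and closes the file.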

      

4 answers


What you are trying to implement is a commonly used pattern called a thread pool.

I haven't tried it, but the threadpool gem or something similar may be worth a look:



require "threadpool"

pool = ThreadPool.new(10)
VIDEO_URL.each{|video| pool.process{dl_video(video)}}

      



What you want is called a thread pool. There is a Ruby thread gem that includes this functionality.

Untested snippet, adapted directly from the library's examples:



require 'thread/pool'

# Create thread pool with up to 10 simultaneous running threads 
pool = Thread.pool(10)

VIDEO_URL.each do | video |
  # Add each download task to the thread pool
  pool.process do 
    dl_video(video)
  end
end

# Block and wait for the thread pool to run out of tasks
pool.shutdown

      



A simple solution (without bringing in any new gems) would be to start 10 threads that pop and process URLs from your array.

[].tap do |threads|
  urls = VIDEO_URLS.clone
  semaphore = Mutex.new
  number_of_threads = 10

  number_of_threads.times do
    threads << Thread.new do
      # Pop under the mutex; a nil pop means the array is empty.
      # (Checking urls.empty? outside the mutex would race with pop.)
      while (url = semaphore.synchronize { urls.pop })
        download_video(url)
      end
    end
  end
end.each(&:join)
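The same idea can be written with the standard library's `Queue`, which is thread-safe on its own, so the explicit `Mutex` disappears. `VIDEO_URLS` and `download_video` are the names used above, stubbed here so the sketch is self-contained:

```ruby
NUMBER_OF_THREADS = 10

# Hypothetical stand-ins for the question's data and download routine.
VIDEO_URLS = (1..50).map { |i| "http://example.com/video#{i}" }
DOWNLOADED = Queue.new
def download_video(url)
  DOWNLOADED << url   # record the url instead of actually downloading
end

queue = Queue.new
VIDEO_URLS.each { |url| queue << url }

threads = NUMBER_OF_THREADS.times.map do
  Thread.new do
    # pop(true) raises ThreadError once the queue is empty,
    # which cleanly terminates the worker loop.
    while (url = queue.pop(true) rescue nil)
      download_video(url)
    end
  end
end
threads.each(&:join)

puts DOWNLOADED.size   # => 50
```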

      

Another solution would be to split your array into chunks (10 or fewer); you can do this in different ways. Each thread then processes one slice. The code might be longer, but you'd get rid of the Mutex if you wanted.

[].tap do |threads|
  slices # split VIDEO_URLS into required slices. leave this up to you.
  slices.each do |urls|
    threads << Thread.new do
      urls.each { |url| download_video(url) }
    end
  end
end.each(&:join)
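One hypothetical way to build `slices` so there are at most 10 of them is `each_slice` with a computed chunk size (the `VIDEO_URLS` data below is a stand-in for illustration):

```ruby
VIDEO_URLS = (1..95).map { |i| "http://example.com/video#{i}" }  # stand-in data

# Ceiling division spreads the urls across at most 10 slices.
slice_size = (VIDEO_URLS.size / 10.0).ceil
slices = VIDEO_URLS.each_slice(slice_size).to_a

puts slices.size                   # => 10
puts slices.flatten == VIDEO_URLS  # => true
```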

      



You can use each_slice:

@workers = []
VIDEO_URL.each_slice(10) do |batch|
  batch.each do |video|
    @workers << Thread.new { dl_video(video) }
  end
  @workers.each(&:join)
  @workers = []
end

      
