How to limit the number of parallel threads
VIDEO_URL holds thousands of videos to download. I want to use threads to do the job, but with at most ten threads running at a time. How can I rewrite the following code to achieve that?
VIDEO_URL.each do |video|
  @workers << Thread.new { dl_video(video) }
end
@workers.each { |t| t.join }
Update
The main thread does not seem to block once there are more than 10 worker threads. Does blocking I/O not work with the thread pool?
If I download the videos without the thread pool, it works well.
But if I download them using the thread pool, the videos are not downloaded. The main thread should block when there are 10 workers, but it does not. (Each video takes at least 1 minute to download.)
MAX_WORKERS = 10
@pool = Thread.pool(MAX_WORKERS)
def dl_video(video)
  File.open(video["title"], "wb") do |saved_file|
    @pool.process {
      saved_file.write open(video["link"], :allow_redirections => :safe).read
      # saved_file.write(HTTParty.get(video["link"]).parsed_response)
    }
  end
end
What you are trying to implement is a commonly used pattern and is called a thread pool.
I haven't tried it, but the threadpool gem or something similar may be worth a look:
require "threadpool"
pool = ThreadPool.new(10)
VIDEO_URL.each { |video| pool.process { dl_video(video) } }
What you want is called a thread pool. There is an extension to Ruby's threads that includes this functionality.
Untested snippet, adapted directly from the library's example:
require 'thread/pool'
# Create a thread pool with up to 10 simultaneously running threads
pool = Thread.pool(10)
VIDEO_URL.each do |video|
  # Add each download task to the thread pool
  pool.process do
    dl_video(video)
  end
end
# Block and wait for the thread pool to run out of tasks
pool.shutdown
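Regarding the update in the question: one likely reason the pooled version writes nothing is that a pool's process call typically only queues the block and returns right away, so File.open has already closed saved_file by the time the task runs. The sketch below reproduces that shape with plain lambdas standing in for pool tasks (an assumption; no pool gem is involved), next to the working shape where the task opens the file itself. The method name run_deferred_writes and the file names are hypothetical.

```ruby
require "tmpdir"

# Runs two "deferred" write tasks inside dir and reports how each one ended.
# The lambdas stand in for blocks handed to a thread pool (assumption).
def run_deferred_writes(dir)
  deferred = []

  # Problematic shape: the task is queued inside File.open, but it runs
  # only after the block has returned and the file has been closed.
  File.open(File.join(dir, "broken.bin"), "wb") do |saved_file|
    deferred << -> { saved_file.write("data") }
  end

  # Working shape: the task opens (and closes) the file itself.
  deferred << -> { File.open(File.join(dir, "fixed.bin"), "wb") { |f| f.write("data") } }

  deferred.map do |task|
    begin
      task.call
      :ok
    rescue IOError
      :closed # writing to an already closed file raises IOError
    end
  end
end

Dir.mktmpdir { |dir| p run_deferred_writes(dir) }
```

In the snippet above this is not an issue, because the whole dl_video call, including its File.open, runs inside the pool task. Inside a real pool thread the IOError may go unnoticed, which would match the symptom of files staying empty.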
A simple solution (without bringing in any new gems) would be to start 10 threads that each keep popping a URL off your array and processing it until the array is empty.
[].tap do |threads|
  urls = VIDEO_URLS.clone
  semaphore = Mutex.new
  number_of_threads = 10
  number_of_threads.times do
    threads << Thread.new do
      # Pop under the lock; pop returns nil once the array is empty,
      # so no thread ever calls download_video(nil)
      while (url = semaphore.synchronize { urls.pop })
        download_video(url)
      end
    end
  end
end.each(&:join)
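The same idea can be sketched with Ruby's built-in Queue, which is already thread-safe, so no explicit Mutex is needed. This is only a sketch: process_with_queue is a hypothetical helper name, and the block passed to it stands in for download_video. It assumes Ruby 2.3 or later for Queue#close, and that the items are never nil or false (either would end a worker's loop early).

```ruby
# Runs the given block for every item, using at most number_of_threads
# worker threads at a time. Hypothetical helper, not from any gem.
def process_with_queue(items, number_of_threads = 10)
  queue = Queue.new
  items.each { |item| queue << item }
  queue.close # once closed and drained, pop returns nil instead of blocking

  threads = Array.new(number_of_threads) do
    Thread.new do
      # Each worker keeps taking items until the queue is drained
      while (item = queue.pop)
        yield item
      end
    end
  end
  threads.each(&:join)
end
```

It would be called as process_with_queue(VIDEO_URLS) { |url| download_video(url) }.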
Another solution would be to split your array into chunks (10 or fewer); there are several ways to do that. Each thread then processes one slice. The code might be longer, but you could get rid of the Mutex if you wanted.
[].tap do |threads|
  slices # split VIDEO_URLS into the required slices; this is left up to you
  slices.each do |urls|
    threads << Thread.new do
      urls.each { |url| download_video(url) }
    end
  end
end.each(&:join)
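One possible way to build the slices, as a sketch only: pick a slice size so that at most 10 slices result, then hand each slice to its own thread. The helper name process_in_slices is hypothetical, and the block stands in for download_video.

```ruby
# Splits items into at most number_of_threads slices and processes each
# slice in its own thread. Hypothetical helper, not from any gem.
def process_in_slices(items, number_of_threads = 10)
  return if items.empty? # each_slice(0) would raise ArgumentError

  # Rounding up guarantees there are never more slices than threads
  slice_size = (items.size / number_of_threads.to_f).ceil

  threads = items.each_slice(slice_size).map do |slice|
    Thread.new { slice.each { |item| yield item } }
  end
  threads.each(&:join)
end
```

It would be called as process_in_slices(VIDEO_URLS) { |url| download_video(url) }. The split is fixed up front, so a thread that happens to get the slow downloads keeps working while the others sit idle; the pop-based version above balances the load better.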
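You can use each_slice. Note that this runs the downloads in batches of 10: the next batch starts only after every thread in the current one has finished.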
VIDEO_URL.each_slice(10) do |batch|
  batch.each do |video|
    @workers << Thread.new { dl_video(video) }
  end
  @workers.each { |t| t.join }
  @workers = []
end