Perl threads

I'm new to Perl (and to programming) and have been playing with threads for the last couple of weeks. So far I've figured out that using them for a number of similar parallel tasks is discouraged: memory consumption gets out of control if the number of threads depends on some input values, and just limiting that number and doing some intermediate joins seems pretty silly. So I tried to trick the threads into giving me some values back through queues and then detaching those threads (without ever joining them). Here's an example with a parallel ping:

#!/usr/bin/perl
#

use strict;
use warnings;
use threads;
use NetAddr::IP;
use Net::Ping;
use Thread::Queue;
use Thread::Semaphore;
########## get my IPs from CIDR-notation #############
my @ips;
for my $cidr (@ARGV) {
    my $n = NetAddr::IP->new($cidr);
    foreach ( @{ $n->hostenumref } ) {
        push @ips, ( split( '/', $_ ) )[0];
    }
}

my $ping      = Net::Ping->new("icmp");
my $pq        = Thread::Queue->new( @ips, undef );    # ping-worker-queue
my $rq        = Thread::Queue->new();                 # response queue
my $semaphore = Thread::Semaphore->new(100);          # I hoped this would be useful to limit the number of concurrent threads

while ( my $phost = $pq->dequeue() ) {
    $semaphore->down();
    threads->create( { 'stack_size' => 32 * 4096 }, \&ping_th, $phost );
}

sub ping_th {
    $rq->enqueue( $_[0] ) if $ping->ping( $_[0], 1 );
    $semaphore->up();
    threads->detach();
}

$rq->enqueue(undef);

while ( my $alive_ip = $rq->dequeue() ) {
    print $alive_ip, "\n";
}

      

I was unable to find a complete description of how threads->detach() is supposed to work from inside a threaded sub, and thought it might work ... and it does, if I do something in the main program (thread) that prolongs its lifetime (sleep works well), so that all the detached threads finish and enqueue their part into my $rq. Otherwise it starts some threads, collects their results into the queue, and exits with warnings like:

Perl exited with active threads:
    5 running and unjoined
    0 finished and unjoined
    0 running and detached

      

Forcing the main program to sleep for a while, again, seems silly. Isn't there a way to let the threads do their work and detach ONLY after the actual call to threads->detach()? So far my guess is that threads->detach() inside the sub takes effect as soon as the thread is created, and that this is therefore not the way to do it. I tried this with good ol' CentOS's v5.10.1. Should this change with modern versions v5.16 or v5.18 (compiled with usethreads)?

2 answers


Detaching a thread isn't particularly useful, because you're effectively saying "I don't care when it exits."

This is usually not what you want: your process can reach its end while the thread is still running.
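To illustrate the difference, a thread that is joined instead of detached can hand its result straight back through join(). The following is only a minimal sketch: square() is a hypothetical stand-in for the real work, not part of the question's code.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for real work: return the square of the input.
sub square {
    my ($n) = @_;
    return $n * $n;
}

# Start one thread per input value and keep the thread objects.
my @workers = map { threads->create( \&square, $_ ) } 1 .. 5;

# join() blocks until the thread has finished and returns the sub's
# return value, so the main thread cannot exit too early.
my @squares = map { $_->join() } @workers;

print "@squares\n";
```

Because the main thread blocks in join(), no sleep is needed, and no separate result queue either when each thread produces a single value.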

In general, creating threads has an overhead because your process is cloned in memory. You want to avoid this. Thread::Queue is also good to use because it is a thread-safe way of passing information. In your code, you don't really need it for $pq, because you are not actually threading at the point where you use it.



A semaphore is one way to do it, but I'd suggest this as an alternative:

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Net::Ping;

my $nthreads = 100;

# Assumption: the hosts to ping are passed on the command line.
my @ip_list = @ARGV;

my $ping_q   = Thread::Queue -> new();
my $result_q = Thread::Queue -> new();

sub ping_host {
     # one pinger per thread; "icmp" pings normally need root privileges
     my $pinger = Net::Ping -> new("icmp");
     while ( defined( my $hostname = $ping_q -> dequeue() ) ) {
         if ( $pinger -> ping ( $hostname, 1 ) ) {
              $result_q -> enqueue ( $hostname );
         }
     }
}

#start the threads

for ( 1..$nthreads ) {
     threads -> create ( \&ping_host );
}

#queue the workload
$ping_q -> enqueue ( @ip_list );

#close the queue, so '$ping_q -> dequeue' returns undef, breaking the while loop.
#(end() needs Thread::Queue 3.01 or later)

$ping_q -> end();

#wait for pingers to finish.
foreach my $thr ( threads -> list() ) {
   $thr -> join();
}
$result_q -> end();

#collate results
while ( defined( my $successful_host = $result_q -> dequeue_nb() ) ) {
    print $successful_host, "\n";
}

      

This way you create the threads up front, queue the targets, and then collect the results when you're done. You don't incur the overhead of repeatedly respawning threads, and your program waits until all threads are done. It may take a while, because the ping timeout on a host that is down will be quite long.
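If you would rather keep the one-thread-per-item design from the question, the semaphore itself can also do the waiting: once every slot can be taken back, every worker must have finished its work. The sketch below rests on a few assumptions: check(), $slots and $max_threads are made-up names, and the "alive" test is a dummy (even numbers) so that it runs without network access or root.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Thread::Semaphore;

my $max_threads = 4;    # assumed limit; pick whatever suits your machine
my $slots = Thread::Semaphore->new($max_threads);
my $rq    = Thread::Queue->new();

# Hypothetical stand-in for the ping: even numbers count as "alive".
sub check {
    my ($item) = @_;
    $rq->enqueue($item) if $item % 2 == 0;
    $slots->up();       # hand the slot back as the very last step
}

for my $item ( 1 .. 20 ) {
    $slots->down();     # blocks while $max_threads workers are busy
    threads->create( \&check, $item )->detach();
}

# Taking all slots at once blocks until every worker has called up(),
# so the result queue is complete after this line.
$slots->down($max_threads);

my @alive;
while ( defined( my $item = $rq->dequeue_nb() ) ) {
    push @alive, $item;
}
my @sorted = sort { $a <=> $b } @alive;
print join( ",", @sorted ), "\n";
```

This still pays the cost of creating one thread per item, so the worker pool above remains the cheaper design. The semaphore only bounds how many threads exist at once and gives the main thread something to wait on instead of sleep.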



Since detached threads cannot be joined, you can wait for the threads to finish their jobs:



sleep 1 while threads->list();
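As a rough sketch of how that line fits the pattern from the question: threads->list() only counts threads that are neither joined nor detached, so the loop ends once every worker has reached its own threads->detach() call. The worker() sub and its squaring are hypothetical placeholders for the real ping.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $rq = Thread::Queue->new();

# Hypothetical worker: report a result, then detach itself, as in the question.
sub worker {
    my ($n) = @_;
    $rq->enqueue( $n * $n );
    threads->detach();    # from here on the thread no longer shows up in list()
}

threads->create( \&worker, $_ ) for 1 .. 5;

# In scalar context list() returns the number of unjoined, undetached threads.
sleep 1 while threads->list();

# Every worker enqueued its result before detaching, so the queue is complete.
my @results;
while ( defined( my $r = $rq->dequeue_nb() ) ) {
    push @results, $r;
}
my @sorted = sort { $a <=> $b } @results;
print join( ",", @sorted ), "\n";
```

This relies on detach() being the last thing each thread does. A thread that dies before reaching it stays in the list as finished and unjoined, and the loop would then never end.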

      
