Reconnect with AnyEvent::Handle and tcp_connect

I have a simple TCP-based server and client written with the help of AnyEvent::Handle, using tcp_connect and tcp_server. The client connects to the server and sends the string "Test message" every 5 seconds.

This works without issue as long as the server is available. However, if the server is not available when the client starts up, or becomes unavailable later, the client script never tries to reconnect.

I would like it to try to reconnect if the connection handle is not available (destroyed?). If it is not available, it should do something (perhaps print a status message) but keep trying to reconnect every 5 seconds. That would be the ideal result.

I'm not sure how to do this. I changed my client and server code as follows.

Client

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my @bulk;

# Heartbeat: every 5 seconds queue one message, hand the queue to flush(),
# then empty the queue regardless of what happened to the data.
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        push @bulk, "Test message";
        flush( \@bulk );
        undef @bulk;
    },
);

my $host = '127.0.0.1';
my $port = 9999;

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

# Single connection attempt. Both failure callbacks report the problem and
# destroy the handle; nothing here creates a replacement handle afterwards,
# and neither callback signals $conn_cv.
$conn_hdl = AnyEvent::Handle->new(
    connect          => [ $host, $port ],
    keepalive        => 1,
    on_connect_error => sub {
        my ( undef, $reason ) = @_;
        print "Could not connect: $reason\n";
        $conn_hdl->destroy;
    },
    on_error => sub {
        my ( undef, undef, $reason ) = @_;
        AE::log error => $reason;
        $conn_hdl->destroy;
    },
    on_read => sub {
        my ( $reader ) = @_;

        # Echo each complete line received from the server.
        $reader->unshift_read(
            line => sub {
                my ( undef, $line ) = @_;
                print "$line\n";
            },
        );
    },
);

# Enter the event loop; nothing above ever calls $conn_cv->send.
$conn_cv->recv;

# Send the queued events over the connection handle.
#
# Takes a reference to the array of queued event strings. Returns 0 without
# writing anything when the array is empty; otherwise the result of the
# push_write call is the return value.
sub flush {
    my ( $bulk ) = @_;
    return 0 unless @{$bulk};

    # Wire format: the events joined with commas and zlib-compressed,
    # preceded by the compressed length packed with the "N" template
    # (32-bit, network byte order).
    my $payload = join( ",", @{$bulk} );
    $payload = compress( $payload );
    my $header = pack( "N", length( $payload ) );

    $conn_hdl->push_write( $header . $payload );
}

      

Server

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my %holding;

my $host = '127.0.0.1';
my $port = 9999;

# Client handles, keyed by "host:port" of the peer.
my %connections;

# Every 5 seconds: report how many handles are tracked, then forget the ones
# that have been destroyed. The count is printed before the pruning happens.
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        my $tracked = keys %connections;
        print "Number of connected hosts: $tracked\n";

        my @stale = grep { $connections{$_}->destroyed } keys %connections;
        delete @connections{@stale};
    },
);

my $server_cv = AnyEvent->condvar;

# on_read handler: consume one frame, i.e. a 4-byte length packed with the
# "N" template followed by that many bytes of zlib-compressed text, and print
# the uncompressed text on its own line.
my $read_frame = sub {
    my ( $self ) = @_;

    $self->unshift_read(
        chunk => 4,
        sub {
            my ( undef, $header ) = @_;
            my $len = unpack( "N", $header );

            $self->unshift_read(
                chunk => $len,
                sub {
                    my ( undef, $packed ) = @_;
                    my $text = uncompress( $packed );
                    print $text . "\n";
                },
            );
        },
    );
};

# EOF and errors are treated alike: destroy the handle they occurred on.
my $teardown = sub {
    my ( $hdl ) = @_;
    $hdl->destroy();
};

my $server = tcp_server(
    $host, $port,
    sub {
        my ( $fh, $peer_host, $peer_port ) = @_;

        my $handle = AnyEvent::Handle->new(
            fh        => $fh,
            poll      => 'r',
            keepalive => 1,
            on_read   => $read_frame,
            on_eof    => $teardown,
            on_error  => $teardown,
        );

        # Keep a reference so the handle outlives this callback.
        $connections{ join( ':', $peer_host, $peer_port ) } = $handle;
    },
);

$server_cv->recv;

      

+3


source to share


2 answers


You can use the following:

# MyConnector - wraps AnyEvent::Handle->new with automatic retry of the
# initial connection attempt.
#
# The object keeps its working state under underscore-prefixed keys
# (_connect, _handle, _timer) while it is still trying to connect, and
# exposes the connected handle under the "handle" key once on_connect fires.
package MyConnector;

use strict;
use warnings;

use AE               qw( );
use AnyEvent::Handle qw( );
use Scalar::Util     qw( );

# Constructor. Starts the first connection attempt immediately.
#
# Options consumed here (everything else is passed unchanged to
# AnyEvent::Handle->new):
#   on_connect       - optional callback, invoked with the same arguments
#                      AnyEvent::Handle gives its on_connect callback.
#   on_connect_error - optional callback, invoked with ($handle, $message)
#                      only once the retry budget is exhausted.
#   tries            - retry budget, default 5. Because of "||", a false
#                      value (0, undef) also selects the default.
#   cooldown         - delay passed to AE::timer before each retry,
#                      default 15; a false value also selects the default.
#
# Returns the new MyConnector object.
sub new {
   my $class = shift;
   my %opts = @_;

   my $self = bless({}, $class);

   {
      # Inside this block $self is a weakened copy of the outer $self, so
      # the closures below (which are stored inside the object itself) do
      # not hold a strong reference back to the object.
      Scalar::Util::weaken(my $self = $self);

      my $on_connect       = delete($opts{on_connect});
      my $on_connect_error = delete($opts{on_connect_error});

      my $tries    = delete($opts{tries})    ||  5;
      my $cooldown = delete($opts{cooldown}) || 15;

      # One connection attempt. Called once directly below and again from
      # the retry timer after each failed attempt.
      $self->{_connect} = sub {
         # Drop the timer that triggered this attempt, if any.
         $self->{_timer} = undef;

         $self->{_handle} = AnyEvent::Handle->new(
            %opts,

            on_connect => sub {
               my ($handle, $host, $port, $retry) = @_;

               # Publish the connected handle, then discard the state that
               # was only needed while connecting.
               $self->{handle} = $handle;
               delete @{$self}{qw( _connect _handle _timer )};

               $on_connect->($handle, $host, $port, $retry)
                  if $on_connect;
            },

            on_connect_error => sub {
               my ($handle, $message) = @_;

               # Post-decrement: the counter is tested before it is reduced,
               # so a budget of N allows N retries after the first attempt
               # before the caller's on_connect_error is invoked.
               if (!$tries--) {
                  $on_connect_error->($handle, $message)
                     if $on_connect_error;

                  delete @{$self}{qw( _connect _handle _timer )};

                  return;
               }

               # This will happen when this callback returns,
               # but that might not be for a while, so let's
               # do it now in case it saves resources.
               $handle->destroy();

               # Schedule the next attempt after the cooldown (interval 0).
               $self->{_timer} = AE::timer($cooldown, 0, $self->{_connect});
            },
         );
      };

      $self->{_connect}->();
   }

   return $self;
}

# Accessor for the connected AnyEvent::Handle. The "handle" key is only
# assigned in the on_connect callback, so this returns undef until a
# connection has been established.
sub handle {
   my ($self) = @_;
   return $self->{handle};
}

1;

      



I'm pretty sure it is free of memory leaks (unlike your code). You would use it like this:

use strict;
use warnings;

use AE          qw( );
use MyConnector qw( );

# Target comes from the command line; a missing (or false) argument falls
# back to the default host and port.
my ($host, $port) = @ARGV;
$host ||= 'www.stackoverflow.com';
$port ||= 80;

my $conn_cv = AE::cv();

# Either outcome (connected, or retries exhausted) releases the condvar
# that the script waits on below.
my $connector = MyConnector->new(
   connect   => [ $host, $port ],
   keepalive => 1,

   on_connect => sub {
       print("Connected successfully\n");
       $conn_cv->send();
   },

   on_connect_error => sub {
       my (undef, $message) = @_;
       warn("Could not connect: $message\n");
       $conn_cv->send();
   },

   # ...
);

$conn_cv->recv();

      

+1


source


Thanks to ikegami, I figured I might want to monitor the state of the connection and use it in conjunction with another AnyEvent timer watcher to reconnect if no connection is established. This results in connection attempts every second while the connection state ($isConnected) is zero. Meanwhile, events are queued until the connection is restored.

If there is an easier way to do this, I'm all ears, but for now I think this will fix the problem.



my @bulk;
my $host = '127.0.0.1';
my $port = 9999;

# Connection state shared by the two timers below.
my $isConnected  = 0;    # 1 between on_connect and the next error/EOF
my $isConnecting = 0;    # 1 while a connect attempt is still pending

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

# Flush Timer: queue one message every 5 seconds. The queue is only sent
# and emptied while connected, so messages accumulate during an outage.
my $timer = AnyEvent->timer(
    after => 5,
    interval => 5,
    cb => sub {
        push(@bulk,"Test message");
        if ($isConnected == 1) {
            flush(\@bulk);
            undef @bulk;
        }
    }
);

# Common teardown for every failure path. It destroys the handle that was
# passed to the callback rather than $conn_hdl, because $conn_hdl is not
# assigned yet if a callback fires before AnyEvent::Handle->new returns.
my $mark_down = sub {
    my ($hdl) = @_;
    $hdl->destroy;
    $isConnected  = 0;
    $isConnecting = 0;
};

# Reconnect Timer: once a second, start a connection attempt unless a
# connection exists or an attempt is already pending. Without the pending
# check, each tick would replace $conn_hdl and thereby abandon an attempt
# that needs longer than one second to complete.
my $reconn = AnyEvent->timer(
    after => 1,
    interval => 1,
    cb => sub {
        return if $isConnected || $isConnecting;

        # Set before new() so that a callback firing straight away sees it.
        $isConnecting = 1;

        $conn_hdl = AnyEvent::Handle->new(
            connect => [$host, $port],
            keepalive => 1,
            on_connect => sub {
                $isConnecting = 0;
                $isConnected  = 1;
            },
            on_connect_error => sub {
                my ($hdl, $msg) = @_;
                warn "Could not connect: $msg\n";
                $mark_down->($hdl);
            },
            on_error => sub {
                my ($hdl, $fatal, $msg) = @_;
                AE::log error => $msg;
                $mark_down->($hdl);
            },
            on_eof => sub {
                my ($hdl) = @_;
                warn "EOF\n";
                $mark_down->($hdl);
            },
            on_read => sub {
                my ($self) = @_;

                # Echo each complete line received from the server.
                $self->unshift_read(line => sub {
                    my ($hdl,$data) = @_;
                    print $data."\n";
                });
            }
        );
    }
);

$conn_cv->recv;

      

0


source







All Articles