How to close connections on an Akka TCP server?

The expected behavior of close() on our akka-streams based TCP server is that the client sees a SocketException immediately, and none of the messages it sends after the exception are delivered to the server. However, the last message sent before the exception must still be delivered. All new clients should encounter a ConnectException.
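The last requirement (new clients get a ConnectException once the server is closed) can be checked from the client side without any Akka code. The following is a minimal sketch using only java.net; `ClosedPortCheck`, `isRefused` and the timeout default are hypothetical names and values chosen for illustration, not part of our server.

```scala
import java.net.{ConnectException, InetAddress, InetSocketAddress, Socket}

object ClosedPortCheck {
  // Returns true when a fresh connection attempt to host:port is refused,
  // i.e. the client sees java.net.ConnectException.
  // Other failures (for example a connect timeout) are not swallowed here.
  def isRefused(host: InetAddress, port: Int, timeoutMillis: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis)
      false // connection was accepted, so the port is still bound
    } catch {
      case _: ConnectException => true
    } finally {
      socket.close()
    }
  }
}
```

Calling `ClosedPortCheck.isRefused(InetAddress.getLoopbackAddress, port)` after close() has completed should return true if the listening socket was really released.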

We could not make sense of the "Closing connections" paragraph in the akka-streams 1.0 docs, so we implemented close() for the TCP server as follows:

  val serverSource: Promise[ByteString] = Promise[ByteString]()

  var serverBindingOption: Option[Tcp.ServerBinding] = None

  def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
    implicit val executionContext = system.dispatcher
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // drop the last one for no response
    }

    for {
      serverBinding ← Tcp().bind(address, port).to(sink).run()
    } yield {
      serverBindingOption = Some(serverBinding)
      new SyslogStreamServer
    }
  }

  def close()(implicit ec: ExecutionContext): Future[Unit] = {
    serverSource.success(ByteString.empty) // dummy value to be dropped above
    serverBindingOption match {
      case Some(serverBinding) ⇒ serverBinding.unbind()
      case None ⇒ Future[Unit] {}
    }
  }
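
For comparison, here is a minimal sketch of the opposite ordering: unbind first, so that no new connection is accepted, and only then signal the open connections. It uses plain scala.concurrent only. `Binding` is a hypothetical stand-in for Tcp.ServerBinding (only unbind() is modelled), and `OrderedCloser` and the `Promise[Unit]` kill signal are illustrative assumptions that replace the `Promise[ByteString]` used above. Whether this ordering satisfies the delivery requirements is exactly what we are unsure about.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for Tcp.ServerBinding; only unbind() is modelled.
trait Binding {
  def unbind(): Future[Unit]
}

final class OrderedCloser(binding: Option[Binding], connectionKill: Promise[Unit]) {
  // Step 1: unbind, so no new client can connect.
  // Step 2: once unbinding has finished, complete the promise that the
  //         per-connection sources are waiting on.
  def close()(implicit ec: ExecutionContext): Future[Unit] = {
    val unbound: Future[Unit] = binding.fold(Future.successful(()))(_.unbind())
    unbound.map { _ =>
      // trySuccess instead of success: Promise.success throws
      // IllegalStateException if close() is called a second time.
      connectionKill.trySuccess(())
      ()
    }
  }
}
```

One difference from the code above is that a repeated close() does not throw here, because the promise is completed with trySuccess.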

      

Is this a good way to shut down the server (for deployments, etc.)?

Could anyone review and comment on the validity of the above close() implementation? We are trying to establish whether this code meets the requirements described at the top, and whether we should complete the stream source first or unbind the server first. Any feedback or criticism is appreciated. Thanks in advance!

scala akka akka-stream

