How to close connections on an Akka TCP server?

The expected behavior of close() on our akka-streams based TCP server is that the client sees a SocketException immediately, and none of the messages sent after the exception are delivered to the server. However, the last message sent before the exception must be delivered. All new clients should encounter a ConnectException.
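
The "new clients should encounter a ConnectException" part can be probed from the client side with something like the following. This is only a minimal sketch: it uses a plain java.net.ServerSocket as a stand-in for our server, and the names CloseBehaviourCheck and newClientIsRefused are hypothetical helpers, not part of our code base or of Akka.

```scala
import java.net.{ConnectException, InetSocketAddress, ServerSocket, Socket}
import scala.util.{Failure, Try}

object CloseBehaviourCheck {

  // Hypothetical helper: true when a fresh client connection attempt
  // fails with ConnectException (typically "Connection refused").
  def newClientIsRefused(host: String, port: Int): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), 1000)
      finally socket.close()
    } match {
      case Failure(_: ConnectException) => true
      case _                            => false
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for the real server: a plain listener on an ephemeral port.
    val listener = new ServerSocket(0)
    val port = listener.getLocalPort

    println(s"refused while bound: ${newClientIsRefused("127.0.0.1", port)}")

    // Stand-in for close(): once the listening socket is gone,
    // new connection attempts should no longer succeed.
    listener.close()
    println(s"refused after close: ${newClientIsRefused("127.0.0.1", port)}")
  }
}
```

Against the real server, the same probe would be run before and after calling close(), with the address and port passed to bind.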

Unable to decipher the "Closing connections" paragraph in the akka-streams 1.0 docs, we implemented close() for the TCP server as follows:

  val serverSource: Promise[ByteString] = Promise[ByteString]()

  var serverBindingOption: Option[Tcp.ServerBinding] = None

  def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
    implicit val executionContext = system.dispatcher
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // drop(1) discards the dummy element, so no response is ever sent to the client
    }

    for {
      serverBinding ← Tcp().bind(address, port).to(sink).run()
    } yield {
      serverBindingOption = Some(serverBinding)
      new SyslogStreamServer
    }
  }

  def close()(implicit ec: ExecutionContext): Future[Unit] = {
    serverSource.success(ByteString.empty) // dummy value to be dropped above
    serverBindingOption match {
      case Some(serverBinding) ⇒ serverBinding.unbind()
      case None ⇒ Future[Unit] {}
    }
  }

      

Is this a good way to shut down the server? (for deployments, etc.)

Could anyone review and comment on the validity of the above close() implementation? We are trying to establish whether this code fits the bill, and we are wondering whether we should complete the stream source first or unbind the server first. Any feedback or criticism is appreciated. Thanks in advance!
