Raise TimeoutError after a specified time for multiprocessing.connection.Listener.accept()

I am trying to interrupt multiprocessing.connection.Listener.accept(), but so far have not been successful. Since it doesn't provide a timeout parameter, I thought that maybe I could use socket.setdefaulttimeout() to abort it, as suggested in a post that I can no longer find here on SO.

It didn't work. Then I tried calling close() on the Listener() object. According to this post, this should work.

It looks like these objects don't play along with the usual socket-related solutions.

I can confirm that the Listener is closed by the Timer object as expected, but the accept() call is not interrupted.
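
The two attempts described above probably fail for different reasons. In the CPython source, the listener's socket appears to be switched back to blocking mode right after it is created, which would discard the default timeout, and closing a socket from another thread does not generally wake a thread that is already blocked in accept() on Linux. Below is a minimal sketch of one possible workaround. It assumes a socket-based listener (AF_INET or AF_UNIX) and that the private attribute chain listener._listener._socket still points at the underlying socket. That is an implementation detail, not a documented API, and accept_with_timeout is a hypothetical helper name.

```python
import socket
from multiprocessing.connection import Listener


def accept_with_timeout(listener, timeout):
    """Accept one connection or raise TimeoutError after `timeout` seconds.

    Hypothetical helper. It relies on a private CPython attribute
    (`listener._listener._socket`), so it may break in other versions
    and does not apply to AF_PIPE listeners on Windows.
    """
    sock = listener._listener._socket  # private attribute: an assumption
    sock.settimeout(timeout)           # the timeout stays set on the socket
    try:
        return listener.accept()
    except socket.timeout as exc:      # alias of TimeoutError on Python 3.10+
        raise TimeoutError(
            "no client connected within %s seconds" % timeout
        ) from exc


if __name__ == "__main__":
    listener = Listener(("127.0.0.1", 0))  # port 0: let the OS pick a port
    try:
        accept_with_timeout(listener, 1.0)
    except TimeoutError as err:
        print("Timed out:", err)
    finally:
        listener.close()
```

With this in place, the except (TimeoutError, socket.timeout) branch in run() would actually be reached, and no Timer would be needed to close the listener.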

Code:

import logging
import socket
import os
from multiprocessing.connection import Listener
from queue import Queue, Empty
from threading import Thread, Event, Timer

class Node(Thread):
    """Base Class providing a AF_INET, AF_UNIX or AF_PIPE connection to its
    data queue. It offers put() and get() method wrappers, and therefore
    behaves like a Queue as well as a Thread.

    Data from the internal queue is automatically fed to any connecting client.
    """
    def __init__(self, sock_name, max_q_size=None, timeout=None,
                 *thread_args, **thread_kwargs):
        """Initialize class.

        :param sock_name: UDS, TCP socket or pipe name
        :param max_q_size: maximum queue size for self.q, default infinite
        """
        self._sock_name = sock_name
        self.connector = Listener(sock_name)
        max_q_size = max_q_size if max_q_size else 0
        self.q = Queue(maxsize=max_q_size)
        self._running = Event()
        self.connection_timer = Timer(timeout, self.connection_timed_out)
        super(Node, self).__init__(*thread_args, **thread_kwargs)

    def connection_timed_out(self):
        """Closes the Listener and shuts down Node if no Client connected.

        :return:
        """
        self.connector.close()
        self.join()

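    def _start_connection_timer(self):
        # A Timer can only be started once, so build a fresh one per attempt.
        self.connection_timer = Timer(self.connection_timer.interval,
                                      self.connection_timed_out)
        self.connection_timer.start()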

    def start(self):
        self._running.set()
        super(Node, self).start()

    def join(self, timeout=None):
        print("clearing..")
        self._running.clear()
        print("internal join")
        super(Node, self).join(timeout=timeout)
        print("Done")

    def run(self):
        while self._running.is_set():
            print("Accepting connections..")
            self._start_connection_timer()
            try:
                client = self.connector.accept()
                self.connection_timer.cancel()
                self.feed_data(client)
            except (TimeoutError, socket.timeout):
                # Expected once accept() can time out: go back and retry.
                # Any other exception propagates unchanged.
                continue
        print("Run() Terminated!")

    def feed_data(self, client):
        try:
            while self._running.is_set():
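                try:
                    # Poll with a timeout so that Empty can be raised and the
                    # _running flag re-checked; a bare get() blocks forever.
                    client.send(self.q.get(timeout=1))
                except Empty:
                    continue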
        except EOFError:
            return


if __name__ == '__main__':
    import time
    n = Node('/home/nils/git/spab2/test.uds', timeout=10)
    n.start()
    print("Sleeping")
    time.sleep(15)
    print("Manual join")
    n.join()
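
The timer callback in the code above closes the listener, which leaves accept() blocked. A variation that sticks to the public API is to have the timer connect a throwaway client to the listener's own address, so that accept() returns and the caller can turn that into a TimeoutError. This is a rough sketch only: accept_or_timeout is a hypothetical helper, and it assumes the listener was created without an authkey.

```python
import threading
from multiprocessing.connection import Client, Listener


def accept_or_timeout(listener, timeout):
    """Return an accepted connection, or raise TimeoutError.

    Hypothetical helper: when the timer fires, it connects a throwaway
    client to the listener's own address so the blocked accept() returns.
    """
    timed_out = threading.Event()

    def wake_up():
        timed_out.set()
        try:
            # Dummy connection whose only purpose is to unblock accept().
            Client(listener.address).close()
        except OSError:
            pass  # the listener is already gone; nothing to wake up

    timer = threading.Timer(timeout, wake_up)
    timer.daemon = True
    timer.start()
    try:
        conn = listener.accept()
    finally:
        timer.cancel()

    if timed_out.is_set():
        conn.close()  # assumed to be the dummy connection
        raise TimeoutError("no client connected within %s seconds" % timeout)
    return conn


if __name__ == "__main__":
    listener = Listener(("127.0.0.1", 0))  # port 0: let the OS pick a port
    try:
        accept_or_timeout(listener, 1.0)
    except TimeoutError as err:
        print("Timed out:", err)
    finally:
        listener.close()
```

There is a small race in this approach: if a real client connects at the very moment the timer fires, that client may be mistaken for the dummy connection and closed. Having clients send an initial handshake message would be one way to tell them apart.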

      

I understand that my question is a duplicate of this question; however, that one is almost a year old and hasn't even received a comment. Also, I am using Unix domain sockets, as opposed to the TCP sockets in the linked post.

python python-3.x sockets unix-domain-sockets

