Python-igraph multiprocessing challenge

I am using python-igraph for my task. Let's say we have a large weighted directed graph, and for each node we want to highlight its edges with a special flag (add an attribute to it) if the weight is significant (for example, the top one by weight). Obviously, this operation can be done in parallel because it is independent for each node. So I decided to use multiprocessing. But it seems that I am doing something wrong because the graph is not changing. One version of the process works.

Here's a rough graph . This is how I do it:

from argparse import ArgumentParser
from igraph import Graph
from enum import Enum
import multiprocessing as mp

MAXIMUM_PARALLEL_PROCESSES = 4

class EdgeType(Enum):
    Distance = 0
    ListOfNeighbors = 1


class ClustGraph():
    def __init__(self):
        self.__graph = None

    def create_short_graph(self):
        graph = self.__graph
        print graph.es[0]

        queue = mp.JoinableQueue()
        [queue.put(vertex.index) for vertex in graph.vs]
        [queue.put(None) for i in range(MAXIMUM_PARALLEL_PROCESSES)]
        workers = []
        for i in range(MAXIMUM_PARALLEL_PROCESSES):
            worker = mp.Process(target=ClustGraph.__create_neighbors_list_parallel, args=(self, queue))
            workers.append(worker)
            worker.start()
        queue.join()

        print graph.es[0]

    def load(self, f):
        self.__graph = Graph.Read_Ncol(f)
        self.__graph.es['type'] = [{EdgeType.Distance} for i in xrange(len(self.__graph.es))]

    @staticmethod
    def __create_neighbors_list(vertex, graph, p, n):
        edges = graph.es.select(_from=vertex)
        size = int(max(len(edges) * p, n))
        short_edges = dict(sorted([(edge, edge['weight']) for edge in edges], key=lambda (k, v): v, reverse=True)[:size]).keys()
        [e['type'].add(EdgeType.ListOfNeighbors) for e in short_edges]

    def __create_neighbors_list_parallel(self, queue, p=0.05, n=7):
        while True:
            vertex = queue.get()
            if vertex is None:
                break
            ClustGraph.__create_neighbors_list(vertex, self.__graph, p, n)
            queue.task_done()
        queue.task_done()


def main():
    parser = ArgumentParser()
    parser.add_argument('-i', '--input')
    args = parser.parse_args()

    graph = ClustGraph()
    graph.load(args.input)
    graph.create_short_graph()

if __name__ == '__main__':
    main()

      

However, if there are any best practices for such a task, I would be happy to know about them.

PS enum enum34

+3


source to share





All Articles