Apache Spark: applying sklearn function in parallel across partitions

I'm new to Big Data and Apache Spark (and working under supervisor guidance).

Is it possible to apply a function (such as a spline fit) to individual partitions of an RDD only? I am trying to implement some of the work in the document here.

What I have read about Spark seems to indicate that this is possible, but it does not explain how to do it:

"If you instead have many small datasets on which you want to train different learning models, it would be better to use a single-node learning library (like Weka or SciKit-Learn) on each node, perhaps calling it in parallel across nodes using a Spark map()."

+3


3 answers


In fact, we have a library that does exactly that. It provides several sklearn transformers and predictors that run on Spark. Its name is sparkit-learn (imported as splearn).
From our examples:

import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
# sc is an existing SparkContext; both lists are split into 4 partitions
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])

      



You can find it here.

+3




I'm not 100% sure that I follow, but there are a number of partition-level methods, for example mapPartitions. These operators hand you an Iterator over each partition, and you can do whatever you want with the items and hand the results back through a new Iterator.



rdd.mapPartitions { iter =>
  // Spin up something expensive that you only want to do once per partition
  for (item <- iter) yield {
    // do stuff to the item using your expensive resource, then return the result
    item
  }
}
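Since the question is about sklearn, here is a rough Python counterpart of the Scala snippet above. It is only a sketch: `fit_partition` is a hypothetical name, and the straight-line least-squares fit stands in for whatever sklearn or spline fit you actually want to run on each partition. It is written as a plain generator so that it can be tried on an ordinary iterator without a cluster.

```python
def fit_partition(points):
    """Fit y = slope * x + intercept to all (x, y) pairs of one partition.

    Hypothetical stand-in for an sklearn or spline fit: it consumes the
    partition's iterator once and yields a single result per partition.
    """
    xs, ys = [], []
    for x, y in points:
        xs.append(float(x))
        ys.append(float(y))
    n = len(xs)
    if n < 2:
        return  # too few points to fit a line, so yield nothing
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    var_x = sum((x - mean_x) ** 2 for x in xs)
    if var_x == 0.0:
        return  # all x values are equal, so the slope is undefined
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = cov_xy / var_x
    yield slope, mean_y - slope * mean_x


# Local check on a plain iterator, no Spark needed:
local_result = list(fit_partition(iter([(0, 1), (1, 3), (2, 5)])))

# On a real RDD of (x, y) pairs (assuming an existing SparkContext `sc`):
#   fits = sc.parallelize(pairs, 4).mapPartitions(fit_partition).collect()
```

With PySpark, `mapPartitions` calls the function once per partition, so `collect()` would return one (slope, intercept) pair for each partition that has enough points.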

      

0




If your dataset is small (small enough to be loaded and trained on a single worker), you can do something like this:

def trainModel[T](modelId: Int, trainingSet: List[T]) = {
  //trains model with modelId and returns it
}

//fake data
val data = List()
val numberOfModels = 100
val broadcastedData = sc.broadcast(data)
val trainedModels = sc.parallelize(Range(0, numberOfModels))
  .map(modelId => (modelId, trainModel(modelId, broadcastedData.value)))

      

I am assuming that you have a list of models (or some parameterized models) and that you can give them IDs. Then, in the trainModel function, you select one based on its ID. As a result, you get an RDD of pairs of model IDs and trained models.
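The same idea could be sketched in Python roughly as follows. Everything here is illustrative: `TRAINERS`, `train_model`, and the toy data are hypothetical, and the "models" are simple summary statistics standing in for sklearn estimators with different parameters. The Spark calls are shown only as comments and assume an existing SparkContext `sc`.

```python
import statistics

# Hypothetical model catalogue: each ID selects a different "trainer".
# In practice these could be sklearn estimators with different parameters.
TRAINERS = {
    0: statistics.mean,    # model 0 predicts the mean of the training set
    1: statistics.median,  # model 1 predicts the median
    2: max,                # model 2 predicts the maximum
}


def train_model(model_id, training_set):
    """Pick the trainer for model_id, fit it, and return (id, fitted model)."""
    trainer = TRAINERS[model_id]
    return model_id, trainer(training_set)


data = [1.0, 2.0, 2.0, 7.0]  # toy data, small enough to ship to every worker

# Local equivalent of the Spark job, run sequentially:
trained_models = dict(train_model(i, data) for i in sorted(TRAINERS))

# With Spark, the data would be broadcast once and the model IDs parallelized:
#   bdata = sc.broadcast(data)
#   trained = (sc.parallelize(sorted(TRAINERS))
#                .map(lambda i: train_model(i, bdata.value))
#                .collect())
```

Broadcasting keeps one read-only copy of the data on each worker, so only the small model IDs need to be distributed as the RDD itself.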

0








