Apache Spark: applying sklearn function in parallel across partitions
I'm new to Big Data and Apache Spark (and working under a supervisor's guidance).
Is it possible to apply a function (for example, a spline) to individual partitions of an RDD? I am trying to do some of the work described in the doc here.
Exploring Spark seems to indicate that this is possible, but the documentation does not explain how to do it.
"If you instead have many small datasets on which you want to train different learning models, it would be better to use a single-node learning library (e.g., Weka or scikit-learn) on each node, perhaps calling it in parallel across nodes using a Spark map()."
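As a concrete sketch of that advice: training one model per small dataset is just a function applied to each (key, dataset) pair, which Spark's map() would run in parallel across nodes. The plain-Python version below runs without a cluster, and a hand-rolled least-squares fit stands in for a scikit-learn estimator; the Spark call it corresponds to is shown in a comment.

```python
# Sketch of the per-group training pattern the quote describes.
# Spark's map() would apply fit_one to each (key, dataset) pair in
# parallel; here we simulate it with a plain Python dict comprehension.

def fit_one(dataset):
    # Stand-in for e.g. sklearn's LinearRegression().fit(X, y):
    # fit y = a*x + b by least squares on one small dataset.
    xs, ys = zip(*dataset)
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    a = sum((x - mx) * (y - my) for x, y in dataset) / sum((x - mx) ** 2 for x in xs)
    return a, my - a * mx  # (slope, intercept)

datasets = {
    "g1": [(0, 1), (1, 3), (2, 5)],    # y = 2x + 1
    "g2": [(0, 0), (1, -1), (2, -2)],  # y = -x
}
# With Spark: sc.parallelize(datasets.items()).mapValues(fit_one).collect()
models = {k: fit_one(d) for k, d in datasets.items()}
```

Each worker fits its own small dataset with an ordinary single-node routine; Spark only distributes the calls.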
In fact, we have a library that does exactly that: it provides distributed versions of several sklearn transformers and predictors. It is called sparkit-learn.
From our examples:
import numpy as np
from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
X = [...] # list of texts
y = [...] # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])
local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))
local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))
y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
You can find it here.
I'm not 100% sure I follow, but there are a number of partition-level operations, for example mapPartitions. It passes you an Iterator at each node, and you can do whatever you want with it and return a new Iterator:
rdd.mapPartitions(iter => {
  // Spin up something expensive that you only want to do once per node
  for (item <- iter) yield {
    // do stuff to the items using your expensive item
  }
})
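For the Python/sklearn side of the question, PySpark's RDD.mapPartitions works the same way: your function receives each partition as an iterator and yields transformed elements, so any expensive setup (loading a fitted model, say) happens once per partition. A minimal sketch, simulating two partitions with plain iterators so it runs without a cluster; the toy "model" is just a scale factor:

```python
def process_partition(rows):
    # Expensive one-time setup per partition, e.g. loading an estimator;
    # here a toy stand-in that just scales values.
    scale = 10
    for x in rows:       # rows is an iterator over one partition
        yield x * scale  # apply the "model" to every element

# With Spark: rdd.mapPartitions(process_partition).collect()
partitions = [[1, 2], [3, 4, 5]]  # simulated RDD with two partitions
result = [y for part in partitions for y in process_partition(iter(part))]
```

The setup cost is paid once per partition rather than once per element, which is the whole point of mapPartitions over map.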
If your dataset is small enough to be loaded and trained on in a single worker, you can do something like this:
def trainModel[T](modelId: Int, trainingSet: List[T]) = {
  // trains model with modelId and returns it
}

// fake data
val data = List()
val numberOfModels = 100
val broadcastedData = sc.broadcast(data)
val trainedModels = sc.parallelize(Range(0, numberOfModels))
  .map(modelId => (modelId, trainModel(modelId, broadcastedData.value)))
I am assuming that you have a list of models (or parameterized models) and that you can give them IDs. Then, in the trainModel function, you select one based on the ID. As a result, you get an RDD of pairs of model IDs and trained models.
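The same broadcast-and-map pattern, sketched in Python without a cluster. Here train_model is a hypothetical stand-in (it just offsets the data mean by the model ID); with Spark you would broadcast the data once and map over the ID range, as the comment shows:

```python
def train_model(model_id, training_set):
    # Hypothetical: pick hyperparameters by model_id and fit a model;
    # here each "model" is just the data mean offset by the ID.
    return sum(training_set) / len(training_set) + model_id

data = [1.0, 2.0, 3.0]
number_of_models = 4
# With Spark:
#   b = sc.broadcast(data)
#   sc.parallelize(range(number_of_models)) \
#     .map(lambda i: (i, train_model(i, b.value))).collect()
trained = [(i, train_model(i, data)) for i in range(number_of_models)]
```

Broadcasting ships the (small) dataset to each worker once, and the parallelized range of IDs decides which worker trains which model.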