SageMaker Spark is an open source Spark library for Amazon SageMaker. With SageMaker Spark you construct Spark ML Pipelines using Amazon SageMaker stages. These pipelines interleave native Spark ML stages and stages that interact with SageMaker training and model hosting.
With SageMaker Spark, you can train models on Amazon SageMaker from Spark DataFrames using Amazon-provided ML algorithms
like K-Means clustering or XGBoost, and make predictions on DataFrames against
SageMaker endpoints hosting your trained models. If you have your own ML algorithms built
into SageMaker-compatible Docker containers, you can use SageMaker Spark to train and infer on DataFrames with your
own algorithms – all at Spark scale.
Running SageMaker Spark
SageMaker Spark depends on hadoop-aws-2.8.1. To run Spark applications that depend on SageMaker Spark, you need to
build Spark with Hadoop 2.8; Apache Spark currently distributes binaries built against Hadoop 2.7, but not Hadoop 2.8.
See the Spark documentation for more on building Spark with Hadoop 2.8.
However, if you are running Spark applications on EMR, you can use Spark built with Hadoop 2.7.
SageMaker Spark needs to be added to both the driver and executor classpaths.
Running SageMaker Spark Applications with spark-shell or spark-submit
You can submit SageMaker Spark and the AWS Java SDK as dependencies with the `--jars` flag, or take a dependency
on SageMaker Spark in Maven using the `--packages` flag.
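For example, a spark-shell invocation might look like the following (the Maven coordinates are illustrative; check Maven Central for the release matching your Spark and Scala versions):

```bash
spark-shell --packages com.amazonaws:sagemaker-spark_2.11:spark_2.2.0-1.0
```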
Running SageMaker Spark Applications on EMR
You can run SageMaker Spark applications on an EMR cluster just like any other Spark application by
submitting your Spark application jar and the SageMaker Spark dependency jars with the `--jars` or `--packages` flags.
SageMaker Spark is pre-installed on EMR releases since 5.11.0. You can run your SageMaker Spark application
on EMR by submitting your Spark application jar and any additional dependencies your Spark application uses.
SageMaker Spark applications have also been verified to be compatible with EMR-5.6.0 (which runs Spark 2.1) and EMR-5.8.0
(which runs Spark 2.2). When submitting your Spark application to an earlier EMR release, use the `--packages` flag to
depend on a recent version of the AWS Java SDK:
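A sketch of such an invocation (the artifact coordinates and AWS Java SDK version are illustrative):

```bash
spark-shell --packages com.amazonaws:sagemaker-spark_2.11:spark_2.2.0-1.0,com.amazonaws:aws-java-sdk:1.11.238 \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true
```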
The `spark.driver.userClassPathFirst=true` and `spark.executor.userClassPathFirst=true` properties are required so that
the Spark cluster uses the AWS Java SDK version that SageMaker Spark depends on, rather than the AWS Java SDK installed
on these earlier EMR clusters.
For more on running Spark applications on EMR, see the
EMR documentation on submitting a step.
Getting Started: K-Means Clustering on SageMaker with SageMaker Spark SDK
This example walks through using SageMaker Spark to train on a Spark DataFrame using a SageMaker-provided algorithm,
host the resulting model on Amazon SageMaker, and make predictions on a Spark DataFrame using that hosted model.
We’ll cluster handwritten digits in the MNIST dataset, which we’ve made available in LibSVM format at
s3://sagemaker-sample-data-us-east-1/spark/mnist/train/mnist_train.libsvm.
Create your Spark Session and load your training and test data into DataFrames:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate

// Load the MNIST data as a DataFrame from LibSVM. Replace this region with your own.
val region = "us-east-1"
val trainingData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/")
val testData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/")
```
The `DataFrame` consists of a column named "label" of Doubles, indicating the digit for each example,
and a column named "features" of Vectors:
```scala
trainingData.show
+-----+--------------------+
|label| features|
+-----+--------------------+
| 5.0|(784,[152,153,154...|
| 0.0|(784,[127,128,129...|
| 4.0|(784,[160,161,162...|
| 1.0|(784,[158,159,160...|
| 9.0|(784,[208,209,210...|
| 2.0|(784,[155,156,157...|
| 1.0|(784,[124,125,126...|
| 3.0|(784,[151,152,153...|
| 1.0|(784,[152,153,154...|
| 4.0|(784,[134,135,161...|
| 3.0|(784,[123,124,125...|
| 5.0|(784,[216,217,218...|
| 3.0|(784,[143,144,145...|
| 6.0|(784,[72,73,74,99...|
| 1.0|(784,[151,152,153...|
| 7.0|(784,[211,212,213...|
| 2.0|(784,[151,152,153...|
| 8.0|(784,[159,160,161...|
| 6.0|(784,[100,101,102...|
| 9.0|(784,[209,210,211...|
+-----+--------------------+
```
Construct a KMeansSageMakerEstimator, which extends SageMakerEstimator, itself a Spark Estimator.
You need to pass in an Amazon SageMaker-compatible
IAM Role that Amazon SageMaker will use to make AWS service calls on your behalf (or configure SageMaker Spark
to get this from Spark Config). Consult the API Documentation for a
complete list of parameters.
In this example, we are setting the “k” and “feature_dim” hyperparameters, corresponding to the number
of clusters we want and to the number of dimensions in our training dataset, respectively.
```scala
// Replace this IAM Role ARN with your own.
val roleArn = "arn:aws:iam::account-id:role/rolename"

val estimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(784)
```
To train and host your model, call fit() on your training DataFrame:
```scala
val model = estimator.fit(trainingData)
```
What happens in this call to fit()?
SageMaker Spark serializes your DataFrame and uploads the
serialized training data to S3. For the K-Means algorithm, SageMaker Spark converts the DataFrame to the Amazon Record
format.
SageMaker Spark will create an S3 bucket for you that your IAM role can access if you do not provide an S3 Bucket in
the constructor.
SageMaker Spark sends a CreateTrainingJobRequest to Amazon SageMaker to run a Training Job with one p2.xlarge on the data in S3, configured with the
values you pass in to the SageMakerEstimator, and polls for completion of the Training Job.
In this example, we are sending a CreateTrainingJob request to run a k-means clustering Training Job on Amazon SageMaker
on serialized data we uploaded from your DataFrame. When training completes, the Amazon SageMaker service puts
a serialized model in an S3 bucket you own (or the default bucket created by SageMaker Spark).
After training completes, SageMaker Spark sends a CreateModelRequest, a CreateEndpointConfigRequest, and a
CreateEndpointRequest and polls for completion, each configured with the values you pass in to the SageMakerEstimator.
This Endpoint will initially be backed by one c4.xlarge.
To make inferences using the Endpoint hosting our model, call transform() on the SageMakerModel returned by fit().
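Continuing the example with the test DataFrame loaded earlier:

```scala
val transformedData = model.transform(testData)
transformedData.show
```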
In this call to transform(), the SageMakerModel serializes chunks of the input DataFrame and sends them to the
Endpoint using the SageMakerRuntime InvokeEndpoint API. The SageMakerModel deserializes the Endpoint’s responses,
which contain predictions, and appends the prediction columns to the input DataFrame.
Example: Using SageMaker Spark with Any SageMaker Algorithm
The SageMakerEstimator is an org.apache.spark.ml.Estimator that trains a model on Amazon SageMaker.
SageMaker Spark provides several classes that extend SageMakerEstimator to run particular algorithms, like KMeansSageMakerEstimator
to run the SageMaker-provided k-means algorithm, or XGBoostSageMakerEstimator to run the SageMaker-provided XGBoost
algorithm. These classes are just SageMakerEstimators with certain default values passed in. You can use SageMaker Spark with
any algorithm that runs on Amazon SageMaker by creating a SageMakerEstimator.
Instead of creating a KMeansSageMakerEstimator, you can create an equivalent SageMakerEstimator:
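A sketch of the equivalent estimator, reusing `roleArn` from the k-means example above (the us-east-1 k-means image path is the one shown later in this README, and the parameters are those described below):

```scala
val estimator = new SageMakerEstimator(
  trainingImage = "382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
  modelImage = "382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
  requestRowSerializer = new ProtobufRequestRowSerializer(),
  responseRowDeserializer = new KMeansProtobufResponseRowDeserializer(),
  hyperParameters = Map("k" -> "10", "feature_dim" -> "784"),
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1,
  trainingSparkDataFormat = "sagemaker")
```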
trainingImage identifies the Docker registry path to the training image containing your custom code. In this case,
this points to the us-east-1 k-means image.
modelImage identifies the Docker registry path to the image containing inference code. Amazon SageMaker k-means
uses the same image to train and to host trained models.
requestRowSerializer implements com.amazonaws.services.sagemaker.sparksdk.transformation.RequestRowSerializer.
A RequestRowSerializer serializes org.apache.spark.sql.Rows in the input DataFrame to send them to the model hosted in Amazon SageMaker for inference.
This is passed to the SageMakerModel returned by fit. In this case, we pass in a RequestRowSerializer that serializes
Rows to the Amazon Record protobuf format. See Serializing and Deserializing for Inference
for more information on how SageMaker Spark makes inferences.
responseRowDeserializer implements
com.amazonaws.services.sagemaker.sparksdk.transformation.ResponseRowDeserializer. A ResponseRowDeserializer deserializes
responses containing predictions from the Endpoint back into columns in a DataFrame.
hyperParameters is a Map[String, String] that the trainingImage will use to set training hyperparameters.
trainingSparkDataFormat specifies the data format that Spark uses when uploading training data from a DataFrame
to S3.
SageMaker Spark needs the trainingSparkDataFormat to tell Spark how to write the DataFrame to S3 for the trainingImage to
train on. In this example, “sagemaker” tells Spark to write the data as
RecordIO-encoded Amazon Records, but your own algorithm may take another data format.
You can pass in any format that Spark supports as long as your trainingImage can train using that data format,
such as “csv”, “parquet”, “com.databricks.spark.csv”, or “libsvm”.
SageMaker Spark also needs a RequestRowSerializer to serialize Spark Rows to a
data format the modelImage can deserialize, and a ResponseRowDeserializer to deserialize responses that contain
predictions from the modelImage back into Spark Rows. See Serializing and Deserializing for Inference
for more details.
Example: Using SageMakerEstimator and SageMakerModel in a Spark Pipeline
SageMakerEstimators and SageMakerModels can be used in Pipelines. In this
example, we run org.apache.spark.ml.feature.PCA on our Spark cluster, then train and infer using Amazon SageMaker’s
K-Means on the output column from PCA:
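A sketch of such a pipeline, reusing `roleArn` from earlier (the PCA output dimension and instance types are illustrative):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.PCA

// Project the 784-dimensional MNIST features down to 50 dimensions on the Spark cluster.
val pcaEstimator = new PCA()
  .setInputCol("features")
  .setOutputCol("projectedFeatures")
  .setK(50)

// Train and host k-means on SageMaker, reading features from the "projectedFeatures" column.
val kMeansSageMakerEstimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  requestRowSerializer =
    new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures"),
  trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectedFeatures"),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(50)

val pipeline = new Pipeline().setStages(
  Array(pcaEstimator, kMeansSageMakerEstimator))
val pipelineModel = pipeline.fit(trainingData)
```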
`new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures")` tells the `SageMakerModel` returned
by `fit()` to infer on the features in the "projectedFeatures" column.
`trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectedFeatures")` tells the `SageMakerProtobufWriter`
that Spark uses to write the `DataFrame` in the "sagemaker" format to serialize the "projectedFeatures" column when
writing Amazon Records for training.
Example: Using Multiple SageMakerEstimators and SageMakerModels in a Spark Pipeline
We can use multiple SageMakerEstimators and SageMakerModels in a pipeline. Here, we use
SageMaker’s PCA algorithm to reduce a dataset with 50 dimensions to a dataset with 20 dimensions, then
use SageMaker’s K-Means algorithm to train on the 20-dimension data.
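A sketch, assuming PCASageMakerEstimator accepts the same constructor parameters as the other estimators in this README (instance types are illustrative, and any PCA-specific hyperparameter setters are omitted; consult the API documentation for the exact constructor):

```scala
import org.apache.spark.ml.Pipeline

// Reduce 50-dimensional features to 20 dimensions with SageMaker's PCA algorithm.
val pcaEstimator = new PCASageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1,
  responseRowDeserializer = new PCAProtobufResponseRowDeserializer(
    projectionColumnName = "projectionDim20"),
  endpointCreationPolicy = EndpointCreationPolicy.CREATE_ON_TRANSFORM)

// Cluster the 20-dimensional projections with SageMaker's k-means algorithm.
val kMeansEstimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1,
  trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectionDim20"),
  requestRowSerializer = new ProtobufRequestRowSerializer(
    featuresColumnName = "projectionDim20"),
  endpointCreationPolicy = EndpointCreationPolicy.CREATE_ON_TRANSFORM)
  .setK(10).setFeatureDim(20)

val pipeline = new Pipeline().setStages(Array(pcaEstimator, kMeansEstimator))
val pipelineModel = pipeline.fit(trainingData)
```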
`responseRowDeserializer = new PCAProtobufResponseRowDeserializer(projectionColumnName = "projectionDim20")` tells the
`SageMakerModel` attached to the PCA endpoint to deserialize responses (which contain the lower-dimensional projections
of the feature vectors) into the column named "projectionDim20".
`endpointCreationPolicy = EndpointCreationPolicy.CREATE_ON_TRANSFORM` tells the `SageMakerEstimator` to delay SageMaker
Endpoint creation until it is needed to transform a `DataFrame`.
`trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectionDim20")` and
`requestRowSerializer = new ProtobufRequestRowSerializer(featuresColumnName = "projectionDim20")` tell the
`KMeansSageMakerEstimator` to train and infer, respectively, on the features in the "projectionDim20" column.
Example: Creating a SageMakerModel
SageMaker Spark supports attaching a SageMakerModel to an existing SageMaker Endpoint, or creating a SageMakerModel
and Endpoint from model data in S3 or from a previously completed Training Job.
This allows you to use SageMaker Spark just for model hosting and inference on Spark-scale DataFrames without running
a new Training Job.
SageMakerModel From an Endpoint
You can attach a SageMakerModel to an endpoint that has already been created. Supposing an endpoint with name
“my-endpoint-name” is already in service and hosting a SageMaker K-Means model:
```scala
val model = SageMakerModel
  .fromEndpoint(endpointName = "my-endpoint-name",
    requestRowSerializer = new ProtobufRequestRowSerializer(
      featuresColumnName = "MyFeaturesColumn"),
    responseRowDeserializer = new KMeansProtobufResponseRowDeserializer(
      distanceToClusterColumnName = "DistanceToCluster",
      closestClusterColumnName = "ClusterLabel"))
```
This SageMakerModel will, upon a call to transform(), serialize the column named
“MyFeaturesColumn” for inference, and append the columns “DistanceToCluster” and “ClusterLabel” to the DataFrame.
SageMakerModel From Model Data in S3
You can create a SageMakerModel and an Endpoint by referring directly to your model data in S3:
```scala
val model = SageMakerModel
  .fromModelS3Path(modelPath = "s3://my-model-bucket/my-model-data/model.tar.gz",
    modelExecutionRoleARN = "arn:aws:iam::account-id:role/rolename",
    modelImage = "382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
    endpointInstanceType = "ml.c4.xlarge",
    endpointInitialInstanceCount = 1,
    requestRowSerializer = new ProtobufRequestRowSerializer(),
    responseRowDeserializer = new KMeansProtobufResponseRowDeserializer())
```
SageMakerModel From a Previously Completed Training Job
You can create a SageMakerModel and an Endpoint by referring to a previously-completed training job:
```scala
val model = SageMakerModel
  .fromTrainingJob(trainingJobName = "my-training-job-name",
    modelExecutionRoleARN = "arn:aws:iam::account-id:role/rolename",
    modelImage = "382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
    endpointInstanceType = "ml.c4.xlarge",
    endpointInitialInstanceCount = 1,
    requestRowSerializer = new ProtobufRequestRowSerializer(),
    responseRowDeserializer = new KMeansProtobufResponseRowDeserializer())
```
Example: Tearing Down Amazon SageMaker Endpoints
SageMaker Spark provides a utility for deleting Endpoints created by a SageMakerModel:
```scala
import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder

val sagemakerClient = AmazonSageMakerClientBuilder.defaultClient
val cleanup = new SageMakerResourceCleanup(sagemakerClient)
cleanup.deleteResources(model.getCreatedResources)
```
Configuring an IAM Role
SageMaker Spark allows you to add your IAM Role ARN to your Spark Config so that you don’t have to keep passing in
IAMRole("arn:aws:iam::account-id:role/rolename").
Add an entry to your Spark Config with key com.amazonaws.services.sagemaker.sparksdk.sagemakerrole whose value is your
Amazon SageMaker-compatible IAM Role. SageMakerEstimator will look for this role if it is not supplied in the constructor.
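For example, when building your SparkSession (a sketch; the role ARN is a placeholder):

```scala
val spark = SparkSession.builder
  .config("com.amazonaws.services.sagemaker.sparksdk.sagemakerrole",
          "arn:aws:iam::account-id:role/rolename")
  .getOrCreate
```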
SageMaker Spark: In-Depth
The Amazon Record format
KMeansSageMakerEstimator, PCASageMakerEstimator, and LinearLearnerSageMakerEstimator all serialize DataFrames
to the Amazon Record protobuf format with each Record encoded in
RecordIO.
They do this by passing in “sagemaker” to the trainingSparkDataFormat constructor argument, which configures Spark
to use the SageMakerProtobufWriter to serialize Spark DataFrames.
Writing a DataFrame using the “sagemaker”
format serializes a column named “label”, expected to contain
Doubles, and a column named “features”, expected to contain a sparse or dense org.apache.spark.mllib.linalg.Vector.
If the features column contains a SparseVector, SageMaker Spark sparsely encodes the Vector into the Amazon Record;
if it contains a DenseVector, SageMaker Spark densely encodes the Vector into the Amazon Record.
You can choose which columns the SageMakerEstimator uses as its “label” and “features” columns by passing in
a trainingSparkDataFormatOptions Map[String, String] with keys “labelColumnName” and “featuresColumnName”, whose
values are the names of your chosen label and features columns.
You can also write Amazon Records using SageMaker Spark by using the “sagemaker” format directly:
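A sketch (the bucket, path, and column names are placeholders):

```scala
df.write.format("sagemaker")
  .option("labelColumnName", "myLabelColumn")
  .option("featuresColumnName", "myFeaturesColumn")
  .save("s3://my-output-bucket/sagemaker-records/")
```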
By default, SageMakerEstimator deletes the RecordIO-encoded Amazon Records in S3 after training completes on Amazon
SageMaker. You can choose to let the data persist in S3 by passing in deleteStagingDataAfterTraining = false to
SageMakerEstimator. See the AWS documentation on Amazon Records for more information on Amazon Records.
Serializing and Deserializing for Inference
SageMakerEstimator.fit() returns a SageMakerModel, which transforms a DataFrame by calling InvokeEndpoint on
an Amazon SageMaker Endpoint. InvokeEndpointRequests carry serialized Rows as their payload. Rows in the DataFrame
are serialized for predictions against an Endpoint using a RequestRowSerializer. Responses from an Endpoint containing
predictions are deserialized into Spark Rows and appended as columns in a DataFrame using a ResponseRowDeserializer.
Internally, SageMakerModel.transform calls mapPartitions to distribute the work
of serializing Spark Rows, constructing and sending InvokeEndpointRequests to an Endpoint, and deserializing
InvokeEndpointResponses across a Spark cluster. Because each InvokeEndpointRequest can carry only 5MB, each
Spark partition creates a
com.amazonaws.services.sagemaker.sparksdk.transformation.util.RequestBatchIterator to iterate over its partition,
sending prediction requests to the Endpoint in 5MB increments.
RequestRowSerializer.serializeRow() converts a Row to an Array[Byte].
The RequestBatchIterator appends these byte arrays to
form the request body of an InvokeEndpointRequest.
For example, the
com.amazonaws.services.sagemaker.sparksdk.transformation.ProtobufRequestRowSerializer creates one
RecordIO-encoded Amazon Record per input row by serializing the “features” column in each row, and wrapping each
Amazon Record in the RecordIO header.
ResponseRowDeserializer.deserializeResponse() converts an Array[Byte] containing predictions from an Endpoint into
an Iterator[Row], which is used to append columns containing these predictions to the DataFrame being transformed
by the SageMakerModel.
For comparison, SageMaker’s XGBoost uses LibSVM-formatted data for inference (as well as training), and responds with a comma-delimited list of predictions.
Accordingly, SageMaker Spark uses com.amazonaws.services.sagemaker.sparksdk.transformation.LibSVMRequestRowSerializer
to serialize rows into LibSVM-formatted data, and uses com.amazonaws.services.sagemaker.sparksdk.transformation.XGBoostCSVResponseRowDeserializer
to deserialize the response into a column of predictions.
To support your own model image’s data formats for inference, you can implement your own RequestRowSerializer and ResponseRowDeserializer.
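A skeleton of the serializer half of such a pair. Only the serializeRow signature below is described in this README; the real trait may require additional members (such as a request content type), so treat this as a sketch against the actual interface. The CSV wire format and class name are hypothetical:

```scala
import com.amazonaws.services.sagemaker.sparksdk.transformation.RequestRowSerializer
import org.apache.spark.sql.Row

// Hypothetical serializer for a model image that consumes one CSV line per row.
class MyCsvRequestRowSerializer extends RequestRowSerializer {
  override def serializeRow(row: Row): Array[Byte] = {
    val features = row.getAs[org.apache.spark.mllib.linalg.Vector]("features")
    // One comma-separated line per Row, appended into the InvokeEndpoint request body.
    (features.toArray.mkString(",") + "\n").getBytes("UTF-8")
  }
}
```

A matching ResponseRowDeserializer would implement deserializeResponse() to turn the model's response bytes back into an Iterator[Row], as described above.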
Getting SageMaker Spark
Scala
SageMaker Spark for Scala is available in the Maven central repository:
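A Maven dependency might look like the following (the version string is illustrative; check Maven Central for the current release):

```xml
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>sagemaker-spark_2.11</artifactId>
    <version>spark_2.2.0-1.0</version>
</dependency>
```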
Or, if your project depends on Spark 2.1:
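Again illustrative, a build against Spark 2.1 would use the matching version string:

```xml
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>sagemaker-spark_2.11</artifactId>
    <version>spark_2.1.1-1.0</version>
</dependency>
```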
You can also build SageMaker Spark from source. See sagemaker-spark-sdk for more on building SageMaker Spark from source.
Python
See the sagemaker-pyspark-sdk for more on installing and running SageMaker PySpark.
Running SageMaker Spark
SageMaker Spark depends on hadoop-aws-2.8.1. To run Spark applications that depend on SageMaker Spark, you need to build Spark with Hadoop 2.8. However, if you are running Spark applications on EMR, you can use Spark built with Hadoop 2.7.
Apache Spark currently distributes binaries built against Hadoop-2.7, but not 2.8. See the Spark documentation for more on building Spark with Hadoop 2.8.
SageMaker Spark needs to be added to both the driver and executor classpaths.
Running SageMaker Spark Applications with
spark-shellorspark-submitYou can submit SageMaker Spark and the AWS Java Client as dependencies with the “–jars” flag, or take a dependency on SageMaker Spark in Maven using the “–package” flag.
spark-shellorspark-submitwith the--packagesflag:Running SageMaker Spark Applications on EMR
You can run SageMaker Spark applications on an EMR cluster just like any other Spark application by submitting your Spark application jar and the SageMaker Spark dependency jars with the –jars or –packages flags.
SageMaker Spark is pre-installed on EMR releases since 5.11.0. You can run your SageMaker Spark application on EMR by submitting your Spark application jar and any additional dependencies your Spark application uses.
SageMaker Spark applications have also been verified to be compatible with EMR-5.6.0 (which runs Spark 2.1) and EMR-5-8.0 (which runs Spark 2.2). When submitting your Spark application to an earlier EMR release, use the
--packagesflag to depend on a recent version of the AWS Java SDK:The
spark.driver.userClassPathFirst=trueandspark.executor.userClassPathFirst=trueproperties are required so that the Spark cluster will use the AWS Java SDK dependencies with SageMaker, rather than the AWS Java SDK installed on these earlier EMR clusters.For more on running Spark application on EMR, see the EMR Documentation on submitting a step.
Python
See the sagemaker-pyspark-sdk for more on installing and running SageMaker PySpark.
S3 FileSystem Schemes
EMR allows you to read and write data using the EMR FileSystem (EMRFS), accessed through Spark with the “s3://” scheme.
In other execution environments, you can use the S3A FileSystem with the “s3a://” scheme to read and write data.
In the code examples in this README, we use “s3://” for the EMRFS, or “s3a://” for the S3A FileSystem, which is recommended over “s3n://”.
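A sketch (the bucket and path are placeholders):

```scala
// On EMR, read and write through EMRFS:
val emrData = spark.read.format("libsvm").load("s3://my-bucket/my-data/")
// In other environments, use the S3A FileSystem:
val s3aData = spark.read.format("libsvm").load("s3a://my-bucket/my-data/")
```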
API Documentation
You can view the Scala API Documentation for SageMaker Spark here.
You can view the PySpark API Documentation for SageMaker Spark here.
License
SageMaker Spark is licensed under Apache-2.0.