The package bundles pre-built JARs for each supported Spark version (3.1–3.5). At runtime, the correct JAR is automatically selected based on your installed PySpark version.
If SPARK_HOME is set, the installer will copy the matching JAR into $SPARK_HOME/jars. For EMR, the path is handled automatically.
This will only install the library on the Driver node.
To distribute the library to all executor nodes, you can create an installation script and add a custom bootstrap while creating the EMR cluster.
For more information, take a look at EMR bootstrap.
Since bootstrap actions are executed before all EMR applications are installed, dependent jars cannot be automatically loaded to SPARK_HOME.
So when submitting your application, please specify dependent jars using:
--jars `feature-store-pyspark-dependency-jars`
SageMaker Notebook
SageMaker Notebook instances may use an older version of Spark. Install a compatible version first:
# Install a version of PySpark compatible with the spark library (3.1 - 3.5)
!pip3 install pyspark==3.5.1
Getting Started
FeatureStoreManager is the interface for all Spark library operations, such as data ingestion and loading feature definitions.
Scala
To ingest a DataFrame into FeatureStore:
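import com.amazonaws.services.sagemaker.featurestore.sparksdk.FeatureStoreManager
// Replace the placeholder with the ARN of your feature group
val featureGroupArn = "<your-feature-group-arn>"
val featureStoreManager = new FeatureStoreManager()
// Ingest into the offline store only; omit targetStores to ingest into both stores
featureStoreManager.ingestData(dataFrame, featureGroupArn, targetStores = List("OfflineStore"))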
If targetStores is set to List("OfflineStore"), the Spark library ingests data directly into the offline store without calling the FeatureStoreRuntime API, which reduces FeatureStore WCU cost. The default value of this parameter is null, which ingests into both the online and offline stores.
To load feature definitions:
val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(inputDataFrame)
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
If target_stores is set to ["OfflineStore"], the Spark library ingests data directly into the offline store without calling the FeatureStoreRuntime API, which reduces FeatureStore WCU cost. The default value of this parameter is None, which ingests into both the online and offline stores.
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
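For the Python side, a minimal sketch of offline-only ingestion follows. It assumes manager is a FeatureStoreManager instance created inside a PySpark session. The helper name ingest_offline_only is hypothetical, and the keyword names input_data_frame and feature_group_arn are assumptions; only target_stores is named in this document.

```python
def ingest_offline_only(manager, data_frame, feature_group_arn):
    """Ingest `data_frame` into the offline store only.

    `manager` is expected to behave like FeatureStoreManager. The keyword
    names `input_data_frame` and `feature_group_arn` are assumptions; only
    `target_stores` is named in this document.
    """
    return manager.ingest_data(
        input_data_frame=data_frame,
        feature_group_arn=feature_group_arn,
        # Offline-only: bypasses the FeatureStoreRuntime API and its WCU cost.
        target_stores=["OfflineStore"],
    )
```

Leaving target_stores at its default of None would instead ingest into both the online and offline stores.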
Lake Formation Credential Vending
When your offline store’s S3 location is registered with AWS Lake Formation, the Spark connector can vend temporary credentials scoped to the table’s data location instead of relying on the caller’s IAM permissions for S3 access.
Version Requirements
Lake Formation credential vending requires Spark/PySpark 3.5 or newer. On older Spark versions, the useLakeFormationCredentials parameter is not available in Scala, and setting use_lake_formation_credentials to True in Python raises ValueError.
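The version rule above can be checked before a job is submitted. The sketch below is a hypothetical pre-flight helper (neither function is part of the library); it only mirrors the documented Python behavior of raising ValueError when the flag is set on a Spark version older than 3.5.

```python
def spark_major_minor(version):
    """Return (major, minor) from a version string such as '3.5.1'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def check_lake_formation_flag(pyspark_version, use_lake_formation_credentials):
    """Raise ValueError if credential vending is requested on Spark < 3.5."""
    if use_lake_formation_credentials and spark_major_minor(pyspark_version) < (3, 5):
        raise ValueError(
            "Lake Formation credential vending requires Spark/PySpark 3.5 or "
            f"newer, found {pyspark_version}"
        )
```

In a live session the version string would typically come from pyspark.__version__.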
Usage
Scala
val featureStoreManager = new FeatureStoreManager()
featureStoreManager.ingestData(
  dataFrame,
  featureGroupArn,
  targetStores = List("OfflineStore"),
  useLakeFormationCredentials = true
)
The IAM role running the Spark job (the “ingestion role”) must have:
lakeformation:GetDataAccess and lakeformation:GetTemporaryGlueTableCredentials.
glue:GetTable, glue:GetDatabase, and glue:GetPartitions on the feature group’s Glue catalog.
sagemaker:DescribeFeatureGroup on the feature group.
The Lake Formation table must have SELECT, INSERT, and DESCRIBE granted on the Table resource to the ingestion role. GetTemporaryGlueTableCredentials validates permissions at the Table level; a column-only SELECT grant is not sufficient and returns Insufficient Lake Formation permission(s): SUPER privileges required on the table.
The Lake Formation account-level settings must allow third-party data access (required so Lake Formation will vend temporary credentials to the Spark connector):
AllowExternalDataFiltering: true permits Amazon EMR (and other third-party engines) to access data in S3 locations registered with Lake Formation.
AllowFullTableExternalDataAccess: true permits third-party engines like Spark to receive data access credentials without session tags, which GetTemporaryGlueTableCredentials requires. Without it, credential vending fails with Not authorized to call GetTemporaryCredentialsForTableV2.
ExternalDataFilteringAllowList must contain the account(s) whose principals will call the connector.
An S3A magic committer implementation must be available. The connector enables the S3A magic committer to let Parquet writes stay within the Lake Formation-scoped S3 prefix.
On EMR 6.15+/7.x: no action required. EMR ships its proprietary SQLEmrOptimizedCommitProtocol which the connector auto-detects and uses.
On AWS Glue, SageMaker Notebook, standalone PySpark, or any other non-EMR runtime: add the open-source spark-hadoop-cloud module at submit time, for example:
If neither is available on the classpath, the connector fails fast with a clear error describing how to add it.
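The ingestion-role permissions listed above can be collected into an IAM policy document. The sketch below builds one as a Python dict. The action names come from the list above; the wildcard Resource and the statement Sid are placeholders assumed for illustration, and the resource should be narrowed wherever the service supports resource-level permissions.

```python
import json

# Actions the ingestion role needs, as listed in the prerequisites above.
INGESTION_ROLE_ACTIONS = [
    "lakeformation:GetDataAccess",
    "lakeformation:GetTemporaryGlueTableCredentials",
    "glue:GetTable",
    "glue:GetDatabase",
    "glue:GetPartitions",
    "sagemaker:DescribeFeatureGroup",
]


def ingestion_role_policy(resource="*"):
    """Build an IAM policy document granting the actions listed above.

    The default "*" resource and the Sid are illustrative placeholders.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "FeatureStoreSparkIngestion",
                "Effect": "Allow",
                "Action": list(INGESTION_ROLE_ACTIONS),
                "Resource": resource,
            }
        ],
    }


# Print the policy as JSON, ready to paste into an IAM policy editor.
print(json.dumps(ingestion_role_policy(), indent=2))
```

The IAM policy covers only the first prerequisite; the Lake Formation table grants and account-level settings are configured separately.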
Cross-Account Access
The connector supports cross-account credential vending. This allows a Spark job running in Account B to ingest data into a feature group owned by Account A, as long as the appropriate permissions are in place.
The feature group ARN contains the owning account’s ID, which the connector uses to build the Glue table ARN for credential vending. No additional connector configuration is needed for cross-account access.
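As an illustration of the ARN handling described above, the sketch below extracts the partition, region, and owning account from a feature group ARN and formats a Glue table ARN in that account. The function name and the database and table arguments are hypothetical (in practice those names would come from the feature group's offline store data catalog configuration), and this is not the connector's actual code.

```python
def glue_table_arn(feature_group_arn, database, table):
    """Derive a Glue table ARN in the feature group owner's account."""
    # Expected shape:
    # arn:<partition>:sagemaker:<region>:<account-id>:feature-group/<name>
    parts = feature_group_arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "sagemaker":
        raise ValueError(f"not a SageMaker ARN: {feature_group_arn}")
    partition, region, account_id = parts[1], parts[3], parts[4]
    return f"arn:{partition}:glue:{region}:{account_id}:table/{database}/{table}"
```

Because the account ID is taken from the feature group ARN rather than from the caller's identity, the same derivation works whether the job runs in Account A or Account B.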
Cross-Account Prerequisites
Lake Formation grant (run as Account A, the data owner):
Note: The --enable-hybrid TRUE flag is required if Account A uses Lake Formation hybrid access mode.
Account B IAM permissions: The role running the Spark job needs lakeformation:GetDataAccess and sagemaker:DescribeFeatureGroup on Account A’s feature group.
Troubleshooting
If credential vending fails, check the following:
AccessDeniedException: not authorized to perform lakeformation:GetDataAccess
The Lake Formation table grant is missing. Run the grant-permissions command above from the data owner’s account.
Lake Formation temporary credentials have a TTL of approximately one hour. The connector refreshes credentials immediately before each write, but once a Spark write operation begins, the credentials are fixed for the duration of that write. If the write takes longer than the credential TTL, executors will fail with S3 403 AccessDenied errors.
This can happen with very large DataFrames, small clusters, or skewed partition distributions. To avoid this, break large DataFrames into smaller batches and call ingestData / ingest_data separately for each batch. Each call vends fresh credentials, so keeping individual batches under ~20 minutes of write time provides a comfortable margin. Splitting by event time is recommended since it aligns with the offline store’s partitioning scheme. This approach works for both Iceberg and Glue (Hive-partitioned) table formats.
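The batching advice above can be sketched as follows. Both helpers are hypothetical and use plain Python so the windowing logic is easy to check; select_batch and ingest are callables you supply, for example a DataFrame filter on the event time column and a call to ingest_data.

```python
from datetime import datetime, timedelta


def event_time_windows(start, end, step):
    """Yield half-open [lo, hi) windows that cover [start, end)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    lo = start
    while lo < end:
        hi = min(lo + step, end)
        yield lo, hi
        lo = hi


def ingest_in_batches(select_batch, ingest, windows):
    """Run one ingest call per window so each write gets fresh credentials.

    select_batch(lo, hi) returns the rows whose event time is in [lo, hi);
    ingest(batch) wraps a single ingest_data / ingestData call.
    Returns the number of batches ingested.
    """
    count = 0
    for lo, hi in windows:
        ingest(select_batch(lo, hi))
        count += 1
    return count
```

Choose the window size so that each write stays well under the ~20 minute target; if a single window still runs long, use a smaller step.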
Credential Visibility in Shared SparkContext Environments
When Lake Formation credential vending is enabled, the connector writes temporary AWS credentials to the Spark session’s Hadoop configuration as per-bucket S3A properties. This is required because Spark distributes the Hadoop configuration to executors for S3 writes.
These credentials are visible to any code running in the same SparkContext. In single-tenant environments (dedicated EMR cluster, standalone PySpark, Docker container), this is not a concern. In multi-tenant environments where multiple users share a SparkContext (e.g., shared EMR notebooks), other code in the same context could read the credentials.
Lake Formation temporary credentials are short-lived (~1 hour TTL) and scoped to a specific S3 prefix. For environments where credential isolation is required, run ingestion jobs on a dedicated cluster or in a separate Spark application.
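When auditing a shared session, it helps to know which Hadoop configuration keys to look for. The sketch below lists the standard S3A per-bucket credential property names for a bucket. That the connector sets exactly these three keys is an assumption based on Hadoop's per-bucket naming scheme (fs.s3a.bucket.<bucket>.<option>), not something this document confirms, and the helper name is hypothetical.

```python
# Standard S3A credential options; per-bucket variants insert the bucket name.
S3A_CREDENTIAL_OPTIONS = ("access.key", "secret.key", "session.token")


def per_bucket_credential_keys(bucket):
    """Return the S3A per-bucket property names that can hold credentials."""
    return [f"fs.s3a.bucket.{bucket}.{option}" for option in S3A_CREDENTIAL_OPTIONS]
```

Any code with access to the session's Hadoop configuration could look these keys up, which is why a dedicated cluster or separate Spark application is recommended where isolation matters.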
Development
New Features
This library is built in Scala; the Python methods call into Scala through the JVM via wrapper.py. To add a feature, implement it in Scala first, then update the wrapper as needed so that the Scala and Python functionality stay in sync.
Note: Spark 3.5 introduced a breaking change in the ExpressionEncoder API. The library handles this with version-specific source directories (scala-spark-3.5 and scala-spark-3.1-3.4) that are selected at build time.
Test Coverage
Both the Scala and Python versions are covered by unit tests. In addition, integration tests for the Python version verify that there are no functional regressions.
Scala Package Build
We use scalafmt to auto format the code. Please run sbt scalafmtAll to format your code.
To get the test coverage report and format check result, run sbt jacoco scalafmtCheckAll.
To build for a specific Spark version:
sbt -DSPARK_VERSION=3.5.1 assembly
Python Package Build
We are using tox for test purposes. The test matrix covers Python 3.8–3.12 against PySpark 3.2–3.5. You can check the build by running tox. To configure or figure out the commands we run, please check tox.ini.
Integration Test
The test execution script and tests are included in pyspark-sdk/integration_test. The runner submits each test as a separate EMR step:
BatchIngestionTest.py — end-to-end online/offline ingestion against Glue and Iceberg tables.
LakeFormationHiveIngestionTest.py — end-to-end ingestion with use_lake_formation_credentials=True against a Glue (Hive-partitioned) offline store.
LakeFormationIcebergIngestionTest.py — end-to-end ingestion with use_lake_formation_credentials=True against an Iceberg offline store.
To run:
Fetch the credential from our spark test account first.
Run the test execution script run-spark-integration-test
Integration tests run on Python 3.10 + PySpark 3.5.1.
GitHub Repository Automated Testing
The GitHub repository uses GitHub Actions for CI. The workflow runs unit tests across the full Python/PySpark version matrix and integration tests on Python 3.10 + PySpark 3.5.1. See .github/workflows/integration-tests.yml for details.
SageMaker FeatureStore Spark is an open source Spark library for Amazon SageMaker FeatureStore.
With this spark connector, you can easily ingest data to FeatureGroup’s online and offline store from Spark DataFrame.
Supported Versions
Python / PySpark Compatibility Matrix
Installation
Scala
After the library is imported, you can build your application into a jar and submit the application using spark-shell or spark-submit.
The Scala SDK supports cross-building for Spark 3.1 through 3.5. Specify the target Spark version at build time:
EMR
Once you import the spark library as a dependency in your application, you should be able to submit the spark job according to this EMR documentation.
Python
Please make sure that the environment has PySpark and NumPy installed.
The Spark library is available on PyPI.
The package bundles pre-built JARs for each supported Spark version (3.1–3.5). At runtime, the correct JAR is automatically selected based on your installed PySpark version.
If SPARK_HOME is set, the installer will copy the matching JAR into $SPARK_HOME/jars. For EMR, the path is handled automatically.
To install the library:
EMR
Create a custom jar step of EMR to start the library installation.
If your EMR cluster has a single node:
This will only install the library on the Driver node.
To distribute the library to all executor nodes, you can create an installation script and add a custom bootstrap action while creating the EMR cluster.
For more information, take a look at EMR bootstrap. Since bootstrap actions are executed before all EMR applications are installed, dependent jars cannot be automatically loaded to SPARK_HOME. So when submitting your application, please specify dependent jars using:
SageMaker Notebook
SageMaker Notebook instances may use an older version of Spark. Install a compatible version first:
Getting Started
FeatureStoreManager is the interface for all Spark library operations, such as data ingestion and loading feature definitions.
Scala
To ingest a DataFrame into FeatureStore:
If targetStores is set to List("OfflineStore"), the Spark library ingests data directly into the offline store without calling the FeatureStoreRuntime API, which reduces FeatureStore WCU cost. The default value of this parameter is null, which ingests into both the online and offline stores.
To load feature definitions:
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
Python
To ingest a DataFrame into FeatureStore:
If target_stores is set to ["OfflineStore"], the Spark library ingests data directly into the offline store without calling the FeatureStoreRuntime API, which reduces FeatureStore WCU cost. The default value of this parameter is None, which ingests into both the online and offline stores.
To load feature definitions:
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
Lake Formation Credential Vending
When your offline store’s S3 location is registered with AWS Lake Formation, the Spark connector can vend temporary credentials scoped to the table’s data location instead of relying on the caller’s IAM permissions for S3 access.
Version Requirements
Lake Formation credential vending requires Spark/PySpark 3.5 or newer. On older Spark versions, the useLakeFormationCredentials parameter is not available in Scala, and setting use_lake_formation_credentials to True in Python raises ValueError.
Usage
Scala
Python
Prerequisites
The offline store S3 location must be registered with Lake Formation. You can use the SageMaker Python SDK to enable Lake Formation governance of a Feature Group’s offline store.
The IAM role running the Spark job (the “ingestion role”) must have:
lakeformation:GetDataAccess and lakeformation:GetTemporaryGlueTableCredentials.
glue:GetTable, glue:GetDatabase, and glue:GetPartitions on the feature group’s Glue catalog.
sagemaker:DescribeFeatureGroup on the feature group.
The Lake Formation table must have SELECT, INSERT, and DESCRIBE granted on the Table resource to the ingestion role. GetTemporaryGlueTableCredentials validates permissions at the Table level; a column-only SELECT grant is not sufficient and returns Insufficient Lake Formation permission(s): SUPER privileges required on the table.
The Lake Formation account-level settings must allow third-party data access (required so Lake Formation will vend temporary credentials to the Spark connector):
AllowExternalDataFiltering: true permits Amazon EMR (and other third-party engines) to access data in S3 locations registered with Lake Formation.
AllowFullTableExternalDataAccess: true permits third-party engines like Spark to receive data access credentials without session tags, which GetTemporaryGlueTableCredentials requires. Without it, credential vending fails with Not authorized to call GetTemporaryCredentialsForTableV2.
ExternalDataFilteringAllowList must contain the account(s) whose principals will call the connector.
See the AWS docs on external data filtering for details.
An S3A magic committer implementation must be available. The connector enables the S3A magic committer to let Parquet writes stay within the Lake Formation-scoped S3 prefix.
On EMR 6.15+/7.x: no action required. EMR ships its proprietary SQLEmrOptimizedCommitProtocol which the connector auto-detects and uses.
On AWS Glue, SageMaker Notebook, standalone PySpark, or any other non-EMR runtime: add the open-source spark-hadoop-cloud module at submit time, for example:
Or when building the SparkSession programmatically:
If neither is available on the classpath, the connector fails fast with a clear error describing how to add it.
Cross-Account Access
The connector supports cross-account credential vending. This allows a Spark job running in Account B to ingest data into a feature group owned by Account A, as long as the appropriate permissions are in place.
The feature group ARN contains the owning account’s ID, which the connector uses to build the Glue table ARN for credential vending. No additional connector configuration is needed for cross-account access.
Cross-Account Prerequisites
Lake Formation grant (run as Account A, the data owner):
Glue cross-account access (run as Account A):
Account B IAM permissions: The role running the Spark job needs lakeformation:GetDataAccess and sagemaker:DescribeFeatureGroup on Account A’s feature group.
Troubleshooting
If credential vending fails, check the following:
AccessDeniedException: not authorized to perform lakeformation:GetDataAccess
The Lake Formation table grant is missing. Run the grant-permissions command above from the data owner’s account.
AccessDeniedException: Insufficient Glue permissions to access table
put-resource-policy command above.
AccessDeniedException: not authorized to perform glue:GetTable
glue:GetTable for the caller’s account on the data owner’s catalog.
Credentials vended but S3 access denied
General debugging
aws sts get-caller-identity
Known Limitations
Credential Expiry on Large DataFrames
Lake Formation temporary credentials have a TTL of approximately one hour. The connector refreshes credentials immediately before each write, but once a Spark write operation begins, the credentials are fixed for the duration of that write. If the write takes longer than the credential TTL, executors will fail with S3 403 AccessDenied errors.
This can happen with very large DataFrames, small clusters, or skewed partition distributions. To avoid this, break large DataFrames into smaller batches and call ingestData / ingest_data separately for each batch. Each call vends fresh credentials, so keeping individual batches under ~20 minutes of write time provides a comfortable margin. Splitting by event time is recommended since it aligns with the offline store’s partitioning scheme. This approach works for both Iceberg and Glue (Hive-partitioned) table formats.
Credential Visibility in Shared SparkContext Environments
When Lake Formation credential vending is enabled, the connector writes temporary AWS credentials to the Spark session’s Hadoop configuration as per-bucket S3A properties. This is required because Spark distributes the Hadoop configuration to executors for S3 writes.
These credentials are visible to any code running in the same SparkContext. In single-tenant environments (dedicated EMR cluster, standalone PySpark, Docker container), this is not a concern. In multi-tenant environments where multiple users share a SparkContext (e.g., shared EMR notebooks), other code in the same context could read the credentials.
Lake Formation temporary credentials are short-lived (~1 hour TTL) and scoped to a specific S3 prefix. For environments where credential isolation is required, run ingestion jobs on a dedicated cluster or in a separate Spark application.
Development
New Features
This library is built in Scala; the Python methods call into Scala through the JVM via wrapper.py. To add a feature, implement it in Scala first, then update the wrapper as needed so that the Scala and Python functionality stay in sync.
Note: Spark 3.5 introduced a breaking change in the ExpressionEncoder API. The library handles this with version-specific source directories (scala-spark-3.5 and scala-spark-3.1-3.4) that are selected at build time.
Test Coverage
Both the Scala and Python versions are covered by unit tests. In addition, integration tests for the Python version verify that there are no functional regressions.
Scala Package Build
We use scalafmt to auto format the code. Please run sbt scalafmtAll to format your code.
To get the test coverage report and format check result, run sbt jacoco scalafmtCheckAll.
To build for a specific Spark version:
Python Package Build
We are using tox for test purposes. The test matrix covers Python 3.8–3.12 against PySpark 3.2–3.5. You can check the build by running tox. To configure or figure out the commands we run, please check tox.ini.
Integration Test
The test execution script and tests are included in pyspark-sdk/integration_test. The runner submits each test as a separate EMR step:
BatchIngestionTest.py: end-to-end online/offline ingestion against Glue and Iceberg tables.
LakeFormationHiveIngestionTest.py: end-to-end ingestion with use_lake_formation_credentials=True against a Glue (Hive-partitioned) offline store.
LakeFormationIcebergIngestionTest.py: end-to-end ingestion with use_lake_formation_credentials=True against an Iceberg offline store.
To run:
Fetch the credential from our spark test account first.
Run the test execution script run-spark-integration-test.
Integration tests run on Python 3.10 + PySpark 3.5.1.
GitHub Repository Automated Testing
The GitHub repository uses GitHub Actions for CI. The workflow runs unit tests across the full Python/PySpark version matrix and integration tests on Python 3.10 + PySpark 3.5.1. See .github/workflows/integration-tests.yml for details.
More Reference
Spark Application on EMR
Add Spark EMR Steps
License
This project is licensed under the Apache-2.0 License.