Anton Kukushkin

feat: Support S3 Vectors (#3330)

  • feat: Support S3 Vectors

Bring Amazon S3 Vectors (AWS's native, cost-optimised vector store for similarity search, RAG, and AI agents) to AWS SDK for pandas. With this change, indexing a DataFrame and running an approximate-nearest-neighbour query are each a one-liner, and on-the-fly embedding via Amazon Bedrock is built in.

Implementation lives in a private subpackage awswrangler/s3/_vectors/ and is re-exported flat on wr.s3.*, matching the s3_tables convention.

Public surface (14 functions on wr.s3):

  • Buckets: create_vector_bucket, delete_vector_bucket,
          list_vector_buckets, get_vector_bucket
  • Indexes: create_vector_index, delete_vector_index,
          list_vector_indexes, get_vector_index
  • Data: put_vectors, put_vectors_from_df, get_vectors,
          delete_vectors, list_vectors, query_vectors

Highlights:

  • End-to-end RAG: pass text_column + bedrock_model_id to put_vectors_from_df and awswrangler embeds each row via Bedrock (Titan / Cohere) and writes the resulting vectors plus all other columns as filterable metadata. query_vectors mirrors this with query_text / query_vector.
  • MongoDB-style metadata filters ($eq, $ne, $gt, $gte, $lt, $lte, $in, $nin, $exists, $and, $or) evaluated server-side during search.
  • Automatic chunking to AWS API limits (500/put, 100/get, 500/delete) and parallel-segment list_vectors (up to 16 segments).
  • Float32 coercion + non-finite rejection; NaN / pd.NA / None metadata cells dropped per row.
  • test: add mocked unit tests and live integration tests for S3 Vectors
      • tests/unit/test_s3_vectors_mocked.py: 46 tests using unittest.mock, no AWS required. Covers chunking, target resolution, float32 coercion, NaN/pd.NA metadata drop, parallel-segment list, query top-k bounds, Bedrock Titan/Cohere request and response shapes, and aliasing identity.
      • tests/unit/test_s3_vectors.py: 10 live integration tests using new vector_bucket (session-scope) and vector_index (function-scope) fixtures in tests/conftest.py. Fixtures self-bootstrap via create_vector_bucket / create_vector_index; no CDK stack required.
  • docs: api reference and tutorial for S3 Vectors
      • docs/source/api.rst: new “Amazon S3 Vectors” section after “Amazon S3 Tables”, listing all 14 public functions.
      • tutorials/043 - Amazon S3 Vectors.ipynb: end-to-end walkthrough covering bucket/index lifecycle, discovery, Bedrock-embedded writes, semantic queries with metadata filters, per-key CRUD, bulk export, and cleanup.
      • README.md: tutorial 043 entry.
  • fix: type-parametrize np.ndarray for mypy on Python 3.10

  • fix: use hyphen in vector index fixture name (underscore is invalid)

  • fix: avoid zero-norm vectors in live integration tests (cosine rejects them)
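
The write-path highlights above (float32 coercion, non-finite rejection, dropping empty metadata cells, and chunking to the 500-per-put limit) can be illustrated with a small standard-library sketch. This is not the awswrangler implementation: the helper names `to_float32`, `clean_metadata`, and `batched` are hypothetical, the record layout is an assumption, and `pd.NA` is not handled because the sketch avoids a pandas dependency.

```python
import math
from array import array
from typing import Any, Iterator, Sequence

PUT_BATCH_LIMIT = 500  # per-call put limit quoted in the commit message


def to_float32(values: Sequence[float]) -> list[float]:
    """Round values to float32 precision and reject NaN / infinity."""
    coerced = array("f", values)  # "f" is a C float, 32-bit on common platforms
    if not all(math.isfinite(v) for v in coerced):
        raise ValueError("vector contains non-finite values")
    return list(coerced)


def clean_metadata(row: dict[str, Any]) -> dict[str, Any]:
    """Drop cells that are None or float NaN (pd.NA is out of scope here)."""
    return {
        key: value
        for key, value in row.items()
        if value is not None and not (isinstance(value, float) and math.isnan(value))
    }


def batched(items: Sequence[Any], size: int = PUT_BATCH_LIMIT) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Hypothetical record layout, used only for this illustration.
records = [
    {
        "key": f"doc-{i}",
        "data": to_float32([i, 0.5]),
        "metadata": clean_metadata({"lang": "en", "score": float("nan")}),
    }
    for i in range(1200)
]
batch_sizes = [len(batch) for batch in batched(records)]
```

With 1200 records the sketch produces three batches, each of which would fit in a single put call under the stated limit.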

  • feat: add chunked parameter to list_vectors

Memory-friendly streaming for indexes too large to materialise in one DataFrame. Mirrors the chunked: bool | int convention used by s3.read_parquet and athena.read_sql_query:

  • chunked=False (default) — unchanged; returns a DataFrame and keeps the parallel-segment fan-out (up to 16 segments).
  • chunked=True — yields one DataFrame per underlying API page.
  • chunked=INTEGER — yields DataFrames of exactly N rows (final frame may be shorter).

Chunked streaming is single-segment and sequential by design; use_threads is ignored in that mode, since lazy iteration across parallel segments would require buffering and defeat the memory win.
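
To make the two streaming modes concrete, here is a minimal sketch of how pages could be passed through unchanged (`chunked=True`) or regrouped into fixed-size chunks (`chunked=N`). It is an illustration under stated assumptions, not the library's code: plain lists stand in for DataFrames, and `stream_chunks` is a hypothetical name.

```python
from typing import Iterable, Iterator, Union


def stream_chunks(pages: Iterable[list], chunked: Union[bool, int]) -> Iterator[list]:
    """Yield pages as-is (chunked=True) or regroup rows into chunks of N rows."""
    # bool is a subclass of int, so the True case must be checked first.
    if chunked is True:
        for page in pages:
            yield page
        return
    size = int(chunked)
    if size <= 0:
        raise ValueError("chunked must be True or a positive integer")
    buffer: list = []
    for page in pages:
        buffer.extend(page)
        # Emit as many full chunks as the buffer currently holds.
        while len(buffer) >= size:
            yield buffer[:size]
            buffer = buffer[size:]
    if buffer:
        yield buffer  # the final chunk may be shorter


pages = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
per_page = list(stream_chunks(pages, True))
fixed_size = list(stream_chunks(pages, 4))
```

Because the function is a generator, nothing is read from `pages` until the caller asks for the first chunk, and the buffer never holds more than one page plus `size - 1` leftover rows.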

Internally, _list_segment is now a thin materialiser around a new _iter_list_pages generator, so the per-segment pagination logic (including max_items enforcement across pages) is shared by both the eager and streaming paths.
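
The split between a page generator and a thin materialiser might look roughly like the sketch below. The names `iter_list_pages`, `list_segment`, and `FakeBackend` are hypothetical stand-ins, and the response keys `vectors` and `nextToken` are assumptions about the page shape rather than something confirmed by this commit message.

```python
from typing import Callable, Iterator, Optional


def iter_list_pages(
    fetch_page: Callable[[Optional[str]], dict],
    max_items: Optional[int] = None,
) -> Iterator[list]:
    """Lazily yield one list of items per page, enforcing max_items across pages."""
    remaining = max_items
    token: Optional[str] = None
    while True:
        if remaining is not None and remaining <= 0:
            return  # budget exhausted: no further API calls
        response = fetch_page(token)
        items = response.get("vectors", [])
        if remaining is not None:
            items = items[:remaining]
            remaining -= len(items)
        if items:
            yield items
        token = response.get("nextToken")
        if token is None:
            return


def list_segment(fetch_page: Callable[[Optional[str]], dict], max_items: Optional[int] = None) -> list:
    """Eager path: flatten every page from the generator into one list."""
    return [item for page in iter_list_pages(fetch_page, max_items) for item in page]


class FakeBackend:
    """In-memory stand-in for a paginated list API; counts calls."""

    def __init__(self, pages: list) -> None:
        self.pages = pages
        self.calls = 0

    def fetch(self, token: Optional[str]) -> dict:
        self.calls += 1
        index = 0 if token is None else int(token)
        response = {"vectors": self.pages[index]}
        if index + 1 < len(self.pages):
            response["nextToken"] = str(index + 1)
        return response


backend = FakeBackend([["a", "b"], ["c", "d"], ["e"]])
stream = iter_list_pages(backend.fetch, max_items=3)
calls_before_first_next = backend.calls  # still 0: the generator has not started
first_page = next(stream)
rest = list(stream)
```

Sharing one generator keeps the truncation logic in a single place: the eager path simply drains it, while the streaming path hands pages to the caller one at a time.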

Tests: 4 new cases cover per-page yield, exact-size chunking, laziness (no API call before the first next()), and max_items truncation of the chunked stream.

20 days ago · 2729 commits

AWS SDK for pandas (awswrangler)

Pandas on AWS

Easy integration with Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).

An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com

Source    Installation Command
PyPi      pip install awswrangler
Conda     conda install -c conda-forge awswrangler

⚠️ Starting with version 3.0, optional modules must be installed explicitly:
➡️pip install 'awswrangler[redshift]'

Quick Start

Installation command: pip install awswrangler

import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# Get a Redshift connection from Glue Catalog and retrieve data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()

# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(
    df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)

# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")

At scale

AWS SDK for pandas can also run your workflows at scale by leveraging Modin and Ray. Both projects aim to speed up data workloads by distributing processing over a cluster of workers.

Read our docs or head to our latest tutorials to learn more.

Read The Docs

Getting Help

The best way to interact with our team is through GitHub. You can open an issue and choose from one of our templates for bug reports, feature requests… You may also find help on these community resources:

Logging

Examples of enabling internal logging:

import logging
logging.basicConfig(level=logging.INFO, format="[%(name)s][%(funcName)s] %(message)s")
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL)

In AWS Lambda:

import logging
logging.getLogger("awswrangler").setLevel(logging.DEBUG)