Data Sources & Extraction
Deep dive into the dlt_extract action and available data sources
VirtuousAI uses dlt (data load tool) under the hood for reliable data extraction. This guide covers how the dlt_extract action works and the available data sources.
How dlt_extract Works
The dlt_extract action:
- Resolves credentials from the specified connection
- Builds a dlt pipeline targeting your bronze data layer
- Extracts resources one at a time with checkpointing between each
- Writes Parquet files to S3 with Hive-style partitioning
- Tracks state for incremental extraction on subsequent runs
Per-Resource Checkpointing
Resources are extracted sequentially with state committed after each:
| Step | What Happens |
|---|---|
| 1 | Check cancellation flag |
| 2 | Build dlt source for single resource |
| 3 | Run extraction in thread pool |
| 4 | Commit dlt state |
| 5 | Update progress callback |
| 6 | Move to next resource |
This means if extraction is interrupted (worker crash, cancellation), it resumes from the last completed resource — not from the beginning.
Cancellation is cooperative: it is checked between resources, so the current resource runs to completion before the worker exits gracefully.
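In code terms, the checkpoint loop can be sketched as follows. This is an illustrative sketch only; the function and callback names (`run_extraction`, `extract`, `commit_state`) are hypothetical, not VirtuousAI's actual internals:

```python
def run_extraction(resources, completed, is_cancelled, extract, commit_state):
    """Sequentially extract resources, committing state after each one."""
    for resource in resources:
        if resource in completed:
            continue                    # resume: skip work finished in a prior run
        if is_cancelled():
            return "cancelled"          # cooperative check between resources
        extract(resource)               # the current resource runs to completion
        commit_state(resource)          # checkpoint: a crash resumes from here
        completed.add(resource)
    return "done"
```

Because state is committed per resource, a rerun with the same `completed` set picks up at the first unfinished resource rather than starting over.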
Available Sources
Shopify
Extract e-commerce data from Shopify stores.
Resources:
| Resource | Size | API | Incremental | Description |
|---|---|---|---|---|
| orders | XL | REST | Yes (updated_at) | Order data with line items, sliced in 7-day windows |
| customers | L | REST | Yes (updated_at) | Customer profiles, sliced in 7-day windows |
| products | M | REST | Yes (updated_at) | Product catalog |
| locations | XS | GraphQL | No (full replace) | Store locations (warehouses, retail stores) |
The locations resource uses Shopify's GraphQL Admin API instead of REST. It's a small, reference dataset that syncs via full replace on each run.
Default Resources: orders, customers, products, locations
Configuration:
```json
{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "shopify" },
  "definition": {
    "source": "shopify",
    "resources": ["orders", "products", "locations"],
    "start_date": "2026-01-01",
    "source_config": {
      "items_per_page": 250
    }
  }
}
```

Source Config Options:
| Option | Type | Default | Description |
|---|---|---|---|
| items_per_page | int | 250 | Pagination size (1-250) |
| order_status | string | any | Filter orders: any, open, closed, cancelled |
Incremental Extraction:
- Orders: Uses `updated_at` cursor, sliced into 7-day windows for large stores
- Products: Uses `updated_at` cursor
- Customers: Uses `updated_at` cursor, sliced into 7-day windows for large stores
- Locations: Full sync (reference data, typically under 100 records)
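The 7-day slicing above can be sketched as a small helper (illustrative only; the real connector derives the window start from the stored cursor, not a fixed date):

```python
from datetime import datetime, timedelta

def seven_day_windows(start, end):
    """Split [start, end) into consecutive 7-day slices."""
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + timedelta(days=7), end)  # final slice may be shorter
        windows.append((cursor, upper))
        cursor = upper
    return windows
```

Each window becomes one bounded API query, which keeps individual requests small on high-volume stores.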
REST API (Generic)
Extract data from any REST API endpoint.
Configuration:
```json
{
  "kind": "dlt_extract",
  "connectionRef": { "id": "conn_rest_api" },
  "definition": {
    "source": "rest_api",
    "resources": ["customers"],
    "source_config": {
      "base_url": "https://api.example.com/v1",
      "endpoints": {
        "customers": {
          "path": "/customers",
          "paginate": {
            "type": "cursor",
            "cursor_param": "after"
          }
        }
      }
    }
  }
}
```

Klaviyo
Extract email and SMS marketing data from Klaviyo.
Resources:
| Resource | Incremental | Cursor Field | Description |
|---|---|---|---|
| profiles | Yes | attributes.updated | Customer profiles and attributes |
| events | Yes | attributes.datetime | Engagement events (opens, clicks, etc.) |
| lists | Yes | attributes.updated | Email/SMS lists |
| campaigns | No | — | Email and SMS campaigns |
| metrics | No | — | Custom metric definitions |
| flows | No | — | Automation flows |
| segments | No | — | Dynamic segments |
| tags | No | — | Organization tags |
Default Resources: profiles, events, lists
Configuration:
```json
{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "klaviyo" },
  "definition": {
    "source": "klaviyo",
    "resources": ["profiles", "events", "lists"],
    "start_date": "2026-01-01"
  }
}
```

Credentials Required:
- `api_key` — Klaviyo Private API Key (starts with `pk_`)
Incremental Extraction:
- Profiles: Uses `updated` cursor for changes
- Events: Uses `datetime` cursor for new events
- Lists: Uses `updated` cursor for changes
- Other resources: Full sync (reference data)
Klaviyo's API has strict rate limits. The connector uses pre-emptive rate limiting to avoid 429 errors entirely — see Rate Limiting below.
Amazon Seller (SP-API)
Extract e-commerce data from Amazon Seller Central accounts using the Selling Partner API.
Resources:
| Resource | Incremental | Cursor Field | Description |
|---|---|---|---|
| orders | Yes | last-updated-date | Customer orders and order details |
| inventory | No | — | FBA inventory levels and status |
Default Resources: orders
Configuration:
```json
{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "amazon-seller" },
  "definition": {
    "source": "amazon_sp",
    "resources": ["orders", "inventory"],
    "start_date": "2026-01-01"
  }
}
```

Credentials Required:
| Field | Description | Example |
|---|---|---|
| lwa_client_id | LWA Application ID | amzn1.application-oa2-client.xxx |
| lwa_client_secret | LWA Application Secret | amzn1.oa2-cs.v1.xxx |
| lwa_refresh_token | OAuth refresh token | Atzr\|xxx |
| region | API region (na, eu, fe) | na |
| marketplace_id | Target marketplace | ATVPDKIKX0DER (US) |
Marketplace IDs:
| Region | Marketplace | ID |
|---|---|---|
| NA | United States | ATVPDKIKX0DER |
| NA | Canada | A2EUQ1WTGCTBG2 |
| NA | Mexico | A1AM78C64UM0Y8 |
| EU | United Kingdom | A1F83G8C2ARO7P |
| EU | Germany | A1PA6795UKMFR9 |
| FE | Japan | A1VC38T7YXB528 |
| FE | Australia | A39IBJ37TRP1C6 |
Incremental Extraction:
- Orders: Uses `last-updated-date` cursor for changes
- Inventory: Full sync (returns current state snapshot)
Amazon SP-API has aggressive rate limits. The connector uses the Reports API for bulk extraction and pre-emptive rate limiting with 65-second intervals between API calls — see Rate Limiting below.
Adding New Sources
VirtuousAI's source registry is extensible. Contact support for custom source integrations or check the API for programmatic source registration.
Resource Ordering
When extracting multiple resources, VirtuousAI uses size-optimized ordering by default — extracting smaller, faster resources first before heavier ones.
Size Classes
Each resource is classified by expected data volume:
| Size Class | Description | Examples |
|---|---|---|
| XS | Reference data, under 100 records | tags, metrics |
| S | Small, under 1K records | lists, segments |
| M | Medium, 1K–100K records | campaigns, flows |
| L | Large, 100K–1M records | orders, products |
| XL | Extra large, 1M+ records | profiles, events |
Ordering Behavior
| Scenario | Behavior |
|---|---|
| No resources specified | Uses source defaults in size-optimized order |
| Resources explicitly listed | Preserves your order exactly as specified |
| Default extraction | XS → S → M → L → XL (fast first, heavy last) |
Example: For Klaviyo with default resources, the extraction order is:
1. `lists` (S) — fast reference data
2. `profiles` (XL) — large but typically faster than events
3. `events` (XL) — highest volume, runs last
This ensures you get partial results quickly and heavy resources don't block lighter ones.
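A minimal sketch of this ordering logic, assuming a size-class lookup per resource (the function name and data shapes are illustrative, not the actual registry API):

```python
SIZE_RANK = {"XS": 0, "S": 1, "M": 2, "L": 3, "XL": 4}

def order_resources(requested, defaults, sizes):
    """Keep the user's explicit order; otherwise sort defaults by size class."""
    if requested:
        return list(requested)            # explicit lists are preserved exactly
    # Stable sort: resources in the same size class keep their default order.
    return sorted(defaults, key=lambda r: SIZE_RANK[sizes[r]])
```

Because the sort is stable, two XL resources such as `profiles` and `events` stay in their default relative order.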
Preserving Custom Order
If you specify resources explicitly, your order is preserved:
```json
{
  "definition": {
    "source": "klaviyo",
    "resources": ["events", "profiles", "lists"]
  }
}
```

This extracts in exactly that order, regardless of size class.
Rate Limiting
VirtuousAI implements pre-emptive rate limiting to avoid hitting vendor API limits entirely — rather than reacting to 429 errors after they occur.
How It Works
Each source has a dedicated rate limiter that enforces minimum intervals between API calls:
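A minimal sketch of such a limiter, assuming a single-threaded caller (the class name is illustrative, not VirtuousAI's actual implementation):

```python
import time

class MinIntervalLimiter:
    """Pre-emptive rate limiter: enforce a minimum gap between API calls."""

    def __init__(self, min_interval_seconds):
        self.min_interval = min_interval_seconds
        self._last_call = None

    def wait(self):
        """Block until at least min_interval has passed since the last call."""
        now = time.monotonic()
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                time.sleep(remaining)
        self._last_call = time.monotonic()
```

Calling `wait()` before every request guarantees the sustained rate stays under the vendor's limit, so 429 responses are avoided rather than handled.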
Amazon SP-API Rate Limits
Amazon's Reports API has strict sustained rate limits:
| Operation | Limit | Our Interval | Why |
|---|---|---|---|
| createReport | 1/60s sustained | 65s | 5s safety buffer |
| getReportDocument | 1/60s sustained | 65s | 5s safety buffer |
| getReport (polling) | 2/s sustained | 0.5s | Poll status while waiting |
Runtime Impact: For historical syncs with many time slices, expect ~65 seconds per slice for report creation/retrieval. A 118-slice sync takes approximately 4-5 hours.
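As a back-of-envelope check on that estimate (assuming two rate-limited calls per slice, one createReport and one getReportDocument; the per-slice call count is an assumption, not documented behavior):

```python
# 118 slices, each needing two calls spaced 65 seconds apart.
slices = 118
calls_per_slice = 2          # assumption: createReport + getReportDocument
interval_seconds = 65
total_hours = slices * calls_per_slice * interval_seconds / 3600
# ≈ 4.3 hours, before report-generation polling and download time
```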
Klaviyo Rate Limits
Klaviyo has per-endpoint rate limits with burst capacity:
| Endpoint | Burst | Sustained | Our Interval |
|---|---|---|---|
| Most endpoints | 75/s | 700/min | 0.1s (~10 req/s) |
| Events | 350/s | 3500/min | 0.02s (~50 req/s) |
Why pre-emptive? Klaviyo's rate limits are aggressive. By staying well under the limit, we avoid 429 responses and the exponential backoff delays they cause.
Reactive Fallback
If a 429 response does occur (e.g., shared API key with other systems), the connector:
- Reads the `Retry-After` header
- Waits the specified duration (up to 5 minutes for rate limits)
- Retries with a time budget (10 minutes total retry window)
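The retry loop above can be sketched as follows (illustrative; `call` is a hypothetical wrapper that returns the HTTP status and the parsed Retry-After value):

```python
import time

def retry_with_budget(call, budget_seconds=600, max_wait=300):
    """Retry on 429 using Retry-After, bounded by a total time budget."""
    deadline = time.monotonic() + budget_seconds
    while True:
        status, retry_after = call()
        if status != 429:
            return status                         # success or non-rate-limit error
        wait = min(retry_after, max_wait)         # cap a single wait at 5 minutes
        if time.monotonic() + wait > deadline:
            raise TimeoutError("retry budget exhausted")
        time.sleep(wait)
```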
```json
{
  "error": {
    "code": "RATE_LIMITED",
    "message": "Rate limited by vendor API",
    "details": {
      "retry_after_seconds": 120,
      "budget_remaining_seconds": 480
    },
    "retryable": true
  }
}
```

Extraction Modes
Incremental (Default)
Only fetches new or modified records since last extraction:
```json
{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "incremental": true
  }
}
```

How it works:
- dlt maintains state with last cursor position
- Each run queries only records after the cursor
- Dramatically reduces API calls and processing time
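A minimal sketch of cursor-based incremental state, independent of dlt's actual internals (all names here are illustrative):

```python
def extract_incremental(fetch_since, state):
    """Fetch only records past the stored cursor, then advance the cursor."""
    cursor = state.get("last_cursor")   # None on the first run -> full backfill
    max_seen = cursor
    rows = []
    for row in fetch_since(cursor):
        rows.append(row)
        if max_seen is None or row["updated_at"] > max_seen:
            max_seen = row["updated_at"]
    state["last_cursor"] = max_seen     # committed only after a successful run
    return rows
```

A second run with the same state fetches nothing until new records appear past the cursor.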
Full Resync
Drops existing state and extracts all data:
```json
{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "full_resync": true
  }
}
```
}When to use:
- Schema changes in source system
- Data corruption in bronze layer
- Initial backfill with new date range
Full resync drops the dlt pipeline state. Use sparingly on large datasets.
Output Format
Bronze Layer Structure
Extracted data is written to S3 as Parquet files:
```
s3://vai-bronze-{env}/
└── {tenant_id}/
    └── {connection_id}/
        └── {source}/
            └── {resource}/
                └── year=2026/
                    └── month=01/
                        └── day=22/
                            └── data.parquet
```

Metrics
Each extraction returns detailed metrics:
```json
{
  "pipeline_name": "bronze__tenant123__conn456__shopify",
  "rows_extracted": 1250,
  "files_written": 3,
  "bytes_written": 2456789,
  "resources_loaded": ["orders", "order_line_items", "products"],
  "per_resource_metrics": {
    "orders": { "rows_extracted": 500, "files_written": 1 },
    "order_line_items": { "rows_extracted": 1500, "files_written": 1 },
    "products": { "rows_extracted": 250, "files_written": 1 }
  },
  "duration_seconds": 45.2,
  "state_fingerprint": "abc123..."
}
```

Error Handling
Retryable Errors
| Error | Cause | Resolution |
|---|---|---|
| RATE_LIMITED | API rate limit hit | Automatic retry with backoff |
| CONNECTION_ERROR | Network timeout | Automatic retry |
| EXTRACTION_ERROR | Transient failure | Automatic retry |
Non-Retryable Errors
| Error | Cause | Resolution |
|---|---|---|
| AUTH_ERROR | Invalid credentials | Update connection credentials |
| CONFIGURATION_ERROR | Invalid source/resource | Fix action definition |
Resume After Failure
If extraction fails mid-way, the next run resumes from where it left off:
```json
{
  "result": {
    "resumed_from": ["orders", "order_line_items"],
    "resources_loaded": ["orders", "order_line_items", "products"]
  }
}
```

The progress is tracked via:
```json
{
  "progress": {
    "completed_resources": ["orders", "order_line_items"],
    "total_resources": 3
  }
}
```

Heartbeat & Lease
Long-running extractions use the lease system:
| Parameter | Value |
|---|---|
| Lease Duration | 90 seconds |
| Heartbeat | Every 30 seconds during extraction |
| Watchdog Grace | 180 seconds |
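As an illustration of the mechanics (a sketch, not VirtuousAI's actual class), a lease-extending heartbeat can be built from a background thread that renews on a timer:

```python
import threading

class LeaseHeartbeat:
    """Renew a lease every `interval` seconds while the `with` body runs."""

    def __init__(self, renew, interval=30.0):
        self._renew = renew                   # callback that extends the lease
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        # Event.wait returns False on timeout, so this renews every interval
        # until stop() is signalled on exit.
        while not self._stop.wait(self._interval):
            self._renew()

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
```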
The ExtractionHeartbeat context manager extends the lease during pipeline execution:
```python
with ExtractionHeartbeat(run_id, pipeline_name):
    # dlt pipeline runs here
    # Heartbeat extends lease automatically
```

Scheduling Extractions
For regular data syncs, create an automation:
```bash
vai automations create \
  --name "Daily Shopify Sync" \
  --trigger-type schedule \
  --config '{
    "schedule": "0 6 * * *",
    "timezone": "UTC",
    "action": {
      "kind": "dlt_extract",
      "connectionRef": {"slug": "shopify"},
      "definition": {
        "source": "shopify",
        "resources": ["orders", "products"],
        "incremental": true
      }
    }
  }'
```

Best Practices
- Use incremental mode — Only use full_resync when necessary
- Limit resources — Extract only the resources you need
- Set date ranges — Use `start_date` to limit historical data
- Monitor metrics — Track `rows_extracted` to detect anomalies
- Handle rate limits — The executor handles this, but schedule during off-peak hours