Data Sources & Extraction

Deep dive into the dlt_extract action and available data sources

VirtuousAI uses dlt (data load tool) under the hood for reliable data extraction. This guide covers how the dlt_extract action works and the available data sources.

How dlt_extract Works

The dlt_extract action:

  1. Resolves credentials from the specified connection
  2. Builds a dlt pipeline targeting your bronze data layer
  3. Extracts resources one at a time with checkpointing between each
  4. Writes Parquet files to S3 with Hive-style partitioning
  5. Tracks state for incremental extraction on subsequent runs

Per-Resource Checkpointing

Resources are extracted sequentially with state committed after each:

| Step | What Happens |
|------|--------------|
| 1 | Check cancellation flag |
| 2 | Build dlt source for single resource |
| 3 | Run extraction in thread pool |
| 4 | Commit dlt state |
| 5 | Update progress callback |
| 6 | Move to next resource |

This means if extraction is interrupted (worker crash, cancellation), it resumes from the last completed resource — not from the beginning.

Cancellation is cooperative — checked between resources. The current resource completes before graceful exit.
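
The loop above can be sketched as follows (a simplified illustration; `extract_resource`, `commit_state`, and the callback names are hypothetical, not the real internals):

```python
def run_extraction(resources, extract_resource, commit_state,
                   is_cancelled=lambda: False,
                   on_progress=lambda done, total: None):
    """Extract resources sequentially, committing state after each one.

    Returns the list of resources that completed; an interrupted run can be
    resumed by passing only the resources not in that list.
    """
    completed = []
    total = len(resources)
    for resource in resources:
        if is_cancelled():           # cooperative cancellation, checked between resources
            break
        extract_resource(resource)   # run the dlt pipeline for this single resource
        commit_state(resource)       # checkpoint: a crash after this point loses nothing
        completed.append(resource)
        on_progress(len(completed), total)
    return completed
```

Resuming after a crash is then just a matter of re-invoking the loop with the resources that are not yet in the completed list.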

Available Sources

Shopify

Extract e-commerce data from Shopify stores.

Resources:

| Resource | Size | API | Incremental | Description |
|----------|------|-----|-------------|-------------|
| orders | XL | REST | Yes (updated_at) | Order data with line items, sliced in 7-day windows |
| customers | L | REST | Yes (updated_at) | Customer profiles, sliced in 7-day windows |
| products | M | REST | Yes (updated_at) | Product catalog |
| locations | XS | GraphQL | No (full replace) | Store locations (warehouses, retail stores) |

The locations resource uses Shopify's GraphQL Admin API instead of REST. It's a small reference dataset that syncs via full replace on each run.

Default Resources: orders, customers, products, locations

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "shopify" },
  "definition": {
    "source": "shopify",
    "resources": ["orders", "products", "locations"],
    "start_date": "2026-01-01",
    "source_config": {
      "items_per_page": 250
    }
  }
}

Source Config Options:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| items_per_page | int | 250 | Pagination size (1-250) |
| order_status | string | any | Filter orders: any, open, closed, cancelled |

Incremental Extraction:

  • Orders: Uses updated_at cursor, sliced into 7-day windows for large stores
  • Products: Uses updated_at cursor
  • Customers: Uses updated_at cursor, sliced into 7-day windows for large stores
  • Locations: Full sync (reference data, typically under 100 records)
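
Slicing a cursor range into 7-day windows can be sketched like this (an illustration of the technique; the actual slicing logic lives inside the connector):

```python
from datetime import date, timedelta

def seven_day_windows(start: date, end: date):
    """Yield (window_start, window_end) pairs covering [start, end) in 7-day slices.

    The final window is truncated so it never extends past `end`.
    """
    cursor = start
    while cursor < end:
        window_end = min(cursor + timedelta(days=7), end)
        yield cursor, window_end
        cursor = window_end
```

Each window becomes one bounded API query (e.g. `updated_at_min`/`updated_at_max`), which keeps individual requests small on high-volume stores.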

REST API (Generic)

Extract data from any REST API endpoint.

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "id": "conn_rest_api" },
  "definition": {
    "source": "rest_api",
    "resources": ["customers"],
    "source_config": {
      "base_url": "https://api.example.com/v1",
      "endpoints": {
        "customers": {
          "path": "/customers",
          "paginate": {
            "type": "cursor",
            "cursor_param": "after"
          }
        }
      }
    }
  }
}

Klaviyo

Extract email and SMS marketing data from Klaviyo.

Resources:

| Resource | Incremental | Cursor Field | Description |
|----------|-------------|--------------|-------------|
| profiles | Yes | attributes.updated | Customer profiles and attributes |
| events | Yes | attributes.datetime | Engagement events (opens, clicks, etc.) |
| lists | Yes | attributes.updated | Email/SMS lists |
| campaigns | No | | Email and SMS campaigns |
| metrics | No | | Custom metric definitions |
| flows | No | | Automation flows |
| segments | No | | Dynamic segments |
| tags | No | | Organization tags |

Default Resources: profiles, events, lists

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "klaviyo" },
  "definition": {
    "source": "klaviyo",
    "resources": ["profiles", "events", "lists"],
    "start_date": "2026-01-01"
  }
}

Credentials Required:

  • api_key — Klaviyo Private API Key (starts with pk_)

Incremental Extraction:

  • Profiles: Uses updated cursor for changes
  • Events: Uses datetime cursor for new events
  • Lists: Uses updated cursor for changes
  • Other resources: Full sync (reference data)

Klaviyo's API has strict rate limits. The connector uses pre-emptive rate limiting to avoid 429 errors entirely — see Rate Limiting below.

Amazon Seller (SP-API)

Extract e-commerce data from Amazon Seller Central accounts using the Selling Partner API.

Resources:

| Resource | Incremental | Cursor Field | Description |
|----------|-------------|--------------|-------------|
| orders | Yes | last-updated-date | Customer orders and order details |
| inventory | No | | FBA inventory levels and status |

Default Resources: orders

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "amazon-seller" },
  "definition": {
    "source": "amazon_sp",
    "resources": ["orders", "inventory"],
    "start_date": "2026-01-01"
  }
}

Credentials Required:

| Field | Description | Example |
|-------|-------------|---------|
| lwa_client_id | LWA Application ID | amzn1.application-oa2-client.xxx |
| lwa_client_secret | LWA Application Secret | amzn1.oa2-cs.v1.xxx |
| lwa_refresh_token | OAuth refresh token | Atzr\|xxx |
| region | API region (na, eu, fe) | na |
| marketplace_id | Target marketplace | ATVPDKIKX0DER (US) |

Marketplace IDs:

| Region | Marketplace | ID |
|--------|-------------|-----|
| NA | United States | ATVPDKIKX0DER |
| NA | Canada | A2EUQ1WTGCTBG2 |
| NA | Mexico | A1AM78C64UM0Y8 |
| EU | United Kingdom | A1F83G8C2ARO7P |
| EU | Germany | A1PA6795UKMFR9 |
| FE | Japan | A1VC38T7YXB528 |
| FE | Australia | A39IBJ37TRP1C6 |

Incremental Extraction:

  • Orders: Uses last-updated-date cursor for changes
  • Inventory: Full sync (returns current state snapshot)

Amazon SP-API has aggressive rate limits. The connector uses the Reports API for bulk extraction and pre-emptive rate limiting with 65-second intervals between API calls — see Rate Limiting below.

Adding New Sources

VirtuousAI's source registry is extensible. Contact support for custom source integrations or check the API for programmatic source registration.

Resource Ordering

When extracting multiple resources, VirtuousAI uses size-optimized ordering by default — extracting smaller, faster resources first before heavier ones.

Size Classes

Each resource is classified by expected data volume:

| Size Class | Description | Examples |
|------------|-------------|----------|
| XS | Reference data, under 100 records | tags, metrics |
| S | Small, under 1K records | lists, segments |
| M | Medium, 1K–100K records | campaigns, flows |
| L | Large, 100K–1M records | orders, products |
| XL | Extra large, 1M+ records | profiles, events |

Ordering Behavior

| Scenario | Behavior |
|----------|----------|
| No resources specified | Uses source defaults in size-optimized order |
| Resources explicitly listed | Preserves your order exactly as specified |
| Default extraction | XS → S → M → L → XL (fast first, heavy last) |

Example: For Klaviyo with default resources, the extraction order is:

  1. lists (S) — fast reference data
  2. profiles (XL) — large but typically faster than events
  3. events (XL) — highest volume, runs last

This ensures you get partial results quickly and heavy resources don't block lighter ones.
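
A minimal sketch of size-optimized ordering (the size-class map and function name here are illustrative):

```python
SIZE_RANK = {"XS": 0, "S": 1, "M": 2, "L": 3, "XL": 4}

def order_resources(resources, size_class, explicit=False):
    """Return the extraction order: caller order if explicit, else smallest-first.

    `size_class` maps each resource name to one of the size classes above.
    Python's sort is stable, so resources in the same class keep their
    relative order from the source defaults.
    """
    if explicit:
        return list(resources)  # user-specified order is preserved exactly
    return sorted(resources, key=lambda r: SIZE_RANK[size_class[r]])
```
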

Preserving Custom Order

If you specify resources explicitly, your order is preserved:

{
  "definition": {
    "source": "klaviyo",
    "resources": ["events", "profiles", "lists"]
  }
}

This extracts in exactly that order, regardless of size class.

Rate Limiting

VirtuousAI implements pre-emptive rate limiting to avoid hitting vendor API limits entirely — rather than reacting to 429 errors after they occur.

How It Works

Each source has a dedicated rate limiter that enforces minimum intervals between API calls:
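
A limiter of this kind can be sketched as follows (a simplified, single-threaded illustration; the class name is hypothetical):

```python
import time

class MinIntervalLimiter:
    """Block so that successive calls are at least `interval` seconds apart."""

    def __init__(self, interval: float):
        self.interval = interval
        self._last_call = 0.0

    def wait(self):
        """Sleep just long enough to honor the minimum interval, then record the call."""
        now = time.monotonic()
        sleep_for = self._last_call + self.interval - now
        if sleep_for > 0:
            time.sleep(sleep_for)  # pre-emptive: wait *before* the call, never trigger a 429
        self._last_call = time.monotonic()

# e.g. ~10 req/s for most Klaviyo endpoints
limiter = MinIntervalLimiter(0.1)
```

Calling `limiter.wait()` before every API request spaces the calls out evenly, so the vendor's sustained limit is never reached in the first place.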

Amazon SP-API Rate Limits

Amazon's Reports API has strict sustained rate limits:

| Operation | Limit | Our Interval | Why |
|-----------|-------|--------------|-----|
| createReport | 1/60s sustained | 65s | 5s safety buffer |
| getReportDocument | 1/60s sustained | 65s | 5s safety buffer |
| getReport (polling) | 2/s sustained | 0.5s | Poll status while waiting |

Runtime Impact: For historical syncs with many time slices, expect ~65 seconds per rate-limited API call, or roughly 130 seconds per slice (one wait each for report creation and document retrieval). A 118-slice sync takes approximately 4-5 hours.

Klaviyo Rate Limits

Klaviyo has per-endpoint rate limits with burst capacity:

| Endpoint | Burst | Sustained | Our Interval |
|----------|-------|-----------|--------------|
| Most endpoints | 75/s | 700/min | 0.1s (~10 req/s) |
| Events | 350/s | 3500/min | 0.02s (~50 req/s) |

Why pre-emptive? Klaviyo's rate limits are aggressive. By staying well under the limit, we avoid 429 responses and the exponential backoff delays they cause.

Reactive Fallback

If a 429 response does occur (e.g., shared API key with other systems), the connector:

  1. Reads the Retry-After header
  2. Waits the specified duration (up to 5 minutes for rate limits)
  3. Retries with a time budget (10 minutes total retry window)
{
  "error": {
    "code": "RATE_LIMITED",
    "message": "Rate limited by vendor API",
    "details": {
      "retry_after_seconds": 120,
      "budget_remaining_seconds": 480
    },
    "retryable": true
  }
}
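
The fallback above can be sketched as follows (function and field names are illustrative, not the connector's real API):

```python
import time

def call_with_retry_budget(make_request, budget_seconds=600, max_wait=300):
    """Retry 429 responses using Retry-After, within a total time budget.

    `make_request` returns a dict with a "status" key and, on 429,
    an optional "retry_after" key (seconds from the Retry-After header).
    """
    deadline = time.monotonic() + budget_seconds
    while True:
        response = make_request()
        if response["status"] != 429:
            return response
        # honor Retry-After, capped at the 5-minute maximum wait
        retry_after = min(float(response.get("retry_after", 60)), max_wait)
        if time.monotonic() + retry_after > deadline:
            raise RuntimeError("RATE_LIMITED: retry budget exhausted")
        time.sleep(retry_after)
```
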

Extraction Modes

Incremental (Default)

Only fetches new or modified records since last extraction:

{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "incremental": true
  }
}

How it works:

  1. dlt maintains state with last cursor position
  2. Each run queries only records after the cursor
  3. Dramatically reduces API calls and processing time
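
A minimal illustration of the cursor pattern (in practice the state is maintained by dlt, not managed by hand like this):

```python
def incremental_fetch(fetch_page, state):
    """Fetch only records updated after the stored cursor, then advance it.

    `state` is a mutable dict standing in for dlt's pipeline state;
    `fetch_page` stands in for one API call with an updated_at filter.
    """
    cursor = state.get("last_updated_at")            # None on the first run => full history
    records = fetch_page(updated_at_min=cursor)
    if records:
        state["last_updated_at"] = max(r["updated_at"] for r in records)
    return records
```
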

Full Resync

Drops existing state and extracts all data:

{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "full_resync": true
  }
}

When to use:

  • Schema changes in source system
  • Data corruption in bronze layer
  • Initial backfill with new date range

Full resync drops the dlt pipeline state. Use sparingly on large datasets.

Output Format

Bronze Layer Structure

Extracted data is written to S3 as Parquet files:

s3://vai-bronze-{env}/
└── {tenant_id}/
    └── {connection_id}/
        └── {source}/
            └── {resource}/
                └── year=2026/
                    └── month=01/
                        └── day=22/
                            └── data.parquet
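
The partition key for a given day can be built with a simple helper (a sketch following the layout above; the function name is hypothetical):

```python
from datetime import date

def bronze_key(env, tenant_id, connection_id, source, resource, day: date):
    """Build the Hive-partitioned S3 key for one day's Parquet file."""
    return (
        f"s3://vai-bronze-{env}/{tenant_id}/{connection_id}/{source}/{resource}/"
        f"year={day:%Y}/month={day:%m}/day={day:%d}/data.parquet"
    )
```

The zero-padded `month=`/`day=` values keep partitions lexically sortable, which is what query engines expect from Hive-style layouts.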

Metrics

Each extraction returns detailed metrics:

{
  "pipeline_name": "bronze__tenant123__conn456__shopify",
  "rows_extracted": 1250,
  "files_written": 3,
  "bytes_written": 2456789,
  "resources_loaded": ["orders", "order_line_items", "products"],
  "per_resource_metrics": {
    "orders": { "rows_extracted": 500, "files_written": 1 },
    "order_line_items": { "rows_extracted": 1500, "files_written": 1 },
    "products": { "rows_extracted": 250, "files_written": 1 }
  },
  "duration_seconds": 45.2,
  "state_fingerprint": "abc123..."
}

Error Handling

Retryable Errors

| Error | Cause | Resolution |
|-------|-------|------------|
| RATE_LIMITED | API rate limit hit | Automatic retry with backoff |
| CONNECTION_ERROR | Network timeout | Automatic retry |
| EXTRACTION_ERROR | Transient failure | Automatic retry |

Non-Retryable Errors

| Error | Cause | Resolution |
|-------|-------|------------|
| AUTH_ERROR | Invalid credentials | Update connection credentials |
| CONFIGURATION_ERROR | Invalid source/resource | Fix action definition |

Resume After Failure

If extraction fails mid-way, the next run resumes from where it left off:

{
  "result": {
    "resumed_from": ["orders", "order_line_items"],
    "resources_loaded": ["orders", "order_line_items", "products"]
  }
}

The progress is tracked via:

{
  "progress": {
    "completed_resources": ["orders", "order_line_items"],
    "total_resources": 3
  }
}

Heartbeat & Lease

Long-running extractions use the lease system:

| Parameter | Value |
|-----------|-------|
| Lease Duration | 90 seconds |
| Heartbeat | Every 30 seconds during extraction |
| Watchdog Grace | 180 seconds |

The ExtractionHeartbeat context manager extends the lease during pipeline execution:

with ExtractionHeartbeat(run_id, pipeline_name):
    # dlt pipeline runs here
    # Heartbeat extends lease automatically
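
A heartbeat context manager of this shape can be sketched as follows (the real implementation talks to the lease service; `extend_lease` here is a hypothetical callback and the class name is deliberately different):

```python
import threading

class ExtractionHeartbeatSketch:
    """Call `extend_lease` every `interval` seconds while the body runs."""

    def __init__(self, extend_lease, interval=30.0):
        self._extend_lease = extend_lease
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._beat, daemon=True)

    def _beat(self):
        # Event.wait returns False on timeout, True once stop is set
        while not self._stop.wait(self._interval):
            self._extend_lease()  # renew the 90s lease well before it expires

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
        return False  # never swallow exceptions from the pipeline body
```

A 30-second interval against a 90-second lease means two heartbeats can be missed before the watchdog's 180-second grace period expires.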

Scheduling Extractions

For regular data syncs, create an automation:

vai automations create \
  --name "Daily Shopify Sync" \
  --trigger-type schedule \
  --config '{
    "schedule": "0 6 * * *",
    "timezone": "UTC",
    "action": {
      "kind": "dlt_extract",
      "connectionRef": {"slug": "shopify"},
      "definition": {
        "source": "shopify",
        "resources": ["orders", "products"],
        "incremental": true
      }
    }
  }'

Best Practices

  1. Use incremental mode — Only use full_resync when necessary
  2. Limit resources — Extract only the resources you need
  3. Set date ranges — Use start_date to limit historical data
  4. Monitor metrics — Track rows_extracted to detect anomalies
  5. Handle rate limits — The executor handles this, but schedule during off-peak hours
