Dagster & Airbyte Cloud with components

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-airbyte library provides an AirbyteCloudWorkspaceComponent, which represents Airbyte Cloud connections as assets in Dagster.

Preparing a Dagster project

To begin, you'll need a Dagster project. You can use an existing project that is ready for components, or scaffold a new one:

create-dagster project my-project && cd my-project/src

Next, you will need to add the dagster-airbyte library to the project:

uv add dagster-airbyte

Scaffolding an Airbyte Cloud component

Now that you have a Dagster project, you can scaffold an Airbyte Cloud component. You'll need to provide your Airbyte Cloud workspace ID and API credentials:

dg scaffold defs dagster_airbyte.AirbyteCloudWorkspaceComponent airbyte_ingest \
--workspace-id test_workspace --client-id "{{ env('AIRBYTE_CLIENT_ID') }}" --client-secret "{{ env('AIRBYTE_CLIENT_SECRET') }}"
Creating a component at /.../my-project/src/my_project/defs/airbyte_ingest.

The scaffold call will generate a defs.yaml file:

tree my_project/defs
my_project/defs
├── __init__.py
└── airbyte_ingest
    └── defs.yaml

2 directories, 2 files

In its scaffolded form, the defs.yaml file contains the configuration for your Airbyte Cloud workspace:

my_project/defs/airbyte_ingest/defs.yaml
type: dagster_airbyte.AirbyteCloudWorkspaceComponent

attributes:
  workspace:
    workspace_id: test_workspace
    client_id: '{{ env(''AIRBYTE_CLIENT_ID'') }}'
    client_secret: '{{ env(''AIRBYTE_CLIENT_SECRET'') }}'
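
Because the credentials are read from environment variables, a missing variable will only surface when the component is loaded. As a rough pre-flight check, the following Python sketch scans a defs.yaml text for env('NAME') references and reports which ones are unset. The helper names (referenced_env_vars, missing_env_vars) are hypothetical and not part of dagster-airbyte or dg; the regular expression is an assumption that covers only the two quoting styles shown in this guide.

```python
import os
import re

# Matches env('NAME') as well as the YAML single-quoted form env(''NAME'').
ENV_REF = re.compile(r"env\(\s*['\"]{1,2}([A-Za-z_][A-Za-z0-9_]*)['\"]{1,2}\s*\)")


def referenced_env_vars(yaml_text: str) -> list[str]:
    """Return the env var names referenced in the text, in first-seen order."""
    seen: list[str] = []
    for name in ENV_REF.findall(yaml_text):
        if name not in seen:
            seen.append(name)
    return seen


def missing_env_vars(yaml_text: str, environ=os.environ) -> list[str]:
    """Return referenced env var names that are unset or empty in `environ`."""
    return [n for n in referenced_env_vars(yaml_text) if not environ.get(n)]


if __name__ == "__main__":
    # Hypothetical sample mirroring the two quoting styles used in this guide.
    sample = """
    client_id: '{{ env(''AIRBYTE_CLIENT_ID'') }}'
    client_secret: "{{ env('AIRBYTE_CLIENT_SECRET') }}"
    """
    # Only AIRBYTE_CLIENT_ID is provided, so the secret is reported as missing.
    print(missing_env_vars(sample, environ={"AIRBYTE_CLIENT_ID": "abc"}))
```

In practice you would pass the contents of my_project/defs/airbyte_ingest/defs.yaml and leave environ at its default so the real process environment is checked.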

You can check the configuration of your component:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key         ┃ Group   ┃ Deps ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ account     │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ company     │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ contact     │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ opportunity │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ task        │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ user        │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ └─────────────┴─────────┴──────┴───────────┴─────────────┘ │
└─────────┴────────────────────────────────────────────────────────────┘

Selecting specific connections

Use the connection_selector key to choose which Airbyte Cloud connections your component includes. Only the selected connections are represented as assets:

my_project/defs/airbyte_ingest/defs.yaml
type: dagster_airbyte.AirbyteCloudWorkspaceComponent

attributes:
  workspace:
    workspace_id: test_workspace
    client_id: "{{ env('AIRBYTE_CLIENT_ID') }}"
    client_secret: "{{ env('AIRBYTE_CLIENT_SECRET') }}"
  connection_selector:
    by_name:
      - salesforce_to_snowflake

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key         ┃ Group   ┃ Deps ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ account     │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ opportunity │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ task        │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ ├─────────────┼─────────┼──────┼───────────┼─────────────┤ │
│         │ │ user        │ default │      │ airbyte   │             │ │
│         │ │             │         │      │ snowflake │             │ │
│         │ └─────────────┴─────────┴──────┴───────────┴─────────────┘ │
└─────────┴────────────────────────────────────────────────────────────┘
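
To make the effect of by_name concrete, here is a minimal Python sketch of name-based filtering. It is an illustration only, not the component's implementation: the Connection class and both helpers are hypothetical, exact name matching is an assumption, and the second connection name (hubspot_to_snowflake) is invented for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connection:
    """Hypothetical stand-in for an Airbyte connection; not a dagster-airbyte class."""
    name: str
    tables: tuple[str, ...]


def select_by_name(connections, names):
    """Keep only connections whose name appears in `names` (exact match assumed)."""
    wanted = set(names)
    return [c for c in connections if c.name in wanted]


def asset_keys(connections):
    """Flatten the given connections into a sorted list of table names."""
    return sorted(t for c in connections for t in c.tables)


if __name__ == "__main__":
    workspace = [
        Connection("salesforce_to_snowflake", ("account", "opportunity", "task", "user")),
        # Hypothetical second connection, used only to show what gets filtered out.
        Connection("hubspot_to_snowflake", ("company", "contact")),
    ]
    selected = select_by_name(workspace, ["salesforce_to_snowflake"])
    print(asset_keys(selected))  # ['account', 'opportunity', 'task', 'user']
```

Filtering happens per connection, so every table synced by a selected connection is kept and every table from an unselected connection is dropped.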

Customizing Airbyte Cloud assets

You can customize the properties of the assets emitted by each connection with the translation key in the defs.yaml file:

my_project/defs/airbyte_ingest/defs.yaml
type: dagster_airbyte.AirbyteCloudWorkspaceComponent

attributes:
  workspace:
    workspace_id: test_workspace
    client_id: "{{ env('AIRBYTE_CLIENT_ID') }}"
    client_secret: "{{ env('AIRBYTE_CLIENT_SECRET') }}"
  connection_selector:
    by_name:
      - salesforce_to_snowflake
  translation:
    group_name: airbyte_data
    description: "Loads data from Airbyte connection {{ props.connection_name }}"

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key         ┃ Group        ┃ Deps ┃ Kinds     ┃ Description                                            ┃ │
│         │ ┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ account     │ airbyte_data │      │ airbyte   │ Loads data from Airbyte connection                     │ │
│         │ │             │              │      │ snowflake │ salesforce_to_snowflake                                │ │
│         │ ├─────────────┼──────────────┼──────┼───────────┼────────────────────────────────────────────────────────┤ │
│         │ │ opportunity │ airbyte_data │      │ airbyte   │ Loads data from Airbyte connection                     │ │
│         │ │             │              │      │ snowflake │ salesforce_to_snowflake                                │ │
│         │ ├─────────────┼──────────────┼──────┼───────────┼────────────────────────────────────────────────────────┤ │
│         │ │ task        │ airbyte_data │      │ airbyte   │ Loads data from Airbyte connection                     │ │
│         │ │             │              │      │ snowflake │ salesforce_to_snowflake                                │ │
│         │ ├─────────────┼──────────────┼──────┼───────────┼────────────────────────────────────────────────────────┤ │
│         │ │ user        │ airbyte_data │      │ airbyte   │ Loads data from Airbyte connection                     │ │
│         │ │             │              │      │ snowflake │ salesforce_to_snowflake                                │ │
│         │ └─────────────┴──────────────┴──────┴───────────┴────────────────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
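
The description above is a template that is filled in once per asset, which is why every row shows the name of its originating connection. The Python sketch below imitates that substitution with a small regular expression. It is a simplified stand-in written for illustration, not the template engine Dagster uses, and the render helper is hypothetical; it handles only plain {{ props.<field> }} placeholders.

```python
import re

# Matches {{ props.<field> }} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*props\.(\w+)\s*\}\}")


def render(template: str, props: dict) -> str:
    """Replace each {{ props.<field> }} with the matching value from `props`.

    Raises KeyError if the template references a field that `props` lacks.
    """
    return PLACEHOLDER.sub(lambda m: str(props[m.group(1)]), template)


if __name__ == "__main__":
    template = "Loads data from Airbyte connection {{ props.connection_name }}"
    print(render(template, {"connection_name": "salesforce_to_snowflake"}))
    # Loads data from Airbyte connection salesforce_to_snowflake
```

Static values such as group_name contain no placeholders, so they pass through unchanged and apply uniformly to every asset.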