# Variables and functions
Inside a Python script, you get access to some useful variables and functions.
## `context` Variable

The `context` object holds information relevant to the model through which the script was run. For the `meta` syntax example shown at the end of this page, we would get the following:
```python
context.current_model.name
# str
#= historical_ozone_levels

context.current_model.status
# NodeStatus, enum: 'success' | 'error' | 'skipped'

context.current_model.columns
# Dict[str, ColumnInfo(name: str, tags: List[str], meta: Dict)]

context.current_model.tests
# List[CurrentTest(name: str, modelname: str, column: str, status: str)]

context.current_model.meta
# meta information in the schema.yml
#= {'owner': '@me'}
```
The `context` object also has access to test information related to the current model. If the previous dbt command was either `test` or `build`, the `context.current_model.tests` property is populated with a list of tests:

```python
context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels', column='ds', status='Pass')]
```
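For example, a script can branch on the model's status and inspect its test results. A minimal sketch using only the fields documented above; the `print` calls stand in for real alerting logic:

```python
model = context.current_model

# Status values follow the enum documented above: 'success' | 'error' | 'skipped'
if model.status == "error":
    print(f"Model {model.name} failed to build")

# Report any test on this model that did not pass
for test in model.tests:
    if test.status != "Pass":
        print(f"Test {test.name} on column {test.column} has status {test.status}")
```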
## Read functions

The familiar dbt functions `ref` and `source` are available in fal scripts to read the models and sources as a pandas `DataFrame`.

### `ref` function
The `ref` function is used exactly like in dbt. You reference a model in your project:

```python
# returned as `pandas.DataFrame`
df = ref('model_name')
```

Or a package model (package name first, model name second):

```python
df = ref('dbt_artifacts', 'dim_dbt__exposures')
```

You can also use the `context` variable to get the data of the associated model:

```python
df = ref(context.current_model.name)
```
### `source` function

The `source` function is used exactly like in dbt. You reference a source in your project:

```python
# returned as `pandas.DataFrame`
df = source('source_name', 'table_name')
```
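Because both functions return pandas DataFrames, their results can be combined with ordinary pandas operations. A small sketch, with hypothetical model, source, and column names:

```python
# Hypothetical names: a ticket-metrics model and a raw source of ticket tags.
metrics_df = ref('zendesk_ticket_metrics')
tags_df = source('zendesk', 'ticket_tags')

# Plain pandas from here on: join the two tables on a shared `ticket_id` column.
enriched_df = metrics_df.merge(tags_df, on='ticket_id', how='left')
print(enriched_df.head())
```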
## Write functions

It is also possible to send data back to your data warehouse. This makes it easy to get the data, process it, and upload it back into dbt territory.

### `write_to_source` function

You first have to define the source in your schema. This operation appends to the existing source by default and should only be used to target tables, not views.

```python
# Upload a `pandas.DataFrame` back to the data warehouse
write_to_source(df, 'source_name', 'table_name2')
```
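For reference, the target of `write_to_source` has to already be declared as a dbt source in your schema file. A minimal declaration for the names used above might look like this (a sketch; the exact file layout is up to your project):

```yaml
sources:
  - name: source_name
    tables:
      - name: table_name2
```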
`write_to_source` also accepts an optional `dtype` argument, which lets you specify the datatypes of columns. It works the same way as the `dtype` argument of the `DataFrame.to_sql` function.

```python
from sqlalchemy.types import Integer

# Upload but specifically create the `value` column with type `integer`
# Can be useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'value': Integer()})
```
### `write_to_model` function

This operation overwrites the existing relation by default and should only be used to target tables, not views.

For example, if the script is attached to the `zendesk_ticket_metrics` model,

```yaml
models:
  - name: zendesk_ticket_metrics
    meta:
      fal:
        scripts:
          after:
            - from_zendesk_ticket_data.py
```

`write_to_model` will write to the `zendesk_ticket_metrics` table:

```python
df = faldbt.ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_model(df)  # writes to the attached model: zendesk_ticket_metrics
```
NOTE: When used with the `fal flow run` or `fal run` commands, `write_to_model` does not accept a model name; it only operates on the associated model. But when importing `fal` as a Python module, you have to specify the model to write to:
```python
from fal import FalDbt

faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")

faldbt.list_models()
# {
#   'zendesk_ticket_metrics': <RunStatus.Success: 'success'>,
#   'stg_zendesk_ticket_data': <RunStatus.Success: 'success'>,
# }

df = faldbt.ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)
faldbt.write_to_model(df, 'zendesk_ticket_metrics')  # specify the model
```
### Specifying column types

The functions `write_to_source` and `write_to_model` also accept an optional `dtype` argument, which lets you specify the datatypes of columns. It works the same way as the `dtype` argument of the `DataFrame.to_sql` function.

```python
from sqlalchemy.types import Integer

# Upload but specifically create the `my_col` column with type `integer`
# Can be especially useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'my_col': Integer()})
```
### Modes of writing

These functions accept two modes of writing: `append` and `overwrite`. They are passed with the optional `mode` argument; the default is `append` for `write_to_source` and `overwrite` for `write_to_model`.

```python
# Overwrite the table with the dataframe data, deleting old data
write_to_source(df, 'source_name', 'table_name', mode='overwrite')
write_to_model(df, 'model_name', mode='overwrite')  # default mode

# Append more data to the existing table (create it if it does not exist)
write_to_source(df2, 'source_name', 'table_name', mode='append')  # default mode
write_to_model(df2, 'model_name', mode='append')
```
#### The `append` mode

- creates the table if it does not exist yet
- inserts data into the table

#### The `overwrite` mode

- creates a temporary table
- inserts data into the temporary table
- drops the old table if it exists
- renames the temporary table to the final table name
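As a rough illustration of that sequence (not fal's actual implementation), the sketch below performs the same steps by hand with SQLAlchemy; the connection string and table names are placeholders:

```python
import pandas as pd
from sqlalchemy import create_engine, text

df = pd.DataFrame({"value": [1, 2, 3]})  # stand-in for the data to write

# Placeholder connection string; a real fal script writes through the adapter
# configured in profiles.yml instead.
engine = create_engine("postgresql://user:password@localhost:5432/warehouse")

with engine.begin() as conn:
    # 1-2. create a temporary table and insert the new data into it
    df.to_sql("my_table__tmp", conn, if_exists="replace", index=False)
    # 3. drop the old table if it exists
    conn.execute(text("DROP TABLE IF EXISTS my_table"))
    # 4. rename the temporary table to the final table name
    conn.execute(text("ALTER TABLE my_table__tmp RENAME TO my_table"))
```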
## Extract-Load pipelines

fal provides a module `el` that lets you run EL jobs. `el` has two methods: `el.airbyte_sync` and `el.fivetran_sync`. These methods let you run sync jobs on Airbyte connections and Fivetran connectors, respectively. API connection details, as well as information on connections and connectors, have to be provided in `profiles.yml`.

Given this bit of `profiles.yml`:
```yaml
fal_extract_load:
  dev:
    my_fivetran_el:
      type: fivetran
      api_key: my_fivetran_key
      api_secret: my_fivetran_secret
      connectors:
        - name: fivetran_connector_1
          id: id_fivetran_connector_1
    my_airbyte_el:
      type: airbyte
      host: http://localhost:8001
      connections:
        - name: airbyte_connection_1
          id: id_airbyte_connection_1
```
you can run an Airbyte sync job:

```python
el.airbyte_sync(config_name="my_airbyte_el", connection_name="airbyte_connection_1")
```
`airbyte_sync` triggers and waits for a sync job on an Airbyte connection. Inputs (the sketch after this list shows the polling options in use):

- `config_name` - name of the API configuration defined in `profiles.yml`
- `connection_name` - target Airbyte connection name (optional if `connection_id` is provided)
- `connection_id` - target Airbyte connection id (optional if `connection_name` is provided)
- `poll_interval` - wait time between polling of a running job (optional, default is 10 seconds)
- `poll_timeout` - timeout on polling requests (optional, default is none)
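For instance, a longer-running sync might tune the polling behaviour. A minimal sketch using only the parameters listed above; the connection id comes from the `profiles.yml` example, and the numeric values are illustrative (assumed to be in seconds, matching the documented 10-second default interval):

```python
# Trigger the sync and poll its status every 30 seconds,
# giving up after roughly 30 minutes.
el.airbyte_sync(
    config_name="my_airbyte_el",
    connection_id="id_airbyte_connection_1",
    poll_interval=30,
    poll_timeout=1800,
)
```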
Running a Fivetran sync job is similar:

```python
el.fivetran_sync(config_name="my_fivetran_el", connector_name="fivetran_connector_1")
```
`fivetran_sync` triggers and waits for a sync job on a Fivetran connector. Inputs:

- `config_name` - name of the API configuration defined in `profiles.yml`
- `connector_name` - target Fivetran connector name (optional if `connector_id` is provided)
- `connector_id` - target Fivetran connector id (optional if `connector_name` is provided)
- `poll_interval` - wait time between polling of a running job (optional, default is 10 seconds)
- `poll_timeout` - timeout on polling requests (optional, default is none)
## `meta` syntax

```yaml
models:
  - name: historical_ozone_levels
    ...
    meta:
      owner: "@me"
      fal:
        scripts:
          - send_slack_message.py
          - another_python_script.py # will be run sequentially
```
Use the `fal` and `scripts` keys underneath the `meta` config to let the fal CLI know where to look for the Python scripts. You can pass a list of scripts as shown above to run one or more scripts as a post-hook operation after a dbt run.
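As an example of what such a script might contain, the sketch below posts a note about the current model to a Slack incoming webhook. Only `context.current_model` comes from fal; the webhook URL environment variable and the message format are hypothetical.

```python
import os
import requests

# Hypothetical: the webhook URL is read from the environment, not provided by fal.
webhook_url = os.environ["SLACK_WEBHOOK_URL"]

model = context.current_model
message = f"dbt model `{model.name}` finished with status: {model.status}"

# Post a simple text message to the Slack incoming webhook.
requests.post(webhook_url, json={"text": message})
```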