Virtual tables allow you to query and write to tables in supported data platforms without storing the data in Foundry.
You can interact with these tables in Python transforms using the transforms-tables library.
To interact with virtual tables from a Python transform, you must install transforms-tables from the Libraries tab.
The Pythonic virtual tables API provides the TableInput and TableOutput types to interact with virtual tables.
from transforms.api import transform
from transforms.tables import TableInput, TableOutput, TableTransformInput, TableTransformOutput


@transform(
    source_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput("ri.tables.main.table.5678"),
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    ...  # normal transforms API
The tables referred to in a Python transform need not come from the same source, or even the same platform.
The example above assumes that the tables specified in the transform already exist in your Foundry environment. If they do not, you can configure the output virtual table to be created during checks, as with dataset outputs. This requires extra configuration to specify the source and the location where the table should be stored.
from transforms.api import transform
from transforms.tables import TableInput, TableOutput, TableTransformInput, TableTransformOutput, SnowflakeTable


@transform(
    source_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "/path/to/new/table",
        # Must specify the Data Connection source you want to create the table in and the table identifier/location
        "ri.magritte..source.1234",
        SnowflakeTable("database", "schema", "table"),
    ),
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    ...  # normal transforms API
Once the table has been created, the extra configuration for the source and table metadata can be removed from the TableOutput to make it more concise. After a virtual table has been created, its source and location cannot be changed; modifying either will cause checks to fail.
The available Table subclasses are:
BigQueryTable(project: str, dataset: str, table: str)
DeltaTable(path: str)
FilesTable(path: str, format: FileFormat)
IcebergTable(table: str, warehouse_path: str)
SnowflakeTable(database: str, schema: str, table: str)
You must use the appropriate class based on the type of source you are connecting to.
Tables backed by a Snowflake connection can push Foundry-authored transforms down to Snowflake. This is known as compute pushdown, and it allows you to use Foundry's pipeline management, data lineage, and security functionality on top of Snowflake compute.
To use compute pushdown with Snowflake, create a lightweight Python repository and install the most recent version of the transforms-tables library. A Snowpark session will be configured based on the connection details of the Snowflake tables configured as inputs and/or outputs to the transforms. The data can be transformed using the Snowpark DataFrame API. For full guidance on the Snowpark API, consult the Snowpark documentation.
An example of a Snowpark transform is shown below:
import snowflake.snowpark as snow  # provides the `snow` alias used in the type annotations below
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import StringType
from transforms.api import lightweight, transform
from transforms.tables import (
    SnowflakeTable,
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)

ID_PREFIX = "CUSTOMER-NO-"


@lightweight
@transform(
    input_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "ri.tables.main.table.5678",
        "ri.magritte..source.1234",
        SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED"),
    ),
)
def compute_in_snowflake(input_table: TableLightweightInput, output_table: TableLightweightOutput):
    """
    With Snowflake tables, you can perform lightweight transforms using the Snowpark APIs.
    All compute for these is pushed down to the underlying Snowflake instance, so this can
    tackle big data workloads. In a setup like this, all data must live in the same
    Snowflake instance and be accessible through the same connection.
    """
    # get a Snowpark DataFrame instance
    df: snow.DataFrame = input_table.snowpark().dataframe()
    session: snow.Session = df.session

    # define a UDF to apply to our data
    @udf(session=session, return_type=StringType())
    def fix_id_col(ident: int) -> str:
        """
        UDF to convert id to string and prepend "CUSTOMER-NO-".
        """
        return ID_PREFIX + str(ident)

    # apply UDF
    df = df.with_column("ID", fix_id_col(col("ID")))

    # write back to the new table
    output_table.snowpark().write(df)
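Because the body of the UDF in the example above is ordinary Python, its logic can be checked locally before it is registered with a Snowpark session. The following is a minimal sketch under that assumption: it reuses the ID_PREFIX constant and the fix_id_col name from the example, but runs as a plain function with no Snowflake connection, so it says nothing about how Snowpark serializes or executes the UDF. The sample ids are made up for illustration.

```python
ID_PREFIX = "CUSTOMER-NO-"


def fix_id_col(ident: int) -> str:
    """Convert an id to a string and prepend the customer prefix."""
    return ID_PREFIX + str(ident)


# Hypothetical sample ids, standing in for values of the ID column.
sample_ids = [1, 42, 1003]
fixed = [fix_id_col(i) for i in sample_ids]
print(fixed)
```

Keeping the row-level logic in a function like this means it can be unit tested without a session and then wrapped with the udf decorator inside the transform.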
The Snowpark API allows data to be converted into a pandas DataFrame. If the scale of your data is small enough, this can be used to bring the data from Snowflake into Foundry lightweight compute. This enables the use of transforms beyond the capabilities of the Snowpark APIs, and allows Snowflake tables to be combined with other Foundry data.
import hashlib

from transforms.api import lightweight, transform
from transforms.tables import (
    SnowflakeTable,
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)


@lightweight
@transform(
    input_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "ri.tables.main.table.5678",
        "ri.magritte..source.1234",
        SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
    ),
)
def compute_local(input_table: TableLightweightInput, output_table: TableLightweightOutput):
    """
    Snowpark also supports conversion to pandas DataFrames, meaning that you can
    use lightweight transforms on top of Snowflake tables to conduct in-container
    compute work. You can use this to go beyond the scope of what is supported in Snowpark.
    """
    # get a Snowpark DataFrame instance
    df = input_table.snowpark().dataframe()
    session = df.session

    # convert to pandas
    pd_df = df.to_pandas()

    # create ANON_CODE by hashing the concatenation of CITY, STATE, and ZIP_CODE
    def generate_anon_code(row):
        concatenated = f"{row['CITY']}{row['STATE']}{row['ZIP_CODE']}"
        return hashlib.sha256(concatenated.encode("utf-8")).hexdigest()

    # apply the function to create the ANON_CODE column
    pd_df["ANON_CODE"] = pd_df.apply(generate_anon_code, axis=1)

    # select the ID and ANON_CODE columns
    result_data = pd_df[["ID", "ANON_CODE"]]

    # write back to the new table
    new_df = session.create_dataframe(result_data, schema=["ID", "ANON_CODE"])
    output_table.snowpark().write(new_df)
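The hashing step in the example above concatenates the three fields with no separator, so different field values can produce the same input string (for example, a CITY of "AB" with a STATE of "C", and a CITY of "A" with a STATE of "BC"). The following is one possible variation, not part of the original example: it joins the fields with a delimiter before hashing. The anon_code helper name, the "|" delimiter, and the sample rows are assumptions for illustration, and the sketch operates on plain dictionaries rather than a pandas DataFrame so that it runs without any dependencies.

```python
import hashlib

FIELDS = ("CITY", "STATE", "ZIP_CODE")


def anon_code(row, delimiter: str = "|") -> str:
    """Hash the delimited concatenation of CITY, STATE, and ZIP_CODE."""
    joined = delimiter.join(str(row[name]) for name in FIELDS)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Two made-up rows whose undelimited concatenation is identical ("ABC12345").
row_a = {"CITY": "AB", "STATE": "C", "ZIP_CODE": "12345"}
row_b = {"CITY": "A", "STATE": "BC", "ZIP_CODE": "12345"}

# With an empty delimiter the two rows hash to the same value;
# with the default delimiter their inputs differ, so the hashes differ.
collides_without_delimiter = anon_code(row_a, delimiter="") == anon_code(row_b, delimiter="")
collides_with_delimiter = anon_code(row_a) == anon_code(row_b)
print(collides_without_delimiter, collides_with_delimiter)
```

A delimiter only helps if it cannot appear inside the field values themselves, so the choice of "|" here should be checked against the actual data.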