Version: devel

dlt.pipeline.pipeline

Pipeline Objects

class Pipeline(SupportsPipeline)

View source on GitHub

pipelines_dir

A directory where the pipelines' working directories are created

is_active

Tells if the instance is currently active and available via dlt.pipeline()

__init__

def __init__(pipeline_name: str,
             pipelines_dir: str,
             pipeline_salt: TSecretStrValue,
             destination: AnyDestination,
             staging: AnyDestination,
             dataset_name: str,
             import_schema_path: str,
             export_schema_path: str,
             dev_mode: bool,
             progress: _Collector,
             must_attach_to_local_pipeline: bool,
             config: PipelineConfiguration,
             runtime: RuntimeConfiguration,
             refresh: Optional[TRefreshMode] = None) -> None

View source on GitHub

Initializes the Pipeline class, which implements a dlt pipeline. Please use the pipeline function in the dlt module to create a new Pipeline instance.
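
For reference, a minimal sketch of the documented way to obtain a Pipeline instance via dlt.pipeline; the destination and names used below are illustrative placeholders.

import dlt

# dlt.pipeline is the documented entry point; constructing Pipeline() directly is discouraged.
# "duckdb" and the pipeline/dataset names are placeholder choices for this sketch.
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="chess_data",
)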

drop

def drop(pipeline_name: str = None) -> "Pipeline"

View source on GitHub

Deletes the local pipeline state, schemas and any working files. Re-initializes all internal fields via __init__. If a pipeline_name different from the current name is specified, a new pipeline instance is created, activated and returned. Note that the original pipeline is still dropped.

Arguments:

  • pipeline_name str - Optional. New pipeline name. If provided, a new instance is created and activated

Returns:

  • "Pipeline" - returns self

extract

@with_runtime_trace()
@with_schemas_sync
@with_state_sync(may_extract_state=True)
@with_config_section((known_sections.EXTRACT, ))
def extract(
        data: Any,
        *,
        table_name: str = None,
        parent_table_name: str = None,
        write_disposition: TWriteDispositionConfig = None,
        columns: TAnySchemaColumns = None,
        primary_key: TColumnNames = None,
        schema: Schema = None,
        max_parallel_items: int = ConfigValue,
        workers: int = ConfigValue,
        table_format: TTableFormat = None,
        schema_contract: TSchemaContract = None,
        refresh: Optional[TRefreshMode] = None,
        loader_file_format: Optional[TLoaderFileFormat] = None) -> ExtractInfo

View source on GitHub

Extracts the data and prepares it for normalization. Does not require the destination or credentials to be configured. See the run method for a description of the arguments.
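
A minimal sketch of calling extract on its own; the list of dictionaries and the table name are placeholders, and no destination is configured since extract does not need one.

import dlt

# no destination configured yet: extract works without it
pipeline = dlt.pipeline(pipeline_name="chess_pipeline")
extract_info = pipeline.extract(
    [{"id": 1, "name": "magnus"}, {"id": 2, "name": "hikaru"}],
    table_name="players",
)
print(extract_info)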

normalize

@with_runtime_trace()
@with_schemas_sync
@with_config_section((known_sections.NORMALIZE, ))
def normalize(workers: int = 1) -> NormalizeInfo

View source on GitHub

Normalizes the data prepared with the extract method, infers the schema and creates load packages for the load method. Requires the destination to be known.

load

@with_runtime_trace(send_state=True)
@with_state_sync()
@with_config_section((known_sections.LOAD, ))
def load(destination: TDestinationReferenceArg = None,
         dataset_name: str = None,
         credentials: Any = None,
         *,
         workers: int = 20,
         raise_on_failed_jobs: bool = ConfigValue) -> LoadInfo

View source on GitHub

Loads the packages prepared by the normalize method into dataset_name at the destination, optionally using the provided credentials.
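
A self-contained sketch of running extract, normalize and load as separate steps; the destination, dataset and data are illustrative.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
pipeline.extract([{"id": 1, "name": "magnus"}], table_name="players")

# infer the schema and build load packages; a known destination is required here
pipeline.normalize()

# push the packages into the destination dataset
load_info = pipeline.load()
print(load_info)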

run

@with_runtime_trace()
@with_config_section(("run", ))
def run(data: Any = None,
        *,
        destination: TDestinationReferenceArg = None,
        staging: TDestinationReferenceArg = None,
        dataset_name: str = None,
        credentials: Any = None,
        table_name: str = None,
        write_disposition: TWriteDispositionConfig = None,
        columns: TAnySchemaColumns = None,
        primary_key: TColumnNames = None,
        schema: Schema = None,
        loader_file_format: TLoaderFileFormat = None,
        table_format: TTableFormat = None,
        schema_contract: TSchemaContract = None,
        refresh: TRefreshMode = None) -> LoadInfo

View source on GitHub

Loads the data from the data argument into the destination specified in destination and the dataset specified in dataset_name.

Notes:

This method will extract the data from the data argument, infer the schema, normalize the data into a load package (i.e. jsonl or Parquet files representing tables) and then load such packages into the destination.

The data may be supplied in several forms:

  • a list or Iterable of any JSON-serializable objects, e.g. dlt.run([1, 2, 3], table_name="numbers")
  • any Iterator or a function that yields (a Generator), e.g. dlt.run(range(1, 10), table_name="range")
  • a function or a list of functions decorated with @dlt.resource, e.g. dlt.run([chess_players(title="GM"), chess_games()])
  • a function or a list of functions decorated with @dlt.source.

Please note that dlt deals with bytes, datetime, decimal and uuid objects, so you are free to load documents containing e.g. binary data or dates.

Execution: The run method will first use the sync_destination method to synchronize the pipeline state and schemas with the destination. You can disable this behavior with the restore_from_destination configuration option. Next it will make sure that data from the previous run is fully processed. If not, the run method normalizes and loads the pending data items and exits. If there was no pending data, new data from the data argument is extracted, normalized and loaded.

Arguments:

  • data Any - Data to be loaded to destination

  • destination TDestinationReferenceArg, optional - A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. If not provided, the value passed to dlt.pipeline will be used.

  • staging TDestinationReferenceArg, optional - A name of the staging destination to which dlt will load the data temporarily before it is loaded to the destination; can also be a module imported from dlt.destination.

  • dataset_name str, optional - A name of the dataset to which the data will be loaded. A dataset is a logical group of tables, i.e. a schema in relational databases or a folder grouping many files. If not provided, the value passed to dlt.pipeline will be used. If not provided at all, it defaults to the pipeline_name.

  • credentials Any, optional - Credentials for the destination, e.g. a database connection string or a dictionary with Google Cloud credentials. In most cases should be set to None, which lets dlt use secrets.toml or environment variables to infer the right credential values.

  • table_name str, optional - The name of the table to which the data should be loaded within the dataset. This argument is required for data that is a list/Iterable or an Iterator without a __name__ attribute. The behavior of this argument depends on the type of the data:

    • generator functions - the function name is used as the table name; table_name overrides this default
    • @dlt.resource - the resource contains the full table schema, including the table name. table_name will override this property. Use with care!
    • @dlt.source - the source contains several resources, each with a table schema. table_name will override all table names within the source and load the data into a single table.
  • write_disposition TWriteDispositionConfig, optional - Controls how data is written to a table. Accepts a shorthand string literal or a configuration dictionary. Allowed shorthand string literals: "append" will always add new data at the end of the table; "replace" will replace existing data with new data; "skip" will prevent data from loading; "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". Write behavior can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide write_disposition={"disposition": "merge", "strategy": "scd2"}. Please note that in the case of dlt.resource the table schema value will be overwritten, and in the case of dlt.source the values in all resources will be overwritten.

  • columns TAnySchemaColumns, optional - A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.

  • primary_key TColumnNames, optional - A column name or a list of column names that comprise a primary key. Typically used with the "merge" write disposition to deduplicate loaded data.

  • schema Schema, optional - An explicit Schema object in which all table schemas will be grouped. By default dlt takes the schema from the source (if passed in data argument) or creates a default one itself.

  • loader_file_format TLoaderFileFormat, optional - The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.

  • table_format TTableFormat, optional - Can be "delta" or "iceberg". The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.

  • schema_contract TSchemaContract, optional - An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.

  • refresh TRefreshMode, optional - Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:

    • drop_sources - Drop tables and source and resource state for all sources currently being processed in the run or extract methods of the pipeline. (Note: schema history is erased)
    • drop_resources - Drop tables and resource state for all resources being processed. Source-level state is not modified. (Note: schema history is erased)
    • drop_data - Wipe all data and resource state for all resources being processed. Schema is not modified.

Raises:

  • PipelineStepFailed - when a problem happened during extract, normalize or load steps.

Returns:

  • LoadInfo - Information on loaded data including the list of package ids and failed job statuses. Please note that dlt will not raise if a single job terminally fails. Such information is provided via LoadInfo.
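
A hedged sketch of a typical run call; the resource, its data, the destination and the primary key below are all placeholders.

import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def players():
    # placeholder data; in practice this would query an API or a database
    yield [{"id": 1, "name": "magnus"}, {"id": 2, "name": "hikaru"}]

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
load_info = pipeline.run(players())
# run does not raise when individual jobs fail terminally; inspect LoadInfo explicitly
print(load_info)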

sync_destination

@with_config_section(sections=(),
                     merge_func=ConfigSectionContext.prefer_existing)
def sync_destination(destination: TDestinationReferenceArg = None,
                     staging: TDestinationReferenceArg = None,
                     dataset_name: str = None) -> None

View source on GitHub

Synchronizes pipeline state with the destination's state kept in dataset_name

Notes:

Attempts to restore the pipeline state and schemas from the destination. Requires the state that is present at the destination to have a higher version number than the state kept locally in the working directory. In such a situation the local state, schemas and intermediate files with the data will be deleted and replaced with the state and schema present in the destination.

A special case where the pipeline state exists locally but the dataset does not exist at the destination will wipe out the local state.

  • Note - this method is executed by the run method before any operation on data. Use the restore_from_destination configuration option to disable that behavior.
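
A small sketch of restoring pipeline state from an existing dataset; the DuckDB destination and the dataset name are assumptions for this example.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
# pulls state and schemas from the destination if its state version is newer than the local one
pipeline.sync_destination()
print(pipeline.has_data)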

activate

def activate() -> None

View source on GitHub

Activates the pipeline

The active pipeline is used as a context for several dlt components. It provides state to sources and resources evaluated outside of the pipeline.run and pipeline.extract methods. For example, if the source you use accesses state in a dlt.source-decorated function, the state is provided by the active pipeline.

The name of the active pipeline is used as the optional outermost section when resolving secrets and config values. For example, if a pipeline named chess_pipeline is active and dlt looks for BigQuery configuration, it will look in chess_pipeline.destination.bigquery.credentials first and then in destination.bigquery.credentials.

The active pipeline also provides the current DestinationCapabilitiesContext to other components, e.g. Schema instances. Among others, it sets the naming convention and the maximum identifier length.

Only one pipeline is active at a given time.

A pipeline created or attached with dlt.pipeline / dlt.attach is automatically activated. The run, load and extract methods also activate the pipeline.

deactivate

def deactivate() -> None

View source on GitHub

Deactivates the pipeline

The pipeline must be active in order to use this method. Please refer to the activate() method for an explanation of the active pipeline concept.
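
A brief sketch of switching the active pipeline; both pipeline names are illustrative.

import dlt

first = dlt.pipeline(pipeline_name="chess_pipeline")
second = dlt.pipeline(pipeline_name="ratings_pipeline")  # creating a pipeline activates it

first.activate()    # "chess_pipeline" becomes the active pipeline again
first.deactivate()  # no pipeline is active afterwards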

has_data

@property
def has_data() -> bool

View source on GitHub

Tells if the pipeline contains any data: schemas, extracted files, load packages or loaded packages in the destination

has_pending_data

@property
def has_pending_data() -> bool

View source on GitHub

Tells if the pipeline contains any pending packages to be normalized or loaded
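
A short sketch of using these properties to finish work left over from a previous run; the pipeline name and destination are placeholders.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")
if pipeline.has_pending_data:
    # normalize and load anything that was extracted but not yet loaded
    pipeline.normalize()
    pipeline.load()
print(pipeline.has_data)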

state

@property
def state() -> TPipelineState

View source on GitHub

Returns a dictionary with the pipeline state

naming

@property
def naming() -> NamingConvention

View source on GitHub

Returns naming convention of the default schema

last_trace

@property
def last_trace() -> PipelineTrace

View source on GitHub

Returns or loads the last trace generated by the pipeline. The trace is loaded from the standard location.
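
A sketch of inspecting these read-only properties after a run; the data and names are placeholders, and only printing the objects themselves is shown since their exact fields are not listed here.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")
pipeline.run([{"id": 1}], table_name="items")

print(pipeline.state)       # dictionary with the pipeline state
print(pipeline.naming)      # naming convention of the default schema
print(pipeline.last_trace)  # trace of the most recent run, loaded from the standard location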

list_extracted_resources

@deprecated(
    "Please use list_extracted_load_packages instead. Flat extracted storage format got dropped"
    " in dlt 0.4.0",
    category=Dlt04DeprecationWarning,
)
def list_extracted_resources() -> Sequence[str]

View source on GitHub

Returns a list of all the files with extracted resources that will be normalized.

list_extracted_load_packages

def list_extracted_load_packages() -> Sequence[str]

View source on GitHub

Returns a list of ids of all load packages that are or will be normalized.

list_normalized_load_packages

def list_normalized_load_packages() -> Sequence[str]

View source on GitHub

Returns a list of ids of all load packages that are or will be loaded.

list_completed_load_packages

def list_completed_load_packages() -> Sequence[str]

View source on GitHub

Returns a list of all load package ids that are completely loaded
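
A sketch of listing packages at each stage; it assumes the pipeline has already extracted or loaded something.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")
print(pipeline.list_extracted_load_packages())   # waiting to be normalized
print(pipeline.list_normalized_load_packages())  # waiting to be loaded
print(pipeline.list_completed_load_packages())   # fully loaded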

get_load_package_info

def get_load_package_info(load_id: str) -> LoadPackageInfo

View source on GitHub

Returns information on extracted/normalized/completed package with given load_id, all jobs and their statuses.

get_load_package_state

def get_load_package_state(load_id: str) -> TLoadPackageState

View source on GitHub

Returns the load package state for the extracted/normalized/completed package with the given load_id.

list_failed_jobs_in_package

def list_failed_jobs_in_package(load_id: str) -> Sequence[LoadJobInfo]

View source on GitHub

List all failed jobs and associated error messages for a specified load_id

drop_pending_packages

def drop_pending_packages(with_partial_loads: bool = True) -> None

View source on GitHub

Deletes all extracted and normalized packages; by default this includes packages that are partially loaded.
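
A hedged sketch of inspecting pending packages and then cleaning them up; the load ids come from the pipeline itself rather than being hard-coded.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")
for load_id in pipeline.list_normalized_load_packages():
    print(pipeline.get_load_package_info(load_id))        # jobs and their statuses
    print(pipeline.list_failed_jobs_in_package(load_id))  # failed jobs with error messages

# drop everything that has not been fully loaded yet, including partial loads
pipeline.drop_pending_packages()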

sync_schema

@with_schemas_sync
def sync_schema(schema_name: str = None) -> TSchemaTables

View source on GitHub

Synchronizes the schema schema_name with the destination. If no name is provided, the default schema will be synchronized.
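
A minimal sketch of pushing schema changes to the destination, assuming the pipeline already has a default schema from a previous run; the names are placeholders.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
# creates or migrates destination tables for the default schema
tables = pipeline.sync_schema()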

set_local_state_val

def set_local_state_val(key: str, value: Any) -> None

View source on GitHub

Sets value in local state. Local state is not synchronized with destination.

get_local_state_val

def get_local_state_val(key: str) -> Any

View source on GitHub

Gets value from local state. Local state is not synchronized with destination.
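
A sketch of storing and reading a value in the pipeline's local, non-synchronized state; the key and value are arbitrary placeholders.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline")
# local state lives only in the working directory and is never sent to the destination
pipeline.set_local_state_val("last_checked_at", "2024-01-01T00:00:00Z")
print(pipeline.get_local_state_val("last_checked_at"))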

sql_client

@with_config_section(sections=(),
                     merge_func=ConfigSectionContext.prefer_existing)
def sql_client(schema_name: str = None) -> SqlClientBase[Any]

View source on GitHub

Returns a SQL client configured to query/change the destination and dataset that were used to load the data. Use the client within a with statement to manage opening and closing the connection to the destination:

with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT id, name, email FROM customers WHERE id = %s", 10
    ) as cursor:
        print(cursor.fetchall())

The client is authenticated and defaults all queries to the dataset_name used by the pipeline. You can provide an alternative schema_name, which will be used to normalize the dataset name.

destination_client

@with_config_section(sections=(),
                     merge_func=ConfigSectionContext.prefer_existing)
def destination_client(schema_name: str = None) -> JobClientBase

View source on GitHub

Gets the destination job client for the configured destination. Use the client within a with statement to manage opening and closing the connection to the destination:

with pipeline.destination_client() as client:
    client.drop_storage()  # removes storage which typically wipes all data in it

The client is authenticated. You can provide an alternative schema_name, which will be used to normalize the dataset name. If no schema name is provided and no default schema is present in the pipeline, an ad hoc schema will be created and discarded after use.

managed_state

@contextmanager
def managed_state(*, extract_state: bool = False) -> Iterator[TPipelineState]

View source on GitHub

Puts the pipeline state in managed mode, where yielded state changes will be persisted or fully rolled back on exception.

Makes the state available via StateInjectableContext.
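
A hedged sketch of using the context manager; the key read from the state dictionary is an assumption for illustration only.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline")
with pipeline.managed_state() as state:
    # read or modify the state dictionary; changes are persisted when the block
    # exits without an exception and rolled back otherwise
    print(state.get("pipeline_name"))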

dataset

def dataset(schema: Union[Schema, str, None] = None,
            dataset_type: TDatasetType = "auto") -> Any

View source on GitHub

Returns a dataset object for querying the destination data.

Arguments:

  • schema Union[Schema, str, None] - Schema name or Schema object to use. If None, uses the default schema if set.
  • dataset_type TDatasetType - Type of dataset interface to return. Defaults to 'auto' which will select ibis if available otherwise it will fallback to the standard dbapi interface.

Returns:

  • Any - A dataset object that supports querying the destination data.
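
A hedged sketch of reading loaded data back through the dataset object, assuming a players table was loaded earlier; the table-attribute access and fetchall call are assumptions, since the exact accessors depend on the selected dataset type.

import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
ds = pipeline.dataset()
# assumption: relation-style access by table name, then fetching the rows
print(ds.players.fetchall())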
