dlt.common.pipeline

_StepInfo Objects

class _StepInfo(NamedTuple)

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

StepInfo Objects

class StepInfo(SupportsHumanize, Generic[TStepMetricsCo])

metrics

Metrics per load id. If many sources with the same name were extracted, there will be more than one element in the list

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

started_at

@property
def started_at() -> datetime.datetime

Returns the earliest start date of all collected metrics

finished_at

@property
def finished_at() -> datetime.datetime

Returns the latest end date of all collected metrics
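
For example, the aggregated timings can be read off any step info returned by a pipeline method. A minimal sketch, assuming a local duckdb destination; the pipeline name, table name and data are placeholders:

```python
import dlt

# a minimal sketch; pipeline_name, destination and data are placeholders
pipeline = dlt.pipeline(pipeline_name="timing_demo", destination="duckdb")
info = pipeline.run([{"id": 1}], table_name="items")

# LoadInfo is a StepInfo, so the aggregated timestamps are available directly
print(info.started_at)   # earliest start date across collected metrics
print(info.finished_at)  # latest end date across collected metrics
```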

_ExtractInfo Objects

class _ExtractInfo(NamedTuple)

NamedTuple cannot be part of a derivation chain, so all fields must be re-declared here to use this class as a mixin later

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

ExtractInfo Objects

class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo)

A tuple holding information on extracted data items. Returned by the pipeline's extract method.

asdict

def asdict() -> DictStrAny

A dictionary representation of ExtractInfo that can be loaded with dlt
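
Since asdict() produces plain data, the step info can itself be loaded with dlt. A sketch; pipeline and table names are arbitrary placeholders:

```python
import dlt

# a sketch; names are arbitrary placeholders
pipeline = dlt.pipeline(pipeline_name="asdict_demo", destination="duckdb")
extract_info = pipeline.extract([{"id": 1}], table_name="items")

# the dictionary representation is plain data and can be loaded like any item
pipeline.run([extract_info.asdict()], table_name="_extract_info")
```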

_NormalizeInfo Objects

class _NormalizeInfo(NamedTuple)

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

NormalizeInfo Objects

class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo)

A tuple holding information on normalized data items. Returned by the pipeline's normalize method.

asdict

def asdict() -> DictStrAny

A dictionary representation of NormalizeInfo that can be loaded with dlt

_LoadInfo Objects

class _LoadInfo(NamedTuple)

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

LoadInfo Objects

class LoadInfo(StepInfo[LoadMetrics], _LoadInfo)

A tuple holding information on recently loaded packages. Returned by the pipeline's run and load methods.

asdict

def asdict() -> DictStrAny

A dictionary representation of LoadInfo that can be loaded with dlt

has_failed_jobs

@property
def has_failed_jobs() -> bool

Returns True if any of the load packages has a failed job.

raise_on_failed_jobs

def raise_on_failed_jobs() -> None

Raises DestinationHasFailedJobs exception if any of the load packages has a failed job.
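
A typical pattern is to check for failed jobs right after a run. A sketch, assuming a local duckdb destination; names and data are placeholders:

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="jobs_demo", destination="duckdb")
load_info = pipeline.run([{"id": 1}], table_name="items")

# inspect programmatically...
if load_info.has_failed_jobs:
    print(load_info.load_packages)

# ...or raise DestinationHasFailedJobs if anything failed
load_info.raise_on_failed_jobs()
```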

WithStepInfo Objects

class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo])

Implemented by classes that generate StepInfo with metrics and package infos

current_load_id

@property
def current_load_id() -> str

Returns the load id currently being processed

get_step_info

@abstractmethod
def get_step_info(pipeline: "SupportsPipeline") -> TStepInfo

Returns an instance of StepInfo with metrics and package infos

TPipelineLocalState Objects

class TPipelineLocalState(TypedDict)

first_run

Indicates a first run of the pipeline, where the run ends with successful loading of data

initial_cwd

Run directory at the time the pipeline was first instantiated; defaults to the cwd in the OSS run context

TPipelineState Objects

class TPipelineState(TVersionedState)

Schema for a pipeline state that is stored within the pipeline working directory

default_schema_name

Name of the first schema added to the pipeline; all resources without their own schema will be added to it

schema_names

All the schemas present within the pipeline working directory

TSourceState Objects

class TSourceState(TPipelineState)

sources

SupportsPipeline Objects

class SupportsPipeline(Protocol)

A protocol with core pipeline operations that lets high-level abstractions, i.e. sources, access pipeline methods and properties

pipeline_name

Name of the pipeline

default_schema_name

Name of the default schema

destination

The destination reference, which is a ModuleType; destination.__name__ returns the name string

dataset_name

Name of the dataset to which the data will be loaded

runtime_config

Configuration of runtime options such as logging level and format, and various tracing options

working_dir

A working directory of the pipeline

pipeline_salt

A configurable pipeline secret to be used as a salt or a seed for an encryption key

first_run

Indicates a first run of the pipeline, where the run ends with successful loading of the data

state

@property
def state() -> TPipelineState

Returns a dictionary with the current pipeline state

Returns:

  • TPipelineState - The current pipeline state

schemas

@property
def schemas() -> Mapping[str, Schema]

Returns all known schemas of the pipeline

Returns:

Mapping[str, Schema]: A mapping of schema names to their corresponding Schema objects
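
Both properties are readily available on a concrete Pipeline instance. A short sketch; names are placeholders, and state keys may be missing before the first successful run:

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="state_demo", destination="duckdb")
pipeline.run([{"id": 1}], table_name="items")

state = pipeline.state
print(state.get("default_schema_name"))  # e.g. "state_demo"
print(state.get("schema_names"))         # schemas in the working directory

for name, schema in pipeline.schemas.items():
    print(name, schema.version)
```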

get_local_state_val

def get_local_state_val(key: str) -> Any

Gets a value from the local state. The local state is not synchronized with the destination.

Arguments:

  • key str - The key to get the value from

Returns:

  • Any - The value from the local state
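
For example, reading one of the TPipelineLocalState keys documented above (a sketch; the pipeline name is a placeholder):

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="local_state_demo")

# "initial_cwd" is one of the TPipelineLocalState fields documented above
print(pipeline.get_local_state_val("initial_cwd"))
```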

destination_client

def destination_client(schema_name: str = None) -> JobClientBase

Gets the destination job client for the configured destination

Arguments:

  • schema_name str, optional - The name of the schema to get the client for. Defaults to None.

Returns:

  • JobClientBase - The destination job client
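
A sketch of obtaining the client. JobClientBase is a low-level interface; using it as a context manager is an assumption based on common dlt usage, and the names here are placeholders:

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="client_demo", destination="duckdb")
pipeline.run([{"id": 1}], table_name="items")

# assumed usage: job clients manage their connection as context managers
with pipeline.destination_client() as client:
    print(type(client).__name__)
```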

run

def run(data: Any = None,
        *,
        destination: TDestinationReferenceArg = None,
        dataset_name: str = None,
        credentials: Any = None,
        table_name: str = None,
        write_disposition: TWriteDispositionConfig = None,
        columns: Sequence[TColumnSchema] = None,
        primary_key: TColumnNames = None,
        schema: Schema = None,
        loader_file_format: TLoaderFileFormat = None,
        schema_contract: TSchemaContract = None) -> LoadInfo

Loads the data into the destination

Arguments:

  • data Any - The data to load. Can be a DltSource, DltResource or any iterable of data items.
  • destination TDestinationReferenceArg, optional - The destination to load the data to.
  • dataset_name str, optional - The name of the dataset to load the data to.
  • credentials Any, optional - The credentials to use to load the data.
  • table_name str, optional - The name of the table to load the data to.
  • write_disposition TWriteDispositionConfig, optional - The write disposition to use to load the data.
  • columns Sequence[TColumnSchema], optional - A list of column hints.
  • primary_key TColumnNames, optional - A list of column names to be used as primary key.
  • schema Schema, optional - A schema to use to load the data. Defaults to the schema configured in the pipeline.
  • loader_file_format TLoaderFileFormat, optional - The file format to use to load the data. Defaults to preferred loader file format of the destination.
  • schema_contract TSchemaContract, optional - The schema contract to use to load the data.

Returns:

  • LoadInfo - Information on the load operation
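
Putting the arguments together, a typical call looks like the sketch below; the destination, dataset, table and hint values are illustrative:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="run_demo",
    destination="duckdb",
    dataset_name="demo_data",
)

load_info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
    write_disposition="merge",
    primary_key="id",
)
print(load_info)
```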

PipelineContext Objects

@configspec
class PipelineContext(ContainerInjectableContext)

enable_activation_history

When True, references to activated pipelines will also be stored

pipeline

def pipeline() -> SupportsPipeline

Creates or returns the active pipeline

activate

def activate(pipeline: SupportsPipeline) -> None

Activates the given pipeline and deactivates the currently active one.

activation_history

def activation_history() -> List[SupportsPipeline]

Gets the list of pipelines that were activated
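
A sketch of reading the context through the dependency injection container. The Container import path is an assumption based on the current dlt layout, and activation_history() only records pipelines when enable_activation_history is set:

```python
import dlt
from dlt.common.configuration.container import Container  # assumed path
from dlt.common.pipeline import PipelineContext

p = dlt.pipeline(pipeline_name="ctx_demo")  # creating a pipeline activates it

ctx = Container()[PipelineContext]
assert ctx.pipeline() is p  # returns the currently active pipeline
print([pl.pipeline_name for pl in ctx.activation_history()])
```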

cls__init__

@classmethod
def cls__init__(
        deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None

Initializes the context with a function returning the Pipeline object, allowing creation on first use

current_pipeline

def current_pipeline() -> SupportsPipeline

Gets the active pipeline or None if not found

pipeline_state

def pipeline_state(
        container: Container,
        initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]

Gets the state from the context or from the active pipeline; if none is found, returns initial_default.

Injected state is "writable": it is injected by the Pipeline class and all changes to it will be persisted. State coming from the pipeline context or from initial_default is "read only": all changes to it will be discarded.

Returns a tuple (state, writable)
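
A sketch under the same assumption about the Container import path; with an active pipeline, its state is returned:

```python
import dlt
from dlt.common.configuration.container import Container  # assumed path
from dlt.common.pipeline import pipeline_state

dlt.pipeline(pipeline_name="state_fn_demo")  # activate a pipeline

state, writable = pipeline_state(Container())
print(writable, state.get("pipeline_name"))
```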

get_dlt_pipelines_dir

def get_dlt_pipelines_dir() -> str

Gets the default directory where pipelines' data will be stored:

  1. in the user's home directory: ~/.dlt/pipelines/
  2. if the current user is root: /var/dlt/pipelines
  3. if the current user has no home directory: /tmp/dlt/pipelines
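
The function is importable directly from this module:

```python
from dlt.common.pipeline import get_dlt_pipelines_dir

# e.g. /home/alice/.dlt/pipelines for a regular user
print(get_dlt_pipelines_dir())
```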

get_dlt_repos_dir

def get_dlt_repos_dir() -> str

Gets the default directory where command repositories will be stored

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.