dlt.common.pipeline
_StepInfo Objects
class _StepInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
StepInfo Objects
class StepInfo(SupportsHumanize, Generic[TStepMetricsCo])
metrics
Metrics per load id. If many sources with the same name were extracted, there will be more than one element in the list.
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
started_at
@property
def started_at() -> datetime.datetime
Returns the earliest start date of all collected metrics
finished_at
@property
def finished_at() -> datetime.datetime
Returns the latest end date of all collected metrics
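Example (a minimal sketch, assuming a local duckdb destination; pipeline and table names are illustrative). LoadInfo derives from StepInfo, so the aggregated timestamps are available on it:
import dlt

pipeline = dlt.pipeline(pipeline_name="timing_demo", destination="duckdb")
info = pipeline.run([{"id": 1}], table_name="items")

# started_at / finished_at aggregate over all collected metrics
print(info.started_at, info.finished_at)
print("took", info.finished_at - info.started_at)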
_ExtractInfo Objects
class _ExtractInfo(NamedTuple)
NamedTuple cannot be part of the derivation chain, so we must re-declare all fields to use it as a mixin later
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
ExtractInfo Objects
class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo)
A tuple holding information on extracted data items. Returned by the pipeline extract method.
asdict
def asdict() -> DictStrAny
A dictionary representation of ExtractInfo that can be loaded with dlt
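Example (a hedged sketch; the duckdb destination and data are placeholders):
import dlt

pipeline = dlt.pipeline(pipeline_name="extract_demo", destination="duckdb")
extract_info = pipeline.extract([{"id": 1}, {"id": 2}], table_name="items")

# asdict() yields a plain dict; inspect the available keys rather than
# assuming their names
d = extract_info.asdict()
print(list(d.keys()))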
_NormalizeInfo Objects
class _NormalizeInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
NormalizeInfo Objects
class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo)
A tuple holding information on normalized data items. Returned by the pipeline normalize method.
asdict
def asdict() -> DictStrAny
A dictionary representation of NormalizeInfo that can be loaded with dlt
_LoadInfo Objects
class _LoadInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
LoadInfo Objects
class LoadInfo(StepInfo[LoadMetrics], _LoadInfo)
A tuple holding the information on recently loaded packages. Returned by the pipeline run and load methods.
asdict
def asdict() -> DictStrAny
A dictionary representation of LoadInfo that can be loaded with dlt
has_failed_jobs
@property
def has_failed_jobs() -> bool
Returns True if any of the load packages has a failed job.
raise_on_failed_jobs
def raise_on_failed_jobs() -> None
Raises a DestinationHasFailedJobs exception if any of the load packages has a failed job.
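Example (a minimal sketch of guarding on failed jobs after a run; destination and data are placeholders):
import dlt

pipeline = dlt.pipeline(pipeline_name="guard_demo", destination="duckdb")
load_info = pipeline.run([{"id": 1}], table_name="items")

if load_info.has_failed_jobs:
    # raises DestinationHasFailedJobs with details on the failed package
    load_info.raise_on_failed_jobs()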
WithStepInfo Objects
class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo])
Implemented by classes that generate StepInfo with metrics and package infos
current_load_id
@property
def current_load_id() -> str
Returns the load id currently being processed
get_step_info
@abstractmethod
def get_step_info(pipeline: "SupportsPipeline") -> TStepInfo
Returns an instance of StepInfo with metrics and package infos
TPipelineLocalState Objects
class TPipelineLocalState(TypedDict)
first_run
Indicates a first run of the pipeline, where the run ends with successful loading of data
initial_cwd
Run dir when the pipeline was instantiated for the first time; defaults to cwd in the OSS run context
TPipelineState Objects
class TPipelineState(TVersionedState)
Schema for a pipeline state that is stored within the pipeline working directory
default_schema_name
Name of the first schema added to the pipeline to which all the resources without schemas will be added
schema_names
All the schemas present within the pipeline working directory
TSourceState Objects
class TSourceState(TPipelineState)
sources
SupportsPipeline Objects
class SupportsPipeline(Protocol)
A protocol with core pipeline operations that lets high-level abstractions, i.e. sources, access pipeline methods and properties
pipeline_name
Name of the pipeline
default_schema_name
Name of the default schema
destination
The destination reference, which is a ModuleType; destination.__name__ returns the name string
dataset_name
Name of the dataset to which the data will be loaded
runtime_config
A configuration of runtime options, like logging level and format, and various tracing options
working_dir
A working directory of the pipeline
pipeline_salt
A configurable pipeline secret to be used as a salt or a seed for an encryption key
first_run
Indicates a first run of the pipeline, where the run ends with successful loading of the data
state
@property
def state() -> TPipelineState
Returns a dictionary with the current pipeline state
Returns:
TPipelineState
- The current pipeline state
schemas
@property
def schemas() -> Mapping[str, Schema]
Returns all known schemas of the pipeline
Returns:
Mapping[str, Schema]: A mapping of schema names to their corresponding Schema objects
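Example (a small sketch inspecting both state and schemas on a concrete Pipeline, which implements this protocol; names are illustrative):
import dlt

pipeline = dlt.pipeline(pipeline_name="inspect_demo", destination="duckdb")
pipeline.run([{"id": 1}], table_name="items")

print(pipeline.state["schema_names"])  # e.g. ['inspect_demo']
for name, schema in pipeline.schemas.items():
    print(name, list(schema.tables))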
get_local_state_val
def get_local_state_val(key: str) -> Any
Gets a value from the local state. Local state is not synchronized with the destination.
Arguments:
key
str - The key to get the value from
Returns:
Any
- The value from the local state
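Example (a one-line sketch; initial_cwd is one of the local-state keys documented above):
print(pipeline.get_local_state_val("initial_cwd"))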
destination_client
def destination_client(schema_name: str = None) -> JobClientBase
Get the destination job client for the configured destination
Arguments:
schema_name
str, optional - The name of the schema to get the client for. Defaults to None.
Returns:
JobClientBase
- The destination job client
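Example (a hedged sketch; using the client as a context manager mirrors dlt's own usage, but treat the details as assumptions):
# job client for the default schema; pass schema_name to pick another one
with pipeline.destination_client() as client:
    print(type(client).__name__)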
run
def run(data: Any = None,
*,
destination: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: Sequence[TColumnSchema] = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None) -> LoadInfo
Loads the data into the destination
Arguments:
data
Any - The data to load. Can be a DltSource, DltResource or any iterable of data items.
destination
TDestinationReferenceArg, optional - The destination to load the data to.
dataset_name
str, optional - The name of the dataset to load the data to.
credentials
Any, optional - The credentials to use to load the data.
table_name
str, optional - The name of the table to load the data to.
write_disposition
TWriteDispositionConfig, optional - The write disposition to use to load the data.
columns
Sequence[TColumnSchema], optional - A list of column hints.
primary_key
TColumnNames, optional - A list of column names to be used as primary key.
schema
Schema, optional - A schema to use to load the data. Defaults to the schema configured in the pipeline.
loader_file_format
TLoaderFileFormat, optional - The file format to use to load the data. Defaults to the preferred loader file format of the destination.
schema_contract
TSchemaContract, optional - The schema contract to use to load the data.
Returns:
LoadInfo
- Information on the load operation
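Example (an end-to-end sketch; the duckdb destination, dataset and table names are illustrative):
import dlt

pipeline = dlt.pipeline(
    pipeline_name="run_demo",
    destination="duckdb",
    dataset_name="demo_data",
)
load_info = pipeline.run(
    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    table_name="items",
    write_disposition="replace",
    primary_key="id",
)
print(load_info)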
PipelineContext Objects
@configspec
class PipelineContext(ContainerInjectableContext)
enable_activation_history
When True, references to activated pipelines will also be stored
pipeline
def pipeline() -> SupportsPipeline
Creates or returns an active pipeline
activate
def activate(pipeline: SupportsPipeline) -> None
Activates the given pipeline and deactivates the active one.
activation_history
def activation_history() -> List[SupportsPipeline]
Gets the list of pipelines that were activated
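Example (a hedged sketch of reaching the context through dlt's dependency-injection Container; this mirrors how dlt resolves the active pipeline internally, so treat the access path as an assumption):
from dlt.common.configuration.container import Container
from dlt.common.pipeline import PipelineContext

ctx = Container()[PipelineContext]
active = ctx.pipeline()  # creates or returns the active pipeline
print(active.pipeline_name)
# empty unless enable_activation_history was set to True
print([p.pipeline_name for p in ctx.activation_history()])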
cls__init__
@classmethod
def cls__init__(
deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None
Initialize the context with a function returning the Pipeline object to allow creation on first use
current_pipeline
def current_pipeline() -> SupportsPipeline
Gets the active pipeline context, or None if not found
pipeline_state
def pipeline_state(
container: Container,
initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]
Gets the value of the state from the context or the active pipeline; if none is found, returns initial_default.
Injected state is called "writable": it is injected by the Pipeline class and all changes to it will be persisted.
State coming from the pipeline context or initial_default is called "read only"; all changes to it will be discarded.
Returns a tuple (state, writable).
get_dlt_pipelines_dir
def get_dlt_pipelines_dir() -> str
Gets the default directory where pipelines' data will be stored:
- in the user home directory: ~/.dlt/pipelines/
- if the current user is root: in /var/dlt/pipelines
- if the current user does not have a home directory: in /tmp/dlt/pipelines
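Example:
from dlt.common.pipeline import get_dlt_pipelines_dir

print(get_dlt_pipelines_dir())  # e.g. /home/user/.dlt/pipelines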
get_dlt_repos_dir
def get_dlt_repos_dir() -> str
Gets the default directory where command repositories will be stored