delex.execution package

Submodules

delex.execution.cost_estimation module

class delex.execution.cost_estimation.CostEstimator(table_a: DataFrame, table_b: DataFrame, nthreads: int)

Bases: object

a class for estimating the runtime, working set size, and selectivity of a blocking program

Attributes:
nthreads
table_a_count
table_b_count

Methods

compute_estimates(blocking_program)

compute the cost estimates for blocking_program

build_time

estimate_plan_cost

filter_time

search_time

selectivity

validate

working_set_size

build_time(obj: Node | Predicate) → float
compute_estimates(blocking_program: BlockingProgram)

compute the cost estimates for blocking_program

estimate_plan_cost(node: Node) → float
filter_time(obj: Node | Predicate) → float
property nthreads: int
search_time(obj: Node | Predicate) → float
selectivity(obj: Node | Predicate) → float
property table_a_count: int
property table_b_count: int
validate(blocking_program: BlockingProgram) → None
working_set_size(obj: Node | Predicate, for_search: bool = None, size: int = None) → float
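
Example (illustrative sketch). The listing above gives only signatures, so the following shows one plausible call pattern; the SparkSession variable spark, the input paths, and the pre-built blocking_program are assumptions, and the exact shape of the computed estimates is not documented here.

    # spark (SparkSession) and blocking_program are assumed to exist already
    from delex.execution.cost_estimation import CostEstimator

    table_a = spark.read.parquet("/data/table_a.parquet")  # hypothetical paths
    table_b = spark.read.parquet("/data/table_b.parquet")

    est = CostEstimator(table_a, table_b, nthreads=8)
    est.compute_estimates(blocking_program)  # derive cost estimates for the program

    # individual estimates can then be queried per Node or Predicate, e.g.
    # est.selectivity(node), est.search_time(node), est.working_set_size(node)
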
class delex.execution.cost_estimation.ScalingModel(res)

Bases: object

a linear model for scaling behavior

Methods

fit

predict

classmethod fit(size: ndarray, time: ndarray)
predict(size)
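
Example (illustrative sketch). The assumption that the classmethod fit returns a fitted ScalingModel instance is inferred from the signatures above rather than stated; the numbers are made up.

    import numpy as np
    from delex.execution.cost_estimation import ScalingModel

    sizes = np.array([1_000, 10_000, 100_000, 1_000_000], dtype=np.float64)
    times = np.array([0.02, 0.19, 2.1, 22.5])   # measured seconds (illustrative)

    model = ScalingModel.fit(sizes, times)      # assumed to return a fitted model
    model.predict(np.array([5_000_000.0]))      # extrapolate to a larger size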

delex.execution.dataframe_stream module

class delex.execution.dataframe_stream.DataFrameStream(stream: Iterable, schema: dict)

Bases: object

a stream of dataframes on which transformations can be applied; this replaces PySpark-based execution to avoid resource allocation issues

Attributes:
schema

Methods

apply(func, input_cols, out_name, out_dtype)

apply func with input_cols to this stream and append the result to the stream as out_name with data type out_dtype; returns a new DataFrameStream

drop(columns)

drop columns from this dataframe stream, return a new stream

from_arrow_iter(itr, schema)

create a DataFrameStream from an iterable of pyarrow RecordBatches and a pyarrow Schema

from_pandas_iter(itr, schema)

create a DataFrameStream from an iterable of pd.DataFrames and a pyspark StructType schema

pyarrow_schema([flat])

return schema as a pyarrow schema

spark_schema([flat])

return schema as a pyspark schema

to_arrow_stream()

convert this dataframe stream into an iterator of pyarrow RecordBatches

to_pandas_stream()

convert this dataframe stream into an iterator of pandas DataFrames

apply(func: Callable, input_cols: list[str | tuple], out_name: str, out_dtype: DataType)

apply func with input_cols to this stream and append the result to the stream as out_name with data type out_dtype; returns a new DataFrameStream

Parameters:
func : Callable

the function that will be executed over the stream

input_cols : list[str | tuple]

the list of input columns for func; nested tuples may be provided

out_name : str

the name of the output column to be added to the stream

out_dtype : T.DataType

the type returned by func

Returns:
DataFrameStream

a new dataframe stream

Raises:
KeyError

if any of input_cols cannot be resolved
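
Example (illustrative sketch). Construct a stream from pandas batches and append a derived column. The assumption that func receives the selected columns as pandas Series, one batch at a time, is mine; the actual calling convention may differ.

    import pandas as pd
    import pyspark.sql.types as T
    from delex.execution.dataframe_stream import DataFrameStream

    batches = iter([pd.DataFrame({"name": ["apple inc", "acme corp"]})])
    schema = T.StructType([T.StructField("name", T.StringType())])

    stream = DataFrameStream.from_pandas_iter(batches, schema)
    upper = stream.apply(
        func=lambda name: name.str.upper(),   # assumed per-batch Series input
        input_cols=["name"],
        out_name="name_upper",
        out_dtype=T.StringType(),
    )
    for pdf in upper.to_pandas_stream():
        print(pdf)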

drop(columns)

drop columns from this dataframe stream, return a new stream

Parameters:
columns : list[str | tuple]

the list of columns to be dropped; nested tuples may be provided

Returns:
DataFrameStream

a new dataframe stream with columns removed

Raises:
KeyError

if any of columns cannot be resolved

classmethod from_arrow_iter(itr: Iterable[RecordBatch], schema: Schema)

create a DataFrameStream from an iterable of pyarrow RecordBatches and a pyarrow Schema

classmethod from_pandas_iter(itr: Iterable[DataFrame], schema: StructType)

create a DataFrameStream from an iterable of pd.DataFrames and a pyspark StructType schema

pyarrow_schema(flat: bool = False) → Schema

return schema as a pyarrow schema

Parameters:
flat : bool = False

if True return the schema in flattened format

Returns:
pa.Schema
property schema: dict
spark_schema(flat: bool = False) → StructType

return schema as a pyspark schema

Parameters:
flat : bool = False

if True return the schema in flattened format

Returns:
T.StructType
to_arrow_stream() → Iterator[RecordBatch]

convert this dataframe stream into an iterator of pyarrow RecordBatches

to_pandas_stream() → Iterator[DataFrame]

convert this dataframe stream into an iterator of pandas DataFrames
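
Example (illustrative sketch). The pyarrow entry points above can be combined into a small round trip: build a stream from RecordBatches, drop a column, and read the result back; the column names and data are illustrative only.

    import pyarrow as pa
    from delex.execution.dataframe_stream import DataFrameStream

    schema = pa.schema([("_id", pa.int64()), ("name", pa.string())])
    batches = iter([
        pa.record_batch(
            [pa.array([1, 2]), pa.array(["apple inc", "acme corp"])],
            schema=schema,
        ),
    ])

    stream = DataFrameStream.from_arrow_iter(batches, schema)
    trimmed = stream.drop(["name"])          # a new stream without the name column

    print(trimmed.pyarrow_schema())
    for batch in trimmed.to_arrow_stream():
        print(batch.to_pydict())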

delex.execution.graph_executor module

class delex.execution.graph_executor.GraphExecutionStats(*, nodes: list[Node], sub_graph_stats: list[SubGraphExecutionStats], dot_graph: str)

Bases: BaseModel

execution statistics for an entire execution plan

Attributes:
build_time
exec_time
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

total_time
working_set_size

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

property build_time: float
dot_graph: str
property exec_time: float
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

nodes: list[Node]
sub_graph_stats: list[SubGraphExecutionStats]
property total_time: float
property working_set_size: float
class delex.execution.graph_executor.GraphExecutor(*, index_table: DataFrame, search_table: DataFrame, build_parallelism: Annotated[int, Gt(gt=0)] = 4, index_table_id_col: str = '_id', ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None = None, cost_est: CostEstimator | None = None)

Bases: BaseModel

a class for executing an execution graph over two dataframes

Attributes:
index_table_count
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

use_chunking
use_cost_estimation

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

execute(sink[, projection])

execute the graph sink over self.index_table and self.search_table, optionally projecting columns projection along with the output of executing sink

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

build_parallelism: Annotated[int, Gt(gt=0)]
cost_est: CostEstimator | None
execute(sink: Node, projection: list[str] | None = None)

execute the graph sink over self.index_table and self.search_table, optionally projecting columns projection along with the output of executing sink

Parameters:
sink : Node

the sink of the execution graph

projection : Optional[list[str]] = None

columns to be projected along with the output of sink

Raises:
ValueError

if sink is not a sink in the graph
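
Example (illustrative sketch). spark, the input paths, and the sink Node (for instance one produced by BlockingProgramOptimizer) are assumptions; what execute() returns, and whether ram_size_in_bytes is required for chunked execution, are not documented in this listing.

    from delex.execution.graph_executor import GraphExecutor

    # sink: an execution-graph Node built elsewhere (assumed)
    executor = GraphExecutor(
        index_table=spark.read.parquet("/data/table_a.parquet"),   # hypothetical paths
        search_table=spark.read.parquet("/data/table_b.parquet"),
        build_parallelism=4,
        index_table_id_col="_id",
        ram_size_in_bytes=32 * 2**30,   # presumably enables chunking (see use_chunking)
    )
    executor.execute(sink, projection=["_id"])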

index_table: DataFrame
property index_table_count: int
index_table_id_col: str
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Args:
self: The BaseModel instance.
context: The context.

ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None
search_table: DataFrame
property use_chunking: bool
property use_cost_estimation: bool
class delex.execution.graph_executor.PartitionExecutionStats(*, partitioner: DataFramePartitioner | None, part_num: int | None, build_time: float, exec_time: float, working_set_size: int)

Bases: BaseModel

execution statistics for a single partition

Attributes:
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

build_time: float
exec_time: float
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

part_num: int | None
partitioner: DataFramePartitioner | None
working_set_size: int
class delex.execution.graph_executor.SubGraphExecutionStats(*, nodes: list[Node], partition_stats: list[PartitionExecutionStats])

Bases: BaseModel

execution statistics for a subgraph

Attributes:
build_time
exec_time
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

total_time
working_set_size

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

property build_time: float
property exec_time: float
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

nodes: list[Node]
partition_stats: list[PartitionExecutionStats]
property total_time: float
property working_set_size: float

delex.execution.optimizer module

class delex.execution.optimizer.BlockingProgramOptimizer

Bases: object

a class for converting a BlockingProgram into an execution plan of Nodes, optionally applying optimizations

Methods

default_plan(blocking_program)

create a default execution plan for blocking_program

optimize(blocking_program[, cost_est])

create an optimized execution plan for blocking_program, optionally using cost_est. If cost_est is not supplied, the optimizer simply indexes the fewest predicates possible and generates a default plan using those nodes.

preprocess(blocking_program)

preprocess the blocking program by dropping any redundant rules or predicates.

default_plan(blocking_program: BlockingProgram) → Node

create a default execution plan for blocking_program

Parameters:
blocking_program : BlockingProgram

the blocking program that will be turned into an execution plan

Returns:
Node

the sink of the execution plan

Raises:
ValueError

if blocking_program cannot be turned into an efficient execution plan, i.e. it would require executing over the cross product of the tables

optimize(blocking_program: BlockingProgram, cost_est: CostEstimator | None = None)

create an optimized execution plan for blocking_program, optionally using cost_est. If cost_est is not supplied, the optimizer simply indexes the fewest predicates possible and generates a default plan using those nodes.

Parameters:
blocking_program : BlockingProgram

the blocking program that will be turned into an execution plan

cost_est : Optional[CostEstimator] = None

the cost estimator used for optimizing blocking_program

Returns:
Node

the sink of the execution plan

Raises:
ValueError

if blocking_program cannot be turned into an efficient execution plan, i.e. it would require executing over the cross product of the tables

preprocess(blocking_program: BlockingProgram) → BlockingProgram

preprocess the blocking program by dropping any redundant rules or predicates. That is, remove anything that doesn't affect the output of blocking_program
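
Example (illustrative sketch). blocking_program and cost_est are assumed to be constructed elsewhere; whether optimize() already applies preprocess() internally is not documented here, so the explicit call is only illustrative.

    from delex.execution.optimizer import BlockingProgramOptimizer

    # blocking_program and cost_est are assumed to exist already
    optimizer = BlockingProgramOptimizer()
    program = optimizer.preprocess(blocking_program)   # drop redundant rules/predicates

    # without cost estimates: index as few predicates as possible
    sink = optimizer.optimize(program)

    # with cost estimates: let the estimator guide plan selection
    sink = optimizer.optimize(program, cost_est=cost_est)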

delex.execution.partitioner module

class delex.execution.partitioner.DataFramePartitioner(column: str, nparts: Annotated[int, Gt(gt=0)])

Bases: object

A simple class for hash partitioning dataframes using the xxhash64 implementation in pyspark

Methods

filter_array(ids, arr, pnum)

filter an array column based on ids

get_partition(df, pnum)

get partition pnum of df

column: str
filter_array(ids: str | Column, arr: str | Column | None, pnum: int)

filter an array column based on ids

Parameters:
ids : str | pyspark.sql.Column

array<long> column used to partition the dataframe

arr : Optional[str | pyspark.sql.Column]

the array column that will be filtered and returned; if not provided, ids will be filtered and returned instead

pnum : int

the partition number

Returns:
pyspark.sql.Column

a column expression for the filtered array

Raises:
ValueError

if pnum < 0 or pnum >= self.nparts
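
Example (illustrative sketch). Keep only the ids that fall in partition 0 of a 4-way hash partitioning; df and its array<long> column candidate_ids are assumptions.

    import pyspark.sql.functions as F
    from delex.execution.partitioner import DataFramePartitioner

    # df is an existing pyspark DataFrame with an array<long> column (assumed)
    partitioner = DataFramePartitioner(column="_id", nparts=4)
    filtered = df.withColumn(
        "candidate_ids_p0",
        partitioner.filter_array(F.col("candidate_ids"), None, pnum=0),
    )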

get_partition(df: DataFrame, pnum: int) → DataFrame

get partition pnum of df

Parameters:
df : pyspark.sql.DataFrame

the dataframe to be partitioned

pnum : int

the partition number

Returns:
pyspark.sql.DataFrame

the partition of df

Raises:
ValueError

if pnum < 0 or pnum >= self.nparts

nparts: Annotated[int, Gt(gt=0)]
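
Example (illustrative sketch). Iterate over the hash partitions of a pyspark DataFrame df (assumed to exist, with a long-typed _id column); the column name and partition count are illustrative.

    from delex.execution.partitioner import DataFramePartitioner

    partitioner = DataFramePartitioner(column="_id", nparts=4)
    for pnum in range(partitioner.nparts):
        part = partitioner.get_partition(df, pnum)   # rows hashing to partition pnum
        print(pnum, part.count())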

delex.execution.plan_executor module

class delex.execution.plan_executor.PlanExecutionStats(*, optimize_time: float, cost_estimation_time: float, graph_exec_stats: GraphExecutionStats)

Bases: BaseModel

Attributes:
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

total_time

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

cost_estimation_time: float
graph_exec_stats: GraphExecutionStats
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

optimize_time: float
property total_time: float
class delex.execution.plan_executor.PlanExecutor(*, index_table: DataFrame, search_table: DataFrame, build_parallelism: Annotated[int, Gt(gt=0)] = 4, index_table_id_col: str = '_id', ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None = None, cost_est: CostEstimator | None = None, optimize: bool, estimate_cost: bool)

Bases: GraphExecutor

Attributes:
index_table_count
model_extra

Get extra fields set during validation.

model_fields_set

Returns the set of fields that have been explicitly set on this model instance.

use_chunking
use_cost_estimation

Methods

copy(*[, include, exclude, update, deep])

Returns a copy of the model.

execute(prog[, projection])

execute the blocking program prog over self.index_table and self.search_table, optionally projecting columns projection along with the output

model_construct([_fields_set])

Creates a new instance of the Model class with validated data.

model_copy(*[, update, deep])

!!! abstract "Usage Documentation"

model_dump(*[, mode, include, exclude, ...])

!!! abstract "Usage Documentation"

model_dump_json(*[, indent, include, ...])

!!! abstract "Usage Documentation"

model_json_schema([by_alias, ref_template, ...])

Generates a JSON schema for a model class.

model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

model_rebuild(*[, force, raise_errors, ...])

Try to rebuild the pydantic-core schema for the model.

model_validate(obj, *[, strict, ...])

Validate a pydantic model instance.

model_validate_json(json_data, *[, strict, ...])

!!! abstract "Usage Documentation"

model_validate_strings(obj, *[, strict, ...])

Validate the given object with string data against the Pydantic model.

construct

dict

from_orm

generate_plan

json

parse_file

parse_obj

parse_raw

schema

schema_json

update_forward_refs

validate

estimate_cost: bool
execute(prog, projection=None)

execute the blocking program prog over self.index_table and self.search_table, optionally projecting columns projection along with the output

Parameters:
prog : BlockingProgram

the blocking program to be planned and executed

projection : Optional[list[str]] = None

columns to be projected along with the output

Raises:
ValueError

if prog cannot be turned into an efficient execution plan

generate_plan(prog)
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Args:
self: The BaseModel instance.
context: The context.

optimize: bool
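
Example (illustrative sketch). PlanExecutor appears to accept a BlockingProgram directly, planning it via generate_plan before execution; spark, the input paths, and blocking_program are assumptions, and the comments on the optimize and estimate_cost flags are inferred from the attribute names rather than documented behavior.

    from delex.execution.plan_executor import PlanExecutor

    # spark (SparkSession) and blocking_program are assumed to exist already
    executor = PlanExecutor(
        index_table=spark.read.parquet("/data/table_a.parquet"),   # hypothetical paths
        search_table=spark.read.parquet("/data/table_b.parquet"),
        build_parallelism=4,
        optimize=True,        # presumably apply BlockingProgramOptimizer
        estimate_cost=True,   # presumably build/use a CostEstimator
    )
    executor.execute(blocking_program, projection=["_id"])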

Module contents