delex.execution package
Submodules
delex.execution.cost_estimation module
- class delex.execution.cost_estimation.CostEstimator(table_a: DataFrame, table_b: DataFrame, nthreads: int)
Bases: object
a class for estimating the runtime, working set size, and selectivity of a blocking program
- Attributes:
- nthreads
- table_a_count
- table_b_count
Methods
compute_estimates(blocking_program): compute the cost estimates for blocking_program
build_time
estimate_plan_cost
filter_time
search_time
selectivity
validate
working_set_size
- compute_estimates(blocking_program: BlockingProgram)
compute the cost estimates for blocking_program
- property nthreads: int
- property table_a_count: int
- property table_b_count: int
- validate(blocking_program: BlockingProgram) → None
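A minimal usage sketch follows. Only the constructor and method signatures documented above are taken from the API; the import path for BlockingProgram and the shape of the returned estimates are assumptions.

```python
from pyspark.sql import SparkSession

from delex.execution.cost_estimation import CostEstimator
from delex.lang import BlockingProgram  # hypothetical import path

spark = SparkSession.builder.getOrCreate()
table_a = spark.read.parquet("table_a.parquet")  # table to be indexed
table_b = spark.read.parquet("table_b.parquet")  # table to search with

est = CostEstimator(table_a=table_a, table_b=table_b, nthreads=8)

prog: BlockingProgram = ...  # built elsewhere

est.validate(prog)                   # presumably raises if prog cannot be estimated
costs = est.compute_estimates(prog)  # runtime / working-set / selectivity estimates
```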
delex.execution.dataframe_stream module
- class delex.execution.dataframe_stream.DataFrameStream(stream: Iterable, schema: dict)
Bases: object
a stream of dataframes to which transformations can be applied. This replaces PySpark-based execution to work around resource allocation issues.
- Attributes:
- schema
Methods
apply(func, input_cols, out_name, out_dtype): apply func with input_cols to this stream and append the result as out_name with data type out_dtype, returning a new DataFrameStream
drop(columns): drop columns from this dataframe stream, returning a new stream
from_arrow_iter(itr, schema): create a DataFrameStream from an iterable of pyarrow RecordBatches and a pyarrow Schema
from_pandas_iter(itr, schema): create a DataFrameStream from an iterable of pd.DataFrames and a pyspark schema
pyarrow_schema([flat]): return the schema as a pyarrow schema
spark_schema([flat]): return the schema as a pyspark schema
to_arrow_stream(): convert this dataframe stream into an iterator of pyarrow RecordBatches
to_pandas_stream(): convert this dataframe stream into an iterator of pandas DataFrames
- apply(func: Callable, input_cols: list[str | tuple], out_name: str, out_dtype: DataType)
apply func with input_cols to this stream and append the result as out_name with data type out_dtype, returning a new DataFrameStream
- Parameters:
- func : Callable
the function that will be executed over the stream
- input_cols : list[str | tuple]
the list of input columns for func; nested tuples may be provided
- out_name : str
the name of the output column to be added to the stream
- out_dtype : T.DataType
the type returned by func
- Returns:
- DataFrameStream
a new dataframe stream
- Raises:
- KeyError
if any of input_cols cannot be resolved
- drop(columns)
drop columns from this dataframe stream, returning a new stream
- Parameters:
- columns : list[str | tuple]
the columns to be dropped from the stream
- Returns:
- DataFrameStream
a new dataframe stream with columns removed
- Raises:
- KeyError
if any of columns cannot be resolved
- classmethod from_arrow_iter(itr: Iterable[RecordBatch], schema: Schema)
create a DataFrameStream from an iterable of pyarrow RecordBatches and a pyarrow Schema
- classmethod from_pandas_iter(itr: Iterable[DataFrame], schema: StructType)
create a DataFrameStream from an iterable of pd.DataFrames and a pyspark Schema
- pyarrow_schema(flat: bool = False) → Schema
return schema as a pyarrow schema
- Parameters:
- flat : bool = False
if True, return the schema in flattened format
- Returns:
- pa.Schema
- property schema: dict
- spark_schema(flat: bool = False) → StructType
return schema as a pyspark schema
- Parameters:
- flat : bool = False
if True, return the schema in flattened format
- Returns:
- T.StructType
- to_arrow_stream() → Iterator[RecordBatch]
convert this dataframe stream into an iterator of pyarrow RecordBatches
- to_pandas_stream() → Iterator[DataFrame]
convert this dataframe stream into an iterator of pandas DataFrames
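The round trip below is a sketch of the intended usage: pandas chunks in, a transformed pandas stream out. The signatures match the docs above, but whether func receives pandas Series (vectorized, as assumed here) or row scalars is not specified, and the sample data is illustrative.

```python
import pandas as pd
import pyspark.sql.types as T

from delex.execution.dataframe_stream import DataFrameStream

# pyspark schema describing each incoming pandas chunk
schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("age", T.LongType()),
])

def chunks():
    yield pd.DataFrame({"name": ["ann", "bob"], "age": [31, 44]})
    yield pd.DataFrame({"name": ["cat"], "age": [27]})

stream = DataFrameStream.from_pandas_iter(chunks(), schema)

# append an upper-cased name column; assumes func receives a pandas Series
stream = stream.apply(
    func=lambda name: name.str.upper(),
    input_cols=["name"],
    out_name="name_upper",
    out_dtype=T.StringType(),
)

for df in stream.to_pandas_stream():
    print(df)
```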
delex.execution.graph_executor module
- class delex.execution.graph_executor.GraphExecutionStats(*, nodes: list[Node], sub_graph_stats: list[SubGraphExecutionStats], dot_graph: str)
Bases: BaseModel
execution statistics for an entire execution plan
- Attributes:
- build_time
- exec_time
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
- total_time
- working_set_size
Methods
All public methods are inherited pydantic BaseModel methods: copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, plus the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, and validate.
- property build_time: float
- dot_graph: str
- property exec_time: float
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- sub_graph_stats: list[SubGraphExecutionStats]
- property total_time: float
- property working_set_size: float
- class delex.execution.graph_executor.GraphExecutor(*, index_table: DataFrame, search_table: DataFrame, build_parallelism: Annotated[int, Gt(gt=0)] = 4, index_table_id_col: str = '_id', ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None = None, cost_est: CostEstimator | None = None)
Bases: BaseModel
a class for executing an execution graph over two dataframes
- Attributes:
- index_table_count
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
- use_chunking
- use_cost_estimation
Methods
execute(sink[, projection]): execute the graph sink over self.index_table and self.search_table, optionally projecting columns projection along with the output of executing sink
The remaining methods (copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, and the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, validate) are standard pydantic BaseModel methods.
- build_parallelism: Annotated[int, Gt(gt=0)]
- cost_est: CostEstimator | None
- execute(sink: Node, projection: list[str] | None = None)
execute the graph sink over self.index_table and self.search_table, optionally projecting columns projection along with the output of executing sink
- Parameters:
- sink : Node
the sink of the execution graph
- projection : Optional[list[str]] = None
columns to be projected along with the output of sink
- Raises:
- ValueError
if sink is not a sink in the graph
- index_table: DataFrame
- property index_table_count: int
- index_table_id_col: str
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- model_post_init(context: Any, /) → None
pydantic hook for initialising private attributes after __init__ and model_construct; context is supplied by pydantic-core.
- ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None
- search_table: DataFrame
- property use_chunking: bool
- property use_cost_estimation: bool
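A sketch of driving a GraphExecutor directly is shown below. The constructor keywords match the signature above; the plan sink comes from BlockingProgramOptimizer (documented further down), and since the return value of execute is not documented, the result handling here is an assumption.

```python
from pyspark.sql import SparkSession

from delex.execution.cost_estimation import CostEstimator
from delex.execution.graph_executor import GraphExecutor
from delex.execution.optimizer import BlockingProgramOptimizer

spark = SparkSession.builder.getOrCreate()
index_table = spark.read.parquet("table_a.parquet")
search_table = spark.read.parquet("table_b.parquet")

cost_est = CostEstimator(table_a=index_table, table_b=search_table, nthreads=8)

prog = ...  # a BlockingProgram built elsewhere
sink = BlockingProgramOptimizer().optimize(prog, cost_est)

executor = GraphExecutor(
    index_table=index_table,
    search_table=search_table,
    build_parallelism=4,
    index_table_id_col="_id",
    ram_size_in_bytes=32 * 2**30,  # bounds the working set, enables chunking
    cost_est=cost_est,
)
# assumed to yield the candidate pairs produced by the graph
result = executor.execute(sink, projection=["_id"])
```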
- class delex.execution.graph_executor.PartitionExecutionStats(*, partitioner: DataFramePartitioner | None, part_num: int | None, build_time: float, exec_time: float, working_set_size: int)
Bases: BaseModel
execution statistics for a single partition
- Attributes:
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
Methods
All public methods are inherited pydantic BaseModel methods: copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, plus the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, and validate.
- build_time: float
- exec_time: float
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- part_num: int | None
- partitioner: DataFramePartitioner | None
- working_set_size: int
- class delex.execution.graph_executor.SubGraphExecutionStats(*, nodes: list[Node], partition_stats: list[PartitionExecutionStats])
Bases: BaseModel
execution statistics for a subgraph
- Attributes:
- build_time
- exec_time
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
- total_time
- working_set_size
Methods
All public methods are inherited pydantic BaseModel methods: copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, plus the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, and validate.
- property build_time: float
- property exec_time: float
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- partition_stats: list[PartitionExecutionStats]
- property total_time: float
- property working_set_size: float
delex.execution.optimizer module
- class delex.execution.optimizer.BlockingProgramOptimizer
Bases: object
a class for converting a BlockingProgram into an execution plan of Nodes, optionally applying optimizations
Methods
default_plan(blocking_program): create a default execution plan for blocking_program
optimize(blocking_program[, cost_est]): create an optimized execution plan for blocking_program, optionally using cost_est
preprocess(blocking_program): preprocess the blocking program by dropping any redundant rules or predicates
- default_plan(blocking_program: BlockingProgram) → Node
create a default execution plan for blocking_program
- Parameters:
- blocking_program : BlockingProgram
the blocking program that will be turned into an execution plan
- Returns:
- Node
the sink of the execution plan
- Raises:
- ValueError
if blocking_program cannot be turned into an efficient execution plan, i.e. it would require executing over the cross product of the tables
- optimize(blocking_program: BlockingProgram, cost_est: CostEstimator | None = None)
create an optimized execution plan for blocking_program, optionally using cost_est. If cost_est is not supplied, the optimizer simply indexes the fewest predicates possible and generates a default plan using those nodes.
- Parameters:
- blocking_program : BlockingProgram
the blocking program that will be turned into an execution plan
- cost_est : Optional[CostEstimator] = None
the cost estimator used for optimizing blocking_program
- Returns:
- Node
the sink of the execution plan
- Raises:
- ValueError
if blocking_program cannot be turned into an efficient execution plan, i.e. it would require executing over the cross product of the tables
- preprocess(blocking_program: BlockingProgram) → BlockingProgram
preprocess the blocking program by dropping any redundant rules or predicates, i.e. remove anything that doesn't affect the output of blocking_program
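The sketch below contrasts the three entry points. prog stands in for a BlockingProgram built elsewhere; everything else follows the signatures above.

```python
from delex.execution.optimizer import BlockingProgramOptimizer

opt = BlockingProgramOptimizer()
prog = ...  # a BlockingProgram built elsewhere

# drop rules and predicates that cannot affect the output
prog = opt.preprocess(prog)

# default plan: no cost model consulted
sink = opt.default_plan(prog)

# cost-based plan; with cost_est=None the optimizer indexes the
# fewest predicates possible and builds a default plan from them
sink = opt.optimize(prog, cost_est=None)
```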
delex.execution.partitioner module
- class delex.execution.partitioner.DataFramePartitioner(column: str, nparts: Annotated[int, Gt(gt=0)])
Bases: object
A simple class for hash partitioning dataframes using the xxhash64 implementation in pyspark
Methods
filter_array(ids, arr, pnum): filter an array column based on ids
get_partition(df, pnum): get partition pnum of df
- column: str
- filter_array(ids: str | Column, arr: str | Column | None, pnum: int)
filter an array column based on ids
- Parameters:
- ids : str | pyspark.sql.Column
array<long> column used to partition the dataframe
- arr : Optional[str | pyspark.sql.Column]
the array column to be filtered and returned; if not provided, ids will be filtered and returned
- pnum : int
the partition number
- Returns:
- pyspark.sql.Column
a column expression for the filtered array
- Raises:
- ValueError
if pnum < 0 or pnum >= self.nparts
- get_partition(df: DataFrame, pnum: int) → DataFrame
get partition pnum of df
- Parameters:
- df : pyspark.sql.DataFrame
the dataframe to be partitioned
- pnum : int
the partition number
- Returns:
- pyspark.sql.DataFrame
the partition of df
- Raises:
- ValueError
if pnum < 0 or pnum >= self.nparts
- nparts: Annotated[int, Gt(gt=0)]
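A sketch of partition-at-a-time processing follows. The parquet path and the per-partition work are placeholders; the constructor and get_partition follow the signatures above.

```python
from pyspark.sql import SparkSession

from delex.execution.partitioner import DataFramePartitioner

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("table_a.parquet")

# hash-partition rows on the _id column into 4 parts via xxhash64
part = DataFramePartitioner("_id", 4)

# process one partition at a time to bound memory use
for pnum in range(part.nparts):
    chunk = part.get_partition(df, pnum)
    chunk.count()  # placeholder for real per-partition work
```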
delex.execution.plan_executor module
- class delex.execution.plan_executor.PlanExecutionStats(*, optimize_time: float, cost_estimation_time: float, graph_exec_stats: GraphExecutionStats)
Bases: BaseModel
execution statistics for executing a full plan, including optimization and cost estimation time
- Attributes:
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
- total_time
Methods
All public methods are inherited pydantic BaseModel methods: copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, plus the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, and validate.
- cost_estimation_time: float
- graph_exec_stats: GraphExecutionStats
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- optimize_time: float
- property total_time: float
- class delex.execution.plan_executor.PlanExecutor(*, index_table: DataFrame, search_table: DataFrame, build_parallelism: Annotated[int, Gt(gt=0)] = 4, index_table_id_col: str = '_id', ram_size_in_bytes: Annotated[int, Gt(gt=0)] | None = None, cost_est: CostEstimator | None = None, optimize: bool, estimate_cost: bool)
Bases: GraphExecutor
- Attributes:
- index_table_count
- model_extra
Get extra fields set during validation.
- model_fields_set
Returns the set of fields that have been explicitly set on this model instance.
- use_chunking
- use_cost_estimation
Methods
execute(prog[, projection]): plan and execute the blocking program prog over self.index_table and self.search_table, optionally projecting columns projection along with the output
generate_plan(prog): generate an execution plan for the blocking program prog
The remaining methods (copy, model_construct, model_copy, model_dump, model_dump_json, model_json_schema, model_parametrized_name, model_post_init, model_rebuild, model_validate, model_validate_json, model_validate_strings, and the deprecated construct, dict, from_orm, json, parse_file, parse_obj, parse_raw, schema, schema_json, update_forward_refs, validate) are standard pydantic BaseModel methods.
- estimate_cost: bool
- execute(prog, projection=None)
plan and execute the blocking program prog over self.index_table and self.search_table, optionally projecting columns projection along with the output
- Parameters:
- prog : BlockingProgram
the blocking program to be planned and executed
- projection : Optional[list[str]] = None
columns to be projected along with the output
- Raises:
- ValueError
if the generated plan's sink is not a sink in the graph
- generate_plan(prog)
generate an execution plan for the blocking program prog
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, conforming to pydantic's ConfigDict.
- model_post_init(context: Any, /) → None
pydantic hook for initialising private attributes after __init__ and model_construct; context is supplied by pydantic-core.
- optimize: bool
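An end-to-end sketch closes the section. PlanExecutor takes keyword-only arguments per the signature above; what execute returns (candidate pairs, a PlanExecutionStats, or both) is not documented, so the assignment below is a placeholder.

```python
from pyspark.sql import SparkSession

from delex.execution.plan_executor import PlanExecutor

spark = SparkSession.builder.getOrCreate()
index_table = spark.read.parquet("table_a.parquet")
search_table = spark.read.parquet("table_b.parquet")

prog = ...  # a BlockingProgram built elsewhere

executor = PlanExecutor(
    index_table=index_table,
    search_table=search_table,
    optimize=True,       # run BlockingProgramOptimizer over prog
    estimate_cost=True,  # use cost estimates when picking the plan
)

# plans prog (see generate_plan) and executes the resulting graph
result = executor.execute(prog, projection=["_id"])
```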