sparkly.index package

Submodules

sparkly.index.index_base module

class sparkly.index.index_base.Index

Bases: ABC

Attributes:
config

Methods

delete_docs

search

search_many

upsert_docs

abstract property config
abstractmethod delete_docs(ids) → int
abstractmethod search(doc, query_spec, limit)
abstractmethod search_many(docs, query_spec, limit)
abstractmethod upsert_docs(df) → None
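
Any concrete index must provide the members listed above. The toy subclass below is purely illustrative (it is not part of sparkly); LuceneIndex, documented further down this page, is the shipped implementation.

    import pandas as pd

    from sparkly.index.index_base import Index


    class InMemoryIndex(Index):
        """Hypothetical toy index, shown only to illustrate the abstract interface."""

        def __init__(self, config):
            self._config = config
            self._docs = pd.DataFrame()

        @property
        def config(self):
            # the config used to build this index
            return self._config

        def upsert_docs(self, df) -> None:
            # a real implementation would update rows with matching ids; this toy just appends
            self._docs = pd.concat([self._docs, df])

        def delete_docs(self, ids) -> int:
            # drop the given ids and report how many rows were removed
            before = len(self._docs)
            self._docs = self._docs.drop(index=ids, errors='ignore')
            return before - len(self._docs)

        def search(self, doc, query_spec, limit):
            raise NotImplementedError  # scoring logic is out of scope for this sketch

        def search_many(self, docs, query_spec, limit):
            raise NotImplementedError
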
class sparkly.index.index_base.QueryResult(ids, scores, search_time)

Bases: tuple

Attributes:
ids

Alias for field number 0

scores

Alias for field number 1

search_time

Alias for field number 2

Methods

count(value, /)

Return number of occurrences of value.

index(value[, start, stop])

Return first index of value.

ids

Alias for field number 0

scores

Alias for field number 1

search_time

Alias for field number 2
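
QueryResult behaves like a namedtuple: the three fields can be read by name or unpacked positionally. A small sketch with invented values (in practice a QueryResult is returned by the search methods):

    from sparkly.index.index_base import QueryResult

    # invented values for illustration only
    res = QueryResult(ids=[101, 42, 7], scores=[9.3, 7.1, 2.4], search_time=0.012)

    print(res.ids[0], res.scores[0])   # best-scoring id and its score
    print(res.search_time)             # time spent on the search

    # plain tuple behaviour still applies
    ids, scores, search_time = res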

sparkly.index.index_config module

sparkly.index.lucene_index module

class sparkly.index.lucene_index.LuceneIndex(index_path: Path | str, config: IndexConfig, delete_if_exists: bool = True)

Bases: Index

Attributes:
config

the index config used to build this index

index_path
is_built

True if this index has been built else False

is_on_spark

True if this index has been distributed to the spark workers else False

query_gen

the query generator for this index

Methods

deinit()

release resources held by this Index

get_full_query_spec([cross_fields])

get a query spec that uses all indexed columns

init()

initialize the index for usage in a spark worker.

num_indexed_docs()

get the number of indexed documents

search(doc, query_spec, limit)

perform a search for doc according to query_spec, returning at most limit docs

search_many(docs, query_spec, limit)

perform a search for the documents in docs according to query_spec, returning at most limit docs per document in docs.

to_spark()

send this index to the spark cluster.

upsert_docs(df[, disable_distributed, ...])

build the index, indexing df according to self.config

delete_docs

id_to_lucene_id

score_docs

ANALYZERS = {'2gram': <class 'sparkly.analysis.Gram2Analyzer'>, '3gram': <class 'sparkly.analysis.Gram3Analyzer'>, '4gram': <class 'sparkly.analysis.Gram4Analyzer'>, 'shingle': <function get_shingle_analyzer>, 'standard': <function get_standard_analyzer_no_stop_words>, 'standard36edgegram': <class 'sparkly.analysis.StandardEdgeGram36Analyzer'>, 'standard_stopwords': <class 'org.apache.lucene.analysis.standard.StandardAnalyzer'>, 'stripped_3gram': <class 'sparkly.analysis.StrippedGram3Analyzer'>, 'unfiltered_3gram': <class 'sparkly.analysis.UnfilteredGram3Analyzer'>, 'unfiltered_5gram': <class 'sparkly.analysis.UnfilteredGram5Analyzer'>}
LUCENE_DIR = 'LUCENE_INDEX'
PY_META_FILE = 'PY_META.json'
property config

the index config used to build this index

Returns:
IndexConfig
deinit()

release resources held by this Index

delete_docs(ids)
get_full_query_spec(cross_fields: bool = False)

get a query spec that uses all indexed columns

Parameters:
cross_fields : bool, default = False

if True, return <FIELD> -> <CONCAT_FIELD> pairs in the query spec when FIELD is used to create CONCAT_FIELD; otherwise return only <FIELD> -> <FIELD> and <CONCAT_FIELD> -> <CONCAT_FIELD> pairs

Returns:
QuerySpec
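
For example, assuming index is an already-built LuceneIndex (see upsert_docs below), both forms can be requested:

    spec = index.get_full_query_spec()                    # <FIELD> -> <FIELD> pairs only
    cross = index.get_full_query_spec(cross_fields=True)  # also maps fields into their concat fields
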
id_to_lucene_id(i)
property index_path
init()

initialize the index for usage in a spark worker. This method must be called before calling search or search_many.

property is_built

True if this index has been built else False

Returns:
bool
property is_on_spark

True if this index has been distributed to the spark workers else False

Returns:
bool
num_indexed_docs()

get the number of indexed documents

property query_gen

the query generator for this index

Returns:
LuceneQueryGenerator
score_docs(ids, queries: dict)
search(doc: Series | dict, query_spec: QuerySpec, limit: Annotated[int, Gt(gt=0)])

perform a search for doc according to query_spec, returning at most limit docs

Parameters:
doc : pd.Series or dict

the record for searching

query_spec : QuerySpec

the query template that specifies how to search for doc

limit : int

the maximum number of documents returned

Returns:
QueryResult

the documents matching the doc
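
A usage sketch with an invented record; note that init() must be called before search, as documented above:

    index.init()
    spec = index.get_full_query_spec()
    record = {'name': 'apple iphone 12', 'brand': 'apple'}   # invented fields
    result = index.search(record, spec, limit=10)
    print(result.ids, result.scores, result.search_time)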

search_many(docs: DataFrame, query_spec: QuerySpec, limit: Annotated[int, Gt(gt=0)])

perform a search for the documents in docs according to query_spec, returning at most limit docs per document in docs.

Parameters:
docs : pd.DataFrame

the records for searching

query_spec : QuerySpec

the query template that specifies how to search for each document in docs

limit : int

the maximum number of documents returned per document in docs

Returns:
pd.DataFrame

the search results for each document in docs, indexed by docs.index
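
A batch-search sketch, assuming docs is a pandas DataFrame of query records (column names and values are invented):

    import pandas as pd

    docs = pd.DataFrame({'name': ['apple iphone 12', 'galaxy s21']})  # invented data
    index.init()
    results = index.search_many(docs, spec, limit=10)
    print(results.loc[0])   # search results for the first query record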

to_spark()

send this index to the spark cluster. Subsequent uses will read files from SparkFiles, allowing spark workers to perform search with a local copy of the index.
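
One plausible pattern for distributed search is sketched below; only to_spark, init and search_many are documented sparkly calls, and how the per-partition function is applied (for example with mapInPandas) is left as an assumption:

    index.to_spark()   # ship the index files to the cluster via SparkFiles

    def search_partition(pdf):
        # runs on a spark worker against its local copy of the index
        index.init()   # required before search / search_many
        return index.search_many(pdf, spec, limit=10)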

upsert_docs(df: pd.DataFrame | pyspark.sql.DataFrame, disable_distributed: bool = False, force_distributed: bool = False)

build the index, indexing df according to self.config

Parameters:
df : pd.DataFrame or pyspark DataFrame

the table that will be indexed; if a pyspark DataFrame is provided, the build will be done in parallel for sufficiently large tables

disable_distributed : bool, default = False

disable using spark for building the index even for large tables

force_distributed : bool, default = False

force using spark for building the index even for smaller tables
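
An end-to-end build sketch. The IndexConfig class lives in sparkly.index.index_config and its constructor arguments are not shown on this page, so its construction is left as a placeholder; the remaining calls follow the signatures documented above:

    import pandas as pd

    from sparkly.index.lucene_index import LuceneIndex

    config = ...  # an IndexConfig describing which columns to index and how (not detailed here)
    table = pd.DataFrame({'_id': [1, 2], 'name': ['apple iphone 12', 'galaxy s21']})  # invented data

    index = LuceneIndex('/tmp/my_index', config, delete_if_exists=True)
    index.upsert_docs(table)                                  # small pandas table: built locally
    # index.upsert_docs(spark_df, force_distributed=True)     # pyspark DataFrame: force a distributed build
    print(index.is_built, index.num_indexed_docs())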

Module contents