sparkly package

Subpackages

Submodules

sparkly.index_config module

class sparkly.index_config.IndexConfig(*, store_vectors: bool = False, id_col: str = '_id', weighted_queries: bool = False)

Bases: object

Attributes:
id_col

The unique id column for the records in the index; this must be a 32 or 64 bit integer

is_frozen

Returns

store_vectors

True if the term vectors in the index should be stored, else False

weighted_queries

True if weighted queries should be used when searching, else False

Methods

add_concat_field(field, concat_fields, analyzers)

Add a new concat field to be indexed with this config

add_field(field, analyzers)

Add a new field to be indexed with this config

freeze()

Return a frozen deep copy of this index config

from_json(data)

Construct an index config from a dict or JSON string; see IndexConfig.to_dict for the expected format

get_analyzed_fields([query_spec])

Get the fields used by the index or query_spec.

remove_field(field)

remove a field from the config

to_dict()

convert this IndexConfig to a dictionary which can easily be stored as json

to_json()

Dump this IndexConfig to a valid JSON string

add_concat_field(field: str, concat_fields: Iterable[str], analyzers: Iterable[str])

Add a new concat field to be indexed with this config

Parameters:
field : str

The name of the field that will be added to the index

concat_fields : set, list or tuple of str

The fields in the table that will be concatenated together to create field

analyzers : set, list or tuple of str

The names of the analyzers that will be used to index the field

add_field(field: str, analyzers: Iterable[str])

Add a new field to be indexed with this config

Parameters:
field : str

The name of the field in the table to be added to the index

analyzers : set, list or tuple of str

The names of the analyzers that will be used to index the field
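
A minimal sketch of building a config with plain and concatenated fields. The analyzer names ('standard', '3gram') and the column names are illustrative assumptions, not values prescribed by this API:

    from sparkly.index_config import IndexConfig

    # configure an index keyed by the integer '_id' column
    config = IndexConfig(id_col='_id')

    # index single table columns under one or more analyzers (names assumed)
    config.add_field('name', ['standard', '3gram'])
    config.add_field('description', ['standard'])

    # index a virtual field built by concatenating two table columns
    config.add_concat_field('name_plus_brand', ['name', 'brand'], ['3gram'])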

freeze()
Returns:
IndexConfig

a frozen deepcopy of this index config

classmethod from_json(data)

Construct an index config from a dict or JSON string; see IndexConfig.to_dict for the expected format

Returns:
IndexConfig

get_analyzed_fields(query_spec=None)

Get the fields used by the index or query_spec. If query_spec is None, the fields that are used by the index are returned.

Parameters:
query_spec : QuerySpec, optional

If provided, the fields that are used by query_spec in creating a query are returned instead

Returns:
list of str

the fields used
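
For example (a sketch; the field and analyzer names are illustrative):

    from sparkly.index_config import IndexConfig

    config = IndexConfig()
    config.add_field('name', ['standard'])
    config.add_field('description', ['standard'])

    # with no query_spec, the fields used by the index are returned
    fields = config.get_analyzed_fields()
    print(fields)  # derived from the 'name' and 'description' fields added above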

property id_col

The unique id column for the records in the index; this must be a 32 or 64 bit integer

property is_frozen
Returns:
bool

True if this index is frozen (not modifiable) else False

remove_field(field: str)

remove a field from the config

Parameters:
field : str

the field to be removed from the config

Returns:
bool

True if the field existed else False

property store_vectors

True if the term vectors in the index should be stored, else False

to_dict()

convert this IndexConfig to a dictionary which can easily be stored as json

Returns:
dict

A dictionary representation of this IndexConfig

to_json()

Dump this IndexConfig to a valid JSON string

Returns:
str

property weighted_queries

True if weighted queries should be used when searching, else False
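
A small sketch of the freeze and serialization round trip described above; the field and analyzer names are illustrative:

    from sparkly.index_config import IndexConfig

    config = IndexConfig(id_col='_id', store_vectors=False)
    config.add_field('name', ['standard'])

    as_json = config.to_json()                 # str, convenient to write to disk
    restored = IndexConfig.from_json(as_json)  # from_json accepts a dict or a JSON string

    frozen = config.freeze()                   # frozen deep copy
    print(frozen.is_frozen)                    # True; the original config stays modifiable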

sparkly.analysis module

class sparkly.analysis.Gram2Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

class sparkly.analysis.Gram3Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

class sparkly.analysis.Gram4Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

class sparkly.analysis.PythonAlnumTokenFilter(tokenStream)

Bases: PythonFilteringTokenFilter

Attributes:
attributeClassesIterator
attributeFactory
attributeImplsIterator
class
self

Methods

State

accept

addAttribute

addAttributeImpl

captureState

cast_

clearAttributes

cloneAttributes

close

copyTo

end

endAttributes

equals

finalize

getAttribute

getAttributeClassesIterator

getAttributeFactory

getAttributeImplsIterator

getClass

hasAttribute

hasAttributes

hashCode

incrementToken

instance_

notify

notifyAll

pythonExtension

reflectAsString

reflectWith

removeAllAttributes

reset

restoreState

toString

unwrap

wait

accept()

class sparkly.analysis.StandardEdgeGram36Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

class sparkly.analysis.StrippedGram3Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

initReader(fieldName, reader)

class sparkly.analysis.UnfilteredGram3Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

class sparkly.analysis.UnfilteredGram5Analyzer

Bases: PythonAnalyzer

Attributes:
class
reuseStrategy
self

Methods

ReuseStrategy

TokenStreamComponents

cast_

close

createComponents

equals

finalize

getClass

getOffsetGap

getPositionIncrementGap

getReuseStrategy

hashCode

initReader

instance_

normalize

notify

notifyAll

pythonExtension

toString

tokenStream

wait

createComponents(fieldName)

sparkly.analysis.analyze(analyzer, text, with_offset=False)

Apply the analyzer to the text and return the tokens, optionally with offsets

Parameters:
analyzer

The lucene analyzer to be applied

text : str

the text that will be analyzed

with_offset : bool

if True, return the offsets with the tokens in the form (TOKEN, START_OFFSET, END_OFFSET)

Returns:
list of str or tuples

a list of tokens potentially with offsets

sparkly.analysis.analyze_generator(analyzer, text, with_offset=False)

Apply the analyzer to the text and return the tokens, optionally with offsets

Parameters:
analyzer

The lucene analyzer to be applied

text : str

the text that will be analyzed

with_offset : bool

if True, return the offsets with the tokens in the form (TOKEN, START_OFFSET, END_OFFSET)

Returns:
generator of str or tuples

a generator of tokens, optionally with offsets
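
A minimal sketch of tokenizing a string with these helpers; it assumes a working PyLucene installation and that the JVM has been initialized via sparkly.utils.init_jvm (documented below). The exact tokens produced are illustrative:

    from sparkly.utils import init_jvm
    from sparkly.analysis import (
        analyze,
        analyze_generator,
        get_standard_analyzer_no_stop_words,
    )

    init_jvm()
    analyzer = get_standard_analyzer_no_stop_words()

    print(analyze(analyzer, 'Apple iPhone 12 Pro'))
    # e.g. ['apple', 'iphone', '12', 'pro']

    # analyze_generator yields tokens lazily; with_offset=True adds character offsets
    for token, start, end in analyze_generator(analyzer, 'Apple iPhone 12 Pro', with_offset=True):
        print(token, start, end)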

sparkly.analysis.get_shingle_analyzer()
sparkly.analysis.get_standard_analyzer_no_stop_words()

sparkly.search module

class sparkly.search.Searcher(index: Index, search_chunk_size: Annotated[int, Gt(gt=0)] = 500)

Bases: object

class for performing bulk search over a dataframe

Methods

get_full_query_spec()

get a query spec that searches on all indexed fields

search(search_df, query_spec, limit[, id_col])

perform search for all the records in search_df according to query_spec

get_full_query_spec()

get a query spec that searches on all indexed fields

search(search_df: DataFrame, query_spec: QuerySpec, limit: Annotated[int, Gt(gt=0)], id_col: str = '_id')

perform search for all the records in search_df according to query_spec

Parameters:
search_df : pyspark.sql.DataFrame

the records used for searching

query_spec : QuerySpec

the query spec for searching

limit : int

the top-k results that will be retrieved for each query

id_col : str

the id column from search_df that will be output with the query results

Returns:
pyspark DataFrame

a pyspark DataFrame with the schema (id_col, ids array<long>, scores array<float>, search_time float)
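
A usage sketch of bulk search. It assumes index is an already-built sparkly Index and that table_b is a pyspark DataFrame of search records with an integer '_id' column; both are assumptions, since index construction is not covered in this module:

    from sparkly.search import Searcher

    # index: a sparkly Index built beforehand (not shown here)
    # table_b: a pyspark DataFrame of records to search with
    searcher = Searcher(index, search_chunk_size=500)
    query_spec = searcher.get_full_query_spec()  # search on all indexed fields

    # retrieve the top-50 candidates from the index for every record in table_b
    results = searcher.search(table_b, query_spec, limit=50, id_col='_id')
    # schema: (id_col, ids array<long>, scores array<float>, search_time float)
    results.show()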

sparkly.search.search(index, query_spec, limit, search_recs)
sparkly.search.search_gen(index, query_spec, limit, search_recs)

sparkly.utils module

class sparkly.utils.Timer

Bases: object

utility class for timing execution of code

Methods

get_interval()

get the time that has elapsed since the object was created or the last time get_interval() was called

get_total()

get total time this Timer has been alive

set_start_time()

set the start time to the current time

get_interval()

get the time that has elapsed since the object was created or the last time get_interval() was called

Returns:
float

get_total()

get total time this Timer has been alive

Returns:
float

set_start_time()

set the start time to the current time
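
A small usage sketch; the sleeps stand in for real work and the printed numbers are approximate:

    import time
    from sparkly.utils import Timer

    timer = Timer()
    time.sleep(0.5)                                # stand-in for indexing work
    print(f'step 1: {timer.get_interval():.2f}s')  # ~0.5s, time since the Timer was created
    time.sleep(0.2)                                # stand-in for search work
    print(f'step 2: {timer.get_interval():.2f}s')  # ~0.2s, time since the last get_interval()
    print(f'total:  {timer.get_total():.2f}s')     # ~0.7s, time since the Timer was created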

sparkly.utils.atomic_unzip(zip_file_name, output_loc)

Atomically unzip the file; that is, this function is safe to call from multiple threads at the same time

Parameters:
zip_file_name : str

the name of the file to be unzipped

output_loc : str

the location that the file will be unzipped to

sparkly.utils.attach_current_thread_jvm()

attach the current thread to the jvm for PyLucene

sparkly.utils.auc(x)
sparkly.utils.get_index_name(n, *postfixes)

utility function for generating index names in a uniform way

sparkly.utils.get_logger(name, level=10)

Get the logger for a module

Returns:
Logger

sparkly.utils.init_jvm(vmargs=[])

initialize the jvm for PyLucene

Parameters:
vmargs : list[str]

the JVM args to be passed to the VM

sparkly.utils.invoke_task(task)

invoke a task created by joblib.delayed

sparkly.utils.is_null(o)

check if the object is null; note that this is here to work around the inconsistent behavior of np.isnan and pd.isnull

sparkly.utils.is_persisted(df)

check if the pyspark dataframe is persisted

sparkly.utils.kill_loky_workers()

kill all the child loky processes of this process. Used to prevent joblib from holding on to resources after using joblib.Parallel to do computation

sparkly.utils.local_parquet_to_spark_df(file)
sparkly.utils.norm_auc(x)
sparkly.utils.persisted(df, storage_level=StorageLevel(True, True, False, False, 1))

context manager for persisting a dataframe in a with statement. This automatically unpersists the dataframe at the end of the context
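
A minimal sketch of using the context manager; the DataFrame contents are illustrative:

    from pyspark.sql import SparkSession
    from sparkly.utils import persisted

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1000).withColumnRenamed('id', '_id')

    with persisted(df):
        df.count()                           # first action materializes the cache
        df.selectExpr('max(_id)').collect()  # reuses the cached data
    # df is unpersisted automatically when the block exits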

sparkly.utils.repartition_df(df, part_size, by=None)

repartition the dataframe into chunks of size 'part_size' by column 'by'

sparkly.utils.spark_to_pandas_stream(df, chunk_size, by='_id')

repartition df into chunks of size chunk_size and return them as an iterator of pandas dataframes
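
For example (a sketch; the chunk size and column name are illustrative):

    from pyspark.sql import SparkSession
    from sparkly.utils import spark_to_pandas_stream

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(10000).withColumnRenamed('id', '_id')

    for pdf in spark_to_pandas_stream(df, chunk_size=2500, by='_id'):
        print(len(pdf))  # each pdf is a pandas.DataFrame of roughly chunk_size rows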

sparkly.utils.type_check(var, var_name, expected)

type checking utility; throws a TypeError if var isn't the expected type

sparkly.utils.type_check_iterable(var, var_name, expected_var_type, expected_element_type)

type checking utility for iterables; throws a TypeError if var isn't the expected type or any of its elements are not the expected type
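
A small sketch of both helpers; passing calls return silently while failing calls raise a type error. The argument values are illustrative:

    from sparkly.utils import type_check, type_check_iterable

    type_check('_id', 'id_col', str)                           # ok
    type_check_iterable(['standard'], 'analyzers', list, str)  # ok

    try:
        type_check(42, 'id_col', str)
    except TypeError as err:
        print(err)  # reports that 'id_col' is not of the expected type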

sparkly.utils.zip_dir(d, outfile=None)

Zip a directory d and output it to outfile. If outfile is not provided, the zipped file is output in /tmp

Parameters:
d : str or Path

the directory to be zipped

outfile : str or Path, optional

the output location of the zipped file

Returns:
Path

the path to the new zip file
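
A round-trip sketch combining zip_dir with atomic_unzip; the paths are illustrative:

    from sparkly.utils import atomic_unzip, zip_dir

    zip_path = zip_dir('/tmp/my_index')                # no outfile, so the archive lands in /tmp
    atomic_unzip(str(zip_path), '/tmp/my_index_copy')  # safe to call from multiple threads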

Module contents