kazu.pipeline

Functions

batch_metrics(docs)

calc_doc_size(doc)

load_steps_and_log_memory_usage(cfg)

Loads steps based on the pipeline config, and logs the memory increase after loading each step.

Classes

FailedDocsFileHandler

Log failed docs to a directory along with the relevant exception.

FailedDocsHandler

Handle failed docs.

FailedDocsLogHandler

Log failed docs as warnings.

Pipeline

Exceptions

PipelineValueError

An Exception that gets raised if arguments passed to a Pipeline are inappropriate.

exception kazu.pipeline.PipelineValueError[source]

Bases: ValueError

An Exception that gets raised if arguments passed to a Pipeline are inappropriate.

This is particularly useful because it allows handling just these errors, e.g. in Kazu’s web API, where we can then highlight that the problem lies with the data passed to the API rather than with an error internal to Kazu.
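A minimal sketch of that pattern follows. It uses a stand-in PipelineValueError class and hypothetical handle_request and fake_pipeline helpers so that it runs without Kazu installed; the status codes are illustrative assumptions, not the behaviour of Kazu's web API.

```python
class PipelineValueError(ValueError):
    """Stand-in for kazu.pipeline.PipelineValueError (also a ValueError subclass)."""


def handle_request(run_pipeline, payload):
    """Hypothetical helper: map exceptions to a (status_code, body) pair."""
    try:
        result = run_pipeline(payload)
    except PipelineValueError as e:
        # Problem with the caller's data: report it as a client error.
        return 400, f"bad input: {e}"
    except Exception:
        # Anything else is treated as an internal error.
        return 500, "internal error"
    return 200, result


def fake_pipeline(payload):
    # Hypothetical pipeline stand-in that rejects empty input.
    if not payload:
        raise PipelineValueError("no documents supplied")
    return payload
```

Because the exception subclasses ValueError, code that already catches ValueError keeps working, while callers that care about the distinction can catch the narrower type first.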

class kazu.pipeline.FailedDocsFileHandler[source]

Bases: FailedDocsHandler

Log failed docs to a directory along with the relevant exception.

__call__(step_docs_map)[source]
Parameters:

step_docs_map (dict[str, list[Document]]) – a dict of step namespace and the docs that failed for it

Return type:

None

__init__(log_dir)[source]
Parameters:

log_dir (str | Path)

class kazu.pipeline.FailedDocsHandler[source]

Bases: Protocol

Handle failed docs.

__call__(step_docs_map)[source]
Parameters:

step_docs_map (dict[str, list[Document]]) – a dict of step namespace and the docs that failed for it

Return type:

None
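Since FailedDocsHandler is a Protocol, any callable object with a matching __call__ signature should be usable as a handler. The sketch below illustrates this with a hypothetical CountingHandler and a stand-in Document class (both are assumptions for illustration, not part of Kazu).

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Stand-in for Kazu's Document; only an id is modelled here."""
    idx: str


@dataclass
class CountingHandler:
    """Hypothetical handler: counts failed docs per step namespace."""
    counts: dict[str, int] = field(default_factory=dict)

    def __call__(self, step_docs_map: dict[str, list[Document]]) -> None:
        # Keys are step namespaces, values are the docs that failed for that step.
        for namespace, docs in step_docs_map.items():
            self.counts[namespace] = self.counts.get(namespace, 0) + len(docs)


handler = CountingHandler()
handler({"StepA": [Document("1"), Document("2")], "StepB": []})
handler({"StepA": [Document("3")]})
```

After the two calls, handler.counts holds the running total of failures for each namespace.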

__init__(*args, **kwargs)[source]

class kazu.pipeline.FailedDocsLogHandler[source]

Bases: FailedDocsHandler

Log failed docs as warnings.

__call__(step_docs_map)[source]
Parameters:

step_docs_map (dict[str, list[Document]]) – a dict of step namespace and the docs that failed for it

Return type:

None

class kazu.pipeline.Pipeline[source]

Bases: object

__call__(docs, step_namespaces=None, step_group=None)[source]

Run the pipeline.

Parameters:
  • docs (list[Document]) – Docs to process

  • step_namespaces (Iterable[str] | None) – The namespaces of the steps to use in processing. Default behaviour is to use all steps on the pipeline in the order given when creating the pipeline. This parameter gives the flexibility to sometimes only run some of the steps.

  • step_group (str | None) – One of the pipeline’s configured step_groups to run. This is just a convenience over needing to specify common groups of step_namespaces. Note that passing both step_group and step_namespaces is incoherent and a ValueError will be raised.

Returns:

processed docs

Return type:

list[Document]
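The selection rules described above can be sketched as a small standalone function. This is not Kazu's implementation: select_step_namespaces and its arguments are hypothetical names, and the ordering behaviour follows the step_groups description given for __init__.

```python
from typing import Iterable, Optional


def select_step_namespaces(
    all_namespaces: list[str],
    step_groups: dict[str, set[str]],
    step_namespaces: Optional[Iterable[str]] = None,
    step_group: Optional[str] = None,
) -> list[str]:
    """Hypothetical helper mirroring the documented selection rules."""
    if step_namespaces is not None and step_group is not None:
        raise ValueError("pass step_namespaces or step_group, not both")
    if step_namespaces is not None:
        # Explicit namespaces: the caller's order is used.
        return list(step_namespaces)
    if step_group is not None:
        # Group members run in the pipeline's original step order.
        members = step_groups[step_group]
        return [ns for ns in all_namespaces if ns in members]
    # Default: every step, in pipeline order.
    return list(all_namespaces)
```

For example, with pipeline steps A, B, C and a group containing C and A, selecting the group yields A then C, whereas passing step_namespaces=["C", "A"] keeps the caller's order.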

__init__(steps, failure_handler=None, profile_steps_dir=None, skip_doc_len=200000, step_groups=None)[source]

A basic pipeline, used to help run a series of steps.

Parameters:
  • steps (list[Step]) – list of steps to run

  • failure_handler (list[FailedDocsHandler] | None) – optional list of handlers to process failed docs

  • profile_steps_dir (str | None) – path to the log directory used to profile the throughput of each step with tensorboard

  • skip_doc_len (int | None) – a maximum length for documents (in characters), above which they will be skipped. Extremely long inputs can be symptomatic of very strange text which can result in errors and excessive memory usage.

  • step_groups (dict[str, Collection[str]] | None) – groups of steps to make available for running together as a convenience. The keys are names of groups to create, and values are the namespaces of steps. Order of running steps is still taken from the steps argument, not the order within each group. To customize step running order, you can instead use the step_namespaces parameter of __call__().
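To illustrate the skip_doc_len threshold, here is a minimal sketch that partitions plain strings by character length. split_by_length is a hypothetical helper; how Kazu measures a Document's length and what it does with skipped documents are not specified here, and treating None as "no limit" is an assumption.

```python
from typing import Optional


def split_by_length(texts: list[str], skip_doc_len: Optional[int] = 200000):
    """Hypothetical helper: separate texts to process from texts to skip."""
    if skip_doc_len is None:
        # Assumption: None disables the length check.
        return list(texts), []
    # Texts longer than the threshold (in characters) are skipped.
    kept = [t for t in texts if len(t) <= skip_doc_len]
    skipped = [t for t in texts if len(t) > skip_doc_len]
    return kept, skipped
```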

prefilter_docs(docs)[source]
Parameters:

docs (list[Document])

Return type:

list[Document]

profile(step_times, batch_time, batch_metrics_dict)[source]
Parameters:
Return type:

None

reset()[source]
Return type:

None

update_failed_docs(step, failed_docs)[source]
Parameters:
Return type:

None

kazu.pipeline.batch_metrics(docs)[source]
Parameters:

docs (list[Document])

Return type:

dict[str, float]

kazu.pipeline.calc_doc_size(doc)[source]
Parameters:

doc (Document)

Return type:

int

kazu.pipeline.load_steps_and_log_memory_usage(cfg)[source]

Loads steps based on the pipeline config, and logs the memory increase after loading each step.

Note that you can instantiate the pipeline directly from the config in a way that gives the same results, but this is useful for monitoring/debugging high memory usage.

Parameters:

cfg (DictConfig) – An OmegaConf config object for the kazu Pipeline. Normally created from hydra config files.

Returns:

The instantiated steps from the pipeline config

Return type:

list[Step]
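The general idea of loading components one at a time and logging the memory increase after each can be sketched with the standard library. This is an illustration only: load_and_log_memory and the factories mapping are hypothetical, the real function takes an OmegaConf config, and tracemalloc only sees Python-level allocations, so memory held by native libraries would not be counted in this sketch.

```python
import logging
import tracemalloc
from typing import Any, Callable

logger = logging.getLogger(__name__)


def load_and_log_memory(factories: dict[str, Callable[[], Any]]) -> list[Any]:
    """Hypothetical helper: build each object and log the traced memory increase."""
    tracemalloc.start()
    loaded = []
    try:
        for name, factory in factories.items():
            # get_traced_memory() returns (current, peak) in bytes.
            before, _ = tracemalloc.get_traced_memory()
            loaded.append(factory())
            after, _ = tracemalloc.get_traced_memory()
            logger.info(
                "loaded %s: memory increased by %.2f MB", name, (after - before) / 1e6
            )
    finally:
        tracemalloc.stop()
    return loaded


# Stand-in "steps": two lists of very different sizes.
steps = load_and_log_memory({
    "small": lambda: [0] * 10,
    "large": lambda: [0] * 1_000_000,
})
```

Measuring around each load individually is what makes it possible to attribute a memory increase to a specific step rather than to the pipeline as a whole.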