unitxt.operator module

class unitxt.operator.InstanceOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, apply_to_streams: List[str] = None, dont_apply_to_streams: List[str] = None)

Bases: StreamOperator

A class representing a stream instance operator in the streaming system.

A stream instance operator is a type of StreamOperator that operates on individual instances within a Stream. It iterates through each instance in the Stream and applies the process method. The process method should be implemented by subclasses to define the specific operations to be performed on each instance.
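
The per-instance pattern can be pictured with a small, self-contained model. This is a sketch under stated assumptions, not unitxt's implementation: ToyInstanceOperator, process_stream and AddLength are hypothetical names, a Stream is modeled as any iterable of dicts, and the real InstanceOperator.process may take additional arguments.

```python
from typing import Any, Dict, Iterable, Iterator


class ToyInstanceOperator:
    """Hypothetical stand-in for an instance operator (not unitxt's class)."""

    def process(self, instance: Dict[str, Any]) -> Dict[str, Any]:
        # Subclasses define the per-instance operation here.
        raise NotImplementedError

    def process_stream(self, stream: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        # Lazily apply process() to each instance, one at a time.
        for instance in stream:
            yield self.process(instance)


class AddLength(ToyInstanceOperator):
    """Example subclass: records the length of the "text" field."""

    def process(self, instance: Dict[str, Any]) -> Dict[str, Any]:
        return {**instance, "length": len(instance["text"])}


result = list(AddLength().process_stream([{"text": "hi"}, {"text": "hello"}]))
# [{'text': 'hi', 'length': 2}, {'text': 'hello', 'length': 5}]
```

Because process_stream is a generator, instances are transformed only as they are consumed, which matches the streaming model described above.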

class unitxt.operator.InstanceOperatorValidator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, apply_to_streams: List[str] = None, dont_apply_to_streams: List[str] = None)

Bases: InstanceOperator

A class representing a stream instance operator validator in the streaming system.

A stream instance operator validator is a type of InstanceOperator that includes a validation step. It operates on individual instances within a Stream and validates the result of processing each instance.
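
A minimal sketch of the process-then-validate loop, assuming validation is a separate method that raises on an invalid result. ToyInstanceOperatorValidator, validate and RequireLabel are hypothetical names chosen for illustration; unitxt's own method names and signatures may differ.

```python
from typing import Any, Dict, Iterable, Iterator


class ToyInstanceOperatorValidator:
    """Hypothetical model: process each instance, then validate the result."""

    def process(self, instance: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError

    def validate(self, instance: Dict[str, Any]) -> None:
        # Assumed contract: raise an exception if the processed instance is invalid.
        raise NotImplementedError

    def process_stream(self, stream: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        for instance in stream:
            result = self.process(instance)
            self.validate(result)  # fail fast on the first invalid result
            yield result


class RequireLabel(ToyInstanceOperatorValidator):
    def process(self, instance: Dict[str, Any]) -> Dict[str, Any]:
        return {**instance, "label": instance.get("label", "").strip()}

    def validate(self, instance: Dict[str, Any]) -> None:
        if not instance["label"]:
            raise ValueError(f"empty label in {instance!r}")


ok = list(RequireLabel().process_stream([{"label": " yes "}]))  # [{'label': 'yes'}]
try:
    list(RequireLabel().process_stream([{"label": "  "}]))
    failed = False
except ValueError:
    failed = True
```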

class unitxt.operator.InstanceOperatorWithMultiStreamAccess(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [])

Bases: StreamingOperator

A class representing an instance operator with multi-stream access in the streaming system.

An instance operator with multi-stream access is a type of StreamingOperator that operates on individual instances within a Stream and can also read other streams. It uses the accessible_streams attribute to determine which other streams it can access. To keep this efficient and avoid quadratic complexity, it caches the accessible streams by default.
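
Why caching matters can be shown with a toy model in which every stream is a factory that regenerates its data on each read. Without a cache, reading an accessible stream of m items once per instance costs O(n·m) for n instances; the sketch reads it once and reuses the list. AddTrainCount and the factory-based MultiStream are hypothetical; only the accessible_streams attribute name comes from the text above.

```python
from typing import Any, Callable, Dict, Iterator, List

# Toy multi-stream: each stream is a zero-argument factory returning a fresh
# generator, so re-reading a stream means regenerating it from scratch.
MultiStream = Dict[str, Callable[[], Iterator[Dict[str, Any]]]]


class AddTrainCount:
    """Hypothetical operator: annotates each instance with the size of another stream."""

    def __init__(self, accessible_streams: List[str]):
        self.accessible_streams = accessible_streams
        self._cache: Dict[str, List[Dict[str, Any]]] = {}

    def _get(self, multi_stream: MultiStream, name: str) -> List[Dict[str, Any]]:
        # Materialize each accessible stream once, then serve it from the cache.
        if name not in self._cache:
            self._cache[name] = list(multi_stream[name]())
        return self._cache[name]

    def process(self, instance: Dict[str, Any], multi_stream: MultiStream) -> Dict[str, Any]:
        train = self._get(multi_stream, self.accessible_streams[0])
        return {**instance, "train_size": len(train)}

    def __call__(self, multi_stream: MultiStream, stream_name: str) -> Iterator[Dict[str, Any]]:
        for instance in multi_stream[stream_name]():
            yield self.process(instance, multi_stream)


reads = {"train": 0}


def train_stream() -> Iterator[Dict[str, Any]]:
    reads["train"] += 1  # count how often the stream is regenerated
    yield from [{"x": 1}, {"x": 2}, {"x": 3}]


def test_stream() -> Iterator[Dict[str, Any]]:
    yield from [{"q": "a"}, {"q": "b"}]


out = list(AddTrainCount(["train"])({"train": train_stream, "test": test_stream}, "test"))
# out == [{'q': 'a', 'train_size': 3}, {'q': 'b', 'train_size': 3}] and reads["train"] == 1
```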

exception unitxt.operator.MissingRequirementsError(class_name, missing_packages, version_mismatched_packages, installation_instructions)

Bases: Exception

class unitxt.operator.MultiStreamOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: StreamingOperator

A class representing a multi-stream operator in the streaming system.

A multi-stream operator is a type of StreamingOperator that operates on an entire MultiStream object at once. It takes a MultiStream as input and produces a MultiStream as output. The process method should be implemented by subclasses to define the specific operations to be performed on the input MultiStream.
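
Because a multi-stream operator sees the whole MultiStream, it can restructure streams, not just their contents. The sketch below models a MultiStream as a dict from stream name to a list of instances; ToyMultiStreamOperator and SplitValidation are hypothetical and only illustrate the "MultiStream in, MultiStream out" contract.

```python
from typing import Any, Dict, List

MultiStream = Dict[str, List[Dict[str, Any]]]  # toy model: stream name -> instances


class ToyMultiStreamOperator:
    def process(self, multi_stream: MultiStream) -> MultiStream:
        raise NotImplementedError

    def __call__(self, multi_stream: MultiStream) -> MultiStream:
        return self.process(multi_stream)


class SplitValidation(ToyMultiStreamOperator):
    """Hypothetical example: carve a 'validation' stream out of the end of 'train'."""

    def __init__(self, size: int):
        self.size = size

    def process(self, multi_stream: MultiStream) -> MultiStream:
        train = multi_stream["train"]
        cut = len(train) - self.size
        # Copy every other stream unchanged, then add the two new ones.
        out = {name: list(s) for name, s in multi_stream.items() if name != "train"}
        out["train"] = train[:cut]
        out["validation"] = train[cut:]
        return out


ms = {"train": [{"i": 0}, {"i": 1}, {"i": 2}], "test": [{"i": 9}]}
result = SplitValidation(size=1)(ms)
```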

class unitxt.operator.Operator(data_classification_policy: List[str] = None)

Bases: Artifact

exception unitxt.operator.OperatorError(exception: Exception, operators: List[Operator])

Bases: Exception

classmethod from_exception(exception: Exception, operator: Operator)

classmethod from_operator_error(exception: Exception, operator: Operator)

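
The signatures above suggest an error that carries the original exception plus the list of operators it passed through. The sketch below is an assumption about how the two classmethods could be used while an error unwinds through nested operators: from_exception starts the chain and from_operator_error extends it. ToyOperatorError and run are hypothetical, and operators are represented by plain name strings.

```python
from typing import Any, Callable, List, Optional


class ToyOperatorError(Exception):
    """Hypothetical model: the original exception plus the operators it passed through."""

    def __init__(self, exception: Exception, operators: List[str]):
        super().__init__(f"raised by operators: {', '.join(operators)}")
        self.exception = exception
        self.operators = operators

    @classmethod
    def from_exception(cls, exception: Exception, operator: str) -> "ToyOperatorError":
        # First wrap: the failing operator starts the chain.
        return cls(exception, [operator])

    @classmethod
    def from_operator_error(cls, exception: "ToyOperatorError", operator: str) -> "ToyOperatorError":
        # Re-wrap while unwinding: keep the original exception, append the enclosing operator.
        return cls(exception.exception, [*exception.operators, operator])


def run(name: str, fn: Callable[[], Any]) -> Any:
    try:
        return fn()
    except ToyOperatorError as e:  # must come before the generic handler
        raise ToyOperatorError.from_operator_error(e, name) from e
    except Exception as e:
        raise ToyOperatorError.from_exception(e, name) from e


caught: Optional[ToyOperatorError] = None
try:
    run("pipeline", lambda: run("rename_fields", lambda: {}["missing"]))
except ToyOperatorError as err:
    caught = err
# caught.operators == ['rename_fields', 'pipeline']; caught.exception is the KeyError
```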
class unitxt.operator.PackageRequirementsMixin(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [])

Bases: Artifact

Base class used to automatically check for the existence of required Python dependencies for an artifact (e.g., Operator or Metric).

The _requirements_list is either a list of required packages or a dictionary mapping required packages to installation instructions. Use _requirements_list in class-level definitions and requirements when creating an instance.

  • List format: Just specify the package names, optionally with version annotations (e.g., ["torch>=1.2.4", "numpy<1.19"]).

  • Dict format: Specify package names as keys and installation instructions as values (e.g., {"torch>=1.2.4": "Install torch with pip install torch>=1.2.4"}).

When a package version annotation is specified (like torch>=1.2.4), the check_missing_requirements method verifies that the installed version meets the specified constraint.

class unitxt.operator.PagedStreamOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, apply_to_streams: List[str] = None, dont_apply_to_streams: List[str] = None, page_size: int = 1000)

Bases: StreamOperator

A class representing a paged-stream operator in the streaming system.

A paged-stream operator is a type of StreamOperator that operates on a page of instances in a Stream at a time, where a page is a subset of instances. The process method should be implemented by subclasses to define the specific operations to be performed on each page.

Parameters:

page_size (int) – The size of each page in the stream. Defaults to 1000.
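
Paging amounts to pulling consecutive chunks of at most page_size instances from the stream and handing each chunk to a page-level process step. The sketch below uses itertools.islice for the chunking; pages, process_page and paged_process are hypothetical names, not unitxt API.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def pages(stream: Iterable[Any], page_size: int = 1000) -> Iterator[List[Any]]:
    """Yield consecutive lists of at most page_size items; the last page may be shorter."""
    if page_size < 1:
        raise ValueError("page_size must be positive")
    iterator = iter(stream)
    while True:
        page = list(islice(iterator, page_size))
        if not page:
            return
        yield page


def process_page(page: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Hypothetical page-level operation: tag each instance with its page's size.
    return [{**instance, "page_len": len(page)} for instance in page]


def paged_process(stream: Iterable[Dict[str, Any]], page_size: int = 1000) -> Iterator[Dict[str, Any]]:
    for page in pages(stream, page_size):
        yield from process_page(page)


out = list(paged_process(({"i": i} for i in range(5)), page_size=2))
# page sizes are 2, 2, 1, so page_len is [2, 2, 2, 2, 1]
```

Only one page is held in memory at a time, which is the point of processing a stream page by page rather than materializing it whole.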

class unitxt.operator.SequentialMixin(data_classification_policy: List[str] = None, max_steps: int | NoneType = None, steps: List[unitxt.operator.StreamingOperator] = [])

Bases: Artifact

class unitxt.operator.SequentialOperator(data_classification_policy: List[str] = None, max_steps: int | NoneType = None, steps: List[unitxt.operator.StreamingOperator] = [], _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: MultiStreamOperator, SequentialMixin

A class representing a sequential operator in the streaming system.

A sequential operator is a type of MultiStreamOperator that applies a sequence of other operators to a MultiStream. It maintains a list of StreamingOperators and applies them in order to the MultiStream.
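
Applying steps in order means each step receives the previous step's output. The sketch below also models the max_steps field from the signature, under the assumption (not stated in this reference) that, when set, it limits execution to the first max_steps steps. ToySequential and the two step functions are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional

MultiStream = Dict[str, List[Dict[str, Any]]]
Step = Callable[[MultiStream], MultiStream]


class ToySequential:
    """Hypothetical model: feed each step the output of the previous one."""

    def __init__(self, steps: List[Step], max_steps: Optional[int] = None):
        self.steps = steps
        self.max_steps = max_steps  # assumption: if set, only the first max_steps run

    def __call__(self, multi_stream: MultiStream) -> MultiStream:
        limit = len(self.steps) if self.max_steps is None else self.max_steps
        for step in self.steps[:limit]:
            multi_stream = step(multi_stream)
        return multi_stream


def lowercase(ms: MultiStream) -> MultiStream:
    return {n: [{**x, "text": x["text"].lower()} for x in s] for n, s in ms.items()}


def strip(ms: MultiStream) -> MultiStream:
    return {n: [{**x, "text": x["text"].strip()} for x in s] for n, s in ms.items()}


ms = {"test": [{"text": "  Hello "}]}
full = ToySequential([lowercase, strip])(ms)                  # text == "hello"
partial = ToySequential([lowercase, strip], max_steps=1)(ms)  # text == "  hello "
```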

class unitxt.operator.SequentialOperatorInitializer(data_classification_policy: List[str] = None, max_steps: int | NoneType = None, steps: List[unitxt.operator.StreamingOperator] = [], _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: SequentialOperator

A class representing a sequential operator initializer in the streaming system.

A sequential operator initializer is a type of SequentialOperator that starts with a stream initializer operator. The first operator in its list of steps is a StreamInitializerOperator, which generates the initial MultiStream based on the provided arguments and keyword arguments.
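
The difference from a plain sequential operator is where the call arguments go: they are forwarded to the first step, which builds the MultiStream, and the remaining steps transform it. A minimal sketch, assuming plain callables as steps; ToySequentialInitializer, from_lists and upper are hypothetical names.

```python
from typing import Any, Callable, Dict, List

MultiStream = Dict[str, List[Dict[str, Any]]]


class ToySequentialInitializer:
    """Hypothetical model: steps[0] builds the MultiStream, the rest transform it."""

    def __init__(self, steps: List[Callable[..., MultiStream]]):
        self.steps = steps

    def __call__(self, *args: Any, **kwargs: Any) -> MultiStream:
        multi_stream = self.steps[0](*args, **kwargs)  # the initializer gets the call arguments
        for step in self.steps[1:]:
            multi_stream = step(multi_stream)
        return multi_stream


def from_lists(**streams: List[str]) -> MultiStream:
    # Hypothetical initializer: each keyword argument becomes a stream of instances.
    return {name: [{"text": t} for t in texts] for name, texts in streams.items()}


def upper(ms: MultiStream) -> MultiStream:
    return {n: [{"text": x["text"].upper()} for x in s] for n, s in ms.items()}


result = ToySequentialInitializer([from_lists, upper])(train=["a", "b"], test=["c"])
# {'train': [{'text': 'A'}, {'text': 'B'}], 'test': [{'text': 'C'}]}
```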

class unitxt.operator.SideEffectOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [])

Bases: StreamingOperator

Base class for operators that do not affect the stream.
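
A side-effect operator does some work (logging, writing a file, updating a counter) and hands back its input untouched, so it can sit anywhere in a pipeline without changing the data. LogSizes is a hypothetical example in that spirit, not a unitxt class.

```python
from typing import Any, Dict, List

MultiStream = Dict[str, List[Dict[str, Any]]]


class LogSizes:
    """Hypothetical side-effect operator: records stream sizes, returns input untouched."""

    def __init__(self) -> None:
        self.log: List[Dict[str, int]] = []

    def __call__(self, multi_stream: MultiStream) -> MultiStream:
        # Side effect only: record sizes, then hand back the very same object.
        self.log.append({name: len(s) for name, s in multi_stream.items()})
        return multi_stream


ms = {"train": [{"x": 1}, {"x": 2}], "test": [{"x": 3}]}
op = LogSizes()
out = op(ms)  # out is ms; op.log == [{'train': 2, 'test': 1}]
```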

class unitxt.operator.SingleStreamOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, apply_to_streams: List[str] = None, dont_apply_to_streams: List[str] = None)

Bases: StreamOperator

class unitxt.operator.SingleStreamReducer(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [])

Bases: StreamingOperator

A class representing a single-stream reducer in the streaming system.

A single-stream reducer is a type of StreamingOperator that operates on individual Stream objects within a MultiStream and reduces each Stream to a single output value.

The process method should be implemented by subclasses to define the specific reduction operation to be performed on each Stream.
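
The reducer contract, one value per stream, can be modeled as a dict comprehension over the MultiStream. ToySingleStreamReducer and MeanScore are hypothetical names; the mean-score reduction is just an illustrative choice.

```python
from typing import Any, Dict, Iterable, List


class ToySingleStreamReducer:
    """Hypothetical model: reduce every stream of a MultiStream to a single value."""

    def process(self, stream: Iterable[Dict[str, Any]]) -> Any:
        raise NotImplementedError

    def __call__(self, multi_stream: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
        # One output value per stream, keyed by the stream's name.
        return {name: self.process(stream) for name, stream in multi_stream.items()}


class MeanScore(ToySingleStreamReducer):
    def process(self, stream: Iterable[Dict[str, Any]]) -> float:
        scores = [instance["score"] for instance in stream]
        return sum(scores) / len(scores) if scores else float("nan")


result = MeanScore()({"a": [{"score": 1.0}, {"score": 3.0}], "b": [{"score": 5.0}]})
# {'a': 2.0, 'b': 5.0}
```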

class unitxt.operator.SourceOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: MultiStreamOperator

A class representing a source operator in the streaming system.

A source operator is responsible for generating the data stream from some source, such as a database or a file. It is the starting point of a stream processing pipeline. SourceOperator is a subclass of MultiStreamOperator that generates an output MultiStream without taking any input streams.

When called, a SourceOperator invokes its process method, which should be implemented by all subclasses to generate the required MultiStream.
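
A source needs no input: calling it runs process(), which builds the MultiStream from wherever the data lives. In the sketch below, in-memory CSV text stands in for files so the example runs anywhere; CsvTextSource is a hypothetical class, not one of unitxt's loaders.

```python
import csv
import io
from typing import Dict, List


class CsvTextSource:
    """Hypothetical source: builds a MultiStream from CSV text, one stream per split."""

    def __init__(self, files: Dict[str, str]):
        self.files = files  # split name -> CSV text (stand-in for file paths)

    def process(self) -> Dict[str, List[Dict[str, str]]]:
        return {split: list(csv.DictReader(io.StringIO(text)))
                for split, text in self.files.items()}

    def __call__(self) -> Dict[str, List[Dict[str, str]]]:
        # A source takes no input stream; calling it just runs process().
        return self.process()


source = CsvTextSource({"train": "text,label\nhi,1\nbye,0\n", "test": "text,label\nok,1\n"})
ms = source()
# ms["train"] == [{'text': 'hi', 'label': '1'}, {'text': 'bye', 'label': '0'}]
```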

class unitxt.operator.SourceSequentialOperator(data_classification_policy: List[str] = None, max_steps: int | NoneType = None, steps: List[unitxt.operator.StreamingOperator] = [], _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: SourceOperator, SequentialMixin

A class representing a source sequential operator in the streaming system.

A source sequential operator combines SourceOperator with SequentialMixin: like a SequentialOperator, it applies a list of steps in order, but it starts with a source operator. The first operator in its list of steps is a SourceOperator, which generates the initial MultiStream that the other operators then process.

class unitxt.operator.StreamInitializerOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None)

Bases: SourceOperator

A class representing a stream initializer operator in the streaming system.

A stream initializer operator is a special type of SourceOperator that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.

When called, a StreamInitializerOperator invokes its process method, passing any supplied arguments and keyword arguments. The process method should be implemented by all subclasses to generate the required MultiStream based on the given arguments and keyword arguments.

class unitxt.operator.StreamOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, apply_to_streams: List[str] = None, dont_apply_to_streams: List[str] = None)

Bases: MultiStreamOperator

A class representing a single-stream operator in the streaming system.

A single-stream operator is a type of MultiStreamOperator that operates on individual Stream objects within a MultiStream. It iterates through each Stream in the MultiStream and applies the process method.

The process method should be implemented by subclasses to define the specific operations to be performed on each Stream.
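
The signature exposes apply_to_streams and dont_apply_to_streams, which this reference does not describe. The sketch assumes the natural reading: None means "no restriction", only streams passing both filters are processed, and the others pass through unchanged. ToyStreamOperator, should_process and TakeFirst are hypothetical names.

```python
from typing import Any, Dict, List, Optional


class ToyStreamOperator:
    """Hypothetical model of per-stream processing with stream filters."""

    def __init__(self, apply_to_streams: Optional[List[str]] = None,
                 dont_apply_to_streams: Optional[List[str]] = None):
        self.apply_to_streams = apply_to_streams
        self.dont_apply_to_streams = dont_apply_to_streams

    def should_process(self, name: str) -> bool:
        # Assumed semantics: None means "no restriction" for either list.
        if self.apply_to_streams is not None and name not in self.apply_to_streams:
            return False
        if self.dont_apply_to_streams is not None and name in self.dont_apply_to_streams:
            return False
        return True

    def process(self, stream: List[Any], stream_name: str) -> List[Any]:
        raise NotImplementedError

    def __call__(self, multi_stream: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        # Streams that are filtered out pass through unchanged.
        return {name: self.process(s, name) if self.should_process(name) else s
                for name, s in multi_stream.items()}


class TakeFirst(ToyStreamOperator):
    def __init__(self, n: int, **kwargs: Any):
        super().__init__(**kwargs)
        self.n = n

    def process(self, stream: List[Any], stream_name: str) -> List[Any]:
        return stream[: self.n]


ms = {"train": [1, 2, 3], "test": [4, 5, 6]}
only_train = TakeFirst(1, apply_to_streams=["train"])(ms)      # {'train': [1], 'test': [4, 5, 6]}
not_train = TakeFirst(1, dont_apply_to_streams=["train"])(ms)  # {'train': [1, 2, 3], 'test': [4]}
```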

class unitxt.operator.StreamingOperator(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [])

Bases: Operator, PackageRequirementsMixin

Base class for all stream operators in the streaming model.

Stream operators are a key component of the streaming model and are responsible for processing continuous data streams. They perform operations such as transformations, aggregations, joins, windowing and more on these streams. There are several types of stream operators, including source operators, processing operators, etc.

As the base StreamingOperator class, it is responsible for performing operations on a stream, and every specific type of stream operator in the system subclasses it. When called, a StreamingOperator must return a MultiStream.

As a subclass of Artifact, every StreamingOperator can be saved in a catalog for further usage or reference.
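
The base-class contract, "every operator is callable and returns a MultiStream", can be expressed with an abstract __call__, so the base itself cannot be instantiated. This is a hypothetical model using the standard abc module; it omits the Artifact and catalog machinery entirely.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

MultiStream = Dict[str, List[Dict[str, Any]]]


class ToyStreamingOperator(ABC):
    """Hypothetical base: every operator is callable and must return a MultiStream."""

    @abstractmethod
    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        ...


class Identity(ToyStreamingOperator):
    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        return multi_stream if multi_stream is not None else {}


try:
    ToyStreamingOperator()  # abstract: raises TypeError
    instantiable = True
except TypeError:
    instantiable = False
```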

unitxt.operator.instance_generator(instance)
unitxt.operator.instance_result(result_stream)
unitxt.operator.stream_single(instance: Dict[str, Any]) → Stream