unitxt.stream_operators module
This section describes unitxt operators.
Operators: Building Blocks of Unitxt Processing Pipelines
Within the Unitxt framework, operators are the building blocks used to assemble processing pipelines. Each operator performs a specific manipulation on the dictionaries (instances) in a stream. Operators are callable objects that take a MultiStream as input and return a MultiStream. The operator's manipulations are not executed immediately: they are applied to each instance when that instance is pulled from the stream.
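The lazy behavior described above can be illustrated with a minimal sketch in plain Python. It does not use Unitxt's own classes: it assumes a multi-stream can be modeled as a dict that maps split names to zero-argument factories returning iterators of instances, and the names MultiStreamSketch and apply_instance_operator are hypothetical. The per-instance function runs only when a stream is actually iterated.

```python
from typing import Callable, Dict, Iterator

# Hypothetical model of a MultiStream: split name -> zero-argument factory
# returning an iterator of instances (dicts). Not Unitxt's actual class.
MultiStreamSketch = Dict[str, Callable[[], Iterator[dict]]]


def apply_instance_operator(
    multi_stream: MultiStreamSketch, fn: Callable[[dict], dict]
) -> MultiStreamSketch:
    """Return a new multi-stream whose instances pass through fn when pulled."""

    def wrap(factory: Callable[[], Iterator[dict]]) -> Callable[[], Iterator[dict]]:
        def generator() -> Iterator[dict]:
            for instance in factory():
                yield fn(instance)  # runs only when this instance is pulled

        return generator

    return {name: wrap(factory) for name, factory in multi_stream.items()}


calls = []


def add_length(instance: dict) -> dict:
    calls.append(instance["text"])  # record when the function actually runs
    return {**instance, "length": len(instance["text"])}


source: MultiStreamSketch = {
    "train": lambda: iter([{"text": "hello"}, {"text": "hi"}]),
    "test": lambda: iter([{"text": "hey there"}]),
}

processed = apply_instance_operator(source, add_length)
print(calls)  # [] : nothing has been pulled yet
train = list(processed["train"]())
print(train)  # [{'text': 'hello', 'length': 5}, {'text': 'hi', 'length': 2}]
```

Using factories rather than bare generators lets each stream be read more than once, since every call starts a fresh pass over the source.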
Creating Custom Operators
To extend the functionality of Unitxt, users are encouraged to develop custom operators. This can be done by inheriting from any of the existing operators listed below or from one of the fundamental base operators.
The primary task in developing an operator is to implement its process method, which defines the specific manipulation the operator performs.
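A minimal sketch of this pattern, written in plain Python rather than against Unitxt's own base classes: InstanceOperatorSketch and LowercaseField are hypothetical names. The base class takes care of iterating over every stream, so the subclass only overrides process. Declaring the operator's parameter as a dataclass field loosely mirrors the keyword-argument signatures shown for the operators below; it is an illustration, not Unitxt's actual base-class mechanism.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List

Instance = Dict[str, Any]


class InstanceOperatorSketch:
    """Hypothetical base class: a subclass only has to implement process()."""

    def process(self, instance: Instance) -> Instance:
        raise NotImplementedError

    def __call__(
        self, multi_stream: Dict[str, List[Instance]]
    ) -> Dict[str, Iterator[Instance]]:
        # Wrap every stream in a generator that calls process() per instance on demand.
        return {
            name: (self.process(instance) for instance in stream)
            for name, stream in multi_stream.items()
        }


@dataclass
class LowercaseField(InstanceOperatorSketch):
    """Hypothetical custom operator: lowercases the value of one field."""

    field: str

    def process(self, instance: Instance) -> Instance:
        # Return a new dict so the input instance is not mutated.
        return {**instance, self.field: instance[self.field].lower()}


operator = LowercaseField(field="question")
result = operator({"train": [{"question": "What Is Unitxt?"}]})
output = {name: list(stream) for name, stream in result.items()}
print(output)  # {'train': [{'question': 'what is unitxt?'}]}
```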
General or Specialized Operators
Some operators are specialized for specific tasks, such as:
- loaders for loading data.
- splitters for fixing data splits.
- struct_data_operators for operating on structured data.
Other specialized operators are used internally by Unitxt:
The rest of this section is dedicated to operators that operate on streams.
- class unitxt.stream_operators.DeleteSplits(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, splits: List[str] = __required__)
Bases: MultiStreamOperator
Operator that deletes splits from the stream.
- splits (List[str]) – The splits to delete from the stream.
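The effect of DeleteSplits can be sketched on a plain dict that maps split names to lists of instances. The function delete_splits below is a hypothetical illustration, not Unitxt code; with the real operator, the corresponding configuration would be DeleteSplits(splits=["validation"]), following the signature above. How the real operator treats a split name that is not present is not stated here; the sketch simply ignores it.

```python
from typing import Dict, List

Instance = dict


def delete_splits(
    multi_stream: Dict[str, List[Instance]], splits: List[str]
) -> Dict[str, List[Instance]]:
    """Hypothetical sketch of DeleteSplits: keep streams whose names are not in splits."""
    # Build a new dict so the input multi-stream is left unchanged.
    return {name: stream for name, stream in multi_stream.items() if name not in splits}


data = {"train": [{"x": 1}], "validation": [{"x": 2}], "test": [{"x": 3}]}
remaining = delete_splits(data, splits=["validation"])
print(sorted(remaining))  # ['test', 'train']
```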
- class unitxt.stream_operators.DuplicateSplit(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, split: str = __required__, to_split: str = __required__)
Bases: MultiStreamOperator
Operator that duplicates a split.
- split (str) – The split in the stream to duplicate.
- to_split (str) – The name of the duplicated split.
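A hypothetical sketch of what DuplicateSplit does, again on a dict of split names to instance lists; duplicate_split is not a Unitxt function, and the real configuration would look like DuplicateSplit(split="test", to_split="test_copy"). Deep-copying the instances is a design choice of this sketch so that edits to one split cannot leak into the other; whether Unitxt copies instances the same way is not stated in this section.

```python
import copy
from typing import Dict, List

Instance = dict


def duplicate_split(
    multi_stream: Dict[str, List[Instance]], split: str, to_split: str
) -> Dict[str, List[Instance]]:
    """Hypothetical sketch of DuplicateSplit: add a copy of `split` named `to_split`."""
    if split not in multi_stream:
        raise KeyError(f"split {split!r} not found")
    result = dict(multi_stream)  # shallow copy of the split mapping
    # Deep-copy the instances so later edits to one split do not affect the other.
    result[to_split] = copy.deepcopy(multi_stream[split])
    return result


data = {"train": [{"x": 1}], "test": [{"x": 2}]}
out = duplicate_split(data, split="test", to_split="test_copy")
out["test_copy"][0]["x"] = 99
print(out["test"], out["test_copy"])  # [{'x': 2}] [{'x': 99}]
```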
- class unitxt.stream_operators.JoinStreams(data_classification_policy: List[str] = None, _requirements_list: List[str] | Dict[str, str] = [], requirements: List[str] | Dict[str, str] = [], caching: bool = None, left_stream: str = __required__, right_stream: str = __required__, how: Literal['left', 'right', 'inner', 'outer', 'cross'] = __required__, on: List[str] | NoneType = None, left_on: List[str] | NoneType = None, right_on: List[str] | NoneType = None, new_stream_name: str = __required__)
Bases: MultiStreamOperator
Joins two streams (a left and a right stream) into a single new stream.
- Parameters:
left_stream (str) – The stream that will be considered the “left” in the join operations.
right_stream (str) – The stream that will be considered the “right” in the join operations.
how (Literal["left", "right", "inner", "outer", "cross"]) – The type of join to be performed.
on (Optional[List[str]]) – Column names to join on. These must be found in both streams.
left_on (Optional[List[str]]) – Column names to join on in the left stream.
right_on (Optional[List[str]]) – Column names to join on in the right stream.
new_stream_name (str) – The name of the new stream resulting from the merge.
Examples
JoinStreams(left_stream="questions", right_stream="answers", how="inner", on=["question_id"], new_stream_name="question_with_answers")
Joins the "questions" and "answers" streams on the "question_id" field using an inner join, producing a new stream named "question_with_answers".
JoinStreams(left_stream="questions", right_stream="answers", how="inner", left_on=["question_id"], right_on=["question"], new_stream_name="question_with_answers")
Joins the "questions" and "answers" streams on the "question_id" field of the left stream and the "question" field of the right stream, using an inner join, producing a new stream named "question_with_answers". This is useful when the join fields have different names in the two streams.
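The join semantics in the examples above can be illustrated with a small equi-join over lists of dicts. This sketch is hypothetical: join_streams is not a Unitxt function, it covers only how="inner" and how="left", and on clashing field names it simply lets the right stream's value win, which may differ from how the real operator resolves duplicate columns. The data mirrors the second example, where the key fields have different names.

```python
from typing import Dict, List, Optional, Tuple

Instance = dict


def join_streams(
    left: List[Instance],
    right: List[Instance],
    how: str,
    on: Optional[List[str]] = None,
    left_on: Optional[List[str]] = None,
    right_on: Optional[List[str]] = None,
) -> List[Instance]:
    """Hypothetical equi-join of two streams; supports how="inner" and how="left" only."""
    if on is not None:
        left_on = right_on = on  # same key names in both streams
    if not left_on or not right_on or len(left_on) != len(right_on):
        raise ValueError("give `on`, or `left_on` and `right_on` of equal length")
    if how not in ("inner", "left"):
        raise NotImplementedError("this sketch covers only inner and left joins")

    # Index the right stream by its key tuple so each lookup is a dict access.
    index: Dict[Tuple, List[Instance]] = {}
    for row in right:
        index.setdefault(tuple(row[c] for c in right_on), []).append(row)

    joined: List[Instance] = []
    for row in left:
        matches = index.get(tuple(row[c] for c in left_on), [])
        for match in matches:
            joined.append({**row, **match})  # right-hand values win on name clashes
        if not matches and how == "left":
            joined.append(dict(row))  # a left join keeps unmatched left instances
    return joined


questions = [
    {"question_id": 1, "text": "What is 2 + 2?"},
    {"question_id": 2, "text": "What is the capital of France?"},
]
answers = [{"question": 1, "answer": "4"}]

inner = join_streams(questions, answers, how="inner",
                     left_on=["question_id"], right_on=["question"])
left_joined = join_streams(questions, answers, how="left",
                           left_on=["question_id"], right_on=["question"])
print(inner)  # [{'question_id': 1, 'text': 'What is 2 + 2?', 'question': 1, 'answer': '4'}]
print(len(left_joined))  # 2
```

Indexing the right stream by its key tuple makes the join a single pass over each stream instead of a comparison of every pair of instances.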