📄 Arena Hard Hf Space Processing Steps

operators.arena_hard_hf_space_processing_steps

SequentialOperator(
    steps=[
        Rename(
            field_to_field={
                "cluster": "group",
            },
            apply_to_streams=[
                "questions",
            ],
        ),
        Copy(
            field_to_field={
                "turns/0/content": "model_input",
            },
            apply_to_streams=[
                "questions",
            ],
        ),
        Copy(
            field_to_field={
                "choices/0/turns/0/content": "model_output",
                "choices/0/turns/0/token_len": "model_output_token_len",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        Apply(
            function="str.lower",
            to_field="model_id",
            apply_to_streams=[
                "model_answer",
            ],
            _argv=[
                "model_id",
            ],
        ),
        Copy(
            field_to_field={
                "games/0/user_prompt": "judge_input_model_1_ordered_first",
                "games/1/user_prompt": "judge_input_model_2_ordered_first",
                "games/0/judgment": "judge_output_model_1_ordered_first",
                "games/1/judgment": "judge_output_model_2_ordered_first",
                "games/0/score": "score_model_1_ordered_first",
                "games/1/score": "score_model_2_ordered_first",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Rename(
            field_to_field={
                "model": "model_2",
                "judge": "judge_model_id",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Set(
            fields={
                "model_1": "gpt-4-0314",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Cast(
            field="judge_input_model_1_ordered_first",
            to="str",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Cast(
            field="judge_input_model_2_ordered_first",
            to="str",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Lower(
            field="model_1",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Lower(
            field="model_2",
            apply_to_streams=[
                "judgment",
            ],
        ),
        FilterByCondition(
            values={
                "score_model_1_ordered_first": [
                    "A=B",
                    "A>B",
                    "A>>B",
                    "B>A",
                    "B>>A",
                ],
                "score_model_2_ordered_first": [
                    "A=B",
                    "A>B",
                    "A>>B",
                    "B>A",
                    "B>>A",
                ],
            },
            condition="in",
            apply_to_streams=[
                "judgment",
            ],
        ),
        JoinStreams(
            left_stream="questions",
            right_stream="judgment",
            how="inner",
            on=[
                "question_id",
            ],
            new_stream_name="merged_stream",
        ),
        Rename(
            field_to_field={
                "model_id": "model_1",
                "model_output": "model_1_output",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        JoinStreams(
            left_stream="merged_stream",
            right_stream="model_answer",
            how="inner",
            on=[
                "question_id",
                "model_1",
            ],
            new_stream_name="merged_stream",
        ),
        Rename(
            field_to_field={
                "model_1": "model_2",
                "model_1_output": "model_2_output",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        JoinStreams(
            left_stream="merged_stream",
            right_stream="model_answer",
            how="inner",
            on=[
                "question_id",
                "model_2",
            ],
            new_stream_name="merged_stream",
        ),
        DeleteSplits(
            splits=[
                "questions",
                "model_answer",
                "judgment",
            ],
        ),
        RenameSplits(
            mapper={
                "merged_stream": "test",
            },
        ),
        SelectFields(
            fields=[
                "question_id",
                "category",
                "model_input",
                "model_1",
                "model_2",
                "judge_model_id",
                "model_1_output",
                "model_2_output",
                "score_model_1_ordered_first",
                "score_model_2_ordered_first",
                "judge_input_model_1_ordered_first",
                "judge_input_model_2_ordered_first",
                "judge_output_model_1_ordered_first",
                "judge_output_model_2_ordered_first",
            ],
        ),
    ],
)

from unitxt.operators import Apply, Cast, Copy, FilterByCondition, Rename, SelectFields, Set
from unitxt.processors import Lower
from unitxt.splitters import RenameSplits
from unitxt.stream_operators import DeleteSplits, JoinStreams

Explanation about SequentialOperator

A class representing a sequential operator in the streaming system.

A sequential operator is a type of MultiStreamOperator that applies a sequence of other operators to a MultiStream. It maintains a list of StreamingOperators and applies them in order to the MultiStream.
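The idea of applying steps in order can be sketched in plain Python. This is only an illustration, not unitxt's implementation: the names run_sequentially, lowercase_model_id and rename_stream_to_test are hypothetical, and a MultiStream is modelled here as a simple dict of lists.

```python
from typing import Callable, Dict, Iterable, List

Instance = dict
MultiStream = Dict[str, List[Instance]]  # toy stand-in: stream name -> instances
Step = Callable[[MultiStream], MultiStream]


def run_sequentially(multi_stream: MultiStream, steps: Iterable[Step]) -> MultiStream:
    """Feed the output of each step into the next one, in order."""
    for step in steps:
        multi_stream = step(multi_stream)
    return multi_stream


def lowercase_model_id(ms: MultiStream) -> MultiStream:
    # Hypothetical step: lowercase "model_id" in every instance of every stream.
    return {
        name: [{**inst, "model_id": inst["model_id"].lower()} for inst in stream]
        for name, stream in ms.items()
    }


def rename_stream_to_test(ms: MultiStream) -> MultiStream:
    # Hypothetical step: rename the "answers" stream to "test".
    return {("test" if name == "answers" else name): stream for name, stream in ms.items()}


result = run_sequentially(
    {"answers": [{"model_id": "GPT-4-0314"}]},
    [lowercase_model_id, rename_stream_to_test],
)
```

Because each step receives the previous step's output, order matters: a step that reads a field must come after the step that creates it.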

Explanation about Copy

Copies values from specified fields to specified fields.

Args (of parent class):

field_to_field (Union[List[List], Dict[str, str]]): A list of lists, where each sublist contains the source field and the destination field, or a dictionary mapping source fields to destination fields.

Examples:

An input instance {"a": 2, "b": 3}, when processed by Copy(field_to_field={"a": "b"}), would yield {"a": 2, "b": 2}, and when processed by Copy(field_to_field={"a": "c"}) would yield {"a": 2, "b": 3, "c": 2}.

With field names containing /, we can also copy from inside a field: Copy(field="a/0", to_field="a") would process instance {"a": [1, 3]} into {"a": 1}.
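The path-based copying described above can be approximated in plain Python. This is a minimal sketch under stated assumptions, not the library's code: get_path and copy_fields are hypothetical helpers, numeric path parts are assumed to index into lists, and destinations are limited to top-level fields.

```python
import copy
from typing import Any, Dict


def get_path(instance: Any, path: str) -> Any:
    """Walk a "/"-separated path; numeric parts index into lists."""
    value = instance
    for part in path.split("/"):
        value = value[int(part)] if isinstance(value, list) else value[part]
    return value


def copy_fields(instance: Dict[str, Any], field_to_field: Dict[str, str]) -> Dict[str, Any]:
    """Return a new instance with each source path copied to a top-level destination."""
    result = dict(instance)
    for source, destination in field_to_field.items():
        result[destination] = copy.deepcopy(get_path(instance, source))
    return result


# Mirrors the "questions" step of this catalog entry on a made-up instance.
question = {"question_id": "q1", "turns": [{"content": "Write a haiku."}]}
copied = copy_fields(question, {"turns/0/content": "model_input"})
```

The source field is left in place, which is what distinguishes Copy from Rename.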

Explanation about DeleteSplits

Operator that deletes splits from a stream.

Attributes:

splits (List[str]): The splits to delete from the stream.

Explanation about JoinStreams

Join multiple streams into a single stream.

Args:

left_stream (str): The stream that will be considered the "left" in the join operations.

right_stream (str): The stream that will be considered the "right" in the join operations.

how (Literal["left", "right", "inner", "outer", "cross"]): The type of join to be performed.

on (Optional[List[str]]): Column names to join on. These must be found in both streams.

left_on (Optional[List[str]]): Column names to join on in the left stream.

right_on (Optional[List[str]]): Column names to join on in the right stream.

new_stream_name (str): The name of the new stream resulting from the merge.

Examples:

JoinStreams(left_stream="questions", right_stream="answers", how="inner", on=["question_id"], new_stream_name="question_with_answers") joins the "questions" and "answers" streams on the "question_id" field using an inner join, resulting in a new stream named "question_with_answers".

JoinStreams(left_stream="questions", right_stream="answers", how="inner", left_on=["question_id"], right_on=["question"], new_stream_name="question_with_answers") joins the "questions" and "answers" streams on the "question_id" field in the left stream and the "question" field in the right stream, using an inner join, resulting in a new stream named "question_with_answers". This is suitable when the fields have different names across the streams.
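To make the inner-join semantics concrete, here is a small plain-Python sketch. It does not call unitxt: inner_join is a hypothetical helper, streams are modelled as lists of dicts, and the rule for clashing field names (the left value wins) is an assumption made for the sketch, since this page does not say how such clashes are resolved.

```python
from typing import Any, Dict, List

Instance = Dict[str, Any]


def inner_join(left: List[Instance], right: List[Instance], on: List[str]) -> List[Instance]:
    """Pair up left and right instances whose values agree on every key in `on`."""
    # Index the right stream by its join-key values.
    index: Dict[tuple, List[Instance]] = {}
    for row in right:
        index.setdefault(tuple(row[key] for key in on), []).append(row)

    merged = []
    for row in left:
        for match in index.get(tuple(row[key] for key in on), []):
            # Assumption of this sketch: on a name clash, keep the left value.
            merged.append({**match, **row})
    return merged


# Made-up data shaped like the "questions" and "judgment" streams.
questions = [
    {"question_id": 1, "model_input": "2+2?"},
    {"question_id": 2, "model_input": "Capital of France?"},
]
judgment = [
    {"question_id": 1, "model_2": "model-x", "score_model_1_ordered_first": "A>B"},
]
merged_stream = inner_join(questions, judgment, on=["question_id"])
```

Question 2 has no judgment, so an inner join drops it. Joining on several keys, as the later steps of this entry do with "question_id" and "model_1", only tightens the match.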

Explanation about FilterByCondition

Filters a stream, yielding only instances in which the values in required fields follow the required condition operator.

Raises an error if a required field name is missing from the input instance.

Args:

values (Dict[str, Any]): Field names and respective values that instances must match, according to the condition, to be included in the output.

condition: the name of the desired condition operator between the specified (sub) field's value and the provided constant value. Supported conditions are ("gt", "ge", "lt", "le", "ne", "eq", "in", "not in")

error_on_filtered_all (bool, optional): If True, raises an error if all instances are filtered out. Defaults to True.

Examples:
FilterByCondition(values = {"a":4}, condition = "gt") will yield only instances where field "a" contains a value > 4
FilterByCondition(values = {"a":4}, condition = "le") will yield only instances where "a"<=4
FilterByCondition(values = {"a":[4,8]}, condition = "in") will yield only instances where "a" is 4 or 8
FilterByCondition(values = {"a":[4,8]}, condition = "not in") will yield only instances where "a" is neither 4 nor 8
FilterByCondition(values = {"a/b":[4,8]}, condition = "not in") will yield only instances where "a" is a dict in which key "b" is mapped to a value that is neither 4 nor 8
FilterByCondition(values = {"a[2]":4}, condition = "le") will yield only instances where "a" is a list whose third element is <= 4
FilterByCondition(values = {"a":False}, condition = "exists") will yield only instances which do not contain a field named "a"
FilterByCondition(values = {"a/b":True}, condition = "exists") will yield only instances which contain a field named "a" whose value is a dict containing, in turn, a field named "b"
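The filtering rule can be sketched in plain Python for the eight conditions listed under Args. This is an illustration only: filter_by_condition is a hypothetical function, it handles top-level fields only (no "/" paths), and it does not cover the "exists" condition or error_on_filtered_all.

```python
import operator
from typing import Any, Dict, Iterable, Iterator

CONDITIONS = {
    "gt": operator.gt,
    "ge": operator.ge,
    "lt": operator.lt,
    "le": operator.le,
    "ne": operator.ne,
    "eq": operator.eq,
    "in": lambda value, allowed: value in allowed,
    "not in": lambda value, allowed: value not in allowed,
}


def filter_by_condition(
    instances: Iterable[Dict[str, Any]], values: Dict[str, Any], condition: str
) -> Iterator[Dict[str, Any]]:
    """Yield only instances in which every listed field satisfies the condition."""
    check = CONDITIONS[condition]
    for instance in instances:
        missing = [field for field in values if field not in instance]
        if missing:
            raise ValueError(f"Required fields missing from instance: {missing}")
        if all(check(instance[field], expected) for field, expected in values.items()):
            yield instance


# Made-up judgments; the second has an unparsable score in one ordering.
VALID_SCORES = ["A=B", "A>B", "A>>B", "B>A", "B>>A"]
judgments = [
    {"score_model_1_ordered_first": "A>B", "score_model_2_ordered_first": "B>A"},
    {"score_model_1_ordered_first": "A>B", "score_model_2_ordered_first": None},
]
kept = list(
    filter_by_condition(
        judgments,
        {
            "score_model_1_ordered_first": VALID_SCORES,
            "score_model_2_ordered_first": VALID_SCORES,
        },
        "in",
    )
)
```

All listed fields must pass for an instance to be kept, which is why the filter step in this entry discards a judgment when either ordering lacks a valid score.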

Explanation about Apply

A class used to apply a python function and store the result in a field.

Args:

function (str): name of function.

to_field (str): the field to store the result

Any additional arguments are field names whose values will be passed directly to the function specified.

Examples:

Store in field "b" the uppercase string of the value in field "a": Apply("a", function=str.upper, to_field="b")

Dump the json representation of field "t" and store back in the same field: Apply("t", function=json.dumps, to_field="t")

Set the time in a field "b": Apply(function=time.time, to_field="b")
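The behaviour can be imitated in plain Python, including a function given by name as in function="str.lower" in this entry. This is a hedged sketch: resolve and apply_function are hypothetical helpers, and the way a dotted name is looked up here (builtins first, then an importable module) is an assumption, not a description of how unitxt resolves it.

```python
import builtins
import importlib
from typing import Any, Callable, Dict, Sequence, Union


def resolve(function: Union[str, Callable]) -> Callable:
    """Turn a dotted name such as "str.lower" or "json.dumps" into a callable."""
    if callable(function):
        return function
    head, *rest = function.split(".")
    # Assumption: try a builtin (e.g. str) first, otherwise import a module (e.g. json).
    obj = getattr(builtins, head, None)
    if obj is None:
        obj = importlib.import_module(head)
    for attribute in rest:
        obj = getattr(obj, attribute)
    return obj


def apply_function(
    instance: Dict[str, Any],
    field_names: Sequence[str],
    function: Union[str, Callable],
    to_field: str,
) -> Dict[str, Any]:
    """Call the function on the named fields' values and store the result in to_field."""
    result = dict(instance)
    result[to_field] = resolve(function)(*(instance[name] for name in field_names))
    return result


answer = apply_function({"model_id": "GPT-4-0314"}, ["model_id"], "str.lower", to_field="model_id")
```

With an empty list of field names the function is called with no arguments, which corresponds to the time.time example above.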

Explanation about SelectFields

Keep only specified fields from each instance in a stream.

Args:

fields (List[str]): The fields to keep from each instance.

Explanation about Cast

Casts specified fields to specified types.

Args:

default (object): A dictionary mapping field names to default values for cases of casting failure.

process_every_value (bool): If true, all fields involved must contain lists, and each value in the list is then cast. Defaults to False.
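A rough plain-Python picture of casting with a fallback follows. It is a sketch under assumptions rather than the library's logic: cast_field and the TYPES table are hypothetical, only four type names are supported, and the default is passed as a single value for the one field being cast instead of the per-field dictionary described above.

```python
from typing import Any, Dict

TYPES = {"int": int, "float": float, "str": str, "bool": bool}
_MISSING = object()  # sentinel meaning "no default supplied"


def cast_field(
    instance: Dict[str, Any],
    field: str,
    to: str,
    default: Any = _MISSING,
    process_every_value: bool = False,
) -> Dict[str, Any]:
    """Return a copy of the instance with `field` converted to the type named by `to`."""
    caster = TYPES[to]

    def cast_one(value: Any) -> Any:
        try:
            return caster(value)
        except (ValueError, TypeError):
            if default is _MISSING:
                raise  # no fallback given: surface the casting failure
            return default

    result = dict(instance)
    value = instance[field]
    result[field] = [cast_one(item) for item in value] if process_every_value else cast_one(value)
    return result


as_text = cast_field({"judge_input_model_1_ordered_first": 12}, "judge_input_model_1_ordered_first", "str")
```

In this entry the two Cast steps use to="str", presumably so that both judge inputs have a uniform string type before the streams are joined.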

Explanation about Rename

Renames fields.

Moves a value from one field to another. If a field name contains a /, the value can be moved from one branch of the instance into another. The source field is removed; if the source name contains a /, only that part of the enclosing field is removed.

Examples:

Rename(field_to_field={"b": "c"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "c": 2}, {"a": 2, "c": 3}]

Rename(field_to_field={"b": "c/d"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "c": {"d": 2}}, {"a": 2, "c": {"d": 3}}]

Rename(field_to_field={"b": "b/d"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "b": {"d": 2}}, {"a": 2, "b": {"d": 3}}]

Rename(field_to_field={"b/c/e": "b/d"}) will change inputs [{"a": 1, "b": {"c": {"e": 2, "f": 20}}}] to [{"a": 1, "b": {"c": {"f": 20}, "d": 2}}]
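The four examples above can be reproduced with a short plain-Python sketch. rename_field is a hypothetical helper, not unitxt code, and it assumes every path segment addresses a dict key (list indices are not handled). Whether an emptied parent dict is also removed is not stated on this page; the sketch leaves it in place.

```python
import copy
from typing import Any, Dict


def rename_field(instance: Dict[str, Any], from_field: str, to_field: str) -> Dict[str, Any]:
    """Move the value at from_field to to_field; both may be "/"-separated paths."""
    result = copy.deepcopy(instance)

    # Remove the value from its source location.
    *parents, leaf = from_field.split("/")
    node = result
    for part in parents:
        node = node[part]
    value = node.pop(leaf)

    # Write it at the destination, creating intermediate dicts where needed.
    *parents, leaf = to_field.split("/")
    node = result
    for part in parents:
        if not isinstance(node.get(part), dict):
            node[part] = {}
        node = node[part]
    node[leaf] = value
    return result


nested = rename_field({"a": 1, "b": {"c": {"e": 2, "f": 20}}}, "b/c/e", "b/d")
```

Removing the source before writing the destination is what lets the "b" to "b/d" case work: the scalar under "b" is popped first, then "b" is recreated as a dict.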

Explanation about Set

Sets specified fields in each instance, in a given stream or all streams (default), with specified values. If the fields exist, they are updated; if they do not exist, they are added.

Args:

fields (Dict[str, object]): The fields to add to each instance. Use '/' to access inner fields

use_deepcopy (bool) : Deep copy the input value to avoid later modifications

Examples:

# Set field "classes" to a list consisting of "positive" and "negative" in each and every instance of all streams
Set(fields={"classes": ["positive", "negative"]})

# In each and every instance of all streams, field "span" is to become a dictionary containing a field "start", in which the value 0 is to be set
Set(fields={"span/start": 0})

# In all instances of stream "train" only, set field "classes" to a list consisting of "positive" and "negative"
Set(fields={"classes": ["positive", "negative"]}, apply_to_streams=["train"])

# Set field "classes" to the value of a given list, preventing later modification of the original list from changing the instance
Set(fields={"classes": alist}, use_deepcopy=True)
# If alist is modified afterwards, the instances remain intact
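The effect of use_deepcopy is easiest to see in a plain-Python sketch. set_fields is a hypothetical helper written for illustration, not the library's code, and it assumes "/" paths address nested dicts only.

```python
import copy
from typing import Any, Dict


def set_fields(instance: Dict[str, Any], fields: Dict[str, Any], use_deepcopy: bool = False) -> Dict[str, Any]:
    """Return a copy of the instance with each "/"-separated path set to its value."""
    result = copy.deepcopy(instance)
    for path, value in fields.items():
        if use_deepcopy:
            value = copy.deepcopy(value)  # detach from the caller's object
        *parents, leaf = path.split("/")
        node = result
        for part in parents:
            if not isinstance(node.get(part), dict):
                node[part] = {}
            node = node[part]
        node[leaf] = value
    return result


# Mirrors the Set step of this catalog entry on a made-up judgment instance.
judgment = set_fields({"model_2": "model-x"}, {"model_1": "gpt-4-0314"})
span = set_fields({}, {"span/start": 0})

alist = ["positive", "negative"]
shared = set_fields({}, {"classes": alist})
isolated = set_fields({}, {"classes": alist}, use_deepcopy=True)
alist.append("neutral")  # visible through `shared`, but not through `isolated`
```

Without the deep copy every instance holds a reference to the same list object, so a later change to that list shows up in all of them.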
