📄 Arena Hard Hf Space Processing Steps

operators.arena_hard_hf_space_processing_steps

SequentialOperator(
    steps=[
        Rename(
            field_to_field={
                "cluster": "group",
            },
            apply_to_streams=[
                "questions",
            ],
        ),
        Copy(
            field_to_field={
                "turns/0/content": "model_input",
            },
            apply_to_streams=[
                "questions",
            ],
        ),
        Copy(
            field_to_field={
                "choices/0/turns/0/content": "model_output",
                "choices/0/turns/0/token_len": "model_output_token_len",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        Apply(
            function="str.lower",
            to_field="model_id",
            apply_to_streams=[
                "model_answer",
            ],
            _argv=[
                "model_id",
            ],
        ),
        Copy(
            field_to_field={
                "games/0/user_prompt": "judge_input_model_1_ordered_first",
                "games/1/user_prompt": "judge_input_model_2_ordered_first",
                "games/0/judgment": "judge_output_model_1_ordered_first",
                "games/1/judgment": "judge_output_model_2_ordered_first",
                "games/0/score": "score_model_1_ordered_first",
                "games/1/score": "score_model_2_ordered_first",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Rename(
            field_to_field={
                "model": "model_2",
                "judge": "judge_model_id",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Set(
            fields={
                "model_1": "gpt-4-0314",
            },
            apply_to_streams=[
                "judgment",
            ],
        ),
        Cast(
            field="judge_input_model_1_ordered_first",
            to="str",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Cast(
            field="judge_input_model_2_ordered_first",
            to="str",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Lower(
            field="model_1",
            apply_to_streams=[
                "judgment",
            ],
        ),
        Lower(
            field="model_2",
            apply_to_streams=[
                "judgment",
            ],
        ),
        FilterByCondition(
            values={
                "score_model_1_ordered_first": [
                    "A=B",
                    "A>B",
                    "A>>B",
                    "B>A",
                    "B>>A",
                ],
                "score_model_2_ordered_first": [
                    "A=B",
                    "A>B",
                    "A>>B",
                    "B>A",
                    "B>>A",
                ],
            },
            condition="in",
            apply_to_streams=[
                "judgment",
            ],
        ),
        JoinStreams(
            left_stream="questions",
            right_stream="judgment",
            how="inner",
            on=[
                "question_id",
            ],
            new_stream_name="merged_stream",
        ),
        Rename(
            field_to_field={
                "model_id": "model_1",
                "model_output": "model_1_output",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        JoinStreams(
            left_stream="merged_stream",
            right_stream="model_answer",
            how="inner",
            on=[
                "question_id",
                "model_1",
            ],
            new_stream_name="merged_stream",
        ),
        Rename(
            field_to_field={
                "model_1": "model_2",
                "model_1_output": "model_2_output",
            },
            apply_to_streams=[
                "model_answer",
            ],
        ),
        JoinStreams(
            left_stream="merged_stream",
            right_stream="model_answer",
            how="inner",
            on=[
                "question_id",
                "model_2",
            ],
            new_stream_name="merged_stream",
        ),
        DeleteSplits(
            splits=[
                "questions",
                "model_answer",
                "judgment",
            ],
        ),
        RenameSplits(
            mapper={
                "merged_stream": "test",
            },
        ),
        SelectFields(
            fields=[
                "question_id",
                "category",
                "model_input",
                "model_1",
                "model_2",
                "judge_model_id",
                "model_1_output",
                "model_2_output",
                "score_model_1_ordered_first",
                "score_model_2_ordered_first",
                "judge_input_model_1_ordered_first",
                "judge_input_model_2_ordered_first",
                "judge_output_model_1_ordered_first",
                "judge_output_model_2_ordered_first",
            ],
        ),
    ],
)
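The three JoinStreams steps above are easier to follow on a toy example. The following is a plain-Python sketch of the join order only, not Unitxt code: the sample rows, the model name "model-x", and the helper names inner_join and answers_as are hypothetical, and the fields are shown already flattened, as the earlier Copy and Rename steps would leave them.

```python
# Toy data; "model-x" and all values are made up for illustration.
questions = [{"question_id": "q1", "model_input": "What is 2+2?"}]
judgment = [{
    "question_id": "q1",
    "model_1": "gpt-4-0314",
    "model_2": "model-x",
    "score_model_1_ordered_first": "A>B",
    "score_model_2_ordered_first": "B>A",
}]
model_answer = [
    {"question_id": "q1", "model_id": "gpt-4-0314", "model_output": "4"},
    {"question_id": "q1", "model_id": "model-x", "model_output": "four"},
]

def inner_join(left, right, on):
    """Merge every left/right pair of rows that agrees on all `on` keys."""
    return [
        {**l, **r}
        for l in left
        for r in right
        if all(l[k] == r[k] for k in on)
    ]

def answers_as(role):
    """Rename model_id/model_output so answers can join as model_1 or model_2."""
    return [
        {
            "question_id": a["question_id"],
            role: a["model_id"],
            f"{role}_output": a["model_output"],
        }
        for a in model_answer
    ]

# Same order as the pipeline: questions + judgment, then each model's answer.
merged = inner_join(questions, judgment, on=["question_id"])
merged = inner_join(merged, answers_as("model_1"), on=["question_id", "model_1"])
merged = inner_join(merged, answers_as("model_2"), on=["question_id", "model_2"])
```

Because every join is an inner join, a question survives only if it has a judgment and an answer from both models.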

Explanation about SequentialOperator

A class representing a sequential operator in the streaming system.

A sequential operator is a type of MultiStreamOperator that applies a sequence of other operators to a MultiStream. It maintains a list of StreamingOperators and applies them in order to the MultiStream.
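As a rough illustration of "applies them in order", here is a minimal plain-Python sketch. It assumes a MultiStream can be modeled as a dict from stream name to a list of instances; the names Streams, run_sequentially, and the two step functions are hypothetical and are not part of Unitxt.

```python
from typing import Callable, Dict, List

# Stand-in for a MultiStream: stream name -> list of instance dicts.
Streams = Dict[str, List[dict]]
Step = Callable[[Streams], Streams]

def run_sequentially(steps: List[Step], streams: Streams) -> Streams:
    """Feed the output of each step into the next one, in list order."""
    for step in steps:
        streams = step(streams)
    return streams

def lowercase_model_id(streams: Streams) -> Streams:
    # Hypothetical step: lower-case "model_id" in every instance.
    return {
        name: [{**inst, "model_id": inst["model_id"].lower()} for inst in instances]
        for name, instances in streams.items()
    }

def rename_to_test(streams: Streams) -> Streams:
    # Hypothetical step: rename the "model_answer" stream to "test".
    return {
        ("test" if name == "model_answer" else name): instances
        for name, instances in streams.items()
    }

result = run_sequentially(
    [lowercase_model_id, rename_to_test],
    {"model_answer": [{"model_id": "GPT-4-0314"}]},
)
```

The order of the list matters: here the second step would still work if run first, but steps that depend on fields created earlier would not.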

Explanation about Set

Sets specified fields in each instance, in a given stream or in all streams (default), to specified values. If a field already exists, it is updated; if it does not exist, it is added.

Args:

fields (Dict[str, object]): The fields to add to each instance. Use ‘/’ to access inner fields

use_deepcopy (bool) : Deep copy the input value to avoid later modifications

Examples:

# Set field "classes" to a list consisting of "positive" and "negative" in each and every instance of all streams
Set(fields={"classes": ["positive", "negative"]})

# In each and every instance of all streams, field "span" is to become a dictionary containing a field "start", in which the value 0 is to be set
Set(fields={"span/start": 0})

# In all instances of stream "train" only, set field "classes" to a list consisting of "positive" and "negative"
Set(fields={"classes": ["positive", "negative"]}, apply_to_streams=["train"])

# Set field "classes" to the value of a given list, so that later modification of the original list does not change the instances
Set(fields={"classes": alist}, use_deepcopy=True)
# If alist is modified afterwards, the instances remain intact.
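The effect of the "/" path syntax and of use_deepcopy can be sketched in plain Python. This is an illustration under the assumption that instances are ordinary nested dicts; set_fields is a hypothetical helper, not the Unitxt implementation.

```python
import copy

def set_fields(instance, fields, use_deepcopy=False):
    """Write each value at its '/'-separated path, creating dicts on the way."""
    for path, value in fields.items():
        if use_deepcopy:
            value = copy.deepcopy(value)
        *parents, leaf = path.split("/")
        node = instance
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return instance

labels = ["positive", "negative"]
shared = set_fields({}, {"classes": labels})
isolated = set_fields({}, {"classes": labels}, use_deepcopy=True)
nested = set_fields({"text": "hi"}, {"span/start": 0})

# Mutating the original list afterwards only affects the non-copied instance.
labels.append("neutral")
```

After the final append, shared["classes"] sees the new label because it holds the same list object, while isolated["classes"] does not.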

Explanation about DeleteSplits

Operator that deletes splits from the stream.

Attributes:

splits (List[str]): The splits to delete from the stream.
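In the pipeline above, DeleteSplits drops the three input streams once they have been merged. A minimal plain-Python sketch of that idea, assuming streams are held in a dict keyed by split name (delete_splits is a hypothetical helper, not Unitxt code):

```python
def delete_splits(streams, splits):
    """Return only the streams whose names are not listed in `splits`."""
    return {name: instances for name, instances in streams.items() if name not in splits}

streams = {
    "questions": [],
    "model_answer": [],
    "judgment": [],
    "merged_stream": [{"question_id": "q1"}],
}
remaining = delete_splits(streams, ["questions", "model_answer", "judgment"])
```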

Explanation about Rename

Renames fields.

Moves a value from one field to another. If a field name contains a /, the value can move from one branch of the instance into another. The source field is removed; when from_field contains a /, only the addressed part of it is removed.

Examples:

Rename(field_to_field={"b": "c"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "c": 2}, {"a": 2, "c": 3}]

Rename(field_to_field={"b": "c/d"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "c": {"d": 2}}, {"a": 2, "c": {"d": 3}}]

Rename(field_to_field={"b": "b/d"}) will change inputs [{"a": 1, "b": 2}, {"a": 2, "b": 3}] to [{"a": 1, "b": {"d": 2}}, {"a": 2, "b": {"d": 3}}]

Rename(field_to_field={"b/c/e": "b/d"}) will change inputs [{"a": 1, "b": {"c": {"e": 2, "f": 20}}}] to [{"a": 1, "b": {"c": {"f": 20}, "d": 2}}]
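The examples above are consistent with a "remove from the source path, then write at the target path" procedure. The sketch below reproduces them in plain Python for dict-only paths; rename_field is a hypothetical helper and says nothing about how Unitxt handles lists or emptied parent dicts.

```python
def rename_field(instance, source, target):
    """Pop the value at `source` and store it at `target` ('/'-separated paths)."""
    # Walk to the parent of the source leaf and remove the leaf.
    *parents, leaf = source.split("/")
    node = instance
    for key in parents:
        node = node[key]
    value = node.pop(leaf)

    # Walk to the parent of the target leaf, creating dicts as needed.
    *parents, leaf = target.split("/")
    node = instance
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value
    return instance

flat = rename_field({"a": 1, "b": 2}, "b", "c/d")
into_self = rename_field({"a": 1, "b": 2}, "b", "b/d")
across = rename_field({"a": 1, "b": {"c": {"e": 2, "f": 20}}}, "b/c/e", "b/d")
```

Removing the source before writing the target is what makes the "b" to "b/d" case work: the old scalar is gone by the time a new dict is created under "b".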

Explanation about Copy

Copies values from specified fields to specified fields.

Args (of parent class):

field_to_field (Union[List[List], Dict[str, str]]): A list of lists, where each sublist contains the source field and the destination field, or a dictionary mapping source fields to destination fields.

Examples:

An input instance {"a": 2, "b": 3}, when processed by Copy(field_to_field={"a": "b"}) would yield {"a": 2, "b": 2}, and when processed by Copy(field_to_field={"a": "c"}) would yield {"a": 2, "b": 3, "c": 2}

With field names containing /, we can also copy from inside a field: Copy(field="a/0",to_field="a") would process instance {"a": [1, 3]} into {"a": 1}
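Paths such as "turns/0/content" in the pipeline mix dict keys and list indices. The following plain-Python sketch shows one way such a path could be resolved and copied; get_path and copy_field are hypothetical helpers, and treating a path segment as an index whenever the current node is a list is an assumption of this sketch.

```python
import copy

def get_path(instance, path):
    """Follow a '/'-separated path through nested dicts and lists."""
    node = instance
    for key in path.split("/"):
        node = node[int(key)] if isinstance(node, list) else node[key]
    return node

def copy_field(instance, source, target):
    """Copy the value at `source` to `target`, leaving the source in place."""
    value = copy.deepcopy(get_path(instance, source))
    *parents, leaf = target.split("/")
    node = instance
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value
    return instance

question = copy_field(
    {"turns": [{"content": "Hello"}]}, "turns/0/content", "model_input"
)
unwrapped = copy_field({"a": [1, 3]}, "a/0", "a")
```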

Explanation about Apply

A class used to apply a python function and store the result in a field.

Args:

function (str): name of function.

to_field (str): the field to store the result

Any additional arguments are field names whose values will be passed directly to the function specified.

Examples:

Store in field "b" the uppercase string of the value in field "a": Apply("a", function=str.upper, to_field="b")

Dump the json representation of field “t” and store back in the same field: Apply("t", function=json.dumps, to_field="t")

Set the time in a field ‘b’: Apply(function=time.time, to_field="b")
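The pipeline passes the function as the string "str.lower". A plain-Python sketch of how a dotted name might be resolved and applied is shown below; resolve and apply_function are hypothetical helpers, and this sketch only looks names up in builtins, which is narrower than whatever Unitxt actually supports.

```python
import builtins
from functools import reduce

def resolve(name):
    """Resolve a dotted name such as 'str.lower', starting from builtins."""
    first, *rest = name.split(".")
    return reduce(getattr, rest, getattr(builtins, first))

def apply_function(instance, function, to_field, *arg_fields):
    """Call `function` on the values of `arg_fields` and store the result."""
    func = resolve(function) if isinstance(function, str) else function
    instance[to_field] = func(*(instance[f] for f in arg_fields))
    return instance

# Mirrors the pipeline step: lower-case "model_id" in place.
row = apply_function({"model_id": "GPT-4-0314"}, "str.lower", "model_id", "model_id")

# A callable works too, and the result can go to a different field.
shout = apply_function({"a": "hi"}, str.upper, "b", "a")
```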

Explanation about Cast

Casts specified fields to specified types.

Args:

default (object): A dictionary mapping field names to default values for cases of casting failure.

process_every_value (bool): If true, all fields involved must contain lists, and each value in the list is then cast. Defaults to False.
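The interplay of a failure default and process_every_value can be sketched in plain Python. This is a simplified illustration: cast_field is a hypothetical helper that handles one field and takes a single fallback value rather than the per-field dictionary described above, and the supported type names are an assumption.

```python
_NO_DEFAULT = object()  # sentinel: no fallback value was supplied
_TYPES = {"str": str, "int": int, "float": float, "bool": bool}

def cast_field(instance, field, to, default=_NO_DEFAULT, process_every_value=False):
    """Cast instance[field] (or each element of it) to the named type."""
    convert = _TYPES[to]

    def cast_one(value):
        try:
            return convert(value)
        except (ValueError, TypeError):
            if default is _NO_DEFAULT:
                raise  # no fallback given: surface the casting failure
            return default

    value = instance[field]
    if process_every_value:
        instance[field] = [cast_one(v) for v in value]
    else:
        instance[field] = cast_one(value)
    return instance

as_text = cast_field({"score": 7}, "score", "str")
each = cast_field({"ids": ["1", "2"]}, "ids", "int", process_every_value=True)
fallback = cast_field({"n": "abc"}, "n", "int", default=0)
```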

Explanation about JoinStreams

Join multiple streams into a single stream.

Args:

left_stream (str): The stream that will be considered the "left" in the join operations.

right_stream (str): The stream that will be considered the "right" in the join operations.

how (Literal["left", "right", "inner", "outer", "cross"]): The type of join to be performed.

on (Optional[List[str]]): Column names to join on. These must be found in both streams.

left_on (Optional[List[str]]): Column names to join on in the left stream.

right_on (Optional[List[str]]): Column names to join on in the right stream.

new_stream_name (str): The name of the new stream resulting from the merge.

Examples:

JoinStreams(left_stream="questions", right_stream="answers", how="inner", on=["question_id"], new_stream_name="question_with_answers")
Joins the "questions" and "answers" streams on the "question_id" field using an inner join, producing a new stream named "question_with_answers".

JoinStreams(left_stream="questions", right_stream="answers", how="inner", left_on=["question_id"], right_on=["question"], new_stream_name="question_with_answers")
Joins the "questions" and "answers" streams on the "question_id" field in the left stream and the "question" field in the right stream using an inner join, producing a new stream named "question_with_answers". This is suitable when the fields have different names across the streams.
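The second example, where the key has a different name on each side, can be sketched as a small hash join in plain Python. This only illustrates the inner-join semantics: inner_join is a hypothetical helper, the sample rows are made up, and letting left-hand values win on colliding field names is an assumption of this sketch.

```python
def inner_join(left, right, left_on, right_on):
    """Inner-join two lists of dicts on possibly differently named keys."""
    # Index the right rows by their key tuple for constant-time lookup.
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in right_on), []).append(row)

    joined = []
    for row in left:
        for match in index.get(tuple(row[k] for k in left_on), []):
            joined.append({**match, **row})  # left values win on collisions
    return joined

questions = [
    {"question_id": "q1", "text": "What is 2+2?"},
    {"question_id": "q2", "text": "Name a prime."},
]
answers = [{"question": "q1", "answer": "4"}]

question_with_answers = inner_join(
    questions, answers, left_on=["question_id"], right_on=["question"]
)
```

Question "q2" has no matching answer, so an inner join drops it; a "left" join would keep it with the answer fields missing.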

Explanation about SelectFields

Keep only specified fields from each instance in a stream.

Args:

fields (List[str]): The fields to keep from each instance.
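For a single instance, keeping only the listed fields amounts to a dictionary projection. A minimal plain-Python sketch follows; select_fields is a hypothetical helper, and raising KeyError on a missing field is this sketch's choice rather than documented Unitxt behavior.

```python
def select_fields(instance, fields):
    """Return a new dict holding only the requested fields, in the given order."""
    return {name: instance[name] for name in fields}

row = {"question_id": "q1", "category": "demo", "games": [], "model_1": "gpt-4-0314"}
kept = select_fields(row, ["question_id", "category", "model_1"])
```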

Explanation about FilterByCondition

Filters a stream, yielding only instances in which the values in required fields follow the required condition operator.

Raises an error if a required field name is missing from the input instance.

Args:

values (Dict[str, Any]): Field names and respective values that instances must match, according to the condition, to be included in the output.

condition: the name of the desired condition operator between the specified (sub) field’s value and the provided constant value. Supported conditions are ("gt", "ge", "lt", "le", "ne", "eq", "in", "not in")

error_on_filtered_all (bool, optional): If True, raises an error if all instances are filtered out. Defaults to True.

Examples:
FilterByCondition(values = {"a":4}, condition = "gt") will yield only instances where field "a" contains a value > 4
FilterByCondition(values = {"a":4}, condition = "le") will yield only instances where "a"<=4
FilterByCondition(values = {"a":[4,8]}, condition = "in") will yield only instances where "a" is 4 or 8
FilterByCondition(values = {"a":[4,8]}, condition = "not in") will yield only instances where "a" is neither 4 nor 8
FilterByCondition(values = {"a/b":[4,8]}, condition = "not in") will yield only instances where "a" is a dict in which key "b" is mapped to a value that is neither 4 nor 8
FilterByCondition(values = {"a[2]":4}, condition = "le") will yield only instances where "a" is a list whose third element is <= 4
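The condition names map naturally onto Python's operator module. Below is a plain-Python sketch for top-level fields only: filter_by_condition and CONDITIONS are hypothetical names, nested paths such as "a/b" are not handled, and the sample verdict rows are made up.

```python
import operator

# Each condition compares (field value, provided constant).
CONDITIONS = {
    "gt": operator.gt,
    "ge": operator.ge,
    "lt": operator.lt,
    "le": operator.le,
    "ne": operator.ne,
    "eq": operator.eq,
    "in": lambda value, allowed: value in allowed,
    "not in": lambda value, allowed: value not in allowed,
}

def filter_by_condition(instances, values, condition):
    """Yield instances whose fields all satisfy `condition` against `values`."""
    test = CONDITIONS[condition]
    for instance in instances:
        # instance[field] raises KeyError when a required field is missing.
        if all(test(instance[field], constant) for field, constant in values.items()):
            yield instance

verdicts = ["A=B", "A>B", "A>>B", "B>A", "B>>A"]
rows = [{"score": "A>B"}, {"score": "error"}, {"score": "B>>A"}]
valid = list(filter_by_condition(rows, {"score": verdicts}, "in"))
large = list(filter_by_condition([{"a": 3}, {"a": 5}], {"a": 4}, "gt"))
```

This mirrors the FilterByCondition step in the pipeline, which keeps only judgments whose two scores are among the five recognized verdict strings.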
