📄 Arena Hard Hf Space Processing Steps¶
operators.arena_hard_hf_space_processing_steps
type: SequentialOperator
steps:
- type: Rename
field_to_field:
cluster: group
apply_to_streams:
- questions
- type: Copy
field_to_field:
turns/0/content: model_input
apply_to_streams:
- questions
- type: Copy
field_to_field:
choices/0/turns/0/content: model_output
choices/0/turns/0/token_len: model_output_token_len
apply_to_streams:
- model_answer
- type: Apply
function: str.lower
to_field: model_id
apply_to_streams:
- model_answer
_argv:
- model_id
- type: Copy
field_to_field:
games/0/user_prompt: judge_input_model_1_ordered_first
games/1/user_prompt: judge_input_model_2_ordered_first
games/0/judgment: judge_output_model_1_ordered_first
games/1/judgment: judge_output_model_2_ordered_first
games/0/score: score_model_1_ordered_first
games/1/score: score_model_2_ordered_first
apply_to_streams:
- judgment
- type: Rename
field_to_field:
model: model_2
judge: judge_model_id
apply_to_streams:
- judgment
- type: Set
fields:
model_1: gpt-4-0314
apply_to_streams:
- judgment
- type: Apply
function: str
to_field: judge_input_model_1_ordered_first
apply_to_streams:
- judgment
_argv:
- judge_input_model_1_ordered_first
- type: Apply
function: str
to_field: judge_input_model_2_ordered_first
apply_to_streams:
- judgment
_argv:
- judge_input_model_2_ordered_first
- type: Apply
function: str.lower
to_field: model_1
apply_to_streams:
- judgment
_argv:
- model_1
- type: Apply
function: str.lower
to_field: model_2
apply_to_streams:
- judgment
_argv:
- model_2
- type: FilterByCondition
values:
score_model_1_ordered_first:
- A=B
- A>B
- A>>B
- B>A
- B>>A
score_model_2_ordered_first:
- A=B
- A>B
- A>>B
- B>A
- B>>A
condition: in
apply_to_streams:
- judgment
- type: JoinStreams
left_stream: questions
right_stream: judgment
how: inner
on:
- question_id
new_stream_name: merged_stream
- type: Rename
field_to_field:
model_id: model_1
model_output: model_1_output
apply_to_streams:
- model_answer
- type: JoinStreams
left_stream: merged_stream
right_stream: model_answer
how: inner
on:
- question_id
- model_1
new_stream_name: merged_stream
- type: Rename
field_to_field:
model_1: model_2
model_1_output: model_2_output
apply_to_streams:
- model_answer
- type: JoinStreams
left_stream: merged_stream
right_stream: model_answer
how: inner
on:
- question_id
- model_2
new_stream_name: merged_stream
- type: DeleteSplits
splits:
- questions
- model_answer
- judgment
- type: RenameSplits
mapper:
merged_stream: test
- type: SelectFields
fields:
- question_id
- category
- model_input
- model_1
- model_2
- judge_model_id
- model_1_output
- model_2_output
- score_model_1_ordered_first
- score_model_2_ordered_first
- judge_input_model_1_ordered_first
- judge_input_model_2_ordered_first
- judge_output_model_1_ordered_first
- judge_output_model_2_ordered_first
[source]Explanation about SequentialOperator¶
A class representing a sequential operator in the streaming system.
A sequential operator is a type of MultiStreamOperator that applies a sequence of other operators to a MultiStream. It maintains a list of StreamingOperator`s and applies them in order to the `MultiStream.
Explanation about Apply¶
A class used to apply a python function and store the result in a field.
- Args:
function (str): name of function. to_field (str): the field to store the result any additional arguments are field names whose values will be passed directly to the function specified
Examples: Store in field “b” the uppercase string of the value in field “a” Apply(“a”, function=str.upper, to_field=”b”)
Dump the json representation of field “t” and store back in the same field. Apply(“t”, function=json.dumps, to_field=”t”)
Set the time in a field ‘b’. Apply(function=time.time, to_field=”b”)
Explanation about FilterByCondition¶
Filters a stream, yielding only instances in which the values in required fields follow the required condition operator.
Raises an error if a required field name is missing from the input instance.
- Args:
values (Dict[str, Any]): Field names and respective Values that instances must match according the condition, to be included in the output. condition: the name of the desired condition operator between the specified (sub) field’s value and the provided constant value. Supported conditions are (“gt”, “ge”, “lt”, “le”, “ne”, “eq”, “in”,”not in”) error_on_filtered_all (bool, optional): If True, raises an error if all instances are filtered out. Defaults to True.
- Examples:
FilterByCondition(values = {“a”:4}, condition = “gt”) will yield only instances where field “a” contains a value > 4 FilterByCondition(values = {“a”:4}, condition = “le”) will yield only instances where “a”<=4 FilterByCondition(values = {“a”:[4,8]}, condition = “in”) will yield only instances where “a” is 4 or 8 FilterByCondition(values = {“a”:[4,8]}, condition = “not in”) will yield only instances where “a” different from 4 or 8 FilterByCondition(values = {“a/b”:[4,8]}, condition = “not in”) will yield only instances where “a” is
a dict in which key “b” is mapped to a value that is neither 4 nor 8
- FilterByCondition(values = {“a[2]”:4}, condition = “le”) will yield only instances where “a” is a list whose 3-rd
element is <= 4
Explanation about JoinStreams¶
Join multiple streams into a single stream.
- Args:
left_stream (str): The stream that will be considered the “left” in the join operations. right_stream (str): The stream that will be considered the “right” in the join operations. how (Literal[“left”, “right”, “inner”, “outer”, “cross”]): The type of join to be performed. on (Optional[List[str]]): Column names to join on. These must be found in both streams. left_on (Optional[List[str]]): Column names to join on in the left stream. right_on (Optional[List[str]]): Column names to join on in the right streasm. new_stream_name (str): The name of the new stream resulting from the merge.
- Examples:
JoinStreams(left_stream = “questions”, right_stream = “answers”, how=”inner”, on=”question_id”, new_stream_name=”question_with_answers” ) Join the ‘question’ and ‘answer’ stream based on the ‘question_id’ field using inner join, resulting with a new stream named “question_with_answers”. JoinStreams(left_stream = “questions”, right_stream = “answers”, how=”inner”, on_left=”question_id”, on_right=”question” new_stream_name=”question_with_answers” ) Join the ‘question’ and ‘answer’ stream based on the ‘question_id’ field in the left stream and the ‘question’ field in the right stream, using inner join, resulting with a new stream named “question_with_answers”. This is suitable when the fields have different labels across the streams.
Explanation about Copy¶
Copies values from specified fields to specified fields.
- Args (of parent class):
field_to_field (Union[List[List], Dict[str, str]]): A list of lists, where each sublist contains the source field and the destination field, or a dictionary mapping source fields to destination fields.
- Examples:
An input instance {“a”: 2, “b”: 3}, when processed by Copy(field_to_field={“a”: “b”} would yield {“a”: 2, “b”: 2}, and when processed by Copy(field_to_field={“a”: “c”} would yield {“a”: 2, “b”: 3, “c”: 2}
with field names containing / , we can also copy inside the field: Copy(field=”a/0”,to_field=”a”) would process instance {“a”: [1, 3]} into {“a”: 1}
Explanation about SelectFields¶
Keep only specified fields from each instance in a stream.
- Args:
fields (List[str]): The fields to keep from each instance.
Explanation about Set¶
Adds specified fields to each instance in a given stream or all streams (default) If fields exist, updates them.
- Args:
- fields (Dict[str, object]): The fields to add to each instance.
Use ‘/’ to access inner fields
use_deepcopy (bool) : Deep copy the input value to avoid later modifications
- Examples:
# Add a ‘classes’ field with a value of a list “positive” and “negative” to all streams Set(fields={“classes”: [“positive”,”negatives”]})
# Add a ‘start’ field under the ‘span’ field with a value of 0 to all streams Set(fields={“span/start”: 0}
# Add a ‘classes’ field with a value of a list “positive” and “negative” to ‘train’ stream Set(fields={“classes”: [“positive”,”negatives”], apply_to_stream=[“train”]})
# Add a ‘classes’ field on a given list, prevent modification of original list # from changing the instance. Set(fields={“classes”: alist}), use_deepcopy=True) # if now alist is modified, still the instances remain intact.
Explanation about DeleteSplits¶
Operator which delete splits in stream.
- Attributes:
splits (List[str]): The splits to delete from the stream.
Explanation about Rename¶
Renames fields.
Move value from one field to another, potentially, if field name contains a /, from one branch into another. Remove the from field, potentially part of it in case of / in from_field.
- Examples:
Rename(field_to_field={“b”: “c”}) will change inputs [{“a”: 1, “b”: 2}, {“a”: 2, “b”: 3}] to [{“a”: 1, “c”: 2}, {“a”: 2, “c”: 3}]
Rename(field_to_field={“b”: “c/d”}) will change inputs [{“a”: 1, “b”: 2}, {“a”: 2, “b”: 3}] to [{“a”: 1, “c”: {“d”: 2}}, {“a”: 2, “c”: {“d”: 3}}]
Rename(field_to_field={“b”: “b/d”}) will change inputs [{“a”: 1, “b”: 2}, {“a”: 2, “b”: 3}] to [{“a”: 1, “b”: {“d”: 2}}, {“a”: 2, “b”: {“d”: 3}}]
Rename(field_to_field={“b/c/e”: “b/d”}) will change inputs [{“a”: 1, “b”: {“c”: {“e”: 2, “f”: 20}}}] to [{“a”: 1, “b”: {“c”: {“f”: 20}, “d”: 2}}]
Read more about catalog usage here.