Adding Stream Operators and Metrics

In this section we will add brand new stream operators and metrics to use in our processing pipelines.

Adding a new stream operator

Create a new class that extends the StreamInstanceOperator or any other Stream Operator class.

from unitxt.operator import StreamInstanceOperator

class AddFields(StreamInstanceOperator):
    fields: Dict[str, object]

    def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        return {**instance, **self.fields}

To test that our operator works as expected, we can use the unitxt built-in testing suit:

from unitxt.test_utils.operators import check_operator

operator = AddFields(fields={"b": 2})

inputs = [{'a': 1}, {'a': 2}]
targets = [{'a': 1, 'b': 2}, {'a': 2, 'b': 2}]

print(check_operator(operator, inputs, targets)) # True

Adding a new metric

Create a new class that extends the Metric or any other Metric class.

class Accuracy(SingleReferenceInstanceMetric):
    reduction_map = {"mean": ["accuracy"]}
    main_score = "accuracy"

    def compute(self, reference, prediction: str) -> dict:
        return {"accuracy": float(str(reference) == str(prediction))}

Other base classes for metrics are: InstanceMetric, GlobalMetric.

To test our metric work as expected we can use unitxt built in testing suit:

from unitxt.test_utils.metrics import test_metric

metric = Accuracy()

predictions = ['positive', 'negative']
references = [['positive'], ['positive']]
target = {'accuracy': 0.5}

print(test_metric(metric, predictions, references, target)) # True