unitxt.split_utils module

unitxt.split_utils.build_stream_routing(mapping)[source]

Builds the stream mapping dictionary based on the provided mapping.

The stream mapping dictionary represents the mapping of old streams to new streams and their respective probabilities. It ensures that the probabilities for each old stream do not sum up to more than one. If the sum of probabilities is less than one, a null stream (None) is included to account for the remaining probability.

Parameters:

mapping (dict) – A dictionary specifying the mapping of old streams to new streams and their respective probabilities.

Returns:

A dictionary representing the stream mapping, where each entry corresponds to an
old stream, and the value is a tuple containing the new streams and their respective

probabilities.

Return type:

dict

Example

>>> mapping = {
        'my_new_stream': {
            'my_old_stream1': 0.6,
            'my_old_stream2': 0.2
        },
        'my_new_stream2': {
            'my_old_stream1': 0.4,
            'my_old_stream2': 0.8
        }
    }
    stream_mapping = build_stream_mapping(mapping)
    logger.info(stream_mapping)
    # Output: {'my_old_stream1': (['my_new_stream', 'my_new_stream2'], [0.6, 0.4]),
    #          'my_old_stream2': (['my_new_stream', 'my_new_stream2'], [0.2, 0.8])}
unitxt.split_utils.parse_random_mix_string(input_str)[source]

Parses a string of format “source1[percentage1%]+source2[value2]+…” and returns a dictionary.

Parameters:

input_str (str) – A string containing source names and their respective proportions. The format is “source[proportion%]” or “source[proportion]”, with multiple sources separated by “+”. The proportion can be a percentage (e.g., “90%”) or a decimal number (e.g., “0.7”). If the proportion is not provided, it assumes 100%.

Returns:

A dictionary where the keys are the source names and the values are the proportions converted to floats.

If the proportion was given as a percentage, the value is divided by 100.

Return type:

dict

Raises:

ValueError – If the input string is not in the correct format.

Example

>>> parse_random_mix_string("dale[90%]+oren[0.7]+mike")
    {'dale': 0.9, 'oren': 0.7, 'mike': 1.0}
unitxt.split_utils.parse_slices_string(input_str)[source]

Parses a string of format “source1[value1:value2] + source2[value2:] + source3 + …” and returns a dictionary.

{“source1”: [(value1,value2)], “source2”: [(value2, None)], “source3”: [(None,None)]…}.

If a source appears multiple times with different indices, all index pairs are included in the list.

Parameters:

input_str (str) – A string containing source names and their respective indices. The format is “source[:index]” or “source[index:]”, with multiple sources separated by “+”. The index represents the items to be taken from the source.

Returns:

A dictionary where the keys are the source names and the values are lists of indices as tuples.

If the index is before the colon, it is represented as (None, index), if it’s after the colon, it’s represented as (index, None)

Return type:

dict

Raises:

ValueError – If the input string is not in the correct format.

Example

>>> parse_slices_string("oren[:50]+jake[24:]+test+oren[5:10]")
{'oren': [(None, 50), (5, 10)], 'jake': [(24, None)], 'test': [(None, None)]}
unitxt.split_utils.random_mix_generator(new_stream_name, new_stream_sources, stream_routing, input_streams)[source]
unitxt.split_utils.random_mix_streams(input_streams, mapping)[source]

Creates new streams based on the provided input streams and mapping.

The create_streams function generates new streams by selectively including items from the old streams based on the specified mapping. Each item will be included in at most one new stream, as defined by the probabilities in the mapping and stream routing.

Parameters:
  • input_streams (dict) – A dictionary containing the input streams, where each key is the name of the stream and the value is an iterable or generator representing the stream.

  • mapping (dict) – A dictionary specifying the mapping of old streams to new streams and their respective probabilities.

Returns:

A dictionary containing the generated new streams, where each key is the name

of the new stream and the value is a generator representing the stream.

Return type:

dict

Example

>>> input_streams = {
        'my_old_stream1': gen1(),
        'my_old_stream2': gen2(),
    }
    mapping = {
        'my_new_stream': {
            'my_old_stream1': 0.6,
            'my_old_stream2': 0.2
        },
        'my_new_stream2': {
            'my_old_stream1': 0.4,
            'my_old_stream2': 0.8
        }
    }
    new_streams = create_streams(input_streams, mapping)
    for new_stream_name, new_stream in new_streams.items():
        logger.info(f"{new_stream_name}:")
        for _, item in zip(range(10), new_stream):
            logger.info(item)
unitxt.split_utils.rename_split(input_streams: Dict[str, Stream], mapping: Dict[str, str])[source]

Renames the streams.

Parameters:
  • input_streams (dict) – A dictionary containing the input streams, where each key is the name of the stream and the value is an iterable or generator representing the stream.

  • mapping (dict) – A dictionary specifying the mapping of old streams to new streams.

Returns:

A dictionary containing the generated new streams, where each key is the name

Return type:

dict

of the new stream and the value is a generator representing the stream.

unitxt.split_utils.slice_stream(stream, start, end)[source]
unitxt.split_utils.slice_streams(input_streams, mapping)[source]

Slices multiple input streams according to a mapping and chains the results together.

Parameters:
  • input_streams (dict) – A dictionary where the keys are the names of the input streams and the values are the input streams themselves.

  • mapping (dict) – A dictionary where the keys are the names of the new streams and the values are dictionaries mapping old stream names to lists of tuples representing slices.

Returns:

A dictionary where the keys are the names of the new streams and the values are

the new streams, which consist of parts of the old streams chained together.

Return type:

dict

Raises:

ValueError – If a stream is supposed to be sliced at an index greater than its length or a negative one.

Example

>>> old_streams = {"train": [1, 2, 3, 4, 5, 6, 7, 8, 9], "test": [10, 11, 12, 13, 14]}
>>> mapping = {"new_train": {"train": [(None, 5), (7, 9)]}, "new_test": {"test": [(2, None)]}}
>>> slice_streams(old_streams, mapping)
{"new_train": [1, 2, 3, 4, 5, 8, 9], "new_test": [12, 13, 14]}