base

mindnlp.transformers.pipelines.base.CsvPipelineDataFormat

Bases: PipelineDataFormat

Support for pipelines using CSV data format.

PARAMETER DESCRIPTION
output_path

Where to save the outgoing data.

TYPE: `str`

input_path

Where to look for the input data.

TYPE: `str`

column

The column to read.

TYPE: `str`

overwrite

Whether or not to overwrite the output_path.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False
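
A minimal usage sketch (not part of the module's own docstrings): it assumes a hypothetical `reviews.csv` with a header row containing a `text` column, and shows how the format object is iterated and how `save` writes results back out.

```python
from mindnlp.transformers.pipelines.base import CsvPipelineDataFormat

# Hypothetical paths and column name, for illustration only.
data_format = CsvPipelineDataFormat(
    output_path="predictions.csv",  # where save() will write its CSV
    input_path="reviews.csv",       # existing CSV with a header row
    column="text",                  # single column: __iter__ yields plain strings
    overwrite=True,
)

texts = list(data_format)  # e.g. ["great movie", "weak plot", ...]
data_format.save([{"text": t, "n_chars": len(t)} for t in texts])
```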

Source code in mindnlp/transformers/pipelines/base.py
class CsvPipelineDataFormat(PipelineDataFormat):
    """
    Support for pipelines using CSV data format.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """
    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ):
        """
        Initializes an instance of the CsvPipelineDataFormat class.

        Args:
            output_path (Optional[str]): The path to the output file. If specified, the processed data will be written to this file.
            input_path (Optional[str]): The path to the input file. If specified, the data will be read from this file.
            column (Optional[str]): The name of the column to process. If specified, only the data in this column will be processed.
            overwrite (bool, optional): Indicates whether the output file should be overwritten if it already exists. Defaults to False.

        Returns:
            None.

        Raises:
            None:
                However, this method may raise exceptions if the input or output file paths are invalid or
                if there are any issues during the data processing.

        Note:
            - The 'output_path', 'input_path', and 'column' parameters are optional. They can be left empty or set to
            None if not required.
            - The 'overwrite' parameter is optional and defaults to False.
        """
        super().__init__(output_path, input_path, column, overwrite=overwrite)

    def __iter__(self):
        """
        Iterates over the rows of a CSV file and yields the specified columns as a dictionary.

        Args:
            self: An instance of the CsvPipelineDataFormat class.

        Returns:
            None.

        Raises:
            FileNotFoundError: If the specified input file path does not exist.
            csv.Error: If there are issues with reading the CSV file.
            IndexError: If the column index is out of range.
            KeyError: If the column key is not found in the row dictionary.
            TypeError: If the column parameter is not a valid type.
            ValueError: If the column parameter is not properly formatted.

        Note:
            - The CSV file is read using the 'r' mode.
            - The CSV file is expected to have a header row.
            - If self.is_multi_columns is True, the method yields a dictionary with keys from the specified column list
            and values from the corresponding columns in the CSV file.
            - If self.is_multi_columns is False, the method yields the value from the specified column index in each row.

        Example:
            ```python
            >>> data_format = CsvPipelineDataFormat(
            ...     output_path=None, input_path='data.csv', column='col1,col2'
            ... )
            >>> for row in data_format:
            ...     print(row)
            {'col1': 'value1', 'col2': 'value2'}
            {'col1': 'value3', 'col2': 'value4'}
            ```
        """
        with open(self.input_path, "r") as f:
            reader = csv.DictReader(f)
            for row in reader:
                if self.is_multi_columns:
                    yield {k: row[c] for k, c in self.column}
                else:
                    yield row[self.column[0]]

    def save(self, data: List[dict]):
        """
        Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

        Args:
            data (`List[dict]`): The data to store.
        """
        with open(self.output_path, "w") as f:
            if len(data) > 0:
                writer = csv.DictWriter(f, list(data[0].keys()))
                writer.writeheader()
                writer.writerows(data)

mindnlp.transformers.pipelines.base.CsvPipelineDataFormat.__init__(output_path, input_path, column, overwrite=False)

Initializes an instance of the CsvPipelineDataFormat class.

PARAMETER DESCRIPTION
output_path

The path to the output file. If specified, the processed data will be written to this file.

TYPE: Optional[str]

input_path

The path to the input file. If specified, the data will be read from this file.

TYPE: Optional[str]

column

The name of the column to process. If specified, only the data in this column will be processed.

TYPE: Optional[str]

overwrite

Indicates whether the output file should be overwritten if it already exists. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION

None.

RAISES DESCRIPTION
None

However, this method may raise exceptions if the input or output file paths are invalid or if there are any issues during the data processing.

Note
  • The 'output_path', 'input_path', and 'column' parameters are optional. They can be left empty or set to None if not required.
  • The 'overwrite' parameter is optional and defaults to False.
Source code in mindnlp/transformers/pipelines/base.py
def __init__(
    self,
    output_path: Optional[str],
    input_path: Optional[str],
    column: Optional[str],
    overwrite=False,
):
    """
    Initializes an instance of the CsvPipelineDataFormat class.

    Args:
        output_path (Optional[str]): The path to the output file. If specified, the processed data will be written to this file.
        input_path (Optional[str]): The path to the input file. If specified, the data will be read from this file.
        column (Optional[str]): The name of the column to process. If specified, only the data in this column will be processed.
        overwrite (bool, optional): Indicates whether the output file should be overwritten if it already exists. Defaults to False.

    Returns:
        None.

    Raises:
        None:
            However, this method may raise exceptions if the input or output file paths are invalid or
            if there are any issues during the data processing.

    Note:
        - The 'output_path', 'input_path', and 'column' parameters are optional. They can be left empty or set to
        None if not required.
        - The 'overwrite' parameter is optional and defaults to False.
    """
    super().__init__(output_path, input_path, column, overwrite=overwrite)

mindnlp.transformers.pipelines.base.CsvPipelineDataFormat.__iter__()

Iterates over the rows of a CSV file and yields the specified columns as a dictionary.

PARAMETER DESCRIPTION
self

An instance of the CsvPipelineDataFormat class.

RETURNS DESCRIPTION

None.

RAISES DESCRIPTION
FileNotFoundError

If the specified input file path does not exist.

Error

If there are issues with reading the CSV file.

IndexError

If the column index is out of range.

KeyError

If the column key is not found in the row dictionary.

TypeError

If the column parameter is not a valid type.

ValueError

If the column parameter is not properly formatted.

Note
  • The CSV file is read using the 'r' mode.
  • The CSV file is expected to have a header row.
  • If self.is_multi_columns is True, the method yields a dictionary with keys from the specified column list and values from the corresponding columns in the CSV file.
  • If self.is_multi_columns is False, the method yields the value from the specified column index in each row.
Example
>>> data_format = CsvPipelineDataFormat(
...     output_path=None, input_path='data.csv', column='col1,col2'
... )
>>> for row in data_format:
...     print(row)
{'col1': 'value1', 'col2': 'value2'}
{'col1': 'value3', 'col2': 'value4'}
Source code in mindnlp/transformers/pipelines/base.py
def __iter__(self):
    """
    Iterates over the rows of a CSV file and yields the specified columns as a dictionary.

    Args:
        self: An instance of the CsvPipelineDataFormat class.

    Returns:
        None.

    Raises:
        FileNotFoundError: If the specified input file path does not exist.
        csv.Error: If there are issues with reading the CSV file.
        IndexError: If the column index is out of range.
        KeyError: If the column key is not found in the row dictionary.
        TypeError: If the column parameter is not a valid type.
        ValueError: If the column parameter is not properly formatted.

    Note:
        - The CSV file is read using the 'r' mode.
        - The CSV file is expected to have a header row.
        - If self.is_multi_columns is True, the method yields a dictionary with keys from the specified column list
        and values from the corresponding columns in the CSV file.
        - If self.is_multi_columns is False, the method yields the value from the specified column index in each row.

    Example:
        ```python
        >>> data_format = CsvPipelineDataFormat(
        ...     output_path=None, input_path='data.csv', column='col1,col2'
        ... )
        >>> for row in data_format:
        ...     print(row)
        {'col1': 'value1', 'col2': 'value2'}
        {'col1': 'value3', 'col2': 'value4'}
        ```
    """
    with open(self.input_path, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if self.is_multi_columns:
                yield {k: row[c] for k, c in self.column}
            else:
                yield row[self.column[0]]

mindnlp.transformers.pipelines.base.CsvPipelineDataFormat.save(data)

Save the provided data object with the representation for the current [~pipelines.PipelineDataFormat].

PARAMETER DESCRIPTION
data

The data to store.

TYPE: `List[dict]`

Source code in mindnlp/transformers/pipelines/base.py
def save(self, data: List[dict]):
    """
    Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

    Args:
        data (`List[dict]`): The data to store.
    """
    with open(self.output_path, "w") as f:
        if len(data) > 0:
            writer = csv.DictWriter(f, list(data[0].keys()))
            writer.writeheader()
            writer.writerows(data)

mindnlp.transformers.pipelines.base.JsonPipelineDataFormat

Bases: PipelineDataFormat

Support for pipelines using JSON file format.

PARAMETER DESCRIPTION
output_path

Where to save the outgoing data.

TYPE: `str`

input_path

Where to look for the input data.

TYPE: `str`

column

The column to read.

TYPE: `str`

overwrite

Whether or not to overwrite the output_path.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False
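
A minimal usage sketch under the same kind of assumptions as above: a hypothetical `inputs.json` containing a list of objects such as `[{"text": "hello"}, {"text": "world"}]`.

```python
from mindnlp.transformers.pipelines.base import JsonPipelineDataFormat

# Hypothetical file names, for illustration only.
data_format = JsonPipelineDataFormat(
    output_path="outputs.json",
    input_path="inputs.json",
    column="text",        # single column: __iter__ yields entry["text"]
    overwrite=True,
)

results = [{"text": entry, "n_chars": len(entry)} for entry in data_format]
data_format.save(results)  # written to outputs.json with json.dump
```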

Source code in mindnlp/transformers/pipelines/base.py
class JsonPipelineDataFormat(PipelineDataFormat):
    """
    Support for pipelines using JSON file format.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """
    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ):
        """
        Initializes a JsonPipelineDataFormat object.

        Args:
            self: The instance of the class.
            output_path (Optional[str]): The path to the output file where the processed data will be saved.
            input_path (Optional[str]): The path to the input file containing the data to be processed.
            column (Optional[str]): The column in the input data to be processed.
            overwrite (bool): Indicates whether to overwrite the existing output file if it already exists.
                Default is False.

        Returns:
            None.

        Raises:
            FileNotFoundError: If the input file specified by 'input_path' does not exist.
            json.JSONDecodeError: If the input file does not contain valid JSON data.
            IOError: If there is an issue with reading the input file.
        """
        super().__init__(output_path, input_path, column, overwrite=overwrite)

        with open(input_path, "r") as f:
            self._entries = json.load(f)

    def __iter__(self):
        """
        Iterates over the entries of the JsonPipelineDataFormat object.

        Args:
            self (JsonPipelineDataFormat): The JsonPipelineDataFormat object itself.

        Returns:
            None

        Raises:
            None

        This method iterates over the entries stored in the JsonPipelineDataFormat object and yields each entry
        as a dictionary. If the JsonPipelineDataFormat object is configured with multiple columns, each yielded entry
        is a dictionary where the keys correspond to the column names and the values are the values of the respective
        columns for that entry. If the JsonPipelineDataFormat object is not configured with multiple columns,
        each yielded entry is a single value corresponding to the first column specified in the 'column' attribute of
        the JsonPipelineDataFormat object.
        """
        for entry in self._entries:
            if self.is_multi_columns:
                yield {k: entry[c] for k, c in self.column}
            else:
                yield entry[self.column[0]]

    def save(self, data: dict):
        """
        Save the provided data object in a json file.

        Args:
            data (`dict`): The data to store.
        """
        with open(self.output_path, "w") as f:
            json.dump(data, f)

mindnlp.transformers.pipelines.base.JsonPipelineDataFormat.__init__(output_path, input_path, column, overwrite=False)

Initializes a JsonPipelineDataFormat object.

PARAMETER DESCRIPTION
self

The instance of the class.

output_path

The path to the output file where the processed data will be saved.

TYPE: Optional[str]

input_path

The path to the input file containing the data to be processed.

TYPE: Optional[str]

column

The column in the input data to be processed.

TYPE: Optional[str]

overwrite

Indicates whether to overwrite the existing output file if it already exists. Default is False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION

None.

RAISES DESCRIPTION
FileNotFoundError

If the input file specified by 'input_path' does not exist.

JSONDecodeError

If the input file does not contain valid JSON data.

IOError

If there is an issue with reading the input file.

Source code in mindnlp/transformers/pipelines/base.py
def __init__(
    self,
    output_path: Optional[str],
    input_path: Optional[str],
    column: Optional[str],
    overwrite=False,
):
    """
    Initializes a JsonPipelineDataFormat object.

    Args:
        self: The instance of the class.
        output_path (Optional[str]): The path to the output file where the processed data will be saved.
        input_path (Optional[str]): The path to the input file containing the data to be processed.
        column (Optional[str]): The column in the input data to be processed.
        overwrite (bool): Indicates whether to overwrite the existing output file if it already exists.
            Default is False.

    Returns:
        None.

    Raises:
        FileNotFoundError: If the input file specified by 'input_path' does not exist.
        json.JSONDecodeError: If the input file does not contain valid JSON data.
        IOError: If there is an issue with reading the input file.
    """
    super().__init__(output_path, input_path, column, overwrite=overwrite)

    with open(input_path, "r") as f:
        self._entries = json.load(f)

mindnlp.transformers.pipelines.base.JsonPipelineDataFormat.__iter__()

Iterates over the entries of the JsonPipelineDataFormat object.

PARAMETER DESCRIPTION
self

The JsonPipelineDataFormat object itself.

TYPE: JsonPipelineDataFormat

RETURNS DESCRIPTION

None

This method iterates over the entries stored in the JsonPipelineDataFormat object and yields each entry as a dictionary. If the JsonPipelineDataFormat object is configured with multiple columns, each yielded entry is a dictionary where the keys correspond to the column names and the values are the values of the respective columns for that entry. If the JsonPipelineDataFormat object is not configured with multiple columns, each yielded entry is a single value corresponding to the first column specified in the 'column' attribute of the JsonPipelineDataFormat object.

Source code in mindnlp/transformers/pipelines/base.py
def __iter__(self):
    """
    Iterates over the entries of the JsonPipelineDataFormat object.

    Args:
        self (JsonPipelineDataFormat): The JsonPipelineDataFormat object itself.

    Returns:
        None

    Raises:
        None

    This method iterates over the entries stored in the JsonPipelineDataFormat object and yields each entry
    as a dictionary. If the JsonPipelineDataFormat object is configured with multiple columns, each yielded entry
    is a dictionary where the keys correspond to the column names and the values are the values of the respective
    columns for that entry. If the JsonPipelineDataFormat object is not configured with multiple columns,
    each yielded entry is a single value corresponding to the first column specified in the 'column' attribute of
    the JsonPipelineDataFormat object.
    """
    for entry in self._entries:
        if self.is_multi_columns:
            yield {k: entry[c] for k, c in self.column}
        else:
            yield entry[self.column[0]]

mindnlp.transformers.pipelines.base.JsonPipelineDataFormat.save(data)

Save the provided data object in a json file.

PARAMETER DESCRIPTION
data

The data to store.

TYPE: `dict`

Source code in mindnlp/transformers/pipelines/base.py
def save(self, data: dict):
    """
    Save the provided data object in a json file.

    Args:
        data (`dict`): The data to store.
    """
    with open(self.output_path, "w") as f:
        json.dump(data, f)

mindnlp.transformers.pipelines.base.PipedPipelineDataFormat

Bases: PipelineDataFormat

Read data from piped input to the Python process. For multi-column data, columns should be separated by a tab character (`\t`).

If columns are provided, then the output will be a dictionary with `{column_x: value_x}`.

PARAMETER DESCRIPTION
output_path

Where to save the outgoing data.

TYPE: `str`

input_path

Where to look for the input data.

TYPE: `str`

column

The column to read.

TYPE: `str`

overwrite

Whether or not to overwrite the output_path.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False
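
A minimal sketch of consuming tab-separated lines from standard input; the script name and column names are illustrative, not from the library docs.

```python
# pipe_demo.py
# Run, for example, as:
#   printf 'What is MindNLP?\tMindNLP is an NLP library.\n' | python pipe_demo.py
from mindnlp.transformers.pipelines.base import PipedPipelineDataFormat

data_format = PipedPipelineDataFormat(
    output_path=None,
    input_path=None,
    column="question,context",  # two columns: each stdin line is split on "\t"
)

for item in data_format:
    # With columns configured, a tab-separated line becomes
    # {"question": ..., "context": ...}; lines without a tab are yielded as-is.
    print(item)
```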

Source code in mindnlp/transformers/pipelines/base.py
class PipedPipelineDataFormat(PipelineDataFormat):
    """
    Read data from piped input to the python process. For multi-column data, columns should be separated by \t

    If columns are provided, then the output will be a dictionary with {column_x: value_x}

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """
    def __iter__(self):
        '''
        Iterates over input lines from the standard input and yields formatted data.

        Args:
            self (PipedPipelineDataFormat): An instance of the PipedPipelineDataFormat class.

        Returns:
            None

        Raises:
            None

        Description:
            This method is used to iterate over input lines read from the standard input.
            Each line is checked for the presence of a tab character ('\t').
            If a tab character is found, the line is split using the tab character as the delimiter.
            If the PipedPipelineDataFormat instance has a defined column attribute,
            a dictionary is yielded containing key-value pairs where the keys are the column names
            and the values are extracted from the corresponding line elements.
            If the column attribute is not defined, a tuple containing the line elements is yielded.
            If a line does not contain a tab character, the entire line is yielded as is.
        '''
        for line in sys.stdin:
            # Split for multi-columns
            if "\t" in line:
                line = line.split("\t")
                if self.column:
                    # Dictionary to map arguments
                    yield {kwargs: l for (kwargs, _), l in zip(self.column, line)}
                else:
                    yield tuple(line)

            # No dictionary to map arguments
            else:
                yield line

    def save(self, data: dict):
        """
        Print the data.

        Args:
            data (`dict`): The data to store.
        """
        print(data)

    def save_binary(self, data: Union[dict, List[dict]]) -> str:
        """
        Save binary data to an output file path.

        Args:
            self (PipedPipelineDataFormat): An instance of the PipedPipelineDataFormat class.
            data (Union[dict, List[dict]]): The binary data to be saved. It can be either a single dictionary or
                a list of dictionaries.

        Returns:
            str: The output file path where the binary data was saved.

        Raises:
            KeyError: If the `output_path` attribute of `self` is None, indicating that an output file path is required
                when using piped input on pipeline outputting large objects. The error message will
                prompt the user to provide the output path through the `--output` argument.

        """
        if self.output_path is None:
            raise KeyError(
                "When using piped input on pipeline outputting large object requires an output file path. "
                "Please provide such output path through --output argument."
            )

        return super().save_binary(data)

mindnlp.transformers.pipelines.base.PipedPipelineDataFormat.__iter__()

Iterates over input lines from the standard input and yields formatted data.

PARAMETER DESCRIPTION
self

An instance of the PipedPipelineDataFormat class.

TYPE: PipedPipelineDataFormat

RETURNS DESCRIPTION

None

Description

This method is used to iterate over input lines read from the standard input. Each line is checked for the presence of a tab character (`\t`). If a tab character is found, the line is split using the tab character as the delimiter. If the PipedPipelineDataFormat instance has a defined column attribute, a dictionary is yielded containing key-value pairs where the keys are the column names and the values are extracted from the corresponding line elements. If the column attribute is not defined, a tuple containing the line elements is yielded. If a line does not contain a tab character, the entire line is yielded as is.

Source code in mindnlp/transformers/pipelines/base.py
def __iter__(self):
    '''
    Iterates over input lines from the standard input and yields formatted data.

    Args:
        self (PipedPipelineDataFormat): An instance of the PipedPipelineDataFormat class.

    Returns:
        None

    Raises:
        None

    Description:
        This method is used to iterate over input lines read from the standard input.
        Each line is checked for the presence of a tab character ('\t').
        If a tab character is found, the line is split using the tab character as the delimiter.
        If the PipedPipelineDataFormat instance has a defined column attribute,
        a dictionary is yielded containing key-value pairs where the keys are the column names
        and the values are extracted from the corresponding line elements.
        If the column attribute is not defined, a tuple containing the line elements is yielded.
        If a line does not contain a tab character, the entire line is yielded as is.
    '''
    for line in sys.stdin:
        # Split for multi-columns
        if "\t" in line:
            line = line.split("\t")
            if self.column:
                # Dictionary to map arguments
                yield {kwargs: l for (kwargs, _), l in zip(self.column, line)}
            else:
                yield tuple(line)

        # No dictionary to map arguments
        else:
            yield line

mindnlp.transformers.pipelines.base.PipedPipelineDataFormat.save(data)

Print the data.

PARAMETER DESCRIPTION
data

The data to store.

TYPE: `dict`

Source code in mindnlp/transformers/pipelines/base.py
def save(self, data: dict):
    """
    Print the data.

    Args:
        data (`dict`): The data to store.
    """
    print(data)

mindnlp.transformers.pipelines.base.PipedPipelineDataFormat.save_binary(data)

Save binary data to an output file path.

PARAMETER DESCRIPTION
self

An instance of the PipedPipelineDataFormat class.

TYPE: PipedPipelineDataFormat

data

The binary data to be saved. It can be either a single dictionary or a list of dictionaries.

TYPE: Union[dict, List[dict]]

RETURNS DESCRIPTION
str

The output file path where the binary data was saved.

TYPE: str

RAISES DESCRIPTION
KeyError

If the output_path attribute of self is None, indicating that an output file path is required when using piped input on pipeline outputting large objects. The error message will prompt the user to provide the output path through the --output argument.

Source code in mindnlp/transformers/pipelines/base.py
def save_binary(self, data: Union[dict, List[dict]]) -> str:
    """
    Save binary data to an output file path.

    Args:
        self (PipedPipelineDataFormat): An instance of the PipedPipelineDataFormat class.
        data (Union[dict, List[dict]]): The binary data to be saved. It can be either a single dictionary or
            a list of dictionaries.

    Returns:
        str: The output file path where the binary data was saved.

    Raises:
        KeyError: If the `output_path` attribute of `self` is None, indicating that an output file path is required
            when using piped input on pipeline outputting large objects. The error message will
            prompt the user to provide the output path through the `--output` argument.

    """
    if self.output_path is None:
        raise KeyError(
            "When using piped input on pipeline outputting large object requires an output file path. "
            "Please provide such output path through --output argument."
        )

    return super().save_binary(data)

mindnlp.transformers.pipelines.base.Pipeline

Bases: _ScikitCompat

The Pipeline class is the class from which all pipelines inherit. Refer to this class for methods shared across different pipelines.

Base class implementing pipelined operations. Pipeline workflow is defined as a sequence of the following operations:

Input -> Tokenization -> Model Inference -> Post-Processing (task dependent) -> Output

Pipeline supports running on CPU or GPU through the device argument (see below).

Some pipelines, such as [FeatureExtractionPipeline] ('feature-extraction'), output large tensor objects as nested lists. In order to avoid dumping such large structures as textual data, we provide the binary_output constructor argument. If set to True, the output will be stored in the pickle format.
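
To make the workflow above concrete, here is a minimal, illustrative sketch of a custom pipeline subclass. The class name, the scoring logic, and the `return_tensors="ms"` tokenizer call are assumptions for illustration, not part of this module.

```python
from mindnlp.transformers.pipelines.base import Pipeline


class MyScoringPipeline(Pipeline):
    """Illustrative subclass wiring the four hooks together."""

    def _sanitize_parameters(self, max_length=None, **kwargs):
        # Route caller kwargs to preprocess / forward / postprocess.
        preprocess_kwargs = {}
        if max_length is not None:
            preprocess_kwargs["max_length"] = max_length
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, max_length=128):
        # Tokenization: turn raw text into model-ready tensors.
        return self.tokenizer(
            inputs, truncation=True, max_length=max_length, return_tensors="ms"
        )

    def _forward(self, model_inputs, **forward_params):
        # Model inference on the prepared tensors.
        return self.model(**model_inputs)

    def postprocess(self, model_outputs, **postprocess_params):
        # Post-processing: reduce raw logits to a plain Python value.
        return model_outputs[0].argmax(-1).tolist()


# Usage, assuming `model` and `tokenizer` were loaded with from_pretrained:
#   pipe = MyScoringPipeline(model=model, tokenizer=tokenizer, task="my-scoring")
#   pipe("An example sentence", max_length=64)
```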

Source code in mindnlp/transformers/pipelines/base.py
class Pipeline(_ScikitCompat):
    """
    The Pipeline class is the class from which all pipelines inherit. Refer to this class for methods shared across
    different pipelines.

    Base class implementing pipelined operations. Pipeline workflow is defined as a sequence of the following
    operations:

    `Input -> Tokenization -> Model Inference -> Post-Processing (task dependent) -> Output`

    Pipeline supports running on CPU or GPU through the device argument (see below).

    Some pipelines, such as [`FeatureExtractionPipeline`] (`'feature-extraction'`), output large tensor objects
    as nested lists. In order to avoid dumping such large structures as textual data, we provide the `binary_output`
    constructor argument. If set to `True`, the output will be stored in the pickle format.
    """
    default_input_names = None

    def __init__(
        self,
        model: "PreTrainedModel",
        tokenizer: Optional[PreTrainedTokenizer] = None,
        feature_extractor: Optional[PreTrainedFeatureExtractor] = None,
        image_processor: Optional['BaseImageProcessor'] = None,
        modelcard: Optional['ModelCard'] = None,
        task: str = "",
        ms_dtype: Optional[Union[str, "mindspore.common.dtype.Dtype"]] = None,
        binary_output: bool = False,
        **kwargs,
    ):
        """
        Initializes a new instance of the Pipeline class.

        Args:
            model (PreTrainedModel): The pre-trained model to be used in the pipeline.
            tokenizer (Optional[PreTrainedTokenizer]): An optional pre-trained tokenizer for processing input data.
            feature_extractor (Optional[PreTrainedFeatureExtractor]):
                An optional feature extractor for extracting features from the input data.
            image_processor (Optional[BaseImageProcessor]): An optional image processor for handling image data.
            modelcard (Optional[ModelCard]): An optional model card containing information about the model.
            task (str): The task that the pipeline is designed to perform.
            ms_dtype (Optional[Union[str, mindspore.common.dtype.Dtype]]): An optional data type for MindSpore computations.
            binary_output (bool): A flag indicating whether the output should be binary.
            **kwargs: Additional keyword arguments for configuring the pipeline.

        Returns:
            None.

        Raises:
            None.
        """
        self.task = task
        self.model = model
        self.tokenizer = tokenizer
        self.feature_extractor = feature_extractor
        self.image_processor = image_processor
        self.modelcard = modelcard
        self.ms_dtype = ms_dtype
        self.binary_output = binary_output

        # Update config and generation_config with task specific parameters
        task_specific_params = self.model.config.task_specific_params
        if task_specific_params is not None and task in task_specific_params:
            self.model.config.update(task_specific_params.get(task))
            if self.model.can_generate():
                self.model.generation_config.update(**task_specific_params.get(task))

        self.call_count = 0
        self._batch_size = kwargs.pop("batch_size", None)
        self._num_workers = kwargs.pop("num_workers", None)
        self._preprocess_params, self._forward_params, self._postprocess_params = self._sanitize_parameters(**kwargs)

        # if self.image_processor is None and self.feature_extractor is not None:
        #     if isinstance(self.feature_extractor, BaseImageProcessor):
        #         # Backward compatible change, if users called
        #         # ImageSegmentationPipeline(.., feature_extractor=MyFeatureExtractor())
        #         # then we should keep working
        #         self.image_processor = self.feature_extractor

    def save_pretrained(self, save_directory: str, safe_serialization: bool = True):
        """
        Save the pipeline's model and tokenizer.

        Args:
            save_directory (`str`):
                A path to the directory where to save. It will be created if it doesn't exist.
            safe_serialization (`bool`):
                Whether to save the model using `safetensors` or the traditional way for PyTorch or Tensorflow.
        """
        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return
        os.makedirs(save_directory, exist_ok=True)

        self.model.save_pretrained(save_directory, safe_serialization=safe_serialization)

        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(save_directory)

        if self.feature_extractor is not None:
            self.feature_extractor.save_pretrained(save_directory)

        if self.image_processor is not None:
            self.image_processor.save_pretrained(save_directory)

        if self.modelcard is not None:
            self.modelcard.save_pretrained(save_directory)

    def transform(self, X):
        """
        Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().
        """
        return self(X)

    def predict(self, X):
        """
        Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().
        """
        return self(X)

    def check_model_type(self, supported_models: Union[List[str], dict]):
        """
        Check if the model class is supported by the pipeline.

        Args:
            supported_models (`List[str]` or `dict`):
                The list of models supported by the pipeline, or a dictionary with model class values.
        """
        if not isinstance(supported_models, list):  # Create from a model mapping
            supported_models_names = []
            for _, model_name in supported_models.items():
                # Mapping can now contain tuples of models for the same configuration.
                if isinstance(model_name, tuple):
                    supported_models_names.extend(list(model_name))
                else:
                    supported_models_names.append(model_name)
            if hasattr(supported_models, "_model_mapping"):
                for _, model in supported_models._model_mapping._extra_content.items():
                    if isinstance(model_name, tuple): # pylint: disable=undefined-loop-variable
                        supported_models_names.extend([m.__name__ for m in model])
                    else:
                        supported_models_names.append(model.__name__)
            supported_models = supported_models_names
        if self.model.__class__.__name__ not in supported_models:
            logger.error(
                f"The model '{self.model.__class__.__name__}' is not supported for {self.task}. Supported models are"
                f" {supported_models}."
            )

    @abstractmethod
    def _sanitize_parameters(self, **pipeline_parameters):
        """
        _sanitize_parameters will be called with any excessive named arguments from either `__init__` or `__call__`
        methods. It should return 3 dictionaries of the resolved parameters used by the various `preprocess`,
        `forward` and `postprocess` methods. Do not fill the dictionaries if the caller didn't specify a kwargs. This
        lets you keep defaults in function signatures, which is more "natural".

        It is not meant to be called directly, it will be automatically called and the final parameters resolved by
        `__init__` and `__call__`
        """
        raise NotImplementedError("_sanitize_parameters not implemented")

    @abstractmethod
    def preprocess(self, input_: Any, **preprocess_parameters: Dict) -> Dict[str, GenericTensor]:
        """
        Preprocess will take the `input_` of a specific pipeline and return a dictionary of everything necessary for
        `_forward` to run properly. It should contain at least one tensor, but might have arbitrary other items.
        """
        raise NotImplementedError("preprocess not implemented")

    @abstractmethod
    def _forward(self, input_tensors: Dict[str, GenericTensor], **forward_parameters: Dict) -> ModelOutput:
        """
        _forward will receive the prepared dictionary from `preprocess` and run it on the model. This method might
        involve the GPU or the CPU and should be agnostic to it. Isolating this function is the reason for `preprocess`
        and `postprocess` to exist, so that the hot path, this method, can generally run as fast as possible.

        It is not meant to be called directly, `forward` is preferred. It is basically the same but contains additional
        code surrounding `_forward` making sure tensors and models are on the same device, disabling the training part
        of the code (leading to faster inference).
        """
        raise NotImplementedError("_forward not implemented")

    @abstractmethod
    def postprocess(self, model_outputs: ModelOutput, **postprocess_parameters: Dict) -> Any:
        """
        Postprocess will receive the raw outputs of the `_forward` method, generally tensors, and reformat them into
        something more friendly. Generally it will output a list or a dict of results (containing just strings and
        numbers).
        """
        raise NotImplementedError("postprocess not implemented")

    def forward(self, model_inputs, **forward_params):
        """
        This method performs the forward pass of the pipeline model.

        Args:
            self (Pipeline): The instance of the Pipeline class.
            model_inputs: The inputs to the model for the forward pass.
                Type can vary depending on the model architecture and input requirements.

        Returns:
            ModelOutput: The outputs produced by `_forward` for the given inputs.

        Raises:
            None:
                However, the _forward method it calls may raise exceptions based on the model's implementation.
        """
        model_outputs = self._forward(model_inputs, **forward_params)
        return model_outputs

    def __call__(self, inputs, *args, num_workers=None, batch_size=None, **kwargs):
        """
        Performs the main processing logic for the Pipeline class.

        Args:
            self (Pipeline): The instance of the Pipeline class.
            inputs: The input data for processing. It can be a Dataset, GeneratorType, or list.

        Returns:
            The processed outputs: a single result, a list of results, or a generator, depending on the type of `inputs`.

        Raises:
            UserWarning:
                If the method is called more than 10 times,
                a warning is raised to prompt the user to use a dataset for efficiency.
        """
        if args:
            logger.warning(f"Ignoring args : {args}")

        if num_workers is None:
            if self._num_workers is None:
                num_workers = 0
            else:
                num_workers = self._num_workers
        if batch_size is None:
            if self._batch_size is None:
                batch_size = 1
            else:
                batch_size = self._batch_size

        preprocess_params, forward_params, postprocess_params = self._sanitize_parameters(**kwargs)
        # Fuse __init__ params and __call__ params without modifying the __init__ ones.
        preprocess_params = {**self._preprocess_params, **preprocess_params}
        forward_params = {**self._forward_params, **forward_params}
        postprocess_params = {**self._postprocess_params, **postprocess_params}

        self.call_count += 1
        if self.call_count > 10:
            warnings.warn(
                "You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a"
                " dataset",
                UserWarning,
            )

        is_dataset = isinstance(inputs, Dataset)
        is_generator = isinstance(inputs, types.GeneratorType)
        is_list = isinstance(inputs, list)

        is_iterable = is_dataset or is_generator or is_list

        if is_list:
            return self.run_multi(inputs, preprocess_params, forward_params, postprocess_params)
        elif is_iterable:
            return self.iterate(inputs, preprocess_params, forward_params, postprocess_params)
        else:
            return self.run_single(inputs, preprocess_params, forward_params, postprocess_params)

    def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params):
        """
        Method that runs a series of input items through the pipeline.

        Args:
            self (Pipeline): The instance of the Pipeline class.
            inputs (list): A list of input items to be processed by the pipeline.
            preprocess_params (dict): Parameters for preprocessing the input items.
            forward_params (dict): Parameters for the forward pass through the pipeline.
            postprocess_params (dict): Parameters for postprocessing the output items.

        Returns:
            list: The result of running each input item through the pipeline, in order.

        Raises:
            None.
        """
        return [self.run_single(item, preprocess_params, forward_params, postprocess_params) for item in inputs]

    def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
        """
        This method 'run_single' is a member of the 'Pipeline' class and is responsible for executing a single run of the pipeline.

        Args:
            self (object): The instance of the Pipeline class.
            inputs (object): The input data to be processed by the pipeline.
            preprocess_params (dict): Parameters for the preprocessing step, used to configure the preprocessing behavior.
            forward_params (dict): Parameters for the forward step, used to configure the forward pass behavior.
            postprocess_params (dict): Parameters for the postprocessing step, used to configure the postprocessing behavior.

        Returns:
            The postprocessed output for the given input.

        Raises:
            Any exception raised by the 'preprocess', 'forward', or 'postprocess' methods
            called within this method will be propagated to the caller.
        """
        model_inputs = self.preprocess(inputs, **preprocess_params)
        model_outputs = self.forward(model_inputs, **forward_params)
        outputs = self.postprocess(model_outputs, **postprocess_params)
        return outputs

    def iterate(self, inputs, preprocess_params, forward_params, postprocess_params):
        """
        Iterates through the input data and yields the result of running each input through the pipeline.

        Args:
            self (Pipeline): The instance of the Pipeline class.
            inputs (Union[Dataset, List[Any]]): The input data to iterate over.

                - If inputs is a Dataset object, it will be iterated over by creating a dictionary iterator.
                - If inputs is a list of inputs, each input will be iterated over individually.
            preprocess_params (Any): The parameters used for preprocessing the input data.
            forward_params (Any): The parameters used for the forward pass of the pipeline.
            postprocess_params (Any): The parameters used for postprocessing the output data.

        Yields:
            The result of running each input through the pipeline.

        Raises:
            None.
        """
        # This function should become `get_iterator` again, this is a temporary
        # easy solution.
        if isinstance(inputs, Dataset):
            for input_ in inputs.create_dict_iterator(output_numpy=True):
                yield self.run_single(input_, preprocess_params, forward_params, postprocess_params)
        else:
            for input_ in inputs:
                yield self.run_single(input_, preprocess_params, forward_params, postprocess_params)

mindnlp.transformers.pipelines.base.Pipeline.__call__(inputs, *args, num_workers=None, batch_size=None, **kwargs)

Performs the main processing logic for the Pipeline class.

PARAMETER DESCRIPTION
self

The instance of the Pipeline class.

TYPE: Pipeline

inputs

The input data for processing. It can be a Dataset, GeneratorType, or list.

RETURNS DESCRIPTION
The processed outputs: a single result, a list of results, or a generator, depending on the type of inputs.

RAISES DESCRIPTION
UserWarning

If the method is called more than 10 times, a warning is raised to prompt the user to use a dataset for efficiency.

Source code in mindnlp/transformers/pipelines/base.py
def __call__(self, inputs, *args, num_workers=None, batch_size=None, **kwargs):
    """
    Performs the main processing logic for the Pipeline class.

    Args:
        self (Pipeline): The instance of the Pipeline class.
        inputs: The input data for processing. It can be a Dataset, GeneratorType, or list.

    Returns:
        The processed outputs: a single result, a list of results, or a generator, depending on the type of `inputs`.

    Raises:
        UserWarning:
            If the method is called more than 10 times,
            a warning is raised to prompt the user to use a dataset for efficiency.
    """
    if args:
        logger.warning(f"Ignoring args : {args}")

    if num_workers is None:
        if self._num_workers is None:
            num_workers = 0
        else:
            num_workers = self._num_workers
    if batch_size is None:
        if self._batch_size is None:
            batch_size = 1
        else:
            batch_size = self._batch_size

    preprocess_params, forward_params, postprocess_params = self._sanitize_parameters(**kwargs)
    # Fuse __init__ params and __call__ params without modifying the __init__ ones.
    preprocess_params = {**self._preprocess_params, **preprocess_params}
    forward_params = {**self._forward_params, **forward_params}
    postprocess_params = {**self._postprocess_params, **postprocess_params}

    self.call_count += 1
    if self.call_count > 10:
        warnings.warn(
            "You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a"
            " dataset",
            UserWarning,
        )

    is_dataset = isinstance(inputs, Dataset)
    is_generator = isinstance(inputs, types.GeneratorType)
    is_list = isinstance(inputs, list)

    is_iterable = is_dataset or is_generator or is_list

    if is_list:
        return self.run_multi(inputs, preprocess_params, forward_params, postprocess_params)
    elif is_iterable:
        return self.iterate(inputs, preprocess_params, forward_params, postprocess_params)
    else:
        return self.run_single(inputs, preprocess_params, forward_params, postprocess_params)
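
A self-contained sketch of the dispatch logic above. The `EchoPipeline` and the `SimpleNamespace` stand-in for a model are purely illustrative; they exist only so the single-input and list branches can be exercised without loading a checkpoint.

```python
from types import SimpleNamespace
from mindnlp.transformers.pipelines.base import Pipeline


class EchoPipeline(Pipeline):
    """Toy pipeline that upper-cases its input, for demonstration only."""

    def _sanitize_parameters(self, **kwargs):
        return {}, {}, {}

    def preprocess(self, inputs, **kwargs):
        return {"text": inputs}

    def _forward(self, model_inputs, **kwargs):
        return model_inputs

    def postprocess(self, model_outputs, **kwargs):
        return model_outputs["text"].upper()


# Stand-in object exposing only the attributes Pipeline.__init__ touches.
fake_model = SimpleNamespace(
    config=SimpleNamespace(task_specific_params=None),
    can_generate=lambda: False,
)

pipe = EchoPipeline(model=fake_model, task="echo")

print(pipe("hello"))              # single input -> run_single -> 'HELLO'
print(pipe(["hello", "world"]))   # list input   -> run_multi  -> ['HELLO', 'WORLD']
```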

mindnlp.transformers.pipelines.base.Pipeline.__init__(model, tokenizer=None, feature_extractor=None, image_processor=None, modelcard=None, task='', ms_dtype=None, binary_output=False, **kwargs)

Initializes a new instance of the Pipeline class.

PARAMETER DESCRIPTION
model

The pre-trained model to be used in the pipeline.

TYPE: PreTrainedModel

tokenizer

An optional pre-trained tokenizer for processing input data.

TYPE: Optional[PreTrainedTokenizer] DEFAULT: None

feature_extractor

An optional feature extractor for extracting features from the input data.

TYPE: Optional[PreTrainedFeatureExtractor] DEFAULT: None

image_processor

An optional image processor for handling image data.

TYPE: Optional[BaseImageProcessor] DEFAULT: None

modelcard

An optional model card containing information about the model.

TYPE: Optional[ModelCard] DEFAULT: None

task

The task that the pipeline is designed to perform.

TYPE: str DEFAULT: ''

ms_dtype

An optional data type for MindSpore computations.

TYPE: Optional[Union[str, Dtype]] DEFAULT: None

binary_output

A flag indicating whether the output should be binary.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments for configuring the pipeline.

DEFAULT: {}

RETURNS DESCRIPTION

None.

Source code in mindnlp/transformers/pipelines/base.py
def __init__(
    self,
    model: "PreTrainedModel",
    tokenizer: Optional[PreTrainedTokenizer] = None,
    feature_extractor: Optional[PreTrainedFeatureExtractor] = None,
    image_processor: Optional['BaseImageProcessor'] = None,
    modelcard: Optional['ModelCard'] = None,
    task: str = "",
    ms_dtype: Optional[Union[str, "mindspore.common.dtype.Dtype"]] = None,
    binary_output: bool = False,
    **kwargs,
):
    """
    Initializes a new instance of the Pipeline class.

    Args:
        model (PreTrainedModel): The pre-trained model to be used in the pipeline.
        tokenizer (Optional[PreTrainedTokenizer]): An optional pre-trained tokenizer for processing input data.
        feature_extractor (Optional[PreTrainedFeatureExtractor]):
            An optional feature extractor for extracting features from the input data.
        image_processor (Optional[BaseImageProcessor]): An optional image processor for handling image data.
        modelcard (Optional[ModelCard]): An optional model card containing information about the model.
        task (str): The task that the pipeline is designed to perform.
        ms_dtype (Optional[Union[str, mindspore.common.dtype.Dtype]]): An optional data type for MindSpore computations.
        binary_output (bool): A flag indicating whether the output should be binary.
        **kwargs: Additional keyword arguments for configuring the pipeline.

    Returns:
        None.

    Raises:
        None.
    """
    self.task = task
    self.model = model
    self.tokenizer = tokenizer
    self.feature_extractor = feature_extractor
    self.image_processor = image_processor
    self.modelcard = modelcard
    self.ms_dtype = ms_dtype
    self.binary_output = binary_output

    # Update config and generation_config with task specific parameters
    task_specific_params = self.model.config.task_specific_params
    if task_specific_params is not None and task in task_specific_params:
        self.model.config.update(task_specific_params.get(task))
        if self.model.can_generate():
            self.model.generation_config.update(**task_specific_params.get(task))

    self.call_count = 0
    self._batch_size = kwargs.pop("batch_size", None)
    self._num_workers = kwargs.pop("num_workers", None)
    self._preprocess_params, self._forward_params, self._postprocess_params = self._sanitize_parameters(**kwargs)

mindnlp.transformers.pipelines.base.Pipeline.check_model_type(supported_models)

Check if the model class is supported by the pipeline.

PARAMETER DESCRIPTION
supported_models

The list of models supported by the pipeline, or a dictionary with model class values.

TYPE: `List[str]` or `dict`

Source code in mindnlp/transformers/pipelines/base.py
def check_model_type(self, supported_models: Union[List[str], dict]):
    """
    Check if the model class is supported by the pipeline.

    Args:
        supported_models (`List[str]` or `dict`):
            The list of models supported by the pipeline, or a dictionary with model class values.
    """
    if not isinstance(supported_models, list):  # Create from a model mapping
        supported_models_names = []
        for _, model_name in supported_models.items():
            # Mapping can now contain tuples of models for the same configuration.
            if isinstance(model_name, tuple):
                supported_models_names.extend(list(model_name))
            else:
                supported_models_names.append(model_name)
        if hasattr(supported_models, "_model_mapping"):
            for _, model in supported_models._model_mapping._extra_content.items():
                if isinstance(model_name, tuple): # pylint: disable=undefined-loop-variable
                    supported_models_names.extend([m.__name__ for m in model])
                else:
                    supported_models_names.append(model.__name__)
        supported_models = supported_models_names
    if self.model.__class__.__name__ not in supported_models:
        logger.error(
            f"The model '{self.model.__class__.__name__}' is not supported for {self.task}. Supported models are"
            f" {supported_models}."
        )

mindnlp.transformers.pipelines.base.Pipeline.forward(model_inputs, **forward_params)

This method performs the forward pass of the pipeline model.

PARAMETER DESCRIPTION
self

The instance of the Pipeline class.

TYPE: Pipeline

model_inputs

The inputs to the model for the forward pass. Type can vary depending on the model architecture and input requirements.

RETURNS DESCRIPTION
model_outputs

The model outputs produced by the underlying _forward call.

RAISES DESCRIPTION
None

However, the _forward method it calls may raise exceptions based on the model's implementation.

Source code in mindnlp/transformers/pipelines/base.py (lines 1072-1089)
def forward(self, model_inputs, **forward_params):
    """
    This method performs the forward pass of the pipeline model.

    Args:
        self (Pipeline): The instance of the Pipeline class.
        model_inputs: The inputs to the model for the forward pass.
            Type can vary depending on the model architecture and input requirements.

    Returns:
        model_outputs: The model outputs produced by the underlying `_forward` call.

    Raises:
        None:
            However, the _forward method it calls may raise exceptions based on the model's implementation.
    """
    model_outputs = self._forward(model_inputs, **forward_params)
    return model_outputs

mindnlp.transformers.pipelines.base.Pipeline.iterate(inputs, preprocess_params, forward_params, postprocess_params)

Iterates through the input data and yields the result of running each input through the pipeline.

PARAMETER DESCRIPTION
self

The instance of the Pipeline class.

TYPE: Pipeline

inputs

The input data to iterate over.

  • If inputs is a Dataset object, it will be iterated over by creating a dictionary iterator.
  • If inputs is a list of inputs, each input will be iterated over individually.

TYPE: Union[Dataset, List[Any]]

preprocess_params

The parameters used for preprocessing the input data.

TYPE: Any

forward_params

The parameters used for the forward pass of the pipeline.

TYPE: Any

postprocess_params

The parameters used for postprocessing the output data.

TYPE: Any

RETURNS DESCRIPTION

Yields the result of running each input through the pipeline, one item at a time.

Source code in mindnlp/transformers/pipelines/base.py (lines 1191-1218)
def iterate(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Iterates through the input data and yields the result of running each input through the pipeline.

    Args:
        self (Pipeline): The instance of the Pipeline class.
        inputs (Union[Dataset, List[Any]]): The input data to iterate over.

            - If inputs is a Dataset object, it will be iterated over by creating a dictionary iterator.
            - If inputs is a list of inputs, each input will be iterated over individually.
        preprocess_params (Any): The parameters used for preprocessing the input data.
        forward_params (Any): The parameters used for the forward pass of the pipeline.
        postprocess_params (Any): The parameters used for postprocessing the output data.

    Yields:
        The result of running each input through the pipeline, one item at a time.

    Raises:
        None.
    """
    # This function should become `get_iterator` again, this is a temporary
    # easy solution.
    if isinstance(inputs, Dataset):
        for input_ in inputs.create_dict_iterator(output_numpy=True):
            yield self.run_single(input_, preprocess_params, forward_params, postprocess_params)
    else:
        for input_ in inputs:
            yield self.run_single(input_, preprocess_params, forward_params, postprocess_params)
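
A small usage sketch, assuming pipe is an already-built pipeline and empty dicts stand in for the sanitized parameter sets:

# iterate() is a generator: results are produced lazily, one per input item.
for result in pipe.iterate(["first input", "second input"], {}, {}, {}):
    print(result)
# A mindspore Dataset can be passed instead of a list; it is walked with
# create_dict_iterator(output_numpy=True) as shown in the code above.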

mindnlp.transformers.pipelines.base.Pipeline.postprocess(model_outputs, **postprocess_parameters) abstractmethod

Postprocess will receive the raw outputs of the _forward method, generally tensors, and reformat them into something more friendly. Generally it will output a list or a dict of results (containing just strings and numbers).

Source code in mindnlp/transformers/pipelines/base.py (lines 1063-1070)
@abstractmethod
def postprocess(self, model_outputs: ModelOutput, **postprocess_parameters: Dict) -> Any:
    """
    Postprocess will receive the raw outputs of the `_forward` method, generally tensors, and reformat them into
    something more friendly. Generally it will output a list or a dict of results (containing just strings and
    numbers).
    """
    raise NotImplementedError("postprocess not implemented")

mindnlp.transformers.pipelines.base.Pipeline.predict(X)

Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().

Source code in mindnlp/transformers/pipelines/base.py (lines 994-998)
def predict(self, X):
    """
    Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().
    """
    return self(X)
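
For example, the scikit-learn-style entry point is interchangeable with a direct call (a sketch, assuming pipe is an already-built text pipeline; the inputs are illustrative):

texts = ["MindNLP pipelines are easy to use.", "This example is purely illustrative."]
predictions = pipe.predict(texts)   # equivalent to pipe(texts)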

mindnlp.transformers.pipelines.base.Pipeline.preprocess(input_, **preprocess_parameters) abstractmethod

Preprocess will take the input_ of a specific pipeline and return a dictionary of everything necessary for _forward to run properly. It should contain at least one tensor, but might have arbitrary other items.

Source code in mindnlp/transformers/pipelines/base.py (lines 1042-1048)
@abstractmethod
def preprocess(self, input_: Any, **preprocess_parameters: Dict) -> Dict[str, GenericTensor]:
    """
    Preprocess will take the `input_` of a specific pipeline and return a dictionary of everything necessary for
    `_forward` to run properly. It should contain at least one tensor, but might have arbitrary other items.
    """
    raise NotImplementedError("preprocess not implemented")
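
Together, preprocess, _forward and postprocess define the contract a concrete pipeline fills in. Below is a minimal sketch of a custom subclass; the class name, the tokenizer call with return_tensors="ms" and the output key are assumptions made for illustration, not the implementation of any shipped pipeline:

class MyPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        # Split user kwargs into preprocess / forward / postprocess parameter dicts.
        return {}, {}, {}

    def preprocess(self, input_, **preprocess_parameters):
        # Must return a dict containing at least one tensor for _forward to consume.
        return self.tokenizer(input_, return_tensors="ms")

    def _forward(self, model_inputs, **forward_params):
        return self.model(**model_inputs)

    def postprocess(self, model_outputs, **postprocess_parameters):
        # Reformat raw tensors into plain Python values.
        return {"logits": model_outputs[0].asnumpy().tolist()}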

mindnlp.transformers.pipelines.base.Pipeline.run_multi(inputs, preprocess_params, forward_params, postprocess_params)

Method that runs a series of input items through the pipeline.

PARAMETER DESCRIPTION
self

The instance of the Pipeline class.

TYPE: Pipeline

inputs

A list of input items to be processed by the pipeline.

TYPE: list

preprocess_params

Parameters for preprocessing the input items.

TYPE: dict

forward_params

Parameters for the forward pass through the pipeline.

TYPE: dict

postprocess_params

Parameters for postprocessing the output items.

TYPE: dict

RETURNS DESCRIPTION
list

A list containing the output of run_single for each input item.

Source code in mindnlp/transformers/pipelines/base.py (lines 1148-1165)
def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Method that runs a series of input items through the pipeline.

    Args:
        self (Pipeline): The instance of the Pipeline class.
        inputs (list): A list of input items to be processed by the pipeline.
        preprocess_params (dict): Parameters for preprocessing the input items.
        forward_params (dict): Parameters for the forward pass through the pipeline.
        postprocess_params (dict): Parameters for postprocessing the output items.

    Returns:
        list: A list containing the output of `run_single` for each input item.

    Raises:
        None.
    """
    return [self.run_single(item, preprocess_params, forward_params, postprocess_params) for item in inputs]

mindnlp.transformers.pipelines.base.Pipeline.run_single(inputs, preprocess_params, forward_params, postprocess_params)

Runs a single input through the preprocess, forward and postprocess stages of the pipeline.

PARAMETER DESCRIPTION
self

The instance of the Pipeline class.

TYPE: object

inputs

The input data to be processed by the pipeline.

TYPE: object

preprocess_params

Parameters for the preprocessing step, used to configure the preprocessing behavior.

TYPE: dict

forward_params

Parameters for the forward step, used to configure the forward pass behavior.

TYPE: dict

postprocess_params

Parameters for the postprocessing step, used to configure the postprocessing behavior.

TYPE: dict

RETURNS DESCRIPTION

The postprocessed output produced for the given input.

RAISES DESCRIPTION
Exception

Any exception raised by the 'preprocess', 'forward', or 'postprocess' methods called within this method is propagated to the caller.

Source code in mindnlp/transformers/pipelines/base.py (lines 1167-1189)
def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Runs a single input through the preprocess, forward and postprocess stages of the pipeline.

    Args:
        self (object): The instance of the Pipeline class.
        inputs (object): The input data to be processed by the pipeline.
        preprocess_params (dict): Parameters for the preprocessing step, used to configure the preprocessing behavior.
        forward_params (dict): Parameters for the forward step, used to configure the forward pass behavior.
        postprocess_params (dict): Parameters for the postprocessing step, used to configure the postprocessing behavior.

    Returns:
        The postprocessed output produced for the given input.

    Raises:
        Exception:
            Any exception raised by the 'preprocess', 'forward', or 'postprocess' methods called within this method
            is propagated to the caller.
    """
    model_inputs = self.preprocess(inputs, **preprocess_params)
    model_outputs = self.forward(model_inputs, **forward_params)
    outputs = self.postprocess(model_outputs, **postprocess_params)
    return outputs
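
In other words, for a single input the call below is equivalent to chaining the three stages by hand (a sketch, assuming pipe is an already-built pipeline):

output = pipe.run_single("some text", {}, {}, {})
# ...which does the same work as:
# pipe.postprocess(pipe.forward(pipe.preprocess("some text")))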

mindnlp.transformers.pipelines.base.Pipeline.save_pretrained(save_directory, safe_serialization=True)

Save the pipeline's model and tokenizer.

PARAMETER DESCRIPTION
save_directory

A path to the directory where to save the pipeline. It will be created if it doesn't exist.

TYPE: `str`

safe_serialization

Whether to save the model using safetensors or the traditional way for PyTorch or Tensorflow.

TYPE: `bool` DEFAULT: True

Source code in mindnlp/transformers/pipelines/base.py (lines 959-986)
def save_pretrained(self, save_directory: str, safe_serialization: bool = True):
    """
    Save the pipeline's model and tokenizer.

    Args:
        save_directory (`str`):
            A path to the directory where to save the pipeline. It will be created if it doesn't exist.
        safe_serialization (`bool`):
            Whether to save the model using `safetensors` or the traditional way for PyTorch or Tensorflow.
    """
    if os.path.isfile(save_directory):
        logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
        return
    os.makedirs(save_directory, exist_ok=True)

    self.model.save_pretrained(save_directory, safe_serialization=safe_serialization)

    if self.tokenizer is not None:
        self.tokenizer.save_pretrained(save_directory)

    if self.feature_extractor is not None:
        self.feature_extractor.save_pretrained(save_directory)

    if self.image_processor is not None:
        self.image_processor.save_pretrained(save_directory)

    if self.modelcard is not None:
        self.modelcard.save_pretrained(save_directory)
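
A short usage sketch; the directory name is illustrative:

pipe.save_pretrained("./my_saved_pipeline", safe_serialization=True)
# The directory now holds the model weights plus the tokenizer, feature extractor,
# image processor and model card files, for whichever of these the pipeline carries.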

mindnlp.transformers.pipelines.base.Pipeline.transform(X)

Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().

Source code in mindnlp/transformers/pipelines/base.py (lines 988-992)
def transform(self, X):
    """
    Scikit / Keras interface to transformers' pipelines. This method will forward to __call__().
    """
    return self(X)

mindnlp.transformers.pipelines.base.PipelineDataFormat

Base class for all pipeline-supported data formats, both for reading and writing. Supported data formats currently include:

  • JSON
  • CSV
  • stdin/stdout (pipe)

PipelineDataFormat also includes some utilities to work with multi-columns like mapping from datasets columns to pipelines keyword arguments through the dataset_kwarg_1=dataset_column_1 format.

PARAMETER DESCRIPTION
output_path

Where to save the outgoing data.

TYPE: `str`

input_path

Where to look for the input data.

TYPE: `str`

column

The column to read.

TYPE: `str`

overwrite

Whether or not to overwrite the output_path.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False

Source code in mindnlp/transformers/pipelines/base.py (lines 421-566)
class PipelineDataFormat:
    """
    Base class for all pipeline-supported data formats, both for reading and writing. Supported data formats
    currently include:

    - JSON
    - CSV
    - stdin/stdout (pipe)

    `PipelineDataFormat` also includes some utilities to work with multi-columns like mapping from datasets columns to
    pipelines keyword arguments through the `dataset_kwarg_1=dataset_column_1` format.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """
    SUPPORTED_FORMATS = ["json", "csv", "pipe"]

    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite: bool = False,
    ):
        """Initializes an instance of the PipelineDataFormat class.

        Args:
            output_path (Optional[str]): The path to the output file. Defaults to None.
            input_path (Optional[str]): The path to the input file. Defaults to None.
            column (Optional[str]): The column(s) to use for data processing. Defaults to None.
                If multiple columns are provided, they should be comma-separated.
                Each column can be specified as 'name' or 'name=value' to map input and output columns.
            overwrite (bool, optional): Determines whether to overwrite the output file if it already exists.
                Defaults to False.

        Returns:
            None:
                This method does not return a value.

        Raises:
            OSError: If the output_path is provided and the overwrite parameter is False,
                and the output_path already exists on disk.
            OSError: If the input_path is provided and the input_path does not exist on disk.
        """
        self.output_path = output_path
        self.input_path = input_path
        self.column = column.split(",") if column is not None else [""]
        self.is_multi_columns = len(self.column) > 1

        if self.is_multi_columns:
            self.column = [tuple(c.split("=")) if "=" in c else (c, c) for c in self.column]

        if output_path is not None and not overwrite:
            if exists(abspath(self.output_path)):
                raise OSError(f"{self.output_path} already exists on disk")

        if input_path is not None:
            if not exists(abspath(self.input_path)):
                raise OSError(f"{self.input_path} doesnt exist on disk")

    @abstractmethod
    def __iter__(self):
        """
        This method '__iter__' in the class 'PipelineDataFormat' is used to define an iterator for instances of the class.

        Args:
            self: An instance of the 'PipelineDataFormat' class.

        Returns:
            None:
                This method does not return any value explicitly
                but is meant to be implemented by subclasses to return an iterator.

        Raises:
            NotImplementedError:
                This exception is raised if the method is not implemented by a subclass.
                It serves as a reminder for the subclass to implement its own iteration logic.
        """
        raise NotImplementedError()

    @abstractmethod
    def save(self, data: Union[dict, List[dict]]):
        """
        Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

        Args:
            data (`dict` or list of `dict`): The data to store.
        """
        raise NotImplementedError()

    def save_binary(self, data: Union[dict, List[dict]]) -> str:
        """
        Save the provided data object as a pickle-formatted binary data on the disk.

        Args:
            data (`dict` or list of `dict`): The data to store.

        Returns:
            `str`: Path where the data has been saved.
        """
        path, _ = os.path.splitext(self.output_path)
        binary_path = os.path.extsep.join((path, "pickle"))

        with open(binary_path, "wb+") as f_output:
            pickle.dump(data, f_output)

        return binary_path

    @staticmethod
    def from_str(
        format: str,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ) -> "PipelineDataFormat":
        """
        Creates an instance of the right subclass of [`~pipelines.PipelineDataFormat`] depending on `format`.

        Args:
            format (`str`):
                The format of the desired pipeline. Acceptable values are `"json"`, `"csv"` or `"pipe"`.
            output_path (`str`, *optional*):
                Where to save the outgoing data.
            input_path (`str`, *optional*):
                Where to look for the input data.
            column (`str`, *optional*):
                The column to read.
            overwrite (`bool`, *optional*, defaults to `False`):
                Whether or not to overwrite the `output_path`.

        Returns:
            [`~pipelines.PipelineDataFormat`]: The proper data format.
        """
        if format == "json":
            return JsonPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        elif format == "csv":
            return CsvPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        elif format == "pipe":
            return PipedPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        else:
            raise KeyError(f"Unknown reader {format} (Available reader are json/csv/pipe)")
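
A hedged usage sketch: reading a CSV file and mapping dataset columns onto pipeline keyword arguments (the file and column names are illustrative):

fmt = PipelineDataFormat.from_str(
    "csv",
    output_path="answers.csv",
    input_path="questions.csv",
    column="question=question_text,context=passage",   # pipeline_kwarg=dataset_column pairs
    overwrite=True,
)
for item in fmt:
    # With multiple columns, each item is a dict such as {"question": ..., "context": ...}.
    print(item)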

mindnlp.transformers.pipelines.base.PipelineDataFormat.__init__(output_path, input_path, column, overwrite=False)

Initializes an instance of the PipelineDataFormat class.

PARAMETER DESCRIPTION
output_path

The path to the output file. Defaults to None.

TYPE: Optional[str]

input_path

The path to the input file. Defaults to None.

TYPE: Optional[str]

column

The column(s) to use for data processing. Defaults to None. If multiple columns are provided, they should be comma-separated. Each column can be specified as 'name' or 'name=value' to map input and output columns.

TYPE: Optional[str]

overwrite

Determines whether to overwrite the output file if it already exists. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
None

This method does not return a value.

RAISES DESCRIPTION
OSError

If the output_path is provided and the overwrite parameter is False, and the output_path already exists on disk.

OSError

If the input_path is provided and the input_path does not exist on disk.

Source code in mindnlp/transformers/pipelines/base.py (lines 442-483)
def __init__(
    self,
    output_path: Optional[str],
    input_path: Optional[str],
    column: Optional[str],
    overwrite: bool = False,
):
    """Initializes an instance of the PipelineDataFormat class.

    Args:
        output_path (Optional[str]): The path to the output file. Defaults to None.
        input_path (Optional[str]): The path to the input file. Defaults to None.
        column (Optional[str]): The column(s) to use for data processing. Defaults to None.
            If multiple columns are provided, they should be comma-separated.
            Each column can be specified as 'name' or 'name=value' to map input and output columns.
        overwrite (bool, optional): Determines whether to overwrite the output file if it already exists.
            Defaults to False.

    Returns:
        None:
            This method does not return a value.

    Raises:
        OSError: If the output_path is provided and the overwrite parameter is False,
            and the output_path already exists on disk.
        OSError: If the input_path is provided and the input_path does not exist on disk.
    """
    self.output_path = output_path
    self.input_path = input_path
    self.column = column.split(",") if column is not None else [""]
    self.is_multi_columns = len(self.column) > 1

    if self.is_multi_columns:
        self.column = [tuple(c.split("=")) if "=" in c else (c, c) for c in self.column]

    if output_path is not None and not overwrite:
        if exists(abspath(self.output_path)):
            raise OSError(f"{self.output_path} already exists on disk")

    if input_path is not None:
        if not exists(abspath(self.input_path)):
            raise OSError(f"{self.input_path} doesnt exist on disk")

mindnlp.transformers.pipelines.base.PipelineDataFormat.__iter__() abstractmethod

This method '__iter__' in the class 'PipelineDataFormat' is used to define an iterator for instances of the class.

PARAMETER DESCRIPTION
self

An instance of the 'PipelineDataFormat' class.

RETURNS DESCRIPTION
None

This method does not return any value explicitly but is meant to be implemented by subclasses to return an iterator.

RAISES DESCRIPTION
NotImplementedError

This exception is raised if the method is not implemented by a subclass. It serves as a reminder for the subclass to implement its own iteration logic.

Source code in mindnlp/transformers/pipelines/base.py (lines 485-503)
@abstractmethod
def __iter__(self):
    """
    This method '__iter__' in the class 'PipelineDataFormat' is used to define an iterator for instances of the class.

    Args:
        self: An instance of the 'PipelineDataFormat' class.

    Returns:
        None:
            This method does not return any value explicitly
            but is meant to be implemented by subclasses to return an iterator.

    Raises:
        NotImplementedError:
            This exception is raised if the method is not implemented by a subclass.
            It serves as a reminder for the subclass to implement its own iteration logic.
    """
    raise NotImplementedError()

mindnlp.transformers.pipelines.base.PipelineDataFormat.from_str(format, output_path, input_path, column, overwrite=False) staticmethod

Creates an instance of the right subclass of [~pipelines.PipelineDataFormat] depending on format.

PARAMETER DESCRIPTION
format

The format of the desired pipeline. Acceptable values are "json", "csv" or "pipe".

TYPE: `str`

output_path

Where to save the outgoing data.

TYPE: `str`, *optional*

input_path

Where to look for the input data.

TYPE: `str`, *optional*

column

The column to read.

TYPE: `str`, *optional*

overwrite

Whether or not to overwrite the output_path.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False

RETURNS DESCRIPTION
PipelineDataFormat

[~pipelines.PipelineDataFormat]: The proper data format.

Source code in mindnlp/transformers/pipelines/base.py (lines 533-566)
@staticmethod
def from_str(
    format: str,
    output_path: Optional[str],
    input_path: Optional[str],
    column: Optional[str],
    overwrite=False,
) -> "PipelineDataFormat":
    """
    Creates an instance of the right subclass of [`~pipelines.PipelineDataFormat`] depending on `format`.

    Args:
        format (`str`):
            The format of the desired pipeline. Acceptable values are `"json"`, `"csv"` or `"pipe"`.
        output_path (`str`, *optional*):
            Where to save the outgoing data.
        input_path (`str`, *optional*):
            Where to look for the input data.
        column (`str`, *optional*):
            The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.

    Returns:
        [`~pipelines.PipelineDataFormat`]: The proper data format.
    """
    if format == "json":
        return JsonPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
    elif format == "csv":
        return CsvPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
    elif format == "pipe":
        return PipedPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
    else:
        raise KeyError(f"Unknown reader {format} (Available reader are json/csv/pipe)")
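
As a quick sketch of the dispatch (the arguments are illustrative; an unrecognized format name raises KeyError):

reader = PipelineDataFormat.from_str("pipe", output_path=None, input_path=None, column="text")
# "json" -> JsonPipelineDataFormat, "csv" -> CsvPipelineDataFormat, "pipe" -> PipedPipelineDataFormat;
# any other format string raises KeyError.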

mindnlp.transformers.pipelines.base.PipelineDataFormat.save(data) abstractmethod

Save the provided data object with the representation for the current [~pipelines.PipelineDataFormat].

PARAMETER DESCRIPTION
data

The data to store.

TYPE: `dict` or list of `dict`

Source code in mindnlp/transformers/pipelines/base.py (lines 505-513)
@abstractmethod
def save(self, data: Union[dict, List[dict]]):
    """
    Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

    Args:
        data (`dict` or list of `dict`): The data to store.
    """
    raise NotImplementedError()

mindnlp.transformers.pipelines.base.PipelineDataFormat.save_binary(data)

Save the provided data object as a pickle-formatted binary data on the disk.

PARAMETER DESCRIPTION
data

The data to store.

TYPE: `dict` or list of `dict`

RETURNS DESCRIPTION
str

str: Path where the data has been saved.

Source code in mindnlp/transformers/pipelines/base.py (lines 515-531)
def save_binary(self, data: Union[dict, List[dict]]) -> str:
    """
    Save the provided data object as a pickle-formatted binary data on the disk.

    Args:
        data (`dict` or list of `dict`): The data to store.

    Returns:
        `str`: Path where the data has been saved.
    """
    path, _ = os.path.splitext(self.output_path)
    binary_path = os.path.extsep.join((path, "pickle"))

    with open(binary_path, "wb+") as f_output:
        pickle.dump(data, f_output)

    return binary_path
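
A brief sketch, assuming fmt is any PipelineDataFormat whose output_path is "results.json" (the file name and data are illustrative):

binary_path = fmt.save_binary([{"label": "POSITIVE", "score": 0.98}])
# The extension of output_path is replaced with ".pickle", so binary_path == "results.pickle",
# and the data is written there with pickle.dump().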