Skip to content

streamers

mindnlp.transformers.generation.streamers

streamers

mindnlp.transformers.generation.streamers.BaseStreamer

Base class from which .generate() streamers should inherit.

Source code in mindnlp/transformers/generation/streamers.py
25
26
27
28
29
30
31
32
33
34
35
class BaseStreamer:
    """Abstract interface that `.generate()` streamers must implement."""
    def put(self, value):
        """Receive a new batch of tokens pushed by `.generate()`."""
        raise NotImplementedError()

    def end(self):
        """Handle the end-of-generation signal from `.generate()`."""
        raise NotImplementedError()

mindnlp.transformers.generation.streamers.BaseStreamer.end()

Function that is called by .generate() to signal the end of generation

Source code in mindnlp/transformers/generation/streamers.py
33
34
35
def end(self):
    """Called by `.generate()` when generation is finished; subclasses must override."""
    raise NotImplementedError()

mindnlp.transformers.generation.streamers.BaseStreamer.put(value)

Function that is called by .generate() to push new tokens

Source code in mindnlp/transformers/generation/streamers.py
29
30
31
def put(self, value):
    """Called by `.generate()` with each batch of new tokens; subclasses must override."""
    raise NotImplementedError()

mindnlp.transformers.generation.streamers.TextIteratorStreamer

Bases: TextStreamer

Streamer that stores print-ready text in a queue, to be used by a downstream application as an iterator. This is useful for applications that benefit from accessing the generated text in a non-blocking way (e.g. in an interactive Gradio demo).

The API for the streamer classes is still under development and may change in the future.

PARAMETER DESCRIPTION
tokenizer

The tokenizer used to decode the tokens.

TYPE: `AutoTokenizer`

skip_prompt

Whether to skip the prompt to .generate() or not. Useful e.g. for chatbots.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False

timeout

The timeout for the text queue. If None, the queue will block indefinitely. Useful to handle exceptions in .generate(), when it is called in a separate thread.

TYPE: `float`, *optional* DEFAULT: None

decode_kwargs

Additional keyword arguments to pass to the tokenizer's decode method.

TYPE: `dict`, *optional* DEFAULT: {}

Example
>>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
>>> from threading import Thread
...
>>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
>>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
>>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
>>> streamer = TextIteratorStreamer(tok)
...
>>> # Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
>>> generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
>>> thread = Thread(target=model.generate, kwargs=generation_kwargs)
>>> thread.start()
>>> generated_text = ""
>>> for new_text in streamer:
...     generated_text += new_text
>>> generated_text
'An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,'
Source code in mindnlp/transformers/generation/streamers.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class TextIteratorStreamer(TextStreamer):
    """
    Streamer that stores print-ready text in a queue, to be used by a downstream application as an iterator. This is
    useful for applications that benefit from accessing the generated text in a non-blocking way (e.g. in an
    interactive Gradio demo).

    <Tip warning={true}>

    The API for the streamer classes is still under development and may change in the future.

    </Tip>

    Parameters:
        tokenizer (`AutoTokenizer`):
            The tokenizer used to decode the tokens.
        skip_prompt (`bool`, *optional*, defaults to `False`):
            Whether to skip the prompt to `.generate()` or not. Useful e.g. for chatbots.
        timeout (`float`, *optional*):
            The timeout for the text queue. If `None`, the queue will block indefinitely. Useful to handle exceptions
            in `.generate()`, when it is called in a separate thread.
        decode_kwargs (`dict`, *optional*):
            Additional keyword arguments to pass to the tokenizer's `decode` method.

    Example:
        ```python
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
        >>> from threading import Thread
        ...
        >>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
        >>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
        >>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
        >>> streamer = TextIteratorStreamer(tok)
        ...
        >>> # Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
        >>> generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
        >>> thread = Thread(target=model.generate, kwargs=generation_kwargs)
        >>> thread.start()
        >>> generated_text = ""
        >>> for new_text in streamer:
        ...     generated_text += new_text
        >>> generated_text
        'An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,'
        ```
    """
    def __init__(
        self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, timeout: Optional[float] = None, **decode_kwargs
    ):
        """Set up the queue-backed streamer on top of the parent text streamer.

        Args:
            tokenizer (AutoTokenizer): Tokenizer used by the parent class for decoding.
            skip_prompt (bool): When True, the prompt tokens fed to `.generate()` are not streamed.
                Defaults to False.
            timeout (Optional[float]): Seconds to wait on queue operations; `None` blocks
                indefinitely. Defaults to None.
            **decode_kwargs: Forwarded to the tokenizer's `decode` method.
        """
        super().__init__(tokenizer, skip_prompt, **decode_kwargs)
        # Finalized text chunks travel through this queue; `stop_signal` (None)
        # is enqueued once to mark the end of the stream.
        self.text_queue = Queue()
        self.stop_signal = None
        self.timeout = timeout

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Queue up newly finalized text, followed by the stop signal when generation ends."""
        pending = [text, self.stop_signal] if stream_end else [text]
        for item in pending:
            self.text_queue.put(item, timeout=self.timeout)

    def __iter__(self):
        """Return the streamer itself: it serves as its own iterator."""
        return self

    def __next__(self):
        """Pop the next text chunk from the queue.

        Raises:
            StopIteration: When the dequeued value equals the stop signal,
                marking the end of generation.
        """
        item = self.text_queue.get(timeout=self.timeout)
        if item == self.stop_signal:
            raise StopIteration()
        return item

mindnlp.transformers.generation.streamers.TextIteratorStreamer.__init__(tokenizer, skip_prompt=False, timeout=None, **decode_kwargs)

Initializes an instance of the TextIteratorStreamer class.

PARAMETER DESCRIPTION
self

The instance of the class itself.

tokenizer

An instance of the AutoTokenizer class used for tokenization.

TYPE: AutoTokenizer

skip_prompt

A flag indicating whether prompts should be skipped during iteration. Defaults to False.

TYPE: bool DEFAULT: False

timeout

An optional timeout value in seconds for waiting on text_queue. Defaults to None.

TYPE: Optional[float] DEFAULT: None

**decode_kwargs

Additional keyword arguments for decoding.

DEFAULT: {}

RETURNS DESCRIPTION

None

Source code in mindnlp/transformers/generation/streamers.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def __init__(
    self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, timeout: Optional[float] = None, **decode_kwargs
):
    """Set up the queue-backed streamer on top of the parent text streamer.

    Args:
        tokenizer (AutoTokenizer): Tokenizer used by the parent class for decoding.
        skip_prompt (bool): When True, the prompt tokens fed to `.generate()` are not streamed.
            Defaults to False.
        timeout (Optional[float]): Seconds to wait on queue operations; `None` blocks
            indefinitely. Defaults to None.
        **decode_kwargs: Forwarded to the tokenizer's `decode` method.
    """
    super().__init__(tokenizer, skip_prompt, **decode_kwargs)
    # Finalized text chunks travel through this queue; `stop_signal` (None)
    # is enqueued once to mark the end of the stream.
    self.text_queue = Queue()
    self.stop_signal = None
    self.timeout = timeout

mindnlp.transformers.generation.streamers.TextIteratorStreamer.__iter__()

Docstring for method 'iter' in the class 'TextIteratorStreamer'.

PARAMETER DESCRIPTION
self

The instance of the class TextIteratorStreamer. This parameter is required to access the object's attributes and methods.

TYPE: object

RETURNS DESCRIPTION
None

This method returns None as it is meant to be an iterator and does not explicitly return a value.

Source code in mindnlp/transformers/generation/streamers.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def __iter__(self):
    """
    Docstring for method '__iter__' in the class 'TextIteratorStreamer'.

    Args:
        self (object): The instance of the class TextIteratorStreamer.
            This parameter is required to access the object's attributes and methods.

    Returns:
        None: This method returns None as it is meant to be an iterator and does not explicitly return a value.

    Raises:
        None.
    """
    return self

mindnlp.transformers.generation.streamers.TextIteratorStreamer.__next__()

Method to retrieve the next value from the text queue in the TextIteratorStreamer class.

PARAMETER DESCRIPTION
self

An instance of the TextIteratorStreamer class.

  • Type: TextIteratorStreamer
  • Purpose: Represents the current instance of the TextIteratorStreamer class.
  • Restrictions: This parameter is automatically passed when the method is called.

RETURNS DESCRIPTION
None

This method does not explicitly return a value. It retrieves the next value from the text queue and processes it accordingly within the context of the TextIteratorStreamer class.

RAISES DESCRIPTION
StopIteration

Raised when the retrieved value from the text queue is equal to the stop signal, indicating the end of iteration.

Source code in mindnlp/transformers/generation/streamers.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def __next__(self):
    """
    Method to retrieve the next value from the text queue in the TextIteratorStreamer class.

    Args:
        self:
            An instance of the TextIteratorStreamer class.

            - Type: TextIteratorStreamer
            - Purpose: Represents the current instance of the TextIteratorStreamer class.
            - Restrictions: This parameter is automatically passed when the method is called.

    Returns:
        None: This method does not explicitly return a value. It retrieves the next value from the text queue
            and processes it accordingly within the context of the TextIteratorStreamer class.

    Raises:
        StopIteration: Raised when the retrieved value from the text queue is equal to the stop signal,
            indicating the end of iteration.
    """
    value = self.text_queue.get(timeout=self.timeout)
    if value == self.stop_signal:
        raise StopIteration()
    return value

mindnlp.transformers.generation.streamers.TextIteratorStreamer.on_finalized_text(text, stream_end=False)

Put the new text in the queue. If the stream is ending, also put a stop signal in the queue.

Source code in mindnlp/transformers/generation/streamers.py
240
241
242
243
244
def on_finalized_text(self, text: str, stream_end: bool = False):
    """Queue up newly finalized text, followed by the stop signal when generation ends."""
    pending = [text, self.stop_signal] if stream_end else [text]
    for item in pending:
        self.text_queue.put(item, timeout=self.timeout)

mindnlp.transformers.generation.streamers.TextStreamer

Bases: BaseStreamer

Simple text streamer that prints the token(s) to stdout as soon as entire words are formed.

The API for the streamer classes is still under development and may change in the future.

PARAMETER DESCRIPTION
tokenizer

The tokenizer used to decode the tokens.

TYPE: `AutoTokenizer`

skip_prompt

Whether to skip the prompt to .generate() or not. Useful e.g. for chatbots.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False

decode_kwargs

Additional keyword arguments to pass to the tokenizer's decode method.

TYPE: `dict`, *optional* DEFAULT: {}

Example
>>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
...
>>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
>>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
>>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
>>> streamer = TextStreamer(tok)
...
>>> # Despite returning the usual output, the streamer will also print the generated text to stdout.
>>> _ = model.generate(**inputs, streamer=streamer, max_new_tokens=20)
An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,
Source code in mindnlp/transformers/generation/streamers.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class TextStreamer(BaseStreamer):
    """
    Simple text streamer that prints the token(s) to stdout as soon as entire words are formed.

    <Tip warning={true}>

    The API for the streamer classes is still under development and may change in the future.

    </Tip>

    Parameters:
        tokenizer (`AutoTokenizer`):
            The tokenizer used to decode the tokens.
        skip_prompt (`bool`, *optional*, defaults to `False`):
            Whether to skip the prompt to `.generate()` or not. Useful e.g. for chatbots.
        decode_kwargs (`dict`, *optional*):
            Additional keyword arguments to pass to the tokenizer's `decode` method.

    Example:
        ```python
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
        ...
        >>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
        >>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
        >>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
        >>> streamer = TextStreamer(tok)
        ...
        >>> # Despite returning the usual output, the streamer will also print the generated text to stdout.
        >>> _ = model.generate(**inputs, streamer=streamer, max_new_tokens=20)
        An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,
        ```
    """
    def __init__(self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, **decode_kwargs):
        """Store the tokenizer and streaming options, and reset the streaming state.

        Args:
            tokenizer (AutoTokenizer): Tokenizer used to decode accumulated token ids.
            skip_prompt (bool): When True, the first chunk (the prompt) is not printed.
                Defaults to False.
            **decode_kwargs: Forwarded to the tokenizer's `decode` method.
        """
        self.tokenizer = tokenizer
        self.skip_prompt = skip_prompt
        self.decode_kwargs = decode_kwargs

        # Streaming state: tokens not yet flushed, how many decoded characters
        # were already printed, and whether the next chunk is the prompt.
        self.token_cache = []
        self.print_len = 0
        self.next_tokens_are_prompt = True

    def put(self, value):
        """
        Receives tokens, decodes them, and prints them to stdout as soon as they form entire words.
        """
        # Only single-sequence input is supported; squeeze a leading batch dim of 1.
        if len(value.shape) > 1 and value.shape[0] > 1:
            raise ValueError("TextStreamer only supports batch size 1")
        if len(value.shape) > 1:
            value = value[0]

        # The first chunk pushed by `.generate()` is the prompt; optionally swallow it.
        if self.skip_prompt and self.next_tokens_are_prompt:
            self.next_tokens_are_prompt = False
            return

        # Accumulate tokens and re-decode the whole cache each call.
        self.token_cache.extend(value.tolist())
        decoded = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)

        if decoded.endswith("\n"):
            # A newline completes the pending text: flush everything and reset the cache.
            new_text = decoded[self.print_len:]
            self.token_cache = []
            self.print_len = 0
        elif decoded and self._is_chinese_char(ord(decoded[-1])):
            # CJK ideographs stand alone as words, so emit them right away.
            new_text = decoded[self.print_len:]
            self.print_len += len(new_text)
        else:
            # Emit only up to the last space so incomplete words are held back
            # until a subsequent token completes them.
            new_text = decoded[self.print_len: decoded.rfind(" ") + 1]
            self.print_len += len(new_text)

        self.on_finalized_text(new_text)

    def end(self):
        """Flushes any remaining cache and prints a newline to stdout."""
        if self.token_cache:
            decoded = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)
            remainder = decoded[self.print_len:]
            self.token_cache = []
            self.print_len = 0
        else:
            remainder = ""

        self.next_tokens_are_prompt = True
        self.on_finalized_text(remainder, stream_end=True)

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Prints the new text to stdout. If the stream is ending, also prints a newline."""
        print(text, flush=True, end="" if not stream_end else None)

    def _is_chinese_char(self, cp):
        """Checks whether CP is the codepoint of a CJK character."""
        # These are the CJK Unified Ideographs Unicode blocks:
        #   https://en.wikipedia.org/wiki/CJK_Unified_Ideographs_(Unicode_block)
        # They do NOT cover all Japanese and Korean text: Hangul, Hiragana and
        # Katakana live in other blocks and are written with space-separated
        # words, so they need no special handling here.
        cjk_ranges = (
            (0x4E00, 0x9FFF),
            (0x3400, 0x4DBF),
            (0x20000, 0x2A6DF),
            (0x2A700, 0x2B73F),
            (0x2B740, 0x2B81F),
            (0x2B820, 0x2CEAF),
            (0xF900, 0xFAFF),
            (0x2F800, 0x2FA1F),
        )
        return any(start <= cp <= end for start, end in cjk_ranges)

mindnlp.transformers.generation.streamers.TextStreamer.__init__(tokenizer, skip_prompt=False, **decode_kwargs)

Initializes an instance of the TextStreamer class.

PARAMETER DESCRIPTION
tokenizer

An instance of AutoTokenizer used for tokenization.

TYPE: AutoTokenizer

skip_prompt

A flag indicating whether to skip the prompt. Defaults to False.

TYPE: bool DEFAULT: False

**decode_kwargs

Additional keyword arguments for decoding.

DEFAULT: {}

RETURNS DESCRIPTION

None.

RAISES DESCRIPTION
TypeError

If tokenizer is not an instance of AutoTokenizer.

ValueError

If skip_prompt is not a boolean.

Source code in mindnlp/transformers/generation/streamers.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, **decode_kwargs):
    """
    Initializes an instance of the TextStreamer class.

    Args:
        tokenizer (AutoTokenizer): An instance of AutoTokenizer used for tokenization.
        skip_prompt (bool, optional): A flag indicating whether to skip the prompt. Defaults to False.
        **decode_kwargs: Additional keyword arguments for decoding.

    Returns:
        None.

    Raises:
        TypeError: If tokenizer is not an instance of AutoTokenizer.
        ValueError: If skip_prompt is not a boolean.
    """
    self.tokenizer = tokenizer
    self.skip_prompt = skip_prompt
    self.decode_kwargs = decode_kwargs

    # variables used in the streaming process
    self.token_cache = []
    self.print_len = 0
    self.next_tokens_are_prompt = True

mindnlp.transformers.generation.streamers.TextStreamer.end()

Flushes any remaining cache and prints a newline to stdout.

Source code in mindnlp/transformers/generation/streamers.py
129
130
131
132
133
134
135
136
137
138
139
140
141
def end(self):
    """Flush whatever remains in the token cache, then emit the final (possibly empty) text."""
    if self.token_cache:
        decoded = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)
        remainder = decoded[self.print_len:]
        self.token_cache = []
        self.print_len = 0
    else:
        remainder = ""

    self.next_tokens_are_prompt = True
    self.on_finalized_text(remainder, stream_end=True)

mindnlp.transformers.generation.streamers.TextStreamer.on_finalized_text(text, stream_end=False)

Prints the new text to stdout. If the stream is ending, also prints a newline.

Source code in mindnlp/transformers/generation/streamers.py
143
144
145
def on_finalized_text(self, text: str, stream_end: bool = False):
    """Write the new text to stdout, appending a newline once the stream finishes."""
    terminator = None if stream_end else ""
    print(text, flush=True, end=terminator)

mindnlp.transformers.generation.streamers.TextStreamer.put(value)

Receives tokens, decodes them, and prints them to stdout as soon as they form entire words.

Source code in mindnlp/transformers/generation/streamers.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def put(self, value):
    """
    Receives tokens, decodes them, and prints them to stdout as soon as they form entire words.
    """
    # Only single-sequence input is supported; squeeze a leading batch dim of 1.
    if len(value.shape) > 1 and value.shape[0] > 1:
        raise ValueError("TextStreamer only supports batch size 1")
    if len(value.shape) > 1:
        value = value[0]

    # The first chunk pushed by `.generate()` is the prompt; optionally swallow it.
    if self.skip_prompt and self.next_tokens_are_prompt:
        self.next_tokens_are_prompt = False
        return

    # Accumulate tokens and re-decode the whole cache each call.
    self.token_cache.extend(value.tolist())
    decoded = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)

    if decoded.endswith("\n"):
        # A newline completes the pending text: flush everything and reset the cache.
        new_text = decoded[self.print_len:]
        self.token_cache = []
        self.print_len = 0
    elif decoded and self._is_chinese_char(ord(decoded[-1])):
        # CJK ideographs stand alone as words, so emit them right away.
        new_text = decoded[self.print_len:]
        self.print_len += len(new_text)
    else:
        # Emit only up to the last space so incomplete words are held back
        # until a subsequent token completes them.
        new_text = decoded[self.print_len: decoded.rfind(" ") + 1]
        self.print_len += len(new_text)

    self.on_finalized_text(new_text)