fluvio

The __fluvio__ Python module provides an extension for working with the Fluvio streaming platform.

This module builds on top of the Fluvio Client Rust Crate and provides Pythonic access to the API.

Creating a topic with default settings (1 partition, replication factor 1) is as simple as:

```python
from fluvio import FluvioAdmin

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
```

Or create a topic with custom settings:

```python
from fluvio import FluvioAdmin, TopicSpec

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
    TopicSpec.create()
    .with_partitions(3)
    .with_replications(3)
    .with_max_partition_size("1Gb")
    .with_retention_time("1h")
    .with_segment_size("10M")
    .build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
```

Producing data to a topic in a Fluvio cluster is as simple as:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

for i in range(10):
    producer.send_string("Hello %s " % i)
```
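
Records can also carry an explicit key, which determines the partition a record lands on. A minimal sketch, assuming the same topic as above:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

# The key (first argument) selects the destination partition.
producer.send("user-1".encode(), "Hello with a key".encode())
producer.flush()
```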

Consuming is also simple:

```python
from fluvio import Fluvio, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
```

For more examples see the integration tests in the fluvio-python repository:

[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)

  1"""
  2The __fluvio__ python module provides an extension for working with the Fluvio
  3streaming platform.
  4
  5This module builds on top of the Fluvio Client Rust Crate and provides a
  6pythonic access to the API.
  7
  8
  9Creating a topic with default settings (1 partition, 1 replication) is as simple as:
 10
 11```python
 12fluvio_admin = FluvioAdmin.connect()
 13fluvio_admin.create_topic("a_topic")
 14```
 15
 16Or just create a topic with custom settings:
 17
 18```python
 19import fluvio
 20
 21fluvio_admin = FluvioAdmin.connect()
 22topic_spec = (
 23    TopicSpec.create()
 24    .with_partitions(3)
 25    .with_replications(3)
 26    .with_max_partition_size("1Gb")
 27    .with_retention_time("1h")
 28    .with_segment_size("10M")
 29    .build()
 30)
 31fluvio_admin.create_topic("a_topic", topic_spec)
 32```
 33
 34Producing data to a topic in a Fluvio cluster is as simple as:
 35
 36```python
 37import fluvio
 38
 39fluvio = Fluvio.connect()
 40
 41topic = "a_topic"
 42producer = fluvio.topic_producer(topic)
 43
 44for i in range(10):
 45    producer.send_string("Hello %s " % i)
 46```
 47
 48Consuming is also simple:
 49
 50```python
 51import fluvio
 52
 53fluvio = Fluvio.connect()
 54
 55topic = "a_topic"
 56builder = ConsumerConfigExtBuilder(topic)
 57config = builder.build()
 58stream = fluvio.consumer_with_config(config)
 59
 60num_items = 2
 61records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
 62```
 63
 64For more examples see the integration tests in the fluvio-python repository.
 65
 66[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
 67[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)
 68
 69"""

import typing
from enum import Enum

from ._fluvio_python import (
    Fluvio as _Fluvio,
    FluvioConfig as _FluvioConfig,
    Offset,
    FluvioAdmin as _FluvioAdmin,
    # consumer types
    ConsumerConfig as _ConsumerConfig,
    ConsumerConfigExt,
    ConsumerConfigExtBuilder,
    PartitionConsumer as _PartitionConsumer,
    MultiplePartitionConsumer as _MultiplePartitionConsumer,
    PartitionSelectionStrategy as _PartitionSelectionStrategy,
    PartitionConsumerStream as _PartitionConsumerStream,
    AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream,
    # producer types
    TopicProducer as _TopicProducer,
    ProduceOutput as _ProduceOutput,
    ProducerBatchRecord as _ProducerBatchRecord,
    # admin and misc types
    SmartModuleKind as _SmartModuleKind,
    TopicSpec as _TopicSpec,
    WatchTopicStream as _WatchTopicStream,
    WatchSmartModuleStream as _WatchSmartModuleStream,
)

from ._fluvio_python import Error as FluviorError  # noqa: F401

from .topic import TopicSpec, CompressionType, TopicMode
from .record import Record, RecordMetadata
from .specs import (
    # support types
    CommonCreateRequest,
    PartitionMap,
    # specs
    SmartModuleSpec,
    MessageMetadataTopicSpec,
    MetadataPartitionSpec,
    MetadataSmartModuleSpec,
    MetadataTopicSpec,
    MetaUpdateSmartModuleSpec,
    MetaUpdateTopicSpec,
)

# this structures the module a bit and allows pydoc to generate better docs
# with better ordering of types and functions
# inclusion in __all__ will pull the PyO3 rust inline docs into
# the pdoc generated documentation
__all__ = [
    # top level types
    "Fluvio",
    "FluvioConfig",
    "FluvioAdmin",
    # topic
    "TopicSpec",
    "CompressionType",
    "TopicMode",
    # record
    "Record",
    "RecordMetadata",
    "Offset",
    # producer
    "TopicProducer",
    "ProduceOutput",
    # consumer
    "ConsumerConfigExt",
    "ConsumerConfigExtBuilder",
    "ConsumerConfig",
    "PartitionConsumer",
    "MultiplePartitionConsumer",
    # specs
    "CommonCreateRequest",
    "SmartModuleSpec",
    "TopicSpec",
    "PartitionMap",
    "MessageMetadataTopicSpec",
    "MetadataPartitionSpec",
    "MetadataTopicSpec",
    "MetaUpdateTopicSpec",
    # Misc
    "PartitionSelectionStrategy",
    "SmartModuleKind",
]

class ProduceOutput:
    """Returned by a `TopicProducer.send` call, allowing access to sent record metadata."""

    _inner: _ProduceOutput

    def __init__(self, inner: _ProduceOutput) -> None:
        self._inner = inner

    def wait(self) -> typing.Optional[RecordMetadata]:
        """Wait for the record metadata.

        This is a blocking call and may only return a `RecordMetadata` once.
        Any subsequent call to `wait` will return a `None` value.
        Errors will be raised as exceptions of type `FluvioError`.
        """
        res = self._inner.wait()
        if res is None:
            return None
        return RecordMetadata(res)

    async def async_wait(self) -> typing.Optional[RecordMetadata]:
        """Asynchronously wait for the record metadata.

        This may only return a `RecordMetadata` once.
        Any subsequent call to `async_wait` will return a `None` value.
        """
        res = await self._inner.async_wait()
        if res is None:
            return None
        return RecordMetadata(res)
````
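
As a usage sketch of `ProduceOutput` (assuming a running cluster and an existing topic named `a_topic`):

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

output = producer.send_string("hello")
producer.flush()

# `wait` blocks until the record is acknowledged and returns the
# metadata once; a second call would return None.
metadata = output.wait()
print(metadata)
```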

```python
class SmartModuleKind(Enum):
    """
    Used to explicitly set the kind of a SmartModule. Not required,
    but needed for legacy SmartModules.
    """

    Filter = _SmartModuleKind.Filter
    Map = _SmartModuleKind.Map
    ArrayMap = _SmartModuleKind.ArrayMap
    FilterMap = _SmartModuleKind.FilterMap
    Aggregate = _SmartModuleKind.Aggregate


class ConsumerConfig:
    _inner: _ConsumerConfig

    def __init__(self):
        self._inner = _ConsumerConfig()

    def disable_continuous(self, val: bool = True):
        """Disable continuous mode after fetching specified records"""
        self._inner.disable_continuous(val)

    def smartmodule(
        self,
        name: str = None,
        path: str = None,
        kind: SmartModuleKind = None,
        params: typing.Dict[str, str] = None,
        aggregate: typing.List[bytes] = None,
    ):
        """
        Add a SmartModule to a consumer config, either by the `name` of a
        registered `SmartModule` or by a `path` to a WASM binary.

        Args:

            name: str
            path: str
            kind: SmartModuleKind
            params: Dict[str, str]
            aggregate: List[bytes] # initial value for an aggregate smartmodule

        Raises:
            "Require either a path or a name for a smartmodule."
            "Only specify one of path or name not both."

        Returns:

            None
        """

        if kind is not None:
            kind = kind.value

        if path is None and name is None:
            raise Exception("Require either a path or a name for a smartmodule.")

        if path is not None and name is not None:
            raise Exception("Only specify one of path or name not both.")

        params = {} if params is None else params
        param_keys = list(params.keys())
        param_vals = list(params.values())

        self._inner.smartmodule(
            name,
            path,
            kind,
            param_keys,
            param_vals,
            aggregate,
            # These arguments are for the Join smartmodule kinds, which are
            # not implemented on the Python side yet.
            None,
            None,
            None,
            None,
        )
```
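
A sketch of wiring a SmartModule into a consumer stream; the SmartModule name `my-filter` is hypothetical and would need to be registered on the cluster first:

```python
from fluvio import ConsumerConfig, Fluvio, Offset, SmartModuleKind

fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("a_topic", 0)

# "my-filter" stands in for a SmartModule already registered on the
# cluster; `kind` is only needed for legacy SmartModules.
config = ConsumerConfig()
config.smartmodule(name="my-filter", kind=SmartModuleKind.Filter)

# Streams continuously; each record has already passed the filter.
for record in consumer.stream_with_config(Offset.beginning(), config):
    print(bytearray(record.value()).decode())
```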

```python
class PartitionConsumer:
    """
    An interface for consuming events from a particular partition

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once. When
    you consume events via streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition. This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._generator(self._inner.stream(offset))

    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition. This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._async_generator(await self._inner.async_stream(offset))

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition, applying the SmartModule WASM module carried by the config.
        This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`

        """
        return self._generator(self._inner.stream_with_config(offset, config._inner))

    async def async_stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition, applying the SmartModule WASM module carried by the config.
        This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `AsyncIterator[Record]`

        """
        return self._async_generator(
            await self._inner.async_stream_with_config(offset, config._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
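
A sketch of the async variant, assuming a running cluster and an `a_topic` topic with records in partition 0:

```python
import asyncio

from fluvio import Fluvio, Offset

async def read_two() -> list:
    fluvio = Fluvio.connect()
    consumer = fluvio.partition_consumer("a_topic", 0)
    out = []
    # `async_stream` must be awaited to obtain the async iterator.
    async for record in await consumer.async_stream(Offset.beginning()):
        out.append(bytearray(record.value()).decode())
        if len(out) == 2:
            break
    return out

print(asyncio.run(read_two()))
```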

```python
class MultiplePartitionConsumer:
    """
    An interface for consuming events from multiple partitions

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once. When
    you consume events via streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _MultiplePartitionConsumer

    def __init__(self, inner: _MultiplePartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions. This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._generator(self._inner.stream(offset))

    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions. This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._async_generator(await self._inner.async_stream(offset))

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions, applying the SmartModule WASM module carried by the config.
        This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`

        """
        return self._generator(
            self._inner.stream_with_config(offset, config._inner)
        )

    async def async_stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions, applying the SmartModule WASM module carried by the config.
        This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `AsyncIterator[Record]`

        """
        return self._async_generator(
            await self._inner.async_stream_with_config(offset, config._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
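
A sketch of consuming across all partitions of a topic through `Fluvio.multi_partition_consumer`, which builds on this class:

```python
from fluvio import Fluvio, Offset

fluvio = Fluvio.connect()

# One consumer over all partitions of the topic.
consumer = fluvio.multi_partition_consumer("a_topic")

for record in consumer.stream(Offset.beginning()):
    print(bytearray(record.value()).decode())
```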

```python
class TopicProducer:
    """An interface for producing events to a particular topic.

    A `TopicProducer` allows you to send events to the specific topic it was
    initialized for. Once you have a `TopicProducer`, you can send events to
    the topic, choosing which partition each event should be delivered to.
    """

    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer’s topic"""
        return self.send([], buf.encode("utf-8"))

    async def async_send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer’s topic"""
        return await self.async_send([], buf.encode("utf-8"))

    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
        """
        Sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        return ProduceOutput(self._inner.send(key, value))

    async def async_send(
        self, key: typing.List[int], value: typing.List[int]
    ) -> ProduceOutput:
        """
        Async sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        produce_output = await self._inner.async_send(key, value)
        return ProduceOutput(produce_output)

    def flush(self) -> None:
        """
        Send all the queued records in the producer batches.
        """
        return self._inner.flush()

    async def async_flush(self) -> None:
        """
        Async send all the queued records in the producer batches.
        """
        await self._inner.async_flush()

    def send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in self._inner.send_all(records_inner)
        ]

    async def async_send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Async sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in await self._inner.async_send_all(records_inner)
        ]
```
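
A sketch of batch production with `send_all`, using the topic from the earlier examples:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

# Batch several key/value records in one call; keys determine the
# destination partitions.
records = [(f"key-{i}".encode(), f"value-{i}".encode()) for i in range(3)]
outputs = producer.send_all(records)
producer.flush()

for output in outputs:
    print(output.wait())
```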

```python
class PartitionSelectionStrategy:
    """Strategy to select partitions"""

    _inner: _PartitionSelectionStrategy

    def __init__(self, inner: _PartitionSelectionStrategy):
        self._inner = inner

    @classmethod
    def with_all(cls, topic: str):
        """select all partitions of one topic"""
        return cls(_PartitionSelectionStrategy.with_all(topic))

    @classmethod
    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
        """select multiple partitions of multiple topics"""
        return cls(_PartitionSelectionStrategy.with_multiple(topic))
```
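
The strategy is normally constructed for you by `Fluvio.multi_partition_consumer` and `Fluvio.multi_topic_partition_consumer`; a sketch of the latter with hypothetical topic names:

```python
from fluvio import Fluvio, Offset

fluvio = Fluvio.connect()

# Select partition 0 of "topic_a" and partition 1 of "topic_b".
consumer = fluvio.multi_topic_partition_consumer([("topic_a", 0), ("topic_b", 1)])

for record in consumer.stream(Offset.beginning()):
    print(bytearray(record.value()).decode())
```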

```python
class FluvioConfig:
    """Configuration for Fluvio client"""

    _inner: _FluvioConfig

    def __init__(self, inner: _FluvioConfig):
        self._inner = inner

    @classmethod
    def load(cls):
        """get current cluster config from default profile"""
        return cls(_FluvioConfig.load())

    @classmethod
    def new(cls, addr: str):
        """Create a new cluster configuration with no TLS."""
        return cls(_FluvioConfig.new(addr))

    def set_endpoint(self, endpoint: str):
        """set endpoint"""
        self._inner.set_endpoint(endpoint)

    def set_use_spu_local_address(self, val: bool):
        """set whether to use the SPU local address"""
        self._inner.set_use_spu_local_address(val)

    def disable_tls(self):
        """disable tls for this config"""
        self._inner.disable_tls()

    def set_anonymous_tls(self):
        """set the config to use anonymous tls"""
        self._inner.set_anonymous_tls()

    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
        """specify inline tls parameters"""
        self._inner.set_inline_tls(domain, key, cert, ca_cert)

    def set_tls_file_paths(
        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
    ):
        """specify paths to tls files"""
        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)

    def set_client_id(self, client_id: str):
        """set client id"""
        self._inner.set_client_id(client_id)

    def unset_client_id(self):
        """remove the configured client id from config"""
        self._inner.unset_client_id()
```
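
A sketch of connecting with an explicit configuration rather than the default profile; the endpoint address is a placeholder for a local cluster:

```python
from fluvio import Fluvio, FluvioConfig

# "127.0.0.1:9003" is a placeholder endpoint; substitute your cluster's.
config = FluvioConfig.new("127.0.0.1:9003")
config.set_client_id("my-python-app")

fluvio = Fluvio.connect_with_config(config)
```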

```python
class Fluvio:
    """An interface for interacting with Fluvio streaming."""

    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        self._inner = inner

    @classmethod
    def connect(cls):
        """Tries to create a new Fluvio client using the current profile from
        `~/.fluvio/config`
        """
        return cls(_Fluvio.connect())

    @classmethod
    def connect_with_config(cls, config: FluvioConfig):
        """Creates a new Fluvio client using the given configuration"""
        return cls(_Fluvio.connect_with_config(config._inner))

    def consumer_with_config(
        self, config: ConsumerConfigExt
    ) -> typing.Iterator[Record]:
        """Creates a consumer with settings defined in config.

        This is the recommended way to consume records.
        """
        return self._generator(self._inner.consumer_with_config(config))

    def topic_producer(self, topic: str) -> TopicProducer:
        """
        Creates a new `TopicProducer` for the given topic name.

        Currently, producers are scoped to a specific Fluvio topic. That means
        when you send events via a producer, you must specify which partition
        each event should go to.
        """
        return TopicProducer(self._inner.topic_producer(topic))

    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
        """Creates a new `PartitionConsumer` for the given topic and partition

        Currently, consumers are scoped to both a specific Fluvio topic and to
        a particular partition within that topic. That means that if you have a
        topic with multiple partitions, then in order to receive all of the
        events in all of the partitions, you will need to create one consumer
        per partition.
        """
        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topic and all its partitions

        Currently, consumers are scoped to a specific Fluvio topic and all
        partitions within that topic.
        """
        strategy = PartitionSelectionStrategy.with_all(topic)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def multi_topic_partition_consumer(
        self, selections: typing.List[typing.Tuple[str, int]]
    ) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions

        Currently, consumers are scoped to a list of Fluvio topic and partition tuples.
        """
        strategy = PartitionSelectionStrategy.with_multiple(selections)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()
```
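
A compact produce-then-consume sketch tying the client APIs together (assumes a running cluster and an existing `a_topic`):

```python
from fluvio import Fluvio, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()

producer = fluvio.topic_producer("a_topic")
producer.send_string("ping")
producer.flush()

# consumer_with_config is the recommended consumption API.
config = ConsumerConfigExtBuilder("a_topic").build()
stream = fluvio.consumer_with_config(config)
print(bytearray(next(stream).value()).decode())
```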

```python
class FluvioAdmin:
    _inner: _FluvioAdmin

    @staticmethod
    def connect():
        return FluvioAdmin(_FluvioAdmin.connect())

    @staticmethod
    def connect_with_config(config: FluvioConfig):
        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))

    def __init__(self, inner: _FluvioAdmin):
        self._inner = inner

    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
        partitions = 1
        replication = 1
        ignore_rack = True
        dry_run = False
        spec = (
            spec
            if spec is not None
            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
        )
        return self._inner.create_topic(topic, dry_run, spec)

    def create_topic_with_config(
        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
    ):
        return self._inner.create_topic_with_config(topic, req._inner, spec)

    def delete_topic(self, topic: str):
        return self._inner.delete_topic(topic)

    def all_topics(self) -> typing.List[MetadataTopicSpec]:
        return self._inner.all_topics()

    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics(filters)

    def list_topics_with_params(
        self, filters: typing.List[str], summary: bool
    ) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics_with_params(filters, summary)

    def watch_topic(self) -> typing.Iterator[MetaUpdateTopicSpec]:
        return self._topic_spec_generator(self._inner.watch_topic())

    def create_smartmodule(self, name: str, path: str, dry_run: bool):
        spec = SmartModuleSpec.new(path)
        return self._inner.create_smart_module(name, dry_run, spec._inner)

    def delete_smartmodule(self, name: str):
        return self._inner.delete_smart_module(name)

    def list_smartmodules(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataSmartModuleSpec]:
        return self._inner.list_smart_modules(filters)

    def watch_smartmodule(self) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
        return self._smart_module_spec_generator(self._inner.watch_smart_module())

    def list_partitions(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataPartitionSpec]:
        return self._inner.list_partitions(filters)

    def _topic_spec_generator(
        self, stream: _WatchTopicStream
    ) -> typing.Iterator[MetaUpdateTopicSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateTopicSpec(item)
            item = stream.next().inner()

    def _smart_module_spec_generator(
        self, stream: _WatchSmartModuleStream
    ) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateSmartModuleSpec(item)
            item = stream.next().inner()
```
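
A sketch of basic admin operations; the topic name is arbitrary:

```python
from fluvio import FluvioAdmin

admin = FluvioAdmin.connect()

admin.create_topic("example_topic")
print(admin.list_topics(["example_topic"]))
admin.delete_topic("example_topic")
```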
The `TopicSpec` builder used in the overview is defined in `fluvio/topic.py`. Its source follows; helpers such as `parse_timespan`, `parse_byte_size`, and the PyO3 `_TopicSpec` type are imported or defined elsewhere in that module:

```python
@dataclass(frozen=True)
class TopicSpec:
    mode: TopicMode = TopicMode.COMPUTED
    partitions: int = 1
    replications: int = 1
    ignore_rack: bool = True
    replica_assignment: Optional[List[PartitionMap]] = None
    retention_time: Optional[int] = None
    segment_size: Optional[int] = None
    compression_type: Optional[CompressionType] = None
    max_partition_size: Optional[int] = None
    system: bool = False

    @classmethod
    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
        return cls(partitions=partitions, replications=replication, ignore_rack=ignore)

    @classmethod
    def create(cls) -> "TopicSpec":
        """Alternative constructor method"""
        return cls()

    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
        """Set the assigned replica configuration"""
        return replace(
            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
        )

    def as_mirror_topic(self) -> "TopicSpec":
        """Set as a mirror topic"""
        return replace(self, mode=TopicMode.MIRROR)

    def with_partitions(self, partitions: int) -> "TopicSpec":
        """Set the specified partitions"""
        if partitions < 0:
            raise ValueError("Partitions must be a positive integer")
        return replace(self, partitions=partitions)

    def with_replications(self, replications: int) -> "TopicSpec":
        """Set the specified replication factor"""
        if replications < 0:
            raise ValueError("Replication factor must be a positive integer")
        return replace(self, replications=replications)

    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
        """Set the rack ignore setting"""
        return replace(self, ignore_rack=ignore)

    def with_compression(self, compression: CompressionType) -> "TopicSpec":
        """Set the specified compression type"""
        return replace(self, compression_type=compression)

    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
        """Set the specified retention time"""

        if isinstance(retention_time, int):
            return replace(self, retention_time=retention_time)

        parsed_time = parse_timespan(retention_time)
        return replace(self, retention_time=parsed_time)

    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified segment size"""
        if isinstance(size, int):
            return replace(self, segment_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, segment_size=parsed_size)

    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified max partition size"""
        if isinstance(size, int):
            return replace(self, max_partition_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, max_partition_size=parsed_size)

    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
        """Set the topic as an internal system topic"""
        return replace(self, system=is_system)

    def build(self) -> _TopicSpec:
        """Build the low-level _TopicSpec from the current configuration"""
        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
            spec = _TopicSpec.new_assigned(self.replica_assignment)
        elif self.mode == TopicMode.MIRROR:
            spec = _TopicSpec.new_mirror()
        else:
            spec = _TopicSpec.new_computed(
                self.partitions, self.replications, self.ignore_rack
            )

        spec.set_system(self.system)

        if self.retention_time is not None:
            spec.set_retention_time(self.retention_time)

        if self.max_partition_size is not None or self.segment_size is not None:
            spec.set_storage(self.max_partition_size, self.segment_size)

        if self.compression_type is not None:
            spec.set_compression_type(self.compression_type.value)

        return spec
```
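
As a usage note, the retention and size setters accept either integers (seconds or bytes) or human-readable strings like the `"1h"` and `"1Gb"` used earlier; a sketch:

```python
from fluvio import FluvioAdmin, TopicSpec

spec = (
    TopicSpec.create()
    .with_partitions(2)
    .with_replications(1)
    .with_retention_time(3600)       # seconds, or a string such as "1h"
    .with_max_partition_size("1Gb")  # bytes, or a human-readable string
    .build()
)

FluvioAdmin.connect().create_topic("sized_topic", spec)
```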
TopicSpec( mode: TopicMode = <TopicMode.COMPUTED: 'computed'>, partitions: int = 1, replications: int = 1, ignore_rack: bool = True, replica_assignment: Optional[List[PartitionMap]] = None, retention_time: Optional[int] = None, segment_size: Optional[int] = None, compression_type: Optional[CompressionType] = None, max_partition_size: Optional[int] = None, system: bool = False)
mode: TopicMode = <TopicMode.COMPUTED: 'computed'>
partitions: int = 1
replications: int = 1
ignore_rack: bool = True
replica_assignment: Optional[List[PartitionMap]] = None
retention_time: Optional[int] = None
segment_size: Optional[int] = None
compression_type: Optional[CompressionType] = None
max_partition_size: Optional[int] = None
system: bool = False
@classmethod
def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
39    @classmethod
40    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
41        return cls(_TopicSpec.new_computed(partitions, replication, ignore))
@classmethod
def create(cls) -> TopicSpec:
43    @classmethod
44    def create(cls) -> "TopicSpec":
45        """Alternative constructor method"""
46        return cls()

Alternative constructor method

def with_assignment( self, replica_assignment: List[PartitionMap]) -> TopicSpec:
48    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
49        """Set the assigned replica configuration"""
50        return replace(
51            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
52        )

Set the assigned replica configuration

def as_mirror_topic(self) -> TopicSpec:
54    def as_mirror_topic(self) -> "TopicSpec":
55        """Set as a mirror topic"""
56        return replace(self, mode=TopicMode.MIRROR)

Set as a mirror topic

def with_partitions(self, partitions: int) -> TopicSpec:
58    def with_partitions(self, partitions: int) -> "TopicSpec":
59        """Set the specified partitions"""
60        if partitions < 0:
61            raise ValueError("Partitions must be a non-negative integer")
62        return replace(self, partitions=partitions)

Set the specified partitions

def with_replications(self, replications: int) -> TopicSpec:
64    def with_replications(self, replications: int) -> "TopicSpec":
65        """Set the specified replication factor"""
66        if replications < 0:
67            raise ValueError("Replication factor must be a non-negative integer")
68        return replace(self, replications=replications)

Set the specified replication factor

def with_ignore_rack(self, ignore: bool = True) -> TopicSpec:
70    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
71        """Set the rack ignore setting"""
72        return replace(self, ignore_rack=ignore)

Set the rack ignore setting

def with_compression( self, compression: CompressionType) -> TopicSpec:
74    def with_compression(self, compression: CompressionType) -> "TopicSpec":
75        """Set the specified compression type"""
76        return replace(self, compression_type=compression)

Set the specified compression type

def with_retention_time(self, retention_time: Union[str, int]) -> TopicSpec:
78    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
79        """Set the specified retention time"""
80
81        if isinstance(retention_time, int):
82            return replace(self, retention_time=retention_time)
83
84        parsed_time = parse_timespan(retention_time)
85        return replace(self, retention_time=parsed_time)

Set the specified retention time

def with_segment_size(self, size: Union[str, int]) -> TopicSpec:
87    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
88        """Set the specified segment size"""
89        if isinstance(size, int):
90            return replace(self, segment_size=size)
91
92        parsed_size = parse_byte_size(size)
93        return replace(self, segment_size=parsed_size)

Set the specified segment size

def with_max_partition_size(self, size: Union[str, int]) -> TopicSpec:
 95    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
 96        """Set the specified max partition size"""
 97        if isinstance(size, int):
 98            return replace(self, max_partition_size=size)
 99
100        parsed_size = parse_byte_size(size)
101        return replace(self, max_partition_size=parsed_size)

Set the specified max partition size

def as_system_topic(self, is_system: bool = True) -> TopicSpec:
103    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
104        """Set the topic as an internal system topic"""
105        return replace(self, system=is_system)

Set the topic as an internal system topic

def build(self) -> TopicSpec:
107    def build(self) -> _TopicSpec:
108        """Build the TopicSpec based on the current configuration"""
109        # Similar implementation to the original build method
110        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
111            spec = _TopicSpec.new_assigned(self.replica_assignment)
112        elif self.mode == TopicMode.MIRROR:
113            spec = _TopicSpec.new_mirror()
114        else:
115            spec = _TopicSpec.new_computed(
116                self.partitions, self.replications, self.ignore_rack
117            )
118
119        spec.set_system(self.system)
120
121        if self.retention_time is not None:
122            spec.set_retention_time(self.retention_time)
123
124        if self.max_partition_size is not None or self.segment_size is not None:
125            spec.set_storage(self.max_partition_size, self.segment_size)
126
127        if self.compression_type is not None:
128            spec.set_compression_type(self.compression_type.value)
129
130        return spec

Build the TopicSpec based on the current configuration
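
As an illustrative sketch, an assigned-mode spec can be built with `with_assignment`; the SPU ids in the replica lists are assumed to exist in the cluster:

```python
from fluvio import FluvioAdmin, TopicSpec, PartitionMap

fluvio_admin = FluvioAdmin.connect()

# Two partitions with explicit replica placement; the integers are SPU ids
# (assumed to exist in the cluster).
assignment = [
    PartitionMap.new(0, [0, 1]),
    PartitionMap.new(1, [1, 0]),
]

topic_spec = TopicSpec.create().with_assignment(assignment).build()
fluvio_admin.create_topic("an_assigned_topic", topic_spec)
```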

class CompressionType(enum.Enum):
17class CompressionType(Enum):
18    NONE = "none"
19    GZIP = "gzip"
20    SNAPPY = "snappy"
21    LZ4 = "lz4"
22    ANY = "any"
23    ZSTD = "zstd"
NONE = <CompressionType.NONE: 'none'>
GZIP = <CompressionType.GZIP: 'gzip'>
SNAPPY = <CompressionType.SNAPPY: 'snappy'>
LZ4 = <CompressionType.LZ4: 'lz4'>
ANY = <CompressionType.ANY: 'any'>
ZSTD = <CompressionType.ZSTD: 'zstd'>
Inherited Members
enum.Enum
name
value
class TopicMode(enum.Enum):
11class TopicMode(Enum):
12    MIRROR = "mirror"
13    ASSIGNED = "assigned"
14    COMPUTED = "computed"
MIRROR = <TopicMode.MIRROR: 'mirror'>
ASSIGNED = <TopicMode.ASSIGNED: 'assigned'>
COMPUTED = <TopicMode.COMPUTED: 'computed'>
Inherited Members
enum.Enum
name
value
class Record:
 9class Record:
10    """The individual record for a given stream."""
11
12    _inner: _Record
13
14    def __init__(self, inner: _Record):
15        self._inner = inner
16
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()
20
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()
24
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()
28
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()
32
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()
36
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

The individual record for a given stream.

Record(inner: Record)
14    def __init__(self, inner: _Record):
15        self._inner = inner
def offset(self) -> int:
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()

The UTF-8 decoded key for this record.

def timestamp(self) -> int:
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

Timestamp of this record.
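
A short sketch tying these accessors together, assuming `a_topic` already holds keyed records:

```python
from fluvio import Fluvio, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()
config = ConsumerConfigExtBuilder("a_topic").build()
stream = fluvio.consumer_with_config(config)

record = next(stream)
print(record.offset(), record.timestamp())
# key_string() assumes the record was produced with a key
print(record.key_string(), "=>", record.value_string())
```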

class RecordMetadata:
42class RecordMetadata:
43    """Metadata of a record sent to a topic."""
44
45    _inner: _RecordMetadata
46
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
49
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()
53
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Metadata of a record sent to a topic.

RecordMetadata(inner: RecordMetadata)
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
def offset(self) -> int:
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()

Return the offset of the sent record in the topic/partition.

def partition_id(self) -> int:
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Return the partition index the record was sent to.

class Offset:

Describes the location of a record stored in a Fluvio partition.

A topic is composed of one or more partitions.

def absolute(index):

Specifies an absolute offset with the given index within the partition

def beginning():

Specifies an offset starting at the beginning of the partition

def from_beginning(offset):

Specifies an offset relative to the beginning of the partition

def end():

Specifies an offset starting at the end of the partition

def from_end(offset):

Specifies an offset relative to the end of the partition
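
A sketch of the constructors above used to position a consumer stream; `Offset.from_end(5)` is an arbitrary choice for illustration:

```python
from fluvio import Fluvio, Offset, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()

builder = ConsumerConfigExtBuilder("a_topic")
builder.offset_start(Offset.from_end(5))  # begin 5 records before the end
# Other starting points:
#   Offset.beginning()        - the first record in the partition
#   Offset.from_beginning(10) - 10 records past the first
#   Offset.absolute(42)       - exactly offset 42
#   Offset.end()              - only records produced from now on
stream = fluvio.consumer_with_config(builder.build())
```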

class TopicProducer:
523class TopicProducer:
524    """An interface for producing events to a particular topic.
525
526    A `TopicProducer` allows you to send events to the specific topic it was
527    initialized for. Once you have a `TopicProducer`, you can send events to
528    the topic, choosing which partition each event should be delivered to.
529    """
530
531    _inner: _TopicProducer
532
533    def __init__(self, inner: _TopicProducer):
534        self._inner = inner
535
536    def send_string(self, buf: str) -> ProduceOutput:
537        """Sends a string to this producer’s topic"""
538        return self.send([], buf.encode("utf-8"))
539
540    async def async_send_string(self, buf: str) -> ProduceOutput:
541        """Sends a string to this producer’s topic"""
542        return await self.async_send([], buf.encode("utf-8"))
543
544    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
545        """
546        Sends a key/value record to this producer's Topic.
547
548        The partition that the record will be sent to is derived from the Key.
549        """
550        return ProduceOutput(self._inner.send(key, value))
551
552    async def async_send(
553        self, key: typing.List[int], value: typing.List[int]
554    ) -> ProduceOutput:
555        """
556        Async sends a key/value record to this producer's Topic.
557
558        The partition that the record will be sent to is derived from the Key.
559        """
560        produce_output = await self._inner.async_send(key, value)
561        return ProduceOutput(produce_output)
562
563    def flush(self) -> None:
564        """
565        Send all the queued records in the producer batches.
566        """
567        return self._inner.flush()
568
569    async def async_flush(self) -> None:
570        """
571        Async send all the queued records in the producer batches.
572        """
573        await self._inner.async_flush()
574
575    def send_all(
576        self, records: typing.List[typing.Tuple[bytes, bytes]]
577    ) -> typing.List[ProduceOutput]:
578        """
579        Sends a list of key/value records as a batch to this producer's Topic.
580        :param records: The list of records to send
581        """
582        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
583        return [
584            ProduceOutput(output_inner)
585            for output_inner in self._inner.send_all(records_inner)
586        ]
587
588    async def async_send_all(
589        self, records: typing.List[typing.Tuple[bytes, bytes]]
590    ) -> typing.List[ProduceOutput]:
591        """
592        Async sends a list of key/value records as a batch to this producer's Topic.
593        :param records: The list of records to send
594        """
595        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
596        return [
597            ProduceOutput(output_inner)
598            for output_inner in await self._inner.async_send_all(records_inner)
599        ]

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: TopicProducer)
533    def __init__(self, inner: _TopicProducer):
534        self._inner = inner
def send_string(self, buf: str) -> ProduceOutput:
536    def send_string(self, buf: str) -> ProduceOutput:
537        """Sends a string to this producer’s topic"""
538        return self.send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

async def async_send_string(self, buf: str) -> ProduceOutput:
540    async def async_send_string(self, buf: str) -> ProduceOutput:
541        """Sends a string to this producer’s topic"""
542        return await self.async_send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> ProduceOutput:
544    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
545        """
546        Sends a key/value record to this producer's Topic.
547
548        The partition that the record will be sent to is derived from the Key.
549        """
550        return ProduceOutput(self._inner.send(key, value))

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

async def async_send(self, key: List[int], value: List[int]) -> ProduceOutput:
552    async def async_send(
553        self, key: typing.List[int], value: typing.List[int]
554    ) -> ProduceOutput:
555        """
556        Async sends a key/value record to this producer's Topic.
557
558        The partition that the record will be sent to is derived from the Key.
559        """
560        produce_output = await self._inner.async_send(key, value)
561        return ProduceOutput(produce_output)

Async sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
563    def flush(self) -> None:
564        """
565        Send all the queued records in the producer batches.
566        """
567        return self._inner.flush()

Send all the queued records in the producer batches.

async def async_flush(self) -> None:
569    async def async_flush(self) -> None:
570        """
571        Async send all the queued records in the producer batches.
572        """
573        await self._inner.async_flush()

Async send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
575    def send_all(
576        self, records: typing.List[typing.Tuple[bytes, bytes]]
577    ) -> typing.List[ProduceOutput]:
578        """
579        Sends a list of key/value records as a batch to this producer's Topic.
580        :param records: The list of records to send
581        """
582        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
583        return [
584            ProduceOutput(output_inner)
585            for output_inner in self._inner.send_all(records_inner)
586        ]

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
async def async_send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
588    async def async_send_all(
589        self, records: typing.List[typing.Tuple[bytes, bytes]]
590    ) -> typing.List[ProduceOutput]:
591        """
592        Async sends a list of key/value records as a batch to this producer's Topic.
593        :param records: The list of records to send
594        """
595        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
596        return [
597            ProduceOutput(output_inner)
598            for output_inner in await self._inner.async_send_all(records_inner)
599        ]

Async sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
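
A sketch of batch production with `send_all`, assuming `a_topic` exists; each tuple is a `(key, value)` pair of bytes:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

records = [(b"key-%d" % i, b"value-%d" % i) for i in range(3)]
outputs = producer.send_all(records)
producer.flush()  # make sure the batch is delivered
```
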
class ProduceOutput:
159class ProduceOutput:
160    """Returned by a `TopicProducer.send` call, allowing access to sent record metadata."""
161
162    _inner: _ProduceOutput
163
164    def __init__(self, inner: _ProduceOutput) -> None:
165        self._inner = inner
166
167    def wait(self) -> typing.Optional[RecordMetadata]:
168        """Wait for the record metadata.
169
170        This is a blocking call and may only return a `RecordMetadata` once.
171        Any subsequent call to `wait` will return a `None` value.
172        Errors will be raised as exceptions of type `FluvioError`.
173        """
174        res = self._inner.wait()
175        if res is None:
176            return None
177        return RecordMetadata(res)
178
179    async def async_wait(self) -> typing.Optional[RecordMetadata]:
180        """Asynchronously wait for the record metadata.
181
182        This may only return a `RecordMetadata` once.
183        Any subsequent call to `async_wait` will return a `None` value.
184        """
185        return await self._inner.async_wait()

Returned by a TopicProducer.send call, allowing access to sent record metadata.

ProduceOutput(inner: ProduceOutput)
164    def __init__(self, inner: _ProduceOutput) -> None:
165        self._inner = inner
def wait(self) -> Optional[RecordMetadata]:
167    def wait(self) -> typing.Optional[RecordMetadata]:
168        """Wait for the record metadata.
169
170        This is a blocking call and may only return a `RecordMetadata` once.
171        Any subsequent call to `wait` will return a `None` value.
172        Errors will be raised as exceptions of type `FluvioError`.
173        """
174        res = self._inner.wait()
175        if res is None:
176            return None
177        return RecordMetadata(res)

Wait for the record metadata.

This is a blocking call and may only return a RecordMetadata once. Any subsequent call to wait will return a None value. Errors will be raised as exceptions of type FluvioError.

async def async_wait(self) -> Optional[RecordMetadata]:
179    async def async_wait(self) -> typing.Optional[RecordMetadata]:
180        """Asynchronously wait for the record metadata.
181
182        This may only return a `RecordMetadata` once.
183        Any subsequent call to `async_wait` will return a `None` value.
184        """
185        return await self._inner.async_wait()

Asynchronously wait for the record metadata.

This may only return a RecordMetadata once. Any subsequent call to async_wait will return a None value.
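
A sketch of confirming delivery through `ProduceOutput.wait`, assuming `a_topic` exists:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

output = producer.send(b"a-key", b"a-value")
producer.flush()

metadata = output.wait()  # blocks; later calls return None
if metadata is not None:
    print(metadata.partition_id(), metadata.offset())
```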

class ConsumerConfigExt:
class ConsumerConfigExtBuilder:
def disable_continuous(self, /, val=True):

The Fluvio client will disconnect after fetching the specified records.

def max_bytes(self, /, max_bytes):
def offset_start(self, /, offset):
def offset_consumer(self, /, id):
def offset_strategy(self, /, strategy=Ellipsis):
def partition(self, /, partition):
def topic(self, /, topic):
def build(self, /):
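
A sketch combining several builder options; the consumer id passed to `offset_consumer` is an arbitrary example value:

```python
from fluvio import Fluvio, Offset, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()

builder = ConsumerConfigExtBuilder("a_topic")
builder.partition(0)                    # read a single partition
builder.offset_start(Offset.beginning())
builder.offset_consumer("my-consumer")  # example consumer id
builder.disable_continuous()            # disconnect once records are fetched
stream = fluvio.consumer_with_config(builder.build())
```
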
class ConsumerConfig:
201class ConsumerConfig:
202    _inner: _ConsumerConfig
203
204    def __init__(self):
205        self._inner = _ConsumerConfig()
206
207    def disable_continuous(self, val: bool = True):
208        """Disable continuous mode after fetching specified records"""
209        self._inner.disable_continuous(val)
210
211    def smartmodule(
212        self,
213        name: str = None,
214        path: str = None,
215        kind: SmartModuleKind = None,
216        params: typing.Dict[str, str] = None,
217        aggregate: typing.List[bytes] = None,
218    ):
219        """
220        This is a method for adding a smartmodule to a consumer config either
221        using a `name` of a `SmartModule` or a `path` to a wasm binary.
222
223        Args:
224
225            name: str
226            path: str
227            kind: SmartModuleKind
228            params: Dict[str, str]
229            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
230
231        Raises:
232            "Require either a path or a name for a smartmodule."
233            "Only specify one of path or name not both."
234
235        Returns:
236
237            None
238        """
239
240        if kind is not None:
241            kind = kind.value
242
243        if path is None and name is None:
244            raise Exception("Require either a path or a name for a smartmodule.")
245
246        if path is not None and name is not None:
247            raise Exception("Only specify one of path or name not both.")
248
249        params = {} if params is None else params
250        param_keys = [x for x in params.keys()]
251        param_vals = [x for x in params.values()]
252
253        self._inner.smartmodule(
254            name,
255            path,
256            kind,
257            param_keys,
258            param_vals,
259            aggregate,
260            # These arguments are for Join stuff but that's not implemented on
261            # the python side yet
262            None,
263            None,
264            None,
265            None,
266        )
def disable_continuous(self, val: bool = True):
207    def disable_continuous(self, val: bool = True):
208        """Disable continuous mode after fetching specified records"""
209        self._inner.disable_continuous(val)

Disable continuous mode after fetching specified records

def smartmodule( self, name: str = None, path: str = None, kind: SmartModuleKind = None, params: Dict[str, str] = None, aggregate: List[bytes] = None):
211    def smartmodule(
212        self,
213        name: str = None,
214        path: str = None,
215        kind: SmartModuleKind = None,
216        params: typing.Dict[str, str] = None,
217        aggregate: typing.List[bytes] = None,
218    ):
219        """
220        This is a method for adding a smartmodule to a consumer config either
221        using a `name` of a `SmartModule` or a `path` to a wasm binary.
222
223        Args:
224
225            name: str
226            path: str
227            kind: SmartModuleKind
228            params: Dict[str, str]
229            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
230
231        Raises:
232            "Require either a path or a name for a smartmodule."
233            "Only specify one of path or name not both."
234
235        Returns:
236
237            None
238        """
239
240        if kind is not None:
241            kind = kind.value
242
243        if path is None and name is None:
244            raise Exception("Require either a path or a name for a smartmodule.")
245
246        if path is not None and name is not None:
247            raise Exception("Only specify one of path or name not both.")
248
249        params = {} if params is None else params
250        param_keys = [x for x in params.keys()]
251        param_vals = [x for x in params.values()]
252
253        self._inner.smartmodule(
254            name,
255            path,
256            kind,
257            param_keys,
258            param_vals,
259            aggregate,
260            # These arguments are for Join stuff but that's not implemented on
261            # the python side yet
262            None,
263            None,
264            None,
265            None,
266        )

This is a method for adding a smartmodule to a consumer config either using a name of a SmartModule or a path to a wasm binary.

Args:

name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule

Raises: "Require either a path or a name for a smartmodule." "Only specify one of path or name not both."

Returns:

None
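
A sketch of attaching a named SmartModule with parameters; the module name `my-filter` and the `threshold` parameter are example values, not cluster defaults:

```python
from fluvio import ConsumerConfig, SmartModuleKind

config = ConsumerConfig()
config.smartmodule(
    name="my-filter",             # assumed to be registered in the cluster
    kind=SmartModuleKind.Filter,  # explicit kind, needed for legacy modules
    params={"threshold": "10"},
)
```
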
class PartitionConsumer:
269class PartitionConsumer:
270    """
271    An interface for consuming events from a particular partition
272
273    There are two ways to consume events: by "fetching" events and by
274    "streaming" events. Fetching involves specifying a range of events that you
275    want to consume via their Offset. A fetch is a sort of one-time batch
276    operation: you’ll receive all of the events in your range all at once. When
277    you consume events via Streaming, you specify a starting Offset and receive
278    an object that will continuously yield new events as they arrive.
279    """
280
281    _inner: _PartitionConsumer
282
283    def __init__(self, inner: _PartitionConsumer):
284        self._inner = inner
285
286    def stream(self, offset: Offset) -> typing.Iterator[Record]:
287        """
288        Continuously streams events from a particular offset in the consumer’s
289        partition. This returns an `Iterator[Record]` which is an
290        iterator.
291
292        Streaming is one of the two ways to consume events in Fluvio. It is a
293        continuous request for new records arriving in a partition, beginning
294        at a particular offset. You specify the starting point of the stream
295        using an Offset and periodically receive events, either individually or
296        in batches.
297        """
298        return self._generator(self._inner.stream(offset))
299
300    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
301        """
302        Continuously streams events from a particular offset in the consumer’s
303        partition. This returns an `AsyncIterator[Record]` which is an
304        async iterator.
305
306        Streaming is one of the two ways to consume events in Fluvio. It is a
307        continuous request for new records arriving in a partition, beginning
308        at a particular offset. You specify the starting point of the stream
309        using an Offset and periodically receive events, either individually or
310        in batches.
311        """
312        return self._async_generator(await self._inner.async_stream(offset))
313
314    def stream_with_config(
315        self, offset: Offset, config: ConsumerConfig
316    ) -> typing.Iterator[Record]:
317        """
318        Continuously streams events from a particular offset with a SmartModule
319        WASM module in the consumer’s partition. This returns an
320        `Iterator[Record]` which is an iterator.
321
322        Streaming is one of the two ways to consume events in Fluvio. It is a
323        continuous request for new records arriving in a partition, beginning
324        at a particular offset. You specify the starting point of the stream
325        using an Offset and periodically receive events, either individually or
326        in batches.
327
328        Args:
329            offset: Offset
330            wasm_module_path: str - The absolute path to the WASM file
331
332        Example:
333            import os
334
335            wmp = os.path.abspath("somefilter.wasm")
336            config = ConsumerConfig()
337            config.smartmodule(path=wmp)
338            for i in consumer.stream_with_config(Offset.beginning(), config):
339                # do something with i
340
341        Returns:
342            `Iterator[Record]`
343
344        """
345        return self._generator(self._inner.stream_with_config(offset, config._inner))
346
347    async def async_stream_with_config(
348        self, offset: Offset, config: ConsumerConfig
349    ) -> typing.AsyncIterator[Record]:
350        """
351        Continuously streams events from a particular offset with a SmartModule
352        WASM module in the consumer’s partition. This returns an
353        `AsyncIterator[Record]` which is an async iterator.
354
355        Streaming is one of the two ways to consume events in Fluvio. It is a
356        continuous request for new records arriving in a partition, beginning
357        at a particular offset. You specify the starting point of the stream
358        using an Offset and periodically receive events, either individually or
359        in batches.
360
361        Args:
362            offset: Offset
363            wasm_module_path: str - The absolute path to the WASM file
364
365        Example:
366            import os
367
368            wmp = os.path.abspath("somefilter.wasm")
369            config = ConsumerConfig()
370            config.smartmodule(path=wmp)
371            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
372                # do something with i
373        """
374        return self._async_generator(
375            await self._inner.async_stream_with_config(offset, config._inner)
376        )
377
378    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
379        item = stream.next()
380        while item is not None:
381            yield Record(item)
382            item = stream.next()
383
384    async def _async_generator(
385        self, astream: _AsyncPartitionConsumerStream
386    ) -> typing.AsyncIterator[Record]:
387        item = await astream.async_next()
388        while item is not None:
389            yield Record(item)
390            item = await astream.async_next()

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: PartitionConsumer)
283    def __init__(self, inner: _PartitionConsumer):
284        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
286    def stream(self, offset: Offset) -> typing.Iterator[Record]:
287        """
288        Continuously streams events from a particular offset in the consumer’s
289        partition. This returns an `Iterator[Record]` which is an
290        iterator.
291
292        Streaming is one of the two ways to consume events in Fluvio. It is a
293        continuous request for new records arriving in a partition, beginning
294        at a particular offset. You specify the starting point of the stream
295        using an Offset and periodically receive events, either individually or
296        in batches.
297        """
298        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
300    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
301        """
302        Continuously streams events from a particular offset in the consumer’s
303        partition. This returns an `AsyncIterator[Record]` which is an
304        async iterator.
305
306        Streaming is one of the two ways to consume events in Fluvio. It is a
307        continuous request for new records arriving in a partition, beginning
308        at a particular offset. You specify the starting point of the stream
309        using an Offset and periodically receive events, either individually or
310        in batches.
311        """
312        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns an AsyncIterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
314    def stream_with_config(
315        self, offset: Offset, config: ConsumerConfig
316    ) -> typing.Iterator[Record]:
317        """
318        Continuously streams events from a particular offset with a SmartModule
319        WASM module in the consumer’s partition. This returns an
320        `Iterator[Record]` which is an iterator.
321
322        Streaming is one of the two ways to consume events in Fluvio. It is a
323        continuous request for new records arriving in a partition, beginning
324        at a particular offset. You specify the starting point of the stream
325        using an Offset and periodically receive events, either individually or
326        in batches.
327
328        Args:
329            offset: Offset
330            wasm_module_path: str - The absolute path to the WASM file
331
332        Example:
333            import os
334
335            wmp = os.path.abspath("somefilter.wasm")
336            config = ConsumerConfig()
337            config.smartmodule(path=wmp)
338            for i in consumer.stream_with_config(Offset.beginning(), config):
339                # do something with i
340
341        Returns:
342            `Iterator[Record]`
343
344        """
345        return self._generator(self._inner.stream_with_config(offset, config._inner))

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
347    async def async_stream_with_config(
348        self, offset: Offset, config: ConsumerConfig
349    ) -> typing.AsyncIterator[Record]:
350        """
351        Continuously streams events from a particular offset with a SmartModule
352        WASM module in the consumer’s partition. This returns an
353        `AsyncIterator[Record]` which is an async iterator.
354
355        Streaming is one of the two ways to consume events in Fluvio. It is a
356        continuous request for new records arriving in a partition, beginning
357        at a particular offset. You specify the starting point of the stream
358        using an Offset and periodically receive events, either individually or
359        in batches.
360
361        Args:
362            offset: Offset
363            wasm_module_path: str - The absolute path to the WASM file
364
365        Example:
366            import os
367
368            wmp = os.path.abspath("somefilter.wasm")
369            config = ConsumerConfig()
370            config.smartmodule(path=wmp)
371            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
372                # do something with i
373        """
374        return self._async_generator(
375            await self._inner.async_stream_with_config(offset, config._inner)
376        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an AsyncIterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: AsyncIterator[Record]
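
A sketch of asynchronous consumption with `async_stream`; `consumer` is assumed to be a `PartitionConsumer` obtained from a Fluvio client:

```python
import asyncio
from fluvio import Offset

async def read(consumer):
    # `consumer` is assumed to be a PartitionConsumer
    stream = await consumer.async_stream(Offset.beginning())
    async for record in stream:
        print(record.value_string())

# asyncio.run(read(consumer))
```
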
class MultiplePartitionConsumer:
393class MultiplePartitionConsumer:
394    """
395    An interface for consuming events from multiple partitions
396
397    There are two ways to consume events: by "fetching" events and by
398    "streaming" events. Fetching involves specifying a range of events that you
399    want to consume via their Offset. A fetch is a sort of one-time batch
400    operation: you’ll receive all of the events in your range all at once. When
401    you consume events via Streaming, you specify a starting Offset and receive
402    an object that will continuously yield new events as they arrive.
403    """
404
405    _inner: _MultiplePartitionConsumer
406
407    def __init__(self, inner: _MultiplePartitionConsumer):
408        self._inner = inner
409
410    def stream(self, offset: Offset) -> typing.Iterator[Record]:
411        """
412        Continuously streams events from a particular offset in the consumer’s
413        partitions. This returns an `Iterator[Record]` which is an
414        iterator.
415
416        Streaming is one of the two ways to consume events in Fluvio. It is a
417        continuous request for new records arriving in a partition, beginning
418        at a particular offset. You specify the starting point of the stream
419        using an Offset and periodically receive events, either individually or
420        in batches.
421        """
422        return self._generator(self._inner.stream(offset))
423
424    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
425        """
426        Continuously streams events from a particular offset in the consumer’s
427        partitions. This returns an `AsyncIterator[Record]` which is an
428        async iterator.
429
430        Streaming is one of the two ways to consume events in Fluvio. It is a
431        continuous request for new records arriving in a partition, beginning
432        at a particular offset. You specify the starting point of the stream
433        using an Offset and periodically receive events, either individually or
434        in batches.
435        """
436        return self._async_generator(await self._inner.async_stream(offset))
437
438    def stream_with_config(
439        self, offset: Offset, config: ConsumerConfig
440    ) -> typing.Iterator[Record]:
441        """
442        Continuously streams events from a particular offset with a SmartModule
443        WASM module in the consumer’s partitions. This returns an
444        `Iterator[Record]` which is an iterator.
445
446        Streaming is one of the two ways to consume events in Fluvio. It is a
447        continuous request for new records arriving in a partition, beginning
448        at a particular offset. You specify the starting point of the stream
449        using an Offset and periodically receive events, either individually or
450        in batches.
451
452        Args:
453            offset: Offset
454            wasm_module_path: str - The absolute path to the WASM file
455
456        Example:
457            import os
458
459            wmp = os.path.abspath("somefilter.wasm")
460            config = ConsumerConfig()
461            config.smartmodule(path=wmp)
462            for i in consumer.stream_with_config(Offset.beginning(), config):
463                # do something with i
464
465        Returns:
466            `Iterator[Record]`
467
468        """
469        return self._generator(
470            self._inner.stream_with_config(offset._inner, config._inner)
471        )
472
473    async def async_stream_with_config(
474        self, offset: Offset, config: ConsumerConfig
475    ) -> typing.AsyncIterator[Record]:
476        """
477        Continuously streams events from a particular offset with a SmartModule
478        WASM module in the consumer’s partitions. This returns an
479        `AsyncIterator[Record]` which is an async iterator.
480
481        Streaming is one of the two ways to consume events in Fluvio. It is a
482        continuous request for new records arriving in a partition, beginning
483        at a particular offset. You specify the starting point of the stream
484        using an Offset and periodically receive events, either individually or
485        in batches.
486
487        Args:
488            offset: Offset
489            wasm_module_path: str - The absolute path to the WASM file
490
491        Example:
492            import os
493
494            wmp = os.path.abspath("somefilter.wasm")
495            config = ConsumerConfig()
496            config.smartmodule(path=wmp)
497            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
498                # do something with i
499
500        Returns:
501            `AsyncIterator[Record]`
502
503        """
504        return self._async_generator(
505            await self._inner.async_stream_with_config(offset, config._inner)
506        )
507
508    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
509        item = stream.next()
510        while item is not None:
511            yield Record(item)
512            item = stream.next()
513
514    async def _async_generator(
515        self, astream: _AsyncPartitionConsumerStream
516    ) -> typing.AsyncIterator[Record]:
517        item = await astream.async_next()
518        while item is not None:
519            yield Record(item)
520            item = await astream.async_next()

An interface for consuming events from multiple partitions

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

MultiplePartitionConsumer(inner: MultiplePartitionConsumer)
407    def __init__(self, inner: _MultiplePartitionConsumer):
408        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
410    def stream(self, offset: Offset) -> typing.Iterator[Record]:
411        """
412        Continuously streams events from a particular offset in the consumer’s
413        partitions. This returns an `Iterator[Record]` which is an
414        iterator.
415
416        Streaming is one of the two ways to consume events in Fluvio. It is a
417        continuous request for new records arriving in a partition, beginning
418        at a particular offset. You specify the starting point of the stream
419        using an Offset and periodically receive events, either individually or
420        in batches.
421        """
422        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
424    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
425        """
426        Continuously streams events from a particular offset in the consumer’s
427        partitions. This returns an `AsyncIterator[Record]` which is an
428        async iterator.
429
430        Streaming is one of the two ways to consume events in Fluvio. It is a
431        continuous request for new records arriving in a partition, beginning
432        at a particular offset. You specify the starting point of the stream
433        using an Offset and periodically receive events, either individually or
434        in batches.
435        """
436        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns an AsyncIterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
438    def stream_with_config(
439        self, offset: Offset, config: ConsumerConfig
440    ) -> typing.Iterator[Record]:
441        """
442        Continuously streams events from a particular offset with a SmartModule
443        WASM module in the consumer’s partitions. This returns an
444        `Iterator[Record]` which is an iterator.
445
446        Streaming is one of the two ways to consume events in Fluvio. It is a
447        continuous request for new records arriving in a partition, beginning
448        at a particular offset. You specify the starting point of the stream
449        using an Offset and periodically receive events, either individually or
450        in batches.
451
452        Args:
453            offset: Offset
454            wasm_module_path: str - The absolute path to the WASM file
455
456        Example:
457            import os
458
459            wmp = os.path.abspath("somefilter.wasm")
460            config = ConsumerConfig()
461            config.smartmodule(path=wmp)
462            for i in consumer.stream_with_config(Offset.beginning(), config):
463                # do something with i
464
465        Returns:
466            `Iterator[Record]`
467
468        """
469        return self._generator(
470            self._inner.stream_with_config(offset._inner, config._inner)
471        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
473    async def async_stream_with_config(
474        self, offset: Offset, config: ConsumerConfig
475    ) -> typing.AsyncIterator[Record]:
476        """
477        Continuously streams events from a particular offset with a SmartModule
478        WASM module in the consumer’s partitions. This returns an
479        `AsyncIterator[Record]` which is an async iterator.
480
481        Streaming is one of the two ways to consume events in Fluvio. It is a
482        continuous request for new records arriving in a partition, beginning
483        at a particular offset. You specify the starting point of the stream
484        using an Offset and periodically receive events, either individually or
485        in batches.
486
487        Args:
488            offset: Offset
489            wasm_module_path: str - The absolute path to the WASM file
490
491        Example:
492            import os
493
494            wmp = os.path.abspath("somefilter.wasm")
495            config = ConsumerConfig()
496            config.smartmodule(path=wmp)
497            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
498                # do something with i
499
500        Returns:
501            `AsyncIterator[Record]`
502
503        """
504        return self._async_generator(
505            await self._inner.async_stream_with_config(offset, config._inner)
506        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns an AsyncIterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: AsyncIterator[Record]

class CommonCreateRequest:
29class CommonCreateRequest:
30    _inner: _CommonCreateRequest
31
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
34
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
CommonCreateRequest(inner: CommonCreateRequest)
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
@classmethod
def new(cls, name: str, dry_run: bool, timeout: int):
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
class SmartModuleSpec:
84class SmartModuleSpec:
85    _inner: _SmartModuleSpec
86
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
89
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
SmartModuleSpec(inner: SmartModuleSpec)
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
@classmethod
def new(cls, path: str):
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
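
A sketch of loading a wasm binary into a spec; the path is a placeholder, and registering the spec with the cluster goes through the `FluvioAdmin` API (not shown here):

```python
from fluvio import SmartModuleSpec

spec = SmartModuleSpec.new("/path/to/somefilter.wasm")  # placeholder path
```
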
class PartitionMap:
18class PartitionMap:
19    _inner: _PartitionMap
20
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
23
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
PartitionMap(inner: PartitionMap)
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
@classmethod
def new(cls, partition: int, replicas: List[int]):
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
class MessageMetadataTopicSpec:
50class MessageMetadataTopicSpec:
51    _inner: _MessageMetadataTopicSpec
52
53    def __init__(self, inner: _MessageMetadataTopicSpec):
54        self._inner = inner
55
56    def is_update(self) -> bool:
57        return self._inner.is_update()
58
59    def is_delete(self) -> bool:
60        return self._inner.is_delete()
61
62    def metadata_topic_spec(self) -> MetadataTopicSpec:
63        return MetadataTopicSpec(self._inner.metadata_topic_spec())
MessageMetadataTopicSpec(inner: MessageMetadataTopicSpec)
53    def __init__(self, inner: _MessageMetadataTopicSpec):
54        self._inner = inner
def is_update(self) -> bool:
56    def is_update(self) -> bool:
57        return self._inner.is_update()
def is_delete(self) -> bool:
59    def is_delete(self) -> bool:
60        return self._inner.is_delete()
def metadata_topic_spec(self) -> MetadataTopicSpec:
62    def metadata_topic_spec(self) -> MetadataTopicSpec:
63        return MetadataTopicSpec(self._inner.metadata_topic_spec())
class MetadataPartitionSpec:
142class MetadataPartitionSpec:
143    _inner: _MetadataPartitionSpec
144
145    def __init__(self, inner: _MetadataPartitionSpec):
146        self._inner = inner
147
148    def name(self) -> str:
149        return self._inner.name()
MetadataPartitionSpec(inner: MetadataPartitionSpec)
145    def __init__(self, inner: _MetadataPartitionSpec):
146        self._inner = inner
def name(self) -> str:
148    def name(self) -> str:
149        return self._inner.name()
class MetadataTopicSpec:
40class MetadataTopicSpec:
41    _inner: _MetadataTopicSpec
42
43    def __init__(self, inner: _MetadataTopicSpec):
44        self._inner = inner
45
46    def name(self) -> str:
47        return self._inner.name()
MetadataTopicSpec(inner: MetadataTopicSpec)
43    def __init__(self, inner: _MetadataTopicSpec):
44        self._inner = inner
def name(self) -> str:
46    def name(self) -> str:
47        return self._inner.name()
class MetaUpdateTopicSpec:
66class MetaUpdateTopicSpec:
67    _inner: _MetaUpdateTopicSpec
68
69    def __init__(self, inner: _MetaUpdateTopicSpec):
70        self._inner = inner
71
72    def all(self) -> typing.List[MetadataTopicSpec]:
73        inners = self._inner.all()
74        return [MetadataTopicSpec(i) for i in inners]
75
76    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
77        inners = self._inner.changes()
78        return [MessageMetadataTopicSpec(i) for i in inners]
79
80    def epoch(self) -> int:
81        return self._inner.epoch()
MetaUpdateTopicSpec(inner: MetaUpdateTopicSpec)
69    def __init__(self, inner: _MetaUpdateTopicSpec):
70        self._inner = inner
def all(self) -> List[MetadataTopicSpec]:
72    def all(self) -> typing.List[MetadataTopicSpec]:
73        inners = self._inner.all()
74        return [MetadataTopicSpec(i) for i in inners]
def changes(self) -> List[MessageMetadataTopicSpec]:
76    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
77        inners = self._inner.changes()
78        return [MessageMetadataTopicSpec(i) for i in inners]
def epoch(self) -> int:
80    def epoch(self) -> int:
81        return self._inner.epoch()
class PartitionSelectionStrategy:
602class PartitionSelectionStrategy:
603    """Strategy to select partitions"""
604
605    _inner: _PartitionSelectionStrategy
606
607    def __init__(self, inner: _PartitionSelectionStrategy):
608        self._inner = inner
609
610    @classmethod
611    def with_all(cls, topic: str):
612        """select all partitions of one topic"""
613        return cls(_PartitionSelectionStrategy.with_all(topic))
614
615    @classmethod
616    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
617        """select multiple partitions of multiple topics"""
618        return cls(_PartitionSelectionStrategy.with_multiple(topic))

Strategy to select partitions

PartitionSelectionStrategy(inner: PartitionSelectionStrategy)
607    def __init__(self, inner: _PartitionSelectionStrategy):
608        self._inner = inner
@classmethod
def with_all(cls, topic: str):
610    @classmethod
611    def with_all(cls, topic: str):
612        """select all partitions of one topic"""
613        return cls(_PartitionSelectionStrategy.with_all(topic))

select all partitions of one topic

@classmethod
def with_multiple(cls, topic: List[Tuple[str, int]]):
615    @classmethod
616    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
617        """select multiple partitions of multiple topics"""
618        return cls(_PartitionSelectionStrategy.with_multiple(topic))

select multiple partitions of multiple topics
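
A sketch of both constructors; the resulting strategy is handed to the client call that creates a `MultiplePartitionConsumer` (not shown here):

```python
from fluvio import PartitionSelectionStrategy

# All partitions of one topic:
all_parts = PartitionSelectionStrategy.with_all("a_topic")

# Specific (topic, partition) pairs, possibly across topics:
some_parts = PartitionSelectionStrategy.with_multiple(
    [("a_topic", 0), ("another_topic", 1)]
)
```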

class SmartModuleKind(enum.Enum):
188class SmartModuleKind(Enum):
189    """
190    Used to explicitly set the kind of a SmartModule. Not required in
191    general, but needed for legacy SmartModules.
192    """
193
194    Filter = _SmartModuleKind.Filter
195    Map = _SmartModuleKind.Map
196    ArrayMap = _SmartModuleKind.ArrayMap
197    FilterMap = _SmartModuleKind.FilterMap
198    Aggregate = _SmartModuleKind.Aggregate

Used to explicitly set the kind of a SmartModule. Not required in general, but needed for legacy SmartModules.

Inherited Members
enum.Enum
name
value