fluvio

The __fluvio__ Python module provides an extension for working with the Fluvio streaming platform.

This module builds on top of the Fluvio client Rust crate and provides pythonic access to the API.

Creating a topic with default settings is as simple as:

```python
from fluvio import FluvioAdmin

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
```

Or create a topic with custom settings:

```python
from fluvio import FluvioAdmin, TopicSpec

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
    TopicSpec.create()
    .with_retention_time("1h")
    .with_segment_size("10M")
    .build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
```
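
The same admin interface can also list and delete topics. A minimal sketch using `all_topics` and `delete_topic` from this module:

```python
from fluvio import FluvioAdmin

fluvio_admin = FluvioAdmin.connect()
print([topic for topic in fluvio_admin.all_topics()])
fluvio_admin.delete_topic("a_topic")
```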

Producing data to a topic in a Fluvio cluster is as simple as:

```python
from fluvio import Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

for i in range(10):
    producer.send_string("Hello %s " % i)
```
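
Records can also carry a key, which determines the partition a record lands on, and can be queued in batches. Continuing from the snippet above, a sketch using `send`, `send_all`, and `flush`:

```python
# send() takes a key and a value; the key selects the partition
producer.send(b"key-1", b"value-1")

# send_all() queues several (key, value) tuples in one call
producer.send_all([(b"key-2", b"value-2"), (b"key-3", b"value-3")])

# flush() sends all the queued records in the producer batches
producer.flush()
```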

Consuming is also simple:

```python
from fluvio import ConsumerConfigExtBuilder, Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
```

You can also consume using offset management:

```python
from fluvio import (
    ConsumerConfigExtBuilder,
    Fluvio,
    Offset,
    OffsetManagementStrategy,
)

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

stream.offset_commit()
stream.offset_flush()
```
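
Committed offsets can later be inspected or removed from the client. A minimal sketch using `consumer_offsets` and `delete_consumer_offset`, continuing from the snippet above:

```python
for offset in fluvio.consumer_offsets():
    print(offset.consumer_id(), offset.topic(), offset.partition(), offset.offset())

fluvio.delete_consumer_offset("a-consumer", topic, 0)  # partition 0 is an assumption
```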

For more examples see the integration tests in the fluvio-python repository:

[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)

  1"""
  2The __fluvio__ python module provides an extension for working with the Fluvio
  3streaming platform.
  4
  5This module builds on top of the Fluvio Client Rust Crate and provides a
  6pythonic access to the API.
  7
  8
  9Creating a topic with default settings is as simple as:
 10
 11```python
 12fluvio_admin = FluvioAdmin.connect()
 13fluvio_admin.create_topic("a_topic")
 14```
 15
 16Or just create a topic with custom settings:
 17
 18```python
 19import fluvio
 20
 21fluvio_admin = FluvioAdmin.connect()
 22topic_spec = (
 23    TopicSpec.create()
 24    .with_retention_time("1h")
 25    .with_segment_size("10M")
 26    .build()
 27)
 28fluvio_admin.create_topic("a_topic", topic_spec)
 29```
 30
 31Producing data to a topic in a Fluvio cluster is as simple as:
 32
 33```python
 34import fluvio
 35
 36fluvio = Fluvio.connect()
 37
 38topic = "a_topic"
 39producer = fluvio.topic_producer(topic)
 40
 41for i in range(10):
 42    producer.send_string("Hello %s " % i)
 43```
 44
 45Consuming is also simple:
 46
 47```python
 48import fluvio
 49
 50fluvio = Fluvio.connect()
 51
 52topic = "a_topic"
 53builder = ConsumerConfigExtBuilder(topic)
 54config = builder.build()
 55stream = fluvio.consumer_with_config(config)
 56
 57num_items = 2
 58records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
 59```
 60
 61Also you can consume usign offset management:
 62
 63```python
 64import fluvio
 65
 66fluvio = Fluvio.connect()
 67
 68topic = "a_topic"
 69builder = ConsumerConfigExtBuilder(topic)
 70builder.offset_start(Offset.beginning())
 71builder.offset_strategy(OffsetManagementStrategy.MANUAL)
 72builder.offset_consumer("a-consumer")
 73config = builder.build()
 74stream = fluvio.consumer_with_config(config)
 75
 76num_items = 2
 77records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
 78
 79stream.offset_commit()
 80stream.offset_flush()
 81```
 82
 83For more examples see the integration tests in the fluvio-python repository.
 84
 85[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
 86[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)
 87
 88"""
 89
```python
import typing
from enum import Enum

from ._fluvio_python import (
    Fluvio as _Fluvio,
    FluvioConfig as _FluvioConfig,
    Offset,
    FluvioAdmin as _FluvioAdmin,
    # consumer types
    ConsumerConfig as _ConsumerConfig,
    ConsumerConfigExt,
    ConsumerConfigExtBuilder,
    PartitionConsumer as _PartitionConsumer,
    MultiplePartitionConsumer as _MultiplePartitionConsumer,
    PartitionSelectionStrategy as _PartitionSelectionStrategy,
    PartitionConsumerStream as _PartitionConsumerStream,
    AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream,
    OffsetManagementStrategy,
    ConsumerOffset as _ConsumerOffset,
    # producer types
    TopicProducer as _TopicProducer,
    ProduceOutput as _ProduceOutput,
    ProducerBatchRecord as _ProducerBatchRecord,
    # admin and misc types
    SmartModuleKind as _SmartModuleKind,
    TopicSpec as _TopicSpec,
    WatchTopicStream as _WatchTopicStream,
    WatchSmartModuleStream as _WatchSmartModuleStream,
)

from ._fluvio_python import Error as FluviorError  # noqa: F401

from .topic import TopicSpec, CompressionType, TopicMode
from .record import Record, RecordMetadata
from .specs import (
    # support types
    CommonCreateRequest,
    PartitionMap,
    # specs
    SmartModuleSpec,
    MessageMetadataTopicSpec,
    MetadataPartitionSpec,
    MetadataSmartModuleSpec,
    MetadataTopicSpec,
    MetaUpdateSmartModuleSpec,
    MetaUpdateTopicSpec,
)

# this structures the module a bit and allows pydoc to generate better docs
# with better ordering of types and functions
# inclusion in __all__, will pull the PyO3 rust inline docs into
# the pdoc generated documentation
__all__ = [
    # top level types
    "Fluvio",
    "FluvioConfig",
    "FluvioAdmin",
    # topic
    "TopicSpec",
    "CompressionType",
    "TopicMode",
    # record
    "Record",
    "RecordMetadata",
    "Offset",
    # producer
    "TopicProducer",
    "ProduceOutput",
    # consumer
    "ConsumerConfigExt",
    "ConsumerConfigExtBuilder",
    "ConsumerConfig",
    "PartitionConsumer",
    "MultiplePartitionConsumer",
    "OffsetManagementStrategy",
    "ConsumerOffset",
    # specs
    "CommonCreateRequest",
    "SmartModuleSpec",
    "TopicSpec",
    "PartitionMap",
    "MessageMetadataTopicSpec",
    "MetadataPartitionSpec",
    "MetadataTopicSpec",
    "MetaUpdateTopicSpec",
    # Misc
    "PartitionSelectionStrategy",
    "SmartModuleKind",
]


class ProduceOutput:
    """Returned by a `TopicProducer.send` call, allowing access to the sent record's metadata."""

    _inner: _ProduceOutput

    def __init__(self, inner: _ProduceOutput) -> None:
        self._inner = inner

    def wait(self) -> typing.Optional[RecordMetadata]:
        """Wait for the record metadata.

        This is a blocking call and may only return a `RecordMetadata` once.
        Any subsequent call to `wait` will return a `None` value.
        Errors will be raised as exceptions of type `FluvioError`.
        """
        res = self._inner.wait()
        if res is None:
            return None
        return RecordMetadata(res)

    async def async_wait(self) -> typing.Optional[RecordMetadata]:
        """Asynchronously wait for the record metadata.

        This may only return a `RecordMetadata` once.
        Any subsequent call to `async_wait` will return a `None` value.
        """
        return await self._inner.async_wait()


class SmartModuleKind(Enum):
    """
    Use of this is to explicitly set the kind of a smartmodule. Not required
    but needed for legacy SmartModules.
    """

    Filter = _SmartModuleKind.Filter
    Map = _SmartModuleKind.Map
    ArrayMap = _SmartModuleKind.ArrayMap
    FilterMap = _SmartModuleKind.FilterMap
    Aggregate = _SmartModuleKind.Aggregate


class ConsumerConfig:
    _inner: _ConsumerConfig

    def __init__(self):
        self._inner = _ConsumerConfig()

    def disable_continuous(self, val: bool = True):
        """Disable continuous mode after fetching specified records"""
        self._inner.disable_continuous(val)

    def smartmodule(
        self,
        name: str = None,
        path: str = None,
        kind: SmartModuleKind = None,
        params: typing.Dict[str, str] = None,
        aggregate: typing.List[bytes] = None,
    ):
        """
        Add a smartmodule to a consumer config, either by the `name` of a
        registered `SmartModule` or by a `path` to a WASM binary.

        Args:

            name: str
            path: str
            kind: SmartModuleKind
            params: Dict[str, str]
            aggregate: List[bytes] # the initial value of an aggregate smartmodule

        Raises:
            "Require either a path or a name for a smartmodule."
            "Only specify one of path or name not both."

        Returns:

            None
        """

        if kind is not None:
            kind = kind.value

        if path is None and name is None:
            raise Exception("Require either a path or a name for a smartmodule.")

        if path is not None and name is not None:
            raise Exception("Only specify one of path or name not both.")

        params = {} if params is None else params
        param_keys = [x for x in params.keys()]
        param_vals = [x for x in params.values()]

        self._inner.smartmodule(
            name,
            path,
            kind,
            param_keys,
            param_vals,
            aggregate,
            # These arguments are for the Join variants, which are not
            # implemented on the Python side yet
            None,
            None,
            None,
            None,
        )
```
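
`ProduceOutput.wait` blocks until the metadata for a sent record is available and yields it exactly once. A short sketch, assuming `producer` is a connected `TopicProducer` for an existing topic:

```python
output = producer.send_string("hello")
producer.flush()

meta = output.wait()   # blocks; returns a RecordMetadata the first time
again = output.wait()  # subsequent calls return None
```
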
```python
class ConsumerIterator:
    def __init__(self, stream: _PartitionConsumerStream):
        self.stream = stream

    def __iter__(self):
        return self

    def __next__(self):
        item = self.stream.next()
        if item is None:
            raise StopIteration
        return Record(item)

    def offset_commit(self):
        self.stream.offset_commit()

    def offset_flush(self):
        self.stream.offset_flush()


class PartitionConsumer:
    """
    An interface for consuming events from a particular partition.

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once. When
    you consume events via streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition. This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._generator(self._inner.stream(offset))

    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition. This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._async_generator(await self._inner.async_stream(offset))

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition, applying a SmartModule WASM module from the config. This
        returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`

        """
        return self._generator(self._inner.stream_with_config(offset, config._inner))

    async def async_stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partition, applying a SmartModule WASM module from the config. This
        returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `AsyncIterator[Record]`

        """
        return self._async_generator(
            await self._inner.async_stream_with_config(offset, config._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
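
A sketch of consuming through `stream_with_config` with a SmartModule filter applied; the topic name and WASM path are assumptions:

```python
import os

from fluvio import ConsumerConfig, Fluvio, Offset

fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("a_topic", 0)  # hypothetical topic, partition 0

config = ConsumerConfig()
config.smartmodule(path=os.path.abspath("somefilter.wasm"))  # hypothetical WASM file

for record in consumer.stream_with_config(Offset.beginning(), config):
    print(bytearray(record.value()).decode())
```
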
```python
class MultiplePartitionConsumer:
    """
    An interface for consuming events from multiple partitions.

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once. When
    you consume events via streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _MultiplePartitionConsumer

    def __init__(self, inner: _MultiplePartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions. This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._generator(self._inner.stream(offset))

    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions. This returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._async_generator(await self._inner.async_stream(offset))

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions, applying a SmartModule WASM module from the config. This
        returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`

        """
        return self._generator(
            self._inner.stream_with_config(offset._inner, config._inner)
        )

    async def async_stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.AsyncIterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer’s
        partitions, applying a SmartModule WASM module from the config. This
        returns an `AsyncIterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `AsyncIterator[Record]`

        """
        return self._async_generator(
            await self._inner.async_stream_with_config(offset, config._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
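
To stream every partition of a topic without creating one consumer per partition, go through `Fluvio.multi_partition_consumer`. A minimal sketch, assuming the topic exists:

```python
from fluvio import Fluvio, Offset

fluvio = Fluvio.connect()
consumer = fluvio.multi_partition_consumer("a_topic")  # hypothetical topic

for record in consumer.stream(Offset.beginning()):
    print(bytearray(record.value()).decode())
```
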
```python
class TopicProducer:
    """An interface for producing events to a particular topic.

    A `TopicProducer` allows you to send events to the specific topic it was
    initialized for. Once you have a `TopicProducer`, you can send events to
    the topic, choosing which partition each event should be delivered to.
    """

    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer’s topic"""
        return self.send([], buf.encode("utf-8"))

    async def async_send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer’s topic"""
        return await self.async_send([], buf.encode("utf-8"))

    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
        """
        Sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        return ProduceOutput(self._inner.send(key, value))

    async def async_send(
        self, key: typing.List[int], value: typing.List[int]
    ) -> ProduceOutput:
        """
        Async sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        produce_output = await self._inner.async_send(key, value)
        return ProduceOutput(produce_output)

    def flush(self) -> None:
        """
        Send all the queued records in the producer batches.
        """
        return self._inner.flush()

    async def async_flush(self) -> None:
        """
        Async send all the queued records in the producer batches.
        """
        await self._inner.async_flush()

    def send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in self._inner.send_all(records_inner)
        ]

    async def async_send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Async sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in await self._inner.async_send_all(records_inner)
        ]
```
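
Each producer call has an async variant. A sketch driving them from `asyncio`, assuming the topic exists:

```python
import asyncio

from fluvio import Fluvio


async def produce() -> None:
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer("a_topic")  # hypothetical topic
    output = await producer.async_send_string("hello")
    await producer.async_flush()
    print(await output.async_wait())


asyncio.run(produce())
```
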
```python
class PartitionSelectionStrategy:
    """Strategy for selecting partitions"""

    _inner: _PartitionSelectionStrategy

    def __init__(self, inner: _PartitionSelectionStrategy):
        self._inner = inner

    @classmethod
    def with_all(cls, topic: str):
        """select all partitions of one topic"""
        return cls(_PartitionSelectionStrategy.with_all(topic))

    @classmethod
    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
        """select multiple partitions of multiple topics"""
        return cls(_PartitionSelectionStrategy.with_multiple(topic))


class ConsumerOffset:
    """Consumer offset"""

    _inner: _ConsumerOffset

    def __init__(self, inner: _ConsumerOffset):
        self._inner = inner

    def consumer_id(self) -> str:
        return self._inner.consumer_id()

    def topic(self) -> str:
        return self._inner.topic()

    def partition(self) -> int:
        return self._inner.partition()

    def offset(self) -> int:
        return self._inner.offset()

    def modified_time(self) -> int:
        return self._inner.modified_time()


class FluvioConfig:
    """Configuration for Fluvio client"""

    _inner: _FluvioConfig

    def __init__(self, inner: _FluvioConfig):
        self._inner = inner

    @classmethod
    def load(cls):
        """get current cluster config from default profile"""
        return cls(_FluvioConfig.load())

    @classmethod
    def new(cls, addr: str):
        """Create a new cluster configuration with no TLS."""
        return cls(_FluvioConfig.new(addr))

    def set_endpoint(self, endpoint: str):
        """set endpoint"""
        self._inner.set_endpoint(endpoint)

    def set_use_spu_local_address(self, val: bool):
        """set whether to use the SPU local address"""
        self._inner.set_use_spu_local_address(val)

    def disable_tls(self):
        """disable tls for this config"""
        self._inner.disable_tls()

    def set_anonymous_tls(self):
        """set the config to use anonymous tls"""
        self._inner.set_anonymous_tls()

    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
        """specify inline tls parameters"""
        self._inner.set_inline_tls(domain, key, cert, ca_cert)

    def set_tls_file_paths(
        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
    ):
        """specify paths to tls files"""
        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)

    def set_client_id(self, client_id: str):
        """set client id"""
        self._inner.set_client_id(client_id)

    def unset_client_id(self):
        """remove the configured client id from config"""
        self._inner.unset_client_id()
```
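
Connection settings can also be built explicitly instead of being read from the default profile. A sketch, where the endpoint address is an assumption:

```python
from fluvio import Fluvio, FluvioConfig

config = FluvioConfig.new("127.0.0.1:9003")  # hypothetical SC address
config.set_client_id("example-client")
fluvio = Fluvio.connect_with_config(config)
```
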
```python
class Fluvio:
    """An interface for interacting with Fluvio streaming."""

    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        self._inner = inner

    @classmethod
    def connect(cls):
        """Tries to create a new Fluvio client using the current profile from
        `~/.fluvio/config`
        """
        return cls(_Fluvio.connect())

    @classmethod
    def connect_with_config(cls, config: FluvioConfig):
        """Creates a new Fluvio client using the given configuration"""
        return cls(_Fluvio.connect_with_config(config._inner))

    def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
        """Creates a consumer with settings defined in config

        This is the recommended way to consume records.
        """
        return ConsumerIterator(self._inner.consumer_with_config(config))

    def topic_producer(self, topic: str) -> TopicProducer:
        """
        Creates a new `TopicProducer` for the given topic name.

        Currently, producers are scoped to a specific Fluvio topic. That means
        when you send events via a producer, you must specify which partition
        each event should go to.
        """
        return TopicProducer(self._inner.topic_producer(topic))

    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
        """Creates a new `PartitionConsumer` for the given topic and partition

        Currently, consumers are scoped to both a specific Fluvio topic and to
        a particular partition within that topic. That means that if you have a
        topic with multiple partitions, then in order to receive all of the
        events in all of the partitions, you will need to create one consumer
        per partition.
        """
        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topic and all its partitions

        Currently, consumers are scoped to a specific Fluvio topic and to all
        partitions within that topic.
        """
        strategy = PartitionSelectionStrategy.with_all(topic)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def multi_topic_partition_consumer(
        self, selections: typing.List[typing.Tuple[str, int]]
    ) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions

        Currently, consumers are scoped to a list of Fluvio topic and partition tuples.
        """
        strategy = PartitionSelectionStrategy.with_multiple(selections)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def consumer_offsets(self) -> typing.List[ConsumerOffset]:
        """Fetch the current offsets of the consumer"""
        return self._inner.consumer_offsets()

    def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
        """Delete the consumer offset"""
        return self._inner.delete_consumer_offset(consumer, topic, partition)

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()


class FluvioAdmin:
    _inner: _FluvioAdmin

    def __init__(self, inner: _FluvioAdmin):
        self._inner = inner

    def connect():
        return FluvioAdmin(_FluvioAdmin.connect())

    def connect_with_config(config: FluvioConfig):
        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))

    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
        partitions = 1
        replication = 1
        ignore_rack = True
        dry_run = False
        spec = (
            spec
            if spec is not None
            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
        )
        return self._inner.create_topic(topic, dry_run, spec)

    def create_topic_with_config(
        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
    ):
        return self._inner.create_topic_with_config(topic, req._inner, spec)

    def delete_topic(self, topic: str):
        return self._inner.delete_topic(topic)

    def all_topics(self) -> typing.List[MetadataTopicSpec]:
        return self._inner.all_topics()

    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics(filters)

    def list_topics_with_params(
        self, filters: typing.List[str], summary: bool
    ) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics_with_params(filters, summary)

    def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]:
        return self._topic_spec_generator(self._inner.watch_topic())

    def create_smartmodule(self, name: str, path: str, dry_run: bool):
        spec = SmartModuleSpec.new(path)
        return self._inner.create_smart_module(name, dry_run, spec._inner)

    def delete_smartmodule(self, name: str):
        return self._inner.delete_smart_module(name)

    def list_smartmodules(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataSmartModuleSpec]:
        return self._inner.list_smart_modules(filters)

    def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]:
        return self._smart_module_spec_generator(self._inner.watch_smart_module())

    def list_partitions(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataPartitionSpec]:
        return self._inner.list_partitions(filters)

    def _topic_spec_generator(
        self, stream: _WatchTopicStream
    ) -> typing.Iterator[MetaUpdateTopicSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateTopicSpec(item)
            item = stream.next().inner()

    def _smart_module_spec_generator(
        self, stream: _WatchSmartModuleStream
    ) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateSmartModuleSpec(item)
            item = stream.next().inner()
```
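
`FluvioAdmin` also manages SmartModules. A hedged sketch of registering, listing, and deleting one; the WASM path is an assumption:

```python
from fluvio import FluvioAdmin

admin = FluvioAdmin.connect()
admin.create_smartmodule("my-filter", "/path/to/somefilter.wasm", False)  # dry_run=False
print(admin.list_smartmodules([]))
admin.delete_smartmodule("my-filter")
```
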
The `TopicSpec` dataclass is re-exported from `fluvio.topic` (source excerpt):

```python
@dataclass(frozen=True)
class TopicSpec:
    mode: TopicMode = TopicMode.COMPUTED
    partitions: int = 1
    replications: int = 1
    ignore_rack: bool = True
    replica_assignment: Optional[List[PartitionMap]] = None
    retention_time: Optional[int] = None
    segment_size: Optional[int] = None
    compression_type: Optional[CompressionType] = None
    max_partition_size: Optional[int] = None
    system: bool = False

    @classmethod
    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
        return cls(_TopicSpec.new_computed(partitions, replication, ignore))

    @classmethod
    def create(cls) -> "TopicSpec":
        """Alternative constructor method"""
        return cls()

    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
        """Set the assigned replica configuration"""
        return replace(
            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
        )

    def as_mirror_topic(self) -> "TopicSpec":
        """Set as a mirror topic"""
        return replace(self, mode=TopicMode.MIRROR)

    def with_partitions(self, partitions: int) -> "TopicSpec":
        """Set the specified partitions"""
        if partitions < 0:
            raise ValueError("Partitions must be a positive integer")
        return replace(self, partitions=partitions)

    def with_replications(self, replications: int) -> "TopicSpec":
        """Set the specified replication factor"""
        if replications < 0:
            raise ValueError("Replication factor must be a positive integer")
        return replace(self, replications=replications)

    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
        """Set the rack ignore setting"""
        return replace(self, ignore_rack=ignore)

    def with_compression(self, compression: CompressionType) -> "TopicSpec":
        """Set the specified compression type"""
        return replace(self, compression_type=compression)

    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
        """Set the specified retention time"""

        if isinstance(retention_time, int):
            return replace(self, retention_time=retention_time)

        parsed_time = parse_timespan(retention_time)
        return replace(self, retention_time=parsed_time)

    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified segment size"""
        if isinstance(size, int):
            return replace(self, segment_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, segment_size=parsed_size)

    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified max partition size"""
        if isinstance(size, int):
            return replace(self, max_partition_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, max_partition_size=parsed_size)

    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
        """Set the topic as an internal system topic"""
        return replace(self, system=is_system)

    def build(self) -> _TopicSpec:
        """Build the TopicSpec based on the current configuration"""
        # Similar implementation to the original build method
        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
            spec = _TopicSpec.new_assigned(self.replica_assignment)
        elif self.mode == TopicMode.MIRROR:
            spec = _TopicSpec.new_mirror()
        else:
            spec = _TopicSpec.new_computed(
                self.partitions, self.replications, self.ignore_rack
            )

        spec.set_system(self.system)

        if self.retention_time is not None:
            spec.set_retention_time(self.retention_time)

        if self.max_partition_size is not None or self.segment_size is not None:
            spec.set_storage(self.max_partition_size, self.segment_size)

        if self.compression_type is not None:
            spec.set_compression_type(self.compression_type.value)

        return spec
```
TopicSpec( mode: TopicMode = <TopicMode.COMPUTED: 'computed'>, partitions: int = 1, replications: int = 1, ignore_rack: bool = True, replica_assignment: Optional[List[PartitionMap]] = None, retention_time: Optional[int] = None, segment_size: Optional[int] = None, compression_type: Optional[CompressionType] = None, max_partition_size: Optional[int] = None, system: bool = False)
mode: TopicMode = <TopicMode.COMPUTED: 'computed'>
partitions: int = 1
replications: int = 1
ignore_rack: bool = True
replica_assignment: Optional[List[PartitionMap]] = None
retention_time: Optional[int] = None
segment_size: Optional[int] = None
compression_type: Optional[CompressionType] = None
max_partition_size: Optional[int] = None
system: bool = False
@classmethod
def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
39    @classmethod
40    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
41        return cls(_TopicSpec.new_computed(partitions, replication, ignore))
@classmethod
def create(cls) -> TopicSpec:
43    @classmethod
44    def create(cls) -> "TopicSpec":
45        """Alternative constructor method"""
46        return cls()

Alternative constructor method

def with_assignment( self, replica_assignment: List[PartitionMap]) -> TopicSpec:
48    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
49        """Set the assigned replica configuration"""
50        return replace(
51            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
52        )

Set the assigned replica configuration

def as_mirror_topic(self) -> TopicSpec:
54    def as_mirror_topic(self) -> "TopicSpec":
55        """Set as a mirror topic"""
56        return replace(self, mode=TopicMode.MIRROR)

Set as a mirror topic

def with_partitions(self, partitions: int) -> TopicSpec:
58    def with_partitions(self, partitions: int) -> "TopicSpec":
59        """Set the specified partitions"""
60        if partitions < 0:
61            raise ValueError("Partitions must be a positive integer")
62        return replace(self, partitions=partitions)

Set the specified partitions

def with_replications(self, replications: int) -> TopicSpec:
64    def with_replications(self, replications: int) -> "TopicSpec":
65        """Set the specified replication factor"""
66        if replications < 0:
67            raise ValueError("Replication factor must be a positive integer")
68        return replace(self, replications=replications)

Set the specified replication factor

def with_ignore_rack(self, ignore: bool = True) -> TopicSpec:
70    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
71        """Set the rack ignore setting"""
72        return replace(self, ignore_rack=ignore)

Set the rack ignore setting

def with_compression( self, compression: CompressionType) -> TopicSpec:
74    def with_compression(self, compression: CompressionType) -> "TopicSpec":
75        """Set the specified compression type"""
76        return replace(self, compression_type=compression)

Set the specified compression type

def with_retention_time(self, retention_time: Union[str, int]) -> TopicSpec:
78    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
79        """Set the specified retention time"""
80
81        if isinstance(retention_time, int):
82            return replace(self, retention_time=retention_time)
83
84        parsed_time = parse_timespan(retention_time)
85        return replace(self, retention_time=parsed_time)

Set the specified retention time

def with_segment_size(self, size: Union[str, int]) -> TopicSpec:
87    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
88        """Set the specified segment size"""
89        if isinstance(size, int):
90            return replace(self, segment_size=size)
91
92        parsed_size = parse_byte_size(size)
93        return replace(self, segment_size=parsed_size)

Set the specified segment size

def with_max_partition_size(self, size: Union[str, int]) -> TopicSpec:
 95    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
 96        """Set the specified max partition size"""
 97        if isinstance(size, int):
 98            return replace(self, max_partition_size=size)
 99
100        parsed_size = parse_byte_size(size)
101        return replace(self, max_partition_size=parsed_size)

Set the specified max partition size

def as_system_topic(self, is_system: bool = True) -> TopicSpec:
103    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
104        """Set the topic as an internal system topic"""
105        return replace(self, system=is_system)

Set the topic as an internal system topic

def build(self) -> TopicSpec:
107    def build(self) -> _TopicSpec:
108        """Build the TopicSpec based on the current configuration"""
109        # Similar implementation to the original build method
110        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
111            spec = _TopicSpec.new_assigned(self.replica_assignment)
112        elif self.mode == TopicMode.MIRROR:
113            spec = _TopicSpec.new_mirror()
114        else:
115            spec = _TopicSpec.new_computed(
116                self.partitions, self.replications, self.ignore_rack
117            )
118
119        spec.set_system(self.system)
120
121        if self.retention_time is not None:
122            spec.set_retention_time(self.retention_time)
123
124        if self.max_partition_size is not None or self.segment_size is not None:
125            spec.set_storage(self.max_partition_size, self.segment_size)
126
127        if self.compression_type is not None:
128            spec.set_compression_type(self.compression_type.value)
129
130        return spec

Build the TopicSpec based on the current configuration
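
As a sketch of how the chained builder reads end to end (topic name and settings are illustrative; TopicSpec and CompressionType are assumed to be importable from the top-level fluvio package, as the module examples suggest):

from fluvio import FluvioAdmin, TopicSpec, CompressionType

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
    TopicSpec.create()
    .with_partitions(3)
    .with_replications(2)
    .with_compression(CompressionType.GZIP)
    .with_max_partition_size("1G")
    .build()
)
fluvio_admin.create_topic("another_topic", topic_spec)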

class CompressionType(enum.Enum):
17class CompressionType(Enum):
18    NONE = "none"
19    GZIP = "gzip"
20    SNAPPY = "snappy"
21    LZ4 = "lz4"
22    ANY = "any"
23    ZSTD = "zstd"
NONE = <CompressionType.NONE: 'none'>
GZIP = <CompressionType.GZIP: 'gzip'>
SNAPPY = <CompressionType.SNAPPY: 'snappy'>
LZ4 = <CompressionType.LZ4: 'lz4'>
ANY = <CompressionType.ANY: 'any'>
ZSTD = <CompressionType.ZSTD: 'zstd'>
class TopicMode(enum.Enum):
11class TopicMode(Enum):
12    MIRROR = "mirror"
13    ASSIGNED = "assigned"
14    COMPUTED = "computed"
MIRROR = <TopicMode.MIRROR: 'mirror'>
ASSIGNED = <TopicMode.ASSIGNED: 'assigned'>
COMPUTED = <TopicMode.COMPUTED: 'computed'>
class Record:
 9class Record:
10    """The individual record for a given stream."""
11
12    _inner: _Record
13
14    def __init__(self, inner: _Record):
15        self._inner = inner
16
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()
20
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()
24
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()
28
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()
32
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()
36
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

The individual record for a given stream.

Record(inner: Record)
14    def __init__(self, inner: _Record):
15        self._inner = inner
def offset(self) -> int:
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()

The UTF-8 decoded key for this record.

def timestamp(self) -> int:
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

Timestamp of this record.
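
A short consume loop tying these accessors together (a sketch; the topic name and record count are illustrative):

from fluvio import Fluvio, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()
config = ConsumerConfigExtBuilder("a_topic").build()
stream = fluvio.consumer_with_config(config)

for _ in range(2):
    record = next(stream)
    print(record.offset(), record.timestamp())
    print(record.value_string())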

class RecordMetadata:
42class RecordMetadata:
 43    """Metadata of a record sent to a topic."""
44
45    _inner: _RecordMetadata
46
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
49
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()
53
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Metadata of a record sent to a topic.

RecordMetadata(inner: RecordMetadata)
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
def offset(self) -> int:
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()

Return the offset of the sent record in the topic/partition.

def partition_id(self) -> int:
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Return the partition index the record was sent to.
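
For example, pairing a send with ProduceOutput.wait to inspect where the record landed (a sketch; the topic name is illustrative):

from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

output = producer.send_string("hello")
producer.flush()

metadata = output.wait()  # returns None on any call after the first
if metadata is not None:
    print(metadata.partition_id(), metadata.offset())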

class Offset:

Describes the location of a record stored in a Fluvio partition.

A topic is composed of one or more partitions.

def absolute(index):

Specifies an absolute offset with the given index within the partition

def beginning():

Specifies an offset starting at the beginning of the partition

def from_beginning(offset):

Specifies an offset relative to the beginning of the partition

def end():

Specifies an offset starting at the end of the partition

def from_end(offset):

Specifies an offset relative to the end of the partition
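
These constructors feed ConsumerConfigExtBuilder.offset_start; a sketch of each variant (only one would be set in practice):

from fluvio import Fluvio, ConsumerConfigExtBuilder, Offset

builder = ConsumerConfigExtBuilder("a_topic")

# Pick exactly one starting point:
builder.offset_start(Offset.beginning())        # oldest record
builder.offset_start(Offset.from_beginning(5))  # 5 records after the oldest
builder.offset_start(Offset.absolute(42))       # exact offset 42
builder.offset_start(Offset.end())              # newest record
builder.offset_start(Offset.from_end(5))        # 5 records before the newest

stream = Fluvio.connect().consumer_with_config(builder.build())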

class TopicProducer:
566class TopicProducer:
567    """An interface for producing events to a particular topic.
568
569    A `TopicProducer` allows you to send events to the specific topic it was
570    initialized for. Once you have a `TopicProducer`, you can send events to
571    the topic, choosing which partition each event should be delivered to.
572    """
573
574    _inner: _TopicProducer
575
576    def __init__(self, inner: _TopicProducer):
577        self._inner = inner
578
579    def send_string(self, buf: str) -> ProduceOutput:
580        """Sends a string to this producer’s topic"""
581        return self.send([], buf.encode("utf-8"))
582
583    async def async_send_string(self, buf: str) -> ProduceOutput:
584        """Sends a string to this producer’s topic"""
585        return await self.async_send([], buf.encode("utf-8"))
586
587    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
588        """
589        Sends a key/value record to this producer's Topic.
590
591        The partition that the record will be sent to is derived from the Key.
592        """
593        return ProduceOutput(self._inner.send(key, value))
594
595    async def async_send(
596        self, key: typing.List[int], value: typing.List[int]
597    ) -> ProduceOutput:
598        """
599        Async sends a key/value record to this producer's Topic.
600
601        The partition that the record will be sent to is derived from the Key.
602        """
603        produce_output = await self._inner.async_send(key, value)
604        return ProduceOutput(produce_output)
605
606    def flush(self) -> None:
607        """
608        Send all the queued records in the producer batches.
609        """
610        return self._inner.flush()
611
612    async def async_flush(self) -> None:
613        """
614        Async send all the queued records in the producer batches.
615        """
616        await self._inner.async_flush()
617
618    def send_all(
619        self, records: typing.List[typing.Tuple[bytes, bytes]]
620    ) -> typing.List[ProduceOutput]:
621        """
622        Sends a list of key/value records as a batch to this producer's Topic.
623        :param records: The list of records to send
624        """
625        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
626        return [
627            ProduceOutput(output_inner)
628            for output_inner in self._inner.send_all(records_inner)
629        ]
630
631    async def async_send_all(
632        self, records: typing.List[typing.Tuple[bytes, bytes]]
633    ) -> typing.List[ProduceOutput]:
634        """
635        Async sends a list of key/value records as a batch to this producer's Topic.
636        :param records: The list of records to send
637        """
638        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
639        return [
640            ProduceOutput(output_inner)
641            for output_inner in await self._inner.async_send_all(records_inner)
642        ]

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: TopicProducer)
576    def __init__(self, inner: _TopicProducer):
577        self._inner = inner
def send_string(self, buf: str) -> ProduceOutput:
579    def send_string(self, buf: str) -> ProduceOutput:
580        """Sends a string to this producer’s topic"""
581        return self.send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

async def async_send_string(self, buf: str) -> ProduceOutput:
583    async def async_send_string(self, buf: str) -> ProduceOutput:
584        """Sends a string to this producer’s topic"""
585        return await self.async_send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> ProduceOutput:
587    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
588        """
589        Sends a key/value record to this producer's Topic.
590
591        The partition that the record will be sent to is derived from the Key.
592        """
593        return ProduceOutput(self._inner.send(key, value))

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

async def async_send(self, key: List[int], value: List[int]) -> ProduceOutput:
595    async def async_send(
596        self, key: typing.List[int], value: typing.List[int]
597    ) -> ProduceOutput:
598        """
599        Async sends a key/value record to this producer's Topic.
600
601        The partition that the record will be sent to is derived from the Key.
602        """
603        produce_output = await self._inner.async_send(key, value)
604        return ProduceOutput(produce_output)

Async sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
606    def flush(self) -> None:
607        """
608        Send all the queued records in the producer batches.
609        """
610        return self._inner.flush()

Send all the queued records in the producer batches.

async def async_flush(self) -> None:
612    async def async_flush(self) -> None:
613        """
614        Async send all the queued records in the producer batches.
615        """
616        await self._inner.async_flush()

Async send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
618    def send_all(
619        self, records: typing.List[typing.Tuple[bytes, bytes]]
620    ) -> typing.List[ProduceOutput]:
621        """
622        Sends a list of key/value records as a batch to this producer's Topic.
623        :param records: The list of records to send
624        """
625        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
626        return [
627            ProduceOutput(output_inner)
628            for output_inner in self._inner.send_all(records_inner)
629        ]

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
async def async_send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
631    async def async_send_all(
632        self, records: typing.List[typing.Tuple[bytes, bytes]]
633    ) -> typing.List[ProduceOutput]:
634        """
635        Async sends a list of key/value records as a batch to this producer's Topic.
636        :param records: The list of records to send
637        """
638        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
639        return [
640            ProduceOutput(output_inner)
641            for output_inner in await self._inner.async_send_all(records_inner)
642        ]

Async sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
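
A batching sketch using send_all (keys and values are byte strings here; the async variant is analogous):

from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

records = [(b"key-%d" % i, b"value-%d" % i) for i in range(10)]
outputs = producer.send_all(records)
producer.flush()

for output in outputs:
    metadata = output.wait()
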
class ProduceOutput:
182class ProduceOutput:
183    """Returned by a `TopicProducer.send` call allowing access to sent record metadata."""
184
185    _inner: _ProduceOutput
186
187    def __init__(self, inner: _ProduceOutput) -> None:
188        self._inner = inner
189
190    def wait(self) -> typing.Optional[RecordMetadata]:
191        """Wait for the record metadata.
192
193        This is a blocking call and may only return a `RecordMetadata` once.
194        Any subsequent call to `wait` will return a `None` value.
195        Errors will be raised as exceptions of type `FluvioError`.
196        """
197        res = self._inner.wait()
198        if res is None:
199            return None
200        return RecordMetadata(res)
201
202    async def async_wait(self) -> typing.Optional[RecordMetadata]:
203        """Asynchronously wait for the record metadata.
204
205        This may only return a `RecordMetadata` once.
206        Any subsequent call to `wait` will return a `None` value.
207        """
208        return await self._inner.async_wait()

Returned by a TopicProducer.send call allowing access to sent record metadata.

ProduceOutput(inner: ProduceOutput)
187    def __init__(self, inner: _ProduceOutput) -> None:
188        self._inner = inner
def wait(self) -> Optional[RecordMetadata]:
190    def wait(self) -> typing.Optional[RecordMetadata]:
191        """Wait for the record metadata.
192
193        This is a blocking call and may only return a `RecordMetadata` once.
194        Any subsequent call to `wait` will return a `None` value.
195        Errors will be raised as exceptions of type `FluvioError`.
196        """
197        res = self._inner.wait()
198        if res is None:
199            return None
200        return RecordMetadata(res)

Wait for the record metadata.

This is a blocking call and may only return a RecordMetadata once. Any subsequent call to wait will return a None value. Errors will be raised as exceptions of type FluvioError.

async def async_wait(self) -> Optional[RecordMetadata]:
202    async def async_wait(self) -> typing.Optional[RecordMetadata]:
203        """Asynchronously wait for the record metadata.
204
205        This may only return a `RecordMetadata` once.
206        Any subsequent call to `wait` will return a `None` value.
207        """
208        return await self._inner.async_wait()

Asynchronously wait for the record metadata.

This may only return a RecordMetadata once. Any subsequent call to wait will return a None value.
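
An asynchronous sketch, assuming the producer's async methods integrate with a standard asyncio event loop:

import asyncio
from fluvio import Fluvio

async def produce():
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer("a_topic")

    output = await producer.async_send_string("hello")
    await producer.async_flush()

    metadata = await output.async_wait()
    if metadata is not None:
        print(metadata.offset())

asyncio.run(produce())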

class ConsumerConfigExt:
class ConsumerConfigExtBuilder:
def disable_continuous(self, /, val=True):

When set, the client disconnects after fetching the available records instead of streaming continuously.

def max_bytes(self, /, max_bytes):

Sets the maximum number of bytes to fetch in a single request.

def offset_start(self, /, offset):

Sets the starting offset for the stream (see Offset).

def offset_consumer(self, /, id):

Sets the consumer id used for offset management.

def offset_strategy(self, /, strategy=Ellipsis):

Sets the offset management strategy (see OffsetManagementStrategy).

def partition(self, /, partition):

Restricts the stream to the given partition.

def topic(self, /, topic):

Sets the topic to consume from.

def build(self, /):

Builds the ConsumerConfigExt.
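
Putting the builder together; a sketch that exercises most of the methods above (values are illustrative, top-level imports assumed):

from fluvio import (
    Fluvio,
    ConsumerConfigExtBuilder,
    Offset,
    OffsetManagementStrategy,
)

builder = ConsumerConfigExtBuilder("a_topic")
builder.partition(0)
builder.max_bytes(1024 * 1024)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.AUTO)
builder.offset_consumer("a-consumer")
builder.disable_continuous()  # stop once the available records are fetched
config = builder.build()

stream = Fluvio.connect().consumer_with_config(config)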

class ConsumerConfig:
224class ConsumerConfig:
225    _inner: _ConsumerConfig
226
227    def __init__(self):
228        self._inner = _ConsumerConfig()
229
230    def disable_continuous(self, val: bool = True):
231        """Disable continuous mode after fetching specified records"""
232        self._inner.disable_continuous(val)
233
234    def smartmodule(
235        self,
236        name: str = None,
237        path: str = None,
238        kind: SmartModuleKind = None,
239        params: typing.Dict[str, str] = None,
240        aggregate: typing.List[bytes] = None,
241    ):
242        """
243        This is a method for adding a smartmodule to a consumer config either
244        using a `name` of a `SmartModule` or a `path` to a wasm binary.
245
246        Args:
247
248            name: str
249            path: str
250            kind: SmartModuleKind
251            params: Dict[str, str]
252            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
253
254        Raises:
255            "Require either a path or a name for a smartmodule."
256            "Only specify one of path or name not both."
257
258        Returns:
259
260            None
261        """
262
263        if kind is not None:
264            kind = kind.value
265
266        if path is None and name is None:
267            raise Exception("Require either a path or a name for a smartmodule.")
268
269        if path is not None and name is not None:
270            raise Exception("Only specify one of path or name not both.")
271
272        params = {} if params is None else params
273        param_keys = [x for x in params.keys()]
274        param_vals = [x for x in params.values()]
275
276        self._inner.smartmodule(
277            name,
278            path,
279            kind,
280            param_keys,
281            param_vals,
282            aggregate,
283            # These arguments are for Join stuff but that's not implemented on
284            # the python side yet
285            None,
286            None,
287            None,
288            None,
289        )
def disable_continuous(self, val: bool = True):
230    def disable_continuous(self, val: bool = True):
231        """Disable continuous mode after fetching specified records"""
232        self._inner.disable_continuous(val)

Disable continuous mode after fetching specified records

def smartmodule( self, name: str = None, path: str = None, kind: SmartModuleKind = None, params: Dict[str, str] = None, aggregate: List[bytes] = None):
234    def smartmodule(
235        self,
236        name: str = None,
237        path: str = None,
238        kind: SmartModuleKind = None,
239        params: typing.Dict[str, str] = None,
240        aggregate: typing.List[bytes] = None,
241    ):
242        """
243        This is a method for adding a smartmodule to a consumer config either
244        using a `name` of a `SmartModule` or a `path` to a wasm binary.
245
246        Args:
247
248            name: str
249            path: str
250            kind: SmartModuleKind
251            params: Dict[str, str]
252            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
253
254        Raises:
255            "Require either a path or a name for a smartmodule."
256            "Only specify one of path or name not both."
257
258        Returns:
259
260            None
261        """
262
263        if kind is not None:
264            kind = kind.value
265
266        if path is None and name is None:
267            raise Exception("Require either a path or a name for a smartmodule.")
268
269        if path is not None and name is not None:
270            raise Exception("Only specify one of path or name not both.")
271
272        params = {} if params is None else params
273        param_keys = [x for x in params.keys()]
274        param_vals = [x for x in params.values()]
275
276        self._inner.smartmodule(
277            name,
278            path,
279            kind,
280            param_keys,
281            param_vals,
282            aggregate,
283            # These arguments are for Join stuff but that's not implemented on
284            # the python side yet
285            None,
286            None,
287            None,
288            None,
289        )

This is a method for adding a smartmodule to a consumer config either using a name of a SmartModule or a path to a wasm binary.

Args:
    name: str
    path: str
    kind: SmartModuleKind
    params: Dict[str, str]
    aggregate: List[bytes]  # This is used for the initial value of an aggregate smartmodule

Raises:
    "Require either a path or a name for a smartmodule."
    "Only specify one of path or name not both."

Returns:
    None
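
For example, attaching a registered SmartModule by name instead of by path (a sketch: the SmartModule name is illustrative, and both a top-level SmartModuleKind import and a partition_consumer(topic, partition) accessor on the client are assumed here):

from fluvio import Fluvio, ConsumerConfig, Offset, SmartModuleKind

fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("a_topic", 0)

config = ConsumerConfig()
config.smartmodule(name="my-filter", kind=SmartModuleKind.Filter)

for record in consumer.stream_with_config(Offset.beginning(), config):
    print(record.value_string())
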
class PartitionConsumer:
312class PartitionConsumer:
313    """
314    An interface for consuming events from a particular partition
315
316    There are two ways to consume events: by "fetching" events and by
317    "streaming" events. Fetching involves specifying a range of events that you
318    want to consume via their Offset. A fetch is a sort of one-time batch
319    operation: you’ll receive all of the events in your range all at once. When
320    you consume events via Streaming, you specify a starting Offset and receive
321    an object that will continuously yield new events as they arrive.
322    """
323
324    _inner: _PartitionConsumer
325
326    def __init__(self, inner: _PartitionConsumer):
327        self._inner = inner
328
329    def stream(self, offset: Offset) -> typing.Iterator[Record]:
330        """
331        Continuously streams events from a particular offset in the consumer’s
332        partition. This returns a `Iterator[Record]` which is an
333        iterator.
334
335        Streaming is one of the two ways to consume events in Fluvio. It is a
336        continuous request for new records arriving in a partition, beginning
337        at a particular offset. You specify the starting point of the stream
338        using an Offset and periodically receive events, either individually or
339        in batches.
340        """
341        return self._generator(self._inner.stream(offset))
342
343    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
344        """
345        Continuously streams events from a particular offset in the consumer’s
346        partition. This returns a `AsyncIterator[Record]` which is an
347        iterator.
348
349        Streaming is one of the two ways to consume events in Fluvio. It is a
350        continuous request for new records arriving in a partition, beginning
351        at a particular offset. You specify the starting point of the stream
352        using an Offset and periodically receive events, either individually or
353        in batches.
354        """
355        return self._async_generator(await self._inner.async_stream(offset))
356
357    def stream_with_config(
358        self, offset: Offset, config: ConsumerConfig
359    ) -> typing.Iterator[Record]:
360        """
361        Continuously streams events from a particular offset with a SmartModule
362        WASM module in the consumer’s partition. This returns a
363        `Iterator[Record]` which is an iterator.
364
365        Streaming is one of the two ways to consume events in Fluvio. It is a
366        continuous request for new records arriving in a partition, beginning
367        at a particular offset. You specify the starting point of the stream
368        using an Offset and periodically receive events, either individually or
369        in batches.
370
371        Args:
372            offset: Offset
373            wasm_module_path: str - The absolute path to the WASM file
374
375        Example:
376            import os
377
378            wmp = os.path.abspath("somefilter.wasm")
379            config = ConsumerConfig()
380            config.smartmodule(path=wmp)
381            for i in consumer.stream_with_config(Offset.beginning(), config):
382                # do something with i
383
384        Returns:
385            `Iterator[Record]`
386
387        """
388        return self._generator(self._inner.stream_with_config(offset, config._inner))
389
390    async def async_stream_with_config(
391        self, offset: Offset, config: ConsumerConfig
392    ) -> typing.AsyncIterator[Record]:
393        """
394        Continuously streams events from a particular offset with a SmartModule
395        WASM module in the consumer’s partition. This returns a
396        `AsyncIterator[Record]` which is an async iterator.
397
398        Streaming is one of the two ways to consume events in Fluvio. It is a
399        continuous request for new records arriving in a partition, beginning
400        at a particular offset. You specify the starting point of the stream
401        using an Offset and periodically receive events, either individually or
402        in batches.
403
404        Args:
405            offset: Offset
406            wasm_module_path: str - The absolute path to the WASM file
407
408        Example:
409            import os
410
411            wmp = os.path.abspath("somefilter.wasm")
412            config = ConsumerConfig()
413            config.smartmodule(path=wmp)
414            `AsyncIterator[Record]`
415
416        """
417        return self._async_generator(
418            await self._inner.async_stream_with_config(offset, config._inner)
419        )
420
421    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
422        item = stream.next()
423        while item is not None:
424            yield Record(item)
425            item = stream.next()
426
427    async def _async_generator(
428        self, astream: _AsyncPartitionConsumerStream
429    ) -> typing.AsyncIterator[Record]:
430        item = await astream.async_next()
431        while item is not None:
432            yield Record(item)
433            item = await astream.async_next()

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: PartitionConsumer)
326    def __init__(self, inner: _PartitionConsumer):
327        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
329    def stream(self, offset: Offset) -> typing.Iterator[Record]:
330        """
331        Continuously streams events from a particular offset in the consumer’s
332        partition. This returns a `Iterator[Record]` which is an
333        iterator.
334
335        Streaming is one of the two ways to consume events in Fluvio. It is a
336        continuous request for new records arriving in a partition, beginning
337        at a particular offset. You specify the starting point of the stream
338        using an Offset and periodically receive events, either individually or
339        in batches.
340        """
341        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
343    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
344        """
345        Continuously streams events from a particular offset in the consumer’s
346        partition. This returns a `AsyncIterator[Record]` which is an
347        iterator.
348
349        Streaming is one of the two ways to consume events in Fluvio. It is a
350        continuous request for new records arriving in a partition, beginning
351        at a particular offset. You specify the starting point of the stream
352        using an Offset and periodically receive events, either individually or
353        in batches.
354        """
355        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns an AsyncIterator[Record], which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
357    def stream_with_config(
358        self, offset: Offset, config: ConsumerConfig
359    ) -> typing.Iterator[Record]:
360        """
361        Continuously streams events from a particular offset with a SmartModule
362        WASM module in the consumer’s partition. This returns a
363        `Iterator[Record]` which is an iterator.
364
365        Streaming is one of the two ways to consume events in Fluvio. It is a
366        continuous request for new records arriving in a partition, beginning
367        at a particular offset. You specify the starting point of the stream
368        using an Offset and periodically receive events, either individually or
369        in batches.
370
371        Args:
372            offset: Offset
373            wasm_module_path: str - The absolute path to the WASM file
374
375        Example:
376            import os
377
378            wmp = os.path.abspath("somefilter.wasm")
379            config = ConsumerConfig()
380            config.smartmodule(path=wmp)
381            for i in consumer.stream_with_config(Offset.beginning(), config):
382                # do something with i
383
384        Returns:
385            `Iterator[Record]`
386
387        """
388        return self._generator(self._inner.stream_with_config(offset, config._inner))

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args:
    offset: Offset
    wasm_module_path: str - The absolute path to the WASM file

Example:

    import os

    wmp = os.path.abspath("somefilter.wasm")
    config = ConsumerConfig()
    config.smartmodule(path=wmp)
    for i in consumer.stream_with_config(Offset.beginning(), config):
        # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
390    async def async_stream_with_config(
391        self, offset: Offset, config: ConsumerConfig
392    ) -> typing.AsyncIterator[Record]:
393        """
394        Continuously streams events from a particular offset with a SmartModule
395        WASM module in the consumer’s partition. This returns a
396        `AsyncIterator[Record]` which is an async iterator.
397
398        Streaming is one of the two ways to consume events in Fluvio. It is a
399        continuous request for new records arriving in a partition, beginning
400        at a particular offset. You specify the starting point of the stream
401        using an Offset and periodically receive events, either individually or
402        in batches.
403
404        Args:
405            offset: Offset
406            wasm_module_path: str - The absolute path to the WASM file
407
408        Example:
409            import os
410
411            wmp = os.path.abspath("somefilter.wasm")
412            config = ConsumerConfig()
413            config.smartmodule(path=wmp)
414            `AsyncIterator[Record]`
415
416        """
417        return self._async_generator(
418            await self._inner.async_stream_with_config(offset, config._inner)
419        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an AsyncIterator[Record], which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args:
    offset: Offset
    wasm_module_path: str - The absolute path to the WASM file

Example:

    import os

    wmp = os.path.abspath("somefilter.wasm")
    config = ConsumerConfig()
    config.smartmodule(path=wmp)
    async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
        # do something with i

Returns: AsyncIterator[Record]
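
A complete async sketch of streaming a partition, assuming an asyncio event loop and a partition_consumer(topic, partition) accessor on the client:

import asyncio
from fluvio import Fluvio, Offset

async def consume():
    fluvio = Fluvio.connect()
    consumer = fluvio.partition_consumer("a_topic", 0)

    count = 0
    async for record in await consumer.async_stream(Offset.beginning()):
        print(record.value_string())
        count += 1
        if count >= 2:
            break

asyncio.run(consume())
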
class MultiplePartitionConsumer:
436class MultiplePartitionConsumer:
437    """
438    An interface for consuming events from multiple partitions
439
440    There are two ways to consume events: by "fetching" events and by
441    "streaming" events. Fetching involves specifying a range of events that you
442    want to consume via their Offset. A fetch is a sort of one-time batch
443    operation: you’ll receive all of the events in your range all at once. When
444    you consume events via Streaming, you specify a starting Offset and receive
445    an object that will continuously yield new events as they arrive.
446    """
447
448    _inner: _MultiplePartitionConsumer
449
450    def __init__(self, inner: _MultiplePartitionConsumer):
451        self._inner = inner
452
453    def stream(self, offset: Offset) -> typing.Iterator[Record]:
454        """
455        Continuously streams events from a particular offset in the consumer’s
456        partitions. This returns a `Iterator[Record]` which is an
457        iterator.
458
459        Streaming is one of the two ways to consume events in Fluvio. It is a
460        continuous request for new records arriving in a partition, beginning
461        at a particular offset. You specify the starting point of the stream
462        using an Offset and periodically receive events, either individually or
463        in batches.
464        """
465        return self._generator(self._inner.stream(offset))
466
467    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
468        """
469        Continuously streams events from a particular offset in the consumer’s
470        partitions. This returns a `AsyncIterator[Record]` which is an
471        async iterator.
472
473        Streaming is one of the two ways to consume events in Fluvio. It is a
474        continuous request for new records arriving in a partition, beginning
475        at a particular offset. You specify the starting point of the stream
476        using an Offset and periodically receive events, either individually or
477        in batches.
478        """
479        return self._async_generator(await self._inner.async_stream(offset))
480
481    def stream_with_config(
482        self, offset: Offset, config: ConsumerConfig
483    ) -> typing.Iterator[Record]:
484        """
485        Continuously streams events from a particular offset with a SmartModule
486        WASM module in the consumer’s partitions. This returns a
487        `Iterator[Record]` which is an iterator.
488
489        Streaming is one of the two ways to consume events in Fluvio. It is a
490        continuous request for new records arriving in a partition, beginning
491        at a particular offset. You specify the starting point of the stream
492        using an Offset and periodically receive events, either individually or
493        in batches.
494
495        Args:
496            offset: Offset
497            wasm_module_path: str - The absolute path to the WASM file
498
499        Example:
500            import os
501
502            wmp = os.path.abspath("somefilter.wasm")
503            config = ConsumerConfig()
504            config.smartmodule(path=wmp)
505            for i in consumer.stream_with_config(Offset.beginning(), config):
506                # do something with i
507
508        Returns:
509            `Iterator[Record]`
510
511        """
512        return self._generator(
513            self._inner.stream_with_config(offset._inner, config._inner)
514        )
515
516    async def async_stream_with_config(
517        self, offset: Offset, config: ConsumerConfig
518    ) -> typing.AsyncIterator[Record]:
519        """
520        Continuously streams events from a particular offset with a SmartModule
521        WASM module in the consumer’s partitions. This returns a
522        `AsyncIterator[Record]` which is an async iterator.
523
524        Streaming is one of the two ways to consume events in Fluvio. It is a
525        continuous request for new records arriving in a partition, beginning
526        at a particular offset. You specify the starting point of the stream
527        using an Offset and periodically receive events, either individually or
528        in batches.
529
530        Args:
531            offset: Offset
532            wasm_module_path: str - The absolute path to the WASM file
533
534        Example:
535            import os
536
537            wmp = os.path.abspath("somefilter.wasm")
538            config = ConsumerConfig()
539            config.smartmodule(path=wmp)
540            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
541                # do something with i
542
543        Returns:
544            `AsyncIterator[Record]`
545
546        """
547        return self._async_generator(
548            await self._inner.async_stream_with_config(offset, config._inner)
549        )
550
551    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
552        item = stream.next()
553        while item is not None:
554            yield Record(item)
555            item = stream.next()
556
557    async def _async_generator(
558        self, astream: _AsyncPartitionConsumerStream
559    ) -> typing.AsyncIterator[Record]:
560        item = await astream.async_next()
561        while item is not None:
562            yield Record(item)
563            item = await astream.async_next()

An interface for consuming events from multiple partitions

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

MultiplePartitionConsumer(inner: MultiplePartitionConsumer)
450    def __init__(self, inner: _MultiplePartitionConsumer):
451        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
453    def stream(self, offset: Offset) -> typing.Iterator[Record]:
454        """
455        Continuously streams events from a particular offset in the consumer’s
456        partitions. This returns a `Iterator[Record]` which is an
457        iterator.
458
459        Streaming is one of the two ways to consume events in Fluvio. It is a
460        continuous request for new records arriving in a partition, beginning
461        at a particular offset. You specify the starting point of the stream
462        using an Offset and periodically receive events, either individually or
463        in batches.
464        """
465        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
467    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
468        """
469        Continuously streams events from a particular offset in the consumer’s
470        partitions. This returns a `AsyncIterator[Record]` which is an
471        async iterator.
472
473        Streaming is one of the two ways to consume events in Fluvio. It is a
474        continuous request for new records arriving in a partition, beginning
475        at a particular offset. You specify the starting point of the stream
476        using an Offset and periodically receive events, either individually or
477        in batches.
478        """
479        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns an AsyncIterator[Record], which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
481    def stream_with_config(
482        self, offset: Offset, config: ConsumerConfig
483    ) -> typing.Iterator[Record]:
484        """
485        Continuously streams events from a particular offset with a SmartModule
486        WASM module in the consumer’s partitions. This returns a
487        `Iterator[Record]` which is an iterator.
488
489        Streaming is one of the two ways to consume events in Fluvio. It is a
490        continuous request for new records arriving in a partition, beginning
491        at a particular offset. You specify the starting point of the stream
492        using an Offset and periodically receive events, either individually or
493        in batches.
494
495        Args:
496            offset: Offset
497            wasm_module_path: str - The absolute path to the WASM file
498
499        Example:
500            import os
501
502            wmp = os.path.abspath("somefilter.wasm")
503            config = ConsumerConfig()
504            config.smartmodule(path=wmp)
505            for i in consumer.stream_with_config(Offset.beginning(), config):
506                # do something with i
507
508        Returns:
509            `Iterator[Record]`
510
511        """
512        return self._generator(
513            self._inner.stream_with_config(offset._inner, config._inner)
514        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args:
    offset: Offset
    wasm_module_path: str - The absolute path to the WASM file

Example:

    import os

    wmp = os.path.abspath("somefilter.wasm")
    config = ConsumerConfig()
    config.smartmodule(path=wmp)
    for i in consumer.stream_with_config(Offset.beginning(), config):
        # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
516    async def async_stream_with_config(
517        self, offset: Offset, config: ConsumerConfig
518    ) -> typing.AsyncIterator[Record]:
519        """
520        Continuously streams events from a particular offset with a SmartModule
521        WASM module in the consumer’s partitions. This returns a
522        `AsyncIterator[Record]` which is an async iterator.
523
524        Streaming is one of the two ways to consume events in Fluvio. It is a
525        continuous request for new records arriving in a partition, beginning
526        at a particular offset. You specify the starting point of the stream
527        using an Offset and periodically receive events, either individually or
528        in batches.
529
530        Args:
531            offset: Offset
532            wasm_module_path: str - The absolute path to the WASM file
533
534        Example:
535            import os
536
537            wmp = os.path.abspath("somefilter.wasm")
538            config = ConsumerConfig()
539            config.smartmodule(path=wmp)
540            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
541                # do something with i
542
543        Returns:
544            `AsyncIterator[Record]`
545
546        """
547        return self._async_generator(
548            await self._inner.async_stream_with_config(offset, config._inner)
549        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns an AsyncIterator[Record], which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args:
    offset: Offset
    wasm_module_path: str - The absolute path to the WASM file

Example:

    import os

    wmp = os.path.abspath("somefilter.wasm")
    config = ConsumerConfig()
    config.smartmodule(path=wmp)
    async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
        # do something with i

Returns: AsyncIterator[Record]

class OffsetManagementStrategy:
NONE = <builtins.OffsetManagementStrategy object>
MANUAL = <builtins.OffsetManagementStrategy object>
AUTO = <builtins.OffsetManagementStrategy object>
class ConsumerOffset:
664class ConsumerOffset:
665    """Consumer offset"""
666
667    _inner: _ConsumerOffset
668
669    def __init__(self, inner: _ConsumerOffset):
670        self._inner = inner
671
672    def consumer_id(self) -> str:
673        return self._inner.consumer_id()
674
675    def topic(self) -> str:
676        return self._inner.topic()
677
678    def partition(self) -> int:
679        return self._inner.partition()
680
681    def offset(self) -> int:
682        return self._inner.offset()
683
684    def modified_time(self) -> int:
685        return self._inner.modified_time()

Consumer offset

ConsumerOffset(inner: ConsumerOffset)
669    def __init__(self, inner: _ConsumerOffset):
670        self._inner = inner
def consumer_id(self) -> str:
672    def consumer_id(self) -> str:
673        return self._inner.consumer_id()
def topic(self) -> str:
675    def topic(self) -> str:
676        return self._inner.topic()
def partition(self) -> int:
678    def partition(self) -> int:
679        return self._inner.partition()
def offset(self) -> int:
681    def offset(self) -> int:
682        return self._inner.offset()
def modified_time(self) -> int:
684    def modified_time(self) -> int:
685        return self._inner.modified_time()
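
These accessors pair with the manual offset management flow shown in the module example. A sketch, assuming a consumer_offsets() accessor on the Fluvio client (an assumption here) for reading committed offsets:

from fluvio import Fluvio

fluvio = Fluvio.connect()

# consumer_offsets() is assumed to return the committed ConsumerOffset entries
for co in fluvio.consumer_offsets():
    print(co.consumer_id(), co.topic(), co.partition(), co.offset())
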
class CommonCreateRequest:
29class CommonCreateRequest:
30    _inner: _CommonCreateRequest
31
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
34
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
CommonCreateRequest(inner: CommonCreateRequest)
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
@classmethod
def new(cls, name: str, dry_run: bool, timeout: int):
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
class SmartModuleSpec:
84class SmartModuleSpec:
85    _inner: _SmartModuleSpec
86
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
89
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
SmartModuleSpec(inner: SmartModuleSpec)
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
@classmethod
def new(cls, path: str):
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
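
A sketch of registering a SmartModule from a wasm binary; the create_smartmodule method name and argument order on FluvioAdmin are assumptions:

from fluvio import FluvioAdmin, SmartModuleSpec

fluvio_admin = FluvioAdmin.connect()
spec = SmartModuleSpec.new("/path/to/my_filter.wasm")

# create_smartmodule(name, dry_run, spec) is assumed here
fluvio_admin.create_smartmodule("my-filter", False, spec)
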
class PartitionMap:
18class PartitionMap:
19    _inner: _PartitionMap
20
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
23
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
PartitionMap(inner: PartitionMap)
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
@classmethod
def new(cls, partition: int, replicas: List[int]):
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
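
PartitionMap feeds the assigned-replica path of TopicSpec; a sketch (SPU ids are illustrative):

from fluvio import FluvioAdmin, TopicSpec, PartitionMap

# Partition 0 lives on SPUs 0 and 1; partition 1 on SPUs 1 and 2
assignment = [
    PartitionMap.new(0, [0, 1]),
    PartitionMap.new(1, [1, 2]),
]

topic_spec = TopicSpec.create().with_assignment(assignment).build()
FluvioAdmin.connect().create_topic("an_assigned_topic", topic_spec)
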
class MessageMetadataTopicSpec:
50class MessageMetadataTopicSpec:
51    _inner: _MessageMetadataTopicSpec
52
53    def __init__(self, inner: _MessageMetadataTopicSpec):
54        self._inner = inner
55
56    def is_update(self) -> bool:
57        return self._inner.is_update()
58
59    def is_delete(self) -> bool:
60        return self._inner.is_delete()
61
62    def metadata_topic_spec(self) -> MetadataTopicSpec:
63        return MetadataTopicSpec(self._inner.metadata_topic_spec())
class MetadataPartitionSpec:
142class MetadataPartitionSpec:
143    _inner: _MetadataPartitionSpec
144
145    def __init__(self, inner: _MetadataPartitionSpec):
146        self._inner = inner
147
148    def name(self) -> str:
149        return self._inner.name()
class MetadataTopicSpec:
40class MetadataTopicSpec:
41    _inner: _MetadataTopicSpec
42
43    def __init__(self, inner: _MetadataTopicSpec):
44        self._inner = inner
45
46    def name(self) -> str:
47        return self._inner.name()
class MetaUpdateTopicSpec:
66class MetaUpdateTopicSpec:
67    _inner: _MetaUpdateTopicSpec
68
69    def __init__(self, inner: _MetaUpdateTopicSpec):
70        self._inner = inner
71
72    def all(self) -> typing.List[MetadataTopicSpec]:
73        inners = self._inner.all()
74        return [MetadataTopicSpec(i) for i in inners]
75
76    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
77        inners = self._inner.changes()
78        return [MessageMetadataTopicSpec(i) for i in inners]
79
80    def epoch(self) -> int:
81        return self._inner.epoch()
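The metadata wrappers above (`MessageMetadataTopicSpec`, `MetadataTopicSpec`, `MetaUpdateTopicSpec`) typically arrive together: an update carries an epoch, a full snapshot, and the incremental changes. A sketch of unpacking one, where the `update` argument is assumed to come from a topic watch stream:

```python
def handle_topic_update(update):  # update: MetaUpdateTopicSpec
    print("epoch:", update.epoch())

    # Full snapshot of topics known at this epoch
    for topic in update.all():
        print("topic:", topic.name())

    # Incremental changes since the previous epoch
    for change in update.changes():
        name = change.metadata_topic_spec().name()
        if change.is_delete():
            print("deleted:", name)
        elif change.is_update():
            print("updated:", name)
```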
class PartitionSelectionStrategy:
645class PartitionSelectionStrategy:
646    """Strategy to select partitions"""
647
648    _inner: _PartitionSelectionStrategy
649
650    def __init__(self, inner: _PartitionSelectionStrategy):
651        self._inner = inner
652
653    @classmethod
654    def with_all(cls, topic: str):
655        """select all partitions of one topic"""
656        return cls(_PartitionSelectionStrategy.with_all(topic))
657
658    @classmethod
659    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
660        """select multiple partitions of multiple topics"""
661        return cls(_PartitionSelectionStrategy.with_multiple(topic))

Strategy to select partitions

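A sketch of building both strategy shapes; the consumer entry point that accepts a strategy is not shown in this section, so it appears commented out as an assumption:

```python
from fluvio import Fluvio, PartitionSelectionStrategy

fluvio = Fluvio.connect()

# Every partition of a single topic
all_parts = PartitionSelectionStrategy.with_all("a_topic")

# Explicit (topic, partition) pairs, possibly across topics
some_parts = PartitionSelectionStrategy.with_multiple(
    [("a_topic", 0), ("b_topic", 2)]
)

# Hypothetical consumer entry point accepting a strategy:
# stream = fluvio.multi_partition_consumer(all_parts)
```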
class SmartModuleKind(enum.Enum):
211class SmartModuleKind(Enum):
212    """
213    Explicitly sets the kind of a SmartModule. Not required for current
214    SmartModules, but needed for legacy ones.
215    """
216
217    Filter = _SmartModuleKind.Filter
218    Map = _SmartModuleKind.Map
219    ArrayMap = _SmartModuleKind.ArrayMap
220    FilterMap = _SmartModuleKind.FilterMap
221    Aggregate = _SmartModuleKind.Aggregate

Explicitly sets the kind of a SmartModule. Not required for current SmartModules, but needed for legacy ones.
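A sketch of passing a kind when attaching a legacy SmartModule to a consumer; the `smartmodule(...)` builder hook is an assumption here and may differ in name and signature across client versions:

```python
from fluvio import Fluvio, ConsumerConfigExtBuilder, SmartModuleKind

fluvio = Fluvio.connect()

builder = ConsumerConfigExtBuilder("a_topic")
# Assumed builder hook: attach a registered SmartModule by name and mark
# it as a legacy Filter module; the exact signature may differ.
# builder.smartmodule(name="my-filter", kind=SmartModuleKind.Filter)
config = builder.build()
stream = fluvio.consumer_with_config(config)
```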