fluvio

The __fluvio__ python module provides an extension for working with the Fluvio streaming platform.

This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.

Creating a topic with default settings is as simple as:

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")

Or just create a topic with custom settings:

import fluvio

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
    TopicSpec.create()
    .with_retention_time("1h")
    .with_segment_size("10M")
    .build()
)
fluvio_admin.create_topic("a_topic", topic_spec)

Producing data to a topic in a Fluvio cluster is as simple as:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

for i in range(10):
    producer.send_string("Hello %s " % i)

Consuming is also simple:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

Producing with a custom configuration:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = (
    TopicProducerConfigBuilder()
    .batch_size(32768)
    .linger(100)
    .compression(Compression.Gzip)
    .max_request_size(33554432)
    .timeout(600000)
    .isolation(Isolation.ReadCommitted)
    .delivery_semantic(DeliverySemantic.AtLeastOnce)
)
producer = fluvio.topic_producer_with_config(self.topic, config)

for i in range(10):
    producer.send_string("Hello %s " % i)

Also you can consume usign offset management:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

stream.offset_commit()
stream.offset_flush()

For more examples see the integration tests in the fluvio-python repository.

test_produce.py test_consumer.py

  1"""
  2The __fluvio__ python module provides an extension for working with the Fluvio
  3streaming platform.
  4
  5This module builds on top of the Fluvio Client Rust Crate and provides a
  6pythonic access to the API.
  7
  8
  9Creating a topic with default settings is as simple as:
 10
 11```python
 12fluvio_admin = FluvioAdmin.connect()
 13fluvio_admin.create_topic("a_topic")
 14```
 15
 16Or just create a topic with custom settings:
 17
 18```python
 19import fluvio
 20
 21fluvio_admin = FluvioAdmin.connect()
 22topic_spec = (
 23    TopicSpec.create()
 24    .with_retention_time("1h")
 25    .with_segment_size("10M")
 26    .build()
 27)
 28fluvio_admin.create_topic("a_topic", topic_spec)
 29```
 30
 31Producing data to a topic in a Fluvio cluster is as simple as:
 32
 33```python
 34import fluvio
 35
 36fluvio = Fluvio.connect()
 37
 38topic = "a_topic"
 39producer = fluvio.topic_producer(topic)
 40
 41for i in range(10):
 42    producer.send_string("Hello %s " % i)
 43```
 44
 45Consuming is also simple:
 46
 47```python
 48import fluvio
 49
 50fluvio = Fluvio.connect()
 51
 52topic = "a_topic"
 53builder = ConsumerConfigExtBuilder(topic)
 54config = builder.build()
 55stream = fluvio.consumer_with_config(config)
 56
 57num_items = 2
 58records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
 59```
 60
 61Producing with a custom configuration:
 62
 63```python
 64import fluvio
 65
 66fluvio = Fluvio.connect()
 67
 68topic = "a_topic"
 69builder = (
 70    TopicProducerConfigBuilder()
 71    .batch_size(32768)
 72    .linger(100)
 73    .compression(Compression.Gzip)
 74    .max_request_size(33554432)
 75    .timeout(600000)
 76    .isolation(Isolation.ReadCommitted)
 77    .delivery_semantic(DeliverySemantic.AtLeastOnce)
 78)
 79producer = fluvio.topic_producer_with_config(self.topic, config)
 80
 81for i in range(10):
 82    producer.send_string("Hello %s " % i)
 83```
 84
 85Also you can consume usign offset management:
 86
 87```python
 88import fluvio
 89
 90fluvio = Fluvio.connect()
 91
 92topic = "a_topic"
 93builder = ConsumerConfigExtBuilder(topic)
 94builder.offset_start(Offset.beginning())
 95builder.offset_strategy(OffsetManagementStrategy.MANUAL)
 96builder.offset_consumer("a-consumer")
 97config = builder.build()
 98stream = fluvio.consumer_with_config(config)
 99
100num_items = 2
101records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
102
103stream.offset_commit()
104stream.offset_flush()
105```
106
107For more examples see the integration tests in the fluvio-python repository.
108
109[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
110[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)
111
112"""
113
114import typing
115from enum import Enum
116
117from ._fluvio_python import (
118    Fluvio as _Fluvio,
119    FluvioConfig as _FluvioConfig,
120    Offset,
121    FluvioAdmin as _FluvioAdmin,
122    # consumer types
123    ConsumerConfig as _ConsumerConfig,
124    ConsumerConfigExt,
125    ConsumerConfigExtBuilder,
126    PartitionConsumer as _PartitionConsumer,
127    MultiplePartitionConsumer as _MultiplePartitionConsumer,
128    PartitionSelectionStrategy as _PartitionSelectionStrategy,
129    PartitionConsumerStream as _PartitionConsumerStream,
130    AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream,
131    OffsetManagementStrategy,
132    ConsumerOffset as _ConsumerOffset,
133    # producer types
134    TopicProducer as _TopicProducer,
135    TopicProducerConfig,
136    TopicProducerConfigBuilder,
137    Compression,
138    DeliverySemantic,
139    Isolation,
140    ProduceOutput as _ProduceOutput,
141    ProducerBatchRecord as _ProducerBatchRecord,
142    # admin and misc types
143    SmartModuleKind as _SmartModuleKind,
144    TopicSpec as _TopicSpec,
145    WatchTopicStream as _WatchTopicStream,
146    WatchSmartModuleStream as _WatchSmartModuleStream,
147)
148
149from ._fluvio_python import Error as FluviorError  # noqa: F401
150
151from .topic import TopicSpec, CompressionType, TopicMode
152from .record import Record, RecordMetadata
153from .specs import (
154    # support types
155    CommonCreateRequest,
156    PartitionMap,
157    # specs
158    SmartModuleSpec,
159    MessageMetadataTopicSpec,
160    MetadataPartitionSpec,
161    MetadataSmartModuleSpec,
162    MetadataTopicSpec,
163    MetaUpdateSmartModuleSpec,
164    MetaUpdateTopicSpec,
165)
166
167# this structures the module a bit and allows pydoc to generate better docs
168# with better ordering of types and functions
169# inclusion in __all__, will pull the PyO3 rust inline docs into
170# the pdoc generated documentation
171__all__ = [
172    # top level types
173    "Fluvio",
174    "FluvioConfig",
175    "FluvioAdmin",
176    # topic
177    "TopicSpec",
178    "CompressionType",
179    "TopicMode",
180    # record
181    "Record",
182    "RecordMetadata",
183    "Offset",
184    # producer
185    "TopicProducer",
186    "TopicProducerConfig",
187    "TopicProducerConfigBuilder",
188    "Compression",
189    "DeliverySemantic",
190    "Isolation",
191    "ProduceOutput",
192    # consumer
193    "ConsumerConfigExt",
194    "ConsumerConfigExtBuilder",
195    "ConsumerConfig",
196    "PartitionConsumer",
197    "MultiplePartitionConsumer",
198    "OffsetManagementStrategy",
199    "ConsumerOffset",
200    # specs
201    "CommonCreateRequest",
202    "SmartModuleSpec",
203    "TopicSpec",
204    "PartitionMap",
205    "MessageMetadataTopicSpec",
206    "MetadataPartitionSpec",
207    "MetadataTopicSpec",
208    "MetaUpdateTopicSpec",
209    # Misc
210    "PartitionSelectionStrategy",
211    "SmartModuleKind",
212]
213
214
215class ProduceOutput:
216    """Returned by of `TopicProducer.send` call allowing access to sent record metadata."""
217
218    _inner: _ProduceOutput
219
220    def __init__(self, inner: _ProduceOutput) -> None:
221        self._inner = inner
222
223    def wait(self) -> typing.Optional[RecordMetadata]:
224        """Wait for the record metadata.
225
226        This is a blocking call and may only return a `RecordMetadata` once.
227        Any subsequent call to `wait` will return a `None` value.
228        Errors will be raised as exceptions of type `FluvioError`.
229        """
230        res = self._inner.wait()
231        if res is None:
232            return None
233        return RecordMetadata(res)
234
235    async def async_wait(self) -> typing.Optional[RecordMetadata]:
236        """Asynchronously wait for the record metadata.
237
238        This may only return a `RecordMetadata` once.
239        Any subsequent call to `wait` will return a `None` value.
240        """
241        return await self._inner.async_wait()
242
243
244class SmartModuleKind(Enum):
245    """
246    Use of this is to explicitly set the kind of a smartmodule. Not required
247    but needed for legacy SmartModules.
248    """
249
250    Filter = _SmartModuleKind.Filter
251    Map = _SmartModuleKind.Map
252    ArrayMap = _SmartModuleKind.ArrayMap
253    FilterMap = _SmartModuleKind.FilterMap
254    Aggregate = _SmartModuleKind.Aggregate
255
256
257class ConsumerConfig:
258    _inner: _ConsumerConfig
259
260    def __init__(self):
261        self._inner = _ConsumerConfig()
262
263    def disable_continuous(self, val: bool = True):
264        """Disable continuous mode after fetching specified records"""
265        self._inner.disable_continuous(val)
266
267    def smartmodule(
268        self,
269        name: str = None,
270        path: str = None,
271        kind: SmartModuleKind = None,
272        params: typing.Dict[str, str] = None,
273        aggregate: typing.List[bytes] = None,
274    ):
275        """
276        This is a method for adding a smartmodule to a consumer config either
277        using a `name` of a `SmartModule` or a `path` to a wasm binary.
278
279        Args:
280
281            name: str
282            path: str
283            kind: SmartModuleKind
284            params: Dict[str, str]
285            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
286
287        Raises:
288            "Require either a path or a name for a smartmodule."
289            "Only specify one of path or name not both."
290
291        Returns:
292
293            None
294        """
295
296        if kind is not None:
297            kind = kind.value
298
299        if path is None and name is None:
300            raise Exception("Require either a path or a name for a smartmodule.")
301
302        if path is not None and name is not None:
303            raise Exception("Only specify one of path or name not both.")
304
305        params = {} if params is None else params
306        param_keys = [x for x in params.keys()]
307        param_vals = [x for x in params.values()]
308
309        self._inner.smartmodule(
310            name,
311            path,
312            kind,
313            param_keys,
314            param_vals,
315            aggregate,
316            # These arguments are for Join stuff but that's not implemented on
317            # the python side yet
318            None,
319            None,
320            None,
321            None,
322        )
323
324
325class ConsumerIterator:
326    def __init__(self, stream: _PartitionConsumerStream):
327        self.stream = stream
328
329    def __iter__(self):
330        return self
331
332    def __next__(self):
333        item = self.stream.next()
334        if item is None:
335            raise StopIteration
336        return Record(item)
337
338    def offset_commit(self):
339        self.stream.offset_commit()
340
341    def offset_flush(self):
342        self.stream.offset_flush()
343
344
345class PartitionConsumer:
346    """
347    An interface for consuming events from a particular partition
348
349    There are two ways to consume events: by "fetching" events and by
350    "streaming" events. Fetching involves specifying a range of events that you
351    want to consume via their Offset. A fetch is a sort of one-time batch
352    operation: you’ll receive all of the events in your range all at once. When
353    you consume events via Streaming, you specify a starting Offset and receive
354    an object that will continuously yield new events as they arrive.
355    """
356
357    _inner: _PartitionConsumer
358
359    def __init__(self, inner: _PartitionConsumer):
360        self._inner = inner
361
362    def stream(self, offset: Offset) -> typing.Iterator[Record]:
363        """
364        Continuously streams events from a particular offset in the consumer’s
365        partition. This returns a `Iterator[Record]` which is an
366        iterator.
367
368        Streaming is one of the two ways to consume events in Fluvio. It is a
369        continuous request for new records arriving in a partition, beginning
370        at a particular offset. You specify the starting point of the stream
371        using an Offset and periodically receive events, either individually or
372        in batches.
373        """
374        return self._generator(self._inner.stream(offset))
375
376    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
377        """
378        Continuously streams events from a particular offset in the consumer’s
379        partition. This returns a `AsyncIterator[Record]` which is an
380        iterator.
381
382        Streaming is one of the two ways to consume events in Fluvio. It is a
383        continuous request for new records arriving in a partition, beginning
384        at a particular offset. You specify the starting point of the stream
385        using an Offset and periodically receive events, either individually or
386        in batches.
387        """
388        return self._async_generator(await self._inner.async_stream(offset))
389
390    def stream_with_config(
391        self, offset: Offset, config: ConsumerConfig
392    ) -> typing.Iterator[Record]:
393        """
394        Continuously streams events from a particular offset with a SmartModule
395        WASM module in the consumer’s partition. This returns a
396        `Iterator[Record]` which is an iterator.
397
398        Streaming is one of the two ways to consume events in Fluvio. It is a
399        continuous request for new records arriving in a partition, beginning
400        at a particular offset. You specify the starting point of the stream
401        using an Offset and periodically receive events, either individually or
402        in batches.
403
404        Args:
405            offset: Offset
406            wasm_module_path: str - The absolute path to the WASM file
407
408        Example:
409            import os
410
411            wmp = os.path.abspath("somefilter.wasm")
412            config = ConsumerConfig()
413            config.smartmodule(path=wmp)
414            for i in consumer.stream_with_config(Offset.beginning(), config):
415                # do something with i
416
417        Returns:
418            `Iterator[Record]`
419
420        """
421        return self._generator(self._inner.stream_with_config(offset, config._inner))
422
423    async def async_stream_with_config(
424        self, offset: Offset, config: ConsumerConfig
425    ) -> typing.AsyncIterator[Record]:
426        """
427        Continuously streams events from a particular offset with a SmartModule
428        WASM module in the consumer’s partition. This returns a
429        `AsyncIterator[Record]` which is an async iterator.
430
431        Streaming is one of the two ways to consume events in Fluvio. It is a
432        continuous request for new records arriving in a partition, beginning
433        at a particular offset. You specify the starting point of the stream
434        using an Offset and periodically receive events, either individually or
435        in batches.
436
437        Args:
438            offset: Offset
439            wasm_module_path: str - The absolute path to the WASM file
440
441        Example:
442            import os
443
444            wmp = os.path.abspath("somefilter.wasm")
445            config = ConsumerConfig()
446            config.smartmodule(path=wmp)
447            `AsyncIterator[Record]`
448
449        """
450        return self._async_generator(
451            await self._inner.async_stream_with_config(offset, config._inner)
452        )
453
454    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
455        item = stream.next()
456        while item is not None:
457            yield Record(item)
458            item = stream.next()
459
460    async def _async_generator(
461        self, astream: _AsyncPartitionConsumerStream
462    ) -> typing.AsyncIterator[Record]:
463        item = await astream.async_next()
464        while item is not None:
465            yield Record(item)
466            item = await astream.async_next()
467
468
469class MultiplePartitionConsumer:
470    """
471    An interface for consuming events from multiple partitions
472
473    There are two ways to consume events: by "fetching" events and by
474    "streaming" events. Fetching involves specifying a range of events that you
475    want to consume via their Offset. A fetch is a sort of one-time batch
476    operation: you’ll receive all of the events in your range all at once. When
477    you consume events via Streaming, you specify a starting Offset and receive
478    an object that will continuously yield new events as they arrive.
479    """
480
481    _inner: _MultiplePartitionConsumer
482
483    def __init__(self, inner: _MultiplePartitionConsumer):
484        self._inner = inner
485
486    def stream(self, offset: Offset) -> typing.Iterator[Record]:
487        """
488        Continuously streams events from a particular offset in the consumer’s
489        partitions. This returns a `Iterator[Record]` which is an
490        iterator.
491
492        Streaming is one of the two ways to consume events in Fluvio. It is a
493        continuous request for new records arriving in a partition, beginning
494        at a particular offset. You specify the starting point of the stream
495        using an Offset and periodically receive events, either individually or
496        in batches.
497        """
498        return self._generator(self._inner.stream(offset))
499
500    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
501        """
502        Continuously streams events from a particular offset in the consumer’s
503        partitions. This returns a `AsyncIterator[Record]` which is an
504        async iterator.
505
506        Streaming is one of the two ways to consume events in Fluvio. It is a
507        continuous request for new records arriving in a partition, beginning
508        at a particular offset. You specify the starting point of the stream
509        using an Offset and periodically receive events, either individually or
510        in batches.
511        """
512        return self._async_generator(await self._inner.async_stream(offset))
513
514    def stream_with_config(
515        self, offset: Offset, config: ConsumerConfig
516    ) -> typing.Iterator[Record]:
517        """
518        Continuously streams events from a particular offset with a SmartModule
519        WASM module in the consumer’s partitions. This returns a
520        `Iterator[Record]` which is an iterator.
521
522        Streaming is one of the two ways to consume events in Fluvio. It is a
523        continuous request for new records arriving in a partition, beginning
524        at a particular offset. You specify the starting point of the stream
525        using an Offset and periodically receive events, either individually or
526        in batches.
527
528        Args:
529            offset: Offset
530            wasm_module_path: str - The absolute path to the WASM file
531
532        Example:
533            import os
534
535            wmp = os.path.abspath("somefilter.wasm")
536            config = ConsumerConfig()
537            config.smartmodule(path=wmp)
538            for i in consumer.stream_with_config(Offset.beginning(), config):
539                # do something with i
540
541        Returns:
542            `Iterator[Record]`
543
544        """
545        return self._generator(
546            self._inner.stream_with_config(offset._inner, config._inner)
547        )
548
549    async def async_stream_with_config(
550        self, offset: Offset, config: ConsumerConfig
551    ) -> typing.AsyncIterator[Record]:
552        """
553        Continuously streams events from a particular offset with a SmartModule
554        WASM module in the consumer’s partitions. This returns a
555        `AsyncIterator[Record]` which is an async iterator.
556
557        Streaming is one of the two ways to consume events in Fluvio. It is a
558        continuous request for new records arriving in a partition, beginning
559        at a particular offset. You specify the starting point of the stream
560        using an Offset and periodically receive events, either individually or
561        in batches.
562
563        Args:
564            offset: Offset
565            wasm_module_path: str - The absolute path to the WASM file
566
567        Example:
568            import os
569
570            wmp = os.path.abspath("somefilter.wasm")
571            config = ConsumerConfig()
572            config.smartmodule(path=wmp)
573            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
574                # do something with i
575
576        Returns:
577            `AsyncIterator[Record]`
578
579        """
580        return self._async_generator(
581            await self._inner.async_stream_with_config(offset, config._inner)
582        )
583
584    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
585        item = stream.next()
586        while item is not None:
587            yield Record(item)
588            item = stream.next()
589
590    async def _async_generator(
591        self, astream: _AsyncPartitionConsumerStream
592    ) -> typing.AsyncIterator[Record]:
593        item = await astream.async_next()
594        while item is not None:
595            yield Record(item)
596            item = await astream.async_next()
597
598
599class TopicProducer:
600    """An interface for producing events to a particular topic.
601
602    A `TopicProducer` allows you to send events to the specific topic it was
603    initialized for. Once you have a `TopicProducer`, you can send events to
604    the topic, choosing which partition each event should be delivered to.
605    """
606
607    _inner: _TopicProducer
608
609    def __init__(self, inner: _TopicProducer):
610        self._inner = inner
611
612    def send_string(self, buf: str) -> ProduceOutput:
613        """Sends a string to this producer’s topic"""
614        return self.send([], buf.encode("utf-8"))
615
616    async def async_send_string(self, buf: str) -> ProduceOutput:
617        """Sends a string to this producer’s topic"""
618        return await self.async_send([], buf.encode("utf-8"))
619
620    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
621        """
622        Sends a key/value record to this producer's Topic.
623
624        The partition that the record will be sent to is derived from the Key.
625        """
626        return ProduceOutput(self._inner.send(key, value))
627
628    async def async_send(
629        self, key: typing.List[int], value: typing.List[int]
630    ) -> ProduceOutput:
631        """
632        Async sends a key/value record to this producer's Topic.
633
634        The partition that the record will be sent to is derived from the Key.
635        """
636        produce_output = await self._inner.async_send(key, value)
637        return ProduceOutput(produce_output)
638
639    def flush(self) -> None:
640        """
641        Send all the queued records in the producer batches.
642        """
643        return self._inner.flush()
644
645    async def async_flush(self) -> None:
646        """
647        Async send all the queued records in the producer batches.
648        """
649        await self._inner.async_flush()
650
651    def send_all(
652        self, records: typing.List[typing.Tuple[bytes, bytes]]
653    ) -> typing.List[ProduceOutput]:
654        """
655        Sends a list of key/value records as a batch to this producer's Topic.
656        :param records: The list of records to send
657        """
658        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
659        return [
660            ProduceOutput(output_inner)
661            for output_inner in self._inner.send_all(records_inner)
662        ]
663
664    async def async_send_all(
665        self, records: typing.List[typing.Tuple[bytes, bytes]]
666    ) -> typing.List[ProduceOutput]:
667        """
668        Async sends a list of key/value records as a batch to this producer's Topic.
669        :param records: The list of records to send
670        """
671        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
672        return [
673            ProduceOutput(output_inner)
674            for output_inner in await self._inner.async_send_all(records_inner)
675        ]
676
677
678class PartitionSelectionStrategy:
679    """Stragegy to select partitions"""
680
681    _inner: _PartitionSelectionStrategy
682
683    def __init__(self, inner: _FluvioConfig):
684        self._inner = inner
685
686    @classmethod
687    def with_all(cls, topic: str):
688        """select all partitions of one topic"""
689        return cls(_PartitionSelectionStrategy.with_all(topic))
690
691    @classmethod
692    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
693        """select multiple partitions of multiple topics"""
694        return cls(_PartitionSelectionStrategy.with_multiple(topic))
695
696
697class ConsumerOffset:
698    """Consumer offset"""
699
700    _inner: _ConsumerOffset
701
702    def __init__(self, inner: _ConsumerOffset):
703        self._inner = inner
704
705    def consumer_id(self) -> str:
706        return self._inner.consumer_id()
707
708    def topic(self) -> str:
709        return self._inner.topic()
710
711    def partition(self) -> int:
712        return self._inner.partition()
713
714    def offset(self) -> int:
715        return self._inner.offset()
716
717    def modified_time(self) -> int:
718        return self._inner.modified_time()
719
720
721class FluvioConfig:
722    """Configuration for Fluvio client"""
723
724    _inner: _FluvioConfig
725
726    def __init__(self, inner: _FluvioConfig):
727        self._inner = inner
728
729    @classmethod
730    def load(cls):
731        """get current cluster config from default profile"""
732        return cls(_FluvioConfig.load())
733
734    @classmethod
735    def new(cls, addr: str):
736        """Create a new cluster configuration with no TLS."""
737        return cls(_FluvioConfig.new(addr))
738
739    def set_endpoint(self, endpoint: str):
740        """set endpoint"""
741        self._inner.set_endpoint(endpoint)
742
743    def set_use_spu_local_address(self, val: bool):
744        """set wheather to use spu local address"""
745        self._inner.set_use_spu_local_address(val)
746
747    def disable_tls(self):
748        """disable tls for this config"""
749        self._inner.disable_tls()
750
751    def set_anonymous_tls(self):
752        """set the config to use anonymous tls"""
753        self._inner.set_anonymous_tls()
754
755    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
756        """specify inline tls parameters"""
757        self._inner.set_inline_tls(domain, key, cert, ca_cert)
758
759    def set_tls_file_paths(
760        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
761    ):
762        """specify paths to tls files"""
763        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)
764
765    def set_client_id(self, client_id: str):
766        """set client id"""
767        self._inner.set_client_id(client_id)
768
769    def unset_client_id(self):
770        """remove the configured client id from config"""
771        self._inner.unset_client_id()
772
773
774class Fluvio:
775    """An interface for interacting with Fluvio streaming."""
776
777    _inner: _Fluvio
778
779    def __init__(self, inner: _Fluvio):
780        self._inner = inner
781
782    @classmethod
783    def connect(cls):
784        """Tries to create a new Fluvio client using the current profile from
785        `~/.fluvio/config`
786        """
787        return cls(_Fluvio.connect())
788
789    @classmethod
790    def connect_with_config(cls, config: FluvioConfig):
791        """Creates a new Fluvio client using the given configuration"""
792        return cls(_Fluvio.connect_with_config(config._inner))
793
794    def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
795        """Creates consumer with settings defined in config
796
797        This is the recommended way to create a consume records.
798        """
799        return ConsumerIterator(self._inner.consumer_with_config(config))
800
801    def topic_producer_with_config(self, topic: str, config: TopicProducerConfig):
802        """Creates a new `TopicProducer` for the given topic name with config"""
803        return TopicProducer(self._inner.topic_producer_with_config(topic, config))
804
805    def topic_producer(self, topic: str) -> TopicProducer:
806        """
807        Creates a new `TopicProducer` for the given topic name.
808
809        Currently, producers are scoped to a specific Fluvio topic. That means
810        when you send events via a producer, you must specify which partition
811        each event should go to.
812        """
813        return TopicProducer(self._inner.topic_producer(topic))
814
815    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
816        """Creates a new `PartitionConsumer` for the given topic and partition
817
818        Currently, consumers are scoped to both a specific Fluvio topic and to
819        a particular partition within that topic. That means that if you have a
820        topic with multiple partitions, then in order to receive all of the
821        events in all of the partitions, you will need to create one consumer
822        per partition.
823        """
824        return PartitionConsumer(self._inner.partition_consumer(topic, partition))
825
826    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
827        """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions
828
829        Currently, consumers are scoped to both a specific Fluvio topic and to
830        its all partitions within that topic.
831        """
832        strategy = PartitionSelectionStrategy.with_all(topic)
833        return MultiplePartitionConsumer(
834            self._inner.multi_partition_consumer(strategy._inner)
835        )
836
837    def multi_topic_partition_consumer(
838        self, selections: typing.List[typing.Tuple[str, int]]
839    ) -> MultiplePartitionConsumer:
840        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions
841
842        Currently, consumers are scoped to a list of Fluvio topic and partition tuple.
843        """
844        strategy = PartitionSelectionStrategy.with_multiple(selections)
845        return MultiplePartitionConsumer(
846            self._inner.multi_partition_consumer(strategy._inner)
847        )
848
849    def consumer_offsets(self) -> typing.List[ConsumerOffset]:
850        """Fetch the current offsets of the consumer"""
851        return self._inner.consumer_offsets()
852
853    def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
854        """Delete the consumer offset"""
855        return self._inner.delete_consumer_offset(consumer, topic, partition)
856
857    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
858        item = stream.next()
859        while item is not None:
860            yield Record(item)
861            item = stream.next()
862
863
864class FluvioAdmin:
865    _inner: _FluvioAdmin
866
867    def __init__(self, inner: _FluvioAdmin):
868        self._inner = inner
869
870    def connect():
871        return FluvioAdmin(_FluvioAdmin.connect())
872
873    def connect_with_config(config: FluvioConfig):
874        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))
875
876    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
877        partitions = 1
878        replication = 1
879        ignore_rack = True
880        dry_run = False
881        spec = (
882            spec
883            if spec is not None
884            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
885        )
886        return self._inner.create_topic(topic, dry_run, spec)
887
888    def create_topic_with_config(
889        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
890    ):
891        return self._inner.create_topic_with_config(topic, req._inner, spec)
892
893    def delete_topic(self, topic: str):
894        return self._inner.delete_topic(topic)
895
896    def all_topics(self) -> typing.List[MetadataTopicSpec]:
897        return self._inner.all_topics()
898
899    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
900        return self._inner.list_topics(filters)
901
902    def list_topics_with_params(
903        self, filters: typing.List[str], summary: bool
904    ) -> typing.List[MetadataTopicSpec]:
905        return self._inner.list_topics_with_params(filters, summary)
906
907    def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]:
908        return self._topic_spec_generator(self._inner.watch_topic())
909
910    def create_smartmodule(self, name: str, path: str, dry_run: bool):
911        spec = SmartModuleSpec.new(path)
912        return self._inner.create_smart_module(name, dry_run, spec._inner)
913
914    def delete_smartmodule(self, name: str):
915        return self._inner.delete_smart_module(name)
916
917    def list_smartmodules(
918        self, filters: typing.List[str]
919    ) -> typing.List[MetadataSmartModuleSpec]:
920        return self._inner.list_smart_modules(filters)
921
922    def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]:
923        return self._smart_module_spec_generator(self._inner.watch_smart_module())
924
925    def list_partitions(
926        self, filters: typing.List[str]
927    ) -> typing.List[MetadataPartitionSpec]:
928        return self._inner.list_partitions(filters)
929
930    def _topic_spec_generator(
931        self, stream: _WatchTopicStream
932    ) -> typing.Iterator[MetaUpdateTopicSpec]:
933        item = stream.next().inner()
934        while item is not None:
935            yield MetaUpdateTopicSpec(item)
936            item = stream.next().inner()
937
938    def _smart_module_spec_generator(
939        self, stream: _WatchSmartModuleStream
940    ) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
941        item = stream.next().inner()
942        while item is not None:
943            yield MetaUpdateSmartModuleSpec(item)
944            item = stream.next().inner()
class Fluvio:
775class Fluvio:
776    """An interface for interacting with Fluvio streaming."""
777
778    _inner: _Fluvio
779
780    def __init__(self, inner: _Fluvio):
781        self._inner = inner
782
783    @classmethod
784    def connect(cls):
785        """Tries to create a new Fluvio client using the current profile from
786        `~/.fluvio/config`
787        """
788        return cls(_Fluvio.connect())
789
790    @classmethod
791    def connect_with_config(cls, config: FluvioConfig):
792        """Creates a new Fluvio client using the given configuration"""
793        return cls(_Fluvio.connect_with_config(config._inner))
794
795    def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
796        """Creates consumer with settings defined in config
797
798        This is the recommended way to create a consume records.
799        """
800        return ConsumerIterator(self._inner.consumer_with_config(config))
801
802    def topic_producer_with_config(self, topic: str, config: TopicProducerConfig):
803        """Creates a new `TopicProducer` for the given topic name with config"""
804        return TopicProducer(self._inner.topic_producer_with_config(topic, config))
805
806    def topic_producer(self, topic: str) -> TopicProducer:
807        """
808        Creates a new `TopicProducer` for the given topic name.
809
810        Currently, producers are scoped to a specific Fluvio topic. That means
811        when you send events via a producer, you must specify which partition
812        each event should go to.
813        """
814        return TopicProducer(self._inner.topic_producer(topic))
815
816    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
817        """Creates a new `PartitionConsumer` for the given topic and partition
818
819        Currently, consumers are scoped to both a specific Fluvio topic and to
820        a particular partition within that topic. That means that if you have a
821        topic with multiple partitions, then in order to receive all of the
822        events in all of the partitions, you will need to create one consumer
823        per partition.
824        """
825        return PartitionConsumer(self._inner.partition_consumer(topic, partition))
826
827    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
828        """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions
829
830        Currently, consumers are scoped to both a specific Fluvio topic and to
831        its all partitions within that topic.
832        """
833        strategy = PartitionSelectionStrategy.with_all(topic)
834        return MultiplePartitionConsumer(
835            self._inner.multi_partition_consumer(strategy._inner)
836        )
837
838    def multi_topic_partition_consumer(
839        self, selections: typing.List[typing.Tuple[str, int]]
840    ) -> MultiplePartitionConsumer:
841        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions
842
843        Currently, consumers are scoped to a list of Fluvio topic and partition tuple.
844        """
845        strategy = PartitionSelectionStrategy.with_multiple(selections)
846        return MultiplePartitionConsumer(
847            self._inner.multi_partition_consumer(strategy._inner)
848        )
849
850    def consumer_offsets(self) -> typing.List[ConsumerOffset]:
851        """Fetch the current offsets of the consumer"""
852        return self._inner.consumer_offsets()
853
854    def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
855        """Delete the consumer offset"""
856        return self._inner.delete_consumer_offset(consumer, topic, partition)
857
858    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
859        item = stream.next()
860        while item is not None:
861            yield Record(item)
862            item = stream.next()

An interface for interacting with Fluvio streaming.

Fluvio(inner: Fluvio)
780    def __init__(self, inner: _Fluvio):
781        self._inner = inner
@classmethod
def connect(cls):
783    @classmethod
784    def connect(cls):
785        """Tries to create a new Fluvio client using the current profile from
786        `~/.fluvio/config`
787        """
788        return cls(_Fluvio.connect())

Tries to create a new Fluvio client using the current profile from ~/.fluvio/config

@classmethod
def connect_with_config(cls, config: FluvioConfig):
790    @classmethod
791    def connect_with_config(cls, config: FluvioConfig):
792        """Creates a new Fluvio client using the given configuration"""
793        return cls(_Fluvio.connect_with_config(config._inner))

Creates a new Fluvio client using the given configuration

def consumer_with_config(self, config: ConsumerConfigExt) -> fluvio.ConsumerIterator:
795    def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
796        """Creates consumer with settings defined in config
797
798        This is the recommended way to create a consume records.
799        """
800        return ConsumerIterator(self._inner.consumer_with_config(config))

Creates consumer with settings defined in config

This is the recommended way to create a consume records.

def topic_producer_with_config(self, topic: str, config: fluvio.__init__.TopicProducerConfig):
802    def topic_producer_with_config(self, topic: str, config: TopicProducerConfig):
803        """Creates a new `TopicProducer` for the given topic name with config"""
804        return TopicProducer(self._inner.topic_producer_with_config(topic, config))

Creates a new TopicProducer for the given topic name with config

def topic_producer(self, topic: str) -> TopicProducer:
806    def topic_producer(self, topic: str) -> TopicProducer:
807        """
808        Creates a new `TopicProducer` for the given topic name.
809
810        Currently, producers are scoped to a specific Fluvio topic. That means
811        when you send events via a producer, you must specify which partition
812        each event should go to.
813        """
814        return TopicProducer(self._inner.topic_producer(topic))

Creates a new TopicProducer for the given topic name.

Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.

def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
816    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
817        """Creates a new `PartitionConsumer` for the given topic and partition
818
819        Currently, consumers are scoped to both a specific Fluvio topic and to
820        a particular partition within that topic. That means that if you have a
821        topic with multiple partitions, then in order to receive all of the
822        events in all of the partitions, you will need to create one consumer
823        per partition.
824        """
825        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

Creates a new PartitionConsumer for the given topic and partition

Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.

def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
827    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
828        """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions
829
830        Currently, consumers are scoped to both a specific Fluvio topic and to
831        its all partitions within that topic.
832        """
833        strategy = PartitionSelectionStrategy.with_all(topic)
834        return MultiplePartitionConsumer(
835            self._inner.multi_partition_consumer(strategy._inner)
836        )

Creates a new MultiplePartitionConsumer for the given topic and its all partitions

Currently, consumers are scoped to both a specific Fluvio topic and to its all partitions within that topic.

def multi_topic_partition_consumer( self, selections: List[Tuple[str, int]]) -> MultiplePartitionConsumer:
838    def multi_topic_partition_consumer(
839        self, selections: typing.List[typing.Tuple[str, int]]
840    ) -> MultiplePartitionConsumer:
841        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions
842
843        Currently, consumers are scoped to a list of Fluvio topic and partition tuple.
844        """
845        strategy = PartitionSelectionStrategy.with_multiple(selections)
846        return MultiplePartitionConsumer(
847            self._inner.multi_partition_consumer(strategy._inner)
848        )

Creates a new MultiplePartitionConsumer for the given topics and partitions

Currently, consumers are scoped to a list of Fluvio topic and partition tuple.

def consumer_offsets(self) -> List[ConsumerOffset]:
850    def consumer_offsets(self) -> typing.List[ConsumerOffset]:
851        """Fetch the current offsets of the consumer"""
852        return self._inner.consumer_offsets()

Fetch the current offsets of the consumer

def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
854    def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
855        """Delete the consumer offset"""
856        return self._inner.delete_consumer_offset(consumer, topic, partition)

Delete the consumer offset

class FluvioConfig:
722class FluvioConfig:
723    """Configuration for Fluvio client"""
724
725    _inner: _FluvioConfig
726
727    def __init__(self, inner: _FluvioConfig):
728        self._inner = inner
729
730    @classmethod
731    def load(cls):
732        """get current cluster config from default profile"""
733        return cls(_FluvioConfig.load())
734
735    @classmethod
736    def new(cls, addr: str):
737        """Create a new cluster configuration with no TLS."""
738        return cls(_FluvioConfig.new(addr))
739
740    def set_endpoint(self, endpoint: str):
741        """set endpoint"""
742        self._inner.set_endpoint(endpoint)
743
744    def set_use_spu_local_address(self, val: bool):
745        """set wheather to use spu local address"""
746        self._inner.set_use_spu_local_address(val)
747
748    def disable_tls(self):
749        """disable tls for this config"""
750        self._inner.disable_tls()
751
752    def set_anonymous_tls(self):
753        """set the config to use anonymous tls"""
754        self._inner.set_anonymous_tls()
755
756    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
757        """specify inline tls parameters"""
758        self._inner.set_inline_tls(domain, key, cert, ca_cert)
759
760    def set_tls_file_paths(
761        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
762    ):
763        """specify paths to tls files"""
764        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)
765
766    def set_client_id(self, client_id: str):
767        """set client id"""
768        self._inner.set_client_id(client_id)
769
770    def unset_client_id(self):
771        """remove the configured client id from config"""
772        self._inner.unset_client_id()

Configuration for Fluvio client

FluvioConfig(inner: FluvioConfig)
727    def __init__(self, inner: _FluvioConfig):
728        self._inner = inner
@classmethod
def load(cls):
730    @classmethod
731    def load(cls):
732        """get current cluster config from default profile"""
733        return cls(_FluvioConfig.load())

get current cluster config from default profile

@classmethod
def new(cls, addr: str):
735    @classmethod
736    def new(cls, addr: str):
737        """Create a new cluster configuration with no TLS."""
738        return cls(_FluvioConfig.new(addr))

Create a new cluster configuration with no TLS.

def set_endpoint(self, endpoint: str):
740    def set_endpoint(self, endpoint: str):
741        """set endpoint"""
742        self._inner.set_endpoint(endpoint)

set endpoint

def set_use_spu_local_address(self, val: bool):
744    def set_use_spu_local_address(self, val: bool):
745        """set wheather to use spu local address"""
746        self._inner.set_use_spu_local_address(val)

set wheather to use spu local address

def disable_tls(self):
748    def disable_tls(self):
749        """disable tls for this config"""
750        self._inner.disable_tls()

disable tls for this config

def set_anonymous_tls(self):
752    def set_anonymous_tls(self):
753        """set the config to use anonymous tls"""
754        self._inner.set_anonymous_tls()

set the config to use anonymous tls

def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
756    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
757        """specify inline tls parameters"""
758        self._inner.set_inline_tls(domain, key, cert, ca_cert)

specify inline tls parameters

def set_tls_file_paths(self, domain: str, key_path: str, cert_path: str, ca_cert_path: str):
760    def set_tls_file_paths(
761        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
762    ):
763        """specify paths to tls files"""
764        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)

specify paths to tls files

def set_client_id(self, client_id: str):
766    def set_client_id(self, client_id: str):
767        """set client id"""
768        self._inner.set_client_id(client_id)

set client id

def unset_client_id(self):
770    def unset_client_id(self):
771        """remove the configured client id from config"""
772        self._inner.unset_client_id()

remove the configured client id from config

class FluvioAdmin:
865class FluvioAdmin:
866    _inner: _FluvioAdmin
867
868    def __init__(self, inner: _FluvioAdmin):
869        self._inner = inner
870
871    def connect():
872        return FluvioAdmin(_FluvioAdmin.connect())
873
874    def connect_with_config(config: FluvioConfig):
875        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))
876
877    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
878        partitions = 1
879        replication = 1
880        ignore_rack = True
881        dry_run = False
882        spec = (
883            spec
884            if spec is not None
885            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
886        )
887        return self._inner.create_topic(topic, dry_run, spec)
888
889    def create_topic_with_config(
890        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
891    ):
892        return self._inner.create_topic_with_config(topic, req._inner, spec)
893
894    def delete_topic(self, topic: str):
895        return self._inner.delete_topic(topic)
896
897    def all_topics(self) -> typing.List[MetadataTopicSpec]:
898        return self._inner.all_topics()
899
900    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
901        return self._inner.list_topics(filters)
902
903    def list_topics_with_params(
904        self, filters: typing.List[str], summary: bool
905    ) -> typing.List[MetadataTopicSpec]:
906        return self._inner.list_topics_with_params(filters, summary)
907
908    def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]:
909        return self._topic_spec_generator(self._inner.watch_topic())
910
911    def create_smartmodule(self, name: str, path: str, dry_run: bool):
912        spec = SmartModuleSpec.new(path)
913        return self._inner.create_smart_module(name, dry_run, spec._inner)
914
915    def delete_smartmodule(self, name: str):
916        return self._inner.delete_smart_module(name)
917
918    def list_smartmodules(
919        self, filters: typing.List[str]
920    ) -> typing.List[MetadataSmartModuleSpec]:
921        return self._inner.list_smart_modules(filters)
922
923    def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]:
924        return self._smart_module_spec_generator(self._inner.watch_smart_module())
925
926    def list_partitions(
927        self, filters: typing.List[str]
928    ) -> typing.List[MetadataPartitionSpec]:
929        return self._inner.list_partitions(filters)
930
931    def _topic_spec_generator(
932        self, stream: _WatchTopicStream
933    ) -> typing.Iterator[MetaUpdateTopicSpec]:
934        item = stream.next().inner()
935        while item is not None:
936            yield MetaUpdateTopicSpec(item)
937            item = stream.next().inner()
938
939    def _smart_module_spec_generator(
940        self, stream: _WatchSmartModuleStream
941    ) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
942        item = stream.next().inner()
943        while item is not None:
944            yield MetaUpdateSmartModuleSpec(item)
945            item = stream.next().inner()
FluvioAdmin(inner: FluvioAdmin)
868    def __init__(self, inner: _FluvioAdmin):
869        self._inner = inner
def connect():
871    def connect():
872        return FluvioAdmin(_FluvioAdmin.connect())
def connect_with_config(config: FluvioConfig):
874    def connect_with_config(config: FluvioConfig):
875        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))
def create_topic(self, topic: str, spec: Optional[TopicSpec] = None):
877    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
878        partitions = 1
879        replication = 1
880        ignore_rack = True
881        dry_run = False
882        spec = (
883            spec
884            if spec is not None
885            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
886        )
887        return self._inner.create_topic(topic, dry_run, spec)
def create_topic_with_config( self, topic: str, req: CommonCreateRequest, spec: TopicSpec):
889    def create_topic_with_config(
890        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
891    ):
892        return self._inner.create_topic_with_config(topic, req._inner, spec)
def delete_topic(self, topic: str):
894    def delete_topic(self, topic: str):
895        return self._inner.delete_topic(topic)
def all_topics(self) -> List[MetadataTopicSpec]:
897    def all_topics(self) -> typing.List[MetadataTopicSpec]:
898        return self._inner.all_topics()
def list_topics(self, filters: List[str]) -> List[MetadataTopicSpec]:
900    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
901        return self._inner.list_topics(filters)
def list_topics_with_params( self, filters: List[str], summary: bool) -> List[MetadataTopicSpec]:
903    def list_topics_with_params(
904        self, filters: typing.List[str], summary: bool
905    ) -> typing.List[MetadataTopicSpec]:
906        return self._inner.list_topics_with_params(filters, summary)
def watch_topic(self) -> Iterator[MetadataTopicSpec]:
908    def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]:
909        return self._topic_spec_generator(self._inner.watch_topic())
def create_smartmodule(self, name: str, path: str, dry_run: bool):
911    def create_smartmodule(self, name: str, path: str, dry_run: bool):
912        spec = SmartModuleSpec.new(path)
913        return self._inner.create_smart_module(name, dry_run, spec._inner)
def delete_smartmodule(self, name: str):
915    def delete_smartmodule(self, name: str):
916        return self._inner.delete_smart_module(name)
def list_smartmodules(self, filters: List[str]) -> List[fluvio.specs.MetadataSmartModuleSpec]:
918    def list_smartmodules(
919        self, filters: typing.List[str]
920    ) -> typing.List[MetadataSmartModuleSpec]:
921        return self._inner.list_smart_modules(filters)
def watch_smartmodule(self) -> Iterator[fluvio.specs.MetadataSmartModuleSpec]:
923    def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]:
924        return self._smart_module_spec_generator(self._inner.watch_smart_module())
def list_partitions(self, filters: List[str]) -> List[MetadataPartitionSpec]:
926    def list_partitions(
927        self, filters: typing.List[str]
928    ) -> typing.List[MetadataPartitionSpec]:
929        return self._inner.list_partitions(filters)
@dataclass(frozen=True)
class TopicSpec:
 26@dataclass(frozen=True)
 27class TopicSpec:
 28    mode: TopicMode = TopicMode.COMPUTED
 29    partitions: int = 1
 30    replications: int = 1
 31    ignore_rack: bool = True
 32    replica_assignment: Optional[List[PartitionMap]] = None
 33    retention_time: Optional[int] = None
 34    segment_size: Optional[int] = None
 35    compression_type: Optional[CompressionType] = None
 36    max_partition_size: Optional[int] = None
 37    system: bool = False
 38
 39    @classmethod
 40    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
 41        return cls(_TopicSpec.new_computed(partitions, replication, ignore))
 42
 43    @classmethod
 44    def create(cls) -> "TopicSpec":
 45        """Alternative constructor method"""
 46        return cls()
 47
 48    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
 49        """Set the assigned replica configuration"""
 50        return replace(
 51            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
 52        )
 53
 54    def as_mirror_topic(self) -> "TopicSpec":
 55        """Set as a mirror topic"""
 56        return replace(self, mode=TopicMode.MIRROR)
 57
 58    def with_partitions(self, partitions: int) -> "TopicSpec":
 59        """Set the specified partitions"""
 60        if partitions < 0:
 61            raise ValueError("Partitions must be a positive integer")
 62        return replace(self, partitions=partitions)
 63
 64    def with_replications(self, replications: int) -> "TopicSpec":
 65        """Set the specified replication factor"""
 66        if replications < 0:
 67            raise ValueError("Replication factor must be a positive integer")
 68        return replace(self, replications=replications)
 69
 70    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
 71        """Set the rack ignore setting"""
 72        return replace(self, ignore_rack=ignore)
 73
 74    def with_compression(self, compression: CompressionType) -> "TopicSpec":
 75        """Set the specified compression type"""
 76        return replace(self, compression_type=compression)
 77
 78    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
 79        """Set the specified retention time"""
 80
 81        if isinstance(retention_time, int):
 82            return replace(self, retention_time=retention_time)
 83
 84        parsed_time = parse_timespan(retention_time)
 85        return replace(self, retention_time=parsed_time)
 86
 87    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
 88        """Set the specified segment size"""
 89        if isinstance(size, int):
 90            return replace(self, segment_size=size)
 91
 92        parsed_size = parse_byte_size(size)
 93        return replace(self, segment_size=parsed_size)
 94
 95    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
 96        """Set the specified max partition size"""
 97        if isinstance(size, int):
 98            return replace(self, max_partition_size=size)
 99
100        parsed_size = parse_byte_size(size)
101        return replace(self, max_partition_size=parsed_size)
102
103    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
104        """Set the topic as an internal system topic"""
105        return replace(self, system=is_system)
106
107    def build(self) -> _TopicSpec:
108        """Build the TopicSpec based on the current configuration"""
109        # Similar implementation to the original build method
110        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
111            spec = _TopicSpec.new_assigned(self.replica_assignment)
112        elif self.mode == TopicMode.MIRROR:
113            spec = _TopicSpec.new_mirror()
114        else:
115            spec = _TopicSpec.new_computed(
116                self.partitions, self.replications, self.ignore_rack
117            )
118
119        spec.set_system(self.system)
120
121        if self.retention_time is not None:
122            spec.set_retention_time(self.retention_time)
123
124        if self.max_partition_size is not None or self.segment_size is not None:
125            spec.set_storage(self.max_partition_size, self.segment_size)
126
127        if self.compression_type is not None:
128            spec.set_compression_type(self.compression_type.value)
129
130        return spec
TopicSpec( mode: TopicMode = <TopicMode.COMPUTED: 'computed'>, partitions: int = 1, replications: int = 1, ignore_rack: bool = True, replica_assignment: Optional[List[PartitionMap]] = None, retention_time: Optional[int] = None, segment_size: Optional[int] = None, compression_type: Optional[CompressionType] = None, max_partition_size: Optional[int] = None, system: bool = False)
mode: TopicMode = <TopicMode.COMPUTED: 'computed'>
partitions: int = 1
replications: int = 1
ignore_rack: bool = True
replica_assignment: Optional[List[PartitionMap]] = None
retention_time: Optional[int] = None
segment_size: Optional[int] = None
compression_type: Optional[CompressionType] = None
max_partition_size: Optional[int] = None
system: bool = False
@classmethod
def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
39    @classmethod
40    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
41        return cls(_TopicSpec.new_computed(partitions, replication, ignore))
@classmethod
def create(cls) -> TopicSpec:
43    @classmethod
44    def create(cls) -> "TopicSpec":
45        """Alternative constructor method"""
46        return cls()

Alternative constructor method

def with_assignment( self, replica_assignment: List[PartitionMap]) -> TopicSpec:
48    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
49        """Set the assigned replica configuration"""
50        return replace(
51            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
52        )

Set the assigned replica configuration

def as_mirror_topic(self) -> TopicSpec:
54    def as_mirror_topic(self) -> "TopicSpec":
55        """Set as a mirror topic"""
56        return replace(self, mode=TopicMode.MIRROR)

Set as a mirror topic

def with_partitions(self, partitions: int) -> TopicSpec:
58    def with_partitions(self, partitions: int) -> "TopicSpec":
59        """Set the specified partitions"""
60        if partitions < 0:
61            raise ValueError("Partitions must be a positive integer")
62        return replace(self, partitions=partitions)

Set the specified partitions

def with_replications(self, replications: int) -> TopicSpec:
64    def with_replications(self, replications: int) -> "TopicSpec":
65        """Set the specified replication factor"""
66        if replications < 0:
67            raise ValueError("Replication factor must be a positive integer")
68        return replace(self, replications=replications)

Set the specified replication factor

def with_ignore_rack(self, ignore: bool = True) -> TopicSpec:
70    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
71        """Set the rack ignore setting"""
72        return replace(self, ignore_rack=ignore)

Set the rack ignore setting

def with_compression( self, compression: CompressionType) -> TopicSpec:
74    def with_compression(self, compression: CompressionType) -> "TopicSpec":
75        """Set the specified compression type"""
76        return replace(self, compression_type=compression)

Set the specified compression type

def with_retention_time(self, retention_time: Union[str, int]) -> TopicSpec:
78    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
79        """Set the specified retention time"""
80
81        if isinstance(retention_time, int):
82            return replace(self, retention_time=retention_time)
83
84        parsed_time = parse_timespan(retention_time)
85        return replace(self, retention_time=parsed_time)

Set the specified retention time

def with_segment_size(self, size: Union[str, int]) -> TopicSpec:
87    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
88        """Set the specified segment size"""
89        if isinstance(size, int):
90            return replace(self, segment_size=size)
91
92        parsed_size = parse_byte_size(size)
93        return replace(self, segment_size=parsed_size)

Set the specified segment size

def with_max_partition_size(self, size: Union[str, int]) -> TopicSpec:
 95    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
 96        """Set the specified max partition size"""
 97        if isinstance(size, int):
 98            return replace(self, max_partition_size=size)
 99
100        parsed_size = parse_byte_size(size)
101        return replace(self, max_partition_size=parsed_size)

Set the specified max partition size

def as_system_topic(self, is_system: bool = True) -> TopicSpec:
103    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
104        """Set the topic as an internal system topic"""
105        return replace(self, system=is_system)

Set the topic as an internal system topic

def build(self) -> TopicSpec:
107    def build(self) -> _TopicSpec:
108        """Build the TopicSpec based on the current configuration"""
109        # Similar implementation to the original build method
110        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
111            spec = _TopicSpec.new_assigned(self.replica_assignment)
112        elif self.mode == TopicMode.MIRROR:
113            spec = _TopicSpec.new_mirror()
114        else:
115            spec = _TopicSpec.new_computed(
116                self.partitions, self.replications, self.ignore_rack
117            )
118
119        spec.set_system(self.system)
120
121        if self.retention_time is not None:
122            spec.set_retention_time(self.retention_time)
123
124        if self.max_partition_size is not None or self.segment_size is not None:
125            spec.set_storage(self.max_partition_size, self.segment_size)
126
127        if self.compression_type is not None:
128            spec.set_compression_type(self.compression_type.value)
129
130        return spec

Build the TopicSpec based on the current configuration

class CompressionType(enum.Enum):
17class CompressionType(Enum):
18    NONE = "none"
19    GZIP = "gzip"
20    SNAPPY = "snappy"
21    LZ4 = "lz4"
22    ANY = "any"
23    ZSTD = "zstd"
NONE = <CompressionType.NONE: 'none'>
GZIP = <CompressionType.GZIP: 'gzip'>
SNAPPY = <CompressionType.SNAPPY: 'snappy'>
LZ4 = <CompressionType.LZ4: 'lz4'>
ANY = <CompressionType.ANY: 'any'>
ZSTD = <CompressionType.ZSTD: 'zstd'>
class TopicMode(enum.Enum):
11class TopicMode(Enum):
12    MIRROR = "mirror"
13    ASSIGNED = "assigned"
14    COMPUTED = "computed"
MIRROR = <TopicMode.MIRROR: 'mirror'>
ASSIGNED = <TopicMode.ASSIGNED: 'assigned'>
COMPUTED = <TopicMode.COMPUTED: 'computed'>
class Record:
 9class Record:
10    """The individual record for a given stream."""
11
12    _inner: _Record
13
14    def __init__(self, inner: _Record):
15        self._inner = inner
16
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()
20
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()
24
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()
28
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()
32
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()
36
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

The individual record for a given stream.

Record(inner: Record)
14    def __init__(self, inner: _Record):
15        self._inner = inner
def offset(self) -> int:
17    def offset(self) -> int:
18        """The offset from the initial offset for a given stream."""
19        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
21    def value(self) -> typing.List[int]:
22        """Returns the contents of this Record's value"""
23        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
25    def value_string(self) -> str:
26        """The UTF-8 decoded value for this record."""
27        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
29    def key(self) -> typing.List[int]:
30        """Returns the contents of this Record's key, if it exists"""
31        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
33    def key_string(self) -> str:
34        """The UTF-8 decoded key for this record."""
35        return self._inner.key_string()

The UTF-8 decoded key for this record.

def timestamp(self) -> int:
37    def timestamp(self) -> int:
38        """Timestamp of this record."""
39        return self._inner.timestamp()

Timestamp of this record.

class RecordMetadata:
42class RecordMetadata:
43    """Metadata of a record send to a topic."""
44
45    _inner: _RecordMetadata
46
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
49
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()
53
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Metadata of a record send to a topic.

RecordMetadata(inner: RecordMetadata)
47    def __init__(self, inner: _RecordMetadata):
48        self._inner = inner
def offset(self) -> int:
50    def offset(self) -> int:
51        """Return the offset of the sent record in the topic/partition."""
52        return self._inner.offset()

Return the offset of the sent record in the topic/partition.

def partition_id(self) -> int:
54    def partition_id(self) -> int:
55        """Return the partition index the record was sent to."""
56        return self._inner.partition_id()

Return the partition index the record was sent to.

class Offset:

Describes the location of a record stored in a Fluvio partition.

A topic is composed of one or more partitions.

def absolute(index):

Specifies an absolute offset with the given index within the partition

def beginning():

Specifies an offset starting at the beginning of the partition

def from_beginning(offset):

Specifies an offset relative to the beginning of the partition

def end():

Specifies an offset relative to the beginning of the partition

def from_end(offset):

Specifies an offset relative to the beginning of the partition

class TopicProducer:
600class TopicProducer:
601    """An interface for producing events to a particular topic.
602
603    A `TopicProducer` allows you to send events to the specific topic it was
604    initialized for. Once you have a `TopicProducer`, you can send events to
605    the topic, choosing which partition each event should be delivered to.
606    """
607
608    _inner: _TopicProducer
609
610    def __init__(self, inner: _TopicProducer):
611        self._inner = inner
612
613    def send_string(self, buf: str) -> ProduceOutput:
614        """Sends a string to this producer’s topic"""
615        return self.send([], buf.encode("utf-8"))
616
617    async def async_send_string(self, buf: str) -> ProduceOutput:
618        """Sends a string to this producer’s topic"""
619        return await self.async_send([], buf.encode("utf-8"))
620
621    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
622        """
623        Sends a key/value record to this producer's Topic.
624
625        The partition that the record will be sent to is derived from the Key.
626        """
627        return ProduceOutput(self._inner.send(key, value))
628
629    async def async_send(
630        self, key: typing.List[int], value: typing.List[int]
631    ) -> ProduceOutput:
632        """
633        Async sends a key/value record to this producer's Topic.
634
635        The partition that the record will be sent to is derived from the Key.
636        """
637        produce_output = await self._inner.async_send(key, value)
638        return ProduceOutput(produce_output)
639
640    def flush(self) -> None:
641        """
642        Send all the queued records in the producer batches.
643        """
644        return self._inner.flush()
645
646    async def async_flush(self) -> None:
647        """
648        Async send all the queued records in the producer batches.
649        """
650        await self._inner.async_flush()
651
652    def send_all(
653        self, records: typing.List[typing.Tuple[bytes, bytes]]
654    ) -> typing.List[ProduceOutput]:
655        """
656        Sends a list of key/value records as a batch to this producer's Topic.
657        :param records: The list of records to send
658        """
659        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
660        return [
661            ProduceOutput(output_inner)
662            for output_inner in self._inner.send_all(records_inner)
663        ]
664
665    async def async_send_all(
666        self, records: typing.List[typing.Tuple[bytes, bytes]]
667    ) -> typing.List[ProduceOutput]:
668        """
669        Async sends a list of key/value records as a batch to this producer's Topic.
670        :param records: The list of records to send
671        """
672        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
673        return [
674            ProduceOutput(output_inner)
675            for output_inner in await self._inner.async_send_all(records_inner)
676        ]

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: TopicProducer)
610    def __init__(self, inner: _TopicProducer):
611        self._inner = inner
def send_string(self, buf: str) -> ProduceOutput:
613    def send_string(self, buf: str) -> ProduceOutput:
614        """Sends a string to this producer’s topic"""
615        return self.send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

async def async_send_string(self, buf: str) -> ProduceOutput:
617    async def async_send_string(self, buf: str) -> ProduceOutput:
618        """Sends a string to this producer’s topic"""
619        return await self.async_send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> ProduceOutput:
621    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
622        """
623        Sends a key/value record to this producer's Topic.
624
625        The partition that the record will be sent to is derived from the Key.
626        """
627        return ProduceOutput(self._inner.send(key, value))

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

async def async_send(self, key: List[int], value: List[int]) -> ProduceOutput:
629    async def async_send(
630        self, key: typing.List[int], value: typing.List[int]
631    ) -> ProduceOutput:
632        """
633        Async sends a key/value record to this producer's Topic.
634
635        The partition that the record will be sent to is derived from the Key.
636        """
637        produce_output = await self._inner.async_send(key, value)
638        return ProduceOutput(produce_output)

Async sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
640    def flush(self) -> None:
641        """
642        Send all the queued records in the producer batches.
643        """
644        return self._inner.flush()

Send all the queued records in the producer batches.

async def async_flush(self) -> None:
646    async def async_flush(self) -> None:
647        """
648        Async send all the queued records in the producer batches.
649        """
650        await self._inner.async_flush()

Async send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
652    def send_all(
653        self, records: typing.List[typing.Tuple[bytes, bytes]]
654    ) -> typing.List[ProduceOutput]:
655        """
656        Sends a list of key/value records as a batch to this producer's Topic.
657        :param records: The list of records to send
658        """
659        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
660        return [
661            ProduceOutput(output_inner)
662            for output_inner in self._inner.send_all(records_inner)
663        ]

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
async def async_send_all(self, records: List[Tuple[bytes, bytes]]) -> List[ProduceOutput]:
665    async def async_send_all(
666        self, records: typing.List[typing.Tuple[bytes, bytes]]
667    ) -> typing.List[ProduceOutput]:
668        """
669        Async sends a list of key/value records as a batch to this producer's Topic.
670        :param records: The list of records to send
671        """
672        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
673        return [
674            ProduceOutput(output_inner)
675            for output_inner in await self._inner.async_send_all(records_inner)
676        ]

Async sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
class TopicProducerConfig:
class TopicProducerConfigBuilder:
def batch_size(self, /, batch_size):

The type of the None singleton.

def max_request_size(self, /, max_request_size):

The type of the None singleton.

def batch_queue_size(self, /, batch_queue_size):

The type of the None singleton.

def linger(self, /, linger):

The type of the None singleton.

def set_specific_partitioner(self, /, partitioner):

The type of the None singleton.

def compression(self, /, compression):

The type of the None singleton.

def timeout(self, /, timeout):

The type of the None singleton.

def isolation(self, /, isolation):

The type of the None singleton.

def delivery_semantic(self, /, semantics):

The type of the None singleton.

def build(self, /):

The type of the None singleton.

class Compression:
class DeliverySemantic:
class Isolation:
ReadCommitted = Isolation.ReadCommitted
ReadUncommitted = Isolation.ReadUncommitted
class ProduceOutput:
216class ProduceOutput:
217    """Returned by of `TopicProducer.send` call allowing access to sent record metadata."""
218
219    _inner: _ProduceOutput
220
221    def __init__(self, inner: _ProduceOutput) -> None:
222        self._inner = inner
223
224    def wait(self) -> typing.Optional[RecordMetadata]:
225        """Wait for the record metadata.
226
227        This is a blocking call and may only return a `RecordMetadata` once.
228        Any subsequent call to `wait` will return a `None` value.
229        Errors will be raised as exceptions of type `FluvioError`.
230        """
231        res = self._inner.wait()
232        if res is None:
233            return None
234        return RecordMetadata(res)
235
236    async def async_wait(self) -> typing.Optional[RecordMetadata]:
237        """Asynchronously wait for the record metadata.
238
239        This may only return a `RecordMetadata` once.
240        Any subsequent call to `wait` will return a `None` value.
241        """
242        return await self._inner.async_wait()

Returned by of TopicProducer.send call allowing access to sent record metadata.

ProduceOutput(inner: ProduceOutput)
221    def __init__(self, inner: _ProduceOutput) -> None:
222        self._inner = inner
def wait(self) -> Optional[RecordMetadata]:
224    def wait(self) -> typing.Optional[RecordMetadata]:
225        """Wait for the record metadata.
226
227        This is a blocking call and may only return a `RecordMetadata` once.
228        Any subsequent call to `wait` will return a `None` value.
229        Errors will be raised as exceptions of type `FluvioError`.
230        """
231        res = self._inner.wait()
232        if res is None:
233            return None
234        return RecordMetadata(res)

Wait for the record metadata.

This is a blocking call and may only return a RecordMetadata once. Any subsequent call to wait will return a None value. Errors will be raised as exceptions of type FluvioError.

async def async_wait(self) -> Optional[RecordMetadata]:
236    async def async_wait(self) -> typing.Optional[RecordMetadata]:
237        """Asynchronously wait for the record metadata.
238
239        This may only return a `RecordMetadata` once.
240        Any subsequent call to `wait` will return a `None` value.
241        """
242        return await self._inner.async_wait()

Asynchronously wait for the record metadata.

This may only return a RecordMetadata once. Any subsequent call to wait will return a None value.

class ConsumerConfigExt:
class ConsumerConfigExtBuilder:
def disable_continuous(self, /, val=True):

the fluvio client should disconnect after fetching the specified records

def max_bytes(self, /, max_bytes):

The type of the None singleton.

def offset_start(self, /, offset):

The type of the None singleton.

def offset_consumer(self, /, id):

The type of the None singleton.

def offset_strategy(self, /, strategy=Ellipsis):

The type of the None singleton.

def partition(self, /, partition):

The type of the None singleton.

def topic(self, /, topic):

The type of the None singleton.

def build(self, /):

The type of the None singleton.

class ConsumerConfig:
258class ConsumerConfig:
259    _inner: _ConsumerConfig
260
261    def __init__(self):
262        self._inner = _ConsumerConfig()
263
264    def disable_continuous(self, val: bool = True):
265        """Disable continuous mode after fetching specified records"""
266        self._inner.disable_continuous(val)
267
268    def smartmodule(
269        self,
270        name: str = None,
271        path: str = None,
272        kind: SmartModuleKind = None,
273        params: typing.Dict[str, str] = None,
274        aggregate: typing.List[bytes] = None,
275    ):
276        """
277        This is a method for adding a smartmodule to a consumer config either
278        using a `name` of a `SmartModule` or a `path` to a wasm binary.
279
280        Args:
281
282            name: str
283            path: str
284            kind: SmartModuleKind
285            params: Dict[str, str]
286            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
287
288        Raises:
289            "Require either a path or a name for a smartmodule."
290            "Only specify one of path or name not both."
291
292        Returns:
293
294            None
295        """
296
297        if kind is not None:
298            kind = kind.value
299
300        if path is None and name is None:
301            raise Exception("Require either a path or a name for a smartmodule.")
302
303        if path is not None and name is not None:
304            raise Exception("Only specify one of path or name not both.")
305
306        params = {} if params is None else params
307        param_keys = [x for x in params.keys()]
308        param_vals = [x for x in params.values()]
309
310        self._inner.smartmodule(
311            name,
312            path,
313            kind,
314            param_keys,
315            param_vals,
316            aggregate,
317            # These arguments are for Join stuff but that's not implemented on
318            # the python side yet
319            None,
320            None,
321            None,
322            None,
323        )
def disable_continuous(self, val: bool = True):
264    def disable_continuous(self, val: bool = True):
265        """Disable continuous mode after fetching specified records"""
266        self._inner.disable_continuous(val)

Disable continuous mode after fetching specified records

def smartmodule( self, name: str = None, path: str = None, kind: SmartModuleKind = None, params: Dict[str, str] = None, aggregate: List[bytes] = None):
268    def smartmodule(
269        self,
270        name: str = None,
271        path: str = None,
272        kind: SmartModuleKind = None,
273        params: typing.Dict[str, str] = None,
274        aggregate: typing.List[bytes] = None,
275    ):
276        """
277        This is a method for adding a smartmodule to a consumer config either
278        using a `name` of a `SmartModule` or a `path` to a wasm binary.
279
280        Args:
281
282            name: str
283            path: str
284            kind: SmartModuleKind
285            params: Dict[str, str]
286            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
287
288        Raises:
289            "Require either a path or a name for a smartmodule."
290            "Only specify one of path or name not both."
291
292        Returns:
293
294            None
295        """
296
297        if kind is not None:
298            kind = kind.value
299
300        if path is None and name is None:
301            raise Exception("Require either a path or a name for a smartmodule.")
302
303        if path is not None and name is not None:
304            raise Exception("Only specify one of path or name not both.")
305
306        params = {} if params is None else params
307        param_keys = [x for x in params.keys()]
308        param_vals = [x for x in params.values()]
309
310        self._inner.smartmodule(
311            name,
312            path,
313            kind,
314            param_keys,
315            param_vals,
316            aggregate,
317            # These arguments are for Join stuff but that's not implemented on
318            # the python side yet
319            None,
320            None,
321            None,
322            None,
323        )

This is a method for adding a smartmodule to a consumer config either using a name of a SmartModule or a path to a wasm binary.

Args:

name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule

Raises: "Require either a path or a name for a smartmodule." "Only specify one of path or name not both."

Returns:

None
class PartitionConsumer:
346class PartitionConsumer:
347    """
348    An interface for consuming events from a particular partition
349
350    There are two ways to consume events: by "fetching" events and by
351    "streaming" events. Fetching involves specifying a range of events that you
352    want to consume via their Offset. A fetch is a sort of one-time batch
353    operation: you’ll receive all of the events in your range all at once. When
354    you consume events via Streaming, you specify a starting Offset and receive
355    an object that will continuously yield new events as they arrive.
356    """
357
358    _inner: _PartitionConsumer
359
360    def __init__(self, inner: _PartitionConsumer):
361        self._inner = inner
362
363    def stream(self, offset: Offset) -> typing.Iterator[Record]:
364        """
365        Continuously streams events from a particular offset in the consumer’s
366        partition. This returns a `Iterator[Record]` which is an
367        iterator.
368
369        Streaming is one of the two ways to consume events in Fluvio. It is a
370        continuous request for new records arriving in a partition, beginning
371        at a particular offset. You specify the starting point of the stream
372        using an Offset and periodically receive events, either individually or
373        in batches.
374        """
375        return self._generator(self._inner.stream(offset))
376
377    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
378        """
379        Continuously streams events from a particular offset in the consumer’s
380        partition. This returns a `AsyncIterator[Record]` which is an
381        iterator.
382
383        Streaming is one of the two ways to consume events in Fluvio. It is a
384        continuous request for new records arriving in a partition, beginning
385        at a particular offset. You specify the starting point of the stream
386        using an Offset and periodically receive events, either individually or
387        in batches.
388        """
389        return self._async_generator(await self._inner.async_stream(offset))
390
391    def stream_with_config(
392        self, offset: Offset, config: ConsumerConfig
393    ) -> typing.Iterator[Record]:
394        """
395        Continuously streams events from a particular offset with a SmartModule
396        WASM module in the consumer’s partition. This returns a
397        `Iterator[Record]` which is an iterator.
398
399        Streaming is one of the two ways to consume events in Fluvio. It is a
400        continuous request for new records arriving in a partition, beginning
401        at a particular offset. You specify the starting point of the stream
402        using an Offset and periodically receive events, either individually or
403        in batches.
404
405        Args:
406            offset: Offset
407            wasm_module_path: str - The absolute path to the WASM file
408
409        Example:
410            import os
411
412            wmp = os.path.abspath("somefilter.wasm")
413            config = ConsumerConfig()
414            config.smartmodule(path=wmp)
415            for i in consumer.stream_with_config(Offset.beginning(), config):
416                # do something with i
417
418        Returns:
419            `Iterator[Record]`
420
421        """
422        return self._generator(self._inner.stream_with_config(offset, config._inner))
423
424    async def async_stream_with_config(
425        self, offset: Offset, config: ConsumerConfig
426    ) -> typing.AsyncIterator[Record]:
427        """
428        Continuously streams events from a particular offset with a SmartModule
429        WASM module in the consumer’s partition. This returns a
430        `AsyncIterator[Record]` which is an async iterator.
431
432        Streaming is one of the two ways to consume events in Fluvio. It is a
433        continuous request for new records arriving in a partition, beginning
434        at a particular offset. You specify the starting point of the stream
435        using an Offset and periodically receive events, either individually or
436        in batches.
437
438        Args:
439            offset: Offset
440            wasm_module_path: str - The absolute path to the WASM file
441
442        Example:
443            import os
444
445            wmp = os.path.abspath("somefilter.wasm")
446            config = ConsumerConfig()
447            config.smartmodule(path=wmp)
448            `AsyncIterator[Record]`
449
450        """
451        return self._async_generator(
452            await self._inner.async_stream_with_config(offset, config._inner)
453        )
454
455    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
456        item = stream.next()
457        while item is not None:
458            yield Record(item)
459            item = stream.next()
460
461    async def _async_generator(
462        self, astream: _AsyncPartitionConsumerStream
463    ) -> typing.AsyncIterator[Record]:
464        item = await astream.async_next()
465        while item is not None:
466            yield Record(item)
467            item = await astream.async_next()

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: PartitionConsumer)
360    def __init__(self, inner: _PartitionConsumer):
361        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
363    def stream(self, offset: Offset) -> typing.Iterator[Record]:
364        """
365        Continuously streams events from a particular offset in the consumer’s
366        partition. This returns a `Iterator[Record]` which is an
367        iterator.
368
369        Streaming is one of the two ways to consume events in Fluvio. It is a
370        continuous request for new records arriving in a partition, beginning
371        at a particular offset. You specify the starting point of the stream
372        using an Offset and periodically receive events, either individually or
373        in batches.
374        """
375        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns a Iterator[Record] which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
377    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
378        """
379        Continuously streams events from a particular offset in the consumer’s
380        partition. This returns a `AsyncIterator[Record]` which is an
381        iterator.
382
383        Streaming is one of the two ways to consume events in Fluvio. It is a
384        continuous request for new records arriving in a partition, beginning
385        at a particular offset. You specify the starting point of the stream
386        using an Offset and periodically receive events, either individually or
387        in batches.
388        """
389        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partition. This returns a AsyncIterator[Record] which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
391    def stream_with_config(
392        self, offset: Offset, config: ConsumerConfig
393    ) -> typing.Iterator[Record]:
394        """
395        Continuously streams events from a particular offset with a SmartModule
396        WASM module in the consumer’s partition. This returns a
397        `Iterator[Record]` which is an iterator.
398
399        Streaming is one of the two ways to consume events in Fluvio. It is a
400        continuous request for new records arriving in a partition, beginning
401        at a particular offset. You specify the starting point of the stream
402        using an Offset and periodically receive events, either individually or
403        in batches.
404
405        Args:
406            offset: Offset
407            wasm_module_path: str - The absolute path to the WASM file
408
409        Example:
410            import os
411
412            wmp = os.path.abspath("somefilter.wasm")
413            config = ConsumerConfig()
414            config.smartmodule(path=wmp)
415            for i in consumer.stream_with_config(Offset.beginning(), config):
416                # do something with i
417
418        Returns:
419            `Iterator[Record]`
420
421        """
422        return self._generator(self._inner.stream_with_config(offset, config._inner))

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns a Iterator[Record] which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
424    async def async_stream_with_config(
425        self, offset: Offset, config: ConsumerConfig
426    ) -> typing.AsyncIterator[Record]:
427        """
428        Continuously streams events from a particular offset with a SmartModule
429        WASM module in the consumer’s partition. This returns a
430        `AsyncIterator[Record]` which is an async iterator.
431
432        Streaming is one of the two ways to consume events in Fluvio. It is a
433        continuous request for new records arriving in a partition, beginning
434        at a particular offset. You specify the starting point of the stream
435        using an Offset and periodically receive events, either individually or
436        in batches.
437
438        Args:
439            offset: Offset
440            wasm_module_path: str - The absolute path to the WASM file
441
442        Example:
443            import os
444
445            wmp = os.path.abspath("somefilter.wasm")
446            config = ConsumerConfig()
447            config.smartmodule(path=wmp)
448            `AsyncIterator[Record]`
449
450        """
451        return self._async_generator(
452            await self._inner.async_stream_with_config(offset, config._inner)
453        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns a AsyncIterator[Record] which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
`AsyncIterator[Record]`
class MultiplePartitionConsumer:
470class MultiplePartitionConsumer:
471    """
472    An interface for consuming events from multiple partitions
473
474    There are two ways to consume events: by "fetching" events and by
475    "streaming" events. Fetching involves specifying a range of events that you
476    want to consume via their Offset. A fetch is a sort of one-time batch
477    operation: you’ll receive all of the events in your range all at once. When
478    you consume events via Streaming, you specify a starting Offset and receive
479    an object that will continuously yield new events as they arrive.
480    """
481
482    _inner: _MultiplePartitionConsumer
483
484    def __init__(self, inner: _MultiplePartitionConsumer):
485        self._inner = inner
486
487    def stream(self, offset: Offset) -> typing.Iterator[Record]:
488        """
489        Continuously streams events from a particular offset in the consumer’s
490        partitions. This returns a `Iterator[Record]` which is an
491        iterator.
492
493        Streaming is one of the two ways to consume events in Fluvio. It is a
494        continuous request for new records arriving in a partition, beginning
495        at a particular offset. You specify the starting point of the stream
496        using an Offset and periodically receive events, either individually or
497        in batches.
498        """
499        return self._generator(self._inner.stream(offset))
500
501    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
502        """
503        Continuously streams events from a particular offset in the consumer’s
504        partitions. This returns a `AsyncIterator[Record]` which is an
505        async iterator.
506
507        Streaming is one of the two ways to consume events in Fluvio. It is a
508        continuous request for new records arriving in a partition, beginning
509        at a particular offset. You specify the starting point of the stream
510        using an Offset and periodically receive events, either individually or
511        in batches.
512        """
513        return self._async_generator(await self._inner.async_stream(offset))
514
515    def stream_with_config(
516        self, offset: Offset, config: ConsumerConfig
517    ) -> typing.Iterator[Record]:
518        """
519        Continuously streams events from a particular offset with a SmartModule
520        WASM module in the consumer’s partitions. This returns a
521        `Iterator[Record]` which is an iterator.
522
523        Streaming is one of the two ways to consume events in Fluvio. It is a
524        continuous request for new records arriving in a partition, beginning
525        at a particular offset. You specify the starting point of the stream
526        using an Offset and periodically receive events, either individually or
527        in batches.
528
529        Args:
530            offset: Offset
531            wasm_module_path: str - The absolute path to the WASM file
532
533        Example:
534            import os
535
536            wmp = os.path.abspath("somefilter.wasm")
537            config = ConsumerConfig()
538            config.smartmodule(path=wmp)
539            for i in consumer.stream_with_config(Offset.beginning(), config):
540                # do something with i
541
542        Returns:
543            `Iterator[Record]`
544
545        """
546        return self._generator(
547            self._inner.stream_with_config(offset._inner, config._inner)
548        )
549
550    async def async_stream_with_config(
551        self, offset: Offset, config: ConsumerConfig
552    ) -> typing.AsyncIterator[Record]:
553        """
554        Continuously streams events from a particular offset with a SmartModule
555        WASM module in the consumer’s partitions. This returns a
556        `AsyncIterator[Record]` which is an async iterator.
557
558        Streaming is one of the two ways to consume events in Fluvio. It is a
559        continuous request for new records arriving in a partition, beginning
560        at a particular offset. You specify the starting point of the stream
561        using an Offset and periodically receive events, either individually or
562        in batches.
563
564        Args:
565            offset: Offset
566            wasm_module_path: str - The absolute path to the WASM file
567
568        Example:
569            import os
570
571            wmp = os.path.abspath("somefilter.wasm")
572            config = ConsumerConfig()
573            config.smartmodule(path=wmp)
574            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
575                # do something with i
576
577        Returns:
578            `AsyncIterator[Record]`
579
580        """
581        return self._async_generator(
582            await self._inner.async_stream_with_config(offset, config._inner)
583        )
584
585    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
586        item = stream.next()
587        while item is not None:
588            yield Record(item)
589            item = stream.next()
590
591    async def _async_generator(
592        self, astream: _AsyncPartitionConsumerStream
593    ) -> typing.AsyncIterator[Record]:
594        item = await astream.async_next()
595        while item is not None:
596            yield Record(item)
597            item = await astream.async_next()

An interface for consuming events from multiple partitions

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

MultiplePartitionConsumer(inner: MultiplePartitionConsumer)
484    def __init__(self, inner: _MultiplePartitionConsumer):
485        self._inner = inner
def stream(self, offset: Offset) -> Iterator[Record]:
487    def stream(self, offset: Offset) -> typing.Iterator[Record]:
488        """
489        Continuously streams events from a particular offset in the consumer’s
490        partitions. This returns a `Iterator[Record]` which is an
491        iterator.
492
493        Streaming is one of the two ways to consume events in Fluvio. It is a
494        continuous request for new records arriving in a partition, beginning
495        at a particular offset. You specify the starting point of the stream
496        using an Offset and periodically receive events, either individually or
497        in batches.
498        """
499        return self._generator(self._inner.stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns a Iterator[Record] which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

async def async_stream(self, offset: Offset) -> AsyncIterator[Record]:
501    async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
502        """
503        Continuously streams events from a particular offset in the consumer’s
504        partitions. This returns a `AsyncIterator[Record]` which is an
505        async iterator.
506
507        Streaming is one of the two ways to consume events in Fluvio. It is a
508        continuous request for new records arriving in a partition, beginning
509        at a particular offset. You specify the starting point of the stream
510        using an Offset and periodically receive events, either individually or
511        in batches.
512        """
513        return self._async_generator(await self._inner.async_stream(offset))

Continuously streams events from a particular offset in the consumer’s partitions. This returns a AsyncIterator[Record] which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: Offset, config: ConsumerConfig) -> Iterator[Record]:
515    def stream_with_config(
516        self, offset: Offset, config: ConsumerConfig
517    ) -> typing.Iterator[Record]:
518        """
519        Continuously streams events from a particular offset with a SmartModule
520        WASM module in the consumer’s partitions. This returns a
521        `Iterator[Record]` which is an iterator.
522
523        Streaming is one of the two ways to consume events in Fluvio. It is a
524        continuous request for new records arriving in a partition, beginning
525        at a particular offset. You specify the starting point of the stream
526        using an Offset and periodically receive events, either individually or
527        in batches.
528
529        Args:
530            offset: Offset
531            wasm_module_path: str - The absolute path to the WASM file
532
533        Example:
534            import os
535
536            wmp = os.path.abspath("somefilter.wasm")
537            config = ConsumerConfig()
538            config.smartmodule(path=wmp)
539            for i in consumer.stream_with_config(Offset.beginning(), config):
540                # do something with i
541
542        Returns:
543            `Iterator[Record]`
544
545        """
546        return self._generator(
547            self._inner.stream_with_config(offset._inner, config._inner)
548        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns a Iterator[Record] which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: Iterator[Record]

async def async_stream_with_config( self, offset: Offset, config: ConsumerConfig) -> AsyncIterator[Record]:
550    async def async_stream_with_config(
551        self, offset: Offset, config: ConsumerConfig
552    ) -> typing.AsyncIterator[Record]:
553        """
554        Continuously streams events from a particular offset with a SmartModule
555        WASM module in the consumer’s partitions. This returns a
556        `AsyncIterator[Record]` which is an async iterator.
557
558        Streaming is one of the two ways to consume events in Fluvio. It is a
559        continuous request for new records arriving in a partition, beginning
560        at a particular offset. You specify the starting point of the stream
561        using an Offset and periodically receive events, either individually or
562        in batches.
563
564        Args:
565            offset: Offset
566            wasm_module_path: str - The absolute path to the WASM file
567
568        Example:
569            import os
570
571            wmp = os.path.abspath("somefilter.wasm")
572            config = ConsumerConfig()
573            config.smartmodule(path=wmp)
574            async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
575                # do something with i
576
577        Returns:
578            `AsyncIterator[Record]`
579
580        """
581        return self._async_generator(
582            await self._inner.async_stream_with_config(offset, config._inner)
583        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partitions. This returns a AsyncIterator[Record] which is an async iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: AsyncIterator[Record]

class OffsetManagementStrategy:
NONE = <builtins.OffsetManagementStrategy object>
MANUAL = <builtins.OffsetManagementStrategy object>
AUTO = <builtins.OffsetManagementStrategy object>
class ConsumerOffset:
698class ConsumerOffset:
699    """Consumer offset"""
700
701    _inner: _ConsumerOffset
702
703    def __init__(self, inner: _ConsumerOffset):
704        self._inner = inner
705
706    def consumer_id(self) -> str:
707        return self._inner.consumer_id()
708
709    def topic(self) -> str:
710        return self._inner.topic()
711
712    def partition(self) -> int:
713        return self._inner.partition()
714
715    def offset(self) -> int:
716        return self._inner.offset()
717
718    def modified_time(self) -> int:
719        return self._inner.modified_time()

Consumer offset

ConsumerOffset(inner: ConsumerOffset)
703    def __init__(self, inner: _ConsumerOffset):
704        self._inner = inner
def consumer_id(self) -> str:
706    def consumer_id(self) -> str:
707        return self._inner.consumer_id()
def topic(self) -> str:
709    def topic(self) -> str:
710        return self._inner.topic()
def partition(self) -> int:
712    def partition(self) -> int:
713        return self._inner.partition()
def offset(self) -> int:
715    def offset(self) -> int:
716        return self._inner.offset()
def modified_time(self) -> int:
718    def modified_time(self) -> int:
719        return self._inner.modified_time()
class CommonCreateRequest:
29class CommonCreateRequest:
30    _inner: _CommonCreateRequest
31
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
34
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
CommonCreateRequest(inner: CommonCreateRequest)
32    def __init__(self, inner: _CommonCreateRequest):
33        self._inner = inner
@classmethod
def new(cls, name: str, dry_run: bool, timeout: int):
35    @classmethod
36    def new(cls, name: str, dry_run: bool, timeout: int):
37        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
class SmartModuleSpec:
84class SmartModuleSpec:
85    _inner: _SmartModuleSpec
86
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
89
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
SmartModuleSpec(inner: SmartModuleSpec)
87    def __init__(self, inner: _SmartModuleSpec):
88        self._inner = inner
@classmethod
def new(cls, path: str):
90    @classmethod
91    def new(cls, path: str):
92        f = open(path, mode="rb")
93        data = f.read()
94        f.close()
95        return cls(_SmartModuleSpec.with_binary(data))
class PartitionMap:
18class PartitionMap:
19    _inner: _PartitionMap
20
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
23
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
PartitionMap(inner: PartitionMap)
21    def __init__(self, inner: _PartitionMap):
22        self._inner = inner
@classmethod
def new(cls, partition: int, replicas: List[int]):
24    @classmethod
25    def new(cls, partition: int, replicas: typing.List[int]):
26        return cls(_PartitionMap.new(partition, replicas))
class MessageMetadataTopicSpec:
50class MessageMetadataTopicSpec:
51    _inner: _MessageMetadataTopicSpec
52
53    def __init__(self, inner: _MessageMetadataTopicSpec):
54        self._inner = inner
55
56    def is_update(self) -> bool:
57        return self._inner.is_update()
58
59    def is_delete(self) -> bool:
60        return self._inner.is_delete()
61
62    def metadata_topic_spec(self) -> MetadataTopicSpec:
63        return MetadataTopicSpec(self._inner.metadata_topic_spec())
MessageMetadataTopicSpec(inner: MessageMetadataTopicSpec)
53    def __init__(self, inner: _MessageMetadataTopicSpec):
54        self._inner = inner
def is_update(self) -> bool:
56    def is_update(self) -> bool:
57        return self._inner.is_update()
def is_delete(self) -> bool:
59    def is_delete(self) -> bool:
60        return self._inner.is_delete()
def metadata_topic_spec(self) -> MetadataTopicSpec:
62    def metadata_topic_spec(self) -> MetadataTopicSpec:
63        return MetadataTopicSpec(self._inner.metadata_topic_spec())
class MetadataPartitionSpec:
142class MetadataPartitionSpec:
143    _inner: _MetadataPartitionSpec
144
145    def __init__(self, inner: _MetadataPartitionSpec):
146        self._inner = inner
147
148    def name(self) -> str:
149        return self._inner.name()
MetadataPartitionSpec(inner: MetadataPartitionSpec)
145    def __init__(self, inner: _MetadataPartitionSpec):
146        self._inner = inner
def name(self) -> str:
148    def name(self) -> str:
149        return self._inner.name()
class MetadataTopicSpec:
40class MetadataTopicSpec:
41    _inner: _MetadataTopicSpec
42
43    def __init__(self, inner: _MetadataTopicSpec):
44        self._inner = inner
45
46    def name(self) -> str:
47        return self._inner.name()
MetadataTopicSpec(inner: MetadataTopicSpec)
43    def __init__(self, inner: _MetadataTopicSpec):
44        self._inner = inner
def name(self) -> str:
46    def name(self) -> str:
47        return self._inner.name()
class MetaUpdateTopicSpec:
66class MetaUpdateTopicSpec:
67    _inner: _MetaUpdateTopicSpec
68
69    def __init__(self, inner: _MetaUpdateTopicSpec):
70        self._inner = inner
71
72    def all(self) -> typing.List[MetadataTopicSpec]:
73        inners = self._inner.all()
74        return [MetadataTopicSpec(i) for i in inners]
75
76    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
77        inners = self._inner.changes()
78        return [MessageMetadataTopicSpec(i) for i in inners]
79
80    def epoch(self) -> int:
81        return self._inner.epoch()
MetaUpdateTopicSpec(inner: MetaUpdateTopicSpec)
69    def __init__(self, inner: _MetaUpdateTopicSpec):
70        self._inner = inner
def all(self) -> List[MetadataTopicSpec]:
72    def all(self) -> typing.List[MetadataTopicSpec]:
73        inners = self._inner.all()
74        return [MetadataTopicSpec(i) for i in inners]
def changes(self) -> List[MessageMetadataTopicSpec]:
76    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
77        inners = self._inner.changes()
78        return [MessageMetadataTopicSpec(i) for i in inners]
def epoch(self) -> int:
80    def epoch(self) -> int:
81        return self._inner.epoch()
class PartitionSelectionStrategy:
679class PartitionSelectionStrategy:
680    """Stragegy to select partitions"""
681
682    _inner: _PartitionSelectionStrategy
683
684    def __init__(self, inner: _FluvioConfig):
685        self._inner = inner
686
687    @classmethod
688    def with_all(cls, topic: str):
689        """select all partitions of one topic"""
690        return cls(_PartitionSelectionStrategy.with_all(topic))
691
692    @classmethod
693    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
694        """select multiple partitions of multiple topics"""
695        return cls(_PartitionSelectionStrategy.with_multiple(topic))

Stragegy to select partitions

PartitionSelectionStrategy(inner: FluvioConfig)
684    def __init__(self, inner: _FluvioConfig):
685        self._inner = inner
@classmethod
def with_all(cls, topic: str):
687    @classmethod
688    def with_all(cls, topic: str):
689        """select all partitions of one topic"""
690        return cls(_PartitionSelectionStrategy.with_all(topic))

select all partitions of one topic

@classmethod
def with_multiple(cls, topic: List[Tuple[str, int]]):
692    @classmethod
693    def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
694        """select multiple partitions of multiple topics"""
695        return cls(_PartitionSelectionStrategy.with_multiple(topic))

select multiple partitions of multiple topics

class SmartModuleKind(enum.Enum):
245class SmartModuleKind(Enum):
246    """
247    Use of this is to explicitly set the kind of a smartmodule. Not required
248    but needed for legacy SmartModules.
249    """
250
251    Filter = _SmartModuleKind.Filter
252    Map = _SmartModuleKind.Map
253    ArrayMap = _SmartModuleKind.ArrayMap
254    FilterMap = _SmartModuleKind.FilterMap
255    Aggregate = _SmartModuleKind.Aggregate

Use of this is to explicitly set the kind of a smartmodule. Not required but needed for legacy SmartModules.