fluvio

  1from ._fluvio_python import (
  2    Fluvio as _Fluvio,
  3    ConsumerConfig as _ConsumerConfig,
  4    PartitionConsumer as _PartitionConsumer,
  5    PartitionConsumerStream as _PartitionConsumerStream,
  6    TopicProducer as _TopicProducer,
  7    ProducerBatchRecord as _ProducerBatchRecord,
  8    SmartModuleKind as _SmartModuleKind,
  9    Record as _Record,
 10    Offset as _Offset,
 11)
 12from enum import Enum
 13from ._fluvio_python import Error as FluviorError  # noqa: F401
 14import typing
 15
 16
 17class Record:
 18    """The individual record for a given stream."""
 19
 20    _inner: _Record
 21
 22    def __init__(self, inner: _Record):
 23        self._inner = inner
 24
 25    def offset(self) -> int:
 26        """The offset from the initial offset for a given stream."""
 27        return self._inner.offset()
 28
 29    def value(self) -> typing.List[int]:
 30        """Returns the contents of this Record's value"""
 31        return self._inner.value()
 32
 33    def value_string(self) -> str:
 34        """The UTF-8 decoded value for this record."""
 35        return self._inner.value_string()
 36
 37    def key(self) -> typing.List[int]:
 38        """Returns the contents of this Record's key, if it exists"""
 39        return self._inner.key()
 40
 41    def key_string(self) -> str:
 42        """The UTF-8 decoded key for this record."""
 43        return self._inner.key_string()
 44
 45
 46class Offset:
 47    """Describes the location of an event stored in a Fluvio partition."""
 48
 49    _inner: _Offset
 50
 51    @classmethod
 52    def absolute(cls, index: int):
 53        """Creates an absolute offset with the given index"""
 54        return cls(_Offset.absolute(index))
 55
 56    @classmethod
 57    def beginning(cls):
 58        """Creates a relative offset starting at the beginning of the saved log"""
 59        return cls(_Offset.beginning())
 60
 61    @classmethod
 62    def end(cls):
 63        """Creates a relative offset pointing to the newest log entry"""
 64        return cls(_Offset.end())
 65
 66    @classmethod
 67    def from_beginning(cls, offset: int):
 68        """Creates a relative offset a fixed distance after the oldest log
 69        entry
 70        """
 71        return cls(_Offset.from_beginning(offset))
 72
 73    @classmethod
 74    def from_end(cls, offset: int):
 75        """Creates a relative offset a fixed distance before the newest log
 76        entry
 77        """
 78        return cls(_Offset.from_end(offset))
 79
 80    def __init__(self, inner: _Offset):
 81        self._inner = inner
 82
 83
class SmartModuleKind(Enum):
    """
    Explicitly selects the kind of a SmartModule.

    Passing a kind is not required, but it is needed for legacy SmartModules.
    Each member's value is the corresponding native `_SmartModuleKind`
    variant, which is what `ConsumerConfig.smartmodule` forwards to the
    native layer.
    """

    # Each member mirrors the native variant of the same name.
    Filter = _SmartModuleKind.Filter
    Map = _SmartModuleKind.Map
    ArrayMap = _SmartModuleKind.ArrayMap
    FilterMap = _SmartModuleKind.FilterMap
    Aggregate = _SmartModuleKind.Aggregate
 95
 96
 97class ConsumerConfig:
 98    _inner: _ConsumerConfig
 99
100    def __init__(self):
101        self._inner = _ConsumerConfig()
102
103    def smartmodule(
104        self,
105        name: str = None,
106        path: str = None,
107        kind: SmartModuleKind = None,
108        params: typing.Dict[str, str] = None,
109        aggregate: typing.List[bytes] = None,
110    ):
111        """
112        This is a method for adding a smartmodule to a consumer config either
113        using a `name` of a `SmartModule` or a `path` to a wasm binary.
114
115        Args:
116
117            name: str
118            path: str
119            kind: SmartModuleKind
120            params: Dict[str, str]
121            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
122
123        Raises:
124            "Require either a path or a name for a smartmodule."
125            "Only specify one of path or name not both."
126
127        Returns:
128
129            None
130        """
131
132        if kind is not None:
133            kind = kind.value
134
135        if path is None and name is None:
136            raise Exception("Require either a path or a name for a smartmodule.")
137
138        if path is not None and name is not None:
139            raise Exception("Only specify one of path or name not both.")
140
141        params = {} if params is None else params
142        param_keys = [x for x in params.keys()]
143        param_vals = [x for x in params.values()]
144
145        self._inner.smartmodule(
146            name,
147            path,
148            kind,
149            param_keys,
150            param_vals,
151            aggregate,
152            # These arguments are for Join stuff but that's not implemented on
153            # the python side yet
154            None,
155            None,
156            None,
157            None,
158        )
159
160
class PartitionConsumer:
    """
    An interface for consuming events from a particular partition.

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you'll receive all of the events in your range all at once. When
    you consume events via Streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        # Pull items from the native stream until it hands back None, wrapping
        # each one in the Python-level Record.
        while True:
            native_record = stream.next()
            if native_record is None:
                return
            yield Record(native_record)

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously stream events from a particular offset in the consumer's
        partition. Returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        native_stream = self._inner.stream(offset._inner)
        return self._generator(native_stream)

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously stream events from a particular offset in the consumer's
        partition, using the given consumer config (for example one that has a
        SmartModule attached). Returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset - where the stream starts
            config: ConsumerConfig - the configuration passed to the stream

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`

        """
        native_stream = self._inner.stream_with_config(offset._inner, config._inner)
        return self._generator(native_stream)
232
233
class TopicProducer:
    """An interface for producing events to a particular topic.

    A `TopicProducer` allows you to send events to the specific topic it was
    initialized for. Once you have a `TopicProducer`, you can send events to
    the topic, choosing which partition each event should be delivered to.
    """

    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
        """
        Send a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        native = self._inner
        return native.send(key, value)

    def send_string(self, buf: str) -> None:
        """Send a string to this producer's topic.

        The string is UTF-8 encoded and sent with an empty key.
        """
        payload = buf.encode("utf-8")
        return self.send([], payload)

    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
        """
        Send a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        # Wrap each (key, value) pair in the native batch-record type.
        batch = []
        for key, value in records:
            batch.append(_ProducerBatchRecord(key, value))
        return self._inner.send_all(batch)

    def flush(self) -> None:
        """
        Send all the queued records in the producer batches.
        """
        native = self._inner
        return native.flush()
272
273
class Fluvio:
    """An interface for interacting with Fluvio streaming."""

    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        # The native client that every factory method below delegates to.
        self._inner = inner

    @classmethod
    def connect(cls):
        """Create a new Fluvio client using the current profile from
        `~/.fluvio/config`.

        If there is no current profile or the `~/.fluvio/config` file does not
        exist, then this will create a new profile with default settings and
        set it as current, then try to connect to the cluster using those
        settings.
        """
        native_client = _Fluvio.connect()
        return cls(native_client)

    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
        """Create a new `PartitionConsumer` for the given topic and partition.

        Currently, consumers are scoped to both a specific Fluvio topic and to
        a particular partition within that topic. That means that if you have a
        topic with multiple partitions, then in order to receive all of the
        events in all of the partitions, you will need to create one consumer
        per partition.
        """
        native_consumer = self._inner.partition_consumer(topic, partition)
        return PartitionConsumer(native_consumer)

    def topic_producer(self, topic: str) -> TopicProducer:
        """
        Create a new `TopicProducer` for the given topic name.

        Currently, producers are scoped to a specific Fluvio topic.
        """
        native_producer = self._inner.topic_producer(topic)
        return TopicProducer(native_producer)
class Record:
18class Record:
19    """The individual record for a given stream."""
20
21    _inner: _Record
22
23    def __init__(self, inner: _Record):
24        self._inner = inner
25
26    def offset(self) -> int:
27        """The offset from the initial offset for a given stream."""
28        return self._inner.offset()
29
30    def value(self) -> typing.List[int]:
31        """Returns the contents of this Record's value"""
32        return self._inner.value()
33
34    def value_string(self) -> str:
35        """The UTF-8 decoded value for this record."""
36        return self._inner.value_string()
37
38    def key(self) -> typing.List[int]:
39        """Returns the contents of this Record's key, if it exists"""
40        return self._inner.key()
41
42    def key_string(self) -> str:
43        """The UTF-8 decoded key for this record."""
44        return self._inner.key_string()

The individual record for a given stream.

Record(inner: fluvio._fluvio_python.Record)
23    def __init__(self, inner: _Record):
24        self._inner = inner
def offset(self) -> int:
26    def offset(self) -> int:
27        """The offset from the initial offset for a given stream."""
28        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
30    def value(self) -> typing.List[int]:
31        """Returns the contents of this Record's value"""
32        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
34    def value_string(self) -> str:
35        """The UTF-8 decoded value for this record."""
36        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
38    def key(self) -> typing.List[int]:
39        """Returns the contents of this Record's key, if it exists"""
40        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
42    def key_string(self) -> str:
43        """The UTF-8 decoded key for this record."""
44        return self._inner.key_string()

The UTF-8 decoded key for this record.

class Offset:
47class Offset:
48    """Describes the location of an event stored in a Fluvio partition."""
49
50    _inner: _Offset
51
52    @classmethod
53    def absolute(cls, index: int):
54        """Creates an absolute offset with the given index"""
55        return cls(_Offset.absolute(index))
56
57    @classmethod
58    def beginning(cls):
59        """Creates a relative offset starting at the beginning of the saved log"""
60        return cls(_Offset.beginning())
61
62    @classmethod
63    def end(cls):
64        """Creates a relative offset pointing to the newest log entry"""
65        return cls(_Offset.end())
66
67    @classmethod
68    def from_beginning(cls, offset: int):
69        """Creates a relative offset a fixed distance after the oldest log
70        entry
71        """
72        return cls(_Offset.from_beginning(offset))
73
74    @classmethod
75    def from_end(cls, offset: int):
76        """Creates a relative offset a fixed distance before the newest log
77        entry
78        """
79        return cls(_Offset.from_end(offset))
80
81    def __init__(self, inner: _Offset):
82        self._inner = inner

Describes the location of an event stored in a Fluvio partition.

Offset(inner: fluvio._fluvio_python.Offset)
81    def __init__(self, inner: _Offset):
82        self._inner = inner
@classmethod
def absolute(cls, index: int):
52    @classmethod
53    def absolute(cls, index: int):
54        """Creates an absolute offset with the given index"""
55        return cls(_Offset.absolute(index))

Creates an absolute offset with the given index

@classmethod
def beginning(cls):
57    @classmethod
58    def beginning(cls):
59        """Creates a relative offset starting at the beginning of the saved log"""
60        return cls(_Offset.beginning())

Creates a relative offset starting at the beginning of the saved log

@classmethod
def end(cls):
62    @classmethod
63    def end(cls):
64        """Creates a relative offset pointing to the newest log entry"""
65        return cls(_Offset.end())

Creates a relative offset pointing to the newest log entry

@classmethod
def from_beginning(cls, offset: int):
67    @classmethod
68    def from_beginning(cls, offset: int):
69        """Creates a relative offset a fixed distance after the oldest log
70        entry
71        """
72        return cls(_Offset.from_beginning(offset))

Creates a relative offset a fixed distance after the oldest log entry

@classmethod
def from_end(cls, offset: int):
74    @classmethod
75    def from_end(cls, offset: int):
76        """Creates a relative offset a fixed distance before the newest log
77        entry
78        """
79        return cls(_Offset.from_end(offset))

Creates a relative offset a fixed distance before the newest log entry

class SmartModuleKind(enum.Enum):
85class SmartModuleKind(Enum):
86    """
87    Use of this is to explicitly set the kind of a smartmodule. Not required
88    but needed for legacy SmartModules.
89    """
90
91    Filter = _SmartModuleKind.Filter
92    Map = _SmartModuleKind.Map
93    ArrayMap = _SmartModuleKind.ArrayMap
94    FilterMap = _SmartModuleKind.FilterMap
95    Aggregate = _SmartModuleKind.Aggregate

Use of this is to explicitly set the kind of a smartmodule. Not required but needed for legacy SmartModules.

Filter = <SmartModuleKind.Filter: 0>
Map = <SmartModuleKind.Map: 1>
ArrayMap = <SmartModuleKind.ArrayMap: 2>
FilterMap = <SmartModuleKind.FilterMap: 3>
Aggregate = <SmartModuleKind.Aggregate: 6>
Inherited Members
enum.Enum
name
value
class ConsumerConfig:
 98class ConsumerConfig:
 99    _inner: _ConsumerConfig
100
101    def __init__(self):
102        self._inner = _ConsumerConfig()
103
104    def smartmodule(
105        self,
106        name: str = None,
107        path: str = None,
108        kind: SmartModuleKind = None,
109        params: typing.Dict[str, str] = None,
110        aggregate: typing.List[bytes] = None,
111    ):
112        """
113        This is a method for adding a smartmodule to a consumer config either
114        using a `name` of a `SmartModule` or a `path` to a wasm binary.
115
116        Args:
117
118            name: str
119            path: str
120            kind: SmartModuleKind
121            params: Dict[str, str]
122            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
123
124        Raises:
125            "Require either a path or a name for a smartmodule."
126            "Only specify one of path or name not both."
127
128        Returns:
129
130            None
131        """
132
133        if kind is not None:
134            kind = kind.value
135
136        if path is None and name is None:
137            raise Exception("Require either a path or a name for a smartmodule.")
138
139        if path is not None and name is not None:
140            raise Exception("Only specify one of path or name not both.")
141
142        params = {} if params is None else params
143        param_keys = [x for x in params.keys()]
144        param_vals = [x for x in params.values()]
145
146        self._inner.smartmodule(
147            name,
148            path,
149            kind,
150            param_keys,
151            param_vals,
152            aggregate,
153            # These arguments are for Join stuff but that's not implemented on
154            # the python side yet
155            None,
156            None,
157            None,
158            None,
159        )
ConsumerConfig()
101    def __init__(self):
102        self._inner = _ConsumerConfig()
def smartmodule( self, name: str = None, path: str = None, kind: fluvio.SmartModuleKind = None, params: Dict[str, str] = None, aggregate: List[bytes] = None):
104    def smartmodule(
105        self,
106        name: str = None,
107        path: str = None,
108        kind: SmartModuleKind = None,
109        params: typing.Dict[str, str] = None,
110        aggregate: typing.List[bytes] = None,
111    ):
112        """
113        This is a method for adding a smartmodule to a consumer config either
114        using a `name` of a `SmartModule` or a `path` to a wasm binary.
115
116        Args:
117
118            name: str
119            path: str
120            kind: SmartModuleKind
121            params: Dict[str, str]
122            aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
123
124        Raises:
125            "Require either a path or a name for a smartmodule."
126            "Only specify one of path or name not both."
127
128        Returns:
129
130            None
131        """
132
133        if kind is not None:
134            kind = kind.value
135
136        if path is None and name is None:
137            raise Exception("Require either a path or a name for a smartmodule.")
138
139        if path is not None and name is not None:
140            raise Exception("Only specify one of path or name not both.")
141
142        params = {} if params is None else params
143        param_keys = [x for x in params.keys()]
144        param_vals = [x for x in params.values()]
145
146        self._inner.smartmodule(
147            name,
148            path,
149            kind,
150            param_keys,
151            param_vals,
152            aggregate,
153            # These arguments are for Join stuff but that's not implemented on
154            # the python side yet
155            None,
156            None,
157            None,
158            None,
159        )

This is a method for adding a smartmodule to a consumer config either using a name of a SmartModule or a path to a wasm binary.

Args:

name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule

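Raises: an exception with the message "Require either a path or a name for a smartmodule." when neither path nor name is given, and an exception with the message "Only specify one of path or name not both." when both are given.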

Returns:

None
class PartitionConsumer:
162class PartitionConsumer:
163    """
164    An interface for consuming events from a particular partition
165
166    There are two ways to consume events: by "fetching" events and by
167    "streaming" events. Fetching involves specifying a range of events that you
168    want to consume via their Offset. A fetch is a sort of one-time batch
169    operation: you’ll receive all of the events in your range all at once. When
170    you consume events via Streaming, you specify a starting Offset and receive
171    an object that will continuously yield new events as they arrive.
172    """
173
174    _inner: _PartitionConsumer
175
176    def __init__(self, inner: _PartitionConsumer):
177        self._inner = inner
178
179    def stream(self, offset: Offset) -> typing.Iterator[Record]:
180        """
181        Continuously streams events from a particular offset in the consumer’s
182        partition. This returns a `Iterator[Record]` which is an
183        iterator.
184
185        Streaming is one of the two ways to consume events in Fluvio. It is a
186        continuous request for new records arriving in a partition, beginning
187        at a particular offset. You specify the starting point of the stream
188        using an Offset and periodically receive events, either individually or
189        in batches.
190        """
191        return self._generator(self._inner.stream(offset._inner))
192
193    def stream_with_config(
194        self, offset: Offset, config: ConsumerConfig
195    ) -> typing.Iterator[Record]:
196        """
197        Continuously streams events from a particular offset with a SmartModule
198        WASM module in the consumer’s partition. This returns a
199        `Iterator[Record]` which is an iterator.
200
201        Streaming is one of the two ways to consume events in Fluvio. It is a
202        continuous request for new records arriving in a partition, beginning
203        at a particular offset. You specify the starting point of the stream
204        using an Offset and periodically receive events, either individually or
205        in batches.
206
207        Args:
208            offset: Offset
209            wasm_module_path: str - The absolute path to the WASM file
210
211        Example:
212            import os
213
214            wmp = os.path.abspath("somefilter.wasm")
215            config = ConsumerConfig()
216            config.smartmodule(path=wmp)
217            for i in consumer.stream_with_config(Offset.beginning(), config):
218                # do something with i
219
220        Returns:
221            `Iterator[Record]`
222
223        """
224        return self._generator(
225            self._inner.stream_with_config(offset._inner, config._inner)
226        )
227
228    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
229        item = stream.next()
230        while item is not None:
231            yield Record(item)
232            item = stream.next()

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: fluvio._fluvio_python.PartitionConsumer)
176    def __init__(self, inner: _PartitionConsumer):
177        self._inner = inner
def stream(self, offset: fluvio.Offset) -> Iterator[fluvio.Record]:
179    def stream(self, offset: Offset) -> typing.Iterator[Record]:
180        """
181        Continuously streams events from a particular offset in the consumer’s
182        partition. This returns a `Iterator[Record]` which is an
183        iterator.
184
185        Streaming is one of the two ways to consume events in Fluvio. It is a
186        continuous request for new records arriving in a partition, beginning
187        at a particular offset. You specify the starting point of the stream
188        using an Offset and periodically receive events, either individually or
189        in batches.
190        """
191        return self._generator(self._inner.stream(offset._inner))

Continuously streams events from a particular offset in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: fluvio.Offset, config: fluvio.ConsumerConfig) -> Iterator[fluvio.Record]:
193    def stream_with_config(
194        self, offset: Offset, config: ConsumerConfig
195    ) -> typing.Iterator[Record]:
196        """
197        Continuously streams events from a particular offset with a SmartModule
198        WASM module in the consumer’s partition. This returns a
199        `Iterator[Record]` which is an iterator.
200
201        Streaming is one of the two ways to consume events in Fluvio. It is a
202        continuous request for new records arriving in a partition, beginning
203        at a particular offset. You specify the starting point of the stream
204        using an Offset and periodically receive events, either individually or
205        in batches.
206
207        Args:
208            offset: Offset
209            wasm_module_path: str - The absolute path to the WASM file
210
211        Example:
212            import os
213
214            wmp = os.path.abspath("somefilter.wasm")
215            config = ConsumerConfig()
216            config.smartmodule(path=wmp)
217            for i in consumer.stream_with_config(Offset.beginning(), config):
218                # do something with i
219
220        Returns:
221            `Iterator[Record]`
222
223        """
224        return self._generator(
225            self._inner.stream_with_config(offset._inner, config._inner)
226        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an Iterator[Record].

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

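Args: offset: Offset - the offset the stream starts from; config: ConsumerConfig - the consumer configuration, for example one with a SmartModule attached via config.smartmodule(path=...)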

Example: import os

wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
    # do something with i

Returns: Iterator[Record]

class TopicProducer:
235class TopicProducer:
236    """An interface for producing events to a particular topic.
237
238    A `TopicProducer` allows you to send events to the specific topic it was
239    initialized for. Once you have a `TopicProducer`, you can send events to
240    the topic, choosing which partition each event should be delivered to.
241    """
242
243    _inner: _TopicProducer
244
245    def __init__(self, inner: _TopicProducer):
246        self._inner = inner
247
248    def send_string(self, buf: str) -> None:
249        """Sends a string to this producer’s topic"""
250        return self.send([], buf.encode("utf-8"))
251
252    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
253        """
254        Sends a key/value record to this producer's Topic.
255
256        The partition that the record will be sent to is derived from the Key.
257        """
258        return self._inner.send(key, value)
259
260    def flush(self) -> None:
261        """
262        Send all the queued records in the producer batches.
263        """
264        return self._inner.flush()
265
266    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
267        """
268        Sends a list of key/value records as a batch to this producer's Topic.
269        :param records: The list of records to send
270        """
271        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
272        return self._inner.send_all(records_inner)

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: fluvio._fluvio_python.TopicProducer)
245    def __init__(self, inner: _TopicProducer):
246        self._inner = inner
def send_string(self, buf: str) -> None:
248    def send_string(self, buf: str) -> None:
249        """Sends a string to this producer’s topic"""
250        return self.send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> None:
252    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
253        """
254        Sends a key/value record to this producer's Topic.
255
256        The partition that the record will be sent to is derived from the Key.
257        """
258        return self._inner.send(key, value)

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
260    def flush(self) -> None:
261        """
262        Send all the queued records in the producer batches.
263        """
264        return self._inner.flush()

Send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]]):
266    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
267        """
268        Sends a list of key/value records as a batch to this producer's Topic.
269        :param records: The list of records to send
270        """
271        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
272        return self._inner.send_all(records_inner)

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
class Fluvio:
275class Fluvio:
276    """An interface for interacting with Fluvio streaming."""
277
278    _inner: _Fluvio
279
280    def __init__(self, inner: _Fluvio):
281        self._inner = inner
282
283    @classmethod
284    def connect(cls):
285        """Creates a new Fluvio client using the current profile from
286        `~/.fluvio/config`
287
288        If there is no current profile or the `~/.fluvio/config` file does not
289        exist, then this will create a new profile with default settings and
290        set it as current, then try to connect to the cluster using those
291        settings.
292        """
293        return cls(_Fluvio.connect())
294
295    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
296        """Creates a new `PartitionConsumer` for the given topic and partition
297
298        Currently, consumers are scoped to both a specific Fluvio topic and to
299        a particular partition within that topic. That means that if you have a
300        topic with multiple partitions, then in order to receive all of the
301        events in all of the partitions, you will need to create one consumer
302        per partition.
303        """
304        return PartitionConsumer(self._inner.partition_consumer(topic, partition))
305
306    def topic_producer(self, topic: str) -> TopicProducer:
307        """
308        Creates a new `TopicProducer` for the given topic name.
309
310        Currently, producers are scoped to a specific Fluvio topic. That means
311        when you send events via a producer, you must specify which partition
312        each event should go to.
313        """
314        return TopicProducer(self._inner.topic_producer(topic))

An interface for interacting with Fluvio streaming.

Fluvio(inner: fluvio._fluvio_python.Fluvio)
280    def __init__(self, inner: _Fluvio):
281        self._inner = inner
@classmethod
def connect(cls):
283    @classmethod
284    def connect(cls):
285        """Creates a new Fluvio client using the current profile from
286        `~/.fluvio/config`
287
288        If there is no current profile or the `~/.fluvio/config` file does not
289        exist, then this will create a new profile with default settings and
290        set it as current, then try to connect to the cluster using those
291        settings.
292        """
293        return cls(_Fluvio.connect())

Creates a new Fluvio client using the current profile from ~/.fluvio/config.

If there is no current profile or the ~/.fluvio/config file does not exist, then this will create a new profile with default settings and set it as current, then try to connect to the cluster using those settings.

def partition_consumer(self, topic: str, partition: int) -> fluvio.PartitionConsumer:
295    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
296        """Creates a new `PartitionConsumer` for the given topic and partition
297
298        Currently, consumers are scoped to both a specific Fluvio topic and to
299        a particular partition within that topic. That means that if you have a
300        topic with multiple partitions, then in order to receive all of the
301        events in all of the partitions, you will need to create one consumer
302        per partition.
303        """
304        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

Creates a new PartitionConsumer for the given topic and partition.

Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.

def topic_producer(self, topic: str) -> fluvio.TopicProducer:
306    def topic_producer(self, topic: str) -> TopicProducer:
307        """
308        Creates a new `TopicProducer` for the given topic name.
309
310        Currently, producers are scoped to a specific Fluvio topic. That means
311        when you send events via a producer, you must specify which partition
312        each event should go to.
313        """
314        return TopicProducer(self._inner.topic_producer(topic))

Creates a new TopicProducer for the given topic name.

Currently, producers are scoped to a specific Fluvio topic. When you send events via a producer, the partition each event goes to is derived from the record's key (see send), so the key you choose determines the destination partition.