fluvio

  1from ._fluvio_python import (
  2    Fluvio as _Fluvio,
  3    PartitionConsumer as _PartitionConsumer,
  4    PartitionConsumerStream as _PartitionConsumerStream,
  5    TopicProducer as _TopicProducer,
  6    ProducerBatchRecord as _ProducerBatchRecord,
  7    Record as _Record,
  8    Offset as _Offset,
  9)
 10from ._fluvio_python import Error as FluviorError  # noqa: F401
 11import typing
 12
 13
 14class Record:
 15    """The individual record for a given stream."""
 16
 17    _inner: _Record
 18
 19    def __init__(self, inner: _Record):
 20        self._inner = inner
 21
 22    def offset(self) -> int:
 23        """The offset from the initial offset for a given stream."""
 24        return self._inner.offset()
 25
 26    def value(self) -> typing.List[int]:
 27        """Returns the contents of this Record's value"""
 28        return self._inner.value()
 29
 30    def value_string(self) -> str:
 31        """The UTF-8 decoded value for this record."""
 32        return self._inner.value_string()
 33
 34    def key(self) -> typing.List[int]:
 35        """Returns the contents of this Record's key, if it exists"""
 36        return self._inner.key()
 37
 38    def key_string(self) -> str:
 39        """The UTF-8 decoded key for this record."""
 40        return self._inner.key_string()
 41
 42
 43class Offset:
 44    """Describes the location of an event stored in a Fluvio partition."""
 45
 46    _inner: _Offset
 47
 48    @classmethod
 49    def absolute(cls, index: int):
 50        """Creates an absolute offset with the given index"""
 51        return cls(_Offset.absolute(index))
 52
 53    @classmethod
 54    def beginning(cls):
 55        """Creates a relative offset starting at the beginning of the saved log"""
 56        return cls(_Offset.beginning())
 57
 58    @classmethod
 59    def end(cls):
 60        """Creates a relative offset pointing to the newest log entry"""
 61        return cls(_Offset.end())
 62
 63    @classmethod
 64    def from_beginning(cls, offset: int):
 65        """Creates a relative offset a fixed distance after the oldest log
 66        entry
 67        """
 68        return cls(_Offset.from_beginning(offset))
 69
 70    @classmethod
 71    def from_end(cls, offset: int):
 72        """Creates a relative offset a fixed distance before the newest log
 73        entry
 74        """
 75        return cls(_Offset.from_end(offset))
 76
 77    def __init__(self, inner: _Offset):
 78        self._inner = inner
 79
 80
 81class PartitionConsumerStream:
 82    """An iterator for `PartitionConsumer.stream` method where each `__next__`
 83    returns a `Record`.
 84
 85    Usage:
 86
 87    ```python
 88    for i in consumer.stream(0):
 89        print(i.value())
 90        print(i.value_string())
 91    ```
 92    """
 93
 94    _inner: _PartitionConsumerStream
 95
 96    def __init__(self, inner: _PartitionConsumerStream):
 97        self._inner = inner
 98
 99    def __iter__(self):
100        return self
101
102    def __next__(self) -> typing.Optional[Record]:
103        return Record(self._inner.next())
104
105
106class PartitionConsumer:
107    """
108    An interface for consuming events from a particular partition
109
110    There are two ways to consume events: by "fetching" events and by
111    "streaming" events. Fetching involves specifying a range of events that you
112    want to consume via their Offset. A fetch is a sort of one-time batch
113    operation: you’ll receive all of the events in your range all at once. When
114    you consume events via Streaming, you specify a starting Offset and receive
115    an object that will continuously yield new events as they arrive.
116    """
117
118    _inner: _PartitionConsumer
119
120    def __init__(self, inner: _PartitionConsumer):
121        self._inner = inner
122
123    def stream(self, offset: Offset) -> PartitionConsumerStream:
124        """
125        Continuously streams events from a particular offset in the consumer’s
126        partition. This returns a `PartitionConsumerStream` which is an
127        iterator.
128
129        Streaming is one of the two ways to consume events in Fluvio. It is a
130        continuous request for new records arriving in a partition, beginning
131        at a particular offset. You specify the starting point of the stream
132        using an Offset and periodically receive events, either individually or
133        in batches.
134        """
135        return PartitionConsumerStream(self._inner.stream(offset._inner))
136
137    def stream_with_config(
138        self, offset: Offset, wasm_path: str
139    ) -> PartitionConsumerStream:
140        """
141        Continuously streams events from a particular offset with a SmartModule
142        WASM module in the consumer’s partition. This returns a
143        `PartitionConsumerStream` which is an iterator.
144
145        Streaming is one of the two ways to consume events in Fluvio. It is a
146        continuous request for new records arriving in a partition, beginning
147        at a particular offset. You specify the starting point of the stream
148        using an Offset and periodically receive events, either individually or
149        in batches.
150
151        Args:
152            offset: Offset
153            wasm_path: str - The absolute path to the WASM file
154
155        Example:
156            import os
157
158            wmp = os.path.abspath("somefilter.wasm")
159            for i in consumer.stream_with_config(Offset.beginning(), wmp):
160                # do something with i
161
162        Returns:
163            PartitionConsumerStream
164
165        """
166        return PartitionConsumerStream(
167            self._inner.stream_with_config(offset._inner, wasm_path)
168        )
169
170
171class TopicProducer:
172    """An interface for producing events to a particular topic.
173
174    A `TopicProducer` allows you to send events to the specific topic it was
175    initialized for. Once you have a `TopicProducer`, you can send events to
176    the topic, choosing which partition each event should be delivered to.
177    """
178
179    _inner: _TopicProducer
180
181    def __init__(self, inner: _TopicProducer):
182        self._inner = inner
183
184    def send_string(self, buf: str) -> None:
185        """Sends a string to this producer’s topic"""
186        return self.send([], buf.encode("utf-8"))
187
188    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
189        """
190        Sends a key/value record to this producer's Topic.
191
192        The partition that the record will be sent to is derived from the Key.
193        """
194        return self._inner.send(key, value)
195
196    def flush(self) -> None:
197        """
198        Send all the queued records in the producer batches.
199        """
200        return self._inner.flush()
201
202    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
203        """
204        Sends a list of key/value records as a batch to this producer's Topic.
205        :param records: The list of records to send
206        """
207        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
208        return self._inner.send_all(records_inner)
209
210
211class Fluvio:
212    """An interface for interacting with Fluvio streaming."""
213
214    _inner: _Fluvio
215
216    def __init__(self, inner: _Fluvio):
217        self._inner = inner
218
219    @classmethod
220    def connect(cls):
221        """Creates a new Fluvio client using the current profile from
222        `~/.fluvio/config`
223
224        If there is no current profile or the `~/.fluvio/config` file does not
225        exist, then this will create a new profile with default settings and
226        set it as current, then try to connect to the cluster using those
227        settings.
228        """
229        return cls(_Fluvio.connect())
230
231    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
232        """Creates a new `PartitionConsumer` for the given topic and partition
233
234        Currently, consumers are scoped to both a specific Fluvio topic and to
235        a particular partition within that topic. That means that if you have a
236        topic with multiple partitions, then in order to receive all of the
237        events in all of the partitions, you will need to create one consumer
238        per partition.
239        """
240        return PartitionConsumer(self._inner.partition_consumer(topic, partition))
241
242    def topic_producer(self, topic: str) -> TopicProducer:
243        """
244        Creates a new `TopicProducer` for the given topic name.
245
246        Currently, producers are scoped to a specific Fluvio topic. That means
247        when you send events via a producer, you must specify which partition
248        each event should go to.
249        """
250        return TopicProducer(self._inner.topic_producer(topic))
class Record:
15class Record:
16    """The individual record for a given stream."""
17
18    _inner: _Record
19
20    def __init__(self, inner: _Record):
21        self._inner = inner
22
23    def offset(self) -> int:
24        """The offset from the initial offset for a given stream."""
25        return self._inner.offset()
26
27    def value(self) -> typing.List[int]:
28        """Returns the contents of this Record's value"""
29        return self._inner.value()
30
31    def value_string(self) -> str:
32        """The UTF-8 decoded value for this record."""
33        return self._inner.value_string()
34
35    def key(self) -> typing.List[int]:
36        """Returns the contents of this Record's key, if it exists"""
37        return self._inner.key()
38
39    def key_string(self) -> str:
40        """The UTF-8 decoded key for this record."""
41        return self._inner.key_string()

The individual record for a given stream.

Record(inner: fluvio._fluvio_python.Record)
20    def __init__(self, inner: _Record):
21        self._inner = inner
def offset(self) -> int:
23    def offset(self) -> int:
24        """The offset from the initial offset for a given stream."""
25        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
27    def value(self) -> typing.List[int]:
28        """Returns the contents of this Record's value"""
29        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
31    def value_string(self) -> str:
32        """The UTF-8 decoded value for this record."""
33        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
35    def key(self) -> typing.List[int]:
36        """Returns the contents of this Record's key, if it exists"""
37        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
39    def key_string(self) -> str:
40        """The UTF-8 decoded key for this record."""
41        return self._inner.key_string()

The UTF-8 decoded key for this record.

class Offset:
44class Offset:
45    """Describes the location of an event stored in a Fluvio partition."""
46
47    _inner: _Offset
48
49    @classmethod
50    def absolute(cls, index: int):
51        """Creates an absolute offset with the given index"""
52        return cls(_Offset.absolute(index))
53
54    @classmethod
55    def beginning(cls):
56        """Creates a relative offset starting at the beginning of the saved log"""
57        return cls(_Offset.beginning())
58
59    @classmethod
60    def end(cls):
61        """Creates a relative offset pointing to the newest log entry"""
62        return cls(_Offset.end())
63
64    @classmethod
65    def from_beginning(cls, offset: int):
66        """Creates a relative offset a fixed distance after the oldest log
67        entry
68        """
69        return cls(_Offset.from_beginning(offset))
70
71    @classmethod
72    def from_end(cls, offset: int):
73        """Creates a relative offset a fixed distance before the newest log
74        entry
75        """
76        return cls(_Offset.from_end(offset))
77
78    def __init__(self, inner: _Offset):
79        self._inner = inner

Describes the location of an event stored in a Fluvio partition.

Offset(inner: fluvio._fluvio_python.Offset)
78    def __init__(self, inner: _Offset):
79        self._inner = inner
@classmethod
def absolute(cls, index: int):
49    @classmethod
50    def absolute(cls, index: int):
51        """Creates an absolute offset with the given index"""
52        return cls(_Offset.absolute(index))

Creates an absolute offset with the given index

@classmethod
def beginning(cls):
54    @classmethod
55    def beginning(cls):
56        """Creates a relative offset starting at the beginning of the saved log"""
57        return cls(_Offset.beginning())

Creates a relative offset starting at the beginning of the saved log

@classmethod
def end(cls):
59    @classmethod
60    def end(cls):
61        """Creates a relative offset pointing to the newest log entry"""
62        return cls(_Offset.end())

Creates a relative offset pointing to the newest log entry

@classmethod
def from_beginning(cls, offset: int):
64    @classmethod
65    def from_beginning(cls, offset: int):
66        """Creates a relative offset a fixed distance after the oldest log
67        entry
68        """
69        return cls(_Offset.from_beginning(offset))

Creates a relative offset a fixed distance after the oldest log entry

@classmethod
def from_end(cls, offset: int):
71    @classmethod
72    def from_end(cls, offset: int):
73        """Creates a relative offset a fixed distance before the newest log
74        entry
75        """
76        return cls(_Offset.from_end(offset))

Creates a relative offset a fixed distance before the newest log entry

class PartitionConsumerStream:
 82class PartitionConsumerStream:
 83    """An iterator for `PartitionConsumer.stream` method where each `__next__`
 84    returns a `Record`.
 85
 86    Usage:
 87
 88    ```python
 89    for i in consumer.stream(0):
 90        print(i.value())
 91        print(i.value_string())
 92    ```
 93    """
 94
 95    _inner: _PartitionConsumerStream
 96
 97    def __init__(self, inner: _PartitionConsumerStream):
 98        self._inner = inner
 99
100    def __iter__(self):
101        return self
102
103    def __next__(self) -> typing.Optional[Record]:
104        return Record(self._inner.next())

An iterator for PartitionConsumer.stream method where each __next__ returns a Record.

Usage:

for i in consumer.stream(0):
    print(i.value())
    print(i.value_string())
PartitionConsumerStream(inner: fluvio._fluvio_python.PartitionConsumerStream)
97    def __init__(self, inner: _PartitionConsumerStream):
98        self._inner = inner
class PartitionConsumer:
107class PartitionConsumer:
108    """
109    An interface for consuming events from a particular partition
110
111    There are two ways to consume events: by "fetching" events and by
112    "streaming" events. Fetching involves specifying a range of events that you
113    want to consume via their Offset. A fetch is a sort of one-time batch
114    operation: you’ll receive all of the events in your range all at once. When
115    you consume events via Streaming, you specify a starting Offset and receive
116    an object that will continuously yield new events as they arrive.
117    """
118
119    _inner: _PartitionConsumer
120
121    def __init__(self, inner: _PartitionConsumer):
122        self._inner = inner
123
124    def stream(self, offset: Offset) -> PartitionConsumerStream:
125        """
126        Continuously streams events from a particular offset in the consumer’s
127        partition. This returns a `PartitionConsumerStream` which is an
128        iterator.
129
130        Streaming is one of the two ways to consume events in Fluvio. It is a
131        continuous request for new records arriving in a partition, beginning
132        at a particular offset. You specify the starting point of the stream
133        using an Offset and periodically receive events, either individually or
134        in batches.
135        """
136        return PartitionConsumerStream(self._inner.stream(offset._inner))
137
138    def stream_with_config(
139        self, offset: Offset, wasm_path: str
140    ) -> PartitionConsumerStream:
141        """
142        Continuously streams events from a particular offset with a SmartModule
143        WASM module in the consumer’s partition. This returns a
144        `PartitionConsumerStream` which is an iterator.
145
146        Streaming is one of the two ways to consume events in Fluvio. It is a
147        continuous request for new records arriving in a partition, beginning
148        at a particular offset. You specify the starting point of the stream
149        using an Offset and periodically receive events, either individually or
150        in batches.
151
152        Args:
153            offset: Offset
154            wasm_path: str - The absolute path to the WASM file
155
156        Example:
157            import os
158
159            wmp = os.path.abspath("somefilter.wasm")
160            for i in consumer.stream_with_config(Offset.beginning(), wmp):
161                # do something with i
162
163        Returns:
164            PartitionConsumerStream
165
166        """
167        return PartitionConsumerStream(
168            self._inner.stream_with_config(offset._inner, wasm_path)
169        )

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: fluvio._fluvio_python.PartitionConsumer)
121    def __init__(self, inner: _PartitionConsumer):
122        self._inner = inner
def stream(self, offset: fluvio.Offset) -> fluvio.PartitionConsumerStream:
124    def stream(self, offset: Offset) -> PartitionConsumerStream:
125        """
126        Continuously streams events from a particular offset in the consumer’s
127        partition. This returns a `PartitionConsumerStream` which is an
128        iterator.
129
130        Streaming is one of the two ways to consume events in Fluvio. It is a
131        continuous request for new records arriving in a partition, beginning
132        at a particular offset. You specify the starting point of the stream
133        using an Offset and periodically receive events, either individually or
134        in batches.
135        """
136        return PartitionConsumerStream(self._inner.stream(offset._inner))

Continuously streams events from a particular offset in the consumer’s partition. This returns a PartitionConsumerStream which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

def stream_with_config( self, offset: fluvio.Offset, wasm_path: str) -> fluvio.PartitionConsumerStream:
138    def stream_with_config(
139        self, offset: Offset, wasm_path: str
140    ) -> PartitionConsumerStream:
141        """
142        Continuously streams events from a particular offset with a SmartModule
143        WASM module in the consumer’s partition. This returns a
144        `PartitionConsumerStream` which is an iterator.
145
146        Streaming is one of the two ways to consume events in Fluvio. It is a
147        continuous request for new records arriving in a partition, beginning
148        at a particular offset. You specify the starting point of the stream
149        using an Offset and periodically receive events, either individually or
150        in batches.
151
152        Args:
153            offset: Offset
154            wasm_path: str - The absolute path to the WASM file
155
156        Example:
157            import os
158
159            wmp = os.path.abspath("somefilter.wasm")
160            for i in consumer.stream_with_config(Offset.beginning(), wmp):
161                # do something with i
162
163        Returns:
164            PartitionConsumerStream
165
166        """
167        return PartitionConsumerStream(
168            self._inner.stream_with_config(offset._inner, wasm_path)
169        )

Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns a PartitionConsumerStream which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

Args: offset: Offset wasm_path: str - The absolute path to the WASM file

Example: import os

wmp = os.path.abspath("somefilter.wasm")
for i in consumer.stream_with_config(Offset.beginning(), wmp):
    # do something with i

Returns: PartitionConsumerStream

class TopicProducer:
172class TopicProducer:
173    """An interface for producing events to a particular topic.
174
175    A `TopicProducer` allows you to send events to the specific topic it was
176    initialized for. Once you have a `TopicProducer`, you can send events to
177    the topic, choosing which partition each event should be delivered to.
178    """
179
180    _inner: _TopicProducer
181
182    def __init__(self, inner: _TopicProducer):
183        self._inner = inner
184
185    def send_string(self, buf: str) -> None:
186        """Sends a string to this producer’s topic"""
187        return self.send([], buf.encode("utf-8"))
188
189    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
190        """
191        Sends a key/value record to this producer's Topic.
192
193        The partition that the record will be sent to is derived from the Key.
194        """
195        return self._inner.send(key, value)
196
197    def flush(self) -> None:
198        """
199        Send all the queued records in the producer batches.
200        """
201        return self._inner.flush()
202
203    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
204        """
205        Sends a list of key/value records as a batch to this producer's Topic.
206        :param records: The list of records to send
207        """
208        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
209        return self._inner.send_all(records_inner)

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: fluvio._fluvio_python.TopicProducer)
182    def __init__(self, inner: _TopicProducer):
183        self._inner = inner
def send_string(self, buf: str) -> None:
185    def send_string(self, buf: str) -> None:
186        """Sends a string to this producer’s topic"""
187        return self.send([], buf.encode("utf-8"))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> None:
189    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
190        """
191        Sends a key/value record to this producer's Topic.
192
193        The partition that the record will be sent to is derived from the Key.
194        """
195        return self._inner.send(key, value)

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
197    def flush(self) -> None:
198        """
199        Send all the queued records in the producer batches.
200        """
201        return self._inner.flush()

Send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]]):
203    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
204        """
205        Sends a list of key/value records as a batch to this producer's Topic.
206        :param records: The list of records to send
207        """
208        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
209        return self._inner.send_all(records_inner)

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
class Fluvio:
212class Fluvio:
213    """An interface for interacting with Fluvio streaming."""
214
215    _inner: _Fluvio
216
217    def __init__(self, inner: _Fluvio):
218        self._inner = inner
219
220    @classmethod
221    def connect(cls):
222        """Creates a new Fluvio client using the current profile from
223        `~/.fluvio/config`
224
225        If there is no current profile or the `~/.fluvio/config` file does not
226        exist, then this will create a new profile with default settings and
227        set it as current, then try to connect to the cluster using those
228        settings.
229        """
230        return cls(_Fluvio.connect())
231
232    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
233        """Creates a new `PartitionConsumer` for the given topic and partition
234
235        Currently, consumers are scoped to both a specific Fluvio topic and to
236        a particular partition within that topic. That means that if you have a
237        topic with multiple partitions, then in order to receive all of the
238        events in all of the partitions, you will need to create one consumer
239        per partition.
240        """
241        return PartitionConsumer(self._inner.partition_consumer(topic, partition))
242
243    def topic_producer(self, topic: str) -> TopicProducer:
244        """
245        Creates a new `TopicProducer` for the given topic name.
246
247        Currently, producers are scoped to a specific Fluvio topic. That means
248        when you send events via a producer, you must specify which partition
249        each event should go to.
250        """
251        return TopicProducer(self._inner.topic_producer(topic))

An interface for interacting with Fluvio streaming.

Fluvio(inner: fluvio._fluvio_python.Fluvio)
217    def __init__(self, inner: _Fluvio):
218        self._inner = inner
@classmethod
def connect(cls):
220    @classmethod
221    def connect(cls):
222        """Creates a new Fluvio client using the current profile from
223        `~/.fluvio/config`
224
225        If there is no current profile or the `~/.fluvio/config` file does not
226        exist, then this will create a new profile with default settings and
227        set it as current, then try to connect to the cluster using those
228        settings.
229        """
230        return cls(_Fluvio.connect())

Creates a new Fluvio client using the current profile from ~/.fluvio/config

If there is no current profile or the ~/.fluvio/config file does not exist, then this will create a new profile with default settings and set it as current, then try to connect to the cluster using those settings.

def partition_consumer(self, topic: str, partition: int) -> fluvio.PartitionConsumer:
232    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
233        """Creates a new `PartitionConsumer` for the given topic and partition
234
235        Currently, consumers are scoped to both a specific Fluvio topic and to
236        a particular partition within that topic. That means that if you have a
237        topic with multiple partitions, then in order to receive all of the
238        events in all of the partitions, you will need to create one consumer
239        per partition.
240        """
241        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

Creates a new PartitionConsumer for the given topic and partition

Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.

def topic_producer(self, topic: str) -> fluvio.TopicProducer:
243    def topic_producer(self, topic: str) -> TopicProducer:
244        """
245        Creates a new `TopicProducer` for the given topic name.
246
247        Currently, producers are scoped to a specific Fluvio topic. That means
248        when you send events via a producer, you must specify which partition
249        each event should go to.
250        """
251        return TopicProducer(self._inner.topic_producer(topic))

Creates a new TopicProducer for the given topic name.

Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.