fluvio

  1from ._fluvio_python import (
  2    Fluvio as _Fluvio,
  3    PartitionConsumer as _PartitionConsumer,
  4    PartitionConsumerStream as _PartitionConsumerStream,
  5    TopicProducer as _TopicProducer,
  6    ProducerBatchRecord as _ProducerBatchRecord,
  7    Record as _Record,
  8    Offset as _Offset
  9)
 10from ._fluvio_python import Error as FluviorError  # noqa: F401
 11import typing
 12
 13
 14class Record:
 15    '''The individual record for a given stream.
 16    '''
 17    _inner: _Record
 18
 19    def __init__(self, inner: _Record):
 20        self._inner = inner
 21
 22    def offset(self) -> int:
 23        '''The offset from the initial offset for a given stream.
 24        '''
 25        return self._inner.offset()
 26
 27    def value(self) -> typing.List[int]:
 28        '''Returns the contents of this Record's value
 29        '''
 30        return self._inner.value()
 31
 32    def value_string(self) -> str:
 33        '''The UTF-8 decoded value for this record.
 34        '''
 35        return self._inner.value_string()
 36
 37    def key(self) -> typing.List[int]:
 38        '''Returns the contents of this Record's key, if it exists
 39        '''
 40        return self._inner.key()
 41
 42    def key_string(self) -> str:
 43        '''The UTF-8 decoded key for this record.
 44        '''
 45        return self._inner.key_string()
 46
 47
 48class Offset:
 49    '''Describes the location of an event stored in a Fluvio partition.
 50    '''
 51    _inner: _Offset
 52
 53    @classmethod
 54    def absolute(cls, index: int):
 55        '''Creates an absolute offset with the given index'''
 56        return cls(_Offset.absolute(index))
 57
 58    @classmethod
 59    def beginning(cls):
 60        '''Creates a relative offset starting at the beginning of the saved log
 61        '''
 62        return cls(_Offset.beginning())
 63
 64    @classmethod
 65    def end(cls):
 66        '''Creates a relative offset pointing to the newest log entry
 67        '''
 68        return cls(_Offset.end())
 69
 70    @classmethod
 71    def from_beginning(cls, offset: int):
 72        '''Creates a relative offset a fixed distance after the oldest log
 73        entry
 74        '''
 75        return cls(_Offset.from_beginning(offset))
 76
 77    @classmethod
 78    def from_end(cls, offset: int):
 79        '''Creates a relative offset a fixed distance before the newest log
 80        entry
 81        '''
 82        return cls(_Offset.from_end(offset))
 83
 84    def __init__(self, inner: _Offset):
 85        self._inner = inner
 86
 87
 88class PartitionConsumerStream:
 89    '''An iterator for `PartitionConsumer.stream` method where each `__next__`
 90    returns a `Record`.
 91
 92    Usage:
 93
 94    ```python
 95    for i in consumer.stream(0):
 96        print(i.value())
 97        print(i.value_string())
 98    ```
 99    '''
100    _inner: _PartitionConsumerStream
101
102    def __init__(self, inner: _PartitionConsumerStream):
103        self._inner = inner
104
105    def __iter__(self):
106        return self
107
108    def __next__(self) -> typing.Optional[Record]:
109        return Record(self._inner.next())
110
111
class PartitionConsumer:
    '''
    An interface for consuming events from a particular partition

    Events can be consumed in two ways: by "fetching" and by "streaming".
    A fetch is a one-time batch operation over a range of offsets: you
    specify the range via their Offset and receive all of the events in
    that range at once. Streaming instead starts at a single Offset and
    hands back an object that continuously yields new events as they
    arrive.
    '''

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> PartitionConsumerStream:
        '''
        Continuously streams events beginning at `offset` in the consumer's
        partition, returning a `PartitionConsumerStream` (an iterator).

        Streaming is one of the two ways to consume events in Fluvio. It is
        a continuous request for new records arriving in a partition: you
        pick the starting point of the stream with an Offset and then
        periodically receive events, either individually or in batches.
        '''
        return PartitionConsumerStream(self._inner.stream(offset._inner))
142
143
class TopicProducer:
    '''An interface for producing events to a particular topic.

    A `TopicProducer` is bound to the specific topic it was initialized
    for. Once you have one, you can send events to that topic, choosing
    which partition each event should be delivered to.
    '''
    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send_string(self, buf: str) -> None:
        '''Sends a string (UTF-8 encoded, with an empty key) to this
        producer's topic
        '''
        return self.send([], buf.encode('utf-8'))

    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
        '''
        Sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the
        Key.
        '''
        return self._inner.send(key, value)

    def flush(self) -> None:
        '''
        Send all the queued records in the producer batches.
        '''
        return self._inner.flush()

    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
        '''
        Sends a list of key/value records as a batch to this producer's
        Topic.
        :param records: The list of records to send
        '''
        batch = [_ProducerBatchRecord(key, value) for key, value in records]
        return self._inner.send_all(batch)
182
183
class Fluvio:
    '''An interface for interacting with Fluvio streaming.'''
    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        self._inner = inner

    @classmethod
    def connect(cls):
        '''Creates a new Fluvio client using the current profile from
        `~/.fluvio/config`

        When there is no current profile, or the `~/.fluvio/config` file
        does not exist, a new profile with default settings is created and
        set as current, and the connection is attempted with those
        settings.
        '''
        return cls(_Fluvio.connect())

    def partition_consumer(
        self,
        topic: str,
        partition: int
    ) -> PartitionConsumer:
        '''Creates a new `PartitionConsumer` for the given topic and partition

        Consumers are currently scoped to a single Fluvio topic *and* a
        single partition within that topic. To receive all of the events
        of a topic that has multiple partitions, create one consumer per
        partition.
        '''
        consumer = self._inner.partition_consumer(topic, partition)
        return PartitionConsumer(consumer)

    def topic_producer(self, topic: str) -> TopicProducer:
        '''
        Creates a new `TopicProducer` for the given topic name.

        Producers are currently scoped to a specific Fluvio topic, so when
        you send events via a producer you must specify which partition
        each event should go to.
        '''
        return TopicProducer(self._inner.topic_producer(topic))
class Record:
15class Record:
16    '''The individual record for a given stream.
17    '''
18    _inner: _Record
19
20    def __init__(self, inner: _Record):
21        self._inner = inner
22
23    def offset(self) -> int:
24        '''The offset from the initial offset for a given stream.
25        '''
26        return self._inner.offset()
27
28    def value(self) -> typing.List[int]:
29        '''Returns the contents of this Record's value
30        '''
31        return self._inner.value()
32
33    def value_string(self) -> str:
34        '''The UTF-8 decoded value for this record.
35        '''
36        return self._inner.value_string()
37
38    def key(self) -> typing.List[int]:
39        '''Returns the contents of this Record's key, if it exists
40        '''
41        return self._inner.key()
42
43    def key_string(self) -> str:
44        '''The UTF-8 decoded key for this record.
45        '''
46        return self._inner.key_string()

The individual record for a given stream.

Record(inner: fluvio._fluvio_python.Record)
20    def __init__(self, inner: _Record):
21        self._inner = inner
def offset(self) -> int:
23    def offset(self) -> int:
24        '''The offset from the initial offset for a given stream.
25        '''
26        return self._inner.offset()

The offset from the initial offset for a given stream.

def value(self) -> List[int]:
28    def value(self) -> typing.List[int]:
29        '''Returns the contents of this Record's value
30        '''
31        return self._inner.value()

Returns the contents of this Record's value

def value_string(self) -> str:
33    def value_string(self) -> str:
34        '''The UTF-8 decoded value for this record.
35        '''
36        return self._inner.value_string()

The UTF-8 decoded value for this record.

def key(self) -> List[int]:
38    def key(self) -> typing.List[int]:
39        '''Returns the contents of this Record's key, if it exists
40        '''
41        return self._inner.key()

Returns the contents of this Record's key, if it exists

def key_string(self) -> str:
43    def key_string(self) -> str:
44        '''The UTF-8 decoded key for this record.
45        '''
46        return self._inner.key_string()

The UTF-8 decoded key for this record.

class Offset:
49class Offset:
50    '''Describes the location of an event stored in a Fluvio partition.
51    '''
52    _inner: _Offset
53
54    @classmethod
55    def absolute(cls, index: int):
56        '''Creates an absolute offset with the given index'''
57        return cls(_Offset.absolute(index))
58
59    @classmethod
60    def beginning(cls):
61        '''Creates a relative offset starting at the beginning of the saved log
62        '''
63        return cls(_Offset.beginning())
64
65    @classmethod
66    def end(cls):
67        '''Creates a relative offset pointing to the newest log entry
68        '''
69        return cls(_Offset.end())
70
71    @classmethod
72    def from_beginning(cls, offset: int):
73        '''Creates a relative offset a fixed distance after the oldest log
74        entry
75        '''
76        return cls(_Offset.from_beginning(offset))
77
78    @classmethod
79    def from_end(cls, offset: int):
80        '''Creates a relative offset a fixed distance before the newest log
81        entry
82        '''
83        return cls(_Offset.from_end(offset))
84
85    def __init__(self, inner: _Offset):
86        self._inner = inner

Describes the location of an event stored in a Fluvio partition.

Offset(inner: fluvio._fluvio_python.Offset)
85    def __init__(self, inner: _Offset):
86        self._inner = inner
@classmethod
def absolute(cls, index: int)
54    @classmethod
55    def absolute(cls, index: int):
56        '''Creates an absolute offset with the given index'''
57        return cls(_Offset.absolute(index))

Creates an absolute offset with the given index

@classmethod
def beginning(cls)
59    @classmethod
60    def beginning(cls):
61        '''Creates a relative offset starting at the beginning of the saved log
62        '''
63        return cls(_Offset.beginning())

Creates a relative offset starting at the beginning of the saved log

@classmethod
def end(cls)
65    @classmethod
66    def end(cls):
67        '''Creates a relative offset pointing to the newest log entry
68        '''
69        return cls(_Offset.end())

Creates a relative offset pointing to the newest log entry

@classmethod
def from_beginning(cls, offset: int)
71    @classmethod
72    def from_beginning(cls, offset: int):
73        '''Creates a relative offset a fixed distance after the oldest log
74        entry
75        '''
76        return cls(_Offset.from_beginning(offset))

Creates a relative offset a fixed distance after the oldest log entry

@classmethod
def from_end(cls, offset: int)
78    @classmethod
79    def from_end(cls, offset: int):
80        '''Creates a relative offset a fixed distance before the newest log
81        entry
82        '''
83        return cls(_Offset.from_end(offset))

Creates a relative offset a fixed distance before the newest log entry

class PartitionConsumerStream:
 89class PartitionConsumerStream:
 90    '''An iterator for `PartitionConsumer.stream` method where each `__next__`
 91    returns a `Record`.
 92
 93    Usage:
 94
 95    ```python
 96    for i in consumer.stream(Offset.beginning()):
 97        print(i.value())
 98        print(i.value_string())
 99    ```
100    '''
101    _inner: _PartitionConsumerStream
102
103    def __init__(self, inner: _PartitionConsumerStream):
104        self._inner = inner
105
106    def __iter__(self):
107        return self
108
109    def __next__(self) -> typing.Optional[Record]:
110        return Record(self._inner.next())

An iterator for PartitionConsumer.stream method where each __next__ returns a Record.

Usage:

for i in consumer.stream(Offset.beginning()):
    print(i.value())
    print(i.value_string())
PartitionConsumerStream(inner: fluvio._fluvio_python.PartitionConsumerStream)
103    def __init__(self, inner: _PartitionConsumerStream):
104        self._inner = inner
class PartitionConsumer:
113class PartitionConsumer:
114    '''
115    An interface for consuming events from a particular partition
116
117    There are two ways to consume events: by "fetching" events and by
118    "streaming" events. Fetching involves specifying a range of events that you
119    want to consume via their Offset. A fetch is a sort of one-time batch
120    operation: you’ll receive all of the events in your range all at once. When
121    you consume events via Streaming, you specify a starting Offset and receive
122    an object that will continuously yield new events as they arrive.
123    '''
124
125    _inner: _PartitionConsumer
126
127    def __init__(self, inner: _PartitionConsumer):
128        self._inner = inner
129
130    def stream(self, offset: Offset) -> PartitionConsumerStream:
131        '''
132        Continuously streams events from a particular offset in the consumer’s
133        partition. This returns a `PartitionConsumerStream` which is an
134        iterator.
135
136        Streaming is one of the two ways to consume events in Fluvio. It is a
137        continuous request for new records arriving in a partition, beginning
138        at a particular offset. You specify the starting point of the stream
139        using an Offset and periodically receive events, either individually or
140        in batches.
141        '''
142        return PartitionConsumerStream(self._inner.stream(offset._inner))

An interface for consuming events from a particular partition

There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.

PartitionConsumer(inner: fluvio._fluvio_python.PartitionConsumer)
127    def __init__(self, inner: _PartitionConsumer):
128        self._inner = inner
def stream(self, offset: fluvio.Offset) -> fluvio.PartitionConsumerStream:
130    def stream(self, offset: Offset) -> PartitionConsumerStream:
131        '''
132        Continuously streams events from a particular offset in the consumer’s
133        partition. This returns a `PartitionConsumerStream` which is an
134        iterator.
135
136        Streaming is one of the two ways to consume events in Fluvio. It is a
137        continuous request for new records arriving in a partition, beginning
138        at a particular offset. You specify the starting point of the stream
139        using an Offset and periodically receive events, either individually or
140        in batches.
141        '''
142        return PartitionConsumerStream(self._inner.stream(offset._inner))

Continuously streams events from a particular offset in the consumer’s partition. This returns a PartitionConsumerStream which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

class TopicProducer:
145class TopicProducer:
146    '''An interface for producing events to a particular topic.
147
148    A `TopicProducer` allows you to send events to the specific topic it was
149    initialized for. Once you have a `TopicProducer`, you can send events to
150    the topic, choosing which partition each event should be delivered to.
151    '''
152    _inner: _TopicProducer
153
154    def __init__(self, inner: _TopicProducer):
155        self._inner = inner
156
157    def send_string(self, buf: str) -> None:
158        '''Sends a string to this producer’s topic
159        '''
160        return self.send([], buf.encode('utf-8'))
161
162    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
163        '''
164        Sends a key/value record to this producer's Topic.
165
166        The partition that the record will be sent to is derived from the Key.
167        '''
168        return self._inner.send(key, value)
169
170    def flush(self) -> None:
171        '''
172        Send all the queued records in the producer batches.
173        '''
174        return self._inner.flush()
175
176    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
177        '''
178        Sends a list of key/value records as a batch to this producer's Topic.
179        :param records: The list of records to send
180        '''
181        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
182        return self._inner.send_all(records_inner)

An interface for producing events to a particular topic.

A TopicProducer allows you to send events to the specific topic it was initialized for. Once you have a TopicProducer, you can send events to the topic, choosing which partition each event should be delivered to.

TopicProducer(inner: fluvio._fluvio_python.TopicProducer)
154    def __init__(self, inner: _TopicProducer):
155        self._inner = inner
def send_string(self, buf: str) -> None:
157    def send_string(self, buf: str) -> None:
158        '''Sends a string to this producer’s topic
159        '''
160        return self.send([], buf.encode('utf-8'))

Sends a string to this producer’s topic

def send(self, key: List[int], value: List[int]) -> None:
162    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
163        '''
164        Sends a key/value record to this producer's Topic.
165
166        The partition that the record will be sent to is derived from the Key.
167        '''
168        return self._inner.send(key, value)

Sends a key/value record to this producer's Topic.

The partition that the record will be sent to is derived from the Key.

def flush(self) -> None:
170    def flush(self) -> None:
171        '''
172        Send all the queued records in the producer batches.
173        '''
174        return self._inner.flush()

Send all the queued records in the producer batches.

def send_all(self, records: List[Tuple[bytes, bytes]])
176    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
177        '''
178        Sends a list of key/value records as a batch to this producer's Topic.
179        :param records: The list of records to send
180        '''
181        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
182        return self._inner.send_all(records_inner)

Sends a list of key/value records as a batch to this producer's Topic.

Parameters
  • records: The list of records to send
class Fluvio:
185class Fluvio:
186    '''An interface for interacting with Fluvio streaming.'''
187    _inner: _Fluvio
188
189    def __init__(self, inner: _Fluvio):
190        self._inner = inner
191
192    @classmethod
193    def connect(cls):
194        '''Creates a new Fluvio client using the current profile from
195        `~/.fluvio/config`
196
197        If there is no current profile or the `~/.fluvio/config` file does not
198        exist, then this will create a new profile with default settings and
199        set it as current, then try to connect to the cluster using those
200        settings.
201        '''
202        return cls(_Fluvio.connect())
203
204    def partition_consumer(
205        self,
206        topic: str,
207        partition: int
208    ) -> PartitionConsumer:
209        '''Creates a new `PartitionConsumer` for the given topic and partition
210
211        Currently, consumers are scoped to both a specific Fluvio topic and to
212        a particular partition within that topic. That means that if you have a
213        topic with multiple partitions, then in order to receive all of the
214        events in all of the partitions, you will need to create one consumer
215        per partition.
216        '''
217        return PartitionConsumer(
218            self._inner.partition_consumer(topic, partition)
219        )
220
221    def topic_producer(self, topic: str) -> TopicProducer:
222        '''
223        Creates a new `TopicProducer` for the given topic name.
224
225        Currently, producers are scoped to a specific Fluvio topic. That means
226        when you send events via a producer, you must specify which partition
227        each event should go to.
228        '''
229        return TopicProducer(self._inner.topic_producer(topic))

An interface for interacting with Fluvio streaming.

Fluvio(inner: fluvio._fluvio_python.Fluvio)
189    def __init__(self, inner: _Fluvio):
190        self._inner = inner
@classmethod
def connect(cls)
192    @classmethod
193    def connect(cls):
194        '''Creates a new Fluvio client using the current profile from
195        `~/.fluvio/config`
196
197        If there is no current profile or the `~/.fluvio/config` file does not
198        exist, then this will create a new profile with default settings and
199        set it as current, then try to connect to the cluster using those
200        settings.
201        '''
202        return cls(_Fluvio.connect())

Creates a new Fluvio client using the current profile from ~/.fluvio/config

If there is no current profile or the ~/.fluvio/config file does not exist, then this will create a new profile with default settings and set it as current, then try to connect to the cluster using those settings.

def partition_consumer(self, topic: str, partition: int) -> fluvio.PartitionConsumer:
204    def partition_consumer(
205        self,
206        topic: str,
207        partition: int
208    ) -> PartitionConsumer:
209        '''Creates a new `PartitionConsumer` for the given topic and partition
210
211        Currently, consumers are scoped to both a specific Fluvio topic and to
212        a particular partition within that topic. That means that if you have a
213        topic with multiple partitions, then in order to receive all of the
214        events in all of the partitions, you will need to create one consumer
215        per partition.
216        '''
217        return PartitionConsumer(
218            self._inner.partition_consumer(topic, partition)
219        )

Creates a new PartitionConsumer for the given topic and partition

Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.

def topic_producer(self, topic: str) -> fluvio.TopicProducer:
221    def topic_producer(self, topic: str) -> TopicProducer:
222        '''
223        Creates a new `TopicProducer` for the given topic name.
224
225        Currently, producers are scoped to a specific Fluvio topic. That means
226        when you send events via a producer, you must specify which partition
227        each event should go to.
228        '''
229        return TopicProducer(self._inner.topic_producer(topic))

Creates a new TopicProducer for the given topic name.

Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.