fluvio
from ._fluvio_python import (
    Fluvio as _Fluvio,
    ConsumerConfig as _ConsumerConfig,
    PartitionConsumer as _PartitionConsumer,
    PartitionConsumerStream as _PartitionConsumerStream,
    TopicProducer as _TopicProducer,
    ProducerBatchRecord as _ProducerBatchRecord,
    SmartModuleKind as _SmartModuleKind,
    Record as _Record,
    Offset as _Offset,
)
from enum import Enum

# NOTE: "FluviorError" is a historical misspelling; it is kept as-is because it
# is part of the public API surface of this module.
from ._fluvio_python import Error as FluviorError  # noqa: F401
import typing


class Record:
    """The individual record for a given stream.

    Thin wrapper around the native `_Record` produced by the Rust extension.
    """

    _inner: _Record

    def __init__(self, inner: _Record):
        self._inner = inner

    def offset(self) -> int:
        """The offset from the initial offset for a given stream."""
        return self._inner.offset()

    def value(self) -> typing.List[int]:
        """Returns the contents of this Record's value as raw bytes."""
        return self._inner.value()

    def value_string(self) -> str:
        """The UTF-8 decoded value for this record."""
        return self._inner.value_string()

    def key(self) -> typing.List[int]:
        """Returns the contents of this Record's key, if it exists."""
        return self._inner.key()

    def key_string(self) -> str:
        """The UTF-8 decoded key for this record."""
        return self._inner.key_string()


class Offset:
    """Describes the location of an event stored in a Fluvio partition.

    Use one of the classmethod constructors (`absolute`, `beginning`, `end`,
    `from_beginning`, `from_end`) rather than calling `__init__` directly.
    """

    _inner: _Offset

    def __init__(self, inner: _Offset):
        self._inner = inner

    @classmethod
    def absolute(cls, index: int):
        """Creates an absolute offset with the given index."""
        return cls(_Offset.absolute(index))

    @classmethod
    def beginning(cls):
        """Creates a relative offset starting at the beginning of the saved log."""
        return cls(_Offset.beginning())

    @classmethod
    def end(cls):
        """Creates a relative offset pointing to the newest log entry."""
        return cls(_Offset.end())

    @classmethod
    def from_beginning(cls, offset: int):
        """Creates a relative offset a fixed distance after the oldest log
        entry.
        """
        return cls(_Offset.from_beginning(offset))

    @classmethod
    def from_end(cls, offset: int):
        """Creates a relative offset a fixed distance before the newest log
        entry.
        """
        return cls(_Offset.from_end(offset))


class SmartModuleKind(Enum):
    """
    Use of this is to explicitly set the kind of a smartmodule. Not required
    but needed for legacy SmartModules.
    """

    Filter = _SmartModuleKind.Filter
    Map = _SmartModuleKind.Map
    ArrayMap = _SmartModuleKind.ArrayMap
    FilterMap = _SmartModuleKind.FilterMap
    Aggregate = _SmartModuleKind.Aggregate


class ConsumerConfig:
    """Configuration for a consumer, notably SmartModule attachment."""

    _inner: _ConsumerConfig

    def __init__(self):
        self._inner = _ConsumerConfig()

    def smartmodule(
        self,
        name: str = None,
        path: str = None,
        kind: SmartModuleKind = None,
        params: typing.Dict[str, str] = None,
        aggregate: typing.List[bytes] = None,
    ):
        """
        Adds a smartmodule to a consumer config, referenced either by the
        `name` of a registered `SmartModule` or by a `path` to a wasm binary.

        Args:
            name: str - name of a registered SmartModule (exclusive with path)
            path: str - path to a wasm binary (exclusive with name)
            kind: SmartModuleKind - explicit kind, needed for legacy modules
            params: Dict[str, str] - parameters passed to the SmartModule
            aggregate: List[bytes] - initial value for an aggregate smartmodule

        Raises:
            ValueError: "Require either a path or a name for a smartmodule."
            ValueError: "Only specify one of path or name not both."

        Returns:
            None
        """
        # Validate before doing any work so no partial state is touched.
        # ValueError is a subclass of Exception, so existing callers that
        # caught Exception still work.
        if path is None and name is None:
            raise ValueError("Require either a path or a name for a smartmodule.")

        if path is not None and name is not None:
            raise ValueError("Only specify one of path or name not both.")

        if kind is not None:
            kind = kind.value

        params = {} if params is None else params
        param_keys = list(params.keys())
        param_vals = list(params.values())

        self._inner.smartmodule(
            name,
            path,
            kind,
            param_keys,
            param_vals,
            aggregate,
            # These arguments are for Join stuff but that's not implemented on
            # the python side yet
            None,
            None,
            None,
            None,
        )


class PartitionConsumer:
    """
    An interface for consuming events from a particular partition

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that you
    want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you'll receive all of the events in your range all at once. When
    you consume events via Streaming, you specify a starting Offset and receive
    an object that will continuously yield new events as they arrive.
    """

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def stream(self, offset: Offset) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset in the consumer's
        partition. This returns an `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.
        """
        return self._generator(self._inner.stream(offset._inner))

    def stream_with_config(
        self, offset: Offset, config: ConsumerConfig
    ) -> typing.Iterator[Record]:
        """
        Continuously streams events from a particular offset with a SmartModule
        WASM module in the consumer's partition. This returns an
        `Iterator[Record]`.

        Streaming is one of the two ways to consume events in Fluvio. It is a
        continuous request for new records arriving in a partition, beginning
        at a particular offset. You specify the starting point of the stream
        using an Offset and periodically receive events, either individually or
        in batches.

        Args:
            offset: Offset
            config: ConsumerConfig - consumer config, e.g. with a SmartModule

        Example:
            import os

            wmp = os.path.abspath("somefilter.wasm")
            config = ConsumerConfig()
            config.smartmodule(path=wmp)
            for i in consumer.stream_with_config(Offset.beginning(), config):
                # do something with i

        Returns:
            `Iterator[Record]`
        """
        return self._generator(
            self._inner.stream_with_config(offset._inner, config._inner)
        )

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        # Pull from the native stream until it signals exhaustion with None.
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()


class TopicProducer:
    """An interface for producing events to a particular topic.

    A `TopicProducer` allows you to send events to the specific topic it was
    initialized for. Once you have a `TopicProducer`, you can send events to
    the topic, choosing which partition each event should be delivered to.
    """

    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send_string(self, buf: str) -> None:
        """Sends a string to this producer's topic, with an empty key."""
        return self.send([], buf.encode("utf-8"))

    def send(self, key: typing.List[int], value: typing.List[int]) -> None:
        """
        Sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        return self._inner.send(key, value)

    def flush(self) -> None:
        """
        Send all the queued records in the producer batches.
        """
        return self._inner.flush()

    def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
        """
        Sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(key, value) for (key, value) in records]
        return self._inner.send_all(records_inner)


class Fluvio:
    """An interface for interacting with Fluvio streaming."""

    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        self._inner = inner

    @classmethod
    def connect(cls):
        """Creates a new Fluvio client using the current profile from
        `~/.fluvio/config`

        If there is no current profile or the `~/.fluvio/config` file does not
        exist, then this will create a new profile with default settings and
        set it as current, then try to connect to the cluster using those
        settings.
        """
        return cls(_Fluvio.connect())

    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
        """Creates a new `PartitionConsumer` for the given topic and partition

        Currently, consumers are scoped to both a specific Fluvio topic and to
        a particular partition within that topic. That means that if you have a
        topic with multiple partitions, then in order to receive all of the
        events in all of the partitions, you will need to create one consumer
        per partition.
        """
        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

    def topic_producer(self, topic: str) -> TopicProducer:
        """
        Creates a new `TopicProducer` for the given topic name.

        Currently, producers are scoped to a specific Fluvio topic. That means
        when you send events via a producer, you must specify which partition
        each event should go to.
        """
        return TopicProducer(self._inner.topic_producer(topic))
18class Record: 19 """The individual record for a given stream.""" 20 21 _inner: _Record 22 23 def __init__(self, inner: _Record): 24 self._inner = inner 25 26 def offset(self) -> int: 27 """The offset from the initial offset for a given stream.""" 28 return self._inner.offset() 29 30 def value(self) -> typing.List[int]: 31 """Returns the contents of this Record's value""" 32 return self._inner.value() 33 34 def value_string(self) -> str: 35 """The UTF-8 decoded value for this record.""" 36 return self._inner.value_string() 37 38 def key(self) -> typing.List[int]: 39 """Returns the contents of this Record's key, if it exists""" 40 return self._inner.key() 41 42 def key_string(self) -> str: 43 """The UTF-8 decoded key for this record.""" 44 return self._inner.key_string()
The individual record for a given stream.
26 def offset(self) -> int: 27 """The offset from the initial offset for a given stream.""" 28 return self._inner.offset()
The offset from the initial offset for a given stream.
30 def value(self) -> typing.List[int]: 31 """Returns the contents of this Record's value""" 32 return self._inner.value()
Returns the contents of this Record's value
34 def value_string(self) -> str: 35 """The UTF-8 decoded value for this record.""" 36 return self._inner.value_string()
The UTF-8 decoded value for this record.
47class Offset: 48 """Describes the location of an event stored in a Fluvio partition.""" 49 50 _inner: _Offset 51 52 @classmethod 53 def absolute(cls, index: int): 54 """Creates an absolute offset with the given index""" 55 return cls(_Offset.absolute(index)) 56 57 @classmethod 58 def beginning(cls): 59 """Creates a relative offset starting at the beginning of the saved log""" 60 return cls(_Offset.beginning()) 61 62 @classmethod 63 def end(cls): 64 """Creates a relative offset pointing to the newest log entry""" 65 return cls(_Offset.end()) 66 67 @classmethod 68 def from_beginning(cls, offset: int): 69 """Creates a relative offset a fixed distance after the oldest log 70 entry 71 """ 72 return cls(_Offset.from_beginning(offset)) 73 74 @classmethod 75 def from_end(cls, offset: int): 76 """Creates a relative offset a fixed distance before the newest log 77 entry 78 """ 79 return cls(_Offset.from_end(offset)) 80 81 def __init__(self, inner: _Offset): 82 self._inner = inner
Describes the location of an event stored in a Fluvio partition.
52 @classmethod 53 def absolute(cls, index: int): 54 """Creates an absolute offset with the given index""" 55 return cls(_Offset.absolute(index))
Creates an absolute offset with the given index
57 @classmethod 58 def beginning(cls): 59 """Creates a relative offset starting at the beginning of the saved log""" 60 return cls(_Offset.beginning())
Creates a relative offset starting at the beginning of the saved log
62 @classmethod 63 def end(cls): 64 """Creates a relative offset pointing to the newest log entry""" 65 return cls(_Offset.end())
Creates a relative offset pointing to the newest log entry
67 @classmethod 68 def from_beginning(cls, offset: int): 69 """Creates a relative offset a fixed distance after the oldest log 70 entry 71 """ 72 return cls(_Offset.from_beginning(offset))
Creates a relative offset a fixed distance after the oldest log entry
85class SmartModuleKind(Enum): 86 """ 87 Use of this is to explicitly set the kind of a smartmodule. Not required 88 but needed for legacy SmartModules. 89 """ 90 91 Filter = _SmartModuleKind.Filter 92 Map = _SmartModuleKind.Map 93 ArrayMap = _SmartModuleKind.ArrayMap 94 FilterMap = _SmartModuleKind.FilterMap 95 Aggregate = _SmartModuleKind.Aggregate
Use of this is to explicitly set the kind of a smartmodule. Not required but needed for legacy SmartModules.
Inherited Members
- enum.Enum
- name
- value
98class ConsumerConfig: 99 _inner: _ConsumerConfig 100 101 def __init__(self): 102 self._inner = _ConsumerConfig() 103 104 def smartmodule( 105 self, 106 name: str = None, 107 path: str = None, 108 kind: SmartModuleKind = None, 109 params: typing.Dict[str, str] = None, 110 aggregate: typing.List[bytes] = None, 111 ): 112 """ 113 This is a method for adding a smartmodule to a consumer config either 114 using a `name` of a `SmartModule` or a `path` to a wasm binary. 115 116 Args: 117 118 name: str 119 path: str 120 kind: SmartModuleKind 121 params: Dict[str, str] 122 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 123 124 Raises: 125 "Require either a path or a name for a smartmodule." 126 "Only specify one of path or name not both." 127 128 Returns: 129 130 None 131 """ 132 133 if kind is not None: 134 kind = kind.value 135 136 if path is None and name is None: 137 raise Exception("Require either a path or a name for a smartmodule.") 138 139 if path is not None and name is not None: 140 raise Exception("Only specify one of path or name not both.") 141 142 params = {} if params is None else params 143 param_keys = [x for x in params.keys()] 144 param_vals = [x for x in params.values()] 145 146 self._inner.smartmodule( 147 name, 148 path, 149 kind, 150 param_keys, 151 param_vals, 152 aggregate, 153 # These arguments are for Join stuff but that's not implemented on 154 # the python side yet 155 None, 156 None, 157 None, 158 None, 159 )
104 def smartmodule( 105 self, 106 name: str = None, 107 path: str = None, 108 kind: SmartModuleKind = None, 109 params: typing.Dict[str, str] = None, 110 aggregate: typing.List[bytes] = None, 111 ): 112 """ 113 This is a method for adding a smartmodule to a consumer config either 114 using a `name` of a `SmartModule` or a `path` to a wasm binary. 115 116 Args: 117 118 name: str 119 path: str 120 kind: SmartModuleKind 121 params: Dict[str, str] 122 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 123 124 Raises: 125 "Require either a path or a name for a smartmodule." 126 "Only specify one of path or name not both." 127 128 Returns: 129 130 None 131 """ 132 133 if kind is not None: 134 kind = kind.value 135 136 if path is None and name is None: 137 raise Exception("Require either a path or a name for a smartmodule.") 138 139 if path is not None and name is not None: 140 raise Exception("Only specify one of path or name not both.") 141 142 params = {} if params is None else params 143 param_keys = [x for x in params.keys()] 144 param_vals = [x for x in params.values()] 145 146 self._inner.smartmodule( 147 name, 148 path, 149 kind, 150 param_keys, 151 param_vals, 152 aggregate, 153 # These arguments are for Join stuff but that's not implemented on 154 # the python side yet 155 None, 156 None, 157 None, 158 None, 159 )
This is a method for adding a smartmodule to a consumer config either using a `name` of a `SmartModule` or a `path` to a wasm binary.
Args:
name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
Raises: "Require either a path or a name for a smartmodule." "Only specify one of path or name not both."
Returns:
None
162class PartitionConsumer: 163 """ 164 An interface for consuming events from a particular partition 165 166 There are two ways to consume events: by "fetching" events and by 167 "streaming" events. Fetching involves specifying a range of events that you 168 want to consume via their Offset. A fetch is a sort of one-time batch 169 operation: you’ll receive all of the events in your range all at once. When 170 you consume events via Streaming, you specify a starting Offset and receive 171 an object that will continuously yield new events as they arrive. 172 """ 173 174 _inner: _PartitionConsumer 175 176 def __init__(self, inner: _PartitionConsumer): 177 self._inner = inner 178 179 def stream(self, offset: Offset) -> typing.Iterator[Record]: 180 """ 181 Continuously streams events from a particular offset in the consumer’s 182 partition. This returns a `Iterator[Record]` which is an 183 iterator. 184 185 Streaming is one of the two ways to consume events in Fluvio. It is a 186 continuous request for new records arriving in a partition, beginning 187 at a particular offset. You specify the starting point of the stream 188 using an Offset and periodically receive events, either individually or 189 in batches. 190 """ 191 return self._generator(self._inner.stream(offset._inner)) 192 193 def stream_with_config( 194 self, offset: Offset, config: ConsumerConfig 195 ) -> typing.Iterator[Record]: 196 """ 197 Continuously streams events from a particular offset with a SmartModule 198 WASM module in the consumer’s partition. This returns a 199 `Iterator[Record]` which is an iterator. 200 201 Streaming is one of the two ways to consume events in Fluvio. It is a 202 continuous request for new records arriving in a partition, beginning 203 at a particular offset. You specify the starting point of the stream 204 using an Offset and periodically receive events, either individually or 205 in batches. 
206 207 Args: 208 offset: Offset 209 wasm_module_path: str - The absolute path to the WASM file 210 211 Example: 212 import os 213 214 wmp = os.path.abspath("somefilter.wasm") 215 config = ConsumerConfig() 216 config.smartmodule(path=wmp) 217 for i in consumer.stream_with_config(Offset.beginning(), config): 218 # do something with i 219 220 Returns: 221 `Iterator[Record]` 222 223 """ 224 return self._generator( 225 self._inner.stream_with_config(offset._inner, config._inner) 226 ) 227 228 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 229 item = stream.next() 230 while item is not None: 231 yield Record(item) 232 item = stream.next()
An interface for consuming events from a particular partition
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.
179 def stream(self, offset: Offset) -> typing.Iterator[Record]: 180 """ 181 Continuously streams events from a particular offset in the consumer’s 182 partition. This returns a `Iterator[Record]` which is an 183 iterator. 184 185 Streaming is one of the two ways to consume events in Fluvio. It is a 186 continuous request for new records arriving in a partition, beginning 187 at a particular offset. You specify the starting point of the stream 188 using an Offset and periodically receive events, either individually or 189 in batches. 190 """ 191 return self._generator(self._inner.stream(offset._inner))
Continuously streams events from a particular offset in the consumer’s partition. This returns an `Iterator[Record]`.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
193 def stream_with_config( 194 self, offset: Offset, config: ConsumerConfig 195 ) -> typing.Iterator[Record]: 196 """ 197 Continuously streams events from a particular offset with a SmartModule 198 WASM module in the consumer’s partition. This returns a 199 `Iterator[Record]` which is an iterator. 200 201 Streaming is one of the two ways to consume events in Fluvio. It is a 202 continuous request for new records arriving in a partition, beginning 203 at a particular offset. You specify the starting point of the stream 204 using an Offset and periodically receive events, either individually or 205 in batches. 206 207 Args: 208 offset: Offset 209 wasm_module_path: str - The absolute path to the WASM file 210 211 Example: 212 import os 213 214 wmp = os.path.abspath("somefilter.wasm") 215 config = ConsumerConfig() 216 config.smartmodule(path=wmp) 217 for i in consumer.stream_with_config(Offset.beginning(), config): 218 # do something with i 219 220 Returns: 221 `Iterator[Record]` 222 223 """ 224 return self._generator( 225 self._inner.stream_with_config(offset._inner, config._inner) 226 )
Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s partition. This returns an `Iterator[Record]`.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
Iterator[Record]
235class TopicProducer: 236 """An interface for producing events to a particular topic. 237 238 A `TopicProducer` allows you to send events to the specific topic it was 239 initialized for. Once you have a `TopicProducer`, you can send events to 240 the topic, choosing which partition each event should be delivered to. 241 """ 242 243 _inner: _TopicProducer 244 245 def __init__(self, inner: _TopicProducer): 246 self._inner = inner 247 248 def send_string(self, buf: str) -> None: 249 """Sends a string to this producer’s topic""" 250 return self.send([], buf.encode("utf-8")) 251 252 def send(self, key: typing.List[int], value: typing.List[int]) -> None: 253 """ 254 Sends a key/value record to this producer's Topic. 255 256 The partition that the record will be sent to is derived from the Key. 257 """ 258 return self._inner.send(key, value) 259 260 def flush(self) -> None: 261 """ 262 Send all the queued records in the producer batches. 263 """ 264 return self._inner.flush() 265 266 def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]): 267 """ 268 Sends a list of key/value records as a batch to this producer's Topic. 269 :param records: The list of records to send 270 """ 271 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 272 return self._inner.send_all(records_inner)
An interface for producing events to a particular topic.
A `TopicProducer` allows you to send events to the specific topic it was initialized for. Once you have a `TopicProducer`, you can send events to the topic, choosing which partition each event should be delivered to.
248 def send_string(self, buf: str) -> None: 249 """Sends a string to this producer’s topic""" 250 return self.send([], buf.encode("utf-8"))
Sends a string to this producer’s topic
252 def send(self, key: typing.List[int], value: typing.List[int]) -> None: 253 """ 254 Sends a key/value record to this producer's Topic. 255 256 The partition that the record will be sent to is derived from the Key. 257 """ 258 return self._inner.send(key, value)
Sends a key/value record to this producer's Topic.
The partition that the record will be sent to is derived from the Key.
260 def flush(self) -> None: 261 """ 262 Send all the queued records in the producer batches. 263 """ 264 return self._inner.flush()
Send all the queued records in the producer batches.
266 def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]): 267 """ 268 Sends a list of key/value records as a batch to this producer's Topic. 269 :param records: The list of records to send 270 """ 271 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 272 return self._inner.send_all(records_inner)
Sends a list of key/value records as a batch to this producer's Topic.
Parameters
- records: The list of records to send
275class Fluvio: 276 """An interface for interacting with Fluvio streaming.""" 277 278 _inner: _Fluvio 279 280 def __init__(self, inner: _Fluvio): 281 self._inner = inner 282 283 @classmethod 284 def connect(cls): 285 """Creates a new Fluvio client using the current profile from 286 `~/.fluvio/config` 287 288 If there is no current profile or the `~/.fluvio/config` file does not 289 exist, then this will create a new profile with default settings and 290 set it as current, then try to connect to the cluster using those 291 settings. 292 """ 293 return cls(_Fluvio.connect()) 294 295 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 296 """Creates a new `PartitionConsumer` for the given topic and partition 297 298 Currently, consumers are scoped to both a specific Fluvio topic and to 299 a particular partition within that topic. That means that if you have a 300 topic with multiple partitions, then in order to receive all of the 301 events in all of the partitions, you will need to create one consumer 302 per partition. 303 """ 304 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 305 306 def topic_producer(self, topic: str) -> TopicProducer: 307 """ 308 Creates a new `TopicProducer` for the given topic name. 309 310 Currently, producers are scoped to a specific Fluvio topic. That means 311 when you send events via a producer, you must specify which partition 312 each event should go to. 313 """ 314 return TopicProducer(self._inner.topic_producer(topic))
An interface for interacting with Fluvio streaming.
283 @classmethod 284 def connect(cls): 285 """Creates a new Fluvio client using the current profile from 286 `~/.fluvio/config` 287 288 If there is no current profile or the `~/.fluvio/config` file does not 289 exist, then this will create a new profile with default settings and 290 set it as current, then try to connect to the cluster using those 291 settings. 292 """ 293 return cls(_Fluvio.connect())
Creates a new Fluvio client using the current profile from `~/.fluvio/config`.
If there is no current profile or the `~/.fluvio/config` file does not exist, then this will create a new profile with default settings and set it as current, then try to connect to the cluster using those settings.
295 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 296 """Creates a new `PartitionConsumer` for the given topic and partition 297 298 Currently, consumers are scoped to both a specific Fluvio topic and to 299 a particular partition within that topic. That means that if you have a 300 topic with multiple partitions, then in order to receive all of the 301 events in all of the partitions, you will need to create one consumer 302 per partition. 303 """ 304 return PartitionConsumer(self._inner.partition_consumer(topic, partition))
Creates a new `PartitionConsumer` for the given topic and partition.
Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.
306 def topic_producer(self, topic: str) -> TopicProducer: 307 """ 308 Creates a new `TopicProducer` for the given topic name. 309 310 Currently, producers are scoped to a specific Fluvio topic. That means 311 when you send events via a producer, you must specify which partition 312 each event should go to. 313 """ 314 return TopicProducer(self._inner.topic_producer(topic))
Creates a new `TopicProducer` for the given topic name.
Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.