fluvio
The __fluvio__ python module provides an extension for working with the Fluvio streaming platform.
This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.
Creating a topic with default settings (1 partition, 1 replication) is as simple as:
fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
Or just create a topic with custom settings:
import fluvio
fluvio_admin = FluvioAdmin.connect()
topic_spec = (
TopicSpec.create()
.with_partitions(3)
.with_replications(3)
.with_max_partition_size("1Gb")
.with_retention_time("1h")
.with_segment_size("10M")
.build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
Producing data to a topic in a Fluvio cluster is as simple as:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
producer = fluvio.topic_producer(topic)
for i in range(10):
producer.send_string("Hello %s " % i)
Consuming is also simple:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)
num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
For more examples see the integration tests in the fluvio-python repository.
1""" 2The __fluvio__ python module provides an extension for working with the Fluvio 3streaming platform. 4 5This module builds on top of the Fluvio Client Rust Crate and provides a 6pythonic access to the API. 7 8 9Creating a topic with default settings (1 partition, 1 replication) is as simple as: 10 11```python 12fluvio_admin = FluvioAdmin.connect() 13fluvio_admin.create_topic("a_topic") 14``` 15 16Or just create a topic with custom settings: 17 18```python 19import fluvio 20 21fluvio_admin = FluvioAdmin.connect() 22topic_spec = ( 23 TopicSpec.create() 24 .with_partitions(3) 25 .with_replications(3) 26 .with_max_partition_size("1Gb") 27 .with_retention_time("1h") 28 .with_segment_size("10M") 29 .build() 30) 31fluvio_admin.create_topic("a_topic", topic_spec) 32``` 33 34Producing data to a topic in a Fluvio cluster is as simple as: 35 36```python 37import fluvio 38 39fluvio = Fluvio.connect() 40 41topic = "a_topic" 42producer = fluvio.topic_producer(topic) 43 44for i in range(10): 45 producer.send_string("Hello %s " % i) 46``` 47 48Consuming is also simple: 49 50```python 51import fluvio 52 53fluvio = Fluvio.connect() 54 55topic = "a_topic" 56builder = ConsumerConfigExtBuilder(topic) 57config = builder.build() 58stream = fluvio.consumer_with_config(config) 59 60num_items = 2 61records = [bytearray(next(stream).value()).decode() for _ in range(num_items)] 62``` 63 64For more examples see the integration tests in the fluvio-python repository. 65 66[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py) 67[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py) 68 69""" 70 71import typing 72from enum import Enum 73 74from ._fluvio_python import ( 75 Fluvio as _Fluvio, 76 FluvioConfig as _FluvioConfig, 77 Offset, 78 FluvioAdmin as _FluvioAdmin, 79 # consumer types 80 ConsumerConfig as _ConsumerConfig, 81 ConsumerConfigExt, 82 ConsumerConfigExtBuilder, 83 PartitionConsumer as _PartitionConsumer, 84 MultiplePartitionConsumer as _MultiplePartitionConsumer, 85 PartitionSelectionStrategy as _PartitionSelectionStrategy, 86 PartitionConsumerStream as _PartitionConsumerStream, 87 AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream, 88 # producer types 89 TopicProducer as _TopicProducer, 90 ProduceOutput as _ProduceOutput, 91 ProducerBatchRecord as _ProducerBatchRecord, 92 # admin and misc types 93 SmartModuleKind as _SmartModuleKind, 94 TopicSpec as _TopicSpec, 95 WatchTopicStream as _WatchTopicStream, 96 WatchSmartModuleStream as _WatchSmartModuleStream, 97) 98 99from ._fluvio_python import Error as FluviorError # noqa: F401 100 101from .topic import TopicSpec, CompressionType, TopicMode 102from .record import Record, RecordMetadata 103from .specs import ( 104 # support types 105 CommonCreateRequest, 106 PartitionMap, 107 # specs 108 SmartModuleSpec, 109 MessageMetadataTopicSpec, 110 MetadataPartitionSpec, 111 MetadataSmartModuleSpec, 112 MetadataTopicSpec, 113 MetaUpdateSmartModuleSpec, 114 MetaUpdateTopicSpec, 115) 116 117# this structures the module a bit and allows pydoc to generate better docs 118# with better ordering of types and functions 119# inclusion in __all__, will pull the PyO3 rust inline docs into 120# the pdoc generated documentation 121__all__ = [ 122 # top level types 123 "Fluvio", 124 "FluvioConfig", 125 "FluvioAdmin", 126 # topic 127 "TopicSpec", 128 "CompressionType", 129 "TopicMode", 130 # record 131 "Record", 132 "RecordMetadata", 133 "Offset", 134 # producer 135 "TopicProducer", 136 "ProduceOutput", 137 # consumer 138 "ConsumerConfigExt", 139 "ConsumerConfigExtBuilder", 140 "ConsumerConfig", 141 "PartitionConsumer", 142 "MultiplePartitionConsumer", 143 # specs 144 "CommonCreateRequest", 145 "SmartModuleSpec", 146 "TopicSpec", 147 "PartitionMap", 148 "MessageMetadataTopicSpec", 149 "MetadataPartitionSpec", 150 "MetadataTopicSpec", 151 "MetaUpdateTopicSpec", 152 # Misc 153 "PartitionSelectionStrategy", 154 "SmartModuleKind", 155] 156 157 158class ProduceOutput: 159 """Returned by of `TopicProducer.send` call allowing access to sent record metadata.""" 160 161 _inner: _ProduceOutput 162 163 def __init__(self, inner: _ProduceOutput) -> None: 164 self._inner = inner 165 166 def wait(self) -> typing.Optional[RecordMetadata]: 167 """Wait for the record metadata. 168 169 This is a blocking call and may only return a `RecordMetadata` once. 170 Any subsequent call to `wait` will return a `None` value. 171 Errors will be raised as exceptions of type `FluvioError`. 172 """ 173 res = self._inner.wait() 174 if res is None: 175 return None 176 return RecordMetadata(res) 177 178 async def async_wait(self) -> typing.Optional[RecordMetadata]: 179 """Asynchronously wait for the record metadata. 180 181 This may only return a `RecordMetadata` once. 182 Any subsequent call to `wait` will return a `None` value. 183 """ 184 return await self._inner.async_wait() 185 186 187class SmartModuleKind(Enum): 188 """ 189 Use of this is to explicitly set the kind of a smartmodule. Not required 190 but needed for legacy SmartModules. 191 """ 192 193 Filter = _SmartModuleKind.Filter 194 Map = _SmartModuleKind.Map 195 ArrayMap = _SmartModuleKind.ArrayMap 196 FilterMap = _SmartModuleKind.FilterMap 197 Aggregate = _SmartModuleKind.Aggregate 198 199 200class ConsumerConfig: 201 _inner: _ConsumerConfig 202 203 def __init__(self): 204 self._inner = _ConsumerConfig() 205 206 def disable_continuous(self, val: bool = True): 207 """Disable continuous mode after fetching specified records""" 208 self._inner.disable_continuous(val) 209 210 def smartmodule( 211 self, 212 name: str = None, 213 path: str = None, 214 kind: SmartModuleKind = None, 215 params: typing.Dict[str, str] = None, 216 aggregate: typing.List[bytes] = None, 217 ): 218 """ 219 This is a method for adding a smartmodule to a consumer config either 220 using a `name` of a `SmartModule` or a `path` to a wasm binary. 221 222 Args: 223 224 name: str 225 path: str 226 kind: SmartModuleKind 227 params: Dict[str, str] 228 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 229 230 Raises: 231 "Require either a path or a name for a smartmodule." 232 "Only specify one of path or name not both." 233 234 Returns: 235 236 None 237 """ 238 239 if kind is not None: 240 kind = kind.value 241 242 if path is None and name is None: 243 raise Exception("Require either a path or a name for a smartmodule.") 244 245 if path is not None and name is not None: 246 raise Exception("Only specify one of path or name not both.") 247 248 params = {} if params is None else params 249 param_keys = [x for x in params.keys()] 250 param_vals = [x for x in params.values()] 251 252 self._inner.smartmodule( 253 name, 254 path, 255 kind, 256 param_keys, 257 param_vals, 258 aggregate, 259 # These arguments are for Join stuff but that's not implemented on 260 # the python side yet 261 None, 262 None, 263 None, 264 None, 265 ) 266 267 268class PartitionConsumer: 269 """ 270 An interface for consuming events from a particular partition 271 272 There are two ways to consume events: by "fetching" events and by 273 "streaming" events. Fetching involves specifying a range of events that you 274 want to consume via their Offset. A fetch is a sort of one-time batch 275 operation: you’ll receive all of the events in your range all at once. When 276 you consume events via Streaming, you specify a starting Offset and receive 277 an object that will continuously yield new events as they arrive. 278 """ 279 280 _inner: _PartitionConsumer 281 282 def __init__(self, inner: _PartitionConsumer): 283 self._inner = inner 284 285 def stream(self, offset: Offset) -> typing.Iterator[Record]: 286 """ 287 Continuously streams events from a particular offset in the consumer’s 288 partition. This returns a `Iterator[Record]` which is an 289 iterator. 290 291 Streaming is one of the two ways to consume events in Fluvio. It is a 292 continuous request for new records arriving in a partition, beginning 293 at a particular offset. You specify the starting point of the stream 294 using an Offset and periodically receive events, either individually or 295 in batches. 296 """ 297 return self._generator(self._inner.stream(offset)) 298 299 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 300 """ 301 Continuously streams events from a particular offset in the consumer’s 302 partition. This returns a `AsyncIterator[Record]` which is an 303 iterator. 304 305 Streaming is one of the two ways to consume events in Fluvio. It is a 306 continuous request for new records arriving in a partition, beginning 307 at a particular offset. You specify the starting point of the stream 308 using an Offset and periodically receive events, either individually or 309 in batches. 310 """ 311 return self._async_generator(await self._inner.async_stream(offset)) 312 313 def stream_with_config( 314 self, offset: Offset, config: ConsumerConfig 315 ) -> typing.Iterator[Record]: 316 """ 317 Continuously streams events from a particular offset with a SmartModule 318 WASM module in the consumer’s partition. This returns a 319 `Iterator[Record]` which is an iterator. 320 321 Streaming is one of the two ways to consume events in Fluvio. It is a 322 continuous request for new records arriving in a partition, beginning 323 at a particular offset. You specify the starting point of the stream 324 using an Offset and periodically receive events, either individually or 325 in batches. 326 327 Args: 328 offset: Offset 329 wasm_module_path: str - The absolute path to the WASM file 330 331 Example: 332 import os 333 334 wmp = os.path.abspath("somefilter.wasm") 335 config = ConsumerConfig() 336 config.smartmodule(path=wmp) 337 for i in consumer.stream_with_config(Offset.beginning(), config): 338 # do something with i 339 340 Returns: 341 `Iterator[Record]` 342 343 """ 344 return self._generator(self._inner.stream_with_config(offset, config._inner)) 345 346 async def async_stream_with_config( 347 self, offset: Offset, config: ConsumerConfig 348 ) -> typing.AsyncIterator[Record]: 349 """ 350 Continuously streams events from a particular offset with a SmartModule 351 WASM module in the consumer’s partition. This returns a 352 `AsyncIterator[Record]` which is an async iterator. 353 354 Streaming is one of the two ways to consume events in Fluvio. It is a 355 continuous request for new records arriving in a partition, beginning 356 at a particular offset. You specify the starting point of the stream 357 using an Offset and periodically receive events, either individually or 358 in batches. 359 360 Args: 361 offset: Offset 362 wasm_module_path: str - The absolute path to the WASM file 363 364 Example: 365 import os 366 367 wmp = os.path.abspath("somefilter.wasm") 368 config = ConsumerConfig() 369 config.smartmodule(path=wmp) 370 `AsyncIterator[Record]` 371 372 """ 373 return self._async_generator( 374 await self._inner.async_stream_with_config(offset, config._inner) 375 ) 376 377 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 378 item = stream.next() 379 while item is not None: 380 yield Record(item) 381 item = stream.next() 382 383 async def _async_generator( 384 self, astream: _AsyncPartitionConsumerStream 385 ) -> typing.AsyncIterator[Record]: 386 item = await astream.async_next() 387 while item is not None: 388 yield Record(item) 389 item = await astream.async_next() 390 391 392class MultiplePartitionConsumer: 393 """ 394 An interface for consuming events from multiple partitions 395 396 There are two ways to consume events: by "fetching" events and by 397 "streaming" events. Fetching involves specifying a range of events that you 398 want to consume via their Offset. A fetch is a sort of one-time batch 399 operation: you’ll receive all of the events in your range all at once. When 400 you consume events via Streaming, you specify a starting Offset and receive 401 an object that will continuously yield new events as they arrive. 402 """ 403 404 _inner: _MultiplePartitionConsumer 405 406 def __init__(self, inner: _MultiplePartitionConsumer): 407 self._inner = inner 408 409 def stream(self, offset: Offset) -> typing.Iterator[Record]: 410 """ 411 Continuously streams events from a particular offset in the consumer’s 412 partitions. This returns a `Iterator[Record]` which is an 413 iterator. 414 415 Streaming is one of the two ways to consume events in Fluvio. It is a 416 continuous request for new records arriving in a partition, beginning 417 at a particular offset. You specify the starting point of the stream 418 using an Offset and periodically receive events, either individually or 419 in batches. 420 """ 421 return self._generator(self._inner.stream(offset)) 422 423 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 424 """ 425 Continuously streams events from a particular offset in the consumer’s 426 partitions. This returns a `AsyncIterator[Record]` which is an 427 async iterator. 428 429 Streaming is one of the two ways to consume events in Fluvio. It is a 430 continuous request for new records arriving in a partition, beginning 431 at a particular offset. You specify the starting point of the stream 432 using an Offset and periodically receive events, either individually or 433 in batches. 434 """ 435 return self._async_generator(await self._inner.async_stream(offset)) 436 437 def stream_with_config( 438 self, offset: Offset, config: ConsumerConfig 439 ) -> typing.Iterator[Record]: 440 """ 441 Continuously streams events from a particular offset with a SmartModule 442 WASM module in the consumer’s partitions. This returns a 443 `Iterator[Record]` which is an iterator. 444 445 Streaming is one of the two ways to consume events in Fluvio. It is a 446 continuous request for new records arriving in a partition, beginning 447 at a particular offset. You specify the starting point of the stream 448 using an Offset and periodically receive events, either individually or 449 in batches. 450 451 Args: 452 offset: Offset 453 wasm_module_path: str - The absolute path to the WASM file 454 455 Example: 456 import os 457 458 wmp = os.path.abspath("somefilter.wasm") 459 config = ConsumerConfig() 460 config.smartmodule(path=wmp) 461 for i in consumer.stream_with_config(Offset.beginning(), config): 462 # do something with i 463 464 Returns: 465 `Iterator[Record]` 466 467 """ 468 return self._generator( 469 self._inner.stream_with_config(offset._inner, config._inner) 470 ) 471 472 async def async_stream_with_config( 473 self, offset: Offset, config: ConsumerConfig 474 ) -> typing.AsyncIterator[Record]: 475 """ 476 Continuously streams events from a particular offset with a SmartModule 477 WASM module in the consumer’s partitions. This returns a 478 `AsyncIterator[Record]` which is an async iterator. 479 480 Streaming is one of the two ways to consume events in Fluvio. It is a 481 continuous request for new records arriving in a partition, beginning 482 at a particular offset. You specify the starting point of the stream 483 using an Offset and periodically receive events, either individually or 484 in batches. 485 486 Args: 487 offset: Offset 488 wasm_module_path: str - The absolute path to the WASM file 489 490 Example: 491 import os 492 493 wmp = os.path.abspath("somefilter.wasm") 494 config = ConsumerConfig() 495 config.smartmodule(path=wmp) 496 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 497 # do something with i 498 499 Returns: 500 `AsyncIterator[Record]` 501 502 """ 503 return self._async_generator( 504 await self._inner.async_stream_with_config(offset, config._inner) 505 ) 506 507 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 508 item = stream.next() 509 while item is not None: 510 yield Record(item) 511 item = stream.next() 512 513 async def _async_generator( 514 self, astream: _AsyncPartitionConsumerStream 515 ) -> typing.AsyncIterator[Record]: 516 item = await astream.async_next() 517 while item is not None: 518 yield Record(item) 519 item = await astream.async_next() 520 521 522class TopicProducer: 523 """An interface for producing events to a particular topic. 524 525 A `TopicProducer` allows you to send events to the specific topic it was 526 initialized for. Once you have a `TopicProducer`, you can send events to 527 the topic, choosing which partition each event should be delivered to. 528 """ 529 530 _inner: _TopicProducer 531 532 def __init__(self, inner: _TopicProducer): 533 self._inner = inner 534 535 def send_string(self, buf: str) -> ProduceOutput: 536 """Sends a string to this producer’s topic""" 537 return self.send([], buf.encode("utf-8")) 538 539 async def async_send_string(self, buf: str) -> ProduceOutput: 540 """Sends a string to this producer’s topic""" 541 return await self.async_send([], buf.encode("utf-8")) 542 543 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 544 """ 545 Sends a key/value record to this producer's Topic. 546 547 The partition that the record will be sent to is derived from the Key. 548 """ 549 return ProduceOutput(self._inner.send(key, value)) 550 551 async def async_send( 552 self, key: typing.List[int], value: typing.List[int] 553 ) -> ProduceOutput: 554 """ 555 Async sends a key/value record to this producer's Topic. 556 557 The partition that the record will be sent to is derived from the Key. 558 """ 559 produce_output = await self._inner.async_send(key, value) 560 return ProduceOutput(produce_output) 561 562 def flush(self) -> None: 563 """ 564 Send all the queued records in the producer batches. 565 """ 566 return self._inner.flush() 567 568 async def async_flush(self) -> None: 569 """ 570 Async send all the queued records in the producer batches. 571 """ 572 await self._inner.async_flush() 573 574 def send_all( 575 self, records: typing.List[typing.Tuple[bytes, bytes]] 576 ) -> typing.List[ProduceOutput]: 577 """ 578 Sends a list of key/value records as a batch to this producer's Topic. 579 :param records: The list of records to send 580 """ 581 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 582 return [ 583 ProduceOutput(output_inner) 584 for output_inner in self._inner.send_all(records_inner) 585 ] 586 587 async def async_send_all( 588 self, records: typing.List[typing.Tuple[bytes, bytes]] 589 ) -> typing.List[ProduceOutput]: 590 """ 591 Async sends a list of key/value records as a batch to this producer's Topic. 592 :param records: The list of records to send 593 """ 594 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 595 return [ 596 ProduceOutput(output_inner) 597 for output_inner in await self._inner.async_send_all(records_inner) 598 ] 599 600 601class PartitionSelectionStrategy: 602 """Stragegy to select partitions""" 603 604 _inner: _PartitionSelectionStrategy 605 606 def __init__(self, inner: _FluvioConfig): 607 self._inner = inner 608 609 @classmethod 610 def with_all(cls, topic: str): 611 """select all partitions of one topic""" 612 return cls(_PartitionSelectionStrategy.with_all(topic)) 613 614 @classmethod 615 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 616 """select multiple partitions of multiple topics""" 617 return cls(_PartitionSelectionStrategy.with_multiple(topic)) 618 619 620class FluvioConfig: 621 """Configuration for Fluvio client""" 622 623 _inner: _FluvioConfig 624 625 def __init__(self, inner: _FluvioConfig): 626 self._inner = inner 627 628 @classmethod 629 def load(cls): 630 """get current cluster config from default profile""" 631 return cls(_FluvioConfig.load()) 632 633 @classmethod 634 def new(cls, addr: str): 635 """Create a new cluster configuration with no TLS.""" 636 return cls(_FluvioConfig.new(addr)) 637 638 def set_endpoint(self, endpoint: str): 639 """set endpoint""" 640 self._inner.set_endpoint(endpoint) 641 642 def set_use_spu_local_address(self, val: bool): 643 """set wheather to use spu local address""" 644 self._inner.set_use_spu_local_address(val) 645 646 def disable_tls(self): 647 """disable tls for this config""" 648 self._inner.disable_tls() 649 650 def set_anonymous_tls(self): 651 """set the config to use anonymous tls""" 652 self._inner.set_anonymous_tls() 653 654 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 655 """specify inline tls parameters""" 656 self._inner.set_inline_tls(domain, key, cert, ca_cert) 657 658 def set_tls_file_paths( 659 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 660 ): 661 """specify paths to tls files""" 662 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path) 663 664 def set_client_id(self, client_id: str): 665 """set client id""" 666 self._inner.set_client_id(client_id) 667 668 def unset_client_id(self): 669 """remove the configured client id from config""" 670 self._inner.unset_client_id() 671 672 673class Fluvio: 674 """An interface for interacting with Fluvio streaming.""" 675 676 _inner: _Fluvio 677 678 def __init__(self, inner: _Fluvio): 679 self._inner = inner 680 681 @classmethod 682 def connect(cls): 683 """Tries to create a new Fluvio client using the current profile from 684 `~/.fluvio/config` 685 """ 686 return cls(_Fluvio.connect()) 687 688 @classmethod 689 def connect_with_config(cls, config: FluvioConfig): 690 """Creates a new Fluvio client using the given configuration""" 691 return cls(_Fluvio.connect_with_config(config._inner)) 692 693 def consumer_with_config( 694 self, config: ConsumerConfigExt 695 ) -> typing.Iterator[Record]: 696 """Creates consumer with settings defined in config 697 698 This is the recommended way to create a consume records. 699 """ 700 return self._generator(self._inner.consumer_with_config(config)) 701 702 def topic_producer(self, topic: str) -> TopicProducer: 703 """ 704 Creates a new `TopicProducer` for the given topic name. 705 706 Currently, producers are scoped to a specific Fluvio topic. That means 707 when you send events via a producer, you must specify which partition 708 each event should go to. 709 """ 710 return TopicProducer(self._inner.topic_producer(topic)) 711 712 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 713 """Creates a new `PartitionConsumer` for the given topic and partition 714 715 Currently, consumers are scoped to both a specific Fluvio topic and to 716 a particular partition within that topic. That means that if you have a 717 topic with multiple partitions, then in order to receive all of the 718 events in all of the partitions, you will need to create one consumer 719 per partition. 720 """ 721 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 722 723 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 724 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 725 726 Currently, consumers are scoped to both a specific Fluvio topic and to 727 its all partitions within that topic. 728 """ 729 strategy = PartitionSelectionStrategy.with_all(topic) 730 return MultiplePartitionConsumer( 731 self._inner.multi_partition_consumer(strategy._inner) 732 ) 733 734 def multi_topic_partition_consumer( 735 self, selections: typing.List[typing.Tuple[str, int]] 736 ) -> MultiplePartitionConsumer: 737 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 738 739 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 740 """ 741 strategy = PartitionSelectionStrategy.with_multiple(selections) 742 return MultiplePartitionConsumer( 743 self._inner.multi_partition_consumer(strategy._inner) 744 ) 745 746 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 747 item = stream.next() 748 while item is not None: 749 yield Record(item) 750 item = stream.next() 751 752 753class FluvioAdmin: 754 _inner: _FluvioAdmin 755 756 def __init__(self, inner: _FluvioAdmin): 757 self._inner = inner 758 759 def connect(): 760 return FluvioAdmin(_FluvioAdmin.connect()) 761 762 def connect_with_config(config: FluvioConfig): 763 return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) 764 765 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 766 partitions = 1 767 replication = 1 768 ignore_rack = True 769 dry_run = False 770 spec = ( 771 spec 772 if spec is not None 773 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 774 ) 775 return self._inner.create_topic(topic, dry_run, spec) 776 777 def create_topic_with_config( 778 self, topic: str, req: CommonCreateRequest, spec: _TopicSpec 779 ): 780 return self._inner.create_topic_with_config(topic, req._inner, spec) 781 782 def delete_topic(self, topic: str): 783 return self._inner.delete_topic(topic) 784 785 def all_topics(self) -> typing.List[MetadataTopicSpec]: 786 return self._inner.all_topics() 787 788 def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]: 789 return self._inner.list_topics(filters) 790 791 def list_topics_with_params( 792 self, filters: typing.List[str], summary: bool 793 ) -> typing.List[MetadataTopicSpec]: 794 return self._inner.list_topics_with_params(filters, summary) 795 796 def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]: 797 return self._topic_spec_generator(self._inner.watch_topic()) 798 799 def create_smartmodule(self, name: str, path: str, dry_run: bool): 800 spec = SmartModuleSpec.new(path) 801 return self._inner.create_smart_module(name, dry_run, spec._inner) 802 803 def delete_smartmodule(self, name: str): 804 return self._inner.delete_smart_module(name) 805 806 def list_smartmodules( 807 self, filters: typing.List[str] 808 ) -> typing.List[MetadataSmartModuleSpec]: 809 return self._inner.list_smart_modules(filters) 810 811 def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]: 812 return self._smart_module_spec_generator(self._inner.watch_smart_module()) 813 814 def list_partitions( 815 self, filters: typing.List[str] 816 ) -> typing.List[MetadataPartitionSpec]: 817 return self._inner.list_partitions(filters) 818 819 def _topic_spec_generator( 820 self, stream: _WatchTopicStream 821 ) -> typing.Iterator[MetaUpdateTopicSpec]: 822 item = stream.next().inner() 823 while item is not None: 824 yield MetaUpdateTopicSpec(item) 825 item = stream.next().inner() 826 827 def _smart_module_spec_generator( 828 self, stream: _WatchSmartModuleStream 829 ) -> typing.Iterator[MetaUpdateSmartModuleSpec]: 830 item = stream.next().inner() 831 while item is not None: 832 yield MetaUpdateSmartModuleSpec(item) 833 item = stream.next().inner()
674class Fluvio: 675 """An interface for interacting with Fluvio streaming.""" 676 677 _inner: _Fluvio 678 679 def __init__(self, inner: _Fluvio): 680 self._inner = inner 681 682 @classmethod 683 def connect(cls): 684 """Tries to create a new Fluvio client using the current profile from 685 `~/.fluvio/config` 686 """ 687 return cls(_Fluvio.connect()) 688 689 @classmethod 690 def connect_with_config(cls, config: FluvioConfig): 691 """Creates a new Fluvio client using the given configuration""" 692 return cls(_Fluvio.connect_with_config(config._inner)) 693 694 def consumer_with_config( 695 self, config: ConsumerConfigExt 696 ) -> typing.Iterator[Record]: 697 """Creates consumer with settings defined in config 698 699 This is the recommended way to create a consume records. 700 """ 701 return self._generator(self._inner.consumer_with_config(config)) 702 703 def topic_producer(self, topic: str) -> TopicProducer: 704 """ 705 Creates a new `TopicProducer` for the given topic name. 706 707 Currently, producers are scoped to a specific Fluvio topic. That means 708 when you send events via a producer, you must specify which partition 709 each event should go to. 710 """ 711 return TopicProducer(self._inner.topic_producer(topic)) 712 713 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 714 """Creates a new `PartitionConsumer` for the given topic and partition 715 716 Currently, consumers are scoped to both a specific Fluvio topic and to 717 a particular partition within that topic. That means that if you have a 718 topic with multiple partitions, then in order to receive all of the 719 events in all of the partitions, you will need to create one consumer 720 per partition. 721 """ 722 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 723 724 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 725 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 726 727 Currently, consumers are scoped to both a specific Fluvio topic and to 728 its all partitions within that topic. 729 """ 730 strategy = PartitionSelectionStrategy.with_all(topic) 731 return MultiplePartitionConsumer( 732 self._inner.multi_partition_consumer(strategy._inner) 733 ) 734 735 def multi_topic_partition_consumer( 736 self, selections: typing.List[typing.Tuple[str, int]] 737 ) -> MultiplePartitionConsumer: 738 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 739 740 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 741 """ 742 strategy = PartitionSelectionStrategy.with_multiple(selections) 743 return MultiplePartitionConsumer( 744 self._inner.multi_partition_consumer(strategy._inner) 745 ) 746 747 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 748 item = stream.next() 749 while item is not None: 750 yield Record(item) 751 item = stream.next()
An interface for interacting with Fluvio streaming.
682 @classmethod 683 def connect(cls): 684 """Tries to create a new Fluvio client using the current profile from 685 `~/.fluvio/config` 686 """ 687 return cls(_Fluvio.connect())
Tries to create a new Fluvio client using the current profile from
~/.fluvio/config
689 @classmethod 690 def connect_with_config(cls, config: FluvioConfig): 691 """Creates a new Fluvio client using the given configuration""" 692 return cls(_Fluvio.connect_with_config(config._inner))
Creates a new Fluvio client using the given configuration
694 def consumer_with_config( 695 self, config: ConsumerConfigExt 696 ) -> typing.Iterator[Record]: 697 """Creates consumer with settings defined in config 698 699 This is the recommended way to create a consume records. 700 """ 701 return self._generator(self._inner.consumer_with_config(config))
Creates consumer with settings defined in config
This is the recommended way to create a consume records.
703 def topic_producer(self, topic: str) -> TopicProducer: 704 """ 705 Creates a new `TopicProducer` for the given topic name. 706 707 Currently, producers are scoped to a specific Fluvio topic. That means 708 when you send events via a producer, you must specify which partition 709 each event should go to. 710 """ 711 return TopicProducer(self._inner.topic_producer(topic))
Creates a new TopicProducer
for the given topic name.
Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.
713 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 714 """Creates a new `PartitionConsumer` for the given topic and partition 715 716 Currently, consumers are scoped to both a specific Fluvio topic and to 717 a particular partition within that topic. That means that if you have a 718 topic with multiple partitions, then in order to receive all of the 719 events in all of the partitions, you will need to create one consumer 720 per partition. 721 """ 722 return PartitionConsumer(self._inner.partition_consumer(topic, partition))
Creates a new PartitionConsumer
for the given topic and partition
Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.
724 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 725 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 726 727 Currently, consumers are scoped to both a specific Fluvio topic and to 728 its all partitions within that topic. 729 """ 730 strategy = PartitionSelectionStrategy.with_all(topic) 731 return MultiplePartitionConsumer( 732 self._inner.multi_partition_consumer(strategy._inner) 733 )
Creates a new MultiplePartitionConsumer
for the given topic and its all partitions
Currently, consumers are scoped to both a specific Fluvio topic and to its all partitions within that topic.
735 def multi_topic_partition_consumer( 736 self, selections: typing.List[typing.Tuple[str, int]] 737 ) -> MultiplePartitionConsumer: 738 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 739 740 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 741 """ 742 strategy = PartitionSelectionStrategy.with_multiple(selections) 743 return MultiplePartitionConsumer( 744 self._inner.multi_partition_consumer(strategy._inner) 745 )
Creates a new MultiplePartitionConsumer
for the given topics and partitions
Currently, consumers are scoped to a list of Fluvio topic and partition tuple.
621class FluvioConfig: 622 """Configuration for Fluvio client""" 623 624 _inner: _FluvioConfig 625 626 def __init__(self, inner: _FluvioConfig): 627 self._inner = inner 628 629 @classmethod 630 def load(cls): 631 """get current cluster config from default profile""" 632 return cls(_FluvioConfig.load()) 633 634 @classmethod 635 def new(cls, addr: str): 636 """Create a new cluster configuration with no TLS.""" 637 return cls(_FluvioConfig.new(addr)) 638 639 def set_endpoint(self, endpoint: str): 640 """set endpoint""" 641 self._inner.set_endpoint(endpoint) 642 643 def set_use_spu_local_address(self, val: bool): 644 """set wheather to use spu local address""" 645 self._inner.set_use_spu_local_address(val) 646 647 def disable_tls(self): 648 """disable tls for this config""" 649 self._inner.disable_tls() 650 651 def set_anonymous_tls(self): 652 """set the config to use anonymous tls""" 653 self._inner.set_anonymous_tls() 654 655 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 656 """specify inline tls parameters""" 657 self._inner.set_inline_tls(domain, key, cert, ca_cert) 658 659 def set_tls_file_paths( 660 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 661 ): 662 """specify paths to tls files""" 663 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path) 664 665 def set_client_id(self, client_id: str): 666 """set client id""" 667 self._inner.set_client_id(client_id) 668 669 def unset_client_id(self): 670 """remove the configured client id from config""" 671 self._inner.unset_client_id()
Configuration for Fluvio client
629 @classmethod 630 def load(cls): 631 """get current cluster config from default profile""" 632 return cls(_FluvioConfig.load())
get current cluster config from default profile
634 @classmethod 635 def new(cls, addr: str): 636 """Create a new cluster configuration with no TLS.""" 637 return cls(_FluvioConfig.new(addr))
Create a new cluster configuration with no TLS.
639 def set_endpoint(self, endpoint: str): 640 """set endpoint""" 641 self._inner.set_endpoint(endpoint)
set endpoint
643 def set_use_spu_local_address(self, val: bool): 644 """set wheather to use spu local address""" 645 self._inner.set_use_spu_local_address(val)
set wheather to use spu local address
651 def set_anonymous_tls(self): 652 """set the config to use anonymous tls""" 653 self._inner.set_anonymous_tls()
set the config to use anonymous tls
655 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 656 """specify inline tls parameters""" 657 self._inner.set_inline_tls(domain, key, cert, ca_cert)
specify inline tls parameters
659 def set_tls_file_paths( 660 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 661 ): 662 """specify paths to tls files""" 663 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)
specify paths to tls files
754class FluvioAdmin: 755 _inner: _FluvioAdmin 756 757 def __init__(self, inner: _FluvioAdmin): 758 self._inner = inner 759 760 def connect(): 761 return FluvioAdmin(_FluvioAdmin.connect()) 762 763 def connect_with_config(config: FluvioConfig): 764 return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) 765 766 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 767 partitions = 1 768 replication = 1 769 ignore_rack = True 770 dry_run = False 771 spec = ( 772 spec 773 if spec is not None 774 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 775 ) 776 return self._inner.create_topic(topic, dry_run, spec) 777 778 def create_topic_with_config( 779 self, topic: str, req: CommonCreateRequest, spec: _TopicSpec 780 ): 781 return self._inner.create_topic_with_config(topic, req._inner, spec) 782 783 def delete_topic(self, topic: str): 784 return self._inner.delete_topic(topic) 785 786 def all_topics(self) -> typing.List[MetadataTopicSpec]: 787 return self._inner.all_topics() 788 789 def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]: 790 return self._inner.list_topics(filters) 791 792 def list_topics_with_params( 793 self, filters: typing.List[str], summary: bool 794 ) -> typing.List[MetadataTopicSpec]: 795 return self._inner.list_topics_with_params(filters, summary) 796 797 def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]: 798 return self._topic_spec_generator(self._inner.watch_topic()) 799 800 def create_smartmodule(self, name: str, path: str, dry_run: bool): 801 spec = SmartModuleSpec.new(path) 802 return self._inner.create_smart_module(name, dry_run, spec._inner) 803 804 def delete_smartmodule(self, name: str): 805 return self._inner.delete_smart_module(name) 806 807 def list_smartmodules( 808 self, filters: typing.List[str] 809 ) -> typing.List[MetadataSmartModuleSpec]: 810 return self._inner.list_smart_modules(filters) 811 812 def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]: 813 return self._smart_module_spec_generator(self._inner.watch_smart_module()) 814 815 def list_partitions( 816 self, filters: typing.List[str] 817 ) -> typing.List[MetadataPartitionSpec]: 818 return self._inner.list_partitions(filters) 819 820 def _topic_spec_generator( 821 self, stream: _WatchTopicStream 822 ) -> typing.Iterator[MetaUpdateTopicSpec]: 823 item = stream.next().inner() 824 while item is not None: 825 yield MetaUpdateTopicSpec(item) 826 item = stream.next().inner() 827 828 def _smart_module_spec_generator( 829 self, stream: _WatchSmartModuleStream 830 ) -> typing.Iterator[MetaUpdateSmartModuleSpec]: 831 item = stream.next().inner() 832 while item is not None: 833 yield MetaUpdateSmartModuleSpec(item) 834 item = stream.next().inner()
766 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 767 partitions = 1 768 replication = 1 769 ignore_rack = True 770 dry_run = False 771 spec = ( 772 spec 773 if spec is not None 774 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 775 ) 776 return self._inner.create_topic(topic, dry_run, spec)
26@dataclass(frozen=True) 27class TopicSpec: 28 mode: TopicMode = TopicMode.COMPUTED 29 partitions: int = 1 30 replications: int = 1 31 ignore_rack: bool = True 32 replica_assignment: Optional[List[PartitionMap]] = None 33 retention_time: Optional[int] = None 34 segment_size: Optional[int] = None 35 compression_type: Optional[CompressionType] = None 36 max_partition_size: Optional[int] = None 37 system: bool = False 38 39 @classmethod 40 def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True): 41 return cls(_TopicSpec.new_computed(partitions, replication, ignore)) 42 43 @classmethod 44 def create(cls) -> "TopicSpec": 45 """Alternative constructor method""" 46 return cls() 47 48 def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec": 49 """Set the assigned replica configuration""" 50 return replace( 51 self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment 52 ) 53 54 def as_mirror_topic(self) -> "TopicSpec": 55 """Set as a mirror topic""" 56 return replace(self, mode=TopicMode.MIRROR) 57 58 def with_partitions(self, partitions: int) -> "TopicSpec": 59 """Set the specified partitions""" 60 if partitions < 0: 61 raise ValueError("Partitions must be a positive integer") 62 return replace(self, partitions=partitions) 63 64 def with_replications(self, replications: int) -> "TopicSpec": 65 """Set the specified replication factor""" 66 if replications < 0: 67 raise ValueError("Replication factor must be a positive integer") 68 return replace(self, replications=replications) 69 70 def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec": 71 """Set the rack ignore setting""" 72 return replace(self, ignore_rack=ignore) 73 74 def with_compression(self, compression: CompressionType) -> "TopicSpec": 75 """Set the specified compression type""" 76 return replace(self, compression_type=compression) 77 78 def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec": 79 """Set the specified retention time""" 80 81 if isinstance(retention_time, int): 82 return replace(self, retention_time=retention_time) 83 84 parsed_time = parse_timespan(retention_time) 85 return replace(self, retention_time=parsed_time) 86 87 def with_segment_size(self, size: Union[str, int]) -> "TopicSpec": 88 """Set the specified segment size""" 89 if isinstance(size, int): 90 return replace(self, segment_size=size) 91 92 parsed_size = parse_byte_size(size) 93 return replace(self, segment_size=parsed_size) 94 95 def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec": 96 """Set the specified max partition size""" 97 if isinstance(size, int): 98 return replace(self, max_partition_size=size) 99 100 parsed_size = parse_byte_size(size) 101 return replace(self, max_partition_size=parsed_size) 102 103 def as_system_topic(self, is_system: bool = True) -> "TopicSpec": 104 """Set the topic as an internal system topic""" 105 return replace(self, system=is_system) 106 107 def build(self) -> _TopicSpec: 108 """Build the TopicSpec based on the current configuration""" 109 # Similar implementation to the original build method 110 if self.mode == TopicMode.ASSIGNED and self.replica_assignment: 111 spec = _TopicSpec.new_assigned(self.replica_assignment) 112 elif self.mode == TopicMode.MIRROR: 113 spec = _TopicSpec.new_mirror() 114 else: 115 spec = _TopicSpec.new_computed( 116 self.partitions, self.replications, self.ignore_rack 117 ) 118 119 spec.set_system(self.system) 120 121 if self.retention_time is not None: 122 spec.set_retention_time(self.retention_time) 123 124 if self.max_partition_size is not None or self.segment_size is not None: 125 spec.set_storage(self.max_partition_size, self.segment_size) 126 127 if self.compression_type is not None: 128 spec.set_compression_type(self.compression_type.value) 129 130 return spec
43 @classmethod 44 def create(cls) -> "TopicSpec": 45 """Alternative constructor method""" 46 return cls()
Alternative constructor method
48 def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec": 49 """Set the assigned replica configuration""" 50 return replace( 51 self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment 52 )
Set the assigned replica configuration
54 def as_mirror_topic(self) -> "TopicSpec": 55 """Set as a mirror topic""" 56 return replace(self, mode=TopicMode.MIRROR)
Set as a mirror topic
58 def with_partitions(self, partitions: int) -> "TopicSpec": 59 """Set the specified partitions""" 60 if partitions < 0: 61 raise ValueError("Partitions must be a positive integer") 62 return replace(self, partitions=partitions)
Set the specified partitions
64 def with_replications(self, replications: int) -> "TopicSpec": 65 """Set the specified replication factor""" 66 if replications < 0: 67 raise ValueError("Replication factor must be a positive integer") 68 return replace(self, replications=replications)
Set the specified replication factor
70 def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec": 71 """Set the rack ignore setting""" 72 return replace(self, ignore_rack=ignore)
Set the rack ignore setting
74 def with_compression(self, compression: CompressionType) -> "TopicSpec": 75 """Set the specified compression type""" 76 return replace(self, compression_type=compression)
Set the specified compression type
78 def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec": 79 """Set the specified retention time""" 80 81 if isinstance(retention_time, int): 82 return replace(self, retention_time=retention_time) 83 84 parsed_time = parse_timespan(retention_time) 85 return replace(self, retention_time=parsed_time)
Set the specified retention time
87 def with_segment_size(self, size: Union[str, int]) -> "TopicSpec": 88 """Set the specified segment size""" 89 if isinstance(size, int): 90 return replace(self, segment_size=size) 91 92 parsed_size = parse_byte_size(size) 93 return replace(self, segment_size=parsed_size)
Set the specified segment size
95 def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec": 96 """Set the specified max partition size""" 97 if isinstance(size, int): 98 return replace(self, max_partition_size=size) 99 100 parsed_size = parse_byte_size(size) 101 return replace(self, max_partition_size=parsed_size)
Set the specified max partition size
103 def as_system_topic(self, is_system: bool = True) -> "TopicSpec": 104 """Set the topic as an internal system topic""" 105 return replace(self, system=is_system)
Set the topic as an internal system topic
107 def build(self) -> _TopicSpec: 108 """Build the TopicSpec based on the current configuration""" 109 # Similar implementation to the original build method 110 if self.mode == TopicMode.ASSIGNED and self.replica_assignment: 111 spec = _TopicSpec.new_assigned(self.replica_assignment) 112 elif self.mode == TopicMode.MIRROR: 113 spec = _TopicSpec.new_mirror() 114 else: 115 spec = _TopicSpec.new_computed( 116 self.partitions, self.replications, self.ignore_rack 117 ) 118 119 spec.set_system(self.system) 120 121 if self.retention_time is not None: 122 spec.set_retention_time(self.retention_time) 123 124 if self.max_partition_size is not None or self.segment_size is not None: 125 spec.set_storage(self.max_partition_size, self.segment_size) 126 127 if self.compression_type is not None: 128 spec.set_compression_type(self.compression_type.value) 129 130 return spec
Build the TopicSpec based on the current configuration
17class CompressionType(Enum): 18 NONE = "none" 19 GZIP = "gzip" 20 SNAPPY = "snappy" 21 LZ4 = "lz4" 22 ANY = "any" 23 ZSTD = "zstd"
Inherited Members
- enum.Enum
- name
- value
Inherited Members
- enum.Enum
- name
- value
9class Record: 10 """The individual record for a given stream.""" 11 12 _inner: _Record 13 14 def __init__(self, inner: _Record): 15 self._inner = inner 16 17 def offset(self) -> int: 18 """The offset from the initial offset for a given stream.""" 19 return self._inner.offset() 20 21 def value(self) -> typing.List[int]: 22 """Returns the contents of this Record's value""" 23 return self._inner.value() 24 25 def value_string(self) -> str: 26 """The UTF-8 decoded value for this record.""" 27 return self._inner.value_string() 28 29 def key(self) -> typing.List[int]: 30 """Returns the contents of this Record's key, if it exists""" 31 return self._inner.key() 32 33 def key_string(self) -> str: 34 """The UTF-8 decoded key for this record.""" 35 return self._inner.key_string() 36 37 def timestamp(self) -> int: 38 """Timestamp of this record.""" 39 return self._inner.timestamp()
The individual record for a given stream.
17 def offset(self) -> int: 18 """The offset from the initial offset for a given stream.""" 19 return self._inner.offset()
The offset from the initial offset for a given stream.
21 def value(self) -> typing.List[int]: 22 """Returns the contents of this Record's value""" 23 return self._inner.value()
Returns the contents of this Record's value
25 def value_string(self) -> str: 26 """The UTF-8 decoded value for this record.""" 27 return self._inner.value_string()
The UTF-8 decoded value for this record.
29 def key(self) -> typing.List[int]: 30 """Returns the contents of this Record's key, if it exists""" 31 return self._inner.key()
Returns the contents of this Record's key, if it exists
42class RecordMetadata: 43 """Metadata of a record send to a topic.""" 44 45 _inner: _RecordMetadata 46 47 def __init__(self, inner: _RecordMetadata): 48 self._inner = inner 49 50 def offset(self) -> int: 51 """Return the offset of the sent record in the topic/partition.""" 52 return self._inner.offset() 53 54 def partition_id(self) -> int: 55 """Return the partition index the record was sent to.""" 56 return self._inner.partition_id()
Metadata of a record send to a topic.
Describes the location of a record stored in a Fluvio partition.
A topic is composed of one or more partitions.
523class TopicProducer: 524 """An interface for producing events to a particular topic. 525 526 A `TopicProducer` allows you to send events to the specific topic it was 527 initialized for. Once you have a `TopicProducer`, you can send events to 528 the topic, choosing which partition each event should be delivered to. 529 """ 530 531 _inner: _TopicProducer 532 533 def __init__(self, inner: _TopicProducer): 534 self._inner = inner 535 536 def send_string(self, buf: str) -> ProduceOutput: 537 """Sends a string to this producer’s topic""" 538 return self.send([], buf.encode("utf-8")) 539 540 async def async_send_string(self, buf: str) -> ProduceOutput: 541 """Sends a string to this producer’s topic""" 542 return await self.async_send([], buf.encode("utf-8")) 543 544 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 545 """ 546 Sends a key/value record to this producer's Topic. 547 548 The partition that the record will be sent to is derived from the Key. 549 """ 550 return ProduceOutput(self._inner.send(key, value)) 551 552 async def async_send( 553 self, key: typing.List[int], value: typing.List[int] 554 ) -> ProduceOutput: 555 """ 556 Async sends a key/value record to this producer's Topic. 557 558 The partition that the record will be sent to is derived from the Key. 559 """ 560 produce_output = await self._inner.async_send(key, value) 561 return ProduceOutput(produce_output) 562 563 def flush(self) -> None: 564 """ 565 Send all the queued records in the producer batches. 566 """ 567 return self._inner.flush() 568 569 async def async_flush(self) -> None: 570 """ 571 Async send all the queued records in the producer batches. 572 """ 573 await self._inner.async_flush() 574 575 def send_all( 576 self, records: typing.List[typing.Tuple[bytes, bytes]] 577 ) -> typing.List[ProduceOutput]: 578 """ 579 Sends a list of key/value records as a batch to this producer's Topic. 580 :param records: The list of records to send 581 """ 582 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 583 return [ 584 ProduceOutput(output_inner) 585 for output_inner in self._inner.send_all(records_inner) 586 ] 587 588 async def async_send_all( 589 self, records: typing.List[typing.Tuple[bytes, bytes]] 590 ) -> typing.List[ProduceOutput]: 591 """ 592 Async sends a list of key/value records as a batch to this producer's Topic. 593 :param records: The list of records to send 594 """ 595 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 596 return [ 597 ProduceOutput(output_inner) 598 for output_inner in await self._inner.async_send_all(records_inner) 599 ]
An interface for producing events to a particular topic.
A TopicProducer
allows you to send events to the specific topic it was
initialized for. Once you have a TopicProducer
, you can send events to
the topic, choosing which partition each event should be delivered to.
536 def send_string(self, buf: str) -> ProduceOutput: 537 """Sends a string to this producer’s topic""" 538 return self.send([], buf.encode("utf-8"))
Sends a string to this producer’s topic
540 async def async_send_string(self, buf: str) -> ProduceOutput: 541 """Sends a string to this producer’s topic""" 542 return await self.async_send([], buf.encode("utf-8"))
Sends a string to this producer’s topic
544 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 545 """ 546 Sends a key/value record to this producer's Topic. 547 548 The partition that the record will be sent to is derived from the Key. 549 """ 550 return ProduceOutput(self._inner.send(key, value))
Sends a key/value record to this producer's Topic.
The partition that the record will be sent to is derived from the Key.
552 async def async_send( 553 self, key: typing.List[int], value: typing.List[int] 554 ) -> ProduceOutput: 555 """ 556 Async sends a key/value record to this producer's Topic. 557 558 The partition that the record will be sent to is derived from the Key. 559 """ 560 produce_output = await self._inner.async_send(key, value) 561 return ProduceOutput(produce_output)
Async sends a key/value record to this producer's Topic.
The partition that the record will be sent to is derived from the Key.
563 def flush(self) -> None: 564 """ 565 Send all the queued records in the producer batches. 566 """ 567 return self._inner.flush()
Send all the queued records in the producer batches.
569 async def async_flush(self) -> None: 570 """ 571 Async send all the queued records in the producer batches. 572 """ 573 await self._inner.async_flush()
Async send all the queued records in the producer batches.
575 def send_all( 576 self, records: typing.List[typing.Tuple[bytes, bytes]] 577 ) -> typing.List[ProduceOutput]: 578 """ 579 Sends a list of key/value records as a batch to this producer's Topic. 580 :param records: The list of records to send 581 """ 582 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 583 return [ 584 ProduceOutput(output_inner) 585 for output_inner in self._inner.send_all(records_inner) 586 ]
Sends a list of key/value records as a batch to this producer's Topic.
Parameters
- records: The list of records to send
588 async def async_send_all( 589 self, records: typing.List[typing.Tuple[bytes, bytes]] 590 ) -> typing.List[ProduceOutput]: 591 """ 592 Async sends a list of key/value records as a batch to this producer's Topic. 593 :param records: The list of records to send 594 """ 595 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 596 return [ 597 ProduceOutput(output_inner) 598 for output_inner in await self._inner.async_send_all(records_inner) 599 ]
Async sends a list of key/value records as a batch to this producer's Topic.
Parameters
- records: The list of records to send
159class ProduceOutput: 160 """Returned by of `TopicProducer.send` call allowing access to sent record metadata.""" 161 162 _inner: _ProduceOutput 163 164 def __init__(self, inner: _ProduceOutput) -> None: 165 self._inner = inner 166 167 def wait(self) -> typing.Optional[RecordMetadata]: 168 """Wait for the record metadata. 169 170 This is a blocking call and may only return a `RecordMetadata` once. 171 Any subsequent call to `wait` will return a `None` value. 172 Errors will be raised as exceptions of type `FluvioError`. 173 """ 174 res = self._inner.wait() 175 if res is None: 176 return None 177 return RecordMetadata(res) 178 179 async def async_wait(self) -> typing.Optional[RecordMetadata]: 180 """Asynchronously wait for the record metadata. 181 182 This may only return a `RecordMetadata` once. 183 Any subsequent call to `wait` will return a `None` value. 184 """ 185 return await self._inner.async_wait()
Returned by of TopicProducer.send
call allowing access to sent record metadata.
167 def wait(self) -> typing.Optional[RecordMetadata]: 168 """Wait for the record metadata. 169 170 This is a blocking call and may only return a `RecordMetadata` once. 171 Any subsequent call to `wait` will return a `None` value. 172 Errors will be raised as exceptions of type `FluvioError`. 173 """ 174 res = self._inner.wait() 175 if res is None: 176 return None 177 return RecordMetadata(res)
Wait for the record metadata.
This is a blocking call and may only return a RecordMetadata
once.
Any subsequent call to wait
will return a None
value.
Errors will be raised as exceptions of type FluvioError
.
179 async def async_wait(self) -> typing.Optional[RecordMetadata]: 180 """Asynchronously wait for the record metadata. 181 182 This may only return a `RecordMetadata` once. 183 Any subsequent call to `wait` will return a `None` value. 184 """ 185 return await self._inner.async_wait()
Asynchronously wait for the record metadata.
This may only return a RecordMetadata
once.
Any subsequent call to wait
will return a None
value.
201class ConsumerConfig: 202 _inner: _ConsumerConfig 203 204 def __init__(self): 205 self._inner = _ConsumerConfig() 206 207 def disable_continuous(self, val: bool = True): 208 """Disable continuous mode after fetching specified records""" 209 self._inner.disable_continuous(val) 210 211 def smartmodule( 212 self, 213 name: str = None, 214 path: str = None, 215 kind: SmartModuleKind = None, 216 params: typing.Dict[str, str] = None, 217 aggregate: typing.List[bytes] = None, 218 ): 219 """ 220 This is a method for adding a smartmodule to a consumer config either 221 using a `name` of a `SmartModule` or a `path` to a wasm binary. 222 223 Args: 224 225 name: str 226 path: str 227 kind: SmartModuleKind 228 params: Dict[str, str] 229 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 230 231 Raises: 232 "Require either a path or a name for a smartmodule." 233 "Only specify one of path or name not both." 234 235 Returns: 236 237 None 238 """ 239 240 if kind is not None: 241 kind = kind.value 242 243 if path is None and name is None: 244 raise Exception("Require either a path or a name for a smartmodule.") 245 246 if path is not None and name is not None: 247 raise Exception("Only specify one of path or name not both.") 248 249 params = {} if params is None else params 250 param_keys = [x for x in params.keys()] 251 param_vals = [x for x in params.values()] 252 253 self._inner.smartmodule( 254 name, 255 path, 256 kind, 257 param_keys, 258 param_vals, 259 aggregate, 260 # These arguments are for Join stuff but that's not implemented on 261 # the python side yet 262 None, 263 None, 264 None, 265 None, 266 )
207 def disable_continuous(self, val: bool = True): 208 """Disable continuous mode after fetching specified records""" 209 self._inner.disable_continuous(val)
Disable continuous mode after fetching specified records
211 def smartmodule( 212 self, 213 name: str = None, 214 path: str = None, 215 kind: SmartModuleKind = None, 216 params: typing.Dict[str, str] = None, 217 aggregate: typing.List[bytes] = None, 218 ): 219 """ 220 This is a method for adding a smartmodule to a consumer config either 221 using a `name` of a `SmartModule` or a `path` to a wasm binary. 222 223 Args: 224 225 name: str 226 path: str 227 kind: SmartModuleKind 228 params: Dict[str, str] 229 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 230 231 Raises: 232 "Require either a path or a name for a smartmodule." 233 "Only specify one of path or name not both." 234 235 Returns: 236 237 None 238 """ 239 240 if kind is not None: 241 kind = kind.value 242 243 if path is None and name is None: 244 raise Exception("Require either a path or a name for a smartmodule.") 245 246 if path is not None and name is not None: 247 raise Exception("Only specify one of path or name not both.") 248 249 params = {} if params is None else params 250 param_keys = [x for x in params.keys()] 251 param_vals = [x for x in params.values()] 252 253 self._inner.smartmodule( 254 name, 255 path, 256 kind, 257 param_keys, 258 param_vals, 259 aggregate, 260 # These arguments are for Join stuff but that's not implemented on 261 # the python side yet 262 None, 263 None, 264 None, 265 None, 266 )
This is a method for adding a smartmodule to a consumer config either
using a name
of a SmartModule
or a path
to a wasm binary.
Args:
name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
Raises: "Require either a path or a name for a smartmodule." "Only specify one of path or name not both."
Returns:
None
269class PartitionConsumer: 270 """ 271 An interface for consuming events from a particular partition 272 273 There are two ways to consume events: by "fetching" events and by 274 "streaming" events. Fetching involves specifying a range of events that you 275 want to consume via their Offset. A fetch is a sort of one-time batch 276 operation: you’ll receive all of the events in your range all at once. When 277 you consume events via Streaming, you specify a starting Offset and receive 278 an object that will continuously yield new events as they arrive. 279 """ 280 281 _inner: _PartitionConsumer 282 283 def __init__(self, inner: _PartitionConsumer): 284 self._inner = inner 285 286 def stream(self, offset: Offset) -> typing.Iterator[Record]: 287 """ 288 Continuously streams events from a particular offset in the consumer’s 289 partition. This returns a `Iterator[Record]` which is an 290 iterator. 291 292 Streaming is one of the two ways to consume events in Fluvio. It is a 293 continuous request for new records arriving in a partition, beginning 294 at a particular offset. You specify the starting point of the stream 295 using an Offset and periodically receive events, either individually or 296 in batches. 297 """ 298 return self._generator(self._inner.stream(offset)) 299 300 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 301 """ 302 Continuously streams events from a particular offset in the consumer’s 303 partition. This returns a `AsyncIterator[Record]` which is an 304 iterator. 305 306 Streaming is one of the two ways to consume events in Fluvio. It is a 307 continuous request for new records arriving in a partition, beginning 308 at a particular offset. You specify the starting point of the stream 309 using an Offset and periodically receive events, either individually or 310 in batches. 311 """ 312 return self._async_generator(await self._inner.async_stream(offset)) 313 314 def stream_with_config( 315 self, offset: Offset, config: ConsumerConfig 316 ) -> typing.Iterator[Record]: 317 """ 318 Continuously streams events from a particular offset with a SmartModule 319 WASM module in the consumer’s partition. This returns a 320 `Iterator[Record]` which is an iterator. 321 322 Streaming is one of the two ways to consume events in Fluvio. It is a 323 continuous request for new records arriving in a partition, beginning 324 at a particular offset. You specify the starting point of the stream 325 using an Offset and periodically receive events, either individually or 326 in batches. 327 328 Args: 329 offset: Offset 330 wasm_module_path: str - The absolute path to the WASM file 331 332 Example: 333 import os 334 335 wmp = os.path.abspath("somefilter.wasm") 336 config = ConsumerConfig() 337 config.smartmodule(path=wmp) 338 for i in consumer.stream_with_config(Offset.beginning(), config): 339 # do something with i 340 341 Returns: 342 `Iterator[Record]` 343 344 """ 345 return self._generator(self._inner.stream_with_config(offset, config._inner)) 346 347 async def async_stream_with_config( 348 self, offset: Offset, config: ConsumerConfig 349 ) -> typing.AsyncIterator[Record]: 350 """ 351 Continuously streams events from a particular offset with a SmartModule 352 WASM module in the consumer’s partition. This returns a 353 `AsyncIterator[Record]` which is an async iterator. 354 355 Streaming is one of the two ways to consume events in Fluvio. It is a 356 continuous request for new records arriving in a partition, beginning 357 at a particular offset. You specify the starting point of the stream 358 using an Offset and periodically receive events, either individually or 359 in batches. 360 361 Args: 362 offset: Offset 363 wasm_module_path: str - The absolute path to the WASM file 364 365 Example: 366 import os 367 368 wmp = os.path.abspath("somefilter.wasm") 369 config = ConsumerConfig() 370 config.smartmodule(path=wmp) 371 `AsyncIterator[Record]` 372 373 """ 374 return self._async_generator( 375 await self._inner.async_stream_with_config(offset, config._inner) 376 ) 377 378 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 379 item = stream.next() 380 while item is not None: 381 yield Record(item) 382 item = stream.next() 383 384 async def _async_generator( 385 self, astream: _AsyncPartitionConsumerStream 386 ) -> typing.AsyncIterator[Record]: 387 item = await astream.async_next() 388 while item is not None: 389 yield Record(item) 390 item = await astream.async_next()
An interface for consuming events from a particular partition
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.
286 def stream(self, offset: Offset) -> typing.Iterator[Record]: 287 """ 288 Continuously streams events from a particular offset in the consumer’s 289 partition. This returns a `Iterator[Record]` which is an 290 iterator. 291 292 Streaming is one of the two ways to consume events in Fluvio. It is a 293 continuous request for new records arriving in a partition, beginning 294 at a particular offset. You specify the starting point of the stream 295 using an Offset and periodically receive events, either individually or 296 in batches. 297 """ 298 return self._generator(self._inner.stream(offset))
Continuously streams events from a particular offset in the consumer’s
partition. This returns a Iterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
300 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 301 """ 302 Continuously streams events from a particular offset in the consumer’s 303 partition. This returns a `AsyncIterator[Record]` which is an 304 iterator. 305 306 Streaming is one of the two ways to consume events in Fluvio. It is a 307 continuous request for new records arriving in a partition, beginning 308 at a particular offset. You specify the starting point of the stream 309 using an Offset and periodically receive events, either individually or 310 in batches. 311 """ 312 return self._async_generator(await self._inner.async_stream(offset))
Continuously streams events from a particular offset in the consumer’s
partition. This returns a AsyncIterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
314 def stream_with_config( 315 self, offset: Offset, config: ConsumerConfig 316 ) -> typing.Iterator[Record]: 317 """ 318 Continuously streams events from a particular offset with a SmartModule 319 WASM module in the consumer’s partition. This returns a 320 `Iterator[Record]` which is an iterator. 321 322 Streaming is one of the two ways to consume events in Fluvio. It is a 323 continuous request for new records arriving in a partition, beginning 324 at a particular offset. You specify the starting point of the stream 325 using an Offset and periodically receive events, either individually or 326 in batches. 327 328 Args: 329 offset: Offset 330 wasm_module_path: str - The absolute path to the WASM file 331 332 Example: 333 import os 334 335 wmp = os.path.abspath("somefilter.wasm") 336 config = ConsumerConfig() 337 config.smartmodule(path=wmp) 338 for i in consumer.stream_with_config(Offset.beginning(), config): 339 # do something with i 340 341 Returns: 342 `Iterator[Record]` 343 344 """ 345 return self._generator(self._inner.stream_with_config(offset, config._inner))
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partition. This returns a
Iterator[Record]
which is an iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
Iterator[Record]
347 async def async_stream_with_config( 348 self, offset: Offset, config: ConsumerConfig 349 ) -> typing.AsyncIterator[Record]: 350 """ 351 Continuously streams events from a particular offset with a SmartModule 352 WASM module in the consumer’s partition. This returns a 353 `AsyncIterator[Record]` which is an async iterator. 354 355 Streaming is one of the two ways to consume events in Fluvio. It is a 356 continuous request for new records arriving in a partition, beginning 357 at a particular offset. You specify the starting point of the stream 358 using an Offset and periodically receive events, either individually or 359 in batches. 360 361 Args: 362 offset: Offset 363 wasm_module_path: str - The absolute path to the WASM file 364 365 Example: 366 import os 367 368 wmp = os.path.abspath("somefilter.wasm") 369 config = ConsumerConfig() 370 config.smartmodule(path=wmp) 371 `AsyncIterator[Record]` 372 373 """ 374 return self._async_generator( 375 await self._inner.async_stream_with_config(offset, config._inner) 376 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partition. This returns a
AsyncIterator[Record]
which is an async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
`AsyncIterator[Record]`
393class MultiplePartitionConsumer: 394 """ 395 An interface for consuming events from multiple partitions 396 397 There are two ways to consume events: by "fetching" events and by 398 "streaming" events. Fetching involves specifying a range of events that you 399 want to consume via their Offset. A fetch is a sort of one-time batch 400 operation: you’ll receive all of the events in your range all at once. When 401 you consume events via Streaming, you specify a starting Offset and receive 402 an object that will continuously yield new events as they arrive. 403 """ 404 405 _inner: _MultiplePartitionConsumer 406 407 def __init__(self, inner: _MultiplePartitionConsumer): 408 self._inner = inner 409 410 def stream(self, offset: Offset) -> typing.Iterator[Record]: 411 """ 412 Continuously streams events from a particular offset in the consumer’s 413 partitions. This returns a `Iterator[Record]` which is an 414 iterator. 415 416 Streaming is one of the two ways to consume events in Fluvio. It is a 417 continuous request for new records arriving in a partition, beginning 418 at a particular offset. You specify the starting point of the stream 419 using an Offset and periodically receive events, either individually or 420 in batches. 421 """ 422 return self._generator(self._inner.stream(offset)) 423 424 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 425 """ 426 Continuously streams events from a particular offset in the consumer’s 427 partitions. This returns a `AsyncIterator[Record]` which is an 428 async iterator. 429 430 Streaming is one of the two ways to consume events in Fluvio. It is a 431 continuous request for new records arriving in a partition, beginning 432 at a particular offset. You specify the starting point of the stream 433 using an Offset and periodically receive events, either individually or 434 in batches. 435 """ 436 return self._async_generator(await self._inner.async_stream(offset)) 437 438 def stream_with_config( 439 self, offset: Offset, config: ConsumerConfig 440 ) -> typing.Iterator[Record]: 441 """ 442 Continuously streams events from a particular offset with a SmartModule 443 WASM module in the consumer’s partitions. This returns a 444 `Iterator[Record]` which is an iterator. 445 446 Streaming is one of the two ways to consume events in Fluvio. It is a 447 continuous request for new records arriving in a partition, beginning 448 at a particular offset. You specify the starting point of the stream 449 using an Offset and periodically receive events, either individually or 450 in batches. 451 452 Args: 453 offset: Offset 454 wasm_module_path: str - The absolute path to the WASM file 455 456 Example: 457 import os 458 459 wmp = os.path.abspath("somefilter.wasm") 460 config = ConsumerConfig() 461 config.smartmodule(path=wmp) 462 for i in consumer.stream_with_config(Offset.beginning(), config): 463 # do something with i 464 465 Returns: 466 `Iterator[Record]` 467 468 """ 469 return self._generator( 470 self._inner.stream_with_config(offset._inner, config._inner) 471 ) 472 473 async def async_stream_with_config( 474 self, offset: Offset, config: ConsumerConfig 475 ) -> typing.AsyncIterator[Record]: 476 """ 477 Continuously streams events from a particular offset with a SmartModule 478 WASM module in the consumer’s partitions. This returns a 479 `AsyncIterator[Record]` which is an async iterator. 480 481 Streaming is one of the two ways to consume events in Fluvio. It is a 482 continuous request for new records arriving in a partition, beginning 483 at a particular offset. You specify the starting point of the stream 484 using an Offset and periodically receive events, either individually or 485 in batches. 486 487 Args: 488 offset: Offset 489 wasm_module_path: str - The absolute path to the WASM file 490 491 Example: 492 import os 493 494 wmp = os.path.abspath("somefilter.wasm") 495 config = ConsumerConfig() 496 config.smartmodule(path=wmp) 497 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 498 # do something with i 499 500 Returns: 501 `AsyncIterator[Record]` 502 503 """ 504 return self._async_generator( 505 await self._inner.async_stream_with_config(offset, config._inner) 506 ) 507 508 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 509 item = stream.next() 510 while item is not None: 511 yield Record(item) 512 item = stream.next() 513 514 async def _async_generator( 515 self, astream: _AsyncPartitionConsumerStream 516 ) -> typing.AsyncIterator[Record]: 517 item = await astream.async_next() 518 while item is not None: 519 yield Record(item) 520 item = await astream.async_next()
An interface for consuming events from multiple partitions
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.
410 def stream(self, offset: Offset) -> typing.Iterator[Record]: 411 """ 412 Continuously streams events from a particular offset in the consumer’s 413 partitions. This returns a `Iterator[Record]` which is an 414 iterator. 415 416 Streaming is one of the two ways to consume events in Fluvio. It is a 417 continuous request for new records arriving in a partition, beginning 418 at a particular offset. You specify the starting point of the stream 419 using an Offset and periodically receive events, either individually or 420 in batches. 421 """ 422 return self._generator(self._inner.stream(offset))
Continuously streams events from a particular offset in the consumer’s
partitions. This returns a Iterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
424 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 425 """ 426 Continuously streams events from a particular offset in the consumer’s 427 partitions. This returns a `AsyncIterator[Record]` which is an 428 async iterator. 429 430 Streaming is one of the two ways to consume events in Fluvio. It is a 431 continuous request for new records arriving in a partition, beginning 432 at a particular offset. You specify the starting point of the stream 433 using an Offset and periodically receive events, either individually or 434 in batches. 435 """ 436 return self._async_generator(await self._inner.async_stream(offset))
Continuously streams events from a particular offset in the consumer’s
partitions. This returns a AsyncIterator[Record]
which is an
async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
438 def stream_with_config( 439 self, offset: Offset, config: ConsumerConfig 440 ) -> typing.Iterator[Record]: 441 """ 442 Continuously streams events from a particular offset with a SmartModule 443 WASM module in the consumer’s partitions. This returns a 444 `Iterator[Record]` which is an iterator. 445 446 Streaming is one of the two ways to consume events in Fluvio. It is a 447 continuous request for new records arriving in a partition, beginning 448 at a particular offset. You specify the starting point of the stream 449 using an Offset and periodically receive events, either individually or 450 in batches. 451 452 Args: 453 offset: Offset 454 wasm_module_path: str - The absolute path to the WASM file 455 456 Example: 457 import os 458 459 wmp = os.path.abspath("somefilter.wasm") 460 config = ConsumerConfig() 461 config.smartmodule(path=wmp) 462 for i in consumer.stream_with_config(Offset.beginning(), config): 463 # do something with i 464 465 Returns: 466 `Iterator[Record]` 467 468 """ 469 return self._generator( 470 self._inner.stream_with_config(offset._inner, config._inner) 471 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partitions. This returns a
Iterator[Record]
which is an iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
Iterator[Record]
473 async def async_stream_with_config( 474 self, offset: Offset, config: ConsumerConfig 475 ) -> typing.AsyncIterator[Record]: 476 """ 477 Continuously streams events from a particular offset with a SmartModule 478 WASM module in the consumer’s partitions. This returns a 479 `AsyncIterator[Record]` which is an async iterator. 480 481 Streaming is one of the two ways to consume events in Fluvio. It is a 482 continuous request for new records arriving in a partition, beginning 483 at a particular offset. You specify the starting point of the stream 484 using an Offset and periodically receive events, either individually or 485 in batches. 486 487 Args: 488 offset: Offset 489 wasm_module_path: str - The absolute path to the WASM file 490 491 Example: 492 import os 493 494 wmp = os.path.abspath("somefilter.wasm") 495 config = ConsumerConfig() 496 config.smartmodule(path=wmp) 497 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 498 # do something with i 499 500 Returns: 501 `AsyncIterator[Record]` 502 503 """ 504 return self._async_generator( 505 await self._inner.async_stream_with_config(offset, config._inner) 506 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partitions. This returns a
AsyncIterator[Record]
which is an async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
AsyncIterator[Record]
29class CommonCreateRequest: 30 _inner: _CommonCreateRequest 31 32 def __init__(self, inner: _CommonCreateRequest): 33 self._inner = inner 34 35 @classmethod 36 def new(cls, name: str, dry_run: bool, timeout: int): 37 return cls(_CommonCreateRequest.new(name, dry_run, timeout))
84class SmartModuleSpec: 85 _inner: _SmartModuleSpec 86 87 def __init__(self, inner: _SmartModuleSpec): 88 self._inner = inner 89 90 @classmethod 91 def new(cls, path: str): 92 f = open(path, mode="rb") 93 data = f.read() 94 f.close() 95 return cls(_SmartModuleSpec.with_binary(data))
18class PartitionMap: 19 _inner: _PartitionMap 20 21 def __init__(self, inner: _PartitionMap): 22 self._inner = inner 23 24 @classmethod 25 def new(cls, partition: int, replicas: typing.List[int]): 26 return cls(_PartitionMap.new(partition, replicas))
50class MessageMetadataTopicSpec: 51 _inner: _MessageMetadataTopicSpec 52 53 def __init__(self, inner: _MessageMetadataTopicSpec): 54 self._inner = inner 55 56 def is_update(self) -> bool: 57 return self._inner.is_update() 58 59 def is_delete(self) -> bool: 60 return self._inner.is_delete() 61 62 def metadata_topic_spec(self) -> MetadataTopicSpec: 63 return MetadataTopicSpec(self._inner.metadata_topic_spec())
142class MetadataPartitionSpec: 143 _inner: _MetadataPartitionSpec 144 145 def __init__(self, inner: _MetadataPartitionSpec): 146 self._inner = inner 147 148 def name(self) -> str: 149 return self._inner.name()
40class MetadataTopicSpec: 41 _inner: _MetadataTopicSpec 42 43 def __init__(self, inner: _MetadataTopicSpec): 44 self._inner = inner 45 46 def name(self) -> str: 47 return self._inner.name()
66class MetaUpdateTopicSpec: 67 _inner: _MetaUpdateTopicSpec 68 69 def __init__(self, inner: _MetaUpdateTopicSpec): 70 self._inner = inner 71 72 def all(self) -> typing.List[MetadataTopicSpec]: 73 inners = self._inner.all() 74 return [MetadataTopicSpec(i) for i in inners] 75 76 def changes(self) -> typing.List[MessageMetadataTopicSpec]: 77 inners = self._inner.changes() 78 return [MessageMetadataTopicSpec(i) for i in inners] 79 80 def epoch(self) -> int: 81 return self._inner.epoch()
602class PartitionSelectionStrategy: 603 """Stragegy to select partitions""" 604 605 _inner: _PartitionSelectionStrategy 606 607 def __init__(self, inner: _FluvioConfig): 608 self._inner = inner 609 610 @classmethod 611 def with_all(cls, topic: str): 612 """select all partitions of one topic""" 613 return cls(_PartitionSelectionStrategy.with_all(topic)) 614 615 @classmethod 616 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 617 """select multiple partitions of multiple topics""" 618 return cls(_PartitionSelectionStrategy.with_multiple(topic))
Stragegy to select partitions
610 @classmethod 611 def with_all(cls, topic: str): 612 """select all partitions of one topic""" 613 return cls(_PartitionSelectionStrategy.with_all(topic))
select all partitions of one topic
615 @classmethod 616 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 617 """select multiple partitions of multiple topics""" 618 return cls(_PartitionSelectionStrategy.with_multiple(topic))
select multiple partitions of multiple topics
188class SmartModuleKind(Enum): 189 """ 190 Use of this is to explicitly set the kind of a smartmodule. Not required 191 but needed for legacy SmartModules. 192 """ 193 194 Filter = _SmartModuleKind.Filter 195 Map = _SmartModuleKind.Map 196 ArrayMap = _SmartModuleKind.ArrayMap 197 FilterMap = _SmartModuleKind.FilterMap 198 Aggregate = _SmartModuleKind.Aggregate
Use of this is to explicitly set the kind of a smartmodule. Not required but needed for legacy SmartModules.
Inherited Members
- enum.Enum
- name
- value