fluvio
The __fluvio__ python module provides an extension for working with the Fluvio streaming platform.
This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.
Creating a topic with default settings is as simple as:
fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
Or just create a topic with custom settings:
import fluvio
fluvio_admin = FluvioAdmin.connect()
topic_spec = (
TopicSpec.create()
.with_retention_time("1h")
.with_segment_size("10M")
.build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
Producing data to a topic in a Fluvio cluster is as simple as:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
producer = fluvio.topic_producer(topic)
for i in range(10):
producer.send_string("Hello %s " % i)
Consuming is also simple:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)
num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
Also you can consume usign offset management:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)
num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
stream.offset_commit()
stream.offset_flush()
For more examples see the integration tests in the fluvio-python repository.
1""" 2The __fluvio__ python module provides an extension for working with the Fluvio 3streaming platform. 4 5This module builds on top of the Fluvio Client Rust Crate and provides a 6pythonic access to the API. 7 8 9Creating a topic with default settings is as simple as: 10 11```python 12fluvio_admin = FluvioAdmin.connect() 13fluvio_admin.create_topic("a_topic") 14``` 15 16Or just create a topic with custom settings: 17 18```python 19import fluvio 20 21fluvio_admin = FluvioAdmin.connect() 22topic_spec = ( 23 TopicSpec.create() 24 .with_retention_time("1h") 25 .with_segment_size("10M") 26 .build() 27) 28fluvio_admin.create_topic("a_topic", topic_spec) 29``` 30 31Producing data to a topic in a Fluvio cluster is as simple as: 32 33```python 34import fluvio 35 36fluvio = Fluvio.connect() 37 38topic = "a_topic" 39producer = fluvio.topic_producer(topic) 40 41for i in range(10): 42 producer.send_string("Hello %s " % i) 43``` 44 45Consuming is also simple: 46 47```python 48import fluvio 49 50fluvio = Fluvio.connect() 51 52topic = "a_topic" 53builder = ConsumerConfigExtBuilder(topic) 54config = builder.build() 55stream = fluvio.consumer_with_config(config) 56 57num_items = 2 58records = [bytearray(next(stream).value()).decode() for _ in range(num_items)] 59``` 60 61Also you can consume usign offset management: 62 63```python 64import fluvio 65 66fluvio = Fluvio.connect() 67 68topic = "a_topic" 69builder = ConsumerConfigExtBuilder(topic) 70builder.offset_start(Offset.beginning()) 71builder.offset_strategy(OffsetManagementStrategy.MANUAL) 72builder.offset_consumer("a-consumer") 73config = builder.build() 74stream = fluvio.consumer_with_config(config) 75 76num_items = 2 77records = [bytearray(next(stream).value()).decode() for _ in range(num_items)] 78 79stream.offset_commit() 80stream.offset_flush() 81``` 82 83For more examples see the integration tests in the fluvio-python repository. 84 85[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py) 86[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py) 87 88""" 89 90import typing 91from enum import Enum 92 93from ._fluvio_python import ( 94 Fluvio as _Fluvio, 95 FluvioConfig as _FluvioConfig, 96 Offset, 97 FluvioAdmin as _FluvioAdmin, 98 # consumer types 99 ConsumerConfig as _ConsumerConfig, 100 ConsumerConfigExt, 101 ConsumerConfigExtBuilder, 102 PartitionConsumer as _PartitionConsumer, 103 MultiplePartitionConsumer as _MultiplePartitionConsumer, 104 PartitionSelectionStrategy as _PartitionSelectionStrategy, 105 PartitionConsumerStream as _PartitionConsumerStream, 106 AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream, 107 OffsetManagementStrategy, 108 ConsumerOffset as _ConsumerOffset, 109 # producer types 110 TopicProducer as _TopicProducer, 111 ProduceOutput as _ProduceOutput, 112 ProducerBatchRecord as _ProducerBatchRecord, 113 # admin and misc types 114 SmartModuleKind as _SmartModuleKind, 115 TopicSpec as _TopicSpec, 116 WatchTopicStream as _WatchTopicStream, 117 WatchSmartModuleStream as _WatchSmartModuleStream, 118) 119 120from ._fluvio_python import Error as FluviorError # noqa: F401 121 122from .topic import TopicSpec, CompressionType, TopicMode 123from .record import Record, RecordMetadata 124from .specs import ( 125 # support types 126 CommonCreateRequest, 127 PartitionMap, 128 # specs 129 SmartModuleSpec, 130 MessageMetadataTopicSpec, 131 MetadataPartitionSpec, 132 MetadataSmartModuleSpec, 133 MetadataTopicSpec, 134 MetaUpdateSmartModuleSpec, 135 MetaUpdateTopicSpec, 136) 137 138# this structures the module a bit and allows pydoc to generate better docs 139# with better ordering of types and functions 140# inclusion in __all__, will pull the PyO3 rust inline docs into 141# the pdoc generated documentation 142__all__ = [ 143 # top level types 144 "Fluvio", 145 "FluvioConfig", 146 "FluvioAdmin", 147 # topic 148 "TopicSpec", 149 "CompressionType", 150 "TopicMode", 151 # record 152 "Record", 153 "RecordMetadata", 154 "Offset", 155 # producer 156 "TopicProducer", 157 "ProduceOutput", 158 # consumer 159 "ConsumerConfigExt", 160 "ConsumerConfigExtBuilder", 161 "ConsumerConfig", 162 "PartitionConsumer", 163 "MultiplePartitionConsumer", 164 "OffsetManagementStrategy", 165 "ConsumerOffset", 166 # specs 167 "CommonCreateRequest", 168 "SmartModuleSpec", 169 "TopicSpec", 170 "PartitionMap", 171 "MessageMetadataTopicSpec", 172 "MetadataPartitionSpec", 173 "MetadataTopicSpec", 174 "MetaUpdateTopicSpec", 175 # Misc 176 "PartitionSelectionStrategy", 177 "SmartModuleKind", 178] 179 180 181class ProduceOutput: 182 """Returned by of `TopicProducer.send` call allowing access to sent record metadata.""" 183 184 _inner: _ProduceOutput 185 186 def __init__(self, inner: _ProduceOutput) -> None: 187 self._inner = inner 188 189 def wait(self) -> typing.Optional[RecordMetadata]: 190 """Wait for the record metadata. 191 192 This is a blocking call and may only return a `RecordMetadata` once. 193 Any subsequent call to `wait` will return a `None` value. 194 Errors will be raised as exceptions of type `FluvioError`. 195 """ 196 res = self._inner.wait() 197 if res is None: 198 return None 199 return RecordMetadata(res) 200 201 async def async_wait(self) -> typing.Optional[RecordMetadata]: 202 """Asynchronously wait for the record metadata. 203 204 This may only return a `RecordMetadata` once. 205 Any subsequent call to `wait` will return a `None` value. 206 """ 207 return await self._inner.async_wait() 208 209 210class SmartModuleKind(Enum): 211 """ 212 Use of this is to explicitly set the kind of a smartmodule. Not required 213 but needed for legacy SmartModules. 214 """ 215 216 Filter = _SmartModuleKind.Filter 217 Map = _SmartModuleKind.Map 218 ArrayMap = _SmartModuleKind.ArrayMap 219 FilterMap = _SmartModuleKind.FilterMap 220 Aggregate = _SmartModuleKind.Aggregate 221 222 223class ConsumerConfig: 224 _inner: _ConsumerConfig 225 226 def __init__(self): 227 self._inner = _ConsumerConfig() 228 229 def disable_continuous(self, val: bool = True): 230 """Disable continuous mode after fetching specified records""" 231 self._inner.disable_continuous(val) 232 233 def smartmodule( 234 self, 235 name: str = None, 236 path: str = None, 237 kind: SmartModuleKind = None, 238 params: typing.Dict[str, str] = None, 239 aggregate: typing.List[bytes] = None, 240 ): 241 """ 242 This is a method for adding a smartmodule to a consumer config either 243 using a `name` of a `SmartModule` or a `path` to a wasm binary. 244 245 Args: 246 247 name: str 248 path: str 249 kind: SmartModuleKind 250 params: Dict[str, str] 251 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 252 253 Raises: 254 "Require either a path or a name for a smartmodule." 255 "Only specify one of path or name not both." 256 257 Returns: 258 259 None 260 """ 261 262 if kind is not None: 263 kind = kind.value 264 265 if path is None and name is None: 266 raise Exception("Require either a path or a name for a smartmodule.") 267 268 if path is not None and name is not None: 269 raise Exception("Only specify one of path or name not both.") 270 271 params = {} if params is None else params 272 param_keys = [x for x in params.keys()] 273 param_vals = [x for x in params.values()] 274 275 self._inner.smartmodule( 276 name, 277 path, 278 kind, 279 param_keys, 280 param_vals, 281 aggregate, 282 # These arguments are for Join stuff but that's not implemented on 283 # the python side yet 284 None, 285 None, 286 None, 287 None, 288 ) 289 290 291class ConsumerIterator: 292 def __init__(self, stream: _PartitionConsumerStream): 293 self.stream = stream 294 295 def __iter__(self): 296 return self 297 298 def __next__(self): 299 item = self.stream.next() 300 if item is None: 301 raise StopIteration 302 return Record(item) 303 304 def offset_commit(self): 305 self.stream.offset_commit() 306 307 def offset_flush(self): 308 self.stream.offset_flush() 309 310 311class PartitionConsumer: 312 """ 313 An interface for consuming events from a particular partition 314 315 There are two ways to consume events: by "fetching" events and by 316 "streaming" events. Fetching involves specifying a range of events that you 317 want to consume via their Offset. A fetch is a sort of one-time batch 318 operation: you’ll receive all of the events in your range all at once. When 319 you consume events via Streaming, you specify a starting Offset and receive 320 an object that will continuously yield new events as they arrive. 321 """ 322 323 _inner: _PartitionConsumer 324 325 def __init__(self, inner: _PartitionConsumer): 326 self._inner = inner 327 328 def stream(self, offset: Offset) -> typing.Iterator[Record]: 329 """ 330 Continuously streams events from a particular offset in the consumer’s 331 partition. This returns a `Iterator[Record]` which is an 332 iterator. 333 334 Streaming is one of the two ways to consume events in Fluvio. It is a 335 continuous request for new records arriving in a partition, beginning 336 at a particular offset. You specify the starting point of the stream 337 using an Offset and periodically receive events, either individually or 338 in batches. 339 """ 340 return self._generator(self._inner.stream(offset)) 341 342 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 343 """ 344 Continuously streams events from a particular offset in the consumer’s 345 partition. This returns a `AsyncIterator[Record]` which is an 346 iterator. 347 348 Streaming is one of the two ways to consume events in Fluvio. It is a 349 continuous request for new records arriving in a partition, beginning 350 at a particular offset. You specify the starting point of the stream 351 using an Offset and periodically receive events, either individually or 352 in batches. 353 """ 354 return self._async_generator(await self._inner.async_stream(offset)) 355 356 def stream_with_config( 357 self, offset: Offset, config: ConsumerConfig 358 ) -> typing.Iterator[Record]: 359 """ 360 Continuously streams events from a particular offset with a SmartModule 361 WASM module in the consumer’s partition. This returns a 362 `Iterator[Record]` which is an iterator. 363 364 Streaming is one of the two ways to consume events in Fluvio. It is a 365 continuous request for new records arriving in a partition, beginning 366 at a particular offset. You specify the starting point of the stream 367 using an Offset and periodically receive events, either individually or 368 in batches. 369 370 Args: 371 offset: Offset 372 wasm_module_path: str - The absolute path to the WASM file 373 374 Example: 375 import os 376 377 wmp = os.path.abspath("somefilter.wasm") 378 config = ConsumerConfig() 379 config.smartmodule(path=wmp) 380 for i in consumer.stream_with_config(Offset.beginning(), config): 381 # do something with i 382 383 Returns: 384 `Iterator[Record]` 385 386 """ 387 return self._generator(self._inner.stream_with_config(offset, config._inner)) 388 389 async def async_stream_with_config( 390 self, offset: Offset, config: ConsumerConfig 391 ) -> typing.AsyncIterator[Record]: 392 """ 393 Continuously streams events from a particular offset with a SmartModule 394 WASM module in the consumer’s partition. This returns a 395 `AsyncIterator[Record]` which is an async iterator. 396 397 Streaming is one of the two ways to consume events in Fluvio. It is a 398 continuous request for new records arriving in a partition, beginning 399 at a particular offset. You specify the starting point of the stream 400 using an Offset and periodically receive events, either individually or 401 in batches. 402 403 Args: 404 offset: Offset 405 wasm_module_path: str - The absolute path to the WASM file 406 407 Example: 408 import os 409 410 wmp = os.path.abspath("somefilter.wasm") 411 config = ConsumerConfig() 412 config.smartmodule(path=wmp) 413 `AsyncIterator[Record]` 414 415 """ 416 return self._async_generator( 417 await self._inner.async_stream_with_config(offset, config._inner) 418 ) 419 420 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 421 item = stream.next() 422 while item is not None: 423 yield Record(item) 424 item = stream.next() 425 426 async def _async_generator( 427 self, astream: _AsyncPartitionConsumerStream 428 ) -> typing.AsyncIterator[Record]: 429 item = await astream.async_next() 430 while item is not None: 431 yield Record(item) 432 item = await astream.async_next() 433 434 435class MultiplePartitionConsumer: 436 """ 437 An interface for consuming events from multiple partitions 438 439 There are two ways to consume events: by "fetching" events and by 440 "streaming" events. Fetching involves specifying a range of events that you 441 want to consume via their Offset. A fetch is a sort of one-time batch 442 operation: you’ll receive all of the events in your range all at once. When 443 you consume events via Streaming, you specify a starting Offset and receive 444 an object that will continuously yield new events as they arrive. 445 """ 446 447 _inner: _MultiplePartitionConsumer 448 449 def __init__(self, inner: _MultiplePartitionConsumer): 450 self._inner = inner 451 452 def stream(self, offset: Offset) -> typing.Iterator[Record]: 453 """ 454 Continuously streams events from a particular offset in the consumer’s 455 partitions. This returns a `Iterator[Record]` which is an 456 iterator. 457 458 Streaming is one of the two ways to consume events in Fluvio. It is a 459 continuous request for new records arriving in a partition, beginning 460 at a particular offset. You specify the starting point of the stream 461 using an Offset and periodically receive events, either individually or 462 in batches. 463 """ 464 return self._generator(self._inner.stream(offset)) 465 466 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 467 """ 468 Continuously streams events from a particular offset in the consumer’s 469 partitions. This returns a `AsyncIterator[Record]` which is an 470 async iterator. 471 472 Streaming is one of the two ways to consume events in Fluvio. It is a 473 continuous request for new records arriving in a partition, beginning 474 at a particular offset. You specify the starting point of the stream 475 using an Offset and periodically receive events, either individually or 476 in batches. 477 """ 478 return self._async_generator(await self._inner.async_stream(offset)) 479 480 def stream_with_config( 481 self, offset: Offset, config: ConsumerConfig 482 ) -> typing.Iterator[Record]: 483 """ 484 Continuously streams events from a particular offset with a SmartModule 485 WASM module in the consumer’s partitions. This returns a 486 `Iterator[Record]` which is an iterator. 487 488 Streaming is one of the two ways to consume events in Fluvio. It is a 489 continuous request for new records arriving in a partition, beginning 490 at a particular offset. You specify the starting point of the stream 491 using an Offset and periodically receive events, either individually or 492 in batches. 493 494 Args: 495 offset: Offset 496 wasm_module_path: str - The absolute path to the WASM file 497 498 Example: 499 import os 500 501 wmp = os.path.abspath("somefilter.wasm") 502 config = ConsumerConfig() 503 config.smartmodule(path=wmp) 504 for i in consumer.stream_with_config(Offset.beginning(), config): 505 # do something with i 506 507 Returns: 508 `Iterator[Record]` 509 510 """ 511 return self._generator( 512 self._inner.stream_with_config(offset._inner, config._inner) 513 ) 514 515 async def async_stream_with_config( 516 self, offset: Offset, config: ConsumerConfig 517 ) -> typing.AsyncIterator[Record]: 518 """ 519 Continuously streams events from a particular offset with a SmartModule 520 WASM module in the consumer’s partitions. This returns a 521 `AsyncIterator[Record]` which is an async iterator. 522 523 Streaming is one of the two ways to consume events in Fluvio. It is a 524 continuous request for new records arriving in a partition, beginning 525 at a particular offset. You specify the starting point of the stream 526 using an Offset and periodically receive events, either individually or 527 in batches. 528 529 Args: 530 offset: Offset 531 wasm_module_path: str - The absolute path to the WASM file 532 533 Example: 534 import os 535 536 wmp = os.path.abspath("somefilter.wasm") 537 config = ConsumerConfig() 538 config.smartmodule(path=wmp) 539 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 540 # do something with i 541 542 Returns: 543 `AsyncIterator[Record]` 544 545 """ 546 return self._async_generator( 547 await self._inner.async_stream_with_config(offset, config._inner) 548 ) 549 550 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 551 item = stream.next() 552 while item is not None: 553 yield Record(item) 554 item = stream.next() 555 556 async def _async_generator( 557 self, astream: _AsyncPartitionConsumerStream 558 ) -> typing.AsyncIterator[Record]: 559 item = await astream.async_next() 560 while item is not None: 561 yield Record(item) 562 item = await astream.async_next() 563 564 565class TopicProducer: 566 """An interface for producing events to a particular topic. 567 568 A `TopicProducer` allows you to send events to the specific topic it was 569 initialized for. Once you have a `TopicProducer`, you can send events to 570 the topic, choosing which partition each event should be delivered to. 571 """ 572 573 _inner: _TopicProducer 574 575 def __init__(self, inner: _TopicProducer): 576 self._inner = inner 577 578 def send_string(self, buf: str) -> ProduceOutput: 579 """Sends a string to this producer’s topic""" 580 return self.send([], buf.encode("utf-8")) 581 582 async def async_send_string(self, buf: str) -> ProduceOutput: 583 """Sends a string to this producer’s topic""" 584 return await self.async_send([], buf.encode("utf-8")) 585 586 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 587 """ 588 Sends a key/value record to this producer's Topic. 589 590 The partition that the record will be sent to is derived from the Key. 591 """ 592 return ProduceOutput(self._inner.send(key, value)) 593 594 async def async_send( 595 self, key: typing.List[int], value: typing.List[int] 596 ) -> ProduceOutput: 597 """ 598 Async sends a key/value record to this producer's Topic. 599 600 The partition that the record will be sent to is derived from the Key. 601 """ 602 produce_output = await self._inner.async_send(key, value) 603 return ProduceOutput(produce_output) 604 605 def flush(self) -> None: 606 """ 607 Send all the queued records in the producer batches. 608 """ 609 return self._inner.flush() 610 611 async def async_flush(self) -> None: 612 """ 613 Async send all the queued records in the producer batches. 614 """ 615 await self._inner.async_flush() 616 617 def send_all( 618 self, records: typing.List[typing.Tuple[bytes, bytes]] 619 ) -> typing.List[ProduceOutput]: 620 """ 621 Sends a list of key/value records as a batch to this producer's Topic. 622 :param records: The list of records to send 623 """ 624 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 625 return [ 626 ProduceOutput(output_inner) 627 for output_inner in self._inner.send_all(records_inner) 628 ] 629 630 async def async_send_all( 631 self, records: typing.List[typing.Tuple[bytes, bytes]] 632 ) -> typing.List[ProduceOutput]: 633 """ 634 Async sends a list of key/value records as a batch to this producer's Topic. 635 :param records: The list of records to send 636 """ 637 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 638 return [ 639 ProduceOutput(output_inner) 640 for output_inner in await self._inner.async_send_all(records_inner) 641 ] 642 643 644class PartitionSelectionStrategy: 645 """Stragegy to select partitions""" 646 647 _inner: _PartitionSelectionStrategy 648 649 def __init__(self, inner: _FluvioConfig): 650 self._inner = inner 651 652 @classmethod 653 def with_all(cls, topic: str): 654 """select all partitions of one topic""" 655 return cls(_PartitionSelectionStrategy.with_all(topic)) 656 657 @classmethod 658 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 659 """select multiple partitions of multiple topics""" 660 return cls(_PartitionSelectionStrategy.with_multiple(topic)) 661 662 663class ConsumerOffset: 664 """Consumer offset""" 665 666 _inner: _ConsumerOffset 667 668 def __init__(self, inner: _ConsumerOffset): 669 self._inner = inner 670 671 def consumer_id(self) -> str: 672 return self._inner.consumer_id() 673 674 def topic(self) -> str: 675 return self._inner.topic() 676 677 def partition(self) -> int: 678 return self._inner.partition() 679 680 def offset(self) -> int: 681 return self._inner.offset() 682 683 def modified_time(self) -> int: 684 return self._inner.modified_time() 685 686 687class FluvioConfig: 688 """Configuration for Fluvio client""" 689 690 _inner: _FluvioConfig 691 692 def __init__(self, inner: _FluvioConfig): 693 self._inner = inner 694 695 @classmethod 696 def load(cls): 697 """get current cluster config from default profile""" 698 return cls(_FluvioConfig.load()) 699 700 @classmethod 701 def new(cls, addr: str): 702 """Create a new cluster configuration with no TLS.""" 703 return cls(_FluvioConfig.new(addr)) 704 705 def set_endpoint(self, endpoint: str): 706 """set endpoint""" 707 self._inner.set_endpoint(endpoint) 708 709 def set_use_spu_local_address(self, val: bool): 710 """set wheather to use spu local address""" 711 self._inner.set_use_spu_local_address(val) 712 713 def disable_tls(self): 714 """disable tls for this config""" 715 self._inner.disable_tls() 716 717 def set_anonymous_tls(self): 718 """set the config to use anonymous tls""" 719 self._inner.set_anonymous_tls() 720 721 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 722 """specify inline tls parameters""" 723 self._inner.set_inline_tls(domain, key, cert, ca_cert) 724 725 def set_tls_file_paths( 726 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 727 ): 728 """specify paths to tls files""" 729 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path) 730 731 def set_client_id(self, client_id: str): 732 """set client id""" 733 self._inner.set_client_id(client_id) 734 735 def unset_client_id(self): 736 """remove the configured client id from config""" 737 self._inner.unset_client_id() 738 739 740class Fluvio: 741 """An interface for interacting with Fluvio streaming.""" 742 743 _inner: _Fluvio 744 745 def __init__(self, inner: _Fluvio): 746 self._inner = inner 747 748 @classmethod 749 def connect(cls): 750 """Tries to create a new Fluvio client using the current profile from 751 `~/.fluvio/config` 752 """ 753 return cls(_Fluvio.connect()) 754 755 @classmethod 756 def connect_with_config(cls, config: FluvioConfig): 757 """Creates a new Fluvio client using the given configuration""" 758 return cls(_Fluvio.connect_with_config(config._inner)) 759 760 def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator: 761 """Creates consumer with settings defined in config 762 763 This is the recommended way to create a consume records. 764 """ 765 return ConsumerIterator(self._inner.consumer_with_config(config)) 766 767 def topic_producer(self, topic: str) -> TopicProducer: 768 """ 769 Creates a new `TopicProducer` for the given topic name. 770 771 Currently, producers are scoped to a specific Fluvio topic. That means 772 when you send events via a producer, you must specify which partition 773 each event should go to. 774 """ 775 return TopicProducer(self._inner.topic_producer(topic)) 776 777 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 778 """Creates a new `PartitionConsumer` for the given topic and partition 779 780 Currently, consumers are scoped to both a specific Fluvio topic and to 781 a particular partition within that topic. That means that if you have a 782 topic with multiple partitions, then in order to receive all of the 783 events in all of the partitions, you will need to create one consumer 784 per partition. 785 """ 786 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 787 788 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 789 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 790 791 Currently, consumers are scoped to both a specific Fluvio topic and to 792 its all partitions within that topic. 793 """ 794 strategy = PartitionSelectionStrategy.with_all(topic) 795 return MultiplePartitionConsumer( 796 self._inner.multi_partition_consumer(strategy._inner) 797 ) 798 799 def multi_topic_partition_consumer( 800 self, selections: typing.List[typing.Tuple[str, int]] 801 ) -> MultiplePartitionConsumer: 802 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 803 804 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 805 """ 806 strategy = PartitionSelectionStrategy.with_multiple(selections) 807 return MultiplePartitionConsumer( 808 self._inner.multi_partition_consumer(strategy._inner) 809 ) 810 811 def consumer_offsets(self) -> typing.List[ConsumerOffset]: 812 """Fetch the current offsets of the consumer""" 813 return self._inner.consumer_offsets() 814 815 def delete_consumer_offset(self, consumer: str, topic: str, partition: int): 816 """Delete the consumer offset""" 817 return self._inner.delete_consumer_offset(consumer, topic, partition) 818 819 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 820 item = stream.next() 821 while item is not None: 822 yield Record(item) 823 item = stream.next() 824 825 826class FluvioAdmin: 827 _inner: _FluvioAdmin 828 829 def __init__(self, inner: _FluvioAdmin): 830 self._inner = inner 831 832 def connect(): 833 return FluvioAdmin(_FluvioAdmin.connect()) 834 835 def connect_with_config(config: FluvioConfig): 836 return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) 837 838 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 839 partitions = 1 840 replication = 1 841 ignore_rack = True 842 dry_run = False 843 spec = ( 844 spec 845 if spec is not None 846 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 847 ) 848 return self._inner.create_topic(topic, dry_run, spec) 849 850 def create_topic_with_config( 851 self, topic: str, req: CommonCreateRequest, spec: _TopicSpec 852 ): 853 return self._inner.create_topic_with_config(topic, req._inner, spec) 854 855 def delete_topic(self, topic: str): 856 return self._inner.delete_topic(topic) 857 858 def all_topics(self) -> typing.List[MetadataTopicSpec]: 859 return self._inner.all_topics() 860 861 def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]: 862 return self._inner.list_topics(filters) 863 864 def list_topics_with_params( 865 self, filters: typing.List[str], summary: bool 866 ) -> typing.List[MetadataTopicSpec]: 867 return self._inner.list_topics_with_params(filters, summary) 868 869 def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]: 870 return self._topic_spec_generator(self._inner.watch_topic()) 871 872 def create_smartmodule(self, name: str, path: str, dry_run: bool): 873 spec = SmartModuleSpec.new(path) 874 return self._inner.create_smart_module(name, dry_run, spec._inner) 875 876 def delete_smartmodule(self, name: str): 877 return self._inner.delete_smart_module(name) 878 879 def list_smartmodules( 880 self, filters: typing.List[str] 881 ) -> typing.List[MetadataSmartModuleSpec]: 882 return self._inner.list_smart_modules(filters) 883 884 def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]: 885 return self._smart_module_spec_generator(self._inner.watch_smart_module()) 886 887 def list_partitions( 888 self, filters: typing.List[str] 889 ) -> typing.List[MetadataPartitionSpec]: 890 return self._inner.list_partitions(filters) 891 892 def _topic_spec_generator( 893 self, stream: _WatchTopicStream 894 ) -> typing.Iterator[MetaUpdateTopicSpec]: 895 item = stream.next().inner() 896 while item is not None: 897 yield MetaUpdateTopicSpec(item) 898 item = stream.next().inner() 899 900 def _smart_module_spec_generator( 901 self, stream: _WatchSmartModuleStream 902 ) -> typing.Iterator[MetaUpdateSmartModuleSpec]: 903 item = stream.next().inner() 904 while item is not None: 905 yield MetaUpdateSmartModuleSpec(item) 906 item = stream.next().inner()
741class Fluvio: 742 """An interface for interacting with Fluvio streaming.""" 743 744 _inner: _Fluvio 745 746 def __init__(self, inner: _Fluvio): 747 self._inner = inner 748 749 @classmethod 750 def connect(cls): 751 """Tries to create a new Fluvio client using the current profile from 752 `~/.fluvio/config` 753 """ 754 return cls(_Fluvio.connect()) 755 756 @classmethod 757 def connect_with_config(cls, config: FluvioConfig): 758 """Creates a new Fluvio client using the given configuration""" 759 return cls(_Fluvio.connect_with_config(config._inner)) 760 761 def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator: 762 """Creates consumer with settings defined in config 763 764 This is the recommended way to create a consume records. 765 """ 766 return ConsumerIterator(self._inner.consumer_with_config(config)) 767 768 def topic_producer(self, topic: str) -> TopicProducer: 769 """ 770 Creates a new `TopicProducer` for the given topic name. 771 772 Currently, producers are scoped to a specific Fluvio topic. That means 773 when you send events via a producer, you must specify which partition 774 each event should go to. 775 """ 776 return TopicProducer(self._inner.topic_producer(topic)) 777 778 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 779 """Creates a new `PartitionConsumer` for the given topic and partition 780 781 Currently, consumers are scoped to both a specific Fluvio topic and to 782 a particular partition within that topic. That means that if you have a 783 topic with multiple partitions, then in order to receive all of the 784 events in all of the partitions, you will need to create one consumer 785 per partition. 786 """ 787 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 788 789 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 790 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 791 792 Currently, consumers are scoped to both a specific Fluvio topic and to 793 its all partitions within that topic. 794 """ 795 strategy = PartitionSelectionStrategy.with_all(topic) 796 return MultiplePartitionConsumer( 797 self._inner.multi_partition_consumer(strategy._inner) 798 ) 799 800 def multi_topic_partition_consumer( 801 self, selections: typing.List[typing.Tuple[str, int]] 802 ) -> MultiplePartitionConsumer: 803 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 804 805 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 806 """ 807 strategy = PartitionSelectionStrategy.with_multiple(selections) 808 return MultiplePartitionConsumer( 809 self._inner.multi_partition_consumer(strategy._inner) 810 ) 811 812 def consumer_offsets(self) -> typing.List[ConsumerOffset]: 813 """Fetch the current offsets of the consumer""" 814 return self._inner.consumer_offsets() 815 816 def delete_consumer_offset(self, consumer: str, topic: str, partition: int): 817 """Delete the consumer offset""" 818 return self._inner.delete_consumer_offset(consumer, topic, partition) 819 820 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 821 item = stream.next() 822 while item is not None: 823 yield Record(item) 824 item = stream.next()
An interface for interacting with Fluvio streaming.
749 @classmethod 750 def connect(cls): 751 """Tries to create a new Fluvio client using the current profile from 752 `~/.fluvio/config` 753 """ 754 return cls(_Fluvio.connect())
Tries to create a new Fluvio client using the current profile from
~/.fluvio/config
756 @classmethod 757 def connect_with_config(cls, config: FluvioConfig): 758 """Creates a new Fluvio client using the given configuration""" 759 return cls(_Fluvio.connect_with_config(config._inner))
Creates a new Fluvio client using the given configuration
761 def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator: 762 """Creates consumer with settings defined in config 763 764 This is the recommended way to create a consume records. 765 """ 766 return ConsumerIterator(self._inner.consumer_with_config(config))
Creates consumer with settings defined in config
This is the recommended way to create a consume records.
768 def topic_producer(self, topic: str) -> TopicProducer: 769 """ 770 Creates a new `TopicProducer` for the given topic name. 771 772 Currently, producers are scoped to a specific Fluvio topic. That means 773 when you send events via a producer, you must specify which partition 774 each event should go to. 775 """ 776 return TopicProducer(self._inner.topic_producer(topic))
Creates a new TopicProducer
for the given topic name.
Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.
778 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 779 """Creates a new `PartitionConsumer` for the given topic and partition 780 781 Currently, consumers are scoped to both a specific Fluvio topic and to 782 a particular partition within that topic. That means that if you have a 783 topic with multiple partitions, then in order to receive all of the 784 events in all of the partitions, you will need to create one consumer 785 per partition. 786 """ 787 return PartitionConsumer(self._inner.partition_consumer(topic, partition))
Creates a new PartitionConsumer
for the given topic and partition
Currently, consumers are scoped to both a specific Fluvio topic and to a particular partition within that topic. That means that if you have a topic with multiple partitions, then in order to receive all of the events in all of the partitions, you will need to create one consumer per partition.
789 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 790 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 791 792 Currently, consumers are scoped to both a specific Fluvio topic and to 793 its all partitions within that topic. 794 """ 795 strategy = PartitionSelectionStrategy.with_all(topic) 796 return MultiplePartitionConsumer( 797 self._inner.multi_partition_consumer(strategy._inner) 798 )
Creates a new MultiplePartitionConsumer
for the given topic and its all partitions
Currently, consumers are scoped to both a specific Fluvio topic and to its all partitions within that topic.
800 def multi_topic_partition_consumer( 801 self, selections: typing.List[typing.Tuple[str, int]] 802 ) -> MultiplePartitionConsumer: 803 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 804 805 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 806 """ 807 strategy = PartitionSelectionStrategy.with_multiple(selections) 808 return MultiplePartitionConsumer( 809 self._inner.multi_partition_consumer(strategy._inner) 810 )
Creates a new MultiplePartitionConsumer
for the given topics and partitions
Currently, consumers are scoped to a list of Fluvio topic and partition tuple.
688class FluvioConfig: 689 """Configuration for Fluvio client""" 690 691 _inner: _FluvioConfig 692 693 def __init__(self, inner: _FluvioConfig): 694 self._inner = inner 695 696 @classmethod 697 def load(cls): 698 """get current cluster config from default profile""" 699 return cls(_FluvioConfig.load()) 700 701 @classmethod 702 def new(cls, addr: str): 703 """Create a new cluster configuration with no TLS.""" 704 return cls(_FluvioConfig.new(addr)) 705 706 def set_endpoint(self, endpoint: str): 707 """set endpoint""" 708 self._inner.set_endpoint(endpoint) 709 710 def set_use_spu_local_address(self, val: bool): 711 """set wheather to use spu local address""" 712 self._inner.set_use_spu_local_address(val) 713 714 def disable_tls(self): 715 """disable tls for this config""" 716 self._inner.disable_tls() 717 718 def set_anonymous_tls(self): 719 """set the config to use anonymous tls""" 720 self._inner.set_anonymous_tls() 721 722 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 723 """specify inline tls parameters""" 724 self._inner.set_inline_tls(domain, key, cert, ca_cert) 725 726 def set_tls_file_paths( 727 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 728 ): 729 """specify paths to tls files""" 730 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path) 731 732 def set_client_id(self, client_id: str): 733 """set client id""" 734 self._inner.set_client_id(client_id) 735 736 def unset_client_id(self): 737 """remove the configured client id from config""" 738 self._inner.unset_client_id()
Configuration for Fluvio client
696 @classmethod 697 def load(cls): 698 """get current cluster config from default profile""" 699 return cls(_FluvioConfig.load())
get current cluster config from default profile
701 @classmethod 702 def new(cls, addr: str): 703 """Create a new cluster configuration with no TLS.""" 704 return cls(_FluvioConfig.new(addr))
Create a new cluster configuration with no TLS.
706 def set_endpoint(self, endpoint: str): 707 """set endpoint""" 708 self._inner.set_endpoint(endpoint)
set endpoint
710 def set_use_spu_local_address(self, val: bool): 711 """set wheather to use spu local address""" 712 self._inner.set_use_spu_local_address(val)
set wheather to use spu local address
718 def set_anonymous_tls(self): 719 """set the config to use anonymous tls""" 720 self._inner.set_anonymous_tls()
set the config to use anonymous tls
722 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 723 """specify inline tls parameters""" 724 self._inner.set_inline_tls(domain, key, cert, ca_cert)
specify inline tls parameters
726 def set_tls_file_paths( 727 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 728 ): 729 """specify paths to tls files""" 730 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)
specify paths to tls files
827class FluvioAdmin: 828 _inner: _FluvioAdmin 829 830 def __init__(self, inner: _FluvioAdmin): 831 self._inner = inner 832 833 def connect(): 834 return FluvioAdmin(_FluvioAdmin.connect()) 835 836 def connect_with_config(config: FluvioConfig): 837 return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) 838 839 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 840 partitions = 1 841 replication = 1 842 ignore_rack = True 843 dry_run = False 844 spec = ( 845 spec 846 if spec is not None 847 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 848 ) 849 return self._inner.create_topic(topic, dry_run, spec) 850 851 def create_topic_with_config( 852 self, topic: str, req: CommonCreateRequest, spec: _TopicSpec 853 ): 854 return self._inner.create_topic_with_config(topic, req._inner, spec) 855 856 def delete_topic(self, topic: str): 857 return self._inner.delete_topic(topic) 858 859 def all_topics(self) -> typing.List[MetadataTopicSpec]: 860 return self._inner.all_topics() 861 862 def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]: 863 return self._inner.list_topics(filters) 864 865 def list_topics_with_params( 866 self, filters: typing.List[str], summary: bool 867 ) -> typing.List[MetadataTopicSpec]: 868 return self._inner.list_topics_with_params(filters, summary) 869 870 def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]: 871 return self._topic_spec_generator(self._inner.watch_topic()) 872 873 def create_smartmodule(self, name: str, path: str, dry_run: bool): 874 spec = SmartModuleSpec.new(path) 875 return self._inner.create_smart_module(name, dry_run, spec._inner) 876 877 def delete_smartmodule(self, name: str): 878 return self._inner.delete_smart_module(name) 879 880 def list_smartmodules( 881 self, filters: typing.List[str] 882 ) -> typing.List[MetadataSmartModuleSpec]: 883 return self._inner.list_smart_modules(filters) 884 885 def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]: 886 return self._smart_module_spec_generator(self._inner.watch_smart_module()) 887 888 def list_partitions( 889 self, filters: typing.List[str] 890 ) -> typing.List[MetadataPartitionSpec]: 891 return self._inner.list_partitions(filters) 892 893 def _topic_spec_generator( 894 self, stream: _WatchTopicStream 895 ) -> typing.Iterator[MetaUpdateTopicSpec]: 896 item = stream.next().inner() 897 while item is not None: 898 yield MetaUpdateTopicSpec(item) 899 item = stream.next().inner() 900 901 def _smart_module_spec_generator( 902 self, stream: _WatchSmartModuleStream 903 ) -> typing.Iterator[MetaUpdateSmartModuleSpec]: 904 item = stream.next().inner() 905 while item is not None: 906 yield MetaUpdateSmartModuleSpec(item) 907 item = stream.next().inner()
839 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 840 partitions = 1 841 replication = 1 842 ignore_rack = True 843 dry_run = False 844 spec = ( 845 spec 846 if spec is not None 847 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 848 ) 849 return self._inner.create_topic(topic, dry_run, spec)
26@dataclass(frozen=True) 27class TopicSpec: 28 mode: TopicMode = TopicMode.COMPUTED 29 partitions: int = 1 30 replications: int = 1 31 ignore_rack: bool = True 32 replica_assignment: Optional[List[PartitionMap]] = None 33 retention_time: Optional[int] = None 34 segment_size: Optional[int] = None 35 compression_type: Optional[CompressionType] = None 36 max_partition_size: Optional[int] = None 37 system: bool = False 38 39 @classmethod 40 def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True): 41 return cls(_TopicSpec.new_computed(partitions, replication, ignore)) 42 43 @classmethod 44 def create(cls) -> "TopicSpec": 45 """Alternative constructor method""" 46 return cls() 47 48 def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec": 49 """Set the assigned replica configuration""" 50 return replace( 51 self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment 52 ) 53 54 def as_mirror_topic(self) -> "TopicSpec": 55 """Set as a mirror topic""" 56 return replace(self, mode=TopicMode.MIRROR) 57 58 def with_partitions(self, partitions: int) -> "TopicSpec": 59 """Set the specified partitions""" 60 if partitions < 0: 61 raise ValueError("Partitions must be a positive integer") 62 return replace(self, partitions=partitions) 63 64 def with_replications(self, replications: int) -> "TopicSpec": 65 """Set the specified replication factor""" 66 if replications < 0: 67 raise ValueError("Replication factor must be a positive integer") 68 return replace(self, replications=replications) 69 70 def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec": 71 """Set the rack ignore setting""" 72 return replace(self, ignore_rack=ignore) 73 74 def with_compression(self, compression: CompressionType) -> "TopicSpec": 75 """Set the specified compression type""" 76 return replace(self, compression_type=compression) 77 78 def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec": 79 """Set the specified retention time""" 80 81 if isinstance(retention_time, int): 82 return replace(self, retention_time=retention_time) 83 84 parsed_time = parse_timespan(retention_time) 85 return replace(self, retention_time=parsed_time) 86 87 def with_segment_size(self, size: Union[str, int]) -> "TopicSpec": 88 """Set the specified segment size""" 89 if isinstance(size, int): 90 return replace(self, segment_size=size) 91 92 parsed_size = parse_byte_size(size) 93 return replace(self, segment_size=parsed_size) 94 95 def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec": 96 """Set the specified max partition size""" 97 if isinstance(size, int): 98 return replace(self, max_partition_size=size) 99 100 parsed_size = parse_byte_size(size) 101 return replace(self, max_partition_size=parsed_size) 102 103 def as_system_topic(self, is_system: bool = True) -> "TopicSpec": 104 """Set the topic as an internal system topic""" 105 return replace(self, system=is_system) 106 107 def build(self) -> _TopicSpec: 108 """Build the TopicSpec based on the current configuration""" 109 # Similar implementation to the original build method 110 if self.mode == TopicMode.ASSIGNED and self.replica_assignment: 111 spec = _TopicSpec.new_assigned(self.replica_assignment) 112 elif self.mode == TopicMode.MIRROR: 113 spec = _TopicSpec.new_mirror() 114 else: 115 spec = _TopicSpec.new_computed( 116 self.partitions, self.replications, self.ignore_rack 117 ) 118 119 spec.set_system(self.system) 120 121 if self.retention_time is not None: 122 spec.set_retention_time(self.retention_time) 123 124 if self.max_partition_size is not None or self.segment_size is not None: 125 spec.set_storage(self.max_partition_size, self.segment_size) 126 127 if self.compression_type is not None: 128 spec.set_compression_type(self.compression_type.value) 129 130 return spec
43 @classmethod 44 def create(cls) -> "TopicSpec": 45 """Alternative constructor method""" 46 return cls()
Alternative constructor method
48 def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec": 49 """Set the assigned replica configuration""" 50 return replace( 51 self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment 52 )
Set the assigned replica configuration
54 def as_mirror_topic(self) -> "TopicSpec": 55 """Set as a mirror topic""" 56 return replace(self, mode=TopicMode.MIRROR)
Set as a mirror topic
58 def with_partitions(self, partitions: int) -> "TopicSpec": 59 """Set the specified partitions""" 60 if partitions < 0: 61 raise ValueError("Partitions must be a positive integer") 62 return replace(self, partitions=partitions)
Set the specified partitions
64 def with_replications(self, replications: int) -> "TopicSpec": 65 """Set the specified replication factor""" 66 if replications < 0: 67 raise ValueError("Replication factor must be a positive integer") 68 return replace(self, replications=replications)
Set the specified replication factor
70 def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec": 71 """Set the rack ignore setting""" 72 return replace(self, ignore_rack=ignore)
Set the rack ignore setting
74 def with_compression(self, compression: CompressionType) -> "TopicSpec": 75 """Set the specified compression type""" 76 return replace(self, compression_type=compression)
Set the specified compression type
78 def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec": 79 """Set the specified retention time""" 80 81 if isinstance(retention_time, int): 82 return replace(self, retention_time=retention_time) 83 84 parsed_time = parse_timespan(retention_time) 85 return replace(self, retention_time=parsed_time)
Set the specified retention time
87 def with_segment_size(self, size: Union[str, int]) -> "TopicSpec": 88 """Set the specified segment size""" 89 if isinstance(size, int): 90 return replace(self, segment_size=size) 91 92 parsed_size = parse_byte_size(size) 93 return replace(self, segment_size=parsed_size)
Set the specified segment size
95 def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec": 96 """Set the specified max partition size""" 97 if isinstance(size, int): 98 return replace(self, max_partition_size=size) 99 100 parsed_size = parse_byte_size(size) 101 return replace(self, max_partition_size=parsed_size)
Set the specified max partition size
103 def as_system_topic(self, is_system: bool = True) -> "TopicSpec": 104 """Set the topic as an internal system topic""" 105 return replace(self, system=is_system)
Set the topic as an internal system topic
107 def build(self) -> _TopicSpec: 108 """Build the TopicSpec based on the current configuration""" 109 # Similar implementation to the original build method 110 if self.mode == TopicMode.ASSIGNED and self.replica_assignment: 111 spec = _TopicSpec.new_assigned(self.replica_assignment) 112 elif self.mode == TopicMode.MIRROR: 113 spec = _TopicSpec.new_mirror() 114 else: 115 spec = _TopicSpec.new_computed( 116 self.partitions, self.replications, self.ignore_rack 117 ) 118 119 spec.set_system(self.system) 120 121 if self.retention_time is not None: 122 spec.set_retention_time(self.retention_time) 123 124 if self.max_partition_size is not None or self.segment_size is not None: 125 spec.set_storage(self.max_partition_size, self.segment_size) 126 127 if self.compression_type is not None: 128 spec.set_compression_type(self.compression_type.value) 129 130 return spec
Build the TopicSpec based on the current configuration
17class CompressionType(Enum): 18 NONE = "none" 19 GZIP = "gzip" 20 SNAPPY = "snappy" 21 LZ4 = "lz4" 22 ANY = "any" 23 ZSTD = "zstd"
9class Record: 10 """The individual record for a given stream.""" 11 12 _inner: _Record 13 14 def __init__(self, inner: _Record): 15 self._inner = inner 16 17 def offset(self) -> int: 18 """The offset from the initial offset for a given stream.""" 19 return self._inner.offset() 20 21 def value(self) -> typing.List[int]: 22 """Returns the contents of this Record's value""" 23 return self._inner.value() 24 25 def value_string(self) -> str: 26 """The UTF-8 decoded value for this record.""" 27 return self._inner.value_string() 28 29 def key(self) -> typing.List[int]: 30 """Returns the contents of this Record's key, if it exists""" 31 return self._inner.key() 32 33 def key_string(self) -> str: 34 """The UTF-8 decoded key for this record.""" 35 return self._inner.key_string() 36 37 def timestamp(self) -> int: 38 """Timestamp of this record.""" 39 return self._inner.timestamp()
The individual record for a given stream.
17 def offset(self) -> int: 18 """The offset from the initial offset for a given stream.""" 19 return self._inner.offset()
The offset from the initial offset for a given stream.
21 def value(self) -> typing.List[int]: 22 """Returns the contents of this Record's value""" 23 return self._inner.value()
Returns the contents of this Record's value
25 def value_string(self) -> str: 26 """The UTF-8 decoded value for this record.""" 27 return self._inner.value_string()
The UTF-8 decoded value for this record.
29 def key(self) -> typing.List[int]: 30 """Returns the contents of this Record's key, if it exists""" 31 return self._inner.key()
Returns the contents of this Record's key, if it exists
42class RecordMetadata: 43 """Metadata of a record send to a topic.""" 44 45 _inner: _RecordMetadata 46 47 def __init__(self, inner: _RecordMetadata): 48 self._inner = inner 49 50 def offset(self) -> int: 51 """Return the offset of the sent record in the topic/partition.""" 52 return self._inner.offset() 53 54 def partition_id(self) -> int: 55 """Return the partition index the record was sent to.""" 56 return self._inner.partition_id()
Metadata of a record send to a topic.
Describes the location of a record stored in a Fluvio partition.
A topic is composed of one or more partitions.
566class TopicProducer: 567 """An interface for producing events to a particular topic. 568 569 A `TopicProducer` allows you to send events to the specific topic it was 570 initialized for. Once you have a `TopicProducer`, you can send events to 571 the topic, choosing which partition each event should be delivered to. 572 """ 573 574 _inner: _TopicProducer 575 576 def __init__(self, inner: _TopicProducer): 577 self._inner = inner 578 579 def send_string(self, buf: str) -> ProduceOutput: 580 """Sends a string to this producer’s topic""" 581 return self.send([], buf.encode("utf-8")) 582 583 async def async_send_string(self, buf: str) -> ProduceOutput: 584 """Sends a string to this producer’s topic""" 585 return await self.async_send([], buf.encode("utf-8")) 586 587 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 588 """ 589 Sends a key/value record to this producer's Topic. 590 591 The partition that the record will be sent to is derived from the Key. 592 """ 593 return ProduceOutput(self._inner.send(key, value)) 594 595 async def async_send( 596 self, key: typing.List[int], value: typing.List[int] 597 ) -> ProduceOutput: 598 """ 599 Async sends a key/value record to this producer's Topic. 600 601 The partition that the record will be sent to is derived from the Key. 602 """ 603 produce_output = await self._inner.async_send(key, value) 604 return ProduceOutput(produce_output) 605 606 def flush(self) -> None: 607 """ 608 Send all the queued records in the producer batches. 609 """ 610 return self._inner.flush() 611 612 async def async_flush(self) -> None: 613 """ 614 Async send all the queued records in the producer batches. 615 """ 616 await self._inner.async_flush() 617 618 def send_all( 619 self, records: typing.List[typing.Tuple[bytes, bytes]] 620 ) -> typing.List[ProduceOutput]: 621 """ 622 Sends a list of key/value records as a batch to this producer's Topic. 623 :param records: The list of records to send 624 """ 625 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 626 return [ 627 ProduceOutput(output_inner) 628 for output_inner in self._inner.send_all(records_inner) 629 ] 630 631 async def async_send_all( 632 self, records: typing.List[typing.Tuple[bytes, bytes]] 633 ) -> typing.List[ProduceOutput]: 634 """ 635 Async sends a list of key/value records as a batch to this producer's Topic. 636 :param records: The list of records to send 637 """ 638 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 639 return [ 640 ProduceOutput(output_inner) 641 for output_inner in await self._inner.async_send_all(records_inner) 642 ]
An interface for producing events to a particular topic.
A TopicProducer
allows you to send events to the specific topic it was
initialized for. Once you have a TopicProducer
, you can send events to
the topic, choosing which partition each event should be delivered to.
579 def send_string(self, buf: str) -> ProduceOutput: 580 """Sends a string to this producer’s topic""" 581 return self.send([], buf.encode("utf-8"))
Sends a string to this producer’s topic
583 async def async_send_string(self, buf: str) -> ProduceOutput: 584 """Sends a string to this producer’s topic""" 585 return await self.async_send([], buf.encode("utf-8"))
Sends a string to this producer’s topic
587 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 588 """ 589 Sends a key/value record to this producer's Topic. 590 591 The partition that the record will be sent to is derived from the Key. 592 """ 593 return ProduceOutput(self._inner.send(key, value))
Sends a key/value record to this producer's Topic.
The partition that the record will be sent to is derived from the Key.
595 async def async_send( 596 self, key: typing.List[int], value: typing.List[int] 597 ) -> ProduceOutput: 598 """ 599 Async sends a key/value record to this producer's Topic. 600 601 The partition that the record will be sent to is derived from the Key. 602 """ 603 produce_output = await self._inner.async_send(key, value) 604 return ProduceOutput(produce_output)
Async sends a key/value record to this producer's Topic.
The partition that the record will be sent to is derived from the Key.
606 def flush(self) -> None: 607 """ 608 Send all the queued records in the producer batches. 609 """ 610 return self._inner.flush()
Send all the queued records in the producer batches.
612 async def async_flush(self) -> None: 613 """ 614 Async send all the queued records in the producer batches. 615 """ 616 await self._inner.async_flush()
Async send all the queued records in the producer batches.
618 def send_all( 619 self, records: typing.List[typing.Tuple[bytes, bytes]] 620 ) -> typing.List[ProduceOutput]: 621 """ 622 Sends a list of key/value records as a batch to this producer's Topic. 623 :param records: The list of records to send 624 """ 625 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 626 return [ 627 ProduceOutput(output_inner) 628 for output_inner in self._inner.send_all(records_inner) 629 ]
Sends a list of key/value records as a batch to this producer's Topic.
Parameters
- records: The list of records to send
631 async def async_send_all( 632 self, records: typing.List[typing.Tuple[bytes, bytes]] 633 ) -> typing.List[ProduceOutput]: 634 """ 635 Async sends a list of key/value records as a batch to this producer's Topic. 636 :param records: The list of records to send 637 """ 638 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 639 return [ 640 ProduceOutput(output_inner) 641 for output_inner in await self._inner.async_send_all(records_inner) 642 ]
Async sends a list of key/value records as a batch to this producer's Topic.
Parameters
- records: The list of records to send
182class ProduceOutput: 183 """Returned by of `TopicProducer.send` call allowing access to sent record metadata.""" 184 185 _inner: _ProduceOutput 186 187 def __init__(self, inner: _ProduceOutput) -> None: 188 self._inner = inner 189 190 def wait(self) -> typing.Optional[RecordMetadata]: 191 """Wait for the record metadata. 192 193 This is a blocking call and may only return a `RecordMetadata` once. 194 Any subsequent call to `wait` will return a `None` value. 195 Errors will be raised as exceptions of type `FluvioError`. 196 """ 197 res = self._inner.wait() 198 if res is None: 199 return None 200 return RecordMetadata(res) 201 202 async def async_wait(self) -> typing.Optional[RecordMetadata]: 203 """Asynchronously wait for the record metadata. 204 205 This may only return a `RecordMetadata` once. 206 Any subsequent call to `wait` will return a `None` value. 207 """ 208 return await self._inner.async_wait()
Returned by of TopicProducer.send
call allowing access to sent record metadata.
190 def wait(self) -> typing.Optional[RecordMetadata]: 191 """Wait for the record metadata. 192 193 This is a blocking call and may only return a `RecordMetadata` once. 194 Any subsequent call to `wait` will return a `None` value. 195 Errors will be raised as exceptions of type `FluvioError`. 196 """ 197 res = self._inner.wait() 198 if res is None: 199 return None 200 return RecordMetadata(res)
Wait for the record metadata.
This is a blocking call and may only return a RecordMetadata
once.
Any subsequent call to wait
will return a None
value.
Errors will be raised as exceptions of type FluvioError
.
202 async def async_wait(self) -> typing.Optional[RecordMetadata]: 203 """Asynchronously wait for the record metadata. 204 205 This may only return a `RecordMetadata` once. 206 Any subsequent call to `wait` will return a `None` value. 207 """ 208 return await self._inner.async_wait()
Asynchronously wait for the record metadata.
This may only return a RecordMetadata
once.
Any subsequent call to wait
will return a None
value.
224class ConsumerConfig: 225 _inner: _ConsumerConfig 226 227 def __init__(self): 228 self._inner = _ConsumerConfig() 229 230 def disable_continuous(self, val: bool = True): 231 """Disable continuous mode after fetching specified records""" 232 self._inner.disable_continuous(val) 233 234 def smartmodule( 235 self, 236 name: str = None, 237 path: str = None, 238 kind: SmartModuleKind = None, 239 params: typing.Dict[str, str] = None, 240 aggregate: typing.List[bytes] = None, 241 ): 242 """ 243 This is a method for adding a smartmodule to a consumer config either 244 using a `name` of a `SmartModule` or a `path` to a wasm binary. 245 246 Args: 247 248 name: str 249 path: str 250 kind: SmartModuleKind 251 params: Dict[str, str] 252 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 253 254 Raises: 255 "Require either a path or a name for a smartmodule." 256 "Only specify one of path or name not both." 257 258 Returns: 259 260 None 261 """ 262 263 if kind is not None: 264 kind = kind.value 265 266 if path is None and name is None: 267 raise Exception("Require either a path or a name for a smartmodule.") 268 269 if path is not None and name is not None: 270 raise Exception("Only specify one of path or name not both.") 271 272 params = {} if params is None else params 273 param_keys = [x for x in params.keys()] 274 param_vals = [x for x in params.values()] 275 276 self._inner.smartmodule( 277 name, 278 path, 279 kind, 280 param_keys, 281 param_vals, 282 aggregate, 283 # These arguments are for Join stuff but that's not implemented on 284 # the python side yet 285 None, 286 None, 287 None, 288 None, 289 )
230 def disable_continuous(self, val: bool = True): 231 """Disable continuous mode after fetching specified records""" 232 self._inner.disable_continuous(val)
Disable continuous mode after fetching specified records
234 def smartmodule( 235 self, 236 name: str = None, 237 path: str = None, 238 kind: SmartModuleKind = None, 239 params: typing.Dict[str, str] = None, 240 aggregate: typing.List[bytes] = None, 241 ): 242 """ 243 This is a method for adding a smartmodule to a consumer config either 244 using a `name` of a `SmartModule` or a `path` to a wasm binary. 245 246 Args: 247 248 name: str 249 path: str 250 kind: SmartModuleKind 251 params: Dict[str, str] 252 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 253 254 Raises: 255 "Require either a path or a name for a smartmodule." 256 "Only specify one of path or name not both." 257 258 Returns: 259 260 None 261 """ 262 263 if kind is not None: 264 kind = kind.value 265 266 if path is None and name is None: 267 raise Exception("Require either a path or a name for a smartmodule.") 268 269 if path is not None and name is not None: 270 raise Exception("Only specify one of path or name not both.") 271 272 params = {} if params is None else params 273 param_keys = [x for x in params.keys()] 274 param_vals = [x for x in params.values()] 275 276 self._inner.smartmodule( 277 name, 278 path, 279 kind, 280 param_keys, 281 param_vals, 282 aggregate, 283 # These arguments are for Join stuff but that's not implemented on 284 # the python side yet 285 None, 286 None, 287 None, 288 None, 289 )
This is a method for adding a smartmodule to a consumer config either
using a name
of a SmartModule
or a path
to a wasm binary.
Args:
name: str
path: str
kind: SmartModuleKind
params: Dict[str, str]
aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule
Raises: "Require either a path or a name for a smartmodule." "Only specify one of path or name not both."
Returns:
None
312class PartitionConsumer: 313 """ 314 An interface for consuming events from a particular partition 315 316 There are two ways to consume events: by "fetching" events and by 317 "streaming" events. Fetching involves specifying a range of events that you 318 want to consume via their Offset. A fetch is a sort of one-time batch 319 operation: you’ll receive all of the events in your range all at once. When 320 you consume events via Streaming, you specify a starting Offset and receive 321 an object that will continuously yield new events as they arrive. 322 """ 323 324 _inner: _PartitionConsumer 325 326 def __init__(self, inner: _PartitionConsumer): 327 self._inner = inner 328 329 def stream(self, offset: Offset) -> typing.Iterator[Record]: 330 """ 331 Continuously streams events from a particular offset in the consumer’s 332 partition. This returns a `Iterator[Record]` which is an 333 iterator. 334 335 Streaming is one of the two ways to consume events in Fluvio. It is a 336 continuous request for new records arriving in a partition, beginning 337 at a particular offset. You specify the starting point of the stream 338 using an Offset and periodically receive events, either individually or 339 in batches. 340 """ 341 return self._generator(self._inner.stream(offset)) 342 343 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 344 """ 345 Continuously streams events from a particular offset in the consumer’s 346 partition. This returns a `AsyncIterator[Record]` which is an 347 iterator. 348 349 Streaming is one of the two ways to consume events in Fluvio. It is a 350 continuous request for new records arriving in a partition, beginning 351 at a particular offset. You specify the starting point of the stream 352 using an Offset and periodically receive events, either individually or 353 in batches. 354 """ 355 return self._async_generator(await self._inner.async_stream(offset)) 356 357 def stream_with_config( 358 self, offset: Offset, config: ConsumerConfig 359 ) -> typing.Iterator[Record]: 360 """ 361 Continuously streams events from a particular offset with a SmartModule 362 WASM module in the consumer’s partition. This returns a 363 `Iterator[Record]` which is an iterator. 364 365 Streaming is one of the two ways to consume events in Fluvio. It is a 366 continuous request for new records arriving in a partition, beginning 367 at a particular offset. You specify the starting point of the stream 368 using an Offset and periodically receive events, either individually or 369 in batches. 370 371 Args: 372 offset: Offset 373 wasm_module_path: str - The absolute path to the WASM file 374 375 Example: 376 import os 377 378 wmp = os.path.abspath("somefilter.wasm") 379 config = ConsumerConfig() 380 config.smartmodule(path=wmp) 381 for i in consumer.stream_with_config(Offset.beginning(), config): 382 # do something with i 383 384 Returns: 385 `Iterator[Record]` 386 387 """ 388 return self._generator(self._inner.stream_with_config(offset, config._inner)) 389 390 async def async_stream_with_config( 391 self, offset: Offset, config: ConsumerConfig 392 ) -> typing.AsyncIterator[Record]: 393 """ 394 Continuously streams events from a particular offset with a SmartModule 395 WASM module in the consumer’s partition. This returns a 396 `AsyncIterator[Record]` which is an async iterator. 397 398 Streaming is one of the two ways to consume events in Fluvio. It is a 399 continuous request for new records arriving in a partition, beginning 400 at a particular offset. You specify the starting point of the stream 401 using an Offset and periodically receive events, either individually or 402 in batches. 403 404 Args: 405 offset: Offset 406 wasm_module_path: str - The absolute path to the WASM file 407 408 Example: 409 import os 410 411 wmp = os.path.abspath("somefilter.wasm") 412 config = ConsumerConfig() 413 config.smartmodule(path=wmp) 414 `AsyncIterator[Record]` 415 416 """ 417 return self._async_generator( 418 await self._inner.async_stream_with_config(offset, config._inner) 419 ) 420 421 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 422 item = stream.next() 423 while item is not None: 424 yield Record(item) 425 item = stream.next() 426 427 async def _async_generator( 428 self, astream: _AsyncPartitionConsumerStream 429 ) -> typing.AsyncIterator[Record]: 430 item = await astream.async_next() 431 while item is not None: 432 yield Record(item) 433 item = await astream.async_next()
An interface for consuming events from a particular partition
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.
329 def stream(self, offset: Offset) -> typing.Iterator[Record]: 330 """ 331 Continuously streams events from a particular offset in the consumer’s 332 partition. This returns a `Iterator[Record]` which is an 333 iterator. 334 335 Streaming is one of the two ways to consume events in Fluvio. It is a 336 continuous request for new records arriving in a partition, beginning 337 at a particular offset. You specify the starting point of the stream 338 using an Offset and periodically receive events, either individually or 339 in batches. 340 """ 341 return self._generator(self._inner.stream(offset))
Continuously streams events from a particular offset in the consumer’s
partition. This returns a Iterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
343 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 344 """ 345 Continuously streams events from a particular offset in the consumer’s 346 partition. This returns a `AsyncIterator[Record]` which is an 347 iterator. 348 349 Streaming is one of the two ways to consume events in Fluvio. It is a 350 continuous request for new records arriving in a partition, beginning 351 at a particular offset. You specify the starting point of the stream 352 using an Offset and periodically receive events, either individually or 353 in batches. 354 """ 355 return self._async_generator(await self._inner.async_stream(offset))
Continuously streams events from a particular offset in the consumer’s
partition. This returns a AsyncIterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
357 def stream_with_config( 358 self, offset: Offset, config: ConsumerConfig 359 ) -> typing.Iterator[Record]: 360 """ 361 Continuously streams events from a particular offset with a SmartModule 362 WASM module in the consumer’s partition. This returns a 363 `Iterator[Record]` which is an iterator. 364 365 Streaming is one of the two ways to consume events in Fluvio. It is a 366 continuous request for new records arriving in a partition, beginning 367 at a particular offset. You specify the starting point of the stream 368 using an Offset and periodically receive events, either individually or 369 in batches. 370 371 Args: 372 offset: Offset 373 wasm_module_path: str - The absolute path to the WASM file 374 375 Example: 376 import os 377 378 wmp = os.path.abspath("somefilter.wasm") 379 config = ConsumerConfig() 380 config.smartmodule(path=wmp) 381 for i in consumer.stream_with_config(Offset.beginning(), config): 382 # do something with i 383 384 Returns: 385 `Iterator[Record]` 386 387 """ 388 return self._generator(self._inner.stream_with_config(offset, config._inner))
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partition. This returns a
Iterator[Record]
which is an iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
Iterator[Record]
390 async def async_stream_with_config( 391 self, offset: Offset, config: ConsumerConfig 392 ) -> typing.AsyncIterator[Record]: 393 """ 394 Continuously streams events from a particular offset with a SmartModule 395 WASM module in the consumer’s partition. This returns a 396 `AsyncIterator[Record]` which is an async iterator. 397 398 Streaming is one of the two ways to consume events in Fluvio. It is a 399 continuous request for new records arriving in a partition, beginning 400 at a particular offset. You specify the starting point of the stream 401 using an Offset and periodically receive events, either individually or 402 in batches. 403 404 Args: 405 offset: Offset 406 wasm_module_path: str - The absolute path to the WASM file 407 408 Example: 409 import os 410 411 wmp = os.path.abspath("somefilter.wasm") 412 config = ConsumerConfig() 413 config.smartmodule(path=wmp) 414 `AsyncIterator[Record]` 415 416 """ 417 return self._async_generator( 418 await self._inner.async_stream_with_config(offset, config._inner) 419 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partition. This returns a
AsyncIterator[Record]
which is an async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
`AsyncIterator[Record]`
436class MultiplePartitionConsumer: 437 """ 438 An interface for consuming events from multiple partitions 439 440 There are two ways to consume events: by "fetching" events and by 441 "streaming" events. Fetching involves specifying a range of events that you 442 want to consume via their Offset. A fetch is a sort of one-time batch 443 operation: you’ll receive all of the events in your range all at once. When 444 you consume events via Streaming, you specify a starting Offset and receive 445 an object that will continuously yield new events as they arrive. 446 """ 447 448 _inner: _MultiplePartitionConsumer 449 450 def __init__(self, inner: _MultiplePartitionConsumer): 451 self._inner = inner 452 453 def stream(self, offset: Offset) -> typing.Iterator[Record]: 454 """ 455 Continuously streams events from a particular offset in the consumer’s 456 partitions. This returns a `Iterator[Record]` which is an 457 iterator. 458 459 Streaming is one of the two ways to consume events in Fluvio. It is a 460 continuous request for new records arriving in a partition, beginning 461 at a particular offset. You specify the starting point of the stream 462 using an Offset and periodically receive events, either individually or 463 in batches. 464 """ 465 return self._generator(self._inner.stream(offset)) 466 467 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 468 """ 469 Continuously streams events from a particular offset in the consumer’s 470 partitions. This returns a `AsyncIterator[Record]` which is an 471 async iterator. 472 473 Streaming is one of the two ways to consume events in Fluvio. It is a 474 continuous request for new records arriving in a partition, beginning 475 at a particular offset. You specify the starting point of the stream 476 using an Offset and periodically receive events, either individually or 477 in batches. 478 """ 479 return self._async_generator(await self._inner.async_stream(offset)) 480 481 def stream_with_config( 482 self, offset: Offset, config: ConsumerConfig 483 ) -> typing.Iterator[Record]: 484 """ 485 Continuously streams events from a particular offset with a SmartModule 486 WASM module in the consumer’s partitions. This returns a 487 `Iterator[Record]` which is an iterator. 488 489 Streaming is one of the two ways to consume events in Fluvio. It is a 490 continuous request for new records arriving in a partition, beginning 491 at a particular offset. You specify the starting point of the stream 492 using an Offset and periodically receive events, either individually or 493 in batches. 494 495 Args: 496 offset: Offset 497 wasm_module_path: str - The absolute path to the WASM file 498 499 Example: 500 import os 501 502 wmp = os.path.abspath("somefilter.wasm") 503 config = ConsumerConfig() 504 config.smartmodule(path=wmp) 505 for i in consumer.stream_with_config(Offset.beginning(), config): 506 # do something with i 507 508 Returns: 509 `Iterator[Record]` 510 511 """ 512 return self._generator( 513 self._inner.stream_with_config(offset._inner, config._inner) 514 ) 515 516 async def async_stream_with_config( 517 self, offset: Offset, config: ConsumerConfig 518 ) -> typing.AsyncIterator[Record]: 519 """ 520 Continuously streams events from a particular offset with a SmartModule 521 WASM module in the consumer’s partitions. This returns a 522 `AsyncIterator[Record]` which is an async iterator. 523 524 Streaming is one of the two ways to consume events in Fluvio. It is a 525 continuous request for new records arriving in a partition, beginning 526 at a particular offset. You specify the starting point of the stream 527 using an Offset and periodically receive events, either individually or 528 in batches. 529 530 Args: 531 offset: Offset 532 wasm_module_path: str - The absolute path to the WASM file 533 534 Example: 535 import os 536 537 wmp = os.path.abspath("somefilter.wasm") 538 config = ConsumerConfig() 539 config.smartmodule(path=wmp) 540 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 541 # do something with i 542 543 Returns: 544 `AsyncIterator[Record]` 545 546 """ 547 return self._async_generator( 548 await self._inner.async_stream_with_config(offset, config._inner) 549 ) 550 551 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 552 item = stream.next() 553 while item is not None: 554 yield Record(item) 555 item = stream.next() 556 557 async def _async_generator( 558 self, astream: _AsyncPartitionConsumerStream 559 ) -> typing.AsyncIterator[Record]: 560 item = await astream.async_next() 561 while item is not None: 562 yield Record(item) 563 item = await astream.async_next()
An interface for consuming events from multiple partitions
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a sort of one-time batch operation: you’ll receive all of the events in your range all at once. When you consume events via Streaming, you specify a starting Offset and receive an object that will continuously yield new events as they arrive.
453 def stream(self, offset: Offset) -> typing.Iterator[Record]: 454 """ 455 Continuously streams events from a particular offset in the consumer’s 456 partitions. This returns a `Iterator[Record]` which is an 457 iterator. 458 459 Streaming is one of the two ways to consume events in Fluvio. It is a 460 continuous request for new records arriving in a partition, beginning 461 at a particular offset. You specify the starting point of the stream 462 using an Offset and periodically receive events, either individually or 463 in batches. 464 """ 465 return self._generator(self._inner.stream(offset))
Continuously streams events from a particular offset in the consumer’s
partitions. This returns a Iterator[Record]
which is an
iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
467 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 468 """ 469 Continuously streams events from a particular offset in the consumer’s 470 partitions. This returns a `AsyncIterator[Record]` which is an 471 async iterator. 472 473 Streaming is one of the two ways to consume events in Fluvio. It is a 474 continuous request for new records arriving in a partition, beginning 475 at a particular offset. You specify the starting point of the stream 476 using an Offset and periodically receive events, either individually or 477 in batches. 478 """ 479 return self._async_generator(await self._inner.async_stream(offset))
Continuously streams events from a particular offset in the consumer’s
partitions. This returns a AsyncIterator[Record]
which is an
async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
481 def stream_with_config( 482 self, offset: Offset, config: ConsumerConfig 483 ) -> typing.Iterator[Record]: 484 """ 485 Continuously streams events from a particular offset with a SmartModule 486 WASM module in the consumer’s partitions. This returns a 487 `Iterator[Record]` which is an iterator. 488 489 Streaming is one of the two ways to consume events in Fluvio. It is a 490 continuous request for new records arriving in a partition, beginning 491 at a particular offset. You specify the starting point of the stream 492 using an Offset and periodically receive events, either individually or 493 in batches. 494 495 Args: 496 offset: Offset 497 wasm_module_path: str - The absolute path to the WASM file 498 499 Example: 500 import os 501 502 wmp = os.path.abspath("somefilter.wasm") 503 config = ConsumerConfig() 504 config.smartmodule(path=wmp) 505 for i in consumer.stream_with_config(Offset.beginning(), config): 506 # do something with i 507 508 Returns: 509 `Iterator[Record]` 510 511 """ 512 return self._generator( 513 self._inner.stream_with_config(offset._inner, config._inner) 514 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partitions. This returns a
Iterator[Record]
which is an iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
for i in consumer.stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
Iterator[Record]
516 async def async_stream_with_config( 517 self, offset: Offset, config: ConsumerConfig 518 ) -> typing.AsyncIterator[Record]: 519 """ 520 Continuously streams events from a particular offset with a SmartModule 521 WASM module in the consumer’s partitions. This returns a 522 `AsyncIterator[Record]` which is an async iterator. 523 524 Streaming is one of the two ways to consume events in Fluvio. It is a 525 continuous request for new records arriving in a partition, beginning 526 at a particular offset. You specify the starting point of the stream 527 using an Offset and periodically receive events, either individually or 528 in batches. 529 530 Args: 531 offset: Offset 532 wasm_module_path: str - The absolute path to the WASM file 533 534 Example: 535 import os 536 537 wmp = os.path.abspath("somefilter.wasm") 538 config = ConsumerConfig() 539 config.smartmodule(path=wmp) 540 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 541 # do something with i 542 543 Returns: 544 `AsyncIterator[Record]` 545 546 """ 547 return self._async_generator( 548 await self._inner.async_stream_with_config(offset, config._inner) 549 )
Continuously streams events from a particular offset with a SmartModule
WASM module in the consumer’s partitions. This returns a
AsyncIterator[Record]
which is an async iterator.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
Args: offset: Offset wasm_module_path: str - The absolute path to the WASM file
Example: import os
wmp = os.path.abspath("somefilter.wasm")
config = ConsumerConfig()
config.smartmodule(path=wmp)
async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
# do something with i
Returns:
AsyncIterator[Record]
664class ConsumerOffset: 665 """Consumer offset""" 666 667 _inner: _ConsumerOffset 668 669 def __init__(self, inner: _ConsumerOffset): 670 self._inner = inner 671 672 def consumer_id(self) -> str: 673 return self._inner.consumer_id() 674 675 def topic(self) -> str: 676 return self._inner.topic() 677 678 def partition(self) -> int: 679 return self._inner.partition() 680 681 def offset(self) -> int: 682 return self._inner.offset() 683 684 def modified_time(self) -> int: 685 return self._inner.modified_time()
Consumer offset
29class CommonCreateRequest: 30 _inner: _CommonCreateRequest 31 32 def __init__(self, inner: _CommonCreateRequest): 33 self._inner = inner 34 35 @classmethod 36 def new(cls, name: str, dry_run: bool, timeout: int): 37 return cls(_CommonCreateRequest.new(name, dry_run, timeout))
84class SmartModuleSpec: 85 _inner: _SmartModuleSpec 86 87 def __init__(self, inner: _SmartModuleSpec): 88 self._inner = inner 89 90 @classmethod 91 def new(cls, path: str): 92 f = open(path, mode="rb") 93 data = f.read() 94 f.close() 95 return cls(_SmartModuleSpec.with_binary(data))
18class PartitionMap: 19 _inner: _PartitionMap 20 21 def __init__(self, inner: _PartitionMap): 22 self._inner = inner 23 24 @classmethod 25 def new(cls, partition: int, replicas: typing.List[int]): 26 return cls(_PartitionMap.new(partition, replicas))
50class MessageMetadataTopicSpec: 51 _inner: _MessageMetadataTopicSpec 52 53 def __init__(self, inner: _MessageMetadataTopicSpec): 54 self._inner = inner 55 56 def is_update(self) -> bool: 57 return self._inner.is_update() 58 59 def is_delete(self) -> bool: 60 return self._inner.is_delete() 61 62 def metadata_topic_spec(self) -> MetadataTopicSpec: 63 return MetadataTopicSpec(self._inner.metadata_topic_spec())
142class MetadataPartitionSpec: 143 _inner: _MetadataPartitionSpec 144 145 def __init__(self, inner: _MetadataPartitionSpec): 146 self._inner = inner 147 148 def name(self) -> str: 149 return self._inner.name()
40class MetadataTopicSpec: 41 _inner: _MetadataTopicSpec 42 43 def __init__(self, inner: _MetadataTopicSpec): 44 self._inner = inner 45 46 def name(self) -> str: 47 return self._inner.name()
66class MetaUpdateTopicSpec: 67 _inner: _MetaUpdateTopicSpec 68 69 def __init__(self, inner: _MetaUpdateTopicSpec): 70 self._inner = inner 71 72 def all(self) -> typing.List[MetadataTopicSpec]: 73 inners = self._inner.all() 74 return [MetadataTopicSpec(i) for i in inners] 75 76 def changes(self) -> typing.List[MessageMetadataTopicSpec]: 77 inners = self._inner.changes() 78 return [MessageMetadataTopicSpec(i) for i in inners] 79 80 def epoch(self) -> int: 81 return self._inner.epoch()
645class PartitionSelectionStrategy: 646 """Stragegy to select partitions""" 647 648 _inner: _PartitionSelectionStrategy 649 650 def __init__(self, inner: _FluvioConfig): 651 self._inner = inner 652 653 @classmethod 654 def with_all(cls, topic: str): 655 """select all partitions of one topic""" 656 return cls(_PartitionSelectionStrategy.with_all(topic)) 657 658 @classmethod 659 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 660 """select multiple partitions of multiple topics""" 661 return cls(_PartitionSelectionStrategy.with_multiple(topic))
Stragegy to select partitions
653 @classmethod 654 def with_all(cls, topic: str): 655 """select all partitions of one topic""" 656 return cls(_PartitionSelectionStrategy.with_all(topic))
select all partitions of one topic
658 @classmethod 659 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 660 """select multiple partitions of multiple topics""" 661 return cls(_PartitionSelectionStrategy.with_multiple(topic))
select multiple partitions of multiple topics
211class SmartModuleKind(Enum): 212 """ 213 Use of this is to explicitly set the kind of a smartmodule. Not required 214 but needed for legacy SmartModules. 215 """ 216 217 Filter = _SmartModuleKind.Filter 218 Map = _SmartModuleKind.Map 219 ArrayMap = _SmartModuleKind.ArrayMap 220 FilterMap = _SmartModuleKind.FilterMap 221 Aggregate = _SmartModuleKind.Aggregate
Use of this is to explicitly set the kind of a smartmodule. Not required but needed for legacy SmartModules.