fluvio
The __fluvio__ Python module provides an extension for working with the Fluvio streaming platform. It builds on top of the Fluvio client Rust crate and provides Pythonic access to the API.
Creating a topic with default settings is as simple as:
```python
from fluvio import FluvioAdmin

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
```
Or create a topic with custom settings:
```python
from fluvio import FluvioAdmin, TopicSpec

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
    TopicSpec.create()
    .with_retention_time("1h")
    .with_segment_size("10M")
    .build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
```
Producing data to a topic in a Fluvio cluster is as simple as:
```python
from fluvio import Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

for i in range(10):
    producer.send_string("Hello %s " % i)
```
Consuming is also simple:
```python
from fluvio import Fluvio, ConsumerConfigExtBuilder

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
```
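Each item yielded by the stream is a `Record`, so instead of decoding bytes by hand you can also use `Record.value_string()`; a minimal sketch:

```python
from itertools import islice

# Take two records from the stream and print their UTF-8 decoded values.
for record in islice(stream, 2):
    print(record.value_string())
```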
Producing with a custom configuration:
```python
from fluvio import (
    Fluvio,
    TopicProducerConfigBuilder,
    Compression,
    DeliverySemantic,
    Isolation,
)

fluvio = Fluvio.connect()

topic = "a_topic"
# build() is assumed to finalize the builder, mirroring ConsumerConfigExtBuilder
config = (
    TopicProducerConfigBuilder()
    .batch_size(32768)
    .linger(100)
    .compression(Compression.Gzip)
    .max_request_size(33554432)
    .timeout(600000)
    .isolation(Isolation.ReadCommitted)
    .delivery_semantic(DeliverySemantic.AtLeastOnce)
    .build()
)
producer = fluvio.topic_producer_with_config(topic, config)

for i in range(10):
    producer.send_string("Hello %s " % i)
```
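Each send returns a `ProduceOutput`; a short sketch of confirming delivery with the blocking `wait()`, continuing the example above:

```python
output = producer.send_string("an important message")
producer.flush()  # push any batched records to the cluster

metadata = output.wait()  # blocks until the record is acknowledged
if metadata is not None:
    print(metadata.offset(), metadata.partition_id())
```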
You can also consume using offset management:
```python
from fluvio import (
    Fluvio,
    ConsumerConfigExtBuilder,
    Offset,
    OffsetManagementStrategy,
)

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

stream.offset_commit()
stream.offset_flush()
```
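Committed offsets can be inspected or removed later through the client; a minimal sketch using the `consumer_offsets` and `delete_consumer_offset` methods documented in the `Fluvio` reference below:

```python
# List offsets committed by named consumers.
for consumer_offset in fluvio.consumer_offsets():
    print(
        consumer_offset.consumer_id(),
        consumer_offset.topic(),
        consumer_offset.partition(),
        consumer_offset.offset(),
    )

# Remove the offset committed above for partition 0.
fluvio.delete_consumer_offset("a-consumer", topic, 0)
```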
For more examples, see the integration tests in the fluvio-python repository:

[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py)
1""" 2The __fluvio__ python module provides an extension for working with the Fluvio 3streaming platform. 4 5This module builds on top of the Fluvio Client Rust Crate and provides a 6pythonic access to the API. 7 8 9Creating a topic with default settings is as simple as: 10 11```python 12fluvio_admin = FluvioAdmin.connect() 13fluvio_admin.create_topic("a_topic") 14``` 15 16Or just create a topic with custom settings: 17 18```python 19import fluvio 20 21fluvio_admin = FluvioAdmin.connect() 22topic_spec = ( 23 TopicSpec.create() 24 .with_retention_time("1h") 25 .with_segment_size("10M") 26 .build() 27) 28fluvio_admin.create_topic("a_topic", topic_spec) 29``` 30 31Producing data to a topic in a Fluvio cluster is as simple as: 32 33```python 34import fluvio 35 36fluvio = Fluvio.connect() 37 38topic = "a_topic" 39producer = fluvio.topic_producer(topic) 40 41for i in range(10): 42 producer.send_string("Hello %s " % i) 43``` 44 45Consuming is also simple: 46 47```python 48import fluvio 49 50fluvio = Fluvio.connect() 51 52topic = "a_topic" 53builder = ConsumerConfigExtBuilder(topic) 54config = builder.build() 55stream = fluvio.consumer_with_config(config) 56 57num_items = 2 58records = [bytearray(next(stream).value()).decode() for _ in range(num_items)] 59``` 60 61Producing with a custom configuration: 62 63```python 64import fluvio 65 66fluvio = Fluvio.connect() 67 68topic = "a_topic" 69builder = ( 70 TopicProducerConfigBuilder() 71 .batch_size(32768) 72 .linger(100) 73 .compression(Compression.Gzip) 74 .max_request_size(33554432) 75 .timeout(600000) 76 .isolation(Isolation.ReadCommitted) 77 .delivery_semantic(DeliverySemantic.AtLeastOnce) 78) 79producer = fluvio.topic_producer_with_config(self.topic, config) 80 81for i in range(10): 82 producer.send_string("Hello %s " % i) 83``` 84 85Also you can consume usign offset management: 86 87```python 88import fluvio 89 90fluvio = Fluvio.connect() 91 92topic = "a_topic" 93builder = ConsumerConfigExtBuilder(topic) 94builder.offset_start(Offset.beginning()) 95builder.offset_strategy(OffsetManagementStrategy.MANUAL) 96builder.offset_consumer("a-consumer") 97config = builder.build() 98stream = fluvio.consumer_with_config(config) 99 100num_items = 2 101records = [bytearray(next(stream).value()).decode() for _ in range(num_items)] 102 103stream.offset_commit() 104stream.offset_flush() 105``` 106 107For more examples see the integration tests in the fluvio-python repository. 
108 109[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py) 110[test_consumer.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py) 111 112""" 113 114import typing 115from enum import Enum 116 117from ._fluvio_python import ( 118 Fluvio as _Fluvio, 119 FluvioConfig as _FluvioConfig, 120 Offset, 121 FluvioAdmin as _FluvioAdmin, 122 # consumer types 123 ConsumerConfig as _ConsumerConfig, 124 ConsumerConfigExt, 125 ConsumerConfigExtBuilder, 126 PartitionConsumer as _PartitionConsumer, 127 MultiplePartitionConsumer as _MultiplePartitionConsumer, 128 PartitionSelectionStrategy as _PartitionSelectionStrategy, 129 PartitionConsumerStream as _PartitionConsumerStream, 130 AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream, 131 OffsetManagementStrategy, 132 ConsumerOffset as _ConsumerOffset, 133 # producer types 134 TopicProducer as _TopicProducer, 135 TopicProducerConfig, 136 TopicProducerConfigBuilder, 137 Compression, 138 DeliverySemantic, 139 Isolation, 140 ProduceOutput as _ProduceOutput, 141 ProducerBatchRecord as _ProducerBatchRecord, 142 # admin and misc types 143 SmartModuleKind as _SmartModuleKind, 144 TopicSpec as _TopicSpec, 145 WatchTopicStream as _WatchTopicStream, 146 WatchSmartModuleStream as _WatchSmartModuleStream, 147) 148 149from ._fluvio_python import Error as FluviorError # noqa: F401 150 151from .topic import TopicSpec, CompressionType, TopicMode 152from .record import Record, RecordMetadata 153from .specs import ( 154 # support types 155 CommonCreateRequest, 156 PartitionMap, 157 # specs 158 SmartModuleSpec, 159 MessageMetadataTopicSpec, 160 MetadataPartitionSpec, 161 MetadataSmartModuleSpec, 162 MetadataTopicSpec, 163 MetaUpdateSmartModuleSpec, 164 MetaUpdateTopicSpec, 165) 166 167# this structures the module a bit and allows pydoc to generate better docs 168# with better ordering of types and functions 169# inclusion in __all__, will pull the PyO3 rust inline docs into 170# the pdoc generated documentation 171__all__ = [ 172 # top level types 173 "Fluvio", 174 "FluvioConfig", 175 "FluvioAdmin", 176 # topic 177 "TopicSpec", 178 "CompressionType", 179 "TopicMode", 180 # record 181 "Record", 182 "RecordMetadata", 183 "Offset", 184 # producer 185 "TopicProducer", 186 "TopicProducerConfig", 187 "TopicProducerConfigBuilder", 188 "Compression", 189 "DeliverySemantic", 190 "Isolation", 191 "ProduceOutput", 192 # consumer 193 "ConsumerConfigExt", 194 "ConsumerConfigExtBuilder", 195 "ConsumerConfig", 196 "PartitionConsumer", 197 "MultiplePartitionConsumer", 198 "OffsetManagementStrategy", 199 "ConsumerOffset", 200 # specs 201 "CommonCreateRequest", 202 "SmartModuleSpec", 203 "TopicSpec", 204 "PartitionMap", 205 "MessageMetadataTopicSpec", 206 "MetadataPartitionSpec", 207 "MetadataTopicSpec", 208 "MetaUpdateTopicSpec", 209 # Misc 210 "PartitionSelectionStrategy", 211 "SmartModuleKind", 212] 213 214 215class ProduceOutput: 216 """Returned by of `TopicProducer.send` call allowing access to sent record metadata.""" 217 218 _inner: _ProduceOutput 219 220 def __init__(self, inner: _ProduceOutput) -> None: 221 self._inner = inner 222 223 def wait(self) -> typing.Optional[RecordMetadata]: 224 """Wait for the record metadata. 225 226 This is a blocking call and may only return a `RecordMetadata` once. 227 Any subsequent call to `wait` will return a `None` value. 228 Errors will be raised as exceptions of type `FluvioError`. 
229 """ 230 res = self._inner.wait() 231 if res is None: 232 return None 233 return RecordMetadata(res) 234 235 async def async_wait(self) -> typing.Optional[RecordMetadata]: 236 """Asynchronously wait for the record metadata. 237 238 This may only return a `RecordMetadata` once. 239 Any subsequent call to `wait` will return a `None` value. 240 """ 241 return await self._inner.async_wait() 242 243 244class SmartModuleKind(Enum): 245 """ 246 Use of this is to explicitly set the kind of a smartmodule. Not required 247 but needed for legacy SmartModules. 248 """ 249 250 Filter = _SmartModuleKind.Filter 251 Map = _SmartModuleKind.Map 252 ArrayMap = _SmartModuleKind.ArrayMap 253 FilterMap = _SmartModuleKind.FilterMap 254 Aggregate = _SmartModuleKind.Aggregate 255 256 257class ConsumerConfig: 258 _inner: _ConsumerConfig 259 260 def __init__(self): 261 self._inner = _ConsumerConfig() 262 263 def disable_continuous(self, val: bool = True): 264 """Disable continuous mode after fetching specified records""" 265 self._inner.disable_continuous(val) 266 267 def smartmodule( 268 self, 269 name: str = None, 270 path: str = None, 271 kind: SmartModuleKind = None, 272 params: typing.Dict[str, str] = None, 273 aggregate: typing.List[bytes] = None, 274 ): 275 """ 276 This is a method for adding a smartmodule to a consumer config either 277 using a `name` of a `SmartModule` or a `path` to a wasm binary. 278 279 Args: 280 281 name: str 282 path: str 283 kind: SmartModuleKind 284 params: Dict[str, str] 285 aggregate: List[bytes] # This is used for the initial value of an aggregate smartmodule 286 287 Raises: 288 "Require either a path or a name for a smartmodule." 289 "Only specify one of path or name not both." 290 291 Returns: 292 293 None 294 """ 295 296 if kind is not None: 297 kind = kind.value 298 299 if path is None and name is None: 300 raise Exception("Require either a path or a name for a smartmodule.") 301 302 if path is not None and name is not None: 303 raise Exception("Only specify one of path or name not both.") 304 305 params = {} if params is None else params 306 param_keys = [x for x in params.keys()] 307 param_vals = [x for x in params.values()] 308 309 self._inner.smartmodule( 310 name, 311 path, 312 kind, 313 param_keys, 314 param_vals, 315 aggregate, 316 # These arguments are for Join stuff but that's not implemented on 317 # the python side yet 318 None, 319 None, 320 None, 321 None, 322 ) 323 324 325class ConsumerIterator: 326 def __init__(self, stream: _PartitionConsumerStream): 327 self.stream = stream 328 329 def __iter__(self): 330 return self 331 332 def __next__(self): 333 item = self.stream.next() 334 if item is None: 335 raise StopIteration 336 return Record(item) 337 338 def offset_commit(self): 339 self.stream.offset_commit() 340 341 def offset_flush(self): 342 self.stream.offset_flush() 343 344 345class PartitionConsumer: 346 """ 347 An interface for consuming events from a particular partition 348 349 There are two ways to consume events: by "fetching" events and by 350 "streaming" events. Fetching involves specifying a range of events that you 351 want to consume via their Offset. A fetch is a sort of one-time batch 352 operation: you’ll receive all of the events in your range all at once. When 353 you consume events via Streaming, you specify a starting Offset and receive 354 an object that will continuously yield new events as they arrive. 
355 """ 356 357 _inner: _PartitionConsumer 358 359 def __init__(self, inner: _PartitionConsumer): 360 self._inner = inner 361 362 def stream(self, offset: Offset) -> typing.Iterator[Record]: 363 """ 364 Continuously streams events from a particular offset in the consumer’s 365 partition. This returns a `Iterator[Record]` which is an 366 iterator. 367 368 Streaming is one of the two ways to consume events in Fluvio. It is a 369 continuous request for new records arriving in a partition, beginning 370 at a particular offset. You specify the starting point of the stream 371 using an Offset and periodically receive events, either individually or 372 in batches. 373 """ 374 return self._generator(self._inner.stream(offset)) 375 376 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 377 """ 378 Continuously streams events from a particular offset in the consumer’s 379 partition. This returns a `AsyncIterator[Record]` which is an 380 iterator. 381 382 Streaming is one of the two ways to consume events in Fluvio. It is a 383 continuous request for new records arriving in a partition, beginning 384 at a particular offset. You specify the starting point of the stream 385 using an Offset and periodically receive events, either individually or 386 in batches. 387 """ 388 return self._async_generator(await self._inner.async_stream(offset)) 389 390 def stream_with_config( 391 self, offset: Offset, config: ConsumerConfig 392 ) -> typing.Iterator[Record]: 393 """ 394 Continuously streams events from a particular offset with a SmartModule 395 WASM module in the consumer’s partition. This returns a 396 `Iterator[Record]` which is an iterator. 397 398 Streaming is one of the two ways to consume events in Fluvio. It is a 399 continuous request for new records arriving in a partition, beginning 400 at a particular offset. You specify the starting point of the stream 401 using an Offset and periodically receive events, either individually or 402 in batches. 403 404 Args: 405 offset: Offset 406 wasm_module_path: str - The absolute path to the WASM file 407 408 Example: 409 import os 410 411 wmp = os.path.abspath("somefilter.wasm") 412 config = ConsumerConfig() 413 config.smartmodule(path=wmp) 414 for i in consumer.stream_with_config(Offset.beginning(), config): 415 # do something with i 416 417 Returns: 418 `Iterator[Record]` 419 420 """ 421 return self._generator(self._inner.stream_with_config(offset, config._inner)) 422 423 async def async_stream_with_config( 424 self, offset: Offset, config: ConsumerConfig 425 ) -> typing.AsyncIterator[Record]: 426 """ 427 Continuously streams events from a particular offset with a SmartModule 428 WASM module in the consumer’s partition. This returns a 429 `AsyncIterator[Record]` which is an async iterator. 430 431 Streaming is one of the two ways to consume events in Fluvio. It is a 432 continuous request for new records arriving in a partition, beginning 433 at a particular offset. You specify the starting point of the stream 434 using an Offset and periodically receive events, either individually or 435 in batches. 
436 437 Args: 438 offset: Offset 439 wasm_module_path: str - The absolute path to the WASM file 440 441 Example: 442 import os 443 444 wmp = os.path.abspath("somefilter.wasm") 445 config = ConsumerConfig() 446 config.smartmodule(path=wmp) 447 `AsyncIterator[Record]` 448 449 """ 450 return self._async_generator( 451 await self._inner.async_stream_with_config(offset, config._inner) 452 ) 453 454 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 455 item = stream.next() 456 while item is not None: 457 yield Record(item) 458 item = stream.next() 459 460 async def _async_generator( 461 self, astream: _AsyncPartitionConsumerStream 462 ) -> typing.AsyncIterator[Record]: 463 item = await astream.async_next() 464 while item is not None: 465 yield Record(item) 466 item = await astream.async_next() 467 468 469class MultiplePartitionConsumer: 470 """ 471 An interface for consuming events from multiple partitions 472 473 There are two ways to consume events: by "fetching" events and by 474 "streaming" events. Fetching involves specifying a range of events that you 475 want to consume via their Offset. A fetch is a sort of one-time batch 476 operation: you’ll receive all of the events in your range all at once. When 477 you consume events via Streaming, you specify a starting Offset and receive 478 an object that will continuously yield new events as they arrive. 479 """ 480 481 _inner: _MultiplePartitionConsumer 482 483 def __init__(self, inner: _MultiplePartitionConsumer): 484 self._inner = inner 485 486 def stream(self, offset: Offset) -> typing.Iterator[Record]: 487 """ 488 Continuously streams events from a particular offset in the consumer’s 489 partitions. This returns a `Iterator[Record]` which is an 490 iterator. 491 492 Streaming is one of the two ways to consume events in Fluvio. It is a 493 continuous request for new records arriving in a partition, beginning 494 at a particular offset. You specify the starting point of the stream 495 using an Offset and periodically receive events, either individually or 496 in batches. 497 """ 498 return self._generator(self._inner.stream(offset)) 499 500 async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]: 501 """ 502 Continuously streams events from a particular offset in the consumer’s 503 partitions. This returns a `AsyncIterator[Record]` which is an 504 async iterator. 505 506 Streaming is one of the two ways to consume events in Fluvio. It is a 507 continuous request for new records arriving in a partition, beginning 508 at a particular offset. You specify the starting point of the stream 509 using an Offset and periodically receive events, either individually or 510 in batches. 511 """ 512 return self._async_generator(await self._inner.async_stream(offset)) 513 514 def stream_with_config( 515 self, offset: Offset, config: ConsumerConfig 516 ) -> typing.Iterator[Record]: 517 """ 518 Continuously streams events from a particular offset with a SmartModule 519 WASM module in the consumer’s partitions. This returns a 520 `Iterator[Record]` which is an iterator. 521 522 Streaming is one of the two ways to consume events in Fluvio. It is a 523 continuous request for new records arriving in a partition, beginning 524 at a particular offset. You specify the starting point of the stream 525 using an Offset and periodically receive events, either individually or 526 in batches. 
527 528 Args: 529 offset: Offset 530 wasm_module_path: str - The absolute path to the WASM file 531 532 Example: 533 import os 534 535 wmp = os.path.abspath("somefilter.wasm") 536 config = ConsumerConfig() 537 config.smartmodule(path=wmp) 538 for i in consumer.stream_with_config(Offset.beginning(), config): 539 # do something with i 540 541 Returns: 542 `Iterator[Record]` 543 544 """ 545 return self._generator( 546 self._inner.stream_with_config(offset._inner, config._inner) 547 ) 548 549 async def async_stream_with_config( 550 self, offset: Offset, config: ConsumerConfig 551 ) -> typing.AsyncIterator[Record]: 552 """ 553 Continuously streams events from a particular offset with a SmartModule 554 WASM module in the consumer’s partitions. This returns a 555 `AsyncIterator[Record]` which is an async iterator. 556 557 Streaming is one of the two ways to consume events in Fluvio. It is a 558 continuous request for new records arriving in a partition, beginning 559 at a particular offset. You specify the starting point of the stream 560 using an Offset and periodically receive events, either individually or 561 in batches. 562 563 Args: 564 offset: Offset 565 wasm_module_path: str - The absolute path to the WASM file 566 567 Example: 568 import os 569 570 wmp = os.path.abspath("somefilter.wasm") 571 config = ConsumerConfig() 572 config.smartmodule(path=wmp) 573 async for i in await consumer.async_stream_with_config(Offset.beginning(), config): 574 # do something with i 575 576 Returns: 577 `AsyncIterator[Record]` 578 579 """ 580 return self._async_generator( 581 await self._inner.async_stream_with_config(offset, config._inner) 582 ) 583 584 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 585 item = stream.next() 586 while item is not None: 587 yield Record(item) 588 item = stream.next() 589 590 async def _async_generator( 591 self, astream: _AsyncPartitionConsumerStream 592 ) -> typing.AsyncIterator[Record]: 593 item = await astream.async_next() 594 while item is not None: 595 yield Record(item) 596 item = await astream.async_next() 597 598 599class TopicProducer: 600 """An interface for producing events to a particular topic. 601 602 A `TopicProducer` allows you to send events to the specific topic it was 603 initialized for. Once you have a `TopicProducer`, you can send events to 604 the topic, choosing which partition each event should be delivered to. 605 """ 606 607 _inner: _TopicProducer 608 609 def __init__(self, inner: _TopicProducer): 610 self._inner = inner 611 612 def send_string(self, buf: str) -> ProduceOutput: 613 """Sends a string to this producer’s topic""" 614 return self.send([], buf.encode("utf-8")) 615 616 async def async_send_string(self, buf: str) -> ProduceOutput: 617 """Sends a string to this producer’s topic""" 618 return await self.async_send([], buf.encode("utf-8")) 619 620 def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput: 621 """ 622 Sends a key/value record to this producer's Topic. 623 624 The partition that the record will be sent to is derived from the Key. 625 """ 626 return ProduceOutput(self._inner.send(key, value)) 627 628 async def async_send( 629 self, key: typing.List[int], value: typing.List[int] 630 ) -> ProduceOutput: 631 """ 632 Async sends a key/value record to this producer's Topic. 633 634 The partition that the record will be sent to is derived from the Key. 
635 """ 636 produce_output = await self._inner.async_send(key, value) 637 return ProduceOutput(produce_output) 638 639 def flush(self) -> None: 640 """ 641 Send all the queued records in the producer batches. 642 """ 643 return self._inner.flush() 644 645 async def async_flush(self) -> None: 646 """ 647 Async send all the queued records in the producer batches. 648 """ 649 await self._inner.async_flush() 650 651 def send_all( 652 self, records: typing.List[typing.Tuple[bytes, bytes]] 653 ) -> typing.List[ProduceOutput]: 654 """ 655 Sends a list of key/value records as a batch to this producer's Topic. 656 :param records: The list of records to send 657 """ 658 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 659 return [ 660 ProduceOutput(output_inner) 661 for output_inner in self._inner.send_all(records_inner) 662 ] 663 664 async def async_send_all( 665 self, records: typing.List[typing.Tuple[bytes, bytes]] 666 ) -> typing.List[ProduceOutput]: 667 """ 668 Async sends a list of key/value records as a batch to this producer's Topic. 669 :param records: The list of records to send 670 """ 671 records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records] 672 return [ 673 ProduceOutput(output_inner) 674 for output_inner in await self._inner.async_send_all(records_inner) 675 ] 676 677 678class PartitionSelectionStrategy: 679 """Stragegy to select partitions""" 680 681 _inner: _PartitionSelectionStrategy 682 683 def __init__(self, inner: _FluvioConfig): 684 self._inner = inner 685 686 @classmethod 687 def with_all(cls, topic: str): 688 """select all partitions of one topic""" 689 return cls(_PartitionSelectionStrategy.with_all(topic)) 690 691 @classmethod 692 def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]): 693 """select multiple partitions of multiple topics""" 694 return cls(_PartitionSelectionStrategy.with_multiple(topic)) 695 696 697class ConsumerOffset: 698 """Consumer offset""" 699 700 _inner: _ConsumerOffset 701 702 def __init__(self, inner: _ConsumerOffset): 703 self._inner = inner 704 705 def consumer_id(self) -> str: 706 return self._inner.consumer_id() 707 708 def topic(self) -> str: 709 return self._inner.topic() 710 711 def partition(self) -> int: 712 return self._inner.partition() 713 714 def offset(self) -> int: 715 return self._inner.offset() 716 717 def modified_time(self) -> int: 718 return self._inner.modified_time() 719 720 721class FluvioConfig: 722 """Configuration for Fluvio client""" 723 724 _inner: _FluvioConfig 725 726 def __init__(self, inner: _FluvioConfig): 727 self._inner = inner 728 729 @classmethod 730 def load(cls): 731 """get current cluster config from default profile""" 732 return cls(_FluvioConfig.load()) 733 734 @classmethod 735 def new(cls, addr: str): 736 """Create a new cluster configuration with no TLS.""" 737 return cls(_FluvioConfig.new(addr)) 738 739 def set_endpoint(self, endpoint: str): 740 """set endpoint""" 741 self._inner.set_endpoint(endpoint) 742 743 def set_use_spu_local_address(self, val: bool): 744 """set wheather to use spu local address""" 745 self._inner.set_use_spu_local_address(val) 746 747 def disable_tls(self): 748 """disable tls for this config""" 749 self._inner.disable_tls() 750 751 def set_anonymous_tls(self): 752 """set the config to use anonymous tls""" 753 self._inner.set_anonymous_tls() 754 755 def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str): 756 """specify inline tls parameters""" 757 self._inner.set_inline_tls(domain, key, cert, ca_cert) 758 759 def 
set_tls_file_paths( 760 self, domain: str, key_path: str, cert_path: str, ca_cert_path: str 761 ): 762 """specify paths to tls files""" 763 self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path) 764 765 def set_client_id(self, client_id: str): 766 """set client id""" 767 self._inner.set_client_id(client_id) 768 769 def unset_client_id(self): 770 """remove the configured client id from config""" 771 self._inner.unset_client_id() 772 773 774class Fluvio: 775 """An interface for interacting with Fluvio streaming.""" 776 777 _inner: _Fluvio 778 779 def __init__(self, inner: _Fluvio): 780 self._inner = inner 781 782 @classmethod 783 def connect(cls): 784 """Tries to create a new Fluvio client using the current profile from 785 `~/.fluvio/config` 786 """ 787 return cls(_Fluvio.connect()) 788 789 @classmethod 790 def connect_with_config(cls, config: FluvioConfig): 791 """Creates a new Fluvio client using the given configuration""" 792 return cls(_Fluvio.connect_with_config(config._inner)) 793 794 def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator: 795 """Creates consumer with settings defined in config 796 797 This is the recommended way to create a consume records. 798 """ 799 return ConsumerIterator(self._inner.consumer_with_config(config)) 800 801 def topic_producer_with_config(self, topic: str, config: TopicProducerConfig): 802 """Creates a new `TopicProducer` for the given topic name with config""" 803 return TopicProducer(self._inner.topic_producer_with_config(topic, config)) 804 805 def topic_producer(self, topic: str) -> TopicProducer: 806 """ 807 Creates a new `TopicProducer` for the given topic name. 808 809 Currently, producers are scoped to a specific Fluvio topic. That means 810 when you send events via a producer, you must specify which partition 811 each event should go to. 812 """ 813 return TopicProducer(self._inner.topic_producer(topic)) 814 815 def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer: 816 """Creates a new `PartitionConsumer` for the given topic and partition 817 818 Currently, consumers are scoped to both a specific Fluvio topic and to 819 a particular partition within that topic. That means that if you have a 820 topic with multiple partitions, then in order to receive all of the 821 events in all of the partitions, you will need to create one consumer 822 per partition. 823 """ 824 return PartitionConsumer(self._inner.partition_consumer(topic, partition)) 825 826 def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer: 827 """Creates a new `MultiplePartitionConsumer` for the given topic and its all partitions 828 829 Currently, consumers are scoped to both a specific Fluvio topic and to 830 its all partitions within that topic. 831 """ 832 strategy = PartitionSelectionStrategy.with_all(topic) 833 return MultiplePartitionConsumer( 834 self._inner.multi_partition_consumer(strategy._inner) 835 ) 836 837 def multi_topic_partition_consumer( 838 self, selections: typing.List[typing.Tuple[str, int]] 839 ) -> MultiplePartitionConsumer: 840 """Creates a new `MultiplePartitionConsumer` for the given topics and partitions 841 842 Currently, consumers are scoped to a list of Fluvio topic and partition tuple. 
843 """ 844 strategy = PartitionSelectionStrategy.with_multiple(selections) 845 return MultiplePartitionConsumer( 846 self._inner.multi_partition_consumer(strategy._inner) 847 ) 848 849 def consumer_offsets(self) -> typing.List[ConsumerOffset]: 850 """Fetch the current offsets of the consumer""" 851 return self._inner.consumer_offsets() 852 853 def delete_consumer_offset(self, consumer: str, topic: str, partition: int): 854 """Delete the consumer offset""" 855 return self._inner.delete_consumer_offset(consumer, topic, partition) 856 857 def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]: 858 item = stream.next() 859 while item is not None: 860 yield Record(item) 861 item = stream.next() 862 863 864class FluvioAdmin: 865 _inner: _FluvioAdmin 866 867 def __init__(self, inner: _FluvioAdmin): 868 self._inner = inner 869 870 def connect(): 871 return FluvioAdmin(_FluvioAdmin.connect()) 872 873 def connect_with_config(config: FluvioConfig): 874 return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) 875 876 def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None): 877 partitions = 1 878 replication = 1 879 ignore_rack = True 880 dry_run = False 881 spec = ( 882 spec 883 if spec is not None 884 else _TopicSpec.new_computed(partitions, replication, ignore_rack) 885 ) 886 return self._inner.create_topic(topic, dry_run, spec) 887 888 def create_topic_with_config( 889 self, topic: str, req: CommonCreateRequest, spec: _TopicSpec 890 ): 891 return self._inner.create_topic_with_config(topic, req._inner, spec) 892 893 def delete_topic(self, topic: str): 894 return self._inner.delete_topic(topic) 895 896 def all_topics(self) -> typing.List[MetadataTopicSpec]: 897 return self._inner.all_topics() 898 899 def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]: 900 return self._inner.list_topics(filters) 901 902 def list_topics_with_params( 903 self, filters: typing.List[str], summary: bool 904 ) -> typing.List[MetadataTopicSpec]: 905 return self._inner.list_topics_with_params(filters, summary) 906 907 def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]: 908 return self._topic_spec_generator(self._inner.watch_topic()) 909 910 def create_smartmodule(self, name: str, path: str, dry_run: bool): 911 spec = SmartModuleSpec.new(path) 912 return self._inner.create_smart_module(name, dry_run, spec._inner) 913 914 def delete_smartmodule(self, name: str): 915 return self._inner.delete_smart_module(name) 916 917 def list_smartmodules( 918 self, filters: typing.List[str] 919 ) -> typing.List[MetadataSmartModuleSpec]: 920 return self._inner.list_smart_modules(filters) 921 922 def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]: 923 return self._smart_module_spec_generator(self._inner.watch_smart_module()) 924 925 def list_partitions( 926 self, filters: typing.List[str] 927 ) -> typing.List[MetadataPartitionSpec]: 928 return self._inner.list_partitions(filters) 929 930 def _topic_spec_generator( 931 self, stream: _WatchTopicStream 932 ) -> typing.Iterator[MetaUpdateTopicSpec]: 933 item = stream.next().inner() 934 while item is not None: 935 yield MetaUpdateTopicSpec(item) 936 item = stream.next().inner() 937 938 def _smart_module_spec_generator( 939 self, stream: _WatchSmartModuleStream 940 ) -> typing.Iterator[MetaUpdateSmartModuleSpec]: 941 item = stream.next().inner() 942 while item is not None: 943 yield MetaUpdateSmartModuleSpec(item) 944 item = stream.next().inner()
```python
class Fluvio:
    """An interface for interacting with Fluvio streaming."""

    _inner: _Fluvio

    def __init__(self, inner: _Fluvio):
        self._inner = inner

    @classmethod
    def connect(cls):
        """Tries to create a new Fluvio client using the current profile from
        `~/.fluvio/config`
        """
        return cls(_Fluvio.connect())

    @classmethod
    def connect_with_config(cls, config: FluvioConfig):
        """Creates a new Fluvio client using the given configuration"""
        return cls(_Fluvio.connect_with_config(config._inner))

    def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
        """Creates a consumer with the settings defined in config

        This is the recommended way to consume records.
        """
        return ConsumerIterator(self._inner.consumer_with_config(config))

    def topic_producer_with_config(self, topic: str, config: TopicProducerConfig):
        """Creates a new `TopicProducer` for the given topic name with config"""
        return TopicProducer(self._inner.topic_producer_with_config(topic, config))

    def topic_producer(self, topic: str) -> TopicProducer:
        """
        Creates a new `TopicProducer` for the given topic name.

        Currently, producers are scoped to a specific Fluvio topic. That means
        when you send events via a producer, you must specify which partition
        each event should go to.
        """
        return TopicProducer(self._inner.topic_producer(topic))

    def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
        """Creates a new `PartitionConsumer` for the given topic and partition

        Currently, consumers are scoped to both a specific Fluvio topic and to
        a particular partition within that topic. That means that if you have a
        topic with multiple partitions, then in order to receive all of the
        events in all of the partitions, you will need to create one consumer
        per partition.
        """
        return PartitionConsumer(self._inner.partition_consumer(topic, partition))

    def multi_partition_consumer(self, topic: str) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topic and all of its partitions

        Currently, consumers are scoped to both a specific Fluvio topic and to
        all partitions within that topic.
        """
        strategy = PartitionSelectionStrategy.with_all(topic)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def multi_topic_partition_consumer(
        self, selections: typing.List[typing.Tuple[str, int]]
    ) -> MultiplePartitionConsumer:
        """Creates a new `MultiplePartitionConsumer` for the given topics and partitions

        Currently, consumers are scoped to a list of Fluvio topic and partition tuples.
        """
        strategy = PartitionSelectionStrategy.with_multiple(selections)
        return MultiplePartitionConsumer(
            self._inner.multi_partition_consumer(strategy._inner)
        )

    def consumer_offsets(self) -> typing.List[ConsumerOffset]:
        """Fetch the current offsets of the consumer"""
        return self._inner.consumer_offsets()

    def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
        """Delete the consumer offset"""
        return self._inner.delete_consumer_offset(consumer, topic, partition)

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()
```
An interface for interacting with Fluvio streaming.
`connect()`: Tries to create a new Fluvio client using the current profile from `~/.fluvio/config`.
`connect_with_config(config: FluvioConfig)`: Creates a new Fluvio client using the given configuration.
`consumer_with_config(config: ConsumerConfigExt) -> ConsumerIterator`: Creates a consumer with the settings defined in `config`. This is the recommended way to consume records.
`topic_producer_with_config(topic: str, config: TopicProducerConfig)`: Creates a new `TopicProducer` for the given topic name with the given configuration.
`topic_producer(topic: str) -> TopicProducer`: Creates a new `TopicProducer` for the given topic name. Currently, producers are scoped to a specific Fluvio topic. That means when you send events via a producer, you must specify which partition each event should go to.
`partition_consumer(topic: str, partition: int) -> PartitionConsumer`: Creates a new `PartitionConsumer` for the given topic and partition. Currently, consumers are scoped to both a specific Fluvio topic and a particular partition within that topic. That means that if you have a topic with multiple partitions, you will need to create one consumer per partition in order to receive all of the events in all of the partitions.
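A minimal sketch of streaming one partition from the beginning, assuming the topic from the examples above exists:

```python
from fluvio import Fluvio, Offset

fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("a_topic", 0)

# stream() yields Record objects as they arrive; this loop blocks waiting for data.
for record in consumer.stream(Offset.beginning()):
    print(record.value_string())
```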
`multi_partition_consumer(topic: str) -> MultiplePartitionConsumer`: Creates a new `MultiplePartitionConsumer` for the given topic and all of its partitions. Currently, consumers are scoped to both a specific Fluvio topic and all partitions within that topic.
`multi_topic_partition_consumer(selections: typing.List[typing.Tuple[str, int]]) -> MultiplePartitionConsumer`: Creates a new `MultiplePartitionConsumer` for the given topics and partitions. Currently, consumers are scoped to a list of Fluvio topic and partition tuples.
```python
class FluvioConfig:
    """Configuration for Fluvio client"""

    _inner: _FluvioConfig

    def __init__(self, inner: _FluvioConfig):
        self._inner = inner

    @classmethod
    def load(cls):
        """get current cluster config from default profile"""
        return cls(_FluvioConfig.load())

    @classmethod
    def new(cls, addr: str):
        """Create a new cluster configuration with no TLS."""
        return cls(_FluvioConfig.new(addr))

    def set_endpoint(self, endpoint: str):
        """set endpoint"""
        self._inner.set_endpoint(endpoint)

    def set_use_spu_local_address(self, val: bool):
        """set whether to use spu local address"""
        self._inner.set_use_spu_local_address(val)

    def disable_tls(self):
        """disable tls for this config"""
        self._inner.disable_tls()

    def set_anonymous_tls(self):
        """set the config to use anonymous tls"""
        self._inner.set_anonymous_tls()

    def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
        """specify inline tls parameters"""
        self._inner.set_inline_tls(domain, key, cert, ca_cert)

    def set_tls_file_paths(
        self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
    ):
        """specify paths to tls files"""
        self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)

    def set_client_id(self, client_id: str):
        """set client id"""
        self._inner.set_client_id(client_id)

    def unset_client_id(self):
        """remove the configured client id from config"""
        self._inner.unset_client_id()
```
Configuration for Fluvio client
`load()`: Get the current cluster config from the default profile.
`new(addr: str)`: Create a new cluster configuration with no TLS.
`set_endpoint(endpoint: str)`: Set the endpoint.
`set_use_spu_local_address(val: bool)`: Set whether to use the SPU local address.
`set_anonymous_tls()`: Set the config to use anonymous TLS.
`set_inline_tls(domain: str, key: str, cert: str, ca_cert: str)`: Specify inline TLS parameters.
`set_tls_file_paths(domain: str, key_path: str, cert_path: str, ca_cert_path: str)`: Specify paths to TLS files.
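A sketch of tweaking a loaded profile before connecting; the endpoint address and client id are placeholders:

```python
from fluvio import Fluvio, FluvioConfig

config = FluvioConfig.load()           # read the active profile from ~/.fluvio/config
config.set_endpoint("127.0.0.1:9003")  # hypothetical endpoint, for illustration
config.set_client_id("my-python-app")

fluvio = Fluvio.connect_with_config(config)
```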
```python
class FluvioAdmin:
    _inner: _FluvioAdmin

    def __init__(self, inner: _FluvioAdmin):
        self._inner = inner

    def connect():
        return FluvioAdmin(_FluvioAdmin.connect())

    def connect_with_config(config: FluvioConfig):
        return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))

    def create_topic(self, topic: str, spec: typing.Optional[_TopicSpec] = None):
        partitions = 1
        replication = 1
        ignore_rack = True
        dry_run = False
        spec = (
            spec
            if spec is not None
            else _TopicSpec.new_computed(partitions, replication, ignore_rack)
        )
        return self._inner.create_topic(topic, dry_run, spec)

    def create_topic_with_config(
        self, topic: str, req: CommonCreateRequest, spec: _TopicSpec
    ):
        return self._inner.create_topic_with_config(topic, req._inner, spec)

    def delete_topic(self, topic: str):
        return self._inner.delete_topic(topic)

    def all_topics(self) -> typing.List[MetadataTopicSpec]:
        return self._inner.all_topics()

    def list_topics(self, filters: typing.List[str]) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics(filters)

    def list_topics_with_params(
        self, filters: typing.List[str], summary: bool
    ) -> typing.List[MetadataTopicSpec]:
        return self._inner.list_topics_with_params(filters, summary)

    def watch_topic(self) -> typing.Iterator[MetadataTopicSpec]:
        return self._topic_spec_generator(self._inner.watch_topic())

    def create_smartmodule(self, name: str, path: str, dry_run: bool):
        spec = SmartModuleSpec.new(path)
        return self._inner.create_smart_module(name, dry_run, spec._inner)

    def delete_smartmodule(self, name: str):
        return self._inner.delete_smart_module(name)

    def list_smartmodules(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataSmartModuleSpec]:
        return self._inner.list_smart_modules(filters)

    def watch_smartmodule(self) -> typing.Iterator[MetadataSmartModuleSpec]:
        return self._smart_module_spec_generator(self._inner.watch_smart_module())

    def list_partitions(
        self, filters: typing.List[str]
    ) -> typing.List[MetadataPartitionSpec]:
        return self._inner.list_partitions(filters)

    def _topic_spec_generator(
        self, stream: _WatchTopicStream
    ) -> typing.Iterator[MetaUpdateTopicSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateTopicSpec(item)
            item = stream.next().inner()

    def _smart_module_spec_generator(
        self, stream: _WatchSmartModuleStream
    ) -> typing.Iterator[MetaUpdateSmartModuleSpec]:
        item = stream.next().inner()
        while item is not None:
            yield MetaUpdateSmartModuleSpec(item)
            item = stream.next().inner()
```
`create_topic(topic: str, spec: typing.Optional[_TopicSpec] = None)`: Create a topic with the given name. If no spec is provided, a computed spec with one partition and one replica (ignoring racks) is used.
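A short round trip through the admin API; the topic name is illustrative:

```python
from fluvio import FluvioAdmin

admin = FluvioAdmin.connect()
admin.create_topic("docs-demo")

# all_topics returns metadata for every topic in the cluster.
print(admin.all_topics())

admin.delete_topic("docs-demo")
```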
```python
@dataclass(frozen=True)
class TopicSpec:
    mode: TopicMode = TopicMode.COMPUTED
    partitions: int = 1
    replications: int = 1
    ignore_rack: bool = True
    replica_assignment: Optional[List[PartitionMap]] = None
    retention_time: Optional[int] = None
    segment_size: Optional[int] = None
    compression_type: Optional[CompressionType] = None
    max_partition_size: Optional[int] = None
    system: bool = False

    @classmethod
    def new(cls, partitions: int = 1, replication: int = 1, ignore: bool = True):
        return cls(_TopicSpec.new_computed(partitions, replication, ignore))

    @classmethod
    def create(cls) -> "TopicSpec":
        """Alternative constructor method"""
        return cls()

    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "TopicSpec":
        """Set the assigned replica configuration"""
        return replace(
            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
        )

    def as_mirror_topic(self) -> "TopicSpec":
        """Set as a mirror topic"""
        return replace(self, mode=TopicMode.MIRROR)

    def with_partitions(self, partitions: int) -> "TopicSpec":
        """Set the specified partitions"""
        if partitions < 0:
            raise ValueError("Partitions must be a positive integer")
        return replace(self, partitions=partitions)

    def with_replications(self, replications: int) -> "TopicSpec":
        """Set the specified replication factor"""
        if replications < 0:
            raise ValueError("Replication factor must be a positive integer")
        return replace(self, replications=replications)

    def with_ignore_rack(self, ignore: bool = True) -> "TopicSpec":
        """Set the rack ignore setting"""
        return replace(self, ignore_rack=ignore)

    def with_compression(self, compression: CompressionType) -> "TopicSpec":
        """Set the specified compression type"""
        return replace(self, compression_type=compression)

    def with_retention_time(self, retention_time: Union[str, int]) -> "TopicSpec":
        """Set the specified retention time"""
        if isinstance(retention_time, int):
            return replace(self, retention_time=retention_time)

        parsed_time = parse_timespan(retention_time)
        return replace(self, retention_time=parsed_time)

    def with_segment_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified segment size"""
        if isinstance(size, int):
            return replace(self, segment_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, segment_size=parsed_size)

    def with_max_partition_size(self, size: Union[str, int]) -> "TopicSpec":
        """Set the specified max partition size"""
        if isinstance(size, int):
            return replace(self, max_partition_size=size)

        parsed_size = parse_byte_size(size)
        return replace(self, max_partition_size=parsed_size)

    def as_system_topic(self, is_system: bool = True) -> "TopicSpec":
        """Set the topic as an internal system topic"""
        return replace(self, system=is_system)

    def build(self) -> _TopicSpec:
        """Build the TopicSpec based on the current configuration"""
        # Similar implementation to the original build method
        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
            spec = _TopicSpec.new_assigned(self.replica_assignment)
        elif self.mode == TopicMode.MIRROR:
            spec = _TopicSpec.new_mirror()
        else:
            spec = _TopicSpec.new_computed(
                self.partitions, self.replications, self.ignore_rack
            )

        spec.set_system(self.system)

        if self.retention_time is not None:
            spec.set_retention_time(self.retention_time)

        if self.max_partition_size is not None or self.segment_size is not None:
            spec.set_storage(self.max_partition_size, self.segment_size)

        if self.compression_type is not None:
            spec.set_compression_type(self.compression_type.value)

        return spec
```
`create() -> "TopicSpec"`: Alternative constructor method.
`with_assignment(replica_assignment: List[PartitionMap]) -> "TopicSpec"`: Set the assigned replica configuration.
`as_mirror_topic() -> "TopicSpec"`: Set as a mirror topic.
`with_partitions(partitions: int) -> "TopicSpec"`: Set the specified partitions.
`with_replications(replications: int) -> "TopicSpec"`: Set the specified replication factor.
`with_ignore_rack(ignore: bool = True) -> "TopicSpec"`: Set the rack ignore setting.
`with_compression(compression: CompressionType) -> "TopicSpec"`: Set the specified compression type.
`with_retention_time(retention_time: Union[str, int]) -> "TopicSpec"`: Set the specified retention time, either as an integer or as a parsed timespan string.
`with_segment_size(size: Union[str, int]) -> "TopicSpec"`: Set the specified segment size, either as an integer or as a parsed byte-size string.
`with_max_partition_size(size: Union[str, int]) -> "TopicSpec"`: Set the specified max partition size, either as an integer or as a parsed byte-size string.
`as_system_topic(is_system: bool = True) -> "TopicSpec"`: Set the topic as an internal system topic.
`build() -> _TopicSpec`: Build the `TopicSpec` based on the current configuration.
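Putting the builder together; the values are illustrative, and the string forms rely on the duration and byte-size parsing shown above:

```python
from fluvio import FluvioAdmin, TopicSpec, CompressionType

spec = (
    TopicSpec.create()
    .with_partitions(3)
    .with_replications(1)
    .with_retention_time("7d")   # parsed via parse_timespan
    .with_segment_size("50M")    # parsed via parse_byte_size
    .with_max_partition_size("1G")
    .with_compression(CompressionType.GZIP)
    .build()
)

FluvioAdmin.connect().create_topic("a_topic", spec)
```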
```python
class CompressionType(Enum):
    NONE = "none"
    GZIP = "gzip"
    SNAPPY = "snappy"
    LZ4 = "lz4"
    ANY = "any"
    ZSTD = "zstd"
```
```python
class Record:
    """The individual record for a given stream."""

    _inner: _Record

    def __init__(self, inner: _Record):
        self._inner = inner

    def offset(self) -> int:
        """The offset from the initial offset for a given stream."""
        return self._inner.offset()

    def value(self) -> typing.List[int]:
        """Returns the contents of this Record's value"""
        return self._inner.value()

    def value_string(self) -> str:
        """The UTF-8 decoded value for this record."""
        return self._inner.value_string()

    def key(self) -> typing.List[int]:
        """Returns the contents of this Record's key, if it exists"""
        return self._inner.key()

    def key_string(self) -> str:
        """The UTF-8 decoded key for this record."""
        return self._inner.key_string()

    def timestamp(self) -> int:
        """Timestamp of this record."""
        return self._inner.timestamp()
```
The individual record for a given stream.
`offset() -> int`: The offset from the initial offset for a given stream.
`value() -> typing.List[int]`: Returns the contents of this Record's value.
`value_string() -> str`: The UTF-8 decoded value for this record.
`key() -> typing.List[int]`: Returns the contents of this Record's key, if it exists.
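A short sketch of reading the common fields off a consumed record, reusing a `stream` from `consumer_with_config`:

```python
record = next(stream)

print(record.offset())        # position in the partition
print(record.timestamp())     # record timestamp
print(record.value_string())  # UTF-8 decoded value
```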
```python
class RecordMetadata:
    """Metadata of a record sent to a topic."""

    _inner: _RecordMetadata

    def __init__(self, inner: _RecordMetadata):
        self._inner = inner

    def offset(self) -> int:
        """Return the offset of the sent record in the topic/partition."""
        return self._inner.offset()

    def partition_id(self) -> int:
        """Return the partition index the record was sent to."""
        return self._inner.partition_id()
```
Metadata of a record sent to a topic.
Describes the location of a record stored in a Fluvio partition.
A topic is composed of one or more partitions.
```python
class TopicProducer:
    """An interface for producing events to a particular topic.

    A `TopicProducer` allows you to send events to the specific topic it was
    initialized for. Once you have a `TopicProducer`, you can send events to
    the topic, choosing which partition each event should be delivered to.
    """

    _inner: _TopicProducer

    def __init__(self, inner: _TopicProducer):
        self._inner = inner

    def send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer's topic"""
        return self.send([], buf.encode("utf-8"))

    async def async_send_string(self, buf: str) -> ProduceOutput:
        """Sends a string to this producer's topic"""
        return await self.async_send([], buf.encode("utf-8"))

    def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
        """
        Sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        return ProduceOutput(self._inner.send(key, value))

    async def async_send(
        self, key: typing.List[int], value: typing.List[int]
    ) -> ProduceOutput:
        """
        Async sends a key/value record to this producer's Topic.

        The partition that the record will be sent to is derived from the Key.
        """
        produce_output = await self._inner.async_send(key, value)
        return ProduceOutput(produce_output)

    def flush(self) -> None:
        """
        Send all the queued records in the producer batches.
        """
        return self._inner.flush()

    async def async_flush(self) -> None:
        """
        Async send all the queued records in the producer batches.
        """
        await self._inner.async_flush()

    def send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in self._inner.send_all(records_inner)
        ]

    async def async_send_all(
        self, records: typing.List[typing.Tuple[bytes, bytes]]
    ) -> typing.List[ProduceOutput]:
        """
        Async sends a list of key/value records as a batch to this producer's Topic.
        :param records: The list of records to send
        """
        records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
        return [
            ProduceOutput(output_inner)
            for output_inner in await self._inner.async_send_all(records_inner)
        ]
```
```python
def send_string(self, buf: str) -> ProduceOutput:
    """Sends a string to this producer’s topic"""
    return self.send([], buf.encode("utf-8"))
```
```python
async def async_send_string(self, buf: str) -> ProduceOutput:
    """Sends a string to this producer’s topic"""
    return await self.async_send([], buf.encode("utf-8"))
```
```python
def send(self, key: typing.List[int], value: typing.List[int]) -> ProduceOutput:
    """
    Sends a key/value record to this producer's Topic.

    The partition that the record will be sent to is derived from the Key.
    """
    return ProduceOutput(self._inner.send(key, value))
```
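For example, since keys and values are passed as byte sequences, a keyed record can be sent like this (topic name and key are placeholders):

```python
fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

# Records that share a key are routed to the same partition.
producer.send(list(b"user-1"), list(b"an event for user-1"))
```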
```python
async def async_send(
    self, key: typing.List[int], value: typing.List[int]
) -> ProduceOutput:
    """
    Async sends a key/value record to this producer's Topic.

    The partition that the record will be sent to is derived from the Key.
    """
    produce_output = await self._inner.async_send(key, value)
    return ProduceOutput(produce_output)
```
```python
def flush(self) -> None:
    """
    Send all the queued records in the producer batches.
    """
    return self._inner.flush()
```
```python
async def async_flush(self) -> None:
    """
    Async send all the queued records in the producer batches.
    """
    await self._inner.async_flush()
```
```python
def send_all(
    self, records: typing.List[typing.Tuple[bytes, bytes]]
) -> typing.List[ProduceOutput]:
    """
    Sends a list of key/value records as a batch to this producer's Topic.

    :param records: The list of records to send
    """
    records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
    return [
        ProduceOutput(output_inner)
        for output_inner in self._inner.send_all(records_inner)
    ]
```
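A small sketch of batching with `send_all`; the topic and record contents are placeholders:

```python
fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

# Each tuple is (key, value); the whole list is sent as one batch.
records = [(b"key-%d" % i, b"value-%d" % i) for i in range(3)]
outputs = producer.send_all(records)
producer.flush()
```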
```python
async def async_send_all(
    self, records: typing.List[typing.Tuple[bytes, bytes]]
) -> typing.List[ProduceOutput]:
    """
    Async sends a list of key/value records as a batch to this producer's Topic.

    :param records: The list of records to send
    """
    records_inner = [_ProducerBatchRecord(x, y) for (x, y) in records]
    return [
        ProduceOutput(output_inner)
        for output_inner in await self._inner.async_send_all(records_inner)
    ]
```
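The async variants can be driven from a coroutine; a minimal sketch, assuming the connected client may be used from async code:

```python
import asyncio


async def produce_batch():
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer("a_topic")
    await producer.async_send_all([(b"k1", b"v1"), (b"k2", b"v2")])
    await producer.async_flush()


asyncio.run(produce_batch())
```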
```python
class ProduceOutput:
    """Returned by a `TopicProducer.send` call, allowing access to sent record metadata."""

    _inner: _ProduceOutput

    def __init__(self, inner: _ProduceOutput) -> None:
        self._inner = inner
```
```python
def wait(self) -> typing.Optional[RecordMetadata]:
    """Wait for the record metadata.

    This is a blocking call and may only return a `RecordMetadata` once.
    Any subsequent call to `wait` will return a `None` value.
    Errors will be raised as exceptions of type `FluvioError`.
    """
    res = self._inner.wait()
    if res is None:
        return None
    return RecordMetadata(res)
```
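For example, the metadata for a sent record can be retrieved like this (the topic name is a placeholder):

```python
fluvio = Fluvio.connect()
producer = fluvio.topic_producer("a_topic")

output = producer.send_string("hello")
metadata = output.wait()  # blocks until the record is acknowledged
if metadata is not None:
    print(metadata.offset(), metadata.partition_id())
```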
```python
async def async_wait(self) -> typing.Optional[RecordMetadata]:
    """Asynchronously wait for the record metadata.

    This may only return a `RecordMetadata` once.
    Any subsequent call will return a `None` value.
    """
    res = await self._inner.async_wait()
    if res is None:
        return None
    return RecordMetadata(res)
```
```python
class ConsumerConfig:
    _inner: _ConsumerConfig

    def __init__(self):
        self._inner = _ConsumerConfig()
```
```python
def disable_continuous(self, val: bool = True):
    """Disable continuous mode after fetching specified records"""
    self._inner.disable_continuous(val)
```
```python
def smartmodule(
    self,
    name: str = None,
    path: str = None,
    kind: SmartModuleKind = None,
    params: typing.Dict[str, str] = None,
    aggregate: typing.List[bytes] = None,
):
    """
    Adds a SmartModule to a consumer config, either by the `name` of a
    registered `SmartModule` or by a `path` to a WASM binary.

    Args:

        name: str
        path: str
        kind: SmartModuleKind
        params: Dict[str, str]
        aggregate: List[bytes]  # initial value for an aggregate smartmodule

    Raises:
        "Require either a path or a name for a smartmodule."
        "Only specify one of path or name not both."

    Returns:

        None
    """

    if kind is not None:
        kind = kind.value

    if path is None and name is None:
        raise Exception("Require either a path or a name for a smartmodule.")

    if path is not None and name is not None:
        raise Exception("Only specify one of path or name not both.")

    params = {} if params is None else params
    param_keys = list(params.keys())
    param_vals = list(params.values())

    self._inner.smartmodule(
        name,
        path,
        kind,
        param_keys,
        param_vals,
        aggregate,
        # These arguments are for join support, which is not implemented
        # on the Python side yet.
        None,
        None,
        None,
        None,
    )
```
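A sketch of attaching a SmartModule from a local WASM build; the file name and parameters are placeholders:

```python
import os

config = ConsumerConfig()
config.smartmodule(
    path=os.path.abspath("somefilter.wasm"),  # placeholder WASM binary
    params={"key": "value"},                  # placeholder parameters
)
```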
```python
class PartitionConsumer:
    """
    An interface for consuming events from a particular partition

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that
    you want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once.
    When you consume events via Streaming, you specify a starting Offset and
    receive an object that will continuously yield new events as they arrive.
    """

    _inner: _PartitionConsumer

    def __init__(self, inner: _PartitionConsumer):
        self._inner = inner

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
```python
def stream(self, offset: Offset) -> typing.Iterator[Record]:
    """
    Continuously streams events from a particular offset in the consumer’s
    partition. This returns an `Iterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.
    """
    return self._generator(self._inner.stream(offset))
```
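A minimal streaming sketch; the `partition_consumer(topic, partition)` accessor used here is an assumption, so adjust it to however your client hands out a `PartitionConsumer`:

```python
fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("a_topic", 0)  # assumed accessor

for record in consumer.stream(Offset.beginning()):
    print(record.value_string())
```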
```python
async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
    """
    Continuously streams events from a particular offset in the consumer’s
    partition. This returns an `AsyncIterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.
    """
    return self._async_generator(await self._inner.async_stream(offset))
```
```python
def stream_with_config(
    self, offset: Offset, config: ConsumerConfig
) -> typing.Iterator[Record]:
    """
    Continuously streams events from a particular offset with a SmartModule
    WASM module in the consumer’s partition. This returns an
    `Iterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.

    Args:
        offset: Offset
        config: ConsumerConfig - a config with a SmartModule attached,
            e.g. via `config.smartmodule(path=...)`

    Example:
        import os

        wmp = os.path.abspath("somefilter.wasm")
        config = ConsumerConfig()
        config.smartmodule(path=wmp)
        for i in consumer.stream_with_config(Offset.beginning(), config):
            # do something with i

    Returns:
        `Iterator[Record]`
    """
    return self._generator(self._inner.stream_with_config(offset, config._inner))
```
```python
async def async_stream_with_config(
    self, offset: Offset, config: ConsumerConfig
) -> typing.AsyncIterator[Record]:
    """
    Continuously streams events from a particular offset with a SmartModule
    WASM module in the consumer’s partition. This returns an
    `AsyncIterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.

    Args:
        offset: Offset
        config: ConsumerConfig - a config with a SmartModule attached,
            e.g. via `config.smartmodule(path=...)`

    Example:
        import os

        wmp = os.path.abspath("somefilter.wasm")
        config = ConsumerConfig()
        config.smartmodule(path=wmp)
        async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
            # do something with i

    Returns:
        `AsyncIterator[Record]`
    """
    return self._async_generator(
        await self._inner.async_stream_with_config(offset, config._inner)
    )
```
```python
class MultiplePartitionConsumer:
    """
    An interface for consuming events from multiple partitions

    There are two ways to consume events: by "fetching" events and by
    "streaming" events. Fetching involves specifying a range of events that
    you want to consume via their Offset. A fetch is a sort of one-time batch
    operation: you’ll receive all of the events in your range all at once.
    When you consume events via Streaming, you specify a starting Offset and
    receive an object that will continuously yield new events as they arrive.
    """

    _inner: _MultiplePartitionConsumer

    def __init__(self, inner: _MultiplePartitionConsumer):
        self._inner = inner

    def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
        item = stream.next()
        while item is not None:
            yield Record(item)
            item = stream.next()

    async def _async_generator(
        self, astream: _AsyncPartitionConsumerStream
    ) -> typing.AsyncIterator[Record]:
        item = await astream.async_next()
        while item is not None:
            yield Record(item)
            item = await astream.async_next()
```
```python
def stream(self, offset: Offset) -> typing.Iterator[Record]:
    """
    Continuously streams events from a particular offset in the consumer’s
    partitions. This returns an `Iterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.
    """
    return self._generator(self._inner.stream(offset))
```
```python
async def async_stream(self, offset: Offset) -> typing.AsyncIterator[Record]:
    """
    Continuously streams events from a particular offset in the consumer’s
    partitions. This returns an `AsyncIterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.
    """
    return self._async_generator(await self._inner.async_stream(offset))
```
```python
def stream_with_config(
    self, offset: Offset, config: ConsumerConfig
) -> typing.Iterator[Record]:
    """
    Continuously streams events from a particular offset with a SmartModule
    WASM module in the consumer’s partitions. This returns an
    `Iterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.

    Args:
        offset: Offset
        config: ConsumerConfig - a config with a SmartModule attached,
            e.g. via `config.smartmodule(path=...)`

    Example:
        import os

        wmp = os.path.abspath("somefilter.wasm")
        config = ConsumerConfig()
        config.smartmodule(path=wmp)
        for i in consumer.stream_with_config(Offset.beginning(), config):
            # do something with i

    Returns:
        `Iterator[Record]`
    """
    return self._generator(
        self._inner.stream_with_config(offset._inner, config._inner)
    )
```
```python
async def async_stream_with_config(
    self, offset: Offset, config: ConsumerConfig
) -> typing.AsyncIterator[Record]:
    """
    Continuously streams events from a particular offset with a SmartModule
    WASM module in the consumer’s partitions. This returns an
    `AsyncIterator[Record]`.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.

    Args:
        offset: Offset
        config: ConsumerConfig - a config with a SmartModule attached,
            e.g. via `config.smartmodule(path=...)`

    Example:
        import os

        wmp = os.path.abspath("somefilter.wasm")
        config = ConsumerConfig()
        config.smartmodule(path=wmp)
        async for i in await consumer.async_stream_with_config(Offset.beginning(), config):
            # do something with i

    Returns:
        `AsyncIterator[Record]`
    """
    return self._async_generator(
        await self._inner.async_stream_with_config(offset, config._inner)
    )
```
```python
class ConsumerOffset:
    """Consumer offset"""

    _inner: _ConsumerOffset

    def __init__(self, inner: _ConsumerOffset):
        self._inner = inner

    def consumer_id(self) -> str:
        return self._inner.consumer_id()

    def topic(self) -> str:
        return self._inner.topic()

    def partition(self) -> int:
        return self._inner.partition()

    def offset(self) -> int:
        return self._inner.offset()

    def modified_time(self) -> int:
        return self._inner.modified_time()
```
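When offsets are managed manually, the stored offsets can be inspected; this sketch assumes a `consumer_offsets()` accessor on the connected client:

```python
fluvio = Fluvio.connect()

# Assumed accessor returning a list of ConsumerOffset entries.
for co in fluvio.consumer_offsets():
    print(co.consumer_id(), co.topic(), co.partition(), co.offset())
```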
```python
class CommonCreateRequest:
    _inner: _CommonCreateRequest

    def __init__(self, inner: _CommonCreateRequest):
        self._inner = inner

    @classmethod
    def new(cls, name: str, dry_run: bool, timeout: int):
        return cls(_CommonCreateRequest.new(name, dry_run, timeout))
```
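Constructing a request is direct; note that the timeout units below are an assumption (milliseconds):

```python
# name, dry_run, timeout (units assumed to be milliseconds)
request = CommonCreateRequest.new("a_topic", False, 30000)
```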
```python
class SmartModuleSpec:
    _inner: _SmartModuleSpec

    def __init__(self, inner: _SmartModuleSpec):
        self._inner = inner

    @classmethod
    def new(cls, path: str):
        with open(path, mode="rb") as f:
            data = f.read()
        return cls(_SmartModuleSpec.with_binary(data))
```
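Loading a compiled SmartModule from disk is a one-liner; the path below is a placeholder, and the resulting spec would then be registered through the admin API (see `FluvioAdmin` for the exact call):

```python
# Read the WASM binary and wrap it in a spec (path is a placeholder).
spec = SmartModuleSpec.new("/path/to/somefilter.wasm")
```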
```python
class PartitionMap:
    _inner: _PartitionMap

    def __init__(self, inner: _PartitionMap):
        self._inner = inner

    @classmethod
    def new(cls, partition: int, replicas: typing.List[int]):
        return cls(_PartitionMap.new(partition, replicas))
```
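A small sketch of a replica assignment; the SPU ids are placeholders:

```python
# Assign partition 0 to replicas hosted on SPUs 0, 1, and 2.
partition_map = PartitionMap.new(0, [0, 1, 2])
```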
```python
class MessageMetadataTopicSpec:
    _inner: _MessageMetadataTopicSpec

    def __init__(self, inner: _MessageMetadataTopicSpec):
        self._inner = inner

    def is_update(self) -> bool:
        return self._inner.is_update()

    def is_delete(self) -> bool:
        return self._inner.is_delete()

    def metadata_topic_spec(self) -> MetadataTopicSpec:
        return MetadataTopicSpec(self._inner.metadata_topic_spec())
```
```python
class MetadataPartitionSpec:
    _inner: _MetadataPartitionSpec

    def __init__(self, inner: _MetadataPartitionSpec):
        self._inner = inner

    def name(self) -> str:
        return self._inner.name()
```
```python
class MetadataTopicSpec:
    _inner: _MetadataTopicSpec

    def __init__(self, inner: _MetadataTopicSpec):
        self._inner = inner

    def name(self) -> str:
        return self._inner.name()
```
```python
class MetaUpdateTopicSpec:
    _inner: _MetaUpdateTopicSpec

    def __init__(self, inner: _MetaUpdateTopicSpec):
        self._inner = inner

    def all(self) -> typing.List[MetadataTopicSpec]:
        inners = self._inner.all()
        return [MetadataTopicSpec(i) for i in inners]

    def changes(self) -> typing.List[MessageMetadataTopicSpec]:
        inners = self._inner.changes()
        return [MessageMetadataTopicSpec(i) for i in inners]

    def epoch(self) -> int:
        return self._inner.epoch()
```
```python
class PartitionSelectionStrategy:
    """Strategy to select partitions"""

    _inner: _PartitionSelectionStrategy

    def __init__(self, inner: _PartitionSelectionStrategy):
        self._inner = inner
```
```python
@classmethod
def with_all(cls, topic: str):
    """select all partitions of one topic"""
    return cls(_PartitionSelectionStrategy.with_all(topic))
```
```python
@classmethod
def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
    """select multiple partitions of multiple topics"""
    return cls(_PartitionSelectionStrategy.with_multiple(topic))
```
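A sketch of building selection strategies; the consumer accessor at the end is hypothetical, so check `Fluvio` for the method that actually accepts a strategy:

```python
fluvio = Fluvio.connect()

# All partitions of one topic.
all_parts = PartitionSelectionStrategy.with_all("a_topic")

# Partition 0 of "a_topic" and partition 1 of "b_topic".
some_parts = PartitionSelectionStrategy.with_multiple([("a_topic", 0), ("b_topic", 1)])

# Hypothetical accessor for a MultiplePartitionConsumer.
consumer = fluvio.multi_partition_consumer(all_parts)
```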
```python
class SmartModuleKind(Enum):
    """
    Use this to set the kind of a SmartModule explicitly. Not required in
    general, but needed for legacy SmartModules.
    """

    Filter = _SmartModuleKind.Filter
    Map = _SmartModuleKind.Map
    ArrayMap = _SmartModuleKind.ArrayMap
    FilterMap = _SmartModuleKind.FilterMap
    Aggregate = _SmartModuleKind.Aggregate
```
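For a legacy SmartModule, the kind is passed alongside the name; the module name here is a placeholder:

```python
config = ConsumerConfig()
config.smartmodule(name="legacy-filter", kind=SmartModuleKind.Filter)
```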