
Commit a27fd0d

Author: Prathap P
Add support for extra options for stream source
1 parent 51c16b1 commit a27fd0d

File tree: 1 file changed (+15, −13 lines)


sdk/python/feast/infra/contrib/spark_kafka_processor.py

Lines changed: 15 additions & 13 deletions
@@ -1,5 +1,5 @@
 from types import MethodType
-from typing import List, Optional, no_type_check
+from typing import Dict, List, Optional, no_type_check
 
 import pandas as pd
 from pyspark.sql import DataFrame, SparkSession
@@ -29,6 +29,7 @@ class SparkKafkaProcessor(StreamProcessor):
     format: str
     preprocess_fn: Optional[MethodType]
     join_keys: List[str]
+    stream_source_options: Optional[Dict[str, str]]
 
     def __init__(
         self,
@@ -37,6 +38,7 @@ def __init__(
         sfv: StreamFeatureView,
         config: ProcessorConfig,
         preprocess_fn: Optional[MethodType] = None,
+        stream_source_options: Optional[Dict[str, str]] = None,
     ):
         if not isinstance(sfv.stream_source, KafkaSource):
             raise ValueError("data source is not kafka source")
@@ -59,6 +61,7 @@ def __init__(
             raise ValueError("config is not spark processor config")
         self.spark = config.spark_session
         self.preprocess_fn = preprocess_fn
+        self.stream_source_options = stream_source_options
         self.processing_time = config.processing_time
         self.query_timeout = config.query_timeout
         self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities]
@@ -80,19 +83,23 @@ def ingest_stream_feature_view(
     @no_type_check
     def _ingest_stream_data(self) -> StreamTable:
         """Only supports json and avro formats currently."""
+        kafka_options: Dict[str, str] = {
+            "kafka.bootstrap.servers": self.data_source.kafka_options.kafka_bootstrap_servers,
+            "subscribe": self.data_source.kafka_options.topic,
+            "startingOffsets": "latest",
+        }
+        if self.stream_source_options:
+            # User-provided options override the defaults above
+            kafka_options.update(self.stream_source_options)
+
         if self.format == "json":
             if not isinstance(
                 self.data_source.kafka_options.message_format, JsonFormat
             ):
                 raise ValueError("kafka source message format is not jsonformat")
             stream_df = (
                 self.spark.readStream.format("kafka")
-                .option(
-                    "kafka.bootstrap.servers",
-                    self.data_source.kafka_options.kafka_bootstrap_servers,
-                )
-                .option("subscribe", self.data_source.kafka_options.topic)
-                .option("startingOffsets", "latest")  # Query start
+                .options(**kafka_options)
                 .load()
                 .selectExpr("CAST(value AS STRING)")
                 .select(
@@ -110,12 +117,7 @@ def _ingest_stream_data(self) -> StreamTable:
                 raise ValueError("kafka source message format is not avro format")
             stream_df = (
                 self.spark.readStream.format("kafka")
-                .option(
-                    "kafka.bootstrap.servers",
-                    self.data_source.kafka_options.kafka_bootstrap_servers,
-                )
-                .option("subscribe", self.data_source.kafka_options.topic)
-                .option("startingOffsets", "latest")  # Query start
+                .options(**kafka_options)
                 .load()
                 .selectExpr("CAST(value AS STRING)")
                 .select(
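
The behavioral core of this change is that caller-supplied options are merged over the built-in Kafka reader defaults before being handed to Spark's options(**kafka_options) call, so a caller can override startingOffsets or add extra reader settings without touching the processor. Below is a minimal standalone sketch of that merge; the helper name build_kafka_options and the example option values are illustrative only, not part of this commit.

from typing import Dict, Optional

def build_kafka_options(
    bootstrap_servers: str,
    topic: str,
    stream_source_options: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    # Defaults mirroring the ones assembled in _ingest_stream_data above.
    kafka_options: Dict[str, str] = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "latest",
    }
    # dict.update gives caller-supplied options precedence over the defaults.
    if stream_source_options:
        kafka_options.update(stream_source_options)
    return kafka_options

# Hypothetical values: override the starting offsets and add a throttling option.
opts = build_kafka_options(
    "broker:9092",
    "driver_stats",
    {"startingOffsets": "earliest", "maxOffsetsPerTrigger": "1000"},
)
assert opts["startingOffsets"] == "earliest"
# The merged dict is what the processor passes to
# spark.readStream.format("kafka").options(**opts).

In the diff itself, the same merged dict is passed via .options(**kafka_options) in both the JsonFormat and AvroFormat branches, replacing the previously duplicated chains of .option(...) calls.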
