diff --git a/streamz/dask.py b/streamz/dask.py
index d0c9d4e2..3adcfa1c 100644
--- a/streamz/dask.py
+++ b/streamz/dask.py
@@ -2,13 +2,13 @@
 
 from operator import getitem
 
-from tornado import gen
-
 from dask.compatibility import apply
+from dask.distributed import get_worker
 from distributed.client import default_client
+from tornado import gen
 
-from .core import Stream
 from . import core, sources
+from .core import Stream
 
 
 class DaskStream(Stream):
@@ -198,3 +198,53 @@ class filenames(DaskStream, sources.filenames):
 @DaskStream.register_api(staticmethod)
 class from_textfile(DaskStream, sources.from_textfile):
     pass
+
+
+@DaskStream.register_api()
+class to_kafka(DaskStream):
+    """ Write values to Kafka directly from the Dask worker
+
+    Parameters
+    ----------
+    topic : string
+        The topic to which to write
+    producer_config : dict
+        Settings with which to set up the Kafka producer, see
+        https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
+        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        Examples:
+        bootstrap.servers: Connection string (host:port) to Kafka
+
+    """
+    def __init__(self, upstream, topic, producer_config, **kwargs):
+        self.topic = topic
+        self.producer_config = producer_config
+
+        stream_name = kwargs.pop('stream_name', None)
+        DaskStream.__init__(self, upstream, stream_name=stream_name)
+
+    def update(self, x, who=None):
+
+        def get_producer(config):
+            # Reuse a producer cached on this worker; create it on first use
+            w = get_worker()
+            if hasattr(w, 'producer'):
+                return w.producer
+
+            # Import lazily so only the workers need confluent_kafka installed
+            import confluent_kafka as ck
+            w.producer = ck.Producer(config)
+            return w.producer
+
+        def produce(topic, value, producer_config):
+            producer = get_producer(producer_config)
+            producer.produce(topic, value)
+            producer.flush()
+            return value
+
+        # Submit the write to a worker; pure=False so repeated values are
+        # still written rather than deduplicated as idempotent tasks
+        client = default_client()
+        result = client.submit(produce, self.topic, x, self.producer_config,
+                               pure=False)
+        return self._emit(result)
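
The new node attaches to any DaskStream, so a plain Stream is first moved onto the cluster with scatter(). A minimal usage sketch follows (not part of the patch); the broker address localhost:9092 and the topic name 'events' are illustrative assumptions, and a distributed.Client must already be running in the process so default_client() can find it:

    # Usage sketch only; broker address and topic name are assumptions.
    from dask.distributed import Client
    from streamz import Stream
    import streamz.dask  # ensure the Dask nodes (scatter, to_kafka) are registered

    client = Client()                          # found later via default_client()

    source = Stream()
    (source
     .scatter()                                # move events onto Dask workers
     .map(lambda x: str(x).encode())           # Kafka payloads should be bytes
     .to_kafka('events', {'bootstrap.servers': 'localhost:9092'}))

    for i in range(3):
        source.emit(i)

Because produce() flushes after every message, each future emitted downstream resolves only once the broker has acknowledged the write; a gather() node could follow to_kafka if later consumers need the concrete values back on the client.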