Apache Kafka Storm Samza
KAFKA ZK
Tutorials
http://www.ashishpaliwal.com/blog/2015/06/apache-kafka-quick-start-guide/ - good quick guide with examples
http://kafka.apache.org/documentation.html#introduction - classic
http://www.infoq.com/articles/apache-kafka - good
http://www.confluent.io/blog/stream-data-platform-1/ - see as stream
Later
http://kafka.apache.org/documentation.html#design
http://www.confluent.io/blog/stream-data-platform-2/ @Database Changes
http://stackoverflow.com/questions/27922898/how-to-decide-kafka-cluster-size - cluster size
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying - good
http://www.slideshare.net/junrao/kafka-replication-apachecon2013 - layout replication and architecture
http://sookocheff.com/post/kafka/kafka-in-a-nutshell/ - good
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol - protocol needed while writing code
Samza
https://engineering.linkedin.com/samza/real-time-insights-linkedins-performance-using-apache-samza
http://www.infoq.com/articles/linkedin-samza
* Advertising – getting relevant advertisements, as well as tracking and monitoring ad display, clicks and other metrics
* Sophisticated monitoring that allows performing complex queries like “the top five slowest pages for the last minute.”
* Newsfeed displays when people move to another company, when they like an article, when they join a group, et cetera.
Flume(logs) vs kafka
http://stackoverflow.com/questions/29774223/can-kafka-producer-read-log-files
Samza vs Storm
http://stackoverflow.com/questions/29111549/where-do-apache-samza-and-apache-storm-differ-in-their-use-cases
Storm and Kafka
http://hortonworks.com/blog/storm-kafka-together-real-time-data-refinery/ - Apache Storm is a distributed real-time computation engine that reliably processes unbounded streams of data. While Storm processes stream data at scale, Apache Kafka processes messages at scale. Kafka is a distributed pub-sub real-time messaging system that provides strong durability and fault tolerance guarantees.
http://zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/ - example
http://www.slideshare.net/gschmutz/kafka-andstromeventprocessinginrealtime - example
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/ - integration
http://www.infoworld.com/article/2854894/application-development/spark-and-storm-for-real-time-computation.html - info
http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/ - tutorial
http://zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/ https://github.com/apache/storm/tree/master/external/storm-kafka - to start stuff up
http://www.michael-noll.com/tutorials/ - doing this!!
Kafka Storm and Socket - for round trip
http://www.conductor.com/nightlight/build-real-time-data-stream-miley-cyrus/
zookeeper
brew install zookeeper
Add this directory to PATH in bash profile - /usr/local/Cellar/zookeeper/3.4.7/bin
zkServer start
Config file - /usr/local/etc/zookeeper/zoo.cfg
zkCli -server 127.0.0.1:2181
kafka
brew install kafka
cd /usr/local/Cellar/kafka/0.8.2.2/libexec
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
Created topic "testTopic".
bin/kafka-topics.sh --list --zookeeper localhost:2181
testTopic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning--topic testTopic
'from-beginning--topic' is not a recognized option
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testTopic
Setting up multi broker -
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
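The three overrides above follow a simple pattern per broker id. A minimal sketch of that pattern, assuming the base broker listens on 9092 as in the commands earlier in these notes; the helper names `broker_overrides` and `render_properties` are made up for illustration and are not part of Kafka:

```python
# Sketch only: derive the per-broker overrides used in these notes.
# BASE_PORT and the /tmp/kafka-logs-N pattern mirror the notes above;
# the function names are hypothetical helpers, not Kafka tooling.
BASE_PORT = 9092


def broker_overrides(broker_id: int) -> dict:
    """Return the properties that must differ for each broker on one machine."""
    return {
        "broker.id": str(broker_id),
        "port": str(BASE_PORT + broker_id),
        "log.dir": f"/tmp/kafka-logs-{broker_id}",
    }


def render_properties(overrides: dict) -> str:
    """Render a dict as key=value lines, the format of server.properties."""
    return "\n".join(f"{key}={value}" for key, value in overrides.items())


for broker_id in (1, 2):
    print(f"# config/server-{broker_id}.properties")
    print(render_properties(broker_overrides(broker_id)))
```

Each broker on the same machine needs its own id, port and log directory; everything else can stay as copied from config/server.properties.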
We already have Zookeeper and our single node started, so we just need to start the two new nodes:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
ps | grep server-1.properties
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
Understand
Basic
-Topics - feeds of messages
-Producer
-Consumer
-Broker - a Kafka cluster is made up of servers, each called a broker
Topics
Can have partitions
Writes are appended to a partition as a stream
Each partition must fit on the server that hosts it
Kafka retains messages for a configurable time
Distribution
Partition on different servers for fault tolerance
Each partition has one leader and zero or more followers; the followers just replicate the leader, and one of them takes over if the leader fails
Each server acts as leader for some partitions and as follower for others
Consumer
Order and load balance - each partition is consumed by exactly one consumer in the group.
Note however that a consumer group cannot usefully have more consumer instances than partitions; the extra instances would sit idle.
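A minimal sketch of the rule that each partition goes to exactly one consumer in a group. The round-robin assignment here is an illustrative assumption, not Kafka's actual rebalancing algorithm, and `assign_partitions` is a made-up name:

```python
# Sketch only: a simple round-robin assignment, NOT Kafka's real rebalancing
# algorithm. It shows that every partition has exactly one owner in the group
# and that surplus consumers end up with nothing to read.
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Map each consumer to the list of partitions it owns."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Three partitions shared by four consumers: one consumer stays idle.
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))

# Four partitions shared by two consumers: each consumer owns two partitions.
print(assign_partitions([0, 1, 2, 3], ["c1", "c2"]))
```

Because a partition never has two owners within one group, the order of messages inside a partition is preserved for that group.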
Configure
How long logs are retained, etc.
Data Models -
http://www.confluent.io/blog/stream-data-platform-2/ - Avro
Partitions
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
writes to different partitions can be done fully in parallel
Processing of Kafka Data
http://blog.cloudera.com/blog/2014/09/apache-kafka-for-beginners/
Website activity tracking sends events such as page views and searches to Kafka, where they become available for real-time processing, dashboards and offline analytics in Hadoop
[https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying]
The log entry number can be thought of as the “timestamp” of the entry. Describing this ordering as a notion of time seems a bit odd at first, but it has the convenient property that it is decoupled from any particular physical clock. This property will turn out to be essential as we get to distributed systems.
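The idea of the entry number acting as a logical timestamp can be sketched with a tiny append-only log. This is an in-memory illustration only; the class name `Log` and its methods are assumptions for the example, not a Kafka API:

```python
# Sketch only: an in-memory append-only log. The offset returned by append()
# plays the role of the "timestamp" described above: it orders entries
# without reference to any physical clock.
class Log:
    def __init__(self):
        self._entries = []

    def append(self, record) -> int:
        """Append a record and return its offset (its position in the log)."""
        self._entries.append(record)
        return len(self._entries) - 1

    def read(self, offset: int, max_records: int = 10) -> list:
        """Read up to max_records entries starting at the given offset."""
        return self._entries[offset:offset + max_records]


log = Log()
first = log.append("page view")
second = log.append("search")
print(first, second)        # offsets increase with arrival order
print(log.read(second))     # a reader resumes from any offset it remembers
```

A reader only has to remember one number, the next offset it wants, to know exactly where it stands in the stream.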
[http://stackoverflow.com/questions/29820384/apache-kafka-order-of-messages-with-multiple-partitions]
Ordering
But if the second case, you might consider partitioning your messages by some key and thus all messages for that key will arrive to one partition (they actually might go to another partition if you resize topic, but that’s a different case) and thus will guarantee that all messages for that key are in order.
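A minimal sketch of partitioning by key. It uses CRC32 as a stand-in hash (an assumption; Kafka's own partitioners use a different hash function), and the key names are made up:

```python
# Sketch only: CRC32 stands in for whatever hash a real partitioner uses.
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition from the key so equal keys always land together."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [
    ("user-101", "/page1.html"),
    ("user-202", "/home"),
    ("user-101", "/page2.html"),
]

# Route each event to its partition; arrival order is kept per partition.
partitions = {p: [] for p in range(4)}
for key, page in events:
    partitions[partition_for(key, 4)].append((key, page))

# All events for user-101 sit in one partition, in the order they were sent.
print(partitions[partition_for("user-101", 4)])

# Changing the partition count can move a key to a different partition.
print(partition_for("user-101", 4), partition_for("user-101", 5))
```

The last line illustrates the caveat in the quote: the mapping depends on the partition count, so resizing a topic can break the "same key, same partition" property for existing keys.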
{:user-id 101 :viewed "/page1.html" :at #inst "2013-04-12T23:20:50.22Z"}
[http://stackoverflow.com/questions/17205561/data-modeling-with-kafka-topics-and-partitions?rq=1]
Ordering is maintained from the partition on the cluster node to the consumer, not across multiple producers writing to the same partition
How many partitions
The number of partitions should scale only with the number of consuming machines not with any characteristic of the data. The recommended approach is to have a single topic and partition it by user_id, this will give you locality and order by user.
Partition can be on different servers - http://www.slideshare.net/saumitra121/distributed-kafka
Producers choose the topic and the partition for each message; partitions are usually picked round robin
Order guarantee - Messages sent by a producer to a particular topic partition will be appended in the order they are sent.
Kafka has a concept of partitions: each topic by default has 1 partition, each partition can be thought of as a unit of parallelism. Set up an appropriate partitioner, such that each producer writes to its own partition, in an isolated manner.
[http://stackoverflow.com/questions/24445308/kafka-multiple-producer-writing-to-same-topic-ordering-of-message-and-data-bur]
[https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka - limit of partitions ]
https://www.quora.com/Is-it-possible-to-consume-kafka-message-offset-based-on-timestamp - close to the answer
At LinkedIn, some of the high volume topics are configured with more than
1 partition per broker. Having more partitions increases I/O parallelism
for writes and also increases the degree of parallelism for consumers
(since partition is the unit for distributing data to consumers). On the
other hand, more partitions adds some overhead: (a) there will be more
files and thus more open file handlers; (b) there are more offsets to be
checkpointed by consumers which can increase the load of ZK. So, you want
to balance these tradeoffs. [http://grokbase.com/t/kafka/users/131fk15cvr/number-of-partitions-per-broker]
[http://www.systemswemake.com/papers/kafka]
When a producer publishes a message the payload is appended to the current segment. The messages are flushed to the disk either after a specific number of messages have accumulated or a certain time period has elapsed. The consumer sees the message only after the message has been flushed to the disk.
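The flush rule described in the cited paper (flush after a number of messages or after a time period, whichever comes first) can be sketched as below. The class `SegmentBuffer`, its parameters and the injected clock are all assumptions made for the illustration; this is not broker code:

```python
# Sketch only: a buffer that "flushes" after N messages or T seconds.
# Class and parameter names are hypothetical; a real broker writes to disk.
import time


class SegmentBuffer:
    def __init__(self, max_messages: int, max_age_seconds: float, clock=time.monotonic):
        self._max_messages = max_messages
        self._max_age = max_age_seconds
        self._clock = clock
        self._pending = []      # appended but not yet flushed
        self._oldest = None     # arrival time of the oldest pending message
        self.flushed = []       # what a consumer would be allowed to see

    def append(self, message) -> None:
        if not self._pending:
            self._oldest = self._clock()
        self._pending.append(message)
        self._maybe_flush()

    def tick(self) -> None:
        """Call periodically so the time-based rule fires without new writes."""
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        if not self._pending:
            return
        too_many = len(self._pending) >= self._max_messages
        too_old = self._clock() - self._oldest >= self._max_age
        if too_many or too_old:
            self.flushed.extend(self._pending)
            self._pending.clear()


buffer = SegmentBuffer(max_messages=3, max_age_seconds=5.0)
for message in ("m1", "m2", "m3"):
    buffer.append(message)
print(buffer.flushed)   # the third append reaches the message-count threshold
```

Passing the clock in as a parameter keeps the time-based rule easy to exercise without sleeping.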
consume - This implies that all messages that belong to a particular partition of a topic will be consumed by a single consumer in a consumer group.
It also guarantees that messages from a single partition are delivered to a consumer in order. Across partitions no such guarantee is made.
What to do with kafka data streams
[http://www.confluent.io/blog/stream-data-platform-1/]
What Is a Stream Data Platform For?
A stream data platform has two primary uses:
1. Data Integration: The stream data platform captures streams of events or data changes and feeds these to other data systems such as relational databases, key-value stores, Hadoop, or the data warehouse.
2. Stream processing: It enables continuous, real-time processing and transformation of these streams and makes the results available system-wide.
Storage
Messages are written to a file immediately on their arrival (but buffers are not flushed), thus Kafka fully avoids cache management and the troubles it brings. Durability is provided by replicating messages to different brokers rather than immediate persistence to disk. Caching is left to the operating system, which is much more adept at file caching than any JVM application could possibly be. Also, linear reads and writes are very fast on hard disk drives.
http://blog.gravityrd.com/tech-centralized-logging-apache-kafka/
kafka doesn’t use http
Kafka uses a binary protocol over TCP. The protocol defines all apis as request response message pairs. All messages are size delimited and are made up of the following primitive types.
The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response message. No handshake is required on connection or disconnection. TCP is happier if you maintain persistent connections used for many requests to amortize the cost of the TCP handshake, but beyond this penalty connecting is pretty cheap.
The client will likely need to maintain a connection to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data. However it should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).
[https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol]
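"Size delimited" means each message on the TCP stream is preceded by its length, so a reader knows where one message ends and the next begins. A minimal sketch of that framing, assuming a 4-byte big-endian length prefix; the payloads are arbitrary bytes here, not real Kafka requests, and the function names are made up:

```python
# Sketch only: length-prefixed framing over a byte stream. The payload
# contents are placeholders; building real Kafka requests is out of scope.
import struct


def frame(payload: bytes) -> bytes:
    """Prefix the payload with its size as a 4-byte big-endian integer."""
    return struct.pack(">i", len(payload)) + payload


def read_frames(buffer: bytes):
    """Split a buffer into complete payloads plus any leftover partial bytes."""
    frames = []
    position = 0
    while position + 4 <= len(buffer):
        (size,) = struct.unpack_from(">i", buffer, position)
        if position + 4 + size > len(buffer):
            break   # the rest of this message has not arrived yet
        frames.append(buffer[position + 4:position + 4 + size])
        position += 4 + size
    return frames, buffer[position:]


stream = frame(b"request-1") + frame(b"request-2")
complete, leftover = read_frames(stream)
print(complete, leftover)
```

Because TCP delivers a byte stream rather than messages, the leftover bytes have to be kept and prepended to the next chunk read from the socket.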
Zookeeper + kafka
ZooKeeper is basically used to maintain coordination between the different nodes in a cluster. One of the most important things for Kafka is that offsets are periodically committed to ZooKeeper, so that after a node failure consumption can resume from the previously committed offset (imagine taking care of all this on your own).
[http://stackoverflow.com/questions/23751708/kafka-is-zookeeper-a-must]
First the stream [of messages] is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order.
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
Same Zookeeper for solo,kafka
Like the distributed processes it coordinates, ZooKeeper itself is intended to be replicated over a set of hosts (an ensemble). Whenever a change is made, it is not considered successful until it has been written to a quorum (a majority, i.e. more than half) of the servers in the ensemble.
[http://stackoverflow.com/questions/22930137/should-apache-kafka-and-apache-hadoop-share-the-same-zookeeper-instance?rq=1]
as you point out running multiple clusters is a hardware investment,
plus you miss out on opportunities to improve reliability. for example,
if you have three applications that have a cluster of 3 zk servers each,
two failures in one cluster will result in an outage. if instead of using the 9 servers
you have the same three applications use a zk cluster with 7 servers you
can tolerate three failures without an outage.
http://zookeeper-user.578899.n2.nabble.com/Multiple-ZK-clusters-or-a-single-shared-cluster-td3277547.html
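The arithmetic behind ensemble sizing can be checked with a short sketch. It assumes the usual majority-quorum rule (a write needs more than half of the servers); the function names are made up for the example:

```python
# Sketch only: majority-quorum arithmetic for a ZooKeeper-style ensemble.
def quorum_size(servers: int) -> int:
    """Smallest number of servers that is more than half of the ensemble."""
    return servers // 2 + 1


def tolerated_failures(servers: int) -> int:
    """How many servers can be down while a quorum is still reachable."""
    return servers - quorum_size(servers)


for servers in (3, 5, 7):
    print(servers, quorum_size(servers), tolerated_failures(servers))
```

A 3-server ensemble keeps a quorum with one server down and loses it with two, while a single 7-server ensemble survives three failures, which is the reliability argument made in the quote.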
https://engineering.linkedin.com/kafka/running-kafka-scale - kafka at scale - 60 clusters - 1100 nodes
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
In production -
The Kafka cluster is set up on three of the machines. The six drives are directly mounted with no RAID (JBOD style). The remaining three machines I use for Zookeeper and for generating load.
config - https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
https://www.quora.com/What-is-viable-hardware-for-Zookeeper-and-Kafka-brokers - machine requirements
Config Recommended
[https://kafka.apache.org/081/ops.html]
controlled.shutdown.enable=true
auto.leader.rebalance.enable=true
Expanding your cluster
Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers. However these new servers will not automatically be assigned any data partitions, so unless partitions are moved to them they won’t be doing any work until new topics are created. So usually when you add machines to your cluster you will want to migrate some existing data to these machines.
Replication Guarantee
The fundamental guarantee a log replication algorithm must provide is that if it tells the client a message is committed, and the leader fails, the newly elected leader must also have that message. Kafka gives this guarantee by requiring the leader to be elected from a subset of replicas that are “in sync” with the previous leader or, in other words, caught up to the leader’s log. The leader for every partition tracks this in-sync replica (aka ISR) list by computing the lag of every replica from itself. When a producer sends a message to the broker, it is written by the leader and replicated to all the partition’s replicas. A message is committed only after it has been successfully copied to all the in-sync replicas. Since the message replication latency is capped by the slowest in-sync replica, it is important to quickly detect slow replicas and remove them from the in-sync replica list. The details of Kafka’s replication protocol are somewhat nuanced and this blog is not intended to be an exhaustive discussion of the topic. You can read more about how Kafka replication works here. For the sake of this discussion, we’ll focus on operability of the replication protocol.
[http://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/]
A message is considered “committed” when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the request.required.acks setting that the producer uses.
[http://kafka.apache.org/documentation.html#replication]
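The commit rule above can be sketched as a minimum over the in-sync replicas: a message counts as committed once every ISR member has it. The broker names and the function `committed_up_to` are assumptions for the illustration:

```python
# Sketch only: which messages are committed, given how far each replica's
# log has got. Broker names and numbers are made up.
def committed_up_to(log_end_offsets: dict, isr: set) -> int:
    """Return how many messages every in-sync replica has applied.

    Messages with an offset below this value are committed and may be
    handed to consumers.
    """
    return min(log_end_offsets[replica] for replica in isr)


log_end_offsets = {"broker-0": 10, "broker-1": 8, "broker-2": 5}

# With all three replicas in sync, the slowest one caps what is committed.
print(committed_up_to(log_end_offsets, {"broker-0", "broker-1", "broker-2"}))

# Once the lagging replica is dropped from the ISR, the committed point advances.
print(committed_up_to(log_end_offsets, {"broker-0", "broker-1"}))
```

This is why a slow replica has to be detected and removed from the ISR quickly: while it stays in the set it holds back every commit on that partition.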
The determination of stuck and lagging replicas is controlled by the replica.lag.time.max.ms configuration.
The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message.
This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes.
To survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement.
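The comparison generalises: to tolerate f failures without losing a committed message, a majority quorum needs 2f+1 replicas while the ISR approach needs f+1. A small sketch of that arithmetic, with made-up function names:

```python
# Sketch only: replica counts needed to tolerate a given number of failures.
def replicas_for_majority_quorum(failures: int) -> int:
    """A majority must survive, so 2f + 1 replicas are needed."""
    return 2 * failures + 1


def replicas_for_isr(failures: int) -> int:
    """Any surviving in-sync replica can lead, so f + 1 replicas are enough."""
    return failures + 1


for failures in (1, 2, 3):
    print(failures, replicas_for_majority_quorum(failures), replicas_for_isr(failures))
```

The saving in replicas is paid for in latency, since the ISR approach waits for every in-sync replica rather than only the fastest majority.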
Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.
If all the nodes replicating a partition die, this guarantee no longer holds. There are two behaviors that could be implemented:
1. Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
2. Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.
This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy.
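The two strategies differ only in whether a replica outside the ISR may become leader. A minimal sketch, where `recovery_order` is the order in which replicas come back to life; the function name and the `allow_non_isr` flag are hypothetical names chosen for the example, not Kafka configuration:

```python
# Sketch only: picking a leader after every replica of a partition has died.
def elect_leader(recovery_order: list, isr: set, allow_non_isr: bool):
    """Return the first eligible replica to come back, or None if still waiting."""
    for replica in recovery_order:
        if replica in isr or allow_non_isr:
            return replica
    return None


recovery_order = ["broker-2", "broker-0"]   # broker-2 restarts first
isr = {"broker-0"}                          # only broker-0 was in sync

# Strategy 1: wait for an ISR member (consistent, possibly unavailable longer).
print(elect_leader(recovery_order, isr, allow_non_isr=False))

# Strategy 2: take whoever returns first (available sooner, may lose messages).
print(elect_leader(recovery_order, isr, allow_non_isr=True))
```

With strategy 1 the function keeps returning None until an ISR member reappears, which is the "remain unavailable" case described above.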
Availability and Durability Guarantees
request.required.acks=-1