
Integration of Spark Streaming with Flume

Flume is a distributed, reliable service used for efficiently collecting and moving large amounts of streaming event data.

Spark Streaming brings Spark’s language-integrated API to stream processing, enabling us to write live streaming jobs in much the same way as batch jobs. Data can be ingested from various input sources such as Flume and Kafka, processed, and the results stored in filesystems, databases, etc.

Spark Streaming can be configured to receive input data from Flume. The goal of this integration is to receive live data streams from Flume into Spark via Spark Streaming, process them in Spark, and deliver the output to the end user in real time. This enables the end user to process data much faster than a batch-oriented pipeline would, saving time and money from a business perspective.
Spark Streaming can be configured to receive data from Flume via two approaches:

1. Push-based Receiver
2. Pull-based Receiver

1. Push-based Receiver Approach:

In this approach, Spark Streaming sets up a receiver that acts as an Avro sink for Flume. Flume is configured to push the data into the Avro sink.


Configuring Spark Streaming Applications

To configure a Spark Streaming application, we follow these steps:

  1. Linking: In your SBT/Maven project definition, link the streaming application against the following artifact.

groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 1.2.1
  2. Programming: In the streaming application code, import FlumeUtils (which sets up the receiver to be started on a specific worker’s hostname and port) and create an input DStream (Discretized Stream) as follows (a fuller end-to-end sketch is given after these steps).

import org.apache.spark.streaming.flume.*;
JavaReceiverInputDStream flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine’s hostname], [chosen port]);
  3. Deploying: Package spark-streaming-flume_2.10 and its dependencies (except spark-core_2.10 and spark-streaming_2.10, which are provided by spark-submit) into the application JAR. Then use spark-submit to launch your application.
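
For reference, here is a minimal end-to-end sketch of a push-based streaming application in Java. The class name, batch interval, and the localhost/9988 receiver address are assumptions for illustration; the hostname and port must match the Avro sink configured in the Flume agent, and the per-batch count is just a placeholder for real processing.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePushExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("FlumePushExample");
        // 10-second batch interval (an assumption; tune to the workload).
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // The receiver acts as an Avro sink for Flume; the hostname and port must match
        // the avro sink settings (a1.sinks.k1.hostname / a1.sinks.k1.port) in the Flume agent.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, "localhost", 9988);

        // Placeholder processing: print the number of events received in each batch.
        flumeStream.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}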

Advantages:

  • It can be set up quickly and easily.

Disadvantages:

  • Being an unreliable receiver, it does not use transactions to receive data. This may lead to data loss if the worker node running the receiver fails.

  • If the Spark worker running the receiver fails, the system will try to launch the receiver at a different location, and Flume will need to be reconfigured to send data to the new worker.

Configuring Flume Agent:

We configure the Flume agent as follows:

# Flume Configuration for Push-based Approach Spark-Flume Integration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# To input data into Flume, open a new terminal and use: telnet <hostname> <port>
# For the following configuration the command would be: telnet localhost 41444
# The data written on this terminal is the data that comes into Flume.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 41444
# Describe the sink
# The hostname and port configured for the sink must match the parameters passed to createStream in the Spark-Flume integration code.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9988
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
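
As a usage sketch (the file name spark-push.conf and the conf/ directory are assumptions for illustration), the agent above can be started with Flume’s standard launcher before submitting the Spark application:

# Assuming the configuration above is saved as spark-push.conf
bin/flume-ng agent --conf conf --conf-file spark-push.conf --name a1 -Dflume.root.logger=INFO,console
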
2. Pull-based Receiver Approach:

In this approach, a custom Flume sink is run instead of the default Avro sink. Flume pushes data into this intermediate sink, where it stays buffered, and Spark Streaming uses a reliable receiver and transactions to pull data from the sink. A transaction succeeds only after the data has been received and replicated by Spark Streaming.


Configuring Flume:

We need to set up the custom sink as a third-party plug-in for Flume. Since the plug-in is written in Scala, we need to add both the plug-in and the Scala library to Flume’s plug-ins.

Configuring Flume on the chosen machine requires the following two steps.

  1. Sink JARs: Add the following JARs to Flume’s class path on the machine designated to run the custom sink.

     1. Custom sink JAR: Download the JAR corresponding to the following artifact.

        groupId = org.apache.spark
        artifactId = spark-streaming-flume-sink_2.10
        version = 1.2.1

     2. Scala library JAR: Download the Scala library JAR for Scala 2.10.4. It can be found with the following artifact details.

        groupId = org.scala-lang
        artifactId = scala-library
        version = 2.10.4

  2. Configuration file: On that machine, configure the Flume agent to send data to the custom SparkSink by having the following in the configuration file.

     agent.sinks = spark
     agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
     agent.sinks.spark.hostname = [chosen machine’s hostname]
     agent.sinks.spark.port = [chosen port]
     agent.sinks.spark.channel = memoryChannel
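
To make these JARs visible to Flume, one option (a sketch, assuming Flume is installed under /opt/flume, a hypothetical path) is to place them in Flume’s plugins.d directory; adding them to Flume’s class path via FLUME_CLASSPATH in flume-env.sh also works.

/opt/flume/plugins.d/spark-streaming-sink/lib/spark-streaming-flume-sink_2.10-1.2.1.jar
/opt/flume/plugins.d/spark-streaming-sink/libext/scala-library-2.10.4.jar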

Configuring Spark Streaming Application:

To configure a Spark Streaming application, we follow these steps:

  1. Linking: In the SBT/Maven project definition, link the streaming application against the spark-streaming-flume_2.10 artifact (the same linking step as in the push-based approach).

  2. Programming: With the data now buffered in the sink, we can use FlumeUtils to read it. In the streaming application code, import FlumeUtils and create an input DStream (Discretized Stream) as follows (a fuller end-to-end sketch is given after these steps).

import org.apache.spark.streaming.flume.*;
JavaReceiverInputDStream flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);

 

The DStream is composed of SparkFlumeEvents; the underlying AvroFlumeEvent can be accessed through each SparkFlumeEvent’s event field. Note that each input DStream can be configured to receive data from multiple sinks.

  3. Deploying: Package spark-streaming-flume_2.10 and its dependencies (except spark-core_2.10 and spark-streaming_2.10, which are provided by spark-submit) into the application JAR. Then use spark-submit to launch the application.
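
As with the push-based approach, here is a minimal end-to-end sketch of a pull-based application in Java. The class name, batch interval, and the localhost/9999 sink address are assumptions for illustration and must match the SparkSink settings in the Flume agent; the mapping step simply decodes each event body as UTF-8 text.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePullExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("FlumePullExample");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Poll the custom SparkSink; the hostname and port must match the
        // SparkSink settings (a1.sinks.k1.hostname / a1.sinks.k1.port) in the Flume agent.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, "localhost", 9999);

        // Each SparkFlumeEvent wraps an AvroFlumeEvent; decode its body as UTF-8 text.
        JavaDStream<String> lines = flumeStream.map(event -> {
            ByteBuffer body = event.event().getBody();
            byte[] bytes = new byte[body.remaining()];
            body.duplicate().get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        });
        lines.print();

        ssc.start();
        ssc.awaitTermination();
    }
}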

Advantages:

  • Being a reliable receiver, it provides stronger reliability and fault-tolerance guarantees, as the data remains buffered in the sink until Spark Streaming has read and replicated it and acknowledged the sink via a transaction.

Configuring Flume Agent:

We configure the Flume agent for this approach as follows:

# Flume Configuration for Pull based Approach Spark-Flume Integration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# To input data into Flume, open a new terminal and use: telnet <hostname> <port>
# For the following configuration the command would be: telnet localhost 41444
# The data written on this terminal is the data that comes into Flume.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 41444
# Describe the sink
# The hostname and port configured for the sink must match the parameters passed to createPollingStream in the Spark-Flume integration code.
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9999
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Ways to Increase the Level of Parallelism while Integrating Spark Streaming with Flume

A common way to reduce the processing time of batches is to increase parallelism. We can increase parallelism in the following ways.

  1. Increasing the number of receivers

  2. Explicitly repartitioning received data

The above methods can be used with both the push-based and pull-based approaches for Spark Streaming-Flume integration.

Increasing the number of receivers

Sometimes receivers act as a bottleneck when there are too many records for a single machine to read in and distribute. In that case we consider parallelizing the data receiving. Each input DStream (Discretized Stream) creates a single receiver that receives a single stream of data, so multiple data streams can be received by creating multiple input DStreams and configuring them to receive either different partitions of the data stream from the source or data from different sources altogether. These multiple DStreams can then be combined into a single DStream using the union operator, and the transformations that were being applied to the single input DStream can then be applied to the unified stream.

Use cases for using multiple receivers:

  • To increase the aggregate throughput of the ingestion (if a single receiver becomes the bottleneck).

  • When data needs to be collected from multiple sources, unified, and processed together. Different receivers are created for different sources to receive different kinds of data, which are then combined using joins, co-groups, or unions.

union(otherStream) – Returns a new DStream that contains the union of the elements in the source DStream and otherStream.

 

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream1 = FlumeUtils.createPollingStream(ssc, host, port);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream2 = FlumeUtils.createPollingStream(ssc, host2, port2);
JavaDStream<SparkFlumeEvent> flumeStream = flumeStream1.union(flumeStream2);

Explicitly repartitioning received data

If the number of receivers cannot be increased any further, we can redistribute the received data by explicitly repartitioning the input stream using DStream.repartition(). This distributes the received batches of data across the specified number of partitions in the cluster before further processing.

 

repartition(numPartitions) – It changes the level of parallelism in this DStream by creating more or fewer partitions.

 

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);
JavaDStream<SparkFlumeEvent> flumeStream2 = flumeStream.repartition(10);

- Sandeep Mehta
Big Data Architect
DataMetica
