
Bringing Big Tools to Big Data: Spark and Spark Streaming

Data is the oil of the 21st century — a valuable commodity prized and hoarded throughout the business world. In an attempt to turn users’ preferences into business profits, every internet activity is stored and analyzed. As a result, internet profile data is increasing rapidly, both in size and rate of reception. Increasingly, we need flexible and powerful tools to process and store big data. Fortunately, that’s exactly where Spark can help.

Using the Twitter streaming API as an example, I’ll lay out the various benefits of using Spark and Spark Streaming.

Twitter Streaming

First, we need to import the necessary dependencies. We will use the tweepy library, which is a simple Twitter client for Python. (The code below relies on the StreamListener class, which is available in tweepy releases before 4.0.) Let’s create a file called tweet_stream.py and start it with the following lines:

import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

Then, we need to set up our Twitter credentials as follows:

consumer_key = '<YOUR_CONSUMER_KEY>'
consumer_secret = '<YOUR_CONSUMER_SECRET>'
access_token = '<YOUR_ACCESS_TOKEN>'
access_secret = '<YOUR_ACCESS_SECRET>'

After that, we will create a TweetsListener, which will be responsible for the streaming itself. For initialization, we provide a socket object and implement two methods — on_data and on_error. The first method is responsible for receiving data from the Twitter stream and sending it to a socket. The second is used for receiving error messages.

class TweetsListener(StreamListener):

  def __init__(self, csocket):
      self.client_socket = csocket

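  def on_data(self, data):
      try:
          msg = json.loads( data )
          # End each tweet with a newline: socketTextStream reads newline-delimited text
          self.client_socket.sendall( (msg['text'] + '\n').encode('utf-8') )
          return True
      except Exception as e:
          # Messages without a 'text' field (or invalid JSON) end up here
          print("Error on_data: %s" % str(e))
      return True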

  def on_error(self, status):
      print(status)
      return True
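
To check the forwarding logic without Twitter credentials, the body of on_data can be exercised on its own. The following is a minimal sketch and not part of the original script: forward_tweet_text is a hypothetical helper, the sample payloads are made up, and socket.socketpair() stands in for the real client connection.

```python
import json
import socket


def forward_tweet_text(raw, sock):
    """Parse one raw JSON message and send its 'text' field over sock.

    Hypothetical helper mirroring on_data; returns True if something was sent.
    """
    try:
        msg = json.loads(raw)
        # The trailing newline is an assumption of this sketch, so that
        # line-based readers see one tweet per line.
        sock.sendall((msg["text"] + "\n").encode("utf-8"))
        return True
    except (ValueError, KeyError) as e:
        # Invalid JSON or a message without a 'text' field is skipped.
        print("Error on_data: %s" % e)
        return False


# A connected pair of sockets stands in for the server/client connection.
sender, receiver = socket.socketpair()
sent = forward_tweet_text('{"text": "Learning #guitar today"}', sender)
skipped = forward_tweet_text('{"limit": {"track": 1}}', sender)
sender.close()
received = receiver.recv(1024).decode("utf-8")
receiver.close()
print(repr(received))
```

Only the first message carries a text field, so only that one reaches the receiving end.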

Next, we’ll connect to the Twitter stream. To do this, we’ll create a sendData function, which takes a socket object as a parameter, authorizes with our credentials, and creates a Stream object instance. For this example, we will pick every tweet containing the keyword guitar.

def sendData(c_socket):
  auth = OAuthHandler(consumer_key, consumer_secret)
  auth.set_access_token(access_token, access_secret)

  twitter_stream = Stream(auth, TweetsListener(c_socket))
  twitter_stream.filter(track=['guitar'])

Finally, we will create a socket object and start the streaming process. To do this, we’ll follow these steps (see the corresponding code in the snippet below):

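  1. Create a socket object.
  2. Set the host address. Here it is 0.0.0.0, which accepts connections on all network interfaces.
  3. Reserve a port for your service on the host.
  4. Bind the socket to the host and port.
  5. Wait for a client connection.
  6. Establish a connection with the client.
  7. Call the sendData function to start streaming tweets to the client.

s = socket.socket()        # 1. create a socket object
host = "0.0.0.0"           # 2. listen on all interfaces
port = 5555                # 3. port reserved for the service
s.bind((host, port))       # 4. bind to the host and port
s.listen(5)                # 5. wait for a client connection
c, addr = s.accept()       # 6. establish a connection with the client
sendData(c)                # 7. stream tweets to the connected client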
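
The bind, listen, and accept sequence can be tried locally before any Twitter traffic is involved. The sketch below is an illustration under stated assumptions: serve_once and the payload are hypothetical, the server binds to 127.0.0.1 on an OS-chosen port instead of 0.0.0.0:5555, and a plain client socket plays the role Spark takes later.

```python
import socket
import threading


def serve_once(server, payload):
    """Accept a single client, send it payload, then close the connection."""
    conn, _addr = server.accept()
    with conn:
        conn.sendall(payload)


server = socket.socket()
server.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

worker = threading.Thread(target=serve_once, args=(server, b"#guitar solo\n"))
worker.start()

# The client stands in for the consumer that connects to the stream.
client = socket.create_connection(("127.0.0.1", port))
chunks = []
while True:
    chunk = client.recv(1024)
    if not chunk:          # empty bytes means the server closed the connection
        break
    chunks.append(chunk)
client.close()
worker.join()
server.close()

data = b"".join(chunks)
print(data.decode("utf-8"))
```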

And that’s it for the Twitter streaming API. We have just built a listener class that picks up every tweet matching a specific keyword and sends it to a socket. Now let’s move on to Spark.

Spark Streaming

Note: The next example was run in a Jupyter Notebook.

To begin, as in the previous example, we will first import dependencies:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple

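Then, we’ll need to create a SparkContext instance. (Be warned: only one SparkContext can be active at a time, so running this line more than once in the same notebook raises an error. SparkContext.getOrCreate() returns the existing context instead of failing.)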

sc = SparkContext()

The next step is to create StreamingContext and SQLContext instances. The first gives us access to streaming data and is configured here with a batch interval of 10 seconds; the second lets us work with DataFrames and run SQL queries.

ssc = StreamingContext(sc, 10 )
sqlContext = SQLContext(sc)

Next, let’s create a socketTextStream, where we will be expecting a Twitter streaming connection (described in the previous section).

socket_stream = ssc.socketTextStream("0.0.0.0", 5555)

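Then, create a windowed DStream by calling the window method on socket_stream. With a window length of 20 seconds, each result covers the two most recent 10-second batches.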

lines = socket_stream.window( 20 )
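
To make the windowing concrete, here is a small plain-Python simulation. It is a sketch of the idea only, not Spark's implementation: the windowed function and the sample batches are hypothetical, and it assumes the window slides by one batch interval, which is the default when window is called with a single argument.

```python
from collections import deque

BATCH_INTERVAL = 10   # seconds, as in StreamingContext(sc, 10)
WINDOW_LENGTH = 20    # seconds, as in socket_stream.window(20)


def windowed(batches, batch_interval, window_length):
    """Yield, for every incoming batch, the items of all batches in the window."""
    size = window_length // batch_interval   # number of batches per window
    recent = deque(maxlen=size)              # older batches fall out automatically
    for batch in batches:
        recent.append(batch)
        yield [item for b in recent for item in b]


# Made-up batches of lines, one list per 10-second interval.
batches = [["a"], ["b", "c"], ["d"]]
windows = list(windowed(batches, BATCH_INTERVAL, WINDOW_LENGTH))
print(windows)
```

Each window after the first contains the current batch plus the one before it, which is why a hashtag can be counted in two consecutive refreshes.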

Now, we need to create a class via namedtuple. This is a very simple approach to creating a class. First, we need to define the fields; then, we pass them, along with a class name, to the namedtuple function. For every hashtag, we want to store its name and count. Thus, our fields will be tag and count.

fields = ("tag", "count" )
Tweet = namedtuple( 'Tweet', fields )
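
As a quick illustration of how the resulting class behaves, the sketch below builds one record by hand. The values are made up; in the pipeline the records come from the stream.

```python
from collections import namedtuple

fields = ("tag", "count")
Tweet = namedtuple("Tweet", fields)

record = Tweet("#guitar", 3)       # made-up values for illustration
tag, count = record                # unpacks like a regular tuple

# Note: the field named "count" shadows the built-in tuple.count method,
# so record.count is the stored number, not a method.
print(record.tag, record.count)
print(record._asdict())
```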

The next lines of code may look intimidating, so I’ll first go over what we’re going to do in each line (see the corresponding code in the snippet below):

  1. Grab the tweet line and split it into a word list.
  2. Keep only the words that start with a hashtag.
  3. Convert the words to lowercase and pair each one with a count of 1.
  4. Sum the counts for each hashtag, which also removes duplicates.
  5. Store each hashtag and its count in a Tweet object.
  6. Convert the result to a DataFrame sorted by count in descending order.
  7. Keep the top 10 rows and register them as a table. (Refreshes will be executed every 10 seconds.)

( lines.flatMap( lambda text: text.split( " " ) )               # 1. split each line into words
  .filter( lambda word: word.lower().startswith("#") )          # 2. keep only hashtags
  .map( lambda word: ( word.lower(), 1 ) )                      # 3. lowercase, pair with 1
  .reduceByKey( lambda a, b: a + b )                            # 4. sum the counts per hashtag
  .map( lambda rec: Tweet( rec[0], rec[1] ) )                   # 5. wrap in a Tweet object
  .foreachRDD( lambda rdd: rdd.toDF().sort( desc("count") )     # 6. DataFrame sorted by count
               .limit(10).registerTempTable("tweets") ) )       # 7. top 10 as table "tweets"
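
The same counting logic can be checked without a Spark cluster. The following plain-Python sketch mirrors the steps with a Counter; it is an approximation for understanding, not a replacement for the streaming pipeline. top_hashtags and the sample lines are hypothetical.

```python
from collections import Counter, namedtuple

Tweet = namedtuple("Tweet", ("tag", "count"))


def top_hashtags(lines, limit=10):
    """Count hashtags across lines and return the `limit` most frequent ones."""
    # Steps 1-3: split into words, keep hashtags, lowercase them.
    words = (word for text in lines for word in text.split(" "))
    hashtags = (word.lower() for word in words if word.startswith("#"))
    # Step 4: Counter plays the role of reduceByKey.
    counts = Counter(hashtags)
    # Steps 5-7: wrap in Tweet, sorted by count descending, keep the top ones.
    return [Tweet(tag, n) for tag, n in counts.most_common(limit)]


sample = [
    "Practicing #Guitar and #blues tonight",
    "New strings for my #guitar",
    "no hashtags here",
]
result = top_hashtags(sample)
print(result)
```

Because the words are lowercased before counting, #Guitar and #guitar are counted as the same hashtag.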

Running tweet_stream.py

Make sure tweet_stream.py is already running in a terminal, so that it is listening on port 5555. Then start the streaming context:

ssc.start()

When you run it from the terminal, you will see a list of tweets containing our particular tag.

For the final part of the example, we will import all necessary dependencies:

import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
# The %matplotlib magic only works in Jupyter Notebooks!
%matplotlib inline

Now, we will make 10 plots. Before each of them, we will sleep for 3 seconds and grab the top 10 hashtags from the tweets table through the SQLContext instance. Next, we will convert the results to a pandas DataFrame and build a plot for each of them.

count = 0
while count < 10:
    
    time.sleep( 3 )
    top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' )
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait=True)
    plt.figure( figsize = ( 10, 8 ) )  # sns.plt is not available in recent seaborn releases
    sns.barplot( x="count", y="tag", data=top_10_df)
    plt.show()
    count = count + 1

The result will be a plot that changes every 3 seconds. It will look like the following example:
[Figure: plot graph]

About the Author

Vadim Sokoltsov joined the Distillery team in 2017. Being a Ruby on Rails professional, he also loves learning Python and experimenting with new technologies. His true passion, however, is for big data and machine learning – and he's always prepared to use his mixed martial arts training to defend his point of view on these topics.
