Data is the oil of the 21st century — a valuable commodity prized and hoarded throughout the business world. In an attempt to turn users’ preferences into business profits, every internet activity is stored and analyzed. As a result, internet profile data is increasing rapidly, both in size and rate of reception. Increasingly, we need flexible and powerful tools to process and store big data. Fortunately, that’s exactly where Spark can help.
First, we need to import all necessary dependencies. To do this, we will use the tweepy library, which is a simple Twitter client for Python. Let’s create a file called tweet_stream.py, filling it with the following lines:
import tweepy from tweepy import OAuthHandler from tweepy import Stream from tweepy.streaming import StreamListener import socket import json
Then, we need to set up your Twitter credentials as follows:
consumer_key=’<YOUR_CONSUMER_KEY>’ consumer_secret=’<YOUR_CONSUMER_SECRET>’ access_token =’<YOUR_ACCESS_TOKEN>’ access_secret=’<YOUR_ACCESS_SECRET>’
After that, we will create a TweetsListener, which will be responsible for the streaming itself. For initialization, we provide a socket object and implement two methods — on_data and on_error. The first method is responsible for receiving data from the Twitter stream and sending it to a socket. The second is used for receiving error messages.
class TweetsListener(StreamListener): def __init__(self, csocket): self.client_socket = csocket def on_data(self, data): try: msg = json.loads( data ) self.client_socket.send( msg['text'].encode('utf-8') ) return True except BaseException as e: print("Error on_data: %s" % str(e)) return True def on_error(self, status): print(status) return True
Next, we’ll connect to Twitter streaming. To do this, we’ll create a sendDate method — which will have a socket object as a parameter — where we authorize with our credentials and create a Stream object instance. For this example, we will pick every tweet containing the guitar tag.
def sendData(c_socket): auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_secret) twitter_stream = Stream(auth, TweetsListener(c_socket)) twitter_stream.filter(track=['guitar'])
Finally, we will create a socket object and start the streaming process. To do this, we’ll follow these steps (see the corresponding code in the snippet below):
1 s = socket.socket() 2 host = "0.0.0.0" 3 port = 5555 4 s.bind( (host, port) ) 5 s.listen(5) 6 c, addr = s.accept() 7 sendData(c)
And that’s it for Twitter streaming API. We have just built a listener class, which is responsible for sending data to a socket and picking up every tweet with a specific tag. Now let’s move on to Spark.
Notice: The next example was run via Jupyter Notebook.
To begin, as in the previous example, we will first import dependencies:
from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import SQLContext from pyspark.sql.functions import desc from collections import namedtuple
Then, we’ll need to create a SparkContext instance. (Be warned, if you try to run this line more than once, you may have an error.)
sc = SparkContext()
The next step is to create StreamingContext and SQLContext instances. The first allows us to get access to data streaming; the second is an SQL query builder.
ssc = StreamingContext(sc, 10 ) sqlContext = SQLContext(sc)
Next, let’s create a socketTextStream, where we will be expecting a Twitter streaming connection (described in the previous section).
socket_stream = ssc.socketTextStream("0.0.0.0", 5555)
Then, create a DStream via window method on socket_stream.
lines = socket_stream.window( 20 )
Now, we need to create a class via namedtuple. This is a very simple approach to creating a class. First, we need to define the fields; then, we pass them — along with a class name — to the namedtuple method. For every tweet, we want to store its name and count. Thus, our fields will be tag and count.
fields = ("tag", "count" ) Tweet = namedtuple( 'Tweet', fields )
The next lines of code may look frustrating, so I’ll first go over what we’re going to do in each line (see the corresponding code in the snippet below):
1 ( lines.flatMap( lambda text: text.split( " " ) ) 2 .filter( lambda word: word.lower().startswith("#") ) 3 .map( lambda word: ( word.lower(), 1 ) ) 4 .reduceByKey( lambda a, b: a + b ) 5 .map( lambda rec: Tweet( rec, rec ) ) 6 .foreachRDD( lambda rdd: rdd.toDF().sort( desc("count") ) 7 .limit(10).registerTempTable("tweets") ) )
Now it’s time to run the pyspark Stream instance:
When you run it from the terminal, you will see a list of tweets containing our particular tag.
For the final part of the example, we will import all necessary dependencies:
import time from IPython import display import matplotlib.pyplot as plt import seaborn as sns %matplotlib inline # Only works for Jupyter Notebooks!
Now, we will make 10 plots. Before each of them, we will set the sleep time to 3 and grab the top 10 tweets from SQLContext instance. Next, we will transform the results to a DataFrame and build a plot for each of them.
count = 0 while count < 10: time.sleep( 3 ) top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' ) top_10_df = top_10_tweets.toPandas() display.clear_output(wait=True) sns.plt.figure( figsize = ( 10, 8 ) ) sns.barplot( x="count", y="tag", data=top_10_df) sns.plt.show() count = count + 1
The result will be a plot that will change every 3 seconds. It will look like the following example:
Vadim Sokoltsov joined the Distillery team in 2017. Being a Ruby on Rails professional, he also loves learning Python and experimenting with new technologies. His true passion, however, is for big data and machine learning – and he's always prepared to use his mixed martial arts training to defend his point of view on these topics.