61
Tweets Sentiment Analysis with Apache Spark - Python
Data streaming is the process of transmitting a continuous flow of data (also known as streams) typically fed into stream processing software to derive valuable insights. A data stream consists of a series of data elements ordered in time.
In this blog we will use tweepy and PySpark to connect together and stream tweets replies from Twitter API that's related to specified topics then analyze this tweet replies with Machine Learning models to evaluate sentiments which is Positive or Negative.
Before we start it's necessary to have a Twitter Developer Account to get an access to the Twitter API so we can connect it with PySpark and get the data.
You can get apply to a Developer Account from here.
Now let's start with the codes:
We need to import necessary libraries like:
#Necessary Libraries
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
With tweepy we can authenticate our Twitter API credentials, Also it creates a listener instance to connect with PySpark that we will cover later in the blog.
We need socket to create a localhost and a port to connect with Spark.
And json is used to process the message we get from the Twitter API as it's been streamed in json.
We need to insert our credentials:
#Twitter API credentials
CONSUMER_KEY = "secret"
CONSUMER_SECRET = "secret"
ACCESS_TOKEN = "secret"
ACCESS_SECRET = "secret"
After we create the Developer account we should get four credentials:
Consumer Key
Consumer Secret
Access Token
Access Secret
We will use these authentications to connect to the Twitter API.
The user specify the topics:
#User inputs keywords, Topic of the tweets
keywords = []
n = int(input("Enter the number of keywords for your tweets topic\n"))
for i in range(n):
i = input("Enter keyword number: " + str(i+1) + '\n')
keywords.append(i)
We need the user to specify the topics of the tweets replies so we take an input to take the number of keywords the user wants, Then we create a for loop for the number specified and take a input for each.
Create Stream Listener object:
#StreamListener object to get the data from Twitter API
class TweetsListener(StreamListener):
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
#Load the data as json structure
msg = json.loads(data)
#Get only extendable replies from the tweets
if isinstance(msg['in_reply_to_status_id_str'], str):
if "extended_tweet" in msg:
self.client_socket.send(str(msg['extended_tweet']['full_text']+"\n").encode('utf-8'))
print(msg['extended_tweet']['full_text'])
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def if_error(self, status):
print(status)
return True
The StreamListener object will connect with the Twitter API getting the tweets replies for the specified topics, we need to specify in_reply_to_status_id_str to get only replies of the tweets.
Validate the stream:
def send_tweets(c_socket):
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
twitter_stream = Stream(auth, TweetsListener(c_socket))
twitter_stream.filter(track=keywords, languages=['en'])
Another important function to validate the credentials and connect send it to connect the Twitter API with the Listener, Also here we define the keywords the user inputted earlier, And the languages of the tweet replies.
Group it all together in the main function:
if __name__ == "__main__":
#Initiate the socket object
new_skt = socket.socket()
#local machine address (changeble)
host = "127.0.0.2"
#Specific port for the service
port = 9003
#Binding host and port
new_skt.bind((host, port))
print("Now listening on port: %s" % str(port))
#waiting for client connection
new_skt.listen(5)
#Establish connection with client. it returns first a socket object,c, and the address bound to the socket
c, addr = new_skt.accept()
print("Received request from: " + str(addr))
# and after accepting the connection, we aill sent the tweets through the socket
send_tweets(c)
Here we create the socket that binds the localhost and the port together and make a request for PySpark to connect, So when we run the PySpark code we should connect with the stream listener.
Note: This code should run with an IDE different from PySpark's, So we will run this code in CMD as follows:
Import necessary libraries
#Import necessary libraries
import findspark
findspark.init('E:\spark-3.1.2-bin-hadoop3.2')
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
We import findspark to locate our Spark folder, and SparkContext to create a new instance of Spark (You can only create one per run) and StreamingContext is used for initiating the stream context, And SQLContext is necessary for initiating the SQL context.
Initializing variables
sc = SparkContext()
#Initiating the StreamingContext with 10 second batch interval.
ssc = StreamingContext(sc, 10)
#Initiating sqlcontext
sqlContext = SQLContext(sc)
Here we create a Spark Context and storing it in sc and initiating both StreamingContext with 10 second batch interval and SQLContext.
Initiating socket stream and process the tweet replies
#Initiating streaming text from a TCP (socket) source:
socket_stream = ssc.socketTextStream("127.0.0.2", 9003)
#Lines of tweets with socket_stream window of size 60
lines = socket_stream.window(100)
#Creating a tuple to assign names
from collections import namedtuple
Tweet = namedtuple( 'Tweet', 'line' )
#Applying different operations on the tweets and save them to a temporary sql table
( lines.map( lambda word: ( word.lower(), 1 ) )
.map( lambda rec: Tweet( rec[0]) )
.foreachRDD( lambda rdd: rdd.toDF().limit(1000)
.registerTempTable("tweets") ) )
We create a socket_stream variable with the same localhost and port we specified before in Part 1, This is necessary to connect with the Respond to the Stream Listener request.
Also we take each line streamed and store it in a registerTempTable named (Tweets).
Start stream
#Start Streaming
ssc.start()
So simple !, Start the stream.
Start function gives the order to connect to Stream Listener applying all the code above.
Let's take a look at the handsome data streaming:
Export to csv
#Export tweet replies in a csv file
import time
count = 0
while count < 100:
time.sleep(10)
lines_sql = sqlContext.sql( 'Select line from tweets' )
lines_df = lines_sql.toPandas()
lines_df.to_csv('tweet_data.csv', mode='a', header=False)
count += 1
This code creates a loop that updates every 10 seconds for a specified number of times, Taking lines stored in tweets, Converts it to DataFrame, And append it's content into a csv file,
Note: Each loop could take already stored replies, So this is why we need Excel to do some cleaning.
This part is only one step, the data we got is really messy, Look at that:
We got over 19k line but nearly all of them are duplicates, First we need to delete unnecessary columns and second we need to remove duplicates, Look at that:
After removing duplicated rows we got about 1.7k lines, We could do these step with python but as we has a csv file It's faster to do it with Excel.
In this part we used a big dataset from Kaggle.com, That has a 1,6 million tweets with their sentiment analysis, you can find this dataset here.
We will use this dataset to train our model.
Import necessary libraries
import pandas as pd
import numpy as np
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import re
import nltk
import matplotlib.pyplot as plt
import seaborn as sns
We will use WordNetLemmatizer to convert the word to it's base form, TfidfVectorizer is used to tokenize the words into a chunks of two, LogisticRegression is our technique predict the sentiments, train_test_split is used for testing the score of our model, classification_report is used to display the result or the score of the model, re is used for the preprocessing phase to apply changes on words, nltk is used to download the WordNetLemmatizer, matplotlib and seaborn are used for visualizations
Import the dataset
#Reading the csv file storing it into a DataFrame
df = pd.read_csv('E:\DigiSay Project\Trained Tweets.csv', encoding='latin-1' , names = ["sentiment", "ids", "date", "flag", "user", "text"])
df
#Negative = 0 , Positive = 1 , Neutral = 2
df.sentiment.replace(4,1, inplace = True)
#Store the two columns values into two seperate arrays (For Machine Learning stage)
text , sentiment = df['text'].values , df['sentiment'].values
First we import the dataset we got from Kaggle.com as csv, We will store the file in a DataFrame, The positive value in this dataset is 4 by default so we will change it to 1 for realism.
Then we store the two columns (tweets) and (sentiment) into two arrays to use it later in the preprocessing phase.
Plotting the distribution of the Sentiments
#Check the number of Positive, Natural and Negative Sentiments
ax = plt.axes()
ax.set_facecolor("#F5F5DC")
sns.countplot(x='sentiment',data=df, palette = "Set1",\
edgecolor='black', linewidth=0.3,alpha=0.7)
plt.title('Sentiment Distribution',fontsize=14)
plt.xlabel('Sentiment',fontsize=10)
plt.show()
To understand how the distribution of sentiments work we plotted the following plot:
We can tell from the plot that there's only Positive and Negative values of sentiment and they are equal.
Getting ready for the preprocess
#Pre-defined emoji list
emojis = {':)': 'smile', ':-)': 'smile', ';d': 'wink', ':-E': 'vampire', ':(': 'sad',
':-(': 'sad', ':-<': 'sad', ':P': 'raspberry', ':O': 'surprised',
':-@': 'shocked', ':@': 'shocked',':-$': 'confused', ':\\': 'annoyed',
':#': 'mute', ':X': 'mute', ':^)': 'smile', ':-&': 'confused', '$_$': 'greedy',
'@@': 'eyeroll', ':-!': 'confused', ':-D': 'smile', ':-0': 'yell', 'O.o': 'confused',
'<(-_-)>': 'robot', 'd[-_-]b': 'dj', ":'-)": 'sadsmile', ';)': 'wink',
';-)': 'wink', 'O:-)': 'angel','O*-)': 'angel','(:-D': 'gossip', '=^.^=': 'cat'}
#Pre-defined Stopword list
stopwordlist = ['a', 'about', 'above', 'after', 'again', 'ain', 'all', 'am', 'an',
'and','any','are', 'as', 'at', 'be', 'because', 'been', 'before',
'being', 'below', 'between','both', 'by', 'can', 'd', 'did', 'do',
'does', 'doing', 'down', 'during', 'each','few', 'for', 'from',
'further', 'had', 'has', 'have', 'having', 'he', 'her', 'here',
'hers', 'herself', 'him', 'himself', 'his', 'how', 'i', 'if', 'in',
'into','is', 'it', 'its', 'itself', 'just', 'll', 'm', 'ma',
'me', 'more', 'most','my', 'myself', 'now', 'o', 'of', 'on', 'once',
'only', 'or', 'other', 'our', 'ours','ourselves', 'out', 'own', 're',
's', 'same', 'she', "shes", 'should', "shouldve",'so', 'some', 'such',
't', 'than', 'that', "thatll", 'the', 'their', 'theirs', 'them',
'themselves', 'then', 'there', 'these', 'they', 'this', 'those',
'through', 'to', 'too','under', 'until', 'up', 've', 'very', 'was',
'we', 'were', 'what', 'when', 'where','which','while', 'who', 'whom',
'why', 'will', 'with', 'won', 'y', 'you', "youd","youll", "youre",
"youve", 'your', 'yours', 'yourself', 'yourselves']
#A lemmatizer job is to convert the word to the base form
Lem = WordNetLemmatizer()
#Patterns to be replaced
urlPattern = r"((http://)[^ ]*|(https://)[^ ]*|( www\.)[^ ]*)"
userPattern = '@[^\s]+'
alphaPattern = "[^a-zA-Z0-9]"
sequencePattern = r"(.)\1\1+"
seqReplacePattern = r"\1\1"
Here we define a predefined lists these will help us replace emojis and stopwords and url patterns and user patterns and non alpha patterns and sequence patterns in the pre process phase, Also we Initialized the Word Lemmatizer so we also use it in the pre process phase.
Preprocess Function
#Data cleaning function
def Data_preprocess(textdata , processedText):
for tweet in textdata:
tweet = tweet.lower()
#Replace all URls with 'URL'
tweet = re.sub(urlPattern,' URL',tweet)
#Replace all emojis
for emoji in emojis.keys():
tweet = tweet.replace(emoji, "EMOJI" + emojis[emoji])
#Replace @USERNAME to 'USER'
tweet = re.sub(userPattern,' USER', tweet)
#Replace all non alphabets
tweet = re.sub(alphaPattern, " ", tweet)
#Replace 3 or more consecutive letters by 2 letter
tweet = re.sub(sequencePattern, seqReplacePattern, tweet)
tweetwords = ''
for word in tweet.split():
#Checking if the word is a stopword
#if word not in stopwordlist:
if len(word)>1:
#Lemmatizing the word
word = Lem.lemmatize(word)
tweetwords += (word+' ')
processedText.append(tweetwords)
return processedText
Here we define a Pre Process function that we will use on both the training data and our streamed data, This function takes each tweet and lowers it with lower function, Then replaces all URL,Emojis,usernames,Non Alphabets, and sequenced words, Also in the function we use the word Lemmatizer to converts each word to it's base form.
Training the data
#Initiating an array
processedBigData = []
#Cleaning the text array
Data_preprocess(text,processedBigData)
#The beginning of the model test phase starts with splitting the data into train and test,
#It's a big data so 5% portion to test is very fine
X_train, X_test, y_train, y_test = train_test_split(processedBigData, sentiment,
test_size = 0.05, random_state = 0)
#Creating a Vectoriser
#A Vectoriser with range of (1,2) ngrams means we will take the words on two portions
vectoriser = TfidfVectorizer(ngram_range=(1,2), max_features=500000)
#Fitting the Vectoriser for each train tweets and test tweets
vectoriser.fit(X_train)
X_train = vectoriser.transform(X_train)
X_test = vectoriser.transform(X_test)
#Creating Logistic Regression object
lr = LogisticRegression(C = 2, max_iter = 1000, n_jobs=-1)
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print(classification_report(y_test, y_pred))
We apply the preprocess function on the text array that contains 1,6 millions tweet and store it in an empty list.
After that we call the train_test_split function to take 95% of data to train and 5% of data to test the score, We tokenize the words with TfidfVectorizer, We will tokenize tweets in a chunks of two so if we take the sentence(I love you) it tokenize (I love) and (love you).
After tokenizing the tweet we will apply Logistic Regression on it and evaluate the test score, We got 83% score as follows:
precision recall f1-score support
0 0.83 0.82 0.83 39989
1 0.82 0.84 0.83 40011
accuracy 0.83 80000
macro avg 0.83 0.83 0.83 80000
weighted avg 0.83 0.83 0.83 80000
Importing and cleaning the streamed data
#Reading the streamed tweets csv file storing it into a DataFrame
tweets_df = pd.read_csv('E:\DigiSay Project\Tweets.csv', names = ['text'],encoding='latin-1')
#Storing the tweets into an array
tweetText = tweets_df['text'].values
len(tweetText)
1714
We import our data after cleaning it from Excel and store the tweet replies in an array, We can tell that we have 1714 rows which means we have 1714 tweet.
#Initiating two lists for cleaning purposes
tweetsNOTclean = []
tweetsNOTclean2 = []
#Cleaning the data with our Preprocessing function
Data_preprocess(tweetText,tweetsNOTclean)
#Choosing only tweets contains strings with our chosed topics and storing it into the second list
for text in tweetsNOTclean:
if ('squid game' in text) or ('sae byeok' in text):
tweetsNOTclean2.append(text)
print(len(tweetsNOTclean2))
945
We take each tweet to check if they contain our key words or not, In this case (Squid Game) and (Sae Byeok), If the tweet doesn't contain any of our keywords so we don't need it, So we drop it, After this cleanse we found that our tweet replies became 945.
#Checking for duplicates
df_preprocessed = pd.DataFrame(tweetsNOTclean2)
df_preprocessed.duplicated().sum()
170
We convert our list to a DataFrame to count duplicates, there's 170 duplicates out of 945 text.
#Dropping duplicates and storing the final result in an 1d list
preprocessedTweets = df_preprocessed.drop_duplicates().values.flatten().tolist()
print(len(preprocessedTweets ), type(preprocessedTweets))
preprocessedTweets
Now we drop the duplicates then convert our DataFrame to a 1 dimension list so we can apply transformation in it.
775 <class 'list'>
['USER liked it more than squid game ',
'USER is squid game must see inquiring mind want know ',
'USER USER just finished the squid game and it actually decent ',
'USER squid game mai ha betrayal to pakistani while playing the game ',
'USER squid game didn really stick the landing ',
'but now squid game put me in drama mood ll have to see if can find something little more uplifting after finish ',
'USER yeah don think in squid game case it wa bad enough to ruin the whole show but it really did disappoint compared to the entire middle of the show ',
'this awesome uncle put together some lego while watching squid game ',
'USER USER USER USER no the way one dy is very important dying honourably is creed everyone sld follow didn he die eventually can save ur life bcoz want to do whatever it take to save it our living right now is squid game our choice decision action all part of game ',
'USER and magic memory an opponent never have existed even if it not reciprocal equally an opponent will never exist mm think still don know lot and sometimes fear paralyze me didn know it amp don know what to dott think would be fine to see squid game ',
We now have 775 tweet replies to predict their sentiment, Look how shiny it is đ€©.
Predicting Sentiment
#Fitting the train data
vectoriser.fit(processedBigData)
#Transforming each Train tweets and our streamed tweets
X_train = vectoriser.transform(processedBigData)
X_test = vectoriser.transform(preprocessedTweets)
#Creating the Logistic Regression model
lr = LogisticRegression(C = 2, max_iter = 1000, n_jobs=-1)
lr.fit(X_train, sentiment)
#Predicting each sentiment for each tweet
y_pred = lr.predict(X_test)
#Storing the results into a DataFrame
finalResult = pd.DataFrame(list(zip(preprocessedTweets, y_pred)) , columns=['Tweets','Sentiment'])
finalResult['Sentiment'].replace([1,0],['Positive','Negative'],inplace=True)
After all the cleaning, We can now vectorize,fit, and transform our data, We will use Logistic Regression here also then we get the sentiment predictions and store it in a DataFrame with the text, After that we replace (1,0) with (Positive,Negative) respectively.
finalResult
Tweets Sentiment
0 USER liked it more than squid game Positive
1 USER is squid game must see inquiring mind wan... Positive
2 USER USER just finished the squid game and it ... Positive
3 USER squid game mai ha betrayal to pakistani w... Negative
4 USER squid game didn really stick the landing Negative
... ... ...
770 USER still mad and for you info ratmy squid ga... Positive
771 maybe it wa not the squid game saw that she wa... Negative
772 sang woo and sae byeok plan together Positive
773 sae byeok and sang woo are similar character i... Positive
774 USER USER USER USER to me squid game basically... Positive
This is our final DataFrame with each tweet and it's sentiment.
#Displaying the distribution of the positive and negative sentiments
ax = plt.axes()
ax.set_facecolor("#F5F5DC")
sns.countplot(x='Sentiment',data=finalResult, palette = "Set1",\
edgecolor='black', linewidth=0.3,alpha=0.7)
plt.title('Sentiment Distribution',fontsize=14)
plt.xlabel('Sentiment',fontsize=10)
plt.show()
61