Prepare analytics based on streaming data from Twitter using Python

Hi Guys!

Today, we will build a small analytics storyline based on streaming data from Twitter, fetched through a developer account.

I want to make clear that this is solely for educational purposes & no data analysis has been provided to any agency or third-party app. So, when you plan to use this API, make sure that you strictly follow these rules.

To create a streaming channel from Twitter, you need a developer account.

As I’m a huge soccer fan, I’ll use a soccer-related account on Twitter for this. In this case, we’ll be checking BA Analytics.

6. Origin_Site

Here are the steps to create a developer account:

Step 1:

Go to the following link and submit a request to create the account. You need to provide proper justification as to why you need it. I’m not going through those forms, as they are self-explanatory.

Once your developer account is activated, click the following link as shown below:

1. TwitterSetup

Once you click that, you will be taken to the following page:

2. TwitterSetup - Continue

If you don’t have any app, the first page will look something like the above page.

Step 2:

3. TwitterSetup - Continue

Now, you need to fill in the following details. For security reasons, I’ll be hiding sensitive data here.

Step 3:

4. TwitterSetUp - Continue

After creating the app, go to the next tab, i.e. Keys and tokens. Initially, the screen will show only the Consumer API keys.

Step 4:

To generate the access token, click the Create button shown in the above screenshot. The page will then look like this:

5. TwitterSetUp - Continue

Our program will be using all these pieces of information.

So, now we’re ready for our Python program.

To access the Twitter API through Python, you need to install the following package:

pip install python-twitter

Let’s see the directory structure –

7. Directory

Let’s check only the relevant scripts here. We’re not going to discuss clsL.py, as we’ve already covered it. Please refer to the old post.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file that contains the keys for the  ####
#### Twitter API & the paths used by the  ####
#### application.                         ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'ACCESS_TOKEN': '99999999-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        'ACCESS_SECRET': 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY',
        'CONSUMER_KEY': "aaaaaaaaaaaaaaaaaaaaaaa",
        'CONSUMER_SECRET': 'HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH',
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

For security reasons, I’ve replaced the original keys with dummy keys. You have to fill in your own keys.
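If you would rather not keep keys in the script at all, one option is to read them from environment variables. The sketch below is only an illustration & not part of the original scripts: the helper name load_twitter_keys and the TWITTER_* variable names are assumptions made for this example.

```python
import os

# Config keys used by clsConfig, mapped to hypothetical environment variable names.
ENV_NAMES = {
    'ACCESS_TOKEN': 'TWITTER_ACCESS_TOKEN',
    'ACCESS_SECRET': 'TWITTER_ACCESS_SECRET',
    'CONSUMER_KEY': 'TWITTER_CONSUMER_KEY',
    'CONSUMER_SECRET': 'TWITTER_CONSUMER_SECRET',
}

def load_twitter_keys(env=None):
    """Return the four Twitter credentials read from the environment."""
    env = os.environ if env is None else env

    # Fail early with a readable message if any variable is missing or empty
    missing = [name for name in ENV_NAMES.values() if not env.get(name)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))

    return {cfg_key: env[name] for cfg_key, name in ENV_NAMES.items()}

# Example with a plain dictionary standing in for os.environ
demo_env = {name: 'dummy-' + name.lower() for name in ENV_NAMES.values()}
keys = load_twitter_keys(demo_env)
print(sorted(keys))
```

The returned dictionary could then be merged into the config dictionary, so that the rest of the application keeps reading the same four keys.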

2. clsTwitter.py (This script fetches data from the Twitter API, processes it & sends it to the calling method.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 12-Oct-2019               ####
#### Modified On 12-Oct-2019               ####
####                                       ####
#### Objective: Main class fetching sample ####
#### data from Twitter API.                ####
###############################################

import twitter
from clsConfig import clsConfig as cf
import json
import re
import string
import logging

class clsTwitter:
    def __init__(self):
        self.access_token = cf.config['ACCESS_TOKEN']
        self.access_secret = cf.config['ACCESS_SECRET']
        self.consumer_key = cf.config['CONSUMER_KEY']
        self.consumer_secret = cf.config['CONSUMER_SECRET']

    def find_element(self, srcJson, key):
        """Pull all values of specified key from nested JSON."""
        arr = []

        def fetch(srcJson, arr, key):
            """Recursively search for values of key in JSON tree."""
            if isinstance(srcJson, dict):
                for k, v in srcJson.items():
                    if isinstance(v, (dict, list)):
                        fetch(v, arr, key)
                    elif k == key:
                        arr.append(v)
            elif isinstance(srcJson, list):
                for item in srcJson:
                    fetch(item, arr, key)
            return arr

        finJson = fetch(srcJson, arr, key)
        return finJson

    def searchQry(self, rawQry):
        try:
            fin_dict = {}
            finJson = ''
            res = ''
            cnt = 0

            # Parameters to invoke Twitter API
            ACCESS_TOKEN = self.access_token
            ACCESS_SECRET = self.access_secret
            CONSUMER_KEY = self.consumer_key
            CONSUMER_SECRET = self.consumer_secret

            tmpR20 = 'Raw Query: ' + str(rawQry)
            logging.info(tmpR20)

            finJson = '['

            if rawQry == '':
                print('No data to proceed!')
                logging.info('No data to proceed!')
            else:
                t = twitter.Api(
                                  consumer_key = CONSUMER_KEY,
                                  consumer_secret = CONSUMER_SECRET,
                                  access_token_key = ACCESS_TOKEN,
                                  access_token_secret = ACCESS_SECRET
                               )

                response = t.GetSearch(raw_query=rawQry)
                print('Total Records fetched:', str(len(response)))

                for i in response:

                    # Converting them to json
                    data = str(i)
                    res_json = json.loads(data)

                    # Calling individual key
                    id = res_json['id']
                    tmpR19 = 'Id: ' + str(id)
                    logging.info(tmpR19)

                    try:
                        f_count = res_json['quoted_status']['user']['followers_count']
                    except (KeyError, TypeError):
                        f_count = 0
                    tmpR21 = 'Followers Count: ' + str(f_count)
                    logging.info(tmpR21)

                    try:
                        r_count = res_json['quoted_status']['retweet_count']
                    except (KeyError, TypeError):
                        r_count = 0
                    tmpR22 = 'Retweet Count: ' + str(r_count)
                    logging.info(tmpR22)

                    text = self.find_element(res_json, 'text')

                    for j in text:
                        strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
                        pat = re.compile(r'[\t\n]')
                        strG = pat.sub("", strF)
                        res = "".join(strG)

                    # Forming return dictionary
                    #fin_dict.update({id:'id', f_count: 'followerCount', r_count: 'reTweetCount', res: 'msgPost'})
                    # json.dumps() quotes & escapes the text, so quotes inside a tweet cannot break the JSON
                    if cnt == 0:
                        finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":' + json.dumps(res) + '}'
                    else:
                        finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":' + json.dumps(res) + '}'

                    cnt += 1

            finJson = finJson + ']'

            jdata = json.dumps(finJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails' : x}

            return ResJson

The key lines from this snippet are as follows –

def find_element(self, srcJson, key):
    """Pull all values of specified key from nested JSON."""
    arr = []

    def fetch(srcJson, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(srcJson, dict):
            for k, v in srcJson.items():
                if isinstance(v, (dict, list)):
                    fetch(v, arr, key)
                elif k == key:
                    arr.append(v)
        elif isinstance(srcJson, list):
            for item in srcJson:
                fetch(item, arr, key)
        return arr

    finJson = fetch(srcJson, arr, key)
    return finJson

This function searches the supplied JSON for a specific key & returns every matching value as a list, however deeply it is nested. This is particularly useful when your elements don’t have a fixed position.
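To see the behaviour in isolation, here is a standalone sketch of the same recursive search, written as a plain function instead of a class method. The sample dictionary is made up for illustration & is not real Twitter data.

```python
def find_element(src_json, key):
    """Collect every scalar value stored under `key` anywhere in nested JSON."""
    arr = []

    def fetch(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if isinstance(v, (dict, list)):
                    # Containers are searched further, never collected themselves
                    fetch(v)
                elif k == key:
                    arr.append(v)
        elif isinstance(node, list):
            for item in node:
                fetch(item)

    fetch(src_json)
    return arr

# Made-up, tweet-like structure with 'text' at three different depths
sample = {
    'id': 1,
    'text': 'outer tweet',
    'quoted_status': {'id': 2, 'text': 'quoted tweet', 'user': {'name': 'someone'}},
    'entities': {'urls': [{'text': 'link label'}]},
}

texts = find_element(sample, 'text')
print(texts)  # ['outer tweet', 'quoted tweet', 'link label']
```

Note that a key whose value is itself a dictionary or a list is not returned; only plain values are collected.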

t = twitter.Api(
                  consumer_key = CONSUMER_KEY,
                  consumer_secret = CONSUMER_SECRET,
                  access_token_key = ACCESS_TOKEN,
                  access_token_secret = ACCESS_SECRET
               )

response = t.GetSearch(raw_query=rawQry)

In this case, the Python application authenticates with the four keys & calls GetSearch with the raw query. The matching tweets come back as a list, & each entry is then converted to JSON.
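The raw_query value is simply a URL-encoded query string. Here is a minimal sketch of building it with the standard library instead of typing the encoded form by hand. The helper name build_raw_query is hypothetical; the account name & the src=typd parameter are the ones used in the calling script of this post.

```python
from urllib.parse import urlencode

def build_raw_query(account, src='typd'):
    """Build a raw search query for tweets posted by one account."""
    # urlencode() percent-encodes the colon in 'from:<account>'
    return urlencode({'q': 'from:' + account, 'src': src})

rawQry = build_raw_query('Blades_analytic')
print(rawQry)  # q=from%3ABlades_analytic&src=typd
```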

id = res_json['id']
try:
    f_count = res_json['quoted_status']['user']['followers_count']
except (KeyError, TypeError):
    f_count = 0
try:
    r_count = res_json['quoted_status']['retweet_count']
except (KeyError, TypeError):
    r_count = 0

Fetching specific fixed-position elements from the API response. If a tweet has no quoted status, the counts default to 0.
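The same defaulting can also be written without try/except blocks. The helper below, get_nested, is a hypothetical name introduced only for this sketch, & the two sample tweets are made up.

```python
def get_nested(src, path, default=0):
    """Walk a list of keys through nested dicts; return default if any step is missing."""
    node = src
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

# Made-up samples: one tweet quoting another, one without a quoted status
tweet_with_quote = {
    'id': 1,
    'quoted_status': {'retweet_count': 7, 'user': {'followers_count': 120}},
}
tweet_without_quote = {'id': 2}

f_count = get_nested(tweet_with_quote, ['quoted_status', 'user', 'followers_count'])
r_count = get_nested(tweet_without_quote, ['quoted_status', 'retweet_count'])
print(f_count, r_count)  # 120 0
```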

text = self.find_element(res_json, 'text')

Fetching the dynamic-position element using our custom function.

for j in text:
    strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
    pat = re.compile(r'[\t\n]')
    strG = pat.sub("", strF)
    res = "".join(strG)

Removing non-printable characters, tabs & newlines from the extracted text field in order to get clean data.
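Here is a small standalone sketch of the same two-step clean-up, so that you can try it on your own strings. The function name clean_text & the sample string are assumptions made for this example; the two patterns are the ones used above, compiled once at module level.

```python
import re
import string

# Anything outside string.printable (emoji, accented letters, control bytes)
NON_PRINTABLE = re.compile(f'[^{re.escape(string.printable)}]')
# Tabs & newlines, which are printable but unwanted in a single-line field
TABS_NEWLINES = re.compile(r'[\t\n]')

def clean_text(raw):
    """Drop non-printable characters first, then tabs & newlines."""
    printable_only = NON_PRINTABLE.sub('', str(raw))
    return TABS_NEWLINES.sub('', printable_only)

sample = 'Goal!\t\u26bd Great\nmatch'
cleaned = clean_text(sample)
print(cleaned)  # Goal! Greatmatch
```

As the output shows, tabs & newlines are removed rather than replaced by a space, so words separated only by a newline get joined together.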

# json.dumps() quotes & escapes the text, so quotes inside a tweet cannot break the JSON
if cnt == 0:
    finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":' + json.dumps(res) + '}'
else:
    finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":' + json.dumps(res) + '}'

Finally, generating a JSON string dynamically.
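As an alternative to concatenating strings, the same payload can be produced by collecting dictionaries in a list & serializing the list once. This is only a sketch of that approach & not the code used in clsTwitter.py: the function name build_payload & the sample rows are made up for the example.

```python
import json

def build_payload(rows):
    """Turn (id, followers, retweets, text) tuples into a JSON array string."""
    records = [
        {'id': tweet_id, 'followerCount': f_count, 'reTweetCount': r_count, 'msgPost': msg}
        for tweet_id, f_count, r_count, msg in rows
    ]
    # json.dumps() takes care of commas, quoting & escaping
    return json.dumps(records)

# Made-up rows; the first message contains double quotes on purpose
rows = [
    (101, 250, 12, 'He said "what a goal"'),
    (102, 0, 0, 'Plain text'),
]

payload = build_payload(rows)
parsed = json.loads(payload)
print(parsed[0]['msgPost'])
```

The key order of each record is preserved, so the calling program can still derive its column order from the first record.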

jdata = json.dumps(finJson)
ResJson = json.loads(jdata)

And, returning the JSON to our calling program.

3. callTwitterAPI.py (This is the main script that will invoke the Twitter API & then project the analytic report based on the available Twitter data.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
#### Modified On 12-Oct-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsTwitter as ct

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = 'q=from%3ABlades_analytic&src=typd'

        x1 = ct.clsTwitter()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        df_ret = p.read_json(ret_2, orient='records')

        # Resetting the column orders as per JSON
        df_ret = df_ret[list(res[0].keys())]

        l.logr('1.Twitter_' + var + '.csv', debug_ind, df_ret, 'log')

        print('Realtime Twitter Data:: ')
        logging.info('Realtime Twitter Data:: ')
        print(df_ret)
        print()

        # Checking execution status
        ret_val_2 = df_ret.shape[0]

        if ret_val_2 == 0:
            print("Twitter hasn't returned any rows. Please check your queries!")
            logging.info("Twitter hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfully fetched rows!")
            logging.info("Successfully fetched rows!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)

        # Performing Basic Aggregate
        # 1. Find the user who has maximum Followers
        df_ret['MaxFollower'] = getMaximumFollower(df_ret)

        # 2. Find the user who has maximum Re-Tweets
        df_ret['MaxTweet'] = getMaximumRetweet(df_ret)

        # Getting Status
        df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

        # Dropping Columns (not in place, as df_MaxFollower is a slice of df_ret)
        df_MaxFollower = df_MaxFollower.drop(['reTweetCount', 'MaxTweet'], axis=1)

        l.logr('2.Twitter_Maximum_Follower_' + var + '.csv', debug_ind, df_MaxFollower, 'log')

        print('Maximum Follower:: ')
        print(df_MaxFollower)
        print("*" * 157)
        logging.info(tmpR0)

        df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]
        print()

        # Dropping Columns (not in place, as df_MaxTwitter is a slice of df_ret)
        df_MaxTwitter = df_MaxTwitter.drop(['followerCount', 'MaxFollower'], axis=1)

        l.logr('3.Twitter_Maximum_Retweet_' + var + '.csv', debug_ind, df_MaxTwitter, 'log')

        print('Maximum Retweet:: ')
        print(df_MaxTwitter)
        print("*" * 157)
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

And, here are the key lines –

x1 = ct.clsTwitter()
ret_2 = x1.searchQry(rawQry)

Our application instantiates the newly developed class & calls searchQry with the raw query.

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.read_json(ret_2, orient='records')

# Resetting the column orders as per JSON
df_ret = df_ret[list(res[0].keys())]

Converting the JSON to a pandas dataframe for our analytic data points.

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

These two functions, declared above in the calling script, return the maximum Followers & Re-Tweet counts from our returned dataset.

# Getting Status
df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

And, this is how our application fetches the rows with the maximum follower count. The rows with the maximum retweet count are fetched the same way:

df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]

And, you can customize your output by dropping unwanted columns in the specific dataset.
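To make the logic behind these filters explicit, here is a pandas-free sketch of the same idea: find the maximum of a column, keep only the rows that reach it & leave out the columns you don’t want. The function name rows_with_max & the sample records are made up for illustration.

```python
def rows_with_max(records, column, drop=()):
    """Return the records whose `column` equals the maximum, minus the dropped keys."""
    if not records:
        return []

    top = max(r[column] for r in records)

    return [
        {k: v for k, v in r.items() if k not in drop}
        for r in records
        if r[column] == top
    ]

# Made-up records in the same shape as the JSON returned by searchQry
records = [
    {'id': 1, 'followerCount': 50, 'reTweetCount': 3, 'msgPost': 'a'},
    {'id': 2, 'followerCount': 900, 'reTweetCount': 1, 'msgPost': 'b'},
    {'id': 3, 'followerCount': 900, 'reTweetCount': 8, 'msgPost': 'c'},
]

max_follower_rows = rows_with_max(records, 'followerCount', drop=('reTweetCount',))
max_retweet_rows = rows_with_max(records, 'reTweetCount', drop=('followerCount',))

print([r['id'] for r in max_follower_rows])  # [2, 3]
print([r['id'] for r in max_retweet_rows])   # [3]
```

Ties are kept, which matches the dataframe filter above: every row that shares the maximum value is returned.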

And, here is the output on Windows:

8. WindowsRun

And, here is the Windows log directory:

WindowsRunLog

So, we’ve achieved our target data point.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.