Kafka

Real-Time Application (Twitter)


 

Earlier, we looked at how Kafka integrates with Storm and Spark. Now let us use Kafka to build a real-time application that reads tweets through the Twitter Streaming API and publishes their hashtags to a Kafka topic.

 

First, create a Twitter developer account to obtain the credentials for your Twitter app. Next, you need a Twitter Kafka producer that does the following:

 

  • Read the Twitter feed and process it.
  • Extract the hashtags.
  • Send hashtags to Kafka. 
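To make the "extract the hashtags" step concrete, here is a minimal sketch in plain Java. It is a simplified stand-in, not how the producer below works: twitter4j already returns parsed hashtags through getHashtagEntities(), while this sketch assumes a hashtag is just '#' followed by letters, digits or underscores and finds it with a regular expression. Twitter's real hashtag rules are more involved. The class name HashtagExtractor is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for twitter4j's HashtagEntity parsing:
// pulls "#word" tokens out of raw tweet text.
final class HashtagExtractor {
    // Assumption: a hashtag is '#' followed by one or more [A-Za-z0-9_] characters.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1)); // group 1 is the tag without the leading '#'
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(extract("Weekend plans: #games and more #gaming")); // prints [games, gaming]
    }
}
```

Each extracted string would then become the value of one Kafka record, which is what the producer does with hashtage.getText().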

 

Twitter Streaming API

 

The Twitter Streaming API can be accessed from any programming language. For Java, the open-source library ‘twitter4j’ provides a listener-based framework for receiving tweets from the Twitter Streaming API. To use it, you need the Twitter developer account created above.

Once the account is set up, download the ‘twitter4j’ JAR files and obtain the following OAuth credentials for your app:

 

  • Consumer Key
  • Consumer Secret
  • Access Token
  • Access Token Secret
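The producer below takes these four values as command-line arguments. If you would rather keep them out of the command line, one option is to read them from environment variables and check that all four are present before starting the stream. The following is a minimal sketch under that assumption; the variable names (TWITTER_CONSUMER_KEY and so on) and the class name are hypothetical and are not defined by twitter4j.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: verifies that all four OAuth values are available
// before the Twitter stream is started.
final class TwitterCredentials {
    // Hypothetical environment variable names, one per OAuth value.
    static final String[] REQUIRED = {
        "TWITTER_CONSUMER_KEY", "TWITTER_CONSUMER_SECRET",
        "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_TOKEN_SECRET"
    };

    // Returns the names of required entries that are missing or blank in the given map.
    static List<String> missing(Map<String, String> env) {
        List<String> absent = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (absent.isEmpty()) {
            System.out.println("All Twitter credentials found.");
        } else {
            System.out.println("Missing credentials: " + absent);
        }
    }
}
```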



 

Here is the complete code for the Twitter Kafka producer:

 

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      // Bounded buffer between the twitter4j listener and the Kafka sending loop
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);

      // 4 OAuth values + topic name + at least one search keyword
      if (args.length < 6) {
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key> "
            + "<twitter-consumer-secret> <twitter-access-token> "
            + "<twitter-access-token-secret> "
            + "<topic-name> <twitter-search-keywords>");
         return;
      }

      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];
      String topicName = args[4];
      // Every argument after the topic name is a search keyword
      String[] keyWords = Arrays.copyOfRange(args, 5, args.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      StatusListener listener = new StatusListener() {

         @Override
         public void onStatus(Status status) {
            // Hand the tweet to the main thread; offer() drops it if the queue is full
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName()
            //    + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreenName());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            // System.out.println("Got a status deletion notice id:"
            //    + statusDeletionNotice.getStatusId());
         }

         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" +
            //    numberOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId +
            //    "upToStatusId:" + upToStatusId);
         }

         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }

         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);

      // Stream only the tweets that match the given keywords
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      // Give the stream a few seconds to start delivering tweets
      Thread.sleep(5000);

      // Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;   // number of polls that found the queue empty (never reset)
      int j = 0;   // running counter used as the record key

      // Keep sending until 10 polls in total have come back empty
      while (i < 10) {
         Status ret = queue.poll();

         if (ret == null) {
            Thread.sleep(100);
            i++;
         } else {
            // Send one record per hashtag: key = counter, value = hashtag text
            for (HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  topicName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}
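The producer passes tweets from the thread on which twitter4j calls onStatus() to the main thread that calls producer.send(), using a bounded LinkedBlockingQueue. Two details are easy to miss: offer() returns false and silently drops the tweet when the 1000-slot queue is full, and the loop stops after 10 empty polls in total, because the counter i is never reset. The stand-alone sketch below models that hand-off with plain strings in place of twitter4j Status objects and a list in place of the Kafka producer; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the queue hand-off in KafkaTwitterProducer.
// Strings stand in for Status objects; the returned list stands in for producer.send().
final class QueueHandOff {

    // Drains the queue until maxEmptyPolls polls (counted in total, like i) come back empty.
    static List<String> drain(LinkedBlockingQueue<String> queue, int maxEmptyPolls, long sleepMs) {
        List<String> sent = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            String item = queue.poll(); // non-blocking: null when nothing is queued
            if (item != null) {
                sent.add(item);
                continue;
            }
            emptyPolls++;
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop draining
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // tiny capacity to show overflow
        System.out.println(queue.offer("tweet-1")); // true
        System.out.println(queue.offer("tweet-2")); // true
        System.out.println(queue.offer("tweet-3")); // false: queue full, item dropped
        System.out.println(drain(queue, 3, 10));    // [tweet-1, tweet-2]
    }
}
```

If dropping tweets is not acceptable, the listener could call the blocking put() instead of offer(), at the cost of blocking the thread that delivers tweets whenever the sending loop falls behind.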

After creating the Twitter Kafka producer, you can use it to send the hashtags of live tweets to Kafka. To run it from your IDE, right-click KafkaTwitterProducer.java > Run Configurations > Arguments.

Then pass the Twitter OAuth tokens, the Kafka topic name and one or more search keywords as arguments. The equivalent command line is shown below:

 

java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. \
   KafkaTwitterProducer <twitter-consumer-key> \
   <twitter-consumer-secret> \
   <twitter-access-token> \
   <twitter-access-token-secret> \
   my-first-topic games
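In the command above, the first four arguments are the OAuth values, my-first-topic is the Kafka topic and games is the only search keyword. The sketch below isolates that argument slicing so it can be checked on its own; it mirrors the Arrays.copyOfRange call in the producer, and the class name ProducerArgs and the placeholder values "ck", "cs", "at", "ats" are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper mirroring how KafkaTwitterProducer slices its arguments:
// four OAuth values, then the topic name, then one or more search keywords.
final class ProducerArgs {
    final String[] oauth;     // consumer key, consumer secret, access token, access token secret
    final String topic;
    final String[] keywords;

    private ProducerArgs(String[] oauth, String topic, String[] keywords) {
        this.oauth = oauth;
        this.topic = topic;
        this.keywords = keywords;
    }

    static ProducerArgs parse(String[] args) {
        if (args.length < 6) {
            throw new IllegalArgumentException(
                "expected 4 OAuth values, a topic name and at least one keyword");
        }
        return new ProducerArgs(
            Arrays.copyOfRange(args, 0, 4),
            args[4],
            Arrays.copyOfRange(args, 5, args.length));
    }

    public static void main(String[] args) {
        String[] example = {"ck", "cs", "at", "ats", "my-first-topic", "games"};
        ProducerArgs parsed = parse(example);
        System.out.println(parsed.topic + " " + Arrays.toString(parsed.keywords)); // my-first-topic [games]
    }
}
```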

 

The output depends on the tweets streaming at that moment and on the keywords you pass. A sample is shown below:

 

. . .
games: 1
gaming: 2
gameinsights: 1
. . .
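The sample lists a count per hashtag, whereas the producer itself prints one "Hashtag: ..." line per hashtag it sends. A tally in the "tag: count" form therefore has to be computed from the hashtags, for example by a consumer reading the topic. The sketch below shows one way to compute such a tally in plain Java; the class name and the input list are made up for illustration and do not come from a real run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tallies hashtags and formats them as "tag: count" lines.
final class HashtagTally {

    static Map<String, Integer> count(List<String> hashtags) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (String tag : hashtags) {
            counts.merge(tag, 1, Integer::sum); // add 1, starting from 1 for a new tag
        }
        return counts;
    }

    static List<String> format(Map<String, Integer> counts) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            lines.add(e.getKey() + ": " + e.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        // Made-up input standing in for hashtag values read from the Kafka topic
        List<String> received = List.of("games", "gaming", "gameinsights", "gaming");
        format(count(received)).forEach(System.out::println);
    }
}
```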
