Implement Application Logic

Line Event Handler

LineEventHandler.java contains the method handleLineEvent that is responsible for handling events from line_topic as part of the subscription. Here, you will write code to parse the line and forward information to the Word Stats table and Doc Stats Table.

Step 0: Import necessary dependencies

We'll need to add the following to our import statements in LineEventHandler.java:

LineEventHandler.java
import com.grainite.api.Value;
import com.grainite.api.Key;
import com.grainite.api.context.Action.TopicEvent;
import com.grainite.api.context.GrainContext.GrainOp;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

Step 1: Defining our data structures

First, you will define two structures to represent:

  1. the event coming in from the topic.

  2. the payload to send to Doc Stats Table.

Step 1.1 – Line Events

Implement the first structure to represent an event from line_topic in LineEventHandler.java:

LineEventHandler.java
public class LineEventHandler {
    ...
  /**
   * Event payload in `line_topic`.
   */  
  public static class LineEvent implements Serializable {
    private String docName;
    private String line;

    public LineEvent() {}

    public LineEvent(String docName, String line) {
      this.docName = docName;
      this.line = line;
    }

    public String getLine() {
      return line;
    }

    public String getDocName() {
      return docName;
    }
  }
}

Step 1.2 - Document Stats

For the second structure representing the payload to the Doc Stats table, you will want to create a class in a separate file, as you'll be using it in both the LineEventHandler and DocStatsHandler classes. Change to the same directory as the Java source files generated by GX (cd /src/main/java/org/sample/wordcount/) and create a new file in which to define our DocumentStats structure (touch DocumentStats.java) with the following code:

DocumentStats.java
package org.sample.wordcount;

import java.io.Serializable;

/**
 * Structure to store in `doc_stats_table`.
 */
public class DocumentStats implements Serializable {
  public long numPeriods;
  public long numWords;

  public DocumentStats() {}

  public DocumentStats(long numPeriods, long numWords) {
    this.numPeriods = numPeriods;
    this.numWords = numWords;
  }
} 

Step 2: Handling Line Events

Next, you will go back to LineEventHandler.java write code to parse words from each input line, track the word count, and send the words to the Word Stats Table.

LineEventHandler.java
public ActionResult handleLineEvent(Action action, GrainContext context) {
  // Get payload from the topic event action.
  LineEvent event = ((TopicEvent) action).getPayload().asType(LineEvent.class);

  final AtomicLong wordCount = new AtomicLong(0);
  Stream.of(event.getLine().split("[. ]+")).forEach(w -> {
    // For each word, send to word stats table, and increment document count.
    if (w.trim().length() == 0) return;

    w = w.toLowerCase();
    wordCount.incrementAndGet();
    // Send the parsed word to a grain in the Word Stats Table.
    // The key of the grain is the first letter of the word.
    GrainOp.Invoke invoke = new GrainOp.Invoke(Constants.HANDLE_WORD_EVENT_ACTION, Value.of(w));
    context.sendToGrain(Constants.WORD_STATS_TABLE, Key.of(w.substring(0, 1)), invoke, null);
  });

  return ActionResult.success(action);
}

Action contains data about why this handler was invoked. There are many types of Actions, and all of them are subclasses of Action . In this case, handleLineEvent would be triggered because it received an event from line_topic and as a result, action will be an instance of TopicEvent .

GrainContext refers to the context of the grain for which this action handler has been executed. For example, GrainContext allows us to access and store data of the grain, access the logger, invoke other grains, send events to topics, and many more things.

GrainOp.Invoke allows us to invoke and send data to other grains (even across apps). You are using the following code to invoke an action (Constants.HANDLE_WORD_EVENTS_ACTION) with a payload (Value.of(w)).

GrainOp.Invoke invoke = new GrainOp.Invoke(Constants.HANDLE_WORD_EVENTS_ACTION, Value.of(w));

Next, you are using the following code to send a message to a grain, keyed by the first letter of the word, in the Word Stats Table.

context.sendToGrain(Constants.WORDS_TABLE, Key.of(w.substring(0, 1)), invoke, null);

In addition to sending a message to word_stats_table , you will also send a message to the doc_stats_table with the number of sentences and words in the document. For this application, you will keep it simple and consider a period ('.'), the end of a sentence.

Edit LineEventHandler to count the number of sentences and words in the input and send a message to the Doc Stats table. You can use the following helper function to count the number of periods in the line:

LineEventHandler.java
private int countPeriods(String line) {
  int count = 0;
  int nextIndex = -1;

  while ((nextIndex = line.indexOf('.', nextIndex + 1)) != -1) {
    count++;
  }
  return count;
}

And you can use it in handleLineEvent by adding the following:

LineEventHandler.java
public ActionResult handleLineEvent(Action action, GrainContext context) {
  // Get payload from the topic event action.
  LineEvent event = ((TopicEvent) action).getPayload().asType(LineEvent.class);

  final AtomicLong wordCount = new AtomicLong(0);
  Stream.of(event.getLine().split("[. ]+")).forEach(w -> {
    // For each word, send to word stats table, and increment document count.
    if (w.trim().length() == 0) return;

    w = w.toLowerCase();
    wordCount.incrementAndGet();

    // Send the parsed word to a grain in the Word Stats Table.
    // The key of the grain is the first letter of the word.
    GrainOp.Invoke invoke = new GrainOp.Invoke(Constants.HANDLE_WORD_EVENT_ACTION, Value.of(w));
    context.sendToGrain(Constants.WORD_STATS_TABLE, Key.of(w.substring(0, 1)), invoke, null);
  });

  // Count the number of sentences in the line and get the document word count
  long numPeriods = countPeriods(event.getLine());
  GrainOp.Invoke invokeDoc = new GrainOp.Invoke(
    Constants.HANDLE_DOC_STATS_EVENT_ACTION, Value.ofObject(new DocumentStats(numPeriods, wordCount.get())));

  // Now update document count.
  context.sendToGrain(Constants.DOC_STATS_TABLE, Key.of(event.getDocName()), invokeDoc, null);

  return ActionResult.success(action);
}

Word Stats Handler

WordStatsHandler.java contains the method handleWordEvent that is responsible for handling messages sent from line_table.

First , we'll need to add the following import statements to WordStatsHandler.java:

WordStatsHandler.java
import com.grainite.api.Value;
import com.grainite.api.context.Action.GrainRequest;

In WordStatsHandler.java, implement the logic to keep track of each word's count and store it in map 0 of the grain.

WordStatsHandler.java
// Increments the count of this word. Note that this count is across all documents.
public ActionResult handleWordEvent(Action action, GrainContext context) {
  Key word = Key.from(((GrainRequest) action).getPayload());

  // Update and store the word count in map 0.
  context.mapPut(0, word, Value.of(context.mapGet(0, word).asLong(0) + 1));
  return ActionResult.success(action);
}

The following line from the code above uses mapGet() to fetch the current count for the word from map 0, increments it, and puts it back into the map with mapPut():

context.mapPut(0, word, Value.of(context.mapGet(0, word).asLong(0) + 1));

Doc Stats Handler

DocStatsHandler.java will store the number of sentences and words in a document. As a result, all grains in doc_stats_table will be keyed by document name. Now you'll implement the logic in DocStatsHandler.java that will store these statistics in the grain's value.

As before, we'll first need to add the following import statements to DocStatsHandler.java:

DocStatsHandler.java
import com.grainite.api.Value;
import com.grainite.api.context.Action.GrainRequest;

Now, complete the application logic for storing the document statistics:

DocStatsHandler.java
public class DocStatsHandler {
  public ActionResult handleDocStatsEvent(Action action, GrainContext context) {
    DocumentStats prevStats = context.getValue().asType(DocumentStats.class);
    DocumentStats evt = ((GrainRequest) action).getPayload().asType(DocumentStats.class);

    prevStats.numPeriods += evt.numPeriods;
    prevStats.numWords += evt.numWords;

    context.setValue(Value.ofObject(prevStats));
    return ActionResult.success(action);
  }
}

In handleDocStatsEvent you are storing an instance of DocumentStats into the grain's value. To update the document statistics, you are modifying the corresponding instance of DocumentStats , as opposed to storing them as separate values in a map.

Last updated