Sample Code Snippets

Getting values for a range of keys in a sorted map

This can be done from an external client that has a handle to a grain, or from within an action handler using the grain context.

// From external client with handle to a grain.
Iterator<KeyValue> iter =
        grain.mapQuery(MapQuery.newBuilder()
                          .setMapId(0)
                          .setRange(Key.minKey(), true, Key.maxKey(), true)
                          .build());

while (iter.hasNext()) {
    System.out.println(iter.next().getKey().asString());
}

// From action handler with grain context.
Iterator<KeyValue> iter = context.mapQuery(MapQuery.newBuilder()
                          .setMapId(0)
                          .setRange(Key.minKey(), true, Key.maxKey(), true)
                          .build());

while (iter.hasNext()) {
    KeyValue kv = iter.next();
    String customerId = kv.getKey().asString();
    Customer cust = JsonIterator.deserialize(kv.getValue().asString()).as(Customer.class);
}
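
The Customer type above is an application-defined POJO, not part of the Grainite API. A minimal sketch that would work with jsoniter's deserializer, assuming the stored JSON carries name and email fields (both hypothetical):

// Hypothetical application POJO; field names must match the stored JSON.
public class Customer {
    public String name;
    public String email;
}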

Handling event processing failures (Java only)

If the action handler encounters an error while processing an event, it can return a failure. Grainite will put that event into an error sink, and all processing done by that action handler invocation will be rolled back: all updates to the grain, and all events sent from that action handler to other grains or topics.

You can then read the error sink topic and reprocess the failed events (see "Reading contents of a topic from an external client" below).

app.yaml
# In app.yaml, in the definition of the table and its action handlers, declare an error sink. You can have a common error sink topic or one error sink per action handler. Grainite will put events whose processing failed into the error sink.
tables:
-   key_type: string
    table_name: employee
    action_handlers:
    -   class_name: org.samples.errorsinktest.actions.EmployeeHandler
        actions:
        -   action_name: handleEmployeeEvents
            error_sink_topic: employee_events_errors
            subscriptions:
                - subscription_name: employeeUpdates
                  topic_key: empId
                  topic_name: employee_events_topic
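
On the handler side, returning a failure from the action method causes the rollback described above and routes the event to the configured error sink. A minimal sketch, assuming an ActionResult.failure factory analogous to the ActionResult.success call used elsewhere in these snippets; the exact factory name and signature are assumptions, and processEmployeeEvent is hypothetical application logic:

public ActionResult handleEmployeeEvents(Action action, GrainContext context) {
    try {
        processEmployeeEvent(action, context); // hypothetical application logic
        return ActionResult.success(action);
    } catch (Exception e) {
        // Rolls back this invocation's grain updates and outgoing events,
        // and routes the event to employee_events_errors.
        return ActionResult.failure(action, e.getMessage()); // assumed signature
    }
}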

Defer/Schedule processing in grains

Use the timer feature that is available as part of GrainContext.

To process an action at a specific time, use startTimerForDeadline, which fires at the given deadline.

To process an action after a delay, use startTimerForDelay, which fires after the given delay and then triggers the named grain action.

// Sets the timer to fire after 30 seconds. After 30 seconds, it will
// call action handler targetForTimerFire. 
public ActionResult handleStartTimerFire(Action action, GrainContext context) {
    context.startTimerForDelay("timer_test", "targetForTimerFire", Duration.ofSeconds(30));
    return ActionResult.success(action);
}

To stop an ongoing timer loop, use cancelTimer.
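
A sketch of the deadline variant and of cancellation, assuming startTimerForDeadline mirrors startTimerForDelay but takes an absolute java.time.Instant, and that cancelTimer takes the timer name used at creation; both signatures are assumptions, not confirmed API:

// Fire targetForTimerFire at midnight UTC tonight (assumed signature).
public ActionResult handleScheduleAtDeadline(Action action, GrainContext context) {
    Instant deadline = LocalDate.now(ZoneOffset.UTC).plusDays(1)
            .atStartOfDay(ZoneOffset.UTC).toInstant();
    context.startTimerForDeadline("nightly_job", "targetForTimerFire", deadline);
    return ActionResult.success(action);
}

// Cancel the timer by the name it was created with (assumed signature).
public ActionResult handleCancelTimer(Action action, GrainContext context) {
    context.cancelTimer("nightly_job");
    return ActionResult.success(action);
}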

Adding observability/instrumentation in action handlers

Grainite provides an API to add counter and gauge metrics. These metrics are exposed in Prometheus format, so an external monitoring service such as Prometheus can collect them by scraping the Grainite server. From there, the metrics can be visualized with Grafana or similar tools.

// Decrement gauge
context.gauge(Constants.CURRENT_STATUS, CounterParam.of("status", event.statusName)).set(1);
// Increment counter
context.counter(Constants.RIDES_COUNTER).inc();
# prometheus.yml entry for scraping the Grainite endpoint
- job_name: 'grainite_prometheus'
  metrics_path: '/export-dashboard'
  scrape_interval: 5s

  # metrics_path defaults to '/metrics'
  # scheme defaults to 'http'.

  static_configs:
  - targets: ['127.0.0.1:5154']

Reading contents of a topic from an external client

If you want to start from the beginning every time you run, do not save the cursor. If you want to resume from where you left off, save the cursor as shown below.

import com.grainite.api.Event;
import com.grainite.api.Grainite;
import com.grainite.api.GrainiteClient;
import com.grainite.api.Topic;
import com.grainite.api.TopicReadCursor;
import java.util.Vector;

public class TopicTailerClient {
    static int numRecords = 0;

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Specify the host and topic");
            System.exit(1);
        }
        Grainite g = GrainiteClient.getClient(args[0], 5056);
        String topicName = args[1];
        // Constants.APP_NAME is the application name from your app.yaml.
        Topic t = g.getTopic(Constants.APP_NAME, topicName);

        TopicReadCursor cursor = t.newTopicReadCursor("cursorName", new Vector<>(), 1, 1, false);
        while(cursor.hasNext()) {
            Event ev = cursor.next();
            numRecords++;
            System.out.println("Key: " + ev.getKey().asString() + ", Payload: " + ev.getPayload().asString());
            // Save the cursor
            cursor.save();
        }
    }
}

Enabling Event Deduplication (Java only)

When client, network, or server failures occur while appending events to a topic, there is a significant probability that a few events get duplicated in topic storage. Grainite provides event deduplication, which can be enabled on a topic to prevent most scenarios in which duplicates are created by these failures.

To determine whether an event might be a duplicate of one submitted previously, Grainite needs a unique identifier for the event. For event producers, this is usually a source identifier. For instance, if the producer is reading from a file (or a stream), the dedup ID could be the file (or stream) name plus the offset of the event in that source. If a dedup ID is not provided, Grainite generates one automatically based solely on the contents of the event's key and payload.

Note that an auto-generated dedup ID might remove legitimate duplicates from the stream. For example, if the same event appears twice in the source stream and is therefore meant to appear twice in the target stream, the later of the two events may be filtered out in some cases.

Events are uniquely identified by their dedup ID, which is the third parameter of the Event constructor:

// This event was at offset 124 in source file sourcefile.txt.
topic.append(
    new Event(Key.of("id"), Value.of("payload"), Value.of("sourcefile.txt:" + 124))
);

Also note that the deduplication logic kicks in when the first batch of events is submitted by the client after a restart, or when the client encounters network or server failures while appending events. Deduplication is not intended for eliminating duplicate events from a stream during normal processing; for that, additional client logic should be used, as sketched below.
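
For deduplication during normal processing, a simple client-side approach is to track IDs that have already been appended. A minimal sketch, keying on the event key; sourceEvents and the surrounding setup are illustrative, not Grainite API:

// Illustrative client-side filter: append each event key at most once.
Set<String> seenIds = new HashSet<>();
for (Event ev : sourceEvents) { // sourceEvents: hypothetical input batch
    // Set.add returns false if the ID was already seen; skip those events.
    if (seenIds.add(ev.getKey().asString())) {
        topic.append(ev);
    }
}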

To enable deduplication of events in a topic, provide a TopicDedupOptions parameter to the GrainiteClient.getTopic method, as in the example below.

Topic topic = client.getTopic(
                "demo_app",
                "demo_topic",
                TopicDedupOptions.newBuilder()
                        .setDedupStatus(true)
                        .build());

When deduplication is enabled on a topic handle, the Grainite client continuously checkpoints successfully appended events into a checkpoint file on the client system. A location for the file may be specified using the TopicDedupOptions.setCheckpointFile method; if it is not provided, the checkpoint is written to a file in the temporary directory (usually /tmp on Linux). You may invoke Topic.getDedupCheckpointFileLocation to get and save the file location so that the checkpoints can be reused if the client restarts.

TopicDedupOptions.newBuilder()
    .setDedupStatus(true)
    .setCheckpointFile(new File("/path/to/checkpoint/file"))
    .build();

On the first append, the checkpoint file is read, and if it is not empty, the dedup logic ensures that the next batch of events has not already been appended to the topic.
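
A sketch of carrying the checkpoint across restarts, using the Topic.getDedupCheckpointFileLocation call mentioned above; saveLocation and loadLocation are hypothetical persistence helpers:

// First run: let Grainite pick a checkpoint file, then record where it is.
Topic topic = client.getTopic("demo_app", "demo_topic",
        TopicDedupOptions.newBuilder().setDedupStatus(true).build());
saveLocation(topic.getDedupCheckpointFileLocation()); // hypothetical persistence

// After a restart: point the new handle at the saved checkpoint file so the
// first append can be checked against what was already written.
Topic restarted = client.getTopic("demo_app", "demo_topic",
        TopicDedupOptions.newBuilder()
                .setDedupStatus(true)
                .setCheckpointFile(new File(loadLocation())) // hypothetical
                .build());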

Fetching multiple Grain values asynchronously from a Client
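
Issuing the getValue calls asynchronously and collecting the futures lets a client fetch many grain values in parallel instead of waiting on each grain in turn: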

List<Grain> grains = ...;

// Make getValue calls.
List<CompletableFuture<Value>> futures =
    grains.stream()
        .map(g -> (CompletableFuture<Value>) g.getValueAsync())
        .collect(Collectors.toList());

// Wait for all futures to complete.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();

// Get the values for all Grains.
List<Value> values =
    futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
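
Note that join() rethrows any failure as a CompletionException; if partial results are acceptable, handle errors per future (for example, with CompletableFuture's exceptionally) before collecting the values.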
