GrainiteClient API

API for Clients and Producers

This API allows clients to access Topics, Tables, and Grains. It supports pushing events to Topics, invoking Grain action handlers, scanning Topics and Tables, and accessing Grain state directly.

GrainiteClient is used to connect to Grainite. The Grainite object it returns can then be used to interact with Topics, Tables, and Grains.

import com.grainite.api.Grainite;
import com.grainite.api.GrainiteClient;

public class Client {
  public static void main(String[] args) {
    Grainite grainite = GrainiteClient.getClient("localhost", 5056);
  }
}

An optional clientID parameter in the getClient() method allows for creating and reusing connections to the server. This is useful when a single client needs multiple connections to the same server.

Topic

Topics are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these topics and perform various actions on them. The following example gets a reference to a topic called "orders_topic" in an app called "food_app".

Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Topic topic = grainite.getTopic("food_app", "orders_topic");

Append Events

Events can be appended to topics using append(Event). Event is an object that contains a key and a payload that will be sent to the topic. The following example sends an event with the key "#123" and payload "burger" to a topic.

Synchronous

Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Topic topic = grainite.getTopic("food_app", "orders_topic");
topic.append(new Event(Key.of("#123"), Value.of("burger")));

Asynchronous

Using Callbacks:

topic.appendAsync(
  new Event(Key.of("#123"), Value.of("impossible burger")),
  (completion, exception) -> {
    if (exception != null) {
      // The append failed; log the cause.
      exception.printStackTrace();
    }
  });

Using Futures:

Future<RecordMetadata> rmdFuture = topic.appendAsync(new Event(Key.of("#123"), Value.of("impossible burger")));
// Do some work while waiting for the event to get appended into the topic.
RecordMetadata rmd = rmdFuture.get();
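The Futures pattern above calls get() without a timeout or error handling. The following is a minimal sketch of a bounded wait, assuming the returned type is a standard java.util.concurrent.Future. The helper getOrFallback and the CompletableFuture stand-ins are hypothetical and not part of the Grainite API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; it only relies on java.util.concurrent.
class FutureWaitSketch {
    /** Waits up to timeoutMillis for the future; returns fallback on timeout or failure. */
    static <T> T getOrFallback(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for futures returned by an asynchronous append.
        Future<String> done = CompletableFuture.completedFuture("appended");
        Future<String> pending = new CompletableFuture<>(); // never completes
        System.out.println(getOrFallback(done, 100, "timed out"));
        System.out.println(getOrFallback(pending, 100, "timed out"));
    }
}
```

Future.get() throws the checked exceptions InterruptedException and ExecutionException, so the shorter snippets on this page need a try/catch or a throws clause in real code.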

Note that an event key can be up to 4KB in size and the payload can be up to 512KB in size. Future updates to Grainite will increase these limits.
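Because of these limits, it can help to check sizes on the client before appending. The following is a minimal sketch, assuming that "4KB" and "512KB" mean 4 * 1024 and 512 * 1024 bytes and that keys and payloads are measured as UTF-8 bytes. The EventSizeCheck class is hypothetical and not part of the Grainite API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Grainite API.
class EventSizeCheck {
    // Assumption: "4KB" and "512KB" mean 4 * 1024 and 512 * 1024 bytes.
    static final int MAX_KEY_BYTES = 4 * 1024;
    static final int MAX_PAYLOAD_BYTES = 512 * 1024;

    /** Returns true if both the key and the payload fit within the assumed limits. */
    static boolean fitsLimits(String key, String payload) {
        int keyBytes = key.getBytes(StandardCharsets.UTF_8).length;
        int payloadBytes = payload.getBytes(StandardCharsets.UTF_8).length;
        return keyBytes <= MAX_KEY_BYTES && payloadBytes <= MAX_PAYLOAD_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsLimits("#123", "burger")); // small event
        String oversizedKey = "k".repeat(MAX_KEY_BYTES + 1);
        System.out.println(fitsLimits(oversizedKey, "burger")); // key is one byte too large
    }
}
```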

Reading a topic

All events in a topic can be read by using a TopicReadCursor. Create the cursor with the newTopicReadCursor method, which takes a cursorName, a list of keys to read events for (or null to read all the events in the topic), a consumerIndex that the cursor should start at, a numConsumers for the cursor to read, and a boolean stopAtEol that says whether to stop reading at EOL. (Javadocs, Pydocs)

// Read only one event from the topic that has key #123
TopicReadCursor cursor =
    topic.newTopicReadCursor(
        "myCursor", Arrays.asList(Key.of("#123")), 1, 1, false);
Event nextEvent = cursor.next();

The cursor can be saved and then loaded later to continue reading from where it left off:

cursor.save();
// Load a new cursor "myLoadedCursor" to pick up where "myCursor" left off.
cursor = topic.loadReadCursor("myCursor", "myLoadedCursor", 1, 1, false);
cursor.hasNext(); // Should be false

Table

Tables are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these tables and perform various actions on them. The following example gets a reference to a table called "orders_table" in an app called "food_app".

Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Table table = grainite.getTable("food_app", "orders_table");

Accessing a Grain

Tables contain Grains, which can hold state and also have action handlers associated with them. Grains can be accessed by using getGrain(Key), where Key is the key of the Grain. The following example gets a reference to the Grain with the key "#123" from the "orders_table".

Synchronous

Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Table table = grainite.getTable("food_app", "orders_table");
Grain grain = table.getGrain(Key.of("#123"));

Asynchronous

Using Callbacks:

table.getGrainAsync(Key.of("#123"), true, (grain, exception) -> {
  if(exception != null) {
    System.out.println("Unable to get grain: " + exception.getMessage());
  } else {
    System.out.println("Got grain: " + grain.getValue());
  }
});

Using Futures:

Future<Grain> grainFuture = table.getGrainAsync(Key.of("#123"), true);
// Do some work while waiting to get Grain.
Grain grain = grainFuture.get();

Accessing a Grain's state

Accessing the Grain Value

The Grain object can be used to access or modify the state of a particular Grain. The following example accesses and modifies the value of the "#123" Grain by incrementing the number of dishes in this order.

Synchronous

Grain grain = table.getGrain(Key.of("#123"));
// Get the current number of dishes for this order.  
long numDishes = grain.getValue().asLong();
// Increment and store the number of dishes for this order.
grain.setValue(Value.of(numDishes + 1));

Asynchronous

Using Callbacks:

// Get the current number of dishes for this order.
grain.getValueAsync((value) -> {
    long numDishes = value.asLong();
    
    // Increment and store the number of dishes for this order.
    grain.setValueAsync(Value.of(numDishes + 1), (status) -> {
        if(status.isError()) {
            System.out.println("The Grain value failed to update: " + status.getDescription());
        } else {
            System.out.println("The Grain value was updated.");
        }
    });
});

Using Futures:

// Get the current number of dishes for this order.
Future<Value> grainValueFuture = grain.getValueAsync();
// Do something while waiting to get Grain value.
long numDishes = grainValueFuture.get().asLong();

// Increment and store the number of dishes for this order.
Future<Status> statusFuture = grain.setValueAsync(Value.of(numDishes + 1));
// Do something while waiting to set Grain value.
Status status = statusFuture.get();

Similarly, a Grain's key can be accessed using getKey().

Accessing the Grain's Maps

Along with the value, a Grain also contains 200 sorted maps to store state. Defining these maps in the application YAML configuration file is optional. When a map is defined there, it can be given a name, and the configuration is loaded into Grainite using gx load.

See: App Configuration Options

The following example stores a timestamp and some payload in the "history" map of a Grain.

app.yaml
table:
    ...
    
    maps:
    -   id: 0
        name: stats
        key_type: string
        value_type: double

Synchronous

grain.mapPut("history", Key.of(timestamp), Value.of(payload));

Asynchronous

Using Callbacks:

grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload),
    (status, exception) -> {
        if(exception == null) {
            System.out.println("Inserted value into map.");
        } else {
            System.out.println("Error inserting value into map: " + exception.getMessage());
        }
    }
);

Using Futures:

Future<Status> statusFuture = grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload));
// Do something while waiting for value to be inserted into the map.
Status status = statusFuture.get();

Maps can be accessed by either name (e.g., "history") or id (e.g., 0).

Similarly, a value in a Grain's map can be read by using mapGet().

Synchronous

grain.mapGet("history", Key.of(timestamp));

Asynchronous

Using Callbacks:

grain.mapGetAsync("history", Key.of(timestamp), (value, exception) -> {
    if(exception == null) {
        System.out.println("Grain map value is: " + value.toString());
    } else {
        System.out.println("Failed to get Grain map value: " + exception.getMessage());
    }
});

Using Futures:

Future<Value> mapValueFuture = grain.mapGetAsync("history", Key.of(timestamp));
// Do something while waiting to get map value.
Value mapValue = mapValueFuture.get();

mapQuery() can be used to scan a Grain's map to get a range of values. The following example scans the "history" map of a Grain, to get all the data from Monday to Friday (keyed by timestamp).

The Python API instead uses map_get_range(), which is equivalent to the Java mapGetRange() API. In Java, mapGetRange() is now deprecated and has been replaced by MapQuery.

Synchronous

MapQuery query = MapQuery.newBuilder().setMapName("history")
                                .setRange(Key.of(mondayTS), Key.of(fridayTS))
                                .build();
Iterator<KeyValue> iter = grain.mapQuery(query);
// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
    System.out.println(kv.getKey().asString());
    System.out.println(kv.getValue().asString());
});

Asynchronous

Using Callbacks:

MapQuery query = MapQuery.newBuilder().setMapName("history")
                                .setRange(Key.of(mondayTS), Key.of(fridayTS))
                                .build();

grain.mapQueryAsync(query, (iter, exception) -> {
      if(exception == null) {
         // Iterate over the result of mapQuery
        iter.forEachRemaining(kv -> {
            System.out.println(kv.getKey().asString());
            System.out.println(kv.getValue().asString());
        });
      } else {
        System.out.println("Failed to query map: " + exception.getMessage());
      }
});

Using Futures:

MapQuery query = MapQuery.newBuilder().setMapName("history")
                                .setRange(Key.of(mondayTS), Key.of(fridayTS))
                                .build();
Future<Iterator<KeyValue>> iterFuture = grain.mapQueryAsync(query);
// Do something while waiting for mapQuery to return.
Iterator<KeyValue> iter = iterFuture.get();

// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
    System.out.println(kv.getKey().asString());
    System.out.println(kv.getValue().asString());
});
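To make the idea of a range scan over a sorted map concrete, here is a minimal sketch that uses java.util.TreeMap as a local stand-in for a Grain's "history" map. It does not call Grainite. Treating both range bounds as inclusive is an assumption, as are the small day numbers used in place of real timestamps.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Local stand-in for a sorted Grain map; not part of the Grainite API.
class SortedMapRangeSketch {
    /** Returns the entries whose keys lie between from and to, both inclusive (an assumption). */
    static NavigableMap<Long, String> range(NavigableMap<Long, String> map, long from, long to) {
        return map.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        // Keys are day numbers (1 = Monday ... 7 = Sunday) instead of real timestamps.
        NavigableMap<Long, String> history = new TreeMap<>();
        history.put(7L, "sunday order");
        history.put(1L, "monday order");
        history.put(3L, "wednesday order");
        history.put(5L, "friday order");
        history.put(6L, "saturday order");

        // Entries come back in key order, regardless of insertion order.
        range(history, 1L, 5L).forEach((ts, payload) -> System.out.println(ts + " " + payload));
    }
}
```

Because the map is sorted by key, a range query only needs the two boundary keys, which is why timestamp keys work well for "Monday to Friday" style lookups.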

Note that a Grain key can be up to 4KB in size and the value can be up to 512KB in size. The same limits apply to map keys (4KB maximum) and map values (512KB maximum). Future updates to Grainite will increase these limits.

Early Access: Querying Secondary Indexes

As part of the early access secondary indexes feature, queries against indexes may be performed using the Client API. The Table API has a new find() API that either accepts a QueryExpression object directly or a string query which is parsed and converted into a QueryExpression object by the client.

Valid queries are composed of individual "Simple" (property ==/!= value) or "Range" (property </>/<=/>= value) expressions, which can themselves be combined with AND/OR/NOT logical operators to form arbitrarily complex query expressions.

An example logical query (assuming indexes have been created for "income" and "city") could be:

income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')

We can pass the query to table.find() as a string and get an iterator of Grains as follows:

Synchronous

Iterator<Grain> it = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Iterate over the results of the query
while(it.hasNext()) {
    Grain grain = it.next();
    Value value = grain.getValue();
    ...
}

Asynchronous

Using Callbacks:

table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')", (iter, exception) -> {
    if(exception == null) {
         // Iterate over the result of the query
        iter.forEachRemaining(grain -> {
            System.out.println(grain.getKey().asString());
            System.out.println(grain.getValue().asString());
        });
    } else {
        System.out.println("Failed to query table: " + exception.getMessage());
    }
});

Using Futures:

Future<Iterator<Grain>> findFuture = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Do something while waiting for find to return.
Iterator<Grain> find = findFuture.get();
find.forEachRemaining(grain -> {
    System.out.println(grain.getKey().asString());
    System.out.println(grain.getValue().asString());
});
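The example query combines one Range expression with two Simple expressions. The following sketch shows the same logic evaluated locally with java.util.function.Predicate, to illustrate how the operators compose. It does not use the Grainite query engine, and the Person class and its fields are hypothetical stand-ins for indexed Grain properties.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Local illustration of the query's boolean logic; not part of the Grainite API.
class QuerySemanticsSketch {
    /** Hypothetical record standing in for indexed Grain properties. */
    static final class Person {
        final String name;
        final long income;
        final String city;

        Person(String name, long income, String city) {
            this.name = name;
            this.income = income;
            this.city = city;
        }
    }

    // Range expression: income > 100000
    static final Predicate<Person> HIGH_INCOME = p -> p.income > 100_000;
    // Simple expressions: city == 'Los Angeles', city == 'San Francisco'
    static final Predicate<Person> IN_LA = p -> p.city.equals("Los Angeles");
    static final Predicate<Person> IN_SF = p -> p.city.equals("San Francisco");
    // income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')
    static final Predicate<Person> QUERY = HIGH_INCOME.and(IN_LA.or(IN_SF));

    /** Returns the names of all people matching QUERY, in input order. */
    static List<String> matchingNames(List<Person> people) {
        return people.stream().filter(QUERY).map(p -> p.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Ana", 150_000, "Los Angeles"),
            new Person("Ben", 90_000, "San Francisco"),
            new Person("Cy", 200_000, "Seattle"),
            new Person("Dee", 120_000, "San Francisco"));
        System.out.println(matchingNames(people));
    }
}
```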

Invoking an Action on a Grain

A Grain's Action Handler can be triggered directly from a client using invoke(ActionName, Value). This method takes the name of an action and a payload to send to the Action Handler. The following example invokes an action called addFoodToOrder for a Grain, with the payload "Burger".

Synchronous

grain.invoke("addFoodToOrder", Key.of("Burger"));

Asynchronous

Using Callbacks:

grain.invokeAsync("addFoodToOrder", Key.of("Burger"), 
    (res, exception) -> {
        if(exception == null) {
            if(res.isError()) {
                System.out.println("Invoke returned an error: " + res.getStatus().getDescription());
            } else {
                System.out.println("Invoke result: " + res.getResult().toString());
            }
        } else {
            System.out.println("Failed to invoke: " + exception.getMessage());
        }
    }
);

Using Futures:

Future<ResultOrStatus<Value>> resultFuture = grain.invokeAsync("addFoodToOrder", Key.of("Burger"));
// Do something while waiting for invoke to return.
ResultOrStatus<Value> res = resultFuture.get();
if(res.isError()) {
    System.out.println("Invoke returned an error: " + res.getStatus().getDescription());
} else {
    System.out.println("Invoke result: " + res.getResult().toString());
}

Scanning a Table for Grains

All the Grains in a Table can be scanned by using scan(Cursor, NumItems). This method takes a cursor (which can be reused in subsequent scans) and an integer limit on the number of items to return. The following example scans a table for 10 Grains and prints their keys.

Synchronous

Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Iterator<Grain> iter = table.scan(cursor, 10);
iter.forEachRemaining(grain -> {
    System.out.println(grain.getKey().asString());
});

Asynchronous

Using Callbacks:

Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
table.scanAsync(cursor, 10, (iter, exception) -> {
    if(exception == null) {
        iter.forEachRemaining(grain -> {
            System.out.println(grain.getKey().asString());
        });
    } else {
        System.out.println("Error scanning table: " + exception.getMessage());
    }
});

Using Futures:

Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Future<Iterator<Grain>> iterFuture = table.scanAsync(cursor, 10);
// Do something while waiting for table scan to return.
Iterator<Grain> iter = iterFuture.get();
iter.forEachRemaining(grain -> {
    System.out.println(grain.getKey().asString());
});
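The idea of reusing a cursor across scans can be illustrated with a local sketch of cursor-based paging. It assumes that a cursor remembers where the previous scan stopped and that keys are returned in sorted order. Both are assumptions about the general technique, not statements about how Grainite's Cursor works internally. PageCursor and the scan helper below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Local illustration of cursor-based paging; not part of the Grainite API.
class ScanPagingSketch {
    /** Hypothetical cursor: remembers the last key returned so the next scan resumes after it. */
    static final class PageCursor {
        String lastKey; // null until the first page has been read
    }

    /** Returns up to limit keys after the cursor position and advances the cursor. */
    static List<String> scan(NavigableSet<String> keys, PageCursor cursor, int limit) {
        NavigableSet<String> remaining =
            cursor.lastKey == null ? keys : keys.tailSet(cursor.lastKey, false);
        List<String> page = new ArrayList<>();
        for (String key : remaining) {
            if (page.size() == limit) {
                break;
            }
            page.add(key);
        }
        if (!page.isEmpty()) {
            cursor.lastKey = page.get(page.size() - 1);
        }
        return page;
    }

    public static void main(String[] args) {
        NavigableSet<String> keys =
            new TreeSet<>(Arrays.asList("#121", "#122", "#123", "#124", "#125"));
        PageCursor cursor = new PageCursor();
        // Keep scanning with the same cursor until a page comes back empty.
        List<String> page;
        while (!(page = scan(keys, cursor, 2)).isEmpty()) {
            System.out.println(page);
        }
    }
}
```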
