This API allows clients to access Topics, Tables, and Grains. It enables functionality such as pushing events to Topics, invoking Grain action handlers, scanning Topics and Tables, and accessing Grain state directly.
GrainiteClient is used to connect with Grainite, which can then be used to interact with Topics, Tables, and Grains.
An optional clientID parameter in the getClient() method allows for creating and reusing connections to the server. This is useful when a single client wants to create multiple connections to the same server.
Topics are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these topics and perform various actions on them. The following example gets a reference to a topic called "orders_topic" in an app called "food_app".
Events can be appended to topics using append(Event). Event is an object that contains a key and a payload that will be sent to the topic. The following example sends an event with the key "#123" and payload "impossible burger" to a topic.
Future<RecordMetadata> rmdFuture =
    topic.appendAsync(new Event(Key.of("#123"), Value.of("impossible burger")));
// Do some work while waiting for the event to get appended into the topic.
RecordMetadata rmd = rmdFuture.get();

from grainite_cl.api.callback import Callback

# Define our own Callback to call after append_async is complete
class MyCallback(Callback):
    def on_completion(self, completion: RecordMetadata, exception: Exception):
        if exception is not None:
            ...

topic.append_async(
    event=Event(Key("#123"), Value("impossible burger")),
    callback=MyCallback())
Note that an event key can be up to 4KB in size and the payload can be up to 512KB in size. Future updates to Grainite will increase these limits.
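A client may want to check sizes before appending. The following is a minimal sketch, not part of the Grainite API: check_event_size is a hypothetical helper, and it assumes that the limits are measured in bytes of the UTF-8 encoded key and payload and that 1KB is 1024 bytes, neither of which is stated above.

```python
MAX_KEY_BYTES = 4 * 1024        # 4KB key limit (assumes 1KB = 1024 bytes)
MAX_PAYLOAD_BYTES = 512 * 1024  # 512KB payload limit (same assumption)


def check_event_size(key: str, payload: str) -> None:
    """Raise ValueError if the UTF-8 encoded key or payload exceeds the limits."""
    key_len = len(key.encode("utf-8"))
    payload_len = len(payload.encode("utf-8"))
    if key_len > MAX_KEY_BYTES:
        raise ValueError(f"key is {key_len} bytes; limit is {MAX_KEY_BYTES}")
    if payload_len > MAX_PAYLOAD_BYTES:
        raise ValueError(f"payload is {payload_len} bytes; limit is {MAX_PAYLOAD_BYTES}")


# Hypothetical usage: validate before handing the event to the client library.
check_event_size("#123", "impossible burger")
```

Keeping the limits in named constants means only two lines change if a future Grainite release raises them.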
Reading a topic
All events in a topic can be read by using a TopicReadCursor. Create the cursor with the newTopicReadCursor method, providing a cursorName, a list of keys to read events for (or null to read all the events in the topic), a consumerIndex that the cursor should start at, a numConsumers for the cursor to read, and a boolean stopAtEol that says whether to stop reading at EOL. (Javadocs, Pydocs)
// Read only one event from the topic that has key #123
TopicReadCursor cursor =
    topic.newTopicReadCursor("myCursor", Arrays.asList(Key.of("#123")), 1, 1, false);
Event nextEvent = cursor.next();

# Read only one event from the topic that has key #123
cursor = topic.new_read_cursor("my_cursor", [Key("#123")], 1, 1, False)
next_event = cursor.next()
The cursor can be saved and then loaded later to continue reading from where it left off:
cursor.save();
// Load a new cursor "myLoadedCursor" to pick up where "myCursor" left off.
cursor = topic.loadReadCursor("myCursor", "myLoadedCursor", 1, 1, false);
cursor.hasNext(); // Should be false

cursor.save()
# Load a new cursor "my_loaded_cursor" to pick up where "my_cursor" left off.
cursor = topic.load_read_cursor("my_cursor", "my_loaded_cursor", 1, 1, False)
cursor.has_next()  # Should be False
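To make the save-and-resume behavior concrete, here is a small in-memory model. It is only a sketch: ToyTopic and ToyCursor are hypothetical stand-ins, not Grainite classes, and the model assumes that saving a cursor simply records the index of the next unread event under the cursor's name.

```python
class ToyTopic:
    """In-memory stand-in for a topic: an append-only list of events."""

    def __init__(self):
        self.events = []
        self.saved_positions = {}  # cursor name -> index of next event to read

    def new_read_cursor(self, name):
        return ToyCursor(self, name, position=0)

    def load_read_cursor(self, saved_name, new_name):
        # Start the new cursor at the position stored under saved_name.
        return ToyCursor(self, new_name, self.saved_positions[saved_name])


class ToyCursor:
    def __init__(self, topic, name, position):
        self.topic = topic
        self.name = name
        self.position = position

    def has_next(self):
        return self.position < len(self.topic.events)

    def next(self):
        event = self.topic.events[self.position]
        self.position += 1
        return event

    def save(self):
        self.topic.saved_positions[self.name] = self.position


topic = ToyTopic()
topic.events.extend(["burger", "fries"])

cursor = topic.new_read_cursor("my_cursor")
first = cursor.next()   # reads the first event
cursor.save()           # records that the second event is next

loaded = topic.load_read_cursor("my_cursor", "my_loaded_cursor")
second = loaded.next()  # resumes at the second event instead of starting over
```

The loaded cursor has its own name, so it can be saved independently of the cursor it was loaded from.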
Table
Tables are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these tables and perform various actions on them. The following example gets a reference to a table called "orders_table" in an app called "food_app".
Tables contain Grains, which can hold state and also have action handlers associated with them. Grains can be accessed by using getGrain(Key), where Key is the key of the Grain. The following example gets a reference to the Grain with the key "#123" from the "orders_table".
The Grain object can be used to access or modify the state of a particular Grain. The following example accesses and modifies the value of the "#123" Grain by incrementing the number of dishes in this order.
Synchronous
Grain grain = table.getGrain(Key.of("#123"));
// Get the current number of dishes for this order.
long numDishes = grain.getValue().asLong();
// Increment and store the number of dishes for this order.
grain.setValue(Value.of(numDishes + 1));
Asynchronous
Using Callbacks:
// Get the current number of dishes for this order.
grain.getValueAsync((value) -> {
  long numDishes = value.asLong();
  // Increment and store the number of dishes for this order.
  grain.setValueAsync(Value.of(numDishes + 1), (status) -> {
    if (status.isError()) {
      System.out.println("The Grain value failed to update: " + status.getDescription());
    } else {
      System.out.println("The Grain value was updated.");
    }
  });
});
Using Futures:
// Get the current number of dishes for this order.
Future<Value> grainValueFuture = grain.getValueAsync();
// Do something while waiting to get Grain value.
long numDishes = grainValueFuture.get().asLong();
// Increment and store the number of dishes for this order.
Future<Status> setValueFuture = grain.setValueAsync(Value.of(numDishes + 1));
// Do something while waiting to set Grain value.
Status status = setValueFuture.get();
grain = table.get_grain(Key("#123"))
# Get the current number of dishes for this order.
num_dishes = grain.get_value().as_long()
# Increment and store the number of dishes for this order.
grain.set_value(Value(num_dishes + 1))
Similarly, a Grain's key can be accessed using getKey().
Accessing the Grain's Maps
Along with its value, a Grain also contains 200 sorted maps for storing state. Defining these maps in the application YAML configuration file is optional. Doing so lets you give a map a name, which is loaded into Grainite using gx load.
Using Callbacks:
grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload), (status, exception) -> {
  if (exception == null) {
    System.out.println("Inserted value into map.");
  } else {
    System.out.println("Error inserting value into map: " + exception.getMessage());
  }
});
Using Futures:
Future<Status> statusFuture =
    grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload));
// Do something while waiting for value to be inserted into the map.
Status status = statusFuture.get();
Maps can be accessed by either name (e.g. "history") or ID (e.g. 0).
Similarly, a value in a Grain's map can be read using mapGet().
Synchronous
grain.mapGet("history",Key.of(timestamp));
Asynchronous
Using Callbacks:
grain.mapGetAsync("history", Key.of(timestamp), (value, exception) -> {
  if (exception == null) {
    System.out.println("Grain map value is: " + value.toString());
  } else {
    System.out.println("Failed to get Grain map value: " + exception.getMessage());
  }
});
Using Futures:
Future<Value> mapValueFuture = grain.mapGetAsync("history", Key.of(timestamp));
// Do something while waiting to get map value.
Value mapValue = mapValueFuture.get();
grain.map_get("history", Key(timestamp))
mapQuery() can be used to scan a Grain's map to get a range of values. The following example scans the "history" map of a Grain to get all the data from Monday to Friday (keyed by timestamp).
The Python API instead uses map_get_range(), which is equivalent to the Java mapGetRange() API. That Java API is now deprecated and has been replaced by MapQuery.
Synchronous
MapQuery query = MapQuery.newBuilder()
    .setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
Iterator<KeyValue> iter = grain.mapQuery(query);
// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
  System.out.println(kv.getKey().asString());
  System.out.println(kv.getValue().asString());
});
Asynchronous
Using Callbacks:
MapQuery query = MapQuery.newBuilder()
    .setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
grain.mapQueryAsync(query, (iter, exception) -> {
  if (exception == null) {
    // Iterate over the result of mapQuery
    iter.forEachRemaining(kv -> {
      System.out.println(kv.getKey().asString());
      System.out.println(kv.getValue().asString());
    });
  } else {
    System.out.println("Failed to query map: " + exception.getMessage());
  }
});
Using Futures:
MapQuery query = MapQuery.newBuilder()
    .setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
Future<Iterator<KeyValue>> iterFuture = grain.mapQueryAsync(query);
// Do something while waiting for mapQuery to return.
Iterator<KeyValue> iter = iterFuture.get();
// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
  System.out.println(kv.getKey().asString());
  System.out.println(kv.getValue().asString());
});
cursor = MapScanCursor(Key(mondayTS), Key(fridayTS))
it = grain.map_get_range(0, cursor)
# Iterate over the result of map_get_range
while it.has_next():
    kv = it.next()
    print(kv.key.as_string())
    print(kv.value.as_string())
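The reason a range scan works on these maps is that their keys are kept in sorted order. The sketch below illustrates that idea with a standalone in-memory structure. ToySortedMap is hypothetical and unrelated to Grainite's implementation, the integer timestamps are made up, and treating both range bounds as inclusive is an assumption.

```python
import bisect


class ToySortedMap:
    """Keeps keys in sorted order so a range can be read with two binary searches."""

    def __init__(self):
        self._keys = []    # sorted list of keys
        self._values = {}  # key -> value

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def get_range(self, start, end):
        # Assumption: both bounds are inclusive.
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_right(self._keys, end)
        return [(k, self._values[k]) for k in self._keys[lo:hi]]


history = ToySortedMap()
# Hypothetical timestamps: one entry per day, Sunday (0) through Saturday (6).
dishes = ["soup", "burger", "salad", "pasta", "tacos", "pizza", "curry"]
for day, dish in enumerate(dishes):
    history.put(day, dish)

monday_ts, friday_ts = 1, 5
weekdays = history.get_range(monday_ts, friday_ts)
```

Because the keys are sorted, the scan touches only the entries inside the requested range rather than every entry in the map.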
Note that a Grain key can be up to 4KB in size and the value can be up to 512KB in size. The same limits apply to map keys (4KB maximum) and map values (512KB maximum). Future updates to Grainite will increase these limits.
Early Access: Querying Secondary Indexes
As part of the early access secondary indexes feature, queries against indexes may be performed using the Client API. The Table API has a new find() method that accepts either a QueryExpression object or a query string, which the client parses and converts into a QueryExpression object.
Valid queries are composed of individual "Simple" (property ==/!= value) or "Range" (property </>/<=/>= value) expressions, which can themselves be combined with AND/OR/NOT logical operators to form arbitrarily complex query expressions.
An example logical query (assuming indexes have been created for "income" and "city") could be:
income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')
We can pass the query to table.find() as a string and get an iterator of Grains as follows:
Synchronous
Iterator<Grain> it =
    table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Iterate over the results of the query
while (it.hasNext()) {
  Grain grain = it.next();
  Value value = grain.getValue();
  ...
}
Asynchronous
Using Callbacks:
table.find(
    "income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')",
    (iter, exception) -> {
      if (exception == null) {
        // Iterate over the result of the query
        iter.forEachRemaining(grain -> {
          System.out.println(grain.getKey().asString());
          System.out.println(grain.getValue().asString());
        });
      } else {
        System.out.println("Failed to query table: " + exception.getMessage());
      }
    });
Using Futures:
Future<Iterator<Grain>> findFuture =
    table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Do something while waiting for find to return.
Iterator<Grain> find = findFuture.get();
find.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
  System.out.println(grain.getValue().asString());
});
table.find() became available in the Python API starting in 2321.
String queries became available in the Python table.find() API in 2323.
it = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')")
# Iterate over the results of the query
for grain in it:
    value = grain.get_value()
    ...
Equivalent using a QueryExpression instead of a string query:
query = AndQueryExpression(
    RangeQueryExpression(Value("income"), Value(100000), Value.max_key(), False, True),
    OrQueryExpression(
        SimpleQueryExpression(Value("city"), Value("Los Angeles")),
        SimpleQueryExpression(Value("city"), Value("San Francisco")),
    ),
)
it = table.find(query)
# Iterate over the results of the query
for grain in it:
    value = grain.get_value()
    ...
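To show how Simple and Range expressions compose under AND/OR/NOT, the following standalone sketch evaluates the same example query against plain dictionaries. The Compare, And, Or, and Not classes and the sample records are hypothetical and are not Grainite's QueryExpression classes. The sketch also filters every record in turn, whereas Grainite answers such queries from secondary indexes.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict

Record = Dict[str, Any]

_OPS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq, "!=": operator.ne,
    "<": operator.lt, "<=": operator.le,
    ">": operator.gt, ">=": operator.ge,
}


@dataclass
class Compare:
    """A Simple (==, !=) or Range (<, <=, >, >=) expression on one property."""
    prop: str
    op: str
    value: Any

    def matches(self, record: Record) -> bool:
        return _OPS[self.op](record[self.prop], self.value)


@dataclass
class And:
    left: Any
    right: Any

    def matches(self, record: Record) -> bool:
        return self.left.matches(record) and self.right.matches(record)


@dataclass
class Or:
    left: Any
    right: Any

    def matches(self, record: Record) -> bool:
        return self.left.matches(record) or self.right.matches(record)


@dataclass
class Not:
    inner: Any

    def matches(self, record: Record) -> bool:
        return not self.inner.matches(record)


# income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')
query = And(
    Compare("income", ">", 100000),
    Or(Compare("city", "==", "Los Angeles"), Compare("city", "==", "San Francisco")),
)

# Made-up sample records for illustration only.
people = [
    {"name": "a", "income": 150000, "city": "Los Angeles"},
    {"name": "b", "income": 90000, "city": "San Francisco"},
    {"name": "c", "income": 200000, "city": "Seattle"},
]
matching = [p["name"] for p in people if query.matches(p)]
```

Each node only needs a matches() method, so expressions nest to any depth.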
Invoking an Action on a Grain
A Grain's Action Handler can be triggered directly from a client using invoke(ActionName, Value). This method takes the name of an action and a payload to send to the Action Handler. The following example invokes an action called addFoodToOrder for a Grain, with the payload "Burger".
Future<ResultOrStatus<Value>> resultFuture =
    grain.invokeAsync("addFoodToOrder", Key.of("Burger"));
// Do something while waiting for invoke to return.
ResultOrStatus<Value> result = resultFuture.get();
if (result.isError()) {
  System.out.println("Invoke returned an error: " + result.getStatus().getDescription());
} else {
  System.out.println("Invoke result: " + result.getResult().toString());
}
grain.invoke("add_food_to_order", Key("Burger"))
Scanning a Table for Grains
All the Grains in a Table can be scanned by using scan(Cursor, NumItems). This method takes a cursor (which can be reused in subsequent scans) and an integer limit on the number of items to return. The following example scans a table for 10 Grains and prints their keys.
Synchronous
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Iterator<Grain> iter = table.scan(cursor, 10);
iter.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
});
Asynchronous
Using Callbacks:
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
table.scanAsync(cursor, 10, (iter, exception) -> {
  if (exception == null) {
    iter.forEachRemaining(grain -> {
      System.out.println(grain.getKey().asString());
    });
  } else {
    System.out.println("Error scanning table: " + exception.getMessage());
  }
});
Using Futures:
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Future<Iterator<Grain>> iterFuture = table.scanAsync(cursor, 10);
// Do something while waiting for table scan to return.
Iterator<Grain> iter = iterFuture.get();
iter.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
});
cursor = ScanCursor()
# Read first 10 grains in the table.
it = table.scan(cursor, 10)
while it.has_next():
    grain = it.next()
    print(grain.get_key().as_string())
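Reusing the cursor across scans is what allows a table to be read page by page. The sketch below models that pattern in memory. ToyScanCursor and the scan function here are hypothetical and are not the Grainite API, and the model assumes that each scan advances the cursor in place to just past the last item it returned.

```python
class ToyScanCursor:
    """Remembers where the previous scan stopped."""

    def __init__(self):
        self.offset = 0


def scan(keys, cursor, num_items):
    """Return up to num_items keys starting at the cursor, then advance it."""
    page = keys[cursor.offset:cursor.offset + num_items]
    cursor.offset += len(page)
    return page


grain_keys = [f"#{n}" for n in range(1, 26)]  # 25 hypothetical grain keys

cursor = ToyScanCursor()
pages = []
while True:
    page = scan(grain_keys, cursor, 10)
    if not page:
        break  # an empty page means the whole table has been read
    pages.append(page)
```

With 25 keys and a limit of 10, the loop collects two full pages and one partial page before the empty page ends it.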