How to populate a mutation for another Cassandra table in a trigger
I'm trying to implement a Cassandra trigger so that when a row is updated or deleted in keyspace1.tableA, the trigger adds a row to keyspace1.tableB.
The column names in table B are completely different from the columns in table A.
I am working with Cassandra 2.1, and upgrading to a newer version is not an option. Looking at the InvertedIndex trigger example at https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java I can see the basics of adding a mutation:
for (Cell cell : update)
{
    // Skip the row marker and other empty values, since they lead to an empty key.
    if (cell.value().remaining() > 0)
    {
        Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
        mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
        mutations.add(mutation);
    }
}
The challenge is that in this example, the cell name passed to mutation.add is cell.name(), an existing object that can simply be reused. In my case there is no existing cell whose name I can pass, because the columns in tableB are different.
I am currently just trying to save the time the change was made to tableA, so tableB has two columns:
- changetime timeuuid
- operation text
I need to add a mutation that will add a row to tableB with the change time and the operation that was performed. How can I build such a row mutation in Cassandra 2.1.12?
I tried this, but I am getting a null pointer exception in the trigger:
...
String keycol = "changetime";
ByteBuffer uuidKey = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
ColumnIdentifier ci = new ColumnIdentifier(keycol, false);
CellName cn = CellNames.simpleSparse(ci);
mutation = new Mutation(keyspace, uuidKey);
mutation.add(tableName,cn, uuidKey, System.currentTimeMillis());
...
Any help would be much appreciated. I'm not familiar with Cassandra internals, so no amount of detail is too much information.
The answer is to use the CFMetaData comparator to create the CellName needed by Mutation.add(...). To provide a concrete example, I'll use the schema and example from the Cassandra 3.0 AuditTrigger, available at https://github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers
In this case, the table we will be writing to is the test.audit table, defined as follows:
CREATE TABLE test.audit (key timeuuid, keyspace_name text,
table_name text, primary_key text, PRIMARY KEY(key));
This table has a partition key named "key" and no clustering columns. For definitions, see the "Partition key and clustering columns" section of https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt .
This is important to note because the call to makeCellName (which we will see in the next code example) takes a variable argument list: one argument for the value of each clustering column of the row to be affected, followed by a final argument giving the column name as text.
If there are no clustering columns (as is the case with this schema), then the makeCellName call takes one argument: the column name.
Putting it all together, the AuditTrigger augment function for Cassandra 2.1, which does the same thing as the 3.0 example, looks like this:
public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
    CFMetaData cfm = update.metadata();
    List<Mutation> mutations = new ArrayList<>(update.getColumnCount());
    // keyspace and table of the row that fired the trigger
    String keyspaceName = cfm.ksName;
    String tableName = cfm.cfName;
    String keyStr = "";
    try {
        keyStr = ByteBufferUtil.string(key);
    } catch (CharacterCodingException e) {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        logger.error(errors.toString());
    }
    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            // look up the metadata of the table we are writing to; its comparator builds the cell names
            CFMetaData other = Schema.instance.getCFMetaData("test", "audit");
            CellNameType cnt = other.comparator;
            // partition key of the new audit row
            ByteBuffer auditkey = UUIDType.instance.decompose(UUIDGen.getTimeUUID());
            // create CellName objects for each of the columns in the audit table row we are inserting
            CellName primaryKeyCellName = cnt.makeCellName("primary_key");
            CellName keyspaceCellName = cnt.makeCellName("keyspace_name");
            CellName tableCellName = cnt.makeCellName("table_name");
            try {
                // put the values we want to write to the audit table into ByteBuffer objects
                ByteBuffer ksvalbb, tablevalbb, keyvalbb;
                ksvalbb = ByteBuffer.wrap(keyspaceName.getBytes("UTF8"));
                tablevalbb = ByteBuffer.wrap(tableName.getBytes("UTF8"));
                keyvalbb = ByteBuffer.wrap(keyStr.getBytes("UTF8"));
                // create the mutation object in the audit table's keyspace, which may
                // differ from the keyspace of the table that fired the trigger
                Mutation mutation = new Mutation(other.ksName, auditkey);
                // get the time which will be needed for the call to mutation.add
                long mutationTime = System.currentTimeMillis();
                // add each of the column values to the mutation
                mutation.add("audit", primaryKeyCellName, keyvalbb, mutationTime);
                mutation.add("audit", keyspaceCellName, ksvalbb, mutationTime);
                mutation.add("audit", tableCellName, tablevalbb, mutationTime);
                mutations.add(mutation);
            } catch (UnsupportedEncodingException e) {
                StringWriter errors = new StringWriter();
                e.printStackTrace(new PrintWriter(errors));
                logger.error(errors.toString());
            }
        }
    }
    return mutations;
}
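As a side note on the step that puts the values into ByteBuffer objects: the sketch below uses only the JDK, not Cassandra, and shows one way to do the String-to-ByteBuffer conversion without the checked UnsupportedEncodingException, by passing StandardCharsets.UTF_8 instead of the charset name "UTF8". The class name Utf8Buffers and its two helper methods are hypothetical names chosen for illustration; they are not part of the trigger API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
final class Utf8Buffers {
    // Encode a String as UTF-8 bytes wrapped in a ByteBuffer.
    // getBytes(Charset) declares no checked exception, unlike getBytes(String).
    static ByteBuffer encode(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Decode from a duplicate so the caller's buffer position is left untouched.
    // Note: Charset.decode replaces malformed input instead of throwing.
    static String decode(ByteBuffer bb) {
        return StandardCharsets.UTF_8.decode(bb.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer value = encode("audit");
        System.out.println(decode(value) + " remaining=" + value.remaining());
    }
}
```

With a helper like this, the three getBytes("UTF8") calls in the trigger could become encode(...) calls and the inner try/catch would no longer be needed. Whether that is worth it is a judgment call, since the original form also works.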