# Online Integration
This document covers how to integrate Chronon with your online KV store, which is the backend that powers low latency serving of individual feature vectors in the online environment.
This integration gives Chronon the ability to:

- Perform batch uploads of feature values to the KV store
- Perform streaming updates to those features
- Fetch features via the `Fetcher` API
## Example
If you'd like to start with an example, please refer to the MongoDB implementation in the Quickstart Guide. It provides a complete working example of how to integrate Chronon with MongoDB.
## Components
**KVStore**: The biggest part of the API implementation is the `KVStore`.
```scala
object KVStore {
  // `afterTsMillis` implies that this is a range scan of all values with `timestamp` >= the specified one.
  // This can be implemented efficiently if `timestamp` can be a secondary key. Some databases have a native
  // version id concept which can also map to timestamp.
  case class GetRequest(keyBytes: Array[Byte], dataset: String, afterTsMillis: Option[Long] = None)

  // response is a series of timestamped values
  case class TimedValue(bytes: Array[Byte], millis: Long)

  case class GetResponse(request: GetRequest, values: Try[Seq[TimedValue]]) {
    def latest: Try[TimedValue] = values.map(_.maxBy(_.millis))
  }

  case class PutRequest(keyBytes: Array[Byte], valueBytes: Array[Byte], dataset: String, tsMillis: Option[Long] = None)
}

trait KVStore {
  def create(dataset: String): Unit

  // Used by the Chronon client to fetch features
  def multiGet(requests: Seq[GetRequest]): Future[Seq[GetResponse]]

  // Used by the Spark streaming job to write values
  def multiPut(keyValueDatasets: Seq[PutRequest]): Future[Seq[Boolean]]

  // Used by the Spark upload job to bulk upload data into the kv store
  def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit
}
```
There are four functions to implement as part of this integration:

- `create`: takes a string and creates a new database/dataset with that name.
- `multiGet`: takes a `Seq` of `GetRequest` and converts them into a `Future[Seq[GetResponse]]` by querying the underlying KVStore.
- `multiPut`: takes a `Seq` of `PutRequest` and converts them into a `Future[Seq[Boolean]]` (success/fail) by attempting to insert them into the underlying KVStore.
- `bulkPut`: uploads a Hive table into your kv store. It takes the table name and partition as `String`s, as well as the destination dataset as a `String`. If you have another mechanism (like an Airflow upload operator) to move data from Hive into your kv store, you don't need to implement this method.
See the MongoDB example here.
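To make the contract concrete, below is a minimal in-memory sketch of the trait. The class name, storage layout, and error handling are made up for illustration; a real implementation would call your database client and would typically implement `afterTsMillis` as a range scan on a secondary timestamp key.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

import KVStore._ // GetRequest, GetResponse, PutRequest, TimedValue from the companion object above

// Hypothetical in-memory store, for illustration only -- not the MongoDB example.
class InMemoryKVStore(implicit ec: ExecutionContext) extends KVStore {

  // (dataset, key) -> timestamped values, newest first
  private val data = TrieMap.empty[(String, Seq[Byte]), List[TimedValue]]

  // nothing to provision for an in-memory map
  override def create(dataset: String): Unit = ()

  override def multiGet(requests: Seq[GetRequest]): Future[Seq[GetResponse]] = Future {
    requests.map { req =>
      val values = Try {
        val all = data.getOrElse((req.dataset, req.keyBytes.toSeq), Nil)
        // honor the optional range-scan semantics of afterTsMillis
        req.afterTsMillis.fold[Seq[TimedValue]](all)(ts => all.filter(_.millis >= ts))
      }
      GetResponse(req, values)
    }
  }

  override def multiPut(requests: Seq[PutRequest]): Future[Seq[Boolean]] = Future {
    requests.map { req =>
      val key = (req.dataset, req.keyBytes.toSeq)
      val value = TimedValue(req.valueBytes, req.tsMillis.getOrElse(System.currentTimeMillis()))
      // not atomic -- fine for a sketch, not for production
      data.put(key, value :: data.getOrElse(key, Nil))
      true
    }
  }

  // A real implementation would read the Hive partition and write it to the store;
  // skip this entirely if you bulk load through another mechanism (e.g. an Airflow operator).
  override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit =
    throw new UnsupportedOperationException("bulk upload is not supported by the in-memory sketch")
}
```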
**StreamDecoder**: This is responsible for "decoding", or converting the raw values that Chronon streaming jobs read into events that Chronon knows how to process.
```scala
case class Mutation(schema: StructType = null, before: Array[Any] = null, after: Array[Any] = null)

abstract class StreamDecoder extends Serializable {
  def decode(bytes: Array[Byte]): Mutation
  def schema: StructType
}
```
At a high level, there are two types of input streams that Chronon might listen to:

- **Events**: These are the most common type of streaming data, and can be thought of as the "standard" insert-only Kafka log. The key differentiator from Mutations is that Events are immutable and cannot be updated or deleted (although new events with the same key can be emitted).
- **Mutations**: These are streaming updates to specific entities. For example, an item in an online product catalog might get updates to its price or description, or it might get deleted altogether. Unlike plain Events, these can be modeled as `INSERT`/`UPDATE`/`DELETE`.
Mutations are usually changes captured from production databases. Debezium is one tool that can be used for this if you don't already have a change capture system in place. The benefit of processing Mutations is that you can use production databases as sources of feature data without issuing large range queries against them: Chronon keeps large windowed aggregations up to date by listening to the mutation log, and lookup requests add no additional load to production DBs.
If you don’t have a Mutation capture system and don’t wish to set one up, you can still use Chronon streaming with plain Events.
In the API, `Mutation` is modeled as the general case covering both Events and Mutations, since Events can be viewed as a subset of Mutations.
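As a sketch of how the cases might populate `Mutation`'s fields (the row layout and values below are made up): an insert-only event carries only `after`, an update carries both `before` and `after`, and a delete carries only `before`.

```scala
// Hypothetical product-catalog rows laid out as (item_id: Long, price: Double);
// building the corresponding StructType is elided here for brevity.
val productSchema: StructType = ???

// Insert-only event: there is no prior state, so `before` stays null
val insert = Mutation(schema = productSchema, after = Array[Any](123L, 9.99))

// Update: both the previous and the new version of the row are present
val update = Mutation(schema = productSchema,
                      before = Array[Any](123L, 9.99),
                      after  = Array[Any](123L, 7.49))

// Delete: only the previous version of the row is present
val delete = Mutation(schema = productSchema, before = Array[Any](123L, 7.49))
```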
The `StreamDecoder` is responsible for two function implementations:

- `decode`: takes in an `Array[Byte]` and converts it to a `Mutation`.
- `schema`: provides the `StructType` for the given `GroupBy`.
Chronon has a type system that can map to Spark's or Avro's type systems. The schema is based on the table below, which lists the Java type corresponding to each Chronon schema type; the `StreamDecoder` should produce `Mutation`s whose values comply.
| Chronon Type | Java Type            |
|--------------|----------------------|
| IntType      | java.lang.Integer    |
| LongType     | java.lang.Long       |
| DoubleType   | java.lang.Double     |
| FloatType    | java.lang.Float      |
| ShortType    | java.lang.Short      |
| BooleanType  | java.lang.Boolean    |
| ByteType     | java.lang.Byte       |
| StringType   | java.lang.String     |
| BinaryType   | Array[Byte]          |
| ListType     | java.util.List[Byte] |
| MapType      | java.util.Map[Byte]  |
| StructType   | Array[Any]           |
See the Quickstart example here.
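As an illustrative sketch, a decoder for a hypothetical UTF-8, comma-delimited payload of the form `"item_id,price"` might look like the following. The class name, payload format, and schema parameter are made up; real decoders usually deserialize Avro/Protobuf/Thrift bytes from your message bus.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical decoder for insert-only product events, for illustration only.
class ProductEventDecoder(productSchema: StructType) extends StreamDecoder {

  override def decode(bytes: Array[Byte]): Mutation = {
    // e.g. "123,9.99" -> item_id = 123L, price = 9.99
    val parts = new String(bytes, StandardCharsets.UTF_8).split(",")
    // values must line up with `schema` and use the Java types from the table above;
    // insert-only events leave `before` as null
    Mutation(schema = productSchema, after = Array[Any](parts(0).toLong, parts(1).toDouble))
  }

  override def schema: StructType = productSchema
}
```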
**API**: The main interface that requires implementation is the `Api` class. It combines the above implementations with other client and logging configuration.
`ChrononMongoOnlineImpl` is an example implementation of the API.
Once you have the API object, you can build a fetcher like so:
```scala
val api = new MyApiImplementation(myParams)
val fetcher = api.buildFetcher()
// or
val javaFetcher = api.buildJavaFetcher()

// you can use the fetcher to begin fetching values (there is a Java version too)
fetcher.fetchJoins(Seq(Request(name = "my.join.name", keys = Map("user" -> "bob", "item" -> "pizza"))))

// if the date partition column convention in your warehouse differs from "yyyy-MM-dd", set a partitionSpec
fetcher.setPartitionSpec("yyyyMMdd")
```
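The fetch calls are asynchronous. A minimal way to block on and inspect a result, for example in a smoke test, is sketched below; it assumes each response carries the originating request and a `values` field holding the fetched feature map or failure (check the `Fetcher` API for the exact shape). Production callers should stay asynchronous rather than blocking.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

val responses = Await.result(
  fetcher.fetchJoins(Seq(Request(name = "my.join.name", keys = Map("user" -> "bob", "item" -> "pizza")))),
  10.seconds
)

// print each request name alongside its fetched feature values (or the failure)
responses.foreach { response =>
  println(s"${response.request.name} -> ${response.values}")
}
```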
`userConf` is captured from command line arguments to the `run.py` script or to the `chronon-uber-jar` with `ai.chronon.spark.Driver` as the main class: `-Zkey1=value1 -Zkey2=value2` becomes the `{key1: value1, key2: value2}` initializer argument to the `Api` class. You can use it to set KVStore params, or Kafka params for streaming jobs and bulk upload jobs.
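For example, a sketch of how those `-Z` arguments might be consumed inside your implementation (the class name, keys, and defaults below are made up; only the conf map being passed to the `Api` constructor is part of the contract described above):

```scala
// Hypothetical: invoking run.py (or the uber-jar) with -Zmongo.host=my-host -Zmongo.port=27017
// hands the Api constructor userConf == Map("mongo.host" -> "my-host", "mongo.port" -> "27017").
abstract class MyApiSketch(userConf: Map[String, String]) extends Api(userConf) {
  // key names and defaults are illustrative only
  protected val mongoHost: String = userConf.getOrElse("mongo.host", "localhost")
  protected val mongoPort: Int    = userConf.getOrElse("mongo.port", "27017").toInt
  // the real implementation would also wire up the KVStore, StreamDecoder, and logging hooks
}
```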