Python API#

ai.chronon.group_by.Aggregation(input_column: str | None = None, operation: Operation | Tuple[Operation, Dict[str, str]] | None = None, windows: List[Window] | None = None, buckets: List[str] | None = None, tags: Dict[str, str] | None = None) Aggregation#
Parameters:
  • input_column (str) – Column on which the aggregation needs to be performed. This should be one of the output columns of the source's Query, i.e. one of the keys of its selects map.

  • operation (ttypes.Operation) – Operation used to aggregate the input column, e.g. MAX, MIN, COUNT. Some operations take arguments, like LAST_K and APPROX_PERCENTILE. Defaults to “LAST”.

  • windows (List[ttypes.Window]) – Lengths of the windows to calculate the aggregates over. The minimum window size is 1 hr; the maximum can be arbitrary. When not defined, the computation is un-windowed.

  • buckets (List[str]) – Besides the GroupBy.keys, this is another level of keys to use under this aggregation. Using this creates an output column that is a map of bucket value to aggregate.

Returns:

An aggregate defined with the specified operation.
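
For illustration, a minimal sketch of aggregations as they might appear in a GroupBy definition. Column names are placeholders; Operation, Window and TimeUnit are assumed to be importable from ai.chronon.group_by, and the tuple form for operation arguments follows the signature above.

    from ai.chronon.group_by import Aggregation, Operation, TimeUnit, Window

    aggregations = [
        # Un-windowed sum over all available history ("price" is a placeholder column).
        Aggregation(input_column="price", operation=Operation.SUM),
        # The same sum over sliding 7-day and 30-day windows.
        Aggregation(
            input_column="price",
            operation=Operation.SUM,
            windows=[Window(7, TimeUnit.DAYS), Window(30, TimeUnit.DAYS)],
        ),
        # Operation with arguments, passed via the (Operation, arg map) tuple form.
        Aggregation(
            input_column="price",
            operation=(Operation.LAST_K, {"k": "10"}),
        ),
        # Bucketed count: one map column of item_category -> count per key.
        Aggregation(
            input_column="price",
            operation=Operation.COUNT,
            buckets=["item_category"],
        ),
    ]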

ai.chronon.group_by.Derivation(name: str, expression: str) Derivation#

Derivation allows arbitrary SQL select clauses to be computed using columns from the GroupBy backfill output schema. It is supported for offline computations only, for now.

If both name and expression are set to “*”, then every raw column will be included along with the derived columns.

Parameters:
  • name – output column name of the SQL expression

  • expression – any valid Spark SQL select clause based on the GroupBy output columns

Returns:

a Derivation object representing a single derived column or a wildcard (“*”) selection.
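
A minimal sketch, assuming hypothetical backfill output columns price_sum_7d and price_sum_30d; the wildcard entry keeps every raw column alongside the derived one.

    from ai.chronon.group_by import Derivation

    derivations = [
        # Keep every raw output column in addition to the derived ones.
        Derivation(name="*", expression="*"),
        # Hypothetical ratio of two backfill output columns.
        Derivation(name="price_sum_ratio", expression="price_sum_7d / price_sum_30d"),
    ]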

ai.chronon.group_by.GroupBy(sources: List[Source | EventSource | EntitySource | JoinSource] | Source | EventSource | EntitySource | JoinSource, keys: List[str], aggregations: List[Aggregation] | None, online: bool | None = None, production: bool | None = None, backfill_start_date: str | None = None, dependencies: List[str] | None = None, env: Dict[str, Dict[str, str]] | None = None, table_properties: Dict[str, str] | None = None, output_namespace: str | None = None, accuracy: Accuracy | None = None, lag: int = 0, offline_schedule: str = '@daily', name: str | None = None, tags: Dict[str, str] | None = None, derivations: List[Derivation] | None = None, **kwargs) GroupBy#
Parameters:
  • sources (List[ai.chronon.api.ttypes.EventSource | ai.chronon.api.ttypes.EntitySource | ai.chronon.api.ttypes.JoinSource]) –

    can be constructed as entities or events or joinSource:

    import ai.chronon.api.ttypes as chronon
    events = chronon.Source(events=chronon.EventSource(
        table=YOUR_TABLE,
        topic=YOUR_TOPIC,  # <- OPTIONAL for serving
        query=chronon.Query(...),
        isCumulative=False  # <- defaults to false.
    ))
    # or
    entities = chronon.Source(entities=chronon.EntitySource(
        snapshotTable=YOUR_TABLE,
        mutationTopic=YOUR_TOPIC,
        mutationTable=YOUR_MUTATION_TABLE,
        query=chronon.Query(...)
    ))
    # or
    joinSource = chronon.Source(joinSource=chronon.JoinSource(
        join=YOUR_CHRONON_PARENT_JOIN,
        query=chronon.Query(...)
    ))
    

    Multiple sources can be supplied to backfill the historical values with their respective start and end partitions. However, only one source is allowed to be a streaming one.

  • keys (List[String]) – List of primary keys that define the data to be aggregated in the result table. Similar to the GROUP BY keys in the SQL context.

  • aggregations (List[ai.chronon.api.ttypes.Aggregation]) –

    List of aggregations that need to be computed for the data, following the grouping defined by the keys:

    from ai.chronon.group_by import Aggregation, Operation, TimeUnit, Window
    aggregations = [
        Aggregation(input_column="entity", operation=Operation.LAST),
        Aggregation(input_column="entity", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
    ]
    

  • online (bool) – Should we upload the result data of this conf into the KV store so that we can fetch/serve this GroupBy online. Once Online is set to True, you ideally should not change the conf.

  • production (bool) – When set, this flag can be used to trigger alerts; you will have to integrate it into your alerting system yourself.

  • backfill_start_date (str) – Start date from which GroupBy data should be computed. This determines how far back in time Chronon will go to compute the resultant table and its aggregations.

  • dependencies (List[str]) – This goes into MetaData.dependencies - a list of strings representing the table partitions to wait for. Typically used by engines like Airflow to create partition sensors.

  • env (Dict[str, Dict[str, str]]) –

    This is a dictionary of “mode name” to dictionary of “env var name” to “env var value”:

    {
        'backfill' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' },
        'upload' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' },
        'streaming' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' }
    }
    

    These vars then flow into run.py and the underlying spark_submit.sh. These vars can be set in other places as well. The priority order (descending) is as below:

    1. env vars set while using run.py: “VAR=VAL run.py --mode=backfill <name>”

    2. env vars set here in the GroupBy’s env param

    3. env vars set in teams.json[‘team.production.<MODE NAME>’]

    4. env vars set in teams.json[‘default.production.<MODE NAME>’]

  • table_properties (Dict[str, str]) – Specifies the properties on output hive tables. Can be specified in teams.json.

  • output_namespace (str) – In backfill mode, we will produce data into hive. This represents the hive namespace that the data will be written into. You can set this at the teams.json level.

  • accuracy (ai.chronon.api.ttypes.SNAPSHOT or ai.chronon.api.ttypes.TEMPORAL) – Defines the computing accuracy of the GroupBy. If “Snapshot” is selected, the aggregations are computed based on the partition identifier - “ds” time column. If “Temporal” is selected, the aggregations are computed based on the event time - “ts” time column.

  • lag (int) – Param that goes into customJson. You can pull this out of the json at the path “metaData.customJson.lag”. This is used by the Airflow integration to pick an older hive partition to wait on.

  • offline_schedule (str) –

    the offline schedule interval for batch jobs. Below are the crontab equivalents of the supported aliases:

    '@hourly': '0 * * * *',
    '@daily': '0 0 * * *',
    '@weekly': '0 0 * * 0',
    '@monthly': '0 0 1 * *',
    '@yearly': '0 0 1 1 *',
    

  • tags (Dict[str, str]) – Additional metadata that does not directly affect feature computation, but is useful to track for management purposes.

  • derivations (List[ai.chronon.api.ttypes.Derivation]) – Derivations allow arbitrary SQL select clauses to be computed using columns from the GroupBy backfill output schema. Supported for offline computations only, for now.

  • kwargs (Dict[str, str]) – Additional properties that are passed to run.py when specified under the additional_args property. Also provides an option to pass custom values to the processing logic.

Returns:

A GroupBy object containing specified aggregations.
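
Putting the parameters together, a minimal sketch of a GroupBy definition. The table, topic and column names are placeholders; the source follows the ttypes example above and the Query helper documented under ai.chronon.query.Query.

    import ai.chronon.api.ttypes as chronon
    from ai.chronon.group_by import Aggregation, GroupBy, Operation, TimeUnit, Window
    from ai.chronon.query import Query

    # Placeholder event source: a 'ds'-partitioned hive table plus an optional topic.
    purchases = chronon.Source(events=chronon.EventSource(
        table="db.purchase_events",
        topic="purchase_events_topic",  # optional, only needed for streaming
        query=Query(
            selects={"user_id": "user_id", "price": "price"},
            time_column="event_ts",
        ),
    ))

    purchase_features = GroupBy(
        sources=[purchases],
        keys=["user_id"],
        aggregations=[
            Aggregation(input_column="price", operation=Operation.SUM,
                        windows=[Window(7, TimeUnit.DAYS)]),
        ],
        online=True,
    )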

ai.chronon.join.BootstrapPart(table: str, key_columns: List[str] | None = None, query: Query | None = None) BootstrapPart#

Bootstrap is the concept of using pre-computed feature values and skipping backfill computation during the training data generation phase. Bootstrap can be used for many purposes:

  • Generating ongoing feature values from logs

  • Backfilling feature values for external features (in which case Chronon is unable to run backfill)

  • Initializing a new Join by migrating old data from an older Join and reusing data

One can bootstrap against any of these:

  • join part fields:

    Bootstrap can happen at the individual field level within a join part. If all fields within a GroupBy are bootstrapped, then we skip computation for that GroupBy. Otherwise, the whole GroupBy is re-run, but only the values for the non-bootstrapped fields are retained in the final table.

  • external part fields:

    Bootstrap can happen at individual field level within an external part. Since there is no backfill logic in chronon for external part, all non-bootstrapped fields in external parts are left as NULLs.

  • derivation fields:

    Derived fields can also be bootstrapped. Since derived fields depend on “base” fields (either join part or external part), chronon will try to trigger the least amount of computation possible. For example, if there is a join part where all derived fields that depend on the join part have been bootstrapped, then we skip the computation for this join part.

  • keys:

    Keys of both join parts and external parts can be bootstrapped. During offline table generation, we will first try to utilize key’s data from left table; if it’s not there, then we utilize bootstrap. For contextual features, we also support propagating the key bootstrap to the values.

Dependencies are auto-generated based on the source table and the optional start_partition/end_partition. To override, add the overriding dependencies to the main list (join.dependencies).

Parameters:
  • table – Name of hive table that contains feature values where rows are 1:1 mapped to left table

  • key_columns – Keys to join bootstrap table to left table

  • query – Selected columns (features & keys) and filtering conditions of the bootstrap tables.
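
A minimal sketch of a bootstrap part, assuming a hypothetical pre-computed feature table keyed by user_id and ds whose column names match the join’s output schema.

    from ai.chronon.join import BootstrapPart
    from ai.chronon.query import Query

    bootstrap = BootstrapPart(
        # Hypothetical hive table whose rows map 1:1 to the join's left table.
        table="ml_features.precomputed_user_features",
        key_columns=["user_id", "ds"],
        query=Query(
            selects={
                "user_id": "user_id",
                "user_purchases_price_sum_7d": "user_purchases_price_sum_7d",
            },
            start_partition="2023-01-01",
        ),
    )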

ai.chronon.join.ContextualSource(fields: List[Tuple[str, TDataType]], team='default') ExternalSource#

Contextual source values are passed along for logging. No external request is actually made.
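
A minimal sketch: a contextual source carrying a hypothetical request_id, which can later be referenced in derivations as ext_contextual_request_id.

    from ai.chronon.join import ContextualSource, DataType

    # No external request is made; these values are carried along from the query context.
    request_context = ContextualSource(
        fields=[("request_id", DataType.STRING)],
    )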

class ai.chronon.join.DataType#

Helper class to generate data types for declaring schemas. This supports primitive types like numerics and strings, and complex types like Map, List, and Struct.

ai.chronon.join.Derivation(name: str, expression: str) Derivation#

Derivation allows arbitrary SQL select clauses to be computed using columns from joinPart and externalParts, and saves the result as derived columns. The results will be available both in online fetching response map, and in offline Hive table.

joinPart column names are automatically constructed according to the convention {join_part_prefix}_{group_by_name}_{input_column_name}_{aggregation_operation}_{window}_{by_bucket}; prefix, window and bucket are optional. You can find the type information of columns using the analyzer tool.

externalPart column names are automatically constructed according to the convention ext_{external_source_name}_{value_column}. Types are defined along with the schema by users for external sources.

Note that only values can be used in derivations, not keys. If you want to use a key in the derivation, you must define it as a contextual field. You also must refer to a contextual field with its prefix included, for example: ext_contextual_request_id.

If both name and expression are set to “*”, then every raw column will be included along with the derived columns.

Parameters:
  • name – output column name of the SQL expression

  • expression – any valid Spark SQL select clause based on joinPart or externalPart columns

Returns:

a Derivation object representing a single derived column or a wildcard (“*”) selection.
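
A minimal sketch following the naming conventions above. The group by and external source names (user_purchases, account_data) and their columns are hypothetical.

    from ai.chronon.join import Derivation

    derivations = [
        # Keep all raw columns in addition to the derived ones.
        Derivation(name="*", expression="*"),
        # joinPart columns: {prefix}_{group_by_name}_{input_column}_{operation}_{window}.
        Derivation(
            name="price_sum_ratio",
            expression="user_purchases_price_sum_7d / user_purchases_price_sum_30d",
        ),
        # externalPart columns: ext_{external_source_name}_{value_column}.
        Derivation(name="account_age", expression="ext_account_data_account_age_days"),
        # Contextual fields must keep their ext_contextual_ prefix.
        Derivation(name="request_id", expression="ext_contextual_request_id"),
    ]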

ai.chronon.join.ExternalPart(source: ExternalSource, key_mapping: Dict[str, str] | None = None, prefix: str | None = None) ExternalPart#

Used to describe which ExternalSources to pull features from while fetching online. This data also goes into logs based on sample percent.

Just as in JoinPart, key_mapping is used to map the join left’s keys to the external source’s keys. “vendor” and “buyer” on the left side (query map) could both map to a “user” in an account-data external source. You would create one ExternalPart for vendor with params (key_mapping={vendor: user}, prefix=vendor) and another for buyer.

This doesn’t have any implications offline besides logging. “right_parts” can be both backfilled and logged, whereas “external_parts” can only be logged. If you need the ability to backfill an external source, look into creating an EntitySource with mutation data for point-in-time-correctness.

Parameters:
  • source – External source to join with

  • key_mapping – How to map the keys from the query/left side to the source

  • prefix – Sometimes you want to use the same source to fetch data for different entities in the query. E.g., a transaction between a buyer and a seller might query a “user information” service/source that has information about both the buyer & the seller.
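
A minimal sketch of the vendor/buyer scenario above, assuming a hypothetical account-data external source keyed by “user”.

    from ai.chronon.join import DataType, ExternalPart, ExternalSource

    account_data = ExternalSource(
        name="account_data",  # must match the handler name registered in ai.chronon.online.Api
        team="accounts",
        key_fields=[("user", DataType.STRING)],
        value_fields=[("account_age_days", DataType.INT)],
    )

    online_external_parts = [
        # Map the left side's "vendor" key onto the source's "user" key.
        ExternalPart(account_data, key_mapping={"vendor": "user"}, prefix="vendor"),
        # And again for "buyer", so each gets its own prefixed output columns.
        ExternalPart(account_data, key_mapping={"buyer": "user"}, prefix="buyer"),
    ]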

ai.chronon.join.ExternalSource(name: str, team: str, key_fields: List[Tuple[str, TDataType]], value_fields: List[Tuple[str, TDataType]], custom_json: str | None = None) ExternalSource#

External sources are online-only data sources. During fetching, via the Chronon Java client, they consume a Request containing a key map (name string to value) and produce a Response containing a value map.

This is primarily used in Joins. We also expose a fetchExternal method in java client library that can be used to fetch a batch of External source requests efficiently.

Internally Chronon will batch these requests to the service and parallelize fetching from different services, while de-duplicating given a batch of join requests.

The implementation of how to fetch is an ExternalSourceHandler in scala/java api that needs to be registered while implementing ai.chronon.online.Api with the name used in the ExternalSource. This is meant for re-usability of external source definitions.

Parameters:
  • name – name of the external source to fetch from. Should match the name in the registry.

  • key_fields – List of tuples of string and DataType. This is what will be given to the ExternalSource handler registered in the Java API. E.g., [(‘key1’, DataType.INT), (‘key2’, DataType.STRING)]

  • value_fields

    List of tuples of string and DataType. This is what the ExternalSource handler will respond with:

    [
        ('value0', DataType.INT),
        ('value1', DataType.MAP(DataType.STRING, DataType.LONG)),
        ('value2', DataType.STRUCT(
            'Context',
            ('field1', DataType.INT),
            ('field2', DataType.DOUBLE)
        ))
    ]
    

ai.chronon.join.Join(left: Source, right_parts: List[JoinPart], check_consistency: bool = False, additional_args: List[str] | None = None, additional_env: List[str] | None = None, dependencies: List[str] | None = None, online: bool = False, production: bool = False, output_namespace: str | None = None, table_properties: Dict[str, str] | None = None, env: Dict[str, Dict[str, str]] | None = None, lag: int = 0, skew_keys: Dict[str, List[str]] | None = None, sample_percent: float = 100.0, consistency_sample_percent: float = 5.0, online_external_parts: List[ExternalPart] | None = None, offline_schedule: str = '@daily', historical_backfill: bool | None = None, row_ids: List[str] | None = None, bootstrap_parts: List[BootstrapPart] | None = None, bootstrap_from_log: bool = False, label_part: LabelPart | None = None, derivations: List[Derivation] | None = None, tags: Dict[str, str] | None = None, **kwargs) Join#

Construct a join object. A join can pull together data from various GroupBy’s both offline and online. This is also the focal point for logging, data quality computation and monitoring. A join maps 1:1 to models in ML usage.

Parameters:
  • left (ai.chronon.api.Source) – The source on the left side. When left is entities, all GroupBys are joined with SNAPSHOT accuracy (midnight values). When left is events, and the GroupBy on the right is TEMPORAL or has a topic specified, we perform a TEMPORAL / point-in-time join.

  • right_parts (List[ai.chronon.api.JoinPart]) – The list of groupBy’s to join with. GroupBy’s are wrapped in a JoinPart, which contains additional information on how to join the left side with the GroupBy.

  • check_consistency (bool) – If online serving data should be compared with backfill data - as online-offline-consistency metrics. The metrics go into hive and your configured kv store for further visualization and monitoring.

  • additional_args (List[str]) – Additional args go into customJson of ai.chronon.api.MetaData within the ai.chronon.api.Join object. This is a place for arbitrary information you want to tag your conf with.

  • additional_env (List[str]) – Deprecated, see env

  • dependencies (List[str]) – This goes into MetaData.dependencies - a list of strings representing the table partitions to wait for. Typically used by engines like Airflow to create partition sensors.

  • online (bool) – Should we upload this conf into kv store so that we can fetch/serve this join online. Once Online is set to True, you ideally should not change the conf.

  • production (bool) – When set, this flag can be used to trigger alerts; you will have to integrate it into your alerting system yourself.

  • output_namespace (str) – In backfill mode, we will produce data into hive. This represents the hive namespace that the data will be written into. You can set this at the teams.json level.

  • table_properties – Specifies the properties on output hive tables. Can be specified in teams.json.

  • env (Dict[str, Dict[str, str]]) –

    This is a dictionary of “mode name” to dictionary of “env var name” to “env var value”:

    {
        'backfill' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' },
        'upload' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' },
        'streaming' : { 'VAR1' : 'VAL1', 'VAR2' : 'VAL2' }
    }
    

    These vars then flow into run.py and the underlying spark_submit.sh. These vars can be set in other places as well. The priority order (descending) is as below:

    1. env vars set while using run.py: “VAR=VAL run.py --mode=backfill <name>”

    2. env vars set here in the Join’s env param

    3. env vars set in teams.json[‘team.production.<MODE NAME>’]

    4. env vars set in teams.json[‘default.production.<MODE NAME>’]

  • lag – Param that goes into customJson. You can pull this out of the json at the path “metaData.customJson.lag”. This is used by the Airflow integration to pick an older hive partition to wait on.

  • skew_keys – While back-filling, if there are known irrelevant keys - like user_id = 0 / NULL etc. - you can specify them here. This is used to blacklist crawlers etc.

  • sample_percent – Online only parameter. What percent of online serving requests to this join should be logged into warehouse.

  • consistency_sample_percent – Online only parameter. What percent of online serving requests to this join should be sampled to compute online/offline consistency metrics. If sample_percent=50.0 and consistency_sample_percent=10.0, then the consistency job effectively runs on 5% of total traffic.

  • online_external_parts – Users can register external sources into the Api implementation, which the Chronon fetcher can invoke. This is applicable only for online fetching; offline, this will not produce any values.

  • offline_schedule – Cron expression for Airflow to schedule a DAG for offline join compute tasks

  • row_ids – Columns of the left table that uniquely define a training record. Used as default keys during bootstrap

  • bootstrap_parts – A list of BootstrapPart used for the Join. See BootstrapPart doc for more details

  • bootstrap_from_log – If set to True, will use logging table to generate training data by default and skip continuous backfill. Logging will be treated as another bootstrap source, but other bootstrap_parts will take precedence.

  • label_part – Label part which contains a list of labels and label refresh window boundary used for the Join

  • tags (Dict[str, str]) – Additional metadata about the Join that you wish to track. Does not affect computation.

  • historical_backfill (bool) – Flag to indicate whether the join backfill should backfill previous holes. Setting this to false will only backfill the latest single partition.

Returns:

A join object that can be used to backfill or serve data. For ML use-cases this should map 1:1 to a model.
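
Continuing the placeholder objects from the GroupBy and ExternalPart sketches above, a minimal Join definition.

    from ai.chronon.join import Join, JoinPart

    # `purchases`, `purchase_features` and `online_external_parts` are the
    # placeholder objects from the GroupBy and ExternalPart sketches above.
    checkout_features = Join(
        left=purchases,
        right_parts=[JoinPart(group_by=purchase_features)],
        online=True,
        online_external_parts=online_external_parts,
        sample_percent=10.0,
        offline_schedule="@daily",
    )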

ai.chronon.join.JoinPart(group_by: GroupBy, key_mapping: Dict[str, str] | None = None, prefix: str | None = None, tags: Dict[str, str] | None = None) JoinPart#

Specifies HOW to join the left of a Join with GroupBy’s.

Parameters:
  • group_by (ai.chronon.api.GroupBy) – The GroupBy object to join with. Keys on left are used to equi join with keys on right. When left is entities all GroupBy’s are computed as of midnight. When left is events, we do a point-in-time join when right.accuracy == TEMPORAL OR right.source.topic != null

  • key_mapping (Dict[str, str]) – Names of keys don’t always match on left and right, this mapping tells us how to map when they don’t.

  • prefix – All the output columns of the groupBy will be prefixed with this string. This is used when you need to join the same groupBy more than once with the left. Say on the left you have seller and buyer, and on the groupBy you have a user’s avg_price; to join the left (seller, buyer) with (seller_avg_price, buyer_avg_price) you would use the key_mapping and prefix parameters (see the sketch after this entry).

  • tags (Dict[str, str]) – Additional metadata about the JoinPart that you wish to track. Does not affect computation.

Returns:

JoinPart specifies how the left side of a join, or the query in online setting, would join with the right side components like GroupBys.
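
A minimal sketch of the seller/buyer example from the prefix parameter above; user_avg_price stands in for a hypothetical GroupBy keyed by “user”.

    from ai.chronon.join import JoinPart

    # The same GroupBy joined twice against a left table with seller and buyer columns.
    right_parts = [
        JoinPart(group_by=user_avg_price, key_mapping={"seller": "user"}, prefix="seller"),
        JoinPart(group_by=user_avg_price, key_mapping={"buyer": "user"}, prefix="buyer"),
    ]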

ai.chronon.join.LabelPart(labels: List[JoinPart], left_start_offset: int, left_end_offset: int, label_offline_schedule: str = '@daily') LabelPart#

Used to describe labels in a join. A label part can be viewed as a regular join part, but it represents label data instead of regular feature data. Once labels are mature, the label join job joins labels with features in the training window the user specified via leftStartOffset and leftEndOffset.

The offsets are relative days compared to the given label landing date label_ds. This parameter is required to be passed in for each label join job. For example, given label_ds = 2023-04-30, left_start_offset = 30, and left_end_offset = 10, the left side start date will be computed as 30 days before label_ds (inclusive), which is 2023-04-01. Similarly, the left end date will be 2023-04-21. Labels will be refreshed within this window [2023-04-01, 2023-04-21] in this specific label job run.

Since label join job will run continuously based on the schedule, multiple labels could be generated but with different label_ds or label version. Label join job would have all computed label versions available, as well as a view of latest version for easy label retrieval.

LabelPart definition can be updated along the way, but label join job can only accommodate these changes going forward unless a backfill is manually triggered.

Label aggregation is also supported, but with conditions applied. A single aggregation with one window is allowed for now. If an aggregation is present, left_start_offset and left_end_offset are inferred from the window size and the provided param values are ignored.

Parameters:
  • labels – List of labels

  • left_start_offset – Relative integer to define the earliest date (inclusive) the label should be refreshed, compared to the specified label_ds date. For labels with aggregations, this param has to be the same as the aggregation window size.

  • left_end_offset – Relative integer to define the most recent date (inclusive) the label should be refreshed. E.g., with left_end_offset = 3 the most recent label available will be 3 days prior to ‘label_ds’ (including label_ds). For labels with aggregations, this param has to be the same as the aggregation window size.

  • label_offline_schedule – Cron expression for Airflow to schedule a DAG for offline label join compute tasks
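
A minimal sketch using the offsets from the 2023-04-30 example above; purchase_labels stands in for a hypothetical label GroupBy.

    from ai.chronon.join import JoinPart, LabelPart

    label_part = LabelPart(
        labels=[JoinPart(group_by=purchase_labels)],  # purchase_labels is a placeholder label GroupBy
        left_start_offset=30,
        left_end_offset=10,
        label_offline_schedule="@weekly",
    )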

ai.chronon.query.Query(selects: Dict[str, str] | None = None, wheres: List[str] | None = None, start_partition: str | None = None, end_partition: str | None = None, time_column: str | None = None, setups: List[str] = [], mutation_time_column: str | None = None, reversal_column: str | None = None) Query#

Create a query object that is used to scan data from various data sources. This contains partition ranges, row level transformations and filtering logic. Additionally we also require a time_column for TEMPORAL events, mutation_time_column & reversal for TEMPORAL entities.

Parameters:
  • selects (List[str], optional) –

    Spark SQL expressions with only arithmetic, function application & inline lambdas. You can also apply UDFs; see the setups param below:

    Example: {
        "alias": "built_in_function(col1) * my_udf(col2)",
        "alias1": "aggregate(array_col, 0, (acc, x) -> acc + x)"
    }
    

    See: https://spark.apache.org/docs/latest/api/sql/#built-in-functions When none, we will assume that no transformations are needed and will pick columns necessary for aggregations.

  • wheres (List[str], optional) – Used for filtering. Same as above, but each expression must return boolean. Expressions are joined using AND.

  • start_partition (str, optional) – The partition from which the source data is valid, inclusive. When absent, we consider all available data usable.

  • end_partition (str, optional) – The partition until which the source data is valid, inclusive. Leave this unspecified unless you know for a fact that the source has expired after a particular partition and another source should be used after it.

  • time_column (str, optional) – a single expression to produce time as milliseconds since epoch.

  • setups (List[str], optional) – you can register UDFs using setups, e.g. [“ADD JAR YOUR_JAR”, “CREATE TEMPORARY FUNCTION YOUR_UDF_NAME AS YOUR_CLASS”]

  • mutation_time_column (str, optional) – For entities with real-time accuracy, you need to specify an expression that represents mutation time, as milliseconds since epoch. This is not necessary for event sources. Defaults to “mutation_ts”.

  • reversal_column (str, optional) – For entities with realtime accuracy, we divide changes into additions & reversals. Updates have two rows - one with is_before = True (the old value) & one with is_before = False (the new value). Inserts only have is_before = false (just the new value). Deletes only have is_before = true (just the old value). This is not necessary for event sources. Defaults to “is_before”.

Returns:

A Query object that Chronon can use to scan just the necessary data efficiently.
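
A minimal sketch tying the parameters together; the columns, UDF name, jar path and class are hypothetical.

    from ai.chronon.query import Query

    query = Query(
        selects={
            "user_id": "user_id",
            # Arbitrary Spark SQL expression, here using a UDF registered via setups.
            "price_usd": "convert_to_usd(price, currency)",
        },
        wheres=["price > 0", "user_id IS NOT NULL"],
        start_partition="2023-01-01",
        time_column="event_ts",
        setups=[
            "ADD JAR s3://my-bucket/udfs.jar",
            "CREATE TEMPORARY FUNCTION convert_to_usd AS 'com.example.ConvertToUsd'",
        ],
    )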

class ai.chronon.api.ttypes.Aggregation(inputColumn=None, operation=None, argMap=None, windows=None, buckets=None)#

Chronon provides a powerful aggregation primitive that takes the familiar aggregation operations from SQL’s GROUP BY and extends them with three things - windowing, bucketing and auto-explode.

Attributes:
  • inputColumn: The column, as specified in source.query.selects, on which we need to aggregate.

  • operation: The type of aggregation that needs to be performed on the inputColumn.

  • argMap: Extra arguments that need to be passed to some of the operations like LAST_K, APPROX_PERCENTILE.

  • windows: For the TEMPORAL case, windows are sawtooth: the head slides ahead continuously in time, whereas the tail only hops ahead at discrete points in time. The hop is determined automatically by the window size - the maximum hop size is 1/12 of the window size. You can specify multiple such windows at once.

    • Window > 12 days -> Hop Size = 1 day

    • Window > 12 hours -> Hop Size = 1 hr

    • Window > 1 hr -> Hop Size = 5 minutes

  • buckets: This is an additional layer of aggregation. You can key a group_by by user, and bucket an “item_view” count by “item_category”. This will produce one row per user, with a column containing a map of “item_category” to “view_count”. You can specify multiple such buckets at once.

class ai.chronon.api.ttypes.EntitySource(snapshotTable=None, mutationTable=None, mutationTopic=None, query=None)#

Entity Sources represent data that gets mutated over time at the row level. This is a group of three data elements: snapshotTable, mutationTable and mutationTopic. mutationTable and mutationTopic are only necessary if we are trying to create realtime or point-in-time aggregations over these sources. Entity sources usually map 1:1 with database tables in your OLTP store that typically serve live application traffic. When mutation data is absent, they map 1:1 to dim tables in a star schema.

Attributes:
  • snapshotTable: Snapshot table currently needs to be a ‘ds’ (date string - yyyy-MM-dd) partitioned hive table.

  • mutationTable: Hive table capturing the history of row-level mutations to the entity. Along with mutationTopic, it is only needed for realtime or point-in-time (TEMPORAL) aggregations; its rows carry a mutation time column and a reversal column (see Query’s mutation_time_column and reversal_column).

  • mutationTopic: Kafka topic carrying the real-time stream of mutations to the entity, needed for streaming aggregations.

  • query: The logic used to scan the tables and the topic. Contains row level transformations and filtering expressed as Spark SQL statements.

class ai.chronon.api.ttypes.EventSource(table=None, topic=None, query=None, isCumulative=None)#
Attributes:
  • table: Table currently needs to be a ‘ds’ (date string - yyyy-MM-dd) partitioned hive table. Table names can contain subpartition specs, example db.table/system=mobile/currency=USD

  • topic: Topic is a kafka topic. The table contains all the events that historically came through this topic.

  • query: The logic used to scan both the table and the topic. Contains row level transformations and filtering expressed as Spark SQL statements.

  • isCumulative: Set to true if each new hive partition contains not just the current day’s events but the entire set of events since the beginning. The key property is that the events are not mutated across partitions.

class ai.chronon.api.ttypes.StagingQuery(metaData=None, query=None, startPartition=None, setups=None)#

Staging Query encapsulates arbitrary spark computation. One key feature is that the computation follows a “fill-what’s-missing” pattern: instead of explicitly specifying dates, you specify two macros, {{ start_date }} and {{ end_date }}. Chronon will pass in the earliest missing partition for start_date and the execution date / today for end_date, so the query can compute multiple partitions at once.

Attributes:
  • metaData: Contains name, team, output_namespace, execution parameters etc. Things that don’t change the semantics of the computation itself.

  • query: Arbitrary spark query that should be written with {{ start_date }}, {{ end_date }} and {{ latest_date }} templates
    • {{ start_date }} will be set to the user provided startPartition on the first run; future incremental runs will set it to the latest existing partition + 1 day.

    • {{ end_date }} is the end partition of the computing range.

    • {{ latest_date }} is the end partition independent of the computing range (meant for cumulative sources).

    • {{ max_date(table=namespace.my_table) }} is the max partition available for a given table.

  • startPartition: on the first run, {{ start_date }} will be set to this user provided start date, future incremental runs will set it to the latest existing partition + 1 day.

  • setups: Spark SQL setup statements. Used typically to register UDFs.
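
A minimal sketch constructed directly from the thrift types; the table, columns, and the MetaData fields used (just a name) are assumptions for illustration.

    import ai.chronon.api.ttypes as chronon

    staging_query = chronon.StagingQuery(
        metaData=chronon.MetaData(name="purchases_staging"),  # assumed MetaData field
        # Chronon substitutes the earliest missing partition and the execution date.
        query="""
            SELECT user_id, SUM(purchase_amount) AS total_spend, ds
            FROM my_namespace.purchases
            WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
            GROUP BY user_id, ds
        """,
        startPartition="2023-01-01",
        setups=["CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUdf'"],
    )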

class ai.chronon.api.ttypes.Window(length=None, timeUnit=None)#
Attributes:
  • length: the length of the window, in units of timeUnit.

  • timeUnit: the unit of the window length, e.g. TimeUnit.HOURS or TimeUnit.DAYS.