Python API for data analytics

1. Install KAWA’s Python client

Installing the client requires Python 3.8 or later.

Run the following command to install KAWA Client:

pip install kywy

(The package is hosted on PyPI.)

2. Login to KAWA via the Python Client

2.a) Retrieve the API Key

To authenticate on KAWA via the Python client, you need an API key. You generate the key from the KAWA application:

⚠️
Once the API Key has been generated, it cannot be recovered (KAWA does not store it anywhere). You can always generate a new one if you wish.
[image: API key generation screen]

2.b) Login to KAWA via the Python Client

To log in to KAWA from the Python client, use the following script:

from kywy.client.kawa_client import KawaClient

k = KawaClient(kawa_api_url='http(s)://<YOUR URL>')

# Copy and paste your key into a file and load it to authenticate.
# (The file must contain only one line, which is the key.)
k.set_api_key(api_key_file='kawa.key')

# Alternatively, paste your key directly into your script (not recommended):
# k.set_api_key(api_key='*********')

# You can optionally set a workspace id.
# If you don't, you will be positioned in the last workspace you were in.
k.set_active_workspace_id(workspace_id=1)

sheets = k.entities.sheets()

print('There are {} sheets in workspace {}'.format(len(sheets.list_entities()), k.active_workspace_id))
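The key file must contain a single line. As a minimal sketch, a pre-flight check like the one below could catch a malformed file before authenticating. The helper name read_api_key is hypothetical and is not part of kywy; it only relies on the standard library.

```python
import tempfile
from pathlib import Path


def read_api_key(path):
    """Hypothetical helper (not part of kywy): return the key stored in a one-line file."""
    # Ignore blank lines so that a trailing newline does not count as a second line.
    lines = [line.strip() for line in Path(path).read_text().splitlines() if line.strip()]
    if len(lines) != 1:
        raise ValueError(f'{path} must contain exactly one line, found {len(lines)}')
    return lines[0]


# Demo with a temporary file standing in for kawa.key.
with tempfile.TemporaryDirectory() as tmp:
    key_file = Path(tmp) / 'kawa.key'
    key_file.write_text('my-secret-key\n')
    key = read_api_key(key_file)
```

The returned string could then be passed to k.set_api_key(api_key=key), or the file path passed directly through api_key_file once it has been checked.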

Profiles

You can also authenticate using a profile file. This is convenient when you often switch between various environments.

Here is an example of a valid profile json file:

{
    "url":"https://wayne.kawa.ai",
    "apiKey":"3q4ggrwm42r52nkot42mnio2t4nio2324"
}

In order to load a profile:

k = KawaClient()
k.open_profile(profile='/home/lucius/production.json')
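When several profile files are kept side by side, it can help to validate one before opening it. The sketch below assumes that the two fields of the sample profile above (url and apiKey) are the required ones; check_profile is a hypothetical helper, not a kywy function.

```python
import json
import tempfile
from pathlib import Path

# Field names taken from the sample profile; treating both as required is an assumption.
REQUIRED_FIELDS = ('url', 'apiKey')


def check_profile(path):
    """Hypothetical helper (not part of kywy): load a profile and verify its fields."""
    profile = json.loads(Path(path).read_text())
    missing = [field for field in REQUIRED_FIELDS if not profile.get(field)]
    if missing:
        raise ValueError(f'Profile {path} is missing: {", ".join(missing)}')
    return profile


# Demo with a temporary profile file.
with tempfile.TemporaryDirectory() as tmp:
    profile_file = Path(tmp) / 'production.json'
    profile_file.write_text(json.dumps({
        'url': 'https://wayne.kawa.ai',
        'apiKey': '<YOUR API KEY>',
    }))
    profile = check_profile(profile_file)
```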

3. Data Source management

3.a) Create a datasource

Creating datasources in KAWA requires a data loader.

In order to create a data loader, use the new_data_loader method on the KAWA client.

import pandas as pd

# Creates a new data loader with a simple dataframe and a name
loader = k.new_data_loader(
    datasource_name='Orders',
    df=pd.DataFrame([{
        'boolean1': True,
        'text1': 'bar',
        'measure1': 1.124,
    }]),
)

Once the loader is created, use it to create a datasource. If a datasource with the same name already exists, this command has no effect: it does not update the existing datasource.

# Creates a datasource based on the schema of the dataframe, with
# the given name.
# NO DATA WILL BE INSERTED AT THAT POINT
ds = loader.create_datasource()

You can also specify the primary keys of your datasource when you create it:

# Here, create the datasource by specifying that the primary key is 'text1'
ds = loader.create_datasource(primary_keys=['text1'])
ℹ️
If you do not specify any primary key, an auto increment key will be added automatically for you. It will be named: record_id
⚠️
The primary keys will be in the same order as the columns of the dataframe, not in the order of the primary_keys array. Please refer to the paragraph about indexation to learn more about how to order your primary keys.
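The ordering rule in the warning above can be sketched in a few lines. This is only an illustration of the stated rule, not KAWA's implementation; effective_primary_keys is a hypothetical name.

```python
def effective_primary_keys(columns, primary_keys):
    """Return the primary keys in dataframe column order, ignoring the order requested."""
    requested = set(primary_keys)
    return [column for column in columns if column in requested]


# Column order of the dataframe used in the examples above.
columns = ['boolean1', 'text1', 'measure1']

# 'text1' is listed first in the request, but 'boolean1' comes first in the dataframe.
ordered = effective_primary_keys(columns, ['text1', 'boolean1'])
```

Under this rule, reordering the dataframe columns before creating the datasource is what changes the key order.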

The indicators of the datasource will be built by introspecting the schema of the dataframe:

Pandas type(s) | Kawa type | Note
int, int8, int16, int32, int64, uint8, uint16, uint32, uint64 | integer |
float, float16, float32, float64 | decimal |
date | date |
datetime, pd.Timestamp | date_time | If the timezone is not specified, will use the local timezone.
string | text |
boolean | boolean |
number of days since epoch | date | The column has to be in date_columns array.
number of milliseconds since epoch | date_time | The column has to be in datetime_columns array.
array of texts | list(integer,text) |
array of integers | list(integer,integer) |
array of floats | list(integer,decimal) |
ℹ️
If a column of the dataframe has an unrecognised type, the datasource cannot be built.
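As a rough sketch, the scalar rows of the mapping above can be written as a lookup that fails on unknown types. The keys are the type names as written in the table, which may differ from the dtype strings pandas actually reports; kawa_type_for is a hypothetical helper and does not reflect how the loader is implemented.

```python
# Lookup transcribed from the table (type name -> KAWA type). Scalar types only.
KAWA_TYPES = {}
for name in ('int', 'int8', 'int16', 'int32', 'int64',
             'uint8', 'uint16', 'uint32', 'uint64'):
    KAWA_TYPES[name] = 'integer'
for name in ('float', 'float16', 'float32', 'float64'):
    KAWA_TYPES[name] = 'decimal'
KAWA_TYPES.update({
    'date': 'date',
    'datetime': 'date_time',
    'string': 'text',
    'boolean': 'boolean',
})


def kawa_type_for(type_name):
    """Hypothetical helper: return the KAWA type, or fail for an unrecognised type."""
    try:
        return KAWA_TYPES[type_name]
    except KeyError:
        raise TypeError(f'Unrecognised column type: {type_name}') from None


integer_type = kawa_type_for('uint16')
decimal_type = kawa_type_for('float32')
```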

3.b) Adding columns to an existing datasource

In order to add indicators to an existing datasource, create a new loader with a dataframe containing the new indicators and call the add_new_indicators_to_datasource method. This method won’t have any effect if no new indicator is present.

# Creates a new data loader on an existing datasource
loader = k.new_data_loader(
    datasource_name='Existing datasource name',
    df=pd.DataFrame([{
       'new measure': 123.34,
       'boolean1':True,
       'text1':'bar',
       'measure1': 1.124}])
    )

# The 'new measure' indicator was not part of the datasource,
# it will be added as a decimal indicator
loader.add_new_indicators_to_datasource()

3.c) Loading data into KAWA

Loading data into a datasource requires a data loader:

dtf = pd.DataFrame([{...}])
loader = k.new_data_loader(df=dtf, datasource_name='Orders')
# Make sure to call: loader.create_datasource() before loading the data
# otherwise an exception will be raised.
loader.load_data(
    reset_before_insert=True,
    create_sheet=True,
    nb_threads=2
)

reset_before_insert:

By default, this is False.

  • If set to False, the data of the new dataframe will be added on top of what was there before. This is an incremental load. If primary keys are defined, new values for existing keys will replace the existing ones (upsert). If no primary keys were defined, KAWA will have introduced an auto increment indicator. In that case, the incoming data will be appended to the existing data without any replacement.
  • If set to True, the data of the new dataframe will replace whatever was there before.
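The three behaviours described above (upsert, append, full replacement) can be modelled with plain Python. This is a toy model of the described semantics, with rows as dicts; the load function below is hypothetical and does not call KAWA.

```python
def load(existing, incoming, primary_keys=None, reset_before_insert=False):
    """Toy model of the load semantics; tables are lists of row dicts."""
    if reset_before_insert:
        # Full replacement: whatever was there before is discarded.
        return list(incoming)
    if not primary_keys:
        # No primary keys: every incoming row is appended.
        return existing + incoming
    # Primary keys defined: incoming rows replace existing rows with the same key.
    merged = {tuple(row[key] for key in primary_keys): row for row in existing}
    for row in incoming:
        merged[tuple(row[key] for key in primary_keys)] = row
    return list(merged.values())


existing = [{'text1': 'a', 'measure1': 1.0}, {'text1': 'b', 'measure1': 2.0}]
incoming = [{'text1': 'b', 'measure1': 20.0}, {'text1': 'c', 'measure1': 3.0}]

upserted = load(existing, incoming, primary_keys=['text1'])
appended = load(existing, incoming)
replaced = load(existing, incoming, reset_before_insert=True)
```

Here the upsert keeps three rows (the value for 'b' is overwritten), the append keeps four, and the reset keeps only the two incoming rows.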

create_sheet:

If set to True, a sheet will be created after the load. Its URL will be printed in the standard output. By default, this is False.

nb_threads:

In order to speed up the load, increase that number. Do not specify a number above the number of cores of the Clickhouse server. In general, values higher than 10 won’t add anything to the loading speed.

When this parameter has a value above 1, the dataframe will be split into nb_threads parquet files, and each one will be sent to the server in a separate thread. This makes it possible to use multiple cores when streaming data into the warehouse.
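The split-and-send pattern described above can be sketched with the standard library. This is an illustration of the general technique, not the client's actual code: split_rows and send_chunk are hypothetical, and send_chunk merely stands in for the parquet export and upload of one chunk.

```python
from concurrent.futures import ThreadPoolExecutor


def split_rows(rows, nb_threads):
    """Split rows into nb_threads contiguous chunks of near-equal size."""
    size, extra = divmod(len(rows), nb_threads)
    chunks, start = [], 0
    for index in range(nb_threads):
        # The first 'extra' chunks receive one additional row.
        end = start + size + (1 if index < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


def send_chunk(chunk):
    # Stand-in for exporting one chunk to parquet and streaming it to the server.
    return len(chunk)


rows = list(range(10))
chunks = split_rows(rows, nb_threads=3)

# One worker per chunk, mirroring one upload thread per parquet file.
with ThreadPoolExecutor(max_workers=3) as pool:
    sent = list(pool.map(send_chunk, chunks))
```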

3.d) Example script

from kywy.client.kawa_client import KawaClient
import pandas as pd
from datetime import date, datetime

URL = 'https://<YOUR URL>'
KEY = '<PATH TO YOUR KEY>'
WORKSPACE_ID = 1

k = KawaClient(kawa_api_url=URL)
k.set_api_key(api_key_file=KEY)
k.set_active_workspace_id(workspace_id=WORKSPACE_ID)

df = pd.DataFrame([{
    'boolean': True,
    'text': 'bar',
    'integer': 1,
    'decimal': 3.14,
    'date': date(2023, 1, 1),
    'date_time': datetime(2023, 1, 1, 23, 23, 2),
}])

loader = k.new_data_loader(df=df, datasource_name='New Datasource')
# Idempotent, won't do anything if the datasource already exists
loader.create_datasource() 
loader.load_data(create_sheet=True)

'''
SAMPLE OUTPUT:
Authenticated in workspace 1
Starting an ingestion session with id=1ac2756a-21a0-4aef-a042-157c8abce18c
> Exporting the dataframe a parquet file
> Streaming file /tmp/16d66d28-de5e-47f1-84d3-ff2c2f7bfde2/data.parquet to KAWA
> 1 rows were imported in 0.13490009307861328ms
> Import was successfully finalized
Sheet New Datasource was created: https://url/to/sheet

Process finished with exit code 0
'''

3.e) Indexation and Partitioning (advanced)

When your data is stored on Clickhouse, you can use the Python API to index it and configure how it is partitioned. The following documentation explains the architecture of Clickhouse indexes in detail:

https://clickhouse.com/docs/en/optimize/sparse-primary-indexes

KAWA API exposes one command to let you change the primary keys and the partition key of your table.

At any point, you can check the schema of your Clickhouse table by running:

datasource_id = datasource.get('id')
k.entities.get_datasource_schema(datasource_id=datasource_id)

This lets you check the current state of the partitions, primary keys and order keys.

The primary keys:

KAWA will always use the primary key set as the order key set in Clickhouse. This means that the data will be sorted by each of the primary keys, in THAT order. In the first example below, the data will be sorted first by date, then by portfolio, and then by record id.

This results in better performance for queries with filters on the date column. The further down the list a column is, the smaller the speedup obtained by filtering on that column.

💡
It is recommended to put in the first position the dimension on which most of the views are filtered, a date for instance: ['date', 'portfolio', 'record_id']
If a data source contains a natural hierarchy, putting the hierarchy in the correct order is also a good idea: ['region', 'country', 'city', 'record_id']
⚠️
To avoid losing data, THE REAL PRIMARY KEY MUST ALWAYS BE IN THE PROVIDED LIST.

The partition key:

The partition key tells Clickhouse how to partition the data on the hard drive. It will improve performance for ingestion and select queries. Please refer to this link for more details about partitions:

https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/custom-partitioning-key

Setting a partition key will also make it possible to remove old / unwanted partitions through the KAWA API.

ℹ️
Only partitions by dates are supported.

This command can take several minutes to run, depending on the size of the data, because the data is copied over to the newly configured table. Do not interrupt it or attempt to run it multiple times. It is strongly recommended to run it outside of business hours.

# Command to set a new set of primary keys and a partition
cmd = k.commands
cmd.replace_datasource_primary_keys(
   datasource=datasource,
   new_primary_keys=['date','record_id'],
   partition_key='date')
⚠️

Setting new primary keys can result in the loss of data. When you use this command, make sure that the set of primary keys reflects the uniqueness of your records.

KAWA uses the ReplacingMergeTree storage engine for most of its tables. https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
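The data-loss risk comes from the way ReplacingMergeTree removes rows that share the same sorting key. The toy model below illustrates that behaviour in plain Python; it assumes the last inserted row wins, it is not a Clickhouse client, and replacing_merge is a hypothetical name.

```python
def replacing_merge(rows, primary_keys):
    """Toy model: keep only the last row seen for each primary key tuple."""
    latest = {}
    for row in rows:
        latest[tuple(row[key] for key in primary_keys)] = row
    return list(latest.values())


rows = [
    {'date': '2024-01-01', 'portfolio': 'A', 'record_id': 1, 'pnl': 10},
    {'date': '2024-01-01', 'portfolio': 'A', 'record_id': 2, 'pnl': 20},
    {'date': '2024-01-02', 'portfolio': 'B', 'record_id': 3, 'pnl': 30},
]

# The real primary key (record_id) is part of the list: every row survives.
safe = replacing_merge(rows, ['date', 'record_id'])

# The key no longer identifies a record uniquely: rows sharing a date collapse.
lossy = replacing_merge(rows, ['date'])
```

In this model the second call keeps only two rows, which is why the real primary key should always be part of the new key list.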

3.f) Removing data from a datasource (advanced)

The only way to delete data from KAWA is by removing partitions. Before using this command, please refer to the previous paragraph to set up a date partition.

from datetime import date

# Command to drop a partition of data
cmd = k.commands
cmd.drop_date_partition(
   datasource=datasource,
   date_partition=date(2024, 1, 1)
)

4. KAWA Data Frame Computation API

Log into KAWA using your API key:

>>> k = KawaClient(kawa_api_url='https://<YOUR URL>')
>>> k.set_api_key(api_key_file='kawa.key')
>>> k.set_active_workspace_id(workspace_id=1)

Create an instance of a KAWA Sheet:

>>> sheet = k.sheet(sheet_name='Orders')

Query the schema of the frame:

>>> sheet.schema()
{'Profit': 'integer', 'Client ID': 'text', 'Order date': 'date' }

This API currently supports the following types:

  • integer
  • decimal
  • date
  • date_time
  • boolean
  • text

4.a) Build a computation query

There are two ways to request data from KAWA: either by requesting a sheet and configuring all the filters, grouping, sorting, etc. in the request, or by requesting an existing view in KAWA.

See examples below:

Request a view:

You can request the data from a KAWA view:

df = (
    k.sheet()
        .view_id(view_id=3118)
        .limit(100000)
        .compute()
)

You can override the default limit (300) with another number to load larger data sets.

The view id can be found in the last part of the URL:

[image: view id shown in the last part of the URL]

Request a widget:

You can request the data from a KAWA widget from within a dashboard:

df = k.widget(dashboard_name='DSH1', widget_name='Grid').limit(10).compute()

The dashboard name and the widget name can be found as shown below. The computations on dashboards will take the dashboard filters into account.

[image: location of the dashboard name and the widget name]

Request by sheet:

In order to build a query, use the sheet method. It takes two arguments:

  • The name of the sheet (a sheet with that name has to exist in your current workspace)
  • (Optional) A timezone in which to do the computation. It will default to your current timezone.

ℹ️
The query will not be executed until a computation operator is called.

>>> query = (
    k.sheet(sheet_name='Orders', force_tz='Asia/Tokyo')
     .group_by('Order date')
     .sample(sampler='YEAR_AND_MONTH')
     .agg(k.col('Profit').sum())
     .filter(k.col('Profit').lt(100).gt(0))
     .filter(k.col('Profit').sum().lt(1000))
     .limit(1000))

The computation operators:

In order to perform the computation, use the following operators:

compute
Retrieves the result inside a pandas data frame. compute requires:
  • If the frame is grouped (presence of the group_by operator): the agg operator
  • If the frame is not grouped: the select operator

The query is computed on KAWA’s server, using the API Token to authenticate the user and apply the relevant Row Level Security policies. The call below puts the result of the query in the df data frame.

>>> df = query.compute()
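To make the grouped case more concrete, the sketch below reproduces in plain Python what a group_by on a date column with the YEAR_AND_MONTH sampler and a sum aggregate conceptually returns. The sample rows are made up for illustration, and the shape of the data frame actually returned by KAWA (column names, label format) is not shown here.

```python
from collections import defaultdict
from datetime import date

# Made-up rows standing in for the content of the 'Orders' sheet.
orders = [
    {'Order date': date(2023, 1, 5), 'Profit': 40},
    {'Order date': date(2023, 1, 20), 'Profit': 60},
    {'Order date': date(2023, 2, 3), 'Profit': 25},
]

# Group by (year, month) of the order date and sum the profit of each group.
profit_by_month = defaultdict(int)
for order in orders:
    order_date = order['Order date']
    profit_by_month[(order_date.year, order_date.month)] += order['Profit']

result = dict(profit_by_month)
```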

The expression operators:

group_by
Groups the frame by the indicated column name.

sample
Defines the sampler to apply to the grouping column. The available samplers depend on the type of the column:
  • [date]: WEEK, MONTH, QUARTER, SEMESTER, YEAR, YEAR_AND_WEEK, YEAR_AND_MONTH, YEAR_AND_QUARTER, YEAR_AND_SEMESTER, DAY_OF_YEAR, DAY_OF_WEEK
  • [date_time] (computed in your local time zone): same as for date, plus DAY, TWELVE_HOURS, SIX_HOURS, HOUR, THIRTY_MINUTES, FIFTEEN_MINUTES, TEN_MINUTES, FIVE_MINUTES, MINUTE, THIRTY_SECONDS
  • [decimal and integer]: FIXED_NUMBER_OF_BINS (with extra argument: how_many_buckets), LIST_OF_BINS (with extra argument: buckets), FIXED_SIZE_BINS (with extra argument: bucket_size)
For example, sample(sampler='FIXED_SIZE_BINS', bucket_size=30) will create buckets of 30 numbers.

agg
Defines the aggregation functions for all the columns. Accepts a list of column aggregates. Only meaningful if the frame has the group_by operation. Example: agg(k.col('Profit').sum(), k.col('Quantity').avg())

filter
Defines how the data will be filtered. Accepts a column filter (see the column API below).

select
Defines which columns to retrieve. Accepts a list of column names, for example: select('Country', 'Date'). Also accepts column objects: select(k.col('Profit'), k.col('Quantity'))

limit
Adds a limit to the number of returned rows. By default, the limit is 100.
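As an illustration of the FIXED_SIZE_BINS sampler, the sketch below assigns numbers to buckets of a fixed width. The bucket boundaries are an assumption (buckets starting at 0, lower bound included, upper bound excluded); the source only states that bucket_size=30 creates buckets of 30 numbers. fixed_size_bin is a hypothetical helper.

```python
def fixed_size_bin(value, bucket_size):
    """Return the (lower, upper) bounds of the bucket holding value.

    Assumption: buckets start at 0 and are half-open, [lower, upper).
    """
    lower = (value // bucket_size) * bucket_size
    return lower, lower + bucket_size


values = [5, 29, 30, 61]
bins = [fixed_size_bin(value, 30) for value in values]
```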

The column operators:

This API lets you build expressions based on individual or multiple columns.

col
Loads a column object by its name. Call schema() on the lazy data frame (the object returned by k.sheet(...)) to list all the columns.

cols
Loads multiple columns based on a regular expression. This operator can be used in the select and the agg statements.
  • k.cols('Measure.*'): all the columns whose name starts with Measure.
  • k.cols(): all the columns of the sheet.

aggregate
Defines a column aggregate. (Only meaningful if the lazy frame contains a group_by clause.) All aggregations can be written as col('A').aggregate('sum') or col('A').sum(). The available aggregates are:
  • [decimal and integer]: sum, avg, median, min, max, min_abs, max_abs, var_sample, var_pop, std_dev_sample, std_dev_pop, lowest_decile, lowest_quartile, highest_decile, highest_quartile
  • [text]: count, count_unique, percent_filled, percent_empty, count_empty, identical, identical_ignore_empty
  • [date and date_time]: min, max, identical, count_unique

filter
Defines a column filter. It can be built from a row value or from an aggregated column:
  • All profits greater than 0: k.col('Profit').gt(0)
  • All groups whose profit adds up to less than 1000: k.col('Profit').sum().lt(1000)

Filters can be chained on a given column; this results in an 'OR' operation. For example, all the profits smaller than 0 or above 5: k.col('Profit').sum().lt(0).gt(5)

⚠️
Chaining cannot be done with the in_list operator or with date / date time filters.

If you apply multiple filters, the 'AND' operation will be applied:

query = (
    k.lazy_frame(sheet_name='Orders')
     .group_by('Order date')
     .sample(sampler='YEAR_AND_MONTH')
     .sort('Order date', direction='ASCENDING')
     .agg(k.col('Profit').sum())
     .filter(k.col('Profit').lt(0).gt(100))
     .filter(k.col('Profit').sum().lt(1000))
)

In this example, the filter will be: all negative profits or profits greater than 100, AND all dates for which the global profit is less than 1000.

Filters can be reversed using the exclude() operator. For example, to keep all the profits whose sum is different from 0: k.col('Profit').sum().eq(0).exclude()
Another example: k.col('Profit').sum().eq(0).gte(100).exclude() keeps all profits whose sum is not 0 and is less than 100. (The exclusion happens on the result of the OR operation.)

[decimal and integer]:
  • lt(number): strictly less than a number
  • gt(number): strictly greater than a number
  • lte(number): less than or equal to a number
  • gte(number): greater than or equal to a number
  • eq(number): equals a number
  • ne(number): different from a number
  • empty(): empty numbers
  • not_empty(): non empty numbers

[text]: All text filters are case insensitive.
  • in_list(list_of_values): keeps only the values that are in the list. For example: k.col('Country').in_list('GB','US')
  • starts_with(prefix): starts with the specified prefix
  • ends_with(suffix): ends with the specified suffix
  • contains(substring): contains a substring
  • empty(): empty texts
  • not_empty(): non empty texts
  • does_not_end_with(): does not end with a given suffix
  • does_not_start_with(): does not start with a given prefix
  • does_not_contain(): does not contain a substring

[date]: Date filters operate on ranges of datetime.date objects. Both boundaries are optional.
  • date_range: for example, col('Order Date').date_range(from_inclusive=date(2023,1,1), to_inclusive=date.today()) (this is a YTD filter on the year 2023)
  • yoy_ytd(): applies a year on year, year to date filter on the selected column
  • weekdays_only(): can be added to any date filter to filter out weekends. For example, col('Order Date').date_range(date(2023,1,1), date.today()).weekdays_only() will be a YTD filter with weekdays only.

[date time]: Date time filters operate on ranges of datetime objects.
  • datetime_range: for example, col('TS').datetime_range(from_inclusive=datetime(2023,1,1), to_inclusive=datetime.today()) is a YTD filter on the date time column TS.
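The combination rules described above (chaining on one column is an OR, separate filters are an AND, exclude() negates the result of the OR) can be checked with a small stand-alone model. The functions below are hypothetical and only mimic the described logic on a list of row values; they do not use the KAWA client.

```python
def lt(bound):
    return lambda value: value < bound


def gt(bound):
    return lambda value: value > bound


def eq(bound):
    return lambda value: value == bound


def gte(bound):
    return lambda value: value >= bound


def any_of(*conditions):
    """Conditions chained on one column combine with OR."""
    return lambda value: any(condition(value) for condition in conditions)


def exclude(predicate):
    """exclude() negates the result of the whole OR chain."""
    return lambda value: not predicate(value)


profits = [-5, 0, 3, 7, 150]

# Models lt(0).gt(5): below 0 OR above 5.
chained = any_of(lt(0), gt(5))
kept_chained = [p for p in profits if chained(p)]

# Models eq(0).gte(100).exclude(): NOT (equal to 0 OR at least 100).
excluded = exclude(any_of(eq(0), gte(100)))
kept_excluded = [p for p in profits if excluded(p)]

# Two separate filters combine with AND.
kept_both = [p for p in profits if chained(p) and excluded(p)]
```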

Example script:

from kywy.client.kawa_client import KawaClient
from datetime import date, datetime

k = KawaClient(kawa_api_url='https://<YOUR URL>')
k.set_api_key(api_key_file='<PATH TO THE KEY FILE>')
k.set_active_workspace_id(workspace_id=1)

print(k.sheet(sheet_name='orders').schema())

by_state = (
    k.sheet(sheet_name='orders', force_tz='Asia/Tokyo')
        .group_by('State')
        .agg(k.col('Profit').sum(), k.col('State').identical())
)

flat = (
    k.sheet(sheet_name='orders')
        .filter(k.col('Order date').yoy_ytd().weekdays_only())
        .filter(k.col('State').contains('Kentucky').empty().exclude())
        .filter(k.col('State').identical().contains('Kentucky'))
        .select('Order ID', 'Profit', 'State', 'Order Date')
        .limit(100 * 1000)
)

df1 = flat.compute().sort_values(by=['Order Date'], ascending=False)
# by_state is grouped by 'State' and has no 'Order Date' column to sort on
df2 = by_state.compute()

Request by view:

You can find the view ID in the URL.

from kywy.client.kawa_client import KawaClient
from datetime import date, datetime

k = KawaClient(kawa_api_url='https://<YOUR URL>')
k.set_api_key(api_key_file='<PATH TO THE KEY FILE>')
k.set_active_workspace_id(workspace_id=1)


data = (
    k.sheet()
        .view_id(3118)
        .limit(100 * 1000)
)

df = data.compute()