- 1. Install KAWA’s Python client
- 2. Login to KAWA via the Python Client
- 2.a) Retrieve the API Key
- 2.b) Login to KAWA via the Python Client
- 3. Data Source management
- 3.a) Create a datasource
- 3.b) Adding columns to an existing datasource
- 3.c) Loading data into KAWA
- 3.d) Example script
- 3.e) Indexation and Partitioning (advanced)
- 3.f) Removing data from a datasource (advanced)
- 4. KAWA Data Frame Computation API
- 4.a) Build a computation query
1. Install KAWA’s Python client
You need Python 3.8 or later to install it.
Run the following command to install KAWA Client:
pip install kywy
(It is hosted on PyPI.)
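To check that the installation worked, you can simply import the client:
# If the package is installed correctly, this import succeeds without error
from kywy.client.kawa_client import KawaClient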
2. Login to KAWA via the Python Client
2.a) Retrieve the API Key
To authenticate on KAWA via the Python client, you need your API key. You can generate it from the KAWA application:
2.b) Login to KAWA via the Python Client
To login into KAWA from the Python client, use the following script:
from kywy.client.kawa_client import KawaClient
k = KawaClient(kawa_api_url='http(s)://<YOUR URL>')
# Copy and paste your key into a file and load it
# to authenticate.
# (The file must contain only one line, which is the key.)
k.set_api_key(api_key_file='kawa.key')
# Alternatively, copy and paste your key into your script
# (Not recommended)
k.set_api_key(api_key='*********')
# You can optionally set a workspace id.
# If you don't, you will be positioned in the last workspace you were in.
k.set_active_workspace_id(workspace_id=1)
sheets = k.entities.sheets()
print('There are {} sheets in workspace {}'.format(len(sheets.list_entities()), k.active_workspace_id))
Profiles:
You can also authenticate using a profile file. This is convenient when you often switch between environments.
Here is an example of a valid profile json file:
{
"url":"https://wayne.kawa.ai",
"apiKey":"3q4ggrwm42r52nkot42mnio2t4nio2324"
}
In order to load a profile:
k = KawaClient()
k.open_profile(profile='/home/lucius/production.json')
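For instance, here is a small sketch that picks a profile file per environment. The KAWA_ENV variable and the file paths are only an illustrative convention, not part of the API:
import os
from kywy.client.kawa_client import KawaClient

# Pick a profile file based on an environment variable (illustrative convention)
env = os.environ.get('KAWA_ENV', 'production')
k = KawaClient()
k.open_profile(profile='/home/lucius/{}.json'.format(env))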
3. Data Source management
3.a) Create a datasource
Creating datasources in KAWA requires a data loader.
To create a data loader, use the new_data_loader method on the KAWA client.
import pandas as pd

# Creates a new data loader with a simple dataframe and a name
loader = k.new_data_loader(
    datasource_name='Orders',
    df=pd.DataFrame([{
        'boolean1': True,
        'text1': 'bar',
        'measure1': 1.124}])
)
Once the loader is created, use it to create a datasource. If a datasource with the same name already exists, this command won't have any effect; it won't update the existing datasource.
# Creates a datasource based on the schema of the dataframe, with
# the given name.
# NO DATA WILL BE INSERTED AT THAT POINT
ds = loader.create_datasource()
You can also specify the primary keys of your datasource when you create it:
# Here, create the datasource by specifying that the primary key is 'text1'
ds = loader.create_datasource(primary_keys=['text1'])
If you do not specify any primary keys, KAWA adds an auto-increment record_id key to the datasource.
The indicators of the datasource will be built by introspecting the schema of the dataframe:
Pandas type(s) | Kawa type | Note |
int, int8, int16, int32, int64, uint8, uint16, uint32, uint64 | integer | |
float, float16, float32, float64 | decimal | |
date | date | |
datetime, pd.Timestamp | date_time | If the timezone is not specified, the local timezone is used. |
string | text | |
boolean | boolean | |
number of days since epoch | date | The column has to be listed in the date_columns array. |
number of milliseconds since epoch | date_time | The column has to be listed in the datetime_columns array. |
array of texts | list(integer,text) | |
array of integers | list(integer,integer) | |
array of floats | list(integer,decimal) | |
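To make the mapping concrete, here is a minimal sketch of a dataframe that exercises most of the mappings above. The column names and the datasource name are only illustrative:
import pandas as pd
from datetime import date, datetime

df = pd.DataFrame([{
    'an_integer': 42,                         # int -> integer
    'a_decimal': 3.14,                        # float -> decimal
    'a_date': date(2024, 1, 1),               # date -> date
    'a_timestamp': datetime(2024, 1, 1, 12),  # datetime -> date_time
    'a_text': 'hello',                        # string -> text
    'a_flag': True,                           # boolean -> boolean
    'some_tags': ['red', 'blue'],             # array of texts -> list(integer,text)
}])

loader = k.new_data_loader(datasource_name='Type mapping demo', df=df)
ds = loader.create_datasource()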
3.b) Adding columns to an existing datasource
In order to add indicators to an existing datasource, create a new loader with a dataframe containing the new indicators and call the add_new_indicators_to_datasource
method. This method won’t have any effect if no new indicator is present.
# Creates a new data loader on an existing datasource
loader = k.new_data_loader(
    datasource_name='Existing datasource name',
    df=pd.DataFrame([{
        'new measure': 123.34,
        'boolean1': True,
        'text1': 'bar',
        'measure1': 1.124}])
)
# The 'new measure' indicator was not part of the datasource,
# it will be added as a decimal indicator
loader.add_new_indicators_to_datasource()
3.c) Loading data into KAWA
Loading data into a datasource requires a data loader:
dtf = pd.DataFrame([{...}])
loader = k.new_data_loader(df=dtf, datasource_name='Orders')
# Make sure to call loader.create_datasource() before loading the data,
# otherwise an exception will be raised.
loader.load_data(
    reset_before_insert=True,
    create_sheet=True,
    nb_threads=2
)
reset_before_insert:
By default, this is False.
- If set to False, the data of the new dataframe will be added on top of what was there before. This is an incremental load. If primary keys are defined, new values for existing keys will replace the existing ones (upsert). If no primary keys were defined, KAWA will have introduced an auto-increment indicator; in that case, the incoming data will be appended to the existing data without any replacement.
- If set to True, the data of the new dataframe will replace whatever was there before.
create_sheet:
If set to True, a sheet will be created after the load and its URL will be printed to the standard output. By default, this is False.
nb_threads:
To speed up the load, increase this number. Do not specify a number above the number of cores of the Clickhouse server; in general, values higher than 10 won't improve the loading speed.
When this parameter is set above 1, the dataframe will be split into nb_threads parquet files, each sent to the server in a separate thread. This allows multiple cores to be used when streaming data into the warehouse.
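For example, here is a minimal sketch of an incremental (upsert) load on a datasource keyed by order_id. The datasource name, column names and values are only illustrative:
import pandas as pd

# Initial load: create the datasource, keyed by 'order_id', and reset its content
initial = pd.DataFrame([
    {'order_id': 'o-1', 'amount': 10.0},
    {'order_id': 'o-2', 'amount': 20.0},
])
loader = k.new_data_loader(df=initial, datasource_name='Orders (upsert demo)')
loader.create_datasource(primary_keys=['order_id'])
loader.load_data(reset_before_insert=True)

# Incremental load: 'o-2' is updated (upsert), 'o-3' is appended
delta = pd.DataFrame([
    {'order_id': 'o-2', 'amount': 25.0},
    {'order_id': 'o-3', 'amount': 30.0},
])
loader = k.new_data_loader(df=delta, datasource_name='Orders (upsert demo)')
loader.load_data(reset_before_insert=False)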
3.d) Example script
from kywy.client.kawa_client import KawaClient
import pandas as pd
from datetime import date, datetime
URL = 'https://<YOUR URL>'
KEY = '<PATH TO YOUR KEY>'
WORKSPACE_ID = 1
k = KawaClient(kawa_api_url=URL)
k.set_api_key(api_key_file=KEY)
k.set_active_workspace_id(workspace_id=WORKSPACE_ID)
df = pd.DataFrame([{
'boolean': True,
'text': 'bar',
'integer': 1,
'decimal': 3.14,
'date': date(2023, 1, 1),
'date_time': datetime(2023, 1, 1, 23, 23, 2),
}])
loader = k.new_data_loader(df=df, datasource_name='New Datasource')
# Idempotent, won't do anything if the datasource already exists
loader.create_datasource()
loader.load_data(create_sheet=True)
'''
SAMPLE OUTPUT:
Authenticated in workspace 1
Starting an ingestion session with id=1ac2756a-21a0-4aef-a042-157c8abce18c
> Exporting the dataframe a parquet file
> Streaming file /tmp/16d66d28-de5e-47f1-84d3-ff2c2f7bfde2/data.parquet to KAWA
> 1 rows were imported in 0.13490009307861328ms
> Import was successfully finalized
Sheet New Datasource was created: https://url/to/sheet
Process finished with exit code 0
'''
3.e) Indexation and Partitioning (advanced)
When your data is stored on Clickhouse, you can use the Python API to index it and configure how it is partitioned. Here is comprehensive documentation that explains the architecture of Clickhouse indexes:
https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
The KAWA API exposes one command to let you change the primary keys and the partition key of your table.
At any point, you can check the schema of your Clickhouse table by running:
id = datasource.get('id')
k.entities.get_datasource_schema(datasource_id=id)
This helps you control the current status of partitions and primary and order keys.
The primary keys:
KAWA always uses the primary key set as the order key set in Clickhouse. This means that the data will be sorted by each of the primary keys, in that order. In the first example below, the data will be sorted first by date, then by portfolio, and then by record id.
This results in better performance for queries with filters on the date column. The further down the list a column appears, the smaller the speedup from filtering on that column.
['date', 'portfolio', 'record_id']
If a data source contains a natural hierarchy, putting the hierarchy in the correct order is also a good idea: ['region', 'country', 'city', 'record_id']
⚠️ To avoid losing data, THE REAL PRIMARY KEY MUST ALWAYS BE IN THE PROVIDED LIST.
The partition key:
This tells Clickhouse how to partition the data on disk. It improves performance for both ingestion and select queries. Please refer to this link for more details about partitions:
https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/custom-partitioning-key
Setting a partition key will also make it possible to remove old / unwanted partitions through the KAWA API.
This command can take several minutes to run (it depends on the size of the data, which is copied over to the newly configured table). Do not interrupt it or attempt to run it multiple times. It is strongly recommended to run it outside of business hours.
# Command to set a new set of primary keys and a partition
cmd = k.commands
cmd.replace_datasource_primary_keys(
    datasource=datasource,
    new_primary_keys=['date', 'record_id'],
    partition_key='date')
Setting new primary keys can result in the loss of data. When you use this command, make sure that the set of primary keys reflects the uniqueness of your records.
KAWA uses the ReplacingMergeTree storage engine for most of its tables. https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
3.f) Removing data from a datasource (advanced)
The only way to delete data from KAWA is by removing partitions. Before using this command, please refer to the previous paragraph to set up a date partition.
# Command to drop a partition of data
cmd = k.commands
cmd.drop_date_partition(
    datasource=datasource,
    date_partition=date(2024, 1, 1)
)
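For example, here is a minimal sketch of a retention policy that drops every daily partition older than 90 days. It assumes the datasource is partitioned by a date column as described above; the retention window and scan range are illustrative, and you should adjust them to the partitions that actually exist:
from datetime import date, timedelta

cmd = k.commands
cutoff = date.today() - timedelta(days=90)  # illustrative retention window
day = date(2024, 1, 1)                      # illustrative start of the scan range

# Drop the daily partitions one by one, oldest first
while day < cutoff:
    cmd.drop_date_partition(datasource=datasource, date_partition=day)
    day += timedelta(days=1)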
4. KAWA Data Frame Computation API
Log into KAWA using your API key:
>>> k = KawaClient(kawa_api_url='https://<YOUR URL>')
>>> k.set_api_key(api_key_file='kawa.key')
>>> k.set_active_workspace_id(workspace_id=1)
Create an instance of a KAWA Sheet:
>>> sheet = k.sheet(sheet_name='Orders')
Query the schema of the frame:
>>> sheet.schema()
{'Profit': 'integer', 'Client ID': 'text', 'Order date': 'date' }
This API currently supports the following types:
- integer
- decimal
- date
- date_time
- boolean
- text
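Since schema() returns a plain dictionary mapping column names to these types, you can easily filter it. A small sketch, assuming an 'Orders' sheet:
# schema() returns a dict of column name -> KAWA type
schema = k.sheet(sheet_name='Orders').schema()

# List all the numeric columns of the sheet
numeric_columns = [name for name, kawa_type in schema.items()
                   if kawa_type in ('integer', 'decimal')]
print(numeric_columns)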
4.a) Build a computation query
There are two ways to request data from KAWA: either by requesting a sheet and configuring the filters, grouping, sorting, etc. in the request, or by requesting an existing view in KAWA.
See examples below:
Request a view:
You can request the data from a KAWA view:
df = (
    k.sheet()
    .view_id(view_id=3118)
    .limit(100000)
    .compute()
)
You can override the default limit (300) with another number to load larger data sets.
The view id can be found in the last part of the view's URL.
Request a widget:
You can request the data from a KAWA widget from within a dashboard:
df = k.widget(dashboard_name='DSH1', widget_name='Grid').limit(10).compute()
The dashboard name and the widget name can be found in the dashboard itself. Computations on dashboards take the dashboard filters into account.
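Here is a minimal sketch of a widget request; the dashboard and widget names are only illustrative:
# Fetch the first 100 rows of the 'Grid' widget on the 'Sales overview' dashboard.
# The dashboard filters are applied to the computation.
df = (
    k.widget(dashboard_name='Sales overview', widget_name='Grid')
    .limit(100)
    .compute()
)
print(df.head())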
Request by sheet:
To build a query, use the sheet method. It takes two arguments:
- The name of the sheet (a sheet with that name has to exist in your current workspace)
- (Optional) A timezone in which to perform the computation. It defaults to your current timezone.
ℹ️ The query will not be executed as long as no computation operator is called.
>>> query = (
        k.sheet(sheet_name='Orders', force_tz='Asia/Tokyo')
        .group_by('Order date')
        .sample(sampler='YEAR_AND_MONTH')
        .agg(k.col('Profit').sum())
        .filter(k.col('Profit').lt(100).gt(0))
        .filter(k.col('Profit').sum().lt(1000))
        .limit(1000))
The computation operators:
In order to perform the computation, use the following operators:
compute | Retrieves the result as a pandas data frame.
compute needs:
- If the frame is grouped (presence of the group_by operator): the agg operator
- If the frame is not grouped: the select operator. |
This puts the result of the query into the df data frame. The query is computed on KAWA's server, using the API token to authenticate the user and apply the relevant Row Level Security policies.
>>> df = query.compute()
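A minimal sketch of both cases, assuming an 'Orders' sheet with 'Order date', 'Profit' and 'Country' columns (illustrative names):
# Grouped frame: group_by requires an agg before compute
grouped_df = (
    k.sheet(sheet_name='Orders')
    .group_by('Order date')
    .agg(k.col('Profit').sum())
    .compute()
)

# Flat frame: no group_by, so a select is required before compute
flat_df = (
    k.sheet(sheet_name='Orders')
    .select('Country', 'Profit')
    .limit(500)
    .compute()
)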
The expression operators:
group_by | Group the frame by the indicated column name |
sample | Define the sampler to apply to the grouping column
[date]:
WEEK, MONTH, QUARTER, SEMESTER, YEAR, YEAR_AND_WEEK, YEAR_AND_MONTH, YEAR_AND_QUARTER, YEAR_AND_SEMESTER, DAY_OF_YEAR, DAY_OF_WEEK
[date_time]:
(Computed in your local time zone)
Same as for date plus: DAY, TWELVE_HOURS, SIX_HOURS, HOUR, THIRTY_MINUTES, FIFTEEN_MINUTES, TEN_MINUTES, FIVE_MINUTES, MINUTE, THIRTY_SECONDS
[decimal and integer]:
FIXED_NUMBER_OF_BINS (with extra argument: how_many_buckets )
LIST_OF_BINS (with extra argument: buckets )
FIXED_SIZE_BINS (with extra argument: bucket_size )
For example:
sample(sampler='FIXED_SIZE_BINS', bucket_size=30) will create buckets of 30 numbers. |
agg | Define the aggregation functions for all the columns.
Accepts a list of column aggregates.
Only meaningful if the frame has the group_by operation.
Example:
agg(k.col('Profit').sum(), k.col('Quantity').avg()) |
filter | Define how the data will be filtered.
Accepts a column filter (see the column operators below). |
select | Defines what columns to retrieve.
Accepts a list of column names. For example: select('Country', 'Date')
Also accepts column objects:
select(k.col('Profit'), k.col('Quantity')) |
limit | Adds a limit to the number of returned rows. By default, the limit is 100. |
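Putting these operators together, here is a minimal sketch that groups a numeric column into fixed-size bins; the 'Orders' sheet and the 'Quantity' and 'Profit' columns are only illustrative:
# Group the 'Quantity' column into buckets of 10 units and sum 'Profit' per bucket
df = (
    k.sheet(sheet_name='Orders')
    .group_by('Quantity')
    .sample(sampler='FIXED_SIZE_BINS', bucket_size=10)
    .agg(k.col('Profit').sum())
    .limit(1000)
    .compute()
)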
The column operators:
This API lets you build expressions based on individual or multiple columns.
col | Loads a column object, by its name.
You can call schema() on the sheet or lazy frame to load the list of all the columns. |
cols | Loads multiple columns based on a regular expression:
k.cols('Measure.*') All the columns whose name starts with Measure.
k.cols() All the columns of the sheet.
This operator can be used in the select and the agg statements. |
aggregate | Defines a column aggregate. (Will be meaningful only if the lazy frame contains a group_by clause)
The list of available aggregates are:
(all aggregations can be written as col('A').aggregate('sum') or col('A').sum())
[decimal and integer]:
- sum
- avg, median
- min, max, min_abs, max_abs
- var_sample, var_pop, std_dev_sample, std_dev_pop
- lowest_decile, lowest_quartile, highest_decile, highest_quartile
[text]:
- count, count_unique, percent_filled, percent_empty, count_empty
- identical, identical_ignore_empty
[date and date_time]:
- min, max
- identical
- count_unique |
filter | Defines a column filter. Can be built out of a row value or aggregated column:
All profits greater than 0: k.col('Profit').gt(0)
All groups whose profit adds up to less than 1000: k.col('Profit').sum().lt(1000)
Filters can be chained on a given column; this results in an 'OR' operation.
All the profits smaller than 0 or above 5: k.col('Profit').lt(0).gt(5)
⚠️ The chaining cannot be done with the in_list operator or date / date time filters
If you apply multiple filters, the ‘AND’ operation will be applied.
query = (
    k.lazy_frame(sheet_name='Orders')
    .group_by('Order date')
    .sample(sampler='YEAR_AND_MONTH')
    .sort('Order date', direction='ASCENDING')
    .agg(k.col('Profit').sum())
    .filter(k.col('Profit').lt(0).gt(100))
    .filter(k.col('Profit').sum().lt(1000))
)
In this example, the filter will be:
all negative profits or profits greater than 100, AND all dates for which the global profit is less than 1000.
Filters can be reversed using the exclude() operator.
For example, to keep all the profits whose sum is different from 0:
k.col('Profit').sum().eq(0).exclude()
Another example:
k.col('Profit').sum().eq(0).gte(100).exclude()
All profits whose sum is not 0 and is less than 100.
(The exclusion happens on the result of the OR operation.)
[decimal and integer]:
lt(number) Strictly less than a number
gt(number) Strictly greater than a number
lte(number) Less than or equal to a number
gte(number) Greater than or equal to a number
eq(number) Equal to a number
ne(number) Different from a number
empty() Empty numbers
not_empty() Non-empty numbers
[text]:
All text filters are case insensitive.
in_list(list_of_values) Keeps only the values that are in the list. For example: k.col('Country').in_list('GB', 'US')
starts_with(prefix) Starts with the specified prefix
ends_with(suffix) Ends with the specified suffix
contains(substring) Contains a substring
empty() Empty texts
not_empty() Non empty texts
does_not_end_with() Does not end with a given suffix
does_not_start_with() Does not start with a given prefix
does_not_contain() Does not contain a substring
[date]:
Date filters operate on ranges of datetime.date objects.
For example: col('Order Date').date_range(from_inclusive=date(2023,1,1), to_inclusive=date.today())
(This is a YTD filter on the year 2023)
Both boundaries are optional.
yoy_ytd() This filter applies a year-on-year, year-to-date filter on the selected column.
Additionally, you can add the weekdays_only() operator to any date filter to filter out weekends.
For example: col('Order Date').date_range(date(2023,1,1), date.today()).weekdays_only()
will be a YTD filter with weekdays only.
[date time]:
Date time filters operate on ranges of datetime objects.
For example: col('TS').datetime_range(from_inclusive=datetime(2023,1,1), to_inclusive=datetime.now())
(A YTD filter on the date time column TS) |
Example script:
from kywy.client.kawa_client import KawaClient
from datetime import date, datetime
k = KawaClient(kawa_api_url='https://<YOUR URL>')
k.set_api_key(api_key_file='<PATH TO THE KEY FILE>')
k.set_active_workspace_id(workspace_id=1)
print(k.sheet(sheet_name='orders').schema())
by_state = (
k.sheet(sheet_name='orders', force_tz='Asia/Tokyo')
.group_by('State')
.agg(k.col('Profit').sum(), k.col('State').identical())
)
flat = (
k.sheet(sheet_name='orders')
.filter(k.col('Order Date').yoy_ytd().weekdays_only())
.filter(k.col('State').contains('Kentucky').empty().exclude())
.filter(k.col('State').identical().contains('Kentucky'))
.select('Order ID', 'Profit', 'State', 'Order Date')
.limit(100 * 1000)
)
df1 = flat.compute().sort_values(by=['Order Date'], ascending=False)
df2 = by_state.compute().sort_values(by=['Profit'], ascending=False)
Request by view: You can find the view ID in the URL
from kywy.client.kawa_client import KawaClient
from datetime import date, datetime
k = KawaClient(kawa_api_url='https://<YOUR URL>')
k.set_api_key(api_key_file='<PATH TO THE KEY FILE>')
k.set_active_workspace_id(workspace_id=1)
data = (
    k.sheet()
    .view_id(3118)
    .limit(100 * 1000)
)
df = data.compute()