Time Series¶
The Time Series component of the Genomcore API allows you to store and query time-series observations.
More information about the Time Series API can be found here.
Create¶
Create time-series observations. A time-series body must be a list of dictionaries with the following structure:
[
    {
        "meta": {
            "userId": int,
            "source": str,
            "metric": str,
            "externalId": str,
            "batch": str
        },
        "point": {
            "start": str (ISO 8601 datetime),
            "end": str (ISO 8601 datetime),
            "value": int
        }
    },
    ...
]
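For example, a single observation could look like this (all values below are illustrative):

[
    {
        "meta": {
            "userId": 1,
            "source": "wearable",
            "metric": "heart_rate",
            "externalId": "obs-0001",
            "batch": "TestBatch"
        },
        "point": {
            "start": "2024-04-01T08:00:00Z",
            "end": "2024-04-01T08:01:00Z",
            "value": 72
        }
    }
]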
The following code initializes the SDK and the logging module and loads an .env file. It then reads a CSV file with the required columns, builds the list of dictionaries described above, and sends the observations to the Time Series API in chunks of 300. Each chunk will be retried up to 3 times in case of failure:
import csv
import logging
import os

from dotenv import load_dotenv
from genomcore.client import GenomcoreApiClient

load_dotenv(override=True)

logging.basicConfig(
    level="INFO",
    format="[%(asctime)s][%(levelname)s][%(name)s] -- %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Path to the CSV file with the required columns; adjust as needed.
input_csv = "observations.csv"

# Build the list of observation dictionaries from the CSV rows.
data_list = []
with open(input_csv) as csv_file:
    csv_reader = csv.DictReader(csv_file)
    for row in csv_reader:
        data_list.append({
            'meta': {
                'userId': int(row['meta.userId']),
                'source': row['meta.source'],
                'metric': row['meta.metric'],
                'externalId': row['meta.externalId'],
                'batch': row['meta.batch'],
            },
            'point': {
                'start': row['point.start'],
                'end': row['point.end'],
                'value': int(row['point.value']),
            }
        })

api = GenomcoreApiClient(
    token=os.getenv("TOKEN"),
    refresh_token=os.getenv("REFRESH_TOKEN")
)

# Upload in chunks of 300 observations, retrying each failed chunk up to 3 times.
responses = api.time_series.create_time_series(
    body=data_list,
    chunksize=300,
    max_retries=3
)
Warning
The method only checks whether the batch value is None or an empty string. It is the user's responsibility to ensure that the batch value is unique for each group of observations.
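One way to meet that requirement is to generate the batch value before building the observations, for instance from a UUID or an upload timestamp. This is a minimal sketch; any scheme that guarantees one unique value per upload works:

import uuid
from datetime import datetime, timezone

# A random UUID is unique for all practical purposes.
batch_id = f"ingest-{uuid.uuid4()}"

# Alternatively, derive the batch value from the upload timestamp.
batch_id = f"ingest-{datetime.now(timezone.utc):%Y%m%dT%H%M%SZ}"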
Tip
The method automatically detects whether the total number of observations exceeds the chunk size and, if so, uploads them in chunks of the specified size; otherwise it uploads them in a single request.
In the case of a chunked upload, if the parameter chunk_details is set to True, the method logs information about the first and last observation of each chunk.
The parameter max_retries is deactivated by default (it defaults to 0). The user can specify the number of times to retry the upload of a chunk in case of connection failure.
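For example, reusing api and data_list from the create example above, a large upload with per-chunk logging and retries could look like this:

responses = api.time_series.create_time_series(
    body=data_list,       # e.g. 1200 observations -> four chunks of 300
    chunksize=300,
    chunk_details=True,   # log the first and last observation of each chunk
    max_retries=3         # retry a failed chunk up to 3 times
)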
Query¶
This example queries the time-series observations of userId 1 in the batch “TestBatch” between 2024-04-01 and 2024-04-08, sorted by timestamp in descending order.
More information about MongoDB filters can be found here.
from genomcore.client import GenomcoreApiClient

api = GenomcoreApiClient(token="A_VALID_TOKEN", refresh_token="A_VALID_REFRESH_TOKEN")

body = {
    "filter": {
        "meta.userId": {"$in": [1]},
        "meta.batch": {"$in": ["TestBatch"]}
    },
    "sort": {
        "timestamp": -1
    },
    "afterDate": "2024-04-01",   # lower bound of the date range
    "beforeDate": "2024-04-08"   # upper bound of the date range
}

api.time_series.query_time_series(page=0, pageSize=500, body=body)
Note
The request is paginated with a page size of 500. The response will contain the first 500 observations sorted by timestamp in descending order.
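To collect every page rather than just the first, the page parameter can be incremented until an empty page is returned. The loop below is a minimal sketch that assumes the call returns the list of observations for the requested page; adapt the stop condition to the actual response shape:

page = 0
all_observations = []
while True:
    # Fetch one page of up to 500 observations.
    response = api.time_series.query_time_series(page=page, pageSize=500, body=body)
    if not response:  # assumed: an empty page signals the end of the results
        break
    all_observations.extend(response)
    page += 1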
Delete¶
This example deletes the time-series observations of userId “1” in the batch “TestBatch” and metric “TestMetric”.
from genomcore.client import GenomcoreApiClient

api = GenomcoreApiClient(token="A_VALID_TOKEN", refresh_token="A_VALID_REFRESH_TOKEN")

body = {
    "userId": 1,
    "batch": "TestBatch",
    "metric": "TestMetric"
}

api.time_series.delete_time_series(body=body)
Note
The only tags you can use to filter the observations for deletion are userId, batch, and metric.
Warning
If the connection is closed before a response is received, the deletion may simply be taking longer than usual: the server might close the connection but still process the request successfully. Please verify that the data points were deleted correctly.
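As a sanity check after a large deletion, the same identifiers can be re-queried; an empty result indicates the deletion completed. This sketch assumes meta.metric can be filtered like the other meta fields shown in the query example:

check_body = {
    "filter": {
        "meta.userId": {"$in": [1]},
        "meta.batch": {"$in": ["TestBatch"]},
        "meta.metric": {"$in": ["TestMetric"]}  # assumed to be filterable like meta.userId and meta.batch
    }
}
remaining = api.time_series.query_time_series(page=0, pageSize=500, body=check_body)
# An empty `remaining` means the observations were deleted.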