Overview

The Time series service in the platform provides storage for time series data organized in TimeSeriesDatasets. A TimeSeriesDataset contains a collection of ITimeSeries, which represent individual time series. There are multiple implementations of the ITimeSeries interface to deal with the differences between a time series definition, a time series with data, and a time series with a schema definition. All time series in a dataset share the same TimeSerieProperty definitions, which specify the name and type of the properties used across all time series in the dataset. Each time series in a dataset also includes DataFields, which define the data types of the time series data (the item and flags) represented by a TimeSeriesDataFrame. A TimeSeriesDataFrame is a table-like structure that holds the time series data: date-time values, the item (the variable of interest), and a number of flags.
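
As a rough mental model (a sketch only, not the platform's actual TimeSeriesDataFrame type; the column names below are illustrative), the structure can be pictured in Python as a pandas DataFrame with a date-time index, one item column, and one or more flag columns:

import pandas
from datetime import datetime

# Conceptual illustration only: the platform's TimeSeriesDataFrame is a C# type,
# but its table-like shape resembles a data frame with a date-time index,
# one item column, and one or more flag columns (names here are illustrative).
frame = pandas.DataFrame(
    {
        "item": [1.20, 1.35, 1.31],   # the variable of interest
        "flag_quality": [0, 0, 1],    # an illustrative flag column
    },
    index=[datetime(2023, 1, 1, 0, 0), datetime(2023, 1, 1, 1, 0), datetime(2023, 1, 1, 2, 0)],
)
print(frame)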

The item ITimeSeriesItem in the time series service inherits from the Platform's general model IItem and includes the following (see the sketch after this list):

  • TimeSeriesType (of type DHI.Services.TimeSeries.TimeSeriesDataType)
  • Quantity
  • Type (as in data type)
  • Name
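
In Python, roughly the same information is captured by the ItemDefinition used in the loading script further down. A minimal sketch (the unit and EUM item strings are undefined placeholders copied from that script, the item name is made up, and the argument comments are assumptions):

from dhi.platform.commonmodels import AttributeDataType, ItemDefinition, TimeSeriesDataType

# Sketch only: a single-precision, instantaneous item named "WaterLevel".
# The unit and EUM item strings are undefined placeholders, as in the script below.
water_level = ItemDefinition(
    "WaterLevel",                      # Name
    "eumUUnitUndefined",               # unit / Quantity placeholder
    "eumIItemUndefined",               # EUM item type placeholder
    AttributeDataType.SINGLE,          # data type
    TimeSeriesDataType.INSTANTANEOUS,  # TimeSeriesType
)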

Compatibility with Domain Services

Internal structures used in the platform Time series service are different from those in Domain Services, but interoperability at the C# level can be achieved using the .AsTimeSeriesData() extension method of the TimeSeriesDataFrame class. At the web API level, the JSON serialization also matches the format used by Domain Services, with minor deviations (an illustrative response shape is sketched after the list):

  1. a platform time series can contain more than one flag;
  2. the time series is wrapped in a response object {"data": [...]}.
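
For illustration, a sketch of what such a wrapped response could look like once deserialized in Python (only the "data" wrapper and the possibility of multiple flags come from the points above; the exact row layout is an assumption):

# Assumed, illustrative shape: the "data" wrapper comes from point 2 above and the
# two trailing numbers per row stand for two flags (point 1).
response = {
    "data": [
        ["2023-01-01T00:00:00", 1.20, 0, 0],
        ["2023-01-01T01:00:00", 1.35, 0, 1],
    ]
}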

Loading large time series data

In addition to a web app, the platform provides time series readers and writers. These can be used to load large sets of time series from files into the time series storage using the platform transfer pipeline. This is useful, for example, when loading data from files exported from an existing 3rd party system into the platform. The current implementation supports loading of these data formats:

Format      | Description
DFS0        | DFS0 files that are widely used with MIKE Powered by DHI software
CSV - DIMS  | CSV files exported from DIMS, as described in Data format for time series pushed into MIKE Data Admin
CSV - GHM   | CSV files with many time series in one file and custom time series IDs in the file header. Data are stored in a format optimized for a high number of time series with similar time steps.

Keep in mind that using the existing readers and writers and the transfer pipeline should always be the preferred option when loading large volumes of data. If an appropriate reader does not exist, you can try to write one yourself (see Conversion-pipeline-and-Transfers/Developer-guidelines/) or reach out to the Platform team to assist you.

Loading time series data when no appropriate reader exists

In situations where writing a new reader is not practicable, it is possible to load the data using the REST interface. The Python script below illustrates loading of long time series into the Timeseries service from many CSV files.

There were around 60 000 CSV files with 6 time series in each file. All time series had around 200 000 time steps. Time steps were the same across all time series in a file. The CSV files came zipped in around 15 zip files.

Several multiprocessing options were tried. Eventually, the best option found was to parallelize the upload_data_frame_in_chunks function. This function processes a 200 000 step data frame. It splits the data frame into smaller data frames and uploads each serially. This allowed uploading multiple time series in parallel, while all other processing was done serially. In this particular case, the loading still took considerable time (around 10 days), but without multiprocessing it would have taken much longer. An option to parallelize processing of multiple zip archives was not successful in this case, but it proved useful when simple uploads of CSVs from zip files were necessary.

Finding the appropriate level of parallelization for your specific case may take a few experiments and depends on network latency and service performance, but also on your capability to track and monitor all parallel processes. It is important to build in retry mechanisms, chunking, and/or small delays on your side of the code to distribute the load on the Platform. It is also recommended to write idempotent functions (i.e. f(f(x)) = f(x)) when possible, so that they can be executed multiple times without any undesired side effects.
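
As an illustration of the retry advice above, here is a minimal sketch of a generic retry wrapper with a small, growing delay (the helper name and parameters are hypothetical, not part of the platform SDK):

import time

def with_retries(func, *args, attempts=3, delay=2.0, **kwargs):
    """Hypothetical helper: call func, retrying with a growing pause between attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay * attempt)  # back off a little more after each failure

# Hypothetical usage with an upload call like the one in the script below:
# with_retries(timeseries_client.add_timeseries_values, project_id, dataset_id, ts_id, chunk)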

Illustration of loading data using multiprocessing in Python:

import os
import time
import pandas
from collections import defaultdict
from datetime import datetime
from multiprocessing import Pool
from zipfile import ZipFile

from dhi.platform.authentication import ApiKeyIdentity
from dhi.platform.commonmodels import AttributeDataType, ItemDefinition, TimeSeriesDataType
from dhi.platform.timeseries import TimeSeriesClient


identity = ApiKeyIdentity(customer_id="", environment="dev", apikey="12345123-0000-0000-0000-123451234512")
timeseries_client = TimeSeriesClient(identity=identity, environment="dev")

def _get_value(d, i):
    """Sanitize value from a string file"""
    s = d[i]
    return None if s == '' or s == 'inf' else float(s)


def csv_to_data(lines):
    """Transforms a list of string lines into a pandas DataFrame"""
    data_lines = lines[1:]
    d_WD = []
    d_WS = []
    d_TP = []
    dates = []
    for line in data_lines:
        d = [i for i in line.strip().split(';')]
        t = datetime.strptime(d[0], '%Y-%m-%d %H:%M') if len(d[0]) == 16 else datetime.strptime(d[0], '%Y-%m-%d %H:%M:%S')
        d_WD.append(_get_value(d, 1))
        d_WS.append(_get_value(d, 2))
        d_TP.append(_get_value(d, 3))
        dates.append(t)
    return {
        'WD': pandas.DataFrame(d_WD, index=dates),
        'WS': pandas.DataFrame(d_WS, index=dates),
        'TP': pandas.DataFrame(d_TP, index=dates),
    }


def zip_file_names(zip_path, extension = None):
    """List names of files in a zip archive. If extension is specified, lists only files with that extension."""
    files = []
    with ZipFile(zip_path) as zf:
        for file in zf.namelist():
            if extension is not None and not file.endswith(extension):
                continue
            files.append(file)
    return files


def zip_to_lines(zip_path, filename):
    """Reads a file from zip archive into a list of string lines"""
    lines = []
    with ZipFile(zip_path) as zf:
        with zf.open(filename) as f:
            for line in f:
                lines.append(line.rstrip(b'\r\n').decode('utf-8'))
    return lines

def upload_data_frame_in_chunks(df, project_id, dataset_id, time_series_id, maxlength=10000, delay=0.5):
    """Uploads a time series to the MIKE Cloud Platform in chunks of at most maxlength rows."""
    rowcount = len(df)
    # Split the data frame into chunks and upload them serially, pausing between
    # uploads to spread the load on the service.
    chunks = [df[i:i + maxlength] for i in range(0, rowcount, maxlength)]
    added_chunks = []
    for chunk in chunks:
        time.sleep(delay)
        added_chunk = timeseries_client.add_timeseries_values(project_id, dataset_id, time_series_id, chunk)
        added_chunks.append(added_chunk)
    return sum(added_chunks)


def process_block(project_id, csv_zip_path, ts_log, dataset_id):
    """Reads CSV files from a single .zip archive and prepares a list of arguments to be processed by another function in parallel"""

    csvs = [c for c in zip_file_names(csv_zip_path, '_ts.csv')]
    items = [
        ItemDefinition("WD", "eumUUnitUndefined", "eumIItemUndefined", AttributeDataType.SINGLE,TimeSeriesDataType.INSTANTANEOUS),
        ItemDefinition("WS", "eumUUnitUndefined", "eumIItemUndefined", AttributeDataType.SINGLE,TimeSeriesDataType.INSTANTANEOUS),
        ItemDefinition("TP", "eumUUnitUndefined", "eumIItemUndefined", AttributeDataType.SINGLE,TimeSeriesDataType.INSTANTANEOUS),
    ]

    item_count = len(items)

    existing_time_series = [t.id for t in timeseries_client.list_timeseries(project_id, dataset_id)]
    existing_ts_counts = defaultdict(int)
    existing_ts_stations = (t.split('_')[0] for t in existing_time_series)
    for s in existing_ts_stations:
        existing_ts_counts[s] += 1

    max_chunk_size = 20 * 1000
    counter = 1
    for csv in csvs:
        station_id = csv.split('_')[0]
        if existing_ts_counts[station_id] == item_count:
            print(f"Skipping {station_id} because it was done")
            counter += 1
            continue
        lines = zip_to_lines(csv_zip_path, csv)
        dfs = csv_to_data(lines)
        arguments = []
        for item in items:
            ts_id = station_id + "_" + item.name
            if ts_id in existing_time_series:
                timeseries_client.delete_timeseries(project_id, dataset_id, ts_id)
                time.sleep(1.0)

            timeseries_client.add_timeseries_with_id(project_id, dataset_id, ts_id, item)
            # dfs[item.name] is a single-column DataFrame; [0] selects that column as a Series
            df = dfs[item.name][0]
            arguments.append((df, project_id, dataset_id, ts_id, max_chunk_size))

        # Upload the time series of this CSV file in parallel; starmap unpacks each
        # argument tuple into a call to upload_data_frame_in_chunks.
        with Pool(processes=8) as pool:
            pool.starmap(upload_data_frame_in_chunks, arguments)

        # Log the processed CSV; item here refers to the last item handled for this station.
        with open(ts_log, 'a') as lg:
            lg.write(f"{csv};{station_id};{item.name};1\n")
        counter += 1

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def import_ts_in_parallel(project_id, zip_blocks_path, log_directory, ts_dataset_id):
    """Lists zip files in a folder and prepares a list of arguments to process by another function."""
    blocks = [os.path.join(zip_blocks_path, i) for i in os.listdir(zip_blocks_path)]
    arguments = []
    for b in blocks:
        log_name = os.path.basename(b).split('.')[0] + '.txt'
        log_path = os.path.join(log_directory, log_name)
        arguments.append((project_id, b, log_path, ts_dataset_id))

    # Zip blocks are processed serially here; the parallelism happens inside process_block.
    for a in arguments:
        process_block(*a)

def main():
    """Creates a time series dataset and calls another function to process the data"""
    blocks_path = "C:/data"
    project_id = '12345123-0000-46a4-a59f-989c91842743'
    log_directory = 'c:/temp/'

    dataset = timeseries_client.create_timeseries_dataset_from_schema(project_id, "My Timeseries", "Big time series dataset.", tuple())

    import_ts_in_parallel(project_id, blocks_path, log_directory, dataset.id)


if __name__ == "__main__":
    main()

Limitations

  • One flag can have at most 15 distinct values.
  • The maximum difference between time steps is approximately 65 years; this is due to the type of compression used in the time series storage.
  • The service does not consider time zones and expects datetimes in the format yyyy-mm-ddThh:mm:ss.
  • TimeSeries datasets cannot store text (string) as a value or column value. The intention is to use flags instead - for instance, if you need to store text like 'raw', 'measured', 'simulated', define a numerical flag and record in the time series properties which numerical flag value refers to which string (a minimal sketch follows below).
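
For illustration, a minimal sketch of such a flag mapping in Python (the flag meaning and values are made up; how the mapping is recorded in the time series properties is up to you):

# Illustrative mapping between a numerical flag value and the text it stands for.
SOURCE_FLAG = {0: 'raw', 1: 'measured', 2: 'simulated'}

# Store the numerical value (e.g. 1 for 'measured') as the flag in the time series data
# and record this mapping in the time series properties.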