
Data Fusion Job

The Data Fusion Job adjusts satellite rainfall data using in-situ time series measurements from rain gauge stations.

Approach and method

The Data Fusion Job calculates the point residual errors between the satellite data and the observed rain gauge data, then corrects the satellite data with a smoothed, distance-weighted error residual field interpolated from those point residuals. A simplified code sketch of the method is given after the list and note below.

Method:

  1. Calculate a point estimate of the residual error in the satellite data at each rain gauge point.
  2. Calculate satellite error residuals at all grid points of the satellite data, using a Gaussian kernel to smooth the point residuals from the gauge locations. The degree of smoothing is defined by the kernel bandwidth.
  3. Estimate the distance weighting functions for each gauged location using a Gaussian weighting function. The weights decay with distance from the gauges from 1 towards 0, putting more weight on the estimated error residuals closer to the gauging stations (the weight is 1 at a rain gauge, where the satellite value is corrected to the gauge estimate). The decay of the weights is determined by the bandwidth of the Gaussian weighting function.
  4. Correct the satellite rainfall data by subtracting the distance-weighted error residual from the satellite data.
  5. Update no-rain areas so that satellite grid points with no rain in the original satellite data are set to zero rainfall in the corrected data, maintaining the original satellite no-rain distribution.
  6. The result at a point should fall between the original satellite data and the observed gauge data (for the same time series, where observed rain gauge data exist).

Note - If there is 'no data' at a rain gauge station, that station is not used and the correction is based only on the other stations.
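
The following is a minimal, illustrative sketch of the core per-time-step correction (steps 1-5) in Python/NumPy. It is not the job's actual R implementation; the array layout, the nearest-grid-point sampling, and the use of the distance to the nearest gauge in the weighting are simplifications made for this example.

import numpy as np

def fuse_timestep(sat, grid_xy, gauge_xy, gauge_obs,
                  smooth_bw=100000.0, weight_bw=100000.0, min_precip=0.1):
    """Illustrative single-time-step fusion (simplified, not the production R code).

    sat       : satellite rainfall at each grid point (1-D array, mm)
    grid_xy   : grid point coordinates in metres, shape (n_grid, 2)
    gauge_xy  : gauge coordinates in metres, shape (n_gauge, 2)
    gauge_obs : observed gauge rainfall (mm), NaN where a gauge has no data
    """
    # Gauges with 'no data' are skipped (see note above)
    valid = ~np.isnan(gauge_obs)
    gauge_xy, gauge_obs = gauge_xy[valid], gauge_obs[valid]

    # Step 1: point residuals at the gauges, sampling the satellite value
    # at the nearest grid point (simplified sampling)
    nearest = np.argmin(
        np.linalg.norm(grid_xy[None, :, :] - gauge_xy[:, None, :], axis=2), axis=1)
    point_residual = sat[nearest] - gauge_obs

    # Distances from every grid point to every gauge
    d = np.linalg.norm(grid_xy[:, None, :] - gauge_xy[None, :, :], axis=2)

    # Step 2: smoothed residual field (Gaussian kernel, bandwidth ~ bGaugeAdap)
    k = np.exp(-0.5 * (d / smooth_bw) ** 2)
    residual_field = (k * point_residual).sum(axis=1) / k.sum(axis=1)

    # Step 3: distance weight, 1 at a gauge and decaying towards 0 with distance
    # (bandwidth ~ defaultBdwdth); here based on the nearest gauge only
    w = np.exp(-0.5 * (d.min(axis=1) / weight_bw) ** 2)

    # Step 4: subtract the distance-weighted error residual from the satellite data
    adjusted = sat - w * residual_field

    # Step 5: keep the original no-rain areas from the satellite data
    adjusted[sat < min_precip] = 0.0
    return adjusted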

Data Fusion vs Bias Correction

The Data Fusion Job blends the satellite and rain gauge station datasets at each time step and therefore differs from bias correction, which is based on a statistical property of the dataset. In the Data Fusion Job, the error calculated between the satellite and rain gauge station data is different at each time step. If this error were averaged over a long time period, it would be the bias.
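
A small numerical illustration of the difference, using arbitrary toy values:

import numpy as np

# Toy values at one gauge location, chosen only to illustrate the arithmetic
sat   = np.array([12.0, 0.0, 5.0, 30.0])   # satellite rainfall, mm/day
gauge = np.array([10.0, 0.0, 8.0, 25.0])   # observed gauge rainfall, mm/day

residuals = sat - gauge   # data fusion corrects with these per-time-step errors
bias = residuals.mean()   # averaging them over a long period would give the bias

print(residuals)   # [ 2.  0. -3.  5.]  -> a different correction at each time step
print(bias)        # 1.0                -> the single constant a bias correction would use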

Parameters

Parameters of the Data Fusion Job can be provided in a JSON file with the entries summarized in the table below.

Name Type Description
catchmentShapefile string Shapefile of model catchments in WGS84 coordinate system.
catchmentIdField Enum/string Name of an integer identifier field in catchmentShapefile.
rainGaugeStations string Shapefile of rain gauge locations in WGS84 coordinate system.
gaugeStationIdField string Name of an integer identifier field in rainGaugeStations.
rainfallGaugeDataDirectory string Directory with rainfall gauge (time series) data as CSV files like <station identifier>_<text>.csv, a header like Dates,Sambor,Flag, and each row as <day>/<month>/<year>,<value>,<flag>, e.g. 01/01/1985,0,E.
satelliteDataFiles array[string] List of NetCDF files to use as satellite data input. Either satelliteDataFiles or satelliteDataset must be specified.
satelliteDataset object projectId, datasetId, and apiKey of the dataset to use for satellite data input. Optionally also wktExtent of the area of interest as a WKT Polygon in WGS84 coordinate system. If wktExtent is not provided, the extent of catchmentShapefile and rainGaugeStations will be used.
satelliteDataItemName string Name of the item to use from the satellite data. All satellite data inputs must have this item. e.g. 'precip' for CHIRPS rainfall data.
analysisStartTime string Analysis start time in the format '%Y-%m-%d GMT'
analysisEndTime string Analysis end time in the format '%Y-%m-%d GMT'.
rainfallPercentageLimit float Optional. If the satellite rain cover percentage is less than rainfallPercentageLimit*100.0 %, then assume no rain. Default is 0.05.
minimalPrecipitation float Optional. If satellite precipitation is less than minimalPrecipitation mm, then assume no rain in the satellite data. Default is 0.1.
correlationTarget float Optional. Correlation target. A lower correlation target means the calculated bandwidth based on Moran's I statistic will be used more often. Default is 0.6.
defaultBdwdth integer Distance in m of the distance weighting. Default is 100000. Set to a negative value to have the bandwidth calculated from the data based on Moran's I statistic.
bGaugeAdap integer Distance in m of smoothing. Can be calculated using the bootstrapping method or set manually; set it to a lower value for less smoothing. Default is 100000. Set to a negative number to use bootstrapping.
disaggregateFactor integer Controls the resolution of raster brick going into the residual smoothing. If the satellite data is coarse, consider setting disaggregateFactor to 2 or more for finer resolution. Default is 1 for no disaggregation.
projString string Proj string for the local coordinate system for processing. See coordinate system service or https://epsg.io
compositeInformationPlotTimeIndexes array[integer] Indexes of time steps to produce composite information plots for.
originalVsAdjustedTimeIndex array[integer] Indexes of time steps to produce comparison of original and adjusted data plots.
plotHeight integer Optional height of produced plots in pixels, default is 760.
plotWidth integer Optional width of produced plots in pixels, default is 1024.
satelliteDateSeriesFormat string Optional. Format of time step definition as read by the R brick function. Overriding this parameter could be useful if the satellite data time step is finer than daily. Default is "%Y.%m.%d".

Kernel smoothing (bGaugeAdap parameter)

Kernel bandwidth - a larger bandwidth (e.g. 15 000 m) gives a smoother residual field than a smaller bandwidth (e.g. 5 000 m); it reduces the bias but also reduces the variance. If you have gauges close together, the smoother/wider kernel takes multiple gauges into account, so the adjusted satellite data is unlikely to match the observed rain gauge data at each gauge location unless you use a smaller bandwidth value. A very low value tends to show an abrupt change in the adjusted satellite data from one adjustment area (around a rain gauge) to another, so you may want to increase the value for a smoother adjustment field. You can estimate the bandwidth using a bootstrapping method, which is a good choice for sparse rain gauge networks, or you can estimate the bandwidth manually by visually evaluating the results (in the plot_composit_information_i output plot, plot number 4 'raster gauge smooth'). The default is to estimate manually, with a default value of 10 000 m.

If you choose to estimate the bandwidth using the bootstrapping method, this can take a long time. However, as the bandwidth selection does not depend on the data values, it only needs to be estimated once unless the ground station network changes; you do not need to re-run the bootstrapping method each time you run the script, and can instead manually enter the output from the bootstrapping method as the parameter value.
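
As a hypothetical illustration of how the bandwidth controls the smoothing, the Gaussian weight a gauge residual receives at increasing distances can be compared for a small and a large bandwidth (the kernel form and the distances below are assumptions for illustration only):

import numpy as np

def gaussian_kernel(distance_m, bandwidth_m):
    # Relative weight of a gauge residual at a given distance (illustrative form)
    return np.exp(-0.5 * (distance_m / bandwidth_m) ** 2)

distances = np.array([0, 2000, 5000, 10000, 20000])  # metres from a gauge

for bw in (5000, 15000):
    print(bw, np.round(gaussian_kernel(distances, bw), 3))
# A 5 000 m bandwidth lets the weights drop off quickly, so the adjustment follows
# individual gauges closely; a 15 000 m bandwidth keeps neighbouring gauges influential,
# giving a smoother residual field.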

Distance weighting function bandwidth (cutting kernel) (defaultBdwdth parameter)

The distance weighting function is 1 at the rain gauges, meaning that the full error residual is subtracted from the satellite data, and the weight decreases with increasing distance from the gauges, with a correspondingly smaller correction to the satellite data. At large distances the weight is 0 and there is no correction to the satellite data.

It is hard to estimate a bandwidth because the error residuals are only known at the gauge locations. If the gauge network is dense, one could obtain information on how the error residual decays with distance. If the values being fused are accumulative rather than instantaneous in nature, the time step should also be considered, as the spatial correlation of the errors often increases with the time step. If the bandwidth is set to a very high number, the whole satellite data field will be adjusted by interpolation between the gauges with no distance weighting from the gauges. When using the Job to calculate the Mean Area Rainfall (weighted rainfall) for each model catchment, it is preferable to set the bandwidth to a high value so that more of the catchment area gets corrected for the weighted average. You can estimate an appropriate value by looking at the Job output plots (in the plot_composit_information_i output plot, plot number 3 'cutting kernel'). The default is to choose the value manually (defaultBdwdth parameter), with a default value of 100 000 m.

Alternatively, you can set the parameter to a negative value, in which case the job calculates the bandwidth from the data. To estimate the bandwidth from the data, the job uses a global spatial autocorrelation function (based on Moran's I statistic) of the satellite data to approximate the correlation of the unknown error residual field. Moran's I is a cross-correlation estimate between the values in the satellite data spatial field as a function of distance. The distance where Moran's I equals the correlation target (default 0.6) is used as the kernel bandwidth. If the spatial correlation is very low, a localized correction is made. If the spatial correlation is beyond a threshold, the default bandwidth is used instead of the result estimated using Moran's I statistic.
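
A rough sketch of this idea, using a simple one-direction correlogram in place of the full Moran's I computation the job performs (the function names and the column-shift approach are assumptions for illustration):

import numpy as np

def correlation_at_lag(field, lag):
    """Correlation between grid cells 'lag' cells apart along one axis
    (a crude stand-in for the spatial autocorrelation the job derives from Moran's I)."""
    a = field[:, :-lag].ravel()
    b = field[:, lag:].ravel()
    return np.corrcoef(a, b)[0, 1]

def estimate_bandwidth(field, cell_size_m, correlation_target=0.6,
                       default_bandwidth_m=100000, max_lag=50):
    # Walk outwards until the spatial correlation drops below the target
    for lag in range(1, min(max_lag, field.shape[1])):
        if correlation_at_lag(field, lag) < correlation_target:
            return lag * cell_size_m
    # Correlation stays above the target: fall back to the default bandwidth
    return default_bandwidth_m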

Gauge interpolation mode

By setting the smoothing kernel very small and the distance bandwidth very large, the final rain field will be the Thiessen-weighted values from the rain gauges, but the no-rain areas from the satellite observations are still transferred to the calculated rain field. In this mode, the observed rainfall from the gauges alone is used, and only the spatial coverage of the rainfall comes from the satellite data.
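
For example, a parameters file fragment approximating this mode might look as follows; the specific numbers are only indicative of "very small" and "very large" and are not recommended values (the other required parameters are omitted):

{
  "bGaugeAdap": 1000,
  "defaultBdwdth": 10000000
}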

Outputs

Name Type Description
SatAdjusted_TS_point.csv CSV CSV time series of adjusted satellite data extracted at location of gauge stations.
SatOrig_TS_point.csv CSV CSV time series of original satellite data extracted at location of gauge stations.
SatAdjusted_MAR_daily.csv CSV Mean area rainfall (MAR) time series for each model catchment calculated from adjusted satellite data.
SatOrig_MAR.csv CSV Mean area rainfall (MAR) time series for each model catchment calculated from original satellite data.
adjusted_satellite_data.nc NetCDF Gridded satellite data adjusted by measurements from rain gauges.
distance_weighted_bandwith.png PNG Plot of the distance weighting bandwidth (distance in m) for each time step (index in days), calculated as the distance at which the Moran's I spatial correlation of the satellite data is 0.6, or the default bandwidth.
plot_composit_information_i.png PNG Plot of 5 charts of (1) raster distance (kernel bandwidth for smoothing, constant for each time step, based on the gauge locations from the bootstrap method or default value); (2) zone smoother; (3) cutting kernel (distance weighting bandwidth, default value or calculated based on Moran's I, which changes each time step depending on the spatial correlation); (4) raster gauge smooth (smoothed error residual field); (5) raster adjusted merged for time step index i.
plot_original_vs_adjusted_i.png PNG Plot of original and adjusted satellite raster data (mm/day) side by side for the time steps (days) specified e.g. [1,2,3,4,5] = each day from day 1 to 5. For time step index i.
plot_average_annual_rainfall.png PNG Plot of average annual rainfall of adjusted raster.
plot_adjusted_data.png PNG Plot of adjusted (corrected) satellite data.
plot_original_data.png PNG Plot of original satellite data.
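
The gridded and tabular outputs can be inspected with standard tools; for example, using the xarray and pandas Python packages (these packages are not part of the job itself):

import pandas as pd
import xarray as xr

# Adjusted gridded rainfall produced by the job
adjusted = xr.open_dataset("adjusted_satellite_data.nc")
print(adjusted)

# Adjusted satellite time series extracted at the gauge station locations
ts = pd.read_csv("SatAdjusted_TS_point.csv")
print(ts.head())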

Examples:

Parameters file with satellite data in NetCDF files.

{
  "catchmentShapefile": "catchments/wshed_v5_LatLong.shp",
  "modelCatchmentsList": ["catchments/Gauge_catchmentsWGS84.shp"],
  "modelCatchmentsIdField": "GaugeSt",
  "rainGaugeStations":"raingauge/RainfallStations_globalProj_v1.shp",
  "gaugeStationIdField":"StationID",
  "rainfallGaugeDataDirectory": "raingauge/Rainfall/Extended 2017_2019",
  "satelliteDataFiles": [
    "SatelliteInputData/Rainfall_(CHIRPS)_20000101_to_20200531_Cambodia.nc",
    "SatelliteInputData/Rainfall_(CHIRPS)_20000101_to_20200531_Vietnam.nc",
    "SatelliteInputData/Rainfall_(CHIRPS)_20000101_to_20200531_Laos.nc"
  ],
  "satelliteDataItemName": "precip",
  "catchmentIdField": "Id",
  "analysisStartTime": "2017-01-01 GMT",
  "analysisEndTime": "2019-12-31 GMT",
  "projString": "+proj=utm +zone=48 +datum=WGS84 +units=m +no_defs",
  "rainfallPercentageLimit": 0.05,
  "minimalPrecipitation": 0.1,
  "correlationTarget": 0.6,
  "defaultBdwdth": 100000,
  "bGaugeAdap": 100000,
}
Parameters file with satellite data in a remote dataset.

{
  "catchmentShapefile": "catchments/wshed_v5_LatLong.shp",
  "modelCatchmentsList": ["catchments/Gauge_catchmentsWGS84.shp"],
  "modelCatchmentsIdField": "GaugeSt",
  "rainGaugeStations":"raingauge/RainfallStations_globalProj_v1.shp",
  "gaugeStationIdField":"StationID",
  "rainfallGaugeDataDirectory": "raingauge/Rainfall/Extended 2017_2019",
  "satelliteDataset": {
    "apiKey": "<api key>",
    "projectId": "<project id>",
    "datasetId": "<dataset id>",
    "wktExtent": "POLYGON((99.82707955191717 22.77596345419172,109.97844673941717 22.77596345419172,109.97844673941717 8.124257173047255,99.82707955191717 8.124257173047255,99.82707955191717 22.77596345419172))"
  },
  "satelliteDataItemName": "precip",
  "catchmentIdField": "Id",
  "analysisStartTime": "2017-01-01 GMT",
  "analysisEndTime": "2019-12-31 GMT",
  "projString": "+proj=utm +zone=48 +datum=WGS84 +units=m +no_defs",
  "rainfallPercentageLimit": 0.05,
  "minimalPrecipitation": 0.1,
  "correlationTarget": 0.6,
  "defaultBdwdth": 100000,
  "bGaugeAdap": 100000,
  "plotHeight": 760,
  "plotWidth": 1024
}

Executing data fusion job

The data fusion job can be executed in multiple ways, like any other job:

  - Using the REST API
  - Using the DHI.Platform.SDK dotnet SDK
  - Using the dhi-platform Python SDK

Here we provide samples using the REST API and the Python SDK. As a prerequisite, you need to upload your inputs and the parameters JSON file into a project in the MIKE Cloud Platform. The Python sample includes this preparation of a new project with all input data in the MIKE Cloud Platform.

In all cases, you need to provide the right command to execute, and the right docker image to run. The latest docker image for the data fusion job is:

dhiacrdev.azurecr.io/datafusion/datafusion:20230328.3-60712-bias-correction

REST API

Make a POST request to api.mike-cloud.com/api/process/job with the following headers and body. It expects that all input data are already loaded into a project in the MIKE Cloud Platform; the results will be stored back in the project in a subproject called results_chirps.

'api-version': 3
'dhi-service-id': 'job'
'dhi-project-id': <your project id>

{
    "Runtime": {
        "type": "ContainerRuntimeSpec",
        "Containers": [
            {
                "Image": "dhiacrdev.azurecr.io/datafusion/datafusion:20230328.3-60712-bias-correction",
                "Command": ["/bin/bash", "-c", "cp * /work/; cd /work; mkdir results; Rscript main.R '/work' 'parameters.chirps.json'"]
            }
        ]
    },
    "InputData": {
        "Locations": [
            {
                "type": "PlatformInputLocation",
                "ProjectId": "<your project id>",
                "LocalPath": ""
            }
        ]
    },
    "OutputData": {
        "Locations": [
            {
                "type": "PlatformOutputLocation",
                "LocalPath": "results",
                "RelativePlatformPath": "results_chirps"
            }
        ]
    }
}

Python SDK

Let's say you have all necessary input data including the parameters json file in a local folder c:/data/myproject.

First, create a new project and load the data into it:

from dhi.platform.authentication import ApiKeyIdentity, InteractiveIdentity
from dhi.platform import metadata, transfer

key = "<open api key with write access to the project>"
identity = ApiKeyIdentity(customer_id=None, apikey=key, environment='prod')
# Keep the client instances separate from the imported modules so that
# metadata.CreateProjectInput below still refers to the metadata module
metadataclient = metadata.MetadataClient(identity=identity)
transferclient = transfer.TransferClient(identity=identity)

data_folder = r"c:\data\myproject"
projectInput = metadata.CreateProjectInput("DataFusionMyProject")
project = metadataclient.create_project(projectInput)
uploaded = transferclient.upload_folder_to_project(data_folder, project.id)

Run the Data Fusion Job with your project as input and store the results of the job back in the project, in a subproject called 'results123'. The job can run for quite a long time, but we also don't want it to run indefinitely, so we select a timeout period of 240 minutes for our run. We also choose to cancel the job if it times out. We could set some more advanced parameters, such as how to report the progress of the job.

from dhi.platform.generated.jobgen import ContainerInputV1, ContainerRuntimeSpecV1, InputDataSpecV1, JobDefinitionInputV1, OutputDataSpecV1, PlatformInputLocationV1, PlatformOutputLocationV1
from dhi.platform import job

jobclient = job.JobClient(identity=identity)

containers = [
    ContainerInputV1(
        image="dhiacrdev.azurecr.io/datafusion/datafusion:20230328.3-60712-bias-correction",
        command=["/bin/bash", "-c", "cp * /work/; cd /work; mkdir results; Rscript main.R '/work' 'parameters.chirps.json'"]
    )
]

runtime_spec = ContainerRuntimeSpecV1(containers)
input_data = InputDataSpecV1([PlatformInputLocationV1(localPath="", projectId=project.id)])
output_data = OutputDataSpecV1([PlatformOutputLocationV1(localPath="results", relativePlatformPath="results123")])

input = JobDefinitionInputV1(runtime_spec, input_data, outputData=output_data)

job = jobclient.execute_job_and_wait(project.id, input, timeout_minutes = 240, cancel_job_on_timeout=True)

Finally, download the results from the MIKE Cloud Platform to a local folder:

import os
results_folder = os.path.join(data_folder, 'results')
os.mkdir(results_folder)
downloaded = transferclient.download_project_to_folder(project.id, results_folder)