Data ingest workflow support

Some simple helper tools to manage data ingest.


The basic workflow is:

(A) DataProvider: announce availability of data (sets) to the Ingest database via a bash script or Python interface

(B) DataCenter: query the Ingest database and trigger download tasks

(C) DataCenter: execute tasks, update task states, redo broken tasks

(D) DataCenter: monitor overall execution and initiate specific repair tasks

(A) Data announcement

For details on DB setup and contact information, see: Database setup and query

Data providers use a script to report data-ready events to a central DB:


cmip5_in -n /a/b/file -c 'md5checksum' : file /a/b/file is ready for ingest and has the given checksum

cmip5_in -n /a/b/dir -l experiment : directory /a/b/dir containing an experiment is ready for ingest

cmip5_up -n /a/b/file -c 'checksum' -s changed : file /a/b/file was changed and has a new checksum
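The same announcements can also be issued from Python by wrapping the scripts. A minimal sketch, assuming the scripts are on the PATH; the `announce_cmd`/`announce` helpers are hypothetical, with flag names taken from the examples above:

```python
import subprocess

def announce_cmd(path, checksum=None, level=None, state=None, update=False):
    """Build a cmip5_in / cmip5_up command line for a data-ready event.

    This helper is hypothetical; only the flag names (-n, -c, -l, -s)
    mirror the script examples above."""
    cmd = ["cmip5_up" if update else "cmip5_in", "-n", path]
    if checksum is not None:
        cmd += ["-c", checksum]
    if level is not None:
        cmd += ["-l", level]
    if state is not None:
        cmd += ["-s", state]
    return cmd

def announce(path, **kwargs):
    # Run the announcement script; raises CalledProcessError on failure.
    subprocess.check_call(announce_cmd(path, **kwargs))
```

For example, `announce("/a/b/dir", level="experiment")` would run the second command shown above.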

(B) Data query and (download) task submission

The data center queries the DB and uses the results to parameterize the necessary (ingest) tasks, e.g. for data download. The tasks are handed over to and executed by a distributed task queue.


>>> import db_helper
>>> session = db_helper.Session()
>>> new_files = db_helper.query_new_since(last_time_stamp, session)
>>> submit_tasks(my_task, new_files)
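`submit_tasks` is not shown above; a minimal sketch of what it might do, assuming `my_task` is a Celery-style task object exposing `.delay()` and that each query result carries `path` and `checksum` attributes (both assumptions):

```python
def submit_tasks(task, new_files):
    """Queue one ingest/download task per file.

    `task` is any Celery-style task object with a `.delay()` method
    (an assumption); the returned handles can later be polled for
    task state (see the state-tracking step below).
    """
    return [task.delay(f.path, f.checksum) for f in new_files]
```

Returning the result handles keeps the submission step decoupled from the later monitoring step.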

(C) Task execution and state tracking

Using the celery distributed task queue (configured with the rabbitmq AMQP message bus and a sqlalchemy result database backend), the following steps are necessary:

  • start celeryd with events and beat enabled (in production, worker nodes are already running on your servers, so this step can normally be skipped):

    celeryd  -B -E -l INFO

You should see the worker coming up with a concurrency level matching the number of cores of your machine:

[2010-11-17 15:54:37,365: WARNING/MainProcess] celery@albedo2 v2.1.3 is starting.
[2010-11-17 15:54:37,365: WARNING/MainProcess]
Configuration ->
 . broker -> amqp://guest@
 . queues ->
     . celery -> exchange:celery (direct) binding:celery
 . concurrency -> 8
 . loader -> celery.loaders.default.Loader
 . logfile -> [stderr]@INFO
 . events -> ON
 . beat -> ON
 . tasks ->
     . tasks.touch
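The tasks.touch entry in the banner above corresponds to a task module on the worker's import path. A minimal sketch of what such a module might look like, using the Celery 2.x decorator API; the task body itself is an assumption:

```python
# tasks.py -- minimal task module picked up by celeryd
import os

try:
    from celery.decorators import task  # Celery 2.x decorator API
except ImportError:
    def task(fn):  # no-op stand-in so this sketch imports without Celery
        return fn

@task
def touch(path):
    """Create an empty file at `path` (like touch(1)) and return the path."""
    with open(path, "a"):
        os.utime(path, None)
    return path
```

Celery tasks can also be called directly (synchronously) for testing, e.g. `touch("/tmp/x")`, while `touch.delay(...)` submits them to the queue.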

(D) Task monitoring

Possibility 1: use the celeryev monitoring tool.

Start the monitoring tool with:

    celeryev


Data is transferred to the final destination / ingested.

Successful data movement is reported to the DB:


cmip5_up -n /a/b/file -s 'transfered'

The data center looks up and updates the DB, e.g. according to data publication status:


cmip5_up -n /a/b/dir -s published
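Before applying such an update, the data center can validate the state transition. A small sketch with an assumed state vocabulary; the state names and allowed transitions are assumptions inferred from the cmip5_up examples above:

```python
# Allowed ingest-state transitions (an assumption inferred from the
# cmip5_up examples above: new -> changed/transfered -> published).
TRANSITIONS = {
    "new": {"changed", "transfered"},
    "changed": {"transfered"},
    "transfered": {"published"},
}

def transition_allowed(current, new):
    """Return True if moving from `current` to `new` is a legal update."""
    return new in TRANSITIONS.get(current, set())
```

Rejecting illegal transitions (e.g. publishing data that was never transferred) keeps the DB state consistent with the workflow steps above.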


