v1.4 To v1.5 Migration

The v1.5 release of Delta includes many improvements, such as storing the compacted and expanded forms of resources to provide immutability and improve performance.

(For a complete list of improvements, see here.)

These improvements require replaying all existing events into a new keyspace and reindexing all data in Elasticsearch/Blazegraph in order to:

  • Adopt the new event model for every event
  • Detect and report broken resources in the system.

Depending on the number and complexity of the existing resources and schemas in the system, the migration may take several days to complete (the throughput is about 25,000 events per hour).
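As a rough sizing aid, the quoted throughput can be turned into a duration estimate. The sketch below assumes the rate of about 25,000 events per hour stays constant for the whole run; the function name is illustrative and the event count has to come from your own deployment.

```shell
# Estimate the migration duration from an event count, assuming the
# ~25,000 events/hour rate holds for the whole run (an approximation).
estimate_migration_hours() {
  events="$1"
  rate=25000
  # Integer division, rounded up to the next full hour.
  echo $(( (events + rate - 1) / rate ))
}

# Example with a hypothetical deployment holding 3,000,000 events.
hours=$(estimate_migration_hours 3000000)
echo "about $hours hours ($(( (hours + 23) / 24 )) days)"
```

For 3,000,000 events this gives about 120 hours, that is 5 days.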

Nonetheless, the migration does not require interrupting the Delta 1.4 service, so running both Delta instances side by side is recommended during this period.

Note

Please do not perform any write operations on the Delta v1.5 instance as they may have an impact on the ongoing migration.

Requirements

As migration requires having two coexisting keyspaces, the following allocations are recommended:

  • Disk space for Cassandra: 4x more disk space than the amount 1.4 is using (as we now store the compacted/expanded form of the resources)
  • CPU/RAM: the same amount for the instance performing the migration as for the one running 1.4.
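The disk recommendation can be checked with simple arithmetic before starting. This is a minimal sketch that reads "4x" as four times the space Delta 1.4 currently uses; the function name and the example figures are hypothetical.

```shell
# Succeed if the available space covers the recommended allocation,
# reading "4x" as four times the space Delta 1.4 currently uses.
enough_cassandra_disk() {
  used_gb="$1"       # space currently used by the 1.4 keyspace, in GB
  available_gb="$2"  # space available for the migration, in GB
  [ "$available_gb" -ge $(( used_gb * 4 )) ]
}

# Example with hypothetical figures: 200 GB used, 1000 GB available.
if enough_cassandra_disk 200 1000; then
  echo "disk allocation looks sufficient"
else
  echo "allocate more disk before migrating"
fi
```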

The migration steps are as follows:

Prepare Cassandra

Perform a backup

Perform a backup of the original keyspace delta_1_4 and perform a repair of the Cassandra nodes:

    nodetool repair
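The backup method is not prescribed here. One possible approach, sketched below, is a snapshot taken with nodetool followed by the repair. The tag naming and the function names are assumptions, and the commands would have to be run on every Cassandra node.

```shell
# Build a snapshot tag such as "pre-migration-20210505".
# The naming scheme is only an illustration.
snapshot_tag() {
  printf 'pre-migration-%s\n' "$1"
}

# Snapshot one keyspace on the local node, then repair it.
# Nothing is executed until this function is called explicitly.
backup_and_repair() {
  keyspace="$1"
  tag="$2"
  nodetool snapshot -t "$tag" "$keyspace" && nodetool repair "$keyspace"
}

# Usage on each Cassandra node:
#   backup_and_repair delta_1_4 "$(snapshot_tag "$(date +%Y%m%d)")"
```

A snapshot only protects against data loss if it is also copied off the node, so treat this as a starting point rather than a complete backup procedure.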

Create a materialized view on the messages table on the original keyspace

Create a materialized view on the messages table on the original keyspace delta_1_4:

    CREATE MATERIALIZED VIEW {delta_1_4}.ordered_messages AS
        SELECT ser_manifest, ser_id, event
        FROM {delta_1_4}.messages
        WHERE timebucket is not null
        AND persistence_id is not null
        AND partition_nr is not null
        AND sequence_nr is not null
        AND timestamp is not null
        PRIMARY KEY(timebucket, timestamp, persistence_id, partition_nr, sequence_nr)
        WITH CLUSTERING ORDER BY (timestamp asc);

The materialized view will take some time to build, depending on the number of events; we will come back to it later.

Create the new keyspace for Delta v1.5

Run the following commands to create the new keyspaces and tables for Delta 1.5:

    CREATE KEYSPACE IF NOT EXISTS delta_1_5
      WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'}  AND durable_writes = true;

    CREATE KEYSPACE IF NOT EXISTS delta_1_5_snapshot
      WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'}  AND durable_writes = true;

    CREATE TABLE IF NOT EXISTS delta_1_5.messages(
        persistence_id    text,
        partition_nr      bigint,
        sequence_nr       bigint,
        timestamp         timeuuid,
        timebucket        text,
        writer_uuid       text,
        ser_id            int,
        ser_manifest      text,
        event_manifest    text,
        event             blob,
        meta_ser_id       int,
        meta_ser_manifest text,
        meta              blob,
        tags              set<text>,
        PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp))
        WITH gc_grace_seconds =864000
        AND compression = {'class': 'LZ4Compressor'}
        AND compaction = {
            'class' : 'SizeTieredCompactionStrategy',
            'enabled' : true,
            'tombstone_compaction_interval' : 86400,
            'tombstone_threshold' : 0.2,
            'unchecked_tombstone_compaction' : false,
            'bucket_high' : 1.5,
            'bucket_low' : 0.5,
            'max_threshold' : 32,
            'min_threshold' : 4,
            'min_sstable_size' : 50
    };

    CREATE TABLE IF NOT EXISTS delta_1_5.tag_views(
        tag_name            text,
        persistence_id      text,
        sequence_nr         bigint,
        timebucket          bigint,
        timestamp           timeuuid,
        tag_pid_sequence_nr bigint,
        writer_uuid         text,
        ser_id              int,
        ser_manifest        text,
        event_manifest      text,
        event               blob,
        meta_ser_id         int,
        meta_ser_manifest   text,
        meta                blob,
        PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr))
        WITH gc_grace_seconds =864000
        AND compression = {'class': 'LZ4Compressor'}
        AND compaction = {
            'class' : 'SizeTieredCompactionStrategy',
            'enabled' : true,
            'tombstone_compaction_interval' : 86400,
            'tombstone_threshold' : 0.2,
            'unchecked_tombstone_compaction' : false,
            'bucket_high' : 1.5,
            'bucket_low' : 0.5,
            'max_threshold' : 32,
            'min_threshold' : 4,
            'min_sstable_size' : 50
    };

    CREATE TABLE IF NOT EXISTS delta_1_5.tag_write_progress(
        persistence_id      text,
        tag                 text,
        sequence_nr         bigint,
        tag_pid_sequence_nr bigint,
        offset              timeuuid,
        PRIMARY KEY (persistence_id, tag)
    );

    CREATE TABLE IF NOT EXISTS delta_1_5.tag_scanning(
        persistence_id text,
        sequence_nr    bigint,
        PRIMARY KEY (persistence_id)
    );

    CREATE TABLE IF NOT EXISTS delta_1_5.metadata(
        persistence_id text PRIMARY KEY,
        deleted_to     bigint,
        properties     map<text,text>
    );

    CREATE TABLE IF NOT EXISTS delta_1_5.all_persistence_ids(
        persistence_id text PRIMARY KEY
    );

    CREATE TABLE IF NOT EXISTS delta_1_5_snapshot.snapshots(
        persistence_id    text,
        sequence_nr       bigint,
        timestamp         bigint,
        ser_id            int,
        ser_manifest      text,
        snapshot_data     blob,
        snapshot          blob,
        meta_ser_id       int,
        meta_ser_manifest text,
        meta              blob,
        PRIMARY KEY (persistence_id, sequence_nr))
        WITH CLUSTERING ORDER BY (sequence_nr DESC) AND gc_grace_seconds =864000
        AND compression = {'class': 'LZ4Compressor'}
        AND compaction = {
        'class' : 'SizeTieredCompactionStrategy',
        'enabled' : true,
        'tombstone_compaction_interval' : 86400,
        'tombstone_threshold' : 0.2,
        'unchecked_tombstone_compaction' : false,
        'bucket_high' : 1.5,
        'bucket_low' : 0.5,
        'max_threshold' : 32,
        'min_threshold' : 4,
        'min_sstable_size' : 50
    };

    CREATE TABLE IF NOT EXISTS delta_1_5.projections_progress(
        projection_id text primary key,
        offset              timeuuid,
        timestamp           bigint,
        processed           bigint,
        discarded           bigint,
        warnings            bigint,
        failed              bigint,
        value               text,
        value_timestamp     bigint,
    );

    CREATE TABLE IF NOT EXISTS delta_1_5.projections_errors(
        projection_id  text,
        offset         timeuuid,
        timestamp      bigint,
        persistence_id text,
        sequence_nr    bigint,
        value          text,
        value_timestamp     bigint,
        severity       text,
        error_type     text,
        message        text,
        PRIMARY KEY ((projection_id), timestamp, persistence_id, sequence_nr))
        WITH CLUSTERING ORDER BY (timestamp ASC, persistence_id ASC, sequence_nr ASC)
        AND compression = {'class': 'LZ4Compressor'};

Note that compression has been added in v1.5.

Check the materialized view

These operations may take a while to complete depending on the number of rows in the messages table.

  • On the messages table:

    cqlsh -e 'select timebucket from delta.messages' | grep rows
    
  • On the materialized view:

    cqlsh -e 'select timebucket from delta.ordered_messages' | grep rows
    

When the materialized view has the same number of rows as the table, it is ready for migration.
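The comparison can be scripted instead of read by eye. The sketch below parses the "(N rows)" footer that cqlsh prints after a SELECT; the function names are illustrative, and the keyspace name delta in the usage comments follows the commands above.

```shell
# Extract N from the "(N rows)" footer that cqlsh prints after a SELECT.
row_count() {
  sed -n 's/^(\([0-9][0-9]*\) rows).*$/\1/p'
}

# Succeed when the view holds as many rows as the base table.
view_is_ready() {
  [ "$1" -eq "$2" ]
}

# Usage against a live cluster:
#   table=$(cqlsh -e 'select timebucket from delta.messages' | row_count)
#   view=$(cqlsh -e 'select timebucket from delta.ordered_messages' | row_count)
#   view_is_ready "$table" "$view" && echo "materialized view is ready"
```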

Run Delta in migration mode

Pull the docker image for Delta 1.5

Pull the Docker image for Delta 1.5 from Docker Hub:

    docker pull bluebrain/nexus-delta:1.5.x

Configure Delta in migration mode

This section defines the mandatory configuration to run Delta in migration mode.

Environment variables:

  • KAMON_ENABLED: false (not mandatory but recommended)
  • MIGRATE_DATA: true
  • SCHEMA_VALIDATION_DISABLED: "true"
  • DISABLE_INDEXING: "true"
  • DELTA_PLUGINS: /opt/docker/plugins/

JVM Parameters:

  • Use the same values as in your current deployment

Java Properties:

Description                 | Property                                     | Example value
----------------------------+----------------------------------------------+------------------------------------------------------------
Service binding interface   | app.http.interface                           | 0.0.0.0
Service URI path prefix     | app.http.base-uri                            | {delta-url}:8080/v1
Cassandra contact point (1) | app.database.cassandra.contact-points.1      | cassandra-1:9042
Cassandra contact point (2) | app.database.cassandra.contact-points.2      | cassandra-2:9042
Cassandra contact point (3) | app.database.cassandra.contact-points.3      | cassandra-3:9042
Cassandra username          | app.database.cassandra.username              |
Cassandra password          | app.database.cassandra.password              |
Cassandra timeout           | datastax-java-driver.basic.request.timeout   | 10 seconds
Secrets encryption password | app.encryption.password                      |
Secrets encryption salt     | app.encryption.salt                          |
Remote storage enabling     | plugins.storage.storages.remote-disk.enabled | true/false
S3 storage enabling         | plugins.storage.storages.amazon.enabled      | true/false
Delta 1.4 keyspace          | migration.replay.keyspace                    | delta
Bucket of the first event   | migration.replay.first-time-bucket           | YYYYMMDDTHH:MM (same as the one on your Delta 1.4 deployment)

Start Delta 1.5 with this configuration.
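The settings above can be gathered in one place before starting the container. The sketch below is a minimal illustration, not the official deployment procedure: the function names are hypothetical, and how the Java properties reach the JVM (command-line arguments, an options variable, a configuration file) depends on your deployment and is not specified here.

```shell
# Print the migration-mode environment variables in KEY=value form,
# the format expected by `docker run --env-file`.
migration_env() {
  cat <<'EOF'
KAMON_ENABLED=false
MIGRATE_DATA=true
SCHEMA_VALIDATION_DISABLED=true
DISABLE_INDEXING=true
DELTA_PLUGINS=/opt/docker/plugins/
EOF
}

# Print one -Dproperty=value flag per line for the given pairs.
java_props() {
  for kv in "$@"; do
    printf '%s%s\n' "-D" "$kv"
  done
}

# Check that a value follows the YYYYMMDDTHH:MM time bucket format.
valid_time_bucket() {
  printf '%s\n' "$1" | grep -Eq '^[0-9]{8}T[0-9]{2}:[0-9]{2}$'
}

# Example values taken from the table above.
java_props app.http.interface=0.0.0.0 migration.replay.keyspace=delta

# Possible usage (passing the properties is left to your deployment):
#   migration_env > delta-migration.env
#   docker run --env-file delta-migration.env bluebrain/nexus-delta:1.5.x
```

Values in an env file are taken literally, which is why true is written without quotes there.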

Check the delta logs

  • The following messages indicating migration should appear at startup:
    2021-05-05 14:34:42 INFO  ch.epfl.bluebrain.nexus.delta.Main - Plugins discovered: blazegraph-1.5.xxx, elasticsearch-1.5.xxx, storage-1.5.xxx, composite-views-1.5.xxx, archive-1.5.xxx
    2021-05-05 14:34:46 INFO  ch.epfl.bluebrain.nexus.delta.Main - Starting Delta in migration mode
    

If remote or S3 storage is enabled, a line such as the following should also appear:

    2021-05-05 14:35:00 INFO  c.e.b.n.d.p.s.StoragePluginConfig - Remote-disk storage is enabled

Check the logs regularly to make sure the migration is still progressing without problems. If the migration crashes, restarting it should resume from where it stopped.

Monitor the health of the platform

Please check the general health of the platform (especially Cassandra available disk space).

Check the migration progress:

Run the following query on the projections_progress table in cqlsh (or your favorite CQL client):

    select projection_id, processed, failed, warnings, toTimestamp(offset) as migratedUpTo, timestamp from delta_1_5.projections_progress;

This returns a response similar to:

    projection_id  | processed | failed | warnings | migratedupto
    ---------------+-----------+--------+----------+---------------------------------
    migration-v1.5 |     38499 |  10000 |       12 | 2019-04-10 08:53:01.482000+0000

The table contains the following information:

  • processed: total number of processed events
  • failed: number of failures raised during the migration
  • warnings: number of warnings raised during the migration
  • migratedUpTo: timestamp of the last event processed by migration when migration last saved progress

Once the migration finishes, the sum of the processed and failed columns should be equal to the number of events in your Delta 1.4 deployment.
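This completion check is plain arithmetic on the counters. The sketch below assumes you already know the total number of events in Delta 1.4 (for example from the row count of the messages table); the function names and the total of 60,000 are hypothetical.

```shell
# Events still to migrate, given the processed and failed counters from
# projections_progress and the total number of Delta 1.4 events.
remaining_events() {
  processed="$1"; failed="$2"; total="$3"
  echo $(( total - processed - failed ))
}

# Succeed once processed + failed equals the total number of events.
migration_complete() {
  [ "$(remaining_events "$1" "$2" "$3")" -eq 0 ]
}

# Counters from the sample response above, with a hypothetical total.
remaining_events 38499 10000 60000
```

With these figures, 11501 events remain to be migrated.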

In the logs, lines like the following should also appear repeatedly (the timestamp should match the current date and hour):

    2021-05-06 04:00:20 INFO  c.e.b.n.m.replay.ReplayMessageEvents - Next offset is 25683800-ae17-11eb-8080-808080808080 (2021-05-06 03:00:00:000)
    2021-05-06 04:00:20 INFO  c.e.b.n.m.replay.ReplayMessageEvents - We got 0 rows
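To follow these lines without reading the whole log, the event time in parentheses can be extracted from the most recent "Next offset" line. This is a small sketch that assumes the log format shown above; the function name and the container name in the usage comment are hypothetical.

```shell
# Print the event time of the most recent "Next offset" line read from stdin.
last_replay_offset_time() {
  sed -n 's/.*Next offset is [^ ]* (\(.*\))[[:space:]]*$/\1/p' | tail -n 1
}

# Usage, assuming Delta runs in a container named delta-migration:
#   docker logs delta-migration 2>&1 | last_replay_offset_time
```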

Check the number of messages in Delta 1.5

These operations may take a while to complete, but they should give the same result (minus any events that failed to migrate).

  • On the Delta 1.4 messages table:

    cqlsh -e 'select timebucket from delta.messages' | grep rows
    
  • On the Delta 1.5 messages table:

    cqlsh -e 'select timebucket from delta_1_5.messages' | grep rows
    

Check the migration errors/warnings:

The errors/warnings are reported in the projections_errors table.

There are two types of errors:

Failures

The event could not be migrated, for reasons such as:

  • Invalid data due to previous bugs in Delta (invalid IRIs, ACL paths, …)
  • Revision error as a previous revision has been rejected

There should be very few of them, and their resolution should be addressed in a process that is beyond the scope of migration.

Warnings

The event could be migrated, but a fix had to be performed on the event source (original payload posted by the user). Example:

  • When an id in the source can’t be resolved to match the one computed in v1.4, it is overwritten with the id computed in v1.4.

The errors table contains the following information:

  • projection_id: the id of the projection (migration-v1.5)
  • timestamp: when the error was raised
  • persistence_id: the internal id of the event in v1.4
  • sequence_nr: the sequence number of the event in v1.4
  • offset: the offset of the event in v1.4
  • error_type: the type of error
  • message: the message and the context of the error
  • severity: the level of error (Failure or Warning, as explained above)

To get the errors:

    select * from delta_1_5.projections_errors;

Note that cqlsh can export a table to a CSV file, which can be useful if you want to share, process or filter the errors:

    copy delta_1_5.projections_errors to '/path/file.csv';
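Once exported, the errors can be summarized from the shell. The sketch below assumes an export restricted to a few columns with severity first (the cqlsh COPY command accepts a column list), because free-text columns such as message may contain commas that a naive split would mishandle. The function name is illustrative.

```shell
# Count rows per severity in a CSV read from stdin whose first column is
# the severity, for example one exported in cqlsh with:
#   copy delta_1_5.projections_errors (severity, error_type) to '/path/file.csv';
count_by_severity() {
  awk -F, '{ n[$1]++ } END { for (s in n) print s, n[s] }' | sort
}

# Usage:
#   count_by_severity < /path/file.csv
```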

To better understand an error, check the resource in your running v1.4 deployment and/or read the event in the Delta v1.4 messages table:

    select blobAsText(event) from {delta_1_4}.messages where persistence_id = '{persistence_id}' and sequence_nr={sequence_nr} and partition_nr =0;

Restart in normal mode

Once migration is completed and the previous checks have been performed, Delta can be restarted in normal mode.

The previous Delta instance can be stopped. After the restart, a complete reindexing of the data in Elasticsearch/Blazegraph will take place.

  • Stop the Delta 1.4 instance.
  • Delete all Elasticsearch indices:
    curl -XDELETE 'http://{elasticsearch_host}/delta_*'
    
  • Delete all Blazegraph namespaces:

    for i in `curl -s 'http://{blazegraph_host}/blazegraph/namespace?describe-each-named-graph=false' | grep sparqlEndpoint | grep -o --color "rdf:resource=\"[^\"]*" | sed 's/rdf:resource="//' | sed 's#/sparql$##' | grep -v kb | grep -v LBS`
       do curl -X DELETE "$i"
    done
    
  • Restart the Delta v1.5 instance in normal mode, following the Delta v1.5 deployment documentation and removing the environment variables MIGRATE_DATA, SCHEMA_VALIDATION_DISABLED and DISABLE_INDEXING.

  • Check the Delta logs after the restart.
  • Because all data is reindexed, some timeouts may occur:

      • Leaving Kamon disabled (environment variable KAMON_ENABLED set to false) during this indexing is recommended.
      • Temporarily allocating more resources to Cassandra/Delta/Elasticsearch/Blazegraph may help, as this phase is resource-intensive.
      • Temporarily increasing the delay in retry strategies can also help reduce the pressure on the platform. See: Delta configuration
  • Check the statistics of the different views; the number of remaining events should progressively drop to 0.

  • Check the cluster stats endpoint in Elasticsearch; the number of indices and docs should stabilize after a while.
  • When indexing is finished, the CPU/memory used by the platform should decrease.
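Because the Blazegraph clean-up in the steps above is destructive, it can help to list the namespace URLs first and review them before deleting anything. The sketch below reuses the filtering from the deletion loop as a dry run; the function name is hypothetical.

```shell
# Read the Blazegraph namespace description from stdin and print the
# namespace URLs the deletion loop would target. As in the loop above,
# any URL containing "kb" or "LBS" is skipped.
list_namespace_urls() {
  grep sparqlEndpoint \
    | grep -o 'rdf:resource="[^"]*' \
    | sed -e 's/rdf:resource="//' -e 's#/sparql$##' \
    | grep -v kb | grep -v LBS
}

# Dry run against a live instance:
#   curl -s 'http://{blazegraph_host}/blazegraph/namespace?describe-each-named-graph=false' \
#     | list_namespace_urls
```

Only once the printed list looks right would each URL be passed to curl -X DELETE.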

Clean-up

  • Once Delta v1.5 is up and running, you can delete the old keyspace:

    DROP KEYSPACE {delta_1_4};
    
  • The old Delta v1.4 instance can also be removed.

  • Revert the temporary resource and configuration changes that were made for the global reindexing.