How can I use multiple parallel readers to export data from Google Cloud Spanner?

The question "External Backups/Snapshots for Google Cloud Spanner" recommends using queries with timestamp bounds to create snapshots for export. At the bottom of the Timestamp Bounds documentation it says:

Cloud Spanner continuously garbage collects deleted and overwritten data in the background to reclaim storage space. This process is known as version GC. By default, version GC reclaims versions after they are one hour old. Because of this, Cloud Spanner cannot perform reads at a read timestamp more than one hour in the past.

Thus, any export has to complete within an hour. A single reader (i.e. select * from table; using timestamp X) would not be able to export the entire table within an hour.
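To make the time budget concrete, here is a minimal sketch that assumes the default one-hour version GC window quoted above. The class and method names (SnapshotWindow, remaining, requiredRowsPerSecond) are hypothetical and not part of any Spanner API; the code only does the arithmetic.

```java
import java.time.Duration;
import java.time.Instant;

class SnapshotWindow {
    // Assumption: the default one-hour version GC window from the quoted documentation.
    static final Duration VERSION_GC_WINDOW = Duration.ofHours(1);

    /** Time left before a snapshot at readTimestamp can no longer be read; zero if already expired. */
    static Duration remaining(Instant readTimestamp, Instant now) {
        Duration left = Duration.between(now, readTimestamp.plus(VERSION_GC_WINDOW));
        return left.isNegative() ? Duration.ZERO : left;
    }

    /** Combined rows per second that all readers must sustain to finish inside the remaining window. */
    static double requiredRowsPerSecond(long rowCount, Duration remaining) {
        long millis = remaining.toMillis();
        if (millis <= 0) {
            return Double.POSITIVE_INFINITY; // the snapshot has expired, no rate is sufficient
        }
        return rowCount / (millis / 1000.0);
    }
}
```

If a single reader cannot reach the required rate, the work has to be spread over several readers that all use the same timestamp X.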

How can multiple parallel Spanner readers be used?


Note: one of the comments mentions that Apache Beam support is coming, but it looks like it uses a single reader:

/** A simplest read function implementation. Parallelism support is coming. */

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java#L26

Is there a way to do the parallel read that is required today using the existing APIs? Or will Beam need to use something that hasn't been released for Google Cloud Spanner yet?



2 answers


Edit 2018-03-30 - Sample project has been updated to use the BatchClient offered by Google Cloud Spanner

Following the release of the BatchClient for reading and downloading large amounts of data, the sample project below has been updated to use the new batch client instead of the standard database client. The basic idea of the project is still the same: copying data to or from Cloud Spanner and any other database using standard JDBC functionality. The following code snippet puts a JDBC connection into batch read-only mode:

if (source.isWrapperFor(ICloudSpannerConnection.class))
{
    ICloudSpannerConnection con = source.unwrap(ICloudSpannerConnection.class);
    // Make sure no transaction is running
    if (!con.isBatchReadOnly())
    {
        if (con.getAutoCommit())
        {
            con.setAutoCommit(false);
        }
        else
        {
            con.commit();
        }
        con.setBatchReadOnly(true);
    }
}

      

When the connection is in batch read-only mode, it uses the BatchClient of Google Cloud Spanner instead of the default database client. When either Statement#execute(String) or PreparedStatement#execute() is called (these are the methods that allow multiple result sets to be returned), the JDBC driver creates a partitioned query instead of a regular query. The result of this partitioned query is a series of result sets (one for each partition) that can be retrieved using the Statement#getResultSet() and Statement#getMoreResults(int) methods.

Statement statement = source.createStatement();
boolean hasResults = statement.execute(select);
int workerNumber = 0;
while (hasResults)
{
    ResultSet rs = statement.getResultSet();
    PartitionWorker worker = new PartitionWorker("PartitionWorker-" + workerNumber, config, rs, tableSpec, table, insertCols);
    workers.add(worker);
    hasResults = statement.getMoreResults(Statement.KEEP_CURRENT_RESULT);
    workerNumber++;
}

      

The result sets returned by Statement#execute(String) are not executed immediately, but only after the first call to ResultSet#next(). Passing these result sets to separate worker threads allows the data to be downloaded and copied in parallel.
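The hand-off of one result set per worker can be sketched with the standard library alone. In this sketch an Iterator stands in for a partition's ResultSet and a Consumer stands in for the insert into the destination database; PartitionCopier and copyInParallel are hypothetical names, not part of the sample project or of the JDBC driver.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class PartitionCopier {
    /**
     * Drains every partition on its own thread and returns the total number of rows handled.
     * Each Iterator is a stand-in for one partition's ResultSet; the sink must be thread-safe.
     */
    static <T> long copyInParallel(List<Iterator<T>> partitions, Consumer<T> sink, long timeoutMinutes) {
        if (partitions.isEmpty()) {
            return 0; // a fixed thread pool cannot be created with zero threads
        }
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (Iterator<T> partition : partitions) {
                results.add(pool.submit(() -> {
                    long rows = 0;
                    // Nothing is read from a partition until its worker starts iterating.
                    while (partition.hasNext()) {
                        sink.accept(partition.next());
                        rows++;
                    }
                    return rows;
                }));
            }
            long total = 0;
            for (Future<Long> result : results) {
                total += result.get(timeoutMinutes, TimeUnit.MINUTES);
            }
            return total;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Partition copy failed", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Using one thread per partition keeps the sketch short; with many partitions a smaller fixed pool size would probably be the safer choice.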




Original answer:

This project was originally created for conversion in the other direction (from a local database to Cloud Spanner), but since it uses JDBC for both source and destination, it can also be used the other way around: converting a Cloud Spanner database to a local PostgreSQL database. Large tables are converted in parallel using a thread pool.

The project uses this open source JDBC driver instead of the Google-supplied JDBC driver. The source Cloud Spanner JDBC connection is set to read-only and autocommit = false. This ensures that the connection automatically creates a read-only transaction, using the current time as its timestamp, the first time a query is executed. All subsequent queries within the same (read-only) transaction use the same timestamp, which gives you a consistent snapshot of the Google Cloud Spanner database.

It works like this:

  • Set the source database to read-only transactional mode.
  • The convert(String catalog, String schema) method iterates over all tables in the source database (Cloud Spanner).
  • For each table, the number of records is determined, and depending on the size of the table, the table is copied using either the main application thread or a worker pool.
  • The UploadWorker class is responsible for the parallel copying. Each worker is assigned a range of records from the table (for example, rows 1 through 2400). The range is selected by a select statement in this format: 'SELECT * FROM $TABLE ORDER BY $PK_COLUMNS LIMIT $BATCH_SIZE OFFSET $CURRENT_OFFSET'
  • Commit the read-only transaction on the source database after all tables have been converted.
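The range assignment described in the list above can be sketched in isolation. This is a simplified illustration, not the project's code: OffsetRanges, Range, split and select are hypothetical names, and the statement text only mirrors the format quoted in the list.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetRanges {
    /** One worker's slice of the table: skip `offset` rows, then copy `count` rows. */
    static final class Range {
        final int offset;
        final int count;

        Range(int offset, int count) {
            this.offset = offset;
            this.count = count;
        }
    }

    /** Splits totalRecordCount rows into at most numberOfWorkers contiguous, non-empty ranges. */
    static List<Range> split(int totalRecordCount, int numberOfWorkers) {
        if (numberOfWorkers <= 0) {
            throw new IllegalArgumentException("numberOfWorkers must be positive");
        }
        // Ceiling division, so that the last worker picks up the remainder.
        int perWorker = (totalRecordCount + numberOfWorkers - 1) / numberOfWorkers;
        List<Range> ranges = new ArrayList<>();
        for (int offset = 0; offset < totalRecordCount; offset += perWorker) {
            ranges.add(new Range(offset, Math.min(perWorker, totalRecordCount - offset)));
        }
        return ranges;
    }

    /** Builds the LIMIT/OFFSET statement for one batch; ordering by primary key keeps offsets stable. */
    static String select(String table, String pkColumns, int limit, int offset) {
        return "SELECT * FROM " + table + " ORDER BY " + pkColumns + " LIMIT " + limit + " OFFSET " + offset;
    }
}
```

For example, split(10, 3) yields the ranges (offset 0, count 4), (offset 4, count 4) and (offset 8, count 2). The offsets only line up across workers because every query runs in the same read-only transaction and therefore sees the same snapshot.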

Below is a snippet of the most important parts.

public void convert(String catalog, String schema) throws SQLException
{
    int batchSize = config.getBatchSize();
    destination.setAutoCommit(false);
    // Set the source connection to transaction mode (no autocommit) and read-only
    source.setAutoCommit(false);
    source.setReadOnly(true);
    try (ResultSet tables = destination.getMetaData().getTables(catalog, schema, null, new String[] { "TABLE" }))
    {
        while (tables.next())
        {
            String tableSchema = tables.getString("TABLE_SCHEM");
            if (!config.getDestinationDatabaseType().isSystemSchema(tableSchema))
            {
                String table = tables.getString("TABLE_NAME");
                // Check whether the destination table is empty.
                int destinationRecordCount = getDestinationRecordCount(table);
                if (destinationRecordCount == 0 || config.getDataConvertMode() == ConvertMode.DropAndRecreate)
                {
                    if (destinationRecordCount > 0)
                    {
                        deleteAll(table);
                    }
                    int sourceRecordCount = getSourceRecordCount(getTableSpec(catalog, tableSchema, table));
                    if (sourceRecordCount > batchSize)
                    {
                        convertTableWithWorkers(catalog, tableSchema, table);
                    }
                    else
                    {
                        convertTable(catalog, tableSchema, table);
                    }
                }
                else
                {
                    if (config.getDataConvertMode() == ConvertMode.ThrowExceptionIfExists)
                        throw new IllegalStateException("Table " + table + " is not empty");
                    else if (config.getDataConvertMode() == ConvertMode.SkipExisting)
                        log.info("Skipping data copy for table " + table);
                }
            }
        }
    }
    source.commit();
}

private void convertTableWithWorkers(String catalog, String schema, String table) throws SQLException
{
    String tableSpec = getTableSpec(catalog, schema, table);
    Columns insertCols = getColumns(catalog, schema, table, false);
    Columns selectCols = getColumns(catalog, schema, table, true);
    if (insertCols.primaryKeyCols.isEmpty())
    {
        log.warning("Table " + tableSpec + " does not have a primary key. No data will be copied.");
        return;
    }
    log.info("About to copy data from table " + tableSpec);

    int batchSize = config.getBatchSize();
    int totalRecordCount = getSourceRecordCount(tableSpec);
    int numberOfWorkers = calculateNumberOfWorkers(totalRecordCount);
    int numberOfRecordsPerWorker = totalRecordCount / numberOfWorkers;
    if (totalRecordCount % numberOfWorkers > 0)
        numberOfRecordsPerWorker++;
    int currentOffset = 0;
    ExecutorService service = Executors.newFixedThreadPool(numberOfWorkers);
    for (int workerNumber = 0; workerNumber < numberOfWorkers; workerNumber++)
    {
        int workerRecordCount = Math.min(numberOfRecordsPerWorker, totalRecordCount - currentOffset);
        UploadWorker worker = new UploadWorker("UploadWorker-" + workerNumber, selectFormat, tableSpec, table,
                insertCols, selectCols, currentOffset, workerRecordCount, batchSize, source,
                config.getUrlDestination(), config.isUseJdbcBatching());
        service.submit(worker);
        currentOffset = currentOffset + numberOfRecordsPerWorker;
    }
    service.shutdown();
    try
    {
        service.awaitTermination(config.getUploadWorkerMaxWaitInMinutes(), TimeUnit.MINUTES);
    }
    catch (InterruptedException e)
    {
        log.severe("Error while waiting for workers to finish: " + e.getMessage());
        throw new RuntimeException(e);
    }

}

public class UploadWorker implements Runnable
{
    private static final Logger log = Logger.getLogger(UploadWorker.class.getName());

    private final String name;
    private String selectFormat;
    private String sourceTable;
    private String destinationTable;
    private Columns insertCols;
    private Columns selectCols;
    private int beginOffset;
    private int numberOfRecordsToCopy;
    private int batchSize;
    private Connection source;
    private String urlDestination;
    private boolean useJdbcBatching;

    UploadWorker(String name, String selectFormat, String sourceTable, String destinationTable, Columns insertCols,
            Columns selectCols, int beginOffset, int numberOfRecordsToCopy, int batchSize, Connection source,
            String urlDestination, boolean useJdbcBatching)
    {
        this.name = name;
        this.selectFormat = selectFormat;
        this.sourceTable = sourceTable;
        this.destinationTable = destinationTable;
        this.insertCols = insertCols;
        this.selectCols = selectCols;
        this.beginOffset = beginOffset;
        this.numberOfRecordsToCopy = numberOfRecordsToCopy;
        this.batchSize = batchSize;
        this.source = source;
        this.urlDestination = urlDestination;
        this.useJdbcBatching = useJdbcBatching;
    }

    @Override
    public void run()
    {
        // Connection source = DriverManager.getConnection(urlSource);
        try (Connection destination = DriverManager.getConnection(urlDestination))
        {
            log.info(name + ": " + sourceTable + ": Starting copying " + numberOfRecordsToCopy + " records");

            destination.setAutoCommit(false);
            String sql = "INSERT INTO " + destinationTable + " (" + insertCols.getColumnNames() + ") VALUES \n";
            sql = sql + "(" + insertCols.getColumnParameters() + ")";
            PreparedStatement statement = destination.prepareStatement(sql);

            int lastRecord = beginOffset + numberOfRecordsToCopy;
            int recordCount = 0;
            int currentOffset = beginOffset;
            while (true)
            {
                int limit = Math.min(batchSize, lastRecord - currentOffset);
                String select = selectFormat.replace("$COLUMNS", selectCols.getColumnNames());
                select = select.replace("$TABLE", sourceTable);
                select = select.replace("$PRIMARY_KEY", selectCols.getPrimaryKeyColumns());
                select = select.replace("$BATCH_SIZE", String.valueOf(limit));
                select = select.replace("$OFFSET", String.valueOf(currentOffset));
                try (ResultSet rs = source.createStatement().executeQuery(select))
                {
                    while (rs.next())
                    {
                        int index = 1;
                        for (Integer type : insertCols.columnTypes)
                        {
                            Object object = rs.getObject(index);
                            statement.setObject(index, object, type);
                            index++;
                        }
                        if (useJdbcBatching)
                            statement.addBatch();
                        else
                            statement.executeUpdate();
                        recordCount++;
                    }
                    if (useJdbcBatching)
                        statement.executeBatch();
                }
                destination.commit();
                log.info(name + ": " + sourceTable + ": Records copied so far: " + recordCount + " of "
                        + numberOfRecordsToCopy);
                currentOffset = currentOffset + batchSize;
                if (recordCount >= numberOfRecordsToCopy)
                    break;
            }
        }
        catch (SQLException e)
        {
            log.severe("Error during data copy: " + e.getMessage());
            throw new RuntimeException(e);
        }
        log.info(name + ": Finished copying");
    }
}

      



You can read data in parallel from Cloud Spanner with the BatchClient class. For more information see https://cloud.google.com/spanner/docs/reads#read_data_in_parallel.



If you want to export data from Cloud Spanner, I would recommend using Cloud Dataflow (see integration details here), as it provides higher-level abstractions and handles data processing details such as scaling and failover.
