Bulk Inserts — Batch Size vs Speed

Recently I’ve been working on a project where I have to insert over a million rows. This is the sort of task that lands on many a backend engineer every so often. Over time you learn that taking the easiest route, line-by-line inserts, isn’t the best solution if you care about speed and efficiency.

foreach ($records as $record) {
    $record->save();
}

You eventually learn about bulk inserts and how much better suited they are to this type of task.

$records->chunk($randomChunkSize)->each(function ($chunk) {
    Model::insert($chunk);
});

This usually leads to way better results and so you pat yourself on the back for a job well done and go on your merry way to your next super-high-must-do-today-priority task. But once in a while, you get just enough of a breather to ask: is my $randomChunkSize really getting the best result possible?


Down the Rabbit Hole

Initially this is what my code looked like:

while ($chunk = array_splice($this->records, 0, 300)) {
    $model::insert($chunk);
}

And the amount of time it took to insert 164,157 rows (reduced for the purpose of this article) was 102 seconds.

I decided to arbitrarily increase the chunk size to 1000. For the same number of rows this yielded 216 seconds.

How could this be? How could it take more time to insert if I was processing bigger chunks?
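
If you want to reproduce this kind of comparison yourself, a minimal timing harness looks something like this (a sketch; Model and $records stand in for your own model and source data):

foreach ([300, 500, 1000, 2000] as $chunkSize) {
    Model::truncate(); // reset the table so every run starts from the same state
    $rows = $records;  // fresh copy of the source rows for each run

    $start = microtime(true);
    while ($chunk = array_splice($rows, 0, $chunkSize)) {
        Model::insert($chunk);
    }

    $seconds = round(microtime(true) - $start, 2);
    echo "chunk size {$chunkSize}: {$seconds}s\n";
}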

Research

At this point I started looking for the maximum possible insert statement size in MySQL. I read in more than a few places that the thing I should be paying attention to was the max_allowed_packet setting. Ours was set to 1GB, so I figured I should be able to send a ridiculously large batch in a single statement. I tried to get a little scientific: I actually tracked the byte size of the entire insert statement and cut it off just before reaching the 1GB limit. To my surprise, I came across the following error:

SQLSTATE[HY000]: General error: 1390 Prepared statement contains too many placeholders

After referencing some MySQL documentation and searching around regarding this error, I eventually landed on this great post. I learned some great nuggets from that article, but the real linchpin for me came towards the very end, with this package Benjamin put together. I’d seen this 65535 number mentioned as a limit imposed by MySQL before as well. But Benjamin figured out that the real limit was floor(65535 / number of columns in destination table) rows per statement. So what does this mean exactly?

Every time you execute a MySQL statement via a programming language (such as PHP in my case), the query statement must be prepared for execution. I’m not going to go into all the minutiae of that process, but the part relevant to my issue is that every value to be inserted is replaced with a placeholder ? character. The statement must then be bound to its respective values (which are themselves processed in some manner). If you use Laravel, you can see how this is done for yourself in the Grammar class. When you have columns that accept NULL values, the ? placeholders are kept in the query, and once a single statement exceeds floor(65535 / number of columns in destination table) rows, you get the 1390 error above.
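
To make that concrete, here is what a small multi-row insert compiles down to, and how the placeholder cap turns into a row cap. This is a sketch: the widgets table, its columns, and the 12-column example are made-up stand-ins.

use Illuminate\Support\Facades\DB;

// Two rows of three columns each: six ? placeholders in total.
DB::table('widgets')->insert([
    ['color' => 'red',  'name' => 'a', 'size' => 1],
    ['color' => 'blue', 'name' => 'b', 'size' => 2],
]);

// Compiles to roughly:
//   INSERT INTO widgets (color, name, size) VALUES (?, ?, ?), (?, ?, ?)
// with bindings ['red', 'a', 1, 'blue', 'b', 2].

// Since every row consumes one placeholder per column, the 65,535
// placeholder cap translates into a per-statement row cap:
$maxRowsPerInsert = (int) floor(65535 / 12); // 5,461 rows for a 12-column table
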

The Fix

Now if you read Benjamin’s post, you know that the absolute king of bulk data insertion is the LOAD DATA statement. It comes with some additional overhead, and I plan to write something up on handling that at some point in the future. The route I decided to go with was the one outlined in the MySQL InnoDB bulk loading optimization page: a single database transaction in which you turn off a few settings that would otherwise slow this process down.

// Please reference the gist at the end of this page.
// This is in pseudo-code fashion
$connection->transaction(function ($connection) {
    $connection->select("SET autocommit = 0");
    $connection->select("SET unique_checks = 0");
    $connection->select("SET foreign_key_checks = 0");
    do {
        $start = $this->optimalRecordSplit->shift();
        $length = $this->optimalRecordSplit->first() - $start;
        $connection->insert($this->records->splice(0, $length));
    } while ($this->optimalRecordSplit->isNotEmpty());
    $connection->select("SET autocommit = 1");
    $connection->select("SET unique_checks = 1");
    $connection->select("SET foreign_key_checks = 1");
});

With this in place I was able to insert those 164k records in 89 seconds!

In the table below I tracked the actual amount of time it takes to execute each insert statement. By comparing how long similar numbers of rows take to insert, you can see why this optimized fashion of bulk insertion is faster than your “standard” way of doing bulk inserts.

For a quick and easy comparison, notice the last row in the left section: 2,740 rows in 1.02 seconds (roughly 2,690 rows per second), against an equivalent time on the right side of 2,100 rows in 1.07 seconds (roughly 1,960 rows per second).


If you’re a DBA, I’d love to chat about why the increased bulk size actually increases the time it takes to insert all these records. And if you’re a developer, here’s a gist of the class I wrote to process these bulk inserts; I’d love to hear some feedback.

<?php

namespace App\Classes;

use Doctrine\DBAL\Types\Types;
use Illuminate\Console\OutputStyle;
use Illuminate\Database\Connection;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Query\Grammars\Grammar;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;

class BulkEloquentInserter
{
    const MYSQL_PLACEHOLDER_LIMIT = 65535;
    const QUERY_MAX_ALLOWED_PACKET = "show variables like 'max_allowed_packet';";
    const QUERY_MAX_BULK_INSERT_BUFFER_SIZE = "show variables like 'bulk_insert_buffer_size';";

    /**
     * @var Model $model
     */
    protected $model;

    /**
     * @var array $tableColumns
     */
    protected $tableColumns;

    /**
     * @var int $columnCount
     */
    protected $columnCount;

    /**
     * @var int $optimalPlaceholderLimit
     */
    protected $optimalPlaceholderLimit;

    /**
     * @var int $maxAllowedPacket Max allowed bytes for a single SQL statement
     */
    protected $maxAllowedPacket;

    /**
     * @var int $maxBulkInsertBufferSize Max buffer size for a single SQL insert statement
     */
    protected $maxBulkInsertBufferSize;

    /**
     * @var Collection $records
     */
    protected $records;

    /**
     * @var int $totalRecordCount
     */
    protected $totalRecordCount;

    /**
     * @var bool $trackOptimalRecordsSplit Should a table only have one nullable column (the auto-incrementing
     * column), this will be set to false and the $defaultRecordSplit value used instead.
     */
    protected $trackOptimalRecordsSplit = true;

    /**
     * @var int $defaultRecordSplit To be used when the only nullable value on a table is the auto-incrementing column.
     */
    protected $defaultRecordSplit = 100;

    /**
     * @var Collection $optimalRecordsSplit
     */
    protected $optimalRecordsSplit;

    /**
     * @var int $placeholderCount
     */
    private $placeholderCount = 0;

    /**
     * @var int $byteSize
     */
    private $byteSize = 0;

    /**
     * BulkEloquentInserter constructor.
     * @param Model|string $model
     * @param Collection|array $records
     */
    public function __construct($model, $records)
    {
        $this->setModel(new $model);
        if (! ($records instanceof Collection)) {
            $records = collect($records);
        }
        $this->setRecords($records);
    }

    /**
     * @param Model $model
     * @return BulkEloquentInserter
     */
    public function setModel(Model $model)
    {
        $this->model = $model;
        $this->setTableMetaData();
        $this->setOtherMetaData();
        $this->setOptimalPlaceholderLimit();
        $this->setRecords(collect());
        return $this;
    }

    /**
     * @param Collection $records
     * @return BulkEloquentInserter
     */
    public function setRecords(Collection $records)
    {
        $this->optimalRecordsSplit = collect([0]);
        /*
         * Need to make sure that we order values based on the columns gathered from the destination table.
         */
        $this->records = $records->transform(function ($record, $key) {
            if (Arr::isAssoc($record)) {
                $newRecord = [];
                /*
                 * $tableColumns have already been placed in alphabetical order. Using this
                 * loop eliminates the need to sort this record by key.
                 */
                foreach ($this->getTableColumns() as $k => $stringKey) {
                    $value = $record[$stringKey] ?? null;
                    /*
                     * Because created_at/updated_at columns could be nullable,
                     * we are ensuring that they have values so as to not
                     * take up a placeholder value.
                     */
                    if (in_array($stringKey, ['created_at', 'updated_at']) && is_null($value))
                        $value = date('Y-m-d H:i:s', time());
                    $newRecord[] = $value;
                }
                $record = $newRecord;
            }
            /*
             * Will assume the developer knows what they're doing and has
             * placed values according to alphabetically sorted columns.
             */
            $processedRecord = new ProcessedRecord($this->getConnection()->getQueryGrammar(), $record);
            $this->trackRecordSplit($processedRecord, $key);
            return $processedRecord;
        });
        $this->totalRecordCount = $this->records->count();
        return $this;
    }

    public function execute(OutputStyle $output = null)
    {
        if (! is_null($output))
            $output->title("Bulk Record Insertion");
        try {
            $table = $this->getModel()->getTable();
            $columns = $this->getConnection()->getQueryGrammar()->columnize($this->getTableColumns());
            if (! is_null($output)) $output->writeln("<comment>Starting transaction…</comment>");
            $pdo = $this->getConnection()->getPdo();
            $pdo->setAttribute($pdo::ATTR_EMULATE_PREPARES, false);
            $start = microtime(true);
            $this->getConnection()->transaction(function ($connection) use ($table, $columns, $output) {
                /* @var Connection $connection */
                foreach ($this->getPerformanceEnhancingStatements(false) as $statement) {
                    $connection->select($statement);
                }
                if (! is_null($output)) {
                    $formattedRecordCount = number_format($this->totalRecordCount);
                    $output->writeln("<comment>There are {$formattedRecordCount} records to insert.</comment>");
                    $output->progressStart($this->totalRecordCount);
                }
                do {
                    $length = $this->getOptimalLength();
                    $processedRecords = $this->records->splice(0, $length);
                    /* @var ProcessedRecord $firstRecord */
                    $firstRecord = $processedRecords->shift();
                    $parameters = $firstRecord->parameterized;
                    $recordValues = array_merge([], $firstRecord->record);
                    while ($record = $processedRecords->shift()) {
                        $parameters .= ", $record->parameterized";
                        $recordValues = array_merge($recordValues, $record->record);
                    }
                    $insertQuery = "INSERT INTO {$table} ($columns) VALUES {$parameters}";
                    $insertStatement = $connection->getPdo()->prepare($insertQuery);
                    $connection->bindValues($insertStatement, $connection->prepareBindings($recordValues));
                    $success = $insertStatement->execute();
                    if (! is_null($output) && $success) $output->progressAdvance($length);
                } while ($this->records->isNotEmpty());
                foreach ($this->getPerformanceEnhancingStatements(true) as $statement) {
                    $connection->select($statement);
                }
            });
            $end = microtime(true);
            if (! is_null($output)) {
                $output->progressFinish();
                $timeInSeconds = round($end - $start, 3);
                $output->writeln("<comment>Transaction took {$timeInSeconds}s to complete.</comment>");
                $output->newLine();
            }
            return true;
        } catch (\Throwable $e) {
            echo "\n{$e->getMessage()}\n{$e->getTraceAsString()}\n";
        }
    }

    /**
     * @param Model|string $model
     * @param Collection|array $records
     * @param OutputStyle $output
     * @return mixed
     */
    public static function upload($model, $records, $output = null)
    {
        return (new self($model, $records))->execute($output);
    }

    /**
     * @return Model
     */
    protected function getModel()
    {
        return $this->model;
    }

    /**
     * @return Connection
     */
    private function getConnection()
    {
        return $this->model->getConnection();
    }

    protected function setTableMetaData()
    {
        $this->tableColumns = $this->getConnection()
            ->getSchemaBuilder()
            ->getColumnListing(
                $this->getModel()->getTable()
            );
        $containsNullableAutoIncrement = false;
        $nullableColumns = 0;
        foreach ($this->tableColumns as $column) {
            $dColumn = $this->getConnection()
                ->getDoctrineColumn(
                    $this->getModel()->getTable(),
                    $column
                );
            if ($dColumn->getAutoincrement()) {
                $containsNullableAutoIncrement = true;
            } else if (! $dColumn->getNotnull()) {
                if (in_array($dColumn->getType()->getName(), [Types::DATETIME_MUTABLE, Types::DATETIME_IMMUTABLE])) {
                    /*
                     * Do not want to include created_at/updated_at columns in
                     * the nullable count.
                     */
                    if (in_array($column, ['created_at', 'updated_at']))
                        continue;
                }
                $nullableColumns++;
                if ($nullableColumns >= 1)
                    break;
            }
        }
        if ($containsNullableAutoIncrement && $nullableColumns === 0) {
            $this->trackOptimalRecordsSplit = false;
        }
        sort($this->tableColumns);
        $this->columnCount = count($this->tableColumns);
    }

    protected function setOtherMetaData()
    {
        $result = $this->getConnection()
            ->selectOne(self::QUERY_MAX_ALLOWED_PACKET);
        if ($result)
            $this->maxAllowedPacket = $result->Value - 10000; // 10KB for some wiggle room
        $result = $this->getConnection()
            ->selectOne(self::QUERY_MAX_BULK_INSERT_BUFFER_SIZE);
        if ($result)
            $this->maxBulkInsertBufferSize = $result->Value - 10000;
    }

    /**
     * @return array
     */
    protected function getTableColumns()
    {
        return $this->tableColumns;
    }

    /**
     * @return int
     */
    protected function getColumnCount()
    {
        return $this->columnCount;
    }

    protected function setOptimalPlaceholderLimit()
    {
        $this->optimalPlaceholderLimit = (int) floor(self::MYSQL_PLACEHOLDER_LIMIT / $this->getColumnCount());
    }

    /**
     * @param bool $isOn
     * @return array
     */
    protected function getPerformanceEnhancingStatements(bool $isOn)
    {
        $databaseVars = [
            'autocommit',
            'unique_checks',
            'foreign_key_checks'
        ];
        $isOn = (int) $isOn;
        return array_map(function ($var) use ($isOn) {
            return "SET $var = $isOn;";
        }, $databaseVars);
    }

    protected function getOptimalLength()
    {
        if ($this->trackOptimalRecordsSplit) {
            // Boundaries were computed in trackRecordSplit(); the next batch
            // length is the gap between the current and next boundary.
            $start = $this->optimalRecordsSplit->shift();
            return $this->optimalRecordsSplit->get(0, $this->totalRecordCount) - $start;
        }
        return $this->defaultRecordSplit;
    }

    private function trackRecordSplit(ProcessedRecord $record, int $currentKey)
    {
        if ($this->trackOptimalRecordsSplit) {
            $this->placeholderCount += $record->getPlaceholderCount();
            $this->byteSize += $record->getByteSize();
            $isAbovePlaceholderLimit = $this->placeholderCount > $this->optimalPlaceholderLimit;
            $isAboveBulkInsertBufferSize = $this->byteSize > $this->maxBulkInsertBufferSize;
            $isAboveMaxAllowedPacketSize = $this->byteSize > $this->maxAllowedPacket;
            if ($isAbovePlaceholderLimit || $isAboveBulkInsertBufferSize || $isAboveMaxAllowedPacketSize) {
                // Mark a statement boundary at this record and start counting afresh.
                $this->optimalRecordsSplit->push($currentKey);
                $this->placeholderCount = $record->getPlaceholderCount();
                $this->byteSize = $record->getByteSize();
            }
        }
    }
}

class ProcessedRecord
{
    /**
     * @var string $parameterized
     */
    public $parameterized;

    /**
     * @var array $record
     */
    public $record;

    /**
     * @var int $placeholderCount
     */
    protected $placeholderCount;

    /**
     * @var int $byteSize
     */
    protected $byteSize;

    public function __construct(Grammar $grammar, array $record)
    {
        $this->parameterized = '(' . $grammar->parameterize($record) . ')';
        $this->record = $record;
    }

    private function setPlaceholderCount()
    {
        // NULL values are assumed not to consume a placeholder.
        $count = count($this->record);
        $this->placeholderCount = $count - count(array_filter($this->record, function ($r) {
            return $r === null;
        }));
    }

    /**
     * @return int
     */
    public function getPlaceholderCount()
    {
        if (is_null($this->placeholderCount))
            $this->setPlaceholderCount();
        return $this->placeholderCount;
    }

    private function setByteSize()
    {
        $particle = '(' . implode(', ', $this->record) . ')';
        // strlen() counts bytes, which is what max_allowed_packet and
        // bulk_insert_buffer_size measure (mb_strlen() counts characters).
        $this->byteSize = strlen($particle);
    }

    /**
     * @return int
     */
    public function getByteSize()
    {
        if (is_null($this->byteSize))
            $this->setByteSize();
        return $this->byteSize;
    }
}
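
For reference, using the class boils down to one call to the static upload() helper. A quick sketch (App\Models\Order and the record payload are hypothetical placeholders for your own model and data):

use App\Classes\BulkEloquentInserter;
use App\Models\Order; // hypothetical model; substitute your own

// Each record is an associative array keyed by column name; the class
// reorders values to match the table's alphabetically sorted columns.
$records = [
    ['customer_id' => 1, 'total' => 19.99],
    ['customer_id' => 2, 'total' => 42.50],
    // ... up to a million+ rows
];

BulkEloquentInserter::upload(Order::class, $records);

// Or, from an Artisan command, pass $this->output to get a progress bar:
// BulkEloquentInserter::upload(Order::class, $records, $this->output);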

Like what you've read? Then please make sure you've left a like or a comment. If you'd be willing to receive email notifications of new ramblings when they arrive, I'd appreciate it! If not, I'd be happy to have you back soon.
