At Beyond we mainly use PostgreSQL databases for our services, consisting of over a hundred tables in total. With some of the largest ones we've already had a few close calls as they approached the maximum supported 32TB table size - but that's a story for another day. Backfilling tables that large is not a trivial task if we want to perform it safely and within a reasonable timeframe. In this post, we outline a few methods and tools similar to what we have successfully used to make this process scalable, and a little less painful.
What’s backfilling and why can it be a problem?
Backfilling simply means modifying or adding new data to existing database records, i.e. running UPDATE
statements, but with an emphasis on performing it on all or most of the records in a table.
When would it be needed?
- We want to modify a column’s value to fix faulty data
- We have to add a new field, i.e. add a new column to the table, populated with either a default value or with data computed by arbitrary business logic.
In any case, the main issue is that performing a backfill on a seriously large table simply takes a very long time. If we naively issue an UPDATE table SET field=value WHERE id=n
for each and every row, it can take weeks, months, or even years, depending on the use case's context. In one specific case, we had to update a 22TB table with hundreds of billions of rows, where the estimated time to completion was over 2 years.
With tables of this size, just creating a copy of the whole table and doing the update on it to avoid certain problems is not always a viable option, so we’ll focus on the use case where we really want to update the live table. The first challenge is to complete this process as quickly as possible, naturally. The second one is to accept that it won’t be quick enough, and come up with tools that make this task more controlled and manageable, while making sure our application can still operate on the table without major hiccups.
What can we do?
Our fictional scenario will be performed on a table named users. We’ll act like we have billions of users in our system. We want to backfill this table, inserting values into our newly created reversed_last_name
column where we want to store our users’ last name, but, wait for it: reversed. And we want to do this because of, well, reasons. We will also pretend this is something that needs some calculations beforehand in a script and we couldn’t possibly do the update with just a snappy SQL expression (you know, like REVERSE()
).
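(As an aside, creating the column itself is the cheap part: adding a nullable column with no default is a quick, metadata-only change in PostgreSQL. A hypothetical Django migration for it could look like the sketch below; the app label, dependency, model name and field type are placeholder assumptions.)

# Hypothetical Django migration sketch: add the nullable column to backfill.
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('users', '0001_initial'),  # placeholder: the app's previous migration
    ]

    operations = [
        migrations.AddField(
            model_name='user',
            name='reversed_last_name',
            field=models.CharField(max_length=255, null=True),
        ),
    ]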
Let’s perform the update in batches
Updating rows one by one puts a massive overhead on the process, and is the primary culprit for slowness. The solution seems simple enough: use as few individual UPDATE
statements as possible. To do this, we want to write our data migration script so that it will output SQL which uses PostgreSQL’s CASE
expression:
UPDATE users
SET reversed_last_name = (
    CASE
        WHEN id = 1 THEN 'htulB'
        WHEN id = 2 THEN 'nasemraP'
        WHEN id = 3 THEN 'walboL'
        [...]
        ELSE last_name
    END
)
WHERE id >= 1 AND id <= 500
As you can see from the WHERE clause, we want to build update statements only for a subset of data. We might be tempted to build one huge UPDATE covering all the rows, but there are a couple of issues with that. PostgreSQL will hold a lock on every row the statement touches until the transaction completes, which unfortunately still won't happen fast enough - besides our application being blocked from updating those rows for the whole duration, such a long-running transaction also holds back VACUUM and would very likely cause a myriad of other issues. What works best is finding an equilibrium: executing the updates in batches, with a batch size chosen so that the affected rows are locked for only a reasonable amount of time before the batch is committed and the locks are released.
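The skeleton of that approach is sketched below - a minimal illustration that assumes a plain DB-API connection such as psycopg2 and cheats with REVERSE() just to keep the transaction mechanics in focus (the per-row computation we actually care about comes in the next sections). Each batch runs in its own short transaction, so its row locks are released as soon as it commits.

# Minimal sketch, not the final tooling: update one fixed-size id range per
# iteration and commit after each batch so that batch's row locks are released.
# Assumes a DB-API connection `conn` (e.g. psycopg2) and a known `max_id`.
BATCH_SIZE = 500

def backfill_in_batches(conn, max_id, batch_size=BATCH_SIZE):
    for begin_id in range(1, max_id + 1, batch_size):
        end_id = min(begin_id + batch_size - 1, max_id)
        with conn.cursor() as cursor:
            cursor.execute(
                'UPDATE users SET reversed_last_name = REVERSE(last_name) '
                'WHERE id >= %s AND id <= %s',
                (begin_id, end_id),
            )
        conn.commit()  # ends this batch's transaction, releasing its row locks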
To help us compose these batch operations, we’ll create a simple iterator class that will receive a database connection, the name of the table we want to backfill, the desired batch size, and an optional start_batch
indicator that will come in handy later, when we implement a pause/resume feature and parallel processing. The class retrieves the max id of the table, and will yield us id
boundaries according to the batch size, with a begin_id
and an end_id
which will help us build our select queries and batched update statements.
This is a simplified example: it assumes an integer primary key called id and a data set starting from id=1.
import math


class TableBatchIterator:
    def __init__(self, conn, table, batch_size, start_batch=1):
        self.batch_size = batch_size
        self.start_batch = start_batch
        self.max_id = self.table_max_id(conn, table)
        self.total_batches = math.ceil(self.max_id / batch_size)

    def __iter__(self):
        self.current_batch = self.start_batch
        return self

    def __next__(self):
        if self.current_batch > self.total_batches:
            raise StopIteration
        begin_id = (self.current_batch - 1) * self.batch_size + 1
        end_id = min(self.current_batch * self.batch_size, self.max_id)
        self.current_batch += 1
        return (begin_id, end_id)

    @staticmethod
    def table_max_id(conn, table):
        # Uses the passed-in connection to fetch the largest id in the table.
        with conn.cursor() as cursor:
            cursor.execute(f'SELECT max(id) FROM {table};')
            row = cursor.fetchone()
            return row[0]
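As a quick usage sketch (with hypothetical numbers: assuming connection is our database connection and the largest id in users is 1200), iterating with a batch size of 500 yields the inclusive boundaries of three batches:

# Hypothetical usage sketch: with a max id of 1200 and a batch size of 500,
# the iterator yields the inclusive id boundaries of three batches.
iterator = TableBatchIterator(conn=connection, table='users', batch_size=500)
for begin_id, end_id in iterator:
    print(begin_id, end_id)
# 1 500
# 501 1000
# 1001 1200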
To build the update statements shown above, we'll also need something that takes the begin and end id values, the field name, and the new value. We'll use a simple-to-use class for this.
The BatchUpdateBuilder
class exposes two public methods: update()
, which will be responsible for accumulating all the new values we want to persist for a given id and column name, and build_sql()
, which will return the corresponding UPDATE
SQL statement string with the CASE/WHEN
expressions built from the values stored by update()
. It will also append a WHERE
clause to the statement to narrow down the scope to rows concerned in the given batch.
import json


class BatchUpdateBuilder:
    def __init__(self, table, begin_id, end_id):
        self.table = table
        self.begin_id = begin_id
        self.end_id = end_id
        self.updates = {}

    def update(self, _id, field, new_value):
        field_updates = self.updates.setdefault(field, {})
        field_updates[_id] = new_value

    def build_sql(self):
        if not self.updates:
            return None
        sql = (
            f'UPDATE {self.table}\n'
            f'SET\n{self._build_cases()}\n'
            f'WHERE {self._build_where()}'
        )
        return sql

    def _build_cases(self):
        cases = []
        for field, field_updates in self.updates.items():
            cases.append(self._build_field_case(field, field_updates))
        return ',\n'.join(cases)

    def _build_field_case(self, field, field_updates):
        cases = []
        for _id, new_value in field_updates.items():
            cases.append(f'WHEN id = {_id} THEN {self._to_sql_value(new_value)}')
        cases = '\n        '.join(cases)
        return (
            f'    {field} = (\n'
            '        CASE\n'
            f'        {cases}\n'
            f'        ELSE {field}\n'
            '        END\n'
            '    )'
        )

    def _build_where(self):
        return f'id >= {self.begin_id} AND id <= {self.end_id}'

    @staticmethod
    def _to_sql_value(value):
        # Note: string values are embedded as-is (no quote escaping), which is
        # fine for this simplified example but not for arbitrary user data.
        if type(value) in (dict, list):
            return f"'{json.dumps(value)}'::jsonb"
        elif type(value) is str:
            return f"'{value}'"
        elif value is None:
            return 'null'
        else:
            return str(value)
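To get a feel for the output, here is a tiny hypothetical batch and the SQL it produces:

# Hypothetical usage sketch of BatchUpdateBuilder on a two-row batch.
builder = BatchUpdateBuilder('users', begin_id=1, end_id=500)
builder.update(1, 'reversed_last_name', 'htulB')
builder.update(2, 'reversed_last_name', 'nasemraP')
print(builder.build_sql())
# UPDATE users
# SET
#     reversed_last_name = (
#         CASE
#         WHEN id = 1 THEN 'htulB'
#         WHEN id = 2 THEN 'nasemraP'
#         ELSE reversed_last_name
#         END
#     )
# WHERE id >= 1 AND id <= 500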
With these in place, we can do something like this, demonstrating with a Django example:
from django.db import connection, transaction

from . import BatchUpdateBuilder, TableBatchIterator
from .models import User  # wherever the User model lives in your project

BATCH_SIZE = 500
TABLE_NAME = 'users'

table_batch_iterator = TableBatchIterator(
    conn=connection, table=TABLE_NAME, batch_size=BATCH_SIZE
)

with connection.cursor() as cursor:
    for begin_id, end_id in table_batch_iterator:
        batch_update_builder = BatchUpdateBuilder(TABLE_NAME, begin_id, end_id)
        # select_for_update() needs a transaction; using one per batch also
        # means the row locks are released as soon as the batch is committed.
        with transaction.atomic():
            users = User.objects.select_for_update().filter(
                id__gte=begin_id, id__lte=end_id
            )
            for user in users:
                reversed_last_name = user.last_name[::-1]
                batch_update_builder.update(
                    user.id, 'reversed_last_name', reversed_last_name
                )
            sql = batch_update_builder.build_sql()
            if sql:  # build_sql() returns None when the id range had no rows
                cursor.execute(sql)
Pretty straightforward. We create an instance of the TableBatchIterator configured to walk our users table in batches of at most 500 rows (the actual batch size may be smaller if rows with certain id values are missing), then iterate through it to build and execute the UPDATE statements generated by the BatchUpdateBuilder.
Moving forward
In the second part of this blog post we’ll be going further, adding a progress calculator to help us monitor the status of the process, and as a last step we will create a lightweight queue system that makes it possible to run the task in parallel processes while also allowing us to pause and resume the process on demand.