This post was developed in collaboration with TimescaleDB and is also available on the TimescaleDB Blog.
Over 2022-23, while working at Mainstream Renewable Power on an internal web application, I maintained a “data pipeline” that fetched files of sensor data readings from the world’s most remote places and transformed them into useful datasets. These datasets form the basis upon which the construction of renewables (wind turbines or solar panels) on a site hinges. I rebuilt the pipeline on top of TimescaleDB¹, which enabled me to massively reduce the complexity of the system involved.
I reflect on this experience in detail in Struggling to Sync Sensors & Databases. I do not, however, discuss how I adapted Django to play nicely with this database. In my case, Django served as the “glue” between web browsers and the database: to display a web page, it asks the database for the data it needs to render the files that the browser interprets (HTML, CSS, and JavaScript) into a user interface.
Let’s walk through an example project to make these adaptations a bit more concrete.
This tutorial assumes some familiarity with Django or a similar web framework. If you have never used Django, I highly recommend the official tutorial.

If you want to follow along locally, you can set up a developer environment via django-timescaledb-example. If you have any trouble getting set up, feel free to ask a question at django-timescaledb-example/discussions.
Create a Sensor Data App Using Django
Let’s first run
python manage.py startapp sensor
to create these files:
sensor
├── __init__.py
├── admin.py
├── apps.py
├── migrations
│ └── __init__.py
├── models.py
├── tests.py
└── views.py
and register the app in core/settings.py:
INSTALLED_APPS = [
    # Builtin
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Custom
    'sensor',
]
Create a Homepage
Let’s quickly create a home page to display when someone first opens this web application in a browser.
# sensor/views.py
from django.shortcuts import render


def index(request):
    return render(request, "index.html")
<!--- sensor/templates/index.html -->
<div style="text-align: center">
    <h1>django-timescaledb-example</h1>
</div>
# sensor/urls.py
from django.urls import path

from . import views

app_name = "sensor"

urlpatterns = [
    path('', views.index, name="root"),
]
# core/urls.py
from django.contrib import admin
from django.shortcuts import redirect
from django.urls import include
from django.urls import path

urlpatterns = [
    path('', lambda request: redirect('sensor:root')),
    path('admin/', admin.site.urls),
    path('sensor/', include('sensor.urls')),
]
So now http://localhost:8000 should display index.html. We can build on this index.html to link to other pages.
Create a Data Model for Files
Now I can adapt sensor/models.py to add a File model to track uploaded files,
# sensor/models.py
from django.db import models


class File(models.Model):
    file = models.FileField(upload_to="readings/", blank=False, null=False)
    uploaded_at = models.DateTimeField(auto_now_add=True)
    # NOTE: `parsed_at` & `parse_error` are only set once an import attempt
    # ... has run, so both must be nullable
    parsed_at = models.DateTimeField(blank=True, null=True)
    parse_error = models.TextField(blank=True, null=True)
create its database migration²
python manage.py makemigrations sensor
and roll it out
python manage.py migrate
Any change to sensor/models.py requires a corresponding database migration!
Handle File Uploads via Browser
Now that we have somewhere to store files of readings, we need to handle file uploads.
Let’s consider only the case where time-series data originates from text files. How do I copy data from files into TimescaleDB via Django?
The Django documentation covers File Uploads. However, it doesn’t advise on importing file contents to a database. One normally uses Django to add and save new entries to a database using input from a browser:

1. Django sends a web page to a browser containing one or more <form> elements.
2. Once filled in, these <form> elements are sent back to Django.
3. Django processes these entries and saves them to the database using the Django ORM.
The key enabler here is the ORM (or “Object Relational Mapper”). It maps a Python class to a database table so that this table’s data is easily accessible from within Python. Without an ORM, one would have to use the SQL language to communicate with the database.
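To make this a bit more concrete, here’s a minimal sketch of what the ORM buys us (the query is illustrative; Django generates the actual SQL):

# Fetch all files that have not yet been parsed ...
unparsed_files = File.objects.filter(parsed_at__isnull=True)

# ... which Django translates into (roughly) this SQL:
# SELECT * FROM sensor_file WHERE parsed_at IS NULL;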
We need to do a bit of work to adapt this workflow to handle file contents.
In a similar manner, I can create a “view” to render HTML to accept browser file uploads:
# sensor/views.py
from django.shortcuts import render
from django.shortcuts import redirect
from django.http import HttpResponse

from .forms import FileForm

# ...

def upload_file(request):
    if request.method == "POST":
        form = FileForm(request.POST, request.FILES)
        if form.is_valid():
            form.save()
            return HttpResponse("File upload was successful")
        else:
            return HttpResponse("File upload failed")
    else:
        form = FileForm()
        return render(request, "upload_file.html", {"form": form})

# ...
# ...
# sensor/forms.py
from django.forms import ModelForm

from .models import File


class FileForm(ModelForm):
    class Meta:
        model = File
        fields = "__all__"
<!--- sensor/templates/upload_file.html -->
<div style="text-align: center">
    <h1>Upload File</h1>
    <form enctype="multipart/form-data" method="post">
        {% csrf_token %}
        {% for field in form %}
            <div style="margin-bottom: 10px">
                {{ field.label_tag }}
                {{ field }}
                {% if field.help_text %}
                    <span class="question-mark" title="{{ field.help_text }}">?</span>
                {% endif %}
            </div>
        {% endfor %}
        <input type="submit" value="Save">
    </form>
</div>
# sensor/urls.py
from django.urls import path

from . import views

app_name = "sensor"

urlpatterns = [
    path('', views.index, name="root"),
    path('upload-file/', views.upload_file, name="upload-file"),
]
This requires someone to click through this web application every time they want to add new data. If data is synced automatically from remote sensors to a file system somewhere, then why not set up automatic file uploads? For this we need an API.

An API (or Application Programming Interface) lets our web application accept file uploads from another program. The django-rest-framework library does a lot of heavy lifting here, so let’s use it.
If you are not interested in APIs, feel free to skip the API sections.

If you have never used django-rest-framework, consider first completing the official tutorial before continuing on here.
Create an API Home Page
As suggested by Two Scoops of Django 3.x, let’s create core/api_urls.py to wire up our API:
# core/urls.py
from django.contrib import admin
from django.shortcuts import redirect
from django.urls import include
from django.urls import path

urlpatterns = [
    path('', lambda request: redirect('sensor:root')),
    path('admin/', admin.site.urls),
    path('api/', include('core.api_urls')),
    path('sensor/', include('sensor.urls')),
]
# core/api_urls.py
from django.urls import include
from django.urls import path
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.reverse import reverse

app_name = "api"


@api_view(['GET'])
def api_root(request, format=None):
    return Response({
        'sensors': reverse('api:sensor:api-root', request=request, format=format),
    })


urlpatterns = [
    path('', api_root, name="api-root"),
    path('sensor/', include('sensor.api_urls'), name='sensor'),
]
# sensor/api_urls.py
from django.urls import include
from django.urls import path
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.reverse import reverse

from .api import viewsets

app_name = "sensor"


@api_view(['GET'])
def api_root(request, format=None):
    return Response({})


urlpatterns = [
    path('', api_root, name="api-root"),
]
So now we can add new views and/or viewsets to sensor/api_urls.py and they will be “connectable” via /api/sensor/.
Handle File Uploads via API
We can use a “viewset” to create an endpoint like /api/sensor/file/ to which another program can upload files:
# sensor/api/viewsets.py
from rest_framework import viewsets

from ..models import File
from .serializers import FileSerializer


class FileViewSet(viewsets.ModelViewSet):
    # NOTE: `ModelViewSet` (rather than `ReadOnlyModelViewSet`) provides
    # ... the `create` action we need to accept uploads
    queryset = File.objects.all()
    serializer_class = FileSerializer
# sensor/api/serializers.py
from rest_framework import serializers

from ..models import File


class FileSerializer(serializers.ModelSerializer):
    class Meta:
        model = File
        fields = "__all__"  # NOTE: a string, not a list
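Note that a viewset isn’t reachable until it is routed to a URL. One way to do so (a sketch, using django-rest-framework’s DefaultRouter) is to register it in sensor/api_urls.py:

# sensor/api_urls.py (sketch: routing the viewset)
from django.urls import path
from rest_framework.routers import DefaultRouter

from .api.viewsets import FileViewSet

# ... `api_root` as before ...

router = DefaultRouter()
router.register(r'file', FileViewSet, basename='file')

urlpatterns = [
    path('', api_root, name="api-root"),
] + router.urls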
Create a Data Model for Readings
Let’s add a Reading model to store readings:
# sensor/models.py
from django.db import models

# ...

class Reading(models.Model):
    class Meta:
        managed = False

    file = models.ForeignKey(File, on_delete=models.RESTRICT)
    timestamp = models.DateTimeField(blank=False, null=False, primary_key=True)
    sensor_name = models.TextField(blank=False, null=False)
    # NOTE: `FloatField` matches the FLOAT column created in the migration below
    reading = models.FloatField(blank=False, null=False)
This time we’re using timestamp instead of the default id field as the primary key, since row uniqueness can be defined by a composite of file, timestamp and sensor_name if required.
Don’t we want to store readings in a TimescaleDB Hypertable¹ to make them easier to work with? Django won’t automatically create a Hypertable (it wasn’t designed to), so we need to do so ourselves. Since we need to customise table creation rather than letting Django handle it, we must set managed to False.
Let’s create a “base” migration,
python manage.py makemigrations sensor --name "sensor_reading"
and manually edit the migration:
# sensor/migrations/0002_sensor_reading.py
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('sensor', '0001_initial'),
    ]

    operations = [
        migrations.CreateModel(
            name='Reading',
            fields=[
                ('file', models.ForeignKey(default=None, on_delete=django.db.models.deletion.RESTRICT, to='sensor.file')),
                ('timestamp', models.DateTimeField(primary_key=True)),
                ('sensor_name', models.TextField()),
                ('reading', models.FloatField()),
            ],
            options={
                'managed': False,
            },
        ),
        migrations.RunSQL(
            """
            CREATE TABLE sensor_reading (
                file_id INTEGER NOT NULL REFERENCES sensor_file (id),
                timestamp TIMESTAMP NOT NULL,
                sensor_name TEXT NOT NULL,
                reading FLOAT
            );
            SELECT create_hypertable('sensor_reading', 'timestamp');
            """,
            reverse_sql="""
            DROP TABLE sensor_reading;
            """,
        ),
    ]
Now we can roll out migrations,
python manage.py migrate
and connect to the database³ to inspect the newly created Hypertable.
Import Files
Let’s imagine that all of our readings are stored in nicely formatted JSON files, like:
[
    {
        "reading": 20.54,
        "sensor_name": "M(m/s)",
        "timestamp": "2015-12-22 00:00:00"
    },
    {
        "reading": 211.0,
        "sensor_name": "D(deg)",
        "timestamp": "2015-12-22 00:00:00"
    }
]
How do we import these readings?
We can add a method to File to load the JSON file into Python and create a new Reading entry for each reading in the file:
# sensor/models.py
import json
from itertools import islice

from django.db import models
from django.db import transaction

# ...

class File(models.Model):
    # ...

    def import_to_db(self):
        # NOTE: assume the uploaded file is JSON
        with self.file.open(mode="rb") as f:
            reading_objs = (
                Reading(
                    file=self,
                    timestamp=r["timestamp"],
                    sensor_name=r["sensor_name"],
                    reading=r["reading"],
                )
                for r in json.load(f)
            )
            batch_size = 1_000
            with transaction.atomic():
                while True:
                    batch = list(islice(reading_objs, batch_size))
                    if not batch:
                        break
                    Reading.objects.bulk_create(batch, batch_size)
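Note the design here: bulk_create saves each batch of readings in a single query rather than one query per reading, and wrapping the loop in transaction.atomic() means a file is imported either fully or not at all.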
How do we call the import_to_db method?

We can go about this a few different ways, but perhaps the simplest is to call it directly in the views and viewsets so that it is triggered on browser and API file uploads.
For Django we can call it directly in our upload-file view like:
if form.is_valid():
    form.save()
    form.instance.import_to_db()
And for django-rest-framework we can override the perform_create method:
class FileViewSet(viewsets.ModelViewSet):
    # ...

    def perform_create(self, serializer):
        instance = serializer.save()
        instance.import_to_db()
Import Files via Celery
What if each file contains a few gigabytes of readings? Won’t this take an age to process?
If you can’t guarantee that the sensor files are small enough that they can be processed quickly then you might need to offload file importing to a task queue.
A task queue works like a restaurant: the waiters add orders to the queue, and the chefs pull orders from the queue when they have time to process them.
Celery is a mature Python task queue library and works well with Django, so let’s use it. In terms of the above analogy, it coordinates the “waiters” and the “chefs” by leveraging a database (or message broker), typically Redis or RabbitMQ.
A task queue can significantly improve performance here. It makes file uploads instant from the user’s perspective, since file-import tasks are now added to a queue rather than run immediately. It also enables parallel processing of files, since task queue workers run in parallel to one another.
Celery may not work well on Windows, so consider trying dramatiq instead if this is a hard requirement.
Why not use Postgres as a message broker? The django-q library enables this via its Django ORM message broker. For small-scale applications this might be a better choice, since it reduces system complexity.
To set up Celery, we can follow the official tutorial:
# core/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('core')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
# core/settings.py
# ...
CELERY_BROKER_URL = "redis://localhost:6379/0"
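The official tutorial also imports the app in core/__init__.py so that it loads whenever Django starts, allowing @shared_task to find it:

# core/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)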
So now we can add tasks to sensor/tasks.py like:
# sensor/tasks.py
from celery import shared_task

from .models import File


@shared_task
def import_to_db(file_id):
    file_obj = File.objects.get(id=file_id)
    file_obj.import_to_db()
And replace all calls to <file_obj>.import_to_db() with tasks.import_to_db.delay(file_obj.id). Now the import won’t run immediately, but rather will be run by Celery when it has availability to do so!
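For instance, the upload-file view might now look like this (a sketch; .delay(...) is what places the task on the queue instead of running it inline):

# sensor/views.py (sketch)
from . import tasks

# ... inside `upload_file` ...
if form.is_valid():
    form.save()
    tasks.import_to_db.delay(form.instance.id)
    return HttpResponse("File upload was successful")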
Next Steps?
If you really want to further eke out import performance, you’ll have to go deeper and experiment with:
- Batch sizes: how many readings do you want to save at once?
- Compression: TimescaleDB really shines once Hypertables are compressed, since compression reduces storage costs and speeds up analytics queries (see the sketch below)
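A sketch of how one might enable compression via another manual migration (assuming TimescaleDB 2.x; the segmentby column and the 7-day policy are illustrative choices, not requirements):

# sensor/migrations/0003_compress_sensor_reading.py (hypothetical)
from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [
        ('sensor', '0002_sensor_reading'),
    ]

    operations = [
        migrations.RunSQL(
            """
            ALTER TABLE sensor_reading SET (
                timescaledb.compress,
                timescaledb.compress_segmentby = 'sensor_name'
            );
            SELECT add_compression_policy('sensor_reading', INTERVAL '7 days');
            """,
            reverse_sql="""
            SELECT remove_compression_policy('sensor_reading');
            ALTER TABLE sensor_reading SET (timescaledb.compress = false);
            """,
        ),
    ]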
Bonus
Import Messy Files
How do we convert files like
Lat=0 Lon=0 Hub-Height=160 Timezone=00.0 Terrain-Height=0.0
Computed at 100 m resolution
YYYYMMDD HHMM M(m/s) D(deg) SD(m/s) DSD(deg) Gust3s(m/s) T(C) PRE(hPa) RiNumber VertM(m/s)
20151222 0000 20.54 211.0 1.22 0.3 21.00 11.9 992.8 0.15 0.18
20151222 0010 21.02 212.2 2.55 0.6 21.35 11.8 992.7 0.29 -0.09
into

[
    {
        'reading': '20.54',
        'sensor_name': 'M(m/s)',
        'timestamp': datetime.datetime(2015, 12, 22, 0, 0),
    },
    {
        'reading': '211.0',
        'sensor_name': 'D(deg)',
        'timestamp': datetime.datetime(2015, 12, 22, 0, 0),
    },
    # ...
]

so we can store them in the Reading data model?
In this file, YYYYMMDD and HHMM clearly represent the timestamp, and so 20151222 0000 corresponds to datetime.datetime(2015, 12, 22, 0, 0). However, this may differ between sources.
One way to generalise the importer is to upload a FileType specification alongside each file so we know how to standardise it.

We can create a new model FileType and link it to File like:
# sensor/models.py
import textwrap

from django.contrib.postgres.fields import ArrayField
from django.db import models


class FileType(models.Model):
    name = models.TextField()
    na_values = ArrayField(
        base_field=models.CharField(max_length=10),
        default=["NaN"],
        help_text=textwrap.dedent(
            r"""A list of strings to recognise as empty values.
            Default: ["NaN"]
            Note: "" is also included by default
            Example: ["NAN", "-9999", "-9999.0"]
            """
        ),
    )
    delimiter = models.CharField(
        max_length=5,
        help_text=textwrap.dedent(
            r"""The character used to separate fields in the file.
            Default: ","
            Examples: "," or ";" or "\s+" for whitespace or "\t" for tabs
            """
        ),
        default=",",
    )
    datetime_fieldnames = ArrayField(
        base_field=models.CharField(max_length=50),
        default=["Tmstamp"],
        help_text=textwrap.dedent(
            r"""A list of datetime field names.
            Examples:
            1) Data has a single datetime field named "Tmstamp" which has values like
               '2021-06-29 00:00:00.000': ["Tmstamp"]
            2) Data has two datetime fields named "Date" and "Time" which have values
               like '01.01.1999' and '00:00' respectively: ["Date","Time"]
            """
        ),
    )
    encoding = models.CharField(
        max_length=25,
        help_text=textwrap.dedent(
            r"""The encoding of the file.
            Default: "utf-8"
            Examples: utf-8 or latin-1 or cp1252
            """
        ),
        default="utf-8",
    )
    datetime_formats = ArrayField(
        base_field=models.CharField(max_length=25),
        help_text=textwrap.dedent(
            r"""The datetime format of `datetime_columns`.
            See https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes
            for format codes
            Default: "%Y-%m-%d %H:%M:%S"
            Examples: "%Y-%m-%d %H:%M:%S" for "2021-03-01 00:00:00"
            """
        ),
        default=[r"%Y-%m-%d %H:%M:%S"],
    )


class File(models.Model):
    # ...
    type = models.ForeignKey(FileType, on_delete=models.RESTRICT)
    # ...
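As a concrete example, a FileType matching the messy file shown above might look like this (the name and na_values are hypothetical choices):

FileType.objects.create(
    name="remote-wind-station",  # hypothetical name
    delimiter=r"\s+",            # columns are whitespace-separated
    datetime_fieldnames=["YYYYMMDD", "HHMM"],
    datetime_formats=[r"%Y%m%d %H%M"],
    na_values=["-9999"],         # hypothetical sentinel value
    encoding="utf-8",
)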
Django forms are smart enough to automatically render the upload-file view with a type field, since we specified fields = "__all__" in sensor/forms.py.
django-rest-framework viewsets will also include it thanks to fields = "__all__" in sensor/api/serializers.py. However, its default behaviour for foreign keys is not ideal: it expects to receive a numeric id for the field type, whereas it’s more intuitive to specify the name field instead.
We can easily override this default behaviour by specifying a SlugRelatedField in our serializer:
# sensor/api/serializers.py
from rest_framework import serializers

from ..models import File
from ..models import FileType


class FileSerializer(serializers.ModelSerializer):
    type = serializers.SlugRelatedField(
        slug_field="name", queryset=FileType.objects.all()
    )

    class Meta:
        model = File
        fields = "__all__"
so we can create files by passing the endpoint a payload like:
{
    "file": "file",
    "type": "name-of-file-type"
}
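For example, here’s a sketch of uploading a file from another Python program using the requests library (the file name and type name are assumptions, and the endpoint assumes the viewset is routed at /api/sensor/file/ as sketched earlier):

import requests

with open("sample_readings.txt", "rb") as f:
    response = requests.post(
        "http://localhost:8000/api/sensor/file/",
        files={"file": f},                     # the uploaded file
        data={"type": "remote-wind-station"},  # hypothetical FileType name
    )
response.raise_for_status()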
Now we have all of the information we need to extract timeseries from files into our data model.
Validate Messy Files
What if a File is created with an inappropriate FileType? How do we catch this before it causes importing to fail?
We can implement a clean method on File! Django will automatically call this method on running form.is_valid() in our view; however, we’ll have to connect django-rest-framework ourselves. We can just add a validate method to our serializer to achieve the same behaviour:
# sensor/api/serializers.py
from rest_framework import serializers

from ..models import File
from ..models import FileType


class FileSerializer(serializers.ModelSerializer):
    # ...

    def validate(self, attrs):
        instance = File(**attrs)
        instance.clean()
        return attrs
Now we can implement the clean method to check file contents prior to saving a file:
# sensor/models.py
from django.db import models
from django.core.exceptions import ValidationError

from .io import validate_datetime_fieldnames_in_lines

# ...

class File(models.Model):
    # ...

    def clean(self):
        # NOTE: called automatically by Django forms (via `form.is_valid()`)
        # ... and by the DRF serializer's `validate` method above
        if self.type is None:
            raise ValidationError("File type must be specified!")
        # NOTE: this file is automatically closed upon saving a model instance,
        # ... and each time the file is read the file pointer must be reset
        # ... to enable rereads
        f = self.file.open(mode="rb")
        validate_datetime_fieldnames_in_lines(
            lines=f,
            encoding=self.type.encoding,
            delimiter=self.type.delimiter,
            datetime_fieldnames=self.type.datetime_fieldnames,
        )
        self.file.seek(0)

    # ...
# sensor/io.py
import re
import typing

from django.core.exceptions import ValidationError


def yield_split_lines(
    lines: typing.Iterable[bytes],
    encoding: str,
    delimiter: str,
) -> typing.Iterator[typing.List[str]]:
    # Unescape strings `\\t` to `\t` for use in a regular expression
    # https://stackoverflow.com/questions/1885181/how-to-un-escape-a-backslash-escaped-string
    unescape_backslash = lambda s: (
        s.encode('raw_unicode_escape').decode('unicode_escape')
    )
    split = lambda s: re.split(unescape_backslash(delimiter), s)
    return (split(line.decode(encoding)) for line in iter(lines))
def validate_datetime_fieldnames_in_lines(
    lines: typing.Iterable[bytes],
    encoding: str,
    delimiter: str,
    datetime_fieldnames: typing.Iterable[str],
) -> None:
    split_lines = yield_split_lines(lines=lines, encoding=encoding, delimiter=delimiter)
    fieldnames = None
    for line in split_lines:
        if set(datetime_fieldnames).issubset(set(line)):
            fieldnames = line
            break
    if fieldnames is None:
        raise ValidationError(f"No `datetime_fieldnames` {datetime_fieldnames} found!")
Import and Validate Messy Files
Now we have everything we need to import files:
# sensor/models.py
from datetime import datetime
from datetime import timezone
from itertools import islice

from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.db import transaction

from .io import validate_datetime_fieldnames_in_lines
from .io import yield_readings_in_narrow_format

# ...

class File(models.Model):
    # ...

    def import_to_db(self):
        with self.file.open(mode="rb") as f:
            reading_objs = (
                Reading(
                    file=self,
                    timestamp=r["timestamp"],
                    sensor_name=r["sensor_name"],
                    reading=r["reading"],
                )
                for r in yield_readings_in_narrow_format(
                    lines=f,
                    encoding=self.type.encoding,
                    delimiter=self.type.delimiter,
                    datetime_fieldnames=self.type.datetime_fieldnames,
                    datetime_formats=self.type.datetime_formats,
                )
            )
            batch_size = 1_000
            try:
                with transaction.atomic():
                    while True:
                        batch = list(islice(reading_objs, batch_size))
                        if not batch:
                            break
                        Reading.objects.bulk_create(batch, batch_size)
            except Exception as e:
                self.parsed_at = None
                self.parse_error = str(e)
                self.save()
                raise e
            else:
                self.parsed_at = datetime.now(timezone.utc)
                self.parse_error = None
                self.save()
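Note the bookkeeping here: on failure we record the exception text in parse_error before re-raising, and on success we stamp parsed_at, so we can always query which files imported cleanly and which need attention.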
# sensor/io.py
from collections import OrderedDict
from datetime import datetime
import re
import typing

from django.core.exceptions import ValidationError

# ...

def yield_readings_in_narrow_format(
    lines: typing.Iterable[bytes],
    encoding: str,
    delimiter: str,
    datetime_fieldnames: typing.Iterable[str],
    datetime_formats: typing.Iterable[str],
) -> typing.Iterator[typing.Dict[str, typing.Any]]:
    """
    https://en.wikipedia.org/wiki/Wide_and_narrow_data
    """
    split_lines = yield_split_lines(lines=lines, encoding=encoding, delimiter=delimiter)
    fieldnames = None
    for line in split_lines:
        if set(datetime_fieldnames).issubset(set(line)):
            fieldnames = line
            break
    if fieldnames is None:
        raise ValidationError(f"No `datetime_fieldnames` {datetime_fieldnames} found!")
    # NOTE: `split_lines` is an iterator so the prior loop exhausts the header lines
    for line in split_lines:
        fields = OrderedDict([(f, v) for f, v in zip(fieldnames, line)])
        readings = OrderedDict(
            [(f, v) for f, v in fields.items() if f not in datetime_fieldnames]
        )
        timestamp_strs = [fields[k] for k in datetime_fieldnames]
        timestamp_str = " ".join(
            str(item) for item in timestamp_strs if item is not None
        )
        for datetime_format in datetime_formats:
            try:
                timestamp = datetime.strptime(timestamp_str, datetime_format)
            except ValueError:
                pass
            else:
                for sensor, reading in readings.items():
                    yield {
                        "timestamp": timestamp,
                        "sensor_name": sensor,
                        "reading": reading,
                    }
                # Stop after the first format that parses successfully,
                # ... so readings aren't yielded more than once per line
                break
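A quick sanity check of this function against the messy file from earlier (a sketch, assuming it is saved locally as sample_readings.txt):

with open("sample_readings.txt", "rb") as f:
    readings = yield_readings_in_narrow_format(
        lines=f,
        encoding="utf-8",
        delimiter=r"\s+",
        datetime_fieldnames=["YYYYMMDD", "HHMM"],
        datetime_formats=[r"%Y%m%d %H%M"],
    )
    print(next(readings))
# {'timestamp': datetime.datetime(2015, 12, 22, 0, 0), 'sensor_name': 'M(m/s)', 'reading': '20.54'}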
Once again, let’s adapt the views and viewsets to call the import_to_db method.

For Django we can call it directly in our upload-file view like:
if form.is_valid():
    form.save()
    form.instance.import_to_db()
And for django-rest-framework we can override the perform_create method:
class FileViewSet(viewsets.ModelViewSet):
    # ...

    def perform_create(self, serializer):
        instance = serializer.save()
        instance.import_to_db()
Footnotes

1. Don’t know what TimescaleDB is? Read this article.
2. A database migration is a set of instructions which specify updates to a database. Since the database only understands SQL, changes to models.py must be translated to SQL and rolled out via migrations in order to persist them.
3. I use DBeaver, but one could also just use psql, which ships with Postgres.