summaryrefslogtreecommitdiff
path: root/content/posts/how-to-create-a-celery-task-that-fills-out-fields-using-django.md
blob: 627f4a346314fc62c2ae90b67fee60bdbd94e8d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
+++
title = "How to create a celery task that fills out fields using Django"
author = ["Roger Gonzalez"]
date = 2020-11-29T15:48:48-03:00
lastmod = 2022-12-29T09:34:16-03:00
tags = ["python", "celery", "django", "docker", "", "dockercompose"]
categories = ["programming"]
draft = false
weight = 2002
+++

Hi everyone!

It's been way too long, I know. In this oportunity, I wanted to talk about
asynchronicity in Django, but first, lets set up the stage:

Imagine you are working in a library and you have to develop an app that allows
users to register new books using a barcode scanner. The system has to read the
ISBN code and use an external resource to fill in the information (title, pages,
authors, etc.). You don't need the complete book information to continue, so the
external resource can't hold the request.

**How can you process the external request asynchronously?** 🤔

For that, we need Celery.


## What is Celery? {#what-is-celery}

[Celery](https://docs.celeryproject.org/en/stable/) is a "distributed task queue". Fron their website:

> Celery is a simple, flexible, and reliable distributed system to process vast
amounts of messages, while providing operations with the tools required to
maintain such a system.

So Celery can get messages from external processes via a broker (like [Redis](https://redis.io/)),
and process them.

The best thing is: Django can connect to Celery very easily, and Celery can
access Django models without any problem. Sweet!


## Lets code! {#lets-code}

Let's assume our project structure is the following:

```nil
- app/
  - manage.py
  - app/
    - __init__.py
    - settings.py
    - urls.py
```


### Celery {#celery}

First, we need to set up Celery in Django. Thankfully, [Celery has an excellent
documentation](https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django), but the entire process can be summarized to this:

In `app/app/celery.py`:

```python
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")

app = Celery("app")

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """A debug celery task"""
    print(f"Request: {self.request!r}")
```

What's going on here?

-   First, we set the `DJANGO_SETTINGS_MODULE` environment variable
-   Then, we instantiate our Celery app using the `app` variable.
-   Then, we tell Celery to look for celery configurations in the Django settings
    with the `CELERY` prefix. We will see this later in the post.
-   Finally, we start Celery's `autodiscover_tasks`. Celery is now going to look for
    `tasks.py` files in the Django apps.

In `/app/app/__init__.py`:

```python
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ("celery_app",)
```

Finally in `/app/app/settings.py`:

```python
...
# Celery
CELERY_BROKER_URL = env.str("CELERY_BROKER_URL")
CELERY_TIMEZONE = env.str("CELERY_TIMEZONE", "America/Montevideo")
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "django-cache"
...
```

Here, we can see that the `CELERY` prefix is used for all Celery configurations,
because on `celery.py` we told Celery the prefix was `CELERY`

With this, Celery is fully configured. 🎉


### Django {#django}

First, let's create a `core` app. This is going to be used for everything common
in the app

```bash
$ python manage.py startapp core
```

On `core/models.py`, lets set the following models:

```python
"""
Models
"""
import uuid

from django.db import models


class TimeStampMixin(models.Model):
    """
    A base model that all the other models inherit from.
    This is to add created_at and updated_at to every model.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        """Setting up the abstract model class"""

        abstract = True


class BaseAttributesModel(TimeStampMixin):
    """
    A base model that sets up all the attibutes models
    """

    name = models.CharField(max_length=255)
    outside_url = models.URLField()

    def __str__(self):
        return self.name

    class Meta:
        abstract = True
```

Then, let's create a new app for our books:

```bash
python manage.py startapp books
```

And on `books/models.py`, let's create the following models:

```python
"""
Books models
"""
from django.db import models

from core.models import TimeStampMixin, BaseAttributesModel


class Author(BaseAttributesModel):
    """Defines the Author model"""


class People(BaseAttributesModel):
    """Defines the People model"""


class Subject(BaseAttributesModel):
    """Defines the Subject model"""


class Book(TimeStampMixin):
    """Defines the Book model"""

    isbn = models.CharField(max_length=13, unique=True)
    title = models.CharField(max_length=255, blank=True, null=True)
    pages = models.IntegerField(default=0)
    publish_date = models.CharField(max_length=255, blank=True, null=True)
    outside_id = models.CharField(max_length=255, blank=True, null=True)
    outside_url = models.URLField(blank=True, null=True)
    author = models.ManyToManyField(Author, related_name="books")
    person = models.ManyToManyField(People, related_name="books")
    subject = models.ManyToManyField(Subject, related_name="books")

    def __str__(self):
        return f"{self.title} - {self.isbn}"
```

`Author`, `People`, and `Subject` are all `BaseAttributesModel`, so their fields
come from the class we defined on `core/models.py`.

For `Book` we add all the fields we need, plus a `many_to_many` with Author,
People and Subjects. Because:

-   _Books can have many authors, and many authors can have many books_

Example: [27 Books by Multiple Authors That Prove the More, the Merrier](https://www.epicreads.com/blog/ya-books-multiple-authors/)

-   _Books can have many persons, and many persons can have many books_

Example: Ron Weasley is in several _Harry Potter_ books

-   _Books can have many subjects, and many subjects can have many books_

Example: A book can be a _comedy_, _fiction_, and _mystery_ at the same time

Let's create `books/serializers.py`:

```python
"""
Serializers for the Books
"""
from django.db.utils import IntegrityError
from rest_framework import serializers

from books.models import Book, Author, People, Subject
from books.tasks import get_books_information


class AuthorInBookSerializer(serializers.ModelSerializer):
    """Serializer for the Author objects inside Book"""

    class Meta:
        model = Author
        fields = ("id", "name")


class PeopleInBookSerializer(serializers.ModelSerializer):
    """Serializer for the People objects inside Book"""

    class Meta:
        model = People
        fields = ("id", "name")


class SubjectInBookSerializer(serializers.ModelSerializer):
    """Serializer for the Subject objects inside Book"""

    class Meta:
        model = Subject
        fields = ("id", "name")


class BookSerializer(serializers.ModelSerializer):
    """Serializer for the Book objects"""

    author = AuthorInBookSerializer(many=True, read_only=True)
    person = PeopleInBookSerializer(many=True, read_only=True)
    subject = SubjectInBookSerializer(many=True, read_only=True)

    class Meta:
        model = Book
        fields = "__all__"


class BulkBookSerializer(serializers.Serializer):
    """Serializer for bulk book creating"""

    isbn = serializers.ListField()

    def create(self, validated_data):
        return_dict = {"isbn": []}
        for isbn in validated_data["isbn"]:
            try:
                Book.objects.create(isbn=isbn)
                return_dict["isbn"].append(isbn)
            except IntegrityError as error:
                pass

        return return_dict

    def update(self, instance, validated_data):
        """The update method needs to be overwritten on
        serializers.Serializer. Since we don't need it, let's just
        pass it"""
        pass


class BaseAttributesSerializer(serializers.ModelSerializer):
    """A base serializer for the attributes objects"""

    books = BookSerializer(many=True, read_only=True)


class AuthorSerializer(BaseAttributesSerializer):
    """Serializer for the Author objects"""

    class Meta:
        model = Author
        fields = ("id", "name", "outside_url", "books")


class PeopleSerializer(BaseAttributesSerializer):
    """Serializer for the Author objects"""

    class Meta:
        model = People
        fields = ("id", "name", "outside_url", "books")


class SubjectSerializer(BaseAttributesSerializer):
    """Serializer for the Author objects"""

    class Meta:
        model = Subject
        fields = ("id", "name", "outside_url", "books")
```

The most important serializer here is `BulkBookSerializer`. It's going to get an
ISBN list and then bulk create them in the DB.

On `books/views.py`, we can set the following views:

```python
"""
Views for the Books
"""
from rest_framework import viewsets, mixins, generics
from rest_framework.permissions import AllowAny

from books.models import Book, Author, People, Subject
from books.serializers import (
    BookSerializer,
    BulkBookSerializer,
    AuthorSerializer,
    PeopleSerializer,
    SubjectSerializer,
)


class BookViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Books and retrieve books by ID
    """

    permission_classes = (AllowAny,)
    queryset = Book.objects.all()
    serializer_class = BookSerializer


class AuthorViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Authors and retrieve authors by ID
    """

    permission_classes = (AllowAny,)
    queryset = Author.objects.all()
    serializer_class = AuthorSerializer


class PeopleViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list People and retrieve people by ID
    """

    permission_classes = (AllowAny,)
    queryset = People.objects.all()
    serializer_class = PeopleSerializer


class SubjectViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Subject and retrieve subject by ID
    """

    permission_classes = (AllowAny,)
    queryset = Subject.objects.all()
    serializer_class = SubjectSerializer


class BulkCreateBook(generics.CreateAPIView):
    """A view to bulk create books"""

    permission_classes = (AllowAny,)
    queryset = Book.objects.all()
    serializer_class = BulkBookSerializer
```

Easy enough, endpoints for getting books, authors, people and subjects and an
endpoint to post ISBN codes in a list.

We can check swagger to see all the endpoints created:

{{< figure src="/2020-11-29-115634.png" >}}

Now, **how are we going to get all the data?** 🤔


## Creating a Celery task {#creating-a-celery-task}

Now that we have our project structure done, we need to create the asynchronous
task Celery is going to run to populate our fields.

To get the information, we are going to use the [OpenLibrary API](https://openlibrary.org/dev/docs/api/books%22%22%22).

First, we need to create `books/tasks.py`:

```python
"""
Celery tasks
"""
import requests
from celery import shared_task

from books.models import Book, Author, People, Subject


def get_book_info(isbn):
    """Gets a book information by using its ISBN.
    More info here https://openlibrary.org/dev/docs/api/books"""
    return requests.get(
        f"https://openlibrary.org/api/books?jscmd=data&format=json&bibkeys=ISBN:{isbn}"
    ).json()


def generate_many_to_many(model, iterable):
    """Generates the many to many relationships to books"""
    return_items = []
    for item in iterable:
        relation = model.objects.get_or_create(
            name=item["name"], outside_url=item["url"]
        )
        return_items.append(relation)
    return return_items


@shared_task
def get_books_information(isbn):
    """Gets a book information"""

    # First, we get the book information by its isbn
    book_info = get_book_info(isbn)

    if len(book_info) > 0:
        # Then, we need to access the json itself. Since the first key is dynamic,
        # we get it by accessing the json keys
        key = list(book_info.keys())[0]
        book_info = book_info[key]

        # Since the book was created on the Serializer, we get the book to edit
        book = Book.objects.get(isbn=isbn)

        # Set the fields we want from the API into the Book
        book.title = book_info["title"]
        book.publish_date = book_info["publish_date"]
        book.outside_id = book_info["key"]
        book.outside_url = book_info["url"]

        # For the optional fields, we try to get them first
        try:
            book.pages = book_info["number_of_pages"]
        except:
            book.pages = 0

        try:
            authors = book_info["authors"]
        except:
            authors = []

        try:
            people = book_info["subject_people"]
        except:
            people = []

        try:
            subjects = book_info["subjects"]
        except:
            subjects = []

        # And generate the appropiate many_to_many relationships
        authors_info = generate_many_to_many(Author, authors)
        people_info = generate_many_to_many(People, people)
        subjects_info = generate_many_to_many(Subject, subjects)

        # Once the relationships are generated, we save them in the book instance
        for author in authors_info:
            book.author.add(author[0])

        for person in people_info:
            book.person.add(person[0])

        for subject in subjects_info:
            book.subject.add(subject[0])

        # Finally, we save the Book
        book.save()

    else:
        raise ValueError("Book not found")
```

So when are we going to run this task? We need to run it in the **serializer**.

On `books/serializers.py`:

```python
from books.tasks import get_books_information
...
class BulkBookSerializer(serializers.Serializer):
    """Serializer for bulk book creating"""

    isbn = serializers.ListField()

    def create(self, validated_data):
        return_dict = {"isbn": []}
        for isbn in validated_data["isbn"]:
            try:
                Book.objects.create(isbn=isbn)
                # We need to add this line
                get_books_information.delay(isbn)
                #################################
                return_dict["isbn"].append(isbn)
            except IntegrityError as error:
                pass

        return return_dict

    def update(self, instance, validated_data):
        pass
```

To trigger the Celery tasks, we need to call our function with the `delay`
function, which has been added by the `shared_task` decorator. This tells Celery
to start running the task in the background since we don't need the result
right now.


## Docker configuration {#docker-configuration}

There are a lot of moving parts we need for this to work, so I created a
`docker-compose` configuration to help with the stack. I'm using the package
[django-environ](https://github.com/joke2k/django-environ) to handle all environment variables.

On `docker-compose.yml`:

```yaml
version: "3.7"

x-common-variables: &common-variables
  DJANGO_SETTINGS_MODULE: "app.settings"
  CELERY_BROKER_URL: "redis://redis:6379"
  DEFAULT_DATABASE: "psql://postgres:postgres@db:5432/app"
  DEBUG: "True"
  ALLOWED_HOSTS: "*,test"
  SECRET_KEY: "this-is-a-secret-key-shhhhh"

services:
  app:
    build:
      context: .
    volumes:
      - ./app:/app
    environment:
      <<: *common-variables
    ports:
      - 8000:8000
    command: >
      sh -c "python manage.py migrate &&
             python manage.py runserver 0.0.0.0:8000"
    depends_on:
      - db
      - redis

  celery-worker:
    build:
      context: .
    volumes:
      - ./app:/app
    environment:
      <<: *common-variables
    command: celery --app app worker -l info
    depends_on:
      - db
      - redis

  db:
    image: postgres:12.4-alpine
    environment:
      - POSTGRES_DB=app
      - POSRGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

  redis:
    image: redis:6.0.8-alpine
```

This is going to set our app, DB, Redis, and most importantly our celery-worker
instance. To run Celery, we need to execute:

```bash
$ celery --app app worker -l info
```

So we are going to run that command on a separate docker instance


## Testing it out {#testing-it-out}

If we run

```bash
$ docker-compose up
```

on our project root folder, the project should come up as usual. You should be
able to open <http://localhost:8000/admin> and enter the admin panel.

To test the app, you can use a curl command from the terminal:

```bash
curl -X POST "http://localhost:8000/books/bulk-create" -H  "accept: application/json" \
    -H  "Content-Type: application/json" -d "{  \"isbn\": [ \"9780345418913\", \
    \"9780451524935\", \"9780451526342\", \"9781101990322\", \"9780143133438\"   ]}"
```

{{< figure src="/2020-11-29-124654.png" >}}

This call lasted 147ms, according to my terminal.

This should return instantly, creating 15 new books and 15 new Celery tasks, one
for each book. You can also see tasks results in the Django admin using the
`django-celery-results` package, check its [documentation](https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend).

{{< figure src="/2020-11-29-124734.png" >}}

Celery tasks list, using `django-celery-results`

{{< figure src="/2020-11-29-124751.png" >}}

Created and processed books list

{{< figure src="/2020-11-29-124813.png" >}}

Single book information

{{< figure src="/2020-11-29-124834.png" >}}

People in books

{{< figure src="/2020-11-29-124851.png" >}}

Authors

{{< figure src="/2020-11-29-124906.png" >}}

Themes

And also, you can interact with the endpoints to search by author, theme,
people, and book. This should change depending on how you created your URLs.


## That's it! {#that-s-it}

This surely was a **LONG** one, but it has been a very good one in my opinion.
I've used Celery in the past for multiple things, from sending emails in the
background to triggering scraping jobs and [running scheduled tasks](https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#using-custom-scheduler-classes) (like a [unix
cronjob](https://en.wikipedia.org/wiki/Cron))

You can check the complete project in my GitLab here: <https://gitlab.com/rogs/books-app>

If you have any doubts, let me know! I always answer emails and/or messages.