# Task Queues

Task (or job) queues are an architectural pattern for breaking processes down into discrete tasks and streamlining them, with flexibility in how those tasks are scheduled and executed concurrently.

Sound familiar? In [Part 14 - Distributed Computation](../14-distributed-computation/notebook.ipynb), we have already seen Spark and Dask examples that leverage such a mechanism, though there it was mostly abstracted away from us and therefore felt more implicit.

## News Headline Parsing

This part focuses on more explicit and pronounced uses of task queues. But first, let's start with a simple use case: a script that parses news headlines from a few websites, without a task queue yet:

In [31]:
%%time

from tqdm import tqdm
import requests


def fetch(domain):
    r = requests.get(f'https://{domain}')
    return r.text

sites = [
    'bbc.com',
    'theguardian.com',
    'washingtonpost.com',
    'foxnews.com',
    'wsj.com',
]
data = {}
for domain in tqdm(sites, ncols=100):
    data[domain] = fetch(domain)

100%|█████████████████████████████████████████████████████████████████| 5/5 [00:11<00:00,  2.30s/it]

CPU times: user 190 ms, sys: 31 ms, total: 221 ms
Wall time: 11.5 s





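The sequential approach is straightforward but inefficient: each request blocks until the previous one completes, so the wall time is roughly the sum of all five request latencies, while the CPU sits mostly idle (221 ms of CPU time against 11.5 s of wall time).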
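To make the cost concrete without touching the network, here is a minimal sketch that swaps `fetch()` for a hypothetical `fake_fetch()` that simply sleeps for a fixed, assumed latency. Run one after another, the latencies add up; overlapped with a thread pool from the standard library's `concurrent.futures`, the total is closer to a single call. This is plain threading rather than a task queue, but it shows the kind of gap a task queue can help close.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.1  # hypothetical per-request latency, in seconds


def fake_fetch(domain):
    """Stand-in for fetch(): waits as if on network I/O, then returns dummy HTML."""
    time.sleep(LATENCY)  # like a blocking socket read, sleep releases the GIL
    return f'<html>{domain}</html>'


sites = ['bbc.com', 'theguardian.com', 'washingtonpost.com', 'foxnews.com', 'wsj.com']

# Sequential: each call waits for the previous one, so latencies add up.
start = time.perf_counter()
sequential = {domain: fake_fetch(domain) for domain in sites}
sequential_time = time.perf_counter() - start

# Overlapped: all five waits happen at the same time on separate threads.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(sites)) as pool:
    overlapped = dict(zip(sites, pool.map(fake_fetch, sites)))
overlapped_time = time.perf_counter() - start

print(f'sequential: {sequential_time:.2f}s, overlapped: {overlapped_time:.2f}s')
```

Expect something in the region of 0.5 s versus 0.1 s, with identical results from both approaches.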

Next, we use an HTML parser to parse the source text retrieved from the websites.

In [33]:
from bs4 import BeautifulSoup

soup = BeautifulSoup(data['bbc.com'], 'html.parser')
print(soup.find('body').find('header').prettify()[:400])

<header aria-label="BBC" id="orb-banner" role="banner">
 <div class="orb-nav-pri orb-nav-pri-white orb-nav-empty" dir="ltr" id="orb-header">
  <div class="orb-nav-pri-container b-r b-g-p">
   <div class="orb-nav-section orb-nav-blocks">
    <a href="https://www.bbc.co.uk">
     Homepage
    </a>
   </div>
   <section>
    <div class="orb-skip-links">
     <h2>
      Accessibility links
     </h2>



[Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/bs4/doc/) is an HTML/XML parsing library for Python. It sits on top of an underlying parser, such as Python's built-in [`html.parser`](https://docs.python.org/3/library/html.parser.html) used here (or third-party parsers like lxml and html5lib), and adds a user-friendly interface for navigating the parse tree and extracting the needed information with relative ease.
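For comparison, here is a minimal stdlib-only sketch that uses `html.parser.HTMLParser` directly to collect the same kind of `media__link` anchors. The HTML snippet and the `MediaLinkParser` class are made up for illustration. The event-driven callbacks (start tag, text, end tag) show the low-level bookkeeping that Beautiful Soup spares us by building a navigable tree.

```python
from html.parser import HTMLParser


class MediaLinkParser(HTMLParser):
    """Collect the href and text of <a> tags whose class list contains 'media__link'."""

    def __init__(self):
        super().__init__()
        self.links = []
        self._current = None  # the link being read, while inside a matching <a>

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)  # attrs arrives as a list of (name, value) pairs
        classes = (attrs.get('class') or '').split()
        if tag == 'a' and 'media__link' in classes:
            self._current = {'text': '', 'link': attrs.get('href')}

    def handle_data(self, data):
        # Accumulate text (including text of nested tags such as <b>).
        if self._current is not None:
            self._current['text'] += data

    def handle_endtag(self, tag):
        if tag == 'a' and self._current is not None:
            self._current['text'] = self._current['text'].strip()
            self.links.append(self._current)
            self._current = None


html = '''
<div>
  <a class="media__link" href="/news/example-1">  First headline </a>
  <a class="nav" href="/home">Home</a>
  <a class="media__link promo" href="/news/example-2">Second <b>headline</b></a>
</div>
'''
parser = MediaLinkParser()
parser.feed(html)
print(parser.links)
```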

In [3]:
titles = []

for tag in soup.find_all('a', {'class': True}):
    if 'media__link' in tag['class']:
        titles.append({
            'text': tag.text.strip(),
            'link': tag.get('href') or tag.parent.get('href'),
        })

titles[:10]

[{'text': 'Belarus athlete told by grandmother not to return',
  'link': '/news/world-europe-58104195'},
 {'text': 'What is Fox host Tucker Carlson doing in Hungary?',
  'link': 'https://www.bbc.com/news/world-europe-58104200'},
 {'text': 'Messi will not stay at Barca say club',
  'link': 'https://www.bbc.com/sport/football/58108298'},
 {'text': "The most influential band you've never heard of",
  'link': 'https://www.bbc.com/culture/article/20210804-sparks-the-greatest-band-youve-never-heard-of'},
 {'text': 'Why reverse ageism is worse than ever',
  'link': 'https://www.bbc.com/worklife/article/20210730-the-acute-ageism-problem-hurting-young-workers'},
 {'text': 'Ethiopian rebels take Unesco world heritage town',
  'link': '/news/world-africa-58101912'},
 {'text': 'Can you party and stay safe from Delta?',
  'link': '/news/world-us-canada-58080853'},
 {'text': 'Chronic illness influencers accused of faking it',
  'link': '/news/stories-58093455'},
 {'text': 'Man City sign £100m Greali

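Beautiful Soup can also find relevant nodes through custom filter functions: `find_all()` accepts a function that receives each tag and keeps the tags for which it returns a truthy value. Since each website structures its headlines differently, we define one filter per site: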

In [4]:
def bbc(tag):
    return tag.name == 'a' and 'media__link' in tag.get('class', []) and tag.get('href', '').startswith('/') and tag.text.strip()

def guardian(tag):
    return tag.name == 'a' and tag.get('data-link-name') == 'article' and tag.text.strip()

def wp(tag):
    return tag.name == 'span' and tag.parent.name == 'a' and tag.text.strip()

def fox(tag):
    return tag.name == 'a' and tag.parent.name == 'h2' and 'title' in tag.parent.get('class', []) and tag.text.strip()

def wsj(tag):
    return any(['headline' in cls for cls in tag.get('class', [])]) and tag.text.strip()

In [5]:
titles = []

soup = BeautifulSoup(data['foxnews.com'], 'html.parser')
for tag in soup.find_all(fox):
    titles.append({
        'text': tag.text.strip(),
        'link': tag.get('href') or tag.parent.get('href'),
    })

titles[:10]

[{'text': "Tom Homan sounds alarm on border crisis: Biden admin needs 'wake the hell up'",
  'link': 'https://video.foxnews.com/v/6266695144001/'},
 {'text': 'Former CDC Director: Existing vaccines are working again Delta',
  'link': 'https://video.foxnews.com/v/6266693467001/'},
 {'text': 'Russ Vought explains alleged ties between Biden admin and pro-CRT group',
  'link': 'https://video.foxnews.com/v/6266694247001/'},
 {'text': 'Gutfeld: Media not bothering to address unvaccinated minority communities',
  'link': 'https://video.foxnews.com/v/6266598477001/'},
 {'text': "Lt. Sutton: America isn't seeing a 'spike in violent crime,' it's a 'tsunami'",
  'link': 'https://video.foxnews.com/v/6266599724001/'},
 {'text': 'Ingraham: A moratorium on the Constitution',
  'link': 'https://video.foxnews.com/v/6266597187001/'},
 {'text': 'Sean Hannity calls out Biden over COVID spike at border',
  'link': 'https://video.foxnews.com/v/6266588538001/'},
 {'text': 'Stocks open in the green during hig

We can now put everything together end to end:

In [6]:
%%time

filter_rules = {
    'bbc.com': bbc,
    'theguardian.com': guardian,
    'washingtonpost.com': wp,
    'foxnews.com': fox,
    'wsj.com': wsj,
}
titles = []

for domain in tqdm(sites, ncols=100):
    text = fetch(domain)
    soup = BeautifulSoup(text, 'html.parser')
    for tag in soup.find_all(filter_rules[domain]):
        titles.append({
            'text': tag.text.strip(),
            'link': tag.get('href') or tag.parent.get('href'),
            'domain': domain,
        })

100%|█████████████████████████████████████████████████████████████████| 5/5 [00:12<00:00,  2.47s/it]

CPU times: user 854 ms, sys: 42.6 ms, total: 897 ms
Wall time: 12.4 s





In [7]:
from random import sample

sample(titles, 10)

[{'text': 'Wildlife  Meet the tuatara, ‘living fossil’ with fastest sperm in reptile world',
  'link': 'https://www.theguardian.com/world/2021/aug/05/meet-the-tuatara-the-sluggish-living-fossil-with-the-fastest-sperm-in-the-reptile-world',
  'domain': 'theguardian.com'},
 {'text': 'Bill Gates  Billionaire says meeting with Jeffrey Epstein was ‘a huge mistake’',
  'link': 'https://www.theguardian.com/us-news/2021/aug/05/bill-gates-jeffrey-epstein-meeting-huge-mistake',
  'domain': 'theguardian.com'},
 {'text': "Woodland hermit's cabin fire leads to state investigation",
  'link': '//www.foxnews.com/us/new-hampshire-hermits-cabin-fire-investigation',
  'domain': 'foxnews.com'},
 {'text': 'Thatcherism is the big Tory scam that still distorts our politics',
  'link': 'https://www.theguardian.com/commentisfree/2021/aug/05/boris-johnson-thatcherism-state-taxes-high-earners-poor',
  'domain': 'theguardian.com'},
 {'text': "Don't miss out: Average mortgage refinance rates hold at 180-day low |

## Task Queue Concepts

### Workers

Workers carry out the unit tasks of an end-to-end process. The process is broken down into tasks that can run in isolation, as incremental steps.

In general, granular tasks are easier to implement and test, and in the context of a task queue, they are also easier to parallelize to make the most of the available computing resources.

### Queue

The queue holds tasks enqueued or scheduled by arbitrary processes that do not need to block on (synchronously wait for) their execution. Meanwhile, workers running in parallel, as separate processes, threads, or coroutines, dequeue and execute the tasks on a schedule or whenever computing resources become available.
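Python's standard library offers in-process building blocks for this pattern. The sketch below uses `queue.Queue` as the queue and a few threads as workers: the producer enqueues ten toy tasks (squaring numbers with `pow`) without waiting for them, and the workers dequeue and run them as they become free. It only illustrates the concept; everything lives in a single process, unlike the broker-based design described next.

```python
import queue
import threading

task_queue = queue.Queue()  # holds enqueued tasks as (function, args) pairs
results = []                # (worker name, return value) pairs
results_lock = threading.Lock()


def worker(name):
    """Dequeue and execute tasks until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:  # sentinel: no more work for this worker
            task_queue.task_done()
            break
        func, args = item
        value = func(*args)
        with results_lock:
            results.append((name, value))
        task_queue.task_done()


# Producer side: enqueue ten toy tasks without waiting for them to run.
for n in range(10):
    task_queue.put((pow, (n, 2)))

# Consumer side: a small pool of worker threads.
workers = [threading.Thread(target=worker, args=(f'worker-{i}',)) for i in range(3)]
for w in workers:
    w.start()
for _ in workers:
    task_queue.put(None)  # one sentinel per worker, queued after all the real tasks
for w in workers:
    w.join()

print(sorted(value for _, value in results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Which worker ran which task varies from run to run, much like the worker pools shown later.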

### Communication protocol and broker

A typical task queue design does not directly spawn processes (and subprocesses) to manage the queue data structure and the tasks' operations (scheduling and execution). That approach would struggle with the unpredictable complexity and scale of tasks and their results.

Instead, most task queues rely on a relatively independent communication broker to manage the queue data structure, and on a programming-language-agnostic serialization protocol (such as JSON) to transmit task definitions and results through the queue. Data stores that specialize in inter-process communication, categorized as _message queues_, are particularly well suited to this role.
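To illustrate why a language-agnostic format helps, here is a hypothetical message layout sketched with the standard library's `json`: the producer sends only a task *name* plus JSON-serializable arguments, and the worker looks the name up in its own registry. The field names (`id`, `task`, `args`, `status`, `result`) are assumptions for illustration, not Celery's actual message protocol, and the broker that would carry these strings between processes is left out.

```python
import json
import operator
import uuid

# Hypothetical worker-side registry mapping task names to functions.
REGISTRY = {'add': operator.add, 'upper': str.upper}


def make_message(task_name, *args):
    """Producer side: describe a call as JSON text rather than as a Python object."""
    return json.dumps({'id': str(uuid.uuid4()), 'task': task_name, 'args': list(args)})


def handle_message(raw):
    """Worker side: decode the message, look the task up by name, and run it."""
    message = json.loads(raw)
    result = REGISTRY[message['task']](*message['args'])
    # The result travels back as JSON too, tagged with the same task id.
    return json.dumps({'id': message['id'], 'status': 'SUCCESS', 'result': result})


raw = make_message('add', 5, 6)
print(raw)                  # e.g. {"id": "...", "task": "add", "args": [5, 6]}
print(handle_message(raw))
```

Because both sides only exchange text, the producer and the worker could be written in different languages or run on different machines.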

This design also enables extensions such as queue monitoring and scaling beyond a single machine, since most message queues run as standalone services that can be accessed across networks.

![task-queue](https://user-images.githubusercontent.com/2837532/126832454-82a4a8e9-34a0-4ccc-9c39-83248a32be16.png)

## Adopting a Task Queue

Let's extend the use-case and adapt a task queue to our advantage.

From here, we will use another library, [Celery](https://docs.celeryproject.org/en/stable/index.html), a distributed task queue implementation. Celery handles the communication protocol and brokerage aspects of a task queue. It provides mature support for various queue backends and a simple interface for defining and using worker tasks.

In this part, we will use [Redis](https://redis.io/), an in-memory data store, as the backend for both the broker and results from worker tasks.

Also, due to the nature of the task queue implementation, all task definitions need to reside in a standalone module rather than inline in the notebook, since the worker processes must be able to import them. All task definitions can be found in [extract_tasks.py](extract_tasks.py).

In [10]:
# the tasks reside in their own module
import extract_tasks as tasks

First, let's try a simple task with a bare-bones implementation:

```python
@queue.task
def add(a, b):
    return a + b
```

Note the `@queue.task` line above the function definition. This pattern is known as a [decorator](https://wiki.python.org/moin/PythonDecorators): a technique that alters the default behaviour of a code block (in this case, a function) without changing the original code block directly.
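To make the idea concrete, here is a toy decorator in plain Python. It is not how Celery implements `@queue.task`; it only mimics the interfaces used below. The decorated function still works when called directly, and it gains a `.delay()` method that enqueues the call and returns a handle with `.status` and `.get()`. The in-memory `deque` and `dict` are hypothetical stand-ins for the broker and the result backend, and `run_pending()` plays the worker.

```python
import uuid
from collections import deque

_pending = deque()  # toy in-memory "broker": (task id, function, args)
_results = {}       # toy in-memory "result backend": task id -> return value


class ToyResult:
    """A minimal stand-in for an asynchronous result handle."""

    def __init__(self, task_id):
        self.id = task_id

    @property
    def status(self):
        return 'SUCCESS' if self.id in _results else 'PENDING'

    def get(self):
        # Raises KeyError if the task has not run yet (a real one would wait).
        return _results[self.id]


def task(func):
    """Toy decorator: keep func callable as-is, but attach a .delay() method."""
    def delay(*args):
        task_id = str(uuid.uuid4())
        _pending.append((task_id, func, args))
        return ToyResult(task_id)
    func.delay = delay
    return func


def run_pending():
    """Act as a worker: execute everything in the queue and store the results."""
    while _pending:
        task_id, func, args = _pending.popleft()
        _results[task_id] = func(*args)


@task
def add(a, b):
    return a + b


print(add(5, 6))         # 11: a direct call runs immediately
r = add.delay(5, 6)
print(r.status)          # PENDING: nothing has executed it yet
run_pending()
print(r.status, r.get())  # SUCCESS 11
```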

The `@queue.task` is a decorator enabled by creating a Celery queue (or application) instance:

```python
from celery import Celery

queue = Celery(
    'news',  # name of the queue
    broker='redis://localhost:6379/0',  # queue message broker backend
    result_backend='redis://localhost:6379/1',  # task result backend
)
```

Called directly, the `add()` function behaves as expected:

In [11]:
tasks.add(5, 6)

11

However, the decorator also grants it additional interfaces to behave like a worker task:

In [12]:
r = tasks.add.delay(5, 6)
r

<AsyncResult: 678c5b92-3f91-4f41-861a-59dd0d4b358e>

In [13]:
r.status

'PENDING'

At first, it may seem counter-intuitive that the task sits in pending status after we invoke it. By design, `.delay()` only enqueues the task: the task queue then dispatches it to an available worker to run asynchronously, without blocking the "main" process (in this case, the notebook we are in).

At the moment, we don't have any workers to take on the task, so it remains pending:

In [14]:
r.status

'PENDING'

As mentioned before, Celery covers that part for us. All we have to do is spawn a worker pool using its CLI (command-line interface):

```shell
% celery --app extract_tasks worker --loglevel=INFO
```

We point the command at the module that holds the queue (application) and all the task definitions, and set the log level to INFO so that we can follow each task as it is received and completed.

As soon as we run the command, we get something along the lines of the following:

```shell
[2021-08-05 15:50:06,700: INFO/MainProcess] Task extract_tasks.add[678c5b92-3f91-4f41-861a-59dd0d4b358e] received
[2021-08-05 15:50:06,724: INFO/ForkPoolWorker-1] Task extract_tasks.add[678c5b92-3f91-4f41-861a-59dd0d4b358e] succeeded in 0.01997902699999976s: 11
```

The worker's `MainProcess` receives tasks from anywhere that invokes them, such as this notebook. An available `ForkPoolWorker-N`, a child process forked by Celery's default prefork pool, then takes on the task and executes it. The log indicates that the task succeeded and returned `11`.

Running a task would be of little use if we could not retrieve its result. That is the purpose of the result backend, and Celery provides a straightforward interface to obtain it:

In [15]:
r.status

'SUCCESS'

In [16]:
r.get()

11

Next, let's run a task version of the `fetch` function, `fetch_website_task`, through the task queue:

In [17]:
%%time

sites = [
    'bbc.com',
    'theguardian.com',
    'washingtonpost.com',
    'foxnews.com',
    'wsj.com',
]
data = {}
for domain in tqdm(sites, ncols=100):
    data[domain] = tasks.fetch_website_task.delay(domain)

data

100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 222.83it/s]

CPU times: user 11.5 ms, sys: 4.28 ms, total: 15.8 ms
Wall time: 26.5 ms





{'bbc.com': <AsyncResult: ba871623-b104-4c17-9b00-bf8b9660b252>,
 'theguardian.com': <AsyncResult: dab084d7-c483-4220-a8ff-0c5492e243de>,
 'washingtonpost.com': <AsyncResult: 2f81e454-b7da-4fa6-93e3-7eb498a39f22>,
 'foxnews.com': <AsyncResult: ca14d847-7a37-4144-ac2f-e47ae2520251>,
 'wsj.com': <AsyncResult: 39f019b7-3b21-4068-8236-0a7d095cf026>}

Naturally, unlike the sequential, blocking version from the beginning, the snippet above merely enqueues the tasks, which is why it returns almost instantly. Each call responds with an `AsyncResult` object, as in the `add` example, so we can query the task's status and, on success, get its result:

In [18]:
import time

while data['wsj.com'].status == 'PENDING':
    time.sleep(0.01)

print(data['wsj.com'].get()['raw'][:300])


  <!doctype html><!--GRAND CANYON PREBID -->
  <html lang=en>
    <head>
      <meta charSet='utf-8' />
      <meta name="description" content="We can’t find the page you are looking for."/>
    <meta name="keywords" content="" />
    <meta name="page.section" content="Error" />
    <meta name="pag


Above, the `while` loop with the condition `.status == 'PENDING'` emulates a synchronous, blocking operation. Once the task has succeeded, we obtain its result with the `.get()` method. We can verify this in the worker logs:

```shell
[2021-08-05 17:12:39,554: INFO/MainProcess] Task extract_tasks.fetch_website_task[ba871623-b104-4c17-9b00-bf8b9660b252] received
[2021-08-05 17:12:39,556: INFO/MainProcess] Task extract_tasks.fetch_website_task[dab084d7-c483-4220-a8ff-0c5492e243de] received
[2021-08-05 17:12:39,558: INFO/MainProcess] Task extract_tasks.fetch_website_task[2f81e454-b7da-4fa6-93e3-7eb498a39f22] received
[2021-08-05 17:12:39,562: INFO/MainProcess] Task extract_tasks.fetch_website_task[ca14d847-7a37-4144-ac2f-e47ae2520251] received
[2021-08-05 17:12:39,575: INFO/MainProcess] Task extract_tasks.fetch_website_task[39f019b7-3b21-4068-8236-0a7d095cf026] received
[2021-08-05 17:12:40,356: INFO/ForkPoolWorker-8] Task extract_tasks.fetch_website_task[ba871623-b104-4c17-9b00-bf8b9660b252] succeeded in 0.7991691030000005s: <...>
[2021-08-05 17:12:40,510: INFO/ForkPoolWorker-2] Task extract_tasks.fetch_website_task[ca14d847-7a37-4144-ac2f-e47ae2520251] succeeded in 0.9463052399999974s: <...>
[2021-08-05 17:12:40,880: INFO/ForkPoolWorker-10] Task extract_tasks.fetch_website_task[39f019b7-3b21-4068-8236-0a7d095cf026] succeeded in 1.3033900759999995s: <...>
[2021-08-05 17:12:40,919: INFO/ForkPoolWorker-1] Task extract_tasks.fetch_website_task[dab084d7-c483-4220-a8ff-0c5492e243de] succeeded in 1.361990458000001s: <...>
[2021-08-05 17:12:50,535: INFO/ForkPoolWorker-9] Task extract_tasks.fetch_website_task[2f81e454-b7da-4fa6-93e3-7eb498a39f22] succeeded in 10.970096680999998s: <...>
```

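A few observations:
1. The task IDs match the ones on the `AsyncResult` objects exactly, and the tasks are received in exactly the order the for loop dispatched them.
2. Differently numbered workers took on the tasks.
3. Workers and tasks are paired out of order, and so, naturally, are the completions.
4. The four quicker fetches each complete in under 1.4 seconds, while `washingtonpost.com` alone takes about 11 seconds; running concurrently, the total is bounded by the slowest fetch rather than the sum of all five.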

## Pipelining

Task queue implementations sometimes come with pipelining (or chaining) capability, which arranges unit tasks into a pipeline with a specific logical order for data to flow through. The scheduling mechanism lets available workers take on unit tasks as they become available (as demonstrated by the out-of-order pairing between workers and tasks) while maintaining the data flow between unit tasks within each pipeline.
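The scheduling idea can be sketched in-process with the standard library. In this minimal sketch, each job on the queue records which chain it belongs to and which step it is at, and a worker enqueues a chain's next step only after finishing the current one. Any free worker may pick up any job, so chains interleave, yet each chain's steps still run in order. The two steps are hypothetical stand-ins for fetching and extracting, with a random sleep to simulate variable latency; a real task queue keeps this bookkeeping in the broker rather than in an in-process `queue.Queue`.

```python
import queue
import random
import threading
import time


def simulated_fetch(domain):
    """Hypothetical step 1: pretend to download a page (variable latency)."""
    time.sleep(random.uniform(0.01, 0.05))
    return f'<h1>{domain} headline</h1>'


def simulated_extract(html):
    """Hypothetical step 2: pretend to parse the headline out of the page."""
    return html.replace('<h1>', '').replace('</h1>', '')


PIPELINE = [simulated_fetch, simulated_extract]  # steps each chain runs, in order
jobs = queue.Queue()   # entries: (chain id, step index, input value)
final = {}             # chain id -> output of the chain's last step
final_lock = threading.Lock()


def chain_worker():
    """Run whatever step comes next; enqueue the chain's following step when done."""
    while True:
        item = jobs.get()
        if item is None:  # sentinel: shut down
            break
        chain_id, step, value = item
        output = PIPELINE[step](value)
        if step + 1 < len(PIPELINE):
            # The next step of *this* chain is enqueued only after this one succeeds.
            jobs.put((chain_id, step + 1, output))
        else:
            with final_lock:
                final[chain_id] = output
        jobs.task_done()


sites = ['bbc.com', 'theguardian.com', 'washingtonpost.com', 'foxnews.com', 'wsj.com']
for site in sites:
    jobs.put((site, 0, site))  # every chain starts at step 0 with the domain as input

threads = [threading.Thread(target=chain_worker) for _ in range(3)]
for t in threads:
    t.start()
jobs.join()            # returns once every step of every chain has been processed
for _ in threads:
    jobs.put(None)     # one sentinel per worker
for t in threads:
    t.join()

print(final)
```

Because the next step is enqueued before the current job is marked done, `jobs.join()` cannot return while any chain still has steps left. The insertion order of `final` reflects completion order and may differ between runs.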

Let's compose such a chain from the unit tasks in [extract_tasks.py](extract_tasks.py) to complete the workflow of extracting headline text and links, as we did earlier without the task queue:

In [22]:
def pipe(domain):
    chain = tasks.fetch_website_task.s(domain) | tasks.extract_titles.s()
    return chain()

The `.s()` method, granted by the `@queue.task` decorator just like `.delay()`, creates the task's *signature*: the task together with its arguments, packaged so that a worker knows how to execute it. The `|` operator links signatures into a chain, passing each task's return value from left to right as the first argument of the next task.
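Conceptually, a signature is a function bundled with some of its arguments, and `|` composes signatures from left to right. The toy classes below (hypothetical names `Signature` and `Chain`, not Celery's implementation) mimic that behaviour, including how the previous result is prepended as the first argument of the next task, which is why a step such as `extract_titles.s()` can be created without arguments. Unlike Celery's chains, this toy runs synchronously in the current process and returns the final value directly.

```python
class Signature:
    """Hypothetical toy signature: a function plus the arguments bound to it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __or__(self, other):
        # sig_a | sig_b starts a chain of two signatures.
        return Chain([self, other])


class Chain:
    """An ordered list of signatures that runs left to right."""

    def __init__(self, signatures):
        self.signatures = signatures

    def __or__(self, other):
        # chain | sig extends the chain by one more step.
        return Chain(self.signatures + [other])

    def __call__(self):
        first, *rest = self.signatures
        result = first.func(*first.args)
        for sig in rest:
            # The previous result is prepended to the next task's own arguments.
            result = sig.func(result, *sig.args)
        return result


def fetch_page(domain):
    return f'<h1>Top story on {domain}</h1>'


def extract_title(html):
    return html[len('<h1>'):-len('</h1>')]


def tag_with(title, label):
    return f'[{label}] {title}'


chain = Signature(fetch_page, 'bbc.com') | Signature(extract_title) | Signature(tag_with, 'news')
print(chain())  # [news] Top story on bbc.com
```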

Calling `chain()` returns an `AsyncResult` object (for the last task in the chain), just like invoking an individual task with the decorator-granted `.delay()` method:

In [23]:
r = pipe(domain='bbc.com')
r

<AsyncResult: 6de32df7-6702-4635-ab9b-ddb1cc1cda22>

In [24]:
r.status

'SUCCESS'

In [27]:
r.get()['titles'][:10]

[{'text': 'Belarus athlete told by grandmother not to return',
  'link': 'https://bbc.com/news/world-europe-58104195'},
 {'text': 'Chronic illness influencers accused of faking it',
  'link': 'https://bbc.com/news/stories-58093455'},
 {'text': 'Ethiopian rebels take Unesco world heritage town',
  'link': 'https://bbc.com/news/world-africa-58101912'},
 {'text': "Turkish influencer prosecuted 'for sex-toy photos'",
  'link': 'https://bbc.com/news/world-europe-58102368'},
 {'text': 'Man City sign £100m Grealish from Villa',
  'link': 'https://bbc.com/sport/football/57818660'},
 {'text': 'Mixed day for USA on day 13 in Tokyo',
  'link': 'https://bbc.com/sport/olympics/58096833'},
 {'text': 'Anderson leads England fightback',
  'link': 'https://bbc.com/sport/cricket/58106765'},
 {'text': "Olympic athlete: 'It's dangerous for me in Belarus'",
  'link': 'https://bbc.com/news/world-europe-58099987'},
 {'text': "Olympic athlete: 'It's dangerous for me in...",
  'link': 'https://bbc.com/news/wor

Let's run all the websites through the pipeline version:

In [28]:
%%time

sites = [
    'bbc.com',
    'theguardian.com',
    'washingtonpost.com',
    'foxnews.com',
    'wsj.com',
]
data = {}
for domain in tqdm(sites, ncols=100):
    data[domain] = pipe(domain)

data

100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 387.21it/s]

CPU times: user 13.6 ms, sys: 3.7 ms, total: 17.3 ms
Wall time: 17.3 ms





{'bbc.com': <AsyncResult: 667ceb82-8b0c-4af0-8edf-1c01e1748a40>,
 'theguardian.com': <AsyncResult: c4a3c7a3-3184-4daf-a336-72674006b70b>,
 'washingtonpost.com': <AsyncResult: 1be94109-51a0-488e-966f-e4b8c41f3f04>,
 'foxnews.com': <AsyncResult: a9a953d2-b455-4bce-bf3d-8a19518feed8>,
 'wsj.com': <AsyncResult: 89f08eb7-dbbd-4f93-970f-21fe8bf36deb>}

And observe the worker logs:

```shell
[2021-08-05 17:26:10,319: INFO/MainProcess] Task extract_tasks.fetch_website_task[10f709fd-b350-4729-973f-d04ef7f1415a] received
[2021-08-05 17:26:10,323: INFO/MainProcess] Task extract_tasks.fetch_website_task[92ebcebd-2853-406d-b44f-97ed23fc6454] received
[2021-08-05 17:26:10,325: INFO/MainProcess] Task extract_tasks.fetch_website_task[87f036c6-fbff-45d3-a14a-a6dfe17cf942] received
[2021-08-05 17:26:10,328: INFO/MainProcess] Task extract_tasks.fetch_website_task[cda76c38-1e09-4267-aaef-9816787ddc97] received
[2021-08-05 17:26:10,331: INFO/MainProcess] Task extract_tasks.fetch_website_task[f43a7963-ac43-4afe-b962-be469b7075e2] received
[2021-08-05 17:26:10,568: INFO/ForkPoolWorker-8] Task extract_tasks.fetch_website_task[10f709fd-b350-4729-973f-d04ef7f1415a] succeeded in 0.24766298300005474s: <...>
[2021-08-05 17:26:10,576: INFO/MainProcess] Task extract_tasks.extract_titles[667ceb82-8b0c-4af0-8edf-1c01e1748a40] received
[2021-08-05 17:26:10,671: INFO/ForkPoolWorker-8] Task extract_tasks.extract_titles[667ceb82-8b0c-4af0-8edf-1c01e1748a40] succeeded in 0.08766871499994977s: <...>
[2021-08-05 17:26:10,722: INFO/ForkPoolWorker-10] Task extract_tasks.fetch_website_task[f43a7963-ac43-4afe-b962-be469b7075e2] succeeded in 0.38966748499990445s: <...>
[2021-08-05 17:26:10,741: INFO/MainProcess] Task extract_tasks.extract_titles[89f08eb7-dbbd-4f93-970f-21fe8bf36deb] received
[2021-08-05 17:26:10,765: INFO/ForkPoolWorker-2] Task extract_tasks.fetch_website_task[cda76c38-1e09-4267-aaef-9816787ddc97] succeeded in 0.43293487899995853s: <...>
[2021-08-05 17:26:10,770: INFO/ForkPoolWorker-1] Task extract_tasks.fetch_website_task[92ebcebd-2853-406d-b44f-97ed23fc6454] succeeded in 0.44436259199994765s: <...>
[2021-08-05 17:26:10,779: INFO/MainProcess] Task extract_tasks.extract_titles[c4a3c7a3-3184-4daf-a336-72674006b70b] received
[2021-08-05 17:26:10,795: INFO/MainProcess] Task extract_tasks.extract_titles[a9a953d2-b455-4bce-bf3d-8a19518feed8] received
[2021-08-05 17:26:10,923: INFO/ForkPoolWorker-8] Task extract_tasks.extract_titles[89f08eb7-dbbd-4f93-970f-21fe8bf36deb] succeeded in 0.1257382599999346s: <...>
[2021-08-05 17:26:11,128: INFO/ForkPoolWorker-2] Task extract_tasks.extract_titles[a9a953d2-b455-4bce-bf3d-8a19518feed8] succeeded in 0.3050345090000519s: <...>
[2021-08-05 17:26:11,221: INFO/ForkPoolWorker-1] Task extract_tasks.extract_titles[c4a3c7a3-3184-4daf-a336-72674006b70b] succeeded in 0.3850136209999846s: <...>
[2021-08-05 17:26:20,781: INFO/ForkPoolWorker-9] Task extract_tasks.fetch_website_task[87f036c6-fbff-45d3-a14a-a6dfe17cf942] succeeded in 10.455224110000017s: <...>
[2021-08-05 17:26:20,796: INFO/MainProcess] Task extract_tasks.extract_titles[1be94109-51a0-488e-966f-e4b8c41f3f04] received
[2021-08-05 17:26:20,880: INFO/ForkPoolWorker-8] Task extract_tasks.extract_titles[1be94109-51a0-488e-966f-e4b8c41f3f04] succeeded in 0.07039165100002265s: <...>
```

This may look puzzling at first, but the behaviour matches what we observed with single tasks, with some subtle differences:
1. The order of tasks and task IDs differs in some ways:
    - The `fetch_website_task` tasks are received in exactly the order the for loop dictated, but the `extract_titles` tasks are not.
    - The `AsyncResult` IDs correspond to the IDs of the `extract_titles` tasks, i.e. the last task of each chain.
2. Differently numbered workers took on the tasks.
3. The pairing of workers and tasks is also interesting:
    - Workers and tasks are paired out of order, and so, naturally, are the completions.
    - The queue dispatches an `extract_titles` task only after its predecessor (`fetch_website_task`) in the same chain succeeds.
    - The same worker can take on unit tasks of the same or different types; from each worker's perspective, the order of execution does not matter, since the queue manages it.

Workers still pair with and execute unit tasks out of order, while the queue keeps the data flowing through each chain in the right order. This further empowers task queues to take on more complex applications.

## Server Application

One of the most appealing traits of a task queue is that the "main" process can return immediately instead of waiting for the tasks to finish. This makes task queues valuable for web services, which can defer potentially slow tasks to the backend and respond right away.

First, let's build the server with the [Flask](https://flask.palletsprojects.com/en/2.0.x/) web framework for Python, adapting the previous `pipe()` function:

```python
from urllib.parse import urlparse

from flask import Flask, jsonify, request

from extract_tasks import (
    filter_rules,
    fetch_website_task,
    extract_titles,
)


app = Flask(__name__)


def pipe(domain):
    chain = fetch_website_task.s(domain) | extract_titles.s()
    return chain()


@app.route('/', methods=['GET', 'POST'])
def get_news():
    # accept either a bare domain (bbc.com) or a full URL (https://bbc.com/...)
    domain = urlparse(request.args.get('domain', ''))
    domain = domain.netloc or domain.path or 'bbc.com'

    if domain not in filter_rules:
        return jsonify({
            'error': f'{domain} not supported. it needs to be one of {", ".join(filter_rules.keys())}',
        })

    # enqueue the task while not blocking the server to immediately respond to the requesting client
    pipe(domain)
    return jsonify({
        'text': f'parsing {domain} headlines',
    })
```
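The two lines that normalize the `domain` parameter are easy to gloss over. Pulled out into a standalone function (`normalize_domain` and `SUPPORTED` are hypothetical names, with `SUPPORTED` mirroring the keys of `filter_rules`), they can be checked without running the server. Note that a `www.` prefix is kept as-is, so such input would be rejected as unsupported.

```python
from urllib.parse import urlparse

# Mirrors the keys of filter_rules; the name SUPPORTED is hypothetical.
SUPPORTED = {'bbc.com', 'theguardian.com', 'washingtonpost.com', 'foxnews.com', 'wsj.com'}


def normalize_domain(raw, default='bbc.com'):
    """Accept a bare domain ('wsj.com') or a URL ('https://wsj.com/news')."""
    parsed = urlparse(raw or '')
    # Without a scheme, urlparse puts the whole string in .path rather than .netloc.
    return parsed.netloc or parsed.path or default


for raw in ['wsj.com', 'https://wsj.com/news', None, 'https://www.bbc.com']:
    domain = normalize_domain(raw)
    print(repr(raw), '->', domain, 'supported' if domain in SUPPORTED else 'not supported')
```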

Like the task definitions, the server application also needs to reside in its own module, found in [server.py](./server.py). And just like the Celery worker, we can launch a Flask application server using its CLI:

```shell
% FLASK_APP=server FLASK_DEBUG=1 flask run      
 * Serving Flask app 'server' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 798-476-896
```

The prefix `FLASK_APP=server FLASK_DEBUG=1` before the actual `flask run` command sets two environment variables that control the command's behaviour. This differs from the Celery CLI, which takes options such as `--app` and `--loglevel`, but ultimately achieves the same goal. Flask's debug mode also enables the reloader (the "Restarting with stat" line above), which "hot-reloads" the server whenever we modify its code, so we don't have to restart it manually.

Once the server starts, we can access it through "localhost" (the IP address `127.0.0.1`) at Flask's default port, `5000`:

![server](https://user-images.githubusercontent.com/2837532/128428104-3c405b69-2cd3-4da8-9742-13520f2b0eec.png)

Deferring tasks also means the server can handle considerably more traffic, since each request returns as soon as its tasks are enqueued, and capacity grows further if additional server nodes run task queue workers.

### Where do we send the results?

While the task queue allows the server to respond immediately, there is also a substantial (and obvious) trade-off: the requesting client does not receive the requested information unless we have somewhere to deliver the deferred results.

Usual delivery destinations include email, SMS (Short Message Service), and, increasingly, group chats such as Slack. So let's build a companion slash command, `/news`, in our Slack workspace to request and obtain news headlines. This requires two changes: the pipe function gains a task that posts to Slack, and the server handler expects the specific payload that a Slack slash command sends.

![pipeline-pool](https://user-images.githubusercontent.com/2837532/128397170-aec0a57f-104f-4407-86ff-a3a8d5ba301f.png)

```python
# ...previous imports...

from extract_tasks import post_slack

def pipe_v2(domain, response_url):
    chain = fetch_website_task.s(domain) | extract_titles.s() | post_slack.s(response_url)
    return chain()


@app.route('/', methods=['GET', 'POST'])
def get_news():
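    # Slack sends the slash command as form data (request.form), including:
    #   command      - the slash command itself, e.g. /news
    #   text         - the text typed after the command
    #   response_url - the URL to POST follow-up messages back to
    domain = urlparse(request.form.get('text') or request.args.get('domain') or '')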
    domain = domain.netloc or domain.path or 'bbc.com'

    if domain not in filter_rules:
        return jsonify({
            'error': f'{domain} not supported. it needs to be one of {", ".join(filter_rules.keys())}',
        })

    # queue up the task
    response_url = request.form.get('response_url')
    if not response_url:
        pipe(domain)
    else:
        pipe_v2(domain, response_url)

    return jsonify({
        'text': f'parsing {domain} headlines',
    })
```
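The `post_slack` task lives in extract_tasks.py and is not shown here. As a hedged sketch of what such a task might do, assume it receives the dictionary returned by `extract_titles` (with the `titles` list seen earlier) as its first argument and the slash command's `response_url` as its second. It would format the headlines as Slack `<url|text>` links and POST them back as JSON. Both helper names are hypothetical; in the task module, they would be called from inside the decorated `post_slack` function.

```python
import json
import urllib.request


def _escape(text):
    """Escape the three characters Slack treats as control characters in message text."""
    return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')


def build_slack_message(result, limit=10):
    """Turn an extract_titles-style result ({'titles': [...]}) into a Slack payload."""
    lines = [f"• <{t['link']}|{_escape(t['text'])}>" for t in result['titles'][:limit]]
    return {
        'response_type': 'in_channel',  # visible to the channel; 'ephemeral' = requester only
        'text': '\n'.join(lines) or 'No headlines found.',
    }


def post_to_response_url(response_url, payload):
    """POST the payload as JSON to the slash command's response_url."""
    request = urllib.request.Request(
        response_url,
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Keeping the formatting separate from the HTTP call makes the message easy to test without contacting Slack.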

Once we define the slash command in an installed Slack app and point its "Request URL" to our Flask server (which must be reachable from the internet, since Slack cannot call a `127.0.0.1` address), we are all set to get news headlines in Slack by typing `/news`:

![slash](https://user-images.githubusercontent.com/2837532/128430462-7ae10cf2-9d90-4fc5-a383-8aa7c0e5024c.png)

![slash-request](https://user-images.githubusercontent.com/2837532/128430738-f1d927a8-471f-4a0f-8d13-a93eed3dfd09.png)

Once the final task in the chain completes, we receive a follow-up message with all the headlines, delivered to where we requested them:

![slack-response](https://user-images.githubusercontent.com/2837532/128430856-cec456de-dea4-4c99-9fe6-3749c53554f2.png)

## Remarks

Task (or job) queues fit well into fairly complex software and, as a result, offer some significant benefits:

* Simplify the software architecture
* Improve the utilization of computing resources
* Improve the scalability of the software

## References

* [Part 14 - Distributed Computation](../14-distributed-computation/notebook.ipynb)
* [Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/bs4/doc/)
* [Celery](https://docs.celeryproject.org/en/stable/index.html)
* [Flask](https://flask.palletsprojects.com/en/2.0.x/)
* [tqdm](https://tqdm.github.io/)