Using Twisted to Massively Parallelize Web Clients

Mon 13 April 2020 by Moshe Zadka

The Twisted Requests (treq) package is an HTTP client built on the popular Twisted library. Async libraries like Twisted make it possible to issue large numbers of network requests in parallel with relatively little CPU impact. This can be useful in HTTP clients that need to make several requests before they have all the information they need.

This post shows an example of a problem like this, and how to solve it using treq.

I enjoy playing the real-time strategy game Clash Royale. Clash Royale is a mobile strategy player-vs-player game where players play cards in an arena to win. Each card has different strengths and weaknesses, and different players prefer different cards. Clash Royale remembers which card a player plays the most; this is their "favorite" card. Players come together in clans where they can help each other. Supercell, Clash Royale's developer, released an HTTP-based API where different statistics can be queried.

How can we write a program that will output the most popular favorite cards in a clan?

If you want to follow along, you will need to register an account on the Clash Royale developer portal and create an API token. Choose "Create New Key" under your profile, and enter a name, description, and a valid IP address. (An exact address is required.) Since you should never save an API key in your code, keep it in a separate file, ~/.crtoken:

$ ls ~/.crtoken
/home/moshez/.crtoken

To make it easier to see what is going on, let's start with this introductory program that prints Hello world, and then we'll talk through what it does:

import collections, json, os, sys, urllib.parse
from twisted.internet import task, defer
import treq

with open(os.path.expanduser("~/.crtoken")) as fpin:
    token = fpin.read().strip()

def main(reactor):
    print("Hello world")
    return defer.succeed(None)

task.react(main, sys.argv[1:])

This imports many more modules than we need for the "Hello world" example. We will need these modules for the final version of the program, which will accomplish the more complex task of asynchronously querying an API. After the imports, the program reads the token from the file and stores it in the variable token. (We are not going to do anything with the token right now, but it's good to see that code.) Next there is a main function that accepts a Twisted reactor. A reactor is sort of like an interface to the machinery of the Twisted package. In this case, the function main is passed to task.react, which will call main with the reactor and any arguments we give (here, the command-line arguments).

The main function returns a defer.succeed(None). This is how it returns a value of the right type: a deferred value, but one that already has been "fired" or "called." Because of that, the program will exit immediately after printing Hello world, as we need.

Next, we will look at the concepts of async functions and ensureDeferred:

async def get_clan_details(clan):
    print("Hello world", clan)

def main(reactor, clan):
    return defer.ensureDeferred(get_clan_details(clan))

task.react(main, sys.argv[1:])

In this program, which should start with the same imports, we moved all the logic to the async function get_clan_details. Just like a regular function, an async function has an implicit return None at the end. However, calling an async function produces a co-routine, which is a different type than Deferred. In order to let Twisted, which has existed since Python 1.5.2, use this modern feature, we must adapt the co-routine using ensureDeferred.

While we could write all the logic without using co-routines, using the async syntax will allow us to write code that is easier to understand, and we will need to move a lot less of the code into embedded callbacks.

The next concept to introduce is that of await. Later, we will await a network call, but for simplicity, right now, we will await on a timer. Twisted has a special function, task.deferLater, which will call a function with given parameters after some time has passed.

The following program will take five seconds to complete:

async def get_clan_details(clan, reactor):
    out = await task.deferLater(
        reactor,
        5,
        lambda clan: f"Hello world {clan}",
        clan
    )
    print(out)

def main(reactor, clan):
    return defer.ensureDeferred(get_clan_details(clan, reactor))

task.react(main, sys.argv[1:])

A note about types: task.deferLater returns a Deferred, as do most Twisted functions that do not have the value already available. When running the Twisted event loop, we can await on both Deferred values and co-routines.

The function task.deferLater will wait five seconds and then call our lambda, calculating the string to print out.

Now we have all the Twisted building blocks needed to write an efficient clan-analysis program!

Since we will be using the global reactor, we no longer need to accept the reactor as a parameter in the function that calculates these statistics. The way to use the token is as a "bearer" token in the headers:

async def get_clan_details(clan):
    headers = {b'Authorization': b'Bearer ' + token.encode('ascii')}

We want clan tags to be sent, which will be strings. Clan tags begin with #, so they must be quoted before they're put in URLs. This is because # has the special meaning "URL fragment":

async def get_clan_details(clan):
    # ...
    clan = urllib.parse.quote(clan)
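
To make the fragment problem concrete, here is a small standard-library sketch comparing how a URL is parsed with and without quoting. The tag "#EXAMPLE" is made up for illustration.

```python
import urllib.parse

BASE = "https://api.clashroyale.com/v1/clans/"
clan = "#EXAMPLE"  # made-up tag, for illustration only

# Without quoting, everything after "#" is treated as a fragment,
# so the tag never becomes part of the path.
unquoted = urllib.parse.urlsplit(BASE + clan)

# With quoting, "#" becomes "%23" and the tag stays in the path.
quoted = urllib.parse.urlsplit(BASE + urllib.parse.quote(clan))

print(repr(unquoted.path), repr(unquoted.fragment))
print(repr(quoted.path), repr(quoted.fragment))
```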

The first step is to get the details of the clan, including the clan members:

async def get_clan_details(clan):
    # ...
    res = await treq.get("https://api.clashroyale.com/v1/clans/" + clan,
                         headers=headers)

Notice that we have to await the treq.get call. We have to be explicit about when to wait and get information, since it is an asynchronous network call. Simply awaiting each call that returns a Deferred, one after another, does not let us take full advantage of asynchronicity (we will see how to do that later).

Next, after getting the headers, we need to get the content. The treq library gives us a helper method that parses the JSON directly:

async def get_clan_details(clan):
    # ...
    content = await res.json()

The content includes some metadata about the clan, which is not interesting for our current purposes, and a memberList field that contains the clan members. Note that while it has some data about the players, the current favorite card is not part of it. It does include the unique "player tag" that we can use to retrieve further data.

We collect all player tags, and, since they also begin with #, we URL-quote them:

async def get_clan_details(clan):
    # ...
    player_tags = [urllib.parse.quote(player['tag'])
                   for player in content['memberList']]

Finally, we come to the real power of treq and Twisted: generating all requests for player data at once! That can really speed up tasks like this one, which queries an API over and over again. In cases of APIs with rate-limiting, this can be problematic.

There are times when we need to be considerate to our API owners and not run up against any rate limits. There are techniques to support rate-limiting explicitly in Twisted, but they are beyond the scope of this post. (One important tool is defer.DeferredSemaphore.)

async def get_clan_details(clan):
    # ...
    requests = [treq.get("https://api.clashroyale.com/v1/players/" + tag,
                         headers=headers)
                for tag in player_tags]

Remember that requests do not return the JSON body directly. Earlier, we used await so that we did not have to worry about exactly what the requests return. They actually return a Deferred. A callback attached to a Deferred can modify its result: whatever the callback returns becomes the Deferred's new value. If the callback returns a Deferred, the final value of the original Deferred will be the value of the returned Deferred.

So, to each deferred, we attach a callback that will retrieve the JSON of the body:

async def get_clan_details(clan):
    # ...
    for request in requests:
        request.addCallback(lambda result: result.json())

Attaching callbacks to Deferreds is a more manual technique, which makes code that is harder to follow but uses the async features more efficiently. Specifically, because we are attaching all the callbacks at the same time, we do not need to wait for the network calls, which potentially can take a long time, to indicate how to post-process the result.

From Deferreds to values

We cannot calculate the most popular favorite cards until all results have been gathered. We have a list of Deferreds, but what we want is a Deferred that gets a list value. This inversion is exactly what the Twisted function defer.gatherResults does:

async def get_clan_details(clan):
    # ...
    all_players = await defer.gatherResults(requests)

This seemingly innocent call is where we use the full power of Twisted. The defer.gatherResults function immediately returns a Deferred that will fire only when all the constituent Deferreds have fired, and it will fire with the list of their results, in the same order as the original list. It even gives us free error-handling: if any of the Deferreds error out, the Deferred it returned fails right away, which will cause the await to raise an exception.

Now that we have all the players' details, we need to munch some data. We get to use one of Python's coolest built-ins, collections.Counter. This class takes a list of things and counts how many times it has seen each thing, which is exactly what we need for vote counting or popularity contests:

async def get_clan_details(clan):
    # ...
    favorite_card = collections.Counter([player["currentFavouriteCard"]["name"]
                                         for player in all_players])

Finally, we print it:

async def get_clan_details(clan):
    # ...
    print(json.dumps(favorite_card.most_common(), indent=4))
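
To try the counting step on its own, here is a sketch with made-up player data. Only the currentFavouriteCard field used above is included; real responses contain much more.

```python
import collections
import json

# Made-up sample data, shaped like the part of the response we use.
all_players = [
    {"currentFavouriteCard": {"name": "Wizard"}},
    {"currentFavouriteCard": {"name": "Valkyrie"}},
    {"currentFavouriteCard": {"name": "Wizard"}},
    {"currentFavouriteCard": {"name": "Mega Knight"}},
    {"currentFavouriteCard": {"name": "Valkyrie"}},
    {"currentFavouriteCard": {"name": "Wizard"}},
]

favorite_card = collections.Counter([player["currentFavouriteCard"]["name"]
                                     for player in all_players])

# most_common() returns (name, count) pairs, most frequent first;
# json.dumps renders each pair as a two-element list.
print(json.dumps(favorite_card.most_common(), indent=4))
```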

So, putting it all together, we have:

import collections, json, os, sys, urllib.parse
from twisted.internet import task, defer
import treq

with open(os.path.expanduser("~/.crtoken")) as fpin:
    token = fpin.read().strip()


async def get_clan_details(clan):
    # Authenticate with the API token as a bearer token
    headers = {b'Authorization': b'Bearer ' + token.encode('ascii')}
    # Clan tags start with "#", so quote them before building the URL
    clan = urllib.parse.quote(clan)
    res = await treq.get("https://api.clashroyale.com/v1/clans/" + clan,
                         headers=headers)
    content = await res.json()
    player_tags = [urllib.parse.quote(player['tag'])
                   for player in content['memberList']]
    # Start every player request at once; each call returns a Deferred
    requests = [treq.get("https://api.clashroyale.com/v1/players/" + tag,
                         headers=headers)
                for tag in player_tags]
    for request in requests:
        request.addCallback(lambda result: result.json())
    # Wait until every player's JSON has arrived
    all_players = await defer.gatherResults(requests)
    favorite_card = collections.Counter([player["currentFavouriteCard"]["name"]
                                         for player in all_players])
    print(json.dumps(favorite_card.most_common(), indent=4))

def main(reactor, clan):
    return defer.ensureDeferred(get_clan_details(clan))

task.react(main, sys.argv[1:])

Thanks to the efficiency and expressive syntax of Twisted and treq, this is all the code we need to make asynchronous calls to an API. If you were wondering about the outcome, my clan's list of favorite cards is Wizard, Mega Knight, Valkyrie, and Royal Giant, in descending order.

(This post is based on the article I wrote for opensource.com)