Apresentando: MongoMotor

Oi, pessoal. Tudo certo?

A ideia hoje é falar um pouquinho sobre o MongoMotor, que é uma biblioteca para acesso assíncrono ao MongoDB usando Python.

O que é esse tal de assíncrono mesmo?

Em geral, operações de entrada e saída de dados (io) consomem bastante tempo aguardando a chegada/envio de dados e enquanto um programa aguarda estas operações todo o processamento é bloqueado e nada mais será feito até que os dados sejam recebidos ou enviados. Para lidar com esta situação, as soluções comuns são threads e multi-processos (vide apache) ou io assíncrono, utilizando eventos do sistema operacional (vide nginx).

Com io assíncrono, quando executamos alguma operação de io ao invés de aguardarmos o retorno, a operação é "deixada de lado" (num scheduler), liberando o código para processar outras coisas, e quando obtivermos alguma resposta da operação de io o sistema operacional enviará um evento informando que a resposta chegou e assim o processamento da operação original pode ser retomado.

Em Python, há muito tempo se tem projetos que usam a ideia de io assíncrono para resolver este problema, como o Tornado ou o Twisted, mas com a chegada do módulo asyncio à biblioteca padrão no Python 3.4 e com a inclusão da super simpática sintaxe async/await no Python 3.5, operações de io assíncrono se tornaram uma coisa muito mais corriqueira na linguagem.

Para uma super palestra sobre concorrência, veja este vídeo.

E esse async/await, hem?

Bem resumidamente, com async podemos definir corotinas em Python. Corotinas são funções que quando chamadas retornam um objeto Future e precisam ter a sua execução agendada por algum io loop. Algo assim:

async def my_coro():
    """Uma corotina que não faz nada"""

    # do something
    return 'some result'

# Uma corotina pode ser executada da seguinte maneira, por exemplo
import asyncio

loop = asyncio.get_event_loop()
loop.run_until_complete(my_coro())

Com await podemos informar que a execução do código sub-sequente deve se dar somente quando a execução da future sendo aguardada terminar. Assim:

async def my_coro():
    """Uma corotina que não faz nada"""

    # do something
    return 'some result'

async def other_coro():
    """Uma corotina que chama outra corotina"""

    # r será o valor retornado por my_coro depois que sua execução for terminada.
    # Se chamássemos `my_coro()` sem `await` r seria uma future.
    r = await my_coro()
    # faça algo com r
    return 'a new result'

# Agora agendamos a future retornada `other_coro()` 
loop = asyncio.get_event_loop()
loop.run_until_complete(other_coro())

Para mais informações veja: asyncio docs e leia esse post.

Finalmente o MongoMotor

O mongomotor tira proveito do Motor, um driver assíncrono para MongoDB, e do MongoEngine, com sua api à la Django ORM, criando assim uma biblioteca bem simples para acesso assíncrono ao mongodb. Basta criar classes representando seus documentos, declarar alguns atributos - se quiser - e é isso, já podemos acessar o mongodb de maneira assíncrona.

Código. Aleluia!

Acabou a parte chata e chegou o que todo mundo queria: código. Neste exemplo, vamos analisar as perguntas mais recentes no stackoverflow.

A primeira coisa que precisamos fazer é instalar o mongomotor. Isto é feito usando-se o pip. Num terminal digite o seguinte:

$ pip install mongomotor

Agora, com o mongomotor já instalado podemos começar a escrever o código. Num arquivo python, faça:

# -*- coding: utf-8 -*-

# A função connect é usada para connectar a um banco de dados
from mongomotor import connect

# Conectamos uma vez quando nosso programa começa e está feia a conexão.

# Usando connect() sem parâmetros, o mongomotor vai tentar se conectar ao
# mongo em localhost na porta 27017, o padrão para a instalação.
connect()

# Se necessário é possível passar outros parâmetros, além dos parâmetros de
# autenticação
connect(host='my.mongo.host', port=1234, username='myself', password='my-password')

Agora vamos definir os nossos documentos:

# A classe Document é a base para os nossos documentos que serão definidos
from mongomotor import Document

# Apesar de o mongo ser um banco de dados sem schema, usamos estes campos
# para declarar nosso schema ficando mais fácil o entendimento posterior
# do código.

# Documentos com campos dinâmicos podem ser criados Usando-se a classe
# mongomotor.DynamicDocument

from mongomotor.fields import URLField, StringField, ListField, ReferenceField, IntField


class Usuario(Document):
    """Um usuário que fez uma pergunta no stackoverflow."""

    # Este campo será um inteiro e é obrigatório, por isso o uso do
    # parâmetro required=True.
    # Usamos também o parâmetro unique=True para garantir que só exista
    # um documento com este valor.
    external_id = IntField(required=True, unique=True)
    """O id do usuário no so."""
    
    nome = StringField()
    """O nome usuário que será exibido. O nome, não o usuário. :P"""

    reputacao = IntField()
    """A reputação do usuário no site."""


class Pergunta(Document):

    external_id = IntField(required=True, unique=True)
    """O id da pergunta no so."""

    titulo = StringField(required=True)
    """O título da pergunta"""

    # URLField é uma string que será validada para verificar se é uma
    # url
    url = URLField(required=True, unique=True)
    """A url da pergunta no so."""

    # ReferenceField aponta para um outro documento.
    # NOTA: Esta relação é feita na apliação, não no mongodb server.
    usuario = ReferenceField(Usuario, required=True)
    """O usuário que fez a pergunta."""

    # ListField indica que o campo é uma lista. Neste caso teremos uma lista
    # de strings.
    tags = ListField(StringField())
    """A lista de tags da pergunta"""


# Para que o unique funcione precisamos criar os índices nas coleções.
Usuario.ensure_indexes()
Pergunta.ensure_indexes()

Pronto, nossos documentos já estão definidos. Para informações sobre todas as opções para definir documentos, veja aqui. 

Agora podemos inserir dados e fazer buscas nos documentos. Para inserir os dados vamos user a api do stackoverflow.  Para fazer requisições http assíncronas, usaremos a biblioteca aiohttp. Num terminal instale-a com:

$ pip install aiohttp

Aqui a função para baixar os dados. 

import json
# Usamos aiohttp para fazer requests http assíncronos
from aiohttp import ClientSession

SO_URL = 'https://api.stackexchange.com/2.2/questions?order=desc&sort=activity&site=stackoverflow'


async def get_so_questions():
    async with ClientSession() as session:
        async with session.get(SO_URL) as response:
            r = await response.read()

    # Retorna uma lista de dicionários. Cada dicionário contém informação
    # sobre uma pergunta.
    return json.loads(r.decode())['items']

O uso do aiohttp não está no escopo deste artigo, mas a ideia aqui é fazer as operações de io (no caso os requests http) de maneira assíncrona. 

Agora já podemos cadastrar alguns dados. Primeiro vamos criar um método para criar um usuário baseado na informação retornada pela api.

class Usuario(Document):
    """Um usuário que fez uma pergunta no stackoverflow."""

    # Este campo será um inteiro e é obrigatório, por isso o uso do
    # parâmetro required=True.
    # Usamos também o parâmetro unique=True para garantir que só exista
    # um documento com este valor.
    external_id = IntField(required=True, unique=True)
    """O id do usuário no so."""

    nome = StringField()
    """O nome de usuário que será exibido"""

    reputacao = IntField()
    """A reputação do usuário no site."""

    # Adicionamos este método para inserir usuários.
    @classmethod
    async def get_or_create(cls, user_info):
        """Retorna um usuário. Tenta obter um usuário através de sua
        external_id. Se não existir, cria um novo usuário.

        :param user_info: Um dicionário com informações do usuário enviado
          pela api.
        """

        external_id = user_info['user_id']
        nome = user_info['display_name']
        reputacao = user_info['reputation']
        try:
            # O atributo `objects` é um objeto do tipo QuerySet.
            # O método `get()` retorna um documento baseado nos parâmetros
            # passados a este método.
            user = await cls.objects.get(external_id=external_id)
        except cls.DoesNotExist:
            # Quando nenhum documento que se enquadra nos parâmetros
            # é encontrado uma exceção `DoesNotExist` é levantada.
            # Aqui neste caso criamos um novo documento
            user = cls(external_id=external_id, nome=nome, 
                       reputacao=reputacao)
            # E salvamos o documento usando o método `save()`
            await user.save()

        return user

Agora vamos escrever um pouco de código para inserir as perguntas baseado no retorno da api

class Pergunta(Document):
    """Uma pergunta feita no stackoverflow."""

    external_id = IntField(required=True, unique=True)
    """O id da pergunta no so."""

    titulo = StringField(required=True)
    """O título da pergunta"""

    # URLField é uma string que será validada para verificar se é uma
    # url
    url = URLField(required=True, unique=True)
    """A url da pergunta no so."""

    # ReferenceField aponta para um outro documento.
    # NOTA: Esta relação é feita na apliação, não no mongodb server.
    usuario = ReferenceField(Usuario, required=True)
    """O usuário que fez a pergunta."""

    # ListField indica que o campo é uma lista. Neste caso teremos uma lista
    # de strings.
    tags = ListField(StringField())
    """A lista de tags da pergunta"""


    @classmethod
    async def adicionar_perguntas(cls, perguntas):
        """Adiciona as perguntas retornadas pela api.

        :param perguntas: Uma lista de dicionários, cada um com informações
          sobre uma pergunta.
        """

        # Lista para armazenar as perguntas a medida em que formos criando
        # os documentos para salvá-las todas de uma vez só.
        instancias = []
        for pinfo in perguntas:
            # Primeiro criamos usuário
            usuario = await Usuario.get_or_create(pinfo['owner'])

            # Agora criamos a pergunta
            external_id = pinfo['question_id']
            url = pinfo['link']
            title = pinfo['title']
            tags = pinfo['tags']
            pergunta = cls(external_id=external_id, url=url, titulo=title,
                           tags=tags, usuario=usuario)

            # Adicionamos à lista de instâncias para serem salvas depois
            instancias.append(pergunta)

        # Agora salvamos todas as instâncias de uma vez só.
        await cls.objects.insert(instancias)

E uma função pra juntar tudo e popular o banco de dados.

async def populate_db():
    """Função para popular o banco de dados com as últimas perguntas do
    stackoverflow.
    """

    # Vamos limpar tudo primeiro
    await Pergunta.drop_collection()
    await Usuario.drop_collection()

    # Agora cadastramos as perguntas mais recentes
    perguntas = await get_so_questions()
    await Pergunta.adicionar_perguntas(perguntas)

Bom, depois de inserir alguns dados no banco, vamos fazer buscas nestes dados.

async def stats():
    """Função que mostra alguns dados obtidos através da api do stackoverflow.
    """

    # O método `count()` é usado para contar a quantidade de documentos
    # em um queryset.
    total_perguntas = await Pergunta.objects.count()
    total_usuarios = await Usuario.objects.count()

    print('Temos um total de {} perguntas de {} usuários diferentes\n'.format(
        total_perguntas, total_perguntas))

    # Podemos usar o método `order_by()` para ordenar os resultados.
    # Note que não é preciso o uso de await quando estamos filtrando/ordenando
    # um queryset. A operação de io só é executada quando um documento for
    # necessário
    usuarios = Usuario.objects.order_by('-reputacao')

    # Usamos método `fisrt` para pegar o primeiro resultado do queryset.
    # Aqui sim é necessário o uso de await.
    usuario = await usuarios.first()

    print('O usuário com maior reputação é: *{}* com reputação {}'.format(
        usuario.nome, usuario.reputacao))

    # Podemos usar o método `filter()` para filtrar os resultados de um
    # queryset
    fileterd_qs = Pergunta.objects.filter(usuario=usuario)
    # E podemos iterar sobre os resultados do queryset com `async for`
    print('As perguntas de *{}* são:'.format(usuario.nome))
    async for pergunta in fileterd_qs:
        print('- {}'.format(pergunta.titulo))
        print('  tags: {}'.format(', '.join(pergunta.tags)))

    print('')

    # Com o método `item_frequencies()` podemos contas as repetições
    # de items de listas em documentos de um queryset
    popular_tags = await Pergunta.objects.item_frequencies('tags')

    tags = sorted([(k, v) for k, v in popular_tags.items()],
                  key=lambda x: x[1], reverse=True)
    most_popular = tags[0]
    print('A tag mais popular é *{}* com {} perguntas'.format(most_popular[0],
                                                              most_popular[1]))

    # Podemos filtar um queryset com base em um item de uma lista, no
    # nosso exemplo, com uma tag
    print('As perguntas de *{}* são:'.format(most_popular[0]))
    async for pergunta in Pergunta.objects.filter(tags=most_popular[0]):
        print('- {}'.format(pergunta.titulo))
        print('  tags: {}'.format(', '.join(pergunta.tags)))

        # Note que para acessar uma referência é necessário o uso
        # de await
        usuario = await pergunta.usuario
        print('  usuario: {}'.format(usuario.nome))

    print('')

Para a documentação completa de como fazer buscas usando o mongomotor veja aqui.

E é isso. Acabamos de conhecer o básico do mongomotor. Pra finalizar, vamos fazer uma função que coloca tudo isso junto:

async def main():
    print('Populando o banco de dados...')
    await populate_db()
    print('Banco de dados populado!\n')
    await stats()

E por fim, colocar isso aqui no final do nosso arquivo:

if __name__ == '__main__':

    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

E feito! Nosso script final ficou assim:

1# -*- coding: utf-8 -*-
2
3import json
4from aiohttp import ClientSession
5from mongomotor import connect
6from mongomotor import Document
7from mongomotor.fields import (URLField, StringField, ListField, ReferenceField,
8                               IntField)
9
10connect()
11
12SO_URL = 'https://api.stackexchange.com/2.2/questions?order=desc&sort=activity&site=stackoverflow'
13
14
15async def get_so_questions():
16    async with ClientSession() as session:
17        async with session.get(SO_URL) as response:
18            r = await response.read()
19
20    return json.loads(r.decode())['items']
21
22
23class Usuario(Document):
24    """Um usuário que fez uma pergunta no stackoverflow."""
25
26    # Este campo será um inteiro e é obrigatório, por isso o uso do
27    # parâmetro required=True.
28    # Usamos também o parâmetro unique=True para garantir que só exista
29    # um documento com este valor.
30    external_id = IntField(required=True, unique=True)
31    """O id do usuário no so."""
32
33    nome = StringField()
34    """O nome de usuário que será exibido"""
35
36    reputacao = IntField()
37    """A reputação do usuário no site."""
38
39    # Adicionamos este método para inserir usuários.
40    @classmethod
41    async def get_or_create(cls, user_info):
42        """Retorna um usuário. Tenta obter um usuário através de sua
43        external_id. Se não existir, cria um novo usuário.
44
45        :param user_info: Um dicionário com informações do usuário enviado
46          pela api.
47        """
48
49        external_id = user_info['user_id']
50        nome = user_info['display_name']
51        reputacao = user_info['reputation']
52        try:
53            # O atributo `objects` é um objeto do tipo QuerySet.
54            # O método `get()` retorna um documento baseado nos parâmetros
55            # passados a este método.
56            user = await cls.objects.get(external_id=external_id)
57        except cls.DoesNotExist:
58            # Quando nenhum documento que se enquadra nos parâmetros
59            # é encontrado uma exceção `DoesNotExist` é levantada.
60            #
61            # Aqui neste caso criamos um novo documento
62            user = cls(external_id=external_id, nome=nome,
63                       reputacao=reputacao)
64            # E salvamos o documento usando o método `save()`
65            await user.save()
66
67        return user
68
69
70class Pergunta(Document):
71    """Uma pergunta feita no stackoverflow."""
72
73    external_id = IntField(required=True, unique=True)
74    """O id da pergunta no so."""
75
76    titulo = StringField(required=True)
77    """O título da pergunta"""
78
79    # URLField é uma string que será validada para verificar se é uma
80    # url
81    url = URLField(required=True, unique=True)
82    """A url da pergunta no so."""
83
84    # ReferenceField aponta para um outro documento.
85    # NOTA: Esta relação é feita na apliação, não no mongodb server.
86    usuario = ReferenceField(Usuario, required=True)
87    """O usuário que fez a pergunta."""
88
89    # ListField indica que o campo é uma lista. Neste caso teremos uma lista
90    # de strings.
91    tags = ListField(StringField())
92    """A lista de tags da pergunta"""
93
94
95    @classmethod
96    async def adicionar_perguntas(cls, perguntas):
97        """Adiciona as perguntas retornadas pela api.
98
99        :param perguntas: Uma lista de dicionários, cada um com informações
100          sobre uma pergunta.
101        """
102
103        # Lista para armazenar as perguntas a medida em que formos criando
104        # os documentos para salvá-las todas de uma vez só.
105        instancias = []
106        for pinfo in perguntas:
107            # Primeiro criamos usuário
108            usuario = await Usuario.get_or_create(pinfo['owner'])
109
110            # Agora criamos a pergunta
111            external_id = pinfo['question_id']
112            url = pinfo['link']
113            title = pinfo['title']
114            tags = pinfo['tags']
115            pergunta = cls(external_id=external_id, url=url, titulo=title,
116                           tags=tags, usuario=usuario)
117
118            # Adicionamos à lista de instâncias para serem salvas depois
119            instancias.append(pergunta)
120
121        # Agora salvamos todas as instâncias de uma vez só.
122        await cls.objects.insert(instancias)
123
124
125# Para que o unique funcione precisamos criar os índices nas coleções.
126Usuario.ensure_indexes()
127Pergunta.ensure_indexes()
128
129
130async def populate_db():
131    """Função para popular o banco de dados com as últimas perguntas do
132    stackoverflow.
133    """
134
135    # Vamos limpar tudo primeiro
136    await Pergunta.drop_collection()
137    await Usuario.drop_collection()
138
139    # Agora cadastramos as perguntas mais recentes
140    perguntas = await get_so_questions()
141    await Pergunta.adicionar_perguntas(perguntas)
142
143
144async def stats():
145    """Função que mostra alguns dados obtidos através da api do stackoverflow.
146    """
147
148    # O método `count()` é usado para contar a quantidade de documentos
149    # em um queryset.
150    total_perguntas = await Pergunta.objects.count()
151    total_usuarios = await Usuario.objects.count()
152
153    print('Temos um total de {} perguntas de {} usuários diferentes\n'.format(
154        total_perguntas, total_perguntas))
155
156    # Podemos usar o método `order_by()` para ordenar os resultados.
157    # Note que não é preciso o uso de await quando estamos filtrando/ordenando
158    # um queryset. A operação de io só é executada quando um documento for
159    # necessário
160    usuarios = Usuario.objects.order_by('-reputacao')
161
162    # Usamos método `fisrt` para pegar o primeiro resultado do queryset.
163    # Aqui sim é necessário o uso de await.
164    usuario = await usuarios.first()
165
166    print('O usuário com maior reputação é: *{}* com reputação {}'.format(
167        usuario.nome, usuario.reputacao))
168
169    # Podemos usar o método `filter()` para filtrar os resultados de um
170    # queryset
171    fileterd_qs = Pergunta.objects.filter(usuario=usuario)
172    # E podemos iterar sobre os resultados do queryset com `async for`
173    print('As perguntas de *{}* são:'.format(usuario.nome))
174    async for pergunta in fileterd_qs:
175        print('- {}'.format(pergunta.titulo))
176        print('  tags: {}'.format(', '.join(pergunta.tags)))
177
178    print('')
179
180    # Com o método `item_frequencies()` podemos contas as repetições
181    # de items de listas em documentos de um queryset
182    popular_tags = await Pergunta.objects.item_frequencies('tags')
183
184    tags = sorted([(k, v) for k, v in popular_tags.items()],
185                  key=lambda x: x[1], reverse=True)
186    most_popular = tags[0]
187    print('A tag mais popular é *{}* com {} perguntas'.format(most_popular[0],
188                                                              most_popular[1]))
189
190    # Podemos filtar um queryset com base em um item de uma lista, no
191    # nosso exemplo, com uma tag
192    print('As perguntas de *{}* são:'.format(most_popular[0]))
193    async for pergunta in Pergunta.objects.filter(tags=most_popular[0]):
194        print('- {}'.format(pergunta.titulo))
195        print('  tags: {}'.format(', '.join(pergunta.tags)))
196
197        # Note que para acessar uma referência é necessário o uso
198        # de await
199        usuario = await pergunta.usuario
200        print('  usuario: {}'.format(usuario.nome))
201
202    print('')
203
204
205async def main():
206    print('Populando o banco de dados...')
207    await populate_db()
208    print('Banco de dados populado!\n')
209    await stats()
210
211
212if __name__ == '__main__':
213
214    import asyncio
215    loop = asyncio.get_event_loop()
216    loop.run_until_complete(main())

O script pode ser baixado aqui.

Salve isto em um arquivo chamado mmso.py e num terminal execute:

$ python mmso.py

E voilà, eis a saída do nosso programa:

Populando o banco de dados...
Banco de dados populado!

Temos um total de 30 perguntas de 30 usuários diferentes

O usuário com maior reputação é: *jww* com reputação 49941
As perguntas de *jww* são:
- Undefined reference to symbol during link GCC inline assembly
  tags: c++, gcc, linker-errors, inline-assembly

A tag mais popular é *python* com 5 perguntas
As perguntas de *python* são:
- BeautifulSoup4 findChildren() is empty
  tags: python, html, parsing, beautifulsoup
  usuario: Meghan M.
- How to update weights in neural networks?
  tags: python, neural-network
  usuario: Absolute Idiot
- assign results to dummy variable _
  tags: python, python-3.x
  usuario: cs0815
- use opencv, cv2.videocapture in kivy with android - python for android
  tags: android, python, opencv, kivy, buildozer
  usuario: Vajira Prabuddhaka
- How to get the value of a Django Model Field object
  tags: python, django, django-models
  usuario: Hugo Luis Villalobos Canto

Para informações mais detalhadas sobre o mongomotor, veja a documentação.

O mongomotor é software livre, sinta-se à vontade para contribuir. :)

Comentários

  1. Luciano

    Luciano em 19/08/2018 18:19 #

    Deu um erro nessa linha:

    url = pinfo['url']

    pois mudou para

    url = pinfo['link']

  2. Juca Crispim

    Juca Crispim em 19/08/2018 20:57 #

    Valeu pelo toque, Luciano. Corrigi.

Faça seu comentário