Oi, pessoal. Tudo certo?
A ideia hoje é falar um pouquinho sobre o MongoMotor, que é uma biblioteca para acesso assíncrono ao MongoDB usando Python.
O que é esse tal de assíncrono mesmo?
Em geral, operações de entrada e saída de dados (io) consomem bastante tempo aguardando a chegada/envio de dados e enquanto um programa aguarda estas operações todo o processamento é bloqueado e nada mais será feito até que os dados sejam recebidos ou enviados. Para lidar com esta situação, as soluções comuns são threads e multi-processos (vide apache) ou io assíncrono, utilizando eventos do sistema operacional (vide nginx).
Com io assíncrono, quando executamos alguma operação de io ao invés de aguardarmos o retorno, a operação é "deixada de lado" (num scheduler), liberando o código para processar outras coisas, e quando obtivermos alguma resposta da operação de io o sistema operacional enviará um evento informando que a resposta chegou e assim o processamento da operação original pode ser retomado.
Em Python, há muito tempo se tem projetos que usam a ideia de io assíncrono para resolver este problema, como o Tornado ou o Twisted, mas com a chegada do módulo asyncio à biblioteca padrão no Python 3.4 e com a inclusão da super simpática sintaxe async/await no Python 3.5, operações de io assíncrono se tornaram uma coisa muito mais corriqueira na linguagem.
Para uma super palestra sobre concorrência, veja este vídeo.
E esse async/await, hem?
Bem resumidamente, com async podemos definir corotinas em Python. Corotinas são funções que quando chamadas retornam um objeto Future e precisam ter a sua execução agendada por algum io loop. Algo assim:
async def my_coro(): """Uma corotina que não faz nada""" # do something return 'some result' # Uma corotina pode ser executada da seguinte maneira, por exemplo import asyncio loop = asyncio.get_event_loop() loop.run_until_complete(my_coro())
Com await podemos informar que a execução do código sub-sequente deve se dar somente quando a execução da future sendo aguardada terminar. Assim:
async def my_coro(): """Uma corotina que não faz nada""" # do something return 'some result' async def other_coro(): """Uma corotina que chama outra corotina""" # r será o valor retornado por my_coro depois que sua execução for terminada. # Se chamássemos `my_coro()` sem `await` r seria uma future. r = await my_coro() # faça algo com r return 'a new result' # Agora agendamos a future retornada `other_coro()` loop = asyncio.get_event_loop() loop.run_until_complete(other_coro())
Para mais informações veja: asyncio docs e leia esse post.
Finalmente o MongoMotor
O mongomotor tira proveito do Motor, um driver assíncrono para MongoDB, e do MongoEngine, com sua api à la Django ORM, criando assim uma biblioteca bem simples para acesso assíncrono ao mongodb. Basta criar classes representando seus documentos, declarar alguns atributos - se quiser - e é isso, já podemos acessar o mongodb de maneira assíncrona.
Código. Aleluia!
Acabou a parte chata e chegou o que todo mundo queria: código. Neste exemplo, vamos analisar as perguntas mais recentes no stackoverflow.
A primeira coisa que precisamos fazer é instalar o mongomotor. Isto é feito usando-se o pip. Num terminal digite o seguinte:
$ pip install mongomotor
Agora, com o mongomotor já instalado podemos começar a escrever o código. Num arquivo python, faça:
# -*- coding: utf-8 -*- # A função connect é usada para connectar a um banco de dados from mongomotor import connect # Conectamos uma vez quando nosso programa começa e está feia a conexão. # Usando connect() sem parâmetros, o mongomotor vai tentar se conectar ao # mongo em localhost na porta 27017, o padrão para a instalação. connect() # Se necessário é possível passar outros parâmetros, além dos parâmetros de # autenticação connect(host='my.mongo.host', port=1234, username='myself', password='my-password')
Agora vamos definir os nossos documentos:
# A classe Document é a base para os nossos documentos que serão definidos from mongomotor import Document # Apesar de o mongo ser um banco de dados sem schema, usamos estes campos # para declarar nosso schema ficando mais fácil o entendimento posterior # do código. # Documentos com campos dinâmicos podem ser criados Usando-se a classe # mongomotor.DynamicDocument from mongomotor.fields import URLField, StringField, ListField, ReferenceField, IntField class Usuario(Document): """Um usuário que fez uma pergunta no stackoverflow.""" # Este campo será um inteiro e é obrigatório, por isso o uso do # parâmetro required=True. # Usamos também o parâmetro unique=True para garantir que só exista # um documento com este valor. external_id = IntField(required=True, unique=True) """O id do usuário no so.""" nome = StringField() """O nome usuário que será exibido. O nome, não o usuário. :P""" reputacao = IntField() """A reputação do usuário no site.""" class Pergunta(Document): external_id = IntField(required=True, unique=True) """O id da pergunta no so.""" titulo = StringField(required=True) """O título da pergunta""" # URLField é uma string que será validada para verificar se é uma # url url = URLField(required=True, unique=True) """A url da pergunta no so.""" # ReferenceField aponta para um outro documento. # NOTA: Esta relação é feita na apliação, não no mongodb server. usuario = ReferenceField(Usuario, required=True) """O usuário que fez a pergunta.""" # ListField indica que o campo é uma lista. Neste caso teremos uma lista # de strings. tags = ListField(StringField()) """A lista de tags da pergunta""" # Para que o unique funcione precisamos criar os índices nas coleções. Usuario.ensure_indexes() Pergunta.ensure_indexes()
Pronto, nossos documentos já estão definidos. Para informações sobre todas as opções para definir documentos, veja aqui.
Agora podemos inserir dados e fazer buscas nos documentos. Para inserir os dados vamos user a api do stackoverflow. Para fazer requisições http assíncronas, usaremos a biblioteca aiohttp. Num terminal instale-a com:
$ pip install aiohttp
Aqui a função para baixar os dados.
import json # Usamos aiohttp para fazer requests http assíncronos from aiohttp import ClientSession SO_URL = 'https://api.stackexchange.com/2.2/questions?order=desc&sort=activity&site=stackoverflow' async def get_so_questions(): async with ClientSession() as session: async with session.get(SO_URL) as response: r = await response.read() # Retorna uma lista de dicionários. Cada dicionário contém informação # sobre uma pergunta. return json.loads(r.decode())['items']
O uso do aiohttp não está no escopo deste artigo, mas a ideia aqui é fazer as operações de io (no caso os requests http) de maneira assíncrona.
Agora já podemos cadastrar alguns dados. Primeiro vamos criar um método para criar um usuário baseado na informação retornada pela api.
class Usuario(Document): """Um usuário que fez uma pergunta no stackoverflow.""" # Este campo será um inteiro e é obrigatório, por isso o uso do # parâmetro required=True. # Usamos também o parâmetro unique=True para garantir que só exista # um documento com este valor. external_id = IntField(required=True, unique=True) """O id do usuário no so.""" nome = StringField() """O nome de usuário que será exibido""" reputacao = IntField() """A reputação do usuário no site.""" # Adicionamos este método para inserir usuários. @classmethod async def get_or_create(cls, user_info): """Retorna um usuário. Tenta obter um usuário através de sua external_id. Se não existir, cria um novo usuário. :param user_info: Um dicionário com informações do usuário enviado pela api. """ external_id = user_info['user_id'] nome = user_info['display_name'] reputacao = user_info['reputation'] try: # O atributo `objects` é um objeto do tipo QuerySet. # O método `get()` retorna um documento baseado nos parâmetros # passados a este método. user = await cls.objects.get(external_id=external_id) except cls.DoesNotExist: # Quando nenhum documento que se enquadra nos parâmetros # é encontrado uma exceção `DoesNotExist` é levantada. # Aqui neste caso criamos um novo documento user = cls(external_id=external_id, nome=nome, reputacao=reputacao) # E salvamos o documento usando o método `save()` await user.save() return user
Agora vamos escrever um pouco de código para inserir as perguntas baseado no retorno da api
class Pergunta(Document): """Uma pergunta feita no stackoverflow.""" external_id = IntField(required=True, unique=True) """O id da pergunta no so.""" titulo = StringField(required=True) """O título da pergunta""" # URLField é uma string que será validada para verificar se é uma # url url = URLField(required=True, unique=True) """A url da pergunta no so.""" # ReferenceField aponta para um outro documento. # NOTA: Esta relação é feita na apliação, não no mongodb server. usuario = ReferenceField(Usuario, required=True) """O usuário que fez a pergunta.""" # ListField indica que o campo é uma lista. Neste caso teremos uma lista # de strings. tags = ListField(StringField()) """A lista de tags da pergunta""" @classmethod async def adicionar_perguntas(cls, perguntas): """Adiciona as perguntas retornadas pela api. :param perguntas: Uma lista de dicionários, cada um com informações sobre uma pergunta. """ # Lista para armazenar as perguntas a medida em que formos criando # os documentos para salvá-las todas de uma vez só. instancias = [] for pinfo in perguntas: # Primeiro criamos usuário usuario = await Usuario.get_or_create(pinfo['owner']) # Agora criamos a pergunta external_id = pinfo['question_id'] url = pinfo['link'] title = pinfo['title'] tags = pinfo['tags'] pergunta = cls(external_id=external_id, url=url, titulo=title, tags=tags, usuario=usuario) # Adicionamos à lista de instâncias para serem salvas depois instancias.append(pergunta) # Agora salvamos todas as instâncias de uma vez só. await cls.objects.insert(instancias)
E uma função pra juntar tudo e popular o banco de dados.
async def populate_db(): """Função para popular o banco de dados com as últimas perguntas do stackoverflow. """ # Vamos limpar tudo primeiro await Pergunta.drop_collection() await Usuario.drop_collection() # Agora cadastramos as perguntas mais recentes perguntas = await get_so_questions() await Pergunta.adicionar_perguntas(perguntas)
Bom, depois de inserir alguns dados no banco, vamos fazer buscas nestes dados.
async def stats(): """Função que mostra alguns dados obtidos através da api do stackoverflow. """ # O método `count()` é usado para contar a quantidade de documentos # em um queryset. total_perguntas = await Pergunta.objects.count() total_usuarios = await Usuario.objects.count() print('Temos um total de {} perguntas de {} usuários diferentes\n'.format( total_perguntas, total_perguntas)) # Podemos usar o método `order_by()` para ordenar os resultados. # Note que não é preciso o uso de await quando estamos filtrando/ordenando # um queryset. A operação de io só é executada quando um documento for # necessário usuarios = Usuario.objects.order_by('-reputacao') # Usamos método `fisrt` para pegar o primeiro resultado do queryset. # Aqui sim é necessário o uso de await. usuario = await usuarios.first() print('O usuário com maior reputação é: *{}* com reputação {}'.format( usuario.nome, usuario.reputacao)) # Podemos usar o método `filter()` para filtrar os resultados de um # queryset fileterd_qs = Pergunta.objects.filter(usuario=usuario) # E podemos iterar sobre os resultados do queryset com `async for` print('As perguntas de *{}* são:'.format(usuario.nome)) async for pergunta in fileterd_qs: print('- {}'.format(pergunta.titulo)) print(' tags: {}'.format(', '.join(pergunta.tags))) print('') # Com o método `item_frequencies()` podemos contas as repetições # de items de listas em documentos de um queryset popular_tags = await Pergunta.objects.item_frequencies('tags') tags = sorted([(k, v) for k, v in popular_tags.items()], key=lambda x: x[1], reverse=True) most_popular = tags[0] print('A tag mais popular é *{}* com {} perguntas'.format(most_popular[0], most_popular[1])) # Podemos filtar um queryset com base em um item de uma lista, no # nosso exemplo, com uma tag print('As perguntas de *{}* são:'.format(most_popular[0])) async for pergunta in Pergunta.objects.filter(tags=most_popular[0]): print('- {}'.format(pergunta.titulo)) print(' tags: {}'.format(', '.join(pergunta.tags))) # Note que para acessar uma referência é necessário o uso # de await usuario = await pergunta.usuario print(' usuario: {}'.format(usuario.nome)) print('')
Para a documentação completa de como fazer buscas usando o mongomotor veja aqui.
E é isso. Acabamos de conhecer o básico do mongomotor. Pra finalizar, vamos fazer uma função que coloca tudo isso junto:
async def main(): print('Populando o banco de dados...') await populate_db() print('Banco de dados populado!\n') await stats()
E por fim, colocar isso aqui no final do nosso arquivo:
if __name__ == '__main__': import asyncio loop = asyncio.get_event_loop() loop.run_until_complete(main())
E feito! Nosso script final ficou assim:
1# -*- coding: utf-8 -*- 2 3import json 4from aiohttp import ClientSession 5from mongomotor import connect 6from mongomotor import Document 7from mongomotor.fields import (URLField, StringField, ListField, ReferenceField, 8 IntField) 9 10connect() 11 12SO_URL = 'https://api.stackexchange.com/2.2/questions?order=desc&sort=activity&site=stackoverflow' 13 14 15async def get_so_questions(): 16 async with ClientSession() as session: 17 async with session.get(SO_URL) as response: 18 r = await response.read() 19 20 return json.loads(r.decode())['items'] 21 22 23class Usuario(Document): 24 """Um usuário que fez uma pergunta no stackoverflow.""" 25 26 # Este campo será um inteiro e é obrigatório, por isso o uso do 27 # parâmetro required=True. 28 # Usamos também o parâmetro unique=True para garantir que só exista 29 # um documento com este valor. 30 external_id = IntField(required=True, unique=True) 31 """O id do usuário no so.""" 32 33 nome = StringField() 34 """O nome de usuário que será exibido""" 35 36 reputacao = IntField() 37 """A reputação do usuário no site.""" 38 39 # Adicionamos este método para inserir usuários. 40 @classmethod 41 async def get_or_create(cls, user_info): 42 """Retorna um usuário. Tenta obter um usuário através de sua 43 external_id. Se não existir, cria um novo usuário. 44 45 :param user_info: Um dicionário com informações do usuário enviado 46 pela api. 47 """ 48 49 external_id = user_info['user_id'] 50 nome = user_info['display_name'] 51 reputacao = user_info['reputation'] 52 try: 53 # O atributo `objects` é um objeto do tipo QuerySet. 54 # O método `get()` retorna um documento baseado nos parâmetros 55 # passados a este método. 56 user = await cls.objects.get(external_id=external_id) 57 except cls.DoesNotExist: 58 # Quando nenhum documento que se enquadra nos parâmetros 59 # é encontrado uma exceção `DoesNotExist` é levantada. 60 # 61 # Aqui neste caso criamos um novo documento 62 user = cls(external_id=external_id, nome=nome, 63 reputacao=reputacao) 64 # E salvamos o documento usando o método `save()` 65 await user.save() 66 67 return user 68 69 70class Pergunta(Document): 71 """Uma pergunta feita no stackoverflow.""" 72 73 external_id = IntField(required=True, unique=True) 74 """O id da pergunta no so.""" 75 76 titulo = StringField(required=True) 77 """O título da pergunta""" 78 79 # URLField é uma string que será validada para verificar se é uma 80 # url 81 url = URLField(required=True, unique=True) 82 """A url da pergunta no so.""" 83 84 # ReferenceField aponta para um outro documento. 85 # NOTA: Esta relação é feita na apliação, não no mongodb server. 86 usuario = ReferenceField(Usuario, required=True) 87 """O usuário que fez a pergunta.""" 88 89 # ListField indica que o campo é uma lista. Neste caso teremos uma lista 90 # de strings. 91 tags = ListField(StringField()) 92 """A lista de tags da pergunta""" 93 94 95 @classmethod 96 async def adicionar_perguntas(cls, perguntas): 97 """Adiciona as perguntas retornadas pela api. 98 99 :param perguntas: Uma lista de dicionários, cada um com informações 100 sobre uma pergunta. 101 """ 102 103 # Lista para armazenar as perguntas a medida em que formos criando 104 # os documentos para salvá-las todas de uma vez só. 105 instancias = [] 106 for pinfo in perguntas: 107 # Primeiro criamos usuário 108 usuario = await Usuario.get_or_create(pinfo['owner']) 109 110 # Agora criamos a pergunta 111 external_id = pinfo['question_id'] 112 url = pinfo['link'] 113 title = pinfo['title'] 114 tags = pinfo['tags'] 115 pergunta = cls(external_id=external_id, url=url, titulo=title, 116 tags=tags, usuario=usuario) 117 118 # Adicionamos à lista de instâncias para serem salvas depois 119 instancias.append(pergunta) 120 121 # Agora salvamos todas as instâncias de uma vez só. 122 await cls.objects.insert(instancias) 123 124 125# Para que o unique funcione precisamos criar os índices nas coleções. 126Usuario.ensure_indexes() 127Pergunta.ensure_indexes() 128 129 130async def populate_db(): 131 """Função para popular o banco de dados com as últimas perguntas do 132 stackoverflow. 133 """ 134 135 # Vamos limpar tudo primeiro 136 await Pergunta.drop_collection() 137 await Usuario.drop_collection() 138 139 # Agora cadastramos as perguntas mais recentes 140 perguntas = await get_so_questions() 141 await Pergunta.adicionar_perguntas(perguntas) 142 143 144async def stats(): 145 """Função que mostra alguns dados obtidos através da api do stackoverflow. 146 """ 147 148 # O método `count()` é usado para contar a quantidade de documentos 149 # em um queryset. 150 total_perguntas = await Pergunta.objects.count() 151 total_usuarios = await Usuario.objects.count() 152 153 print('Temos um total de {} perguntas de {} usuários diferentes\n'.format( 154 total_perguntas, total_perguntas)) 155 156 # Podemos usar o método `order_by()` para ordenar os resultados. 157 # Note que não é preciso o uso de await quando estamos filtrando/ordenando 158 # um queryset. A operação de io só é executada quando um documento for 159 # necessário 160 usuarios = Usuario.objects.order_by('-reputacao') 161 162 # Usamos método `fisrt` para pegar o primeiro resultado do queryset. 163 # Aqui sim é necessário o uso de await. 164 usuario = await usuarios.first() 165 166 print('O usuário com maior reputação é: *{}* com reputação {}'.format( 167 usuario.nome, usuario.reputacao)) 168 169 # Podemos usar o método `filter()` para filtrar os resultados de um 170 # queryset 171 fileterd_qs = Pergunta.objects.filter(usuario=usuario) 172 # E podemos iterar sobre os resultados do queryset com `async for` 173 print('As perguntas de *{}* são:'.format(usuario.nome)) 174 async for pergunta in fileterd_qs: 175 print('- {}'.format(pergunta.titulo)) 176 print(' tags: {}'.format(', '.join(pergunta.tags))) 177 178 print('') 179 180 # Com o método `item_frequencies()` podemos contas as repetições 181 # de items de listas em documentos de um queryset 182 popular_tags = await Pergunta.objects.item_frequencies('tags') 183 184 tags = sorted([(k, v) for k, v in popular_tags.items()], 185 key=lambda x: x[1], reverse=True) 186 most_popular = tags[0] 187 print('A tag mais popular é *{}* com {} perguntas'.format(most_popular[0], 188 most_popular[1])) 189 190 # Podemos filtar um queryset com base em um item de uma lista, no 191 # nosso exemplo, com uma tag 192 print('As perguntas de *{}* são:'.format(most_popular[0])) 193 async for pergunta in Pergunta.objects.filter(tags=most_popular[0]): 194 print('- {}'.format(pergunta.titulo)) 195 print(' tags: {}'.format(', '.join(pergunta.tags))) 196 197 # Note que para acessar uma referência é necessário o uso 198 # de await 199 usuario = await pergunta.usuario 200 print(' usuario: {}'.format(usuario.nome)) 201 202 print('') 203 204 205async def main(): 206 print('Populando o banco de dados...') 207 await populate_db() 208 print('Banco de dados populado!\n') 209 await stats() 210 211 212if __name__ == '__main__': 213 214 import asyncio 215 loop = asyncio.get_event_loop() 216 loop.run_until_complete(main())
O script pode ser baixado aqui.
Salve isto em um arquivo chamado mmso.py e num terminal execute:
$ python mmso.py
E voilà, eis a saída do nosso programa:
Populando o banco de dados... Banco de dados populado! Temos um total de 30 perguntas de 30 usuários diferentes O usuário com maior reputação é: *jww* com reputação 49941 As perguntas de *jww* são: - Undefined reference to symbol during link GCC inline assembly tags: c++, gcc, linker-errors, inline-assembly A tag mais popular é *python* com 5 perguntas As perguntas de *python* são: - BeautifulSoup4 findChildren() is empty tags: python, html, parsing, beautifulsoup usuario: Meghan M. - How to update weights in neural networks? tags: python, neural-network usuario: Absolute Idiot - assign results to dummy variable _ tags: python, python-3.x usuario: cs0815 - use opencv, cv2.videocapture in kivy with android - python for android tags: android, python, opencv, kivy, buildozer usuario: Vajira Prabuddhaka - How to get the value of a Django Model Field object tags: python, django, django-models usuario: Hugo Luis Villalobos Canto
Para informações mais detalhadas sobre o mongomotor, veja a documentação.
O mongomotor é software livre, sinta-se à vontade para contribuir. :)
Luciano em 19/08/2018 18:19 #
Deu um erro nessa linha:
url = pinfo['url']
pois mudou para
url = pinfo['link']
Juca Crispim em 19/08/2018 20:57 #
Valeu pelo toque, Luciano. Corrigi.