
plugins_controller

plugins.plugins_controller.get_zipcode_from_plugins(zipcode) async

Calls all plugins asynchronously at the same time. The first task that completes successfully provides the result, and the remaining tasks are cancelled.
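The "first successful task wins" behaviour described above can be sketched in isolation. This is only a minimal illustration, not the project's implementation: `first_successful` and the three provider coroutines are hypothetical names standing in for the real plugins, and the fallback value mirrors the one used in the source listing.

```python
import asyncio


async def first_successful(coros, fallback):
    """Return the first result that does not raise; cancel whatever is still pending."""
    tasks = [asyncio.create_task(c) for c in coros]
    try:
        for next_done in asyncio.as_completed(tasks):
            try:
                return await next_done
            except Exception:
                continue  # this provider failed; wait for the next one to finish
        return fallback  # every provider failed (or none was given)
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


# Hypothetical providers standing in for the real plugins.
async def slow_provider(zipcode):
    await asyncio.sleep(0.2)
    return {'data': [zipcode], 'provider': 'slow'}


async def fast_provider(zipcode):
    await asyncio.sleep(0.01)
    return {'data': [zipcode], 'provider': 'fast'}


async def failing_provider(zipcode):
    raise RuntimeError('provider unavailable')


def demo(zipcode=1001000):
    fallback = {'data': [], 'provider': 'Plugins'}
    return asyncio.run(first_successful(
        [failing_provider(zipcode), slow_provider(zipcode), fast_provider(zipcode)],
        fallback,
    ))
```

Calling `demo()` skips the failing provider, returns the answer from the faster of the two remaining ones, and cancels the slower one instead of letting it run to completion.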

Parameters:

Name      Type          Description                                                 Default
zipcode   PositiveInt   Zip code used to look up the address in the provider APIs   required

Returns:

Name           Type           Description
DictResponse   DictResponse   'data' key has all addresses (db model) based on filter, or an empty list; 'provider' key has the service provider plugin
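As a rough illustration of how a caller might consume this return value, the sketch below models the two documented keys with a `TypedDict`. `DictResponseSketch` and `summarize` are hypothetical names, not part of the project, and the actual `DictResponse` type may be defined differently.

```python
from typing import Any, List, TypedDict


class DictResponseSketch(TypedDict):
    """Hypothetical stand-in for the project's DictResponse type."""
    data: List[Any]  # addresses found, or an empty list
    provider: str    # name of the plugin that answered


def summarize(response: DictResponseSketch) -> str:
    """Turn a response into a one-line, human-readable summary."""
    if not response['data']:
        return 'no address found (provider: {})'.format(response['provider'])
    return '{} address(es) from {}'.format(len(response['data']), response['provider'])
```

An empty `data` list is the documented "nothing found" case, so callers can branch on it without needing to catch an exception.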

Todo

Create all tasks based on config
Add logs
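One possible direction for these two items, offered only as a sketch under assumptions: a registry maps config names to plugin classes, and the standard `logging` module records plugins that are unknown or fail to initialise. The registry keys, the `Fake*` classes, and `build_services` are all hypothetical and do not exist in the project.

```python
import logging

logger = logging.getLogger('plugins_controller')


# Hypothetical plugin classes used only for this illustration.
class FakeViaCep:
    pass


class FakeCepAberto:
    pass


class BrokenPlugin:
    def __init__(self):
        raise RuntimeError('missing API token')


# Hypothetical registry: config name -> plugin class.
REGISTRY = {
    'via_cep': FakeViaCep,
    'cep_aberto': FakeCepAberto,
    'broken': BrokenPlugin,
}


def build_services(enabled, registry=REGISTRY):
    """Instantiate the plugins named in `enabled`, logging the ones that are skipped."""
    instances = []
    for name in enabled:
        plugin_cls = registry.get(name)
        if plugin_cls is None:
            logger.warning('unknown plugin %r in config, skipping', name)
            continue
        try:
            instances.append(plugin_cls())
        except Exception:
            # logger.exception also records the traceback
            logger.exception('could not initialise plugin %r', name)
    return instances
```

With this shape, the list of enabled plugins could come from a settings file or environment variable, and the hard-coded class list in the function body would no longer be needed.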

Source code in plugins/plugins_controller.py
# asyncio helpers used below; project imports (CepAberto, ViaCep, PositiveInt, DictResponse) are not shown
from asyncio import as_completed, create_task


async def get_zipcode_from_plugins(
	zipcode: PositiveInt,
) -> DictResponse:
	"""
	Async call to all plugins at the same time.
	The first task that completes successfully provides the result
	and the remaining tasks are cancelled.

	Args:
			zipcode (PositiveInt): zip code used to look up the address in the provider APIs

	Returns:
			DictResponse: 'data' key has all addresses
					(db model) based on filter or empty list;
					'provider' key has the service provider plugin

	Todo:
			Create all tasks based on config
			Add logs

	"""
	# Fallback returned when no plugin could be started or every plugin fails.
	result = {'data': [], 'provider': 'Plugins'}

	tasks = []
	for service in [CepAberto, ViaCep]:
		try:
			service_instance = service()
			tasks.append(create_task(service_instance.get_address_by_zipcode(zipcode)))
		except Exception:
			# async insert logs
			...

	for task in as_completed(tasks):
		try:
			result = await task
			break
		except Exception as e:
			# this plugin failed or found no address; wait for the next one
			print('Error:', e)
			# async insert logs

	# Cancel whatever is still running; cancel() is a no-op on finished tasks.
	for pending in tasks:
		pending.cancel()

	return result