Skip to content

Commit 4f41b50

Browse files
authored
AAP-57817 Add Redis connection retry using redis-py 7.0+ built-in (#16176)
* AAP-57817 Add Redis connection retry using redis-py 7.0+ built-in mechanism
* Refactor Redis client helpers to use settings and eliminate code duplication
* Create awx/main/utils/redis.py and move Redis client functions to avoid circular imports
* Fix subsystem_metrics to share Redis connection pool between client and pipeline
* Cache Redis clients in RelayConsumer and RelayWebsocketStatsManager to avoid creating new connection pools on every call
* Add cap and base config
* Add Redis retry logic with exponential backoff to handle connection failures during long-running operations
* Add REDIS_BACKOFF_CAP and REDIS_BACKOFF_BASE settings to allow adjustment of retry timing in worst-case scenarios without code changes
* Simplify Redis retry tests by removing unnecessary reload logic
1 parent 0d86874 commit 4f41b50

File tree

17 files changed

+264
-24
lines changed

17 files changed

+264
-24
lines changed

awx/main/analytics/broadcast_websocket.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import datetime
22
import asyncio
33
import logging
4-
import redis
5-
import redis.asyncio
64
import re
75

86
from prometheus_client import (
@@ -15,6 +13,7 @@
1513
)
1614

1715
from django.conf import settings
16+
from awx.main.utils.redis import get_redis_client, get_redis_client_async
1817

1918

2019
BROADCAST_WEBSOCKET_REDIS_KEY_NAME = 'broadcast_websocket_stats'
@@ -66,6 +65,8 @@ def render(self, ts=None):
6665

6766

6867
class RelayWebsocketStatsManager:
68+
_redis_client = None # Cached Redis client for get_stats_sync()
69+
6970
def __init__(self, local_hostname):
7071
self._local_hostname = local_hostname
7172
self._stats = dict()
@@ -80,7 +81,7 @@ def delete_remote_host_stats(self, remote_hostname):
8081

8182
async def run_loop(self):
8283
try:
83-
redis_conn = await redis.asyncio.Redis.from_url(settings.BROKER_URL)
84+
redis_conn = get_redis_client_async()
8485
while True:
8586
stats_data_str = ''.join(stat.serialize() for stat in self._stats.values())
8687
await redis_conn.set(self._redis_key, stats_data_str)
@@ -103,8 +104,10 @@ def get_stats_sync(cls):
103104
"""
104105
Stringified version of all the stats
105106
"""
106-
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
107-
stats_str = redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b''
107+
# Reuse cached Redis client to avoid creating new connection pools on every call
108+
if cls._redis_client is None:
109+
cls._redis_client = get_redis_client()
110+
stats_str = cls._redis_client.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b''
108111
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
109112

110113

awx/main/analytics/subsystem_metrics.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from awx.main.consumers import emit_channel_notification
1616
from awx.main.utils import is_testing
17+
from awx.main.utils.redis import get_redis_client
1718

1819
root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
1920
logger = logging.getLogger('awx.main.analytics')
@@ -198,8 +199,8 @@ class Metrics(MetricsNamespace):
198199
def __init__(self, namespace, auto_pipe_execute=False, instance_name=None, metrics_have_changed=True, **kwargs):
199200
MetricsNamespace.__init__(self, namespace)
200201

201-
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
202-
self.conn = redis.Redis.from_url(settings.BROKER_URL)
202+
self.conn = get_redis_client()
203+
self.pipe = self.conn.pipeline()
203204
self.last_pipe_execute = time.time()
204205
# track if metrics have been modified since last saved to redis
205206
# start with True so that we get an initial save to redis

awx/main/consumers.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import time
44
import hmac
55
import asyncio
6-
import redis
76

87
from django.core.serializers.json import DjangoJSONEncoder
98
from django.conf import settings
@@ -14,6 +13,8 @@
1413
from channels.layers import get_channel_layer
1514
from channels.db import database_sync_to_async
1615

16+
from awx.main.utils.redis import get_redis_client_async
17+
1718
logger = logging.getLogger('awx.main.consumers')
1819
XRF_KEY = '_auth_user_xrf'
1920

@@ -94,6 +95,9 @@ async def connect(self):
9495
await self.channel_layer.group_add(settings.BROADCAST_WEBSOCKET_GROUP_NAME, self.channel_name)
9596
logger.info(f"client '{self.channel_name}' joined the broadcast group.")
9697

98+
# Initialize Redis client once for reuse across all message handling
99+
self._redis_conn = get_redis_client_async()
100+
97101
async def disconnect(self, code):
98102
logger.info(f"client '{self.channel_name}' disconnected from the broadcast group.")
99103
await self.channel_layer.group_discard(settings.BROADCAST_WEBSOCKET_GROUP_NAME, self.channel_name)
@@ -105,8 +109,9 @@ async def receive_json(self, data):
105109
(group, message) = unwrap_broadcast_msg(data)
106110
if group == "metrics":
107111
message = json.loads(message['text'])
108-
conn = redis.Redis.from_url(settings.BROKER_URL)
109-
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics'])
112+
await self._redis_conn.set(
113+
settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics']
114+
)
110115
else:
111116
await self.channel_layer.group_send(group, message)
112117

awx/main/dispatch/control.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
import uuid
33
import json
44

5-
from django.conf import settings
65
from django.db import connection
7-
import redis
86

97
from awx.main.dispatch import get_task_queuename
8+
from awx.main.utils.redis import get_redis_client
109

1110
from . import pg_bus_conn
1211

@@ -24,7 +23,7 @@ def __init__(self, service, host=None):
2423
self.queuename = host or get_task_queuename()
2524

2625
def status(self, *args, **kwargs):
27-
r = redis.Redis.from_url(settings.BROKER_URL)
26+
r = get_redis_client()
2827
if self.service == 'dispatcher':
2928
stats = r.get(f'awx_{self.service}_statistics') or b''
3029
return stats.decode('utf-8')

awx/main/dispatch/worker/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from ansible_base.lib.logging.runtime import log_excess_runtime
2121

22+
from awx.main.utils.redis import get_redis_client
2223
from awx.main.dispatch.pool import WorkerPool
2324
from awx.main.dispatch.periodic import Scheduler
2425
from awx.main.dispatch import pg_bus_conn
@@ -59,7 +60,7 @@ def __init__(self, name, worker, queues=[], pool=None):
5960
if pool is None:
6061
self.pool = WorkerPool()
6162
self.pool.init_workers(self.worker.work_loop)
62-
self.redis = redis.Redis.from_url(settings.BROKER_URL)
63+
self.redis = get_redis_client()
6364

6465
@property
6566
def listening_on(self):

awx/main/dispatch/worker/callback.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import redis
1717

18+
from awx.main.utils.redis import get_redis_client
1819
from awx.main.consumers import emit_channel_notification
1920
from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob
2021
from awx.main.constants import ACTIVE_STATES
@@ -72,7 +73,7 @@ class CallbackBrokerWorker(BaseWorker):
7273

7374
def __init__(self):
7475
self.buff = {}
75-
self.redis = redis.Redis.from_url(settings.BROKER_URL)
76+
self.redis = get_redis_client()
7677
self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False)
7778
self.queue_pop = 0
7879
self.queue_name = settings.CALLBACK_QUEUE

awx/main/models/ha.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
)
3434
from awx.main.models.unified_jobs import UnifiedJob
3535
from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity
36+
from awx.main.utils.redis import get_redis_client
3637
from awx.main.models.mixins import RelatedJobsMixin, ResourceMixin
3738
from awx.main.models.receptor_address import ReceptorAddress
3839

@@ -397,7 +398,7 @@ def local_health_check(self):
397398
try:
398399
# if redis is down for some reason, that means we can't persist
399400
# playbook event data; we should consider this a zero capacity event
400-
redis.Redis.from_url(settings.BROKER_URL).ping()
401+
get_redis_client().ping()
401402
except redis.ConnectionError:
402403
errors = _('Failed to connect to Redis')
403404

awx/main/queue.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
# Python
55
import json
66
import logging
7-
import redis
87

98
# Django
109
from django.conf import settings
1110

11+
# AWX
12+
from awx.main.utils.redis import get_redis_client
13+
1214
__all__ = ['CallbackQueueDispatcher']
1315

1416

@@ -26,7 +28,7 @@ class CallbackQueueDispatcher(object):
2628
def __init__(self):
2729
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
2830
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
29-
self.connection = redis.Redis.from_url(settings.BROKER_URL)
31+
self.connection = get_redis_client()
3032

3133
def dispatch(self, obj):
3234
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))

awx/main/routing.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from ansible_base.lib.channels.middleware import DrfAuthMiddlewareStack
1010

11+
from awx.main.utils.redis import get_redis_client
1112
from . import consumers
1213

1314

@@ -18,7 +19,7 @@
1819
class AWXProtocolTypeRouter(ProtocolTypeRouter):
1920
def __init__(self, *args, **kwargs):
2021
try:
21-
r = redis.Redis.from_url(settings.BROKER_URL)
22+
r = get_redis_client()
2223
for k in r.scan_iter('asgi:*', 500):
2324
logger.debug(f"cleaning up Redis key {k}")
2425
r.delete(k)

awx/main/tests/functional/conftest.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,34 @@ def swagger_autogen(requests=__SWAGGER_REQUESTS__):
7777

7878

7979
class FakeRedis:
80+
def __init__(self, *args, **kwargs):
81+
# Accept and ignore all arguments to match redis.Redis signature
82+
pass
83+
8084
def keys(self, *args, **kwargs):
8185
return []
8286

83-
def set(self):
87+
def set(self, *args, **kwargs):
8488
pass
8589

86-
def get(self):
90+
def get(self, *args, **kwargs):
91+
return None
92+
93+
def rpush(self, *args, **kwargs):
94+
return 1
95+
96+
def blpop(self, *args, **kwargs):
8797
return None
8898

99+
def delete(self, *args, **kwargs):
100+
pass
101+
102+
def llen(self, *args, **kwargs):
103+
return 0
104+
105+
def scan_iter(self, *args, **kwargs):
106+
return iter([])
107+
89108
@classmethod
90109
def from_url(cls, *args, **kwargs):
91110
return cls()

0 commit comments

Comments
 (0)