Skip to content

Commit 2fa2cd8

Browse files
authored
Add timeout and on_duplicate to system tasks (#16169)
Modify the invocation of @task_awx to accept `timeout` and `on_duplicate` keyword arguments. These arguments are only used in the new dispatcher implementation. Add the decorator parameters `timeout` and `on_duplicate` to tasks to ensure better recovery for stuck or long-running processes. --------- Signed-off-by: Seth Foster <[email protected]>
1 parent f818595 commit 2fa2cd8

File tree

6 files changed

+44
-46
lines changed

6 files changed

+44
-46
lines changed

awx/main/analytics/analytics_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
logger = logging.getLogger('awx.main.scheduler')
1010

1111

12-
@task_awx(queue=get_task_queuename)
12+
@task_awx(queue=get_task_queuename, timeout=300, on_duplicate='discard')
1313
def send_subsystem_metrics():
1414
DispatcherMetrics().send_metrics()
1515
CallbackReceiverMetrics().send_metrics()

awx/main/dispatch/publish.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from uuid import uuid4
66

77
from dispatcherd.publish import submit_task
8+
from dispatcherd.processors.blocker import Blocker
89
from dispatcherd.utils import resolve_callable
910

1011
from django_guid import get_guid
@@ -60,13 +61,17 @@ def print_time(dispatch_time=None):
6061
print(f"Time I was dispatched: {dispatch_time}")
6162
"""
6263

63-
def __init__(self, queue=None, bind_kwargs=None):
64+
def __init__(self, queue=None, bind_kwargs=None, timeout=None, on_duplicate=None):
6465
self.queue = queue
6566
self.bind_kwargs = bind_kwargs
67+
self.timeout = timeout
68+
self.on_duplicate = on_duplicate
6669

6770
def __call__(self, fn=None):
6871
queue = self.queue
6972
bind_kwargs = self.bind_kwargs
73+
timeout = self.timeout
74+
on_duplicate = self.on_duplicate
7075

7176
class PublisherMixin(object):
7277
queue = None
@@ -102,7 +107,19 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
102107
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
103108
# At this point we have the import string, and submit_task wants the method, so back to that
104109
actual_task = resolve_callable(cls.name)
105-
return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw)
110+
processor_options = ()
111+
if on_duplicate is not None:
112+
processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
113+
return submit_task(
114+
actual_task,
115+
args=args,
116+
kwargs=kwargs,
117+
queue=queue,
118+
uuid=uuid,
119+
timeout=timeout,
120+
processor_options=processor_options,
121+
**kw,
122+
)
106123
except Exception:
107124
logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
108125
# Continue with original implementation if anything fails

awx/main/tasks/host_indirect.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def cleanup_old_indirect_host_entries() -> None:
159159
IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()
160160

161161

162-
@task(queue=get_task_queuename)
162+
@task(queue=get_task_queuename, timeout=3600 * 5)
163163
def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
164164
try:
165165
job = Job.objects.get(id=job_id)
@@ -201,7 +201,7 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non
201201
logger.exception(f'Error processing indirect host data for job_id={job_id}')
202202

203203

204-
@task(queue=get_task_queuename)
204+
@task(queue=get_task_queuename, timeout=3600 * 5)
205205
def cleanup_and_save_indirect_host_entries_fallback() -> None:
206206
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
207207
return

awx/main/tasks/receptor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ def reload_receptor():
852852
raise RuntimeError("Receptor reload failed")
853853

854854

855-
@task_awx()
855+
@task_awx(on_duplicate='queue_one')
856856
def write_receptor_config():
857857
"""
858858
This task runs async on each control node, K8S only.
@@ -875,7 +875,7 @@ def write_receptor_config():
875875
reload_receptor()
876876

877877

878-
@task_awx(queue=get_task_queuename)
878+
@task_awx(queue=get_task_queuename, on_duplicate='discard')
879879
def remove_deprovisioned_node(hostname):
880880
InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
881881
InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)

awx/main/tasks/system.py

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ def inform_cluster_of_shutdown():
184184
logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname)
185185

186186

187-
@task_awx(queue=get_task_queuename)
187+
@task_awx(queue=get_task_queuename, timeout=3600 * 5)
188188
def migrate_jsonfield(table, pkfield, columns):
189189
batchsize = 10000
190190
with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
@@ -230,7 +230,7 @@ def migrate_jsonfield(table, pkfield, columns):
230230
logger.warning(f"Migration of {table} to jsonb is finished.")
231231

232232

233-
@task_awx(queue=get_task_queuename)
233+
@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
234234
def apply_cluster_membership_policies():
235235
from awx.main.signals import disable_activity_stream
236236

@@ -342,7 +342,7 @@ def apply_cluster_membership_policies():
342342
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
343343

344344

345-
@task_awx(queue='tower_settings_change')
345+
@task_awx(queue='tower_settings_change', timeout=600)
346346
def clear_setting_cache(setting_keys):
347347
# log that cache is being cleared
348348
logger.info(f"clear_setting_cache of keys {setting_keys}")
@@ -355,7 +355,7 @@ def clear_setting_cache(setting_keys):
355355
cache.delete_many(cache_keys)
356356

357357

358-
@task_awx(queue='tower_broadcast_all')
358+
@task_awx(queue='tower_broadcast_all', timeout=600)
359359
def delete_project_files(project_path):
360360
# TODO: possibly implement some retry logic
361361
lock_file = project_path + '.lock'
@@ -383,7 +383,7 @@ def profile_sql(threshold=1, minutes=1):
383383
logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
384384

385385

386-
@task_awx(queue=get_task_queuename)
386+
@task_awx(queue=get_task_queuename, timeout=1800)
387387
def send_notifications(notification_list, job_id=None):
388388
if not isinstance(notification_list, list):
389389
raise TypeError("notification_list should be of type list")
@@ -428,13 +428,13 @@ def events_processed_hook(unified_job):
428428
save_indirect_host_entries.delay(unified_job.id)
429429

430430

431-
@task_awx(queue=get_task_queuename)
431+
@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
432432
def gather_analytics():
433433
if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
434434
analytics.gather()
435435

436436

437-
@task_awx(queue=get_task_queuename)
437+
@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
438438
def purge_old_stdout_files():
439439
nowtime = time.time()
440440
for f in os.listdir(settings.JOBOUTPUT_ROOT):
@@ -496,37 +496,18 @@ def run(cls, **kwargs):
496496
cls.run_remote(this_inst, **kwargs)
497497

498498

499-
@task_awx(queue='tower_broadcast_all')
499+
@task_awx(queue='tower_broadcast_all', timeout=3600)
500500
def handle_removed_image(remove_images=None):
501501
"""Special broadcast invocation of this method to handle case of deleted EE"""
502502
CleanupImagesAndFiles.run(remove_images=remove_images, file_pattern='')
503503

504504

505-
@task_awx(queue=get_task_queuename)
505+
@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
506506
def cleanup_images_and_files():
507507
CleanupImagesAndFiles.run(image_prune=True)
508508

509509

510-
@task_awx(queue=get_task_queuename)
511-
def cluster_node_health_check(node):
512-
"""
513-
Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
514-
"""
515-
if node == '':
516-
logger.warning('Local health check incorrectly called with blank string')
517-
return
518-
elif node != settings.CLUSTER_HOST_ID:
519-
logger.warning(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
520-
return
521-
try:
522-
this_inst = Instance.objects.me()
523-
except Instance.DoesNotExist:
524-
logger.warning(f'Instance record for {node} missing, could not check capacity.')
525-
return
526-
this_inst.local_health_check()
527-
528-
529-
@task_awx(queue=get_task_queuename)
510+
@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
530511
def execution_node_health_check(node):
531512
if node == '':
532513
logger.warning('Remote health check incorrectly called with blank string')
@@ -850,7 +831,7 @@ def _heartbeat_handle_lost_instances(lost_instances, this_inst):
850831
logger.exception('No SQL state available. Error marking {} as lost'.format(other_inst.hostname))
851832

852833

853-
@task_awx(queue=get_task_queuename)
834+
@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
854835
def awx_receptor_workunit_reaper():
855836
"""
856837
When an AWX job is launched via receptor, files such as status, stdin, and stdout are created
@@ -896,7 +877,7 @@ def awx_receptor_workunit_reaper():
896877
administrative_workunit_reaper(receptor_work_list)
897878

898879

899-
@task_awx(queue=get_task_queuename)
880+
@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
900881
def awx_k8s_reaper():
901882
if not settings.RECEPTOR_RELEASE_WORK:
902883
return
@@ -919,7 +900,7 @@ def awx_k8s_reaper():
919900
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
920901

921902

922-
@task_awx(queue=get_task_queuename)
903+
@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
923904
def awx_periodic_scheduler():
924905
lock_session_timeout_milliseconds = settings.TASK_MANAGER_LOCK_TIMEOUT * 1000
925906
with advisory_lock('awx_periodic_scheduler_lock', lock_session_timeout_milliseconds=lock_session_timeout_milliseconds, wait=False) as acquired:
@@ -978,7 +959,7 @@ def awx_periodic_scheduler():
978959
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
979960

980961

981-
@task_awx(queue=get_task_queuename)
962+
@task_awx(queue=get_task_queuename, timeout=3600)
982963
def handle_failure_notifications(task_ids):
983964
"""A task-ified version of the method that sends notifications."""
984965
found_task_ids = set()
@@ -993,7 +974,7 @@ def handle_failure_notifications(task_ids):
993974
logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')
994975

995976

996-
@task_awx(queue=get_task_queuename)
977+
@task_awx(queue=get_task_queuename, timeout=3600 * 5)
997978
def update_inventory_computed_fields(inventory_id):
998979
"""
999980
Signal handler and wrapper around inventory.update_computed_fields to
@@ -1043,7 +1024,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
10431024
return False
10441025

10451026

1046-
@task_awx(queue=get_task_queuename)
1027+
@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
10471028
def update_host_smart_inventory_memberships():
10481029
smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
10491030
changed_inventories = set([])
@@ -1059,7 +1040,7 @@ def update_host_smart_inventory_memberships():
10591040
smart_inventory.update_computed_fields()
10601041

10611042

1062-
@task_awx(queue=get_task_queuename)
1043+
@task_awx(queue=get_task_queuename, timeout=3600 * 5)
10631044
def delete_inventory(inventory_id, user_id, retries=5):
10641045
# Delete inventory as user
10651046
if user_id is None:
@@ -1121,7 +1102,7 @@ def _reconstruct_relationships(copy_mapping):
11211102
new_obj.save()
11221103

11231104

1124-
@task_awx(queue=get_task_queuename)
1105+
@task_awx(queue=get_task_queuename, timeout=600)
11251106
def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None):
11261107
logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
11271108

@@ -1176,7 +1157,7 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, p
11761157
update_inventory_computed_fields.delay(new_obj.id)
11771158

11781159

1179-
@task_awx(queue=get_task_queuename)
1160+
@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='discard')
11801161
def periodic_resource_sync():
11811162
if not getattr(settings, 'RESOURCE_SERVER', None):
11821163
logger.debug("Skipping periodic resource_sync, RESOURCE_SERVER not configured")

awx/main/utils/external_logging.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def escape_quotes(x):
139139
return tmpl
140140

141141

142-
@task_awx(queue='rsyslog_configurer')
142+
@task_awx(queue='rsyslog_configurer', timeout=600, on_duplicate='queue_one')
143143
def reconfigure_rsyslog():
144144
tmpl = construct_rsyslog_conf_template()
145145
# Write config to a temp file then move it to preserve atomicity

Comments: 0 commit comments