Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions awx/main/models/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,26 @@ def save(self, *args, **kwargs):

def delete(self, *args, **kwargs):
self._update_host_smart_inventory_memeberships()

# Explicitly clean up role assignments before deletion to prevent
# orphaned assignments. This ensures RoleUserAssignment and
# RoleTeamAssignment records are properly cascade deleted
try:
from ansible_base.rbac.models import ObjectRole
from ansible_base.rbac import permission_registry

ct = permission_registry.content_type_model.objects.get_for_model(self)
object_roles = ObjectRole.objects.filter(content_type=ct, object_id=self.pk)
deleted_roles_count = object_roles.count()
object_roles.delete()

if deleted_roles_count > 0:
logger.debug(f"Cleaned up {deleted_roles_count} ObjectRole records " f"for inventory {self.pk}")

except Exception as e:
# Log the error but don't prevent inventory deletion
logger.warning(f"Failed to clean up role assignments for inventory " f"{self.pk}: {e}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider this to be redundant with the existing signals.

But for the background task, that makes sense to me.


super(Inventory, self).delete(*args, **kwargs)

'''
Expand Down
20 changes: 19 additions & 1 deletion awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,25 @@ def delete_inventory(inventory_id, user_id, retries=5):
user = None
with ignore_inventory_computed_fields(), ignore_inventory_group_removal(), impersonate(user):
try:
Inventory.objects.get(id=inventory_id).delete()
inventory = Inventory.objects.get(id=inventory_id)

# Defense-in-depth: Explicitly clean up role assignments before
# deletion. This ensures cleanup happens even if model signals
# don't fire properly
try:
from ansible_base.rbac.models import ObjectRole
from ansible_base.rbac import permission_registry

ct = permission_registry.content_type_model.objects.get_for_model(inventory)
object_roles = ObjectRole.objects.filter(content_type=ct, object_id=inventory_id)
deleted_roles_count = object_roles.count()
if deleted_roles_count > 0:
object_roles.delete()
logger.debug('Async deletion: Cleaned up {} ObjectRole records ' 'for inventory {}'.format(deleted_roles_count, inventory_id))
except Exception as e:
logger.warning('Failed to clean up role assignments during async ' 'deletion of inventory {}: {}'.format(inventory_id, e))

inventory.delete()
emit_channel_notification('inventories-status_changed', {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'})
logger.debug('Deleted inventory {} as user {}.'.format(inventory_id, user_id))
except Inventory.DoesNotExist:
Expand Down
102 changes: 102 additions & 0 deletions awx/main/tests/functional/dab_rbac/test_inventory_role_assignments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright (c) 2024 Ansible, Inc.
# All Rights Reserved.

import pytest

from awx.main.models import Inventory, User
from awx.main.tasks.system import delete_inventory
from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment


@pytest.mark.django_db
def test_inventory_role_assignments_deleted_sync(inventory, alice, setup_managed_roles):
"""Test role assignments are properly cleaned up when inventory is deleted synchronously."""
# Get the managed Inventory Admin role definition
inv_rd = RoleDefinition.objects.get(name='Inventory Admin')

# Create a role assignment for the inventory
inv_rd.give_permission(alice, inventory)

# Verify assignment exists
assert RoleUserAssignment.objects.filter(user=alice, object_id=str(inventory.pk), role_definition=inv_rd).exists()

inventory_id = str(inventory.pk)

# Delete inventory synchronously
inventory.delete()

# Verify role assignment is cleaned up
assert not RoleUserAssignment.objects.filter(object_id=inventory_id, role_definition=inv_rd).exists()


@pytest.mark.django_db
def test_inventory_role_assignments_deleted_async(inventory, alice, setup_managed_roles):
"""Test role assignments are properly cleaned up via schedule_deletion."""
from unittest.mock import patch

# Get the managed Inventory Admin role definition
inv_rd = RoleDefinition.objects.get(name='Inventory Admin')

# Create a role assignment for the inventory
inv_rd.give_permission(alice, inventory)

inventory_id = inventory.pk
user_id = alice.pk

# Verify assignment exists before deletion
assert RoleUserAssignment.objects.filter(user=alice, object_id=str(inventory_id), role_definition=inv_rd).exists()

# Mock the WebSocket notification to avoid Redis dependency
with patch('awx.main.tasks.system.emit_channel_notification'):
# Call delete_inventory directly (simulating Celery task)
delete_inventory(inventory_id, user_id)

# Verify role assignment is cleaned up
assert not RoleUserAssignment.objects.filter(user_id=user_id, object_id=str(inventory_id), role_definition=inv_rd).exists()


@pytest.mark.django_db
def test_multiple_inventory_role_assignments_cleanup(inventory, alice, bob, setup_managed_roles):
"""Test multiple role assignments are cleaned up when inventory is deleted."""
# Get managed role definitions
inv_rd = RoleDefinition.objects.get(name='Inventory Admin')
use_rd = RoleDefinition.objects.get(name='Inventory Use')

# Create multiple role assignments
inv_rd.give_permission(alice, inventory)
use_rd.give_permission(bob, inventory)

inventory_id = str(inventory.pk)

# Verify assignments exist
assert RoleUserAssignment.objects.filter(object_id=inventory_id).count() == 2

# Delete inventory
inventory.delete()

# Verify all assignments are cleaned up
assert RoleUserAssignment.objects.filter(object_id=inventory_id).count() == 0


@pytest.mark.django_db
def test_aap_52518_reproduction(organization, setup_managed_roles):
"""Test the exact scenario described in AAP-52518."""
# Create user and inventory per Jira scenario
user = User.objects.create(username='testuser', email='[email protected]', is_active=True)
inventory = Inventory.objects.create(name='Test Controller Inventory', organization=organization, description='Test inventory for AAP-52518')

# Step 3: Create role user assignment for user to become Inventory Admin
inv_admin_role = RoleDefinition.objects.get(name='Inventory Admin')
inv_admin_role.give_permission(user, inventory)

# Verify assignment was created
assert RoleUserAssignment.objects.filter(user=user, object_id=str(inventory.pk), role_definition=inv_admin_role).exists()

inventory_id = inventory.pk
user_id = user.pk

# Step 4: Delete the inventory
inventory.delete()

# Step 5: Verify no orphaned role assignments remain
assert RoleUserAssignment.objects.filter(user_id=user_id, object_id=str(inventory_id)).count() == 0
Loading