9 changes: 3 additions & 6 deletions .github/workflows/build_docker_images.yml
@@ -31,11 +31,7 @@ on:
push:
branches:
- main
- keycloak-realm-with-groups
- fail-fast-off
- newui-all-in
- rebuild-v1.0.0
- support-using-subdir-of-bpmn-spec-repo
- api-logs
tags: [v*]

jobs:
@@ -159,4 +155,5 @@ jobs:
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Adding markdown
run: echo 'TAGS ${{ steps.meta.outputs.tags }}' >> "$GITHUB_STEP_SUMMARY"
run: echo 'TAGS ${{ steps.meta.outputs.tags }}' >> "$GITHUB_STEP_SUMMARY"

@@ -0,0 +1,55 @@
"""empty message

Revision ID: 4efc3d8655be
Revises: e102044e51f8
Create Date: 2025-12-03 10:30:02.738210

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '4efc3d8655be'
down_revision = 'e102044e51f8'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('api_log',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('endpoint', sa.String(length=255), nullable=True),
sa.Column('method', sa.String(length=10), nullable=True),
sa.Column('request_body', sa.JSON(), nullable=True),
sa.Column('response_body', sa.JSON(), nullable=True),
sa.Column('status_code', sa.Integer(), nullable=True),
sa.Column('process_instance_id', sa.Integer(), nullable=True),
sa.Column('duration_ms', sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('api_log', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_api_log_created_at'), ['created_at'], unique=False)
batch_op.create_index(batch_op.f('ix_api_log_duration_ms'), ['duration_ms'], unique=False)
batch_op.create_index(batch_op.f('ix_api_log_endpoint'), ['endpoint'], unique=False)
batch_op.create_index(batch_op.f('ix_api_log_method'), ['method'], unique=False)
batch_op.create_index(batch_op.f('ix_api_log_process_instance_id'), ['process_instance_id'], unique=False)
batch_op.create_index(batch_op.f('ix_api_log_status_code'), ['status_code'], unique=False)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('api_log', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_api_log_status_code'))
batch_op.drop_index(batch_op.f('ix_api_log_process_instance_id'))
batch_op.drop_index(batch_op.f('ix_api_log_method'))
batch_op.drop_index(batch_op.f('ix_api_log_endpoint'))
batch_op.drop_index(batch_op.f('ix_api_log_duration_ms'))
batch_op.drop_index(batch_op.f('ix_api_log_created_at'))

op.drop_table('api_log')
# ### end Alembic commands ###
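
A quick way to confirm the migration landed is to inspect the schema; a minimal sketch, assuming an active Flask application context (for example a test fixture or flask shell) and SQLAlchemy 1.4+:

# minimal sketch: confirm the api_log table and its indexes exist after "flask db upgrade"
from sqlalchemy import inspect

from spiffworkflow_backend.models.db import db

inspector = inspect(db.engine)
assert inspector.has_table("api_log")
print(sorted(index["name"] for index in inspector.get_indexes("api_log")))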
2 changes: 2 additions & 0 deletions spiffworkflow-backend/src/spiffworkflow_backend/__init__.py
@@ -22,6 +22,7 @@
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.monitoring_service import configure_sentry
from spiffworkflow_backend.services.monitoring_service import setup_prometheus_metrics
from spiffworkflow_backend.utils.api_logging import setup_deferred_logging

# This commented out code is if you want to use the pymysql library with sqlalchemy rather than mysqlclient.
# mysqlclient can be hard to install when running non-docker local dev, but it is generally worth it because it is much faster.
@@ -78,6 +79,7 @@ def create_app() -> FlaskApp:
setup_config(app)
db.init_app(app)
migrate.init_app(app, db)
setup_deferred_logging(app)

connexion_app.add_error_handler(Exception, partial(handle_exception, app))

@@ -185,6 +185,7 @@ def config_from_env(variable_name: str, *, default: str | bool | int | None = No
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE", default="spiffworkflow.org")
config_from_env("SPIFFWORKFLOW_BACKEND_API_LOGGING_ENABLED", default=False)

### permissions
config_from_env("SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH")
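
The flag defaults to off, so the decorator introduced below is a no-op unless it is explicitly enabled. A minimal sketch of switching it on in a test, which works because the decorator only reads current_app.config at call time (setting the environment variable before startup is the equivalent path):

# minimal sketch: enable API request logging for a single test; the app fixture is an assumption
app.config["SPIFFWORKFLOW_BACKEND_API_LOGGING_ENABLED"] = True

# or, before the server starts, via the environment (read by config_from_env above):
# export SPIFFWORKFLOW_BACKEND_API_LOGGING_ENABLED=true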
@@ -66,6 +66,9 @@ def __str__(self) -> str:
msg += f"In file {self.file_name}. "
return msg

def to_dict(self) -> dict[str, Any]:
return self.serialized()

def serialized(self) -> dict[str, Any]:
initial_dict = self.__dict__
return_dict = {}
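
The to_dict alias exists so the logging decorator added in api_logging.py below can serialize error responses without importing ApiError: it probes the raised exception with hasattr. A self-contained sketch of that duck-typed pattern (FakeApiError is a stand-in for illustration only):

class FakeApiError(Exception):  # stand-in for ApiError, for illustration only
    status_code = 404

    def to_dict(self) -> dict:
        return {"error_code": "not_found", "message": str(self)}


try:
    raise FakeApiError("no such process model")
except Exception as error:
    status_code = getattr(error, "status_code", 500)
    response_body = error.to_dict() if hasattr(error, "to_dict") else None
    print(status_code, response_body)  # 404 {'error_code': 'not_found', ...}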
@@ -118,5 +118,6 @@
FeatureFlagModel,
) # noqa: F401
from spiffworkflow_backend.models.process_caller_relationship import ProcessCallerRelationshipModel # noqa: F401
from spiffworkflow_backend.models.api_log_model import APILogModel # noqa: F401

add_listeners()
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from datetime import datetime
from datetime import timezone

from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db


@dataclass
class APILogModel(SpiffworkflowBaseDBModel):
__tablename__ = "api_log"

id: int = db.Column(db.Integer, primary_key=True)
created_at: datetime = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), index=True)
endpoint: str = db.Column(db.String(255), index=True)
method: str = db.Column(db.String(10), index=True)
request_body: dict | None = db.Column(db.JSON)
response_body: dict | None = db.Column(db.JSON)
status_code: int = db.Column(db.Integer, index=True)
duration_ms: int = db.Column(db.Integer, index=True)

# not a foreign key, so the log row can be created and kept regardless of the process instance's state or whether it still exists
process_instance_id: int | None = db.Column(db.Integer, nullable=True, index=True)
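
For reference, the model can be exercised directly; a minimal sketch assuming an active app context, the migration above applied, and illustrative values:

from spiffworkflow_backend.models.api_log_model import APILogModel
from spiffworkflow_backend.models.db import db

# illustrative values only; created_at fills in via the model's default
entry = APILogModel(
    endpoint="/v1.0/messages/example_message",
    method="POST",
    request_body={"payload": {"sure": "yes"}},
    response_body={"process_instance": {"id": 42}},
    status_code=200,
    process_instance_id=42,
    duration_ms=87,
)
db.session.add(entry)
db.session.commit()

# the indexed columns keep queries like "slowest calls" cheap
slow_calls = APILogModel.query.filter(APILogModel.duration_ms > 1000).count()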
@@ -13,6 +13,7 @@
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.upsearch_service import UpsearchService
from spiffworkflow_backend.utils.api_logging import log_api_interaction


def message_model_list_from_root() -> flask.wrappers.Response:
@@ -90,6 +91,7 @@ def message_instance_list(
# -H 'authorization: Bearer [FIXME]' \
# -H 'content-type: application/json' \
# --data-raw '{"payload":{"sure": "yes", "food": "spicy"}}'
@log_api_interaction
def message_send(
modified_message_name: str,
body: dict[str, Any],
@@ -57,6 +57,7 @@
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.utils.api_logging import log_api_interaction


def process_instance_create(
@@ -74,14 +75,20 @@ def process_instance_run_deprecated(
force_run: bool = False,
execution_mode: str | None = None,
) -> flask.wrappers.Response:
return process_instance_run(
modified_process_model_identifier=modified_process_model_identifier,
process_instance_id=process_instance_id,
force_run=force_run,
execution_mode=execution_mode,
# local import; cast() is only needed because log_api_interaction returns a bare
# Callable, which erases process_instance_run's Response return type for mypy
from typing import cast

return cast(
flask.wrappers.Response,
process_instance_run(
modified_process_model_identifier=modified_process_model_identifier,
process_instance_id=process_instance_id,
force_run=force_run,
execution_mode=execution_mode,
),
)


@log_api_interaction
def process_instance_run(
modified_process_model_identifier: str,
process_instance_id: int,
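
The cast() is needed because log_api_interaction is typed Callable -> Callable, which hides the decorated function's return type from mypy. An alternative worth considering (not part of this PR) is a signature-preserving decorator; a sketch assuming Python 3.10+:

import functools
from collections.abc import Callable
from typing import ParamSpec
from typing import TypeVar

P = ParamSpec("P")
R = TypeVar("R")


def log_api_interaction(func: Callable[P, R]) -> Callable[P, R]:
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        # ... timing and logging exactly as in api_logging.py ...
        return func(*args, **kwargs)

    return wrapper

With this typing, process_instance_run keeps its flask.wrappers.Response return type at the call site and the cast in the deprecated wrapper could be dropped.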
169 changes: 169 additions & 0 deletions spiffworkflow-backend/src/spiffworkflow_backend/utils/api_logging.py
@@ -0,0 +1,169 @@
import functools
import logging
import time
from collections.abc import Callable
from queue import Empty
from queue import Queue
from typing import Any

from flask import Flask
from flask import Response
from flask import current_app
from flask import g
from flask import has_request_context
from flask import request

from spiffworkflow_backend.models.api_log_model import APILogModel
from spiffworkflow_backend.models.db import db

logger = logging.getLogger(__name__)

# Thread-safe queue for deferred API log entries
_log_queue: Queue[APILogModel] = Queue()


def _process_log_queue() -> None:
"""Process queued API log entries in batches."""
entries_to_commit = []

# Collect all pending entries
while not _log_queue.empty():
try:
entry = _log_queue.get_nowait()
entries_to_commit.append(entry)
except Empty:
break

if entries_to_commit:
try:
# Use a new session to avoid transaction conflicts
for entry in entries_to_commit:
db.session.add(entry)
db.session.commit()
logger.debug(f"Committed {len(entries_to_commit)} API log entries")
except Exception as e:
db.session.rollback()
logger.error(f"Failed to commit API log entries: {e}")


def _queue_api_log_entry(log_entry: APILogModel) -> None:
"""Queue an API log entry for deferred processing."""
_log_queue.put(log_entry)

# Try to mark that we have pending logs for this request context
if has_request_context(): # type: ignore[no-untyped-call]
try:
g.has_pending_api_logs = True
except RuntimeError:
# Fallback: process immediately if we can't set the flag
logger.debug("Processing API log immediately - couldn't set g flag")
_process_log_queue()
else:
# Outside Flask request context (like in tests), process logs immediately
logger.debug("Processing API log immediately - outside Flask request context")
_process_log_queue()


def setup_deferred_logging(app: Flask) -> None:
"""Set up teardown handlers for deferred API logging."""

@app.teardown_appcontext
def process_pending_logs(error: Exception | None) -> None:
"""Process any pending API logs after the request context ends."""
if hasattr(g, "has_pending_api_logs") and g.has_pending_api_logs:
_process_log_queue()


def log_api_interaction(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Check if API logging is enabled
if not current_app.config.get("SPIFFWORKFLOW_BACKEND_API_LOGGING_ENABLED", False):
return func(*args, **kwargs)

start_time = time.time()

# Capture request details
endpoint = ""
method = ""
request_body = None

if request:
try:
endpoint = request.path
method = request.method
request_body = request.get_json(silent=True)
except RuntimeError:
# Working outside of request context
pass
except Exception:
request_body = None

status_code = 500
response_body = None
process_instance_id = None
response = None

try:
response = func(*args, **kwargs)
except Exception as e:
# If an exception occurs, we try to get the status code from it if it's an API error
if hasattr(e, "status_code"):
status_code = e.status_code
if hasattr(e, "to_dict"):
response_body = e.to_dict()
raise
finally:
duration_ms = int((time.time() - start_time) * 1000)

if response:
if isinstance(response, Response):
status_code = response.status_code
try:
if response.is_json:
response_body = response.get_json()
except Exception:
# We might fail to parse JSON if the response is not actually JSON
# despite the header, or some other issue. We'll just skip the body.
logger.warning("Failed to parse response body as JSON", exc_info=True)
elif isinstance(response, tuple) and len(response) >= 2:
# Handle tuple responses like (data, status_code) or (data, status_code, headers)
response_body = response[0]
if isinstance(response[1], int):
status_code = response[1]
else:
# Handle other response types - assume it is the response body.
# Plain (non-Response, non-tuple) returns are sent back with a 200 by default,
# so do not leave the 500 placeholder in place when the call succeeded.
response_body = response
status_code = 200

# Extract process_instance_id if available in response
if response_body and isinstance(response_body, dict):
if "process_instance" in response_body and isinstance(response_body["process_instance"], dict):
process_instance_id = response_body["process_instance"].get("id")
elif "id" in response_body:
# Sometimes the response IS the process instance
process_instance_id = response_body.get("id")

# If not found in response, check if it was in the args (e.g. for run)
if not process_instance_id:
if "process_instance_id" in kwargs:
process_instance_id = kwargs["process_instance_id"]

log_entry = APILogModel(
endpoint=endpoint,
method=method,
request_body=request_body,
response_body=response_body,
status_code=status_code,
process_instance_id=process_instance_id,
duration_ms=duration_ms,
)
# Queue the log entry for deferred processing instead of immediate commit
_queue_api_log_entry(log_entry)

return response

return wrapper
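
Tying the pieces together, a test-style sketch of the full flow (decorator, queue, teardown flush); the client and auth fixture names and the URL are assumptions, while the config key, model, and teardown hook are the ones added in this PR:

from spiffworkflow_backend.models.api_log_model import APILogModel


def test_message_send_is_logged(app, client, auth_headers) -> None:
    app.config["SPIFFWORKFLOW_BACKEND_API_LOGGING_ENABLED"] = True

    client.post(
        "/v1.0/messages/example_message",
        json={"payload": {"sure": "yes"}},
        headers=auth_headers,
    )

    # the teardown_appcontext hook registered by setup_deferred_logging has already
    # flushed the queue by the time the test client call returns
    with app.app_context():
        entry = APILogModel.query.order_by(APILogModel.id.desc()).first()
        assert entry is not None
        assert entry.method == "POST"
        assert entry.status_code is not None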