Skip to content

Commit d2911ea

Browse files
committed
refactor: Improve error handling and response validation in config fetching and log sending
1 parent a3fe149 commit d2911ea

File tree

2 files changed

+70
-25
lines changed

2 files changed

+70
-25
lines changed

src/worker/config.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ pub async fn fetch_config(
133133
.send()
134134
.await?;
135135

136-
match response.status() {
136+
let status = response.status();
137+
match status {
137138
StatusCode::OK => {
138139
// Check if response is gzipped by looking at Content-Encoding header first
139140
let content_encoding = response.headers()
@@ -176,15 +177,44 @@ pub async fn fetch_config(
176177
.map_err(|e| format!("Response contains invalid UTF-8: {}", e))?
177178
};
178179

179-
let body: ConfigApiResponse = serde_json::from_str(&json_text)
180-
.map_err(|e| format!("Failed to parse JSON response: {}", e))?;
180+
// Check if response body is empty
181+
let json_text = json_text.trim();
182+
if json_text.is_empty() {
183+
return Err("API returned empty response body".into());
184+
}
185+
186+
let body: ConfigApiResponse = serde_json::from_str(json_text)
187+
.map_err(|e| {
188+
let preview = if json_text.len() > 200 {
189+
format!("{}...", &json_text[..200])
190+
} else {
191+
json_text.to_string()
192+
};
193+
format!("Failed to parse JSON response: {}. Response preview: {}", e, preview)
194+
})?;
181195
// Update global config snapshot
182196
set_global_config(body.config.clone());
183197
Ok(body)
184198
}
185199
StatusCode::BAD_REQUEST | StatusCode::NOT_FOUND | StatusCode::INTERNAL_SERVER_ERROR => {
186-
let body: ErrorResponse = serde_json::from_str(&response.text().await?)?;
187-
Err(format!("API Error: {}", body.error).into())
200+
let response_text = response.text().await?;
201+
let trimmed = response_text.trim();
202+
let status_code = status.as_u16();
203+
if trimmed.is_empty() {
204+
return Err(format!("API returned empty response body with status {}", status_code).into());
205+
}
206+
match serde_json::from_str::<ErrorResponse>(trimmed) {
207+
Ok(body) => Err(format!("API Error: {}", body.error).into()),
208+
Err(e) => {
209+
let preview = if trimmed.len() > 200 {
210+
format!("{}...", &trimmed[..200])
211+
} else {
212+
trimmed.to_string()
213+
};
214+
Err(format!("API returned status {} but response is not valid JSON: {}. Response preview: {}",
215+
status_code, e, preview).into())
216+
}
217+
}
188218
}
189219

190220
status => Err(format!(

src/worker/log.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ use chrono::{DateTime, Utc};
88

99
use crate::http_client;
1010

11+
/// Maximum batch size allowed by the API server
12+
const API_MAX_BATCH_SIZE: usize = 1000;
13+
1114
/// Configuration for sending access logs to arxignis server
1215
#[derive(Debug, Clone)]
1316
pub struct LogSenderConfig {
@@ -27,7 +30,7 @@ impl LogSenderConfig {
2730
enabled,
2831
base_url,
2932
api_key,
30-
batch_size_limit: 5000, // Default: 5000 logs per batch
33+
batch_size_limit: 1000, // Default: 1000 logs per batch (API limit)
3134
batch_size_bytes: 5 * 1024 * 1024, // Default: 5MB
3235
batch_timeout_secs: 10, // Default: 10 seconds
3336
include_request_body: false, // Default: disabled
@@ -206,6 +209,7 @@ pub fn send_event(event: UnifiedEvent) {
206209
}
207210

208211
/// Send a batch of events to the /events endpoint
212+
/// Automatically splits large batches into chunks of API_MAX_BATCH_SIZE
209213
async fn send_event_batch(events: Vec<UnifiedEvent>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
210214
if events.is_empty() {
211215
return Ok(());
@@ -232,27 +236,38 @@ async fn send_event_batch(events: Vec<UnifiedEvent>) -> Result<(), Box<dyn std::
232236
.map_err(|e| format!("Failed to get global HTTP client: {}", e))?;
233237

234238
let url = format!("{}/events", config.base_url);
235-
let json = serde_json::to_string(&events)?;
236-
237-
log::debug!("Sending {} events to {}", events.len(), url);
238-
239-
let response = client
240-
.post(&url)
241-
.header("Authorization", format!("Bearer {}", config.api_key))
242-
.header("Content-Type", "application/json")
243-
.body(json)
244-
.send()
245-
.await?;
246-
247-
if !response.status().is_success() {
248-
let status = response.status();
249-
let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
250-
log::warn!("Failed to send event batch to /events endpoint: {} - {} (batch size: {})", status, error_text, events.len());
251-
return Err(format!("HTTP {}: {}", status, error_text).into());
252-
} else {
253-
log::debug!("Successfully sent event batch to /events endpoint (batch size: {})", events.len());
239+
240+
// Split events into chunks of API_MAX_BATCH_SIZE to respect API limits
241+
let chunks: Vec<_> = events.chunks(API_MAX_BATCH_SIZE).collect();
242+
let total_events = events.len();
243+
244+
for (chunk_idx, chunk) in chunks.iter().enumerate() {
245+
let json = serde_json::to_string(chunk)?;
246+
247+
log::debug!("Sending chunk {}/{} ({} events) to {}",
248+
chunk_idx + 1, chunks.len(), chunk.len(), url);
249+
250+
let response = client
251+
.post(&url)
252+
.header("Authorization", format!("Bearer {}", config.api_key))
253+
.header("Content-Type", "application/json")
254+
.body(json)
255+
.send()
256+
.await?;
257+
258+
if !response.status().is_success() {
259+
let status = response.status();
260+
let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
261+
log::warn!("Failed to send event batch chunk {}/{} to /events endpoint: {} - {} (chunk size: {}, total batch: {})",
262+
chunk_idx + 1, chunks.len(), status, error_text, chunk.len(), total_events);
263+
return Err(format!("HTTP {}: {}", status, error_text).into());
264+
} else {
265+
log::debug!("Successfully sent event batch chunk {}/{} to /events endpoint (chunk size: {})",
266+
chunk_idx + 1, chunks.len(), chunk.len());
267+
}
254268
}
255269

270+
log::debug!("Successfully sent all {} events in {} chunk(s) to /events endpoint", total_events, chunks.len());
256271
Ok(())
257272
}
258273

0 commit comments

Comments
 (0)