Skip to content

Commit 19e4fa9

Browse files
committed
Fixed log format sniffing and processing with multiple threads.
- Improved log format sniffing to handle multiple threads correctly.
- Valid log lines are now processed immediately during the sniffing phase when dry_run=0.
1 parent 023a50f commit 19e4fa9

File tree

1 file changed

+63
-12
lines changed

1 file changed

+63
-12
lines changed

src/parser.c

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2084,20 +2084,13 @@ read_line (GLog *glog, char *line, int *test, uint32_t *cnt, int dry_run) {
20842084
return logitem;
20852085
}
20862086

2087-
/* Parse chunk of lines to logitems */
20882087
static void *
20892088
read_lines_thread (void *arg) {
20902089
GJob *job = (GJob *) arg;
20912090
int i = 0;
20922091

20932092
for (i = 0; i < job->p; i++) {
2094-
/* ensure we don't process more than we should when testing for log format,
2095-
* else free chunk and stop processing threads */
2096-
if (!job->test || (job->test && job->cnt < conf.num_tests))
2097-
job->logitems[i] = read_line (job->glog, job->lines[i], &job->test, &job->cnt, job->dry_run);
2098-
else
2099-
conf.stop_processing = 1;
2100-
2093+
job->logitems[i] = read_line (job->glog, job->lines[i], &job->test, &job->cnt, job->dry_run);
21012094
#ifdef WITH_GETLINE
21022095
free (job->lines[i]);
21032096
#endif
@@ -2243,6 +2236,56 @@ free_jobs (GJob jobs[2][conf.jobs]) {
22432236
}
22442237
}
22452238

2239+
/* To encapsulate the initial log format sniffing and processing.
2240+
* Returns 0 on success (format verified), 1 on error (format mismatch). */
2241+
static int
2242+
perform_initial_sniff (FILE *fp, GLog *glog, GJob jobs[2][conf.jobs], int dry_run,
2243+
int *global_test_flag) {
2244+
uint32_t initial_cnt = 0;
2245+
/* Flag to track if a valid format has been found (1 = no, 0 = yes) */
2246+
int initial_test = 1;
2247+
char *s = NULL;
2248+
2249+
/* Read and parse lines until `conf.num_tests` are processed or EOF. */
2250+
while (initial_cnt < conf.num_tests && (s = fgetline (fp)) != NULL) {
2251+
GLogItem *logitem = read_line (glog, s, &initial_test, &initial_cnt, dry_run);
2252+
2253+
if (logitem != NULL) {
2254+
/* Intentional: If in actual processing mode (`dry_run == 0`) and valid,
2255+
* process the log item. This prevents redundant reads and supports pipes. */
2256+
if (!dry_run && logitem->errstr == NULL) {
2257+
process_log (logitem);
2258+
free_glog (logitem);
2259+
}
2260+
/* `read_line` or subsequent logic handles freeing `logitem` otherwise. */
2261+
}
2262+
free (s); /* Free buffer from fgetline. */
2263+
}
2264+
2265+
/* If no valid format was found within `conf.num_tests` lines, signal an
2266+
* error. */
2267+
if (initial_test) {
2268+
uncount_processed (glog);
2269+
uncount_invalid (glog);
2270+
free_jobs (jobs); /* Clean up all job structures. */
2271+
return 1; /* Indicate format mismatch/error. */
2272+
}
2273+
2274+
/* Format verified. Prepare for multi-threaded phase: The file pointer is
2275+
* deliberately not reset (no `fseek`). This allows continuous reading from the
2276+
* current position, crucial for pipes and efficiency. */
2277+
*global_test_flag = 0; /* Update global `test` flag to disable further format testing. */
2278+
2279+
/* Propagate disabled 'test' status to all GJob structures to prevent
2280+
* premature termination. */
2281+
for (int b = 0; b < 2; b++) {
2282+
for (int k = 0; k < conf.jobs; k++) {
2283+
jobs[b][k].test = 0;
2284+
}
2285+
}
2286+
return 0; /* Success: format verified. */
2287+
}
2288+
22462289
/* Reads lines from the given file pointer `fp` and processes them using
22472290
* parallel threads.
22482291
*
@@ -2263,6 +2306,16 @@ read_lines (FILE *fp, GLog *glog, int dry_run) {
22632306

22642307
init_jobs (jobs, glog, dry_run, test);
22652308

2309+
/* Perform initial single-threaded log format sniffing if `conf.num_tests` is
2310+
* active. This helps validate the log format early and sets up the state for
2311+
* multi-threaded processing. */
2312+
if (test) {
2313+
if (perform_initial_sniff (fp, glog, jobs, dry_run, &test) != 0) {
2314+
return 1; /* Sniffing failed (log format mismatch). */
2315+
}
2316+
cnt = 0; /* Reset main counter after sniff, as initial lines are handled. */
2317+
}
2318+
22662319
b = 0;
22672320
while (1) { /* b = 0 or 1 */
22682321
read_lines_from_file (fp, glog, jobs, b, &s);
@@ -2317,7 +2370,7 @@ read_lines (FILE *fp, GLog *glog, int dry_run) {
23172370
b = b ^ 1;
23182371
} // while (1)
23192372

2320-
/* After eof, process last data */
2373+
/* After EOF, process last data */
23212374
for (b = 0; b < 2; b++) {
23222375
for (k = 0; k < conf.jobs; k++) {
23232376
if (conf.jobs > 1 && jobs[b][k].running) {
@@ -2337,9 +2390,7 @@ read_lines (FILE *fp, GLog *glog, int dry_run) {
23372390

23382391
free_jobs (jobs);
23392392

2340-
/* if no data was available to read from (probably from a pipe) and still in
2341-
* test mode and still below the test count, we simply return until data
2342-
* becomes available */
2393+
/* Handle pipe case with insufficient data */
23432394
if (!s && (errno == EAGAIN || errno == EWOULDBLOCK) && test && cnt < conf.num_tests)
23442395
return 0;
23452396

0 commit comments

Comments
 (0)