diff options
Diffstat (limited to '')
-rw-r--r-- | src/datafile.c | 101 |
1 files changed, 89 insertions, 12 deletions
diff --git a/src/datafile.c b/src/datafile.c index da2acac..06ea0ea 100644 --- a/src/datafile.c +++ b/src/datafile.c @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 OARC, Inc. + * Copyright 2019-2022 OARC, Inc. * Copyright 2017-2018 Akamai Technologies * Copyright 2006-2016 Nominum, Inc. * All rights reserved. @@ -33,7 +33,10 @@ #include <sys/stat.h> #include <assert.h> -perf_datafile_t* perf_datafile_open(const char* filename) +static perf_result_t read_one_blob(perf_datafile_t* dfile, perf_buffer_t* wire); +static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines); + +perf_datafile_t* perf_datafile_open(const char* filename, perf_input_format_t format) { perf_datafile_t* dfile; struct stat buf; @@ -63,6 +66,18 @@ perf_datafile_t* perf_datafile_open(const char* filename) dfile->size = buf.st_size; } } + dfile->format = format; + switch (format) { + case input_format_text_query: + case input_format_text_update: + dfile->readfunc = read_one_line; + break; + case input_format_tcp_wire_format: + dfile->readfunc = read_one_blob; + break; + default: + perf_log_fatal("invalid datafile format"); + }; return dfile; } @@ -129,13 +144,15 @@ static perf_result_t read_more(perf_datafile_t* dfile) dfile->at = 0; } + /* leave space for \0 string termination at the end */ n = read(dfile->fd, &dfile->databuf[dfile->have], sizeof(dfile->databuf) - dfile->have - 1); if (n < 0) { return (PERF_R_FAILURE); + } else if (n == 0) { + return (PERF_R_EOF); } dfile->have += n; - dfile->databuf[dfile->have] = 0; if (dfile->is_file && dfile->have == dfile->size) { dfile->cached = true; @@ -144,6 +161,61 @@ static perf_result_t read_more(perf_datafile_t* dfile) return (PERF_R_SUCCESS); } +/* Binary format: (<uint16_t> for blob length, <bytes>), repeat + * Outputs single packet _without_ the length preambule. */ +static perf_result_t read_one_blob(perf_datafile_t* dfile, perf_buffer_t* wire) +{ + perf_result_t result; + uint16_t packet_size; /* 2-byte preambule like in the TCP stream */ + + while (true) { + if ((dfile->have - dfile->at) < sizeof(packet_size)) { + /* we don't have complete preambule yet */ + if (dfile->cached) { + if ((dfile->have - dfile->at) == 0) { + return PERF_R_EOF; + } else { + return PERF_R_INVALIDFILE; + } + } + result = read_more(dfile); + if (result != PERF_R_SUCCESS) { + if (result == PERF_R_EOF && (dfile->have - dfile->at) != 0) { + /* incomplete preambule at the end of file */ + result = PERF_R_INVALIDFILE; + } + return (result); + } + continue; + } + memcpy(&packet_size, &dfile->databuf[dfile->at], sizeof(uint16_t)); + packet_size = ntohs(packet_size); + break; + } + while (true) { + if (sizeof(packet_size) + packet_size > (dfile->have - dfile->at)) { + if (dfile->cached) { + return PERF_R_INVALIDFILE; + } + result = read_more(dfile); + if (result != PERF_R_SUCCESS) { + if (result == PERF_R_EOF) { + /* incomplete blob at the end of file */ + result = PERF_R_INVALIDFILE; + } + return (result); + } + continue; + } + break; + } + + perf_buffer_putmem(wire, ((unsigned char*)&dfile->databuf[dfile->at]) + sizeof(packet_size), packet_size); + dfile->at += sizeof(packet_size) + packet_size; + return (PERF_R_SUCCESS); +} + +/* String in output buffer is not \0 terminated, check length in dfile->have */ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines) { const char* cur; @@ -163,11 +235,12 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines) if (curlen == nrem) { if (!dfile->cached) { result = read_more(dfile); + /* line terminator for text input */ + dfile->databuf[dfile->have] = 0; if (result != PERF_R_SUCCESS) return (result); } if (dfile->have - dfile->at == 0) { - dfile->nruns++; return (PERF_R_EOF); } if (dfile->have - dfile->at > nrem) @@ -176,7 +249,7 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines) /* We now have a line. Advance the buffer past it. */ dfile->at += curlen; - if (dfile->have - dfile->at > 0) { + if (dfile->at < dfile->have) { dfile->at += 1; } @@ -194,7 +267,7 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines) return (PERF_R_SUCCESS); } -perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, bool is_update) +perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines) { const char* current; perf_result_t result; @@ -206,15 +279,16 @@ perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, b goto done; } - result = read_one_line(dfile, lines); + result = dfile->readfunc(dfile, lines); if (result == PERF_R_EOF) { if (!dfile->read_any) { result = PERF_R_INVALIDFILE; goto done; } + dfile->nruns++; if (dfile->maxruns != dfile->nruns) { reopen_file(dfile); - result = read_one_line(dfile, lines); + result = dfile->readfunc(dfile, lines); } } if (result != PERF_R_SUCCESS) { @@ -222,12 +296,15 @@ perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, b } dfile->read_any = true; - if (is_update) { + if (dfile->format == input_format_text_update) { while (true) { current = perf_buffer_used(lines); - result = read_one_line(dfile, lines); - if (result == PERF_R_EOF && dfile->maxruns != dfile->nruns) { - reopen_file(dfile); + result = dfile->readfunc(dfile, lines); + if (result == PERF_R_EOF) { + dfile->nruns++; + if (dfile->maxruns != dfile->nruns) { + reopen_file(dfile); + } } if (result != PERF_R_SUCCESS || strcasecmp(current, "send") == 0) break; |