summaryrefslogtreecommitdiffstats
path: root/src/datafile.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/datafile.c101
1 files changed, 89 insertions, 12 deletions
diff --git a/src/datafile.c b/src/datafile.c
index da2acac..06ea0ea 100644
--- a/src/datafile.c
+++ b/src/datafile.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2019-2021 OARC, Inc.
+ * Copyright 2019-2022 OARC, Inc.
* Copyright 2017-2018 Akamai Technologies
* Copyright 2006-2016 Nominum, Inc.
* All rights reserved.
@@ -33,7 +33,10 @@
#include <sys/stat.h>
#include <assert.h>
-perf_datafile_t* perf_datafile_open(const char* filename)
+static perf_result_t read_one_blob(perf_datafile_t* dfile, perf_buffer_t* wire);
+static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines);
+
+perf_datafile_t* perf_datafile_open(const char* filename, perf_input_format_t format)
{
perf_datafile_t* dfile;
struct stat buf;
@@ -63,6 +66,18 @@ perf_datafile_t* perf_datafile_open(const char* filename)
dfile->size = buf.st_size;
}
}
+ dfile->format = format;
+ switch (format) {
+ case input_format_text_query:
+ case input_format_text_update:
+ dfile->readfunc = read_one_line;
+ break;
+ case input_format_tcp_wire_format:
+ dfile->readfunc = read_one_blob;
+ break;
+ default:
+ perf_log_fatal("invalid datafile format");
+ };
return dfile;
}
@@ -129,13 +144,15 @@ static perf_result_t read_more(perf_datafile_t* dfile)
dfile->at = 0;
}
+ /* leave space for \0 string termination at the end */
n = read(dfile->fd, &dfile->databuf[dfile->have], sizeof(dfile->databuf) - dfile->have - 1);
if (n < 0) {
return (PERF_R_FAILURE);
+ } else if (n == 0) {
+ return (PERF_R_EOF);
}
dfile->have += n;
- dfile->databuf[dfile->have] = 0;
if (dfile->is_file && dfile->have == dfile->size) {
dfile->cached = true;
@@ -144,6 +161,61 @@ static perf_result_t read_more(perf_datafile_t* dfile)
return (PERF_R_SUCCESS);
}
+/* Binary format: (<uint16_t> for blob length, <bytes>), repeat
+ * Outputs single packet _without_ the length preambule. */
+static perf_result_t read_one_blob(perf_datafile_t* dfile, perf_buffer_t* wire)
+{
+ perf_result_t result;
+ uint16_t packet_size; /* 2-byte preambule like in the TCP stream */
+
+ while (true) {
+ if ((dfile->have - dfile->at) < sizeof(packet_size)) {
+ /* we don't have complete preambule yet */
+ if (dfile->cached) {
+ if ((dfile->have - dfile->at) == 0) {
+ return PERF_R_EOF;
+ } else {
+ return PERF_R_INVALIDFILE;
+ }
+ }
+ result = read_more(dfile);
+ if (result != PERF_R_SUCCESS) {
+ if (result == PERF_R_EOF && (dfile->have - dfile->at) != 0) {
+ /* incomplete preambule at the end of file */
+ result = PERF_R_INVALIDFILE;
+ }
+ return (result);
+ }
+ continue;
+ }
+ memcpy(&packet_size, &dfile->databuf[dfile->at], sizeof(uint16_t));
+ packet_size = ntohs(packet_size);
+ break;
+ }
+ while (true) {
+ if (sizeof(packet_size) + packet_size > (dfile->have - dfile->at)) {
+ if (dfile->cached) {
+ return PERF_R_INVALIDFILE;
+ }
+ result = read_more(dfile);
+ if (result != PERF_R_SUCCESS) {
+ if (result == PERF_R_EOF) {
+ /* incomplete blob at the end of file */
+ result = PERF_R_INVALIDFILE;
+ }
+ return (result);
+ }
+ continue;
+ }
+ break;
+ }
+
+ perf_buffer_putmem(wire, ((unsigned char*)&dfile->databuf[dfile->at]) + sizeof(packet_size), packet_size);
+ dfile->at += sizeof(packet_size) + packet_size;
+ return (PERF_R_SUCCESS);
+}
+
+/* String in output buffer is not \0 terminated, check length in dfile->have */
static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines)
{
const char* cur;
@@ -163,11 +235,12 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines)
if (curlen == nrem) {
if (!dfile->cached) {
result = read_more(dfile);
+ /* line terminator for text input */
+ dfile->databuf[dfile->have] = 0;
if (result != PERF_R_SUCCESS)
return (result);
}
if (dfile->have - dfile->at == 0) {
- dfile->nruns++;
return (PERF_R_EOF);
}
if (dfile->have - dfile->at > nrem)
@@ -176,7 +249,7 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines)
/* We now have a line. Advance the buffer past it. */
dfile->at += curlen;
- if (dfile->have - dfile->at > 0) {
+ if (dfile->at < dfile->have) {
dfile->at += 1;
}
@@ -194,7 +267,7 @@ static perf_result_t read_one_line(perf_datafile_t* dfile, perf_buffer_t* lines)
return (PERF_R_SUCCESS);
}
-perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, bool is_update)
+perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines)
{
const char* current;
perf_result_t result;
@@ -206,15 +279,16 @@ perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, b
goto done;
}
- result = read_one_line(dfile, lines);
+ result = dfile->readfunc(dfile, lines);
if (result == PERF_R_EOF) {
if (!dfile->read_any) {
result = PERF_R_INVALIDFILE;
goto done;
}
+ dfile->nruns++;
if (dfile->maxruns != dfile->nruns) {
reopen_file(dfile);
- result = read_one_line(dfile, lines);
+ result = dfile->readfunc(dfile, lines);
}
}
if (result != PERF_R_SUCCESS) {
@@ -222,12 +296,15 @@ perf_result_t perf_datafile_next(perf_datafile_t* dfile, perf_buffer_t* lines, b
}
dfile->read_any = true;
- if (is_update) {
+ if (dfile->format == input_format_text_update) {
while (true) {
current = perf_buffer_used(lines);
- result = read_one_line(dfile, lines);
- if (result == PERF_R_EOF && dfile->maxruns != dfile->nruns) {
- reopen_file(dfile);
+ result = dfile->readfunc(dfile, lines);
+ if (result == PERF_R_EOF) {
+ dfile->nruns++;
+ if (dfile->maxruns != dfile->nruns) {
+ reopen_file(dfile);
+ }
}
if (result != PERF_R_SUCCESS || strcasecmp(current, "send") == 0)
break;