From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../thrift/lib/cpp/test/TransportTest.cpp | 1089 ++++++++++++++++++++ 1 file changed, 1089 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/cpp/test/TransportTest.cpp (limited to 'src/jaegertracing/thrift/lib/cpp/test/TransportTest.cpp') diff --git a/src/jaegertracing/thrift/lib/cpp/test/TransportTest.cpp b/src/jaegertracing/thrift/lib/cpp/test/TransportTest.cpp new file mode 100644 index 000000000..a890aa8ce --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/TransportTest.cpp @@ -0,0 +1,1089 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#if _WIN32 +#include +#include +#endif + +using namespace apache::thrift::transport; +using namespace apache::thrift; + +static boost::mt19937 rng; + +void initrand(unsigned int seed) { + rng.seed(seed); +} + +class SizeGenerator { +public: + virtual ~SizeGenerator() = default; + virtual uint32_t nextSize() = 0; + virtual std::string describe() const = 0; +}; + +class ConstantSizeGenerator : public SizeGenerator { +public: + ConstantSizeGenerator(uint32_t value) : value_(value) {} + uint32_t nextSize() override { return value_; } + std::string describe() const override { + std::ostringstream desc; + desc << value_; + return desc.str(); + } + +private: + uint32_t value_; +}; + +class RandomSizeGenerator : public SizeGenerator { +public: + RandomSizeGenerator(uint32_t min, uint32_t max) + : generator_(rng, boost::uniform_int(min, max)) {} + + uint32_t nextSize() override { return generator_(); } + + std::string describe() const override { + std::ostringstream desc; + desc << "rand(" << getMin() << ", " << getMax() << ")"; + return desc.str(); + } + + uint32_t getMin() const { return (generator_.distribution().min)(); } + uint32_t getMax() const { return (generator_.distribution().max)(); } + +private: + boost::variate_generator > generator_; +}; + +/** + * This class exists solely to make the TEST_RW() macro easier to use. + * - it can be constructed implicitly from an integer + * - it can contain either a ConstantSizeGenerator or a RandomSizeGenerator + * (TEST_RW can't take a SizeGenerator pointer or reference, since it needs + * to make a copy of the generator to bind it to the test function.) + */ +class GenericSizeGenerator : public SizeGenerator { +public: + GenericSizeGenerator(uint32_t value) : generator_(new ConstantSizeGenerator(value)) {} + GenericSizeGenerator(uint32_t min, uint32_t max) + : generator_(new RandomSizeGenerator(min, max)) {} + + uint32_t nextSize() override { return generator_->nextSize(); } + std::string describe() const override { return generator_->describe(); } + +private: + std::shared_ptr generator_; +}; + +/************************************************************************** + * Classes to set up coupled transports + **************************************************************************/ + +/** + * Helper class to represent a coupled pair of transports. + * + * Data written to the out transport can be read from the in transport. + * + * This is used as the base class for the various coupled transport + * implementations. It shouldn't be instantiated directly. + */ +template +class CoupledTransports { +public: + virtual ~CoupledTransports() = default; + typedef Transport_ TransportType; + + CoupledTransports() : in(), out() {} + + std::shared_ptr in; + std::shared_ptr out; + +private: + CoupledTransports(const CoupledTransports&) = delete; + CoupledTransports& operator=(const CoupledTransports&) = delete; +}; + +/** + * Coupled TMemoryBuffers + */ +class CoupledMemoryBuffers : public CoupledTransports { +public: + CoupledMemoryBuffers() : buf(new TMemoryBuffer) { + in = buf; + out = buf; + } + + std::shared_ptr buf; +}; + +/** + * Helper template class for creating coupled transports that wrap + * another transport. + */ +template +class CoupledWrapperTransportsT : public CoupledTransports { +public: + CoupledWrapperTransportsT() { + if (inner_.in) { + this->in.reset(new WrapperTransport_(inner_.in)); + } + if (inner_.out) { + this->out.reset(new WrapperTransport_(inner_.out)); + } + } + + InnerCoupledTransports_ inner_; +}; + +/** + * Coupled TBufferedTransports. + */ +template +class CoupledBufferedTransportsT + : public CoupledWrapperTransportsT {}; + +typedef CoupledBufferedTransportsT CoupledBufferedTransports; + +/** + * Coupled TFramedTransports. + */ +template +class CoupledFramedTransportsT + : public CoupledWrapperTransportsT {}; + +typedef CoupledFramedTransportsT CoupledFramedTransports; + +/** + * Coupled TZlibTransports. + */ +template +class CoupledZlibTransportsT : public CoupledWrapperTransportsT {}; + +typedef CoupledZlibTransportsT CoupledZlibTransports; + +#ifndef _WIN32 +// FD transport doesn't make much sense on Windows. +/** + * Coupled TFDTransports. + */ +class CoupledFDTransports : public CoupledTransports { +public: + CoupledFDTransports() { + int pipes[2]; + + if (pipe(pipes) != 0) { + return; + } + + in.reset(new TFDTransport(pipes[0], TFDTransport::CLOSE_ON_DESTROY)); + out.reset(new TFDTransport(pipes[1], TFDTransport::CLOSE_ON_DESTROY)); + } +}; +#else +/** + * Coupled pipe transports + */ +class CoupledPipeTransports : public CoupledTransports { +public: + HANDLE hRead; + HANDLE hWrite; + + CoupledPipeTransports() { + BOOST_REQUIRE(CreatePipe(&hRead, &hWrite, NULL, 1048576 * 2)); + in.reset(new TPipe(hRead, hWrite)); + in->open(); + out = in; + } +}; +#endif + +/** + * Coupled TSockets + */ +class CoupledSocketTransports : public CoupledTransports { +public: + CoupledSocketTransports() { + THRIFT_SOCKET sockets[2] = {0}; + if (THRIFT_SOCKETPAIR(PF_UNIX, SOCK_STREAM, 0, sockets) != 0) { + return; + } + + in.reset(new TSocket(sockets[0])); + out.reset(new TSocket(sockets[1])); + out->setSendTimeout(100); + } +}; + +// These could be made to work on Windows, but I don't care enough to make it happen +#ifndef _WIN32 +/** + * Coupled TFileTransports + */ +class CoupledFileTransports : public CoupledTransports { +public: + CoupledFileTransports() { +#ifndef _WIN32 + const char* tmp_dir = "/tmp"; +#define FILENAME_SUFFIX "/thrift.transport_test" +#else + const char* tmp_dir = getenv("TMP"); +#define FILENAME_SUFFIX "\\thrift.transport_test" +#endif + + // Create a temporary file to use + filename.resize(strlen(tmp_dir) + strlen(FILENAME_SUFFIX)); + THRIFT_SNPRINTF(&filename[0], filename.size(), "%s" FILENAME_SUFFIX, tmp_dir); +#undef FILENAME_SUFFIX + + { std::ofstream dummy_creation(filename.c_str(), std::ofstream::trunc); } + + in.reset(new TFileTransport(filename, true)); + out.reset(new TFileTransport(filename)); + } + + ~CoupledFileTransports() override { remove(filename.c_str()); } + + std::string filename; +}; +#endif + +/** + * Wrapper around another CoupledTransports implementation that exposes the + * transports as TTransport pointers. + * + * This is used since accessing a transport via a "TTransport*" exercises a + * different code path than using the base pointer class. As part of the + * template code changes, most transport methods are no longer virtual. + */ +template +class CoupledTTransports : public CoupledTransports { +public: + CoupledTTransports() : transports() { + in = transports.in; + out = transports.out; + } + + CoupledTransports_ transports; +}; + +/** + * Wrapper around another CoupledTransports implementation that exposes the + * transports as TBufferBase pointers. + * + * This can only be instantiated with a transport type that is a subclass of + * TBufferBase. + */ +template +class CoupledBufferBases : public CoupledTransports { +public: + CoupledBufferBases() : transports() { + in = transports.in; + out = transports.out; + } + + CoupledTransports_ transports; +}; + +/************************************************************************** + * Alarm handling code for use in tests that check the transport blocking + * semantics. + * + * If the transport ends up blocking, we don't want to hang forever. We use + * SIGALRM to fire schedule signal to wake up and try to write data so the + * transport will unblock. + * + * It isn't really the safest thing in the world to be mucking around with + * complicated global data structures in a signal handler. It should probably + * be okay though, since we know the main thread should always be blocked in a + * read() request when the signal handler is running. + **************************************************************************/ + +struct TriggerInfo { + TriggerInfo(int seconds, const std::shared_ptr& transport, uint32_t writeLength) + : timeoutSeconds(seconds), transport(transport), writeLength(writeLength), next(nullptr) {} + + int timeoutSeconds; + std::shared_ptr transport; + uint32_t writeLength; + TriggerInfo* next; +}; + +apache::thrift::concurrency::Monitor g_alarm_monitor; +TriggerInfo* g_triggerInfo; +unsigned int g_numTriggersFired; +bool g_teardown = false; + +void alarm_handler() { + TriggerInfo* info = nullptr; + { + apache::thrift::concurrency::Synchronized s(g_alarm_monitor); + // The alarm timed out, which almost certainly means we're stuck + // on a transport that is incorrectly blocked. + ++g_numTriggersFired; + + // Note: we print messages to stdout instead of stderr, since + // tools/test/runner only records stdout messages in the failure messages for + // boost tests. (boost prints its test info to stdout.) + printf("Timeout alarm expired; attempting to unblock transport\n"); + if (g_triggerInfo == nullptr) { + printf(" trigger stack is empty!\n"); + } + + // Pop off the first TriggerInfo. + // If there is another one, schedule an alarm for it. + info = g_triggerInfo; + g_triggerInfo = info->next; + } + + // Write some data to the transport to hopefully unblock it. + auto* buf = new uint8_t[info->writeLength]; + memset(buf, 'b', info->writeLength); + boost::scoped_array array(buf); + info->transport->write(buf, info->writeLength); + info->transport->flush(); + + delete info; +} + +void alarm_handler_wrapper() { + int64_t timeout = 0; // timeout of 0 means wait forever + while (true) { + bool fireHandler = false; + { + apache::thrift::concurrency::Synchronized s(g_alarm_monitor); + if (g_teardown) + return; + // calculate timeout + if (g_triggerInfo == nullptr) { + timeout = 0; + } else { + timeout = g_triggerInfo->timeoutSeconds * 1000; + } + + int waitResult = g_alarm_monitor.waitForTimeRelative(timeout); + if (waitResult == THRIFT_ETIMEDOUT) + fireHandler = true; + } + if (fireHandler) + alarm_handler(); // calling outside the lock + } +} + +/** + * Add a trigger to be scheduled "seconds" seconds after the + * last currently scheduled trigger. + * + * (Note that this is not "seconds" from now. That might be more logical, but + * would require slightly more complicated sorting, rather than just appending + * to the end.) + */ +void add_trigger(unsigned int seconds, + const std::shared_ptr& transport, + uint32_t write_len) { + auto* info = new TriggerInfo(seconds, transport, write_len); + { + apache::thrift::concurrency::Synchronized s(g_alarm_monitor); + if (g_triggerInfo == nullptr) { + // This is the first trigger. + // Set g_triggerInfo, and schedule the alarm + g_triggerInfo = info; + g_alarm_monitor.notify(); + } else { + // Add this trigger to the end of the list + TriggerInfo* prev = g_triggerInfo; + while (prev->next) { + prev = prev->next; + } + prev->next = info; + } + } +} + +void clear_triggers() { + TriggerInfo* info = nullptr; + + { + apache::thrift::concurrency::Synchronized s(g_alarm_monitor); + info = g_triggerInfo; + g_triggerInfo = nullptr; + g_numTriggersFired = 0; + g_alarm_monitor.notify(); + } + + while (info != nullptr) { + TriggerInfo* next = info->next; + delete info; + info = next; + } +} + +void set_trigger(unsigned int seconds, + const std::shared_ptr& transport, + uint32_t write_len) { + clear_triggers(); + add_trigger(seconds, transport, write_len); +} + +/************************************************************************** + * Test functions + **************************************************************************/ + +/** + * Test interleaved write and read calls. + * + * Generates a buffer totalSize bytes long, then writes it to the transport, + * and verifies the written data can be read back correctly. + * + * Mode of operation: + * - call wChunkGenerator to figure out how large of a chunk to write + * - call wSizeGenerator to get the size for individual write() calls, + * and do this repeatedly until the entire chunk is written. + * - call rChunkGenerator to figure out how large of a chunk to read + * - call rSizeGenerator to get the size for individual read() calls, + * and do this repeatedly until the entire chunk is read. + * - repeat until the full buffer is written and read back, + * then compare the data read back against the original buffer + * + * + * - If any of the size generators return 0, this means to use the maximum + * possible size. + * + * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that + * there are never more than maxOutstanding bytes waiting to be read back. + */ +template +void test_rw(uint32_t totalSize, + SizeGenerator& wSizeGenerator, + SizeGenerator& rSizeGenerator, + SizeGenerator& wChunkGenerator, + SizeGenerator& rChunkGenerator, + uint32_t maxOutstanding) { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + boost::shared_array wbuf = boost::shared_array(new uint8_t[totalSize]); + boost::shared_array rbuf = boost::shared_array(new uint8_t[totalSize]); + + // store some data in wbuf + for (uint32_t n = 0; n < totalSize; ++n) { + wbuf[n] = (n & 0xff); + } + // clear rbuf + memset(rbuf.get(), 0, totalSize); + + uint32_t total_written = 0; + uint32_t total_read = 0; + while (total_read < totalSize) { + // Determine how large a chunk of data to write + uint32_t wchunk_size = wChunkGenerator.nextSize(); + if (wchunk_size == 0 || wchunk_size > totalSize - total_written) { + wchunk_size = totalSize - total_written; + } + + // Make sure (total_written - total_read) + wchunk_size + // is less than maxOutstanding + if (maxOutstanding > 0 && wchunk_size > maxOutstanding - (total_written - total_read)) { + wchunk_size = maxOutstanding - (total_written - total_read); + } + + // Write the chunk + uint32_t chunk_written = 0; + while (chunk_written < wchunk_size) { + uint32_t write_size = wSizeGenerator.nextSize(); + if (write_size == 0 || write_size > wchunk_size - chunk_written) { + write_size = wchunk_size - chunk_written; + } + + try { + transports.out->write(wbuf.get() + total_written, write_size); + } catch (TTransportException& te) { + if (te.getType() == TTransportException::TIMED_OUT) + break; + throw te; + } + chunk_written += write_size; + total_written += write_size; + } + + // Flush the data, so it will be available in the read transport + // Don't flush if wchunk_size is 0. (This should only happen if + // total_written == totalSize already, and we're only reading now.) + if (wchunk_size > 0) { + transports.out->flush(); + } + + // Determine how large a chunk of data to read back + uint32_t rchunk_size = rChunkGenerator.nextSize(); + if (rchunk_size == 0 || rchunk_size > total_written - total_read) { + rchunk_size = total_written - total_read; + } + + // Read the chunk + uint32_t chunk_read = 0; + while (chunk_read < rchunk_size) { + uint32_t read_size = rSizeGenerator.nextSize(); + if (read_size == 0 || read_size > rchunk_size - chunk_read) { + read_size = rchunk_size - chunk_read; + } + + int bytes_read = -1; + try { + bytes_read = transports.in->read(rbuf.get() + total_read, read_size); + } catch (TTransportException& e) { + BOOST_FAIL("read(pos=" << total_read << ", size=" << read_size << ") threw exception \"" + << e.what() << "\"; written so far: " << total_written << " / " + << totalSize << " bytes"); + } + + BOOST_REQUIRE_MESSAGE(bytes_read > 0, + "read(pos=" << total_read << ", size=" << read_size << ") returned " + << bytes_read << "; written so far: " << total_written + << " / " << totalSize << " bytes"); + chunk_read += bytes_read; + total_read += bytes_read; + } + } + + // make sure the data read back is identical to the data written + BOOST_CHECK_EQUAL(memcmp(rbuf.get(), wbuf.get(), totalSize), 0); +} + +template +void test_read_part_available() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t write_buf[16]; + uint8_t read_buf[16]; + memset(write_buf, 'a', sizeof(write_buf)); + + // Attemping to read 10 bytes when only 9 are available should return 9 + // immediately. + transports.out->write(write_buf, 9); + transports.out->flush(); + set_trigger(3, transports.out, 1); + uint32_t bytes_read = transports.in->read(read_buf, 10); + BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0); + BOOST_CHECK_EQUAL(bytes_read, (uint32_t)9); + + clear_triggers(); +} + +template +void test_read_part_available_in_chunks() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t write_buf[16]; + uint8_t read_buf[16]; + memset(write_buf, 'a', sizeof(write_buf)); + + // Write 10 bytes (in a single frame, for transports that use framing) + transports.out->write(write_buf, 10); + transports.out->flush(); + + // Read 1 byte, to force the transport to read the frame + uint32_t bytes_read = transports.in->read(read_buf, 1); + BOOST_CHECK_EQUAL(bytes_read, 1u); + + // Read more than what is remaining and verify the transport does not block + set_trigger(3, transports.out, 1); + bytes_read = transports.in->read(read_buf, 10); + BOOST_CHECK_EQUAL(g_numTriggersFired, 0u); + BOOST_CHECK_EQUAL(bytes_read, 9u); + + clear_triggers(); +} + +template +void test_read_partial_midframe() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t write_buf[16]; + uint8_t read_buf[16]; + memset(write_buf, 'a', sizeof(write_buf)); + + // Attempt to read 10 bytes, when only 9 are available, but after we have + // already read part of the data that is available. This exercises a + // different code path for several of the transports. + // + // For transports that add their own framing (e.g., TFramedTransport and + // TFileTransport), the two flush calls break up the data in to a 10 byte + // frame and a 3 byte frame. The first read then puts us partway through the + // first frame, and then we attempt to read past the end of that frame, and + // through the next frame, too. + // + // For buffered transports that perform read-ahead (e.g., + // TBufferedTransport), the read-ahead will most likely see all 13 bytes + // written on the first read. The next read will then attempt to read past + // the end of the read-ahead buffer. + // + // Flush 10 bytes, then 3 bytes. This creates 2 separate frames for + // transports that track framing internally. + transports.out->write(write_buf, 10); + transports.out->flush(); + transports.out->write(write_buf, 3); + transports.out->flush(); + + // Now read 4 bytes, so that we are partway through the written data. + uint32_t bytes_read = transports.in->read(read_buf, 4); + BOOST_CHECK_EQUAL(bytes_read, (uint32_t)4); + + // Now attempt to read 10 bytes. Only 9 more are available. + // + // We should be able to get all 9 bytes, but it might take multiple read + // calls, since it is valid for read() to return fewer bytes than requested. + // (Most transports do immediately return 9 bytes, but the framing transports + // tend to only return to the end of the current frame, which is 6 bytes in + // this case.) + uint32_t total_read = 0; + while (total_read < 9) { + set_trigger(3, transports.out, 1); + bytes_read = transports.in->read(read_buf, 10); + BOOST_REQUIRE_EQUAL(g_numTriggersFired, (unsigned int)0); + BOOST_REQUIRE_GT(bytes_read, (uint32_t)0); + total_read += bytes_read; + BOOST_REQUIRE_LE(total_read, (uint32_t)9); + } + + BOOST_CHECK_EQUAL(total_read, (uint32_t)9); + + clear_triggers(); +} + +template +void test_borrow_part_available() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t write_buf[16]; + uint8_t read_buf[16]; + memset(write_buf, 'a', sizeof(write_buf)); + + // Attemping to borrow 10 bytes when only 9 are available should return NULL + // immediately. + transports.out->write(write_buf, 9); + transports.out->flush(); + set_trigger(3, transports.out, 1); + uint32_t borrow_len = 10; + const uint8_t* borrowed_buf = transports.in->borrow(read_buf, &borrow_len); + BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0); + BOOST_CHECK(borrowed_buf == nullptr); + + clear_triggers(); +} + +template +void test_read_none_available() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t read_buf[16]; + + // Attempting to read when no data is available should either block until + // some data is available, or fail immediately. (e.g., TSocket blocks, + // TMemoryBuffer just fails.) + // + // If the transport blocks, it should succeed once some data is available, + // even if less than the amount requested becomes available. + set_trigger(1, transports.out, 2); + add_trigger(1, transports.out, 8); + uint32_t bytes_read = transports.in->read(read_buf, 10); + if (bytes_read == 0) { + BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0); + clear_triggers(); + } else { + BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)1); + BOOST_CHECK_EQUAL(bytes_read, (uint32_t)2); + } + + clear_triggers(); +} + +template +void test_borrow_none_available() { + CoupledTransports transports; + BOOST_REQUIRE(transports.in != nullptr); + BOOST_REQUIRE(transports.out != nullptr); + + uint8_t write_buf[16]; + memset(write_buf, 'a', sizeof(write_buf)); + + // Attempting to borrow when no data is available should fail immediately + set_trigger(1, transports.out, 10); + uint32_t borrow_len = 10; + const uint8_t* borrowed_buf = transports.in->borrow(nullptr, &borrow_len); + BOOST_CHECK(borrowed_buf == nullptr); + BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0); + + clear_triggers(); +} + +/************************************************************************** + * Test case generation + * + * Pretty ugly and annoying. This would be much easier if we the unit test + * framework didn't force each test to be a separate function. + * - Writing a completely separate function definition for each of these would + * result in a lot of repetitive boilerplate code. + * - Combining many tests into a single function makes it more difficult to + * tell precisely which tests failed. It also means you can't get a progress + * update after each test, and the tests are already fairly slow. + * - Similar registration could be achieved with BOOST_TEST_CASE_TEMPLATE, + * but it requires a lot of awkward MPL code, and results in useless test + * case names. (The names are generated from std::type_info::name(), which + * is compiler-dependent. gcc returns mangled names.) + **************************************************************************/ + +#define ADD_TEST_RW(CoupledTransports, totalSize, ...) \ + addTestRW(BOOST_STRINGIZE(CoupledTransports), totalSize, ##__VA_ARGS__); + +#define TEST_RW(CoupledTransports, totalSize, ...) \ + do { \ + /* Add the test as specified, to test the non-virtual function calls */ \ + ADD_TEST_RW(CoupledTransports, totalSize, ##__VA_ARGS__); \ + /* \ + * Also test using the transport as a TTransport*, to test \ + * the read_virt()/write_virt() calls \ + */ \ + ADD_TEST_RW(CoupledTTransports, totalSize, ##__VA_ARGS__); \ + /* Test wrapping the transport with TBufferedTransport */ \ + ADD_TEST_RW(CoupledBufferedTransportsT, totalSize, ##__VA_ARGS__); \ + /* Test wrapping the transport with TFramedTransports */ \ + ADD_TEST_RW(CoupledFramedTransportsT, totalSize, ##__VA_ARGS__); \ + /* Test wrapping the transport with TZlibTransport */ \ + ADD_TEST_RW(CoupledZlibTransportsT, totalSize, ##__VA_ARGS__); \ + } while (0) + +#define ADD_TEST_BLOCKING(CoupledTransports) \ + addTestBlocking(BOOST_STRINGIZE(CoupledTransports)); + +#define TEST_BLOCKING_BEHAVIOR(CoupledTransports) \ + ADD_TEST_BLOCKING(CoupledTransports); \ + ADD_TEST_BLOCKING(CoupledTTransports); \ + ADD_TEST_BLOCKING(CoupledBufferedTransportsT); \ + ADD_TEST_BLOCKING(CoupledFramedTransportsT); \ + ADD_TEST_BLOCKING(CoupledZlibTransportsT); + +class TransportTestGen { +public: + TransportTestGen(boost::unit_test::test_suite* suite, float sizeMultiplier) + : suite_(suite), sizeMultiplier_(sizeMultiplier) {} + + void generate() { + GenericSizeGenerator rand4k(1, 4096); + + /* + * We do the basically the same set of tests for each transport type, + * although we tweak the parameters in some places. + */ + + // TMemoryBuffer tests + TEST_RW(CoupledMemoryBuffers, 1024 * 1024, 0, 0); + TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k); + TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163); + TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1); + + TEST_RW(CoupledMemoryBuffers, 1024 * 256, 0, 0, rand4k, rand4k); + TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k, rand4k, rand4k); + TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163, rand4k, rand4k); + TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1, rand4k, rand4k); + + TEST_BLOCKING_BEHAVIOR(CoupledMemoryBuffers); + +#ifndef _WIN32 + // TFDTransport tests + // Since CoupledFDTransports tests with a pipe, writes will block + // if there is too much outstanding unread data in the pipe. + uint32_t fd_max_outstanding = 4096; + TEST_RW(CoupledFDTransports, 1024 * 1024, 0, 0, 0, 0, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, 0, 0, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, 0, 0, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, 0, 0, fd_max_outstanding); + + TEST_RW(CoupledFDTransports, 1024 * 256, 0, 0, rand4k, rand4k, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, rand4k, rand4k, fd_max_outstanding); + TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, rand4k, rand4k, fd_max_outstanding); + + TEST_BLOCKING_BEHAVIOR(CoupledFDTransports); +#else + // TPipe tests (WIN32 only) + TEST_RW(CoupledPipeTransports, 1024 * 1024, 0, 0); + TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k); + TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163); + TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1); + + TEST_RW(CoupledPipeTransports, 1024 * 256, 0, 0, rand4k, rand4k); + TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k); + TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163, rand4k, rand4k); + TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1, rand4k, rand4k); + + TEST_BLOCKING_BEHAVIOR(CoupledPipeTransports); +#endif //_WIN32 + + // TSocket tests + uint32_t socket_max_outstanding = 4096; + TEST_RW(CoupledSocketTransports, 1024 * 1024, 0, 0, 0, 0, socket_max_outstanding); + TEST_RW(CoupledSocketTransports, 1024 * 256, rand4k, rand4k, 0, 0, socket_max_outstanding); + TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, 0, 0, socket_max_outstanding); + // Doh. Apparently writing to a socket has some additional overhead for + // each send() call. If we have more than ~400 outstanding 1-byte write + // requests, additional send() calls start blocking. + TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, 0, 0, socket_max_outstanding); + TEST_RW(CoupledSocketTransports, 1024 * 256, 0, 0, rand4k, rand4k, socket_max_outstanding); + TEST_RW(CoupledSocketTransports, + 1024 * 256, + rand4k, + rand4k, + rand4k, + rand4k, + socket_max_outstanding); + TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, rand4k, rand4k, socket_max_outstanding); + TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, rand4k, rand4k, socket_max_outstanding); + + TEST_BLOCKING_BEHAVIOR(CoupledSocketTransports); + +// These could be made to work on Windows, but I don't care enough to make it happen +#ifndef _WIN32 + // TFileTransport tests + // We use smaller buffer sizes here, since TFileTransport is fairly slow. + // + // TFileTransport can't write more than 16MB at once + uint32_t max_write_at_once = 1024 * 1024 * 16 - 4; + TEST_RW(CoupledFileTransports, 1024 * 1024, max_write_at_once, 0); + TEST_RW(CoupledFileTransports, 1024 * 128, rand4k, rand4k); + TEST_RW(CoupledFileTransports, 1024 * 128, 167, 163); + TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1); + + TEST_RW(CoupledFileTransports, 1024 * 64, 0, 0, rand4k, rand4k); + TEST_RW(CoupledFileTransports, 1024 * 64, rand4k, rand4k, rand4k, rand4k); + TEST_RW(CoupledFileTransports, 1024 * 64, 167, 163, rand4k, rand4k); + TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1, rand4k, rand4k); + + TEST_BLOCKING_BEHAVIOR(CoupledFileTransports); +#endif + + // Add some tests that access TBufferedTransport and TFramedTransport + // via TTransport pointers and TBufferBase pointers. + ADD_TEST_RW(CoupledTTransports, + 1024 * 1024, + rand4k, + rand4k, + rand4k, + rand4k); + ADD_TEST_RW(CoupledBufferBases, + 1024 * 1024, + rand4k, + rand4k, + rand4k, + rand4k); + ADD_TEST_RW(CoupledTTransports, + 1024 * 1024, + rand4k, + rand4k, + rand4k, + rand4k); + ADD_TEST_RW(CoupledBufferBases, + 1024 * 1024, + rand4k, + rand4k, + rand4k, + rand4k); + + // Test using TZlibTransport via a TTransport pointer + ADD_TEST_RW(CoupledTTransports, + 1024 * 1024, + rand4k, + rand4k, + rand4k, + rand4k); + } + +#if (BOOST_VERSION >= 105900) +#define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME, __FILE__, __LINE__) +#else +#define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME) +#endif + +private: + template + void addTestRW(const char* transport_name, + uint32_t totalSize, + GenericSizeGenerator wSizeGen, + GenericSizeGenerator rSizeGen, + GenericSizeGenerator wChunkSizeGen = 0, + GenericSizeGenerator rChunkSizeGen = 0, + uint32_t maxOutstanding = 0, + uint32_t expectedFailures = 0) { + // adjust totalSize by the specified sizeMultiplier_ first + totalSize = static_cast(totalSize * sizeMultiplier_); + + std::ostringstream name; + name << transport_name << "::test_rw(" << totalSize << ", " << wSizeGen.describe() << ", " + << rSizeGen.describe() << ", " << wChunkSizeGen.describe() << ", " + << rChunkSizeGen.describe() << ", " << maxOutstanding << ")"; + +#if (BOOST_VERSION >= 105900) + std::function test_func +#else + boost::unit_test::callback0<> test_func +#endif + = std::bind(test_rw, + totalSize, + wSizeGen, + rSizeGen, + wChunkSizeGen, + rChunkSizeGen, + maxOutstanding); + suite_->add(MAKE_TEST_CASE(test_func, name.str()), expectedFailures); + } + + template + void addTestBlocking(const char* transportName, uint32_t expectedFailures = 0) { + char name[1024]; + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available()", transportName); + suite_->add(MAKE_TEST_CASE(test_read_part_available, name), expectedFailures); + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available_in_chunks()", transportName); + suite_->add(MAKE_TEST_CASE(test_read_part_available_in_chunks, name), expectedFailures); + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_partial_midframe()", transportName); + suite_->add(MAKE_TEST_CASE(test_read_partial_midframe, name), expectedFailures); + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_none_available()", transportName); + suite_->add(MAKE_TEST_CASE(test_read_none_available, name), expectedFailures); + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_part_available()", transportName); + suite_->add(MAKE_TEST_CASE(test_borrow_part_available, name), expectedFailures); + + THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_none_available()", transportName); + suite_->add(MAKE_TEST_CASE(test_borrow_none_available, name), expectedFailures); + } + + boost::unit_test::test_suite* suite_; + // sizeMultiplier_ is configurable via the command line, and allows the + // user to adjust between smaller buffers that can be tested quickly, + // or larger buffers that more thoroughly exercise the code, but take + // longer. + float sizeMultiplier_; +}; + +/************************************************************************** + * General Initialization + **************************************************************************/ + +struct global_fixture { + std::shared_ptr alarmThread_; + global_fixture() { +#if _WIN32 + apache::thrift::transport::TWinsockSingleton::create(); +#endif + + apache::thrift::concurrency::ThreadFactory factory; + factory.setDetached(false); + + alarmThread_ = factory.newThread( + apache::thrift::concurrency::FunctionRunner::create(alarm_handler_wrapper)); + alarmThread_->start(); + } + ~global_fixture() { + { + apache::thrift::concurrency::Synchronized s(g_alarm_monitor); + g_teardown = true; + g_alarm_monitor.notify(); + } + alarmThread_->join(); + } +}; + +#if (BOOST_VERSION >= 105900) +BOOST_GLOBAL_FIXTURE(global_fixture); +#else +BOOST_GLOBAL_FIXTURE(global_fixture) +#endif + +#ifdef BOOST_TEST_DYN_LINK +bool init_unit_test_suite() { + struct timeval tv; + THRIFT_GETTIMEOFDAY(&tv, nullptr); + int seed = tv.tv_sec ^ tv.tv_usec; + + initrand(seed); + + boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite(); + suite->p_name.value = "TransportTest"; + TransportTestGen transport_test_generator(suite, 1); + transport_test_generator.generate(); + return true; +} + +int main( int argc, char* argv[] ) { + return ::boost::unit_test::unit_test_main(&init_unit_test_suite,argc,argv); +} +#else +boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) { + THRIFT_UNUSED_VARIABLE(argc); + THRIFT_UNUSED_VARIABLE(argv); + struct timeval tv; + THRIFT_GETTIMEOFDAY(&tv, NULL); + int seed = tv.tv_sec ^ tv.tv_usec; + + initrand(seed); + + boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite(); + suite->p_name.value = "TransportTest"; + TransportTestGen transport_test_generator(suite, 1); + transport_test_generator.generate(); + return NULL; +} +#endif -- cgit v1.2.3