
I'm using the serialization library to create UDP packets. My program is sending/receiving hundreds of these each second. I'm currently creating an iarchive/oarchive for every packet sent/received. This is quite a performance bottleneck. Is it possible to reuse a single binary archive object for all UDP packets, i.e. somehow reinitialize the underlying stream with new data and read/write to the same archive again? thank you, Marcin
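P.S. For concreteness, each send currently does roughly the following (a minimal sketch; the actual socket call is elided):

// Sketch of the current per-packet pattern: a fresh stream and a fresh
// archive are constructed for every outgoing packet.
#include <sstream>
#include <string>
#include <boost/archive/binary_oarchive.hpp>

template<typename Message>
void send_packet(const Message& msg)
{
    std::ostringstream buffer;                   // new stream per packet
    boost::archive::binary_oarchive ar(buffer);  // new archive per packet
    ar << msg;
    const std::string bytes = buffer.str();
    // ... hand bytes.data() / bytes.size() to the UDP socket ...
}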

I would guess it's possible, but I don't know for sure that that's where the performance bottleneck is. The new version of the native binary archive uses streambuf i/o instead of basic_stream i/o, which should be noticeably faster since it doesn't use a mutex lock on each i/o operation. That might help.

Initializing an archive basically just consists of writing a small header; you might try using no_header on construction to see if that helps. You might try reusing the stream and just creating a new archive; that might help. You might create a custom variation of a standard streambuf which maps to sending UDP packets. Or you might make your own version of the native binary archive which doesn't use a stream at all but uses some faster way of setting up to send UDP packets.

Good luck,
Robert Ramey
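P.S. A minimal sketch of the "reuse the stream, new archive with no_header" idea (untested, just to show the shape of it):

// Reuse one stringstream across packets; construct a fresh archive per
// packet, with no_header so the small archive header isn't rewritten.
#include <sstream>
#include <string>
#include <boost/archive/binary_oarchive.hpp>

template<typename Message>
void send_packet(std::stringstream& buffer, const Message& msg)
{
    buffer.str(std::string());  // reset the reused stream's contents
    buffer.clear();             // clear any error/eof state
    {
        boost::archive::binary_oarchive ar(buffer, boost::archive::no_header);
        ar << msg;
    }                           // archive flushed and destroyed here
    const std::string bytes = buffer.str();
    // ... send bytes via the UDP socket ...
}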

At 12:31 AM +0000 3/18/06, Marcin Kalicinski wrote:
I'm using the serialization library to create UDP packets. My program is sending/receiving hundreds of these each second. I'm currently creating an iarchive/oarchive for every packet sent/received. This is quite a performance bottleneck. Is it possible to reuse a single binary archive object for all UDP packets, i.e. somehow reinitialize the underlying stream with new data and read/write to the same archive again?
At 6:35 PM -0800 3/17/06, Robert Ramey wrote:
I would guess it's possible, but I don't know for sure that that's where the performance bottleneck is.
This was an issue I brought up last July. At the time I mentioned that I'd done some measurements and modified the serialization library by adding a reset member function to basic_iarchive_impl and basic_oarchive_impl so that they could be reused, and got a significant performance improvement by doing so. Unfortunately I didn't follow up more fully at the time (I was doing the relevant tests just before going on vacation) and haven't had time to do anything about it since then. Below is a description of the test and the relevant numbers.

I'm presently attempting to ensure that my project's schedule has some time allotted to my working on this, possibly in April (I hope), with the intent of submitting a patch back to you. I've cleaned up the test and re-run it with my current configuration, with the results provided below. I'm also appending the test code to the end of this message, in case you want to have a look or run it against a different configuration.

First, here is the timing breakdown. Construct stream and archive on each iteration. All times are in microseconds (averaged over 1 million loop executions):

 19.9  mutex, condition, and process switching overhead
  9.3  stream construct
 23.0  archive construct
 14.1  serialize data
  4.4  polymorphic archive overhead
-----
 70.7

This breakdown was produced by running different variants of the test which remove or modify its behavior according to command-line arguments. The variant checks are inline in the main loop rather than having different loop variants, in order to prevent the compiler from optimizing away dead code in variants that don't do much.

Tests were run on a 2GHz Pentium laptop running SuSE 9.3 (gcc 3.3.5), against Boost 1.33.1 with the addition of the review version (0.9) of Boost.Shmem from the vault. The streams used for archive input and output are stream<array_sink> and stream<array_source> from the Boost.Iostreams library. The associated buffers are in shared memory. The archives are constructed with the no_header and no_codecvt options. The no_codecvt option has a very large impact (roughly 50usec). I think I looked into that once and found a data structure that was being initialized at archive construction time. If so, archive reuse becomes still more desirable for an application that can't use that option.

So stream construction/destruction is accounting for 13%, and archive construction/destruction is accounting for 1/3 (!) of the time in this test. As you can see, for small transfers like this the construction/destruction time for the archive and stream is pretty significant. Some additional hacked-up tests and discussion on the mailing list back in July suggested that the dominating factor was probably memory allocation by various standard container members of the archives, though I haven't actually tried to track it down to that level.

I haven't run my tests against your recent changes to use stream_buffers instead of streams internally in the archive implementation. I expect that will help some too (though one might hope that Boost.Iostreams array_sink and array_source are pretty efficient already). But even if your changes made the serialization time drop to zero, it would still be a smaller improvement for this situation than being able to reuse the archives. (This is not at all intended as a criticism of those changes, merely noting that this is not the biggest factor for this specific use case.)
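(For reference, the per-iteration setup being measured is essentially the following; a plain local buffer and an int stand in here for the shared-memory buffer and test_struct used by the test proper.)

// One iteration's worth of construction and serialization: build an
// array-device stream and a no_header|no_codecvt binary archive, write,
// then destroy both.
#include <cstddef>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/archive/binary_oarchive.hpp>

void one_iteration(char* buffer, std::size_t size, int value)
{
    namespace io = boost::iostreams;
    io::stream<io::array_sink> out(buffer, size);  // stream construct: ~9.3us
    boost::archive::binary_oarchive ar(            // archive construct: ~23us
        out, boost::archive::no_header | boost::archive::no_codecvt);
    ar << value;                                   // serialize: ~14.1us
}                                                  // both destroyed each time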
Back in July I hacked in a patch to the serialization library to add a reset operation (described in email back then, though the details are probably not that important). Using that and resetting the streams (via seekp/seekg) dropped the construction time down into the noise. I haven't repeated that, since I'm planning to do something better than a quick hack soon anyway, but if one substitutes, say, 1usec each for a reset rather than a construction (which is probably pessimistic), one gets:

Reset and reuse stream and archive:

 19.9  mutex, condition, and process switching overhead
  1.0  stream reset
  1.0  archive reset
 14.1  serialize data
  4.4  polymorphic archive overhead
-----
 40.4

And at this point the recent stream_buffer improvements start looking highly productive.

Test code below:

==============================================================================
// Copyright 2006, Kim Barrett.
// Distributed under the Boost Software License, Version 1.0
// (see http://www.boost.org/LICENSE_1_0.txt)
//
// assumes boost installed in /opt/irobot, this file called new-shm-test.cpp
//
// g++ -Wall -fPIC -pthread -D_REENTRANT -ftemplate-depth-256 -O3
//     -I/opt/irobot/include -L/opt/irobot/lib -o new-shm-test
//     new-shm-test.cpp -lboost_serialization -lpthread
//
// in one shell, run this program as server, specifying zero or one of the
// following arguments:
//   --no_polymorphic : don't use polymorphic archive
//   --no_transport   : don't transport data, only construct archive objects
//   --no_archive     : don't construct archives, only streams
//   --no_stream      : don't construct streams
//
// in another shell, run this program as client, by specifying arguments:
//   --iterations <count>
//
// Tests were run on kab's Dell Inspiron 8500 laptop, running SuSE 9.3
// (gcc 3.3.5) against Boost 1.33.1 with the addition of the review version
// (0.9) of Boost.Shmem from the boost vault.
//
// Data, iterations = 1,000,000: run 5, discard high & low, average remaining
//   poly, transport:     70.645, 70.837, 70.678, 71.193, 71.363 :: 70.903
//   non-poly, transport: 66.357, 66.535, 61.596, 66.737, 66.995 :: 66.523
//   poly, no-transport:  52.204, 52.460, 54.251, 52.480, 52.309 :: 52.416
//   non-poly, no-trans:  52.609, 52.286, 51.957, 52.123, 52.226 :: 52.212
//   no-archive:          29.049, 29.230, 30.194, 29.205, 29.298 :: 29.244
//   no-stream:           20.374, 19.936, 19.684, 20.216, 19.665 :: 19.945
//
//   4.4 polymorphic transport overhead
//  14.1 non-polymorphic transport
//  23.0 archive construction
//   9.3 stream construction
//  19.9 mutex, condition, process switching &etc
//  -----
//  70.7 total (slightly less than actual, due to rounding)

#define NEW_SHM_TEST_DEBUG_PRINT 0
//#define NEW_SHM_TEST_DEBUG_PRINT 1

#if NEW_SHM_TEST_DEBUG_PRINT
#include <iostream>
#endif
#include <cstring>  // memset (missing from the original posting)
#include <string>
#include <boost/lexical_cast.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/shmem/named_shared_object.hpp>
#include <boost/shmem/sync/shared_mutex.hpp>
#include <boost/shmem/sync/shared_condition.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/polymorphic_binary_iarchive.hpp>
#include <boost/archive/polymorphic_binary_oarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/nvp.hpp>

#undef NDEBUG
#include <cassert>

namespace io = boost::iostreams;
namespace shmem = boost::shmem;
namespace archive = boost::archive;
namespace serial = boost::serialization;

class test_struct {
    friend class boost::serialization::access;

    double d;
    float f;
    signed long sl;
    unsigned long ul;
    signed short ss;
    unsigned short us;

    template<typename Archive>
    void serialize(Archive& ar, unsigned int /* version */) {
        ar & BOOST_SERIALIZATION_NVP(d);
        ar & BOOST_SERIALIZATION_NVP(f);
        ar & BOOST_SERIALIZATION_NVP(sl);
        ar & BOOST_SERIALIZATION_NVP(ul);
        ar & BOOST_SERIALIZATION_NVP(ss);
        ar & BOOST_SERIALIZATION_NVP(us);
    }

public:
    test_struct() { }
    test_struct(double d_, float f_,
                signed long sl_, unsigned long ul_,
                signed short ss_, unsigned short us_)
        : d(d_), f(f_), sl(sl_), ul(ul_), ss(ss_), us(us_) { }

    bool operator==(const test_struct& rhs) {
        return d == rhs.d && f == rhs.f
            && sl == rhs.sl && ul == rhs.ul
            && ss == rhs.ss && us == rhs.us;
    }
};

test_struct expected(2.0, 2.0, -10, 10, -5, 5);

// arguments:
//   --iterations count
//   --no_polymorphic
//   --no_transport
//   --no_archive
//   --no_stream

const int archive_flags = archive::no_header | archive::no_codecvt;

struct parsed_args {
    const char* segment_name;
    size_t segment_size;
    size_t iterations;
    size_t buffer_size;
    bool transport;
    bool polymorphic;
    bool make_archive;
    bool make_stream;
    bool server;
};

const parsed_args parse_args(int argc, char* argv[]) {
    parsed_args result;
    memset(&result, 0, sizeof result);
    result.segment_name = "/archive_test";
    result.segment_size = 2000;
    result.buffer_size = 1000;
    result.transport = true;
    result.polymorphic = true;
    result.make_archive = true;
    result.make_stream = true;
    result.server = true;
    int i = 1;
    while (i < argc) {
        const std::string arg(argv[i]);
        if (arg == "--iterations") {
            assert(++i < argc);
            result.iterations = boost::lexical_cast<size_t>(argv[i]);
            result.server = false;
        } else if (arg == "--no_transport") {
            result.transport = false;
        } else if (arg == "--no_polymorphic") {
            result.polymorphic = false;
        } else if (arg == "--no_archive") {
            result.make_archive = false;
        } else if (arg == "--no_stream") {
            result.make_stream = false;
        } else {
            assert(false);
        }
        i += 1;
    }
    return result;
}

void test_server(const parsed_args& args) {
    shmem::named_shared_object segment;
    assert(segment.create(args.segment_name, args.segment_size));
    shmem::shared_mutex* mutex(
        segment.construct<shmem::shared_mutex>("mutex")());
    assert(mutex != 0);
    shmem::shared_condition* condition(
        segment.construct<shmem::shared_condition>("condition")());
    assert(condition != 0);
    char* shared_buffer(
        segment.construct<char>("buffer")[args.buffer_size]());
    assert(shared_buffer != 0);
    parsed_args* shared_args(segment.construct<parsed_args>("arguments")());
    assert(shared_args != 0);
    volatile int* value_count(segment.construct<int>("value_count")());
    assert(value_count != 0);
    volatile int* reply_count(segment.construct<int>("reply_count")());
    assert(reply_count != 0);
    volatile int* stop_request(segment.construct<int>("stop_request")());
    assert(stop_request != 0);
    *value_count = 0;
    *reply_count = 0;
    *shared_args = args;
    *stop_request = 0;
    int last_value_count = 0;
    test_struct local_struct;
    shmem::shared_mutex::scoped_lock lock(*mutex);
    while (!*stop_request) {
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Waiting for a value after " << *value_count << std::endl;
#endif
        while (!*stop_request && (*value_count == last_value_count)) {
            condition->wait(lock);
        }
        if (*stop_request) break;
        last_value_count = *value_count;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Got a value, reading it\n";
#endif
        if (args.make_stream) {
            io::stream<io::array_source> in(shared_buffer, args.buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_iarchive ar(in, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_iarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_iarchive ar(in, archive_flags);
                if (args.transport) ar & local_struct;
            }
#if NEW_SHM_TEST_DEBUG_PRINT
            std::cerr << "Writing reply\n";
#endif
            io::stream<io::array_sink> out(shared_buffer, args.buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_oarchive ar(out, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_oarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_oarchive ar(out, archive_flags);
                if (args.transport) ar & local_struct;
            }
        }
        *reply_count += 1;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Notifying of reply " << *reply_count << std::endl;
#endif
        condition->notify_all();
    }
}

void test_client(parsed_args args) {
    size_t iterations = args.iterations;
    shmem::named_shared_object segment;
    assert(segment.open(args.segment_name));
    shmem::shared_mutex* mutex(
        segment.find<shmem::shared_mutex>("mutex").first);
    assert(mutex != 0);
    shmem::shared_condition* condition(
        segment.find<shmem::shared_condition>("condition").first);
    assert(condition != 0);
    std::pair<char*, size_t> buffer_info(segment.find<char>("buffer"));
    char* shared_buffer(buffer_info.first);
    assert(shared_buffer != 0);
    size_t buffer_size(buffer_info.second);
    assert(buffer_size != 0);
    parsed_args* argsp(segment.find<parsed_args>("arguments").first);
    assert(argsp != 0);
    args = *argsp;  // overwrite from shared after getting iterations
    volatile int* value_count(segment.find<int>("value_count").first);
    assert(value_count != 0);
    volatile int* reply_count(segment.find<int>("reply_count").first);
    assert(reply_count != 0);
    volatile int* stop_request(segment.find<int>("stop_request").first);
    assert(stop_request != 0);
    test_struct local_struct;
    shmem::shared_mutex::scoped_lock lock(*mutex);
    int last_reply_count = *reply_count;
    for (size_t i = 0; i < iterations; i++) {
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "writing value\n";
#endif
        if (args.make_stream) {
            io::stream<io::array_sink> out(shared_buffer, buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_oarchive ar(out, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_oarchive*>(&ar)
                        & expected;
                }
            } else {
                archive::binary_oarchive ar(out, archive_flags);
                if (args.transport) ar & expected;
            }
        }
        *value_count += 1;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "notifying of value " << *value_count << std::endl;
#endif
        condition->notify_all();
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "waiting for reply from " << *reply_count << std::endl;
#endif
        while (*reply_count == last_reply_count) {
            condition->wait(lock);
        }
        last_reply_count = *reply_count;
        if (args.make_stream) {
            io::stream<io::array_source> in(shared_buffer, buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_iarchive ar(in, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_iarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_iarchive ar(in, archive_flags);
                if (args.transport) ar & local_struct;
            }
        }
    }
    *stop_request = 1;
    condition->notify_all();
}

int main(int argc, char* argv[]) {
    const parsed_args args(parse_args(argc, argv));
    if (args.server) {
        test_server(args);
    } else {
        test_client(args);
    }
    return 0;
}
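(And for concreteness, the client-side reuse I have in mind would look roughly like the following. Note that reset() is hypothetical; it stands in for the patch described above and does not exist in Boost 1.33.1. The seekp() rewind is standard.)

// Hypothetical "reset and reuse" client loop: one stream and one archive
// constructed up front, then rewound/reset on each iteration.
#include <cstddef>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/archive/binary_oarchive.hpp>

void send_many(char* buffer, std::size_t size, std::size_t iterations)
{
    namespace io = boost::iostreams;
    io::stream<io::array_sink> out(buffer, size);
    boost::archive::binary_oarchive ar(
        out, boost::archive::no_header | boost::archive::no_codecvt);
    for (std::size_t i = 0; i != iterations; ++i) {
        out.seekp(0);   // rewind the reused stream (~1us in the estimate)
        // ar.reset();  // hypothetical: clear the archive's per-run state
        int value = static_cast<int>(i);
        ar << value;    // per-iteration cost is now mostly serialization
    }
}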
participants (3)
- Kim Barrett
- Marcin Kalicinski
- Robert Ramey