Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vinniefalco/0ead73591dce5f790b5ad513fc071301 to your computer and use it in GitHub Desktop.
Save vinniefalco/0ead73591dce5f790b5ad513fc071301 to your computer and use it in GitHub Desktop.
/** Read some message payload into `buffers` (synchronous).

    Parses frame headers as needed, handles any control frames that
    arrive in between (ping, pong, close), and then transfers payload
    bytes of the current data frame into the caller's buffers. When
    `pmd_` is set and `pmd_->rd_set` is true the payload is inflated
    through `pmd_->zi`; otherwise it is copied or read directly.

    @param buffers The mutable buffer sequence receiving payload bytes.

    @param ec Cleared on entry. Set to the error of any failed
    operation on the next layer; set to `error::failed` after the
    connection was failed with a close code (bad payload, message too
    big, or a code reported by the header / close-frame parsers); set
    to `error::closed` after a close frame was processed and teardown
    completed without error.

    @return The number of bytes placed into `buffers`. This may be
    non-zero even when `ec` is set.

    @note `failed_` is updated to mirror `ec` after each operation on
    the next layer.
*/
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
stream<NextLayer>::
read_some(
    MutableBufferSequence const& buffers,
    error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(is_mutable_buffer_sequence<
            MutableBufferSequence>::value,
        "MutableBufferSequence requirements not met");
    using beast::detail::clamp;
    using boost::asio::buffer;
    using boost::asio::buffer_cast;
    using boost::asio::buffer_size;
    close_code code{};
    std::size_t bytes_written = 0;
    // Start from a clean error state. Some paths below complete
    // without performing any I/O (header and payload both already
    // buffered), and the `if(! ec)` test at do_close must not be
    // steered by whatever value the caller left in `ec`.
    ec.assign(0, ec.category());
loop:
    // See if we need to read a frame header. This
    // condition is structured to give the decompressor
    // a chance to emit the final empty deflate block
    //
    if(rd_.remain == 0 && (! rd_.fh.fin || rd_.done))
    {
        // Read frame header: keep pulling bytes from the next
        // layer until parse_fh reports a complete header.
        while(! parse_fh(rd_.fh, rd_.buf, code))
        {
            // parse_fh may reject the header by setting a close code
            if(code != close_code::none)
                goto do_close;
            auto const bytes_transferred =
                stream_.read_some(
                    rd_.buf.prepare(read_size(
                        rd_.buf, rd_.buf.max_size())),
                    ec);
            failed_ = !!ec;
            if(failed_)
                return bytes_written;
            rd_.buf.commit(bytes_transferred);
        }
        // always apply the mask to the portion
        // of the buffer holding payload data.
        if(rd_.fh.len > 0 && rd_.fh.mask)
            detail::mask_inplace(buffer_prefix(
                clamp(rd_.fh.len), rd_.buf.mutable_data()),
                    rd_.key);
        if(detail::is_control(rd_.fh.op))
        {
            // get control frame payload
            auto const cb = buffer_prefix(
                clamp(rd_.fh.len), rd_.buf.data());
            auto const len = buffer_size(cb);
            // The whole control payload is expected to be buffered
            // by the time parse_fh succeeds.
            BOOST_ASSERT(len == rd_.fh.len);
            // process control frame
            if(rd_.fh.op == detail::opcode::ping)
            {
                ping_data payload;
                detail::read_ping(payload, cb);
                rd_.buf.consume(len);
                if(wr_close_)
                {
                    // ignore ping when closing
                    goto loop;
                }
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::ping, payload);
                // answer with a pong carrying the same payload
                detail::frame_streambuf fb;
                write_ping<flat_static_buffer_base>(fb,
                    detail::opcode::pong, payload);
                boost::asio::write(stream_, fb.data(), ec);
                failed_ = !!ec;
                if(failed_)
                    return bytes_written;
                goto loop;
            }
            else if(rd_.fh.op == detail::opcode::pong)
            {
                ping_data payload;
                detail::read_ping(payload, cb);
                rd_.buf.consume(len);
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::pong, payload);
                goto loop;
            }
            BOOST_ASSERT(rd_.fh.op == detail::opcode::close);
            {
                BOOST_ASSERT(! rd_close_);
                rd_close_ = true;
                detail::read_close(cr_, cb, code);
                // a malformed close payload fails the connection
                if(code != close_code::none)
                    goto do_close;
                rd_.buf.consume(len);
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::close, cr_.reason);
                if(! wr_close_)
                {
                    // Reply with our own close frame: reuse the
                    // peer's code (or `normal` when none was given)
                    // and always send an empty reason.
                    auto cr = cr_;
                    if(cr.code == close_code::none)
                        cr.code = close_code::normal;
                    cr.reason = "";
                    detail::frame_streambuf fb;
                    wr_close_ = true;
                    write_close<
                        flat_static_buffer_base>(fb, cr);
                    boost::asio::write(stream_, fb.data(), ec);
                    failed_ = !!ec;
                    if(failed_)
                        return bytes_written;
                }
                // `code` is still none here, so do_close performs
                // the clean-close teardown.
                goto do_close;
            }
        }
        if(rd_.fh.len == 0 && ! rd_.fh.fin)
        {
            // empty non-final frame
            goto loop;
        }
        rd_.done = false;
    }
    // (No else branch is needed to reset `ec`: it was cleared on
    // entry and every failure above returns immediately.)
    if(! pmd_ || ! pmd_->rd_set)
    {
        // Uncompressed payload
        if(rd_.buf.size() == 0 && rd_.buf.max_size() >
            (std::min)(clamp(rd_.remain),
                buffer_size(buffers)))
        {
            // fill the read buffer, otherwise we
            // get fewer bytes at the cost of one I/O.
            auto const bytes_transferred =
                stream_.read_some(
                    rd_.buf.prepare(read_size(
                        rd_.buf, rd_.buf.max_size())),
                    ec);
            failed_ = !!ec;
            if(failed_)
                return bytes_written;
            // Commit first, then unmask only what was actually
            // received (bounded by the bytes left in this frame).
            // Masking the prepared area instead would process bytes
            // that were never read; this also matches the ordering
            // used by the compressed path below.
            // NOTE(review): mask_inplace presumably advances
            // rd_.key as it goes, in which case over-masking would
            // also desynchronize the key - confirm.
            rd_.buf.commit(bytes_transferred);
            if(rd_.fh.mask)
                detail::mask_inplace(
                    buffer_prefix(clamp(rd_.remain),
                        rd_.buf.mutable_data()), rd_.key);
        }
        if(rd_.buf.size() > 0)
        {
            // return data from the read buffer
            // (already unmasked when it was buffered)
            auto const bytes_transferred =
                buffer_copy(buffers, rd_.buf.data(),
                    clamp(rd_.remain));
            auto const mb = buffer_prefix(
                bytes_transferred, buffers);
            rd_.remain -= bytes_transferred;
            if(rd_.op == detail::opcode::text)
            {
                // validate incrementally; finish() only once the
                // last byte of the final frame has been delivered
                if(! rd_.utf8.write(mb) ||
                    (rd_.remain == 0 && rd_.fh.fin &&
                        ! rd_.utf8.finish()))
                {
                    code = close_code::bad_payload;
                    goto do_close;
                }
            }
            bytes_written += bytes_transferred;
            rd_.size += bytes_transferred;
            rd_.buf.consume(bytes_transferred);
        }
        else
        {
            // read into caller's buffer
            auto const bytes_transferred =
                stream_.read_some(buffer_prefix(
                    clamp(rd_.remain), buffers), ec);
            failed_ = !!ec;
            if(failed_)
                return bytes_written;
            BOOST_ASSERT(bytes_transferred > 0);
            auto const mb = buffer_prefix(
                bytes_transferred, buffers);
            rd_.remain -= bytes_transferred;
            if(rd_.fh.mask)
                detail::mask_inplace(mb, rd_.key);
            if(rd_.op == detail::opcode::text)
            {
                if(! rd_.utf8.write(mb) ||
                    (rd_.remain == 0 && rd_.fh.fin &&
                        ! rd_.utf8.finish()))
                {
                    code = close_code::bad_payload;
                    goto do_close;
                }
            }
            bytes_written += bytes_transferred;
            rd_.size += bytes_transferred;
        }
        if(rd_.remain == 0 && rd_.fh.fin)
        {
            // final frame fully delivered: message complete
            rd_.done = true;
        }
    }
    else
    {
        // Read compressed message frame payload:
        // inflate even if rd_.fh.len == 0, otherwise we
        // never emit the end-of-stream deflate block.
        //
        // At most one read from the next layer is performed per
        // call; `did_read` records that it already happened.
        bool did_read = false;
        consuming_buffers<MutableBufferSequence> cb{buffers};
        while(buffer_size(cb) > 0)
        {
            zlib::z_params zs;
            {
                // inflate into the first remaining output buffer
                auto const out = buffer_front(cb);
                zs.next_out = buffer_cast<void*>(out);
                zs.avail_out = buffer_size(out);
                BOOST_ASSERT(zs.avail_out > 0);
            }
            if(rd_.remain > 0)
            {
                if(rd_.buf.size() > 0)
                {
                    // use what's there
                    auto const in = buffer_prefix(
                        clamp(rd_.remain), buffer_front(
                            rd_.buf.data()));
                    zs.avail_in = buffer_size(in);
                    zs.next_in = buffer_cast<void const*>(in);
                }
                else if(! did_read)
                {
                    // read new
                    auto const bytes_transferred =
                        stream_.read_some(
                            rd_.buf.prepare(read_size(
                                rd_.buf, rd_.buf.max_size())),
                            ec);
                    failed_ = !!ec;
                    if(failed_)
                        return bytes_written;
                    BOOST_ASSERT(bytes_transferred > 0);
                    rd_.buf.commit(bytes_transferred);
                    if(rd_.fh.mask)
                        detail::mask_inplace(
                            buffer_prefix(clamp(rd_.remain),
                                rd_.buf.mutable_data()), rd_.key);
                    auto const in = buffer_prefix(
                        clamp(rd_.remain), buffer_front(
                            rd_.buf.data()));
                    zs.avail_in = buffer_size(in);
                    zs.next_in = buffer_cast<void const*>(in);
                    did_read = true;
                }
                else
                {
                    // input exhausted and we already did our one
                    // read: return what has been produced so far
                    break;
                }
            }
            else if(rd_.fh.fin)
            {
                // append the empty block codes
                static std::uint8_t constexpr
                    empty_block[4] = {
                        0x00, 0x00, 0xff, 0xff };
                zs.next_in = empty_block;
                zs.avail_in = sizeof(empty_block);
                pmd_->zi.write(zs, zlib::Flush::sync, ec);
                BOOST_ASSERT(! ec);
                if(ec)
                {
                    failed_ = true;
                    return bytes_written;
                }
                // VFALCO See:
                // https://github.com/madler/zlib/issues/280
                BOOST_ASSERT(zs.total_out == 0);
                cb.consume(zs.total_out);
                rd_.size += zs.total_out;
                bytes_written += zs.total_out;
                // Reset the inflate state when the peer's
                // no_context_takeover setting is in effect.
                if(
                    (role_ == role_type::client &&
                        pmd_config_.server_no_context_takeover) ||
                    (role_ == role_type::server &&
                        pmd_config_.client_no_context_takeover))
                    pmd_->zi.reset();
                rd_.done = true;
                break;
            }
            else
            {
                // frame consumed but message continues: the next
                // call will parse the following frame header
                break;
            }
            pmd_->zi.write(zs, zlib::Flush::sync, ec);
            BOOST_ASSERT(ec != zlib::error::end_of_stream);
            failed_ = !!ec;
            if(failed_)
                return bytes_written;
            // enforce the message size limit (0 disables the check)
            if(rd_msg_max_ && beast::detail::sum_exceeds(
                rd_.size, zs.total_out, rd_msg_max_))
            {
                code = close_code::too_big;
                goto do_close;
            }
            cb.consume(zs.total_out);
            rd_.size += zs.total_out;
            rd_.remain -= zs.total_in;
            rd_.buf.consume(zs.total_in);
            bytes_written += zs.total_out;
        }
        if(rd_.op == detail::opcode::text)
        {
            // check utf8 over everything inflated during this call
            if(! rd_.utf8.write(
                buffer_prefix(bytes_written, buffers)) || (
                    rd_.remain == 0 && rd_.fh.fin &&
                        ! rd_.utf8.finish()))
            {
                code = close_code::bad_payload;
                goto do_close;
            }
        }
    }
    return bytes_written;
do_close:
    if(code != close_code::none)
    {
        // Fail the connection (per rfc6455)
        if(! wr_close_)
        {
            wr_close_ = true;
            detail::frame_streambuf fb;
            write_close<flat_static_buffer_base>(fb, code);
            boost::asio::write(stream_, fb.data(), ec);
            failed_ = !!ec;
            if(failed_)
                return bytes_written;
        }
        websocket_helpers::call_teardown(next_layer(), ec);
        if(ec == boost::asio::error::eof)
        {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        failed_ = !!ec;
        if(failed_)
            return bytes_written;
        // teardown succeeded, but the read itself still failed
        ec = error::failed;
        failed_ = true;
        return bytes_written;
    }
    // Clean close: a close frame was received and processed.
    if(! ec)
    {
        websocket_helpers::call_teardown(next_layer(), ec);
        if(ec == boost::asio::error::eof)
        {
            // eof from teardown is treated as success
            // (same rationale as in the failure branch)
            ec.assign(0, ec.category());
        }
    }
    if(! ec)
        ec = error::closed;
    failed_ = !!ec;
    return bytes_written;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment