Created
July 18, 2017 23:22
-
-
Save vinniefalco/0ead73591dce5f790b5ad513fc071301 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
template<class NextLayer> | |
template<class MutableBufferSequence> | |
std::size_t | |
stream<NextLayer>:: | |
read_some( | |
MutableBufferSequence const& buffers, | |
error_code& ec) | |
{ | |
static_assert(is_sync_stream<next_layer_type>::value, | |
"SyncStream requirements not met"); | |
static_assert(is_mutable_buffer_sequence< | |
MutableBufferSequence>::value, | |
"MutableBufferSequence requirements not met"); | |
using beast::detail::clamp; | |
using boost::asio::buffer; | |
using boost::asio::buffer_cast; | |
using boost::asio::buffer_size; | |
close_code code{}; | |
std::size_t bytes_written = 0; | |
loop: | |
// See if we need to read a frame header. This | |
// condition is structured to give the decompressor | |
// a chance to emit the final empty deflate block | |
// | |
if(rd_.remain == 0 && (! rd_.fh.fin || rd_.done)) | |
{ | |
// Read frame header | |
while(! parse_fh(rd_.fh, rd_.buf, code)) | |
{ | |
if(code != close_code::none) | |
goto do_close; | |
auto const bytes_transferred = | |
stream_.read_some( | |
rd_.buf.prepare(read_size( | |
rd_.buf, rd_.buf.max_size())), | |
ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
rd_.buf.commit(bytes_transferred); | |
} | |
// always apply the mask to the portion | |
// of the buffer holding payload data. | |
if(rd_.fh.len > 0 && rd_.fh.mask) | |
detail::mask_inplace(buffer_prefix( | |
clamp(rd_.fh.len), rd_.buf.mutable_data()), | |
rd_.key); | |
if(detail::is_control(rd_.fh.op)) | |
{ | |
// get control frame payload | |
auto const cb = buffer_prefix( | |
clamp(rd_.fh.len), rd_.buf.data()); | |
auto const len = buffer_size(cb); | |
BOOST_ASSERT(len == rd_.fh.len); | |
// process control frame | |
if(rd_.fh.op == detail::opcode::ping) | |
{ | |
ping_data payload; | |
detail::read_ping(payload, cb); | |
rd_.buf.consume(len); | |
if(wr_close_) | |
{ | |
// ignore ping when closing | |
goto loop; | |
} | |
if(ctrl_cb_) | |
ctrl_cb_(frame_type::ping, payload); | |
detail::frame_streambuf fb; | |
write_ping<flat_static_buffer_base>(fb, | |
detail::opcode::pong, payload); | |
boost::asio::write(stream_, fb.data(), ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
goto loop; | |
} | |
else if(rd_.fh.op == detail::opcode::pong) | |
{ | |
ping_data payload; | |
detail::read_ping(payload, cb); | |
rd_.buf.consume(len); | |
if(ctrl_cb_) | |
ctrl_cb_(frame_type::pong, payload); | |
goto loop; | |
} | |
BOOST_ASSERT(rd_.fh.op == detail::opcode::close); | |
{ | |
BOOST_ASSERT(! rd_close_); | |
rd_close_ = true; | |
detail::read_close(cr_, cb, code); | |
if(code != close_code::none) | |
goto do_close; | |
rd_.buf.consume(len); | |
if(ctrl_cb_) | |
ctrl_cb_(frame_type::close, cr_.reason); | |
if(! wr_close_) | |
{ | |
auto cr = cr_; | |
if(cr.code == close_code::none) | |
cr.code = close_code::normal; | |
cr.reason = ""; | |
detail::frame_streambuf fb; | |
wr_close_ = true; | |
write_close< | |
flat_static_buffer_base>(fb, cr); | |
boost::asio::write(stream_, fb.data(), ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
} | |
goto do_close; | |
} | |
} | |
if(rd_.fh.len == 0 && ! rd_.fh.fin) | |
{ | |
// empty non-final frame | |
goto loop; | |
} | |
rd_.done = false; | |
} | |
else | |
{ | |
ec.assign(0, ec.category()); | |
} | |
if(! pmd_ || ! pmd_->rd_set) | |
{ | |
if(rd_.buf.size() == 0 && rd_.buf.max_size() > | |
(std::min)(clamp(rd_.remain), | |
buffer_size(buffers))) | |
{ | |
// fill the read buffer, otherwise we | |
// get fewer bytes at the cost of one I/O. | |
auto const mb = rd_.buf.prepare( | |
read_size(rd_.buf, rd_.buf.max_size())); | |
auto const bytes_transferred = | |
stream_.read_some(mb, ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
if(rd_.fh.mask) | |
detail::mask_inplace(buffer_prefix( | |
clamp(rd_.remain), mb), rd_.key); | |
rd_.buf.commit(bytes_transferred); | |
} | |
if(rd_.buf.size() > 0) | |
{ | |
// return data from the read buffer | |
auto const bytes_transferred = | |
buffer_copy(buffers, rd_.buf.data(), | |
clamp(rd_.remain)); | |
auto const mb = buffer_prefix( | |
bytes_transferred, buffers); | |
rd_.remain -= bytes_transferred; | |
if(rd_.op == detail::opcode::text) | |
{ | |
if(! rd_.utf8.write(mb) || | |
(rd_.remain == 0 && rd_.fh.fin && | |
! rd_.utf8.finish())) | |
{ | |
code = close_code::bad_payload; | |
goto do_close; | |
} | |
} | |
bytes_written += bytes_transferred; | |
rd_.size += bytes_transferred; | |
rd_.buf.consume(bytes_transferred); | |
} | |
else | |
{ | |
// read into caller's buffer | |
auto const bytes_transferred = | |
stream_.read_some(buffer_prefix( | |
clamp(rd_.remain), buffers), ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
BOOST_ASSERT(bytes_transferred > 0); | |
auto const mb = buffer_prefix( | |
bytes_transferred, buffers); | |
rd_.remain -= bytes_transferred; | |
if(rd_.fh.mask) | |
detail::mask_inplace(mb, rd_.key); | |
if(rd_.op == detail::opcode::text) | |
{ | |
if(! rd_.utf8.write(mb) || | |
(rd_.remain == 0 && rd_.fh.fin && | |
! rd_.utf8.finish())) | |
{ | |
code = close_code::bad_payload; | |
goto do_close; | |
} | |
} | |
bytes_written += bytes_transferred; | |
rd_.size += bytes_transferred; | |
} | |
if(rd_.remain == 0 && rd_.fh.fin) | |
{ | |
rd_.done = true; | |
} | |
} | |
else | |
{ | |
// Read compressed message frame payload: | |
// inflate even if rd_.fh.len == 0, otherwise we | |
// never emit the end-of-stream deflate block. | |
// | |
bool did_read = false; | |
consuming_buffers<MutableBufferSequence> cb{buffers}; | |
while(buffer_size(cb) > 0) | |
{ | |
zlib::z_params zs; | |
{ | |
auto const out = buffer_front(cb); | |
zs.next_out = buffer_cast<void*>(out); | |
zs.avail_out = buffer_size(out); | |
BOOST_ASSERT(zs.avail_out > 0); | |
} | |
if(rd_.remain > 0) | |
{ | |
if(rd_.buf.size() > 0) | |
{ | |
// use what's there | |
auto const in = buffer_prefix( | |
clamp(rd_.remain), buffer_front( | |
rd_.buf.data())); | |
zs.avail_in = buffer_size(in); | |
zs.next_in = buffer_cast<void const*>(in); | |
} | |
else if(! did_read) | |
{ | |
// read new | |
auto const bytes_transferred = | |
stream_.read_some( | |
rd_.buf.prepare(read_size( | |
rd_.buf, rd_.buf.max_size())), | |
ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
BOOST_ASSERT(bytes_transferred > 0); | |
rd_.buf.commit(bytes_transferred); | |
if(rd_.fh.mask) | |
detail::mask_inplace( | |
buffer_prefix(clamp(rd_.remain), | |
rd_.buf.mutable_data()), rd_.key); | |
auto const in = buffer_prefix( | |
clamp(rd_.remain), buffer_front( | |
rd_.buf.data())); | |
zs.avail_in = buffer_size(in); | |
zs.next_in = buffer_cast<void const*>(in); | |
did_read = true; | |
} | |
else | |
{ | |
break; | |
} | |
} | |
else if(rd_.fh.fin) | |
{ | |
// append the empty block codes | |
static std::uint8_t constexpr | |
empty_block[4] = { | |
0x00, 0x00, 0xff, 0xff }; | |
zs.next_in = empty_block; | |
zs.avail_in = sizeof(empty_block); | |
pmd_->zi.write(zs, zlib::Flush::sync, ec); | |
BOOST_ASSERT(! ec); | |
if(ec) | |
{ | |
failed_ = true; | |
return bytes_written; | |
} | |
// VFALCO See: | |
// https://github.com/madler/zlib/issues/280 | |
BOOST_ASSERT(zs.total_out == 0); | |
cb.consume(zs.total_out); | |
rd_.size += zs.total_out; | |
bytes_written += zs.total_out; | |
if( | |
(role_ == role_type::client && | |
pmd_config_.server_no_context_takeover) || | |
(role_ == role_type::server && | |
pmd_config_.client_no_context_takeover)) | |
pmd_->zi.reset(); | |
rd_.done = true; | |
break; | |
} | |
else | |
{ | |
break; | |
} | |
pmd_->zi.write(zs, zlib::Flush::sync, ec); | |
BOOST_ASSERT(ec != zlib::error::end_of_stream); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
if(rd_msg_max_ && beast::detail::sum_exceeds( | |
rd_.size, zs.total_out, rd_msg_max_)) | |
{ | |
code = close_code::too_big; | |
goto do_close; | |
} | |
cb.consume(zs.total_out); | |
rd_.size += zs.total_out; | |
rd_.remain -= zs.total_in; | |
rd_.buf.consume(zs.total_in); | |
bytes_written += zs.total_out; | |
} | |
if(rd_.op == detail::opcode::text) | |
{ | |
// check utf8 | |
if(! rd_.utf8.write( | |
buffer_prefix(bytes_written, buffers)) || ( | |
rd_.remain == 0 && rd_.fh.fin && | |
! rd_.utf8.finish())) | |
{ | |
code = close_code::bad_payload; | |
goto do_close; | |
} | |
} | |
} | |
return bytes_written; | |
do_close: | |
if(code != close_code::none) | |
{ | |
// Fail the connection (per rfc6455) | |
if(! wr_close_) | |
{ | |
wr_close_ = true; | |
detail::frame_streambuf fb; | |
write_close<flat_static_buffer_base>(fb, code); | |
boost::asio::write(stream_, fb.data(), ec); | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
} | |
websocket_helpers::call_teardown(next_layer(), ec); | |
if(ec == boost::asio::error::eof) | |
{ | |
// Rationale: | |
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error | |
ec.assign(0, ec.category()); | |
} | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
ec = error::failed; | |
failed_ = true; | |
return bytes_written; | |
} | |
if(! ec) | |
{ | |
websocket_helpers::call_teardown(next_layer(), ec); | |
if(ec == boost::asio::error::eof) | |
{ | |
// (See above) | |
ec.assign(0, ec.category()); | |
} | |
} | |
if(! ec) | |
ec = error::closed; | |
failed_ = !!ec; | |
if(failed_) | |
return bytes_written; | |
return bytes_written; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment