Skip to content

Instantly share code, notes, and snippets.

@lichray
Last active August 29, 2015 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lichray/f5f577cf1b76ed5c26f2 to your computer and use it in GitHub Desktop.
Save lichray/f5f577cf1b76ed5c26f2 to your computer and use it in GitHub Desktop.
librsync job reader & writer
/*
* Copyright 2015 Rackspace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <librsync.h>
#include <array>
#include <cstring>
#if defined(_WIN32)
#include <ciso646>
#include <io.h>
#else
#include <unistd.h>
#define _read read
#define _write write
#endif
namespace deuterium
{
// Size in bytes of one I/O unit (16 KiB): the writer's output buffer holds
// one unit, the reader's input buffer holds two, and the reader asks the
// descriptor for at most one unit per read.
const std::size_t iobufsize = 1024u * 16u;
struct rsync_base
{
rsync_base(int fd, rs_job_t* job) :
fd_(fd),
job_(job),
rbuf_()
{}
char const* message() const
{
return rs_strerror(r_);
}
~rsync_base()
{
rs_job_free(job_);
}
protected:
int fd_;
rs_job_t* job_;
rs_buffers_t rbuf_;
rs_result r_;
};
/**
 * Pushes caller-supplied input through a librsync job and writes the
 * job's output to the file descriptor.
 *
 * Output is collected in an internal buffer of iobufsize bytes, which is
 * written to the descriptor when the job cannot proceed and again when
 * final() completes.
 */
struct rsync_writer : rsync_base
{
    rsync_writer(int fd, rs_job_t* job) :
        rsync_base(fd, job)
    {
        reset_buffer();
    }

    /**
     * Feed sz bytes starting at p to the job.
     *
     * @return true when the job ended this step with RS_BLOCKED or
     *         RS_DONE; false on any other result, including a failed
     *         write to the descriptor (see message()).
     */
    bool update(char* p, size_t sz)
    {
        rbuf_.next_in = p;
        rbuf_.avail_in = sz;
        continue_job();
        return r_ == RS_BLOCKED or r_ == RS_DONE;
    }

    /**
     * Mark the end of input, run the job to completion and flush the
     * remaining output.
     *
     * @return true only if the job reported RS_DONE and the last flush
     *         succeeded.
     */
    bool final()
    {
        rbuf_.eof_in = true;
        continue_job();

        // keep iterating only while each round actually produced output;
        // a blocked job with an empty output buffer ends the loop
        while (r_ == RS_BLOCKED and rbuf_.avail_out != buf_.size() and
            do_flush())
            r_ = rs_job_iter(job_, &rbuf_);

        return r_ == RS_DONE and do_flush();
    }

private:
    // Iterate the job until it stops blocking with input left over.
    void continue_job()
    {
        for (;;)
        {
            r_ = rs_job_iter(job_, &rbuf_);

            // blocked with unconsumed input: taken to mean the output
            // buffer ran out of space, so drain it and try again
            if (r_ == RS_BLOCKED and rbuf_.avail_in != 0 and
                do_flush())
                continue;

            break;
        }
    }

    // Point the job's output at the start of the (now empty) buffer.
    void reset_buffer()
    {
        rbuf_.next_out = buf_.data();
        rbuf_.avail_out = buf_.size();
    }

    // Write everything the job has produced so far to fd_.
    // Returns false and sets r_ to RS_IO_ERROR if the descriptor
    // reports an error or accepts no data.
    bool do_flush()
    {
        char const* cursor = buf_.data();
        size_t left = buf_.size() - rbuf_.avail_out;

        // a single write may accept fewer bytes than requested (pipes,
        // sockets); that is not an error, so continue with the rest
        while (left != 0)
        {
            auto n = _write(fd_, cursor, static_cast<unsigned int>(left));
            if (n <= 0)
            {
                r_ = RS_IO_ERROR;
                return false;
            }
            cursor += n;
            left -= static_cast<size_t>(n);
        }

        reset_buffer();
        return true;
    }

    std::array<char, iobufsize> buf_;  // output staging buffer
};
/**
 * Reads input for a librsync job from the file descriptor and lets the
 * job write its output into a caller-supplied buffer.
 *
 * Input is staged in an internal buffer of 2 * iobufsize bytes so that
 * data the job has not consumed yet can be kept while another chunk of
 * up to iobufsize bytes is read behind it.
 */
struct rsync_reader : rsync_base
{
    rsync_reader(int fd, rs_job_t* job) :
        rsync_base(fd, job)
    {
        reset_buffer();
    }

    /**
     * Run one step of the job with p[0..sz) as its output buffer.
     *
     * @return true when the step ended with RS_BLOCKED or RS_DONE;
     *         false otherwise, including a failed read (see message()).
     */
    bool apply(char* p, size_t sz)
    {
        rbuf_.next_out = p;
        rbuf_.avail_out = sz;
        continue_job();
        return r_ == RS_BLOCKED or r_ == RS_DONE;
    }

    /**
     * Current avail_out of the job buffers.  Presumably the part of the
     * buffer given to apply() that the job has not filled yet, given that
     * librsync decreases avail_out as it writes -- confirm against the
     * librsync documentation.
     */
    size_t pending_bytes() const
    {
        return rbuf_.avail_out;
    }

    /**
     * Step the job repeatedly until it no longer reports RS_BLOCKED.
     * No output buffer is set up here.
     *
     * @return true if the job finished with RS_DONE.
     */
    bool load_all()
    {
        do
            continue_job();
        while (r_ == RS_BLOCKED);

        return r_ == RS_DONE;
    }

private:
    // Top up the input buffer from fd_ if there is room, then run one
    // rs_job_iter() step.  On a read error r_ becomes RS_IO_ERROR and
    // the job is not iterated.
    void continue_job()
    {
        // Once end-of-file has been reported to the job the descriptor
        // is left alone: anything read later would be appended after
        // eof_in was already set.
        // Otherwise refill only when at least iobufsize bytes of the
        // buffer are not occupied by pending input.
        if (not rbuf_.eof_in and
            (buf_.size() - rbuf_.avail_in) >= iobufsize)
        {
            // offset one past the last pending byte
            size_t used = static_cast<size_t>(
                rbuf_.next_in - buf_.data()) + rbuf_.avail_in;

            // not enough free space behind the pending data: slide the
            // pending bytes to the front of the buffer
            if ((buf_.size() - used) < iobufsize)
            {
                std::memmove(buf_.data(), rbuf_.next_in,
                    rbuf_.avail_in);
                reset_buffer();
                used = rbuf_.avail_in;
            }

            // address the destination through buf_ rather than through
            // next_in, so this does not depend on next_in being a
            // pointer to non-const
            auto n = _read(fd_, buf_.data() + used, iobufsize);

            if (n < 0)
            {
                r_ = RS_IO_ERROR;
                return;
            }
            else if (n == 0)
                rbuf_.eof_in = true;
            else
                rbuf_.avail_in += n;
        }

        r_ = rs_job_iter(job_, &rbuf_);
    }

    // Make the job's input start at the beginning of the buffer.
    void reset_buffer()
    {
        rbuf_.next_in = buf_.data();
    }

    // make enough space to load pending data + iobufsize
    // hope byte-oriented read(2) still likes unaligned address
    std::array<char, iobufsize * 2> buf_;
};
}
#undef _write
#undef _read
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment