Skip to content

Instantly share code, notes, and snippets.

@ArtemGr
Last active December 10, 2015 01:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ArtemGr/4364070 to your computer and use it in GitHub Desktop.
Save ArtemGr/4364070 to your computer and use it in GitHub Desktop.
Duplex ssh synchronization.
/*
MinGW:
boost: "sh bootstrap.sh --with-toolset=mingw --prefix=c:/spool/MinGW/msys/1.0/local"
edit "project-config.jam" removing "mingw"
"./b2 toolset=gcc"
libssh2: "git clone git://git.libssh2.org/libssh2.git && cd libssh2 && sh buildconf && ./configure --disable-shared --disable-examples-build"
"make && make install"
*/
#include <iostream>
using std::cout; using std::cerr; using std::endl; using std::flush;
#include <fstream>
#include <vector>
#include <map>
using std::map;
#include <string>
using std::string;
#include <sstream>
#include <utility>
using std::pair;
#include <stdexcept>
#include <memory>
using std::shared_ptr;
#include <glim/exception.hpp>
#define THROW(message) throw ::glim::Exception (string (message) + "\n" + stackTrace (2), __FILE__, __LINE__)
#include <sys/types.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <winsock2.h>
#include <errno.h>
#include <windows.h> // Sleep
#include <sys/utime.h> // _utime
#include <assert.h>
#include <WinSock2.h>
#if defined (__MINGW32__) || defined (__CYGWIN__)
#include <unistd.h> // unlink
#include "dbg/stack.hpp" // http://www.mr-edd.co.uk/code/stack_trace
#endif
#include <libssh2.h>
#include <libssh2_sftp.h>
// http://stackoverflow.com/a/10474617/257568
#if defined (__MINGW32__) || defined (__CYGWIN__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdelete-non-virtual-dtor"
#pragma GCC diagnostic ignored "-Wstrict-aliasing"
#endif
#include <boost/filesystem.hpp> // http://www.boost.org/doc/libs/1_51_0/libs/filesystem/doc/index.htm
namespace fs = boost::filesystem;
#include <boost/regex.hpp> // http://www.boost.org/doc/libs/1_51_0/libs/regex/doc/html/index.html
// std::to_string not yet in gcc
#include <boost/lexical_cast.hpp>
inline string to_string (int i) {return boost::lexical_cast<string> (i);}
#define BOOST_THREAD_USE_LIB
#include <boost/thread.hpp> // http://www.boost.org/doc/libs/1_51_0/doc/html/thread.html
#include <boost/chrono.hpp>
static inline void sleepMs (int ms = 50) {boost::this_thread::sleep_for (boost::chrono::milliseconds (ms));}
#if defined (__MINGW32__) || defined (__CYGWIN__)
#pragma GCC diagnostic pop
#endif
string stackTrace (int skip = 0) { // NB: Useful only with -O0.
#if defined (__MINGW32__) || defined (__CYGWIN__)
std::ostringstream oss; dbg::stack stack;
for (auto it = stack.begin(), end = stack.end(); it != end; ++it)
if (--skip < 0)
oss << " " << *it << '\n';
return oss.str();
#else
return string();
#endif
}
struct FileInfo {
uint32_t size;
uint64_t mtime;
bool dir;
FileInfo(): size (0), mtime (0), dir (false) {}
FileInfo (const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
size = attrs.flags & LIBSSH2_SFTP_ATTR_SIZE ? attrs.filesize : 0;
mtime = attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME ? attrs.mtime : 0;
dir = attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS ? LIBSSH2_SFTP_S_ISDIR (attrs.permissions) : 0;
}
FileInfo (const fs::directory_entry& ent) {
const bool isDir = fs::is_directory (ent.status());
size = isDir ? 0 : fs::file_size (ent.path());
mtime = fs::last_write_time (ent.path());
dir = isDir;
}
FileInfo (const char* path) {
struct stat si; if (stat (path, &si)) THROW (string ("!stat: ") + path);
size = (uint32_t) si.st_size;
mtime = (uint64_t) si.st_mtime;
dir = LIBSSH2_SFTP_S_ISDIR (si.st_mode);
}
FileInfo (const struct stat& fi) {
size = (uint32_t) fi.st_size;
mtime = (uint64_t) fi.st_mtime;
dir = LIBSSH2_SFTP_S_ISDIR (fi.st_mode);
}
bool isNil() const {return size == 0 && mtime == 0 && dir == false;}
};
inline std::ostream& operator << (std::ostream& os, const FileInfo& fi) {
return os << "{size:" << fi.size << ",mtime:" << fi.mtime << '}';
}
inline bool operator != (const FileInfo& fi1, const FileInfo& fi2) {
return fi1.size != fi2.size || fi1.mtime != fi2.mtime || fi1.dir != fi2.dir;
}
typedef map<string,FileInfo> tree_t;
tree_t operator - (const tree_t& tree, const tree_t& minus) {
tree_t tr (tree);
for (auto it = minus.begin(), end = minus.end(); it != end; ++it) tr.erase (it->first);
return tr;
}
std::ostream& operator << (std::ostream& os, const tree_t& tr) {
os << '{';
auto it = tr.begin(), end = tr.end();
while (it != end) {
os << it->first << ':' << it->second;
if (++it != end) os << ',';
}
os << '}';
return os;
}
struct File {
string _path;
FileInfo _fileInfo;
string _body;
File (const string& path, const FileInfo& fi, const string& body): _path (path), _fileInfo (fi), _body (body) {}
};
struct FileSystem {
virtual void connect (bool force = false) = 0;
virtual tree_t ls (const char* path) = 0;
virtual tree_t tree (const char* path = "", const char* ifMatches = NULL) {
boost::regex filter (ifMatches && *ifMatches ? ifMatches : ".");
tree_t tr;
std::function<void(string)> loop; loop = [&](string subdir){
string full = subdir.empty() ? path : string (path) + '/' + subdir;
tree_t lst = ls (full.c_str()); for (auto it = lst.begin(), end = lst.end(); it != end; ++it) {
string name = it->first;
if (name.size() > 2 && name.substr (0, 2) == "./") name = name.substr (2);
if (name.empty() || name[0] == '.') continue;
if (it->second.dir) loop (subdir.empty() ? name : subdir + '/' + name);
else {
string itPath (subdir.empty() ? name : subdir + '/' + name);
// http://www.boost.org/doc/libs/1_51_0/libs/regex/doc/html/boost_regex/ref/regex_search.html
if (boost::regex_search (itPath, filter)) tr[itPath] = it->second;
}
}
};
loop (string());
return tr;
}
virtual File read (const char* path, FileInfo* fi = NULL) = 0;
virtual void write (const File& file) = 0;
virtual void remove (const char* path) = 0;
virtual string toString() const = 0;
virtual ~FileSystem() {}
};
inline std::ostream& operator << (std::ostream& os, const FileSystem& fs) {
return os << fs.toString();
}
void mkdirs (string path, const std::function<void(string)>& mkdirFun) {
auto slash = path.find_last_of ('/');
if (slash == string::npos) return;
string dir = path.substr (0, slash);
mkdirs (dir, mkdirFun);
mkdirFun (dir);
}
struct LocalFs: public FileSystem {
string _root;
LocalFs (string root): _root (root) {}
virtual void connect (bool force = false) {
fs::create_directories (_root);
if (!FileInfo (_root.c_str()) .dir) THROW (_root + " is not a directory.");
}
virtual tree_t ls (const char* path) {
fs::directory_iterator dit (_root + '/' + path), eod;
tree_t tr; while (dit != eod) tr[dit->path().filename().string()] = FileInfo (*dit++);
return tr;
}
virtual File read (const char* path, FileInfo* fi = NULL) {
std::ostringstream buf;
string fpath (_root + '/' + path);
// Read ecpp files in text mode, converting CRLF to LF.
bool ecpp = fpath.find (".ecpp", fpath.size() - 5) != string::npos;
std::ios_base::openmode mode = ecpp ? std::ios_base::in : (std::ios_base::in | std::ios_base::binary);
std::ifstream is (fpath, mode);
buf << is.rdbuf();
return File (path, fi ? *fi : FileInfo (fpath.c_str()), buf.str());
}
virtual void write (const File& file) {
string fpath (_root + '/' + file._path);
fs::create_directories (fs::path (fpath) .parent_path());
std::ofstream os (fpath.c_str(), std::ios_base::out | std::ios_base::binary); os << file._body; os.close();
struct _utimbuf tb;
tb.actime = file._fileInfo.mtime;
tb.modtime = file._fileInfo.mtime;
_utime ((const char*) fpath.c_str(), &tb); // http://msdn.microsoft.com/en-us/library/4wacf567%28v=vs.110%29.aspx
}
virtual void remove (const char* path) {
string fpath (_root + '/' + path);
#ifdef _MSC_VER
_unlink (fpath.c_str());
#else
unlink (fpath.c_str());
#endif
}
virtual string toString() const {return _root;}
virtual ~LocalFs() {}
};
string readChannel (LIBSSH2_SESSION *session, LIBSSH2_CHANNEL* channel, int limit = -1, bool readError = false) {
char buf[32*1024]; // cf. http://article.gmane.org/gmane.network.ssh.libssh2.devel/691
int got = 0;
std::ostringstream sbuf;
readMore:
int need = sizeof (buf);
if (limit > 0 && limit - got < need) need = limit - got;
assert (need <= (int) sizeof (buf));
int rc = libssh2_channel_read (channel, buf, need);
if (rc == 0 && readError) rc = libssh2_channel_read_stderr (channel, buf, need);
if (rc > 0) sbuf.write (buf, rc);
else if (rc == LIBSSH2_ERROR_EAGAIN || rc == LIBSSH2_ERROR_TIMEOUT) {sleepMs(); goto readMore;}
else if (rc < 0) {
char* errmsg; int errmsg_len;
libssh2_session_last_error (session, &errmsg, &errmsg_len, 0);
std::ostringstream oss; oss << "!read; rc: " << rc << "; got: " << got << "; emsg: " << errmsg;
THROW (oss.str());
}
got += rc;
if (limit == -1 && rc > 0) goto readMore;
if (limit > 0 && got < limit) goto readMore;
return sbuf.str();
}
struct SshFs: public FileSystem {
string _host; int _port; string _root;
shared_ptr<void> _sock;
shared_ptr<LIBSSH2_SESSION> _session;
shared_ptr<LIBSSH2_SFTP> _sftp;
SshFs (string host, int port, const string& root): _host (host), _port (port), _root (root) {}
virtual void connect (bool force = false) { // cf. http://www.libssh2.org/examples/scp.html
tryAgain: try {
if (_session.get() && !force) {
int seconds_to_next = 0;
int rc = libssh2_keepalive_send (_session.get(), &seconds_to_next);
if (rc == 0) return; // Existing session is still active.
}
struct hostent *ip = gethostbyname (_host.c_str());
if (!ip) THROW (_host);
_sftp.reset();
_session.reset();
_sock.reset ((void*) socket (AF_INET, SOCK_STREAM, 0), [](void* sock) {
if ((SOCKET) sock != INVALID_SOCKET) closesocket ((SOCKET) sock);});
if ((SOCKET) _sock.get() == INVALID_SOCKET) THROW ("!socket");
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons (_port);
memcpy (&sin.sin_addr, ip->h_addr_list[0], ip->h_length);
int rc = ::connect ((SOCKET) _sock.get(), (struct sockaddr*)(&sin), sizeof (struct sockaddr_in));
if (rc) THROW (string ("!connect: ") + strerror (errno));
_session.reset (libssh2_session_init(), [](LIBSSH2_SESSION* ses){
if (ses) {libssh2_session_disconnect (ses, "bye"); libssh2_session_free (ses);}});
if (!_session.get()) THROW ("!session");
auto serr = [this]()->string {
char* errmsg; int errmsg_len;
libssh2_session_last_error (_session.get(), &errmsg, &errmsg_len, 0);
return string (errmsg, errmsg_len);
};
libssh2_session_set_blocking (_session.get(), 1); // Make sure we use the blocking mode.
// http://thread.gmane.org/gmane.network.ssh.libssh2.devel/6203/focus=6204
rc = libssh2_session_flag (_session.get(), LIBSSH2_FLAG_COMPRESS, 1);
if (rc) cout << "warning: !compress " + serr() << endl;
rc = libssh2_session_handshake (_session.get(), (SOCKET) _sock.get());
if (rc) THROW ("!handshake: " + serr());
if (libssh2_userauth_password (_session.get(), USER, PASS)) THROW ("!pass: " + serr());
// This will produce LIBSSH2_ERROR_BAD_USE on long writes!
//libssh2_keepalive_config (_session.get(), 1, 2);
} catch (const std::exception& ex) {
cerr << "!connect: " << ex.what() << endl;
Sleep (1000);
goto tryAgain;
}
}
shared_ptr<LIBSSH2_SFTP> getSftp() {
if (_sftp.get() == NULL) _sftp.reset (libssh2_sftp_init (_session.get()), [](LIBSSH2_SFTP* ses){if (ses) libssh2_sftp_shutdown (ses);});
return _sftp;
}
string fullPath (string relativePath) {return _root.empty() ? relativePath : _root + '/' + relativePath;}
virtual tree_t ls (const char* path) {
// cf. http://www.libssh2.org/examples/sftpdir.html
tree_t tr;
string dpath (fullPath (path));
shared_ptr<LIBSSH2_SFTP_HANDLE> dir (libssh2_sftp_opendir (getSftp().get(), dpath.c_str()),
[](LIBSSH2_SFTP_HANDLE* dh){libssh2_sftp_closedir (dh);});
char name[128], longentry[128]; LIBSSH2_SFTP_ATTRIBUTES attrs;
loop:
int len = libssh2_sftp_readdir_ex (dir.get(), name, sizeof (name), longentry, sizeof (longentry), &attrs);
if (len == LIBSSH2_ERROR_EAGAIN) {sleepMs(); goto loop;}
if (len < 0) THROW ("!readdir: " + dpath);
if (len > 0) {
tr[name] = FileInfo (attrs);
goto loop;
}
return tr;
}
virtual File read (const char* path, FileInfo* fi = NULL) {
// cf. http://www.libssh2.org/examples/scp.html
string fpath (fullPath (path));
struct stat fileinfo;
shared_ptr<LIBSSH2_CHANNEL> channel (
libssh2_scp_recv (_session.get(), fpath.c_str(), &fileinfo),
[](LIBSSH2_CHANNEL* chan) {if (chan) {libssh2_channel_close (chan); libssh2_channel_wait_closed (chan); libssh2_channel_free (chan);}});
if (!channel.get()) THROW ("!recv: " + _host + ':' + fpath);
string body = readChannel (_session.get(), channel.get(), fileinfo.st_size);
return File (path, fi ? *fi : FileInfo (fileinfo), body);
}
virtual void write (const File& file) {
// cf. http://www.libssh2.org/examples/scp_write.html
string fpath (fullPath (file._path));
mkdirs (fpath, [&](string dir){
repeatMkdir:
int rc = libssh2_sftp_mkdir_ex (getSftp().get(), dir.c_str(), dir.size(), 0775);
if (rc == LIBSSH2_ERROR_EAGAIN) {sleepMs(); goto repeatMkdir;}
});
shared_ptr<LIBSSH2_CHANNEL> channel ( // http://www.libssh2.org/libssh2_scp_send64.html
libssh2_scp_send64 (_session.get(), fpath.c_str(), 0664, file._body.size(), file._fileInfo.mtime, file._fileInfo.mtime),
[](LIBSSH2_CHANNEL* chan) {if (chan) {libssh2_channel_close (chan); libssh2_channel_wait_closed (chan); libssh2_channel_free (chan);}});
if (!channel.get()) {
char *errmsg; int errlen; libssh2_session_last_error (_session.get(), &errmsg, &errlen, 0);
THROW ("!send: " + _host + ':' + fpath + "; " + errmsg);}
uint32_t pos = 0;
while (pos < file._body.size()) {
// 2012-10-07: libssh2 1.4.2 and 1.4.3(git) have problems sending large files.
int rc = libssh2_channel_write_ex (channel.get(), 0, file._body.data() + pos, file._body.size() - pos);
if (rc == 0) {sleepMs(); continue;} // http://thread.gmane.org/gmane.network.ssh.libssh2.devel/6205
if (rc == LIBSSH2_ERROR_EAGAIN) {sleepMs(); continue;}
if (rc == LIBSSH2_ERROR_TIMEOUT) {sleepMs(); continue;} // Timeout is okay, http://thread.gmane.org/gmane.network.ssh.libssh2.devel/5792/focus=5793
if (rc == LIBSSH2_ERROR_BAD_USE) THROW ("!write: LIBSSH2_ERROR_BAD_USE");
if (rc > 0) pos += rc;
else if (rc < 0) THROW ("!write: " + _host + ':' + fpath + "; rc: " + to_string (rc));
}
repeatSendEof:
int rc = libssh2_channel_send_eof (channel.get());
if (rc == LIBSSH2_ERROR_EAGAIN) {sleepMs(); goto repeatSendEof;}
if (rc) THROW ("!send_eof; rc: " + to_string (rc));
repeatWaitEof:
rc = libssh2_channel_wait_eof (channel.get());
if (rc == LIBSSH2_ERROR_EAGAIN) {sleepMs(); goto repeatWaitEof;}
if (rc == LIBSSH2_ERROR_TIMEOUT) THROW ("!wait_eof; rc: LIBSSH2_ERROR_TIMEOUT");
if (rc) THROW ("!wait_eof; rc: " + to_string (rc));
}
virtual void remove (const char* path) {
string fpath (fullPath (path));
repeatUnlink:
int rc = libssh2_sftp_unlink_ex (getSftp().get(), fpath.c_str(), fpath.size()); // http://www.libssh2.org/libssh2_sftp_unlink_ex.html
if (rc == LIBSSH2_ERROR_EAGAIN) {sleepMs(); goto repeatUnlink;}
if (rc) {
int sftpCode = libssh2_sftp_last_error (_sftp.get());
if (sftpCode == LIBSSH2_FX_NO_SUCH_FILE) return; // File already removed, that's okay.
THROW ("!unlink: " + _host + ':' + fpath + "; rc: " + to_string (rc) + "; sftpCode: " + to_string (sftpCode));
}
}
virtual string toString() const {return _host + ':' + _root;}
virtual ~SshFs() {}
};
tree_t changedFiles (const tree_t& was, const tree_t& now) {
tree_t changed;
for (tree_t::const_iterator it = now.begin(), end = now.end(); it != end; ++it) {
tree_t::const_iterator wit = was.find (it->first);
if (wit == was.end()) changed.insert (*it); // New file.
else if (wit->second != it->second) changed.insert (*it); // File changed.
}
for (tree_t::const_iterator it = was.begin(), end = was.end(); it != end; ++it) {
if (now.count (it->first) == 0) changed[it->first] = FileInfo(); // File removed.
}
return changed;
}
void copyFiles (const tree_t& files, FileSystem& from, FileSystem& to) {
if (files.empty()) return;
for (tree_t::const_iterator it = files.begin(), end = files.end(); it != end; ++it) {
if (it->second.isNil()) {
cout << "Removing " << it->first << " from " << to << " ... " << flush;
to.remove (it->first.c_str());
cout << "done." << endl;
} else {
cout << "Copying " << it->first << " from " << from << " to " << to << " ... " << flush;
to.write (from.read (it->first.c_str()));
cout << "done." << endl;
}
}
}
pair<int, string> compile (SshFs& fs) {
// cf. http://www.libssh2.org/examples/ssh2_exec.html; http://www.libssh2.org/examples/ssh2.html
shared_ptr<LIBSSH2_CHANNEL> channel ( // http://www.libssh2.org//libssh2_channel_open_ex.html
libssh2_channel_open_ex (fs._session.get(), "session", sizeof ("session") - 1,
LIBSSH2_CHANNEL_WINDOW_DEFAULT, LIBSSH2_CHANNEL_PACKET_DEFAULT, NULL, 0),
[](LIBSSH2_CHANNEL* chan) {if (chan) {libssh2_channel_close (chan); libssh2_channel_wait_closed (chan); libssh2_channel_free (chan);}});
if (!channel.get()) {
char *errmsg; int errlen; libssh2_session_last_error (fs._session.get(), &errmsg, &errlen, 0);
THROW ("!compile channel: " + fs._host + "; " + errmsg);}
const char* cmd = "make -f /root/work/proge/makefile fromSync";
int rc = libssh2_channel_exec (channel.get(), cmd); if (rc) {
char *errmsg; int errlen; libssh2_session_last_error (fs._session.get(), &errmsg, &errlen, 0);
THROW ("!compile startup: " + fs._host + "; " + errmsg);}
string response = readChannel (fs._session.get(), channel.get(), -1, true);
return pair<int, string> (libssh2_channel_get_exit_status (channel.get()), response);
}
bool haveMatch (const tree_t& tr, const char* pattern) {
if (tr.empty()) return false;
boost::regex re (pattern);
for (auto it = tr.begin(), end = tr.end(); it != end; ++it)
if (boost::regex_search (it->first, re)) return true;
return false;
}
/*noreturn*/ void sync () {
LocalFs here ("progeSync"); here.connect();
cout << "Connecting to foo ... " << flush;
SshFs foo ("foo.server4you.de", 22, "/var/www/prog.com"); foo.connect();
cout << "connected." << endl;
cout << "Connecting to bar ... " << flush;
SshFs bar ("bar.server4you.de", 22, "/var/www/prog.com"); bar.connect();
cout << "connected." << endl;
auto getTree = [&](FileSystem* fs, const char* path, const char* filter){
repeat: try {return fs->tree (path, filter);} catch (const std::exception& ex) {
cerr << ex.what() << endl;
fs->connect (true);
goto repeat;
}
};
std::function<tree_t()> getLocalTree ([&]{return getTree (&here, "", "");});
std::function<tree_t()> getRemoteTree ([&]{return getTree (&foo, "", "");});
cout << "Fetching indexes ... " << flush;
tree_t localTree (getLocalTree());
tree_t remoteTree (getRemoteTree());
cout << "done." << endl;
tree_t initialSync (changedFiles (localTree, remoteTree));
auto repeatOnError = [&](std::function<void()> fun, const char* emsg){
repeat: try {fun();} catch (const std::exception& ex) {
cerr << emsg << ex.what() << endl;
Sleep (500);
foo.connect (true); bar.connect (true);
goto repeat;
}
};
if (!initialSync.empty()) {
repeatOnError ([&](){
copyFiles (initialSync, foo, here);
}, "error copying files from remote: ");
localTree = getLocalTree();
}
bool statusMessage = true;
for (uint32_t count = 1;; ++count) {
if (statusMessage) {
cout << "--- Waiting for changes ---" << endl;
statusMessage = false;
}
tree_t newLocalTree (getLocalTree());
tree_t localChanges (changedFiles (localTree, newLocalTree));
localTree = newLocalTree;
repeatOnError ([&](){foo.connect();}, "foo connection: "); // Keepalive.
tree_t remoteChanges;
if (count % 100 == 0) {
cout << "Fetching remote index ... " << flush;
tree_t newRemoteTree (getRemoteTree());
cout << "done." << endl;
remoteChanges = changedFiles (remoteTree, newRemoteTree) - localChanges;
remoteTree = newRemoteTree;
statusMessage = true;
}
// Propagate local changes to foo.
if (!localChanges.empty()) repeatOnError ([&](){
cout << "localChanges: " << localChanges << endl;
copyFiles (localChanges, here, foo);
if (haveMatch (localChanges, "\\.ecpp$")) {
cout << "compiling ... " << flush;
pair<int, string> cret = compile (foo);
cout << "status: " << cret.first << "; output:" << endl << cret.second << endl;
}
statusMessage = true;
}, "error propagating localChanges: ");
// To bar.
if (!localChanges.empty()) repeatOnError ([&](){
copyFiles (localChanges, here, bar);
if (haveMatch (localChanges, "\\.ecpp$")) {
cout << "compiling ... " << flush;
pair<int, string> cret = compile (bar);
cout << "status: " << cret.first << "; output: " << endl << cret.second << endl;
}
}, "error propagating localChanges: ");
// Propagate remote changes from foo to local filesystem.
if (!remoteChanges.empty()) repeatOnError ([&](){
cout << "remoteChanges: " << remoteChanges << endl;
copyFiles (remoteChanges, foo, here);
localTree = getLocalTree();
statusMessage = true;
}, "error getting remoteChanges: ");
// If we've updated remote then we need to reindex it in order not to propagate our own changes back.
if (!localChanges.empty()) repeatOnError ([&](){
remoteTree = getRemoteTree();
}, "error reading remote tree: ");
if (!statusMessage) Sleep (500);
}
}
int main () {
WSADATA wsadata;
int rc = WSAStartup (MAKEWORD (2,0), &wsadata); // Init winsock.
if (rc) {cerr << "!winsock" << endl; exit (1);}
rc = libssh2_init (0);
if (rc) {cerr << "!libssh2_init" << endl; exit (1);}
try {sync();} catch (const std::exception& ex) {
cerr << ex.what() << endl; libssh2_exit(); exit (1);}
libssh2_exit();
return 0;
}
@ncm
Copy link

ncm commented Jan 31, 2013

Would you please see if this libssh2 patch fixes your keepalive problem? Tx.

Index: src/transport.c
===================================================================
--- src/transport.c (revision 67831)
+++ src/transport.c (working copy)
@@ -583,30 +583,12 @@
 }

 static int
-send_existing(LIBSSH2_SESSION *session, const unsigned char *data,
-              size_t data_len, ssize_t *ret)
+send_existing(LIBSSH2_SESSION *session)
 {
     ssize_t rc;
     ssize_t length;
     struct transportpacket *p = &session->packet;

-    if (!p->olen) {
-        *ret = 0;
-        return LIBSSH2_ERROR_NONE;
-    }
-
-    /* send as much as possible of the existing packet */
-    if ((data != p->odata) || (data_len != p->olen)) {
-        /* When we are about to complete the sending of a packet, it is vital
-           that the caller doesn't try to send a new/different packet since
-           we don't add this one up until the previous one has been sent. To
-           make the caller really notice his/hers flaw, we return error for
-           this case */
-        return LIBSSH2_ERROR_BAD_USE;
-    }
-
-    *ret = 1;                   /* set to make our parent return */
-
     /* number of bytes left to send */
     length = p->ototal_num - p->osent;

@@ -627,9 +609,7 @@
         /* the remainder of the package was sent */
         p->ototal_num = 0;
         p->olen = 0;
-        /* we leave *ret set so that the parent returns as we MUST return back
-           a send success now, so that we don't risk sending EAGAIN later
-           which then would confuse the parent function */
+        session->socket_block_directions &= ~LIBSSH2_SESSION_BLOCK_OUTBOUND;
         return LIBSSH2_ERROR_NONE;

     }
@@ -645,7 +625,7 @@

     p->osent += rc;         /* we sent away this much data */

-    return rc < length ? LIBSSH2_ERROR_EAGAIN : LIBSSH2_ERROR_NONE;
+    return LIBSSH2_ERROR_EAGAIN;
 }

 /*
@@ -709,18 +689,37 @@
     if(data2)
         debugdump(session, "libssh2_transport_write plain2", data2, data2_len);

-    /* FIRST, check if we have a pending write to complete. send_existing
-       only sanity-check data and data_len and not data2 and data2_len!! */
-    rc = send_existing(session, data, data_len, &ret);
-    if (rc)
-        return rc;
+    /* FIRST, check if we have a pending write to complete. */

-    session->socket_block_directions &= ~LIBSSH2_SESSION_BLOCK_OUTBOUND;
+    if (p->olen) {
+        /* Part of a previous write remains to be sent:

-    if (ret)
-        /* set by send_existing if data was sent */
-        return rc;
+        /* is the pending data from a regular _ssh2_channel_write()? */
+        if (p->odata[0] == SSH_MSG_CHANNEL_DATA ||
+                p->odata[0] == SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            
+            /* is the present argument a control message? */
+            if (p->odata != data) {
+                /* we cannot resume sending the user message because we would
+                 * have no way to report the result; so, we cannot send the
+                 * control message either.
+                 */
+                return LIBSSH2_ERROR_EAGAIN;
+            }
+            /* present argument is user data, assume it is a re-try */
+            rc = send_existing(session);
+            /* must report result to the caller immediately */
+            return rc;
+        }

+        /* pending data is from a control message */
+        rc = send_existing(session);
+        if (rc)
+            return rc;
+
+        /* completed, send present argument too */
+    }
+
     encrypted = (session->state & LIBSSH2_STATE_NEWKEYS) ? 1 : 0;

     if (encrypted && session->local.comp->compress) {

@ArtemGr
Copy link
Author

ArtemGr commented Jun 17, 2013

Oops, ncm, I haven't seen this comment till now. Will give it a go.

@ArtemGr
Copy link
Author

ArtemGr commented Jun 25, 2013

Nathan, I've applied the patch (to the latest git of libssh) and trying it. Will report in a few days.

@ArtemGr
Copy link
Author

ArtemGr commented Jun 25, 2013

Nathan, looks good so far, I have no problem uploading large files with keepalive enabled.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment