Skip to content

Instantly share code, notes, and snippets.

@Kentzo
Created October 21, 2012 12:35
Show Gist options
  • Save Kentzo/3926860 to your computer and use it in GitHub Desktop.
Save Kentzo/3926860 to your computer and use it in GitHub Desktop.
NEW_SOCKET from
#include <CoreFoundation/CFSocket.h>
#include "CFInternal.h"
#include <dispatch/dispatch.h>
#include <dispatch/private.h>
#include <netinet/in.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <dlfcn.h>
#include <sys/select.h>
// Run loop entry point declared here and defined outside this file; this file calls it right after CFRunLoopSourceSignal().
extern void _CFRunLoopSourceWakeUpRunLoops(CFRunLoopSourceRef rls);
// Sentinel native handle meaning "no usable descriptor".
#define INVALID_SOCKET (CFSocketNativeHandle)(-1)
// Size in bytes of the stack buffers this file uses to receive socket addresses.
#define MAX_SOCKADDR_LEN 256
// Macro defined outside this file; presumably generates the __sockQueue() accessor used below -- TODO confirm against its definition.
DISPATCH_HELPER_FUNCTIONS(sock, CFSocket)
static Boolean sockfd_is_readable(int fd) {
if (fd < 0 || 1048576 <= fd) HALT;
size_t sz = ((fd + CHAR_BIT) / CHAR_BIT) + 7; // generous
fd_set *fdset = malloc(sz);
int ret;
do {
memset(fdset, 0, sz);
FD_SET(fd, fdset);
struct timespec ts = {0, 1000UL}; // 1 us
ret = pselect(fd + 1, fdset, NULL, NULL, &ts, NULL);
} while (ret < 0 && (EINTR == errno || EAGAIN == errno));
Boolean isSet = ((0 < ret) && FD_ISSET(fd, fdset));
free(fdset);
return isSet;
}
static Boolean sockfd_is_writeable(int fd) {
if (fd < 0 || 1048576 <= fd) HALT;
size_t sz = ((fd + CHAR_BIT) / CHAR_BIT) + 7; // generous
fd_set *fdset = malloc(sz);
int ret;
do {
memset(fdset, 0, sz);
FD_SET(fd, fdset);
struct timespec ts = {0, 1000UL}; // 1 us
ret = pselect(fd + 1, NULL, fdset, NULL, &ts, NULL);
} while (ret < 0 && (EINTR == errno || EAGAIN == errno));
Boolean isSet = ((0 < ret) && FD_ISSET(fd, fdset));
free(fdset);
return isSet;
}
// Lifecycle states stored in the 2-bit __CFSocket._state field.
enum {
kCFSocketStateReady = 0, // the only state __CFSocketIsValid() accepts
kCFSocketStateInvalidating = 1, // set by CFSocketInvalidate() while teardown is in progress
kCFSocketStateInvalid = 2, // set when CFSocketInvalidate() finishes, or at creation for an unusable fd
kCFSocketStateDeallocating = 3 // set by __CFSocketDeallocate()
};
// State shared between a CFSocket and the cancel handlers of its dispatch sources.
// Whichever holder drops _refCnt to zero closes the fd (if _closeFD) and frees the blob.
struct __shared_blob {
dispatch_source_t _rdsrc; // read-side dispatch source, or NULL
dispatch_source_t _wrsrc; // write/connect-side dispatch source, or NULL
CFRunLoopSourceRef _source; // run loop source made by CFSocketCreateRunLoopSource(), or NULL
CFSocketNativeHandle _socket; // the wrapped native descriptor
uint8_t _closeFD; // copy of __CFSocket._closeOnInvalidate
uint8_t _refCnt; // one for the CFSocket plus one per dispatch source
};
// Per-instance state of a CFSocket object.
struct __CFSocket {
CFRuntimeBase _base;
struct __shared_blob *_shared; // non-NULL when valid, NULL when invalid
uint8_t _state:2; // mutable, not written safely; holds one of the kCFSocketState* values
uint8_t _isSaneFD:1; // immutable; fstat() reported a socket, FIFO or character device at creation
uint8_t _connOriented:1; // immutable; SO_TYPE was SOCK_STREAM or SOCK_SEQPACKET at creation
uint8_t _wantConnect:1; // immutable; kCFSocketConnectCallBack was requested at creation
uint8_t _wantWrite:1; // immutable; kCFSocketWriteCallBack was requested at creation
uint8_t _wantReadType:2; // immutable; low two bits of the creation callBackTypes (read/accept/data), 0 for none
uint8_t _error; // last SO_ERROR value stored by __CFSocketPerform()
uint8_t _rsuspended:1; // read dispatch source is currently suspended
uint8_t _wsuspended:1; // write dispatch source is currently suspended
uint8_t _readable:1; // a read event is pending for __CFSocketPerform()
uint8_t _writeable:1; // a write event is pending for __CFSocketPerform()
uint8_t _unused:4;
uint8_t _reenableRead:1; // resume the read source after each perform
uint8_t _readDisabled:1; // read callback disabled via CFSocketDisableCallBacks()
uint8_t _reenableWrite:1; // resume the write source after each perform
uint8_t _writeDisabled:1; // write callback disabled via CFSocketDisableCallBacks()
uint8_t _connectDisabled:1; // connect callback disabled via CFSocketDisableCallBacks()
uint8_t _connected:1; // once set, connect callbacks are no longer delivered
uint8_t _leaveErrors:1; // when set, __CFSocketPerform() does not query SO_ERROR
uint8_t _closeOnInvalidate:1; // mirrored into _shared->_closeFD
int32_t _runLoopCounter; // incremented by __CFSocketSchedule(), decremented by __CFSocketCancel()
CFDataRef _address; // immutable, once created
CFDataRef _peerAddress; // immutable, once created
CFSocketCallBack _callout; // immutable
CFSocketContext _context; // immutable
};
// Lock-free validity test: true only while the socket's state is kCFSocketStateReady.
CF_INLINE Boolean __CFSocketIsValid(CFSocketRef sock) {
    return sock->_state == kCFSocketStateReady;
}
/*
 * Builds the debug description of a CFSocket.
 *
 * The text covers validity, the native descriptor, the callback configuration
 * flags, the last error code, the number of readable bytes reported by
 * FIONREAD, the run loop source, the callout symbol and the context.
 * Returns a newly created string.
 */
static CFStringRef __CFSocketCopyDescription(CFTypeRef cf) {
    CFSocketRef sock = (CFSocketRef)cf;
    const char *yes = "Yes";
    const char *no = "No";

    // Prefer the client's own context description; fall back to the raw pointer.
    CFStringRef ctxText = NULL;
    if (NULL != sock->_context.info && NULL != sock->_context.copyDescription) {
        ctxText = sock->_context.copyDescription(sock->_context.info);
    }
    if (NULL == ctxText) {
        ctxText = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFSocket context %p>"), sock->_context.info);
    }

    // Resolve the callout to a symbol name only when it is the exact start of a symbol.
    Dl_info symInfo;
    void *calloutAddr = sock->_callout;
    const char *calloutName = "???";
    if (dladdr(calloutAddr, &symInfo) && symInfo.dli_saddr == calloutAddr && symInfo.dli_sname) {
        calloutName = symInfo.dli_sname;
    }

    // Stays -1 when the query fails (for example on an invalidated socket).
    int bytesPending = -1;
    ioctlsocket(sock->_shared ? sock->_shared->_socket : -1, FIONREAD, &bytesPending);

    CFStringRef text = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR(
        "<CFSocket %p [%p]>{valid = %s, socket = %d, "
        "want connect = %s, connect disabled = %s, "
        "want write = %s, reenable write = %s, write disabled = %s, "
        "want read = %s, reenable read = %s, read disabled = %s, "
        "leave errors = %s, close on invalidate = %s, connected = %s, "
        "last error code = %d, bytes available for read = %d, "
        "source = %p, callout = %s (%p), context = %@}"),
        cf, CFGetAllocator(sock),
        __CFSocketIsValid(sock) ? yes : no,
        sock->_shared ? sock->_shared->_socket : -1,
        sock->_wantConnect ? yes : no,
        sock->_connectDisabled ? yes : no,
        sock->_wantWrite ? yes : no,
        sock->_reenableWrite ? yes : no,
        sock->_writeDisabled ? yes : no,
        sock->_wantReadType ? yes : no,
        sock->_reenableRead ? yes : no,
        sock->_readDisabled ? yes : no,
        sock->_leaveErrors ? yes : no,
        sock->_closeOnInvalidate ? yes : no,
        sock->_connected ? yes : no,
        sock->_error, bytesPending,
        sock->_shared ? sock->_shared->_source : NULL,
        calloutName, calloutAddr, ctxText);

    if (NULL != ctxText) {
        CFRelease(ctxText);
    }
    return text;
}
/*
 * Deallocation callback registered in the CFSocket class record: marks the
 * object as deallocating and drops the cached local and peer address data.
 */
static void __CFSocketDeallocate(CFTypeRef cf) {
    CHECK_FOR_FORK_RET();
    CFSocketRef sock = (CFSocketRef)cf;
    // Since CFSockets are cached, we can only get here sometime after being invalidated
    sock->_state = kCFSocketStateDeallocating;
    // Peer address first, then local address.
    CFDataRef *cached[2] = { &sock->_peerAddress, &sock->_address };
    for (int i = 0; i < 2; i++) {
        if (*cached[i]) {
            CFRelease(*cached[i]);
            *cached[i] = NULL;
        }
    }
}
// Cached type ID for CFSocket; filled in by CFSocketGetTypeID() on first use.
static CFTypeID __kCFSocketTypeID = _kCFRuntimeNotATypeID;
// Class record handed to _CFRuntimeRegisterClass() in CFSocketGetTypeID().
static const CFRuntimeClass __CFSocketClass = {
0, // presumably the class version -- confirm against the CFRuntimeClass declaration
"CFSocket", // class name
NULL, // init
NULL, // copy
__CFSocketDeallocate, // deallocation callback
NULL, // equal
NULL, // hash
NULL, // slot left unnamed in the original; presumably a formatting-description callback -- confirm
__CFSocketCopyDescription // description callback
};
// Sockets created by CFSocketCreateWithNative() and not yet invalidated; the array is created in CFSocketGetTypeID().
static CFMutableArrayRef __CFAllSockets = NULL;
CFTypeID CFSocketGetTypeID(void) {
if (_kCFRuntimeNotATypeID == __kCFSocketTypeID) {
__kCFSocketTypeID = _CFRuntimeRegisterClass(&__CFSocketClass);
__CFAllSockets = CFArrayCreateMutable(kCFAllocatorSystemDefault, 0, &kCFTypeArrayCallBacks);
struct rlimit lim1;
int ret1 = getrlimit(RLIMIT_NOFILE, &lim1);
int mib[] = {CTL_KERN, KERN_MAXFILESPERPROC};
int maxfd = 0;
size_t len = sizeof(int);
int ret0 = sysctl(mib, 2, &maxfd, &len, NULL, 0);
if (0 == ret0 && 0 == ret1 && lim1.rlim_max < maxfd) maxfd = lim1.rlim_max;
if (0 == ret1 && lim1.rlim_cur < maxfd) {
struct rlimit lim2 = lim1;
lim2.rlim_cur += 2304;
if (maxfd < lim2.rlim_cur) lim2.rlim_cur = maxfd;
setrlimit(RLIMIT_NOFILE, &lim2);
// we try, but do not go to extraordinary measures
}
}
return __kCFSocketTypeID;
}
/*
 * Creates a CFSocket wrapping the native descriptor ufd, or returns the cached
 * CFSocket that already wraps it (retained).
 *
 * allocator      allocator for the new object and its run-time data
 * ufd            native descriptor; if fstat() fails on it, an uncached,
 *                already-invalid CFSocket is returned instead
 * callBackTypes  requested callbacks: low two bits select the read type,
 *                plus kCFSocketWriteCallBack / kCFSocketConnectCallBack
 * callout        client callback
 * context        optional client context; its info is retained through
 *                context->retain when that is provided
 *
 * Returns the socket, or NULL when allocation fails or the resulting socket
 * turns out not to be valid.
 */
CFSocketRef CFSocketCreateWithNative(CFAllocatorRef allocator, CFSocketNativeHandle ufd, CFOptionFlags callBackTypes, CFSocketCallBack callout, const CFSocketContext *context) {
    CHECK_FOR_FORK_RET(NULL);
    CFSocketGetTypeID(); // cause initialization if necessary
    struct stat statbuf;
    int ret = fstat(ufd, &statbuf);
    if (ret < 0) ufd = INVALID_SOCKET;
    Boolean sane = false;
    if (INVALID_SOCKET != ufd) {
        uint32_t type = (statbuf.st_mode & S_IFMT);
        sane = (S_IFSOCK == type) || (S_IFIFO == type) || (S_IFCHR == type);
        if (1 && !sane) {
            CFLog(kCFLogLevelWarning, CFSTR("*** CFSocketCreateWithNative(): creating CFSocket with silly fd type (%07o) -- may or may not work"), type);
        }
    }
    if (INVALID_SOCKET != ufd) {
        // Probe with a throwaway kqueue: if the fd can be registered for read
        // and write events, the dispatch sources below can handle it.
        Boolean canHandle = false;
        int tmp_kq = kqueue();
        if (0 <= tmp_kq) {
            struct kevent ev[2];
            EV_SET(&ev[0], ufd, EVFILT_READ, EV_ADD, 0, 0, 0);
            EV_SET(&ev[1], ufd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
            int kret = kevent(tmp_kq, ev, 2, NULL, 0, NULL);
            canHandle = (0 <= kret); // if kevent(ADD) succeeds, can handle
            close(tmp_kq);
        }
        if (1 && !canHandle) {
            CFLog(kCFLogLevelWarning, CFSTR("*** CFSocketCreateWithNative(): creating CFSocket with unsupported fd type -- may or may not work"));
        }
    }
    if (INVALID_SOCKET == ufd) {
        // Historically, bad ufd was allowed, but gave an uncached and already-invalid CFSocketRef
        SInt32 size = sizeof(struct __CFSocket) - sizeof(CFRuntimeBase);
        CFSocketRef memory = (CFSocketRef)_CFRuntimeCreateInstance(allocator, CFSocketGetTypeID(), size, NULL);
        if (NULL == memory) {
            return NULL;
        }
        memory->_callout = callout;
        memory->_state = kCFSocketStateInvalid;
        return memory;
    }
    __block CFSocketRef sock = NULL;
    dispatch_sync(__sockQueue(), ^{
        // Reuse the cached CFSocket for this descriptor when there is one.
        for (CFIndex idx = 0, cnt = CFArrayGetCount(__CFAllSockets); idx < cnt; idx++) {
            CFSocketRef s = (CFSocketRef)CFArrayGetValueAtIndex(__CFAllSockets, idx);
            if (s->_shared->_socket == ufd) {
                CFRetain(s);
                sock = s;
                return;
            }
        }
        SInt32 size = sizeof(struct __CFSocket) - sizeof(CFRuntimeBase);
        CFSocketRef memory = (CFSocketRef)_CFRuntimeCreateInstance(allocator, CFSocketGetTypeID(), size, NULL);
        if (NULL == memory) {
            return;
        }
        // Allocate the shared blob before the client context is retained, so an
        // allocation failure can be unwound by simply releasing the new object.
        struct __shared_blob *shared = malloc(sizeof(struct __shared_blob));
        if (NULL == shared) {
            CFRelease(memory);
            return;
        }
        shared->_rdsrc = NULL;
        shared->_wrsrc = NULL;
        shared->_source = NULL;
        shared->_socket = ufd;
        shared->_closeFD = true; // copy of _closeOnInvalidate
        shared->_refCnt = 1; // one for the CFSocket
        int socketType = 0;
        if (INVALID_SOCKET != ufd) {
            socklen_t typeSize = sizeof(socketType);
            int sret = getsockopt(ufd, SOL_SOCKET, SO_TYPE, (void *)&socketType, (socklen_t *)&typeSize);
            if (sret < 0) socketType = 0;
        }
        // Both sources start out suspended; they are resumed when callbacks are enabled.
        memory->_rsuspended = true;
        memory->_wsuspended = true;
        memory->_readable = false;
        memory->_writeable = false;
        memory->_isSaneFD = sane ? 1 : 0;
        memory->_wantReadType = (callBackTypes & 0x3);
        memory->_reenableRead = memory->_wantReadType ? true : false;
        memory->_readDisabled = false;
        memory->_wantWrite = (callBackTypes & kCFSocketWriteCallBack) ? true : false;
        memory->_reenableWrite = false;
        memory->_writeDisabled = false;
        memory->_wantConnect = (callBackTypes & kCFSocketConnectCallBack) ? true : false;
        memory->_connectDisabled = false;
        memory->_leaveErrors = false;
        memory->_closeOnInvalidate = true;
        memory->_connOriented = (SOCK_STREAM == socketType || SOCK_SEQPACKET == socketType);
        memory->_connected = (memory->_wantReadType == kCFSocketAcceptCallBack || !memory->_connOriented) ? true : false;
        memory->_error = 0;
        memory->_runLoopCounter = 0;
        memory->_address = NULL;
        memory->_peerAddress = NULL;
        memory->_context.info = NULL;
        memory->_context.retain = NULL;
        memory->_context.release = NULL;
        memory->_context.copyDescription = NULL;
        memory->_callout = callout;
        if (NULL != context) {
            objc_memmove_collectable(&memory->_context, context, sizeof(CFSocketContext));
            memory->_context.info = context->retain ? (void *)context->retain(context->info) : context->info;
        }
        memory->_shared = shared;
        if (memory->_wantReadType) {
            dispatch_source_t dsrc = NULL;
            if (sane) {
                dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, ufd, 0, __sockQueue());
            } else {
                // Descriptors of an unexpected type are polled with a timer instead.
                dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, __sockQueue());
                dispatch_source_set_timer(dsrc, dispatch_time(DISPATCH_TIME_NOW, 0), NSEC_PER_SEC / 2, NSEC_PER_SEC);
            }
            dispatch_block_t event_block = ^{
                memory->_readable = true;
                if (!memory->_rsuspended) {
                    dispatch_suspend(dsrc);
                    // CFLog(5, CFSTR("suspend %p due to read event block"), memory);
                    memory->_rsuspended = true;
                }
                if (shared->_source) {
                    CFRunLoopSourceSignal(shared->_source);
                    _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                }
            };
            dispatch_block_t cancel_block = ^{
                // Drop this source's reference on the shared blob; the last holder cleans up.
                shared->_rdsrc = NULL;
                shared->_refCnt--;
                if (0 == shared->_refCnt) {
                    if (shared->_closeFD) {
                        // thoroughly stop anything else from using the fd
                        (void)shutdown(shared->_socket, SHUT_RDWR);
                        int nullfd = open("/dev/null", O_RDONLY);
                        dup2(nullfd, shared->_socket);
                        close(nullfd);
                        close(shared->_socket);
                    }
                    free(shared);
                }
                dispatch_release(dsrc);
            };
            dispatch_source_set_event_handler(dsrc, event_block);
            dispatch_source_set_cancel_handler(dsrc, cancel_block);
            shared->_rdsrc = dsrc;
        }
        if (memory->_wantWrite || memory->_wantConnect) {
            dispatch_source_t dsrc = NULL;
            if (sane) {
                dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, ufd, 0, __sockQueue());
            } else {
                // Descriptors of an unexpected type are polled with a timer instead.
                dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, __sockQueue());
                dispatch_source_set_timer(dsrc, dispatch_time(DISPATCH_TIME_NOW, 0), NSEC_PER_SEC / 2, NSEC_PER_SEC);
            }
            dispatch_block_t event_block = ^{
                memory->_writeable = true;
                if (!memory->_wsuspended) {
                    dispatch_suspend(dsrc);
                    // CFLog(5, CFSTR("suspend %p due to write event block"), memory);
                    memory->_wsuspended = true;
                }
                if (shared->_source) {
                    CFRunLoopSourceSignal(shared->_source);
                    _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                }
            };
            dispatch_block_t cancel_block = ^{
                // Drop this source's reference on the shared blob; the last holder cleans up.
                shared->_wrsrc = NULL;
                shared->_refCnt--;
                if (0 == shared->_refCnt) {
                    if (shared->_closeFD) {
                        // thoroughly stop anything else from using the fd
                        (void)shutdown(shared->_socket, SHUT_RDWR);
                        int nullfd = open("/dev/null", O_RDONLY);
                        dup2(nullfd, shared->_socket);
                        close(nullfd);
                        close(shared->_socket);
                    }
                    free(shared);
                }
                dispatch_release(dsrc);
            };
            dispatch_source_set_event_handler(dsrc, event_block);
            dispatch_source_set_cancel_handler(dsrc, cancel_block);
            shared->_wrsrc = dsrc;
        }
        // Each dispatch source holds one reference on the shared blob.
        if (shared->_rdsrc) {
            shared->_refCnt++;
        }
        if (shared->_wrsrc) {
            shared->_refCnt++;
        }
        memory->_state = kCFSocketStateReady;
        CFArrayAppendValue(__CFAllSockets, memory);
        sock = memory;
    });
    // CFLog(5, CFSTR("CFSocketCreateWithNative(): created socket %p with callbacks 0x%x"), sock, callBackTypes);
    if (sock && !CFSocketIsValid(sock)) { // must do this outside lock to avoid deadlock
        CFRelease(sock);
        sock = NULL;
    }
    return sock;
}
// Returns the native descriptor wrapped by sock, or INVALID_SOCKET once the
// socket no longer has shared state (i.e. after invalidation).
CFSocketNativeHandle CFSocketGetNative(CFSocketRef sock) {
    CHECK_FOR_FORK_RET(INVALID_SOCKET);
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    if (!sock->_shared) {
        return INVALID_SOCKET;
    }
    return sock->_shared->_socket;
}
// Copies the socket's client context into *context.
// The caller must have set context->version to 0 beforehand (asserted).
void CFSocketGetContext(CFSocketRef sock, CFSocketContext *context) {
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    CFAssert1(context->version == 0, __kCFLogAssertion, "%s(): context version not initialized to 0", __PRETTY_FUNCTION__);
    const CFSocketContext *stored = &sock->_context;
    objc_memmove_collectable(context, stored, sizeof(CFSocketContext));
}
/*
 * Returns a retained CFData holding the socket's local address, or NULL.
 * The address is fetched with getsockname() on first use and cached in the
 * socket; an invalid socket with no cached address yields NULL.
 */
CFDataRef CFSocketCopyAddress(CFSocketRef sock) {
    CHECK_FOR_FORK_RET(NULL);
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    __block CFDataRef copied = NULL;
    dispatch_sync(__sockQueue(), ^{
        if (NULL == sock->_address) {
            if (!__CFSocketIsValid(sock)) return;
            uint8_t addrBuf[MAX_SOCKADDR_LEN];
            socklen_t addrLen = sizeof(addrBuf);
            int rc = getsockname(sock->_shared->_socket, (struct sockaddr *)addrBuf, (socklen_t *)&addrLen);
            if (0 == rc && 0 < addrLen) {
                sock->_address = CFDataCreate(CFGetAllocator(sock), addrBuf, addrLen);
            }
        }
        if (sock->_address) {
            copied = (CFDataRef)CFRetain(sock->_address);
        } else {
            copied = NULL;
        }
    });
    return copied;
}
/*
 * Returns a retained CFData holding the peer's address, or NULL.
 * The address is fetched with getpeername() on first use and cached in the
 * socket; an invalid socket with no cached peer address yields NULL.
 */
CFDataRef CFSocketCopyPeerAddress(CFSocketRef sock) {
    CHECK_FOR_FORK_RET(NULL);
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    __block CFDataRef copied = NULL;
    dispatch_sync(__sockQueue(), ^{
        if (NULL == sock->_peerAddress) {
            if (!__CFSocketIsValid(sock)) return;
            uint8_t addrBuf[MAX_SOCKADDR_LEN];
            socklen_t addrLen = sizeof(addrBuf);
            int rc = getpeername(sock->_shared->_socket, (struct sockaddr *)addrBuf, (socklen_t *)&addrLen);
            if (0 == rc && 0 < addrLen) {
                sock->_peerAddress = CFDataCreate(CFGetAllocator(sock), addrBuf, addrLen);
            }
        }
        if (sock->_peerAddress) {
            copied = (CFDataRef)CFRetain(sock->_peerAddress);
        } else {
            copied = NULL;
        }
    });
    return copied;
}
/*
 * Reports the socket's current option flags: the automatically re-enabled
 * read type, automatic write re-enable, leave-errors and close-on-invalidate.
 */
CFOptionFlags CFSocketGetSocketFlags(CFSocketRef sock) {
    CHECK_FOR_FORK();
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    __block CFOptionFlags result = 0;
    dispatch_sync(__sockQueue(), ^{
        CFOptionFlags bits = 0;
        if (sock->_reenableRead) {
            bits |= sock->_wantReadType; // flags are same as types here
        }
        if (sock->_reenableWrite) {
            bits |= kCFSocketAutomaticallyReenableWriteCallBack;
        }
        if (sock->_leaveErrors) {
            bits |= kCFSocketLeaveErrors;
        }
        if (sock->_closeOnInvalidate) {
            bits |= kCFSocketCloseOnInvalidate;
        }
        result |= bits;
    });
    return result;
}
/*
 * Applies option flags to the socket: automatic re-enabling of the read and
 * write callbacks, leave-errors, and close-on-invalidate (which is mirrored
 * into the shared blob while the socket still has one).
 */
void CFSocketSetSocketFlags(CFSocketRef sock, CFOptionFlags flags) {
    CHECK_FOR_FORK();
    // CFLog(5, CFSTR("CFSocketSetSocketFlags(%p, 0x%x) starting"), sock, flags);
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    dispatch_sync(__sockQueue(), ^{
        // Read re-enable only takes effect when the flag bits name exactly the read type in use.
        Boolean autoRead = sock->_wantReadType && ((flags & 0x3) == sock->_wantReadType);
        Boolean autoWrite = sock->_wantWrite && (0 != (flags & kCFSocketAutomaticallyReenableWriteCallBack));
        sock->_reenableRead = autoRead ? true : false;
        sock->_reenableWrite = autoWrite ? true : false;
        sock->_leaveErrors = (0 != (flags & kCFSocketLeaveErrors));
        sock->_closeOnInvalidate = (0 != (flags & kCFSocketCloseOnInvalidate));
        if (sock->_shared) {
            sock->_shared->_closeFD = sock->_closeOnInvalidate;
        }
    });
    // CFLog(5, CFSTR("CFSocketSetSocketFlags(%p, 0x%x) done"), sock, flags);
}
/*
 * Enables the requested callback types on a valid socket.
 *
 * For each requested type the socket wants, the descriptor is polled: if it
 * is already ready, the pending flag is set, the dispatch source is parked
 * and the run loop source (if any) is signalled; otherwise the suspended
 * dispatch source is resumed. The matching "disabled" flag is then cleared.
 */
void CFSocketEnableCallBacks(CFSocketRef sock, CFOptionFlags callBackTypes) {
    CHECK_FOR_FORK_RET();
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    // CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) starting"), sock, callBackTypes);
    dispatch_sync(__sockQueue(), ^{
        if (!__CFSocketIsValid(sock)) return;
        struct __shared_blob *shared = sock->_shared;

        // Read / accept / data.
        if (sock->_wantReadType && (callBackTypes & 0x3) == sock->_wantReadType) {
            if (sockfd_is_readable(shared->_socket)) {
                sock->_readable = true;
                // CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) socket is readable"), sock, callBackTypes);
                if (!sock->_rsuspended) {
                    dispatch_suspend(shared->_rdsrc);
                    sock->_rsuspended = true;
                }
                // If the source exists, but is now invalid, this next stuff is relatively harmless.
                if (shared->_source) {
                    CFRunLoopSourceSignal(shared->_source);
                    _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                }
            } else if (sock->_rsuspended && shared->_rdsrc) {
                sock->_rsuspended = false;
                dispatch_resume(shared->_rdsrc);
            }
            sock->_readDisabled = false;
        }

        // Write.
        if (sock->_wantWrite && (callBackTypes & kCFSocketWriteCallBack)) {
            if (sockfd_is_writeable(shared->_socket)) {
                sock->_writeable = true;
                if (!sock->_wsuspended) {
                    dispatch_suspend(shared->_wrsrc);
                    sock->_wsuspended = true;
                }
                // If the source exists, but is now invalid, this next stuff is relatively harmless.
                if (shared->_source) {
                    CFRunLoopSourceSignal(shared->_source);
                    _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                }
            } else if (sock->_wsuspended && shared->_wrsrc) {
                sock->_wsuspended = false;
                dispatch_resume(shared->_wrsrc);
            }
            sock->_writeDisabled = false;
        }

        // Connect (shares the write-side dispatch source).
        if (sock->_wantConnect && !sock->_connected && (callBackTypes & kCFSocketConnectCallBack)) {
            if (sockfd_is_writeable(shared->_socket)) {
                sock->_writeable = true;
                if (!sock->_wsuspended) {
                    dispatch_suspend(shared->_wrsrc);
                    sock->_wsuspended = true;
                }
                // If the source exists, but is now invalid, this next stuff is relatively harmless.
                if (shared->_source) {
                    CFRunLoopSourceSignal(shared->_source);
                    _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                }
            } else if (sock->_wsuspended && shared->_wrsrc) {
                sock->_wsuspended = false;
                dispatch_resume(shared->_wrsrc);
            }
            sock->_connectDisabled = false;
        }
    });
    // CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) done"), sock, callBackTypes);
}
/*
 * Disables the requested callback types on a valid socket: suspends the
 * corresponding dispatch source if it is running and sets the matching
 * "disabled" flag.
 */
void CFSocketDisableCallBacks(CFSocketRef sock, CFOptionFlags callBackTypes) {
    CHECK_FOR_FORK_RET();
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    // CFLog(5, CFSTR("CFSocketDisableCallBacks(%p, 0x%x) starting"), sock, callBackTypes);
    dispatch_sync(__sockQueue(), ^{
        if (!__CFSocketIsValid(sock)) return;
        struct __shared_blob *shared = sock->_shared;

        // Read / accept / data.
        if (sock->_wantReadType && (callBackTypes & 0x3) == sock->_wantReadType) {
            if (!sock->_rsuspended && shared->_rdsrc) {
                dispatch_suspend(shared->_rdsrc);
                sock->_rsuspended = true;
            }
            sock->_readDisabled = true;
        }

        // Write.
        if (sock->_wantWrite && (callBackTypes & kCFSocketWriteCallBack)) {
            if (!sock->_wsuspended && shared->_wrsrc) {
                dispatch_suspend(shared->_wrsrc);
                sock->_wsuspended = true;
            }
            sock->_writeDisabled = true;
        }

        // Connect (shares the write-side dispatch source).
        if (sock->_wantConnect && !sock->_connected && (callBackTypes & kCFSocketConnectCallBack)) {
            if (!sock->_wsuspended && shared->_wrsrc) {
                dispatch_suspend(shared->_wrsrc);
                sock->_wsuspended = true;
            }
            sock->_connectDisabled = true;
        }
    });
    // CFLog(5, CFSTR("CFSocketDisableCallBacks(%p, 0x%x) done"), sock, callBackTypes);
}
// Invalidates a CFSocket. Only a socket found in the Ready state is torn down;
// for any other state this function does nothing beyond a retain/release pair.
// Teardown removes the socket from __CFAllSockets, cancels its dispatch sources,
// drops its reference on the shared blob, invalidates the run loop source and
// releases the client context info.
void CFSocketInvalidate(CFSocketRef sock) {
CHECK_FOR_FORK_RET();
__CFGenericValidateType(sock, CFSocketGetTypeID());
// Keep the object alive for the duration of the call; balanced by the CFRelease at the end.
CFRetain(sock);
// CFLog(5, CFSTR("CFSocketInvalidate(%p) starting"), sock);
__block CFRunLoopSourceRef source = NULL;
__block Boolean wasReady = false;
dispatch_sync(__sockQueue(), ^{
wasReady = (sock->_state == kCFSocketStateReady);
if (wasReady) {
sock->_state = kCFSocketStateInvalidating;
OSMemoryBarrier();
// Remove this socket from the cache of live sockets.
for (CFIndex idx = 0, cnt = CFArrayGetCount(__CFAllSockets); idx < cnt; idx++) {
CFSocketRef s = (CFSocketRef)CFArrayGetValueAtIndex(__CFAllSockets, idx);
if (s == sock) {
CFArrayRemoveValueAtIndex(__CFAllSockets, idx);
break;
}
}
// Cancel each dispatch source first, then resume it if it was suspended.
if (sock->_shared->_rdsrc) {
dispatch_source_cancel(sock->_shared->_rdsrc);
if (sock->_rsuspended) {
sock->_rsuspended = false;
dispatch_resume(sock->_shared->_rdsrc);
}
}
if (sock->_shared->_wrsrc) {
dispatch_source_cancel(sock->_shared->_wrsrc);
if (sock->_wsuspended) {
sock->_wsuspended = false;
dispatch_resume(sock->_shared->_wrsrc);
}
}
// Take the run loop source out of the shared blob; it is invalidated outside the queue below.
source = sock->_shared->_source;
sock->_shared->_source = NULL;
// Drop the CFSocket's own reference on the shared blob; the last holder closes the fd (if requested) and frees it.
sock->_shared->_refCnt--;
if (0 == sock->_shared->_refCnt) {
if (sock->_shared->_closeFD) {
// thoroughly stop anything else from using the fd
(void)shutdown(sock->_shared->_socket, SHUT_RDWR);
int nullfd = open("/dev/null", O_RDONLY);
dup2(nullfd, sock->_shared->_socket);
close(nullfd);
close(sock->_shared->_socket);
}
free(sock->_shared);
}
sock->_shared = NULL;
}
});
if (wasReady) {
// These steps run after the dispatch_sync block has returned, i.e. outside the socket queue.
if (NULL != source) {
CFRunLoopSourceInvalidate(source);
CFRelease(source);
}
// Clear the stored info pointer before handing the old value to the client's release callback.
void *info = sock->_context.info;
sock->_context.info = NULL;
if (sock->_context.release) {
sock->_context.release(info);
}
sock->_state = kCFSocketStateInvalid;
OSMemoryBarrier();
}
// CFLog(5, CFSTR("CFSocketInvalidate(%p) done%s"), sock, wasReady ? " -- done on this thread" : "");
CFRelease(sock);
}
/*
 * Returns whether the socket is still usable. Besides checking the state
 * flag, the descriptor itself is probed with fstat(); if that fails (or the
 * shared state is gone) the socket is invalidated and false is returned.
 */
Boolean CFSocketIsValid(CFSocketRef sock) {
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    if (!__CFSocketIsValid(sock)) {
        return false;
    }
    struct stat probe;
    int rc = -1;
    if (sock->_shared) {
        rc = fstat(sock->_shared->_socket, &probe);
    }
    if (0 <= rc) {
        return true;
    }
    CFSocketInvalidate(sock);
    return false;
}
// Run loop source perform callback. It works in two phases:
//   1. on the socket queue: decide which callbacks are due, re-check readiness
//      with a zero-wait poll, fetch SO_ERROR, accept or read data as required by
//      the read type, re-arm the dispatch sources and retain the client context;
//   2. after the queue block returns: invoke the client callout for connect,
//      read and write (in that order), then release everything gathered in phase 1.
static void __CFSocketPerform(void *info) { // CFRunLoop should only call this on one thread at a time
CHECK_FOR_FORK_RET();
CFSocketRef sock = (CFSocketRef)info;
// CFLog(5, CFSTR("__CFSocketPerform(%p) starting '%@'"), sock, sock);
__block Boolean doRead = false, doWrite = false, doConnect = false, isValid = false;
__block int fd = INVALID_SOCKET;
__block SInt32 errorCode = 0;
__block int new_fd = INVALID_SOCKET;
__block CFDataRef address = NULL;
__block CFMutableDataRef data = NULL;
__block void *context_info = NULL;
__block void (*context_release)(const void *) = NULL;
dispatch_sync(__sockQueue(), ^{
isValid = __CFSocketIsValid(sock);
if (!isValid) return;
fd = sock->_shared->_socket;
// A read is due only if an event was recorded, a read type is wanted and it is not disabled.
doRead = sock->_readable && sock->_wantReadType && !sock->_readDisabled;
if (doRead) {
// Consume the pending flag and confirm the descriptor is really readable right now.
sock->_readable = false;
doRead = sockfd_is_readable(fd);
// if (!doRead) CFLog(5, CFSTR("__CFSocketPerform(%p) socket is not actually readable"), sock);
}
doWrite = sock->_writeable && sock->_wantWrite && !sock->_writeDisabled;
doConnect = sock->_writeable && sock->_wantConnect && !sock->_connectDisabled && !sock->_connected;
if (doWrite || doConnect) {
// Consume the pending flag and confirm the descriptor is really writable right now.
sock->_writeable = false;
if (doWrite) doWrite = sockfd_is_writeable(fd);
if (doConnect) doConnect = sockfd_is_writeable(fd);
}
if (!sock->_leaveErrors && (doWrite || doConnect)) { // not on read, for whatever reason
// Fetch the socket's SO_ERROR value; a failed query is treated as "no error".
int errorSize = sizeof(errorCode);
int ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&errorCode, (socklen_t *)&errorSize);
if (0 != ret) errorCode = 0;
sock->_error = errorCode;
}
// Set unconditionally on every perform of a valid socket, so connect callbacks stop after this pass.
sock->_connected = true;
// CFLog(5, CFSTR("__CFSocketPerform(%p) doing %d %d %d"), sock, doRead, doWrite, doConnect);
if (doRead) {
switch (sock->_wantReadType) {
case kCFSocketReadCallBack:
// Plain read notification: the client does its own reading.
break;
case kCFSocketAcceptCallBack: {
// Accept the connection here and pass the new descriptor plus peer address to the client.
uint8_t name[MAX_SOCKADDR_LEN];
socklen_t namelen = sizeof(name);
new_fd = accept(fd, (struct sockaddr *)name, (socklen_t *)&namelen);
if (INVALID_SOCKET != new_fd) {
address = CFDataCreate(CFGetAllocator(sock), name, namelen);
}
break;
}
case kCFSocketDataCallBack: {
// Read the data here; the buffer size is the FIONREAD value clamped to [256, 1 MiB].
uint8_t name[MAX_SOCKADDR_LEN];
socklen_t namelen = sizeof(name);
int avail = 0;
int ret = ioctlsocket(fd, FIONREAD, &avail);
if (ret < 0 || avail < 256) avail = 256;
if ((1 << 20) < avail) avail = (1 << 20);
data = CFDataCreateMutable(CFGetAllocator(sock), 0);
CFDataSetLength(data, avail);
ssize_t len = recvfrom(fd, CFDataGetMutableBytePtr(data), avail, 0, (struct sockaddr *)name, (socklen_t *)&namelen);
// A failed recvfrom() is reported to the client as zero bytes of data.
CFIndex datalen = (len < 0) ? 0 : len;
CFDataSetLength(data, datalen);
if (0 < namelen) {
address = CFDataCreate(CFGetAllocator(sock), name, namelen);
} else if (sock->_connOriented) {
// cannot call CFSocketCopyPeerAddress(), or deadlock
if (!sock->_peerAddress) {
uint8_t name[MAX_SOCKADDR_LEN];
socklen_t namelen = sizeof(name);
int ret = getpeername(sock->_shared->_socket, (struct sockaddr *)name, (socklen_t *)&namelen);
if (0 == ret && 0 < namelen) {
sock->_peerAddress = CFDataCreate(CFGetAllocator(sock), name, namelen);
}
}
address = sock->_peerAddress ? (CFDataRef)CFRetain(sock->_peerAddress) : NULL;
}
// The data callback always receives an address object, empty if none is known.
if (NULL == address) {
address = CFDataCreate(CFGetAllocator(sock), NULL, 0);
}
break;
}
}
}
// Re-arm the dispatch sources when automatic re-enabling is on.
if (sock->_reenableRead) {
// CFLog(5, CFSTR("__CFSocketPerform(%p) reenabling read %d %p"), sock, sock->_rsuspended, sock->_shared->_rdsrc);
if (sock->_rsuspended && sock->_shared->_rdsrc) {
sock->_rsuspended = false;
dispatch_resume(sock->_shared->_rdsrc);
}
}
if (sock->_reenableWrite) {
if (sock->_wsuspended && sock->_shared->_wrsrc) {
sock->_wsuspended = false;
dispatch_resume(sock->_shared->_wrsrc);
}
}
// Retain the client context for the duration of the callouts, when a retain callback exists and a callout is due.
if (sock->_context.retain && (doConnect || doRead || doWrite)) {
context_info = (void *)sock->_context.retain(sock->_context.info);
context_release = sock->_context.release;
} else {
context_info = sock->_context.info;
}
});
// CFLog(5, CFSTR("__CFSocketPerform(%p) isValid:%d, doRead:%d, doWrite:%d, doConnect:%d error:%d"), sock, isValid, doRead, doWrite, doConnect, errorCode);
if (!isValid || !(doConnect || doRead || doWrite)) return;
// From here on we are outside the socket queue; after the first callout, validity is re-checked before each further one.
Boolean calledOut = false;
if (doConnect) {
// The connect callout receives a pointer to the error code, or NULL when the code is 0.
if (sock->_callout) sock->_callout(sock, kCFSocketConnectCallBack, NULL, (0 != errorCode) ? &errorCode : NULL, context_info);
calledOut = true;
}
if (doRead && (!calledOut || __CFSocketIsValid(sock))) {
switch (sock->_wantReadType) {
case kCFSocketReadCallBack:
if (sock->_callout) sock->_callout(sock, kCFSocketReadCallBack, NULL, NULL, context_info);
calledOut = true;
break;
case kCFSocketAcceptCallBack:
// No callout when accept() failed.
if (INVALID_SOCKET != new_fd) {
if (sock->_callout) sock->_callout(sock, kCFSocketAcceptCallBack, address, &new_fd, context_info);
calledOut = true;
}
break;
case kCFSocketDataCallBack:
if (sock->_callout) sock->_callout(sock, kCFSocketDataCallBack, address, data, context_info);
calledOut = true;
break;
}
}
if (doWrite && (!calledOut || __CFSocketIsValid(sock))) {
// The write callout is skipped when SO_ERROR reported an error.
if (0 == errorCode) {
if (sock->_callout) sock->_callout(sock, kCFSocketWriteCallBack, NULL, NULL, context_info);
calledOut = true;
}
}
// A data-type read that produced zero bytes invalidates the socket.
if (data && 0 == CFDataGetLength(data)) CFSocketInvalidate(sock);
if (address) CFRelease(address);
if (data) CFRelease(data);
if (context_release) {
context_release(context_info);
}
CHECK_FOR_FORK_RET();
// CFLog(5, CFSTR("__CFSocketPerform(%p) done"), sock);
}
/*
 * Run loop source schedule callback. Counts the scheduling and, on the first
 * one, enables every callback type the socket was created with; then wakes
 * the run loop.
 */
static void __CFSocketSchedule(void *info, CFRunLoopRef rl, CFStringRef mode) {
    CFSocketRef sock = (CFSocketRef)info;
    // on a transition from 0->1, the old code forced all desired callbacks enabled
    if (1 == OSAtomicIncrement32Barrier(&sock->_runLoopCounter)) {
        CFOptionFlags wanted = sock->_wantReadType;
        wanted |= (sock->_wantWrite ? kCFSocketWriteCallBack : 0);
        wanted |= (sock->_wantConnect ? kCFSocketConnectCallBack : 0);
        CFSocketEnableCallBacks(sock, wanted);
    }
    CFRunLoopWakeUp(rl);
}
// Run loop source cancel callback: undoes one scheduling count and wakes the run loop.
static void __CFSocketCancel(void *info, CFRunLoopRef rl, CFStringRef mode) {
    CFSocketRef sock = (CFSocketRef)info;
    (void)OSAtomicDecrement32Barrier(&sock->_runLoopCounter);
    CFRunLoopWakeUp(rl);
}
/*
 * Returns a retained run loop source for the socket, creating it on first use
 * (or when the previous one has been invalidated). Returns NULL when the
 * socket is not valid or the source cannot be created.
 *
 * When a new source is created, the descriptor is polled once for each wanted
 * direction: if it is already ready the event is recorded and the source is
 * signalled, otherwise the suspended dispatch source is resumed.
 */
CFRunLoopSourceRef CFSocketCreateRunLoopSource(CFAllocatorRef allocator, CFSocketRef sock, CFIndex order) {
    CHECK_FOR_FORK_RET(NULL);
    __CFGenericValidateType(sock, CFSocketGetTypeID());
    if (!CFSocketIsValid(sock)) return NULL;
    __block CFRunLoopSourceRef result = NULL;
    dispatch_sync(__sockQueue(), ^{
        if (!__CFSocketIsValid(sock)) return;
        struct __shared_blob *shared = sock->_shared;

        // Discard a stale source so a fresh one is built below.
        if (NULL != shared->_source && !CFRunLoopSourceIsValid(shared->_source)) {
            CFRelease(shared->_source);
            shared->_source = NULL;
        }

        if (NULL == shared->_source) {
            CFRunLoopSourceContext context;
            context.version = 0;
            context.info = (void *)sock;
            context.retain = (const void *(*)(const void *))CFRetain;
            context.release = (void (*)(const void *))CFRelease;
            context.copyDescription = (CFStringRef (*)(const void *))__CFSocketCopyDescription;
            context.equal = NULL;
            context.hash = NULL;
            context.schedule = __CFSocketSchedule;
            context.cancel = __CFSocketCancel;
            context.perform = __CFSocketPerform;
            shared->_source = CFRunLoopSourceCreate(allocator, order, (CFRunLoopSourceContext *)&context);

            if (shared->_source) {
                // Read side.
                if (sock->_wantReadType) {
                    if (sockfd_is_readable(shared->_socket)) {
                        sock->_readable = true;
                        if (!sock->_rsuspended) {
                            dispatch_suspend(shared->_rdsrc);
                            sock->_rsuspended = true;
                        }
                        if (shared->_source) {
                            CFRunLoopSourceSignal(shared->_source);
                            _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                        }
                    } else if (sock->_rsuspended && shared->_rdsrc) {
                        sock->_rsuspended = false;
                        dispatch_resume(shared->_rdsrc);
                    }
                }
                // Write / connect side.
                if (sock->_wantWrite || (sock->_wantConnect && !sock->_connected)) {
                    if (sockfd_is_writeable(shared->_socket)) {
                        sock->_writeable = true;
                        if (!sock->_wsuspended) {
                            dispatch_suspend(shared->_wrsrc);
                            sock->_wsuspended = true;
                        }
                        if (shared->_source) {
                            CFRunLoopSourceSignal(shared->_source);
                            _CFRunLoopSourceWakeUpRunLoops(shared->_source);
                        }
                    } else if (sock->_wsuspended && shared->_wrsrc) {
                        sock->_wsuspended = false;
                        dispatch_resume(shared->_wrsrc);
                    }
                }
            }
        }

        if (shared->_source) {
            result = (CFRunLoopSourceRef)CFRetain(shared->_source);
        } else {
            result = NULL;
        }
    });
    // CFLog(5, CFSTR("CFSocketCreateRunLoopSource(%p) => %p"), sock, result);
    return result;
}
// Intentionally a no-op in this implementation: all arguments are ignored.
void __CFSocketSetSocketReadBufferAttrs(CFSocketRef s, CFTimeInterval timeout, CFIndex length) {
    (void)s;
    (void)timeout;
    (void)length;
}
/*
 * Reads up to length bytes from the socket's native descriptor into buffer.
 *
 * *error is set to 0 on entry and to errno when read() fails.
 * Returns the value returned by read(): the number of bytes read, 0 at end of
 * stream, or -1 on failure.
 */
CFIndex __CFSocketRead(CFSocketRef s, UInt8* buffer, CFIndex length, int* error) {
    *error = 0;
    // Keep the full ssize_t result; storing it in an int would truncate large reads.
    ssize_t ret = read(CFSocketGetNative(s), buffer, length);
    if (ret < 0) {
        *error = errno;
    }
    return (CFIndex)ret;
}
/*
 * Queries FIONREAD on the socket's native descriptor.
 * On success stores the byte count in *ctBytesAvailable and returns true;
 * on failure returns false and leaves *ctBytesAvailable untouched.
 */
Boolean __CFSocketGetBytesAvailable(CFSocketRef s, CFIndex* ctBytesAvailable) {
    int pending;
    if (ioctlsocket(CFSocketGetNative(s), FIONREAD, &pending) < 0) {
        return false;
    }
    *ctBytesAvailable = (CFIndex)pending;
    return true;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment