Created
October 21, 2012 12:35
-
-
Save Kentzo/3926860 to your computer and use it in GitHub Desktop.
NEW_SOCKET from
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <CoreFoundation/CFSocket.h> | |
#include "CFInternal.h" | |
#include <dispatch/dispatch.h> | |
#include <dispatch/private.h> | |
#include <netinet/in.h> | |
#include <sys/sysctl.h> | |
#include <sys/socket.h> | |
#include <sys/ioctl.h> | |
#include <sys/stat.h> | |
#include <unistd.h> | |
#include <dlfcn.h> | |
#include <sys/select.h> | |
extern void _CFRunLoopSourceWakeUpRunLoops(CFRunLoopSourceRef rls); | |
#define INVALID_SOCKET (CFSocketNativeHandle)(-1) | |
#define MAX_SOCKADDR_LEN 256 | |
DISPATCH_HELPER_FUNCTIONS(sock, CFSocket) | |
static Boolean sockfd_is_readable(int fd) { | |
if (fd < 0 || 1048576 <= fd) HALT; | |
size_t sz = ((fd + CHAR_BIT) / CHAR_BIT) + 7; // generous | |
fd_set *fdset = malloc(sz); | |
int ret; | |
do { | |
memset(fdset, 0, sz); | |
FD_SET(fd, fdset); | |
struct timespec ts = {0, 1000UL}; // 1 us | |
ret = pselect(fd + 1, fdset, NULL, NULL, &ts, NULL); | |
} while (ret < 0 && (EINTR == errno || EAGAIN == errno)); | |
Boolean isSet = ((0 < ret) && FD_ISSET(fd, fdset)); | |
free(fdset); | |
return isSet; | |
} | |
static Boolean sockfd_is_writeable(int fd) { | |
if (fd < 0 || 1048576 <= fd) HALT; | |
size_t sz = ((fd + CHAR_BIT) / CHAR_BIT) + 7; // generous | |
fd_set *fdset = malloc(sz); | |
int ret; | |
do { | |
memset(fdset, 0, sz); | |
FD_SET(fd, fdset); | |
struct timespec ts = {0, 1000UL}; // 1 us | |
ret = pselect(fd + 1, NULL, fdset, NULL, &ts, NULL); | |
} while (ret < 0 && (EINTR == errno || EAGAIN == errno)); | |
Boolean isSet = ((0 < ret) && FD_ISSET(fd, fdset)); | |
free(fdset); | |
return isSet; | |
} | |
enum { | |
kCFSocketStateReady = 0, | |
kCFSocketStateInvalidating = 1, | |
kCFSocketStateInvalid = 2, | |
kCFSocketStateDeallocating = 3 | |
}; | |
struct __shared_blob { | |
dispatch_source_t _rdsrc; | |
dispatch_source_t _wrsrc; | |
CFRunLoopSourceRef _source; | |
CFSocketNativeHandle _socket; | |
uint8_t _closeFD; | |
uint8_t _refCnt; | |
}; | |
struct __CFSocket { | |
CFRuntimeBase _base; | |
struct __shared_blob *_shared; // non-NULL when valid, NULL when invalid | |
uint8_t _state:2; // mutable, not written safely | |
uint8_t _isSaneFD:1; // immutable | |
uint8_t _connOriented:1; // immutable | |
uint8_t _wantConnect:1; // immutable | |
uint8_t _wantWrite:1; // immutable | |
uint8_t _wantReadType:2; // immutable | |
uint8_t _error; | |
uint8_t _rsuspended:1; | |
uint8_t _wsuspended:1; | |
uint8_t _readable:1; | |
uint8_t _writeable:1; | |
uint8_t _unused:4; | |
uint8_t _reenableRead:1; | |
uint8_t _readDisabled:1; | |
uint8_t _reenableWrite:1; | |
uint8_t _writeDisabled:1; | |
uint8_t _connectDisabled:1; | |
uint8_t _connected:1; | |
uint8_t _leaveErrors:1; | |
uint8_t _closeOnInvalidate:1; | |
int32_t _runLoopCounter; | |
CFDataRef _address; // immutable, once created | |
CFDataRef _peerAddress; // immutable, once created | |
CFSocketCallBack _callout; // immutable | |
CFSocketContext _context; // immutable | |
}; | |
CF_INLINE Boolean __CFSocketIsValid(CFSocketRef sock) { | |
return kCFSocketStateReady == sock->_state; | |
} | |
static CFStringRef __CFSocketCopyDescription(CFTypeRef cf) { | |
CFSocketRef sock = (CFSocketRef)cf; | |
CFStringRef contextDesc = NULL; | |
if (NULL != sock->_context.info && NULL != sock->_context.copyDescription) { | |
contextDesc = sock->_context.copyDescription(sock->_context.info); | |
} | |
if (NULL == contextDesc) { | |
contextDesc = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFSocket context %p>"), sock->_context.info); | |
} | |
Dl_info info; | |
void *addr = sock->_callout; | |
const char *name = (dladdr(addr, &info) && info.dli_saddr == addr && info.dli_sname) ? info.dli_sname : "???"; | |
int avail = -1; | |
ioctlsocket(sock->_shared ? sock->_shared->_socket : -1, FIONREAD, &avail); | |
CFStringRef result = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR( | |
"<CFSocket %p [%p]>{valid = %s, socket = %d, " | |
"want connect = %s, connect disabled = %s, " | |
"want write = %s, reenable write = %s, write disabled = %s, " | |
"want read = %s, reenable read = %s, read disabled = %s, " | |
"leave errors = %s, close on invalidate = %s, connected = %s, " | |
"last error code = %d, bytes available for read = %d, " | |
"source = %p, callout = %s (%p), context = %@}"), | |
cf, CFGetAllocator(sock), __CFSocketIsValid(sock) ? "Yes" : "No", sock->_shared ? sock->_shared->_socket : -1, | |
sock->_wantConnect ? "Yes" : "No", sock->_connectDisabled ? "Yes" : "No", | |
sock->_wantWrite ? "Yes" : "No", sock->_reenableWrite ? "Yes" : "No", sock->_writeDisabled ? "Yes" : "No", | |
sock->_wantReadType ? "Yes" : "No", sock->_reenableRead ? "Yes" : "No", sock->_readDisabled? "Yes" : "No", | |
sock->_leaveErrors ? "Yes" : "No", sock->_closeOnInvalidate ? "Yes" : "No", sock->_connected ? "Yes" : "No", | |
sock->_error, avail, | |
sock->_shared ? sock->_shared->_source : NULL, name, addr, contextDesc); | |
if (NULL != contextDesc) { | |
CFRelease(contextDesc); | |
} | |
return result; | |
} | |
static void __CFSocketDeallocate(CFTypeRef cf) { | |
CHECK_FOR_FORK_RET(); | |
CFSocketRef sock = (CFSocketRef)cf; | |
// Since CFSockets are cached, we can only get here sometime after being invalidated | |
sock->_state = kCFSocketStateDeallocating; | |
if (sock->_peerAddress) { | |
CFRelease(sock->_peerAddress); | |
sock->_peerAddress = NULL; | |
} | |
if (sock->_address) { | |
CFRelease(sock->_address); | |
sock->_address = NULL; | |
} | |
} | |
static CFTypeID __kCFSocketTypeID = _kCFRuntimeNotATypeID; | |
static const CFRuntimeClass __CFSocketClass = { | |
0, | |
"CFSocket", | |
NULL, // init | |
NULL, // copy | |
__CFSocketDeallocate, | |
NULL, // equal | |
NULL, // hash | |
NULL, // | |
__CFSocketCopyDescription | |
}; | |
static CFMutableArrayRef __CFAllSockets = NULL; | |
CFTypeID CFSocketGetTypeID(void) { | |
if (_kCFRuntimeNotATypeID == __kCFSocketTypeID) { | |
__kCFSocketTypeID = _CFRuntimeRegisterClass(&__CFSocketClass); | |
__CFAllSockets = CFArrayCreateMutable(kCFAllocatorSystemDefault, 0, &kCFTypeArrayCallBacks); | |
struct rlimit lim1; | |
int ret1 = getrlimit(RLIMIT_NOFILE, &lim1); | |
int mib[] = {CTL_KERN, KERN_MAXFILESPERPROC}; | |
int maxfd = 0; | |
size_t len = sizeof(int); | |
int ret0 = sysctl(mib, 2, &maxfd, &len, NULL, 0); | |
if (0 == ret0 && 0 == ret1 && lim1.rlim_max < maxfd) maxfd = lim1.rlim_max; | |
if (0 == ret1 && lim1.rlim_cur < maxfd) { | |
struct rlimit lim2 = lim1; | |
lim2.rlim_cur += 2304; | |
if (maxfd < lim2.rlim_cur) lim2.rlim_cur = maxfd; | |
setrlimit(RLIMIT_NOFILE, &lim2); | |
// we try, but do not go to extraordinary measures | |
} | |
} | |
return __kCFSocketTypeID; | |
} | |
CFSocketRef CFSocketCreateWithNative(CFAllocatorRef allocator, CFSocketNativeHandle ufd, CFOptionFlags callBackTypes, CFSocketCallBack callout, const CFSocketContext *context) { | |
CHECK_FOR_FORK_RET(NULL); | |
CFSocketGetTypeID(); // cause initialization if necessary | |
struct stat statbuf; | |
int ret = fstat(ufd, &statbuf); | |
if (ret < 0) ufd = INVALID_SOCKET; | |
Boolean sane = false; | |
if (INVALID_SOCKET != ufd) { | |
uint32_t type = (statbuf.st_mode & S_IFMT); | |
sane = (S_IFSOCK == type) || (S_IFIFO == type) || (S_IFCHR == type); | |
if (1 && !sane) { | |
CFLog(kCFLogLevelWarning, CFSTR("*** CFSocketCreateWithNative(): creating CFSocket with silly fd type (%07o) -- may or may not work"), type); | |
} | |
} | |
if (INVALID_SOCKET != ufd) { | |
Boolean canHandle = false; | |
int tmp_kq = kqueue(); | |
if (0 <= tmp_kq) { | |
struct kevent ev[2]; | |
EV_SET(&ev[0], ufd, EVFILT_READ, EV_ADD, 0, 0, 0); | |
EV_SET(&ev[1], ufd, EVFILT_WRITE, EV_ADD, 0, 0, 0); | |
int ret = kevent(tmp_kq, ev, 2, NULL, 0, NULL); | |
canHandle = (0 <= ret); // if kevent(ADD) succeeds, can handle | |
close(tmp_kq); | |
} | |
if (1 && !canHandle) { | |
CFLog(kCFLogLevelWarning, CFSTR("*** CFSocketCreateWithNative(): creating CFSocket with unsupported fd type -- may or may not work")); | |
} | |
} | |
if (INVALID_SOCKET == ufd) { | |
// Historically, bad ufd was allowed, but gave an uncached and already-invalid CFSocketRef | |
SInt32 size = sizeof(struct __CFSocket) - sizeof(CFRuntimeBase); | |
CFSocketRef memory = (CFSocketRef)_CFRuntimeCreateInstance(allocator, CFSocketGetTypeID(), size, NULL); | |
if (NULL == memory) { | |
return NULL; | |
} | |
memory->_callout = callout; | |
memory->_state = kCFSocketStateInvalid; | |
return memory; | |
} | |
__block CFSocketRef sock = NULL; | |
dispatch_sync(__sockQueue(), ^{ | |
for (CFIndex idx = 0, cnt = CFArrayGetCount(__CFAllSockets); idx < cnt; idx++) { | |
CFSocketRef s = (CFSocketRef)CFArrayGetValueAtIndex(__CFAllSockets, idx); | |
if (s->_shared->_socket == ufd) { | |
CFRetain(s); | |
sock = s; | |
return; | |
} | |
} | |
SInt32 size = sizeof(struct __CFSocket) - sizeof(CFRuntimeBase); | |
CFSocketRef memory = (CFSocketRef)_CFRuntimeCreateInstance(allocator, CFSocketGetTypeID(), size, NULL); | |
if (NULL == memory) { | |
return; | |
} | |
int socketType = 0; | |
if (INVALID_SOCKET != ufd) { | |
socklen_t typeSize = sizeof(socketType); | |
int ret = getsockopt(ufd, SOL_SOCKET, SO_TYPE, (void *)&socketType, (socklen_t *)&typeSize); | |
if (ret < 0) socketType = 0; | |
} | |
memory->_rsuspended = true; | |
memory->_wsuspended = true; | |
memory->_readable = false; | |
memory->_writeable = false; | |
memory->_isSaneFD = sane ? 1 : 0; | |
memory->_wantReadType = (callBackTypes & 0x3); | |
memory->_reenableRead = memory->_wantReadType ? true : false; | |
memory->_readDisabled = false; | |
memory->_wantWrite = (callBackTypes & kCFSocketWriteCallBack) ? true : false; | |
memory->_reenableWrite = false; | |
memory->_writeDisabled = false; | |
memory->_wantConnect = (callBackTypes & kCFSocketConnectCallBack) ? true : false; | |
memory->_connectDisabled = false; | |
memory->_leaveErrors = false; | |
memory->_closeOnInvalidate = true; | |
memory->_connOriented = (SOCK_STREAM == socketType || SOCK_SEQPACKET == socketType); | |
memory->_connected = (memory->_wantReadType == kCFSocketAcceptCallBack || !memory->_connOriented) ? true : false; | |
memory->_error = 0; | |
memory->_runLoopCounter = 0; | |
memory->_address = NULL; | |
memory->_peerAddress = NULL; | |
memory->_context.info = NULL; | |
memory->_context.retain = NULL; | |
memory->_context.release = NULL; | |
memory->_context.copyDescription = NULL; | |
memory->_callout = callout; | |
if (NULL != context) { | |
objc_memmove_collectable(&memory->_context, context, sizeof(CFSocketContext)); | |
memory->_context.info = context->retain ? (void *)context->retain(context->info) : context->info; | |
} | |
struct __shared_blob *shared = malloc(sizeof(struct __shared_blob)); | |
shared->_rdsrc = NULL; | |
shared->_wrsrc = NULL; | |
shared->_source = NULL; | |
shared->_socket = ufd; | |
shared->_closeFD = true; // copy of _closeOnInvalidate | |
shared->_refCnt = 1; // one for the CFSocket | |
memory->_shared = shared; | |
if (memory->_wantReadType) { | |
dispatch_source_t dsrc = NULL; | |
if (sane) { | |
dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, ufd, 0, __sockQueue()); | |
} else { | |
dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, __sockQueue()); | |
dispatch_source_set_timer(dsrc, dispatch_time(DISPATCH_TIME_NOW, 0), NSEC_PER_SEC / 2, NSEC_PER_SEC); | |
} | |
dispatch_block_t event_block = ^{ | |
memory->_readable = true; | |
if (!memory->_rsuspended) { | |
dispatch_suspend(dsrc); | |
// CFLog(5, CFSTR("suspend %p due to read event block"), memory); | |
memory->_rsuspended = true; | |
} | |
if (shared->_source) { | |
CFRunLoopSourceSignal(shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(shared->_source); | |
} | |
}; | |
dispatch_block_t cancel_block = ^{ | |
shared->_rdsrc = NULL; | |
shared->_refCnt--; | |
if (0 == shared->_refCnt) { | |
if (shared->_closeFD) { | |
// thoroughly stop anything else from using the fd | |
(void)shutdown(shared->_socket, SHUT_RDWR); | |
int nullfd = open("/dev/null", O_RDONLY); | |
dup2(nullfd, shared->_socket); | |
close(nullfd); | |
close(shared->_socket); | |
} | |
free(shared); | |
} | |
dispatch_release(dsrc); | |
}; | |
dispatch_source_set_event_handler(dsrc, event_block); | |
dispatch_source_set_cancel_handler(dsrc, cancel_block); | |
shared->_rdsrc = dsrc; | |
} | |
if (memory->_wantWrite || memory->_wantConnect) { | |
dispatch_source_t dsrc = NULL; | |
if (sane) { | |
dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, ufd, 0, __sockQueue()); | |
} else { | |
dsrc = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, __sockQueue()); | |
dispatch_source_set_timer(dsrc, dispatch_time(DISPATCH_TIME_NOW, 0), NSEC_PER_SEC / 2, NSEC_PER_SEC); | |
} | |
dispatch_block_t event_block = ^{ | |
memory->_writeable = true; | |
if (!memory->_wsuspended) { | |
dispatch_suspend(dsrc); | |
// CFLog(5, CFSTR("suspend %p due to write event block"), memory); | |
memory->_wsuspended = true; | |
} | |
if (shared->_source) { | |
CFRunLoopSourceSignal(shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(shared->_source); | |
} | |
}; | |
dispatch_block_t cancel_block = ^{ | |
shared->_wrsrc = NULL; | |
shared->_refCnt--; | |
if (0 == shared->_refCnt) { | |
if (shared->_closeFD) { | |
// thoroughly stop anything else from using the fd | |
(void)shutdown(shared->_socket, SHUT_RDWR); | |
int nullfd = open("/dev/null", O_RDONLY); | |
dup2(nullfd, shared->_socket); | |
close(nullfd); | |
close(shared->_socket); | |
} | |
free(shared); | |
} | |
dispatch_release(dsrc); | |
}; | |
dispatch_source_set_event_handler(dsrc, event_block); | |
dispatch_source_set_cancel_handler(dsrc, cancel_block); | |
shared->_wrsrc = dsrc; | |
} | |
if (shared->_rdsrc) { | |
shared->_refCnt++; | |
} | |
if (shared->_wrsrc) { | |
shared->_refCnt++; | |
} | |
memory->_state = kCFSocketStateReady; | |
CFArrayAppendValue(__CFAllSockets, memory); | |
sock = memory; | |
}); | |
// CFLog(5, CFSTR("CFSocketCreateWithNative(): created socket %p with callbacks 0x%x"), sock, callBackTypes); | |
if (sock && !CFSocketIsValid(sock)) { // must do this outside lock to avoid deadlock | |
CFRelease(sock); | |
sock = NULL; | |
} | |
return sock; | |
} | |
CFSocketNativeHandle CFSocketGetNative(CFSocketRef sock) { | |
CHECK_FOR_FORK_RET(INVALID_SOCKET); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
return sock->_shared ? sock->_shared->_socket : INVALID_SOCKET; | |
} | |
void CFSocketGetContext(CFSocketRef sock, CFSocketContext *context) { | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
CFAssert1(0 == context->version, __kCFLogAssertion, "%s(): context version not initialized to 0", __PRETTY_FUNCTION__); | |
objc_memmove_collectable(context, &sock->_context, sizeof(CFSocketContext)); | |
} | |
CFDataRef CFSocketCopyAddress(CFSocketRef sock) { | |
CHECK_FOR_FORK_RET(NULL); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
__block CFDataRef result = NULL; | |
dispatch_sync(__sockQueue(), ^{ | |
if (!sock->_address) { | |
if (!__CFSocketIsValid(sock)) return; | |
uint8_t name[MAX_SOCKADDR_LEN]; | |
socklen_t namelen = sizeof(name); | |
int ret = getsockname(sock->_shared->_socket, (struct sockaddr *)name, (socklen_t *)&namelen); | |
if (0 == ret && 0 < namelen) { | |
sock->_address = CFDataCreate(CFGetAllocator(sock), name, namelen); | |
} | |
} | |
result = sock->_address ? (CFDataRef)CFRetain(sock->_address) : NULL; | |
}); | |
return result; | |
} | |
CFDataRef CFSocketCopyPeerAddress(CFSocketRef sock) { | |
CHECK_FOR_FORK_RET(NULL); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
__block CFDataRef result = NULL; | |
dispatch_sync(__sockQueue(), ^{ | |
if (!sock->_peerAddress) { | |
if (!__CFSocketIsValid(sock)) return; | |
uint8_t name[MAX_SOCKADDR_LEN]; | |
socklen_t namelen = sizeof(name); | |
int ret = getpeername(sock->_shared->_socket, (struct sockaddr *)name, (socklen_t *)&namelen); | |
if (0 == ret && 0 < namelen) { | |
sock->_peerAddress = CFDataCreate(CFGetAllocator(sock), name, namelen); | |
} | |
} | |
result = sock->_peerAddress ? (CFDataRef)CFRetain(sock->_peerAddress) : NULL; | |
}); | |
return result; | |
} | |
CFOptionFlags CFSocketGetSocketFlags(CFSocketRef sock) { | |
CHECK_FOR_FORK(); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
__block CFOptionFlags flags = 0; | |
dispatch_sync(__sockQueue(), ^{ | |
if (sock->_reenableRead) flags |= sock->_wantReadType; // flags are same as types here | |
if (sock->_reenableWrite) flags |= kCFSocketAutomaticallyReenableWriteCallBack; | |
if (sock->_leaveErrors) flags |= kCFSocketLeaveErrors; | |
if (sock->_closeOnInvalidate) flags |= kCFSocketCloseOnInvalidate; | |
}); | |
return flags; | |
} | |
void CFSocketSetSocketFlags(CFSocketRef sock, CFOptionFlags flags) { | |
CHECK_FOR_FORK(); | |
// CFLog(5, CFSTR("CFSocketSetSocketFlags(%p, 0x%x) starting"), sock, flags); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
dispatch_sync(__sockQueue(), ^{ | |
sock->_reenableRead = (sock->_wantReadType && ((flags & 0x3) == sock->_wantReadType)) ? true : false; | |
sock->_reenableWrite = (sock->_wantWrite && (flags & kCFSocketAutomaticallyReenableWriteCallBack)) ? true : false; | |
sock->_leaveErrors = (flags & kCFSocketLeaveErrors) ? true : false; | |
sock->_closeOnInvalidate = (flags & kCFSocketCloseOnInvalidate) ? true : false; | |
if (sock->_shared) sock->_shared->_closeFD = sock->_closeOnInvalidate; | |
}); | |
// CFLog(5, CFSTR("CFSocketSetSocketFlags(%p, 0x%x) done"), sock, flags); | |
} | |
void CFSocketEnableCallBacks(CFSocketRef sock, CFOptionFlags callBackTypes) { | |
CHECK_FOR_FORK_RET(); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
// CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) starting"), sock, callBackTypes); | |
dispatch_sync(__sockQueue(), ^{ | |
if (!__CFSocketIsValid(sock)) return; | |
if (sock->_wantReadType && (callBackTypes & 0x3) == sock->_wantReadType) { | |
if (sockfd_is_readable(sock->_shared->_socket)) { | |
sock->_readable = true; | |
// CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) socket is readable"), sock, callBackTypes); | |
if (!sock->_rsuspended) { | |
dispatch_suspend(sock->_shared->_rdsrc); | |
sock->_rsuspended = true; | |
} | |
// If the source exists, but is now invalid, this next stuff is relatively harmless. | |
if (sock->_shared->_source) { | |
CFRunLoopSourceSignal(sock->_shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(sock->_shared->_source); | |
} | |
} else if (sock->_rsuspended && sock->_shared->_rdsrc) { | |
sock->_rsuspended = false; | |
dispatch_resume(sock->_shared->_rdsrc); | |
} | |
sock->_readDisabled = false; | |
} | |
if (sock->_wantWrite && (callBackTypes & kCFSocketWriteCallBack)) { | |
if (sockfd_is_writeable(sock->_shared->_socket)) { | |
sock->_writeable = true; | |
if (!sock->_wsuspended) { | |
dispatch_suspend(sock->_shared->_wrsrc); | |
sock->_wsuspended = true; | |
} | |
// If the source exists, but is now invalid, this next stuff is relatively harmless. | |
if (sock->_shared->_source) { | |
CFRunLoopSourceSignal(sock->_shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(sock->_shared->_source); | |
} | |
} else if (sock->_wsuspended && sock->_shared->_wrsrc) { | |
sock->_wsuspended = false; | |
dispatch_resume(sock->_shared->_wrsrc); | |
} | |
sock->_writeDisabled = false; | |
} | |
if (sock->_wantConnect && !sock->_connected && (callBackTypes & kCFSocketConnectCallBack)) { | |
if (sockfd_is_writeable(sock->_shared->_socket)) { | |
sock->_writeable = true; | |
if (!sock->_wsuspended) { | |
dispatch_suspend(sock->_shared->_wrsrc); | |
sock->_wsuspended = true; | |
} | |
// If the source exists, but is now invalid, this next stuff is relatively harmless. | |
if (sock->_shared->_source) { | |
CFRunLoopSourceSignal(sock->_shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(sock->_shared->_source); | |
} | |
} else if (sock->_wsuspended && sock->_shared->_wrsrc) { | |
sock->_wsuspended = false; | |
dispatch_resume(sock->_shared->_wrsrc); | |
} | |
sock->_connectDisabled = false; | |
} | |
}); | |
// CFLog(5, CFSTR("CFSocketEnableCallBacks(%p, 0x%x) done"), sock, callBackTypes); | |
} | |
void CFSocketDisableCallBacks(CFSocketRef sock, CFOptionFlags callBackTypes) { | |
CHECK_FOR_FORK_RET(); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
// CFLog(5, CFSTR("CFSocketDisableCallBacks(%p, 0x%x) starting"), sock, callBackTypes); | |
dispatch_sync(__sockQueue(), ^{ | |
if (!__CFSocketIsValid(sock)) return; | |
if (sock->_wantReadType && (callBackTypes & 0x3) == sock->_wantReadType) { | |
if (!sock->_rsuspended && sock->_shared->_rdsrc) { | |
dispatch_suspend(sock->_shared->_rdsrc); | |
sock->_rsuspended = true; | |
} | |
sock->_readDisabled = true; | |
} | |
if (sock->_wantWrite && (callBackTypes & kCFSocketWriteCallBack)) { | |
if (!sock->_wsuspended && sock->_shared->_wrsrc) { | |
dispatch_suspend(sock->_shared->_wrsrc); | |
sock->_wsuspended = true; | |
} | |
sock->_writeDisabled = true; | |
} | |
if (sock->_wantConnect && !sock->_connected && (callBackTypes & kCFSocketConnectCallBack)) { | |
if (!sock->_wsuspended && sock->_shared->_wrsrc) { | |
dispatch_suspend(sock->_shared->_wrsrc); | |
sock->_wsuspended = true; | |
} | |
sock->_connectDisabled = true; | |
} | |
}); | |
// CFLog(5, CFSTR("CFSocketDisableCallBacks(%p, 0x%x) done"), sock, callBackTypes); | |
} | |
void CFSocketInvalidate(CFSocketRef sock) { | |
CHECK_FOR_FORK_RET(); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
CFRetain(sock); | |
// CFLog(5, CFSTR("CFSocketInvalidate(%p) starting"), sock); | |
__block CFRunLoopSourceRef source = NULL; | |
__block Boolean wasReady = false; | |
dispatch_sync(__sockQueue(), ^{ | |
wasReady = (sock->_state == kCFSocketStateReady); | |
if (wasReady) { | |
sock->_state = kCFSocketStateInvalidating; | |
OSMemoryBarrier(); | |
for (CFIndex idx = 0, cnt = CFArrayGetCount(__CFAllSockets); idx < cnt; idx++) { | |
CFSocketRef s = (CFSocketRef)CFArrayGetValueAtIndex(__CFAllSockets, idx); | |
if (s == sock) { | |
CFArrayRemoveValueAtIndex(__CFAllSockets, idx); | |
break; | |
} | |
} | |
if (sock->_shared->_rdsrc) { | |
dispatch_source_cancel(sock->_shared->_rdsrc); | |
if (sock->_rsuspended) { | |
sock->_rsuspended = false; | |
dispatch_resume(sock->_shared->_rdsrc); | |
} | |
} | |
if (sock->_shared->_wrsrc) { | |
dispatch_source_cancel(sock->_shared->_wrsrc); | |
if (sock->_wsuspended) { | |
sock->_wsuspended = false; | |
dispatch_resume(sock->_shared->_wrsrc); | |
} | |
} | |
source = sock->_shared->_source; | |
sock->_shared->_source = NULL; | |
sock->_shared->_refCnt--; | |
if (0 == sock->_shared->_refCnt) { | |
if (sock->_shared->_closeFD) { | |
// thoroughly stop anything else from using the fd | |
(void)shutdown(sock->_shared->_socket, SHUT_RDWR); | |
int nullfd = open("/dev/null", O_RDONLY); | |
dup2(nullfd, sock->_shared->_socket); | |
close(nullfd); | |
close(sock->_shared->_socket); | |
} | |
free(sock->_shared); | |
} | |
sock->_shared = NULL; | |
} | |
}); | |
if (wasReady) { | |
if (NULL != source) { | |
CFRunLoopSourceInvalidate(source); | |
CFRelease(source); | |
} | |
void *info = sock->_context.info; | |
sock->_context.info = NULL; | |
if (sock->_context.release) { | |
sock->_context.release(info); | |
} | |
sock->_state = kCFSocketStateInvalid; | |
OSMemoryBarrier(); | |
} | |
// CFLog(5, CFSTR("CFSocketInvalidate(%p) done%s"), sock, wasReady ? " -- done on this thread" : ""); | |
CFRelease(sock); | |
} | |
Boolean CFSocketIsValid(CFSocketRef sock) { | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
if (!__CFSocketIsValid(sock)) return false; | |
struct stat statbuf; | |
int ret = sock->_shared ? fstat(sock->_shared->_socket, &statbuf) : -1; | |
if (ret < 0) { | |
CFSocketInvalidate(sock); | |
return false; | |
} | |
return true; | |
} | |
static void __CFSocketPerform(void *info) { // CFRunLoop should only call this on one thread at a time | |
CHECK_FOR_FORK_RET(); | |
CFSocketRef sock = (CFSocketRef)info; | |
// CFLog(5, CFSTR("__CFSocketPerform(%p) starting '%@'"), sock, sock); | |
__block Boolean doRead = false, doWrite = false, doConnect = false, isValid = false; | |
__block int fd = INVALID_SOCKET; | |
__block SInt32 errorCode = 0; | |
__block int new_fd = INVALID_SOCKET; | |
__block CFDataRef address = NULL; | |
__block CFMutableDataRef data = NULL; | |
__block void *context_info = NULL; | |
__block void (*context_release)(const void *) = NULL; | |
dispatch_sync(__sockQueue(), ^{ | |
isValid = __CFSocketIsValid(sock); | |
if (!isValid) return; | |
fd = sock->_shared->_socket; | |
doRead = sock->_readable && sock->_wantReadType && !sock->_readDisabled; | |
if (doRead) { | |
sock->_readable = false; | |
doRead = sockfd_is_readable(fd); | |
// if (!doRead) CFLog(5, CFSTR("__CFSocketPerform(%p) socket is not actually readable"), sock); | |
} | |
doWrite = sock->_writeable && sock->_wantWrite && !sock->_writeDisabled; | |
doConnect = sock->_writeable && sock->_wantConnect && !sock->_connectDisabled && !sock->_connected; | |
if (doWrite || doConnect) { | |
sock->_writeable = false; | |
if (doWrite) doWrite = sockfd_is_writeable(fd); | |
if (doConnect) doConnect = sockfd_is_writeable(fd); | |
} | |
if (!sock->_leaveErrors && (doWrite || doConnect)) { // not on read, for whatever reason | |
int errorSize = sizeof(errorCode); | |
int ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&errorCode, (socklen_t *)&errorSize); | |
if (0 != ret) errorCode = 0; | |
sock->_error = errorCode; | |
} | |
sock->_connected = true; | |
// CFLog(5, CFSTR("__CFSocketPerform(%p) doing %d %d %d"), sock, doRead, doWrite, doConnect); | |
if (doRead) { | |
switch (sock->_wantReadType) { | |
case kCFSocketReadCallBack: | |
break; | |
case kCFSocketAcceptCallBack: { | |
uint8_t name[MAX_SOCKADDR_LEN]; | |
socklen_t namelen = sizeof(name); | |
new_fd = accept(fd, (struct sockaddr *)name, (socklen_t *)&namelen); | |
if (INVALID_SOCKET != new_fd) { | |
address = CFDataCreate(CFGetAllocator(sock), name, namelen); | |
} | |
break; | |
} | |
case kCFSocketDataCallBack: { | |
uint8_t name[MAX_SOCKADDR_LEN]; | |
socklen_t namelen = sizeof(name); | |
int avail = 0; | |
int ret = ioctlsocket(fd, FIONREAD, &avail); | |
if (ret < 0 || avail < 256) avail = 256; | |
if ((1 << 20) < avail) avail = (1 << 20); | |
data = CFDataCreateMutable(CFGetAllocator(sock), 0); | |
CFDataSetLength(data, avail); | |
ssize_t len = recvfrom(fd, CFDataGetMutableBytePtr(data), avail, 0, (struct sockaddr *)name, (socklen_t *)&namelen); | |
CFIndex datalen = (len < 0) ? 0 : len; | |
CFDataSetLength(data, datalen); | |
if (0 < namelen) { | |
address = CFDataCreate(CFGetAllocator(sock), name, namelen); | |
} else if (sock->_connOriented) { | |
// cannot call CFSocketCopyPeerAddress(), or deadlock | |
if (!sock->_peerAddress) { | |
uint8_t name[MAX_SOCKADDR_LEN]; | |
socklen_t namelen = sizeof(name); | |
int ret = getpeername(sock->_shared->_socket, (struct sockaddr *)name, (socklen_t *)&namelen); | |
if (0 == ret && 0 < namelen) { | |
sock->_peerAddress = CFDataCreate(CFGetAllocator(sock), name, namelen); | |
} | |
} | |
address = sock->_peerAddress ? (CFDataRef)CFRetain(sock->_peerAddress) : NULL; | |
} | |
if (NULL == address) { | |
address = CFDataCreate(CFGetAllocator(sock), NULL, 0); | |
} | |
break; | |
} | |
} | |
} | |
if (sock->_reenableRead) { | |
// CFLog(5, CFSTR("__CFSocketPerform(%p) reenabling read %d %p"), sock, sock->_rsuspended, sock->_shared->_rdsrc); | |
if (sock->_rsuspended && sock->_shared->_rdsrc) { | |
sock->_rsuspended = false; | |
dispatch_resume(sock->_shared->_rdsrc); | |
} | |
} | |
if (sock->_reenableWrite) { | |
if (sock->_wsuspended && sock->_shared->_wrsrc) { | |
sock->_wsuspended = false; | |
dispatch_resume(sock->_shared->_wrsrc); | |
} | |
} | |
if (sock->_context.retain && (doConnect || doRead || doWrite)) { | |
context_info = (void *)sock->_context.retain(sock->_context.info); | |
context_release = sock->_context.release; | |
} else { | |
context_info = sock->_context.info; | |
} | |
}); | |
// CFLog(5, CFSTR("__CFSocketPerform(%p) isValid:%d, doRead:%d, doWrite:%d, doConnect:%d error:%d"), sock, isValid, doRead, doWrite, doConnect, errorCode); | |
if (!isValid || !(doConnect || doRead || doWrite)) return; | |
Boolean calledOut = false; | |
if (doConnect) { | |
if (sock->_callout) sock->_callout(sock, kCFSocketConnectCallBack, NULL, (0 != errorCode) ? &errorCode : NULL, context_info); | |
calledOut = true; | |
} | |
if (doRead && (!calledOut || __CFSocketIsValid(sock))) { | |
switch (sock->_wantReadType) { | |
case kCFSocketReadCallBack: | |
if (sock->_callout) sock->_callout(sock, kCFSocketReadCallBack, NULL, NULL, context_info); | |
calledOut = true; | |
break; | |
case kCFSocketAcceptCallBack: | |
if (INVALID_SOCKET != new_fd) { | |
if (sock->_callout) sock->_callout(sock, kCFSocketAcceptCallBack, address, &new_fd, context_info); | |
calledOut = true; | |
} | |
break; | |
case kCFSocketDataCallBack: | |
if (sock->_callout) sock->_callout(sock, kCFSocketDataCallBack, address, data, context_info); | |
calledOut = true; | |
break; | |
} | |
} | |
if (doWrite && (!calledOut || __CFSocketIsValid(sock))) { | |
if (0 == errorCode) { | |
if (sock->_callout) sock->_callout(sock, kCFSocketWriteCallBack, NULL, NULL, context_info); | |
calledOut = true; | |
} | |
} | |
if (data && 0 == CFDataGetLength(data)) CFSocketInvalidate(sock); | |
if (address) CFRelease(address); | |
if (data) CFRelease(data); | |
if (context_release) { | |
context_release(context_info); | |
} | |
CHECK_FOR_FORK_RET(); | |
// CFLog(5, CFSTR("__CFSocketPerform(%p) done"), sock); | |
} | |
static void __CFSocketSchedule(void *info, CFRunLoopRef rl, CFStringRef mode) { | |
CFSocketRef sock = (CFSocketRef)info; | |
int32_t newVal = OSAtomicIncrement32Barrier(&sock->_runLoopCounter); | |
if (1 == newVal) { // on a transition from 0->1, the old code forced all desired callbacks enabled | |
CFOptionFlags types = sock->_wantReadType | (sock->_wantWrite ? kCFSocketWriteCallBack : 0) | (sock->_wantConnect ? kCFSocketConnectCallBack : 0); | |
CFSocketEnableCallBacks(sock, types); | |
} | |
CFRunLoopWakeUp(rl); | |
} | |
static void __CFSocketCancel(void *info, CFRunLoopRef rl, CFStringRef mode) { | |
CFSocketRef sock = (CFSocketRef)info; | |
OSAtomicDecrement32Barrier(&sock->_runLoopCounter); | |
CFRunLoopWakeUp(rl); | |
} | |
CFRunLoopSourceRef CFSocketCreateRunLoopSource(CFAllocatorRef allocator, CFSocketRef sock, CFIndex order) { | |
CHECK_FOR_FORK_RET(NULL); | |
__CFGenericValidateType(sock, CFSocketGetTypeID()); | |
if (!CFSocketIsValid(sock)) return NULL; | |
__block CFRunLoopSourceRef result = NULL; | |
dispatch_sync(__sockQueue(), ^{ | |
if (!__CFSocketIsValid(sock)) return; | |
if (NULL != sock->_shared->_source && !CFRunLoopSourceIsValid(sock->_shared->_source)) { | |
CFRelease(sock->_shared->_source); | |
sock->_shared->_source = NULL; | |
} | |
if (NULL == sock->_shared->_source) { | |
CFRunLoopSourceContext context; | |
context.version = 0; | |
context.info = (void *)sock; | |
context.retain = (const void *(*)(const void *))CFRetain; | |
context.release = (void (*)(const void *))CFRelease; | |
context.copyDescription = (CFStringRef (*)(const void *))__CFSocketCopyDescription; | |
context.equal = NULL; | |
context.hash = NULL; | |
context.schedule = __CFSocketSchedule; | |
context.cancel = __CFSocketCancel; | |
context.perform = __CFSocketPerform; | |
sock->_shared->_source = CFRunLoopSourceCreate(allocator, order, (CFRunLoopSourceContext *)&context); | |
if (sock->_shared->_source) { | |
if (sock->_wantReadType) { | |
if (sockfd_is_readable(sock->_shared->_socket)) { | |
sock->_readable = true; | |
if (!sock->_rsuspended) { | |
dispatch_suspend(sock->_shared->_rdsrc); | |
sock->_rsuspended = true; | |
} | |
if (sock->_shared->_source) { | |
CFRunLoopSourceSignal(sock->_shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(sock->_shared->_source); | |
} | |
} else if (sock->_rsuspended && sock->_shared->_rdsrc) { | |
sock->_rsuspended = false; | |
dispatch_resume(sock->_shared->_rdsrc); | |
} | |
} | |
if (sock->_wantWrite || (sock->_wantConnect && !sock->_connected)) { | |
if (sockfd_is_writeable(sock->_shared->_socket)) { | |
sock->_writeable = true; | |
if (!sock->_wsuspended) { | |
dispatch_suspend(sock->_shared->_wrsrc); | |
sock->_wsuspended = true; | |
} | |
if (sock->_shared->_source) { | |
CFRunLoopSourceSignal(sock->_shared->_source); | |
_CFRunLoopSourceWakeUpRunLoops(sock->_shared->_source); | |
} | |
} else if (sock->_wsuspended && sock->_shared->_wrsrc) { | |
sock->_wsuspended = false; | |
dispatch_resume(sock->_shared->_wrsrc); | |
} | |
} | |
} | |
} | |
result = sock->_shared->_source ? (CFRunLoopSourceRef)CFRetain(sock->_shared->_source) : NULL; | |
}); | |
// CFLog(5, CFSTR("CFSocketCreateRunLoopSource(%p) => %p"), sock, result); | |
return result; | |
} | |
void __CFSocketSetSocketReadBufferAttrs(CFSocketRef s, CFTimeInterval timeout, CFIndex length) { | |
} | |
CFIndex __CFSocketRead(CFSocketRef s, UInt8* buffer, CFIndex length, int* error) { | |
*error = 0; | |
int ret = read(CFSocketGetNative(s), buffer, length); | |
if (ret < 0) { | |
*error = errno; | |
} | |
return ret; | |
} | |
Boolean __CFSocketGetBytesAvailable(CFSocketRef s, CFIndex* ctBytesAvailable) { | |
int bytesAvailable; | |
int ret = ioctlsocket(CFSocketGetNative(s), FIONREAD, &bytesAvailable); | |
if (ret < 0) return false; | |
*ctBytesAvailable = (CFIndex)bytesAvailable; | |
return true; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment