Skip to content

Instantly share code, notes, and snippets.

@kostja
Created October 1, 2015 21:14
Show Gist options
  • Save kostja/3beab3ecd488a38efbac to your computer and use it in GitHub Desktop.
Save kostja/3beab3ecd488a38efbac to your computer and use it in GitHub Desktop.
#ifndef TARANTOOL_IPC_H_INCLUDED
#define TARANTOOL_IPC_H_INCLUDED
/*
* Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <stdbool.h>
#include <tarantool_ev.h>
#include "salad/rlist.h"
/**
* @brief CHANNELS
*/
/**
 * Base header for messages which need a resource cleanup.
 *
 * The structure carries a single callback which receives a pointer
 * to the message itself.
 * NOTE(review): presumably meant to be embedded at the start of a
 * larger, user-defined message - confirm against the implementation.
 */
struct ipc_msg {
/** Cleanup callback; invoked with a pointer to this message. */
void (*destroy)(struct ipc_msg *msg);
};
/**
 * Channel - fiber communication media.
 * Channel API doesn't throw.
 */
struct ipc_channel {
/**
 * Readers blocked waiting for messages while
 * the channel buffer is empty and/or there
 * are no writers.
 */
struct rlist readers;
/**
 * Writers blocked waiting for empty space while
 * the channel buffer is full.
 */
struct rlist writers;
/**
 * Channel buffer size (counted in messages), if the channel
 * is buffered.
 */
uint32_t size;
/** Ring buffer read position. */
uint32_t beg;
/** The number of messages in the buffer. */
uint32_t count;
/** True if the channel is closed. */
bool closed;
/**
 * Channel buffer, if any: an array of message pointers.
 * NOTE(review): ipc_channel_memsize() reserves room for `size`
 * pointers on top of the struct, so this presumably points into
 * the same allocation - confirm against ipc_channel_create().
 */
struct ipc_msg **buf;
};
/**
 * Number of bytes needed to hold a channel header together with
 * a buffer of @a size message pointers.
 *
 * @param size channel buffer size, in messages
 * @return sizeof(struct ipc_channel) plus room for @a size
 *         pointers to struct ipc_msg
 */
static inline size_t
ipc_channel_memsize(uint32_t size)
{
	/*
	 * The element type has to be spelled with its `struct` tag:
	 * there is no typedef, so a bare `ipc_msg` is not a type
	 * name in C.
	 */
	return sizeof(struct ipc_channel) + sizeof(struct ipc_msg *) * size;
}
/**
 * Initialize a channel in place. The memory must already have
 * been correctly allocated for the channel by the caller
 * (presumably ipc_channel_memsize(size) bytes - confirm).
 *
 * @param ch   memory to construct the channel in
 * @param size channel buffer size
 */
void
ipc_channel_create(struct ipc_channel *ch, uint32_t size);
/**
 * Destroy a channel. Does not free allocated memory.
 *
 * @param ch channel
 */
void
ipc_channel_destroy(struct ipc_channel *ch);
/**
 * Allocate and construct a channel.
 *
 * @param size size of the channel buffer
 * @return new channel
 *         (NOTE(review): behaviour on allocation failure is not
 *         documented here - confirm whether NULL can be returned)
 * @code
 *	struct ipc_channel *ch = ipc_channel_new(10);
 * @endcode
 */
struct ipc_channel *
ipc_channel_new(uint32_t size);
/**
 * Destroy and free an IPC channel.
 *
 * @param ch channel to destroy and free
 */
void
ipc_channel_delete(struct ipc_channel *ch);
/**
 * Tell whether the channel buffer holds no messages.
 *
 * @param ch channel
 *
 * @retval true  the channel buffer is empty
 *               (always true for unbuffered channels)
 * @retval false otherwise
 *
 * @code
 *	if (!ipc_channel_is_empty(ch))
 *		ipc_channel_get(ch, ...);
 * @endcode
 */
static inline bool
ipc_channel_is_empty(struct ipc_channel *ch)
{
	const uint32_t buffered = ch->count;

	return buffered == 0;
}
/**
 * Tell whether the channel buffer has no free slots left.
 *
 * @param ch channel
 *
 * @retval true  the channel buffer is full
 *               (always true for unbuffered channels)
 * @retval false otherwise
 *
 * @code
 *	if (!ipc_channel_is_full(ch))
 *		ipc_channel_put(ch, "message");
 * @endcode
 */
static inline bool
ipc_channel_is_full(struct ipc_channel *ch)
{
	/* There is still room while count stays below size. */
	if (ch->count < ch->size)
		return false;
	return true;
}
/**
 * Put a message into a channel.
 * This is for cases when messages need to have
 * a custom destructor.
 *
 * @param ch      channel
 * @param msg     message to put into the channel
 * @param timeout how long to wait (presumably seconds, as with
 *                libev's ev_tstamp - confirm)
 * @return NOTE(review): not documented here; presumably follows
 *         the 0 / -1 convention of ipc_channel_put_timeout() -
 *         confirm against the implementation.
 */
int
ipc_channel_put_msg_timeout(struct ipc_channel *ch,
struct ipc_msg *msg,
ev_tstamp timeout);
/**
 * Send a message over a channel or time out.
 *
 * @param ch      channel
 * @param data    message payload
 * @param timeout how long to wait (presumably seconds, as with
 *                libev's ev_tstamp - confirm)
 * @return 0 success
 * @return -1, errno=ETIMEDOUT if timeout exceeded,
 *             errno=ECANCEL if the fiber is cancelled
 *             (NOTE(review): POSIX spells this constant
 *             ECANCELED - confirm what the implementation sets),
 *             errno=EBADF if the channel is closed
 *             while waiting on it.
 */
int
ipc_channel_put_timeout(struct ipc_channel *ch,
void *data,
ev_tstamp timeout);
/**
 * Send a message over a channel.
 *
 * Yields the current fiber if the channel is full.
 * The message does not require a custom destructor.
 *
 * Equivalent to ipc_channel_put_timeout() with TIMEOUT_INFINITY
 * as the timeout.
 *
 * @param ch   channel
 * @param data message payload
 *
 * @code
 *	ipc_channel_put(ch, "message");
 * @endcode
 * @return whatever ipc_channel_put_timeout() returns;
 *         -1 if the channel is closed
 */
static inline int
ipc_channel_put(struct ipc_channel *ch, void *data)
{
	/* Forward the caller's payload: no `msg` exists in this scope. */
	return ipc_channel_put_timeout(ch, data, TIMEOUT_INFINITY);
}
/**
 * Get a message from the channel, or time out.
 * The caller is responsible for message destruction.
 *
 * @param ch        channel
 * @param[out] msg  where to store the received message pointer
 * @param timeout   how long to wait (presumably seconds, as with
 *                  libev's ev_tstamp - confirm)
 * @return NOTE(review): not documented here; presumably follows
 *         the 0 / -1 convention of ipc_channel_get_timeout() -
 *         confirm against the implementation.
 */
int
ipc_channel_get_msg_timeout(struct ipc_channel *ch,
struct ipc_msg **msg,
ev_tstamp timeout);
/**
 * Get data from a channel with a timeout.
 *
 * @param ch         channel
 * @param[out] data  where to store the received payload
 * @param timeout    how long to wait (presumably seconds, as with
 *                   libev's ev_tstamp - confirm)
 *
 * @return 0 on success, -1 on error (timeout, channel is
 *         closed)
 * @code
 *	void *data;
 *	while (ipc_channel_get_timeout(ch, &data, 0.5) == 0)
 *		printf("message: %p\n", data);
 * @endcode
 */
int
ipc_channel_get_timeout(struct ipc_channel *ch,
void **data,
ev_tstamp timeout);
/**
 * Fetch a message from the channel, waiting without a deadline.
 * Yields the current fiber if the channel is empty.
 *
 * @param ch         channel
 * @param[out] data  where to store the received payload
 * @return 0 on success, -1 on error
 */
static inline int
ipc_channel_get(struct ipc_channel *ch, void **data)
{
	int rc = ipc_channel_get_timeout(ch, data, TIMEOUT_INFINITY);

	return rc;
}
/**
 * Tell whether any reader fibers are waiting on the channel
 * for new messages.
 *
 * @param ch channel
 * @retval true  the list of waiting readers is not empty
 * @retval false otherwise
 */
static inline bool
ipc_channel_has_readers(struct ipc_channel *ch)
{
	if (rlist_empty(&ch->readers))
		return false;
	return true;
}
/**
 * Tell whether any writer fibers are waiting on the channel
 * for readers.
 *
 * @param ch channel
 * @retval true  the list of waiting writers is not empty
 * @retval false otherwise
 */
static inline bool
ipc_channel_has_writers(struct ipc_channel *ch)
{
	return rlist_empty(&ch->writers) ? false : true;
}
/**
 * Channel buffer size.
 *
 * @param ch channel
 * @return the value of the channel's `size` field
 */
static inline uint32_t
ipc_channel_size(struct ipc_channel *ch)
{
	const uint32_t capacity = ch->size;

	return capacity;
}
/**
 * The number of messages currently held in the buffer.
 * There may be more messages outstanding
 * if the buffer is full.
 *
 * @param ch channel
 * @return the value of the channel's `count` field
 */
static inline uint32_t
ipc_channel_count(struct ipc_channel *ch)
{
	const uint32_t buffered = ch->count;

	return buffered;
}
/**
 * Close the channel. Discards all messages
 * and wakes up all readers and writers.
 *
 * @param ch channel to close
 */
void
ipc_channel_close(struct ipc_channel *ch);
/**
 * True if the channel is closed, both for reading
 * and for writing.
 *
 * @param ch channel
 * @return the channel's `closed` flag
 */
static inline bool
ipc_channel_is_closed(struct ipc_channel *ch)
{
	const bool closed = ch->closed;

	return closed;
}
#endif /* TARANTOOL_IPC_H_INCLUDED */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment