Skip to content

Instantly share code, notes, and snippets.

@tfoldi
Created September 15, 2010 12:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tfoldi/580677 to your computer and use it in GitHub Desktop.
Save tfoldi/580677 to your computer and use it in GitHub Desktop.
Greenplum MapReduce function written in C language
wordcount:
cc -g -ggdb -shared -o wordcount.so wordcount.c -I`pg_config --includedir-server` -I`pg_config --pkgincludedir`/internal -I`pg_config --pkgincludedir` -fPIC
#include "postgres.h"
#include "funcapi.h"
#include "executor/executor.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(wordcount);
Datum wordcount(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
int call_cntr;
int max_calls;
TupleDesc tupdesc;
AttInMetadata *attinmeta;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) {
MemoryContext oldcontext;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* total number of desired executions, unlimited in our case */
funcctx->max_calls = -1;
/* string pointer to know where to continue */
funcctx->user_fctx = palloc(sizeof(size_t));
*(size_t *) funcctx->user_fctx = 0;
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) !=
TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
/*
* generate attribute metadata needed later to produce tuples from raw
* C strings
*/
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
attinmeta = funcctx->attinmeta;
if (*(size_t *) funcctx->user_fctx != -1 )
{
char **values;
char *tok;
char *value = palloc(VARSIZE(PG_GETARG_TEXT_P(0)) - VARHDRSZ+ sizeof(char));
HeapTuple tuple;
Datum result;
snprintf(value, VARSIZE(PG_GETARG_TEXT_P(0)) - VARHDRSZ,
"%s", pstrdup(PG_GETARG_TEXT_P(0)->vl_dat));
/*
* Prepare a values array for building the returned tuple.
* This should be an array of C strings which will
* be processed later by the type input functions.
*/
values = (char **) palloc(2 * sizeof(char *));
values[0] = (char *) palloc(VARSIZE(PG_GETARG_TEXT_P(0)) * sizeof(char));
values[1] = (char *) palloc(4 * sizeof(char));
tok = strtok(value + *(size_t *) funcctx->user_fctx, " \r\n\t");
if (tok != NULL) {
snprintf(values[0], 64, "%s", tok);
snprintf(values[1], 2, "%d", 1);
/* build a tuple */
tuple = BuildTupleFromCStrings(attinmeta, values);
/* make the tuple into a datum */
result = HeapTupleGetDatum(tuple);
tok = strtok(NULL, " \r\n\t");
if (!tok)
*(size_t *) funcctx->user_fctx = -1;
else
*(size_t *) funcctx->user_fctx = tok - value;
/* clean up (this is not really necessary) */
pfree(values[0]);
pfree(values[1]);
pfree(values);
pfree(value);
SRF_RETURN_NEXT(funcctx, result);
} else {
*(size_t *) funcctx->user_fctx = -1;
}
} else { /* do when there is no more left */
pfree(funcctx->user_fctx);
SRF_RETURN_DONE(funcctx);
}
}
%YAML 1.1
---
# Greenplum MapReduce job: run the C MAP function over a text file and
# sum the emitted values per key.
VERSION: 1.0.0.1
DEFINE:
  # Input named "book", read from a file on the listed host.
  - INPUT:
      NAME: book
      FILE:
        - localhost.localdomain:/home/gpadmin/gpmrdata/whitepaper.txt
  # MAP step backed by the "wordcount" symbol in the shared library.
  - MAP:
      NAME: wordsplit_c
      LIBRARY: /home/gpadmin/gpmapreduce/wordcount.so
      FUNCTION: wordcount
      LANGUAGE: c
      OPTIMIZE: STRICT IMMUTABLE
      PARAMETERS: value text
      RETURNS:
        - key text
        - value integer
EXECUTE:
  # Feed "book" through wordsplit_c, then reduce with SUM.
  - RUN:
      SOURCE: book
      MAP: wordsplit_c
      REDUCE: SUM
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment