Skip to content

Instantly share code, notes, and snippets.

@sebhtml
Created February 7, 2013 02:27
Show Gist options
  • Save sebhtml/4727926 to your computer and use it in GitHub Desktop.
Save sebhtml/4727926 to your computer and use it in GitHub Desktop.
This is a draft patch for dumping scaffolds in a file in less time.
diff --git a/code/plugin_GenomeNeighbourhood/GenomeNeighbourhood.cpp b/code/plugin_GenomeNeighbourhood/GenomeNeighbourhood.cpp
index e9d80f6..7a42732 100644
--- a/code/plugin_GenomeNeighbourhood/GenomeNeighbourhood.cpp
+++ b/code/plugin_GenomeNeighbourhood/GenomeNeighbourhood.cpp
@@ -1107,7 +1107,6 @@ void GenomeNeighbourhood::resolveSymbols(ComputeCore*core){
m_parameters=(Parameters*)core->getObjectFromSymbol(m_plugin,"/RayAssembler/ObjectStore/Parameters.ray");
m_contigLengths=(map<PathHandle,int>*)core->getObjectFromSymbol(m_plugin,"/RayAssembler/ObjectStore/ContigLengths.ray");
-
m_virtualCommunicator=core->getVirtualCommunicator();
m_core=core;
diff --git a/code/plugin_MachineHelper/MachineHelper.cpp b/code/plugin_MachineHelper/MachineHelper.cpp
index 5e0e6de..c7181ea 100644
--- a/code/plugin_MachineHelper/MachineHelper.cpp
+++ b/code/plugin_MachineHelper/MachineHelper.cpp
@@ -1191,6 +1191,9 @@ void MachineHelper::registerPlugin(ComputeCore*core){
RAY_MPI_TAG_GOOD_JOB_SEE_YOU_SOON_REPLY=core->allocateMessageTagHandle(plugin);
core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GOOD_JOB_SEE_YOU_SOON_REPLY,"RAY_MPI_TAG_GOOD_JOB_SEE_YOU_SOON_REPLY");
+
+ core->setObjectSymbol(m_plugin,&(m_ed->m_EXTENSION_contigs),"/RayAssembler/ObjectStore/ContigPaths.ray");
+ core->setObjectSymbol(m_plugin,&(m_ed->m_EXTENSION_identifiers),"/RayAssembler/ObjectStore/ContigNames.ray");
}
void MachineHelper::resolveSymbols(ComputeCore*core){
@@ -1358,9 +1361,6 @@ void MachineHelper::resolveSymbols(ComputeCore*core){
core->setMessageTagToSlaveModeSwitch(m_plugin,RAY_MPI_TAG_GOOD_JOB_SEE_YOU_SOON, RAY_SLAVE_MODE_DIE);
- core->setObjectSymbol(m_plugin,&(m_ed->m_EXTENSION_contigs),"/RayAssembler/ObjectStore/ContigPaths.ray");
- core->setObjectSymbol(m_plugin,&(m_ed->m_EXTENSION_identifiers),"/RayAssembler/ObjectStore/ContigNames.ray");
-
__BindPlugin(MachineHelper);
__BindAdapter(MachineHelper,RAY_MASTER_MODE_LOAD_CONFIG);
diff --git a/code/plugin_MessageProcessor/MessageProcessor.cpp b/code/plugin_MessageProcessor/MessageProcessor.cpp
index b186fd4..d29b112 100644
--- a/code/plugin_MessageProcessor/MessageProcessor.cpp
+++ b/code/plugin_MessageProcessor/MessageProcessor.cpp
@@ -145,7 +145,6 @@ __CreateMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_SEND_COVERAGE_VALUES_REPL
__CreateMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE);
__CreateMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY);
__CreateMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING);
-__CreateMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK);
void MessageProcessor::call_RAY_MPI_TAG_CONTIG_INFO(Message*message){
MessageUnit*incoming=(MessageUnit*)message->getBuffer();
@@ -2454,33 +2453,6 @@ void MessageProcessor::call_RAY_MPI_TAG_I_FINISHED_SCAFFOLDING(Message*message){
}
}
-void MessageProcessor::call_RAY_MPI_TAG_GET_CONTIG_CHUNK(Message*message){
- MessageUnit*incoming=(MessageUnit*)message->getBuffer();
- PathHandle contigId=incoming[0];
- int position=incoming[1];
- int index=m_fusionData->m_FUSION_identifier_map[contigId];
- int length=m_ed->m_EXTENSION_contigs[index].size();
- MessageUnit*messageContent=(MessageUnit*)m_outboxAllocator->allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
- int outputPosition=0;
- int origin=outputPosition;
- outputPosition++;
- int count=0;
-
- while(position<length
- && (outputPosition+KMER_U64_ARRAY_SIZE)<(int)(MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit))){
- Kmer theKmerObject;
- m_ed->m_EXTENSION_contigs[index].at(position++,&theKmerObject);
- theKmerObject.pack(messageContent,&outputPosition);
- count++;
- }
- messageContent[origin]=count;
- Message aMessage(messageContent,
- m_virtualCommunicator->getElementsPerQuery(RAY_MPI_TAG_GET_CONTIG_CHUNK),
- message->getSource(),RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY,
- m_parameters->getRank());
- m_outbox->push_back(&aMessage);
-}
-
void MessageProcessor::setScaffolder(Scaffolder*a){
m_scaffolder=a;
}
@@ -3031,9 +3003,6 @@ void MessageProcessor::registerPlugin(ComputeCore*core){
core->setMessageTagObjectHandler(plugin,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING, __GetAdapter(MessageProcessor,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING));
core->setMessageTagSymbol(plugin,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING,"RAY_MPI_TAG_I_FINISHED_SCAFFOLDING");
- RAY_MPI_TAG_GET_CONTIG_CHUNK=core->allocateMessageTagHandle(plugin);
- core->setMessageTagObjectHandler(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK, __GetAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK));
- core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK,"RAY_MPI_TAG_GET_CONTIG_CHUNK");
}
@@ -3138,8 +3107,6 @@ void MessageProcessor::resolveSymbols(ComputeCore*core){
RAY_MPI_TAG_FINISH_FUSIONS=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_FINISH_FUSIONS");
RAY_MPI_TAG_FINISH_FUSIONS_FINISHED=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_FINISH_FUSIONS_FINISHED");
RAY_MPI_TAG_FUSION_DONE=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_FUSION_DONE");
- RAY_MPI_TAG_GET_CONTIG_CHUNK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK");
- RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY");
RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION");
RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION_REPLY");
RAY_MPI_TAG_GET_COVERAGE_AND_MARK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_COVERAGE_AND_MARK");
@@ -3213,8 +3180,6 @@ void MessageProcessor::resolveSymbols(ComputeCore*core){
core->setMessageTagToSlaveModeSwitch(m_plugin,RAY_MPI_TAG_AUTOMATIC_DISTANCE_DETECTION, RAY_SLAVE_MODE_AUTOMATIC_DISTANCE_DETECTION);
core->setMessageTagToSlaveModeSwitch(m_plugin,RAY_MPI_TAG_ASK_LIBRARY_DISTANCES, RAY_SLAVE_MODE_SEND_LIBRARY_DISTANCES);
-
- core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_GET_CONTIG_CHUNK, RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY );
core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_REQUEST_VERTEX_READS, RAY_MPI_TAG_REQUEST_VERTEX_READS_REPLY );
core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_GET_READ_MATE, RAY_MPI_TAG_GET_READ_MATE_REPLY );
core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_REQUEST_VERTEX_COVERAGE, RAY_MPI_TAG_REQUEST_VERTEX_COVERAGE_REPLY );
@@ -3234,7 +3199,6 @@ void MessageProcessor::resolveSymbols(ComputeCore*core){
core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_REQUEST_READ_SEQUENCE, RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY );
core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_REQUEST_VERTEX_OUTGOING_EDGES, RAY_MPI_TAG_REQUEST_VERTEX_OUTGOING_EDGES_REPLY );
- core->setMessageTagSize(m_plugin, RAY_MPI_TAG_GET_CONTIG_CHUNK, MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit) );
core->setMessageTagSize(m_plugin, RAY_MPI_TAG_REQUEST_VERTEX_READS, max(5,KMER_U64_ARRAY_SIZE+1) );
core->setMessageTagSize(m_plugin, RAY_MPI_TAG_GET_READ_MATE, 4 );
core->setMessageTagSize(m_plugin, RAY_MPI_TAG_REQUEST_VERTEX_COVERAGE, KMER_U64_ARRAY_SIZE );
@@ -3355,6 +3319,5 @@ void MessageProcessor::resolveSymbols(ComputeCore*core){
__BindAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE);
__BindAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY);
__BindAdapter(MessageProcessor,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING);
- __BindAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK);
}
diff --git a/code/plugin_MessageProcessor/MessageProcessor.h b/code/plugin_MessageProcessor/MessageProcessor.h
index cb4fac3..cc0356c 100644
--- a/code/plugin_MessageProcessor/MessageProcessor.h
+++ b/code/plugin_MessageProcessor/MessageProcessor.h
@@ -1,6 +1,6 @@
/*
- Ray
- Copyright (C) 2010, 2011, 2012 Sébastien Boisvert
+ Ray -- Parallel genome assemblies for parallel DNA sequencing
+ Copyright (C) 2010, 2011, 2012, 2013 Sébastien Boisvert
http://DeNovoAssembler.SourceForge.Net/
@@ -19,8 +19,8 @@
*/
-#ifndef _MessageProcessor
-#define _MessageProcessor
+#ifndef _MessageProcessor_h
+#define _MessageProcessor_h
#include <code/plugin_SeedExtender/OpenAssemblerChooser.h>
#include <code/plugin_SeedExtender/SeedExtender.h>
@@ -157,13 +157,13 @@ __DeclareMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_SEND_COVERAGE_VALUES_REP
__DeclareMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE);
__DeclareMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY);
__DeclareMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING);
-__DeclareMessageTagAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK);
/**
* MessageProcessor receives all the messages of a MPI rank
* Message objects may also be checked using the Message inbox (m_inbox)
*
* Sometimes, a message will generate a reply (_REPLY)
+ *
* \author Sébastien Boisvert
*/
class MessageProcessor : public CorePlugin {
@@ -270,7 +270,6 @@ class MessageProcessor : public CorePlugin {
__AddAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE);
__AddAdapter(MessageProcessor,RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY);
__AddAdapter(MessageProcessor,RAY_MPI_TAG_I_FINISHED_SCAFFOLDING);
- __AddAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK);
uint64_t m_bloomBits;
@@ -342,8 +341,6 @@ class MessageProcessor : public CorePlugin {
MessageTag RAY_MPI_TAG_FINISH_FUSIONS;
MessageTag RAY_MPI_TAG_FINISH_FUSIONS_FINISHED;
MessageTag RAY_MPI_TAG_FUSION_DONE;
- MessageTag RAY_MPI_TAG_GET_CONTIG_CHUNK;
- MessageTag RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY;
MessageTag RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION;
MessageTag RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION_REPLY;
MessageTag RAY_MPI_TAG_GET_COVERAGE_AND_MARK;
@@ -680,12 +677,9 @@ SequencesIndexer*m_si
void call_RAY_MPI_TAG_REQUEST_READ_SEQUENCE(Message*message);
void call_RAY_MPI_TAG_REQUEST_READ_SEQUENCE_REPLY(Message*message);
void call_RAY_MPI_TAG_I_FINISHED_SCAFFOLDING(Message*message);
- void call_RAY_MPI_TAG_GET_CONTIG_CHUNK(Message*message);
void registerPlugin(ComputeCore*core);
void resolveSymbols(ComputeCore*core);
};
#endif
-
-
diff --git a/code/plugin_Scaffolder/Scaffolder.cpp b/code/plugin_Scaffolder/Scaffolder.cpp
index 82d4a84..92c780d 100644
--- a/code/plugin_Scaffolder/Scaffolder.cpp
+++ b/code/plugin_Scaffolder/Scaffolder.cpp
@@ -1,5 +1,5 @@
/*
- Ray
+ Ray -- Parallel genome assemblies for parallel DNA sequencing
Copyright (C) 2011, 2012, 2013 Sébastien Boisvert
http://DeNovoAssembler.SourceForge.Net/
@@ -49,6 +49,9 @@ __CreatePlugin(Scaffolder);
__CreateMasterModeAdapter(Scaffolder,RAY_MASTER_MODE_WRITE_SCAFFOLDS);
__CreateSlaveModeAdapter(Scaffolder,RAY_SLAVE_MODE_SCAFFOLDER);
+__CreateMessageTagAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_CHUNK);
+__CreateMessageTagAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK);
+
void Scaffolder::addMasterLink(SummarizedLink*a){
m_masterLinks.push_back(*a);
}
@@ -1423,6 +1426,84 @@ Case 16. (allowed)
}
void Scaffolder::getContigSequence(PathHandle id){
+ int mode=0;
+ int CODE_PATH_VANILLA_KMERS=mode++;
+ int CODE_PATH_PACKED_REGION=mode++;
+
+ int configuredCodePath=CODE_PATH_VANILLA_KMERS;
+
+ if(configuredCodePath==CODE_PATH_VANILLA_KMERS)
+ getContigSequenceFromKmers(id);
+ else if(configuredCodePath==CODE_PATH_PACKED_REGION){
+ getContigSequenceFromPackedObjects(id);
+ }
+}
+
+void Scaffolder::getContigSequenceFromPackedObjects(PathHandle id){
+
+ if(!m_hasContigSequence_Initialised){
+ m_hasContigSequence_Initialised=true;
+ m_rankIdForContig=getRankFromPathUniqueId(id);
+ m_theLengthInNucleotides=m_contigLengths[id];
+ m_position=0;
+ m_requestedContigChunk=false;
+ m_contigPathBuffer.str("");
+ }
+
+ if(m_position<m_theLengthInNucleotides){
+ if(!m_requestedContigChunk){
+ MessageUnit*message=(MessageUnit*)m_outboxAllocator->allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
+
+ int bufferPosition=0;
+ message[bufferPosition++]=id;
+ message[bufferPosition++]=m_position;
+ Message aMessage(message,bufferPosition,
+ m_rankIdForContig,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK,m_parameters->getRank());
+ m_virtualCommunicator->pushMessage(m_workerId,&aMessage);
+
+ m_requestedContigChunk=true;
+
+ }else if(m_virtualCommunicator->isMessageProcessed(m_workerId)){
+
+ vector<MessageUnit> data;
+ m_virtualCommunicator->getMessageResponseElements(m_workerId,&data);
+ /* the position in the message buffer */
+ int position=0;
+ /* the first element is the number of nucleotides */
+ int count=data[position++];
+ int iterator=0;
+ while(iterator<count){
+ int elementIndex=(iterator*2 ) / (sizeof(MessageUnit)*BITS_PER_BYTE);
+ int offset=(iterator*2) % (sizeof(MessageUnit)*BITS_PER_BYTE);
+
+ #ifdef ASSERT
+ assert(sizeof(MessageUnit)==sizeof(uint64_t));
+ #endif
+
+ uint64_t value=data[position+elementIndex];
+
+ value <<= ( sizeof(MessageUnit) - BITS_PER_NUCLEOTIDE - offset);
+ value>>= (sizeof(MessageUnit) - BITS_PER_NUCLEOTIDE );
+
+ char nucleotide=codeToChar(value,m_parameters->getColorSpaceMode());
+
+ m_contigPathBuffer<<nucleotide;
+ }
+ m_position+=count;
+ m_requestedContigChunk=false;
+ }
+ }else{
+ m_contigSequence=m_contigPathBuffer.str();
+ m_hasContigSequence=true;
+
+ #ifdef ASSERT
+ assert((int)m_contigSequence.length()==m_theLengthInNucleotides);
+ #endif
+ }
+}
+
+void Scaffolder::getContigSequenceFromKmers(PathHandle id){
+
if(!m_hasContigSequence_Initialised){
m_hasContigSequence_Initialised=true;
m_rankIdForContig=getRankFromPathUniqueId(id);
@@ -1526,6 +1607,7 @@ void Scaffolder::call_RAY_MASTER_MODE_WRITE_SCAFFOLDS(){
m_positionOnScaffold++;
/*
+ *
* Only add a new line if there is something more to add.
*/
if(m_positionOnScaffold%columns==0 && contigPosition < length){
@@ -1591,6 +1673,86 @@ void Scaffolder::setTimePrinter(TimePrinter*a){
m_timePrinter=a;
}
+void Scaffolder::call_RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK(Message*message){
+// TODO: this returns a part of a region, in 2-bit encoding
+
+ MessageUnit*incoming=(MessageUnit*)message->getBuffer();
+ int thePosition=0;
+ PathHandle contigId=incoming[thePosition++];
+ int position=incoming[thePosition++];
+
+#ifdef ASSERT
+ assert(m_contigNameIndex->count(contigId)>0);
+#endif
+
+ int index=(*m_contigNameIndex)[contigId];
+ int length=(*m_contigs)[index].size();
+ MessageUnit*messageContent=(MessageUnit*)m_outboxAllocator->allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
+ int outputPosition=0;
+ int origin=outputPosition;
+ outputPosition++;
+ int count=0;
+
+ while(position<length
+ && (outputPosition+KMER_U64_ARRAY_SIZE)<(int)(MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit))){
+ Kmer theKmerObject;
+
+#ifdef ASSERT
+ assert(index<(int)m_contigs->size());
+#endif
+
+ (*m_contigs)[index].at(position++,&theKmerObject);
+ theKmerObject.pack(messageContent,&outputPosition);
+ count++;
+ }
+ messageContent[origin]=count;
+ Message aMessage(messageContent,
+ m_virtualCommunicator->getElementsPerQuery(RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK),
+ message->getSource(),RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY,
+ m_parameters->getRank());
+ m_outbox->push_back(&aMessage);
+}
+
+void Scaffolder::call_RAY_MPI_TAG_GET_CONTIG_CHUNK(Message*message){
+ MessageUnit*incoming=(MessageUnit*)message->getBuffer();
+ int thePosition=0;
+ PathHandle contigId=incoming[thePosition++];
+ int position=incoming[thePosition++];
+
+#ifdef ASSERT
+ assert(m_contigNameIndex->count(contigId)>0);
+#endif
+
+ int index=(*m_contigNameIndex)[contigId];
+ int length=(*m_contigs)[index].size();
+ MessageUnit*messageContent=(MessageUnit*)m_outboxAllocator->allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
+ int outputPosition=0;
+ int origin=outputPosition;
+ outputPosition++;
+ int count=0;
+
+ while(position<length
+ && (outputPosition+KMER_U64_ARRAY_SIZE)<(int)(MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit))){
+ Kmer theKmerObject;
+
+#ifdef ASSERT
+ assert(index<(int)m_contigs->size());
+#endif
+ (*m_contigs)[index].at(position++,&theKmerObject);
+ theKmerObject.pack(messageContent,&outputPosition);
+ count++;
+ }
+
+ messageContent[origin]=count;
+ Message aMessage(messageContent,
+ m_virtualCommunicator->getElementsPerQuery(RAY_MPI_TAG_GET_CONTIG_CHUNK),
+ message->getSource(),RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY,
+ m_parameters->getRank());
+ m_outbox->push_back(&aMessage);
+}
+
+
+
void Scaffolder::registerPlugin(ComputeCore*core){
PluginHandle plugin=core->allocatePluginHandle();
@@ -1614,6 +1776,19 @@ void Scaffolder::registerPlugin(ComputeCore*core){
RAY_MPI_TAG_START_SCAFFOLDER=core->allocateMessageTagHandle(plugin);
core->setMessageTagSymbol(plugin,RAY_MPI_TAG_START_SCAFFOLDER,"RAY_MPI_TAG_START_SCAFFOLDER");
+
+ RAY_MPI_TAG_GET_CONTIG_CHUNK=core->allocateMessageTagHandle(plugin);
+ core->setMessageTagObjectHandler(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK, __GetAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_CHUNK));
+ core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK,"RAY_MPI_TAG_GET_CONTIG_CHUNK");
+
+ RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK=core->allocateMessageTagHandle(plugin);
+ core->setMessageTagObjectHandler(plugin,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK, __GetAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_CHUNK));
+ core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK,"RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK");
+
+ RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY=core->allocateMessageTagHandle(plugin);
+ core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY,"RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY");
+ RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY=core->allocateMessageTagHandle(plugin);
+ core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY,"RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY");
}
void Scaffolder::resolveSymbols(ComputeCore*core){
@@ -1626,6 +1801,7 @@ void Scaffolder::resolveSymbols(ComputeCore*core){
RAY_MPI_TAG_CONTIG_INFO=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_CONTIG_INFO");
RAY_MPI_TAG_GET_CONTIG_CHUNK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK");
+ RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK");
RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION");
RAY_MPI_TAG_GET_PATH_LENGTH=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_PATH_LENGTH");
RAY_MPI_TAG_GET_READ_MARKERS=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_READ_MARKERS");
@@ -1640,12 +1816,33 @@ void Scaffolder::resolveSymbols(ComputeCore*core){
RAY_MPI_TAG_SCAFFOLDING_LINKS_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_SCAFFOLDING_LINKS_REPLY");
RAY_MPI_TAG_START_SCAFFOLDER=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_START_SCAFFOLDER");
+ RAY_MPI_TAG_GET_CONTIG_CHUNK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK");
+ RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY");
+ RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK");
+ RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY");
+
core->setMessageTagToSlaveModeSwitch(m_plugin, RAY_MPI_TAG_START_SCAFFOLDER, RAY_SLAVE_MODE_SCAFFOLDER );
core->setMasterModeNextMasterMode(m_plugin,RAY_MASTER_MODE_WRITE_SCAFFOLDS, RAY_MASTER_MODE_COUNT_SEARCH_ELEMENTS);
+// the two message tags below won't multiplex very well during the transit
+// in the virtual messaging layer of Ray Platform
+
+ core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_GET_CONTIG_CHUNK, RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY );
+ core->setMessageTagSize(m_plugin, RAY_MPI_TAG_GET_CONTIG_CHUNK, MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit) );
+ core->setMessageTagReplyMessageTag(m_plugin, RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK, RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY );
+ core->setMessageTagSize(m_plugin, RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK, MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit) );
+
__BindPlugin(Scaffolder);
__BindAdapter(Scaffolder,RAY_MASTER_MODE_WRITE_SCAFFOLDS);
__BindAdapter(Scaffolder,RAY_SLAVE_MODE_SCAFFOLDER);
+ __BindAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_CHUNK);
+ __BindAdapter(MessageProcessor,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK);
+
+#if 0
+ m_contigs=(vector<GraphPath>*)core->getObjectFromSymbol(m_plugin,"/RayAssembler/ObjectStore/ContigPaths.ray");
+#endif
+
+ m_contigNameIndex=(map<PathHandle,int>*)core->getObjectFromSymbol(m_plugin, "/RayAssembler/ObjectStore/ContigNameIndex.ray");
}
diff --git a/code/plugin_Scaffolder/Scaffolder.h b/code/plugin_Scaffolder/Scaffolder.h
index 8701abc..c8e4e16 100644
--- a/code/plugin_Scaffolder/Scaffolder.h
+++ b/code/plugin_Scaffolder/Scaffolder.h
@@ -1,5 +1,5 @@
/*
- Ray
+ Ray -- Parallel genome assemblies for parallel DNA sequencing
Copyright (C) 2011, 2012, 2013 Sébastien Boisvert
http://DeNovoAssembler.SourceForge.Net/
@@ -19,8 +19,8 @@
*/
-#ifndef _Scaffolder
-#define _Scaffolder
+#ifndef _Scaffolder_h
+#define _Scaffolder_h
#include "ScaffoldingLink.h"
#include "SummarizedLink.h"
@@ -47,6 +47,8 @@ __DeclarePlugin(Scaffolder);
__DeclareMasterModeAdapter(Scaffolder,RAY_MASTER_MODE_WRITE_SCAFFOLDS);
__DeclareSlaveModeAdapter(Scaffolder,RAY_SLAVE_MODE_SCAFFOLDER);
+__DeclareMessageTagAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_CHUNK);
+__DeclareMessageTagAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK);
/**
* Scaffolder class, it uses MPI through the virtual communicator.
@@ -57,6 +59,8 @@ class Scaffolder : public CorePlugin{
__AddAdapter(Scaffolder,RAY_MASTER_MODE_WRITE_SCAFFOLDS);
__AddAdapter(Scaffolder,RAY_SLAVE_MODE_SCAFFOLDER);
+ __AddAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_CHUNK);
+ __AddAdapter(Scaffolder,RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK);
ostringstream m_operationBuffer;
@@ -66,6 +70,9 @@ class Scaffolder : public CorePlugin{
MessageTag RAY_MPI_TAG_REQUEST_VERTEX_READS;
MessageTag RAY_MPI_TAG_CONTIG_INFO;
MessageTag RAY_MPI_TAG_GET_CONTIG_CHUNK;
+ MessageTag RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY;
+ MessageTag RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK;
+ MessageTag RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK_REPLY;
MessageTag RAY_MPI_TAG_GET_COVERAGE_AND_DIRECTION;
MessageTag RAY_MPI_TAG_GET_PATH_LENGTH;
MessageTag RAY_MPI_TAG_GET_READ_MARKERS;
@@ -188,10 +195,15 @@ class Scaffolder : public CorePlugin{
RingAllocator*m_outboxAllocator;
bool m_ready;
- /**
+/**
* gets a contig sequence by receiving several MPI messages
*/
+
+ map<PathHandle,int>*m_contigNameIndex;
+
void getContigSequence(PathHandle id);
+ void getContigSequenceFromKmers(PathHandle id);
+ void getContigSequenceFromPackedObjects(PathHandle id);
void processContig();
void processContigPosition();
void processVertex(Kmer*vertex);
@@ -211,26 +223,38 @@ class Scaffolder : public CorePlugin{
void getCoverageOfBlockOfLife();
Rank m_rank;
+
+/*
+ * Variables for fetching remote paths.
+ */
+
+ int m_theLengthInNucleotides;
+ ostringstream m_contigPathBuffer;
public:
bool m_initialised;
- /**
+/**
* Number of ranks that have finished scaffolding
*/
int m_numberOfRanksFinished;
-
- /**
+/**
* Constructor of the scaffolder
*/
void constructor(StaticVector*outbox,StaticVector*inbox,RingAllocator*outboxAllocator,Parameters*parameters,
VirtualCommunicator*vc,SwitchMan*switchMan);
- void call_RAY_SLAVE_MODE_SCAFFOLDER();
void setContigPaths(vector<PathHandle>*names,vector<GraphPath>*paths);
void addMasterLink(SummarizedLink*link);
void solve();
void addMasterContig(PathHandle name,int length);
+
void call_RAY_MASTER_MODE_WRITE_SCAFFOLDS();
+
+ void call_RAY_SLAVE_MODE_SCAFFOLDER();
+
+ void call_RAY_MPI_TAG_GET_CONTIG_CHUNK(Message*message);
+ void call_RAY_MPI_TAG_GET_CONTIG_PACKED_CHUNK(Message*message);
+
void printFinalMessage();
void setTimePrinter(TimePrinter*a);
@@ -240,4 +264,3 @@ public:
};
#endif
-
diff --git a/code/plugin_SeedExtender/SeedExtender.cpp b/code/plugin_SeedExtender/SeedExtender.cpp
index d1111a8..6fff453 100644
--- a/code/plugin_SeedExtender/SeedExtender.cpp
+++ b/code/plugin_SeedExtender/SeedExtender.cpp
@@ -1,5 +1,5 @@
/*
- Ray
+ Ray -- Parallel genome assemblies for parallel DNA sequencing
Copyright (C) 2010, 2011, 2012, 2013 Sébastien Boisvert
http://DeNovoAssembler.SourceForge.Net/
@@ -2668,9 +2668,6 @@ void SeedExtender::registerPlugin(ComputeCore*core){
RAY_MPI_TAG_CONTIG_INFO_REPLY=core->allocateMessageTagHandle(plugin);
core->setMessageTagSymbol(plugin,RAY_MPI_TAG_CONTIG_INFO_REPLY,"RAY_MPI_TAG_CONTIG_INFO_REPLY");
- RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY=core->allocateMessageTagHandle(plugin);
- core->setMessageTagSymbol(plugin,RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY,"RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY");
-
m_switchMan=core->getSwitchMan();
RAY_MPI_TAG_ASK_IS_ASSEMBLED=core->allocateMessageTagHandle(plugin);
@@ -2681,7 +2678,7 @@ void SeedExtender::registerPlugin(ComputeCore*core){
core->setMessageTagObjectHandler(plugin,RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY, __GetAdapter(SeedExtender,RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY));
core->setMessageTagSymbol(plugin,RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY,"RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY");
-
+ core->setObjectSymbol(m_plugin,&(m_fusionData->m_FUSION_identifier_map),"/RayAssembler/ObjectStore/ContigNameIndex.ray");
}
void SeedExtender::resolveSymbols(ComputeCore*core){
@@ -2701,8 +2698,6 @@ void SeedExtender::resolveSymbols(ComputeCore*core){
RAY_MPI_TAG_VERTEX_READS_FROM_LIST_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_VERTEX_READS_FROM_LIST_REPLY");
RAY_MPI_TAG_VERTEX_READS_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_VERTEX_READS_REPLY");
RAY_MPI_TAG_CONTIG_INFO_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_CONTIG_INFO_REPLY");
- RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY");
-
RAY_MPI_TAG_ASK_IS_ASSEMBLED=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_ASK_IS_ASSEMBLED");
RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY=core->getMessageTagFromSymbol(m_plugin,"RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY");
@@ -2713,5 +2708,4 @@ void SeedExtender::resolveSymbols(ComputeCore*core){
__BindAdapter(SeedExtender,RAY_MPI_TAG_ADD_GRAPH_PATH);
__BindAdapter(SeedExtender,RAY_MPI_TAG_ASK_IS_ASSEMBLED); /**/
__BindAdapter(SeedExtender,RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY); /**/
-
}
diff --git a/code/plugin_SeedExtender/SeedExtender.h b/code/plugin_SeedExtender/SeedExtender.h
index 27b2257..45ec60a 100644
--- a/code/plugin_SeedExtender/SeedExtender.h
+++ b/code/plugin_SeedExtender/SeedExtender.h
@@ -1,6 +1,6 @@
/*
- Ray
- Copyright (C) 2010, 2011, 2012 Sébastien Boisvert
+ Ray -- Parallel genome assemblies for parallel DNA sequencing
+ Copyright (C) 2010, 2011, 2012, 2013 Sébastien Boisvert
http://DeNovoAssembler.SourceForge.Net/
@@ -84,11 +84,9 @@ class SeedExtender: public CorePlugin {
void configureTheBeautifulHotSkippingTechnology();
-
Rank m_rank;
MessageTag RAY_MPI_TAG_CONTIG_INFO_REPLY;
- MessageTag RAY_MPI_TAG_GET_CONTIG_CHUNK_REPLY;
MessageTag RAY_MPI_TAG_ASK_IS_ASSEMBLED;
MessageTag RAY_MPI_TAG_ASK_IS_ASSEMBLED_REPLY;
@@ -108,8 +106,6 @@ class SeedExtender: public CorePlugin {
SlaveMode RAY_SLAVE_MODE_EXTENSION;
SlaveMode RAY_SLAVE_MODE_DO_NOTHING;
-
-
// all these parameters are not attributes.
vector<GraphPath>*m_seeds;
Kmer*m_currentVertex;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment