This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStreamWriter; | |
import java.sql.Connection; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.sql.Statement; | |
import java.text.MessageFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Stack; | |
import java.util.concurrent.ExecutionException; | |
import org.apache.commons.lang3.StringUtils; | |
import org.eclipse.core.runtime.Assert; | |
import org.eclipse.emf.cdo.CDOObject; | |
import org.eclipse.emf.cdo.common.id.CDOIDUtil; | |
import org.eclipse.emf.cdo.common.model.CDOModelUtil; | |
import org.eclipse.emf.cdo.eresource.CDOResource; | |
import org.eclipse.emf.cdo.eresource.EresourcePackage; | |
import org.eclipse.emf.cdo.server.db.IDBStore; | |
import org.eclipse.emf.cdo.server.db.mapping.IMappingStrategy; | |
import org.eclipse.emf.cdo.server.internal.db.CDODBSchema; | |
import org.eclipse.emf.db.util.DBQueryUtil; | |
import org.eclipse.emf.ecore.EAttribute; | |
import org.eclipse.emf.ecore.EClass; | |
import org.eclipse.emf.ecore.EReference; | |
import org.eclipse.emf.ecore.EStructuralFeature; | |
import org.eclipse.emf.ecore.EcorePackage; | |
import org.eclipse.emf.ecore.util.EcoreUtil; | |
import org.eclipse.emf.spi.cdo.InternalCDOObject; | |
import au.com.bytecode.opencsv.CSVWriter; | |
import com.google.common.base.Charsets; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.cache.CacheLoader; | |
import com.google.common.cache.LoadingCache; | |
import com.google.common.collect.HashMultimap; | |
import com.google.common.collect.LinkedHashMultimap; | |
import com.google.common.collect.LinkedListMultimap; | |
import com.google.common.collect.Multimap; | |
// Hidden imports | |
public final class MassiveInserter { | |
// Number of leading "cdo_*" bookkeeping columns written at the start of every
// attribute-table CSV row (id, version, created, revised, resource, container,
// feature) before the feature values begin -- see initializeContent()/loadValues().
private static final int VALUE_HEADER=7;
private static final Logger LOG=Logger.getLogger(MassiveInserter.class);
// Sentinel written for "no value" in numeric CDO columns (cdo_revised / cdo_resource / cdo_container).
private static final String NONE="0"; //$NON-NLS-1$
private static class Container { | |
private final CDOObject container; | |
private final EReference ref; | |
public Container(CDOObject container, EReference ref) { | |
this.container=container; | |
this.ref=ref; | |
} | |
@Override | |
public int hashCode() { | |
final int prime=31; | |
int result=1; | |
result=prime * result + ((container == null) ? 0 : container.hashCode()); | |
result=prime * result + ((ref == null) ? 0 : ref.getFeatureID()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Container other=(Container) obj; | |
if (container == null) { | |
if (other.container != null) | |
return false; | |
} else if (!container.equals(other.container)) | |
return false; | |
if (ref == null) { | |
if (other.ref != null) | |
return false; | |
} else if (!ref.equals(other.ref)) | |
return false; | |
return true; | |
} | |
} | |
private static final class TemporaryStream implements AutoCloseable { | |
private final CSVWriter writer; | |
private final File file; | |
private boolean streamClosed; | |
private TemporaryStream(String prefix) throws IOException { | |
file=File.createTempFile(prefix + "-", ".csv"); //$NON-NLS-1$ //$NON-NLS-2$ | |
file.deleteOnExit(); | |
writer=new CSVWriter(new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8)); | |
} | |
public void loadIntoTable(Connection con, String tableName) throws SQLException, IOException { | |
closeStream(); | |
String filepath=StringUtils.replace(file.getPath(), "\\", "\\\\"); //$NON-NLS-1$ //$NON-NLS-2$ | |
String sql=MessageFormat.format("LOAD DATA LOCAL INFILE ''{0}'' INTO TABLE {1} CHARACTER SET utf8 FIELDS TERMINATED BY '','' ENCLOSED BY ''\"''", //$NON-NLS-1$ | |
filepath, tableName); | |
QueryUtil.logQuery(sql + " => file size is " + file.length()); //$NON-NLS-1$ | |
con.createStatement().execute(sql); | |
} | |
@Override | |
public void close() throws IOException { | |
closeStream(); | |
file.delete(); | |
} | |
private void closeStream() throws IOException { | |
if (!streamClosed) { | |
writer.close(); | |
streamClosed=true; | |
} | |
} | |
} | |
// Per-EClass list of non-"cdo_" column names, in the table's DESCRIBE order.
// LinkedListMultimap preserves insertion order, which must match the physical
// column order used when writing CSV rows (see loadValues()).
private final Multimap<EClass, String> class2column=LinkedListMultimap.create();
// Reverse lookup: column name -> candidate features (names may collide across EClasses,
// hence the isSuperTypeOf filtering in loadValues()).
private final Multimap<String, EStructuralFeature> feature2column=HashMultimap.create();
// The single CDOResource that all inserted roots belong to; assigned in massiveLoad().
private CDOResource resource;
public MassiveInserter() {
}
/**
 * Bulk-inserts {@code objects} and their entire containment trees into the CDO
 * backing database via CSV files + LOAD DATA, bypassing the normal CDO commit path.
 * All roots must be directly contained by the same CDOResource; afterwards the
 * resource row's contents counter and revision are bumped with a plain SQL UPDATE.
 *
 * @param objects root objects to insert; each must belong directly to a CDOResource
 */
public void massiveLoad(List<? extends CDOObject> objects) {
    LOG.debug("Fill Multimap"); //$NON-NLS-1$
    // Group every object of the containment tree by its (container, reference) slot.
    Multimap<Container, CDOObject> map=LinkedHashMultimap.create();
    Stack<CDOObject> stack=new Stack<>();
    for (CDOObject object : objects) {
        Assert.isTrue(object.eContainer() == null && object.cdoResource() != null, "Object must directly belong to a CDOResource"); //$NON-NLS-1$
        Assert.isTrue(resource == null || resource.equals(object.cdoResource()), "Different parent"); //$NON-NLS-1$
        resource=object.cdoResource();
        Container rootContainer=new Container(resource, EresourcePackage.eINSTANCE.getCDOResource_Contents());
        map.put(rootContainer, object);
        stack.add(object);
    }
    // Depth-first walk over all containment references to collect every descendant.
    while (!stack.isEmpty()) {
        CDOObject obj=stack.pop();
        for (EReference ref : obj.eClass().getEAllContainments()) {
            Container container=new Container(obj, ref);
            // Unchecked cast: containment features are many-valued lists of CDOObjects here.
            for (CDOObject child : (List<CDOObject>) obj.eGet(ref)) {
                map.put(container, child);
                stack.push(child);
            }
        }
    }
    try (Connection con=RestQueryUtil.createJDBCConnection()) {
        massiveLoad(con, map);
        // Bump the resource's contents counter and revision so CDO sees the new children.
        try (Statement stmt=con.createStatement()) {
            String sql="UPDATE " + QueryUtil.getTableName(EresourcePackage.eINSTANCE.getCDOResource()) + " SET "
                + QueryUtil.getColumnName(EresourcePackage.eINSTANCE.getCDOResource_Contents()) + "="
                + QueryUtil.getColumnName(EresourcePackage.eINSTANCE.getCDOResource_Contents()) + "+" + objects.size() + ","
                + CDODBSchema.ATTRIBUTES_REVISED + "=" + getRevision() + " WHERE " + CDODBSchema.ATTRIBUTES_ID + "="
                + CDOIDUtil.getLong(resource.cdoID());
            QueryUtil.logQuery(sql);
            stmt.execute(sql);
        }
    } catch (SQLException | IOException | ExecutionException e) {
        // Best-effort: failures are logged, never rethrown.
        // NOTE(review): callers get no error signal on failure -- confirm this is intended.
        LOG.error("Failed to load massively", e); //$NON-NLS-1$
    }
}
private void massiveLoad(Connection con, Multimap<Container, CDOObject> objects) throws SQLException, IOException, ExecutionException { | |
long timestamp=System.currentTimeMillis(); | |
IDBStore store=(IDBStore) ImporterApplication.getRepository().getStore(); | |
// initialize cdo_ids | |
initializeAllCdoIds(con, objects, store, timestamp); | |
initializeContent(con, objects, store, timestamp); | |
} | |
private void initializeContent(Connection con, Multimap<Container, CDOObject> objects, IDBStore store, long timestamp) throws SQLException, IOException, | |
ExecutionException { | |
LOG.debug("initialize content"); //$NON-NLS-1$ | |
Map<EClass, TemporaryStream> valueStreams=new HashMap<>(); | |
Map<EReference, TemporaryStream> resourceStreams=new HashMap<>(); | |
try { | |
IMappingStrategy mappingStrategy=store.getMappingStrategy(); | |
String timestampStr=Long.toString(timestamp); | |
String revision=getRevision(); | |
for (Container container : objects.keySet()) { | |
int index=((List<?>) container.container.eGet(container.ref)).size(); | |
Collection<CDOObject> list=objects.get(container); | |
// EClass clazz=null; | |
for (CDOObject object : list) { | |
EClass clazz=object.eClass(); | |
Assert.isTrue(((EClass) container.ref.getEType()).getInstanceClass().isAssignableFrom(clazz.getInstanceClass()), | |
"EClass does not match"); //$NON-NLS-1$ | |
// if (clazz == null) | |
// else | |
// Assert.isTrue(clazz.equals(object.eClass()), "EClasses differs"); //$NON-NLS-1$ | |
long cdoid=CDOIDUtil.getLong(object.cdoID()); | |
TemporaryStream valueStream=get(valueStreams, clazz); | |
String[] valueValues=loadValues(con, mappingStrategy, object); | |
valueValues[0]=Long.toString(cdoid); | |
valueValues[1]=revision /* cdo_version */; | |
valueValues[2]=timestampStr /* cdo_created */; | |
valueValues[3]=NONE /* cdo_revised */; | |
String containerId=Long.toString(CDOIDUtil.getLong(container.container.cdoID())); | |
boolean isRoot=(container.container instanceof CDOResource); | |
valueValues[4]=isRoot ? containerId : NONE /* cdo_resource */; | |
valueValues[5]=isRoot ? NONE : containerId /* cdo_container */; | |
valueValues[6]=Integer.toString(((InternalCDOObject) object).eContainerFeatureID()) /* cdo_feature */; | |
valueStream.writer.writeNext(valueValues); | |
TemporaryStream resourceStream=get(resourceStreams, container.ref); | |
String[] resourceValues=STRINGS.get(3); | |
resourceValues[0]=Long.toString(CDOIDUtil.getLong(container.container.cdoID())); | |
resourceValues[1]=Integer.toString(index++); | |
resourceValues[2]=Long.toString(cdoid); | |
resourceStream.writer.writeNext(resourceValues); | |
} | |
} | |
for (EClass clazz : valueStreams.keySet()) { | |
TemporaryStream stream=valueStreams.get(clazz); | |
stream.loadIntoTable(con, QueryUtil.getTableName(clazz)); | |
} | |
for (EReference ref : resourceStreams.keySet()) { | |
TemporaryStream stream=resourceStreams.get(ref); | |
stream.loadIntoTable(con, DBQueryUtil.getTableName(ref)); | |
} | |
LOG.debug("/initialize content"); //$NON-NLS-1$ | |
} finally { | |
List<TemporaryStream> list=new ArrayList<>(); | |
list.addAll(resourceStreams.values()); | |
list.addAll(resourceStreams.values()); | |
for (TemporaryStream stream : list) { | |
try { | |
stream.close(); | |
} catch (IOException e) { | |
LOG.error("Failed closing stream " + stream.file, e); //$NON-NLS-1$ | |
} | |
} | |
} | |
} | |
private String getRevision() { | |
return Integer.toString(resource.cdoRevision().getVersion() + 1); | |
} | |
private <T> TemporaryStream get(Map<T, TemporaryStream> valueStreams, T o) throws IOException { | |
if (!valueStreams.containsKey(o)) { | |
if (o instanceof EClass) | |
valueStreams.put(o, new TemporaryStream(((EClass) o).getName())); | |
else | |
valueStreams.put(o, new TemporaryStream(((EReference) o).getEContainingClass().getName() + "-" + ((EReference) o).getName())); //$NON-NLS-1$ | |
} | |
return valueStreams.get(o); | |
} | |
/**
 * Assigns fresh sequential CDO ids to every object and registers each id in the
 * CDO_OBJECTS table via CSV bulk load, then advances the store's last-object-id
 * so the reserved id range is not handed out again.
 *
 * @param con       open connection used for the meta-id query and LOAD DATA
 * @param objects   all objects to insert, grouped by (container, reference) slot
 * @param store     the CDO DB store whose id handler allocates ids
 * @param timestamp creation timestamp written for every registered object
 */
private void initializeAllCdoIds(Connection con, Multimap<Container, CDOObject> objects, IDBStore store, long timestamp) throws IOException, SQLException,
    ExecutionException {
    try (TemporaryStream objectsStream=new TemporaryStream("objects"); //$NON-NLS-1$
        Statement stmt=con.createStatement();
        ResultSet resultSet=stmt.executeQuery("select uri,id from cdo_external_refs;")) { //$NON-NLS-1$
        LOG.debug("initialize " + objects.values().size() + " CDOIDs into " + objectsStream.file.getPath()); //$NON-NLS-1$ //$NON-NLS-2$
        // Map each EClass URI to its meta id, read straight from cdo_external_refs
        // (replaces the commented-out IMetaDataManager lookup).
        Map<String, String> metaIds=new HashMap<>();
        while (resultSet.next()) {
            metaIds.put(resultSet.getString(1), resultSet.getString(2));
        }
        // First free id; ids are handed out sequentially from here.
        long cdoid=QueryUtil.cdoID(store.getIDHandler().getNextCDOID(resource.cdoRevision()));
        String timestampStr=Long.toString(timestamp);
        for (Container container : objects.keySet()) {
            for (CDOObject object : objects.get(container)) {
                // Mutates the in-memory object so later phases can read its cdoID().
                ((InternalCDOObject) object).cdoInternalSetID(CDOIDUtil.createLong(cdoid));
                String[] objectValues=STRINGS.get(3);
                objectValues[0]=Long.toString(cdoid /* cdo_source */);
                String uri=EcoreUtil.getURI(object.eClass()).toString();
                // NOTE(review): metaIds.get(uri) is null when the class URI is absent from
                // cdo_external_refs, which would write an empty CSV field -- confirm upstream guarantees.
                objectValues[1]=metaIds.get(uri);
                objectValues[2]=timestampStr;
                objectsStream.writer.writeNext(objectValues);
                cdoid++;
            }
        }
        objectsStream.loadIntoTable(con, CDODBSchema.CDO_OBJECTS);
        // Reserve the consumed id range with the store (cdoid is now one past the last used id).
        store.getIDHandler().setLastObjectID(CDOIDUtil.createLong(cdoid));
        LOG.debug("/initialize all CDOIDs"); //$NON-NLS-1$
    }
}
// Per-length cache of reusable String[] row buffers, to avoid allocating a fresh
// array for every CSV row. CSVWriter.writeNext consumes the values immediately,
// so reusing the same array between rows is safe as long as access is single-threaded.
// NOTE(review): this sharing makes the class NOT thread-safe -- confirm callers never
// run massiveLoad concurrently.
private static final LoadingCache<Integer, String[]> STRINGS=CacheBuilder.newBuilder().build(new CacheLoader<Integer, String[]>() {
    @Override
    public String[] load(Integer length) throws Exception {
        return new String[length];
    }
});
private static final SimpleDateFormat SQL_DATE_FORMAT=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); //$NON-NLS-1$ | |
private String[] loadValues(Connection con, IMappingStrategy mappingStrategy, CDOObject object) throws SQLException, IOException, | |
ExecutionException { | |
EClass eClass=object.eClass(); | |
if (!class2column.containsKey(eClass)) { | |
try (Statement stmt=con.createStatement(); ResultSet rs=stmt.executeQuery("describe " + QueryUtil.getTableName(eClass))) { //$NON-NLS-1$ | |
while (rs.next()) { | |
String columnName=rs.getString(1); | |
if (!columnName.startsWith("cdo_")) //$NON-NLS-1$ | |
class2column.put(eClass, columnName); | |
} | |
} | |
for (EStructuralFeature feature : CDOModelUtil.getAllPersistentFeatures(eClass)) { | |
feature2column.put(QueryUtil.getColumnName(feature), feature); | |
} | |
} | |
Collection<String> columns=class2column.get(eClass); | |
String[] result=STRINGS.get(VALUE_HEADER + columns.size()); | |
int index=VALUE_HEADER; | |
for (String column : columns) { | |
boolean found=false; | |
for (EStructuralFeature feature : feature2column.get(column)) { | |
if (feature.getEContainingClass().isSuperTypeOf(eClass)) { | |
Assert.isTrue(!found, "Found column " + column + " many times"); //$NON-NLS-1$ //$NON-NLS-2$ | |
found=true; | |
String val=StringUtils.EMPTY; | |
if (feature instanceof EAttribute) { | |
EAttribute att=(EAttribute) feature; | |
Object value=object.eGet(feature); | |
if (value != null) { | |
if (value instanceof String) | |
val=(String) value; | |
else if (value != null && value instanceof Integer) | |
val=Integer.toString((Integer) value); | |
else if (value != null && value instanceof Double) | |
val=Double.toString((Double) value); | |
else if (value instanceof Boolean) | |
val=((Boolean) value) ? "1" : "0"; //$NON-NLS-1$ //$NON-NLS-2$ | |
else if (value instanceof Date) | |
val=SQL_DATE_FORMAT.format((Date) value); | |
} else if (att.getEType().equals(EcorePackage.eINSTANCE.getEDate())) { | |
val=SQL_DATE_FORMAT.format(new Date()); | |
} | |
} else { | |
EReference ref=(EReference) feature; | |
if (ref.getUpperBound() == EReference.UNBOUNDED_MULTIPLICITY) { | |
val=Integer.toString(((List) object.eGet(feature)).size()); | |
} | |
} | |
result[index++]=val; | |
} | |
} | |
Assert.isTrue(found, "Column " + column + " not found"); //$NON-NLS-1$ //$NON-NLS-2$ | |
} | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment