Skip to content

Instantly share code, notes, and snippets.

@cmoine
Created January 17, 2013 08:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cmoine/4554455 to your computer and use it in GitHub Desktop.
Save cmoine/4554455 to your computer and use it in GitHub Desktop.
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.core.runtime.Assert;
import org.eclipse.emf.cdo.CDOObject;
import org.eclipse.emf.cdo.common.id.CDOIDUtil;
import org.eclipse.emf.cdo.common.model.CDOModelUtil;
import org.eclipse.emf.cdo.eresource.CDOResource;
import org.eclipse.emf.cdo.eresource.EresourcePackage;
import org.eclipse.emf.cdo.server.db.IDBStore;
import org.eclipse.emf.cdo.server.db.mapping.IMappingStrategy;
import org.eclipse.emf.cdo.server.internal.db.CDODBSchema;
import org.eclipse.emf.db.util.DBQueryUtil;
import org.eclipse.emf.ecore.EAttribute;
import org.eclipse.emf.ecore.EClass;
import org.eclipse.emf.ecore.EReference;
import org.eclipse.emf.ecore.EStructuralFeature;
import org.eclipse.emf.ecore.EcorePackage;
import org.eclipse.emf.ecore.util.EcoreUtil;
import org.eclipse.emf.spi.cdo.InternalCDOObject;
import au.com.bytecode.opencsv.CSVWriter;
import com.google.common.base.Charsets;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
// Hidden imports
/**
 * Bulk-inserts trees of new {@link CDOObject}s into the tables of a CDO DB store by writing the rows directly
 * through JDBC.
 * <p>
 * Rows are first written to temporary CSV files (one per target table) and then pushed to the database with
 * {@code LOAD DATA LOCAL INFILE}. That statement, like the {@code describe} query used to discover table
 * columns, is MySQL syntax.
 * <p>
 * An instance remembers the target resource and the column layout of every class table it has seen. It is
 * therefore stateful and not safe for concurrent use.
 */
public final class MassiveInserter {
    /** Number of leading {@code cdo_*} system columns written in front of the feature columns of a class table. */
    private static final int VALUE_HEADER=7;
    private static final Logger LOG=Logger.getLogger(MassiveInserter.class);
    /** Value written to {@code cdo_revised}, and to whichever of {@code cdo_resource}/{@code cdo_container} does not apply. */
    private static final String NONE="0"; //$NON-NLS-1$

    /** Key identifying one containment list: the owning object plus the containment reference. */
    private static final class Container {
        private final CDOObject container;
        private final EReference ref;

        public Container(CDOObject container, EReference ref) {
            this.container=container;
            this.ref=ref;
        }

        @Override
        public int hashCode() {
            final int prime=31;
            int result=1;
            result=prime * result + ((container == null) ? 0 : container.hashCode());
            // Equal references always share a feature ID, so this stays consistent with equals().
            result=prime * result + ((ref == null) ? 0 : ref.getFeatureID());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null || getClass() != obj.getClass())
                return false;
            Container other=(Container) obj;
            return (container == null ? other.container == null : container.equals(other.container))
                    && (ref == null ? other.ref == null : ref.equals(other.ref));
        }
    }

    /**
     * A temporary UTF-8 CSV file that can be bulk-loaded into a table. Closing it closes the writer and
     * deletes the file.
     */
    private static final class TemporaryStream implements AutoCloseable {
        private final CSVWriter writer;
        private final File file;
        private boolean streamClosed;

        private TemporaryStream(String prefix) throws IOException {
            String filePrefix=prefix + "-"; //$NON-NLS-1$
            // File.createTempFile rejects prefixes shorter than three characters (e.g. a one-letter EClass name).
            if (filePrefix.length() < 3)
                filePrefix+="tmp-"; //$NON-NLS-1$
            file=File.createTempFile(filePrefix, ".csv"); //$NON-NLS-1$
            file.deleteOnExit();
            try {
                writer=new CSVWriter(new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8));
            } catch (IOException e) {
                file.delete();
                throw e;
            }
        }

        /**
         * Closes the CSV writer so the file is complete, then loads it into {@code tableName} via
         * {@code LOAD DATA LOCAL INFILE}.
         */
        public void loadIntoTable(Connection con, String tableName) throws SQLException, IOException {
            closeStream();
            // Escape the path for a MySQL single-quoted literal: backslashes first, then single quotes.
            String filepath=StringUtils.replace(StringUtils.replace(file.getPath(), "\\", "\\\\"), "'", "\\'"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
            // ESCAPED BY '' : opencsv does not escape backslashes, so MySQL's default '\' escape would corrupt
            // any value containing one. Embedded quotes are written doubled, which ENCLOSED BY '"' handles.
            String sql=MessageFormat.format(
                    "LOAD DATA LOCAL INFILE ''{0}'' INTO TABLE {1} CHARACTER SET utf8 FIELDS TERMINATED BY '','' ENCLOSED BY ''\"'' ESCAPED BY ''''", //$NON-NLS-1$
                    filepath, tableName);
            QueryUtil.logQuery(sql + " => file size is " + file.length()); //$NON-NLS-1$
            try (Statement stmt=con.createStatement()) {
                stmt.execute(sql);
            }
        }

        @Override
        public void close() throws IOException {
            try {
                closeStream();
            } finally {
                // Delete the temp file even if closing the writer failed.
                file.delete();
            }
        }

        private void closeStream() throws IOException {
            if (!streamClosed) {
                writer.close();
                streamClosed=true;
            }
        }
    }

    /** Non-{@code cdo_} column names of each class table, in the order reported by {@code describe}. */
    private final Multimap<EClass, String> class2column=LinkedListMultimap.create();
    /** Candidate features for each column name (a name may be shared by features of several classes). */
    private final Multimap<String, EStructuralFeature> feature2column=HashMultimap.create();
    /** Classes whose table has already been described (even if it has no non-{@code cdo_} column). */
    private final Set<EClass> describedClasses=new HashSet<>();
    /** 24-hour SQL datetime format; per instance because SimpleDateFormat is not thread-safe. */
    private final SimpleDateFormat sqlDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //$NON-NLS-1$
    /** Resource all root objects belong to; fixed by the first call to {@link #massiveLoad(List)}. */
    private CDOResource resource;

    public MassiveInserter() {
    }

    /**
     * Inserts {@code objects} and all their (transitive) contents, then bumps the resource's contents count
     * in the database.
     * <p>
     * Every object must be a direct root of a CDOResource, and all objects (across calls on this instance) must
     * share the same resource. SQL/I/O failures are logged and not rethrown.
     *
     * @param objects root objects to insert; an empty list is a no-op
     */
    public void massiveLoad(List<? extends CDOObject> objects) {
        if (objects.isEmpty()) {
            // Nothing to insert; also avoids dereferencing a still-unset resource below.
            return;
        }
        LOG.debug("Fill Multimap"); //$NON-NLS-1$
        Multimap<Container, CDOObject> map=LinkedHashMultimap.create();
        Deque<CDOObject> stack=new ArrayDeque<>();
        for (CDOObject object : objects) {
            Assert.isTrue(object.eContainer() == null && object.cdoResource() != null, "Object must directly belong to a CDOResource"); //$NON-NLS-1$
            Assert.isTrue(resource == null || resource.equals(object.cdoResource()), "Different parent"); //$NON-NLS-1$
            resource=object.cdoResource();
            map.put(new Container(resource, EresourcePackage.eINSTANCE.getCDOResource_Contents()), object);
            stack.push(object);
        }
        // Depth-first walk over the containment tree, grouping children by (container, reference).
        while (!stack.isEmpty()) {
            CDOObject obj=stack.pop();
            for (EReference ref : obj.eClass().getEAllContainments()) {
                // Children are written to the reference's list table, which only exists for multi-valued refs.
                Assert.isTrue(ref.isMany(), "Single-valued containment not supported: " + ref.getName()); //$NON-NLS-1$
                Container container=new Container(obj, ref);
                @SuppressWarnings("unchecked")
                List<CDOObject> children=(List<CDOObject>) obj.eGet(ref);
                for (CDOObject child : children) {
                    map.put(container, child);
                    stack.push(child);
                }
            }
        }
        try (Connection con=RestQueryUtil.createJDBCConnection()) {
            massiveLoad(con, map);
            try (Statement stmt=con.createStatement()) {
                // NOTE(review): the new revision number is written to cdo_revised, while initializeContent writes
                // the same value as cdo_version - confirm which column is intended here.
                String sql="UPDATE " + QueryUtil.getTableName(EresourcePackage.eINSTANCE.getCDOResource()) + " SET "
                        + QueryUtil.getColumnName(EresourcePackage.eINSTANCE.getCDOResource_Contents()) + "="
                        + QueryUtil.getColumnName(EresourcePackage.eINSTANCE.getCDOResource_Contents()) + "+" + objects.size() + ","
                        + CDODBSchema.ATTRIBUTES_REVISED + "=" + getRevision() + " WHERE " + CDODBSchema.ATTRIBUTES_ID + "="
                        + CDOIDUtil.getLong(resource.cdoID());
                QueryUtil.logQuery(sql);
                stmt.execute(sql);
            }
        } catch (SQLException | IOException | ExecutionException e) {
            // Public contract never throws these; failures are only reported through the log.
            LOG.error("Failed to load massively", e); //$NON-NLS-1$
        }
    }

    /** Assigns CDO IDs to all objects, then writes their attribute and containment rows. */
    private void massiveLoad(Connection con, Multimap<Container, CDOObject> objects) throws SQLException, IOException, ExecutionException {
        long timestamp=System.currentTimeMillis();
        IDBStore store=(IDBStore) ImporterApplication.getRepository().getStore();
        initializeAllCdoIds(con, objects, store, timestamp);
        initializeContent(con, objects, store, timestamp);
    }

    /**
     * Writes one row per object into its class table and one (container id, position, child id) row per object
     * into the table of its containment reference. All temporary files are closed and deleted afterwards.
     */
    private void initializeContent(Connection con, Multimap<Container, CDOObject> objects, IDBStore store, long timestamp) throws SQLException, IOException,
            ExecutionException {
        LOG.debug("initialize content"); //$NON-NLS-1$
        Map<EClass, TemporaryStream> valueStreams=new HashMap<>();
        Map<EReference, TemporaryStream> resourceStreams=new HashMap<>();
        try {
            IMappingStrategy mappingStrategy=store.getMappingStrategy();
            String timestampStr=Long.toString(timestamp);
            String revision=getRevision();
            for (Map.Entry<Container, Collection<CDOObject>> entry : objects.asMap().entrySet()) {
                Container container=entry.getKey();
                // NOTE(review): positions continue from the container's current in-memory list size; confirm this
                // matches the number of rows already stored for that list.
                int index=((List<?>) container.container.eGet(container.ref)).size();
                String containerId=Long.toString(CDOIDUtil.getLong(container.container.cdoID()));
                boolean isRoot=container.container instanceof CDOResource;
                TemporaryStream resourceStream=get(resourceStreams, container.ref);
                for (CDOObject object : entry.getValue()) {
                    EClass clazz=object.eClass();
                    Assert.isTrue(((EClass) container.ref.getEType()).getInstanceClass().isAssignableFrom(clazz.getInstanceClass()),
                            "EClass does not match"); //$NON-NLS-1$
                    String cdoid=Long.toString(CDOIDUtil.getLong(object.cdoID()));
                    TemporaryStream valueStream=get(valueStreams, clazz);
                    String[] valueValues=loadValues(con, mappingStrategy, object);
                    valueValues[0]=cdoid; // object id
                    valueValues[1]=revision; // cdo_version
                    valueValues[2]=timestampStr; // cdo_created
                    valueValues[3]=NONE; // cdo_revised
                    // Roots reference their resource; nested objects reference their container object.
                    valueValues[4]=isRoot ? containerId : NONE; // cdo_resource
                    valueValues[5]=isRoot ? NONE : containerId; // cdo_container
                    valueValues[6]=Integer.toString(((InternalCDOObject) object).eContainerFeatureID()); // cdo_feature
                    valueStream.writer.writeNext(valueValues);
                    resourceStream.writer.writeNext(new String[] { containerId, Integer.toString(index++), cdoid });
                }
            }
            for (Map.Entry<EClass, TemporaryStream> entry : valueStreams.entrySet()) {
                entry.getValue().loadIntoTable(con, QueryUtil.getTableName(entry.getKey()));
            }
            for (Map.Entry<EReference, TemporaryStream> entry : resourceStreams.entrySet()) {
                entry.getValue().loadIntoTable(con, DBQueryUtil.getTableName(entry.getKey()));
            }
            LOG.debug("/initialize content"); //$NON-NLS-1$
        } finally {
            List<TemporaryStream> list=new ArrayList<>();
            list.addAll(valueStreams.values());
            list.addAll(resourceStreams.values());
            for (TemporaryStream stream : list) {
                try {
                    stream.close();
                } catch (IOException e) {
                    LOG.error("Failed closing stream " + stream.file, e); //$NON-NLS-1$
                }
            }
        }
    }

    /** Returns the resource's current revision version plus one, as a string. */
    private String getRevision() {
        return Integer.toString(resource.cdoRevision().getVersion() + 1);
    }

    /**
     * Returns the stream registered for {@code key}, creating it on first use. An EClass key names the file
     * after the class; an EReference key after {@code <containing class>-<reference>}.
     */
    private <T> TemporaryStream get(Map<T, TemporaryStream> streams, T key) throws IOException {
        TemporaryStream stream=streams.get(key);
        if (stream == null) {
            String prefix;
            if (key instanceof EClass)
                prefix=((EClass) key).getName();
            else
                prefix=((EReference) key).getEContainingClass().getName() + "-" + ((EReference) key).getName(); //$NON-NLS-1$
            stream=new TemporaryStream(prefix);
            streams.put(key, stream);
        }
        return stream;
    }

    /**
     * Allocates consecutive CDO IDs for all objects, sets them on the objects, writes their
     * {@link CDODBSchema#CDO_OBJECTS} rows and advances the store's last object ID.
     */
    private void initializeAllCdoIds(Connection con, Multimap<Container, CDOObject> objects, IDBStore store, long timestamp) throws IOException, SQLException,
            ExecutionException {
        Map<String, String> metaIds=loadMetaIds(con);
        try (TemporaryStream objectsStream=new TemporaryStream("objects")) { //$NON-NLS-1$
            LOG.debug("initialize " + objects.values().size() + " CDOIDs into " + objectsStream.file.getPath()); //$NON-NLS-1$ //$NON-NLS-2$
            long cdoid=QueryUtil.cdoID(store.getIDHandler().getNextCDOID(resource.cdoRevision()));
            String timestampStr=Long.toString(timestamp);
            // Iterate grouped by container (asMap order) so IDs are assigned in the same order as content rows.
            for (Collection<CDOObject> group : objects.asMap().values()) {
                for (CDOObject object : group) {
                    ((InternalCDOObject) object).cdoInternalSetID(CDOIDUtil.createLong(cdoid));
                    String uri=EcoreUtil.getURI(object.eClass()).toString();
                    String metaId=metaIds.get(uri);
                    // An empty class column would silently produce an unusable cdo_objects row.
                    Assert.isTrue(metaId != null, "No meta ID registered for " + uri); //$NON-NLS-1$
                    objectsStream.writer.writeNext(new String[] { Long.toString(cdoid), metaId, timestampStr });
                    cdoid++;
                }
            }
            objectsStream.loadIntoTable(con, CDODBSchema.CDO_OBJECTS);
            // NOTE(review): cdoid is now one past the last assigned ID; confirm setLastObjectID expects that value.
            store.getIDHandler().setLastObjectID(CDOIDUtil.createLong(cdoid));
            LOG.debug("/initialize all CDOIDs"); //$NON-NLS-1$
        }
    }

    /** Reads the {@code uri -> id} pairs of {@code cdo_external_refs}. */
    private static Map<String, String> loadMetaIds(Connection con) throws SQLException {
        Map<String, String> metaIds=new HashMap<>();
        try (Statement stmt=con.createStatement(); ResultSet resultSet=stmt.executeQuery("select uri,id from cdo_external_refs;")) { //$NON-NLS-1$
            while (resultSet.next()) {
                metaIds.put(resultSet.getString(1), resultSet.getString(2));
            }
        }
        return metaIds;
    }

    /**
     * Builds a fresh row for {@code object}'s class table. The first {@link #VALUE_HEADER} cells are left for the
     * caller; the rest hold one value per non-{@code cdo_} column of the table.
     */
    private String[] loadValues(Connection con, IMappingStrategy mappingStrategy, CDOObject object) throws SQLException, IOException,
            ExecutionException {
        EClass eClass=object.eClass();
        if (!describedClasses.contains(eClass)) {
            describeTable(con, eClass);
            describedClasses.add(eClass);
        }
        Collection<String> columns=class2column.get(eClass);
        String[] result=new String[VALUE_HEADER + columns.size()];
        int index=VALUE_HEADER;
        for (String column : columns) {
            boolean found=false;
            for (EStructuralFeature feature : feature2column.get(column)) {
                // The column name may be shared by several classes; pick the feature this class actually has.
                if (feature.getEContainingClass().isSuperTypeOf(eClass)) {
                    Assert.isTrue(!found, "Found column " + column + " many times"); //$NON-NLS-1$ //$NON-NLS-2$
                    found=true;
                    result[index++]=toColumnValue(object, feature);
                }
            }
            Assert.isTrue(found, "Column " + column + " not found"); //$NON-NLS-1$ //$NON-NLS-2$
        }
        return result;
    }

    /** Records the non-{@code cdo_} columns of {@code eClass}'s table and indexes its persistent features by column name. */
    private void describeTable(Connection con, EClass eClass) throws SQLException, IOException, ExecutionException {
        try (Statement stmt=con.createStatement(); ResultSet rs=stmt.executeQuery("describe " + QueryUtil.getTableName(eClass))) { //$NON-NLS-1$
            while (rs.next()) {
                String columnName=rs.getString(1);
                if (!columnName.startsWith("cdo_")) //$NON-NLS-1$
                    class2column.put(eClass, columnName);
            }
        }
        for (EStructuralFeature feature : CDOModelUtil.getAllPersistentFeatures(eClass)) {
            feature2column.put(QueryUtil.getColumnName(feature), feature);
        }
    }

    /**
     * Converts one feature value to its CSV cell. Strings are written as-is. Numbers use {@code toString()}.
     * Booleans become 1/0 and dates use the SQL datetime format. A null date becomes the current time, and any
     * other null or unsupported attribute becomes an empty cell. An unbounded reference becomes its size; any
     * other reference becomes an empty cell.
     */
    private String toColumnValue(CDOObject object, EStructuralFeature feature) {
        if (feature instanceof EAttribute) {
            Object value=object.eGet(feature);
            if (value == null)
                return feature.getEType().equals(EcorePackage.eINSTANCE.getEDate()) ? sqlDateFormat.format(new Date()) : StringUtils.EMPTY;
            if (value instanceof String)
                return (String) value;
            if (value instanceof Number)
                return value.toString();
            if (value instanceof Boolean)
                return ((Boolean) value) ? "1" : "0"; //$NON-NLS-1$ //$NON-NLS-2$
            if (value instanceof Date)
                return sqlDateFormat.format((Date) value);
            return StringUtils.EMPTY;
        }
        EReference ref=(EReference) feature;
        if (ref.getUpperBound() == EReference.UNBOUNDED_MULTIPLICITY)
            return Integer.toString(((List<?>) object.eGet(feature)).size());
        return StringUtils.EMPTY;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment