Skip to content

Instantly share code, notes, and snippets.

@NISH1001
Created October 6, 2017 06:36
Show Gist options
  • Save NISH1001/2a249ef642c32ec790da7ef6d81a95e4 to your computer and use it in GitHub Desktop.
Save NISH1001/2a249ef642c32ec790da7ef6d81a95e4 to your computer and use it in GitHub Desktop.
bulk api for salesforce
package salesforceintegration.salesforce.salesforceclient;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import salesforceintegration.salesforce.enums.SalesforceObjectType;
import salesforceintegration.salesforce.exceptions.SalesforceInvalidAccessTokenException;
import salesforceintegration.utils.JSONUtils;
/**
* This class is a client for bulk API
* It is used to call bulk api of Salesforce.
*
* A job is created to process the records subsequently.
* A job has many batch.
* A batch has many records to be processed.
*
* So, processing is done in batch.
* A job should be created before creating a batch.
*
* The API tutorial for beginners is available on:
* https://trailhead.salesforce.com/modules/api_basics/units/api_basics_bulk
*
*/
public class SalesforceBulkAPIClient {
private static final Logger logger = LoggerFactory.getLogger(SalesforceBulkAPIClient.class);
/**
* It is the base Url for Bulk API call
*/
private String baseUrl = "/services/async/36.0/job";
public SalesforceBulkAPIClient(String baseUrl) {
this.baseUrl = baseUrl;
}
/**
* This creates a job for processing the records in a batch.
*
* @param accessToken
* @param salesforceObjectType
* @return
* Map with information like status, jobId, etc.
* @throws SalesforceInvalidAccessTokenException
*/
public Map<String, String> createInsertJob(String accessToken, String instanceUrl,
SalesforceObjectType salesforceObjectType)
throws IOException, SalesforceInvalidAccessTokenException {
Map<String, String> responseMap = new HashMap<String, String>();
Map<String, String> body = new HashMap<String, String>();
body.put("operation", "insert");
body.put("object", salesforceObjectType.name());
body.put("contentType", "JSON");
String bodyJson = JSONUtils.toString(body);
String url = instanceUrl + this.baseUrl;
logger.info("=== async url === " + url);
HttpPost httpPost = new HttpPost(url);
CloseableHttpClient httpClient = HttpClients.createDefault();
httpPost.addHeader("X-SFDC-Session", accessToken);
httpPost.addHeader("Content-Type", "application/json" );
httpPost.addHeader("Accept", "application/json" );
StringEntity params = new StringEntity(bodyJson,ContentType.APPLICATION_JSON);
httpPost.setEntity(params);
CloseableHttpResponse response = null;
// first try sending POST request
try {
response = httpClient.execute(httpPost);
} catch (IOException e) {
logger.error("Error sending Post Request to Salesforce. Please check your connection.");
throw e;
}
// if access token has expired, throw the exception
if (response.getStatusLine().getStatusCode() == 401) {
throw new SalesforceInvalidAccessTokenException("Access token invalid...");
}
HttpEntity entity = response.getEntity();
String responseString = null;
try {
responseString = EntityUtils.toString(entity, "UTF-8");
response.close();
} catch (ParseException e) {
logger.info("Cannot parse the response");
throw e;
} catch (IOException e) {
logger.error("Error sending POST Request to Salesforce. Please check your connection.");
throw e;
}
// now, parse the result and construct the map to be returned
try {
JSONObject json = new JSONObject(responseString);
responseMap.put("id", json.getString("id"));
responseMap.put("jobId", json.getString("id"));
responseMap.put("state", json.getString("state"));
responseMap.put("operation", json.getString("operation"));
responseMap.put("object", json.getString("object"));
} catch (JSONException e) {
logger.info("Invalid refresh token...");
throw new IllegalArgumentException("Invalid refresh token. "
+ "Fresh Integration needed for this account!");
}
return responseMap;
}
/**
* It is used to POST all the records using Bulk API
*
* @param accessToken
* It is the access token for the user
* @param jobId
* It is the id for the job
* @param records
* It is the list of map (array of json)
* @return
* Map of response info
*/
public Map<String, String> createBatch(String accessToken, String instanceUrl, String jobId,
List<Map<String, Object>> records ) throws IOException,
SalesforceInvalidAccessTokenException{
Map<String, String> responseMap = new HashMap<String, String>();
String bodyJson = JSONUtils.toString(records);
String url = instanceUrl + this.baseUrl + "/" + jobId + "/batch";
HttpPost httpPost = new HttpPost(url);
CloseableHttpClient httpClient = HttpClients.createDefault();
httpPost.addHeader("X-SFDC-Session", accessToken);
httpPost.addHeader("Content-Type", "application/json" );
httpPost.addHeader("Accept", "application/json" );
StringEntity params = new StringEntity(bodyJson,ContentType.APPLICATION_JSON);
httpPost.setEntity(params);
CloseableHttpResponse response = null;
// first try sending POST request
try {
response = httpClient.execute(httpPost);
} catch (IOException e) {
logger.error("Error sending Post Request to Salesforce. Please check your connection.");
throw e;
}
// if access token has expired, throw the exception
if (response.getStatusLine().getStatusCode() == 401) {
throw new SalesforceInvalidAccessTokenException("Access token invalid...");
}
HttpEntity entity = response.getEntity();
String responseString = null;
try {
responseString = EntityUtils.toString(entity, "UTF-8");
response.close();
} catch (ParseException e) {
logger.info("Cannot parse the response");
throw e;
} catch (IOException e) {
logger.error("Error sending POST Request to Salesforce. Please check your connection.");
throw e;
}
// now, parse the result and construct the map to be returned
try {
JSONObject json = new JSONObject(responseString);
responseMap.put("id", json.getString("id"));
responseMap.put("batchId", json.getString("id"));
responseMap.put("jobId", jobId);
responseMap.put("state", json.getString("state"));
responseMap.put("numberRecordsFailed", Integer.toString(json.getInt("numberRecordsFailed")));
responseMap.put("numberRecordsProcessed", Integer.toString(json.getInt("numberRecordsProcessed")));
} catch (JSONException e) {
logger.info(responseString);
logger.info("JSON Exception...");
throw new IllegalArgumentException("JSON Parse Exception has occured");
}
return responseMap;
}
/**
* It is used to close the job so that it will not accept further batches
*
* @param accessToken
* It is the access token for the authenticated user
* @param jobId
* It is the id of the job
* @return
* @throws IOException
* @throws SalesforceInvalidAccessTokenException
*/
public Map<String, String> closeJob(String accessToken, String instanceUrl, String jobId)
throws IOException, SalesforceInvalidAccessTokenException {
Map<String, String> responseMap = new HashMap<String, String>();
Map<String, String> body = new HashMap<String, String>();
body.put("state", "Closed");
String bodyJson = JSONUtils.toString(body);
String url = instanceUrl + this.baseUrl + "/" + jobId;
HttpPost httpPost = new HttpPost(url);
CloseableHttpClient httpClient = HttpClients.createDefault();
httpPost.addHeader("X-SFDC-Session", accessToken);
httpPost.addHeader("Content-Type", "application/json" );
httpPost.addHeader("Accept", "application/json" );
StringEntity params = new StringEntity(bodyJson,ContentType.APPLICATION_JSON);
httpPost.setEntity(params);
CloseableHttpResponse response = null;
// first try sending POST request
try {
response = httpClient.execute(httpPost);
} catch (IOException e) {
logger.error("Error sending Post Request to Salesforce. Please check your connection.");
throw e;
}
// if access token has expired, throw the exception
if (response.getStatusLine().getStatusCode() == 401) {
throw new SalesforceInvalidAccessTokenException("Access token invalid...");
}
HttpEntity entity = response.getEntity();
String responseString = null;
try {
responseString = EntityUtils.toString(entity, "UTF-8");
response.close();
} catch (ParseException e) {
logger.info("Cannot parse the response");
throw e;
} catch (IOException e) {
logger.error("Error sending POST Request to Salesforce. Please check your connection.");
throw e;
}
// now, parse the result and construct the map to be returned
try {
JSONObject json = new JSONObject(responseString);
responseMap.put("id", json.getString("id"));
responseMap.put("jobId", json.getString("id"));
responseMap.put("state", json.getString("state"));
responseMap.put("operation", json.getString("operation"));
responseMap.put("state", json.getString("state"));
responseMap.put("numberRecordsFailed", Integer.toString(json.getInt("numberRecordsFailed")));
responseMap.put("numberRecordsProcessed", Integer.toString(json.getInt("numberRecordsProcessed")));
} catch (JSONException e) {
logger.info("JSON Exception...");
throw new IllegalArgumentException("JSON Parse Exception has occured");
}
return responseMap;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment