Skip to content

Instantly share code, notes, and snippets.

@heluvaguy
Last active February 1, 2018 08:07
Show Gist options
  • Save heluvaguy/2cea86150d3e3bcd004f9288ff9b396b to your computer and use it in GitHub Desktop.
Save heluvaguy/2cea86150d3e3bcd004f9288ff9b396b to your computer and use it in GitHub Desktop.
DCN Oracle
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<!--
  Settings file passed as the single command line argument to com.gbm.tools.OracleDCN
  and read key by key in the com.gbm.util.SystemSettings constructor.
-->
<comment>Master configuration file for CD7 Application </comment>
<!-- Database connection. For db.type "Oracle", SystemSettings.getConnectionString builds
     the JDBC URL as: db.url + "@" + db.servername + ":" + db.portnumber + ":" + db.databasename + db.addparam -->
<entry key="db.type">Oracle</entry>
<entry key="db.url">jdbc:oracle:thin:</entry>
<entry key="db.driver">oracle.jdbc.OracleDriver</entry>
<entry key="db.servername">192.168.1.30</entry>
<entry key="db.portnumber">1521</entry>
<entry key="db.databasename">ORCL</entry>
<!-- NOTE(review): credentials are stored in plain text in this file. -->
<entry key="db.username">scott</entry>
<entry key="db.password">tiger</entry>
<entry key="db.instancename"></entry>
<entry key="db.addparam"></entry>
<!-- Table that OracleDCN registers for change notification, and the column list it
     selects (by rowid) for every changed row. -->
<entry key="db.tablename">scott.dept</entry>
<entry key="db.columnnames">deptno,dname,loc</entry>
<!-- Passed to the Oracle driver as NTF_LOCAL_TCP_PORT when registering. -->
<entry key="db.notifications.localport">12345</entry>
<!-- The three keys below are spelled "ignote" (not "ignore"). SystemSettings looks them up
     with exactly this spelling, so renaming them here alone would silently fall back to "false". -->
<entry key="db.notifications.ignotedelete">false</entry>
<entry key="db.notifications.ignoteupdate">false</entry>
<entry key="db.notifications.ignoteinsert">false</entry>
<!-- Host and port that OracleDCN.sendToPort opens a TCP socket to for each changed row. -->
<entry key="streams.hostname">streams-server.findabilitysciences.com</entry>
<entry key="streams.port">10080</entry>
</properties>
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="5 minutes">
<!--
  Logback configuration. Three appenders are defined (STDOUT, SIFT, EMAIL) but only STDOUT
  is attached to the root logger at the bottom of this file; the other two are commented out there.
  scan/scanPeriod: presumably logback re-reads this file for changes every 5 minutes; confirm
  against the logback version in use.
-->
<!-- Console output: time, thread, level, logger name (abbreviated to 36 chars) and message. -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<!-- One rolling log file per value of the "jobName" discriminator key; events without a
     jobName go to the "ROOT" file. -->
<appender name="SIFT" class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator>
<key>jobName</key>
<defaultValue>ROOT</defaultValue>
</discriminator>
<sift>
<appender name="${jobName}-Logger"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- NOTE(review): absolute, machine-specific path; the directory must exist and be writable. -->
<file>/home/fs/projects/cd7_services/lehigh/cd7_scheduler/old_logs/${jobName}-status.log</file>
<append>true</append>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date | %-5level | %logger{0} | %caller{1} | %message%n
</pattern>
</layout>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- The date pattern goes down to the hour and the .gz suffix requests compressed archives.
     NOTE(review): maxHistory presumably counts rollover periods, so 30 may mean 30 hours
     rather than 30 days here; confirm. -->
<fileNamePattern>/home/fs/projects/cd7_services/lehigh/cd7_scheduler/old_logs/${jobName}-status.%d{yyyy-MM-dd-HH}.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
</appender>
</sift>
</appender>
<!-- Mail appender whose evaluator is configured with the STATUS and FAILURE markers. -->
<appender name="EMAIL" class="ch.qos.logback.classic.net.SMTPAppender">
<evaluator class="ch.qos.logback.classic.boolex.OnMarkerEvaluator">
<marker>STATUS</marker>
<marker>FAILURE</marker>
</evaluator>
<asynchronousSending>false</asynchronousSending>
<smtpHost>smtp.mandrillapp.com</smtpHost>
<!-- NOTE(review): port 587 is usually used with STARTTLS rather than implicit SSL; confirm
     that SSL=true is what the mail provider expects. -->
<smtpPort>587</smtpPort>
<SSL>true</SSL>
<!-- NOTE(review): SMTP credentials are hard-coded in a published file; they should be
     rotated and supplied from outside the configuration. -->
<username>dev.findabilitysciences@gmail.com</username>
<password>fy87nvRc5DerXrn4P6BxYg</password>
<to>vamol@findabilitysciences.com</to>
<from>logger@cd7.com</from>
<subject>[cd7-scheduler] %p in %logger</subject>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date | %-5level | %logger{0} | %caller{1} | %message%n
</pattern>
</layout>
<!-- Buffer of a single event; presumably each mail then carries only the triggering event. -->
<cyclicBufferTracker class="ch.qos.logback.core.spi.CyclicBufferTracker">
<bufferSize>1</bufferSize>
</cyclicBufferTracker>
</appender>
<!-- Third-party loggers limited to ERROR. -->
<logger name="org.quartz" level="ERROR"/>
<logger name="org.twitter4j" level="ERROR"/>
<logger name="org.http" level="ERROR"/>
<!-- Everything else logs at debug to the console only. -->
<root level="debug">
<appender-ref ref="STDOUT" />
<!--<appender-ref ref="SIFT" />-->
<!--<appender-ref ref="EMAIL" />-->
</root>
</configuration>
package com.gbm.tools;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.sql.*;
import java.util.Properties;
import com.gbm.util.SystemSettings;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Command line tool that registers an Oracle Database Change Notification (DCN) on the
 * table named in the settings file and forwards every changed row to a TCP endpoint.
 *
 * <p>Flow: {@link #main(String[])} loads {@link SystemSettings} from the file given as the
 * single argument, {@code run()} opens the registration connection and attaches a listener,
 * and for each changed row the listener re-reads the configured columns by rowid and sends
 * them, followed by the operation name, as one comma separated record via
 * {@link #sendToPort(String)}.
 *
 * <p>The registration connection, the registration itself and the settings are kept in static
 * fields so that the JVM shutdown hook can unregister the notification.
 */
public class OracleDCN {
    static final Logger logger = LoggerFactory.getLogger(OracleDCN.class);
    /** Long-lived connection that owns the change notification registration. */
    static OracleConnection conn = null;
    /** Active registration, or {@code null} when nothing is registered. */
    static DatabaseChangeRegistration dcr = null;
    static SystemSettings systemSettings = null;

    /**
     * Entry point.
     *
     * @param args exactly one element is expected: the path of the XML settings file
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            logger.error("Configuration file is not supplied");
            if (args.length == 0) {
                // Without a path there is nothing to load; previously this fell through and
                // died with ArrayIndexOutOfBoundsException on args[0].
                System.exit(1);
            }
            // Extra arguments are tolerated as before; only args[0] is used.
        }
        systemSettings = new SystemSettings(args[0]);
        OracleDCN oracleDCN = new OracleDCN();
        attachShutdownHook();
        try {
            oracleDCN.run();
        } catch (Exception ex) {
            logger.error("Error ", ex);
        }
    }

    /**
     * Opens the registration connection, registers for change notifications and associates
     * the registration with a {@code select *} on the configured table.
     *
     * <p>Terminates the JVM with status 1 when no connection can be obtained. When the
     * registration itself fails, the error is logged and the connection is released.
     */
    private void run() {
        try {
            conn = connect();
        } catch (SQLException ex) {
            logger.error("unable to get connection", ex);
            System.exit(1);
        }
        if (conn == null) {
            // java.sql.Driver.connect may return null when the driver does not accept the URL.
            logger.error("unable to get connection: driver returned no connection for the configured URL");
            System.exit(1);
        }
        Properties prop = new Properties();
        prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");
        prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, systemSettings.isM_ignoreInsert());
        prop.setProperty(OracleConnection.DCN_IGNORE_DELETEOP, systemSettings.isM_ignoreDelete());
        prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, systemSettings.isM_ignoreUpdate());
        prop.setProperty(OracleConnection.NTF_LOCAL_TCP_PORT, systemSettings.getM_localPort());
        try {
            dcr = conn.registerDatabaseChangeNotification(prop);
            dcr.addListener(new DatabaseChangeListener() {
                public void onDatabaseChangeNotification(DatabaseChangeEvent dce) {
                    handleChangeEvent(dce);
                }
            });
            Statement stmt = conn.createStatement();
            try {
                // Running the query through a statement bound to the registration is what
                // attaches the table to it; the rows themselves are not needed.
                ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
                ResultSet rs = stmt.executeQuery("select * from " + systemSettings.getM_tableName());
                try {
                    while (rs.next()) {
                        // drain the result set
                    }
                } finally {
                    rs.close();
                }
            } finally {
                stmt.close();
            }
            logger.info("Registration Complete");
        } catch (SQLException ex) {
            // Previously this failure was swallowed without any trace in the log.
            logger.error("Unable to register for database change notifications", ex);
            releaseRegistration();
        }
    }

    /**
     * Walks a change event down to its individual row changes and forwards each one.
     *
     * @param dce event delivered by the Oracle driver
     */
    private void handleChangeEvent(DatabaseChangeEvent dce) {
        QueryChangeDescription[] queryChanges = dce.getQueryChangeDescription();
        if (queryChanges == null) {
            // NOTE(review): presumably happens for events without query-level detail; confirm.
            logger.debug("Change event without query change descriptions ignored");
            return;
        }
        for (QueryChangeDescription queryChange : queryChanges) {
            TableChangeDescription[] tableChanges = queryChange.getTableChangeDescription();
            if (tableChanges == null) {
                continue;
            }
            for (TableChangeDescription tableChange : tableChanges) {
                String tableName = tableChange.getTableName();
                RowChangeDescription[] rowChanges = tableChange.getRowChangeDescription();
                if (rowChanges == null) {
                    // NOTE(review): presumably no row-level detail was delivered; confirm.
                    logger.debug("Change on {} carried no row details", tableName);
                    continue;
                }
                for (RowChangeDescription rowChange : rowChanges) {
                    String rowId = rowChange.getRowid().stringValue();
                    String operation = rowChange.getRowOperation().name();
                    getRecord(tableName, rowId, operation);
                }
            }
        }
    }

    /**
     * Reads the configured columns of one changed row and sends them as a record.
     *
     * <p>Numeric columns are written bare, every other column is wrapped in double quotes;
     * values are separated by commas and the operation name is appended as the last field.
     * A row that no longer exists (for example after a delete) yields no record.
     *
     * @param tableName table reported by the notification; not used for the lookup, which
     *                  always reads the table named in the settings
     * @param rowId     Oracle rowid of the changed row
     * @param operation name of the row operation reported by the notification
     */
    private void getRecord(String tableName, String rowId, String operation) {
        Connection connection = null;
        try {
            connection = connect();
        } catch (SQLException ex) {
            logger.error("unable to get connection", ex);
            System.exit(1);
        }
        if (connection == null) {
            logger.error("unable to get connection: driver returned no connection for the configured URL");
            return;
        }
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            // Table and column names come from the settings file and cannot be bound;
            // the rowid is bound instead of being concatenated into the statement.
            stmt = connection.prepareStatement("select " + systemSettings.getM_columnName()
                    + " from " + systemSettings.getM_tableName() + " where rowid = ?");
            stmt.setString(1, rowId);
            rs = stmt.executeQuery();
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (rs.next()) {
                // Fresh buffer per row so that one record never contains another row's data.
                StringBuilder columnData = new StringBuilder();
                for (int column = 1; column <= columnCount; column++) {
                    if (column != 1) {
                        columnData.append(",");
                    }
                    if (isNumericType(metaData.getColumnType(column))) {
                        columnData.append(rs.getString(column));
                    } else {
                        columnData.append("\"").append(rs.getString(column)).append("\"");
                    }
                }
                logger.debug("Fetched row: {}", columnData);
                if (columnData.length() > 0) {
                    columnData.append(",").append(operation);
                    sendToPort(columnData.toString());
                }
            }
            logger.info("Data Fetched");
        } catch (SQLException ex) {
            logger.error("SQLException ->", ex);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    logger.error("Error Closing result set while fetching records ->", e);
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
                    logger.error("Error Closing statement while fetching records ->", e);
                }
            }
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("Error Closing connection while fetching records ->", e);
            }
        }
    }

    /** Returns whether values of the given {@link Types} constant are written without quotes. */
    private static boolean isNumericType(int sqlType) {
        switch (sqlType) {
            case Types.BIGINT:
            case Types.INTEGER:
            case Types.FLOAT:
            case Types.DOUBLE:
            case Types.DECIMAL:
            case Types.NUMERIC:
                return true;
            default:
                return false;
        }
    }

    /**
     * Opens a new connection using the URL and credentials from the settings.
     *
     * @return the connection as returned by the Oracle driver (may be {@code null} if the
     *         driver does not accept the URL)
     * @throws SQLException if the driver fails to connect
     */
    OracleConnection connect() throws SQLException {
        OracleDriver dr = new OracleDriver();
        Properties prop = new Properties();
        prop.setProperty("user", systemSettings.getM_serverUserName());
        prop.setProperty("password", systemSettings.getM_serverPassword());
        return (OracleConnection) dr.connect(systemSettings.getLocalConnectionString(), prop);
    }

    /** Makes sure the notification is unregistered when the JVM shuts down. */
    private static void attachShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                stopScheduler(null);
            }
        });
    }

    /**
     * Unregisters the change notification and closes the registration connection.
     * Safe to call when nothing is registered.
     *
     * @param arge unused
     */
    public static void stopScheduler(String arge[]) {
        releaseRegistration();
    }

    /**
     * Unregisters {@link #dcr} (if any) and closes {@link #conn} (if any), then clears both
     * fields so that a later call is a no-op. The connection is closed even when the
     * unregister call fails.
     */
    private static void releaseRegistration() {
        if (conn == null) {
            return;
        }
        try {
            if (dcr != null) {
                conn.unregisterDatabaseChangeNotification(dcr);
                logger.info("Unregistered Notifications");
            }
        } catch (SQLException e) {
            logger.error("SQLException ", e);
        } finally {
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error("SQLException ", e);
            }
            conn = null;
            dcr = null;
        }
    }

    /**
     * Sends one record to the configured streams host and port over a new TCP connection.
     * Failures are logged and not propagated.
     *
     * @param data record text; written as is, without a trailing line terminator
     */
    public void sendToPort(String data) {
        Socket socket = null;
        PrintWriter out = null;
        try {
            socket = new Socket(systemSettings.getM_streamsServerHost(), systemSettings.getM_streamsServerPort());
            OutputStream outstream = socket.getOutputStream();
            out = new PrintWriter(outstream);
            out.print(data);
            // PrintWriter never throws IOException; checkError flushes and reports failures.
            if (out.checkError()) {
                logger.error("Failed to write record to {}:{}",
                        systemSettings.getM_streamsServerHost(), systemSettings.getM_streamsServerPort());
            }
        } catch (UnknownHostException e) {
            logger.error("UnknownHostException ->", e);
        } catch (IOException e) {
            logger.error("IOException ->", e);
        } finally {
            // Close the writer (and with it the socket's stream) before the socket itself.
            if (out != null) {
                out.close();
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    logger.error("IOException Error closing socket. ->", e);
                }
            }
        }
    }
}
package com.gbm.util;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gbm.logger.LoggerMarkers;
import java.util.InvalidPropertiesFormatException;
import java.util.Properties;
public class PropertiesManager
{
static final Logger logger = LoggerFactory.getLogger(PropertiesManager.class);
public static String getProperty(String file, String key, String defaultValue)
{
Properties props = new Properties();
FileInputStream fis;
try
{
fis = new FileInputStream(file);
props.loadFromXML(fis);
}
catch(FileNotFoundException e)
{
logger.error(LoggerMarkers.errorMarker, "Error occured while reading file :: PropertiesManager.->", e);
}
catch(InvalidPropertiesFormatException e)
{
logger.error(LoggerMarkers.errorMarker, "Error occured due to Invalid property format :: PropertiesManager.->", e);
}
catch(IOException e)
{
logger.error(LoggerMarkers.errorMarker, "Error occured while executing getProperty() :: PropertiesManager.->", e);
}
String retValue = props.getProperty(key);
return (retValue != null ? retValue : defaultValue);
}
}
package com.gbm.util;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import javax.imageio.ImageReader;
import javax.imageio.stream.ImageInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import com.gbm.logger.LoggerMarkers;
/**
 * Holds the database, notification and streams settings read from an XML properties file
 * and builds JDBC connection strings and connections from them.
 *
 * <p>When the file does not exist an error is logged and all fields keep their initial
 * values ({@code null}, {@code -1}, {@code "-1"} or {@code "false"}).
 */
public class SystemSettings {
    static final Logger logger = LoggerFactory.getLogger(SystemSettings.class);
    private String m_serverType = null;
    private String m_serverURL = null;
    private String m_serverDriver = null;
    private String m_serverName = null;
    private int m_serverPort = -1;
    private String m_serverDB = null;
    private String m_serverUserName = null;
    private String m_serverPassword = null;
    private String m_serverAdditionalParameter = null;
    private String m_instanceName = null;
    private String m_tableName = null;
    private String m_columnName = null;
    // The notification settings are kept as strings because callers hand them straight to
    // java.util.Properties.setProperty.
    private String m_localPort = "-1";
    private String m_ignoreDelete = "false";
    private String m_ignoreUpdate = "false";
    private String m_ignoreInsert = "false";
    private String m_streamsServerHost = null;
    private int m_streamsServerPort = -1;

    /**
     * Loads all settings from the given XML properties file.
     *
     * @param filePath path of the settings file
     * @throws NumberFormatException if {@code db.portnumber} or {@code streams.port} is not
     *                               an integer
     */
    public SystemSettings(String filePath) {
        if (!(new File(filePath).exists())) {
            logger.error("Configuration File Not Found procedureName = SystemSettings()");
            return;
        }
        // ======== User DB settings =======
        m_serverType = PropertiesManager.getProperty(filePath, "db.type", null);
        m_serverURL = PropertiesManager.getProperty(filePath, "db.url", null);
        m_serverDriver = PropertiesManager.getProperty(filePath, "db.driver", null);
        m_serverName = PropertiesManager.getProperty(filePath, "db.servername", null);
        m_serverPort = readInt(filePath, "db.portnumber", "0");
        m_serverDB = PropertiesManager.getProperty(filePath, "db.databasename", null);
        m_serverUserName = PropertiesManager.getProperty(filePath, "db.username", null);
        m_serverPassword = PropertiesManager.getProperty(filePath, "db.password", null);
        m_instanceName = PropertiesManager.getProperty(filePath, "db.instancename", null);
        m_serverAdditionalParameter = PropertiesManager.getProperty(filePath, "db.addparam", null);
        m_tableName = PropertiesManager.getProperty(filePath, "db.tablename", null);
        m_columnName = PropertiesManager.getProperty(filePath, "db.columnnames", null);
        // ======== Change notification settings =======
        // The "ignote" spelling is what existing settings files use; keep it.
        m_localPort = PropertiesManager.getProperty(filePath, "db.notifications.localport", "12345");
        m_ignoreDelete = PropertiesManager.getProperty(filePath, "db.notifications.ignotedelete", "false");
        m_ignoreUpdate = PropertiesManager.getProperty(filePath, "db.notifications.ignoteupdate", "false");
        m_ignoreInsert = PropertiesManager.getProperty(filePath, "db.notifications.ignoteinsert", "false");
        // ======== Streams endpoint =======
        m_streamsServerHost = PropertiesManager.getProperty(filePath, "streams.hostname", null);
        m_streamsServerPort = readInt(filePath, "streams.port", "-1");
    }

    /**
     * Reads an integer setting, ignoring surrounding whitespace.
     *
     * @throws NumberFormatException naming the offending key when the value is not an integer
     */
    private static int readInt(String filePath, String key, String defaultValue) {
        String raw = PropertiesManager.getProperty(filePath, key, defaultValue);
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            NumberFormatException detailed =
                    new NumberFormatException("Setting '" + key + "' is not a valid integer: \"" + raw + "\"");
            detailed.initCause(e);
            throw detailed;
        }
    }

    public String getM_serverURL() {
        return m_serverURL;
    }

    public void setM_serverURL(String m_serverURL) {
        this.m_serverURL = m_serverURL;
    }

    public String getM_serverDriver() {
        return m_serverDriver;
    }

    public void setM_serverDriver(String m_serverDriver) {
        this.m_serverDriver = m_serverDriver;
    }

    public String getM_serverName() {
        return m_serverName;
    }

    public void setM_serverName(String m_serverName) {
        this.m_serverName = m_serverName;
    }

    public int getM_serverPort() {
        return m_serverPort;
    }

    public void setM_serverPort(int m_serverPort) {
        this.m_serverPort = m_serverPort;
    }

    public String getM_serverDB() {
        return m_serverDB;
    }

    public void setM_serverDB(String m_serverDB) {
        this.m_serverDB = m_serverDB;
    }

    public String getM_serverUserName() {
        return m_serverUserName;
    }

    public void setM_serverUserName(String m_serverUserName) {
        this.m_serverUserName = m_serverUserName;
    }

    public String getM_serverPassword() {
        return m_serverPassword;
    }

    public void setM_serverPassword(String m_serverPassword) {
        this.m_serverPassword = m_serverPassword;
    }

    public String getM_serverType() {
        return m_serverType;
    }

    public void setM_serverType(String m_serverType) {
        this.m_serverType = m_serverType;
    }

    public String getM_serverAdditionalParameter() {
        return m_serverAdditionalParameter;
    }

    public void setM_serverAdditionalParameter(
            String m_serverAdditionalParameter) {
        this.m_serverAdditionalParameter = m_serverAdditionalParameter;
    }

    public String getM_instanceName() {
        return m_instanceName;
    }

    public void setM_instanceName(String m_instanceName) {
        this.m_instanceName = m_instanceName;
    }

    public String getM_tableName() {
        return m_tableName;
    }

    public void setM_tableName(String m_tableName) {
        this.m_tableName = m_tableName;
    }

    public String getM_columnName() {
        return m_columnName;
    }

    public void setM_columnName(String m_columnName) {
        this.m_columnName = m_columnName;
    }

    public String getM_localPort() {
        return m_localPort;
    }

    public void setM_localPort(String m_localPort) {
        this.m_localPort = m_localPort;
    }

    /** @return the configured flag as text, {@code "false"} unless overridden */
    public String isM_ignoreDelete() {
        return m_ignoreDelete;
    }

    public void setM_ignoreDelete(String m_ignoreDelete) {
        this.m_ignoreDelete = m_ignoreDelete;
    }

    /** @return the configured flag as text, {@code "false"} unless overridden */
    public String isM_ignoreUpdate() {
        return m_ignoreUpdate;
    }

    public void setM_ignoreUpdate(String m_ignoreUpdate) {
        this.m_ignoreUpdate = m_ignoreUpdate;
    }

    /** @return the configured flag as text, {@code "false"} unless overridden */
    public String isM_ignoreInsert() {
        return m_ignoreInsert;
    }

    public void setM_ignoreInsert(String m_ignoreInsert) {
        this.m_ignoreInsert = m_ignoreInsert;
    }

    public String getM_streamsServerHost() {
        return m_streamsServerHost;
    }

    public void setM_streamsServerHost(String m_streamsServerHost) {
        this.m_streamsServerHost = m_streamsServerHost;
    }

    public int getM_streamsServerPort() {
        return m_streamsServerPort;
    }

    public void setM_streamsServerPort(int m_streamsServerPort) {
        this.m_streamsServerPort = m_streamsServerPort;
    }

    /** Returns whether the text is {@code null} or contains only whitespace. */
    private static boolean isBlank(String text) {
        return text == null || text.trim().equals("");
    }

    /**
     * Returns whether every setting needed to open a connection is present: server name,
     * URL prefix, database, user name, driver class and password are non-blank and the port
     * is not {@code -1}.
     */
    private boolean hasConnectionSettings() {
        return !isBlank(m_serverName)
                && !isBlank(m_serverURL)
                && m_serverPort != -1
                && !isBlank(m_serverDB)
                && !isBlank(m_serverUserName)
                && !isBlank(m_serverDriver)
                && !isBlank(m_serverPassword);
    }

    /**
     * Tries to connect with the current settings and checks that the product name reported
     * by the database equals the configured server type (ignoring case).
     *
     * @return {@code true} only when the connection succeeds and the product name matches;
     *         {@code false} when settings are incomplete or any error occurs (errors are logged)
     */
    public boolean isReachable() {
        logger.info("Inside isReachable() :: SystemSettings.");
        if (!hasConnectionSettings()) {
            return false;
        }
        Connection connection = null;
        try {
            Class.forName(m_serverDriver);
            String connectionString = getLocalConnectionString();
            connection = DriverManager.getConnection(connectionString,
                    m_serverUserName, m_serverPassword);
            if (connection.getMetaData().getDatabaseProductName()
                    .equalsIgnoreCase(m_serverType)) {
                return true;
            }
        } catch (SQLException e) {
            logger.error(LoggerMarkers.errorMarker, " :: SystemSettings SQLException -->", e);
        } catch (ClassNotFoundException e) {
            logger.error(LoggerMarkers.errorMarker, " :: SystemSettings ClassNotFoundException -->", e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    logger.error(LoggerMarkers.errorMarker, "Error occured while closing Connection :: SystemSettings -->", e);
                }
            }
        }
        return false;
    }

    /**
     * Opens a new connection through {@link DriverManager} using the current settings.
     *
     * @return an open connection the caller must close, or {@code null} when settings are
     *         incomplete or connecting fails (errors are logged)
     */
    public Connection getLocalConnection() {
        logger.info("Inside getLocalConnection() :: SystemSettings.");
        if (!hasConnectionSettings()) {
            return null;
        }
        try {
            Class.forName(m_serverDriver);
            return DriverManager.getConnection(
                    getLocalConnectionString(), m_serverUserName, m_serverPassword);
        } catch (SQLException e) {
            logger.error(LoggerMarkers.errorMarker, " SQLException :: SystemSettings ->", e);
        } catch (ClassNotFoundException e) {
            logger.error(LoggerMarkers.errorMarker, " ClassNotFoundException :: SystemSettings ->", e);
        }
        return null;
    }

    /**
     * Builds the JDBC connection string for the current settings.
     *
     * @return the connection string, or {@code null} when the server type is missing or not
     *         one of MySQL, Microsoft SQL Server or Oracle
     */
    public String getLocalConnectionString() {
        return getConnectionString(m_serverType,
                m_serverURL, m_serverName, m_serverPort, m_serverDB,
                m_serverAdditionalParameter);
    }

    /**
     * Assembles a JDBC connection string in the format of the given server type.
     *
     * @param additionalParametres text appended verbatim at the end; {@code null} is treated
     *                             as empty so that the literal "null" never ends up in the URL
     * @return the connection string, or {@code null} for a missing or unsupported server type
     */
    private String getConnectionString(String serverType, String serverURL,
            String serverName, int serverPort, String serverDb,
            String additionalParametres) {
        logger.info("Inside getConnectionString() :: SystemSettings.");
        if (serverType == null) {
            // Happens when the settings file was missing; report "unsupported" instead of
            // failing with a NullPointerException.
            return null;
        }
        String extra = (additionalParametres == null) ? "" : additionalParametres;
        if (serverType.equalsIgnoreCase("MySQL")) {
            return serverURL + serverName + ":" + serverPort + "/" + serverDb + extra;
        }
        if (serverType.equalsIgnoreCase("Microsoft SQL Server")) {
            return serverURL + serverName + ":" + serverPort + ";databasename=" + serverDb + extra;
        }
        if (serverType.equalsIgnoreCase("Oracle")) {
            return serverURL + "@" + serverName + ":" + serverPort + ":" + serverDb + extra;
        }
        return null;
    }

    /** Logs the non-secret server settings at info level as one multi-line message. */
    public void printSettings() {
        logger.info("Inside printSettings() :: SystemSettings.");
        StringBuilder builder = new StringBuilder();
        builder.append("1. Setting For FS Server ").append("\n");
        builder.append("\tServer URL = ").append(m_serverURL).append("\n");
        builder.append("\tServer Driver = ").append(m_serverDriver).append("\n");
        builder.append("\tServer Name = ").append(m_serverName).append("\n");
        builder.append("\tServer Instance Name = ").append(m_instanceName).append("\n");
        builder.append("\tServer Port = ").append(m_serverPort).append("\n");
        builder.append("\tServer Database = ").append(m_serverDB).append("\n");
        builder.append("\tServer Additional Param = ").append(m_serverAdditionalParameter).append("\n");
        logger.info(builder.toString());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment