package com.gpc.tams.sync.inventory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.CollectionUtils;

import com.gpc.tams.config.ServerConfiguration;
import com.gpc.tams.consumer.Consumer;
import com.gpc.tams.consumer.converter.json.ConsumedServiceMapper;
import com.gpc.tams.model.Part;
import com.gpc.tams.model.Quantity;
import com.gpc.tams.model.common.ActionDTO;
import com.gpc.tams.model.request.PartRequest;
import com.gpc.tams.repository.common.DataProcessedRegister;
import com.gpc.tams.repository.communication.CommunicationUrl;
import com.gpc.tams.repository.inventory.Inventory;
import com.gpc.tams.repository.inventory.InventoryStock;
import com.gpc.tams.repository.reference.RefQueueTable;
import com.gpc.tams.repository.rowmapper.InventoryChangeMapper;
import com.gpc.tams.sync.AbstractSynchronizer;
import com.gpc.tams.util.DateUtil;

/**
 * Class to control sending inventory feed to ESB.
 * This is run as part of a scheduler defined in taap-servlet.xml.
*/ public class InventorySynchronizer extends AbstractSynchronizer{ private static final String ESB_SERVICES_URL = "ESB_SERVICES_URL"; @Value("${INVENTORY_SYNC_QUEUE}")private String queue; @Value("${INVENTORY_SYNC_PORT}")private String inventorySyncPort; @Value("${INVENTORY_SYNC_SERVICE}")private String invenotrySyncService; @Value("${INVENTORY_SYNC_USERNAME}")private String invenotrySyncUserName; @Value("${INVENTORY_SYNC_PASSWORD}")private String invenotrySyncPassword; @Value("${INVENTORY_SYNC_DELAY}")private String delay; @Value("${INVENTORY_SYNC_LIMIT}")private String limit; @Autowired private HttpClient inventorySyncHttpClient; @Autowired private ServerConfiguration serverConfig; /** * Does all the preparatory work. * 1. Load communication URL. * 2. Set the URL in parent class. * 3. Load the tableId for queue. * * If anything goes wrong, we are not allowing server to start so that support get * involved quickly and the problem can be corrected sooner rather than later. */ @PostConstruct public void init(){ try{ final CommunicationUrl esbServerURL = communicationURLDAO.find(serverConfig.getString(ESB_SERVICES_URL)); setUrl(esbServerURL.getUrl(), Integer.valueOf(inventorySyncPort), invenotrySyncService); inventorySyncHttpClient.getParams().setAuthenticationPreemptive(true); inventorySyncHttpClient.getState().setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(invenotrySyncUserName, invenotrySyncPassword)); final List tables = referenceDAO.loadQueueTables(); for(final RefQueueTable table : tables){ if(queue.equals(table.getTableName())){ tableId = table.getId(); break; } } }catch(Exception exception){ context.logError(exception, ""); } } /* (non-Javadoc) * @see com.gpc.tams.sync.AbstractSynchronizer#getDelay() */ @Override protected long getDelay() { return Long.valueOf(delay); } /* (non-Javadoc) * @see com.gpc.tams.sync.AbstractSynchronizer#getServiceKey() */ @Override protected String getServiceKey() { return "feedInventory"; } /* (non-Javadoc) * @see 
com.gpc.tams.sync.AbstractSynchronizer#getRetry() */ @Override public int getRetry() { /* * No point of having a retry here. Because this service is going to run very frequently. */ return 0; } /* * (non-Javadoc) * @see com.gpc.tams.sync.AbstractSynchronizer#checkForUpdates(com.gpc.tams.repository.common.DataProcessedRegister) */ @Override protected RecordSynchronizationData checkForUpdates(final DataProcessedRegister processedRegister) { final int location = storeProfileDAO.find().getLocation(); /* * Check if store is allowed to send feed. If not, just return back by printing a debug statement. */ if(!inventoryProfileDAO.find(location).isUseInventoryFeed()){ context.logDebug("USE_INVENTORY_FEED switch is off. Not sending feed."); return null; } final int lastProcessedId = processedRegister == null ? (inventoryChangeQueueDAO.findMinimumChangeQueueId(tableId, location) - 1) : processedRegister.getLastPointerProcessed(); /* * We got the last processed id. Now we want to get the set from next record onwards. * Hence we add 1 to lastProcessedId and query database. */ final ListeligibleData = databaseOperationsService.getNextBatchOfInventory( lastProcessedId + 1, location, InventoryChangeMapper.INSTANCE); if(CollectionUtils.isEmpty(eligibleData)){ context.logInfo("** No Data found for Inventory Feed **"); return null; } context.logInfo("*************Data found***************"); /* * We want to control the size of feed from outside. * Query at this point pull 1000 records at a time and limit is * set to 500. We want to give option to support to have increase/decrease * limit of another 500. 0 < 500 < 1000 */ int feedSize = Integer.valueOf(limit); if(feedSize == 0 || feedSize > eligibleData.size()){ feedSize = eligibleData.size(); } /* * Records are ordered by INVENTORY_CHANGE_QUEUE.ID. Hence we take the first record * as the least and last record as the max. 
*/ final int minQueueId = eligibleData.get(0).getInventoryChangeQueue().getId(); final int maxQueueId = eligibleData.get(feedSize - 1).getInventoryChangeQueue().getId(); context.logDebug("InventorySynchronizer: minQueueId= "+minQueueId); context.logDebug("InventorySynchronizer: maxQueueId= "+maxQueueId); /* * For some reason, JIA expects to send deleted records separately. Hence before proceeding * any further we split the records into two sets.One for deleted records and one for the * inserted/updated records. */ final ListavailableData = new ArrayList(); final ListdeletedData = new ArrayList(); for(int index = 0; index < feedSize; index++){ final Inventory data = eligibleData.get(index); if(data.getId() != -1){ availableData.add(data); } else { deletedData.add(data); } } final RecordSynchronizationData recordSynchronizationData = new RecordSynchronizationData(); recordSynchronizationData.setModifiedRecords(availableData); recordSynchronizationData.setDeletedRecords(deletedData); recordSynchronizationData.setFirstPointer(minQueueId); recordSynchronizationData.setLastPointer(maxQueueId); return recordSynchronizationData; } /* * (non-Javadoc) * @see com.gpc.tams.sync.AbstractSynchronizer#sendModifiedRecords(java.util.List, java.sql.Timestamp) */ @Override protected void sendModifiedRecords(List modifiedData, final Timestamp processedDate) { final PartRequest request = new PartRequest(); request.setHeader(populateHeader()); request.getHeader().setTimestamp(DateUtil.getEpoch(processedDate)); request.setAction(new ActionDTO(){ @Override public Boolean isEdit() { return true; } }); final Listparts = new ArrayList(); for(final Inventory data : modifiedData){ final Part part = new Part(); part.setLineAbbrev(data.getLineAbbrev()); part.setPartNumber(data.getPartNumber()); part.setDescription(data.getDescription()); part.setUnitOfMeasure(data.getRefUnitDesignatorCode()); part.setStandardPackSize(data.getStandardPackage()); /* * I don't want to do it here. 
But for some reason jolt is not * able to this work for us. We can analyze that later. */ part.set("dataAsOfTimestamp", request.getHeader().getTimestamp()); final InventoryStock stock = data.getInventoryStock(); final Quantity quantity = new Quantity(); quantity.setOnHand(stock.getOnHand()); quantity.setAvailable(stock.getOnHand().subtract(stock.getWorkInProgress())); quantity.setMinSaleQuantity(stock.getMinimumSaleQuantity().setScale(0, BigDecimal.ROUND_HALF_UP)); quantity.setMinStockedQuantity(stock.getMinimumStockQuantity()); quantity.setMaxStockedQuantity(stock.getMaximumStockQuantity()); quantity.setOnOrder(stock.getOnOrder()); quantity.setDaysOnOrder(request.getHeader().getTimestamp() + (stock.getDaysOnOrder() * 24 * 60 * 60)); part.setQuantity(quantity); parts.add(part); } if(!CollectionUtils.isEmpty(parts)){ context.logDebug("processAvailableData: Number of parts = "+parts.size()); request.setPart(parts.toArray(new Part[parts.size()])); new Consumer(new ConsumedServiceMapper(getServiceKey(), validateAndMutateDelegator), inventorySyncHttpClient).post(getUrl(), request, Object.class); } } /* * (non-Javadoc) * @see com.gpc.tams.sync.AbstractSynchronizer#sendDeletedRecords(java.util.List, java.sql.Timestamp) */ protected void sendDeletedRecords(List deletedData, final Timestamp processedDate) { context.logDebug("processDeletedData: processedDate= "+processedDate); final PartRequest request = new PartRequest(); request.setHeader(populateHeader()); request.getHeader().setTimestamp(DateUtil.getEpoch(processedDate)); request.setAction(new ActionDTO(){ @Override public Boolean isDelete() { return true; } }); final Listparts = new ArrayList(); for(final Inventory data : deletedData){ final Part part = new Part(); part.setLineAbbrev(data.getLineAbbrev()); part.setPartNumber(data.getPartNumber()); part.setDescription(data.getDescription()); parts.add(part); } if(!CollectionUtils.isEmpty(parts)){ context.logDebug("processDeletedData: Number of parts = "+parts.size()); 
request.setPart(parts.toArray(new Part[parts.size()])); new Consumer(new ConsumedServiceMapper(getServiceKey(), validateAndMutateDelegator), inventorySyncHttpClient).post(getUrl(), request, Object.class); } } }