package com.genpt.nsight.v4.service.inventory; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import com.genpt.nsight.NSightCache; import com.genpt.nsight.config.NSightProperties; import com.genpt.nsight.v1.SplunkLogger; import com.genpt.nsight.v1.model.BaseSite; import com.genpt.nsight.v1.model.PartAttributes; import com.genpt.nsight.v1.model.Quantity; import com.genpt.nsight.v1.model.Tracking; import com.genpt.nsight.v1.model.dto.SiteDTO; import com.genpt.nsight.v4.model.Part; import com.genpt.nsight.v4.model.ProductAvailabilityRequest; import com.genpt.nsight.v4.model.Site; import com.genpt.nsight.v4.model.inventory.availability.InventoryResponseMultiProduct; import com.genpt.nsight.v4.model.inventory.availability.InventoryResponseV1; import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.*; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.scheduler.Scheduler; @Service("AsyncInventoryComparator") @Slf4j public class AsyncInventoryComparator { @Autowired NSightCache nSightCache; @Autowired NSightProperties nSightProperties; private static final Logger LOGGER = LoggerFactory.getLogger(AsyncInventoryComparator.class); private ExecutorService asyncProcessAndCompareExecutor; private Tracking tracking = new Tracking(); @Value("${inventory.availabilityServiceThreads}") private int asyncThreads; private Scheduler scheduler; private Scheduler comparatorScheduler; @PostConstruct public void init(){ asyncProcessAndCompareExecutor = new ThreadPoolExecutor(5, asyncThreads, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000), new ThreadFactoryBuilder().setNameFormat("comparator-thread-%d").build(), (r, executor) -> { throw new RejectedExecutionException("Comparison rejected from queue");}); } @PreDestroy public void tearDown() { asyncProcessAndCompareExecutor.shutdown(); } public Map> processGCPFutures(ProductAvailabilityRequest productAvailabilityRequest, List listResponses, SplunkLogger splunkLogger){ Map> gcpInventoryMap = listResponses.stream() .flatMap(inventoryResponseV1 -> processInventoryResponsesFromBT(inventoryResponseV1, splunkLogger, productAvailabilityRequest).stream()) .collect(groupingBy(p -> p.getPartNumber() + "-" + p.getLineAbbrev())); return gcpInventoryMap; } public Map> processGCPFuturesMultiProduct(ProductAvailabilityRequest productAvailabilityRequest, InventoryResponseMultiProduct inventoryResponseMultiProduct, SplunkLogger splunkLogger){ Map> gcpInventoryMap = inventoryResponseMultiProduct.getParts().stream() .flatMap(inventoryResponseV1 -> processInventoryResponsesFromBT(inventoryResponseV1, splunkLogger, productAvailabilityRequest).stream()) .collect(groupingBy(p -> p.getPartNumber() + "-" + p.getLineAbbrev())); return gcpInventoryMap; } public void transfromGCPFutures(Map> nsightInventoryMap, ListenableFuture> gcpInventoryFutures, com.genpt.nsight.v4.model.ProductAvailabilityRequest request, boolean enableCompare) { ListenableFuture> future=gcpInventoryFutures; Futures.transform(future,responseList -> processAndCompareGCPFutures(nsightInventoryMap, request, responseList,enableCompare), asyncProcessAndCompareExecutor); } public void transfromGCPFuturesMultiProduct(Map> nsightInventoryMap, ListenableFuture gcpInventoryFutures, com.genpt.nsight.v4.model.ProductAvailabilityRequest request, boolean enableCompare) { ListenableFuture future=gcpInventoryFutures; Futures.transform(future,responseList -> processAndCompareGCPFutures(nsightInventoryMap, request, responseList.getParts(),enableCompare), asyncProcessAndCompareExecutor); } public Map> getGCPInventory(ListenableFuture> gcpInventoryFutures, ProductAvailabilityRequest request) { ListenableFuture> future=gcpInventoryFutures; try { return Futures.transform(future,responseList -> getGCPInventoryFromFutures( request, responseList), asyncProcessAndCompareExecutor).get(); } catch (Exception e) { LOGGER.error(" Error processing getGCPInventory", e.getMessage()); } return null; } public Map> getGCPInventoryMultiProduct(ListenableFuture gcpInventoryFutures, ProductAvailabilityRequest request) { ListenableFuture future=gcpInventoryFutures; try { return Futures.transform(future,responseList -> getGCPInventoryFromFutures( request, responseList.getParts()), asyncProcessAndCompareExecutor).get(); } catch (Exception e) { LOGGER.error(" Error processing getGCPInventoryMultiProduct", e.getMessage()); } return null; } public Map> processAndCompareGCPFutures(Map> nsightInventoryMap, com.genpt.nsight.v4.model.ProductAvailabilityRequest productAvailabilityRequest, List listResponses, boolean enableCompare) { SplunkLogger splunkLogger = new SplunkLogger(productAvailabilityRequest.getHeaderRequest()); try { long start = System.currentTimeMillis(); //process the GCP futures and get the inventory map Map> gcpInventoryMap = processGCPFutures(productAvailabilityRequest,listResponses,splunkLogger); //compare the responses if(enableCompare) compareResponses(gcpInventoryMap,nsightInventoryMap,splunkLogger); else LOGGER.info(splunkLogger.toString() + " Comparator: Processed GCP Responses and Skipped compareResponses"); return gcpInventoryMap; }catch(Exception e){ LOGGER.error(splunkLogger.toString() + " Error processing response", e.getMessage()); } return null; } public Map> getGCPInventoryFromFutures(com.genpt.nsight.v4.model.ProductAvailabilityRequest productAvailabilityRequest, List listResponses) { SplunkLogger splunkLogger = new SplunkLogger(productAvailabilityRequest.getHeaderRequest()); try { long start = System.currentTimeMillis(); //process the GCP futures and get the inventory map Map> gcpInventoryMap = processGCPFutures(productAvailabilityRequest,listResponses,splunkLogger); return gcpInventoryMap; }catch(Exception e){ LOGGER.error(splunkLogger.toString() + " Error processing response {}", e.getMessage()); } return null; } public void compareResponsesAsync(Map> gcpInventoryMap, Map> nsightInventoryMap, SplunkLogger splunkLogger){ asyncProcessAndCompareExecutor.submit(() -> { compareResponses(gcpInventoryMap,nsightInventoryMap,splunkLogger); }); LOGGER.info(splunkLogger.toString() + " Comparator: Finished compareResponsesAsync"); } public void compareResponses(Map> gcpInventoryMap, Map> nsightInventoryMap, SplunkLogger splunkLogger){ long start = System.currentTimeMillis(); Set nsightKeys = nsightInventoryMap.keySet(); Set gcpKeys = gcpInventoryMap.keySet(); Set keysOnlyInNsight = Sets.difference(nsightKeys,gcpKeys); //log those entries that are in NSight and not in GCP if(!keysOnlyInNsight.isEmpty()) LOGGER.info(splunkLogger.toString(), "Comparator: Following Parts were not found in GCP {} ", String.join(",", new ArrayList<>(keysOnlyInNsight))); // and now compare the common entries which is parts that exist in both Set mapCommonKeys = Sets.intersection(nsightKeys,gcpKeys); Map> nsightMapPartEntriesInGCP = nsightInventoryMap .entrySet() .stream() .filter(stringListEntry -> mapCommonKeys. contains(stringListEntry.getKey())) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); Map> finalGcpInventoryMap = gcpInventoryMap; //iterate over the nsight and GCP common entries which is an entry per product List logStatements = nsightMapPartEntriesInGCP .entrySet().stream().map(nsightEntry -> { String partKey = nsightEntry.getKey(); List gcpSitesWithPart = finalGcpInventoryMap.get(partKey); List nsightSitesWithPart = nsightEntry.getValue(); Map> gcpMapSiteByAvailability = gcpSitesWithPart.stream() .collect(groupingBy(part -> part.getSite().getSiteId())); Map> nsightMapSiteByAvailability = nsightSitesWithPart.stream() .collect(groupingBy(part -> part.getSite().getSiteId())); // now have to compare the site quantities for each site nsightMapSiteByAvailability.entrySet().forEach(nsightSiteMap -> { String siteKey = nsightSiteMap.getKey(); if(gcpMapSiteByAvailability.containsKey(siteKey)) { Part gcpPartAvailability = gcpMapSiteByAvailability.get(siteKey).get(0); Part nsightPartAvailability = nsightSiteMap.getValue().get(0); Optional nsightSite = Optional.ofNullable(nsightPartAvailability.getSite()); long nsightDataAsOfTS = nsightSite .flatMap(site->Optional.ofNullable(site.getSourceSystemDataAsOfDate())) .orElse(0L); Optional gcpSite = Optional .ofNullable(gcpPartAvailability.getSite()); long gcpDataAsOfTS = gcpSite .flatMap(site->Optional.ofNullable(site.getSourceSystemDataAsOfDate())) .orElse(0L); if (gcpDataAsOfTS != nsightDataAsOfTS) LOGGER.info(splunkLogger.toString() + " Comparator: Difference in Nsight dataAsOfTs {} and GCP sourceDataAsOfTs {} for Site: {} and Part: {}", nsightDataAsOfTS, gcpDataAsOfTS, siteKey, partKey); Optional gcpQuantity = gcpSite.map(BaseSite::getQuantity); Optional nsightQuantity = nsightSite.map(BaseSite::getQuantity); if (gcpQuantity.isPresent() && nsightQuantity.isPresent()) { //for now compare the available quantities Quantity quantityFromNS = nsightQuantity.get(); Quantity quantityFromGCP = gcpQuantity.get(); if (quantityFromNS.getAvailable().compareTo(quantityFromGCP.getAvailable()) != 0) LOGGER.info(splunkLogger.toString() + " Comparator: Difference in Nsight {} and GCP Available Quantities {} for Site: {} and Part: {}", quantityFromNS.getAvailable(), quantityFromGCP.getAvailable(), siteKey, partKey); else LOGGER.info(splunkLogger.toString() + " Comparator: No Difference in Nsight {} and GCP Available Quantities {} for Site: {} and Part: {}", quantityFromNS.getAvailable(), quantityFromGCP.getAvailable(), siteKey, partKey); } else LOGGER.info(splunkLogger.toString() + " Comparator: No GCP Quantities for Site: {} and Part: {} ", siteKey, partKey); } else LOGGER.info(splunkLogger.toString() + " Comparator: No GCP/NSight Availability for Site: {} and Part: {} ", siteKey, partKey); }); return splunkLogger.toString() +"Comparator: Done for Part: "+ partKey; }).collect(toList()); logStatements.forEach(s -> LOGGER.info(splunkLogger.toString() + s)); LOGGER.info(splunkLogger.toString() + " Comparator: Finished Comparing All Availability responses in {} ms", System.currentTimeMillis() - start); } private List processInventoryResponsesFromBT(InventoryResponseV1 inventoryResponseV1, SplunkLogger splunkLogger, ProductAvailabilityRequest productAvailabilityRequest) { List parts= new ArrayList<>(); InventoryResponseV1 finalInventoryResponseV = inventoryResponseV1; inventoryResponseV1.getQuantities() .forEach(quantityDetailsV1 -> {//for each location for that part String siteIdentifier = nSightCache.getAliasToSiteIdMap().get( quantityDetailsV1.getLocationIdentifier().getName() + "|" + quantityDetailsV1.getLocationIdentifier().getValue()); LOGGER.debug(splunkLogger.toString() + "Async GCP Response, found site {} with part {} {} ", siteIdentifier, finalInventoryResponseV.getProductCd(), finalInventoryResponseV.getProductNbr()); if (quantityDetailsV1.getAvailQty() == null && quantityDetailsV1.getOnHandQty() == null) {//check if both values LOGGER.warn(splunkLogger.toString() + " Available and OnHand Quantity is null for site : {} Product Cd {} Product Nbr {}", siteIdentifier, finalInventoryResponseV.getProductCd(), finalInventoryResponseV.getProductNbr()); return; } SiteDTO currentLocation = nSightCache.getSiteIdToSiteMap().get(siteIdentifier); if (siteIdentifier != null && currentLocation != null) {// site id could be null if the site is inactive, remove it? //let's populate a Set of sites where we have found Inventory //availabilityServiceContextV4.getSitesWithInvemtory().add(siteIdentifier); Part part = new Part(); part.setPartNumber(finalInventoryResponseV.getProductNbr()); part.setLineAbbrev(finalInventoryResponseV.getProductCd()); com.genpt.nsight.v4.model.Site site = new com.genpt.nsight.v4.model.Site(); part.setSite(site); site.setSiteType(quantityDetailsV1.getLocationIdentifier().getType()); //need to set the site identifier too site.setSiteId(siteIdentifier); //need to add SiteIdentifierList too if (nSightCache.getSiteIdToSiteMap().get(siteIdentifier) != null) { site.setSiteIdentifiers( nSightCache.getSiteIdToSiteMap().get(siteIdentifier) .getSiteIdentifierList()); } //for now only for B2B clients if (nSightProperties.getB2bServices().contains( productAvailabilityRequest.getHeaderRequest().getRequestor() .getRequestorApplication())) { if (currentLocation.getStoreAlias() != null) { site.setStoreAlias(currentLocation.getStoreAlias()); } } //availabilityServiceContextV4.getSiteWithPartsAvailable().add(siteIdentifier); BigDecimal availableQuantity = null; if (quantityDetailsV1.getAvailQty() != null) { availableQuantity = quantityDetailsV1.getAvailQty(); } //if (!availabilityServiceContextV4.isAggregate()) {//not the aggregate service if (true) {//not the aggregate service if (quantityDetailsV1.getDataAsOfTS() != 0) { site.setSourceSystemDataAsOfDate(quantityDetailsV1.getSourceDataAsOfTS()); } site.setTracking(tracking); Quantity quantity = new Quantity(); if (quantityDetailsV1.getOnHandQty() != null) { quantity.setOnHand(quantityDetailsV1.getOnHandQty()); } if (quantityDetailsV1.getAvailQty() != null) { quantity.setAvailable(quantityDetailsV1.getAvailQty()); } else { quantity.setAvailable(quantity.getOnHand()); } if (quantityDetailsV1.getOnOrdQty() != null) { quantity.setOnOrder(quantityDetailsV1.getOnOrdQty().intValue()); } quantity.setMinimumSellQuantity(quantityDetailsV1.getMinSellQty()); quantity.setMinStockedQuantity(quantityDetailsV1.getMinStockQty()); quantity.setMaxStockedQuantity(quantityDetailsV1.getMaxStockQty()); quantity.setIncrementalSellQuantity(quantityDetailsV1.getIncrSellQty()); if (quantityDetailsV1.getOnOrdDt() != null) { quantity.setOnOrderDate(quantityDetailsV1.getOnOrdDt()); } quantity.setInTransit(quantityDetailsV1.getInTransQty()); if (quantityDetailsV1.getInTransDt() != null) { quantity.setInTransitDate(quantityDetailsV1.getInTransDt()); } site.setQuantity(quantity); PartAttributes partAttributes = new PartAttributes(); if (quantityDetailsV1.getProductAttributes() != null) { partAttributes.setPartDescription( quantityDetailsV1.getProductAttributes().getDesc()); partAttributes.setUnitOfMeasure( quantityDetailsV1.getProductAttributes().getUom()); } site.setPartAttributes(partAttributes); //needed only in details and not aggregate } else {// aggregate } if (availableQuantity != null) { site.setAvailableQuantity( availableQuantity.intValue()); //Can use this for calculations } parts.add(part); }//end of valid location });// iterate sites for that part return parts; } }