Untitled diff

Created Diff never expires
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/
*/


package org.apache.nifi.processor.util.list;
package org.apache.nifi.processor.util.list;


import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.StringUtils;


import java.io.File;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HashSet;
import java.util.List;
import java.util.List;
import java.util.Map;
import java.util.Map;
import java.util.Properties;
import java.util.Properties;
import java.util.Set;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Collectors;


/**
/**
* <p>
* <p>
* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
* Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
* </p>
* </p>
* <p>
* <p>
* This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
* This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
* or entities that have been modified will be emitted from the Processor.
* or entities that have been modified will be emitted from the Processor.
* </p>
* </p>
* <p>
* <p>
* In order to make use of this abstract class, the entities listed must meet the following criteria:
* In order to make use of this abstract class, the entities listed must meet the following criteria:
* </p>
* </p>
* <ul>
* <ul>
* <li>
* <li>
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* </li>
* </li>
* <li>
* <li>
* If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* than the last timestamp pulled, then the entity is considered new.
* than the last timestamp pulled, then the entity is considered new.
* </li>
* </li>
* <li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </li>
* </ul>
* </ul>
* <p>
* <p>
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is
* performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp
* performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp
* that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to
* determine new entities.
* determine new entities.
* </p>
* </p>
* <p>
* <p>
* NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache
* NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache
* Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
* Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
* </p>
* </p>
* <p>
* <p>
* For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
* For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
* of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
* of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
* the configured dataflow.
* the configured dataflow.
* </p>
* </p>
* <p>
* <p>
* Subclasses are responsible for the following:
* Subclasses are responsible for the following:
* </p>
* </p>
* <ul>
* <ul>
* <li>
* <li>
* Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
* </li>
* </li>
* <li>
* <li>
* Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
* Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
* {@link #createAttributes(ListableEntity, ProcessContext)}.
* {@link #createAttributes(ListableEntity, ProcessContext)}.
* </li>
* </li>
* <li>
* <li>
* Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
* Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
* within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
* does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
* does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
* </li>
* </li>
* <li>
* <li>
* Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
* Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
* changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
* changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* </li>
* </li>
* <li>
* <li>
* Provide the target system timestamp precision. Either let the user choose the right one by adding TARGET_SYSTEM_TIMESTAMP_PRECISION to the return value of the
* getSupportedPropertyDescriptors method, or override the getDefaultTimePrecision method in case the target system has a fixed time precision.
* </li>
* </li>
* </ul>
* </ul>
*/
*/
@TriggerSerially
@TriggerSerially
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
+ "The scope used depends on the implementation.")
+ "The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {


public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+ "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
+ "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
.required(false)
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
.build();


public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
"Automatically detect time unit deterministically based on candidate entries timestamp."
"Automatically detect time unit deterministically based on candidate entries timestamp."
+ " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
"This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
"This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");


public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
.name("target-system-timestamp-precision")
.name("target-system-timestamp-precision")
.displayName("Target System Timestamp Precision")
.displayName("Target System Timestamp Precision")
.description("Specify timestamp precision at the target system."
.description("Specify timestamp precision at the target system."
+ " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+ " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
.required(true)
.required(true)
.allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
.allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
.defaultValue(PRECISION_AUTO_DETECT.getValue())
.defaultValue(PRECISION_AUTO_DETECT.getValue())
.build();
.build();


public static final Relationship REL_SUCCESS = new Relationship.Builder()
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.name("success")
.description("All FlowFiles that are received are routed to success")
.description("All FlowFiles that are received are routed to success")
.build();
.build();


/**
/**
* Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
* Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
* It does not necessary mean it has been processed as well.
* It does not necessary mean it has been processed as well.
* Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
* Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
*/
*/
private volatile Long lastListedLatestEntryTimestampMillis = null;
private volatile Long lastListedLatestEntryTimestampMillis = null;
/**
/**
* Represents the timestamp of an entity which was the latest one
* Represents the timestamp of an entity which was the latest one
* within those picked up and written to the output relationship at the previous cycle.
* within those picked up and written to the output relationship at the previous cycle.
*/
*/
private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
private volatile Long lastRunTimeNanos = 0L;
private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
private volatile boolean resetState = false;
private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();


/**
 * Minimum lag, in milliseconds, per target-system time precision, used to determine an internal "yield" when processing
 * entities. Given the logic that pauses on the newest entities according to timestamp, this ensures that at least the
 * specified number of milliseconds has elapsed, so the processor does not get scheduled near-instantaneously after the
 * prior iteration and effectively void the built-in buffer.
 */
public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
static {
    // Values are milliseconds (the original local was misleadingly named "nanos").
    final Map<TimeUnit, Long> lagMillis = new HashMap<>();
    lagMillis.put(TimeUnit.MILLISECONDS, 100L);
    lagMillis.put(TimeUnit.SECONDS, 1_000L);
    lagMillis.put(TimeUnit.MINUTES, 60_000L);
    LISTING_LAG_MILLIS = Collections.unmodifiableMap(lagMillis);
}

/** State key under which the latest listed entity timestamp is stored. */
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
/** State key, presumably for the latest processed entity timestamp. */
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
/** Prefix, presumably for state keys holding processed entity identifiers. */
static final String IDENTIFIER_PREFIX = "id";


public File getPersistenceFile() {
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
return new File("conf/state/" + getIdentifier());
}
}


@Override
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
return properties;
}
}


@Override
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
resetTimeStates(); // clear lastListingTime so that we have to fetch new time
resetTimeStates(); // clear lastListingTime so that we have to fetch new time
resetState = true;
resetState = true;
}
}
}
}




@Override
@Override
public Set<Relationship> getRelationships() {
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_SUCCESS);
return relationships;
return relationships;
}
}


@OnPrimaryNodeStateChange
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
}


@OnScheduled
@OnScheduled
public final void updateState(final ProcessContext context) throws IOException {
public final void updateState(final ProcessContext context) throws IOException {
final String path = getPath(context);
final String path = getPath(context);
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);


// Check if state already exists for this path. If so, we have already migrated the state.
// Check if state already exists for this path. If so, we have already migrated the state.
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
if (stateMap.getVersion() == -1L) {
if (stateMap.getVersion() == -1L) {
try {
try {
// Migrate state from the old way of managing state (distributed cache service and local file)
// Migrate state from the old way of managing state (distributed cache service and local file)
// to the new mechanism (State Manager).
// to the new mechanism (State Manager).
migrateState(path, client, context.getStateManager(), getStateScope(context));
migrateState(path, client, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
} catch (final IOException ioe) {
throw new IOException("Failed to properly migrate state to State Manager", ioe);
throw new IOException("Failed to properly migrate state to State Manager", ioe);
}
}
}
}


// When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
// When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
resetTimeStates();
resetTimeStates();
}
}


if (resetState) {
if (resetState) {
context.getStateManager().clear(getStateScope(context));
context.getStateManager().clear(getStateScope(context));
resetState = false;
resetState = false;
}
}
}
}


/**
/**
* This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of
* This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of
* the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager,
* the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager,
* if any state already exists. More specifically, this extracts the relevant timestamp for when the processor last ran.
*
*
* @param path the path to migrate state for
* @param path the path to migrate state for
* @param client the DistributedMapCacheClient that is capable of obtaining the current state
* @param client the DistributedMapCacheClient that is capable of obtaining the current state
* @param stateManager the StateManager to use in order to store the new state
* @param stateManager the StateManager to use in order to store the new state
* @param scope the scope to use
* @param scope the scope to use
* @throws IOException if unable to retrieve or store the state
* @throws IOException if unable to retrieve or store the state
*/
*/
private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
Long minTimestamp = null;
Long minTimestamp = null;


// Retrieve state from Distributed Cache Client, establishing the latest file seen
// Retrieve state from Distributed Cache Client, establishing the latest file seen
if (client != null) {
if (client != null) {
final StringSerDe serde = new StringSerDe();
final StringSerDe serde = new StringSerDe();
final String serializedState = client.get(getKey(path), serde, serde);
final String serializedState = client.get(getKey(path), serde, serde);
if (serializedState != null && !serializedState.isEmpty()) {
if (serializedState != null && !serializedState.isEmpty()) {
final EntityListing listing = deserialize(serializedState);
final EntityListing listing = deserialize(serializedState);
minTimestamp = listing.getLatestTimestamp().getTime();
minTimestamp = listing.getLatestTimestamp().getTime();
}
}


// remove entry from distributed cache server
// remove entry from distributed cache server
if (client != null) {
if (client != null) {
try {
try {
client.remove(path, new StringSerDe());
client.remove(path, new StringSerDe());
} catch (final IOException ioe) {
} catch (final IOException ioe) {
getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
+ "State Management service, so the Distributed Cache Service is no longer needed.");
+ "State Management service, so the Distributed Cache Service is no longer needed.");
}
}
}
}
}
}


// Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one
// Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one
final File persistenceFile = getPersistenceFile();
final File persistenceFile = getPersistenceFile();
if (persistenceFile.exists()) {
if (persistenceFile.exists()) {
final Properties props = new Properties();
final Properties props = new Properties();


try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
props.load(fis);
props.load(fis);
}
}


final String locallyPersistedValue = props.getProperty(path);
final String locallyPersistedValue = props.getProperty(path);
if (locallyPersistedValue != null) {
if (locallyPersistedValue != null) {
final EntityListing listing = deserialize(locallyPersistedValue);
final EntityListing listing = deserialize(locallyPersistedValue);
final long localTimestamp = listing.getLatestTimestamp().getTime();
final long localTimestamp = listing.getLatestTimestamp().getTime();
// if the local file's latest timestamp is beyond that of the value provided from the cache, replace
// if the local file's latest timestamp is beyond that of the value provided from the cache, replace
if (minTimestamp == null || localTimestamp > minTimestamp) {
if (minTimestamp == null || localTimestamp > minTimestamp) {
minTimestamp = localTimestamp;
minTimestamp = localTimestamp;
latestIdentifiersProcessed.clear();
latestIdentifiersProcessed.clear();
latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
}
}
}
}


// delete the local file, since it is no longer needed
// delete the local file, since it is no longer needed
if (persistenceFile.exists() && !persistenceFile.delete()) {
if (persistenceFile.exists() && !persistenceFile.delete()) {
getLogger().warn("Migrated state but failed to delete local persistence file");
getLogger().warn("Migrated state but failed to delete local persistence file");
}
}
}
}


if (minTimestamp != null) {
if (minTimestamp != null) {
persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
}
}
}
}


private void persist(final long latestListedEntryTimestampThisCycleMillis,
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
final List<String> processedIdentifiesWithLatestTimestamp,
final StateManager stateManager, final Scope scope) throws IOException {
final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
}
}
stateManager.setState(updatedState, scope);
stateManager.setState(updatedState, scope);
}
}


protected String getKey(final String directory) {
protected String getKey(final String directory) {
return getIdentifier() + ".lastListingTime." + directory;
return getIdentifier() + ".lastListingTime." + directory;
}
}


private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
final ObjectMapper mapper = new ObjectMapper();
final ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(serializedState, EntityListing.class);
return mapper.readValue(serializedState, EntityListing.class);
}
}




@Override
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;


if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
try {
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
// if just elected the primary node
if (justElectedPrimaryNode) {
justElectedPrimaryNode = false;
}
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
latestIdentifiersProcessed.clear();
latestIdentifiersProcessed.clear();
for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
final String k = state.getKey();
final String k = state.getKey();
final String v = state.getValue();
final String v = state.getValue();
if (v == null || v.isEmpty()) {
if (v == null || v.isEmpty()) {
continue;
continue;
}
}


if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
minTimestampToListMillis = Long.parseLong(v);
minTimestampToListMillis = Long.parseLong(v);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
context.yield();
context.yield();
return;
return;
} else {
} else {
this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
}
} else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
} else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
} else if (k.startsWith(IDENTIFIER_PREFIX)) {
} else if (k.startsWith(IDENTIFIER_PREFIX)) {
latestIdentifiersProcessed.add(v);
latestIdentifiersProcessed.add(v);
}
}
}
}
justElectedPrimaryNode = false;
} catch (final IOException ioe) {
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
context.yield();
context.yield();
return;
return;
}
}
}
}


final List<T> entityList;
final List<T> entityList;
final long currentRunTimeNanos = System.nanoTime();
final long currentRunTimeNanos = System.nanoTime();
final long currentRunTimeMillis = System.currentTimeMillis();
final long currentRunTimeMillis = System.currentTimeMillis();
try {
try {
// track of when this last executed for consideration of the lag nanos
// track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestampToListMillis);
entityList = performListing(context, minTimestampToListMillis);
} catch (final IOException e) {
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
getLogger().error("Failed to perform listing on remote host due to {}", e);
context.yield();
context.yield();
return;
return;
}
}


if (entityList == null || entityList.isEmpty()) {
if (entityList == null || entityList.isEmpty()) {
context.yield();
context.yield();
return;
return;
}
}


Long latestListedEntryTimestampThisCycleMillis = null;
Long latestListedEntryTimestampThisCycleMillis = null;
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();


// Build a sorted map to determine the latest possible entries
// Build a sorted map to determine the latest possible entries
boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasSeconds = false;
boolean targetSystemHasSeconds = false;
for (final T entity : entityList) {
for (final T entity : entityList) {
final long entityTimestampMillis = entity.getTimestamp();
final long entityTimestampMillis = entity.getTimestamp();
if (!targetSystemHasMilliseconds) {
if (!targetSystemHasMilliseconds) {
targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
}
}
if (!targetSystemHasSeconds) {
if (!targetSystemHasSeconds) {
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
}
}
// New entries are all those that occur at or after the associated timestamp
// New entries are all those that occur at or after the associated timestamp
final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis;
final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis;


if (newEntry) {
if (newEntry) {
List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
if (entitiesForTimestamp == null) {
if (entitiesForTimestamp == null) {
entitiesForTimestamp = new ArrayList<T>();
entitiesForTimestamp = new ArrayList<T>();
orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp);
orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp);
}
}
entitiesForTimestamp.add(entity);
entitiesForTimestamp.add(entity);
}
}
}
}


int flowfilesCreated = 0;
int flowfilesCreated = 0;


if (orderedEntries.size() > 0) {
if (orderedEntries.size() > 0) {
latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();


// Determine target system time precision.
// Determine target system time precision.
String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue()
if (StringUtils.isBlank(specifiedPrecision)) {