* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
* Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
* </p>
* <p>
* This class is responsible for triggering the listing and for filtering the results so that only new (unlisted) entities
* or entities that have been modified are emitted from the Processor.
* </p>
* <p>
* In order to make use of this abstract class, the entities listed must meet the following criteria:
* </p>
* <ul>
* <li>
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* </li>
* <li>
* If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* than the latest timestamp pulled, then the entity is considered new.
* </li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </ul>
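* <p>
* As a minimal sketch of the timestamp rule above (an illustration under stated assumptions, not this class's actual implementation),
* the "new entity" check can be written as a strict comparison against the latest timestamp pulled. The names
* <code>NewEntityFilterSketch</code>, <code>Entity</code>, <code>filterNew</code> and <code>latestTimestamp</code> are hypothetical, and
* a <code>null</code> latest timestamp is assumed to mean that nothing has been pulled yet.
* </p>
* <pre>{@code
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of AbstractListProcessor.
final class NewEntityFilterSketch {

    // Hypothetical stand-in for a listed entity: a readable name plus a timestamp in epoch millis.
    static final class Entity {
        final String name;
        final long timestamp;

        Entity(final String name, final long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Keeps only entities whose timestamp is strictly later than the latest timestamp pulled.
    // Assumption: a null latestTimestamp means nothing has been pulled yet, so every entity is new.
    static List<Entity> filterNew(final List<Entity> listed, final Long latestTimestamp) {
        final List<Entity> result = new ArrayList<>();
        for (final Entity entity : listed) {
            if (latestTimestamp == null || entity.timestamp > latestTimestamp) {
                result.add(entity);
            }
        }
        return result;
    }

    // Returns the value to remember for the next listing: the largest timestamp seen so far.
    static Long latestTimestamp(final List<Entity> listed, final Long previous) {
        Long latest = previous;
        for (final Entity entity : listed) {
            if (latest == null || entity.timestamp > latest) {
                latest = entity.timestamp;
            }
        }
        return latest;
    }
}
```
* }</pre>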
* <p>
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is
* performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp
* that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to
* determine new entities.
* </p>
* <p>
* NOTE: This processor migrates legacy state mechanisms, including locally stored, file-based state and the optional <code>Distributed Cache
* Service</code> property, to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
* </p>
* <p>
* For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
* of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
* the configured dataflow.
* </p>
* <p>
* Subclasses are responsible for the following:
* </p>
* <ul>
* <li>
* Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary for the subclass to filter by timestamp; the timestamp is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
* </li>
* <li>
* Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
* Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
* within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
* does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
* </li>
* <li>
* Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
* changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* </li>
* <li>
* Provide the target system timestamp precision. This can be done either by letting the user choose the right one, by adding TARGET_SYSTEM_TIMESTAMP_PRECISION to the return value of
* the getSupportedPropertyDescriptors method, or by overriding the getDefaultTimePrecision method in case the target system has a fixed time precision.
* </li>
* </ul>
*/
@TriggerSerially
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
+ "The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+ "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
"Automatically detect time unit deterministically based on candidate entries' timestamps."
+ " Please note that this option may take longer than necessary to list entities if none of the entries has a precise timestamp."
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
"This option provides the minimum latency between an entry becoming available and being listed, if the target system supports millis. If not, use other options.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds", "For a target system that does not have millis precision but does have seconds precision.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
.name("target-system-timestamp-precision")
.displayName("Target System Timestamp Precision")
.description("Specify timestamp precision at the target system."
+ " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")