public abstract class State<S>
extends Object
Abstract class for getting and updating the state in the mapping function used in the mapWithState operation of a pair DStream (Scala) or a JavaPairDStream (Java).
Scala example of using State:
// A mapping function that maintains an integer state and returns a String
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  // Check if state exists
  if (state.exists) {
    val existingState = state.get  // Get the existing state
    val shouldRemove = ...         // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()               // Remove the state
    } else {
      val newState = ...
      state.update(newState)       // Set the new state
    }
  } else {
    val initialState = ...
    state.update(initialState)     // Set the initial state
  }
  ... // return something
}
Java example of using State:
// A mapping function that maintains an integer state and returns a String
Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
    new Function3<String, Optional<Integer>, State<Integer>, String>() {
      @Override
      public String call(String key, Optional<Integer> value, State<Integer> state) {
        // Check if state exists
        if (state.exists()) {
          int existingState = state.get();  // Get the existing state
          boolean shouldRemove = ...;       // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                 // Remove the state
          } else {
            int newState = ...;
            state.update(newState);         // Set the new state
          }
        } else {
          int initialState = ...;
          state.update(initialState);       // Set the initial state
        }
        // return something
      }
    };
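The elided parts of the example above can be filled in many ways. The following is a minimal sketch of one possibility, a running count per key that drops its state once a limit is reached. It does not depend on Spark: `SketchState` is a hypothetical in-memory stand-in that mimics only `exists()`, `get()`, `update()` and `remove()`, `java.util.Optional` stands in for the `Optional` type of the real signature, and `LIMIT` is an invented threshold.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

class RunningCountSketch {

    /** Hypothetical stand-in for the State methods used below; NOT Spark's class. */
    static final class SketchState<S> {
        private S value; // null means "no state set"

        boolean exists() { return value != null; }

        S get() {
            if (value == null) {
                throw new NoSuchElementException("State is not set");
            }
            return value;
        }

        void update(S newState) { value = newState; }

        void remove() { value = null; }
    }

    // Hypothetical threshold at which the state for a key is dropped.
    static final int LIMIT = 3;

    /** Adds the incoming value to the running count kept in the state. */
    static String mappingFunction(String key, Optional<Integer> value,
                                  SketchState<Integer> state) {
        int increment = value.orElse(0);
        // Only call get() after exists() has confirmed there is a state.
        int total = state.exists() ? state.get() + increment : increment;
        if (total >= LIMIT) {
            if (state.exists()) {
                state.remove(); // Remove the state
            }
            return key + " reached " + total + ", state removed";
        }
        state.update(total);    // Set the initial or the new state
        return key + " -> " + total;
    }

    public static void main(String[] args) {
        SketchState<Integer> state = new SketchState<>();
        System.out.println(mappingFunction("a", Optional.of(1), state)); // a -> 1
        System.out.println(mappingFunction("a", Optional.of(1), state)); // a -> 2
        System.out.println(mappingFunction("a", Optional.of(1), state)); // a reached 3, state removed
        System.out.println(state.exists());                              // false
    }
}
```

In a real job the same branching would sit inside the `call` method of the `Function3` shown above, with Spark supplying the `State` instance for each key.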
Constructor and Description |
---|
State() |
Modifier and Type | Method and Description |
---|---|
abstract boolean | exists(): Whether the state already exists. |
abstract S | get(): Get the state if it exists, otherwise it will throw java.util.NoSuchElementException. |
scala.Option<S> | getOption(): Get the state as a scala.Option. |
abstract boolean | isTimingOut(): Whether the state is timing out and going to be removed by the system after the current batch. |
abstract void | remove(): Remove the state if it exists. |
String | toString() |
abstract void | update(S newState): Update the state with a new value. |
public abstract boolean exists()
Whether the state already exists.
public abstract S get()
Get the state if it exists, otherwise it will throw java.util.NoSuchElementException. Check with exists() whether the state exists or not before calling get().
Throws:
java.util.NoSuchElementException - If the state does not exist.
public abstract void update(S newState)
Update the state with a new value. State cannot be updated if it has been already removed (that is, remove() has already been called) or it is going to be removed due to timeout (that is, isTimingOut() is true).
Parameters:
newState - (undocumented)
Throws:
IllegalArgumentException - If the state has already been removed, or is going to be removed.
public abstract void remove()
Remove the state if it exists. State cannot be updated if it has been already removed (that is, remove() has already been called) or it is going to be removed due to timeout (that is, isTimingOut() is true).
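The restriction on update() described above suggests checking isTimingOut() before writing to the state. The following is a rough sketch of that guard, assuming a hypothetical in-memory `SketchState` class that models only the documented IllegalArgumentException behavior of update(); it is not Spark's implementation, and the `tryUpdate` helper is an invented name.

```java
import java.util.NoSuchElementException;

class GuardedUpdateSketch {

    /** Hypothetical stand-in modeling only the documented update() restriction; NOT Spark's class. */
    static final class SketchState<S> {
        private S value;
        private boolean defined;
        private boolean removed;
        private final boolean timingOut;

        SketchState(S initial, boolean timingOut) {
            this.value = initial;
            this.defined = true;
            this.timingOut = timingOut;
        }

        boolean exists() { return defined; }

        boolean isTimingOut() { return timingOut; }

        S get() {
            if (!defined) {
                throw new NoSuchElementException("State is not set");
            }
            return value;
        }

        void update(S newState) {
            if (removed) {
                throw new IllegalArgumentException("state has already been removed");
            }
            if (timingOut) {
                throw new IllegalArgumentException("state is timing out");
            }
            value = newState;
            defined = true;
        }

        // Simplification: any restrictions on remove() itself are not modeled here.
        void remove() {
            value = null;
            defined = false;
            removed = true;
        }
    }

    /** Updates the state only when it is not timing out; reports whether it did. */
    static boolean tryUpdate(SketchState<Integer> state, int newValue) {
        if (state.isTimingOut()) {
            return false; // update() would throw IllegalArgumentException
        }
        state.update(newValue);
        return true;
    }

    public static void main(String[] args) {
        SketchState<Integer> live = new SketchState<>(5, false);
        System.out.println(tryUpdate(live, 6));     // true
        SketchState<Integer> expiring = new SketchState<>(5, true);
        System.out.println(tryUpdate(expiring, 6)); // false
        live.remove();
        try {
            live.update(7);                         // rejected: already removed
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the flag first keeps the mapping function from failing on the batch in which a key's state expires.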
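public abstract boolean isTimingOut()
Whether the state is timing out and going to be removed by the system after the current batch. This timeout can occur if a timeout duration has been specified in the StateSpec and the key has not received any new data for that timeout duration.
public final scala.Option<S> getOption()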
Get the state as a scala.Option. It will be Some(state) if it exists, otherwise None.
public final String toString()
Overrides:
toString in class Object