StreamsLifecycleInterceptor
Azkarra maintains an internal intercepting filter chain that makes it easy to perform operations while starting or stopping a Kafka Streams instance. You hook into it by implementing and registering StreamsLifecycleInterceptor instances.
Azkarra provides built-in interceptors for common operations like waiting for topics to be created before starting a streams instance.
1 The StreamsLifecycleInterceptor Interface
The StreamsLifecycleInterceptor interface defines two methods, onStart and onStop, which are invoked before the streams instance is started and before it is stopped, respectively.
Always call chain.execute() so as not to break the chain: the remaining interceptors, and ultimately the start or stop operation itself, are triggered through it.
```java
public interface StreamsLifecycleInterceptor {

    /**
     * Intercepts the streams instance before it is started.
     */
    default void onStart(final StreamsLifecycleContext context, final StreamsLifecycleChain chain) {
        // Code here is executed before the streams instance is started.
        chain.execute();
        // Code here is executed after the streams instance was started (successfully or not).
    }

    /**
     * Intercepts the streams instance before it is stopped.
     */
    default void onStop(final StreamsLifecycleContext context, final StreamsLifecycleChain chain) {
        // Code here is executed before the streams instance is stopped.
        chain.execute();
        // Code here is executed after the streams instance was stopped.
    }
}
```
Information about the current streams application, such as the application ID or the topology description, can be retrieved from the StreamsLifecycleContext argument.
The StreamsLifecycleContext object can also be used to update the current state of the Kafka Streams instance.
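To make the role of chain.execute() concrete, here is a minimal, self-contained sketch of an intercepting filter chain. It does not use Azkarra: SketchChain, SketchContext, SketchInterceptor and SketchInterceptorChain are hypothetical, simplified stand-ins for the types above (only onStart is modeled), and Azkarra's actual chain implementation may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-ins for StreamsLifecycleChain,
// StreamsLifecycleContext and StreamsLifecycleInterceptor (onStart only).
interface SketchChain {
    void execute();
}

final class SketchContext {
    // Records what happened so that the call ordering can be observed.
    final List<String> events = new ArrayList<>();
}

interface SketchInterceptor {
    default void onStart(SketchContext context, SketchChain chain) {
        chain.execute();
    }
}

// Each call to execute() hands control to the next interceptor; once every
// interceptor has called execute(), the wrapped operation runs.
final class SketchInterceptorChain implements SketchChain {
    private final Iterator<SketchInterceptor> remaining;
    private final SketchContext context;
    private final Runnable operation; // e.g. the call that actually starts the streams instance

    SketchInterceptorChain(List<SketchInterceptor> interceptors, SketchContext context, Runnable operation) {
        this.remaining = interceptors.iterator();
        this.context = context;
        this.operation = operation;
    }

    @Override
    public void execute() {
        if (remaining.hasNext()) {
            remaining.next().onStart(context, this);
        } else {
            operation.run();
        }
    }
}

// Example interceptor that records code run before and after chain.execute().
final class RecordingInterceptor implements SketchInterceptor {
    private final String name;

    RecordingInterceptor(String name) {
        this.name = name;
    }

    @Override
    public void onStart(SketchContext context, SketchChain chain) {
        context.events.add("before:" + name);
        chain.execute();
        context.events.add("after:" + name);
    }
}
```

With two recording interceptors A and B, the events are recorded in the order before:A, before:B, start, after:B, after:A. An interceptor that returns without calling chain.execute() prevents both the remaining interceptors and the wrapped operation from running.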
2 Registering an Interceptor
A StreamsLifecycleInterceptor can be registered like any other component, either with the registerComponent methods exposed by the AzkarraContext class or dynamically through the component-scan mechanism.
The AzkarraContext is then responsible for adding the registered interceptors to the StreamsExecutionEnvironments and topologies.
Interceptors can also be added directly at the StreamsExecutionEnvironment level using the addStreamsLifecycleInterceptor method.
When an interceptor is added to an environment, it is executed for all topologies running in that environment.
```java
env.addStreamsLifecycleInterceptor(() -> new MyCustomInterceptor());
```
Finally, interceptors can be defined per topology using the Executed#withInterceptor method.
```java
env.addTopology(
    () -> new WordCountTopology(),
    Executed.as("wordcount").withInterceptor(() -> new MyCustomInterceptor())
);
```
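Both registration methods take a supplier (() -> new MyCustomInterceptor()) rather than an instance. One plausible reason is that each topology can then get its own, possibly stateful, interceptor instance. The sketch below illustrates that idea with hypothetical stand-in types (SketchEnvironment, SketchLifecycleInterceptor); the ordering it uses (environment-level interceptors before topology-level ones) is an assumption, not documented Azkarra behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a StreamsLifecycleInterceptor instance.
final class SketchLifecycleInterceptor {
    final String label;

    SketchLifecycleInterceptor(String label) {
        this.label = label;
    }
}

// Hypothetical stand-in for a StreamsExecutionEnvironment that stores interceptor suppliers.
final class SketchEnvironment {
    private final List<Supplier<SketchLifecycleInterceptor>> envSuppliers = new ArrayList<>();

    void addStreamsLifecycleInterceptor(Supplier<SketchLifecycleInterceptor> supplier) {
        envSuppliers.add(supplier);
    }

    // Builds the interceptors for one topology: environment-level ones first
    // (assumed ordering), then topology-level ones. Every supplier is invoked
    // again, so no interceptor instance is shared between topologies.
    List<SketchLifecycleInterceptor> interceptorsFor(List<Supplier<SketchLifecycleInterceptor>> topologySuppliers) {
        List<SketchLifecycleInterceptor> result = new ArrayList<>();
        for (Supplier<SketchLifecycleInterceptor> supplier : envSuppliers) {
            result.add(supplier.get());
        }
        for (Supplier<SketchLifecycleInterceptor> supplier : topologySuppliers) {
            result.add(supplier.get());
        }
        return result;
    }
}
```

A topology registered with its own interceptor receives the environment-level interceptor plus its own, while other topologies in the same environment receive only a fresh copy of the environment-level one.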
3 Configuring an Interceptor
Like any other component, a StreamsLifecycleInterceptor can implement the Configurable interface.
The Conf object passed to the configure() method corresponds to the topology configuration.
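As an illustration, the sketch below shows an interceptor that reads an optional setting from its topology configuration and falls back to a default. SketchConfigurable, SlowStartInterceptor and the property slow.start.interceptor.delay.ms are all hypothetical, and a plain Map stands in for Azkarra's Conf object, so this is not the real Configurable API.

```java
import java.util.Map;

// Hypothetical stand-in for Azkarra's Configurable; the real method receives a Conf object.
interface SketchConfigurable {
    void configure(Map<String, Object> topologyConfig);
}

final class SlowStartInterceptor implements SketchConfigurable {
    // Hypothetical property name, used only for illustration.
    static final String DELAY_CONFIG = "slow.start.interceptor.delay.ms";

    private long delayMs = 0L; // default when the property is absent

    @Override
    public void configure(Map<String, Object> topologyConfig) {
        Object value = topologyConfig.get(DELAY_CONFIG);
        if (value != null) {
            // Accept both numeric values and strings such as "250".
            delayMs = Long.parseLong(value.toString());
        }
    }

    long delayMs() {
        return delayMs;
    }
}
```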
4 WaitForSourceTopicsInterceptor
When a new KafkaStreams instance starts, the application fails during task assignment if one of the source topics is missing (error: INCOMPLETE_SOURCE_TOPIC_METADATA).
To prevent this error, Azkarra provides the built-in WaitForSourceTopicsInterceptor, which blocks the KafkaStreams startup until all source topics are created.
The WaitForSourceTopicsInterceptor can be enabled by setting the global application property azkarra.context.enable.wait.for.topics to true in your application.conf file.
In addition, you can enable this interceptor per environment using the StreamsExecutionEnvironment#setWaitForTopicsToBeCreated method.
4.1 Configuration properties
| Property | Type | Description |
|---|---|---|
| wait.for.topics.timeout.enable | boolean | If true, enables and configures the interceptor. |
| wait.for.topics.timeout.ms | long | Waits until topics are created or this timeout is reached (default: Long.MAX_VALUE). |
| wait.for.topics.exclude.patterns | list | The list of topics (regex) that the interceptor should not wait for (optional). |
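The waiting logic can be pictured as a polling loop: compute the source topics that are neither excluded nor present, and retry until that set is empty or the timeout expires. This is a hypothetical sketch, not Azkarra's implementation: the listTopics supplier stands in for a real lookup (for example via Kafka's AdminClient#listTopics), the backoffMs parameter is our own addition, and full-match regex semantics for the exclude patterns are assumed.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.regex.Pattern;

final class WaitForTopicsSketch {

    // Source topics that are neither excluded by a regex nor already present.
    static Set<String> missingTopics(Collection<String> sourceTopics,
                                     Collection<String> excludePatterns,
                                     Set<String> existingTopics) {
        Set<String> missing = new TreeSet<>();
        for (String topic : sourceTopics) {
            // Assumption: a pattern must match the whole topic name.
            boolean excluded = excludePatterns.stream().anyMatch(p -> Pattern.matches(p, topic));
            if (!excluded && !existingTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    // Polls until no source topic is missing or the timeout elapses.
    // Returns true if all non-excluded source topics exist.
    static boolean waitForTopics(Collection<String> sourceTopics,
                                 Collection<String> excludePatterns,
                                 Supplier<Set<String>> listTopics,
                                 long timeoutMs,
                                 long backoffMs) {
        long now = System.currentTimeMillis();
        // Saturate instead of overflowing when timeoutMs is Long.MAX_VALUE (the documented default).
        long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
        while (true) {
            if (missingTopics(sourceTopics, excludePatterns, listTopics.get()).isEmpty()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
                return false;
            }
        }
    }
}
```

The saturating deadline matters because adding Long.MAX_VALUE to the current time would overflow to a negative number and end the wait immediately.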
5 AutoCreateTopicsInterceptor
During development, you may find yourself creating and deleting Kafka topics manually before each run of your application.
To ease this, Azkarra provides the built-in AutoCreateTopicsInterceptor, which can automatically create the source and sink topics before the streams application is started.
When enabled, the AutoCreateTopicsInterceptor is automatically configured by the AzkarraContext using the following properties.
5.1 Configuration properties
| Property | Type | Description |
|---|---|---|
| auto.create.topics.enable | boolean | If true, creates all source and sink topics used by the topology. |
| auto.create.topics.num.partitions | int | The default number of partitions. |
| auto.create.topics.replication.factor | int | The default replication factor. |
| auto.create.topics.configs | Map[string, string] | The configuration to be used for creating topics. |
You can also add and configure an AutoCreateTopicsInterceptor on a StreamsExecutionEnvironment instance, as in this simple example:
```java
StreamsExecutionEnvironment env = DefaultStreamsExecutionEnvironment.create();
env.addStreamsLifecycleInterceptor(() -> {
    AutoCreateTopicsInterceptor interceptor = new AutoCreateTopicsInterceptor();
    interceptor.setReplicationFactor((short) 3);
    interceptor.setNumPartitions(6);
    return interceptor;
});
```
5.2 Defining the list of Topics
By default, the AutoCreateTopicsInterceptor resolves the list of topics to be created from the TopologyDescription object.
However, you can also specify your own list of NewTopic objects to be created.
```java
env.addStreamsLifecycleInterceptor(() -> {
    AutoCreateTopicsInterceptor interceptor = new AutoCreateTopicsInterceptor();
    // Specify the list of topics to be created.
    interceptor.setTopics(Collections.singletonList(
        new NewTopic("my-source-topic", 6, (short) 3)
    ));
    return interceptor;
});
```
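The default resolution can be pictured as follows: collect the source and sink topic names, deduplicate them, and apply the default partition count and replication factor, unless an explicit list was supplied. This is a hypothetical sketch rather than Azkarra's implementation: TopicSpec is a minimal stand-in for Kafka's NewTopic, and the assumption that an explicit list fully replaces the derived one is ours. In a real application, the names could come from the TopologyDescription, for example via TopologyDescription.Source#topicSet() and TopologyDescription.Sink#topic().

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class TopicsToCreateSketch {

    // Minimal stand-in for Kafka's NewTopic (name, partitions, replication factor).
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicationFactor;

        TopicSpec(String name, int partitions, short replicationFactor) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    static List<TopicSpec> resolve(List<TopicSpec> explicitTopics,
                                   Collection<String> sourceTopics,
                                   Collection<String> sinkTopics,
                                   int defaultPartitions,
                                   short defaultReplicationFactor) {
        // Assumption: an explicit list replaces the topics derived from the topology.
        if (!explicitTopics.isEmpty()) {
            return explicitTopics;
        }
        // Source and sink topics, deduplicated, in first-seen order.
        Set<String> names = new LinkedHashSet<>(sourceTopics);
        names.addAll(sinkTopics);
        List<TopicSpec> specs = new ArrayList<>();
        for (String name : names) {
            specs.add(new TopicSpec(name, defaultPartitions, defaultReplicationFactor));
        }
        return specs;
    }
}
```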
When the AutoCreateTopicsInterceptor is enabled at the context level, the AzkarraContext looks up registered components of type NewTopic.
If you run multiple streams topologies (or environments), you can use the @Restricted annotation to specify the target environment or streams of a component.
Here is a simple example:
```java
@Factory
public class TopicsFactory {

    @Component
    @Restricted(type = "streams", names = "wordCountTopology")
    public NewTopic sourceTopic() {
        return new NewTopic("my-source-topic", 6, (short) 3);
    }
}
```
5.3 Automatically deleting topics
The AutoCreateTopicsInterceptor can also be used to automatically delete all topics used by the topology when the streams instance is stopped.
Note: This property should be used with care and should not be enabled in production.
| Property | Type | Description |
|---|---|---|
| auto.delete.topics.enable | boolean | If true, deletes all topics after the streams instance is stopped (should only be used for development). |
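Ordering matters here: topics should only be removed once the instance has stopped. Below is a minimal sketch, assuming deletion happens after chain.execute() returns in onStop, as the description above suggests. StopChain and the deleteTopics callback are hypothetical stand-ins; in a real application the callback might wrap Kafka's AdminClient#deleteTopics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class AutoDeleteOnStopSketch {

    // Hypothetical stand-in for StreamsLifecycleChain.
    interface StopChain {
        void execute();
    }

    private final boolean autoDeleteEnabled;            // mirrors auto.delete.topics.enable
    private final List<String> topics;                  // topics used by the topology
    private final Consumer<List<String>> deleteTopics;  // performs the actual deletion

    AutoDeleteOnStopSketch(boolean autoDeleteEnabled, List<String> topics, Consumer<List<String>> deleteTopics) {
        this.autoDeleteEnabled = autoDeleteEnabled;
        this.topics = new ArrayList<>(topics);
        this.deleteTopics = deleteTopics;
    }

    void onStop(StopChain chain) {
        // Stop the streams instance first, so no stream thread still uses the topics.
        chain.execute();
        if (autoDeleteEnabled && !topics.isEmpty()) {
            deleteTopics.accept(topics);
        }
    }
}
```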
6 MonitoringStreamsInterceptor
As of Azkarra v0.7.0, you can configure the built-in MonitoringStreamsInterceptor to periodically publish a state event of your KafkaStreams instance directly to a Kafka topic (default: _azkarra-streams-monitoring).
The MonitoringStreamsInterceptor can be enabled by setting the global application property azkarra.context.monitoring.streams.interceptor.enable to true in your application.conf file.
6.1 The event format
Azkarra emits the state of KafkaStreams instances in the form of events that adhere to the CloudEvents specification.
The CloudEvents specification is developed under the Cloud Native Computing Foundation with the aim of providing a standardized, protocol-agnostic definition of the structure and metadata of events.
Currently, Azkarra only supports the structured content mode for mapping CloudEvents to Kafka messages. This means that the message value contains the event metadata and data together in a single envelope, encoded in JSON.
The following example shows a CloudEvent published by the MonitoringStreamsInterceptor using default configuration.
```json
{
  "id": "appid:basic-word-count-0-8-0;appsrv:localhost:8080;ts:1600951440046", ①
  "source": "arn://kafka=CFPFUjoZQnWcST8Dhh5Grw/host=localhost/port=8080", ②
  "subject": "arn://kafka=CFPFUjoZQnWcST8Dhh5Grw/host=localhost/port=8080/streams=basic-word-count-0-8-0",
  "specversion": "1.0", ③
  "type": "io.streamthoughts.azkarra.streams.stateupdateevent", ④
  "time": "2020-05-08T22:13:39.636+0000", ⑤
  "datacontenttype": "application/json", ⑥
  "ioazkarramonitorintervalms": 10000, ⑦
  "ioazkarrastreamsappid": "basic-word-count", ⑧
  "ioazkarraversion": "0.8.0", ⑨
  "ioazkarrastreamsappserver": "localhost:8082", ⑩
  "data": { ⑪
    "state": "RUNNING",
    "threads": [
      {
        "name": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1",
        "state": "RUNNING",
        "active_tasks": [
          {
            "task_id": "0_0",
            "topic_partitions": [
              {
                "topic": "streams-plaintext-input",
                "partition": 0
              }
            ]
          },
          {
            "task_id": "1_0",
            "topic_partitions": [
              {
                "topic": "basic-word-count-count-repartition",
                "partition": 0
              }
            ]
          }
        ],
        "standby_tasks": [],
        "clients": {
          "admin_client_id": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-admin",
          "consumer_client_id": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1-consumer",
          "producer_client_ids": [
            "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1-producer"
          ],
          "restore_consumer_client_id": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1-restore-consumer"
        }
      }
    ],
    "offsets": {
      "group": "basic-word-count",
      "consumers": [
        {
          "client_id": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1-consumer",
          "stream_thread": "basic-word-count-ab756b57-25ed-4c84-b4ef-93e9a84057ad-StreamThread-1",
          "positions": [
            {
              "topic": "streams-plaintext-input",
              "partition": 0,
              "consumed_offset": 10,
              "consumed_timestamp": 1588975991664,
              "committed_offset": 11,
              "committed_timestamp": 1588976019189,
              "log_end_offset": 11,
              "log_start_offset": 0,
              "lag": 0
            },
            {
              "topic": "basic-word-count-count-repartition",
              "partition": 0,
              "consumed_offset": 14,
              "consumed_timestamp": 1588975991664,
              "committed_offset": 15,
              "committed_timestamp": 1588976019209,
              "log_end_offset": 15,
              "log_start_offset": 15,
              "lag": 0
            }
          ]
        }
      ]
    },
    "stores": [
      {
        "name": "count",
        "positions": [
          {
            "partition": 0,
            "current_offset": 0,
            "log_end_offset": 0,
            "offset_lag": 0
          }
        ]
      }
    ],
    "state_changed_time": 1588975839528
  }
}
```
- ① The unique ID of the state change event, based on the application.id, the application.server, and a Unix epoch timestamp.
- ② The source of the event, based on the Kafka cluster ID and the application.server (note: arn = Azkarra Resource Name).
- ③ The CloudEvents specification version.
- ④ The type of the state change event.
- ⑤ The time of the state change, or the time the event is emitted.
- ⑥ The content type of the data attribute, i.e. JSON.
- ⑦ The interval the interceptor uses to send a state change event for the current KafkaStreams instance.
- ⑧ The application.id property value attached to the KafkaStreams instance.
- ⑨ The version of Azkarra Streams.
- ⑩ The application.server property value attached to the KafkaStreams instance.
- ⑪ The actual state of the KafkaStreams instance.
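Consumers of the monitoring topic may want to build or match these identifiers, for example to filter events by subject. The helper below reproduces the id, source and subject formats exactly as they appear in the example event above. It is inferred from that single example, not from Azkarra's source, so treat the format as an assumption; MonitoringEventIds is a hypothetical class.

```java
final class MonitoringEventIds {

    // Format seen in the example: "appid:<application.id>;appsrv:<application.server>;ts:<epoch millis>"
    static String eventId(String applicationId, String applicationServer, long epochMillis) {
        return "appid:" + applicationId + ";appsrv:" + applicationServer + ";ts:" + epochMillis;
    }

    // Format seen in the example: "arn://kafka=<cluster id>/host=<host>/port=<port>"
    static String source(String kafkaClusterId, String host, int port) {
        return "arn://kafka=" + kafkaClusterId + "/host=" + host + "/port=" + port;
    }

    // The subject appends the streams application to the source.
    static String subject(String source, String applicationId) {
        return source + "/streams=" + applicationId;
    }
}
```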
You can also add your own CloudEvents extension attributes by configuring the property monitoring.streams.interceptor.ce.extensions.
6.2 Configuration properties
| Property | Type | Description |
|---|---|---|
| monitoring.streams.interceptor.enable | boolean | If true, enables and configures the interceptor. |
| monitoring.streams.interceptor.interval.ms | long | The interval the interceptor uses to send a streams state event (default: 10 seconds). |
| monitoring.streams.interceptor.topic | string | The topic to which monitoring events are sent (default: _azkarra-streams-monitoring). |
| monitoring.streams.interceptor.advertised.server | string | The server name included in monitoring events. If not specified, the streams application.server property is used. |
| monitoring.streams.interceptor.ce.extensions | list | The list of extension attributes to include in monitoring events. |
| monitoring.streams.interceptor.info.enabled.stores.lag | boolean | If true, the interceptor also reports the offset lag of local state stores. |
7 KafkaBrokerReadyInterceptor
This interceptor waits for a minimum number of brokers to be available before starting the Kafka Streams application.
7.1 Configuration properties
| Property | Type | Description |
|---|---|---|
| kafka.ready.interceptor.enable | boolean | If true, enables and configures the interceptor. |
| kafka.ready.interceptor.timeout.ms | long | Waits until brokers are available or this timeout is reached (default: 60000). |
| kafka.ready.interceptor.retry.backoff.ms | long | The amount of time to wait before checking again whether brokers are available (default: 1000). |
| kafka.ready.interceptor.min.available.brokers | int | The minimum number of brokers that must be alive for the interceptor to stop waiting (default: 1). |
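The three numeric properties map naturally onto a retry loop. Here is a minimal sketch of that loop under stated assumptions: BrokerReadySketch is hypothetical, and the aliveBrokers supplier stands in for a real check, which could for example count the nodes returned by Kafka's AdminClient#describeCluster().nodes().

```java
import java.util.function.IntSupplier;

final class BrokerReadySketch {

    // Waits until at least minAvailableBrokers are alive (min.available.brokers),
    // checking every retryBackoffMs (retry.backoff.ms) and giving up after timeoutMs (timeout.ms).
    // Returns true if enough brokers were seen before the timeout.
    static boolean awaitBrokers(IntSupplier aliveBrokers, int minAvailableBrokers,
                                long timeoutMs, long retryBackoffMs) {
        long now = System.currentTimeMillis();
        // Saturate rather than overflow for very large timeouts.
        long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
        while (true) {
            if (aliveBrokers.getAsInt() >= minAvailableBrokers) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(retryBackoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
                return false;
            }
        }
    }
}
```

With the defaults above, this would check once per second for up to one minute and return as soon as a single broker is reachable.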