Embedded Web Server
1 Introduction
Azkarra packs with an embedded web server that exposes several REST endpoints allowing you to deploy, manage and monitor your Kafka Streams topologies. In addition, the embedded web server is also used to execute interactive queries.
Internally Azkarras relies on Undertow a high performance non-blocking webserver.
2 Enabling Web Server
The embedded web server will be automatically enabled if your main Java class is annotated with either @AzkarraStreamsApplication or @EnableEmbeddedHttpServer.
Example :
@AzkarraStreamsApplication
public class SimpleStreamsApp {
public static void main(final String[] args) {
AzkarraApplication.run(SimpleStreamsApp.class, args);
}
}
In addition, the embedded web server can be enable using the AzkarraApplication#enableHttpServer() method.
application.enableHttpServer(true, HttpServerConf.with("localhost", 8080))
StreamsConfig.APPLICATION_SERVER_CONFIG
You should note that when the web service is enable, Azkarra will automatically configure theapplication.server property for each running KafkaStreams instance.
3 Configuration
Example :
azkarra {
...
server {
listener = localhost
port = 8080
headless = false
// These information will be exposes through the http endpoint GET /info
info {
app {
name = "@project.name@"
description = "@project.description@"
version = "@project.version@"
encoding = "@project.build.sourceEncoding@"
java.version = "@java.version@"
}
}
}
}
3 JSON Serialization
Azkarra allows you to run interactive queries to retrieve data from the state stores of Kafka Streams instances. Records are returned in JSON format, using the Jackson serialization library.
3.1 Registering custom Jackson Module
Azkarra defines several JSON serializers that are provided by the module azkarra-json-serializers on which the Azkarra server depends.
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>azkarra-json-serializers</artifactId>
<version>${azkarra.version}
</dependency>
While these serializers are sufficient in most of the cases, you may have the need to provide your own custom serializers in order to use the interactive queries.
Therefore, Azkarra will automatically register all components that implements the Module interface into the internal ObjectMapper instance used for JSON serializaton. This allows you to register custom record serializers for interactive queries.
Example using @Component:
@Factory
public class JacksonModuleFactory {
@Component
public Module customModule() {
var module = new SimpleModule();
module.addSerializer(MyCustomType.class, new MyCustomSerializer());
return module;
}
}
Example using AzkarraContext#register():
context.registerComponent(Module.class, () -> {
SimpleModule module = new SimpleModule();
module.addSerializer(MyCustomType.class, new MyCustomSerializer());
return module;
});
4 REST Extensions
Azkarra already exposes several REST endpoints that you can use to monitor and manage lifecycle of Kafka Streams instances. But sometimes, you may want to expose additional REST endpoints to extend Azkarra capabilities.
Azkarra provides a mechanism, called REST extensions, that allows you to register JAX-RS resources that will be loaded and initialized by the Undertow server.
For adding REST extensions, you will first have to set the server.rest.extensions property to true. Then, you will have to implement the AzkarraRestExtension interface.
AzkarraRestExtension
TheAzkarraRestExtension interface extends the Configurable interface to give you access to the azkarra.server config.
The AzkarraRestExtension :
/**
* A pluggable interface to allow registration of new JAX-RS resources like REST endpoints.
* The implementations are discovered using the standard Java {@link java.util.ServiceLoader} mechanism.
*
* Hence, the fully qualified name of the extension classes that implement the {@link AzkarraRestExtension}
* interface must be add to a {@code META-INF/services/io.streamthoughts.azkarra.api.server.AzkarraRestExtension} file.
*/
public interface AzkarraRestExtension extends Configurable, Closeable {
/**
* Configures this instance with the specified {@link Conf}.
* The configuration passed to the method correspond used for configuring the {@link EmbeddedHttpServer}
*
* @param configuration the {@link Conf} instance used to configure this instance.
*/
@Override
default void configure(final Conf configuration) {
}
/**
* The {@link AzkarraRestExtension} implementations should use this method to register JAX-RS resources.
* @param restContext the {@link AzkarraRestExtensionContext} instance.
*/
void register(final AzkarraRestExtensionContext restContext);
}
The register() method accepts an AzkarraRestExtensionContext instance as the only parameter that gives you access to the AzkarraContext instance and the javax.ws.rs.core.Configurable that you should use to register one or more JAX-RS resources.
The AzkarraRestExtensionContext interface:
/**
* This interfaces provides the capability for {@link AzkarraRestExtension} implementations
* to register JAX-RS resources using the provided {@link Configurable} and to get access to
* the {@link AzkarraContext} instance.
*
* @see AzkarraRestExtension
*/
public interface AzkarraRestExtensionContext {
/**
* Provides an implementation of {@link javax.ws.rs.core.Configurable} that
* must be used to register JAX-RS resources.
*
* @return the JAX-RS {@link javax.ws.rs.core.Configurable}.
*/
Configurable<? extends Configurable> configurable();
/**
* Provides the {@link AzkarraContext} instance that can be used to retrieve registered components.
*
* @return the {@link AzkarraContext} instance.
*/
AzkarraContext context();
}
Example:
public class StopKafkaRestExtensionContext implements AzkarraRestExtension {
@Override
public void register(final AzkarraRestExtensionContext restContext) {
AzkarraStreamsService service = restContext.context().getComponent(AzkarraStreamsService.class);
restContext.configurable().register(new StopKafkaStreamsEndpoint(service));
}
@Path("/")
public static class StopKafkaStreamsEndpoint {
private final AzkarraStreamsService service;
StopKafkaStreamsEndpoint(final AzkarraStreamsService service) {
this.service = service;
}
@GET
@Path("/stop")
public void stop() {
service.getAllStreams().forEach(applicationId -> service.stopStreams(applicationId, false));
}
}
}
Azkarra Server use the standard Java java.util.ServiceLoader mechanism to discover the REST implementations.
Therefore, for the REST extensions to be found, you will have to create a file META-INF/services/io.streamthoughts.azkarra.api.server.AzkarraRestExtension that contains the fully-qualified names of classes that implements the StopKafkaRestExtensionContext interface.
Example:
com.example.StopKafkaRestExtensionContext