How to Stream and Apply Real-Time Prediction Models on High-Throughput Time-Series Data
Most of the stream processing libraries are not python friendly while the majority of machine learning and data mining libraries are python based. Although the Faust library aims to bring Kafka Streaming ideas into the Python ecosystem, it may pose challenges in terms of ease of use. This document serves as a tutorial and offers best practices for effectively utilizing Faust.
In the first section, I present an introductory overview of stream processing concepts, drawing extensively from the book Designing Data-Intensive Applications [1]. Following that, I explore the key functionalities of the Faust library, placing emphasis on Faust windows, which are often difficult to grasp from the available documentation and utilize efficiently. Consequently, I propose an alternative approach to utilizing Faust windows by leveraging the library’s own functions. Lastly, I share my experience implementing a similar pipeline on the Google Cloud Platform.
A stream refers to unbounded data that is incrementally made available over time. An event is a small, self-contained object that contains the details of something happened at some point in time e.g. user interaction. An event is generated by a producer (e.g. temperature sensor) and may be consumed by some consumers (e.g. online dashboard). Traditional databases are ill-suited for storing events in high throughput event streams. This is due to the need for consumers to periodically poll the database to identify new events, resulting in significant overhead. Instead, it is better for consumers to be notified when new events appear and messaging systems are designed for doing this.
A message broker is a widely adopted system for messaging, in which producers write messages to the broker, and consumers are notified by the broker and receive these messages. AMQP-based message brokers, like RabbitMQ, are commonly employed for asynchronous message passing between services and task queues. Unlike databases, they adopt a transient messaging mindset and delete a message only after it has been acknowledged by its consumers. When processing messages becomes resource-intensive, parallelization can be achieved by employing multiple consumers that read from the same topic in a load-balanced manner. In this approach, messages are randomly assigned to consumers for processing, potentially resulting in a different order of processing compared to the order of receiving.
On the other hand, log-based message brokers such as Apache Kafka combine the durability of database storage with the low-latency notification capabilities of messaging systems. They utilize a partitioned-log structure, where each partition represents an append-only sequence of records stored on disk. This design enables the re-reading of old messages. Load balancing in Kafka is achieved by assigning a consumer to each partition and in this way, the order of message processing aligns with the order of receiving, but the number of consumers is limited to the number of partitions available.
Stream processing involves performing actions on a stream, such as processing a stream and generate a new one, storing event data in a database, or visualizing data on a dashboard. Stream analytics is a common use case where we aggregate information from a sequence of events within a defined time window. Tumbling windows (non-overlapping) and hopping windows (overlapping) are popular window types used in stream analytics. Examples of stream analytics use cases can be simply counting the number of events in the previous hour, or applying a complex time-series prediction model on events.
Stream analytics faces the challenge of distinguishing between event creation time (event time) and event processing time as the processing of events may introduce delays due to queuing or network issues. Defining windows based on processing time is a simpler approach, especially when the processing delay is minimal. However, defining windows based on event time poses a greater challenge. This is because it is uncertain whether all the data within a window has been received or if there are still pending events. Hence, it becomes necessary to handle straggler events that arrive after the window has been considered complete.
In applications involving complex stream analytics, such as time-series prediction, it is often necessary to process a sequence of ordered messages within a window as a cohesive unit. In this situation, the messages exhibit strong inter-dependencies, making it difficult to acknowledge and remove individual messages from the broker. Consequently, a log-based message broker presents itself as a preferable option for utilization. Furthermore, parallel processing may not be feasible or overly intricate to implement in this context, as all the messages within a window need to be considered together. However, applying a complex ML model to the data can be computationally intensive, necessitating an alternative approach to parallel processing. This document aims to propose a solution for effectively employing a resource-intensive machine learning model in a high-throughput stream processing application.
There are several stream processing libraries available, such as Apache Kafka Streams, Flink, Samza, Storm, and Spark Streaming. Each of these libraries has its own strengths and weaknesses, but many of them are not particularly Python-friendly. However, Faust is a Python-based stream processing library that use Kafka as the underlying messaging system and aims to bring the ideas of Kafka Streams to the Python ecosystem. Unfortunately, Faust’s documentation can be confusing, and the source code can be difficult to comprehend. For instance, understanding how windows work in Faust is challenging without referring to the complex source code. Additionally, there are numerous open issues in the Faust (v1) and the Faust-Streaming (v2) repositories, and resolving these issues is not a straightforward process. In the following, essential knowledge about Faust’s underlying structure will be provided, along with code snippets to assist in effectively utilizing the Faust library.
To utilize Faust, the initial step involves creating an App and configuring the project by specifying the broker and other necessary parameters. One of the useful parameters is the table_cleanup_interval
that will be discussed later.
app = faust.App(
app_name,
broker=broker_address,
store=rocksdb_address,
table_cleanup_interval=table_cleanup_interval
)
Then you can define a stream processor using the agent decorator to consume from a Kafka topic and do something for every event it receives.
schema = faust.Schema(value_serializer='json')
topic = app.topic(topic_name, schema=schema)@app.agent(topic)
async def processor(stream):
async for event in stream:
print(event)
For keeping state in a stream processor, we can use Faust Table. A table is a distributed in-memory dictionary, backed by a Kafka changelog topic. You can think of table
as a python dictionary that can be set within a stream processor.
table = app.Table(table_name, default=int)@app.agent(topic)
async def processor(stream):
async for event in stream:
table[key] += event
Faust Windows
Let’s consider a time-series problem where every second, we require samples from the previous 10 seconds to predict something. So we need 10s overlapping windows with 1s overlap. To achieve this functionality, we can utilize Faust windowed tables which are inadequately explained in the Faust documentation and often lead to confusion.
Ideally, a stream processing library should automatically perform the following tasks:
- Maintain a state for each window (list of events);
- Identify the relevant windows for a new event (the last 10 windows);
- Update the state of these windows (append the new event to the end of their respective lists);
- Apply a function when a window is closed, using the window’s state as input.
In the code snippet below, you can observe the suggested approach in the Faust documentation for constructing a window and utilizing it in a streaming processor (refer to this example from the Faust library):
# Based on Fuast example
# Do not use thiswindow_wrapper = app.Table(
table_name, default=list, on_window_close=window_close
).hopping(
10, 1, expires=expire_time
)
@app.agent(topic)
async def processor(stream):
async for event in stream:
window_set = window_wrapper[key]
prev = window_set.value()
prev.append(event)
window_wrapper[key] = prev
In the provided code, the object window_wrapper
is an instance of the WindowWrapper class that provides some of the required functionalities. The expires
parameter determines the duration of a window’s lifespan, starting from its creation. Once this specified time has elapsed, the window is considered closed. Faust performs periodic checks on the table_cleanup_interval
duration to identify closed windows. It then applies the window_close
function, using the window state as its input.
When you call window_wrapper[key]
it returns an object of type WindowSet, which internally contains all the relevant windows. By calling window_set.value()
, you can access the state of the latest window, and you can also access previous window states by calling window_set.delta(30)
which gives the state at 30 seconds ago. Additionally, you can update the state of the latest window by assigning a new value to window_wrapper[key]
. This approach works fine for tumbling windows. However, it does not work for hopping windows where we need to update the state of multiple windows.
[Faust Documentation:] At this point, when accessing data from a hopping table, we always access the latest window for a given timestamp and we have no way of modifying this behavior.
While Faust provides support for maintaining the state of windows, identifying relevant windows, and applying a function on closed windows, it does not fully address the third functionality which involves updating the state of all relevant windows. In the following, I propose a new approach for utilizing Faust windows that encompasses this functionality as well.
Windows Reinvented
Comprehending the functionality and operation of Faust windows proved challenging for me until I delved into the source code. Faust windows are built upon an underlying Faust table, which I’ll refer to as the inner table moving forward. Surprisingly, the Faust documentation does not emphasize the inner table or provide a clear explanation of its role in implementing windows. However, it is the most crucial component in the window implementation. Therefore, in the following section, I will begin by defining the inner table and then proceed to discuss the window wrappers.
inner_table = app.Table(
table_name, default=list, partitions=1, on_window_close=window_close
)# for tumbling window:
window_wrapper = inner_table.tumbling(
window_size, key_index=True, expires=timedelta(seconds=window_size)
)
# for hopping window:
window_wrapper = inner_table.hopping(
window_size, slide, key_index=True, expires=timedelta(seconds=window_size)
)
Let’s now examine how Faust handles the first and second functionalities (keeping state and identifying relevant windows). Faust utilizes the concept of a window range, represented by a simple (start, end) tuple, to determine which windows are associated with a given timestamp. If the timestamp falls within the start and end times of a window, that window is considered relevant. Faust creates a record within the inner table using a key composed of the pair (key, window range) and updates it accordingly.
However, when invoking window_wrapper[key]
, it merely retrieves the present window range by relying on the current timestamp, and subsequently returns inner_table[(key, current_window_range)]
. This poses an issue since utilizing the window wrapper only impacts the most recent window, even if the event pertains to multiple windows. Therefore, in the subsequent function, I opted to employ the inner_table
instead. This enables me to obtain all the relevant window ranges and directly update each associated window using the inner table:
async def update_table(events, key, window_wrapper, inner_table):
t = window_wrapper.get_timestamp()
for window_range in inner_table._window_ranges(t):
prev = inner_table[(key, window_range)]
prev.extend(events)
inner_table[(key, window_range)] = prev
Within this function, the initial line is responsible for locating the current timestamp, while inner_table._window_ranges(t)
retrieves all pertinent window ranges for that timestamp. We subsequently proceed to update each relevant window within a for loop. This approach allows us to utilize the update_table
function for both tumbling and hopping windows effectively.
It’s worth noting that update_table
accepts a list of events instead of just one, and employs the extends
method instead of append
. This choice is motivated by the fact that when attempting to update a table incrementally within a high-throughput pipeline, you often encounter the warning “producer buffer full size” which significantly hampers efficiency. Consequently, it is advisable to update tables in mini-batches, as demonstrated in the following:
@app.agent(topic)
async def processor(stream):
batch = []
async for event in stream:
batch.append(event)
if len(batch) >= 200:
await update_table(batch, key, window_wrapper, inner_table)
batch = []
Multiprocessing
In Faust, each worker operates with a single process. Consequently, if the processing of a window is computationally intensive, it can result in a delay which is unacceptable for real-time applications. To address this issue, I propose leveraging the Python multiprocessing library within the window_close
function. By doing so, we can distribute the processing load across multiple processes and mitigate the delay caused by heavy window processing, ensuring better real-time performance.
from multiprocessing import Poolasync def window_close(key, events):
pool.apply_async(compute, (events,), callback=produce)
def compute(events):
# implement the logic here
return result
def produce(result):
if isinstance(result, Exception):
print(f'EXCEPTION {result}')
return
# producer is a KafkaProducer
producer.send(topic_name, value=result, key='result'.encode())
pool = Pool(processes=num_process)
In the provided code, a pool of processes is created. Within the window_close
function, pool.apply_async
is utilized to delegate the job to a new worker and retrieve the result. A callback function is invoked when the result is ready.
In this specific code, the result is sent to a new Kafka topic using a Kafka producer. This setup enables the creation of a chain of Kafka topics, where each topic serves as the input for another stream processor. This allows for a sequential flow of data between the Kafka topics, facilitating efficient data processing and enabling the chaining of multiple stream processors.
I would like to briefly discuss my negative experience with the Google Cloud Platform (GCP). GCP recommends using Google Pub/Sub as the message broker, Apache Beam as the stream processing library, Google Dataflow for execution, and Google BigQuery as the database. However, when I attempted to use this stack, I encountered numerous issues that made it quite challenging.
Working with Google Pub/Sub in Python proved to be slow (check this and this), leading me to abandon it in favor of Kafka. Apache Beam is a well-documented library, however, using it with Kafka presented its own set of problems. The direct runner was buggy, requiring the use of Dataflow and resulting in significant time delays as I waited for machine provisioning. Furthermore, I experienced issues with delayed triggering of windows, despite my unsuccessful attempts to resolve the problem (check this GitHub issue and this Stack Overflow post). Also debugging the entire system was a major challenge due to the complex integration of multiple components, leaving me with limited control over the logs and making it difficult to pinpoint the root cause of issues within Pub/Sub, Beam, Dataflow, or BigQuery. In summary, my experience with the Google Cloud Platform was marred by the slow performance of Google Pub/Sub in Python, the bugs encountered when using Apache Beam with Kafka, and the overall difficulty in debugging the interconnected systems.