In today’s fast-paced, data-driven world, businesses need to process information as it happens. Batch processing alone can no longer keep up with real-time decision-making needs. Over the past few weeks, I embarked on a hands-on project to build a real-time streaming pipeline using Apache Kafka, Apache Flink, and PostgreSQL.
The goal was to ingest New York City Green Taxi trip data, process it in near real-time, perform windowed aggregations, and store the results in a database for analysis. Along the way, I faced challenges with message duplication, stream processing reliability, and proper integration between components — challenges that reflect real-world scenarios in streaming analytics.
In this article, I’ll walk through each step of the pipeline, the lessons learned, and best practices for building scalable streaming solutions.
The first step in our pipeline was to set up a Kafka producer to read NYC green taxi trip data and stream it into a Kafka topic. This step ensures we can work with live data for real-time processing.
We used the NYC Green Taxi dataset in Parquet format:
Example URL: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-10.parquet
Key columns used for streaming:
lpep_pickup_datetime – Trip start time
lpep_dropoff_datetime – Trip end time
PULocationID – Pickup location
DOLocationID – Dropoff location
trip_distance – Trip distance in miles
passenger_count – Number of passengers
total_amount – Total fare amount
We implemented the producer in Python using the kafka-python library. Key aspects included:
Reading Data: The Parquet dataset is read into a Pandas DataFrame. To speed up testing, we first limited the rows, but in production, the full dataset is streamed.
Serialization: Each row is serialized into JSON format before sending to Kafka. This ensures consistent data format for downstream consumers.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for _, row in df.iterrows():
    producer.send("green-trips", value=row.to_dict())

producer.flush()
Duplicate Handling: Initially, running the producer multiple times would resend the same data, resulting in duplicates. In production, strategies like Kafka message keys, idempotent producers, or upserts on the sink side are used to prevent duplication.
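One lightweight building block for any of these strategies is a deterministic message key derived from the fields that identify a trip, so that re-running the producer emits the same key for the same record. The field choice below is an illustrative assumption, not part of the original producer:

```python
import hashlib
import json

def trip_key(trip: dict) -> bytes:
    """Derive a stable Kafka message key from fields that identify a trip.

    The same trip always yields the same key, which lets a compacted topic
    or an upserting sink collapse duplicates. Which fields uniquely identify
    a trip is an assumption here.
    """
    ident = [
        trip["lpep_pickup_datetime"],
        trip["lpep_dropoff_datetime"],
        trip["PULocationID"],
        trip["DOLocationID"],
    ]
    digest = hashlib.sha256(json.dumps(ident).encode("utf-8")).hexdigest()
    return digest.encode("utf-8")

# The producer would then send keyed messages, e.g.:
# producer.send("green-trips", key=trip_key(row_dict), value=row_dict)
```

A key alone does not deduplicate in a regular Kafka topic, but it gives every downstream consumer a stable identity to upsert or dedupe on.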
After setting up the producer, we ran it as a Python package:
uv run python -m src.producers.producer
Output confirmed the number of rows loaded and sent to Kafka:
Downloading dataset...
Loaded rows: 49,416
Sending data to Kafka...
Took 7.10 seconds
We also verified the data in Kafka by consuming a few messages using Redpanda’s CLI:
docker compose exec redpanda rpk topic consume green-trips -n 5
Before introducing a full-fledged stream processing engine like Flink, it was important to validate that data flowing through Kafka could be consumed and processed correctly. To achieve this, we implemented a basic Kafka consumer in Python.
This step acts as a sanity check in any streaming pipeline:
Is data arriving correctly?
Can we deserialize it?
Can we perform simple computations on it?
We used the kafka-python library to create a simple consumer that reads messages from the green-trips topic.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "green-trips",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=True,
)
Key configurations explained:
auto_offset_reset="earliest"
Ensures we read all messages from the beginning of the topic (useful for testing).
value_deserializer
Converts incoming byte data back into JSON format for processing.
enable_auto_commit=True
Automatically tracks offsets, so the consumer knows where it left off.
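The deserializer simply reverses the producer's serializer, so the pair must round-trip cleanly. A quick sanity check in plain Python, independent of any broker:

```python
import json

# Mirrors the producer's value_serializer and the consumer's value_deserializer.
serialize = lambda v: json.dumps(v).encode("utf-8")
deserialize = lambda x: json.loads(x.decode("utf-8"))

trip = {"PULocationID": 74, "trip_distance": 6.2, "passenger_count": 1}
assert deserialize(serialize(trip)) == trip  # round-trip preserves the record
```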
To validate the pipeline, we implemented a simple metric:
Count the number of trips where distance > 5 miles
count = 0
# KafkaConsumer iterates indefinitely unless consumer_timeout_ms is set,
# so we report the running count inside the loop instead of after it.
for message in consumer:
    trip = message.value
    if trip["trip_distance"] > 5:
        count += 1
        print("Trips > 5 miles so far:", count)
After validating our pipeline with a basic Kafka consumer, the next step was to introduce Apache Flink for real-time, stateful stream processing.
Unlike a simple consumer, Flink allows us to:
process streams continuously
maintain state across events
perform time-based (windowed) aggregations
write results to an external system reliably
We wanted to compute:
Number of trips per pickup location (PULocationID) every 5 minutes
This is a classic tumbling window aggregation problem.
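Before expressing this in Flink SQL, the tumbling-window logic itself is worth pinning down: every event falls into exactly one fixed, non-overlapping 5-minute bucket, keyed here by pickup location. A minimal plain-Python sketch of that semantics:

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return datetime.min + (ts - datetime.min) // WINDOW * WINDOW

def count_trips(events):
    """events: iterable of (pickup_datetime, PULocationID) pairs.

    Returns a Counter keyed by (window_start, PULocationID), which is
    exactly the shape of the Flink aggregation's output rows.
    """
    counts = Counter()
    for ts, loc in events:
        counts[(window_start(ts), loc)] += 1
    return counts
```

This is only the batch-style intuition; Flink adds the hard parts (continuous input, state, and deciding when a window is complete), which is where watermarks come in below.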
In Flink, we define streams as tables using SQL DDL.
The first step was to create a table that reads from the Kafka topic:
CREATE TABLE green_trips (
    lpep_pickup_datetime STRING,
    lpep_dropoff_datetime STRING,
    PULocationID INT,
    DOLocationID INT,
    passenger_count DOUBLE,
    trip_distance DOUBLE,
    tip_amount DOUBLE,
    total_amount DOUBLE,
    event_time AS TO_TIMESTAMP(lpep_pickup_datetime),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'green-trips',
    'properties.bootstrap.servers' = 'redpanda:29092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
We convert lpep_pickup_datetime into a timestamp:
event_time AS TO_TIMESTAMP(lpep_pickup_datetime)
This allows Flink to process events based on when they actually occurred, not when they arrived.
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
Watermarks help Flink decide:
“How long should I wait for late data before closing a window?”
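Conceptually, the watermark trails the maximum event time seen so far by the configured delay (5 seconds here); a window can close once the watermark passes its end, and events older than the watermark are treated as late. A minimal sketch of that bookkeeping, assuming the same 5-second delay as the DDL:

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Mirrors `WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND`:
    the watermark lags the maximum observed event time by a fixed delay."""

    def __init__(self, delay=timedelta(seconds=5)):
        self.delay = delay
        self.max_event_time = None

    def observe(self, event_time: datetime) -> None:
        # Watermarks only move forward: track the max event time seen so far.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def is_late(self, event_time: datetime) -> bool:
        """An event with a timestamp older than the watermark is late."""
        return self.watermark is not None and event_time < self.watermark
```

In other words, a larger delay tolerates more out-of-order data at the cost of higher latency before results are emitted.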
We then defined a 5-minute tumbling window aggregation:
SELECT
    window_start,
    PULocationID,
    COUNT(*) AS num_trips
FROM TABLE(
    TUMBLE(TABLE green_trips, DESCRIPTOR(event_time), INTERVAL '5' MINUTES)
)
GROUP BY window_start, PULocationID;
Next, we created a sink table to store results:
CREATE TABLE processed_events_5min (
    window_start TIMESTAMP(3),
    PULocationID INT,
    num_trips BIGINT,
    PRIMARY KEY (window_start, PULocationID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',
    'table-name' = 'processed_events_5min',
    'username' = 'postgres',
    'password' = 'postgres',
    'driver' = 'org.postgresql.Driver'
);
Important Lesson:
Flink does not support enforced primary keys in streaming mode.
We had to explicitly use:
PRIMARY KEY (...) NOT ENFORCED
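Declaring a primary key also changes the sink's behavior: the JDBC connector writes in upsert mode, so each new result for a (window_start, PULocationID) pair overwrites the previous row rather than appending a duplicate. Conceptually (sketched here with an in-memory dict standing in for the Postgres table):

```python
def upsert(table: dict, row: dict) -> None:
    """Mimic upsert-mode writes: rows sharing the primary-key columns
    (window_start, PULocationID) replace each other instead of appending."""
    key = (row["window_start"], row["PULocationID"])
    table[key] = row["num_trips"]

table = {}
upsert(table, {"window_start": "00:00", "PULocationID": 74, "num_trips": 3})
# A later, refined result for the same window and location overwrites the first:
upsert(table, {"window_start": "00:00", "PULocationID": 74, "num_trips": 5})
```

This is also one practical answer to the producer-duplication problem from earlier: even if upstream replays data, the keyed sink converges to one row per window and location.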
Finally, we connected everything:
INSERT INTO processed_events_5min
SELECT
    window_start,
    PULocationID,
    COUNT(*) AS num_trips
FROM TABLE(
    TUMBLE(TABLE green_trips, DESCRIPTOR(event_time), INTERVAL '5' MINUTES)
)
GROUP BY window_start, PULocationID;
We deployed the job using:
docker compose exec jobmanager ./bin/flink run -py /opt/src/job/aggregation_5min.py --pyFiles /opt/src -d
Once submitted, the job appears in the Flink UI (localhost:8081) where we can monitor:
job status (RUNNING / RESTARTING)
task performance
logs and errors
This step introduced several real-world issues:
Job restarts: caused by TaskManager crashes or misconfigurations, these required digging through logs to understand the failure points.
Schema mismatches: the JSON structure coming from Kafka must exactly match the Flink table schema.
Silent failures: incorrect JDBC or Kafka configs can silently fail or put jobs into a restart loop.
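A cheap guard against the schema-mismatch failure mode is a pre-flight check that compares a sample Kafka record's keys against the Flink table's column list before submitting the job. This helper is an illustrative sketch, not part of the original project:

```python
# Physical columns of the Flink source table; computed columns like
# event_time are derived by Flink, so they are not expected in the JSON.
EXPECTED_COLUMNS = {
    "lpep_pickup_datetime", "lpep_dropoff_datetime",
    "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance",
    "tip_amount", "total_amount",
}

def schema_mismatches(record: dict) -> set:
    """Return the Flink columns missing from a sample JSON record.

    An empty set means every declared column is present; missing fields
    would otherwise arrive as NULLs (or fail parsing) inside Flink.
    """
    return EXPECTED_COLUMNS - record.keys()
```

Running this against a few messages pulled with `rpk topic consume` catches renamed or dropped fields before they surface as confusing job restarts.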
Building this real-time streaming pipeline was more than just a technical exercise — it was a deep dive into how modern data systems behave in the real world.
From ingesting raw data into Kafka, to processing it with Flink, and finally persisting aggregated results in PostgreSQL, this project highlighted both the power and complexity of streaming architectures.
Unlike batch processing:
Results are not immediate
Systems are always running
You must think in terms of continuous data flow, not fixed datasets
Each component had a clearly separated role:
Kafka → ingestion and buffering
Flink → computation and stateful processing
PostgreSQL → storage and querying
Understanding this separation made debugging far more manageable: when something broke, I knew which layer to inspect first.
Here’s my homework solution: https://github.com/stephandoh/zoomcamp7948458
You can sign up here: https://github.com/DataTalksClub/data-engineering-zoomcamp/