In this project, I designed and implemented a real-time, end-to-end streaming data pipeline that captures real estate listings from Zoopla using the BrightData API. The listings flow through a Kafka cluster, the message broker that moves data from the source to the storage sink, in this case Cassandra. Apache Spark handles the large-scale stream processing efficiently. The setup is engineered specifically for real estate market analysis, providing a robust tool for dynamic and precise market evaluation. For an in-depth look at the project, you are welcome to visit my GitHub repository.
This project exemplifies the power of integrating multiple technologies to transform raw data into a strategic asset for real estate market analytics.
Step 1. Clone the repository to your local machine:
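For example, with `<repository-url>` and `<repository-folder>` standing in for the actual repository details:

```bash
# <repository-url> is a placeholder; use the URL of the GitHub repository
git clone <repository-url>
cd <repository-folder>
```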
Step 2. Build the Docker images:
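Assuming the repository ships a `docker-compose.yml` defining the Kafka, Cassandra, and Spark services (the service names would match the hostnames used in the code below):

```bash
docker compose build
```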
Step 3. Start the Docker containers (make sure the Docker daemon is up and running on your machine first!):
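Again assuming Docker Compose:

```bash
docker compose up -d
docker compose ps   # confirm Kafka, Cassandra, and Spark are all up
```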
Step 4. Start the data ingestion process:
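The ingestion entry point's name depends on the repository layout, but the important part is what it publishes: one JSON document per listing to the `properties` topic that the consumer below subscribes to. A minimal producer sketch using kafka-python (the field values here are hypothetical placeholders, and the keys mirror the consumer's schema):

```python
import json

from kafka import KafkaProducer  # kafka-python

# Inside the Docker network the broker is reachable as kafka-broker:9092
# (the address the consumer below uses); from the host, use the mapped port.
producer = KafkaProducer(
    bootstrap_servers="kafka-broker:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical listing; real values come from the BrightData scrape.
listing = {
    "price": "350000",
    "title": "2 bed flat for sale",
    "link": "https://www.zoopla.co.uk/for-sale/details/<listing-id>",
    "pictures": [],
    "floor_plan": None,
    "address": "London",
    "bedrooms": "2",
    "bathrooms": "1",
    "receptions": "1",
    "epc_rating": "C",
    "tenure": "Leasehold",
    "time_remaining_on_lease": None,
    "service_charge": None,
    "council_tax_band": None,
    "ground_rent": None,
}

producer.send("properties", listing)
producer.flush()
```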
Step 5. Start the Spark consumer:
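The consumer can be launched with `spark-submit`; the two `--packages` coordinates below are the same ones configured in the code, while the script name `consumer.py` is an assumption:

```bash
spark-submit \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  consumer.py
```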
In this example, I showcase how Apache Spark serves as an efficient consumer, reading listing records from Apache Kafka and writing them to Cassandra:

```python
import logging
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_cassandra_session():
"""Retrieve or create a Cassandra session."""
if 'cassandra_session' not in globals():
cluster = Cluster(["cassandra"])
globals()['cassandra_session'] = cluster.connect()
return globals()['cassandra_session']
def setup_cassandra(session):
"""Setup the keyspace and table in Cassandra."""
session.execute("""
CREATE KEYSPACE IF NOT EXISTS property_streams
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")
logger.info("Keyspace created successfully!")
session.execute("""
CREATE TABLE IF NOT EXISTS property_streams.properties (
price text, title text, link text, pictures list<text>, floor_plan text,
address text, bedrooms text, bathrooms text, receptions text, epc_rating text,
tenure text, time_remaining_on_lease text, service_charge text,
council_tax_band text, ground_rent text, PRIMARY KEY (link)
);
""")
logger.info("Table created successfully!")
def insert_data(**kwargs):
"""Insert data into Cassandra table using a session created at the executor."""
session = get_cassandra_session()
session.execute("""
INSERT INTO property_streams.properties (
price, title, link, pictures, floor_plan, address, bedrooms, bathrooms,
receptions, epc_rating, tenure, time_remaining_on_lease, service_charge, council_tax_band, ground_rent
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
kwargs['price'], kwargs['title'], kwargs['link'], kwargs['pictures'],
kwargs['floor_plan'], kwargs['address'], kwargs['bedrooms'], kwargs['bathrooms'],
kwargs['receptions'], kwargs['epc_rating'], kwargs['tenure'], kwargs['time_remaining_on_lease'],
kwargs['service_charge'], kwargs['council_tax_band'], kwargs['ground_rent']
))
logger.info("Data inserted successfully!")
def define_kafka_to_cassandra_flow(spark):
"""Define data flow from Kafka to Cassandra using Spark."""
schema = StructType([
StructField("price", FloatType(), True),
StructField("title", StringType(), True),
StructField("link", StringType(), True),
StructField("pictures", ArrayType(StringType()), True),
StructField("floor_plan", StringType(), True),
StructField("address", StringType(), True),
StructField("bedrooms", StringType(), True),
StructField("bathrooms", StringType(), True),
StructField("receptions", StringType(), True),
StructField("epc_rating", StringType(), True),
StructField("tenure", StringType(), True),
StructField("time_remaining_on_lease", StringType(), True),
StructField("service_charge", StringType(), True),
StructField("council_tax_band", StringType(), True),
StructField("ground_rent", StringType(), True)
])
kafka_df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka-broker:9092")
.option("subscribe", "properties")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING) as value")
.select(from_json(col("value"), schema).alias("data"))
.select("data.*"))
kafka_df.writeStream.foreachBatch(
lambda batch_df, _: batch_df.foreach(
lambda row: insert_data(**row.asDict())
)
).start().awaitTermination()
def main():
spark = SparkSession.builder.appName("RealEstateConsumer").config(
"spark.cassandra.connection.host", "cassandra"
).config(
"spark.jars.packages",
"com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
).getOrCreate()
session = get_cassandra_session()
setup_cassandra(session)
define_kafka_to_cassandra_flow(spark)
if __name__ == "__main__":
    main()
```
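Once the stream is running, you can spot-check that rows are landing in Cassandra, for example with cqlsh (assuming the Cassandra container is named `cassandra`, matching the hostname the code connects to):

```bash
docker exec -it cassandra cqlsh -e \
  "SELECT price, title, address FROM property_streams.properties LIMIT 5;"
```

Because `link` is the table's primary key and Cassandra inserts are upserts, replaying the same listings (for example, after a restart with `startingOffsets` set to `earliest`) simply overwrites the existing rows rather than duplicating them.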