HOW TO PROCESS STREAMING DATA USING GOOGLE COLAB OR JUPYTER NOTEBOOK

by Alexander Mikhalev (@AlexMikhalev)
| January 17, 2023
| Reading time: 4 minutes

For the past few years, organizations have been adopting real-time streaming
services but continue using batch processing for machine learning (ML) tools
and analytics. Using databases and ETL tools as a bridge between real-time
streams and ML adds unnecessary complexity and lengthens the time to insight.
This blog aims to demonstrate that ML tools can interact with real-time streams
using Python without needing ETL. You will create an account in InfinyOn Cloud,
set up a Jupyter Notebook environment, and write a small script that joins the
two. Let’s begin the journey to real-time analytics.



A running example: [demo animation]

PREREQUISITES

This blog assumes the following:

 * An active InfinyOn Cloud account - follow this tutorial to set up an
   account and provision a Fluvio cluster.
 * Familiarity with Google Colab or Jupyter Notebooks.
 * An email account with OAuth 2.0 support.

Let’s get started.


PROVISION DATA STREAMING TOPICS IN INFINYON CLOUD

Install the Fluvio CLI to manage your cluster in InfinyOn Cloud:

$ curl -fsS https://packages.fluvio.io/v1/install.sh | bash


Log in to your InfinyOn Cloud account:

$ fluvio cloud login --use-oauth2


Create a new topic for our streams:



$ fluvio topic create hello-python


Copy the data below and save it into a file named data.json (each line is one
JSON record; t is a Unix timestamp):

{"c":27.55,"d":0.41,"dp":1.5107,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709682}
{"c":27.55,"d":0.41,"dp":1.5107,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709682}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}


Populate the topic with the records above. For simplicity, we pre-populate the
data stream, but the data could also be produced in real time by an
event-driven client or connector (see the Python sketch after the command
below).

$ fluvio produce hello-python -f data.json
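
As an illustration of that event-driven approach, here is a minimal Python
sketch that produces a record programmatically instead of from a file. It uses
the same fluvio client API shown later in this post; the quote values are
placeholders:

import json
import time
from fluvio import Fluvio

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("hello-python")

# Hypothetical payload; a real event-driven client would receive these
# values from an event source such as a market-data API or a webhook.
quote = {"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74,
         "l": 26.15, "o": 26.3, "pc": 27.14, "t": int(time.time())}

# Send one record per event as a JSON string.
producer.send_string(json.dumps(quote))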


Create the output topic using the CLI:

$ fluvio topic create hello-python-out


 


RUN IN GOOGLE COLAB

Open the notebook in Google Colab and follow the instructions there; the
notebook runs the same steps covered in the RUN ON LOCAL MACHINE section below.

After running list(lazy), let’s check the resulting stream:



$ fluvio consume hello-python-out -Bd
Consuming records from 'hello-python-out' starting from the beginning of log
{"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709682, "median": 26.72}
{"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709682, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}


Congratulations, you are all set! In the next section, we’ll go over the setup
required to run Jupyter locally.


RUN ON LOCAL MACHINE

There are a couple of prerequisites to run Jupyter on your local machine:

 * Install Conda

Use Conda to create a new environment:



conda create -n fluvio_env python=3.9


Activate the environment:



conda activate fluvio_env


Install Jupyter, pandas, and matplotlib:

pip install jupyter
pip install pandas
conda install -y matplotlib


Start the Jupyter notebook server:

jupyter notebook


Create a new notebook, then run the same steps as in the Google Colab example
above. Install the Fluvio Python client:

!pip install fluvio==0.14.2


Log in to your InfinyOn Cloud account using the OAuth2 flow (for example, with
Google):

from fluvio import cloud
cloud.login()


Import dependencies:



import json
import itertools
from fluvio import Fluvio, Offset


Connect to the hello-python topic in InfinyOn Cloud and create a partition
consumer:

TOPIC_NAME = "hello-python"
PARTITION = 0

# Connect to the cluster and attach a consumer to partition 0 of the topic.
fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)


Read the first eight records from the beginning of the data stream:



records = (json.loads(record.value_string()) for record in itertools.islice(consumer.stream(Offset.beginning()), 8))


This line runs instantly because it creates a generator; nothing is consumed
until the generator is iterated. Reading the expression from the inside out:

 1. consumer.stream(Offset.beginning()) creates a stream consumer
 2. itertools.islice takes a slice of the first 8 records
 3. record.value_string() decodes each record to a string, and json.loads
    parses it into a dict
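
For comparison, here is the same pipeline written as an explicit, eager loop (a
sketch using the same API as above):

records = []
for i, record in enumerate(consumer.stream(Offset.beginning())):
    if i == 8:
        break  # stop after the first eight records
    # Decode the raw record to a string, then parse the JSON payload.
    records.append(json.loads(record.value_string()))

Unlike the generator expression, this version reads the records immediately.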

Let’s turn the eight records into a pandas DataFrame using json_normalize:

import pandas
df = pandas.json_normalize(records)
df.plot()


Now you can apply any pandas transformation or action to the streamed data.
For example, add a column with the per-row median of the numeric fields:

df['median'] = df.median(numeric_only=True, axis=1)
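
As a quick sanity check: for the first record, the eight numeric values sorted
are 0.41, 1.5107, 26.15, 26.3, 27.14, 27.55, 27.74, and 1668709682, so the row
median is (26.3 + 27.14) / 2 = 26.72, which matches the median field in the
consume output shown earlier.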


If you instead set the offset to read from the end of the stream via
Offset.end(), the notebook cell will block until new data arrives in the
stream, as in the sketch below.
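
A minimal sketch of tailing the stream this way (assuming the same consumer as
above):

# Offset.end() starts at the current end of the log, so this loop blocks
# until new records are produced to the topic.
for record in consumer.stream(Offset.end()):
    print(json.loads(record.value_string()))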

The next step is to write data back into the stream, which we can also do
lazily using Python generators. Create an output producer:



OUTPUT_TOPIC="hello-python-out"
producer = fluvio.topic_producer(OUTPUT_TOPIC)


Create a lazy producer generator:



lazy = (producer.send_string(json.dumps(i)) for i in df.to_dict("records"))


This also runs instantly because it only returns an iterable; nothing is sent
yet. Evaluate the generator to send the records:



list(lazy)
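
To confirm that the records landed, consume the output topic from the CLI, as
shown in the Google Colab section above:

$ fluvio consume hello-python-out -Bd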


 


CONCLUSION

In this blog post, we highlighted the significance of streams in contemporary
development and emphasized that working with data streams in Python can be
straightforward. The example showed how to read, process, and write streams via
Python generators to make the process fast and efficient. The code used in the
demonstration is available on Google Colab at this link.

Machine learning developers can now easily bridge their Jupyter notebook
experiments with real-time data streams. Be sure to join our Discord server if
you want to talk to us or have any questions. Until next time!

 


FURTHER READING

 * Handling XML data in Fluvio SmartModules
 * Transform streaming data in real-time with WebAssembly
 * How to Write to Apache Kafka from a Fluvio topic
