# How to Process Streaming Data Using Google Colab or Jupyter Notebook

by Alexander Mikhalev (@AlexMikhalev) | January 17, 2023 | Reading time: 4 minutes

For the past few years, organizations have been adopting real-time streaming services but continue to use batch processing for machine learning (ML) tools and analytics. Using databases and ETL tools as a bridge between real-time streams and ML adds unnecessary complexity and lengthens the time to resolution. This blog aims to demonstrate that ML tools can interact with real-time streams from Python without needing ETL. You will create an account in InfinyOn Cloud, set up a Jupyter Notebook environment, and write a small script that joins the two.

Let's begin the journey to real-time analytics.

## Prerequisites

This blog assumes the following:

* An active InfinyOn Cloud account - follow this tutorial to set up an account and provision a Fluvio cluster.
* Familiarity with Google Colab or Jupyter Notebooks.
* An email account with OAuth 2.0 support.

Let's get started.

## Provision Data Streaming Topics in InfinyOn Cloud

Install the fluvio CLI to manage your cluster in InfinyOn Cloud:

```bash
$ curl -fsS https://packages.fluvio.io/v1/install.sh | bash
```

Log in to your InfinyOn Cloud account:

```bash
$ fluvio cloud login --use-oauth2
```

Create a new topic for our streams:

```bash
$ fluvio topic create hello-python
```

Copy the data below and save it into a `data.json` file:

```json
{"c":27.55,"d":0.41,"dp":1.5107,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709682}
{"c":27.55,"d":0.41,"dp":1.5107,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709682}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
{"c":27.56,"d":0.42,"dp":1.5475,"h":27.74,"l":26.15,"o":26.3,"pc":27.14,"t":1668709710}
```

Next, populate the topic with the records above. For simplicity, we pre-populate the data stream, but this data could also be populated in real time by an event-driven client or connector; a sketch of such a client follows below.
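As a hedged illustration (not part of the tutorial steps), an event-driven producer could look like the sketch below. The `fetch_quote()` helper is hypothetical, standing in for a real event source such as a market-data API; the `fluvio` client calls are the same ones used later in this post.

```python
# Hypothetical event-driven producer: push each quote into the topic
# as it arrives, instead of pre-populating from a file.
import json
import time

from fluvio import Fluvio

def fetch_quote() -> dict:
    # Hypothetical stand-in for a real event source (e.g. a market-data API).
    return {"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74,
            "l": 26.15, "o": 26.3, "pc": 27.14, "t": int(time.time())}

fluvio = Fluvio.connect()
producer = fluvio.topic_producer("hello-python")

for _ in range(8):
    producer.send_string(json.dumps(fetch_quote()))  # one record per event
    time.sleep(1)
```

For this tutorial, the CLI is sufficient; produce the file into the topic: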
```bash
$ fluvio produce hello-python -f data.json
```

Create the output topic using the CLI:

```bash
$ fluvio topic create hello-python-out
```

## Run in Google Colab

Open the notebook in Google Colab and follow the instructions. After running `list(lazy)`, check the resulting stream:

```
$ fluvio consume hello-python-out -Bd
Consuming records from 'hello-python-out' starting from the beginning of log
{"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709682, "median": 26.72}
{"c": 27.55, "d": 0.41, "dp": 1.5107, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709682, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
{"c": 27.56, "d": 0.42, "dp": 1.5475, "h": 27.74, "l": 26.15, "o": 26.3, "pc": 27.14, "t": 1668709710, "median": 26.72}
```

Congratulations, you are all set! In the next section, we'll go over the setup required to run Jupyter locally.

## Run on a Local Machine

There are a couple of prerequisites to run Jupyter on your local machine:

* Install Conda

Use Conda to create a new environment:

```bash
conda create -n fluvio_env python=3.9
```

Activate the environment:

```bash
conda activate fluvio_env
```

Install Jupyter, pandas, and matplotlib:

```bash
pip install jupyter
pip install pandas
conda install -y matplotlib
```

Start the Jupyter notebook server:

```bash
jupyter notebook
```

Create a new notebook. Next, run the same steps as in the Google Colab example above:

```python
!pip install fluvio==0.14.2
```

Log in to InfinyOn Cloud using the OAuth2 flow with Google:

```python
from fluvio import cloud
cloud.login()
```

Import dependencies:

```python
import json
import itertools
from fluvio import Fluvio, Offset
```

Connect to the `hello-python` topic in InfinyOn Cloud and create a consumer:

```python
TOPIC_NAME = "hello-python"
PARTITION = 0
fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
```

Read the first eight records from the beginning of the data stream:

```python
records = (json.loads(record.value_string()) for record in itertools.islice(consumer.stream(Offset.beginning()), 8))
```

This line runs instantly because it only creates a generator; no records are read until the generator is consumed. The expression is evaluated from the inside out (right to left as written):

1. `consumer.stream(Offset.beginning())` opens the record stream.
2. `itertools.islice(..., 8)` takes a slice of the first eight records.
3. Each record is decoded to a string with `record.value_string()` and parsed with `json.loads`.

Let's turn the eight records into a pandas DataFrame using `json_normalize`:

```python
import pandas
df = pandas.json_normalize(records)
df.plot()
```

Now you can apply any pandas transformation or action to the streamed data. For example, add a column with the median:

```python
df['median'] = df.median(numeric_only=True, axis=1)
```

Note that if you set the offset to read from the end of the stream via `Offset.end()`, the notebook cell will block until you start populating the stream with new data.
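As a minimal sketch of that blocking behavior (reusing the `consumer` and imports from the cells above), a tail-style read could look like this:

```python
# Tail the stream: Offset.end() delivers only records produced from now on,
# so this loop (and the notebook cell) blocks until new data arrives.
for record in consumer.stream(Offset.end()):
    print(json.loads(record.value_string()))
```

Produce a record to `hello-python` from another terminal and the cell will wake up and print it.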
The next step is to write data back into the stream, which we can also do lazily with Python generators.

Create the output producer:

```python
OUTPUT_TOPIC = "hello-python-out"
producer = fluvio.topic_producer(OUTPUT_TOPIC)
```

Create a lazy producer generator:

```python
lazy = (producer.send_string(json.dumps(i)) for i in df.to_dict("records"))
```

It runs instantly because it only returns an iterable; nothing is sent yet. Evaluate the generator to actually send the records:

```python
list(lazy)
```

## Conclusion

In this blog post, we highlighted the significance of streams in contemporary development and showed that working with data streams in Python can be straightforward. The example demonstrated how to read, process, and write streams with Python generators, keeping the process fast and efficient. The code used in the demonstration is available on Google Colab at this link.

Machine learning developers can now bridge their Jupyter notebook experiments with real-time streaming capabilities with ease. Be sure to join our Discord server if you want to talk to us or have any questions. Until next time!

## Further Reading

* Handling XML data in Fluvio SmartModules
* Transform streaming data in real-time with WebAssembly
* How to Write to Apache Kafka from a Fluvio topic
* Building Real-time IoT Apps