docs.sravz.com Open in urlscan Pro
217.76.52.221  Public Scan

Submitted URL: http://docs.sravz.com/docs/tech/cpp/redis-streams
Effective URL: https://docs.sravz.com/docs/tech/cpp/redis-streams/
Submission: On October 02 via manual from US — Scanned from CA

Form analysis 0 forms found in the DOM

Text Content

BLOG


 * Introduction
 * Sravz Platform
 * Presentations
 * Sravz Components
   * Dashboard
   * Datalake
   * Taskrunner
 * Data
   * Dashboard
   * Assets Details
   * Assets
   * Charts
   * Earnings
   * Fundamentals
   * Insider Trades
   * News
   * Portfolios
   * Quotes
 * Analytics
   * Pca Analysis
   * Time Series Analysis
   * Cross Asset Charts
   * Prophet Model Analysis
   * Rolling Statistics
   * Pyfolio Returns Analysis
   * Ytd Analysis
   * Covariance Analysis
   * Spread Analysis
   * Leveraged Funds
   * Long Short
 * Finance
   * Forex Assets
   * Futures
   * Index Assets
   * Mortgage Rates
   * Mutual Funds
   * Options
   * Rates Assets
 * Economics
   * Calendar
   * Fed Funds Rate
 * Reports
   * Indices
 * Tech
   * Airflow Dask
     * Ticker Fundamentals Uploader
     * Ticker Stats Generator
     * Etf Tickers Uploader
     * Etf Historical Quotes Uploader
     * Mutual Fund Fundamentals Uploader
   * Angular
     * Angular Igniteui Upgrade V14 V15
     * Angular Igniteui Upgrade V15 V16
     * Angular Igniteui Upgrade V16 V18
   * Cpp
     * Docker Dev Env Setup
     * Redis Streams
     * Boost Beast Websockets
     * Boost Beast Websocket Server
     * Boost Program Options
     * Boost Beast Rest API Server
     * Cpp Mongodb Service
     * Cpp Aws S3 Service
     * Rest API Router Controller Services
     * Boost Beast Rest API Server Cors
     * Cpp Aws S3 Select
     * Cmake
     * Fundamentals Controller
     * Boost Beast Rest API Models
   * Docker
     * Docker Swarm Setup
   * Golang
     * Golang Crash Course
     * Golang Websocket
   * Ibkr
     * Ibkr Python API
   * Python
     * Langchain
     * Openai
   * Redis
     * Redis Timeseries
   * Rust
     * Historical Ticker Plant
   * Ticker Plant
     * Historical
   * Wordpress
     * Docker Swarm Deployment
 * Links
   * About
   * Careers
   * Terms and Conditions

Redis Streams


USE CASE

Websocket <-> NodeJS (Backend) <-> Redis Stream <-> C++ (Backend)

Use this program to integrate different middleware. In Sravz Case we use NodeJS
backend to interact with C++ backend. NodeJS backend sends messages to C++
backend which process the messages, performs computationally heavy work and
sends back the response to NodeJS. NodeJS sends the respons to the web browser
on the websockets


LIBRARIES USED

 * C++ Boost ASIO
 * hiredis


DATAFLOW DIAGRAM

Session 1

 * Covers Boost ASIO based redis streams consumer & producer


VIDEO EXPLANATION OF THE CODE




SOURCE CODE

This file contains bidirectional Unicode text that may be interpreted or
compiled differently than what appears below. To review, open the file in an
editor that reveals hidden Unicode characters. Learn more about bidirectional
Unicode characters
Show hidden characters

/** * Sravz LLC * TODO: Make it Object Oriented * Use io_context instead of
io_service * Use async handlers * Video explanation:
https://www.youtube.com/watch?v=E3ciyWpZ2Xw * Documentation Link:
https://docs.sravz.com/docs/tech/cpp/redis-streams/ **/ #include <iostream>
#include <unordered_map> #include <redismods++/BloomFilter.h> #include
<redismods++/BloomFilter.h> #include
<redis-plus-plus/test/src/sw/redis++/utils.h> #include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/io_context.hpp> #include
<boost/asio/io_service.hpp> #include <boost/asio/ip/tcp.hpp> #include
<boost/asio/signal_set.hpp> #include <boost/thread.hpp> #include
<boost/asio/write.hpp> #include <boost/lexical_cast.hpp> #include
<boost/log/trivial.hpp> #include <cstdio> #include <boost/asio.hpp> #include
<cstddef> #include <string> #include <functional> using RedisInstance =
sw::redis::Redis; boost::mutex global_stream_lock; // Gets env variable bool
getenv(const char *name, std::string &env) { const char *ret = getenv(name); if
(ret) { env = std::string(ret); } else { std::cout << "Env variable: " << name
<< " not set!!!"; } return !!ret; } // Returns redis connection options to be
used in redis connection creation sw::redis::ConnectionOptions
getRedisConnectionOptions() { std::string REDIS_HOST; std::string REDIS_PORT;
std::string REDIS_PASSWORD; sw::redis::ConnectionOptions opts; if
(getenv("REDIS_HOST", REDIS_HOST)) opts.host = REDIS_HOST; if
(getenv("REDIS_PORT", REDIS_PORT)) opts.port = std::stoi(REDIS_PORT); if
(getenv("REDIS_PASSWORD", REDIS_PASSWORD)) opts.password = REDIS_PASSWORD;
return opts; } // Initialize the worker thread. Creates a redis connection per
thread // Connection is stored in a map with the key = thread ID void
initWorkerThread(boost::shared_ptr<boost::asio::io_service> io_service,
boost::shared_ptr<std::map<boost::thread::id, boost::shared_ptr<RedisInstance>>>
connectionPtrs) { // A global lock so the redis connection map can be updated
with the connection objects global_stream_lock.lock(); std::cout << "[" <<
boost::this_thread::get_id() << "] Thread Start" << std::endl; // A map with
threadID -> redis connection object // Each thread will use it's own resids
connection since redis connection is not thread safe
boost::shared_ptr<RedisInstance> redis_ptr(new
RedisInstance(getRedisConnectionOptions()));
(*connectionPtrs)[boost::this_thread::get_id()] = redis_ptr;
global_stream_lock.unlock(); // Start the IO service, this will start the main
thread io_service->run(); // Unlock global_stream_lock.lock(); std::cout << "["
<< boost::this_thread::get_id() << "] Thread Finish" << std::endl;
global_stream_lock.unlock(); } // Process the message from the stream and send a
response to the redis stream void processMessage(std::string message,
boost::shared_ptr<std::map<boost::thread::id, boost::shared_ptr<RedisInstance>>>
redisConnection_ptrs) { // Declare a variable to hold the redis message auto
key_response = "backend-node"; std::vector<std::pair<std::string, std::string>>
attrs_response = { {"f1", message +
boost::lexical_cast<std::string>(boost::this_thread::get_id())}, {"f2", message
+ boost::lexical_cast<std::string>(boost::this_thread::get_id())}}; // Get the
thread ID to be used to get the redis connection from the data structure which
holds // the redis conneciton object auto thread_id =
boost::this_thread::get_id(); std::cout << "Sending response to node with ID by
thread: " << thread_id << "\n"; // Send the response to the redis stream auto
response_id = (*redisConnection_ptrs)[thread_id]->xadd(key_response, "*",
attrs_response.begin(), attrs_response.end()); std::cout << "Response sent to
node with ID: " << response_id << "\n"; } // Runs Redis consumer. Listens to the
Redis stream and pulls a message from the stream. // Sends an ack to the stream
so the message is deleted from the stream void
runRedisConsumer(boost::shared_ptr<boost::asio::io_service> io_service,
boost::shared_ptr<RedisInstance> redis,
boost::shared_ptr<std::map<boost::thread::id, boost::shared_ptr<RedisInstance>>>
redisConnection_ptrs) { // Declare redis group name and other variables auto key
= "backend-cpp"; auto key_response = "backend-node";
sw::redis::test::KeyDeleter<RedisInstance> deleter(*redis, key); auto group =
"backend-cpp"; auto consumer1 = "consumer1"; // Declare variable to hold Redis
Message std::vector<std::pair<std::string, std::string>> attrs = { {"f1", "v1"},
{"f2", "v2"}}; auto id = redis->xadd(key, "*", attrs.begin(), attrs.end()); auto
keys = {std::make_pair(key, ">")}; // Create redis consumer group backend-cpp on
which this cpp program will send the response try { redis->xgroup_create(key,
group, "$", true); } catch(std::exception const& ex) { //
BOOST_LOG_TRIVIAL(warning) << "Cannot create consumer group: " << ex.what();
std::cout << "Cannot create consumer group: " << ex.what(); } //
std::chrono::milliseconds using Item = std::pair<std::string,
std::unordered_map<std::string, std::string>>; using Result =
std::unordered_map<std::string, std::vector<Item>>; // Create a timeer, redis
client will listen to the messages for this amount of time // and then restart
the listen loop auto timeout = std::chrono::milliseconds(10000); // Default wait
for 1 second // IO Service will block in this while loop and keep polling for
the messages while (true) { Result result; redis->xreadgroup(group,
boost::lexical_cast<std::string>(boost::this_thread::get_id()), key,
std::string(">"), timeout, // Timeout milliseconds - polls every second 1,
std::inserter(result, result.end())); // Add the response to the result variable
if (result.size() > 0 ) { std::cout << result.size() << " " <<
result[key][0].second["f1"] << " " << result[key][0].second["f2"] << "\n"; auto
acked_id = redis->xack(key, group, result[key][0].first); auto delete_id =
redis->xdel(key, result[key][0].first); // Ack the redis message so it will be
deleted from the redis stream std::cout << "Acked ID: " << acked_id << "\n"; if
(acked_id > 0) { timeout = std::chrono::milliseconds(0); } else { timeout =
std::chrono::milliseconds(1000); } // Post the message to the io_service so some
thread will process and message and send the response
io_service->post(boost::bind(processMessage, result[key][0].second["f1"],
redisConnection_ptrs)); // Empty the result and reuse the variable
result.erase(result.begin(), result.end()); } } } int main(int argc, char**
argv) { // Declare the IO Service boost::shared_ptr<boost::asio::io_service>
io_service( new boost::asio::io_service); // Add empty work to IO Service so the
IO Service does not exit when started
boost::shared_ptr<boost::asio::io_service::work> work( new
boost::asio::io_service::work(*io_service)); // Dispaly IO Service Thread ID
global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() <<
"] The program will exit when all work has finished." << std::endl;
global_stream_lock.unlock(); // Create Redis Connection Instance using
RedisInstance = sw::redis::Redis; RedisInstance
redis(getRedisConnectionOptions()); // Make the Redis Connection Instance a
shared pointer so it could be passed around boost::shared_ptr<RedisInstance>
redis_ptr(new RedisInstance(getRedisConnectionOptions()));
boost::shared_ptr<std::map<boost::thread::id, boost::shared_ptr<RedisInstance>>>
redisConnection_ptrs(new std::map<boost::thread::id,
boost::shared_ptr<RedisInstance>>); // Created Redis Stream Group Key IDs auto
key = "backend-cpp"; auto group = "backend-cpp"; try { redis.xgroup_create(key,
group, "$", true); } catch(std::exception const& ex) { //
BOOST_LOG_TRIVIAL(warning) << "Cannot create consumer group: " << ex.what();
std::cout << "Cannot create consumer group: " << ex.what() << "\n"; } // Create
some pthreads which join the IO Service and perform the work int
MAX_NUMBER_OF_THREADS = 2; std::string NUM_THREADS; if (getenv("NUM_THREADS",
NUM_THREADS)) MAX_NUMBER_OF_THREADS = std::stoi(NUM_THREADS);
boost::thread_group worker_threads; std::cout << "Starting: " << NUM_THREADS <<
" workers" << "\n"; for (int x = 0; x < MAX_NUMBER_OF_THREADS; ++x) {
worker_threads.create_thread( boost::bind(&initWorkerThread, io_service,
redisConnection_ptrs)); } // Pass the IO service to the Redis Consumer function
that will start the IO Service runRedisConsumer(io_service, redis_ptr,
redisConnection_ptrs); // Wait for the threads to join
worker_threads.join_all(); return 0; }

view raw gistfile1.txt hosted with ❤ by GitHub


Please enable JavaScript to view the comments powered by Disqus.
 * * Libraries Used
   * Dataflow Diagram
   * Video explanation of the code
   * Source Code