init distroduce
parent b9c7775d07
commit 662046f34f
@@ -0,0 +1,30 @@
from datetime import datetime

# ToDo: Model Async communication here
# ToDo: Configuration of Input
def configure_producer(msg, config):
    from kafka import KafkaProducer

    # send_messages synchronously publishes `events` messages to the 'test' topic
    # and returns the loop's start time and its duration in seconds
    def send_messages(events):
        producer = KafkaProducer(**config)

        start_timestamp = datetime.now()
        for event in range(events):
            producer.send('test', msg(event)).get()
        delta = datetime.now() - start_timestamp

        return start_timestamp, delta.total_seconds()

    return send_messages

# def action(step):
#     step += 1
#     percent = step / 10000
#     print(str(datetime.now()) + " - " + str(percent) + ": " + str(step))

# def configure_consumer(config, elemental_action: function):
#     from kafka import KafkaConsumer
#     def receive_messages():
#         consumer = KafkaConsumer(**config)
#         return [elemental_action(i) for i, message in enumerate(consumer)]
#
#     return receive_messages

@@ -0,0 +1 @@
from cadCAD.distroduce.executor.spark.jobs import distributed_produce

@@ -0,0 +1,34 @@
from cadCAD.distroduce.configuration.kakfa import configure_producer
from pyspark.context import SparkContext

ascii_art = r'''
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
'''


def distributed_produce(
    sc: SparkContext,
    spark_run,
    sim_time,
    sim_runs,
    rdd_parts,
    parameterized_message,
    prod_config
):
    print(ascii_art)
    # one RDD element per simulation run; each element is that run's message count
    message_counts = [sim_time] * sim_runs
    msg_rdd = sc.parallelize(message_counts).repartition(rdd_parts)
    parts = msg_rdd.getNumPartitions()
    print()
    print(f"RDD_{spark_run} - Partitions: {parts}")
    print()

    # configure_producer returns a send loop that is mapped over the RDD, so each
    # run creates its own KafkaProducer on the executor that processes it
    produce = configure_producer(parameterized_message, prod_config)
    return msg_rdd.map(produce).collect()

@@ -0,0 +1,2 @@
def flatten(l):
    return [item for sublist in l for item in sublist]

@@ -0,0 +1,32 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty

```

**Description:**
Distributed Produce (**distroduce**) is a cadCAD feature that leverages Apache Spark and Apache Kafka to enable development of in-stream data processing applications and throughput benchmarking of Kafka clusters.

**Properties:**
* enables cadCAD's user-defined simulation framework to publish events/messages to Kafka clusters
* enables scalable message publishing to Kafka clusters by distributing simulated event/message creation and publishing across an EMR cluster using Spark and the Kafka Producer (see the driver sketch below)
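
A minimal driver-side sketch of how these pieces compose, drawn from the example code in this commit (the `test` topic, the `localhost:9092` broker, the app name, and the parameter values are the example's defaults, not requirements):

```
from pyspark.sql import SparkSession
from cadCAD.distroduce.executor import distributed_produce

# driver-side Spark context (same app name as the bundled session helper)
sc = SparkSession.builder.appName("DistributedProduce").getOrCreate().sparkContext

# each simulated event is rendered as a Kafka message payload
def msg(t):
    return f"message_{t}".encode('utf-8')

# passed straight through to kafka-python's KafkaProducer on the executors
prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0}

# args: SparkContext, run label, sim_time, sim_runs, rdd_parts, message fn, producer config;
# returns one (send-loop start time, duration in seconds) tuple per simulation run
publish_times = distributed_produce(sc, 0, 400, 25, 8, msg, prod_config)
```

Each run's send loop creates its KafkaProducer inside the mapped closure, so producers are instantiated on the executors rather than serialized from the driver.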

#### Installation / Build From Source:
```
pip3 install -r requirements.txt
zip -rq distributed_produce/dist/distroduce.zip cadCAD/distroduce/
```

#### Usage:
```
spark-submit --py-files distributed_produce/dist/distroduce.zip distributed_produce/examples/event_bench/main.py
```
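
While the benchmark runs, delivery can be sanity-checked with a throwaway consumer in the style of the one included in this commit (the `test` topic and `localhost:9092` broker are the example defaults):

```
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
for i, message in enumerate(consumer, start=1):
    print(f"{i}: {message.value}")
```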

@@ -0,0 +1,10 @@
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty

promulgate

Binary file not shown.

@@ -0,0 +1,11 @@
from kafka import KafkaConsumer
from datetime import datetime


consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])

i = 1
for message in consumer:
    percent = i / 10000  # fraction of the 10,000 messages (400 * 25) the example benchmark sends
    print(str(datetime.now()) + " - " + str(percent) + ": " + str(i))
    i += 1

@@ -0,0 +1,58 @@
from functools import reduce
from datetime import timedelta
from pprint import pprint
import time
from pathos.multiprocessing import ThreadPool as TPool

from cadCAD.utils import flatten
from cadCAD.distroduce.executor import distributed_produce
from distributed_produce.examples.event_bench.spark.session import spark_context as sc


def main(sc, spark_runs, rdd_parts, sim_time, sim_runs, parameterized_message):
    publish_times, spark_job_times = [], []
    prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0}
    exec_spark_job = lambda run: distributed_produce(
        sc, run, sim_time, sim_runs, rdd_parts, parameterized_message, prod_config
    )

    job = 0
    with TPool(spark_runs) as t:
        start_time = time.time()
        publish_times.append(t.map(exec_spark_job, range(spark_runs)))
        spark_job_times.append({'job_num': job, 'job_time': time.time() - start_time})
        job += 1

    # flatten per-job results into (send-loop start time, duration) tuples, ordered by start time
    publish_times = sorted(flatten(list(reduce(lambda a, b: a + b, publish_times))), key=lambda x: x[0])
    # publish_start_sec = list(set([time[0].second for time in publish_times]))
    publish_send_secs = [time[1] for time in publish_times]
    publish_send_dts = [time[0] for time in publish_times]

    # wall-clock span from the first send loop's start to the last send loop's end
    send_time = publish_send_dts[-1] - publish_send_dts[0] + timedelta(microseconds=1000000 * publish_send_secs[-1])
    sent_messages = sim_time * sim_runs
    print(f"Spark Job Times: ")
    pprint(spark_job_times)
    print()
    print(f"Messages per Second: {float(sent_messages) / (float(send_time.seconds) + float(send_time.microseconds/1000000))}")
    print(f"Sent Messages: {sent_messages}")
    print(f"Send Time (Seconds): {send_time}")
    print(f"Avg. Send Loop Duration: {sum(publish_send_secs)/(spark_runs*25)}")  # 25 matches sim_runs in the example run below


if __name__ == "__main__":
    sc.setLogLevel("ERROR")
    spark_runs = 1
    sim_time = 400
    sim_runs = 25
    rdd_parts = 8

    def msg(t):
        return str(f"*****************message_{t}").encode('utf-8')

    main(sc, spark_runs, rdd_parts, sim_time, sim_runs, msg)

    # prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0, 'max_in_flight_requests_per_connection': 1, 'batch_size': 0, 'retries': 0}
    # print(f"Second Starts: {len(publish_start_sec)}")
    # print(f"Start Seconds: {publish_start_sec}")
    # pprint(publish_send_secs)
    # pprint(publish_times)

@@ -0,0 +1,15 @@
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os

os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'

spark = SparkSession\
    .builder\
    .appName("DistributedProduce")\
    .getOrCreate()

spark_context: SparkContext = spark.sparkContext
print(f"Spark UI: {spark_context.uiWebUrl}")
print()