deleted init integration - 'distributed_produce'

This commit is contained in:
Joshua E. Jodesty 2019-10-01 12:03:43 -04:00
parent 9fc12e7cce
commit cdfc4b474f
6 changed files with 0 additions and 126 deletions

View File

@@ -1,32 +0,0 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
```
**Description:**
Distributed Produce (**distroduce**) is a cadCAD feature that leverages Apache Spark and Apache Kafka to enable in-stream
data processing application development and throughput benchmarking for Kafka clusters.
**Properties:**
* enables cadCAD's user-defined simulation framework to publish events/messages to Kafka clusters
* enables scalable message publishing to Kafka clusters by distributing simulated event/message creation and publishing
across an EMR cluster using Spark and a Kafka producer (a single-process sketch of the publish pattern follows below)
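For reference, the publish pattern that distroduce scales out looks roughly like the following single-process
sketch (a minimal sketch assuming the `kafka-python` client; the `test` topic and the loop bounds mirror the bundled
example benchmark and are otherwise illustrative):
```
from kafka import KafkaProducer

def msg(t):
    # User-defined, parameterized message: one payload per simulated timestep.
    return f"message_{t}".encode('utf-8')

producer = KafkaProducer(bootstrap_servers='localhost:9092', acks='all')
for t in range(400):       # sim_time timesteps
    for _ in range(25):    # sim_runs simulated runs per timestep
        producer.send('test', msg(t))
producer.flush()
```
distroduce distributes this loop across a Spark/EMR cluster instead of running it serially in one process.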
#### Installation / Build From Source:
```
pip3 install -r requirements.txt
zip -rq distributed_produce/dist/distroduce.zip cadCAD/distroduce/
```
#### Usage:
```
spark-submit --py-files distributed_produce/dist/distroduce.zip distributed_produce/examples/event_bench/main.py
```

View File

@@ -1,10 +0,0 @@
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
promulgate

Binary file not shown.

View File

@@ -1,10 +0,0 @@
from kafka import KafkaConsumer
from datetime import datetime

# Consume from the 'test' topic and log receive progress as a fraction of the
# 10,000 messages (the benchmark's sim_time * sim_runs) expected to be sent.
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
i = 1
for message in consumer:
    percent = i / 10000
    print(str(datetime.now()) + " - " + str(percent) + ": " + str(i))
    i += 1

View File

@@ -1,58 +0,0 @@
from functools import reduce
from datetime import timedelta
from pprint import pprint
import time

from pathos.multiprocessing import ThreadPool as TPool

from cadCAD.utils import flatten
from cadCAD.distroduce.executor import distributed_produce
from distributed_produce.examples.event_bench.spark.session import spark_context as sc


def main(sc, spark_runs, rdd_parts, sim_time, sim_runs, parameterized_message):
    publish_times, spark_job_times = [], []
    prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}

    # Each Spark job distributes message creation/publishing across rdd_parts partitions.
    exec_spark_job = lambda run: distributed_produce(
        sc, run, sim_time, sim_runs, rdd_parts, parameterized_message, prod_config
    )

    # Launch the Spark jobs concurrently from a thread pool and time the batch.
    job = 0
    with TPool(spark_runs) as t:
        start_time = time.time()
        publish_times.append(t.map(exec_spark_job, range(spark_runs)))
        spark_job_times.append({'job_num': job, 'job_time': time.time() - start_time})
        job += 1

    # Flatten the per-job (send datetime, loop seconds) tuples and sort by send time.
    publish_times = sorted(flatten(list(reduce(lambda a, b: a + b, publish_times))), key=lambda x: x[0])
    # publish_start_sec = list(set([time[0].second for time in publish_times]))
    publish_send_secs = [time[1] for time in publish_times]
    publish_send_dts = [time[0] for time in publish_times]
    send_time = publish_send_dts[-1] - publish_send_dts[0] + timedelta(microseconds=1000000 * publish_send_secs[-1])
    sent_messages = sim_time * sim_runs

    print(f"Spark Job Times: ")
    pprint(spark_job_times)
    print()
    print(f"Messages per Second: {float(sent_messages) / (float(send_time.seconds) + float(send_time.microseconds/1000000))}")
    print(f"Sent Messages: {sent_messages}")
    print(f"Send Time (Seconds): {send_time}")
    print(f"Avg. Send Loop Duration: {sum(publish_send_secs)/(spark_runs*25)}")  # 25 matches sim_runs in __main__


if __name__ == "__main__":
    sc.setLogLevel("ERROR")
    spark_runs = 1
    sim_time = 400
    sim_runs = 25
    rdd_parts = 8

    def msg(t):
        # User-defined, parameterized message payload published for timestep t.
        return str(f"*****************message_{t}").encode('utf-8')

    main(sc, spark_runs, rdd_parts, sim_time, sim_runs, msg)

    # prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0, 'max_in_flight_requests_per_connection': 1, 'batch_size': 0, 'retries': 0}
    # print(f"Second Starts: {len(publish_start_sec)}")
    # print(f"Start Seconds: {publish_start_sec}")
    # pprint(publish_send_secs)
    # pprint(publish_times)
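
The deleted `cadCAD.distroduce.executor.distributed_produce` module itself is not part of this diff. The sketch below
is a hypothetical illustration of the call contract `main()` relies on: each call spreads publishing across
`rdd_parts` RDD partitions and returns a list of `(send datetime, loop seconds)` tuples. The `publish_partition`
helper, the `test` topic, and the producer-per-partition layout are assumptions, not the module's actual
implementation.
```
import time
from datetime import datetime

from kafka import KafkaProducer

def distributed_produce_sketch(sc, run, sim_time, sim_runs, rdd_parts, message_fn, prod_config):
    # Hypothetical stand-in for cadCAD.distroduce.executor.distributed_produce,
    # shaped to return what main() above consumes. `run` is accepted only to
    # match the benchmark's call signature and is unused in this sketch.
    def publish_partition(timesteps):
        # One Kafka producer per Spark partition: publish one message per simulated
        # run at each timestep and record (send datetime, loop duration in seconds).
        producer = KafkaProducer(**prod_config)
        timings = []
        for t in timesteps:
            loop_start = time.time()
            for _ in range(sim_runs):
                producer.send('test', message_fn(t))  # 'test' topic is illustrative
            producer.flush()
            timings.append((datetime.now(), time.time() - loop_start))
        return timings

    # Spread the sim_time timesteps over rdd_parts partitions and collect the
    # (datetime, seconds) tuples that main() sorts and aggregates.
    return sc.parallelize(range(sim_time), rdd_parts).mapPartitions(publish_partition).collect()
```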

View File

@@ -1,16 +0,0 @@
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

# import os
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

# Create (or reuse) a SparkSession and expose its SparkContext for the benchmark.
spark = SparkSession \
    .builder \
    .appName("DistributedProduce") \
    .getOrCreate()

spark_context: SparkContext = spark.sparkContext

print(f"Spark UI: {spark_context.uiWebUrl}")
print()