From fd0de2d1c085f5a1758f4f0c39b068b2e2c73daa Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Tue, 24 Sep 2019 12:00:55 -0400 Subject: [PATCH] dev2 --- .gitignore | 5 +- README.md | 175 ++---------------- cadCAD/distroduce/__init__.py | 0 cadCAD/distroduce/configuration/__init__.py | 0 cadCAD/distroduce/configuration/kakfa.py | 30 +++ cadCAD/distroduce/executor/__init__.py | 1 + .../executor/spark/jobs/__init__.py | 34 ++++ cadCAD/distroduce/utils/__init__.py | 2 + distributed_produce/README.md | 32 ++++ distributed_produce/ascii_art.txt | 10 + distributed_produce/dist/distroduce.zip | Bin 0 -> 15277 bytes .../examples/event_bench/consumer.py | 11 ++ .../examples/event_bench/main.py | 58 ++++++ .../event_bench/spark/session/__init__.py | 15 ++ requirements.txt | 4 +- 15 files changed, 214 insertions(+), 163 deletions(-) create mode 100644 cadCAD/distroduce/__init__.py create mode 100644 cadCAD/distroduce/configuration/__init__.py create mode 100644 cadCAD/distroduce/configuration/kakfa.py create mode 100644 cadCAD/distroduce/executor/__init__.py create mode 100644 cadCAD/distroduce/executor/spark/jobs/__init__.py create mode 100644 cadCAD/distroduce/utils/__init__.py create mode 100644 distributed_produce/README.md create mode 100644 distributed_produce/ascii_art.txt create mode 100644 distributed_produce/dist/distroduce.zip create mode 100644 distributed_produce/examples/event_bench/consumer.py create mode 100644 distributed_produce/examples/event_bench/main.py create mode 100644 distributed_produce/examples/event_bench/spark/session/__init__.py diff --git a/.gitignore b/.gitignore index 4735bc2..c15c07f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,7 @@ testing/udo_test.py Simulation.md -monkeytype.sqlite3 \ No newline at end of file +monkeytype.sqlite3 + +distributed_produce/bash/ +distributed_produce/notes.txt \ No newline at end of file diff --git a/README.md b/README.md index eab8129..339c9a2 100644 --- a/README.md +++ b/README.md @@ -6,168 +6,21 @@ \___/\__,_/\__,_/\____/_/ |_/_____/ by BlockScience ``` +***cadCAD*** is a Python package that assists in the processes of designing, testing and validating complex systems +through simulation, with support for Monte Carlo methods, A/B testing and parameter sweeping. -**Introduction:** - -***cadCAD*** is a Python library that assists in the processes of designing, testing and validating complex systems through -simulation. At its core, cadCAD is a differential games engine that supports parameter sweeping and Monte Carlo analyses -and can be easily integrated with other scientific computing Python modules and data science workflows. - -**Description:** - -cadCAD (complex adaptive systems computer-aided design) is a python based, unified modeling framework for stochastic -dynamical systems and differential games for research, validation, and Computer Aided Design of economic systems created -by BlockScience. It is capable of modeling systems at all levels of abstraction from Agent Based Modeling (ABM) to -System Dynamics (SD), and enabling smooth integration of computational social science simulations with empirical data -science workflows. - - -An economic system is treated as a state-based model and defined through a set of endogenous and exogenous state -variables which are updated through mechanisms and environmental processes, respectively. Behavioral models, which may -be deterministic or stochastic, provide the evolution of the system within the action space of the mechanisms. -Mathematical formulations of these economic games treat agent utility as derived from the state rather than direct from -an action, creating a rich, dynamic modeling framework. Simulations may be run with a range of initial conditions and -parameters for states, behaviors, mechanisms, and environmental processes to understand and visualize network behavior -under various conditions. Support for A/B testing policies, Monte Carlo analysis, and other common numerical methods is -provided. - - -For example, cadCAD tool allows us to represent a company’s or community’s current business model along with a desired -future state and helps make informed, rigorously tested decisions on how to get from today’s stage to the future state. -It allows us to use code to solidify our conceptualized ideas and see if the outcome meets our expectations. We can -iteratively refine our work until we have constructed a model that closely reflects reality at the start of the model, -and see how it evolves. We can then use these results to inform business decisions. - -#### Documentation: -* ##### [Tutorials](tutorials) -* ##### [System Model Configuration](documentation/Simulation_Configuration.md) -* ##### [System Simulation Execution](documentation/Simulation_Execution.md) -* ##### [Policy Aggregation](documentation/Policy_Aggregation.md) -* ##### [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md) - -#### 0. Installation: - -**Python 3.6.5** :: Anaconda, Inc. - -**Option A:** Build From Source +# Getting Started +## 1. Install cadCAD +cadCAD requires [Python 3](https://www.python.org/downloads/) +cadCAD can be installed using Python’s package manager, [pip](https://pypi.org/project/cadCAD/) ```bash -pip3 install -r requirements.txt -python3 setup.py sdist bdist_wheel -pip3 install dist/*.whl +pip install cadCAD ``` +## 2. Learn the basics +Check out our tutorials (available both as [Jupyter Notebooks](tutorials) and +[videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we)) to familiarize yourself +with some system modelling concepts and cadCAD terminology. Alternatively, go straight to the +[documentation](documentation). -**Option B:** Proprietary Build Access - -***IMPORTANT NOTE:*** Tokens are issued to those with access to proprietary builds of cadCAD and BlockScience employees **ONLY**. -Replace \ with an issued token in the script below. -```bash -pip3 install pandas pathos fn funcy tabulate -pip3 install cadCAD --extra-index-url https://@repo.fury.io/blockscience/ -``` - - -#### 1. [Configure System Model](documentation/Simulation_Configuration.md) - -#### 2. [Execute Simulations:](documentation/Simulation_Execution.md) - -##### Single Process Execution: -Example System Model Configurations: -* [System Model A](documentation/examples/sys_model_A.py): -`/documentation/examples/sys_model_A.py` -* [System Model B](documentation/examples/sys_model_B.py): -`/documentation/examples/sys_model_B.py` - -Example Simulation Executions: -* [System Model A](documentation/examples/sys_model_A_exec.py): -`/documentation/examples/sys_model_A_exec.py` -* [System Model B](documentation/examples/sys_model_B_exec.py): -`/documentation/examples/sys_model_B_exec.py` - -```python -import pandas as pd -from tabulate import tabulate -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from documentation.examples import sys_model_A -from cadCAD import configs - -exec_mode = ExecutionMode() - -# Single Process Execution using a Single System Model Configuration: -# sys_model_A -sys_model_A = [configs[0]] # sys_model_A -single_proc_ctx = ExecutionContext(context=exec_mode.single_proc) -sys_model_A_simulation = Executor(exec_context=single_proc_ctx, configs=sys_model_A) - -sys_model_A_raw_result, sys_model_A_tensor_field = sys_model_A_simulation.execute() -sys_model_A_result = pd.DataFrame(sys_model_A_raw_result) -print() -print("Tensor Field: sys_model_A") -print(tabulate(sys_model_A_tensor_field, headers='keys', tablefmt='psql')) -print("Result: System Events DataFrame") -print(tabulate(sys_model_A_result, headers='keys', tablefmt='psql')) -print() -``` - -##### Multiple Simulations (Concurrent): -###### Multiple Simulation Execution (Multi Process Execution) -System Model Configurations: -* [System Model A](documentation/examples/sys_model_A.py): -`/documentation/examples/sys_model_A.py` -* [System Model B](documentation/examples/sys_model_B.py): -`/documentation/examples/sys_model_B.py` - -[Example Simulation Executions:](documentation/examples/sys_model_AB_exec.py) -`/documentation/examples/sys_model_AB_exec.py` - -```python -import pandas as pd -from tabulate import tabulate -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from documentation.examples import sys_model_A, sys_model_B -from cadCAD import configs - -exec_mode = ExecutionMode() - -# # Multiple Processes Execution using Multiple System Model Configurations: -# # sys_model_A & sys_model_B -multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -sys_model_AB_simulation = Executor(exec_context=multi_proc_ctx, configs=configs) - -i = 0 -config_names = ['sys_model_A', 'sys_model_B'] -for sys_model_AB_raw_result, sys_model_AB_tensor_field in sys_model_AB_simulation.execute(): - sys_model_AB_result = pd.DataFrame(sys_model_AB_raw_result) - print() - print(f"Tensor Field: {config_names[i]}") - print(tabulate(sys_model_AB_tensor_field, headers='keys', tablefmt='psql')) - print("Result: System Events DataFrame:") - print(tabulate(sys_model_AB_result, headers='keys', tablefmt='psql')) - print() - i += 1 -``` - -##### Parameter Sweep Simulation (Concurrent): -[Example:](documentation/examples/param_sweep.py) -`/documentation/examples/param_sweep.py` - -```python -import pandas as pd -from tabulate import tabulate -# The following imports NEED to be in the exact order -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from documentation.examples import param_sweep -from cadCAD import configs - -exec_mode = ExecutionMode() -multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -run = Executor(exec_context=multi_proc_ctx, configs=configs) - -for raw_result, tensor_field in run.execute(): - result = pd.DataFrame(raw_result) - print() - print("Tensor Field:") - print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - print("Output:") - print(tabulate(result, headers='keys', tablefmt='psql')) - print() -``` +## 3. Connect +Find other cadCAD users at our [Discourse](https://community.cadcad.org/). We are a small but rapidly growing community. diff --git a/cadCAD/distroduce/__init__.py b/cadCAD/distroduce/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cadCAD/distroduce/configuration/__init__.py b/cadCAD/distroduce/configuration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cadCAD/distroduce/configuration/kakfa.py b/cadCAD/distroduce/configuration/kakfa.py new file mode 100644 index 0000000..258232f --- /dev/null +++ b/cadCAD/distroduce/configuration/kakfa.py @@ -0,0 +1,30 @@ +from datetime import datetime + +# ToDo: Model Async communication here +# ToDo: Configuration of Input +def configure_producer(msg, config): + from kafka import KafkaProducer + def send_messages(events): + producer = KafkaProducer(**config) + + start_timestamp = datetime.now() + for event in range(events): + producer.send('test', msg(event)).get() + delta = datetime.now() - start_timestamp + + return start_timestamp, delta.total_seconds() + + return send_messages + +# def action(step): +# step += 1 +# percent = step / 10000 +# print(str(datetime.now()) + " - " + str(percent) + ": " + str(step)) + +# def configure_consumer(config, elemental_action: function): +# from kafka import KafkaConsumer +# def receive_messages(): +# consumer = KafkaConsumer(**config) +# return [elemental_action(i) for i, message in enumerate(consumer)] +# +# return receive_messages \ No newline at end of file diff --git a/cadCAD/distroduce/executor/__init__.py b/cadCAD/distroduce/executor/__init__.py new file mode 100644 index 0000000..a1ade4e --- /dev/null +++ b/cadCAD/distroduce/executor/__init__.py @@ -0,0 +1 @@ +from cadCAD.distroduce.executor.spark.jobs import distributed_produce \ No newline at end of file diff --git a/cadCAD/distroduce/executor/spark/jobs/__init__.py b/cadCAD/distroduce/executor/spark/jobs/__init__.py new file mode 100644 index 0000000..b22f7e4 --- /dev/null +++ b/cadCAD/distroduce/executor/spark/jobs/__init__.py @@ -0,0 +1,34 @@ +from cadCAD.distroduce.configuration.kakfa import configure_producer +from pyspark.context import SparkContext + +ascii_art = r''' + ___ _ __ _ __ __ __ + / _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ / + / // // /(_-``RzRLI`4NA}($A|sSdNJM0hjD%FOrOfQr$&Rc-M!x5y z56*Fn`hNfT{qgI%-i_<(y6^jS@8|VA6l4$((E;FJSOq-tzyI>*Hw?gefU&uwlY_0X zvylnAstOtaVb(cVXVfiN$Jq@70EhSk4gf$T0R4A`BZZJa3Nf;^xn*wZ>|o$zZfkQ& zkqDNMcF`UuQcus^#@tCykIl~gx4~sQ_jG3L{=#6o|_^JkM)421MXIqo9gvFA>6|irTDnmUYBArcT7bRhgk!K0oGrr|EU<^KKc{4#2a# z&n+_CYU9WDCs*`Ed~EFJs||_gv>JVlu#sr*yhY1&ji6j( zqgwW<7(kVMn?UHHb}wcQZrV4S2pf^_Ii;JeSWi&{cQZrgP!+~GC47|2aeb))3z{$V ziHd=_Doq4y5wFXV%S^GC-&JB67uBFxDby0Wwnr4Qb7y}h9Ig3Ol*({Vj-NYAL)BtyjWrBVg{S*h}%RFfa|>rt`F27+|bHq$C*__NX_ zUTIgfVznh-&jed4TErs07n&N^FsJ`fCuR85(0uDf-MaR8qNEDH&W$#BCim-wN)yK zsbqfFA(NR=tIh4YdH0PS9M^=L-eQt5DZS(Yv-6Q^YpWUc0(Ptlc4~dA+DTPD;uQnF zYbfCA+Rvu4rr1(nP;O;`KEcyw(_UGeKA{|e6BC92zx*8mL$U8h$h2=!>##+&Tj6Py zi2X>oNVe5368QS#))k6ww*$M=47c<0wqm-0QPSIY(zf!t$D<6lV_JbxX|3sbUu3*O5wUD^s$$iHFXmEaRjQk%fQQ-WLnGh z_O{A9JUU;{lIIF@o^&d$N~;W^<&TkK$(f_%pX*q?g*k()tI}JQ-Vm76fD`w)!&xuY zp!0>Mn|vsu8Mp3-NSiQ%Fu(%t&8CnIX-{CZfE8L zd6F?Sy4dInMSD2e!oV)t&6J;fh*=R6?wD!}A9>Go_-=eo;9K- z3%uIC5YFQ5px{n9iyulmqh5?{2WP?f?aY3PJ7@98KQM1sdId`FI#9d`A!EIl$bSNC zZ5gaHcR0W%kuunKIWVuR@deb=Q=awmb?KLFyJOtpr$QJI(&jh#g;PQ49(CiTy66iX z1Sugl1hLK{PS$*{bDAn_)p;a_+!x4jgHf6KqkeE15gA~lI_B{>gwWLp;$!$B2BMRq zdU;9)+%1{Y=*+!^S{T4;w`rvo!cwCghWqh;zUk}lw6}~WwN+|I(YdyHAFu*67hTqh z-ZBb2@`W3}=bDHRJ5rJ;16U1rP4txeZnn9m(Js_gkhfvrSGdEF0C)5-#f9{Rvy-`% z<0yGM7 z6AYMcjq278DBYqC0CuJ)wF2=1S_K2n7XdVoo9;^2IlvvWD3HN6=wIe7<)VEA5h!&Y zgFdd4#O`4}eP@&ne=Y~=9)WFAS8{Sp^!+b1iM~U)g6k)JFGD;;gGo}6lytnwTxpOb zq%)Z9z94d8OzbuNuXPn@=@*85dVX%sS5?h8VG=)Bi4@e|4va41u;u)O7ywV_sR1Z6 zT%5cA9do9_v-UA*n1baQtk$u|#16m02ja1BS+&c=sMke>l9@MQMs<9&$dmWH)X&T5 z<;%s^X2ZI3>vN6=-OJAWT5jero@;!9$Za7~XUE;85;_ORq~iGAm!dNOqY5oPOgq!b zXqo1Hb#ByhO!ecMuq^26E_@MWZ|2?9k`~g+-_k_#N~ii>Mvv8eP+Du8xa6<5&NNYR z_Y7#w39W`lB!B6`Q|okB^nSwk=6LW9J_w)XVm^h@2GY@aO9UANJ-vU4s)SHB;mXds`@Gwo&g(!srQ+ruOcZxYyI?+ z-V&4=;pc1g4Sh|i)ih*PX^wu5=ZQCM7p|8N%vQY<;*q)isQsgcx^2Rn@U6>D4@n1; zV^kA+Q*zEJS|WMdj5YVSh{ZX>yJdy67FeQmPC9++?x{2BwVl2x?~^iV{vbYPaMh`j zYWb6{$CeGe-`lk|?*o0x1nT5K02v+=HxnagCtHWzRuu-?U5_|I>`r?lkhDdTDJ))4 zn-T_n6h}zgi}asS5CRfBa`G56)ah+&ByvV2(|RnXWUDCpjjiR5Gpur~Og(KaE%H*VY}wss7+Tt-W!V^et`&!MB(%+vUD!iOAg{^aH*(%7r}qojbxqj&=qPmb(cCKJFcn zV6YFmM~Fz)(fpId6QBb&_>c{Cz_wPB zm9L{Dc(uaZl$_P^>kfVLPzk38*Rz_{jtI6=cR@Vok{um}_Hc&Gi>;4H0-2I97s_ao zJVwQaWz}YCp3(~Sd&E4A(thMF4xB(8;ERmR<`wd((gYG;LwlXoK1#hF!_lB|hUw?D zynH(mR*jo(fhG2(wLz&XH@~}}n0G~8wz~pfZ;iTIBS}5~q$m~{Rbj^;6*(!oR*vSD zMHjc`ry``bbbzN!n8@-PaJyzft@nzv1nCWgMrY^JO6@Vcq&mf?Ik<*wWXyz?Eg=*B zp)x%=C{Ijnb~qUXnB zt7dr8kv#kL60K*_-n6S0Zc!3yQ;`jOwsk}bCnsG&Y)qNl>}{bM=ixiY)_{Wf1Np*L z;_>u5h;IuX4>1naA-&U?`~{fg+Sem(olQI+HSHX?%elnr9=M}T7J;)Wt^S=#L)?ia?{9Zt53>G5 zXgjCV?8YpTtm?}2OeIEKnZJN&{-@&h=iBpx*}k`%8T{b<+kG*nd2yQ@>*#4j+_zgf zguTMM&+#Q^u53D)elf4^iyFY(lu6$X86+GOAd@w{EaauGUz_v>MaEWWL1J|C_i1Ri zRlQf?h2Z4yKnZ7LU@Reag&mrj(_sqHL1hhr?i~9S0xb(X#tzGZ!O)#38JeL3*WO8z z_iEu!;{y|Q9vA+?e3nxwfvWIhpzKyS*h$el@nYkk3hx6|xaePMIg+*mYZhTSgwG_Z z+Gdag=gz`)sk7S5Xan^ULD)K_8n^FIPRqZ5zj`N)fs&FNH4@M2Ya?AkI^#Ga+#Jho zWY6h#b>KjDVS9GE6%m{Cm|XYFw~ z$NalIuSa?=P_6p#jS|Z?fml1G;QMoc@1os~C}MIfh!iZwXfoPSNe(U5n|5MaKz*9v zn_Az)CN8;9k$T6XC49!7xrvr1=-h;ns}@m<{^m10cb+{0#_~h}J?qeCK5cBm(t7TR zm(;A)wOE>-q>naMO2UhN*fawOb!D_{=WQun=^_+o%~PGuU+)b{b#^rJYMvD*mJd!O z)k3wV9yU`8i)~6w%tG__y%O zQU+!jAry<7%FAMC*-9ELpU1HVu zW-`+zC`p57>f zpY#_#MRa#cg`{(JU)*^aVVBvyY%|LYpZwu9n%=eOo#>YnIZtw*g3B_Y(9;{AG6S6%C95azuiimd3ks zB#k#-M__vG;rdpZ({{|(@+%Oh*IlQb#c*G`84-OdGlo8M_QKRuhRL&sgmiJnlT$N| zji!?(adnMf&#H_7i<*&`nx)DRG}lSSxsIXq7*64dL(HD{;ePS|nZ5r9nZ4av_5^B{ zff=8}Q(`a0|I7tUP~QZ9Pz&+VV~rT5*aKYIi8BD2E2B9^BDA?+$_z$Y{-020&;uNN z5q8*tyGRdlEGMD{OQ$WjfujOVzI@Opa7yWW9yonZIzXW1?hvj!SLB8U!W0zwI|3pbr52){aP8WHNFT>_F(0#$m|A!slm*v zF2+VJRGiWPJ{)HF-j526Pr@FZ`_^f9PvfQ;rN6v!Q@@DCS0|^d$kjc??HXQXVp!-@ zB7B_n{T+0|Zy0n$=-vnxzD#G} z<|JZ5f2wrR_uWs%jOOe^G=d+9MxbOCzIz_w0E+-!IixVrLh@8677VzbVYhD?iGtiG z3Hs=d7rN)a1C0&)aAVu0tK@FjyEt{ooQAbh1mwb|t)M|)ILW?$*_9Ki>=x=l;1NL` zL-_Ox_MCD0rg0#d)cb$LVAaw*Q zGf7CiFG2kJ*{2j*Nm?p-DY3CBQCiwc*%S&fDY1##N_l5EIsQw_GiFv6hHNSXSE;PJ zg8H99UuiNtineG(E&Y0_3FAOYiYY^$?7(-Lhnx9cN*>_1z^9~R4|-UtkfKjWOlX>G z0F;zR$p6EFfaEiN86AHMin%Bv0Kj<~(_yk*U{zlZ*MN1jZ1^B0PT;~;NQbUp?t|Om z0}^F389_M~;j-L@)=6n~6!0wEI`cjYn8c*EJ;fX=7gs*gyv--$&r)6a{!`;}gS!Bi za{had>6?zpdCRw^--lL~qTk@b?vc>y zbDvS=xba?WpoGYQS@F5M>XOW;auNb^DrO5!eO{)okp?fqO=P?!wI3g9lU^1haUi>g zx~ey-#1J*$-_pPNs;Y8|)QfB=MXKgu)U5|DzZ&>cB5@wrPz_b z;V>0K`7t1d|Hnv&&{nLQcWqDr?X${Dc&HOEE5}!Rb6+dDR{UU)&Q18V*hWpX+{bpK z*$Ab1g7D%JJi0q3_0%KUqu*38B4KEMbvHw5BgX_^CLon)2z z!!$C#bIEN5_?D9Yl&Nq4vfBZLcrr{%fTnNSyhk_jM5g8^pUFS@#Qx?szXR$|$(}^4 zZPd!yyHeN18)=_XW$>0asND*>+&bTm>jL!3lwvmbaek);OpZ=oBPH=D9SIKIx=0qM zv>411b&(LS{lSV9ZDClbcBm!U2SzGy9K)o`$GVMOeb2VbOQVkn_w-xD8}?Bw?V; zUGGKt%#IiI##FWe!e&{~g{3404CASvMH{Ck%O799UZH+T5DoRIOU!=wYk=G3n#0mMYgt zN`q?#84jA0WtpA#zjEP+l8EMfYoh(!ls7vS&yKHk{d)Z34TiH= zlEd}IQ=vC;gv}Ke-)rZ#f9Q@tKx(=c)^7c+NuIi#ax;Z$Gs|Daic9M}qCj*#YBwX{ zwdSQq__|G0G9m00ok~|U@3IBaBK0kD+OqSXA6~nJR}l+CqMWmq=1cP<#ug; zbGLCLncD*0FMVTDtj+`;-AnPWSFtbYlZEyY)Q^o|RdzThy-|>XgC|FTeI0@UAO;O| z@Gl&|ycqaT2&xfj_Z8rw^Vrb>NgxD*SEU02q4~y>#Y3}=w0q(KfOOyZ%sf+p>eK0CPl!=@ikrylZQ*+KPB zL2s^3rGC%9&}-ev794)#wQB(gImiKtpniU&1^c`ZNDcVK7tE0E`uXtlqJwGzks;LV z!TFOe;KxQVWy3BcT!?guLV=L|A0ctFXQ1hb^MAzddS)1wSZKq+ti)d!P6lDP4Jr)7 z4<_C3Qi5@5x!Yv+^BeMA&cOt$KS{+Npub5h*ij*Z04ELlPX`j~UtmE#OgRYa(b2;H z7l5FjBM?9U;>Uylw0D5`9VGDk0GN5O8>GW8FAf&Ry(1*=LHSc9;CC1>mF&g?+I&g~ z^GUE^n|~-UCqOy;vf^N198aNT4;GA+J5u>B#KVsk{!G9 zuoMt3oH_?R|=2ImLpr^Zni-4GrS_nLBnE!a97Hy6hbA+kq~#=B3L8jo$$d#K{|UHi2E9k zT+vQp1?1h~K`V@i{?3X$r^D!OCl~^~sWJQs!0tX`4YKN_h@Wm@+pUhnw?+qH>z{+A zZ4ca^jDmT8wBJslGm01@qsn2bIIzB*;4bL(jRwM9Xr!Z&yec{vlbus)+KbZPq#*2;Ne&UQSTa~XJ+wib5I3+pA#X?y zf}B55$NwQbQV4j1X9KtnZK#g5;lL6I^EK3d*E-DdA5_zP8a4a7Aee}Uxk~I7 zJXjYb<9{$;7^zPI3Fb<8q}tswKg=EQUC=>_jJZ~sZ} baR~*}93&7I0sct`Hb5$9Ra?CP{@4EiC_^Ed literal 0 HcmV?d00001 diff --git a/distributed_produce/examples/event_bench/consumer.py b/distributed_produce/examples/event_bench/consumer.py new file mode 100644 index 0000000..9c65289 --- /dev/null +++ b/distributed_produce/examples/event_bench/consumer.py @@ -0,0 +1,11 @@ +from kafka import KafkaConsumer +from datetime import datetime + + +consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092']) + +i = 1 +for message in consumer: + percent = i / 10000 + print(str(datetime.now()) + " - " + str(percent) + ": " + str(i)) + i += 1 diff --git a/distributed_produce/examples/event_bench/main.py b/distributed_produce/examples/event_bench/main.py new file mode 100644 index 0000000..c4d1edf --- /dev/null +++ b/distributed_produce/examples/event_bench/main.py @@ -0,0 +1,58 @@ +from functools import reduce +from datetime import timedelta +from pprint import pprint +import time +from pathos.multiprocessing import ThreadPool as TPool + +from cadCAD.utils import flatten +from cadCAD.distroduce.executor import distributed_produce +from distributed_produce.examples.event_bench.spark.session import spark_context as sc + + +def main(sc, spark_runs, rdd_parts, sim_time, sim_runs, parameterized_message): + publish_times, spark_job_times = [], [] + prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0} + exec_spark_job = lambda run: distributed_produce( + sc, run, sim_time, sim_runs, rdd_parts, parameterized_message, prod_config + ) + + job = 0 + with TPool(spark_runs) as t: + start_time = time.time() + publish_times.append(t.map(exec_spark_job, range(spark_runs))) + spark_job_times.append({'job_num': job, 'job_time': time.time() - start_time}) + job += 1 + + publish_times = sorted(flatten(list(reduce(lambda a, b: a + b, publish_times))), key=lambda x: x[0]) + # publish_start_sec = list(set([time[0].second for time in publish_times])) + publish_send_secs = [time[1] for time in publish_times] + publish_send_dts = [time[0] for time in publish_times] + + send_time = publish_send_dts[-1] - publish_send_dts[0] + timedelta(microseconds=1000000 * publish_send_secs[-1]) + sent_messages = sim_time * sim_runs + print(f"Spark Job Times: ") + pprint(spark_job_times) + print() + print(f"Messages per Second: {float(sent_messages) / (float(send_time.seconds) + float(send_time.microseconds/1000000))}") + print(f"Sent Messages: {sent_messages}") + print(f"Send Time (Seconds): {send_time}") + print(f"Avg. Send Loop Duration: {sum(publish_send_secs)/(spark_runs*25)}") + + +if __name__ == "__main__": + sc.setLogLevel("ERROR") + spark_runs = 1 + sim_time = 400 + sim_runs = 25 + rdd_parts = 8 + + def msg(t): + return str(f"*****************message_{t}").encode('utf-8') + + main(sc, spark_runs, rdd_parts, sim_time, sim_runs, msg) + + # prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0, 'max_in_flight_requests_per_connection': 1, 'batch_size': 0, 'retries': 0} + # print(f"Second Starts: {len(publish_start_sec)}") + # print(f"Start Seconds: {publish_start_sec}") + # pprint(publish_send_secs) + # pprint(publish_times) diff --git a/distributed_produce/examples/event_bench/spark/session/__init__.py b/distributed_produce/examples/event_bench/spark/session/__init__.py new file mode 100644 index 0000000..9562c2a --- /dev/null +++ b/distributed_produce/examples/event_bench/spark/session/__init__.py @@ -0,0 +1,15 @@ +from pyspark.sql import SparkSession +from pyspark.context import SparkContext +import os + +os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' +os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' + +spark = SparkSession\ + .builder\ + .appName("DistributedProduce")\ + .getOrCreate() + +spark_context: SparkContext = spark.sparkContext +print(f"Spark UI: {spark_context.uiWebUrl}") +print() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 2030bc8..640db81 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,6 @@ wheel pathos fn tabulate -funcy \ No newline at end of file +funcy +pyspark +kafka-python \ No newline at end of file