{"id":277,"date":"2018-01-22T19:52:31","date_gmt":"2018-01-22T19:52:31","guid":{"rendered":"http:\/\/eipsoftware.com\/musings\/?p=277"},"modified":"2018-02-20T19:52:43","modified_gmt":"2018-02-20T19:52:43","slug":"spark-streaming-data","status":"publish","type":"post","link":"https:\/\/eipsoftware.com\/musings\/spark-streaming-data\/","title":{"rendered":"Spark &#8211; Streaming Data"},"content":{"rendered":"<h4>Spark &#8211; Streaming Data, Capturing and Querying<\/h4>\n<p>Today we will look at how to capture streaming data and perform some simple queries as the data is streamed. We will use the regular expressions library and the PySpark library.\u00a0 The streaming data comes from a weather station that transmits different weather measurements at different intervals.\u00a0 We will need to extract the measurement we care about from the stream and report its maximum and minimum values.<\/p>\n<p>Let&#8217;s get started.<\/p>\n<p><!--more--><\/p>\n<h5>Environment Initialization<\/h5>\n<p>The streaming data comes from a weather station located in Southern California, USA.\u00a0 The University of California, San Diego (UCSD) hosts a web portal that serves the data as it is passed from the weather stations to its computer center. 
We will connect to the web portal and read the raw values as they are transmitted.<\/p>\n<p>Some measurements are transmitted more frequently than others.\u00a0 Wind speed and wind direction are transmitted more often, while temperature and rain are transmitted at longer intervals.\u00a0 In the example below, the 0R1 lines carry the wind measurements and make up most of the stream, while the single 0R2 line carries air temperature (Ta), relative humidity (Ua), and air pressure (Pa).<\/p>\n<p><strong>Example Weather Data<\/strong><\/p>\n<pre class=\"lang:yaml decode:true \"># Example weather station data\r\n#\r\n# 1419408015\t0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M\r\n# 1419408016\t0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M\r\n# 1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H\r\n# 1419408017\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M\r\n# 1419408018\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M\r\n# 1419408019\t0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M<\/pre>\n<p>The codes used in the weather data are described below.<\/p>\n<pre class=\"lang:yaml decode:true\"># Key for measurements:\r\n#\r\n# Sn      Wind speed minimum m\/s, km\/h, mph, knots #,M, K, S, N\r\n# Sm      Wind speed average m\/s, km\/h, mph, knots #,M, K, S, N\r\n# Sx      Wind speed maximum m\/s, km\/h, mph, knots #,M, K, S, N\r\n# Dn      Wind direction minimum deg #, D\r\n# Dm      Wind direction average deg #, D\r\n# Dx      Wind direction maximum deg #, D\r\n# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I\r\n# Ta      Air temperature \u00b0C, \u00b0F #, C, F\r\n# Tp      Internal temperature \u00b0C, \u00b0F #, C, F\r\n# Ua      Relative humidity %RH #, P\r\n# Rc      Rain accumulation mm, in #, M, I\r\n# Rd      Rain duration s #, S\r\n# Ri      Rain intensity mm\/h, in\/h #, M, I\r\n# Rp      Rain peak intensity mm\/h, in\/h #, M, I\r\n# Hc      Hail accumulation hits\/cm2, hits\/in2, hits #, M, I, H\r\n# Hd      Hail duration s #, S\r\n# Hi      Hail intensity hits\/cm2h, hits\/in2h, hits\/ h #, M, I, H\r\n# Hp      Hail peak intensity hits\/cm2h, hits\/in2h, hits\/ h #, M, I, H\r\n# Th      Heating temperature \u00b0C, \u00b0F #, 
C, F\r\n# Vh      Heating voltage V #, N, V, W, F2\r\n# Vs      Supply voltage V V\r\n# Vr      3.5 V ref. voltage V V<\/pre>\n<p>In this post we will focus on the average wind direction (Dm) as it is streamed from the weather station.<\/p>\n<h5>Parse the Weather Data<\/h5>\n<p>We will use a regular expression to extract the average wind direction from the rest of the transmitted data.<\/p>\n<pre class=\"lang:python decode:true \"># Parse a line of weather station data, returning the average wind direction measurement\r\n# import the regular expressions library\r\nimport re\r\n\r\n# Function for parsing each line of weather data.\r\n# Returns a one-element list for lines containing Dm=, otherwise an empty list,\r\n# so the function can be passed directly to flatMap.\r\ndef parse(line):\r\n    match = re.search(r\"Dm=(\\d+)\", line)\r\n    if match:\r\n        val = match.group(1)\r\n        return [int(val)]\r\n    return []<\/pre>\n<p>As the weather data is read in, we search each line for <strong>Dm=<\/strong> and extract the average wind direction that follows it.\u00a0 Lines without a Dm= value, such as the 0R2 temperature lines, return an empty list and are dropped by flatMap.\u00a0 The integer value is a compass bearing in degrees: 0 is North, 90 is East, 180 is South, and 270 is West.<\/p>\n<p>Next, we use PySpark&#8217;s StreamingContext to connect to the web portal and receive the weather data.<\/p>\n<pre class=\"lang:python decode:true \">from pyspark.streaming import StreamingContext\r\n\r\n# sc is the SparkContext created by the PySpark shell or notebook;\r\n# the second argument is the batch interval in seconds\r\nssc = StreamingContext(sc, 1)\r\n\r\n# connect to the web portal at port 12020\r\nlines = ssc.socketTextStream(\"redacted.redacted.ucsd.edu\", 12020)<\/pre>\n<p><em>Note: I removed the real web address because I don&#8217;t have permission to grant other people access to the weather data.<\/em><\/p>\n<p>Each line of weather data received from the socket becomes an element of the DStream stored in the variable lines.<\/p>\n<pre class=\"lang:python decode:true \"># extract the average wind direction from each line; lines without Dm= are dropped\r\nvals = lines.flatMap(parse)\r\n\r\n# sliding window: the last 10 seconds of values, recomputed every 5 seconds\r\nwindow = vals.window(10, 5)\r\n\r\n# custom function to print the window&#8217;s values and summary statistics, max and min\r\ndef stats(rdd):\r\n    print(rdd.collect())\r\n    if rdd.count() &gt; 0:\r\n        print(\"max = {}, min={}\".format(rdd.max(), 
rdd.min()))\r\n\r\n# run stats on every windowed RDD as it is produced\r\nwindow.foreachRDD(stats)<\/pre>\n<p>The custom stats function prints all of the values in the current window, followed by their maximum and minimum.\u00a0 foreachRDD calls it once for each windowed RDD, that is, every 5 seconds.<\/p>\n<p>Next, we start the streaming context, which opens the connection to the streaming data, and view the results.<\/p>\n<pre class=\"lang:python decode:true\"># begin receiving and processing data\r\nssc.start()<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-279\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-48-11.png\" alt=\"\" width=\"707\" height=\"655\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-48-11.png 707w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-48-11-300x278.png 300w\" sizes=\"auto, (max-width: 707px) 100vw, 707px\" \/><\/p>\n<p>As the streaming data comes in, every 5 seconds the function prints the values received over the last 10 seconds (roughly 10 wind direction readings, if the wind data arrives about once per second as in the example above), along with their max and min.<\/p>\n<p>Finally, we stop the streaming context to close our streaming connection.<\/p>\n<pre class=\"lang:python decode:true\"># by default this also stops the SparkContext (sc);\r\n# use ssc.stop(stopSparkContext=False) to keep sc available\r\nssc.stop()<\/pre>\n<p>In this post we wrote custom functions to query streaming data, connected to a streaming data source, and displayed the current max and min values for the specified window in real time.<\/p>\n<p>I hope you found this informative; let me know if you have any questions in the comments below.<\/p>\n<p><a href=\"mailto:michael.data@eipsoftware.com\">\u2014 michael.data@eipsoftware.com<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Spark &#8211; Streaming Data, Capturing and Querying Today we will look at how to capture streaming data and perform some simple queries as the data is streamed. 
We will use the regular expressions library and the PySpark library.\u00a0 The streaming data comes from a weather station that transmits different weather measurements at different intervals.\u00a0 We will [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_crdt_document":"","footnotes":""},"categories":[3,4,51],"tags":[28,30,52,53],"series":[],"class_list":["post-277","post","type-post","status-publish","format-standard","hentry","category-python","category-code","category-spark","tag-python","tag-code","tag-spark","tag-hadoop"],"_links":{"self":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/277","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/comments?post=277"}],"version-history":[{"count":2,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/277\/revisions"}],"predecessor-version":[{"id":280,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/277\/revisions\/280"}],"wp:attachment":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/media?parent=277"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/categories?post=277"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/tags?post=277"},{"taxonomy":"series","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/series?post=277"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}