{"id":261,"date":"2018-01-21T19:11:01","date_gmt":"2018-01-21T19:11:01","guid":{"rendered":"http:\/\/eipsoftware.com\/musings\/?p=261"},"modified":"2018-02-20T19:24:43","modified_gmt":"2018-02-20T19:24:43","slug":"spark-sqlcontext","status":"publish","type":"post","link":"https:\/\/eipsoftware.com\/musings\/spark-sqlcontext\/","title":{"rendered":"Spark &#8211; SQLContext"},"content":{"rendered":"<h4>Spark &#8211; SQLContext<\/h4>\n<p class=\"\">Today we will look at the SQLContext object from the PySpark library and how you can use it to connect to a local database.\u00a0 In the example below we will:<br \/>\nConnect to a local PostgreSQL database and read the contents into a data frame<br \/>\nRun some simple SQL queries<br \/>\nJoin two data frames together<\/p>\n<p>Let&#8217;s get started.<\/p>\n<p><!--more--><\/p>\n<h5><strong>Initializing Environment<\/strong><\/h5>\n<p>The environment I was using had a local instance of PostgreSQL installed.<\/p>\n<p>I had a database called &#8220;cloudera&#8221;. You can see all available databases in PostgreSQL by using the <strong>\\list<\/strong> command from a PostgreSQL prompt. There are three tables in the database: adclicks, buyclicks, and gameclicks.\u00a0 Use the <strong>\\dt<\/strong> command to show the tables in the database.<\/p>\n<h5>Python Code<\/h5>\n<p>First we need to import the SQLContext class from the PySpark library and create an instance named sqlsc, wrapping the SparkContext (sc) that the PySpark shell provides.<\/p>\n<pre class=\"lang:python decode:true \">from pyspark.sql import SQLContext\r\n\r\n# sc is the SparkContext created automatically by the PySpark shell\r\nsqlsc = SQLContext(sc)<\/pre>\n<p>Next we will read the contents of the gameclicks table into a data frame named df and print its schema. 
I am using the jdbc:postgresql driver to connect to the database.<\/p>\n<pre class=\"lang:python decode:true \">df = sqlsc.read.format(\"jdbc\") \\\r\n  .option(\"url\", \"jdbc:postgresql:\/\/localhost\/cloudera?user=cloudera\") \\\r\n  .option(\"dbtable\", \"gameclicks\") \\\r\n  .load()\r\n\r\ndf.printSchema()<\/pre>\n<p>The gameclicks schema is below.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-264\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-47-10.png\" alt=\"\" width=\"421\" height=\"153\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-47-10.png 421w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-47-10-300x109.png 300w\" sizes=\"auto, (max-width: 421px) 100vw, 421px\" \/><\/p>\n<p>Now we will count how many rows are in the data frame and show the top five rows.<\/p>\n<pre class=\"lang:python decode:true \">df.count()\r\n\r\ndf.show(5)<\/pre>\n<pre class=\"lang:python decode:true \">755806<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-263\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-45-32.png\" alt=\"\" width=\"641\" height=\"152\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-45-32.png 641w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-45-32-300x71.png 300w\" sizes=\"auto, (max-width: 641px) 100vw, 641px\" \/><\/p>\n<h5>PySpark Queries<\/h5>\n<p>Now that the data frame is loaded we can perform some simple queries.<\/p>\n<p>First we will do a query that brings back only the specified columns and the first five rows. 
The results are shown below.<\/p>\n<pre class=\"lang:python decode:true \">df.select(\"userid\", \"teamlevel\").show(5)<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-267\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-52-30.png\" alt=\"\" width=\"165\" height=\"149\" \/><\/p>\n<p>Next we can use the filter method on the data frame, similar to the\u00a0<strong>WHERE<\/strong> clause in a traditional SQL query.<\/p>\n<pre class=\"lang:python decode:true \">df.filter(df[\"teamlevel\"] &gt; 1).select(\"userid\", \"teamlevel\").show(5)<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-266\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-52-45.png\" alt=\"\" width=\"206\" height=\"153\" \/><\/p>\n<h5>Aggregation Queries<\/h5>\n<p>Now we will do some simple aggregation queries.<\/p>\n<p>First, let&#8217;s group the results by the value in the IsHit column.<\/p>\n<pre class=\"lang:python decode:true\">df.groupBy(\"IsHit\").count().show()<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-265\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-52-56.png\" alt=\"\" width=\"130\" height=\"104\" \/><\/p>\n<p>If we want to do additional types of aggregations beyond counting, we will need to import the SQL functions from the PySpark library. 
Then we will take the mean and the sum of the IsHit column.<\/p>\n<pre class=\"lang:python decode:true\"># explicit imports; note that pyspark sum shadows the Python built-in sum\r\nfrom pyspark.sql.functions import mean, sum\r\ndf.select(mean(\"IsHit\"), sum(\"IsHit\")).show()<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-268\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-13-58-21.png\" alt=\"\" width=\"269\" height=\"90\" \/><\/p>\n<h5>Joining Tables<\/h5>\n<p>The last task for this post is to show how to join tables.<\/p>\n<p>We will read the adclicks table into a data frame and join it to the gameclicks data frame on the userid column.<\/p>\n<p>As above, we will use the JDBC driver to load the table into a data frame object.<\/p>\n<pre class=\"lang:python decode:true\">df2 = sqlsc.read.format(\"jdbc\") \\\r\n  .option(\"url\", \"jdbc:postgresql:\/\/localhost\/cloudera?user=cloudera\") \\\r\n  .option(\"dbtable\", \"adclicks\") \\\r\n  .load()\r\n\r\ndf2.printSchema()<\/pre>\n<p>The adclicks schema is below.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-273\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-41.png\" alt=\"\" width=\"403\" height=\"142\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-41.png 403w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-41-300x106.png 300w\" sizes=\"auto, (max-width: 403px) 100vw, 403px\" \/><\/p>\n<p>Now we can use the .join method, which performs an inner join by default as in a SQL query, to join the two data frames.<\/p>\n<pre class=\"lang:python decode:true\">dfMerge = df.join(df2, \"userid\")\r\n\r\ndfMerge.printSchema()<\/pre>\n<p>The merged schema is below.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-272\" 
src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-51.png\" alt=\"\" width=\"441\" height=\"241\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-51.png 441w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-06-51-300x164.png 300w\" sizes=\"auto, (max-width: 441px) 100vw, 441px\" \/><\/p>\n<p>Now we can take a look at the first few rows of the merged data frame.<\/p>\n<pre class=\"lang:python decode:true\">dfMerge.show(7)<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-271\" src=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-07-14.png\" alt=\"\" width=\"979\" height=\"377\" srcset=\"https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-07-14.png 979w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-07-14-300x116.png 300w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-07-14-768x296.png 768w, https:\/\/eipsoftware.com\/musings\/wp-content\/uploads\/2018\/02\/Screenshot-from-2018-02-20-14-07-14-900x347.png 900w\" sizes=\"auto, (max-width: 979px) 100vw, 979px\" \/><\/p>\n<p>In this post we did quite a few things. 
We loaded tables from a PostgreSQL database, performed some simple SQL queries on the data frames, and joined two data frames together.<\/p>\n<p>I hope you found the above informative; let me know if you have any questions in the comments below.<\/p>\n<p><a href=\"mailto:michael.data@eipsoftware.com\">\u2014 michael.data@eipsoftware.com<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Spark &#8211; SQLContext Today we will look at the SQLContext object from the PySpark library and how you can use it to connect to a local database.\u00a0 In the example below we will: Connect to a local PostgreSQL database and read the contents into a dataframe. Run some simple SQL queries And join two data [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_crdt_document":"","footnotes":""},"categories":[3,4,51],"tags":[28,30,52,53],"series":[],"class_list":["post-261","post","type-post","status-publish","format-standard","hentry","category-python","category-code","category-spark","tag-python","tag-code","tag-spark","tag-hadoop"],"_links":{"self":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/261","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/comments?post=261"}],"version-history":[{"count":5,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/261\/revisions"}],"predecessor-version":[{"id":276,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/posts\/261\/revisions\/276"}],"wp:attachment":[{"href":"https:\/\
/eipsoftware.com\/musings\/wp-json\/wp\/v2\/media?parent=261"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/categories?post=261"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/tags?post=261"},{"taxonomy":"series","embeddable":true,"href":"https:\/\/eipsoftware.com\/musings\/wp-json\/wp\/v2\/series?post=261"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}