Count Unique in Druid Using DataSketches Theta Sketch

January 11, 2019 Monzurul Haque Shimul

In the last post, I showed how to count unique values for a dimension using Druid’s own cardinality and hyperUnique aggregators, which are based on the HyperLogLog algorithm. In this post, I’m going to show how to count unique (count distinct) values in Druid using the DataSketches Theta Sketch, which is available in the druid-datasketches extension. I will assume you already have a good understanding of the Druid architecture and that you have Druid installed and running.

DataSketches

In big data analytics, there is often a problem with certain types of queries that don’t scale, because they require huge compute resources and a long time to produce exact results. Examples include count distinct and quantiles. If approximate results are acceptable, there is a class of specialized algorithms, called streaming algorithms or sketches, that can produce results orders of magnitude faster and with mathematically proven error bounds. For interactive queries there may be no other viable alternative, and for real-time analysis, sketches are the only known solution. DataSketches is a library developed at Yahoo which provides several sketch algorithms for both real-time and batch processing. Sketches are fast, and query results are approximate but within well-defined error bounds that are user-configurable by trading off sketch size against accuracy.

To count the unique or distinct occurrences of a variable within a stream of data exactly, a copy of each known distinct value of that variable must be kept, and each new instance of the variable must be checked against the set of known distinct values to determine whether it is new or already seen. In the context of counting unique visitors to a website in real time, this means that, if the number of unique visitors is N, the system has a memory requirement of O(N), and each hit to the website takes O(log N) (with a tree-based set) to check whether we have a new unique visitor.
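
To make that baseline concrete, here is a minimal Java sketch of the exact approach using a tree-based set. The member IDs are illustrative:

import java.util.TreeSet;

public class ExactDistinctCount {
    public static void main(String[] args) {
        // Exact distinct counting: the set grows as O(N) in the number of
        // distinct values, and each TreeSet insert costs O(log N).
        TreeSet<Long> seenMembers = new TreeSet<>();
        long[] hits = {83868692L, 12345L, 83868692L, 67890L};
        for (long memberId : hits) {
            seenMembers.add(memberId); // a false return means "already seen"
        }
        System.out.println("unique visitors = " + seenMembers.size());
    }
}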

To solve the computational challenges associated with counting unique identifiers, DataSketches provides two families of count-unique algorithms (a short usage sketch follows this list):

  • The Theta Sketch Framework algorithms, which are tuned for operation on the Java heap or off-heap.
  • The HyperLogLog (HLL) algorithms, for when sketch size is of utmost concern.
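
For comparison with the exact version above, here is an illustrative example using the Theta sketch API from Yahoo’s sketches-core library (assuming that dependency; the package was later renamed to org.apache.datasketches):

import com.yahoo.sketches.theta.UpdateSketch;

public class ThetaDistinctCount {
    public static void main(String[] args) {
        // Approximate distinct counting: memory is capped by the sketch's
        // configured nominal entries, no matter how many values arrive.
        UpdateSketch sketch = UpdateSketch.builder().build();
        long[] hits = {83868692L, 12345L, 83868692L, 67890L};
        for (long memberId : hits) {
            sketch.update(memberId); // duplicates do not grow the sketch
        }
        System.out.println("estimated unique visitors = " + sketch.getEstimate());
    }
}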

druid-datasketches Extension

By leveraging the fast, approximate calculations of DataSketches, complex analytic queries such as cardinality estimation and retention analysis can be completed in less than one second in Druid. This allows developers to visualize results in real time and to slice and dice them across a variety of filters. In this article, we’re going to use the Theta sketch to count unique (distinct) values in Druid. Let’s start.

To use the DataSketches aggregators, make sure you include the extension in your config file. To do that, in the conf/druid/_common/common.runtime.properties file, add “druid-datasketches” to the druid.extensions.loadList property. If, for example, the list already contains “druid-parser-route”, the final property should look like this:

druid.extensions.loadList=["druid-parser-route", "druid-datasketches"]

Now restart the Druid servers for the change to take effect.

Data

In this example, I will ingest RSVP events from Meetup.com. We can capture the events data simply by running the following command:

$ wget -qO- http://stream.meetup.com/2/rsvps > meetup.json
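
Note that the RSVP endpoint is a long-running stream, so the command above keeps writing until you stop it with Ctrl-C. If you prefer a fixed-length capture, something like the following (using the coreutils timeout command, if it is available on your system) also works:

$ timeout 60 wget -qO- http://stream.meetup.com/2/rsvps > meetup.json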

Let’s take a look at the data first. We receive events of the following type from Meetup:

{
   "venue":{
      "venue_name":"Upper Macungie Park",
      "lon":-75.628995,
      "lat":40.590295,
      "venue_id":1251399
   },
   "visibility":"public",
   "response":"yes",
   "guests":0,
   "member":{
      "member_id":83868692,
      "photo":"https://secure.meetupstatic.com/photos/member/5/3/d/6/thumb_120381462.jpeg",
      "member_name":"Bruce Kantor"
   },
   "rsvp_id":1755860828,
   "mtime":1541193507549,
   "event":{
      "event_name":"Fall Ride from Upper Macungie Park",
      "event_id":"256076170",
      "time":1541602800000,
      "event_url":"https://www.meetup.com/Lehigh-Wheelmen-Association/events/256076170/"
   },
   "group":{
      "group_topics":[
         {
            "urlkey":"cycling-training",
            "topic_name":"Cycling Training"
         },
         {
            "urlkey":"casual-bicycling",
            "topic_name":"Casual Bicycling"
         },
         {
            "urlkey":"excercise",
            "topic_name":"Exercise"
         },
         {
            "urlkey":"sports",
            "topic_name":"Sports and Recreation"
         },
         {
            "urlkey":"adventure",
            "topic_name":"Adventure"
         },
         {
            "urlkey":"mountain-biking",
            "topic_name":"Mountain Biking"
         },
         {
            "urlkey":"cycling-for-fitness",
            "topic_name":"Cycling for Fitness"
         },
         {
            "urlkey":"bike",
            "topic_name":"Bicycling"
         },
         {
            "urlkey":"road-cycling",
            "topic_name":"Road Cycling"
         },
         {
            "urlkey":"cycling",
            "topic_name":"Cycling"
         },
         {
            "urlkey":"bicycle-touring",
            "topic_name":"Bicycle Touring"
         }
      ],
      "group_city":"Trexlertown",
      "group_country":"us",
      "group_id":7325352,
      "group_name":"Lehigh Wheelmen Association (LWA)",
      "group_lon":-75.6,
      "group_urlname":"Lehigh-Wheelmen-Association",
      "group_state":"PA",
      "group_lat":40.55
   }
}

An explanation of the response JSON can be found in the Meetup API documentation.
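
If you want to inspect your own capture, a quick way to pretty-print the first event (assuming Python is installed) is:

$ head -n 1 meetup.json | python -m json.tool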

Ingestion

Now, I would like you to ingest some Meetup.com RSVP data into Druid. You can look at my previous tutorials, listed below, to learn how to do that.

  • Loading JSON data into Druid from batch file
  • Loading data into Druid from CSV file
  • Loading data into Druid from TSV file
  • Loading data from Apache Kafka to Druid

The following JSON is an “index spec” that tells Druid the location of the data file, the segment and query granularities, the dimensions we want to use, and the metrics (or aggregations) we want Druid to create. When we send this JSON to the Overlord, it will spawn a task that indexes the data.

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "meetup",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "type": "path",
                "name": "member_id",
                "expr": "$.member.member_id"
              },
              {
                "type": "path",
                "name": "venue_id",
                "expr": "$.venue.venue_id"
              },
              {
                "type": "path",
                "name": "event_id",
                "expr": "$.event.event_id"
              }
            ]
          },
          "timestampSpec": {
            "column": "mtime",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "member_id",
              "event_id",
              "venue_id"
            ]
          }
        }
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "unique_member",
          "type": "thetaSketch",
          "fieldName": "member_id"
        },
        {
          "name": "unique_venue",
          "type": "thetaSketch",
          "fieldName": "venue_id"
        },
        {
          "name": "unique_event",
          "type": "thetaSketch",
          "fieldName": "event_id"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "HOUR",
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/Users/shimul/work/dev",
        "filter": "meetup.json"
      },
      "appendToExisting": true
    },
    "tuningConfig": {
      "forceExtendableShardSpecs": true,
      "type": "index"
    }
  }
}

To submit this task, save the spec above to a file (here, theta-sketch-ingest.json) and POST it to Druid from a terminal window. Druid will print the ID of the task if the submission was successful. Let’s submit the task:

$ curl -X 'POST' -H 'Content-Type:application/json' -d @theta-sketch-ingest.json http://localhost:8090/druid/indexer/v1/task
{"task":"index_meetup_2019-01-07T08:53:05.270Z"}

To view the status of the ingestion task, go to the Overlord console: http://localhost:8090/console.html. Refresh the console periodically; after the task completes, you should see a “SUCCESS” status for it.
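
If you prefer the terminal over the console, the Overlord also exposes a task status endpoint; substitute the task ID returned above:

$ curl http://localhost:8090/druid/indexer/v1/task/index_meetup_2019-01-07T08:53:05.270Z/status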

After the ingestion task finishes, the data will be loaded by the historical nodes and available for querying within a minute or two. You can monitor the progress of loading data in the coordinator console by checking whether the datasource “meetup” has a blue circle indicating “fully available”: http://localhost:8081/#/.

Query

Now, if we want to know the number of unique members each hour, we use a timeseries query with a thetaSketch aggregation on the member_id dimension:

{
  "queryType": "timeseries",
  "dataSource": "meetup",
  "intervals": "2019-01-07T00:00Z/2019-01-08T00:00Z",
  "granularity": "hour",
  "aggregations": [
    {
      "type" : "thetaSketch",
      "name" : "unique_member",
      "fieldName" : "member_id",
      "isInputThetaSketch": false,
      "size": 16384
     }
  ]
}

Running the query produces the following result:

$ curl -X POST -H "Content-Type: application/json" -d @theta_unique_member_query.json "http://localhost:8082/druid/v2/?pretty"
[ {
  "timestamp" : "2019-01-07T01:00:00.000Z",
  "result" : {
    "unique_member" : 1.0
  }
}, {
  "timestamp" : "2019-01-07T02:00:00.000Z",
  "result" : {
    "unique_member" : 0.0
  }
}, {
  "timestamp" : "2019-01-07T03:00:00.000Z",
  "result" : {
    "unique_member" : 0.0
  }
}, {
  "timestamp" : "2019-01-07T04:00:00.000Z",
  "result" : {
    "unique_member" : 1.0
  }
}, {
  "timestamp" : "2019-01-07T05:00:00.000Z",
  "result" : {
    "unique_member" : 2.0
  }
}, {
  "timestamp" : "2019-01-07T06:00:00.000Z",
  "result" : {
    "unique_member" : 5.0
  }
}, {
  "timestamp" : "2019-01-07T07:00:00.000Z",
  "result" : {
    "unique_member" : 576.0
  }
}, {
  "timestamp" : "2019-01-07T08:00:00.000Z",
  "result" : {
    "unique_member" : 2065.0
  }
} ]

The problem with the above query is that on each execution Druid has to build the sketches from the raw member_id values to estimate the unique count. That’s why it is recommended to apply the sketch aggregator at ingestion time as well, which makes querying faster. If you look back at the ingestion spec, you’ll see that we’ve already ingested the values as Theta sketch objects. So we can run the following query, which reads the pre-built unique_member metric (note that isInputThetaSketch is now true) and will be faster than the previous one:

{
  "queryType": "timeseries",
  "dataSource": "meetup",
  "intervals": "2019-01-07T00:00Z/2019-01-08T00:00Z",
  "granularity": "hour",
  "aggregations": [
    {
      "type" : "thetaSketch",
      "name" : "unique_member",
      "fieldName" : "unique_member",
      "isInputThetaSketch": true,
      "size": 16384
     }
  ]
}

Here, the size parameter is optional, but if given it must be a power of 2. Internally, size refers to the maximum number of entries the sketch object will retain. A higher size means higher accuracy but more space to store the sketches. Note that after you index with a particular size, Druid will persist the sketches in segments, and you should use a size greater than or equal to that one at query time. See the DataSketches site for details. In general, it is recommended to stick to the default size of 16384. You can experiment by changing the size; you’ll get different estimates for different values of size.
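
For example, if you wanted higher accuracy on the member count, you could ingest that one metric with a larger power-of-2 size (this variant is illustrative, not part of the spec above) and then query it with the same or a larger value:

{
  "name": "unique_member",
  "type": "thetaSketch",
  "fieldName": "member_id",
  "size": 32768
}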

In the example ingestion spec, I have also ingested event_id and venue_id sketches to count the unique numbers of events and venues. Now I want you to get some practice by writing queries for them.

Summary

And that’s it for now. Thanks for reading this far. I have shown how to ingest data into Druid and then query for the unique count of a dimension using the DataSketches Theta sketch aggregator, covering both querying sketched metrics and querying non-sketched dimensions. I recommend using the sketch aggregator during ingestion and querying the sketched metrics for better performance. In the next post, I will demonstrate how to do the same with the HLL sketch provided by the druid-datasketches extension. Keep coming back.

References

  • http://druid.io/docs/latest/development/extensions-core/datasketches-theta.html
  • https://datasketches.github.io/
  • https://www.infoq.com/news/2016/01/Yahoo-open-sources-data-sketches
  • https://www.meetup.com/meetup_api/

