Clean Programmer
Count Unique in Druid Using DataSketches HLL Sketch

January 12, 2019 Monzurul Haque Shimul

Intro

In the last post, I showed how to count unique values of a dimension using the DataSketches Theta Sketch. In this post, I'm going to show how to count unique (count distinct) values in Druid using the DataSketches HLL Sketch, which is also available in the druid-datasketches extension. The HLL Sketch uses the HyperLogLog algorithm to estimate unique counts. I've already discussed Druid's own implementation of HyperLogLog, via the HyperUnique and Cardinality aggregators, in a previous post. Those already using either of those aggregators may ask why they should switch to the DataSketches HLL aggregator. The answer is flexibility: the HLL Sketch lets you choose both the sketch size and the HLL sketch type.

Druid Datasketches Extension

The DataSketches HLL Sketch module provides Druid aggregators for distinct counting based on the HLL sketch from the DataSketches library. This aggregator works much like the others I've shown so far. At ingestion time, it builds HLL sketch objects that are stored in Druid segments. At query time, the sketches are read and merged, and by default you receive an estimate of the number of distinct values presented to the sketch. You can also use a post aggregator to produce a union of sketch columns in the same row. The HLL sketch aggregator can be applied to any identifier column and returns the estimated cardinality of that column.
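To build intuition for what such a sketch stores, here is a minimal, hypothetical Python illustration of the HyperLogLog idea itself: hash each value, use some hash bits to pick a bucket, and keep only the maximum "rank" (position of the leftmost 1-bit) per bucket. This is an illustration only, not the DataSketches implementation that Druid uses:

```python
import hashlib
import math

def hll_estimate(values, lg_k=12):
    """Toy HyperLogLog cardinality estimate (illustration only,
    not the DataSketches implementation used by Druid)."""
    k = 1 << lg_k                     # number of buckets, K = 2^lg_k
    registers = [0] * k
    for v in values:
        # 64 hash bits per value: low lg_k bits pick a register,
        # the remaining bits determine the rank.
        h = int.from_bytes(hashlib.sha1(str(v).encode()).digest()[:8], "big")
        bucket = h & (k - 1)
        w = h >> lg_k
        rank = (64 - lg_k) - w.bit_length() + 1   # leading zeros + 1
        registers[bucket] = max(registers[bucket], rank)
    # Harmonic mean of 2^register values, with the usual bias correction.
    alpha = 0.7213 / (1 + 1.079 / k)
    raw = alpha * k * k / sum(2.0 ** -r for r in registers)
    # Small-range correction: fall back to linear counting when many
    # registers are still empty.
    zeros = registers.count(0)
    if raw <= 2.5 * k and zeros:
        return k * math.log(k / zeros)
    return raw
```

Calling `hll_estimate(range(50000))` returns a value close to 50000 while storing only 2^12 small registers; that memory/accuracy trade-off is what Druid exploits when it materializes sketches at ingestion time.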

Let’s start with the configuration. I will assume you already have a good understanding of the Druid architecture and have Druid installed and running. To use the DataSketches aggregators, make sure the extension is included in the Druid configuration: in the conf/druid/_common/common.runtime.properties file, add “druid-datasketches” to the druid.extensions.loadList property. If, for example, the list already contains “druid-parser-route”, the final property should look like:

druid.extensions.loadList=["druid-parser-route", "druid-datasketches"]

Restart the servers for the change to take effect.

Data

Like all the previous examples, I will ingest RSVP events from Meetup.com. We can capture the event stream with the following command (it streams indefinitely, so stop it with Ctrl-C once you have collected enough data):

$ wget -qO- http://stream.meetup.com/2/rsvps > meetup.json

Let’s take a look at the data first. We receive events of the following type from Meetup:

{
   "venue":{
      "venue_name":"Upper Macungie Park",
      "lon":-75.628995,
      "lat":40.590295,
      "venue_id":1251399
   },
   "visibility":"public",
   "response":"yes",
   "guests":0,
   "member":{
      "member_id":83868692,
      "photo":"https://secure.meetupstatic.com/photos/member/5/3/d/6/thumb_120381462.jpeg",
      "member_name":"Bruce Kantor"
   },
   "rsvp_id":1755860828,
   "mtime":1541193507549,
   "event":{
      "event_name":"Fall Ride from Upper Macungie Park",
      "event_id":"256076170",
      "time":1541602800000,
      "event_url":"https://www.meetup.com/Lehigh-Wheelmen-Association/events/256076170/"
   },
   "group":{
      "group_topics":[
         {
            "urlkey":"cycling-training",
            "topic_name":"Cycling Training"
         },
         {
            "urlkey":"casual-bicycling",
            "topic_name":"Casual Bicycling"
         },
         {
            "urlkey":"excercise",
            "topic_name":"Exercise"
         },
         {
            "urlkey":"sports",
            "topic_name":"Sports and Recreation"
         },
         {
            "urlkey":"adventure",
            "topic_name":"Adventure"
         },
         {
            "urlkey":"mountain-biking",
            "topic_name":"Mountain Biking"
         },
         {
            "urlkey":"cycling-for-fitness",
            "topic_name":"Cycling for Fitness"
         },
         {
            "urlkey":"bike",
            "topic_name":"Bicycling"
         },
         {
            "urlkey":"road-cycling",
            "topic_name":"Road Cycling"
         },
         {
            "urlkey":"cycling",
            "topic_name":"Cycling"
         },
         {
            "urlkey":"bicycle-touring",
            "topic_name":"Bicycle Touring"
         }
      ],
      "group_city":"Trexlertown",
      "group_country":"us",
      "group_id":7325352,
      "group_name":"Lehigh Wheelmen Association (LWA)",
      "group_lon":-75.6,
      "group_urlname":"Lehigh-Wheelmen-Association",
      "group_state":"PA",
      "group_lat":40.55
   }
}

Explanation of the response json is documented in the Meetup API Doc.

Ingestion

The following JSON is an “index spec” that tells Druid the location of the data file, the segment and index periods, the dimensions we want to use, and the metrics (or aggregations) we want Druid to create. When we send this JSON to the overlord, it spawns a task that indexes the data.

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "meetup",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "type": "path",
                "name": "member_id",
                "expr": "$.member.member_id"
              },
              {
                "type": "path",
                "name": "venue_id",
                "expr": "$.venue.venue_id"
              },
              {
                "type": "path",
                "name": "event_id",
                "expr": "$.event.event_id"
              }
            ]
          },
          "timestampSpec": {
            "column": "mtime",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "member_id",
              "event_id",
              "venue_id"
            ]
          }
        }
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "unique_member",
          "type": "HLLSketchBuild",
          "fieldName": "member_id"
        },
        {
          "name": "unique_venue",
          "type": "HLLSketchBuild",
          "fieldName": "venue_id"
        },
        {
          "name": "unique_event",
          "type": "HLLSketchBuild",
          "fieldName": "event_id"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "HOUR",
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/Users/shimul/work/dev",
        "filter": "meetup.json"
      },
      "appendToExisting": true
    },
    "tuningConfig": {
      "forceExtendableShardSpecs": true,
      "type": "index"
    }
  }
}

To submit this task, POST it to Druid from a terminal window; if the submission is successful, Druid prints the ID of the task. Let’s submit it:

$ curl -X 'POST' -H 'Content-Type:application/json' -d @hll-sketch-ingest.json http://localhost:8090/druid/indexer/v1/task
{"task":"index_meetup_2019-01-07T08:53:05.270Z"}

To view the status of the ingestion task, open the overlord console at http://localhost:8090/console.html. Refresh it periodically; once the task completes, you should see a “SUCCESS” status.

After the ingestion task finishes, the data will be loaded by the historical nodes and be available for querying within a minute or two. You can monitor the loading progress in the coordinator console at http://localhost:8081/#/ by checking whether the “meetup” datasource has a blue circle indicating “fully available”.

Query

Now, if we want to know the number of unique members each hour, we can use a timeseries query with an HLL sketch aggregation on the member_id dimension:

{
  "queryType": "timeseries",
  "dataSource": "meetup",
  "intervals": "2019-01-07T00:00Z/2019-01-08T00:00Z",
  "granularity": "hour",
  "aggregations": [
    {
      "type": "HLLSketchBuild",
      "name": "unique_member_hll",
      "fieldName": "member_id",
      "lgK": "12",
      "tgtHllType": "HLL_4"
    }
  ]
}

Here I’ve used two additional parameters in the aggregator:

  • lgK is optional; it is log2 of K, where K is the number of buckets in the sketch, and it controls both the size and the accuracy of the sketch. It must be between 4 and 21 inclusive (so K is a power of 2), and the default is 12. A value of 21 gives the most accurate result but uses more resources to compute the estimate.
  • tgtHllType is also optional; it defines the type of the target HLL sketch. The value must be “HLL_4”, “HLL_6”, or “HLL_8”, and the default is “HLL_4”.

These two parameters give you the flexibility to control the number of buckets in the sketch and the sketch type. The number of buckets controls the accuracy of the estimated result. You can also use both parameters during data ingestion.
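The relationship between lgK and accuracy follows the standard HyperLogLog rule of thumb: relative standard error ≈ 1.04 / sqrt(K). A small hypothetical Python helper makes the trade-off concrete:

```python
import math

def hll_relative_error(lg_k):
    """Approximate relative standard error of an HLL sketch with K = 2^lg_k
    buckets, per the standard HyperLogLog bound of ~1.04 / sqrt(K)."""
    return 1.04 / math.sqrt(1 << lg_k)

for lg_k in (12, 14, 21):
    # lgK=12 gives roughly 1.6% error; lgK=21 brings it under 0.1%
    print(f"lgK={lg_k}: ~{hll_relative_error(lg_k):.2%} relative error")
```

With the default lgK=12 the error is roughly 1.6%, while lgK=21 pushes it below 0.1% at the cost of a sketch up to 512 times larger.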

Now, if you look at the ingestion spec, you’ll see that we already ingested these dimensions as HLL sketch objects. So we can run the following query, which will be faster than the previous one:

{
  "queryType": "timeseries",
  "dataSource": "meetup",
  "intervals": "2019-01-07T00:00Z/2019-01-08T00:00Z",
  "granularity": "hour",
  "aggregations": [
    {
      "type" : "HLLSketchMerge",
      "name" : "unique_member",
      "fieldName" : "unique_member"
     }
  ]
}

Remember: use the HLLSketchBuild aggregator during ingestion, and the HLLSketchMerge aggregator at query time for already-sketched metrics.
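The extension also provides post aggregators; the sketch-union capability mentioned earlier can combine sketch columns in the same row. Below is a hedged sketch of such a query, reusing the metric names from the ingestion spec above; the union of member and venue sketches is purely illustrative, and you should check the extension docs for the exact post-aggregator types available in your Druid version:

```json
{
  "queryType": "timeseries",
  "dataSource": "meetup",
  "intervals": "2019-01-07T00:00Z/2019-01-08T00:00Z",
  "granularity": "hour",
  "aggregations": [
    { "type": "HLLSketchMerge", "name": "member_sketch", "fieldName": "unique_member" },
    { "type": "HLLSketchMerge", "name": "venue_sketch", "fieldName": "unique_venue" }
  ],
  "postAggregations": [
    {
      "type": "HLLSketchUnion",
      "name": "member_venue_union",
      "fields": [
        { "type": "fieldAccess", "fieldName": "member_sketch" },
        { "type": "fieldAccess", "fieldName": "venue_sketch" }
      ]
    }
  ]
}
```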

In the example ingestion spec, I also ingested HLL sketches for event_id and venue_id to count the unique numbers of events and venues. As practice, try writing queries for them, and experiment with the parameters to get a better understanding of their effect.

Summary

That’s it. Thanks for reading this far. I have shown how to ingest data into Druid and then query for the unique counts of a dimension using the DataSketches HLL Sketch aggregator, covering both sketched metrics and raw (non-sketched) dimensions. For better performance, I recommend using the HLL sketch aggregator during ingestion and querying the sketched metrics.

Reference

  • http://druid.io/docs/latest/development/extensions-core/datasketches-hll.html
  • https://datasketches.github.io/
  • https://www.meetup.com/meetup_api/

