MacLochlainns Weblog

Michael McLaughlin's Technical Blog

MySQL with CTEs

As an example for my class on the usefulness of Common Table Expressions (CTEs), I created three examples with Python. They extend an exercise in Chapter 9 on subqueries from Learning SQL by Alan Beaulieu. All of the examples work with the sakila sample database.

These bullets describe the examples:

  1. Uses a Python list of dictionaries, a range for loop, and an if statement to evaluate each row of the query result set from MySQL and add the level element to it.
  2. Uses a CTE with substitution variables from the Python program, which eliminates the need to evaluate and add an element to the query result set in Python because the query does that.
  3. Uses a table to hold the values necessary to evaluate and add the element to the query result set.

This is the first Python program:

    # Import the library.
    import sys
    import mysql.connector
    from mysql.connector import errorcode
     
    # Declare a list of dictionaries.
    dict = [{'level':'Hollywood Star','min_roles':30,'max_roles':99999}
           ,{'level':'Prolific Actor','min_roles':20,'max_roles':29}
           ,{'level':'Newcomer','min_roles':1,'max_roles':19}]
     
    #  Attempt the query.
    # ============================================================
    #  Use a try-except block to manage the connection.
    # ============================================================
    try:
      # Open connection.
      cnx = mysql.connector.connect(user='student', password='student',
                                    host='127.0.0.1',
                                    database='sakila')
      # Create cursor.
      cursor = cnx.cursor()
     
      # Set the query statement.
      query = ("SELECT   a.actor_id "
               ",        a.first_name       "
               ",        a.last_name "
               ",        COUNT(fa.actor_id) AS films "
               "FROM     actor a INNER JOIN film_actor fa "
               "ON       a.actor_id = fa.actor_id "
               "GROUP BY a.actor_id "
               ",        a.first_name "
               ",        a.last_name "
               "ORDER BY a.last_name "
               ",        a.first_name")
     
      # Execute cursor.
      cursor.execute(query)
     
      # Display the rows returned by the query.
      for (actor_id, first_name, last_name, films) in cursor:
        for i in range(len(dict)):
          if films >= dict[i]["min_roles"] and films <= dict[i]["max_roles"]:
            print('{0} {1} is a {2} with {3} films.'.format( first_name.title()
                                                           , last_name.title()
                                                           , dict[i]["level"]
                                                           , films))
     
      # Close cursor.
      cursor.close()
     
    # ------------------------------------------------------------
    # Handle exceptions raised by the connector.
    except mysql.connector.Error as e:
      if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Something is wrong with your user name or password")
      elif e.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist")
      else:
        print("Error code:", e.errno)        # error number
        print("SQLSTATE value:", e.sqlstate) # SQLSTATE value
        print("Error message:", e.msg)       # error message
     
    # Close the connection when the try block completes.
    else:
      cnx.close()

    The list of dictionaries declared at the top of the program, and the range for loop and if statement that evaluate each row against it, can be eliminated by putting the literal values in a Common Table Expression (CTE). That’s because a CROSS JOIN matches all rows in the CTE against the base table before filtering them.

    The match of all rows in the CTE against the base table effectively replaces the range for loop in the original code. The WHERE clause replaces the if statement in the original code.

    Another optimization for readability of the final query puts the grouped query into a CTE as well. That way the final query simply demonstrates the filtering process.
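
To see that the CROSS JOIN and WHERE clause really do the work of the loop and if statement, here is a minimal sketch that runs both approaches side by side. It assumes Python's built-in sqlite3 module as a stand-in for MySQL, so it uses ? placeholders instead of %s. The actor names and role counts are made up for illustration and are not taken from sakila.

```python
import sqlite3

# Hypothetical sample data: (name, number of roles). Not sakila data.
ACTORS = [("Ann", 35), ("Bob", 22), ("Cy", 4)]
LEVELS = [("Hollywood Star", 30, 99999),
          ("Prolific Actor", 20, 29),
          ("Newcomer", 1, 19)]

def classify_with_loop(actors, levels):
    """Imperative version: a nested loop plus an if statement."""
    rows = []
    for name, num_roles in actors:
        for level, min_roles, max_roles in levels:
            if min_roles <= num_roles <= max_roles:
                rows.append((name, level, num_roles))
    return rows

def classify_with_cte(actors, levels):
    """Declarative version: CROSS JOIN pairs the rows, WHERE filters them."""
    cnx = sqlite3.connect(":memory:")
    cnx.execute("CREATE TABLE actors (name TEXT, num_roles INTEGER)")
    cnx.executemany("INSERT INTO actors VALUES (?, ?)", actors)

    # One SELECT of three placeholders per level, glued with UNION ALL.
    select = "SELECT ? AS level, ? AS min_roles, ? AS max_roles"
    query = ("WITH levels AS (" + " UNION ALL ".join([select] * len(levels)) + ") "
             "SELECT a.name, l.level, a.num_roles "
             "FROM   actors a CROSS JOIN levels l "
             "WHERE  a.num_roles BETWEEN l.min_roles AND l.max_roles "
             "ORDER BY a.name")

    # Flatten the level rows into one list of bind values.
    params = [value for level in levels for value in level]
    rows = cnx.execute(query, params).fetchall()
    cnx.close()
    return rows

if __name__ == "__main__":
    print(classify_with_loop(ACTORS, LEVELS))
    print(classify_with_cte(ACTORS, LEVELS))
```

Both functions should return the same rows for this sample data, which is the point: the join produces every actor and level pairing, and the BETWEEN filter keeps only the pairing the if statement would have kept.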

    This is the second Python program. It replaces the list of dictionaries with a list of lists and flattens the lists into a single param sequence of bind values for the query’s %s placeholders:

    # Import the library.
    import sys
    import mysql.connector
    from mysql.connector import errorcode
     
    # Declare a list of lists (named levels to avoid shadowing the built-in list).
    levels = [['Hollywood Star',30,99999]
             ,['Prolific Actor',20,29]
             ,['Newcomer',1,19]]
     
    # Flatten the lists into one tuple of bind values, in placeholder order.
    param = tuple(levels[0] + levels[1] + levels[2])
     
    #  Attempt the query.
    # ============================================================
    #  Use a try-except block to manage the connection.
    # ============================================================
    try:
      # Open connection.
      cnx = mysql.connector.connect(user='student', password='student',
                                    host='127.0.0.1',
                                    database='sakila')
      # Create cursor.
      cursor = cnx.cursor()
     
      # Set the query statement.
      query = ("WITH actors AS "
               "(SELECT   a.actor_id "
               " ,        a.first_name "
               " ,        a.last_name "
               " ,        COUNT(fa.actor_id) AS num_roles "
               " FROM     actor a INNER JOIN film_actor fa "
               " ON       a.actor_id = fa.actor_id "
               " GROUP BY a.actor_id "
               " ,        a.first_name "
               " ,        a.last_name ) "
               " , levels AS "
               "(SELECT  %s AS level "
               " ,       %s AS min_roles "
               " ,       %s AS max_roles "
               " UNION ALL "
               " SELECT  %s AS level "
               " ,       %s AS min_roles "
               " ,       %s AS max_roles "
               " UNION ALL "
               " SELECT  %s AS level "
               " ,       %s AS min_roles "
               " ,       %s AS max_roles) "
               " SELECT a.first_name "
               " ,      a.last_name "
               " ,      l.level "
               " ,      a.num_roles "
               " FROM   actors a CROSS JOIN levels l "
               " WHERE  a.num_roles BETWEEN l.min_roles AND l.max_roles "
               " ORDER BY a.last_name "
               " ,        a.first_name")
     
      # Execute cursor.
      cursor.execute(query, param)
     
      # Display the rows returned by the query.
      for (first_name, last_name, level, num_roles) in cursor:
        print('{0} {1} is a {2} with {3} films.'.format( first_name.title()
                                                       , last_name.title()
                                                       , level.title()
                                                       , num_roles))
     
      # Close cursor.
      cursor.close()
     
    # ------------------------------------------------------------
    # Handle exceptions raised by the connector.
    except mysql.connector.Error as e:
      if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Something is wrong with your user name or password")
      elif e.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist")
      else:
        print("Error code:", e.errno)        # error number
        print("SQLSTATE value:", e.sqlstate) # SQLSTATE value
        print("Error message:", e.msg)       # error message
     
    # Close the connection when the try block completes.
    else:
      cnx.close()
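
The second program hard-codes nine %s placeholders, so the query text and the bind values must be kept in step by hand. As a rough sketch of one way to keep them in step, the helper below generates the levels CTE and the flattened bind values from the same list. The function name build_levels_cte is hypothetical and not part of MySQL Connector/Python. Only placeholders go into the SQL text, and the values themselves are still bound by cursor.execute().

```python
def build_levels_cte(levels):
    """Return (sql_fragment, params) for a list of [level, min, max] rows.

    Hypothetical helper for illustration; the fragment contains only
    %s placeholders, never the values themselves.
    """
    if not levels:
        raise ValueError("at least one level is required")
    select = "SELECT %s AS level, %s AS min_roles, %s AS max_roles"
    fragment = "levels AS (" + " UNION ALL ".join([select] * len(levels)) + ")"
    # Flatten row by row so the values line up with the placeholders.
    params = tuple(value for row in levels for value in row)
    return fragment, params

levels = [["Hollywood Star", 30, 99999],
          ["Prolific Actor", 20, 29],
          ["Newcomer", 1, 19]]

fragment, params = build_levels_cte(levels)

if __name__ == "__main__":
    print(fragment)
    print(params)
```

The fragment could then be spliced into the WITH clause after the actors CTE, and params passed as the second argument to cursor.execute(), in the same way the program above passes param.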

    The third Python program requires some SQL setup. You should run this script inside the sakila database first. It takes the values out of the code and stores them in a table, which is more likely what you would do in a real application to keep ever-changing range values like these maintainable. The program leaves the aggregation process inside a CTE and simplifies the final query.

    -- Conditionally drop the levels table.
    DROP TABLE IF EXISTS levels;
     
    -- Create the levels list.
    CREATE TABLE levels
    ( level      VARCHAR(16)
    , min_roles  INT
    , max_roles  INT );
     
    -- Insert values into the list table.
    INSERT INTO levels
    ( level, min_roles, max_roles )
    VALUES
     ('Hollywood Star', 30, 99999)
    ,('Prolific Actor', 20, 29)
    ,('Newcomer', 1, 19);
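
Because the filtered CROSS JOIN returns one row for every level whose range contains the actor's role count, overlapping ranges would list an actor twice and a gap would drop the actor entirely. The sketch below is one possible sanity check to run on the range values before seeding or changing the table. The function name find_range_problems is hypothetical, and the check assumes whole-number ranges like the ones above.

```python
def find_range_problems(levels):
    """Report overlaps and gaps between (level, min_roles, max_roles) rows."""
    problems = []
    # Compare each range with the next one, ordered by lower bound.
    ordered = sorted(levels, key=lambda row: row[1])
    for (low_name, _, low_max), (high_name, high_min, _) in zip(ordered, ordered[1:]):
        if high_min <= low_max:
            problems.append(f"{low_name} overlaps {high_name}")
        elif high_min > low_max + 1:
            problems.append(f"gap between {low_name} and {high_name}")
    return problems

seed_rows = [("Hollywood Star", 30, 99999),
             ("Prolific Actor", 20, 29),
             ("Newcomer", 1, 19)]

if __name__ == "__main__":
    print(find_range_problems(seed_rows))
```

An empty list means every role count from the lowest minimum to the highest maximum falls into exactly one level.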

    After seeding the data in the levels table, you can test the query natively in MySQL, like this:

    -- Query the data.
    WITH actors AS
     (SELECT   a.actor_id
      ,        a.first_name
      ,        a.last_name
      ,        COUNT(*) AS num_roles
      FROM     actor a INNER JOIN film_actor fa
      ON       a.actor_id = fa.actor_id
      GROUP BY a.actor_id)
    SELECT   a.first_name
    ,        a.last_name
    ,        l.level
    ,        a.num_roles
    FROM     actors a CROSS JOIN levels l
    WHERE    a.num_roles BETWEEN l.min_roles AND l.max_roles
    ORDER BY a.last_name
    ,        a.first_name;

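    There’s also a syntax that makes this type of query appear to be an INNER JOIN when it’s actually a filtered CROSS JOIN. MySQL treats JOIN, CROSS JOIN, and INNER JOIN as syntactic equivalents, so the ON clause is optional. If you adopt that syntax, you would rewrite the FROM and WHERE clauses of the final query like this and leave the ORDER BY clause in place:

    FROM   actors a INNER JOIN levels l
    WHERE  a.num_roles BETWEEN l.min_roles AND l.max_roles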
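
A quick way to convince yourself that the join keyword makes no difference here is to run each spelling against the same rows and compare the results. This is only a sketch: it assumes Python's built-in sqlite3 module as a stand-in for MySQL (SQLite also accepts an INNER JOIN without an ON clause), and the actor rows are made up rather than taken from sakila.

```python
import sqlite3

FILTER = "a.num_roles BETWEEN l.min_roles AND l.max_roles"

def rows_for(join_clause):
    """Run the final query with the given FROM-clause tail on sample data."""
    cnx = sqlite3.connect(":memory:")
    cnx.execute("CREATE TABLE actors (name TEXT, num_roles INTEGER)")
    cnx.execute("CREATE TABLE levels "
                "(level TEXT, min_roles INTEGER, max_roles INTEGER)")
    # Hypothetical sample rows, not sakila data.
    cnx.executemany("INSERT INTO actors VALUES (?, ?)",
                    [("Ann", 35), ("Bob", 22), ("Cy", 4)])
    cnx.executemany("INSERT INTO levels VALUES (?, ?, ?)",
                    [("Hollywood Star", 30, 99999),
                     ("Prolific Actor", 20, 29),
                     ("Newcomer", 1, 19)])
    query = ("SELECT a.name, l.level FROM actors a "
             + join_clause + " ORDER BY a.name")
    rows = cnx.execute(query).fetchall()
    cnx.close()
    return rows

cross_where = rows_for("CROSS JOIN levels l WHERE " + FILTER)
inner_where = rows_for("INNER JOIN levels l WHERE " + FILTER)
inner_on = rows_for("INNER JOIN levels l ON " + FILTER)

if __name__ == "__main__":
    print(cross_where == inner_where == inner_on)
```

The third spelling, with the range test in an ON clause, is the form standard SQL expects for an INNER JOIN, and it returns the same rows as the other two.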

    Then, you can run this version of the Python program, which omits the levels CTE because the query reads the levels table instead:

    # Import the library.
    import sys
    import mysql.connector
    from mysql.connector import errorcode
     
    #  Attempt the query.
    # ============================================================
    #  Use a try-except block to manage the connection.
    # ============================================================
    try:
      # Open connection.
      cnx = mysql.connector.connect(user='student', password='student',
                                    host='127.0.0.1',
                                    database='sakila')
      # Create cursor.
      cursor = cnx.cursor()
     
      # Set the query statement.
      query = ("WITH actors AS "
               "(SELECT   a.actor_id "
               " ,        a.first_name "
               " ,        a.last_name "
               " ,        COUNT(fa.actor_id) AS num_roles "
               " FROM     actor a INNER JOIN film_actor fa "
               " ON       a.actor_id = fa.actor_id "
               " GROUP BY a.actor_id "
               " ,        a.first_name "
               " ,        a.last_name ) "
               " SELECT   a.first_name "
               " ,        a.last_name "
               " ,        l.level "
               " ,        a.num_roles "
               " FROM     actors a CROSS JOIN levels l "
               " WHERE    a.num_roles BETWEEN l.min_roles AND l.max_roles "
               " ORDER BY a.last_name "
               " ,        a.first_name")
     
      # Execute cursor.
      cursor.execute(query)
     
      # Display the rows returned by the query.
      for (first_name, last_name, level, num_roles) in cursor:
        print('{0} {1} is a {2} with {3} films.'.format( first_name.title()
                                                       , last_name.title()
                                                       , level.title()
                                                       , num_roles))
     
      # Close cursor.
      cursor.close()
     
    # ------------------------------------------------------------
    # Handle exceptions raised by the connector.
    except mysql.connector.Error as e:
      if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Something is wrong with your user name or password")
      elif e.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist")
      else:
        print("Error code:", e.errno)        # error number
        print("SQLSTATE value:", e.sqlstate) # SQLSTATE value
        print("Error message:", e.msg)       # error message
     
    # Close the connection when the try block completes.
    else:
      cnx.close()

    As always, I hope this helps those trying to understand how CTEs can solve problems that would otherwise be coded in external imperative languages like Python.

Written by maclochlainn

November 3rd, 2021 at 10:01 am