Date and Time Functions


[TOC]

Name                 Description
[current_date]       Gives current date as a date column
[current_timestamp]  Gives current timestamp as a timestamp column
[date_format]        Converts a date/timestamp/string to a string in the specified format
[to_date]            Converts column to date type (with an optional date format)
[to_timestamp]       Converts column to timestamp type (with an optional timestamp format)
[unix_timestamp]     Converts current or specified time to Unix timestamp (in seconds)
[window]             Generates time windows (i.e. tumbling, sliding and delayed windows)

current_date - Current Date As Date Column

current_date(): Column

current_date function gives the current date as a [date] column.

val df = spark.range(1).select(current_date)
scala> df.show
+--------------+
|current_date()|
+--------------+
|    2017-09-16|
+--------------+

scala> df.printSchema
root
 |-- current_date(): date (nullable = false)

Internally, current_date creates a Column with CurrentDate Catalyst leaf expression.

val c = current_date()
import org.apache.spark.sql.catalyst.expressions.CurrentDate
val cd = c.expr.asInstanceOf[CurrentDate]
scala> println(cd.prettyName)
current_date

scala> println(cd.numberedTreeString)
00 current_date(None)

date_format - Converting Date/Timestamp To String

date_format(dateExpr: Column, format: String): Column

date_format formats a date/timestamp/string column into a string column using the specified format pattern.

Internally, date_format creates a Column with DateFormatClass binary expression. DateFormatClass takes the expression from the dateExpr column and the format.

val c = date_format($"date", "dd/MM/yyyy")

import org.apache.spark.sql.catalyst.expressions.DateFormatClass
val dfc = c.expr.asInstanceOf[DateFormatClass]
scala> println(dfc.prettyName)
date_format

scala> println(dfc.numberedTreeString)
00 date_format('date, dd/MM/yyyy, None)
01 :- 'date
02 +- dd/MM/yyyy
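
A minimal usage sketch (assuming a spark-shell session with spark.implicits._ in scope):

// date_format on a string column (implicitly cast to a timestamp)
val formatted = Seq("2017-09-16").toDF("date").
  select(date_format($"date", "dd/MM/yyyy") as "formatted")
scala> formatted.show
+----------+
| formatted|
+----------+
|16/09/2017|
+----------+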

current_timestamp - Current Timestamp As Timestamp Column

current_timestamp(): Column

Note: current_timestamp is also the now function in SQL.
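
A minimal sketch (the schema is fixed; the actual value depends on when the query runs):

val df = spark.range(1).select(current_timestamp)
scala> df.printSchema
root
 |-- current_timestamp(): timestamp (nullable = false)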

unix_timestamp - Converting Current or Specified Time to Unix Timestamp

unix_timestamp(): Column  (1)
unix_timestamp(time: Column): Column  (2)
unix_timestamp(time: Column, format: String): Column  (3)
  1. Gives current timestamp (in seconds)
  2. Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds)
  3. Converts time string in the given format to Unix timestamp (in seconds)

unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds).

unix_timestamp supports a column of type Date, Timestamp or String.

// no time and format => current time
scala> spark.range(1).select(unix_timestamp as "current_timestamp").show
+-----------------+
|current_timestamp|
+-----------------+
|       1493362850|
+-----------------+

// no format so yyyy-MM-dd HH:mm:ss assumed
scala> Seq("2017-01-01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00|    1483225200|
+-------------------+--------------+

scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time", "yyyy/MM/dd")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|    1483225200|
+-------------------+--------------+

unix_timestamp returns null if conversion fails.

// note slashes as date separators
scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|          null|
+-------------------+--------------+
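
unix_timestamp works the same way on Date and Timestamp columns; a minimal sketch (the result is time-zone dependent and shown here for the same session as the string-based examples above):

import java.sql.Timestamp
scala> Seq(Timestamp.valueOf("2017-01-01 00:00:00")).toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00|    1483225200|
+-------------------+--------------+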

Note: unix_timestamp is also supported in [SQL mode].

scala> spark.sql("SELECT unix_timestamp() as unix_timestamp").show
+--------------+
|unix_timestamp|
+--------------+
|    1493369225|
+--------------+

Internally, unix_timestamp creates a Column with UnixTimestamp binary expression (possibly with CurrentTimestamp).

window - Generating Time Windows 

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column  (2)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column  (3)
  1. Creates a tumbling time window with slideDuration as windowDuration and 0 seconds for startTime
  2. Creates a sliding time window with 0 seconds for startTime
  3. Creates a delayed time window

window generates tumbling, sliding or delayed time windows of windowDuration duration given a timeColumn column of timestamps.

Note: From Tumbling Window (Azure Stream Analytics): Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals.

Note: From Introducing Stream Windows in Apache Flink: Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval. Tumbling windows discretize a stream into non-overlapping windows.

scala> val timeColumn = window('time, "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`

timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.

Tip: Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.

// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8, 13, 0, 0, 0), 10),
  ((2017, 5, 27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")
scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"), $"level")
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> q.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- level: integer (nullable = false)

// calculating the sum of levels every 5 seconds
val sums = levels.
  groupBy(window($"time", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")
scala> sums.show
+-------------------+-------------------+---------+
|              start|                end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15|        4|
|2012-12-12 12:12:10|2012-12-12 12:12:15|       14|
|2016-08-13 00:00:00|2016-08-13 00:00:05|       10|
|2017-05-27 00:00:00|2017-05-27 00:00:05|       15|
+-------------------+-------------------+---------+

windowDuration and slideDuration are strings that specify the duration of the time window and the sliding interval, respectively.

Use CalendarInterval for valid duration identifiers (e.g. seconds, minutes, hours).
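
For reference, a sliding window (signature 2) and a delayed window (signature 3) only change the slide and start offsets of the underlying TimeWindow expression; a minimal sketch in spark-shell:

// 5-second windows sliding every 3 seconds
scala> val slidingWindow = window('time, "5 seconds", "3 seconds")
slidingWindow: org.apache.spark.sql.Column = timewindow(time, 5000000, 3000000, 0) AS `window`

// 5-second tumbling windows delayed by 1 second
scala> val delayedWindow = window('time, "5 seconds", "5 seconds", "1 second")
delayedWindow: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 1000000) AS `window`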

window is available as of Spark 2.0.0.

Internally, window creates a Column (with TimeWindow expression) available as window alias.

// q is the query defined earlier
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> println(timeColumn.expr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#22
01 +- timewindow('time, 5000000, 5000000, 0)
02    +- 'time

Example — Traffic Sensor

The example is borrowed from Introducing Stream Windows in Apache Flink.

The example shows how to use window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.
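
A minimal sketch of the sensor (the input rows and column names below are made up for this illustration):

import java.sql.Timestamp
// hypothetical sensor readings: detection time and number of vehicles seen
val detections = Seq(
  (Timestamp.valueOf("2017-05-27 00:00:03"), 1),
  (Timestamp.valueOf("2017-05-27 00:00:11"), 2),
  (Timestamp.valueOf("2017-05-27 00:00:18"), 1)).
  toDF("time", "vehicles")

// count vehicles per 15-second tumbling window
val counts = detections.
  groupBy(window($"time", "15 seconds")).
  agg(sum("vehicles") as "vehicle_count").
  select("window.start", "window.end", "vehicle_count")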

to_date — Converting Column To DateType 

to_date(e: Column): Column
to_date(e: Column, fmt: String): Column

to_date converts the column into DateType (by casting to DateType).

fmt follows the date format patterns of java.text.SimpleDateFormat.

Internally, to_date creates a Column with ParseToDate expression (and Literal expression for fmt).

Use ParseToDate expression to use a column for the values of fmt.
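
A minimal usage sketch (in spark-shell):

scala> Seq("2017/09/16").toDF("s").select(to_date($"s", "yyyy/MM/dd") as "date").show
+----------+
|      date|
+----------+
|2017-09-16|
+----------+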

to_timestamp — Converting Column To TimestampType 

to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column

to_timestamp converts the column into TimestampType (by casting to TimestampType).

fmt follows the date format patterns of java.text.SimpleDateFormat.

Internally, to_timestamp creates a Column with ParseToTimestamp expression (and Literal expression for fmt).

Use ParseToTimestamp expression to use a column for the values of fmt.
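
A minimal usage sketch (in spark-shell):

scala> Seq("2017/09/16 10:00").toDF("s").select(to_timestamp($"s", "yyyy/MM/dd HH:mm") as "ts").show
+-------------------+
|                 ts|
+-------------------+
|2017-09-16 10:00:00|
+-------------------+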