Date and Time Functions

| Name | Description |
|------|-------------|
| [current_date] | Gives current date as a date column |
| [current_timestamp] | Gives current timestamp as a timestamp column |
| [date_format] | Converts a date, timestamp or string column to a string in the specified format |
| [to_date] | Converts column to date type (with an optional date format) |
| [to_timestamp] | Converts column to timestamp type (with an optional timestamp format) |
| [unix_timestamp] | Converts current or specified time to Unix timestamp (in seconds) |
| [window] | Generates time windows (i.e. tumbling, sliding and delayed windows) |
current_date - Current Date As Date Column

The current_date function gives the current date as a [date] column.
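The function takes no parameters:

```scala
current_date(): Column
```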
```scala
val df = spark.range(1).select(current_date)

scala> df.show
+--------------+
|current_date()|
+--------------+
|    2017-09-16|
+--------------+

scala> df.printSchema
root
 |-- current_date(): date (nullable = false)
```
Internally, current_date creates a Column with the CurrentDate Catalyst leaf expression.
```scala
val c = current_date()

import org.apache.spark.sql.catalyst.expressions.CurrentDate
val cd = c.expr.asInstanceOf[CurrentDate]

scala> println(cd.prettyName)
current_date

scala> println(cd.numberedTreeString)
00 current_date(None)
```
date_format - Converting Date/Timestamp/String to String in Specified Format

```scala
date_format(dateExpr: Column, format: String): Column
```

date_format converts a date, timestamp or string column into a string column in the given format.
Internally, date_format creates a Column with the DateFormatClass binary expression. DateFormatClass takes the expression of the dateExpr column and the format string.
```scala
val c = date_format($"date", "dd/MM/yyyy")

import org.apache.spark.sql.catalyst.expressions.DateFormatClass
val dfc = c.expr.asInstanceOf[DateFormatClass]

scala> println(dfc.prettyName)
date_format

scala> println(dfc.numberedTreeString)
00 date_format('date, dd/MM/yyyy, None)
01 :- 'date
02 +- dd/MM/yyyy
```
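A short usage sketch (the dates dataset and its single date column are made up for illustration):

```scala
import java.sql.Date
import org.apache.spark.sql.functions.date_format
import spark.implicits._  // already in scope in spark-shell

// a single-row dataset with one date column (hypothetical data)
val dates = Seq(Date.valueOf("2017-09-16")).toDF("date")

// format the date as day/month/year
val formatted = dates.select(date_format($"date", "dd/MM/yyyy") as "formatted")
// formatted holds the string "16/09/2017"
```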
current_timestamp - Current Timestamp As Timestamp Column

```scala
current_timestamp(): Column
```

The current_timestamp function gives the current timestamp as a timestamp column.
Note: current_timestamp is also available as the now function in SQL.
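A quick check in the shell (assuming the now alias resolves to current_timestamp as noted above; printSchema is used since the actual value depends on the wall clock):

```scala
scala> spark.sql("SELECT now() AS ts").printSchema
root
 |-- ts: timestamp (nullable = false)
```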
unix_timestamp - Converting Current or Specified Time to Unix Timestamp

```scala
unix_timestamp(): Column                              // (1)
unix_timestamp(time: Column): Column                  // (2)
unix_timestamp(time: Column, format: String): Column
```

1. Gives current timestamp (in seconds)
2. Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds)
unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds). unix_timestamp supports a column of type Date, Timestamp or String.
```scala
// no time and format => current time
scala> spark.range(1).select(unix_timestamp as "current_timestamp").show
+-----------------+
|current_timestamp|
+-----------------+
|       1493362850|
+-----------------+

// no format so yyyy-MM-dd HH:mm:ss assumed
scala> Seq("2017-01-01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00|    1483225200|
+-------------------+--------------+

scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time", "yyyy/MM/dd")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|    1483225200|
+-------------------+--------------+
```
unix_timestamp returns null if conversion fails.
```scala
// note slashes as date separators
scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|          null|
+-------------------+--------------+
```
Note: unix_timestamp is also supported in [SQL mode].

```scala
scala> spark.sql("SELECT unix_timestamp() as unix_timestamp").show
+--------------+
|unix_timestamp|
+--------------+
|    1493369225|
+--------------+
```
Internally, unix_timestamp creates a Column with the UnixTimestamp binary expression (possibly with CurrentTimestamp).
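A quick way to see this, following the inspection pattern used for current_date above (the time column name is arbitrary):

```scala
import org.apache.spark.sql.catalyst.expressions.UnixTimestamp

// unix_timestamp on a column wraps it in a UnixTimestamp expression
val c = unix_timestamp($"time")
val ut = c.expr.asInstanceOf[UnixTimestamp]

scala> println(ut.prettyName)
unix_timestamp
```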
window - Generating Time Windows

```scala
window(
  timeColumn: Column,
  windowDuration: String): Column  // (1)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column  // (2)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column  // (3)
```
1. Creates a tumbling time window with slideDuration as windowDuration and 0 second for startTime
2. Creates a sliding time window with 0 second for startTime
3. Creates a delayed time window
window generates tumbling, sliding or delayed time windows of windowDuration duration, given a timeColumn timestamp column.
Note: From Tumbling Window (Azure Stream Analytics): Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals.
Note: From Introducing Stream Windows in Apache Flink: Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval. Tumbling windows discretize a stream into non-overlapping windows.
```scala
scala> val timeColumn = window('time, "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`
```
timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.
```scala
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp

val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8, 13, 0, 0, 0), 10),
  ((2017, 5, 27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")

scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"), $"level")

scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> q.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- level: integer (nullable = false)

// calculating the sum of levels every 5 seconds
val sums = levels.
  groupBy(window($"time", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")

scala> sums.show
+-------------------+-------------------+---------+
|              start|                end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15|        4|
|2012-12-12 12:12:10|2012-12-12 12:12:15|       14|
|2016-08-13 00:00:00|2016-08-13 00:00:05|       10|
|2017-05-27 00:00:00|2017-05-27 00:00:05|       15|
+-------------------+-------------------+---------+
```
windowDuration and slideDuration are strings specifying the width of the window and the sliding interval, respectively. Use CalendarInterval for valid window identifiers.
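For instance, a sliding-window sketch that reuses the levels dataset from above (10-second windows starting every 5 seconds, so adjacent windows overlap):

```scala
// 10-second windows sliding every 5 seconds;
// each row can therefore contribute to two windows
val sliding = levels.
  groupBy(window($"time", "10 seconds", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")
```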
window is available as of Spark 2.0.0.
Internally, window creates a Column (with the TimeWindow expression) available under the window alias.
```scala
// q is the query defined earlier
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> println(timeColumn.expr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#22
01 +- timewindow('time, 5000000, 5000000, 0)
02    +- 'time
```
Example — Traffic Sensor

The example is borrowed from Introducing Stream Windows in Apache Flink. It shows how to use the window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.
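A minimal sketch of such a query, assuming a cars dataset with a time timestamp column and a carCount column (both names and the sample readings are hypothetical):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions.{window, sum}

// hypothetical sensor readings: one row per reading,
// carCount is the number of vehicles since the previous reading
val cars = Seq(
  (Timestamp.valueOf("2017-05-27 00:00:03"), 2),
  (Timestamp.valueOf("2017-05-27 00:00:11"), 5),
  (Timestamp.valueOf("2017-05-27 00:00:19"), 4)).
  toDF("time", "carCount")

// tumbling 15-second windows: total vehicles per window
val counts = cars.
  groupBy(window($"time", "15 seconds")).
  agg(sum("carCount") as "total").
  select("window.start", "window.end", "total")
```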
to_date — Converting Column To DateType

```scala
to_date(e: Column): Column
to_date(e: Column, fmt: String): Column
```
to_date converts the column into DateType (by casting to DateType). fmt follows the formatting styles.
Internally, to_date creates a Column with the ParseToDate expression (and a Literal expression for fmt). Use the ParseToDate expression to use a column for the values of fmt.
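A usage sketch with and without fmt (the string values are illustrative):

```scala
import org.apache.spark.sql.functions.to_date

// default format yyyy-MM-dd
val d1 = Seq("2017-09-16").toDF("s").select(to_date($"s") as "date")

// custom format via fmt
val d2 = Seq("16/09/2017").toDF("s").select(to_date($"s", "dd/MM/yyyy") as "date")

scala> d2.printSchema
root
 |-- date: date (nullable = true)
```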
to_timestamp — Converting Column To TimestampType

```scala
to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column
```
to_timestamp converts the column into TimestampType (by casting to TimestampType). fmt follows the formatting styles.
Internally, to_timestamp creates a Column with the ParseToTimestamp expression (and a Literal expression for fmt). Use the ParseToTimestamp expression to use a column for the values of fmt.
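A usage sketch mirroring to_date (the string values are illustrative):

```scala
import org.apache.spark.sql.functions.to_timestamp

// default format yyyy-MM-dd HH:mm:ss
val t1 = Seq("2017-09-16 12:34:56").toDF("s").select(to_timestamp($"s") as "ts")

// custom format via fmt
val t2 = Seq("16/09/2017 12:34").toDF("s").select(to_timestamp($"s", "dd/MM/yyyy HH:mm") as "ts")

scala> t1.printSchema
root
 |-- ts: timestamp (nullable = true)
```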