CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.
The CREATE CHANGEFEED statement creates a new Enterprise changefeed, which targets an allowlist of tables called "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka, Google Cloud Pub/Sub, a cloud storage sink, or a webhook sink). You can create, pause, resume, alter, or cancel an Enterprise changefeed.
We recommend reading the Changefeed Messages page for details on how changefeeds emit messages, and Create and Configure Changefeeds for important usage considerations.
Required privileges
To create a changefeed, the user must be a member of the admin role or have the CREATECHANGEFEED parameter set.
Synopsis
Parameters
| Parameter | Description | 
|---|---|
| table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. Note: Before creating a changefeed, consider the number of changefeeds versus the number of tables to include in a single changefeed. Each scenario can have an impact on total memory usage or changefeed performance. Refer to Create and Configure Changefeeds for more detail. | 
| sink | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI. Note: If you create a changefeed without a sink, your changefeed will run as a core-style changefeed sending messages to the SQL client. For more detail, refer to the Create and Configure Changefeeds page. | 
| option/value | For a list of available options and their values, refer to Options. | 
Sink URI
The sink URI follows the basic format of:
'{scheme}://{host}:{port}?{query_parameters}'
| URI Component | Description | 
|---|---|
| scheme | The type of sink: kafka, gcpubsub, any cloud storage sink, or webhook sink. | 
| host | The sink's hostname or IP address. | 
| port | The sink's port. | 
| query_parameters | The sink's query parameters. | 
See Changefeed Sinks for considerations when using each sink and detail on configuration.
Kafka
Example of a Kafka sink URI:
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256'
Google Cloud Pub/Sub
The Google Cloud Pub/Sub sink is currently in beta.
New in v22.1: Example of a Google Cloud Pub/Sub sink URI:
'gcpubsub://{project name}?region={region}&topic_name={topic name}&AUTH=specified&CREDENTIALS={base64-encoded key}'
Use Cloud Storage for Bulk Operations explains the requirements for the authentication parameter with specified or implicit. See Changefeed Sinks for further consideration.
Cloud Storage
The following are example file URLs for each of the cloud storage schemes:
| Location | Example | 
|---|---|
| Amazon S3 | 's3://{BUCKET NAME}/{PATH}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}' | 
| Azure Blob Storage | 'azure://{CONTAINER NAME}/{PATH}?AZURE_ACCOUNT_NAME={ACCOUNT NAME}&AZURE_ACCOUNT_KEY={URL-ENCODED KEY}' | 
| Google Cloud | 'gs://{BUCKET NAME}/{PATH}?AUTH=specified&CREDENTIALS={ENCODED KEY}' | 
| HTTP | 'http://localhost:8080/{PATH}' | 
Use Cloud Storage for Bulk Operations explains the requirements for authentication and encryption for each supported cloud storage sink. See Changefeed Sinks for considerations when using cloud storage.
Webhook
The webhook sink is currently in beta.
Example of a webhook URI:
'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
See Changefeed Sinks for specifics on webhook sink configuration.
Query parameters
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
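As a minimal sketch of this encoding step, the following Python snippet uses the standard library's urllib.parse.quote (a Python counterpart to the functions named above) to percent-encode parameter values before placing them in a Kafka sink URI. The broker address, user name, and password are hypothetical placeholders, and build_kafka_uri is an illustrative helper, not part of any CockroachDB tooling.

```python
from urllib.parse import quote


def encode_param(value: str) -> str:
    """Percent-encode a query parameter value.

    safe="" also encodes "/" so that every reserved character is escaped.
    """
    return quote(value, safe="")


def build_kafka_uri(host: str, port: int, params: dict) -> str:
    """Assemble '{scheme}://{host}:{port}?{query_parameters}' for a Kafka sink."""
    query = "&".join(
        f"{key}={encode_param(str(val))}" for key, val in params.items()
    )
    return f"kafka://{host}:{port}?{query}"


# Hypothetical values, for illustration only.
uri = build_kafka_uri(
    "broker.address.com",
    9092,
    {
        "tls_enabled": "true",
        "sasl_enabled": "true",
        "sasl_user": "app_user",
        "sasl_password": "p@ss/word&1=",
    },
)
print(uri)
```

The password's "@", "/", "&", and "=" characters are escaped, so they cannot be mistaken for URI delimiters when the sink URI is parsed.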
Query parameters include:
| Parameter | Sink Type | Type | Description | 
|---|---|---|---|
| ca_cert | Kafka, webhook, (Confluent schema registry) | STRING | The base64-encoded ca_cert file. Specify ca_cert for a Kafka sink, webhook sink, and/or a Confluent schema registry. For usage with a Kafka sink, see Kafka Sink URI. It's necessary to state https in the schema registry's address when passing ca_cert: confluent_schema_registry='https://schema_registry:8081?ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ'. See confluent_schema_registry for more detail on using this option. Note: To encode your ca.cert, run base64 -w 0 ca.cert. | 
| client_cert | Kafka, webhook | STRING | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key. | 
| client_key | Kafka, webhook | STRING | The base64-encoded private key for the PEM certificate. This is used with client_cert. Note: Client keys are often encrypted. You will receive an error if you pass an encrypted client key in your changefeed statement. To decrypt the client key, run: openssl rsa -in key.pem -out key.decrypt.pem -passin pass:{PASSWORD}. Once decrypted, be sure to update your changefeed statement to use the new key.decrypt.pem file instead. | 
| file_size | cloud | STRING | The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence. Default: 16MB | 
| insecure_tls_skip_verify | Kafka, webhook | BOOL | If true, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication. Default: false | 
| partition_format | cloud | STRING | New in v22.1: Specify how changefeed file paths are partitioned in cloud storage sinks. Use partition_format with one of the following values: daily, hourly, or flat (see Files for the resulting file paths). For example: CREATE CHANGEFEED FOR TABLE users INTO 'gs://...?AUTH...&partition_format=hourly'. Default: daily | 
| S3_storage_class | Amazon S3 cloud storage sink | STRING | Specify the Amazon S3 storage class for files created by the changefeed. See Create a changefeed with an S3 storage class for the available classes and an example. Default: STANDARD | 
| sasl_enabled | Kafka | BOOL | If true, the authentication protocol can be set to SCRAM or PLAIN using the sasl_mechanism parameter. You must have tls_enabled set to true to use SASL. Default: false | 
| sasl_mechanism | Kafka | STRING | Can be set to SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN. A sasl_user and sasl_password are required. Default: PLAIN | 
| sasl_password | Kafka | STRING | Your SASL password. Note: Passwords should be URL encoded since the value can contain characters that would cause authentication to fail. | 
| sasl_user | Kafka | STRING | Your SASL username. | 
| topic_name | Kafka, GC Pub/Sub | STRING | Allows arbitrary topic naming for Kafka and GC Pub/Sub topics. See the Kafka topic naming limitations or GC Pub/Sub topic naming for detail on supported characters. For example, CREATE CHANGEFEED FOR foo,bar INTO 'kafka://sink?topic_name=all' will emit all records to a topic named all. Note that schemas will still be registered separately. When using Kafka, this option can be combined with the topic_prefix option (this is not supported for GC Pub/Sub). Default: table name. | 
| topic_prefix | Kafka, cloud | STRING | Adds a prefix to all topic names. For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. | 
| tls_enabled | Kafka | BOOL | If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see ca_cert above). Default: false | 
Options
| Option | Value | Description | 
|---|---|---|
| avro_schema_prefix | Schema prefix name | Provide a namespace for the schema of a table in addition to the default, the table name. This allows multiple databases or clusters to share the same schema registry when the same table name is present in multiple databases. Example: CREATE CHANGEFEED FOR foo WITH format=avro, confluent_schema_registry='registry_url', avro_schema_prefix='super' will register subjects as superfoo-key and superfoo-value with the namespace super. | 
| compression | gzip | Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression. | 
| confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use avro. Use the timeout={duration} query parameter (duration string) in your Confluent Schema Registry URI to change the default timeout for contacting the schema registry. By default, the timeout is 30 seconds. To connect to Confluent Cloud, use the following URL structure: 'https://{API_KEY_ID}:{API_SECRET_URL_ENCODED}@{CONFLUENT_REGISTRY_URL}:443'. See the Stream a Changefeed to a Confluent Cloud Kafka Cluster tutorial for further detail. | 
| cursor | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR='1536242855577149065.0000000000' | 
| diff | N/A | Publish a before field with each message, which includes the value of the row before the update was applied. | 
| end_time | Timestamp | New in v22.1: Indicate the timestamp up to which the changefeed will emit all events and then complete with a successful status. Provide a future timestamp to end_time in number of nanoseconds since the Unix epoch. For example, end_time="1655402400000000000". You cannot use end_time and initial_scan = 'only' simultaneously. | 
| envelope | key_only/row*/wrapped | key_only emits only the key and no value, which is faster if you only want to know when the key changes. row emits the row without any additional metadata fields in the message. *You can only use row with Kafka sinks or sinkless changefeeds. row does not support avro format. wrapped emits the full message including any metadata fields. See Responses for more detail on message format. Default: envelope=wrapped | 
| format | json/avro/csv* | Format of the emitted record. For mappings of CockroachDB types to Avro types, see the table and detail on Avro limitations. New in v22.1: *format=csv works only in combination with initial_scan = 'only'. You cannot combine format=csv with the diff or resolved options. See Export data with changefeeds for details on using these options to create a changefeed as an alternative to EXPORT. Default: format=json. | 
| full_table_name | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases. Note: This option cannot modify existing table names used as topics, subjects, etc., as part of an ALTER CHANGEFEED statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. Example: CREATE CHANGEFEED FOR foo... WITH full_table_name will create the topic name defaultdb.public.foo instead of foo. | 
| initial_scan | yes/no/only | Control whether or not an initial scan will occur at the start time of a changefeed. Only one initial_scan option (yes, no, or only) can be used. If none of these are set, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases. With initial_scan = 'only' set, the changefeed job will end with a successful status (succeeded) after the initial scan completes. If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now(). Although the initial_scan/no_initial_scan syntax from previous versions is still supported, you cannot combine the previous and current syntax. Note: You cannot use the new initial_scan = "yes"/"no"/"only" syntax with ALTER CHANGEFEED in v22.1. To ensure that you can modify a changefeed with the initial_scan options, use the previous syntax of initial_scan, no_initial_scan, and initial_scan_only. Default: initial_scan = 'yes' | 
| kafka_sink_config | STRING | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. New in v22.1.12: Set the message file compression type. See Kafka sink configuration for more detail on configuring all the available fields for this option. Example: CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}' | 
| key_in_value | N/A | Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for cloud storage sinks, webhook sinks, and GC Pub/Sub sinks. | 
| metrics_label | STRING | This is an experimental feature. Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated. The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels. Example: WITH metrics_label=label_name. For more detail on usage and considerations, see Using changefeed metrics labels. | 
| min_checkpoint_frequency | Duration string | Controls how often nodes flush their progress to the coordinating changefeed node. Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to 0s, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then min_checkpoint_frequency is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. Note: resolved messages will not be emitted more frequently than the configured min_checkpoint_frequency (but may be emitted less frequently). Since min_checkpoint_frequency defaults to 30s, you must configure min_checkpoint_frequency to at least the desired resolved message frequency if you require resolved messages more frequently than 30s. Default: 30s | 
| mvcc_timestamp | N/A | Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. | 
| on_error | pause/fail | Use on_error=pause to pause the changefeed when encountering non-retryable errors. on_error=pause will pause the changefeed instead of sending it into a terminal failure state. Note: Retryable errors will continue to be retried with this option specified. Use with protect_data_from_gc_on_pause to protect changes from garbage collection. Default: on_error=fail | 
| protect_data_from_gc_on_pause | N/A | When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected. If protect_data_from_gc_on_pause is unset, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding protect_data_from_gc_on_pause to a changefeed will not protect data if the garbage collection window has already passed. Use with on_error=pause to protect changes from garbage collection when encountering non-retryable errors. See Garbage collection and changefeeds for more detail on protecting changefeed data. Note: If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. | 
| resolved | Duration string | Emits resolved timestamp events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. Set an optional minimal duration between emitting resolved timestamps. Example: resolved='10s'. This option will only emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If unspecified, all resolved timestamps are emitted as the high-water mark advances. Note: If you require resolved message frequency under 30s, then you must set the min_checkpoint_frequency option to at least the desired resolved frequency. This is because resolved messages will not be emitted more frequently than min_checkpoint_frequency, but may be emitted less frequently. | 
| schema_change_events | default/column_changes | The type of schema change event (default or column_changes) that triggers the behavior specified by the schema_change_policy option. Default: schema_change_events=default | 
| schema_change_policy | backfill/nobackfill/stop | The behavior to take (backfill, nobackfill, or stop) when an event specified by the schema_change_events option occurs. Default: schema_change_policy=backfill | 
| split_column_families | N/A | Use this option to create a changefeed on a table with multiple column families. The changefeed will emit messages for each of the table's column families. See Changefeeds on tables with column families for more usage detail. | 
| topic_in_value | BOOL | Set to include the topic in each emitted row update. Note this is automatically set for webhook sinks. | 
| updated | N/A | Include updated timestamps with each row. If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid. | 
| virtual_columns | STRING | New in v22.1: Changefeeds omit virtual computed columns from emitted messages by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit NULL values for virtual computed columns, set virtual_columns = "null" when you start a changefeed. You may also define virtual_columns = "omitted", though this is already the default behavior for v22.1+. If you do not set "omitted" on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values. Default: "omitted" | 
| webhook_auth_header | STRING | Pass a value (password, token, etc.) to the HTTP Authorization header with a webhook request for a "Basic" HTTP authentication scheme. Example: With a username of "user" and password of "pwd", join them with a colon as "user:pwd" and then base64 encode, which results in "dXNlcjpwd2Q=". WITH webhook_auth_header='Basic dXNlcjpwd2Q='. | 
| webhook_client_timeout | INTERVAL | If a response is not recorded from the sink within this timeframe, it will error and retry to connect. Note this must be a positive value. Default: "3s" | 
| webhook_sink_config | STRING | Set fields to configure sink batching and retries. The schema is as follows: { "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ... }, "Retry": { "Max": ..., "Backoff": ... } }. Note that if either Messages or Bytes is nonzero, then a non-zero value for Frequency must be provided. See Webhook sink configuration for more details on using this option. | 
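The Frequency rule for webhook_sink_config can be checked before the statement is sent. The Python sketch below builds the JSON string from a dictionary and rejects a configuration whose Messages or Bytes is nonzero while Frequency is missing or zero. The field values (100 messages, "5s", and so on) and their types are illustrative assumptions, as is the rough test for a zero duration; the helper names are hypothetical.

```python
import json
import re


def _is_nonzero_duration(value) -> bool:
    """Rough check (an assumption): a duration string such as "5s" counts as
    nonzero if any numeric component in it is greater than zero."""
    if not isinstance(value, str):
        return False
    return any(float(n) > 0 for n in re.findall(r"\d+(?:\.\d+)?", value))


def webhook_sink_config(flush: dict, retry: dict) -> str:
    """Return the JSON text for the webhook_sink_config option."""
    batching = flush.get("Messages", 0) or flush.get("Bytes", 0)
    if batching and not _is_nonzero_duration(flush.get("Frequency")):
        raise ValueError(
            "Frequency must be nonzero when Messages or Bytes is nonzero"
        )
    return json.dumps({"Flush": flush, "Retry": retry})


# Illustrative values only; they are not taken from this page.
config = webhook_sink_config(
    {"Messages": 100, "Frequency": "5s"},
    {"Max": 4, "Backoff": "10s"},
)
print(f"WITH webhook_sink_config='{config}'")
```

Because JSON uses double quotes, the resulting text can be wrapped in single quotes as a SQL string without further escaping, provided no value itself contains a single quote.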
Using the format=avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
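Two option values in the table above are derived rather than typed directly: end_time is expressed in nanoseconds since the Unix epoch, and webhook_auth_header carries a base64-encoded "user:password" pair. A minimal Python sketch of both conversions follows. The helper names are hypothetical, and the sample date was chosen so that the result matches the end_time example value in the table.

```python
import base64
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanos(moment: datetime) -> int:
    """Convert a timezone-aware datetime to nanoseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer arithmetic avoids float rounding; datetime resolves to 1 microsecond.
    micros = (moment - EPOCH) // timedelta(microseconds=1)
    return micros * 1000


def basic_auth_header(user: str, password: str) -> str:
    """Build the webhook_auth_header value for the Basic scheme."""
    pair = f"{user}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(pair).decode("ascii")


end_time = to_unix_nanos(datetime(2022, 6, 16, 18, 0, 0, tzinfo=timezone.utc))
auth_header = basic_auth_header("user", "pwd")

print(f'end_time="{end_time}"')                   # end_time="1655402400000000000"
print(f"webhook_auth_header='{auth_header}'")     # webhook_auth_header='Basic dXNlcjpwd2Q='
```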
Files
The files emitted to a sink use the following naming conventions:
The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.
General file format
/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]
For example:
/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
New in v22.1: When emitting changefeed messages to a cloud storage sink, you can specify a partition format for your files using the partition_format query parameter. This will result in the following file path formats:
- daily: This is the default option and will follow the same pattern as the previous general file format.
- hourly: This will partition into an hourly directory as the changefeed emits messages, like the following: /2020-04-02/20/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
- flat: This will result in no file partitioning. The cloud storage path you specify when creating a changefeed will store all of the message files with no additional directories created.
Resolved file format
/[date]/[timestamp].RESOLVED
For example:
/2020-04-04/202004042351304139680000000000000.RESOLVED
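To make the naming convention concrete, the following Python sketch splits a file-name timestamp into its parts and computes the directory prefix for each partition_format value. It assumes, without confirmation from this page, that the N digits are fractional-second nanoseconds and the L digits are a logical counter. The sample timestamp is assembled from hypothetical components, and the function names are illustrative.

```python
import re
from datetime import datetime

# YYYYMMDDHHMMSS (14 digits) + NNNNNNNNN (9 digits) + LLLLLLLLLL (10 digits)
TIMESTAMP_RE = re.compile(r"^(\d{14})(\d{9})(\d{10})$")


def parse_file_timestamp(stamp: str):
    """Split a 33-digit file timestamp into (wall time, N digits, L digits)."""
    match = TIMESTAMP_RE.match(stamp)
    if match is None:
        raise ValueError(f"unexpected timestamp: {stamp!r}")
    wall = datetime.strptime(match.group(1), "%Y%m%d%H%M%S")
    return wall, int(match.group(2)), int(match.group(3))


def partition_prefix(wall: datetime, partition_format: str = "daily") -> str:
    """Return the directory prefix that precedes the file name."""
    if partition_format == "daily":
        return wall.strftime("/%Y-%m-%d/")
    if partition_format == "hourly":
        return wall.strftime("/%Y-%m-%d/%H/")
    if partition_format == "flat":
        return "/"  # no additional directories are created
    raise ValueError(f"unknown partition_format: {partition_format!r}")


# Hypothetical sample built from its three components.
sample = "20200402205807" + "210714000" + "0000000000"
wall, nanos, logical = parse_file_timestamp(sample)
print(partition_prefix(wall, "daily"))   # /2020-04-02/
print(partition_prefix(wall, "hourly"))  # /2020-04-02/20/
```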
Examples
Before running any of the examples in this section, enable the kv.rangefeed.enabled cluster setting (SET CLUSTER SETTING kv.rangefeed.enabled = true;). If you are working on a CockroachDB Serverless cluster, this cluster setting is enabled by default.
The following examples show the syntax for managing changefeeds and starting changefeeds to specific sinks. The Options table on this page provides a list of all the available options. For information on sink-specific query parameters and configurations see the Changefeed Sinks page.
Create a changefeed connected to Kafka
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to Kafka, see the Create a changefeed connected to Kafka example. The parameters table on the Changefeed Sinks page provides a list of all Kafka-specific query parameters.
Create a changefeed connected to Kafka using Avro
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH format = avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see this step-by-step example. The parameters table on the Changefeed Sinks page provides a list of all Kafka-specific query parameters.
Create a changefeed connected to a cloud storage sink
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'scheme://host?parameters'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to a cloud storage sink, see the Changefeed Examples page. The parameters table on the Changefeed Sinks page provides a list of the available cloud storage parameters.
Create a changefeed with an S3 storage class
New in v22.1: To associate the changefeed message files with a specific storage class in your Amazon S3 bucket, use the S3_STORAGE_CLASS parameter with the class. For example, the following S3 connection URI specifies the INTELLIGENT_TIERING storage class:
CREATE CHANGEFEED FOR TABLE name INTO 's3://{BUCKET NAME}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}&S3_STORAGE_CLASS=INTELLIGENT_TIERING' WITH resolved;
Use the parameter to set one of these storage classes listed in Amazon's documentation. For more general usage information, see Amazon's Using Amazon S3 storage classes documentation.
Create a changefeed connected to a Google Cloud Pub/Sub sink
The Google Cloud Pub/Sub sink is currently in beta.
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'gcpubsub://project name?parameters'
  WITH resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to a Google Cloud Pub/Sub sink, see the Changefeed Examples page. The parameters table on the Changefeed Sinks page provides a list of the available Google Cloud Pub/Sub parameters.
Create a changefeed connected to a webhook sink
The webhook sink is currently in beta — see usage considerations, available parameters, and options for more information.
CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
  WITH updated;
+---------------------+
|      job_id         |
+---------------------+
| 687842491801632769  |
+---------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to a webhook sink, see the Changefeed Examples page. The parameters table on the Changefeed Sinks page provides a list of the available webhook parameters.
Manage a changefeed
For Enterprise changefeeds, use SHOW CHANGEFEED JOBS to check the status of your changefeed jobs:
> SHOW CHANGEFEED JOBS;
Use the following SQL statements to pause, resume, or cancel a changefeed.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
Modify a changefeed
To modify an Enterprise changefeed, pause the job and then use:
ALTER CHANGEFEED job_id {ADD table DROP table SET option UNSET option};
You can add new table targets, remove them, set new changefeed options, and unset them.
For more information, see ALTER CHANGEFEED.
Configuring all changefeeds
It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.
To pause all running changefeeds:
PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));
This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.
To resume all paused changefeeds:
RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));
This will resume the changefeeds and update the status for each of the changefeeds to running.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH cursor = '<high_water_timestamp>';
Note that because the cursor is provided, the initial scan is not performed.
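Because a cursor cannot predate the garbage collection window, it can be useful to check the age of a high-water timestamp before reusing it. The following Python sketch assumes the value has the form {nanoseconds since the Unix epoch}.{logical counter} and uses the 25-hour default window mentioned for the cursor option. The function names are hypothetical.

```python
from datetime import timedelta

# Default window stated for the cursor option; adjust if gc.ttlseconds differs.
DEFAULT_GC_WINDOW = timedelta(hours=25)


def split_cursor(cursor: str):
    """Split 'wall_nanos.logical' into two integers."""
    wall, _, logical = cursor.partition(".")
    return int(wall), int(logical or 0)


def cursor_age(cursor: str, now_nanos: int) -> timedelta:
    """Return how far in the past the cursor lies relative to now_nanos."""
    wall_nanos, _ = split_cursor(cursor)
    return timedelta(microseconds=(now_nanos - wall_nanos) // 1000)


def within_gc_window(cursor: str, now_nanos: int,
                     window: timedelta = DEFAULT_GC_WINDOW) -> bool:
    """True if a changefeed could still start from this cursor."""
    return cursor_age(cursor, now_nanos) <= window


high_water = "1537279405671006870.0000000000"
one_hour_later = 1537279405671006870 + 3600 * 10**9
print(within_gc_window(high_water, one_hour_later))  # True
```

In practice, now_nanos could come from time.time_ns(); it is passed in explicitly here so that the check stays deterministic.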