bulkfhir

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package bulkfhir helps manage communication and with bulk fhir APIs. At the moment, much of this package is still geared around the BCDA API, but is in the process of being generalized further.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrorUnimplemented indicates that this method is currently unimplemented.
	ErrorUnimplemented = errors.New("method not implemented yet")
	// ErrorUnauthorized indicates that the server considers this client
	// unauthorized. While authenticators should renew credentials automatically
	// if required, time-of-check-to-time-of-use may mean that this error is still
	// the result of expired credentials. Clients should consider retrying the
	// operation if needed.
	ErrorUnauthorized = errors.New("server indicates this client is unauthorized")
	// ErrorTimeout indicates the operation timed out.
	ErrorTimeout = errors.New("this operation timed out")
	// ErrorExportJobNotFound indicates that the Job URL returned a 404 status.
	ErrorExportJobNotFound = errors.New("job URL returned 404 not found")
	// ErrorUnexpectedStatusCode indicates an unexpected status code was present.
	ErrorUnexpectedStatusCode = errors.New("unexpected non-ok HTTP status code")
	// ErrorGreaterThanOneContentLocation indicates more than 1 Content-Location header was present.
	ErrorGreaterThanOneContentLocation = errors.New("greater than 1 Content-Location header")
	// ErrorUnexpectedNumberOfXProgress indicated unexpected number of X-Progress headers present.
	ErrorUnexpectedNumberOfXProgress = errors.New("unexpected number of x-progress headers")
	// ErrorRetryableHTTPStatus may be wrapped into other errors emitted by this package
	// to indicate to the caller that a retryable http error code was returned
	// from the server.
	// TODO(b/239596656): consider adding auto-retry logic within this package.
	ErrorRetryableHTTPStatus = errors.New("this is a retryable but unexpected HTTP status code error")
)
View Source
var ErrUnsetTransactionTime = errors.New("TransactionTime.Set has not been called")

ErrUnsetTransactionTime is returned from TransactionTime.Get if it is called before TransactionTime.Set is called.

View Source
var ExportGroupAll = "all"

ExportGroupAll is a default group ID of "all" which can be supplied to StartBulkDataExport. Depending on your FHIR server, the all patients group ID may differ, so be sure to consult relevant documentation.

Functions

func ResourceTypeCodeFromName

func ResourceTypeCodeFromName(name string) (cpb.ResourceTypeCode_Value, error)

ResourceTypeCodeFromName returns the ResourceTypeCode for the given enum.

func ResourceTypeCodeToName

func ResourceTypeCodeToName(val cpb.ResourceTypeCode_Value) (string, error)

ResourceTypeCodeToName returns the FHIR resource name corresponding to the ResourceTypeCode.

Types

type Authenticator

type Authenticator interface {

	// Authenticate unconditionally performs any credential exchange required to
	// make requests. It is generally not necessary to call this method, as it
	// will be called automatically by AddAuthenticationToRequest if credentials
	// have not yet been exchanged or have expired.
	Authenticate(hc *http.Client) error

	// AuthenticateIfNecessary performs any credential exchange required to make
	// requests, if the credentials have expired or have not yet been exchanged.
	// This can be used if you need to track authentication errors, but does not
	// need to be called otherwise; authentication will be done automatically when
	// requests are made using AddAuthenticationToRequest.
	AuthenticateIfNecessary(hc *http.Client) error

	// Add authentication credentials to an outbound request. This may perform
	// additional requests to perform credential exchange if required by the
	// authentication mechanism, both before any initial request, and on
	// subsequent requests if any acquired credentials have expired.
	//
	// Implementations should call their own AuthenticateIfNecessary method if
	// credential exchange is necessary.
	AddAuthenticationToRequest(hc *http.Client, req *http.Request) error
}

Authenticator defines a module used for obtaining authentication credentials and attaching them to outbound requests to the Bulk FHIR APIs.

func NewHTTPBasicOAuthAuthenticator

func NewHTTPBasicOAuthAuthenticator(username, password, tokenURL string, opts *HTTPBasicOAuthOptions) (Authenticator, error)

NewHTTPBasicOAuthAuthenticator creates a new Authenticator which uses 2-legged OAuth with HTTP Basic authentication to obtain a bearer token. The username and password are typically a client ID and client secret (respectively) supplied by the Bulk FHIR Provider.

func NewJWTOAuthAuthenticator

func NewJWTOAuthAuthenticator(issuer, subject, tokenURL string, keyProvider JWTKeyProvider, opts *JWTOAuthOptions) (Authenticator, error)

NewJWTOAuthAuthenticator creates a new Authenticator which uses 2-legged OAuth with JWT authentication (according to RFC9068) to obtain a bearer token.

type BearerToken

type BearerToken struct {
	Token                        string
	Expiry                       time.Time
	AlwaysAuthenticateIfNoExpiry bool
}

BearerToken encapsulates a bearer token presented as an Authorization header.

func DoOAuthExchange

func DoOAuthExchange(hc *http.Client, req *http.Request, defaultExpiry time.Duration, alwaysAuthenticateIfNoExpiresIn bool) (*BearerToken, error)

DoOAuthExchange sends a HTTP request which is expected to return a JSON response with "token" and "expires_in" fields.

type BearerTokenAuthenticator

type BearerTokenAuthenticator struct {
	Exchanger CredentialExchanger
	// contains filtered or unexported fields
}

BearerTokenAuthenticator is an implementation of Authenticator which uses a CredentialExchanger to obtain a bearer token which is presented in an Authorization header.

Note: this implementation is not thread safe.

func (*BearerTokenAuthenticator) AddAuthenticationToRequest

func (bta *BearerTokenAuthenticator) AddAuthenticationToRequest(hc *http.Client, req *http.Request) error

AddAuthenticationToRequest is Authenticator.AddAuthenticationToRequest.

This Authenticator adds an access token as an Authorization: Bearer {token} header, automatically requesting/refreshing the token as necessary.

func (*BearerTokenAuthenticator) Authenticate

func (bta *BearerTokenAuthenticator) Authenticate(hc *http.Client) error

Authenticate is Authenticator.Authenticate.

This Authenticator uses the CredentialExchanger it contains to obtain a bearer token.

func (*BearerTokenAuthenticator) AuthenticateIfNecessary

func (bta *BearerTokenAuthenticator) AuthenticateIfNecessary(hc *http.Client) error

AuthenticateIfNecessary is Authenticator.AuthenticateIfNecessary.

This Authenticator uses the CredentialExchanger it contains to obtain a bearer token.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a Bulk FHIR API client at some API version.

func NewClient

func NewClient(baseURL string, authenticator Authenticator) (*Client, error)

NewClient creates and returns a new bulk fhir API Client for the input baseURL, using the given authenticator.

func (*Client) Authenticate

func (c *Client) Authenticate() error

Authenticate calls through to the Authenticator the client was built with to unconditionally perform credential exchange.

func (*Client) AuthenticateIfNecessary

func (c *Client) AuthenticateIfNecessary() error

AuthenticateIfNecessary calls through to the Authenticator the client was built with to perform credential exchange if necessary.

func (*Client) Close

func (c *Client) Close() error

Close is a placeholder for any cleanup actions needed for the Client. Please call this when finished with a Client.

func (*Client) GetData

func (c *Client) GetData(bcdaURL string) (dataStream io.ReadCloser, err error)

GetData retrieves the NDJSON data result from the provided BCDA result url. The caller must close the dataStream io.ReadCloser when finished.

func (*Client) JobStatus

func (c *Client) JobStatus(jobStatusURL string) (st JobStatus, err error)

JobStatus retrieves the current JobStatus via the bulk fhir API for the provided job status URL.

func (*Client) MonitorJobStatus

func (c *Client) MonitorJobStatus(jobStatusURL string, checkPeriod, timeout time.Duration) <-chan *MonitorResult

MonitorJobStatus will asynchronously check the status of job at the provided checkPeriod until either the job completes or until the timeout. Each time the job status is checked, a MonitorResult will be emitted to the returned channel for the caller to consume. When the timeout is reached or the job is completed, the final completed JobStatus will be sent to the channel (or the ErrorTimeout error), and the channel will be closed. If an ErrorUnauthroized is encountered, MonitorJobStatus will attempt to reauthenticate and continue trying.

func (*Client) StartBulkDataExport

func (c *Client) StartBulkDataExport(types []cpb.ResourceTypeCode_Value, since time.Time, groupID string) (jobStatusURL string, err error)

StartBulkDataExport starts a job via the bulk FHIR API to begin exporting the requested resource types since the provided timestamp for the provided group, and returns the URL to query the job status (from the response Content- Location header). StartBulkDataExportAll can be used if you wish to export all FHIR resources without a group ID.

func (*Client) StartBulkDataExportAll

func (c *Client) StartBulkDataExportAll(types []cpb.ResourceTypeCode_Value, since time.Time) (jobStatusURL string, err error)

StartBulkDataExportAll starts a job via the bulk FHIR to begin exporting the requested resource types since the provided timestamp for all patients and returns the URL to query the job status.

type CredentialExchanger

type CredentialExchanger interface {
	Authenticate(hc *http.Client) (*BearerToken, error)
}

CredentialExchanger is used by bearerTokenAuthenticator to exchange long-lived credentials for a short lived bearer token.

type HTTPBasicOAuthOptions

type HTTPBasicOAuthOptions struct {
	// OAuth scopes used when authenticating.
	Scopes []string

	// Whether the authenticator should always refresh if the authentication
	// server does not provide an "expires_in" duration in the response. The
	// default behaviour is to automatically authenticate upon first use (when
	// AuthenticateIfNecessary or AddAuthenticationToRequest is called), and then
	// to not authenticate again if no expiry time can be determined.
	//
	// Consider using DefaultExpiry instead to provide an expiry duration that is
	// used for determining the expiry time after each credential exchange.
	AlwaysAuthenticateIfNoExpiresIn bool

	// A default expiry duration to use if the authentication server does not
	// provide an "expires_in" duration in the response.
	DefaultExpiry time.Duration
}

HTTPBasicOAuthOptions contains optional parameters used by NewHTTPBasicOAuthAuthenticator.

type JWTKeyProvider

type JWTKeyProvider interface {
	Key() (*rsa.PrivateKey, error)
	KeyID() string
}

A JWTKeyProvider provides the RSA private key used for signing JSON Web Tokens.

func NewPEMFileKeyProvider

func NewPEMFileKeyProvider(filename, keyID string) JWTKeyProvider

NewPEMFileKeyProvider returns a JWTKeyProvider which reads a PEM-encoded key from the given file.

type JWTOAuthOptions

type JWTOAuthOptions struct {
	// How long the generated JWT is valid for (according to its "exp" claim).
	// Defaults to 5 minutes if unset.
	JWTLifetime time.Duration

	// OAuth scopes used when authenticating.
	Scopes []string

	// Whether the authenticator should always refresh if the authentication
	// server does not provide an "expires_in" duration in the response. The
	// default behaviour is to automatically authenticate upon first use (when
	// AuthenticateIfNecessary or AddAuthenticationToRequest is called), and then
	// to not authenticate again if no expiry time can be determined.
	//
	// Consider using DefaultExpiry instead to provide an expiry duration that is
	// used for determining the expiry time after each credential exchange.
	AlwaysAuthenticateIfNoExpiresIn bool

	// A default expiry duration to use if the authentication server does not
	// provide an "expires_in" duration in the response.
	DefaultExpiry time.Duration
}

JWTOAuthOptions contains optional parameters used by NewJWTOAuthAuthenticator.

type JobStatus

type JobStatus struct {
	IsComplete      bool
	PercentComplete int
	RetryAfter      time.Duration
	// ResultURLs holds the final NDJSON URLs for the job by resource type (if the job is complete).
	ResultURLs map[cpb.ResourceTypeCode_Value][]string
	// Indicates the FHIR server time when the bulk data export was processed.
	TransactionTime time.Time
}

JobStatus represents the current status of a bulk fhir export Job, returned from GetJobStatus.

type MonitorResult

type MonitorResult struct {
	// Status holdes the JobStatus
	Status JobStatus
	// Error holds an error associated with this entry (if any)
	Error error
}

MonitorResult holds either a JobStatus or an error.

type TransactionTime

type TransactionTime struct {
	// contains filtered or unexported fields
}

A TransactionTime holds the transaction time for a bulk FHIR export. It is used to allow constructing processing pipelines before the export operation is started; pipeline steps may hold a pointer to the TransactionTime, and call Get once they receive a resource to process or store (by which time the cache should have been populated).

func NewTransactionTime

func NewTransactionTime() *TransactionTime

NewTransactionTime returns a new TransactionTime.

func (*TransactionTime) Get

func (tt *TransactionTime) Get() (time.Time, error)

Get the timestamp from the cache. Returns an error if Set() has not yet been called.

func (*TransactionTime) Set

func (tt *TransactionTime) Set(timestamp time.Time)

Set the timestamp in the cache.

type TransactionTimeStore

type TransactionTimeStore interface {
	// Load a previously stored transaction time. If no transaction time has
	// previously been stored (i.e. if the program has never been successfully run
	// with the current configuration), this should return a zero time with no
	// error.
	Load(ctx context.Context) (time.Time, error)
	// Store() saves the given timestamp to persistent storage so that it can be
	// retrieved by Load() the next time the program is run.
	Store(ctx context.Context, ts time.Time) error
}

TransactionTimeStore manages the transaction time of Bulk FHIR fetches. The transaction timestamp of a successful export is saved so that it can be used as the _since parameter for the subsequent export.

func NewGCSTransactionTimeStore

func NewGCSTransactionTimeStore(ctx context.Context, gcsEndpoint, uri string) (TransactionTimeStore, error)

NewGCSTransactionTimeStore returns an implementation of TransactionTimeStore which persists the since timestamp to a file in GCS at the given URI. A new line is appended to the file on each run, so that the entire history of transaction times may be seen.

func NewInMemoryTransactionTimeStore

func NewInMemoryTransactionTimeStore(timestamp string) (TransactionTimeStore, error)

NewInMemoryTransactionTimeStore returns an implementation of TransactionTimeStore which does not persist the since timestamp anywhere. It is initialised with a string timestamp, which may be blank.

func NewLocalFileTransactionTimeStore

func NewLocalFileTransactionTimeStore(path string) TransactionTimeStore

NewLocalFileTransactionTimeStore returns an implementation of TransactionTimeStore which persists the since timestamp to a local file at the given path. A new line is appended to the file on each run, so that the entire history of transaction times may be seen.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL